@@ -15,12 +15,12 @@ package jetstream
1515
1616import (
1717 "context"
18+ "errors"
1819 "fmt"
1920 "time"
2021
2122 "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
2223 "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
23- "github.com/nats-io/jsm.go"
2424 "github.com/nats-io/nats.go"
2525 "github.com/nats-io/nats.go/jetstream"
2626)
@@ -115,7 +115,7 @@ func resourceKVBucket() *schema.Resource {
115115}
116116
117117func resourceKVBucketCreate (d * schema.ResourceData , m any ) error {
118- nc , mgr , err := m .( func () ( * nats. Conn , * jsm. Manager , error ))( )
118+ nc , err := getConnection ( d , m )
119119 if err != nil {
120120 return err
121121 }
@@ -145,14 +145,6 @@ func resourceKVBucketCreate(d *schema.ResourceData, m any) error {
145145 }
146146 }
147147
148- known , err := mgr .IsKnownStream ("KV_" + name )
149- if err != nil {
150- return err
151- }
152- if known {
153- return fmt .Errorf ("bucket %s already exist" , name )
154- }
155-
156148 ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
157149 defer cancel ()
158150
@@ -161,6 +153,15 @@ func resourceKVBucketCreate(d *schema.ResourceData, m any) error {
161153 return err
162154 }
163155
156+ known , err := js .KeyValue (ctx , name )
157+ if known != nil {
158+ return fmt .Errorf ("bucket %s already exist" , name )
159+ } else if err != nil {
160+ if ! errors .Is (err , jetstream .ErrBucketNotFound ) {
161+ return fmt .Errorf ("failed to load KV bucket: %s" , err )
162+ }
163+ }
164+
164165 js .CreateKeyValue (ctx , jetstream.KeyValueConfig {
165166 Bucket : name ,
166167 Description : descrption ,
@@ -185,7 +186,7 @@ func resourceKVBucketRead(d *schema.ResourceData, m any) error {
185186 return err
186187 }
187188
188- nc , _ , err := m .( func () ( * nats. Conn , * jsm. Manager , error ))( )
189+ nc , err := getConnection ( d , m )
189190 if err != nil {
190191 return err
191192 }
@@ -237,7 +238,7 @@ func resourceKVBucketRead(d *schema.ResourceData, m any) error {
237238func resourceKVBucketUpdate (d * schema.ResourceData , m any ) error {
238239 name := d .Get ("name" ).(string )
239240
240- nc , mgr , err := m .( func () ( * nats. Conn , * jsm. Manager , error ))( )
241+ nc , err := getConnection ( d , m )
241242 if err != nil {
242243 return err
243244 }
@@ -258,29 +259,46 @@ func resourceKVBucketUpdate(d *schema.ResourceData, m any) error {
258259 if err != nil {
259260 return err
260261 }
262+
261263 jStatus := status .(* jetstream.KeyValueBucketStatus )
262264
263- str , err := mgr . LoadStream ( jStatus .StreamInfo ().Config .Name )
265+ str , err := js . Stream ( ctx , jStatus .StreamInfo ().Config .Name )
264266 if err != nil {
265267 return err
266268 }
267269
270+ cfg := jetstream.KeyValueConfig {
271+ Bucket : bucket .Bucket (),
272+ Description : str .CachedInfo ().Config .Description ,
273+ MaxValueSize : str .CachedInfo ().Config .MaxMsgSize ,
274+ History : uint8 (status .History ()),
275+ TTL : status .TTL (),
276+ MaxBytes : str .CachedInfo ().Config .MaxBytes ,
277+ Storage : str .CachedInfo ().Config .Storage ,
278+ Replicas : str .CachedInfo ().Config .Replicas ,
279+ Placement : str .CachedInfo ().Config .Placement ,
280+ RePublish : str .CachedInfo ().Config .RePublish ,
281+ Mirror : str .CachedInfo ().Config .Mirror ,
282+ Sources : str .CachedInfo ().Config .Sources ,
283+ Compression : status .IsCompressed (),
284+ LimitMarkerTTL : status .LimitMarkerTTL (),
285+ }
286+
268287 history := d .Get ("history" ).(int )
269288 ttl := d .Get ("ttl" ).(int )
270289 maxV := d .Get ("max_value_size" ).(int )
271290 maxB := d .Get ("max_bucket_size" ).(int )
272291 description := d .Get ("description" ).(string )
273292 markerTTL := d .Get ("limit_marker_ttl" ).(int )
274293
275- cfg := str . Configuration ( )
276- cfg .MaxAge = time .Duration (ttl ) * time .Second
277- cfg .MaxMsgSize = int32 (maxV )
294+ cfg . History = uint8 ( history )
295+ cfg .TTL = time .Duration (ttl ) * time .Second
296+ cfg .MaxValueSize = int32 (maxV )
278297 cfg .MaxBytes = int64 (maxB )
279- cfg .MaxMsgsPer = int64 (history )
280298 cfg .Description = description
281- cfg .SubjectDeleteMarkerTTL = time .Duration (markerTTL ) * time .Second
299+ cfg .LimitMarkerTTL = time .Duration (markerTTL ) * time .Second
282300
283- err = str . UpdateConfiguration ( cfg )
301+ _ , err = js . CreateOrUpdateKeyValue ( ctx , cfg )
284302 if err != nil {
285303 return err
286304 }
@@ -291,7 +309,7 @@ func resourceKVBucketUpdate(d *schema.ResourceData, m any) error {
291309func resourceKVBucketDelete (d * schema.ResourceData , m any ) error {
292310 name := d .Get ("name" ).(string )
293311
294- nc , _ , err := m .( func () ( * nats. Conn , * jsm. Manager , error ))( )
312+ nc , err := getConnection ( d , m )
295313 if err != nil {
296314 return err
297315 }
0 commit comments