1414package jetstream
1515
1616import (
17+ "context"
1718 "fmt"
1819 "time"
1920
2021 "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
2122 "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
2223 "github.com/nats-io/jsm.go"
2324 "github.com/nats-io/nats.go"
25+ "github.com/nats-io/nats.go/jetstream"
2426)
2527
2628func resourceKVBucket () * schema.Resource {
@@ -100,6 +102,14 @@ func resourceKVBucket() *schema.Resource {
100102 ForceNew : true ,
101103 ValidateFunc : validation .All (validation .IntAtLeast (1 ), validation .IntAtMost (5 )),
102104 },
105+ "limit_marker_ttl" : {
106+ Type : schema .TypeInt ,
107+ Description : "Enables Per-Key TTLs and Limit Markers, duration specified in seconds" ,
108+ Optional : true ,
109+ ForceNew : false ,
110+ Default : 0 ,
111+ ValidateFunc : validation .IntAtLeast (0 ),
112+ },
103113 },
104114 }
105115}
@@ -118,11 +128,12 @@ func resourceKVBucketCreate(d *schema.ResourceData, m any) error {
118128 maxB := d .Get ("max_bucket_size" ).(int )
119129 replicas := d .Get ("replicas" ).(int )
120130 descrption := d .Get ("description" ).(string )
131+ limit_marker_ttl := d .Get ("limit_marker_ttl" ).(int )
121132
122- var placement * nats .Placement
133+ var placement * jetstream .Placement
123134 c , ok := d .GetOk ("placement_cluster" )
124135 if ok {
125- placement = & nats .Placement {Cluster : c .(string )}
136+ placement = & jetstream .Placement {Cluster : c .(string )}
126137 pt , ok := d .GetOk ("placement_tags" )
127138 if ok {
128139 ts := pt .([]any )
@@ -142,25 +153,26 @@ func resourceKVBucketCreate(d *schema.ResourceData, m any) error {
142153 return fmt .Errorf ("bucket %s already exist" , name )
143154 }
144155
145- js , err := nc .JetStream ()
156+ ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
157+ defer cancel ()
158+
159+ js , err := jetstream .New (nc )
146160 if err != nil {
147161 return err
148162 }
149163
150- _ , err = js .CreateKeyValue (& nats.KeyValueConfig {
151- Bucket : name ,
152- Description : descrption ,
153- MaxValueSize : int32 (maxV ),
154- History : uint8 (history ),
155- TTL : time .Duration (ttl ) * time .Second ,
156- MaxBytes : int64 (maxB ),
157- Storage : nats .FileStorage ,
158- Replicas : replicas ,
159- Placement : placement ,
164+ js .CreateKeyValue (ctx , jetstream.KeyValueConfig {
165+ Bucket : name ,
166+ Description : descrption ,
167+ MaxValueSize : int32 (maxV ),
168+ History : uint8 (history ),
169+ TTL : time .Duration (ttl ) * time .Second ,
170+ MaxBytes : int64 (maxB ),
171+ Storage : jetstream .FileStorage ,
172+ Replicas : replicas ,
173+ Placement : placement ,
174+ LimitMarkerTTL : time .Duration (limit_marker_ttl ) * time .Second ,
160175 })
161- if err != nil {
162- return err
163- }
164176
165177 d .SetId (fmt .Sprintf ("JETSTREAM_KV_%s" , name ))
166178
@@ -179,19 +191,23 @@ func resourceKVBucketRead(d *schema.ResourceData, m any) error {
179191 }
180192 defer nc .Close ()
181193
182- js , err := nc .JetStream ()
194+ ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
195+ defer cancel ()
196+
197+ js , err := jetstream .New (nc )
183198 if err != nil {
184199 return err
185200 }
186- bucket , err := js .KeyValue (name )
201+
202+ bucket , err := js .KeyValue (ctx , name )
187203 if err != nil {
188204 if err == nats .ErrBucketNotFound {
189205 d .SetId ("" )
190206 return nil
191207 }
192208 return err
193209 }
194- status , err := bucket .Status ()
210+ status , err := bucket .Status (ctx )
195211 if err != nil {
196212 return err
197213 }
@@ -200,7 +216,7 @@ func resourceKVBucketRead(d *schema.ResourceData, m any) error {
200216 d .Set ("history" , status .History ())
201217 d .Set ("ttl" , status .TTL ().Seconds ())
202218
203- jStatus := status .(* nats .KeyValueBucketStatus )
219+ jStatus := status .(* jetstream .KeyValueBucketStatus )
204220 si := jStatus .StreamInfo ()
205221
206222 d .Set ("max_value_size" , si .Config .MaxMsgSize )
@@ -213,6 +229,8 @@ func resourceKVBucketRead(d *schema.ResourceData, m any) error {
213229 d .Set ("placement_tags" , si .Config .Placement .Tags )
214230 }
215231
232+ d .Set ("limit_marker_ttl" , si .Config .SubjectDeleteMarkerTTL .Seconds ())
233+
216234 return nil
217235}
218236
@@ -225,19 +243,22 @@ func resourceKVBucketUpdate(d *schema.ResourceData, m any) error {
225243 }
226244 defer nc .Close ()
227245
228- js , err := nc .JetStream ()
246+ ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
247+ defer cancel ()
248+
249+ js , err := jetstream .New (nc )
229250 if err != nil {
230251 return err
231252 }
232- bucket , err := js .KeyValue (name )
253+ bucket , err := js .KeyValue (ctx , name )
233254 if err != nil {
234255 return err
235256 }
236- status , err := bucket .Status ()
257+ status , err := bucket .Status (ctx )
237258 if err != nil {
238259 return err
239260 }
240- jStatus := status .(* nats .KeyValueBucketStatus )
261+ jStatus := status .(* jetstream .KeyValueBucketStatus )
241262
242263 str , err := mgr .LoadStream (jStatus .StreamInfo ().Config .Name )
243264 if err != nil {
@@ -249,13 +270,15 @@ func resourceKVBucketUpdate(d *schema.ResourceData, m any) error {
249270 maxV := d .Get ("max_value_size" ).(int )
250271 maxB := d .Get ("max_bucket_size" ).(int )
251272 description := d .Get ("description" ).(string )
273+ markerTTL := d .Get ("limit_marker_ttl" ).(int )
252274
253275 cfg := str .Configuration ()
254276 cfg .MaxAge = time .Duration (ttl ) * time .Second
255277 cfg .MaxMsgSize = int32 (maxV )
256278 cfg .MaxBytes = int64 (maxB )
257279 cfg .MaxMsgsPer = int64 (history )
258280 cfg .Description = description
281+ cfg .SubjectDeleteMarkerTTL = time .Duration (markerTTL ) * time .Second
259282
260283 err = str .UpdateConfiguration (cfg )
261284 if err != nil {
0 commit comments