Skip to content

Commit 0188312

Browse files
authored
Merge pull request #68 from nats-io/discard_policies
support stream discard policies
2 parents 0a91eed + c64adf4 commit 0188312

File tree

6 files changed

+61
-22
lines changed

6 files changed

+61
-22
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ resource "jetstream_consumer" "ORDERS_NEW" {
170170
### Attribute Reference
171171

172172
* `description` - (optional) Contains additional information about this consumer
173+
* `discard` - (optional) When a Stream reach it's limits either old messages are deleted or new ones are denied (`new` or `old`)
174+
* `discard_new_per_subject` - (optional) When discard policy is new and the stream is one with max messages per subject set, this will apply the new behavior to every subject. Essentially turning discard new from maximum number of subjects into maximum number of messages in a subject (bool)
173175
* `ack_policy` - (optional) The delivery acknowledgement policy to apply to the Consumer
174176
* `ack_wait` - (optional) Number of seconds to wait for acknowledgement
175177
* `deliver_all` - (optional) Starts at the first available message in the Stream

docs/resources/jetstream_stream.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ Above the `ORDERS_ARCHIVE` stream is a mirror of `ORDERS`, valid options for spe
3838
## Attribute Reference
3939

4040
* `description` - (optional) Contains additional information about this stream (string)
41+
* `discard` - (optional) When a Stream reach it's limits either old messages are deleted or new ones are denied (`new` or `old`)
42+
* `discard_new_per_subject` - (optional) When discard policy is new and the stream is one with max messages per subject set, this will apply the new behavior to every subject. Essentially turning discard new from maximum number of subjects into maximum number of messages in a subject (bool)
4143
* `ack` - (optional) If the Stream should support confirming receiving messages via acknowledgements (bool)
4244
* `max_age` - (optional) The maximum oldest message that can be kept in the stream, duration specified in seconds (number)
4345
* `max_bytes` - (optional) The maximum size of all messages that can be kept in the stream (number)

go.mod

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ go 1.18
55
require (
66
github.com/google/go-cmp v0.5.9
77
github.com/hashicorp/terraform-plugin-sdk v1.17.2
8-
github.com/nats-io/jsm.go v0.0.34
8+
github.com/nats-io/jsm.go v0.0.35-0.20220930094031-ec07abe149c8
99
github.com/nats-io/jwt/v2 v2.3.0
10-
github.com/nats-io/nats-server/v2 v2.9.0
11-
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0
10+
github.com/nats-io/nats-server/v2 v2.9.2
11+
github.com/nats-io/nats.go v1.17.0
1212
github.com/xeipuuv/gojsonschema v1.2.0
1313
)
1414

@@ -56,7 +56,7 @@ require (
5656
github.com/imdario/mergo v0.3.12 // indirect
5757
github.com/jmespath/go-jmespath v0.4.0 // indirect
5858
github.com/jstemmer/go-junit-report v0.9.1 // indirect
59-
github.com/klauspost/compress v1.15.9 // indirect
59+
github.com/klauspost/compress v1.15.11 // indirect
6060
github.com/mattn/go-colorable v0.1.1 // indirect
6161
github.com/mattn/go-isatty v0.0.5 // indirect
6262
github.com/minio/highwayhash v1.0.2 // indirect
@@ -82,14 +82,14 @@ require (
8282
github.com/zclconf/go-cty v1.10.0 // indirect
8383
github.com/zclconf/go-cty-yaml v1.0.2 // indirect
8484
go.opencensus.io v0.22.4 // indirect
85-
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
85+
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect
8686
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
8787
golang.org/x/mod v0.5.1 // indirect
8888
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
8989
golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43 // indirect
90-
golang.org/x/sys v0.0.0-20220908164124-27713097b956 // indirect
90+
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec // indirect
9191
golang.org/x/text v0.3.7 // indirect
92-
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
92+
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
9393
golang.org/x/tools v0.1.9 // indirect
9494
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
9595
google.golang.org/api v0.34.0 // indirect

go.sum

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -243,8 +243,8 @@ github.com/kevinburke/ssh_config v0.0.0-20201106050909-4977a11b4351/go.mod h1:CT
243243
github.com/keybase/go-crypto v0.0.0-20161004153544-93f5b35093ba/go.mod h1:ghbZscTyKdM07+Fw3KSi0hcJm+AlEUWj8QLlPtijN/M=
244244
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
245245
github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
246-
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
247-
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
246+
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
247+
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
248248
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
249249
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
250250
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
@@ -287,14 +287,14 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh
287287
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
288288
github.com/mitchellh/reflectwalk v1.0.1 h1:FVzMWA5RllMAKIdUSC8mdWo3XtwoecrH79BY70sEEpE=
289289
github.com/mitchellh/reflectwalk v1.0.1/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
290-
github.com/nats-io/jsm.go v0.0.34 h1:2z3b4/789aziMin8oCuMxtzbfnUDwSI/D0eA1IR4Kus=
291-
github.com/nats-io/jsm.go v0.0.34/go.mod h1:AtvowzQmaY3WK2z0qVHNkPecBy7G+l+vZ0MeijH3cNY=
290+
github.com/nats-io/jsm.go v0.0.35-0.20220930094031-ec07abe149c8 h1:QBnAFfCxoF/aaC01J6p3W/6xAPPXqhwX6ltZIr7pj3M=
291+
github.com/nats-io/jsm.go v0.0.35-0.20220930094031-ec07abe149c8/go.mod h1:S+8UHJz2fKiHS6c7HEf6DmVP91v7EmMONz4hl8Ba0wo=
292292
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
293293
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
294-
github.com/nats-io/nats-server/v2 v2.9.0 h1:DLWu+7/VgGOoChcDKytnUZPAmudpv7o/MhKmNrnH1RE=
295-
github.com/nats-io/nats-server/v2 v2.9.0/go.mod h1:BWKY6217RvhI+FDoOLZ2BH+hOC37xeKRBlQ1Lz7teKI=
296-
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0 h1:dPUKD6Iv8M1y9MU8PK6H4a4/12yx5/CbaYWz/Z1arY8=
297-
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
294+
github.com/nats-io/nats-server/v2 v2.9.2 h1:XNDgJgOYYaYlquLdbSHI3xssLipfKUOq3EmYIMNCOsE=
295+
github.com/nats-io/nats-server/v2 v2.9.2/go.mod h1:4sq8wvrpbvSzL1n3ZfEYnH4qeUuIl5W990j3kw13rRk=
296+
github.com/nats-io/nats.go v1.17.0 h1:1jp5BThsdGlN91hW0k3YEfJbfACjiOYtUiLXG0RL4IE=
297+
github.com/nats-io/nats.go v1.17.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
298298
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
299299
github.com/nats-io/nkeys v0.3.1-0.20220214171627-79ae42e4d898 h1:FoO4iS4qOKmNWMvv4T48tpwH9C/bs97vN2X9O47My8Y=
300300
github.com/nats-io/nkeys v0.3.1-0.20220214171627-79ae42e4d898/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
@@ -376,8 +376,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
376376
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
377377
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
378378
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
379-
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM=
380-
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
379+
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A=
380+
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
381381
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
382382
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
383383
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -498,8 +498,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
498498
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
499499
golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
500500
golang.org/x/sys v0.0.0-20220517195934-5e4e11fc645e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
501-
golang.org/x/sys v0.0.0-20220908164124-27713097b956 h1:XeJjHH1KiLpKGb6lvMiksZ9l0fVUh+AmGcm0nOMEBOY=
502-
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
501+
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI=
502+
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
503503
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
504504
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
505505
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -512,8 +512,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
512512
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
513513
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
514514
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
515-
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 h1:ftMN5LMiBFjbzleLqtoBZk7KdJwhuybIU+FckUHgoyQ=
516-
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
515+
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y=
516+
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
517517
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
518518
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
519519
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=

jetstream/resource_jetstream_stream.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,19 @@ func resourceStream() *schema.Resource {
8989
Type: schema.TypeString,
9090
},
9191
},
92+
"discard": {
93+
Type: schema.TypeString,
94+
Description: "When a Stream reach it's limits either old messages are deleted or new ones are denied",
95+
Optional: true,
96+
Default: "old",
97+
ValidateFunc: validateDiscardPolicy(),
98+
},
99+
"discard_new_per_subject": {
100+
Type: schema.TypeBool,
101+
Description: "When discard policy is new and the stream is one with max messages per subject set, this will apply the new behavior to every subject. Essentially turning discard new from maximum number of subjects into maximum number of messages in a subject",
102+
Optional: true,
103+
Default: false,
104+
},
92105
"max_msgs": {
93106
Type: schema.TypeInt,
94107
Description: "The maximum amount of messages that can be kept in the stream",
@@ -301,15 +314,23 @@ func resourceStreamRead(d *schema.ResourceData, m any) error {
301314
d.Set("max_msg_size", int(str.MaxMsgSize()))
302315
d.Set("replicas", str.Replicas())
303316
d.Set("ack", !str.NoAck())
304-
d.Set("deny_delete", !str.DeleteAllow())
317+
d.Set("deny_delete", !str.DeleteAllowed())
305318
d.Set("deny_purge", !str.PurgeAllowed())
306319
d.Set("allow_rollup_hdrs", str.RollupAllowed())
307320
d.Set("allow_direct", str.DirectAllowed())
321+
d.Set("discard_new_per_subject", str.DiscardNewPerSubject())
308322

309323
if str.MaxAge() == -1 || str.MaxAge() == 0 {
310324
d.Set("max_age", "-1")
311325
}
312326

327+
switch str.DiscardPolicy() {
328+
case api.DiscardNew:
329+
d.Set("discard", "new")
330+
case api.DiscardOld:
331+
d.Set("discard", "old")
332+
}
333+
313334
switch str.Storage() {
314335
case api.FileStorage:
315336
d.Set("storage", "file")

jetstream/util.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func validateRetentionTypeString() schema.SchemaValidateFunc {
5353
return validation.StringInSlice([]string{"limits", "interest", "workqueue"}, false)
5454
}
5555

56+
func validateDiscardPolicy() schema.SchemaValidateFunc {
57+
return validation.StringInSlice([]string{"old", "new"}, false)
58+
}
59+
5660
func validateStorageTypeString() schema.SchemaValidateFunc {
5761
return validation.StringInSlice([]string{"file", "memory"}, false)
5862
}
@@ -104,6 +108,7 @@ func streamSourceFromResourceData(d any) ([]*api.StreamSource, error) {
104108
func streamConfigFromResourceData(d *schema.ResourceData) (cfg api.StreamConfig, err error) {
105109
var retention api.RetentionPolicy
106110
var storage api.StorageType
111+
var discard api.DiscardPolicy
107112

108113
switch d.Get("retention").(string) {
109114
case "limits":
@@ -121,6 +126,13 @@ func streamConfigFromResourceData(d *schema.ResourceData) (cfg api.StreamConfig,
121126
storage = api.MemoryStorage
122127
}
123128

129+
switch d.Get("discard").(string) {
130+
case "new":
131+
discard = api.DiscardNew
132+
case "old":
133+
discard = api.DiscardOld
134+
}
135+
124136
subs := d.Get("subjects").([]any)
125137
var subjects = make([]string, len(subs))
126138
for i, sub := range subs {
@@ -146,6 +158,8 @@ func streamConfigFromResourceData(d *schema.ResourceData) (cfg api.StreamConfig,
146158
Name: d.Get("name").(string),
147159
Subjects: subjects,
148160
Retention: retention,
161+
Discard: discard,
162+
DiscardNewPer: d.Get("discard_new_per_subject").(bool),
149163
MaxConsumers: d.Get("max_consumers").(int),
150164
MaxMsgs: int64(d.Get("max_msgs").(int)),
151165
MaxBytes: int64(d.Get("max_bytes").(int)),

0 commit comments

Comments
 (0)