Skip to content

Commit 0aa2736

Browse files
authored
Merge pull request #38 from nats-io/consumer_edit
Support updating some consumer properties
2 parents 2ae92fd + eb2c72a commit 0aa2736

File tree

9 files changed

+212
-76
lines changed

9 files changed

+212
-76
lines changed

README.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,15 @@ provider "jetstream" {
9393

9494
### Argument Reference
9595

96-
* `servers` - Comma separated list of servers to connect to
97-
* `credentials` - (optional) path to the credentials file to use
98-
* `credential_data` - (optional) the credentials as a string
96+
* `servers` - The list of servers to connect to in a comma seperated list
97+
* `credentials` - (optional) Fully Qualified Path to a file holding NATS credentials.
98+
* `credential_data` - (optional) The NATS credentials as a string, intended to use with data providers.
99+
* `user` - (optional) Connects using a username, when no password is set this is assumed to be a Token.
100+
* `password` - (optional) Connects using a password
101+
* `nkey` - (optional) Connects using an nkey stored in a file
102+
* `tls.ca_file` - (optional) Fully Qualified Path to a file containing Root CA (PEM format). Use when the server has certs signed by an unknown authority.
103+
* `tls.ca_file_data` - (optional) The Root CA PEM as a string, intended to use with data providers. Use when the server has certs signed by an unknown authority.
104+
99105

100106
## jetstream_stream
101107

@@ -198,6 +204,7 @@ resource "jetstream_consumer" "ORDERS_NEW" {
198204
* `stream_id` - The name of the Stream that this consumer consumes
199205
* `stream_sequence` - (optional) The Stream Sequence that will be the first message delivered by this Consumer
200206
* `ratelimit` - (optional) The rate limit for delivering messages to push consumers, expressed in bits per second
207+
* `headers_only` - (optional) When true no message bodies will be delivered only headers
201208

202209
## jetstream_kv_bucket
203210

docs/index.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ resource "jetstream_consumer" "ORDERS_NEW" {
3737
* `servers` - The list of servers to connect to in a comma seperated list
3838
* `credentials` - (optional) Fully Qualified Path to a file holding NATS credentials.
3939
* `credential_data` - (optional) The NATS credentials as a string, intended to use with data providers.
40+
* `user` - (optional) Connects using a username, when no password is set this is assumed to be a Token.
41+
* `password` - (optional) Connects using a password
42+
* `nkey` - (optional) Connects using an nkey stored in a file
4043
* `tls.ca_file` - (optional) Fully Qualified Path to a file containing Root CA (PEM format). Use when the server has certs signed by an unknown authority.
4144
* `tls.ca_file_data` - (optional) The Root CA PEM as a string, intended to use with data providers. Use when the server has certs signed by an unknown authority.
4245

docs/resources/jetstream_consumer.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# jetstream_stream Resource
22

3-
The `jetstream_consumer` Resource creates or deletes JetStream Consumers on any Terraform managed Stream. Does not support editing consumers in place.
3+
The `jetstream_consumer` Resource creates, updates or deletes JetStream Consumers on any Terraform managed Stream.
44

55
## Example Usage
66

@@ -41,3 +41,4 @@ resource "jetstream_consumer" "ORDERS_NEW" {
4141
* `heartbeat` - (optional) Enable heartbeat messages for push consumers, duration specified in seconds
4242
* `flow_control` - (optional) Enable flow control for push consumers
4343
* `max_waiting` - (optional) The number of pulls that can be outstanding on a pull consumer, pulls received after this is reached are ignored
44+
* `headers_only` - (optional) When true no message bodies will be delivered only headers

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ go 1.16
55
require (
66
github.com/google/go-cmp v0.5.6
77
github.com/hashicorp/terraform-plugin-sdk v1.17.2
8-
github.com/nats-io/jsm.go v0.0.26
8+
github.com/nats-io/jsm.go v0.0.27-0.20211124133852-b1f2cc1af686
99
github.com/nats-io/jwt v1.2.2
10-
github.com/nats-io/nats-server/v2 v2.4.1-0.20210907200628-874c79fe411f
11-
github.com/nats-io/nats.go v1.12.0
10+
github.com/nats-io/nats-server/v2 v2.6.6-0.20211122213926-f094918f35b8
11+
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc
1212
github.com/xeipuuv/gojsonschema v1.2.0
1313
github.com/zclconf/go-cty v1.8.4 // indirect
1414
golang.org/x/mod v0.5.1 // indirect

go.sum

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
138138
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
139139
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
140140
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
141-
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
142141
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
143142
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
144143
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -244,8 +243,9 @@ github.com/kevinburke/ssh_config v0.0.0-20201106050909-4977a11b4351/go.mod h1:CT
244243
github.com/keybase/go-crypto v0.0.0-20161004153544-93f5b35093ba/go.mod h1:ghbZscTyKdM07+Fw3KSi0hcJm+AlEUWj8QLlPtijN/M=
245244
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
246245
github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
247-
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
248246
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
247+
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
248+
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
249249
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
250250
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
251251
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
@@ -288,16 +288,17 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh
288288
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
289289
github.com/mitchellh/reflectwalk v1.0.1 h1:FVzMWA5RllMAKIdUSC8mdWo3XtwoecrH79BY70sEEpE=
290290
github.com/mitchellh/reflectwalk v1.0.1/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
291-
github.com/nats-io/jsm.go v0.0.26 h1:Ocj9wy/tfGJ+lOaxyYwLv2skG5CFSLuhRlg7DhP3WQI=
292-
github.com/nats-io/jsm.go v0.0.26/go.mod h1:jeU4Spx3HMszhjbvCyQNoWKdIJaj8ResDalLHFysbew=
291+
github.com/nats-io/jsm.go v0.0.27-0.20211124133852-b1f2cc1af686 h1:3qE1c3jOiGTmFLTQF8M0eBKmj1nq+PGyxvmHFWhwbpU=
292+
github.com/nats-io/jsm.go v0.0.27-0.20211124133852-b1f2cc1af686/go.mod h1:zuH+RjhsJ/o9Dd1ArZxcEpu48oB3pdsIVLbz3gqen/c=
293293
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
294294
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
295-
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
296-
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
297-
github.com/nats-io/nats-server/v2 v2.4.1-0.20210907200628-874c79fe411f h1:r+bnrlIkFeYHwqxvFGwHT3ajx3Yji6frxH+X+ZgGxeA=
298-
github.com/nats-io/nats-server/v2 v2.4.1-0.20210907200628-874c79fe411f/go.mod h1:TUAhMFYh1VISyY/D4WKJUMuGHg8yHtoUTuxkbiej1lc=
299-
github.com/nats-io/nats.go v1.12.0 h1:n0oZzK2aIZDMKuEiMKJ9qkCUgVY5vTAAksSXtLlz5Xc=
300-
github.com/nats-io/nats.go v1.12.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
295+
github.com/nats-io/jwt/v2 v2.2.0 h1:Yg/4WFK6vsqMudRg91eBb7Dh6XeVcDMPHycDE8CfltE=
296+
github.com/nats-io/jwt/v2 v2.2.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
297+
github.com/nats-io/nats-server/v2 v2.6.6-0.20211122213926-f094918f35b8 h1:UKhEhYEiZhmFabtgUyjsVEQdlfrvVrJkavxQ+++aSk4=
298+
github.com/nats-io/nats-server/v2 v2.6.6-0.20211122213926-f094918f35b8/go.mod h1:n8O5NeknIIQgQld7//20NnQpCe1o5xIjVFxzh7IIZ6Y=
299+
github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
300+
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc h1:SHr4MUUZJ/fAC0uSm2OzWOJYsHpapmR86mpw7q1qPXU=
301+
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
301302
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
302303
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
303304
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=

jetstream/provider.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,23 @@ func Provider() terraform.ResourceProvider {
3232
Optional: true,
3333
Description: "The contents of the NATS 2.0 Credentials file to use",
3434
},
35+
"user": {
36+
Type: schema.TypeString,
37+
Optional: true,
38+
Description: "Connect using an username, used as token when no password is given",
39+
ConflictsWith: []string{"nkey", "credentials", "credential_data"},
40+
},
41+
"password": {
42+
Type: schema.TypeString,
43+
Optional: true,
44+
Description: "Connect using a password",
45+
},
46+
"nkey": {
47+
Type: schema.TypeString,
48+
Optional: true,
49+
Description: "Connect using a NKEY seed stored in a file",
50+
ConflictsWith: []string{"user", "credentials", "credential_data"},
51+
},
3552
"tls": {
3653
Type: schema.TypeSet,
3754
MaxItems: 1,

jetstream/resource_jetstream_consumer.go

Lines changed: 80 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ func resourceConsumer() *schema.Resource {
1818
Create: resourceConsumerCreate,
1919
Read: resourceConsumerRead,
2020
Delete: resourceConsumerDelete,
21+
Update: resourceConsumerUpdate,
2122
Importer: &schema.ResourceImporter{
2223
State: schema.ImportStatePassthrough,
2324
},
@@ -34,7 +35,7 @@ func resourceConsumer() *schema.Resource {
3435
Type: schema.TypeString,
3536
Description: "Contains additional information about this consumer",
3637
Optional: true,
37-
ForceNew: true,
38+
ForceNew: false,
3839
},
3940
"durable_name": {
4041
Type: schema.TypeString,
@@ -47,7 +48,7 @@ func resourceConsumer() *schema.Resource {
4748
Type: schema.TypeString,
4849
Description: "The subject where a Push-based consumer will deliver messages",
4950
Optional: true,
50-
ForceNew: true,
51+
ForceNew: false,
5152
},
5253
"stream_sequence": {
5354
Type: schema.TypeInt,
@@ -111,14 +112,14 @@ func resourceConsumer() *schema.Resource {
111112
Description: "Number of seconds to wait for acknowledgement",
112113
Default: 30,
113114
Optional: true,
114-
ForceNew: true,
115+
ForceNew: false,
115116
},
116117
"max_delivery": {
117118
Type: schema.TypeInt,
118119
Description: "Maximum deliveries to attempt for each message",
119120
Default: -1,
120121
Optional: true,
121-
ForceNew: true,
122+
ForceNew: false,
122123
},
123124
"filter_subject": {
124125
Type: schema.TypeString,
@@ -141,7 +142,7 @@ func resourceConsumer() *schema.Resource {
141142
Optional: true,
142143
Default: 0,
143144
ValidateFunc: validation.IntBetween(0, 100),
144-
ForceNew: true,
145+
ForceNew: false,
145146
},
146147
"ratelimit": {
147148
Type: schema.TypeInt,
@@ -157,7 +158,7 @@ func resourceConsumer() *schema.Resource {
157158
Optional: true,
158159
Default: 20000,
159160
ValidateFunc: validation.IntAtLeast(0),
160-
ForceNew: true,
161+
ForceNew: false,
161162
},
162163
"heartbeat": {
163164
Type: schema.TypeInt,
@@ -178,9 +179,16 @@ func resourceConsumer() *schema.Resource {
178179
Description: "The number of pulls that can be outstanding on a pull consumer, pulls received after this is reached are ignored",
179180
Optional: true,
180181
Default: 512,
181-
ForceNew: true,
182+
ForceNew: false,
182183
ValidateFunc: validation.IntAtLeast(0),
183184
},
185+
"headers_only": {
186+
Type: schema.TypeBool,
187+
Description: "When true no message bodies will be delivered only headers",
188+
Optional: true,
189+
Default: false,
190+
ForceNew: false,
191+
},
184192
},
185193
}
186194
}
@@ -198,6 +206,7 @@ func consumerConfigFromResourceData(d *schema.ResourceData) (cfg api.ConsumerCon
198206
MaxAckPending: d.Get("max_ack_pending").(int),
199207
FlowControl: d.Get("flow_control").(bool),
200208
Heartbeat: time.Duration(d.Get("heartbeat").(int)) * time.Second,
209+
HeadersOnly: d.Get("headers_only").(bool),
201210
}
202211

203212
if description, ok := d.GetOk("description"); ok {
@@ -256,6 +265,69 @@ func consumerConfigFromResourceData(d *schema.ResourceData) (cfg api.ConsumerCon
256265
return cfg, nil
257266
}
258267

268+
func resourceConsumerUpdate(d *schema.ResourceData, m interface{}) error {
269+
stream := d.Get("stream_id").(string)
270+
if stream == "" {
271+
return fmt.Errorf("cannot determine stream name for update")
272+
}
273+
274+
durable := d.Get("durable_name").(string)
275+
if durable == "" {
276+
return fmt.Errorf("cannot determine durable name for update")
277+
}
278+
279+
nc, mgr, err := m.(func() (*nats.Conn, *jsm.Manager, error))()
280+
if err != nil {
281+
return err
282+
}
283+
defer nc.Close()
284+
285+
known, err := mgr.IsKnownConsumer(stream, durable)
286+
if err != nil {
287+
return err
288+
}
289+
if !known {
290+
d.SetId("")
291+
return nil
292+
}
293+
294+
cons, err := mgr.LoadConsumer(stream, durable)
295+
if err != nil {
296+
return err
297+
}
298+
299+
cfg, err := consumerConfigFromResourceData(d)
300+
if err != nil {
301+
return err
302+
}
303+
304+
s := strings.TrimSuffix(cons.SampleFrequency(), "%")
305+
freq, err := strconv.Atoi(s)
306+
if err != nil {
307+
return fmt.Errorf("failed to parse consumer sampling configuration: %v", err)
308+
}
309+
310+
opts := []jsm.ConsumerOption{
311+
jsm.ConsumerDescription(cfg.Description),
312+
jsm.AckWait(cfg.AckWait),
313+
jsm.MaxDeliveryAttempts(cfg.MaxDeliver),
314+
jsm.SamplePercent(freq),
315+
jsm.MaxAckPending(uint(cfg.MaxAckPending)),
316+
jsm.MaxWaiting(uint(cfg.MaxWaiting)),
317+
}
318+
319+
if cfg.HeadersOnly {
320+
opts = append(opts, jsm.DeliverHeadersOnly())
321+
}
322+
323+
err = cons.UpdateConfiguration(opts...)
324+
if err != nil {
325+
return err
326+
}
327+
328+
return resourceConsumerRead(d, m)
329+
}
330+
259331
func resourceConsumerCreate(d *schema.ResourceData, m interface{}) error {
260332
cfg, err := consumerConfigFromResourceData(d)
261333
if err != nil {
@@ -333,6 +405,7 @@ func resourceConsumerRead(d *schema.ResourceData, m interface{}) error {
333405
d.Set("flow_control", cons.FlowControl())
334406
d.Set("max_waiting", cons.MaxWaiting())
335407
d.Set("delivery_group", cons.DeliverGroup())
408+
d.Set("headers_only", cons.IsHeadersOnly())
336409

337410
switch cons.DeliverPolicy() {
338411
case api.DeliverAll:

0 commit comments

Comments
 (0)