Skip to content

Commit eb2c72a

Browse files
committed
Support updating some consumer properties
Also support user, pass, token and nkey auth options. Few bug fixes in CA file support and code style updates Also stop using the old KV interface, moved to nats.go version Signed-off-by: R.I.Pienaar <[email protected]>
1 parent 2ae92fd commit eb2c72a

File tree

9 files changed

+212
-76
lines changed

9 files changed

+212
-76
lines changed

README.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,15 @@ provider "jetstream" {
9393

9494
### Argument Reference
9595

96-
* `servers` - Comma separated list of servers to connect to
97-
* `credentials` - (optional) path to the credentials file to use
98-
* `credential_data` - (optional) the credentials as a string
96+
* `servers` - The list of servers to connect to in a comma seperated list
97+
* `credentials` - (optional) Fully Qualified Path to a file holding NATS credentials.
98+
* `credential_data` - (optional) The NATS credentials as a string, intended to use with data providers.
99+
* `user` - (optional) Connects using a username, when no password is set this is assumed to be a Token.
100+
* `password` - (optional) Connects using a password
101+
* `nkey` - (optional) Connects using an nkey stored in a file
102+
* `tls.ca_file` - (optional) Fully Qualified Path to a file containing Root CA (PEM format). Use when the server has certs signed by an unknown authority.
103+
* `tls.ca_file_data` - (optional) The Root CA PEM as a string, intended to use with data providers. Use when the server has certs signed by an unknown authority.
104+
99105

100106
## jetstream_stream
101107

@@ -198,6 +204,7 @@ resource "jetstream_consumer" "ORDERS_NEW" {
198204
* `stream_id` - The name of the Stream that this consumer consumes
199205
* `stream_sequence` - (optional) The Stream Sequence that will be the first message delivered by this Consumer
200206
* `ratelimit` - (optional) The rate limit for delivering messages to push consumers, expressed in bits per second
207+
* `headers_only` - (optional) When true no message bodies will be delivered only headers
201208

202209
## jetstream_kv_bucket
203210

docs/index.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ resource "jetstream_consumer" "ORDERS_NEW" {
3737
* `servers` - The list of servers to connect to in a comma seperated list
3838
* `credentials` - (optional) Fully Qualified Path to a file holding NATS credentials.
3939
* `credential_data` - (optional) The NATS credentials as a string, intended to use with data providers.
40+
* `user` - (optional) Connects using a username, when no password is set this is assumed to be a Token.
41+
* `password` - (optional) Connects using a password
42+
* `nkey` - (optional) Connects using an nkey stored in a file
4043
* `tls.ca_file` - (optional) Fully Qualified Path to a file containing Root CA (PEM format). Use when the server has certs signed by an unknown authority.
4144
* `tls.ca_file_data` - (optional) The Root CA PEM as a string, intended to use with data providers. Use when the server has certs signed by an unknown authority.
4245

docs/resources/jetstream_consumer.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# jetstream_stream Resource
22

3-
The `jetstream_consumer` Resource creates or deletes JetStream Consumers on any Terraform managed Stream. Does not support editing consumers in place.
3+
The `jetstream_consumer` Resource creates, updates or deletes JetStream Consumers on any Terraform managed Stream.
44

55
## Example Usage
66

@@ -41,3 +41,4 @@ resource "jetstream_consumer" "ORDERS_NEW" {
4141
* `heartbeat` - (optional) Enable heartbeat messages for push consumers, duration specified in seconds
4242
* `flow_control` - (optional) Enable flow control for push consumers
4343
* `max_waiting` - (optional) The number of pulls that can be outstanding on a pull consumer, pulls received after this is reached are ignored
44+
* `headers_only` - (optional) When true no message bodies will be delivered only headers

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ go 1.16
55
require (
66
github.com/google/go-cmp v0.5.6
77
github.com/hashicorp/terraform-plugin-sdk v1.17.2
8-
github.com/nats-io/jsm.go v0.0.26
8+
github.com/nats-io/jsm.go v0.0.27-0.20211124133852-b1f2cc1af686
99
github.com/nats-io/jwt v1.2.2
10-
github.com/nats-io/nats-server/v2 v2.4.1-0.20210907200628-874c79fe411f
11-
github.com/nats-io/nats.go v1.12.0
10+
github.com/nats-io/nats-server/v2 v2.6.6-0.20211122213926-f094918f35b8
11+
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc
1212
github.com/xeipuuv/gojsonschema v1.2.0
1313
github.com/zclconf/go-cty v1.8.4 // indirect
1414
golang.org/x/mod v0.5.1 // indirect

go.sum

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
138138
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
139139
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
140140
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
141-
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
142141
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
143142
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
144143
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -244,8 +243,9 @@ github.com/kevinburke/ssh_config v0.0.0-20201106050909-4977a11b4351/go.mod h1:CT
244243
github.com/keybase/go-crypto v0.0.0-20161004153544-93f5b35093ba/go.mod h1:ghbZscTyKdM07+Fw3KSi0hcJm+AlEUWj8QLlPtijN/M=
245244
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
246245
github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
247-
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
248246
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
247+
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
248+
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
249249
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
250250
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
251251
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
@@ -288,16 +288,17 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh
288288
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
289289
github.com/mitchellh/reflectwalk v1.0.1 h1:FVzMWA5RllMAKIdUSC8mdWo3XtwoecrH79BY70sEEpE=
290290
github.com/mitchellh/reflectwalk v1.0.1/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
291-
github.com/nats-io/jsm.go v0.0.26 h1:Ocj9wy/tfGJ+lOaxyYwLv2skG5CFSLuhRlg7DhP3WQI=
292-
github.com/nats-io/jsm.go v0.0.26/go.mod h1:jeU4Spx3HMszhjbvCyQNoWKdIJaj8ResDalLHFysbew=
291+
github.com/nats-io/jsm.go v0.0.27-0.20211124133852-b1f2cc1af686 h1:3qE1c3jOiGTmFLTQF8M0eBKmj1nq+PGyxvmHFWhwbpU=
292+
github.com/nats-io/jsm.go v0.0.27-0.20211124133852-b1f2cc1af686/go.mod h1:zuH+RjhsJ/o9Dd1ArZxcEpu48oB3pdsIVLbz3gqen/c=
293293
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
294294
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
295-
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
296-
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
297-
github.com/nats-io/nats-server/v2 v2.4.1-0.20210907200628-874c79fe411f h1:r+bnrlIkFeYHwqxvFGwHT3ajx3Yji6frxH+X+ZgGxeA=
298-
github.com/nats-io/nats-server/v2 v2.4.1-0.20210907200628-874c79fe411f/go.mod h1:TUAhMFYh1VISyY/D4WKJUMuGHg8yHtoUTuxkbiej1lc=
299-
github.com/nats-io/nats.go v1.12.0 h1:n0oZzK2aIZDMKuEiMKJ9qkCUgVY5vTAAksSXtLlz5Xc=
300-
github.com/nats-io/nats.go v1.12.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
295+
github.com/nats-io/jwt/v2 v2.2.0 h1:Yg/4WFK6vsqMudRg91eBb7Dh6XeVcDMPHycDE8CfltE=
296+
github.com/nats-io/jwt/v2 v2.2.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
297+
github.com/nats-io/nats-server/v2 v2.6.6-0.20211122213926-f094918f35b8 h1:UKhEhYEiZhmFabtgUyjsVEQdlfrvVrJkavxQ+++aSk4=
298+
github.com/nats-io/nats-server/v2 v2.6.6-0.20211122213926-f094918f35b8/go.mod h1:n8O5NeknIIQgQld7//20NnQpCe1o5xIjVFxzh7IIZ6Y=
299+
github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
300+
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc h1:SHr4MUUZJ/fAC0uSm2OzWOJYsHpapmR86mpw7q1qPXU=
301+
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
301302
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
302303
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
303304
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=

jetstream/provider.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,23 @@ func Provider() terraform.ResourceProvider {
3232
Optional: true,
3333
Description: "The contents of the NATS 2.0 Credentials file to use",
3434
},
35+
"user": {
36+
Type: schema.TypeString,
37+
Optional: true,
38+
Description: "Connect using an username, used as token when no password is given",
39+
ConflictsWith: []string{"nkey", "credentials", "credential_data"},
40+
},
41+
"password": {
42+
Type: schema.TypeString,
43+
Optional: true,
44+
Description: "Connect using a password",
45+
},
46+
"nkey": {
47+
Type: schema.TypeString,
48+
Optional: true,
49+
Description: "Connect using a NKEY seed stored in a file",
50+
ConflictsWith: []string{"user", "credentials", "credential_data"},
51+
},
3552
"tls": {
3653
Type: schema.TypeSet,
3754
MaxItems: 1,

jetstream/resource_jetstream_consumer.go

Lines changed: 80 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ func resourceConsumer() *schema.Resource {
1818
Create: resourceConsumerCreate,
1919
Read: resourceConsumerRead,
2020
Delete: resourceConsumerDelete,
21+
Update: resourceConsumerUpdate,
2122
Importer: &schema.ResourceImporter{
2223
State: schema.ImportStatePassthrough,
2324
},
@@ -34,7 +35,7 @@ func resourceConsumer() *schema.Resource {
3435
Type: schema.TypeString,
3536
Description: "Contains additional information about this consumer",
3637
Optional: true,
37-
ForceNew: true,
38+
ForceNew: false,
3839
},
3940
"durable_name": {
4041
Type: schema.TypeString,
@@ -47,7 +48,7 @@ func resourceConsumer() *schema.Resource {
4748
Type: schema.TypeString,
4849
Description: "The subject where a Push-based consumer will deliver messages",
4950
Optional: true,
50-
ForceNew: true,
51+
ForceNew: false,
5152
},
5253
"stream_sequence": {
5354
Type: schema.TypeInt,
@@ -111,14 +112,14 @@ func resourceConsumer() *schema.Resource {
111112
Description: "Number of seconds to wait for acknowledgement",
112113
Default: 30,
113114
Optional: true,
114-
ForceNew: true,
115+
ForceNew: false,
115116
},
116117
"max_delivery": {
117118
Type: schema.TypeInt,
118119
Description: "Maximum deliveries to attempt for each message",
119120
Default: -1,
120121
Optional: true,
121-
ForceNew: true,
122+
ForceNew: false,
122123
},
123124
"filter_subject": {
124125
Type: schema.TypeString,
@@ -141,7 +142,7 @@ func resourceConsumer() *schema.Resource {
141142
Optional: true,
142143
Default: 0,
143144
ValidateFunc: validation.IntBetween(0, 100),
144-
ForceNew: true,
145+
ForceNew: false,
145146
},
146147
"ratelimit": {
147148
Type: schema.TypeInt,
@@ -157,7 +158,7 @@ func resourceConsumer() *schema.Resource {
157158
Optional: true,
158159
Default: 20000,
159160
ValidateFunc: validation.IntAtLeast(0),
160-
ForceNew: true,
161+
ForceNew: false,
161162
},
162163
"heartbeat": {
163164
Type: schema.TypeInt,
@@ -178,9 +179,16 @@ func resourceConsumer() *schema.Resource {
178179
Description: "The number of pulls that can be outstanding on a pull consumer, pulls received after this is reached are ignored",
179180
Optional: true,
180181
Default: 512,
181-
ForceNew: true,
182+
ForceNew: false,
182183
ValidateFunc: validation.IntAtLeast(0),
183184
},
185+
"headers_only": {
186+
Type: schema.TypeBool,
187+
Description: "When true no message bodies will be delivered only headers",
188+
Optional: true,
189+
Default: false,
190+
ForceNew: false,
191+
},
184192
},
185193
}
186194
}
@@ -198,6 +206,7 @@ func consumerConfigFromResourceData(d *schema.ResourceData) (cfg api.ConsumerCon
198206
MaxAckPending: d.Get("max_ack_pending").(int),
199207
FlowControl: d.Get("flow_control").(bool),
200208
Heartbeat: time.Duration(d.Get("heartbeat").(int)) * time.Second,
209+
HeadersOnly: d.Get("headers_only").(bool),
201210
}
202211

203212
if description, ok := d.GetOk("description"); ok {
@@ -256,6 +265,69 @@ func consumerConfigFromResourceData(d *schema.ResourceData) (cfg api.ConsumerCon
256265
return cfg, nil
257266
}
258267

268+
func resourceConsumerUpdate(d *schema.ResourceData, m interface{}) error {
269+
stream := d.Get("stream_id").(string)
270+
if stream == "" {
271+
return fmt.Errorf("cannot determine stream name for update")
272+
}
273+
274+
durable := d.Get("durable_name").(string)
275+
if durable == "" {
276+
return fmt.Errorf("cannot determine durable name for update")
277+
}
278+
279+
nc, mgr, err := m.(func() (*nats.Conn, *jsm.Manager, error))()
280+
if err != nil {
281+
return err
282+
}
283+
defer nc.Close()
284+
285+
known, err := mgr.IsKnownConsumer(stream, durable)
286+
if err != nil {
287+
return err
288+
}
289+
if !known {
290+
d.SetId("")
291+
return nil
292+
}
293+
294+
cons, err := mgr.LoadConsumer(stream, durable)
295+
if err != nil {
296+
return err
297+
}
298+
299+
cfg, err := consumerConfigFromResourceData(d)
300+
if err != nil {
301+
return err
302+
}
303+
304+
s := strings.TrimSuffix(cons.SampleFrequency(), "%")
305+
freq, err := strconv.Atoi(s)
306+
if err != nil {
307+
return fmt.Errorf("failed to parse consumer sampling configuration: %v", err)
308+
}
309+
310+
opts := []jsm.ConsumerOption{
311+
jsm.ConsumerDescription(cfg.Description),
312+
jsm.AckWait(cfg.AckWait),
313+
jsm.MaxDeliveryAttempts(cfg.MaxDeliver),
314+
jsm.SamplePercent(freq),
315+
jsm.MaxAckPending(uint(cfg.MaxAckPending)),
316+
jsm.MaxWaiting(uint(cfg.MaxWaiting)),
317+
}
318+
319+
if cfg.HeadersOnly {
320+
opts = append(opts, jsm.DeliverHeadersOnly())
321+
}
322+
323+
err = cons.UpdateConfiguration(opts...)
324+
if err != nil {
325+
return err
326+
}
327+
328+
return resourceConsumerRead(d, m)
329+
}
330+
259331
func resourceConsumerCreate(d *schema.ResourceData, m interface{}) error {
260332
cfg, err := consumerConfigFromResourceData(d)
261333
if err != nil {
@@ -333,6 +405,7 @@ func resourceConsumerRead(d *schema.ResourceData, m interface{}) error {
333405
d.Set("flow_control", cons.FlowControl())
334406
d.Set("max_waiting", cons.MaxWaiting())
335407
d.Set("delivery_group", cons.DeliverGroup())
408+
d.Set("headers_only", cons.IsHeadersOnly())
336409

337410
switch cons.DeliverPolicy() {
338411
case api.DeliverAll:

0 commit comments

Comments
 (0)