Skip to content

Commit aeca4d3

Browse files
committed
[FIX]: filter_subject and filter_subjects are exclusive
1 parent 96d66f3 commit aeca4d3

File tree

4 files changed

+46
-7
lines changed

4 files changed

+46
-7
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ resource "jetstream_consumer" "ORDERS_NEW" {
187187
* `delivery_group` - (optional) When set Push consumers will only deliver messages to subscriptions with this group set
188188
* `durable_name` - The durable name of the Consumer
189189
* `filter_subject` - (optional) Only receive a subset of messages from the Stream based on the subject they entered the Stream on
190-
* `filter_subjects` - (optional) Only receive a subset tof messages from the Stream based on subjects they entered the Stream on. Only works with v2.10 or better.
190+
* `filter_subjects` - (optional) Only receive a subset tof messages from the Stream based on subjects they entered the Stream on. This is exclusive to `filter_subject`. Only works with v2.10 or better.
191191
* `max_delivery` - (optional) Maximum deliveries to attempt for each message
192192
* `replay_policy` - (optional) The rate at which messages will be replayed from the stream
193193
* `sample_freq` - (optional) The percentage of acknowledgements that will be sampled for observability purposes

docs/resources/jetstream_consumer.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ resource "jetstream_consumer" "ORDERS_NEW" {
3333
* `delivery_group` - (optional) When set Push consumers will only deliver messages to subscriptions with this group set
3434
* `durable_name` - The durable name of the Consumer
3535
* `filter_subject` - (optional) Only receive a subset of messages from the Stream based on the subject they entered the Stream on
36+
* `filter_subjects` - (optional) Only receive a subset tof messages from the Stream based on subjects they entered the Stream on. This is exclusive to `filter_subject`. Only works with v2.10 or better.
3637
* `max_delivery` - (optional) Maximum deliveries to attempt for each message
3738
* `replay_policy` - (optional) The rate at which messages will be replayed from the stream
3839
* `sample_freq` - (optional) The percentage of acknowledgements that will be sampled for observability purposes

jetstream/resource_jetstream_consumer.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,11 @@ func resourceConsumer() *schema.Resource {
136136
Default: "",
137137
Optional: true,
138138
ForceNew: false,
139+
ConflictsWith: []string{"filter_subjects"},
139140
},
140141
"filter_subjects": {
141142
Type: schema.TypeList,
142-
Description: "Only receive a subset of messages from the stream baseed on the subjects they entered the Streeam on. Only works with nats-server v2.10+",
143+
Description: "Only receive a subset of messages from the stream baseed on the subjects they entered the Streeam on, exlusive to filter_subject and works with nats-server v2.10 or better",
143144
Optional: true,
144145
ForceNew: false,
145146
Elem: &schema.Schema{Type: schema.TypeString},
@@ -336,12 +337,18 @@ func consumerConfigFromResourceData(d *schema.ResourceData) (cfg api.ConsumerCon
336337

337338
fs, ok := d.GetOk("filter_subjects")
338339
if ok {
339-
ar := fs.([]any)
340-
var subjects = make([]string, len(ar))
341-
for i, v := range ar {
342-
subjects[i] = v.(string)
340+
ns := fs.([]any)
341+
if len(ns) == 1 {
342+
cfg.FilterSubject = ns[0].(string)
343+
cfg.FilterSubjects = nil
344+
} else if len(ns) > 1 {
345+
var subjects = make([]string, len(ns))
346+
for i, v := range ns {
347+
subjects[i] = v.(string)
348+
}
349+
cfg.FilterSubjects = subjects
350+
cfg.FilterSubject = ""
343351
}
344-
cfg.FilterSubjects = subjects
345352
}
346353

347354
m, ok := d.GetOk("metadata")
@@ -415,6 +422,8 @@ func resourceConsumerUpdate(d *schema.ResourceData, m any) error {
415422
opts := []jsm.ConsumerOption{
416423
jsm.ConsumerDescription(cfg.Description),
417424
jsm.ConsumerMetadata(cfg.Metadata),
425+
jsm.FilterStreamBySubject(cfg.FilterSubject),
426+
jsm.FilterStreamBySubject(cfg.FilterSubjects...),
418427
jsm.AckWait(cfg.AckWait),
419428
jsm.MaxDeliveryAttempts(cfg.MaxDeliver),
420429
jsm.SamplePercent(freq),

jetstream/resource_jetstream_consumer_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,25 @@ resource "jetstream_consumer" "TEST_C2" {
5353
}
5454
`
5555

56+
const testConsumerConfig_singleSubject = `
57+
provider "jetstream" {
58+
servers = "%s"
59+
}
60+
61+
resource "jetstream_stream" "test" {
62+
name = "TEST"
63+
subjects = ["TEST.*"]
64+
}
65+
66+
resource "jetstream_consumer" "TEST_C3" {
67+
stream_id = jetstream_stream.test.id
68+
durable_name = "C3"
69+
stream_sequence = 10
70+
max_ack_pending = 20
71+
filter_subject = "TEST.a"
72+
}
73+
`
74+
5675
func TestResourceConsumer(t *testing.T) {
5776
srv := createJSServer(t)
5877
defer srv.Shutdown()
@@ -108,6 +127,16 @@ func TestResourceConsumer(t *testing.T) {
108127
resource.TestCheckResourceAttr("jetstream_consumer.TEST_C2", "inactive_threshold", "0"),
109128
),
110129
},
130+
{
131+
Config: fmt.Sprintf(testConsumerConfig_singleSubject, nc.ConnectedUrl()),
132+
Check: resource.ComposeTestCheckFunc(
133+
testStreamExist(t, mgr, "TEST"),
134+
testConsumerExist(t, mgr, "TEST", "C3"),
135+
testConsumerHasFilterSubjects(t, mgr, "TEST", "C3", []string{}),
136+
resource.TestCheckResourceAttr("jetstream_consumer.TEST_C3", "filter_subject", "TEST.a"),
137+
),
138+
},
139+
111140
},
112141
})
113142
}

0 commit comments

Comments
 (0)