Skip to content

Commit 419bd99

Browse files
authored
Merge pull request #104 from iapain/main
[FEAT]: adds FilterSubjects
2 parents bf48d0e + c2082b2 commit 419bd99

File tree

5 files changed

+79
-3
lines changed

5 files changed

+79
-3
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ resource "jetstream_consumer" "ORDERS_NEW" {
187187
* `delivery_group` - (optional) When set Push consumers will only deliver messages to subscriptions with this group set
188188
* `durable_name` - The durable name of the Consumer
189189
* `filter_subject` - (optional) Only receive a subset of messages from the Stream based on the subject they entered the Stream on
190+
* `filter_subjects` - (optional) Only receive a subset tof messages from the Stream based on subjects they entered the Stream on. This is exclusive to `filter_subject`. Only works with v2.10 or better.
190191
* `max_delivery` - (optional) Maximum deliveries to attempt for each message
191192
* `replay_policy` - (optional) The rate at which messages will be replayed from the stream
192193
* `sample_freq` - (optional) The percentage of acknowledgements that will be sampled for observability purposes

docs/resources/jetstream_consumer.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ resource "jetstream_consumer" "ORDERS_NEW" {
3333
* `delivery_group` - (optional) When set Push consumers will only deliver messages to subscriptions with this group set
3434
* `durable_name` - The durable name of the Consumer
3535
* `filter_subject` - (optional) Only receive a subset of messages from the Stream based on the subject they entered the Stream on
36+
* `filter_subjects` - (optional) Only receive a subset tof messages from the Stream based on subjects they entered the Stream on. This is exclusive to `filter_subject`. Only works with v2.10 or better.
3637
* `max_delivery` - (optional) Maximum deliveries to attempt for each message
3738
* `replay_policy` - (optional) The rate at which messages will be replayed from the stream
3839
* `sample_freq` - (optional) The percentage of acknowledgements that will be sampled for observability purposes

jetstream/resource_jetstream_consumer.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,19 @@ func resourceConsumer() *schema.Resource {
131131
ForceNew: false,
132132
},
133133
"filter_subject": {
134-
Type: schema.TypeString,
135-
Description: "Only receive a subset of messages from the Stream based on the subject they entered the Stream on",
136-
Default: "",
134+
Type: schema.TypeString,
135+
Description: "Only receive a subset of messages from the Stream based on the subject they entered the Stream on",
136+
Default: "",
137+
Optional: true,
138+
ForceNew: false,
139+
ConflictsWith: []string{"filter_subjects"},
140+
},
141+
"filter_subjects": {
142+
Type: schema.TypeList,
143+
Description: "Only receive a subset of messages from the stream baseed on the subjects they entered the Streeam on, exlusive to filter_subject and works with nats-server v2.10 or better",
137144
Optional: true,
138145
ForceNew: false,
146+
Elem: &schema.Schema{Type: schema.TypeString},
139147
},
140148
"replay_policy": {
141149
Type: schema.TypeString,
@@ -327,6 +335,22 @@ func consumerConfigFromResourceData(d *schema.ResourceData) (cfg api.ConsumerCon
327335
cfg.AckPolicy = api.AckNone
328336
}
329337

338+
fs, ok := d.GetOk("filter_subjects")
339+
if ok {
340+
ns := fs.([]any)
341+
if len(ns) == 1 {
342+
cfg.FilterSubject = ns[0].(string)
343+
cfg.FilterSubjects = nil
344+
} else if len(ns) > 1 {
345+
var subjects = make([]string, len(ns))
346+
for i, v := range ns {
347+
subjects[i] = v.(string)
348+
}
349+
cfg.FilterSubjects = subjects
350+
cfg.FilterSubject = ""
351+
}
352+
}
353+
330354
m, ok := d.GetOk("metadata")
331355
if ok {
332356
mt, ok := m.(map[string]any)
@@ -398,6 +422,8 @@ func resourceConsumerUpdate(d *schema.ResourceData, m any) error {
398422
opts := []jsm.ConsumerOption{
399423
jsm.ConsumerDescription(cfg.Description),
400424
jsm.ConsumerMetadata(cfg.Metadata),
425+
jsm.FilterStreamBySubject(cfg.FilterSubject),
426+
jsm.FilterStreamBySubject(cfg.FilterSubjects...),
401427
jsm.AckWait(cfg.AckWait),
402428
jsm.MaxDeliveryAttempts(cfg.MaxDeliver),
403429
jsm.SamplePercent(freq),
@@ -513,6 +539,7 @@ func resourceConsumerRead(d *schema.ResourceData, m any) error {
513539
d.Set("ack_wait", cons.AckWait().Seconds())
514540
d.Set("max_delivery", cons.MaxDeliver())
515541
d.Set("filter_subject", cons.FilterSubject())
542+
d.Set("filter_subjects", cons.FilterSubjects())
516543
d.Set("stream_sequence", 0)
517544
d.Set("start_time", "")
518545
d.Set("ratelimit", cons.RateLimit())

jetstream/resource_jetstream_consumer_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,26 @@ resource "jetstream_consumer" "TEST_C2" {
4949
durable_name = "C2"
5050
stream_sequence = 10
5151
max_ack_pending = 20
52+
filter_subjects = ["TEST.a", "TEST.b"]
53+
}
54+
`
55+
56+
const testConsumerConfig_singleSubject = `
57+
provider "jetstream" {
58+
servers = "%s"
59+
}
60+
61+
resource "jetstream_stream" "test" {
62+
name = "TEST"
63+
subjects = ["TEST.*"]
64+
}
65+
66+
resource "jetstream_consumer" "TEST_C3" {
67+
stream_id = jetstream_stream.test.id
68+
durable_name = "C3"
69+
stream_sequence = 10
70+
max_ack_pending = 20
71+
filter_subject = "TEST.a"
5272
}
5373
`
5474

@@ -101,11 +121,21 @@ func TestResourceConsumer(t *testing.T) {
101121
Check: resource.ComposeTestCheckFunc(
102122
testStreamExist(t, mgr, "TEST"),
103123
testConsumerExist(t, mgr, "TEST", "C2"),
124+
testConsumerHasFilterSubjects(t, mgr, "TEST", "C2", []string{"TEST.a", "TEST.b"}),
104125
resource.TestCheckResourceAttr("jetstream_consumer.TEST_C2", "stream_sequence", "10"),
105126
resource.TestCheckResourceAttr("jetstream_consumer.TEST_C2", "max_ack_pending", "20"),
106127
resource.TestCheckResourceAttr("jetstream_consumer.TEST_C2", "inactive_threshold", "0"),
107128
),
108129
},
130+
{
131+
Config: fmt.Sprintf(testConsumerConfig_singleSubject, nc.ConnectedUrl()),
132+
Check: resource.ComposeTestCheckFunc(
133+
testStreamExist(t, mgr, "TEST"),
134+
testConsumerExist(t, mgr, "TEST", "C3"),
135+
testConsumerHasFilterSubjects(t, mgr, "TEST", "C3", []string{}),
136+
resource.TestCheckResourceAttr("jetstream_consumer.TEST_C3", "filter_subject", "TEST.a"),
137+
),
138+
},
109139
},
110140
})
111141
}

jetstream/util_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,23 @@ func testConsumerHasMetadata(t *testing.T, mgr *jsm.Manager, stream string, cons
4141
}
4242
}
4343

44+
func testConsumerHasFilterSubjects(t *testing.T, mgr *jsm.Manager, stream string, consumer string, subjects []string) resource.TestCheckFunc {
45+
return func(s *terraform.State) error {
46+
cons, err := mgr.LoadConsumer(stream, consumer)
47+
if err != nil {
48+
return err
49+
}
50+
if len(cons.FilterSubjects()) == 0 && len(subjects) == 0 {
51+
return nil
52+
}
53+
if cmp.Equal(cons.FilterSubjects(), subjects) {
54+
return nil
55+
}
56+
57+
return fmt.Errorf("expected %q got %q", subjects, cons.FilterSubjects())
58+
}
59+
}
60+
4461
func testStreamHasSubjects(t *testing.T, mgr *jsm.Manager, stream string, subjects []string) resource.TestCheckFunc {
4562
return func(s *terraform.State) error {
4663
str, err := mgr.LoadStream(stream)

0 commit comments

Comments
 (0)