Skip to content

Commit 6c11e1e

Browse files
authored
Merge pull request #99 from nats-io/stream_subject_transforms
Support subject transforms on streams
2 parents ebe784e + aef9172 commit 6c11e1e

File tree

7 files changed

+358
-10
lines changed

7 files changed

+358
-10
lines changed

ABTaskFile

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
# Install https://choria-io.github.io/appbuilder/ and run `abt` to use this file
2+
3+
name: build_tasks
4+
description: NATS Developer Commands
5+
6+
commands:
7+
- name: dependencies
8+
type: parent
9+
description: Manage dependencies
10+
aliases: [d]
11+
commands:
12+
- name: update
13+
description: Update dependencies
14+
type: exec
15+
aliases: [up]
16+
flags:
17+
- name: verbose
18+
description: Log verbosely
19+
bool: true
20+
script: |
21+
. "{{ BashHelperPath }}"
22+
23+
ab_announce Updating all dependencies
24+
25+
go get -u -n -a -t {{- if .Flags.verbose }} -d -x {{ end }} ./...
26+
27+
go mod tidy
28+
29+
- name: test
30+
type: parent
31+
aliases: [t]
32+
description: Perform various tests
33+
commands:
34+
- name: unit
35+
type: exec
36+
description: Run unit tests
37+
aliases: [u]
38+
dir: "{{ AppDir }}"
39+
environment:
40+
- "TF_ACC=1"
41+
script: go test -v --failfast -p=1 ./...
42+
43+
- name: lint
44+
type: exec
45+
dir: "{{ AppDir }}"
46+
flags:
47+
- name: vet
48+
description: Perform go vet
49+
bool: true
50+
default: true
51+
- name: staticcheck
52+
description: Perform staticcheck
53+
bool: true
54+
default: true
55+
- name: update
56+
description: Updates lint dependencies
57+
bool: true
58+
script: |
59+
set -e
60+
61+
. "{{ BashHelperPath }}"
62+
63+
{{ if .Flags.update }}
64+
ab_say Updating linting tools
65+
go install github.com/client9/misspell/cmd/misspell@latest
66+
go install honnef.co/go/tools/cmd/staticcheck@latest
67+
{{ else }}
68+
echo ">>> Run with --update to install required commands"
69+
echo
70+
{{ end }}
71+
72+
ab_say Formatting source files
73+
go fmt ./...
74+
75+
ab_say Tidying go mod
76+
go mod tidy
77+
78+
ab_say Checking spelling
79+
find . -type f -name "*.go" | xargs misspell -error -locale US
80+
81+
{{ if .Flags.vet }}
82+
ab_say Performing go vet
83+
go vet ./...
84+
{{ end }}
85+
86+
{{ if .Flags.staticcheck }}
87+
ab_say Running staticcheck
88+
staticcheck ./...
89+
{{ end }}
90+
91+
- name: build
92+
type: parent
93+
aliases: [b]
94+
description: Code build steps
95+
commands:
96+
- name: binary
97+
aliases: [bin]
98+
description: Build a basic test binary
99+
type: exec
100+
dir: "{{ TaskDir }}/nats"
101+
banner: |
102+
>>>
103+
>>> Building 'nats' locally {{ if .Flags.target }}for target '{{ .Flags.target }}'{{ end }}
104+
>>>
105+
flags:
106+
- name: target
107+
description: Target platform to build for
108+
enum: ["linux/amd64"]
109+
short: T
110+
script: |
111+
{{ if eq .Flags.target "linux/amd64" }}
112+
export GOOS=linux
113+
export GOARCH=amd64
114+
{{ end }}
115+
116+
go build -o nats
117+
118+
ls -l nats
119+
120+
- name: snapshot
121+
description: Goreleaser snapshot
122+
aliases: [snap]
123+
type: exec
124+
dir: "{{ TaskDir }}"
125+
script: |
126+
goreleaser release --snapshot --clean

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ go 1.20
55
require (
66
github.com/google/go-cmp v0.5.9
77
github.com/hashicorp/terraform-plugin-sdk v1.17.2
8-
github.com/nats-io/jsm.go v0.1.1-0.20230921074448-1bbb5650afc8
8+
github.com/nats-io/jsm.go v0.1.1-0.20230926103807-a54fd61d399d
99
github.com/nats-io/jwt/v2 v2.5.2
1010
github.com/nats-io/nats-server/v2 v2.10.1
11-
github.com/nats-io/nats.go v1.30.0
11+
github.com/nats-io/nats.go v1.30.2
1212
github.com/xeipuuv/gojsonschema v1.2.0
1313
)
1414

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -494,14 +494,14 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh
494494
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
495495
github.com/mitchellh/reflectwalk v1.0.1 h1:FVzMWA5RllMAKIdUSC8mdWo3XtwoecrH79BY70sEEpE=
496496
github.com/mitchellh/reflectwalk v1.0.1/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
497-
github.com/nats-io/jsm.go v0.1.1-0.20230921074448-1bbb5650afc8 h1:OKm9e1//rlcl4i9zXQ6QQxj7DJaeL+Oe8WBgAKO4cqI=
498-
github.com/nats-io/jsm.go v0.1.1-0.20230921074448-1bbb5650afc8/go.mod h1:hB4Qd+IKoRvAAWTOI1HkCy4wotjFwOIT+codHCFOZqk=
497+
github.com/nats-io/jsm.go v0.1.1-0.20230926103807-a54fd61d399d h1:W1vFAseJ8J2315SXVyIMCePKm1P7ucA2CDBubxSNHxg=
498+
github.com/nats-io/jsm.go v0.1.1-0.20230926103807-a54fd61d399d/go.mod h1:hB4Qd+IKoRvAAWTOI1HkCy4wotjFwOIT+codHCFOZqk=
499499
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
500500
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
501501
github.com/nats-io/nats-server/v2 v2.10.1 h1:MIJ614dhOIdo71iSzY8ln78miXwrYvlvXHUyS+XdKZQ=
502502
github.com/nats-io/nats-server/v2 v2.10.1/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
503-
github.com/nats-io/nats.go v1.30.0 h1:bj/rVsRCrFXxmm9mJiDhb74UKl2HhKpDwKRBtvCjZjc=
504-
github.com/nats-io/nats.go v1.30.0/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
503+
github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY=
504+
github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
505505
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
506506
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
507507
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=

jetstream/resource_jetstream_stream.go

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,19 @@ import (
1212
)
1313

1414
func resourceStream() *schema.Resource {
15+
subjectTransform := map[string]*schema.Schema{
16+
"source": {
17+
Type: schema.TypeString,
18+
Description: "The subject transform source",
19+
Required: true,
20+
},
21+
"destination": {
22+
Type: schema.TypeString,
23+
Description: "The subject transform destination",
24+
Required: true,
25+
},
26+
}
27+
1528
sourceInfo := map[string]*schema.Schema{
1629
"name": {
1730
Type: schema.TypeString,
@@ -34,6 +47,14 @@ func resourceStream() *schema.Resource {
3447
Description: "Only copy messages matching a specific subject, not usable for mirrors",
3548
Optional: true,
3649
},
50+
"subject_transform": {
51+
Type: schema.TypeList,
52+
Description: "The subject filtering sources and associated destination transforms",
53+
Optional: true,
54+
ForceNew: false,
55+
Required: false,
56+
Elem: &schema.Resource{Schema: subjectTransform},
57+
},
3758
"external": {
3859
Type: schema.TypeList,
3960
MaxItems: 1,
@@ -225,6 +246,15 @@ func resourceStream() *schema.Resource {
225246
Type: schema.TypeString,
226247
},
227248
},
249+
"subject_transform": {
250+
Type: schema.TypeList,
251+
Description: "Subject transform to apply to matching messages",
252+
MaxItems: 1,
253+
ForceNew: false,
254+
Required: false,
255+
Optional: true,
256+
Elem: &schema.Resource{Schema: subjectTransform},
257+
},
228258
"mirror": {
229259
Type: schema.TypeList,
230260
Description: "Specifies a remote stream to mirror into this one",
@@ -382,6 +412,10 @@ func resourceStreamRead(d *schema.ResourceData, m any) error {
382412
d.Set("mirror.0.external.api", mirror.External.ApiPrefix)
383413
d.Set("mirror.0.external.deliver", mirror.External.DeliverPrefix)
384414
}
415+
for c, v := range mirror.SubjectTransforms {
416+
d.Set(fmt.Sprintf("mirror.0.subject_transforms.%d.src", c), v.Source)
417+
d.Set(fmt.Sprintf("mirror.0.subject_transforms.%d.dest", c), v.Destination)
418+
}
385419
}
386420

387421
if str.IsSourced() {
@@ -393,8 +427,12 @@ func resourceStreamRead(d *schema.ResourceData, m any) error {
393427
d.Set(fmt.Sprintf("source.%d.start_time", i), source.OptStartTime.Format(time.RFC3339))
394428
}
395429
if source.External != nil {
396-
d.Set(fmt.Sprintf("mirror.%d.external.api", i), source.External.ApiPrefix)
397-
d.Set(fmt.Sprintf("mirror.%d.external.deliver", i), source.External.DeliverPrefix)
430+
d.Set(fmt.Sprintf("source.%d.external.api", i), source.External.ApiPrefix)
431+
d.Set(fmt.Sprintf("source.%d.external.deliver", i), source.External.DeliverPrefix)
432+
}
433+
for c, v := range source.SubjectTransforms {
434+
d.Set(fmt.Sprintf("source.%d.subject_transforms.%d.src", i, c), v.Source)
435+
d.Set(fmt.Sprintf("source.%d.subject_transforms.%d.dest", i, c), v.Destination)
398436
}
399437
}
400438
}

jetstream/resource_jetstream_stream_test.go

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
88
"github.com/nats-io/jsm.go"
9+
"github.com/nats-io/jsm.go/api"
910
"github.com/nats-io/nats.go"
1011
)
1112

@@ -61,6 +62,74 @@ resource "jetstream_stream" "test" {
6162
}
6263
`
6364

65+
const typeStreamConfigMirrorTransformed = `
66+
provider "jetstream" {
67+
servers = "%s"
68+
}
69+
70+
resource "jetstream_stream" "other" {
71+
name = "OTHER"
72+
subjects = ["js.in.OTHER.>"]
73+
}
74+
75+
resource "jetstream_stream" "mirror_transform_test" {
76+
name = "MIRROR_TRANSFORM_TEST"
77+
description = "typeStreamConfigMirrorTransformed"
78+
mirror {
79+
name = "OTHER"
80+
start_seq = 11
81+
82+
subject_transform {
83+
source = "js.in.OTHER.1.>"
84+
destination = "1.>"
85+
}
86+
87+
subject_transform {
88+
source = "js.in.OTHER.2.>"
89+
destination = "2.>"
90+
}
91+
}
92+
}
93+
`
94+
95+
const typeStreamConfigSourcesTransformed = `
96+
provider "jetstream" {
97+
servers = "%s"
98+
}
99+
100+
resource "jetstream_stream" "other1" {
101+
name = "OTHER1"
102+
subjects = ["js.in.OTHER1.>"]
103+
}
104+
105+
resource "jetstream_stream" "other2" {
106+
name = "OTHER2"
107+
subjects = ["js.in.OTHER2.>"]
108+
}
109+
110+
resource "jetstream_stream" "source_transform_test" {
111+
name = "SOURCE_TRANSFORM_TEST"
112+
description = "typeStreamConfigSourcesTransformed"
113+
source {
114+
name = "OTHER1"
115+
116+
subject_transform {
117+
source = "js.in.OTHER1.>"
118+
destination = "1.>"
119+
}
120+
}
121+
122+
source {
123+
name = "OTHER2"
124+
125+
subject_transform {
126+
source = "js.in.OTHER2.>"
127+
destination = "2.>"
128+
}
129+
}
130+
}
131+
`
132+
64133
const testStreamConfigSources = `
65134
provider "jetstream" {
66135
servers = "%s"
@@ -106,8 +175,12 @@ func TestResourceStream(t *testing.T) {
106175
}
107176

108177
resource.Test(t, resource.TestCase{
109-
Providers: testJsProviders,
110-
CheckDestroy: testStreamDoesNotExist(t, mgr, "TEST"),
178+
Providers: testJsProviders,
179+
CheckDestroy: resource.ComposeTestCheckFunc(
180+
testStreamDoesNotExist(t, mgr, "TEST"),
181+
testStreamDoesNotExist(t, mgr, "OTHER"),
182+
testStreamDoesNotExist(t, mgr, "OTHER1"),
183+
testStreamDoesNotExist(t, mgr, "OTHER2")),
111184
Steps: []resource.TestStep{
112185
{
113186
Config: fmt.Sprintf(testStreamConfigBasic, nc.ConnectedUrl()),
@@ -145,6 +218,37 @@ func TestResourceStream(t *testing.T) {
145218
testStreamHasSubjects(t, mgr, "TEST", []string{}),
146219
),
147220
},
221+
{
222+
Config: fmt.Sprintf(typeStreamConfigMirrorTransformed, nc.ConnectedUrl()),
223+
Check: resource.ComposeTestCheckFunc(
224+
testStreamExist(t, mgr, "MIRROR_TRANSFORM_TEST"),
225+
resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.name", "OTHER"),
226+
resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.start_seq", "11"),
227+
resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.subject_transform.0.source", "js.in.OTHER.1.>"),
228+
resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.subject_transform.0.destination", "1.>"),
229+
resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.subject_transform.1.source", "js.in.OTHER.2.>"),
230+
resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.subject_transform.1.destination", "2.>"),
231+
testStreamIsMirrorOf(t, mgr, "MIRROR_TRANSFORM_TEST", "OTHER"),
232+
testStreamIsMirrorTransformed(t, mgr, "MIRROR_TRANSFORM_TEST", api.SubjectTransformConfig{Source: "js.in.OTHER.1.>", Destination: "1.>"}),
233+
testStreamHasSubjects(t, mgr, "MIRROR_TRANSFORM_TEST", []string{}),
234+
),
235+
},
236+
{
237+
Config: fmt.Sprintf(typeStreamConfigSourcesTransformed, nc.ConnectedUrl()),
238+
Check: resource.ComposeTestCheckFunc(
239+
testStreamExist(t, mgr, "SOURCE_TRANSFORM_TEST"),
240+
resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.0.name", "OTHER1"),
241+
resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.0.subject_transform.0.source", "js.in.OTHER1.>"),
242+
resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.0.subject_transform.0.destination", "1.>"),
243+
resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.1.name", "OTHER2"),
244+
resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.1.subject_transform.0.source", "js.in.OTHER2.>"),
245+
resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.1.subject_transform.0.destination", "2.>"),
246+
testStreamIsSourceOf(t, mgr, "SOURCE_TRANSFORM_TEST", []string{"OTHER1", "OTHER2"}),
247+
testStreamIsSourceTransformed(t, mgr, "SOURCE_TRANSFORM_TEST", "OTHER1", api.SubjectTransformConfig{Source: "js.in.OTHER1.>", Destination: "1.>"}),
248+
testStreamIsSourceTransformed(t, mgr, "SOURCE_TRANSFORM_TEST", "OTHER2", api.SubjectTransformConfig{Source: "js.in.OTHER2.>", Destination: "2.>"}),
249+
testStreamHasSubjects(t, mgr, "SOURCE_TRANSFORM_TEST", []string{}),
250+
),
251+
},
148252
{
149253
Config: fmt.Sprintf(testStreamConfigSources, nc.ConnectedUrl()),
150254
Check: resource.ComposeTestCheckFunc(

jetstream/util.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,17 @@ func streamSourceFromResourceData(d any) ([]*api.StreamSource, error) {
112112
}
113113
}
114114

115+
transforms := s["subject_transform"].([]any)
116+
if len(transforms) > 0 {
117+
for _, transform := range transforms {
118+
st := transform.(map[string]any)
119+
source.SubjectTransforms = append(source.SubjectTransforms, api.SubjectTransformConfig{
120+
Source: st["source"].(string),
121+
Destination: st["destination"].(string),
122+
})
123+
}
124+
}
125+
115126
res = append(res, source)
116127
}
117128

0 commit comments

Comments
 (0)