Skip to content

Commit 891d3a4

Browse files
authored
Merge pull request #66 from nats-io/stream_repub
support stream republish
2 parents 4c38232 + 4059389 commit 891d3a4

File tree

4 files changed

+43
-0
lines changed

4 files changed

+43
-0
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,9 @@ resource "jetstream_consumer" "ORDERS_NEW" {
196196
* `replicas` - (optional) How many replicas of the data to keep in a clustered environment
197197
* `memory` - (optional) Force the consumer state to be kept in memory rather than inherit the setting from the stream
198198
* `backoff` - (optional) List of durations in Go format that represents a retry time scale for NaK'd messages. A list of durations in seconds
199+
* `republish_source` - (optional) Republish matching messages to `republish_destination`
200+
* `republish_destination` - (optional) The destination to publish messages to
201+
* `republish_headers_only` - (optional) Republish only message headers, no bodies
199202

200203
## jetstream_kv_bucket
201204

docs/resources/jetstream_stream.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,6 @@ Above the `ORDERS_ARCHIVE` stream is a mirror of `ORDERS`, valid options for spe
6060
* `deny_purge` - (optional) Restricts the ability to purge messages from a stream via the API. Cannot be change once set to true (bool)
6161
* `allow_rollup_hdrs` - (optional) Allows the use of the Nats-Rollup header to replace all contents of a stream, or subject in a stream, with a single new message (bool)
6262
* `allow_direct` - (optional) Allow higher performance, direct access to get individual messages via the $JS.DS.GET API (bool)
63+
* `republish_source` - (optional) Republish matching messages to `republish_destination`
64+
* `republish_destination` - (optional) The destination to publish messages to
65+
* `republish_headers_only` - (optional) Republish only message headers, no bodies

jetstream/resource_jetstream_stream.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,25 @@ func resourceStream() *schema.Resource {
213213
Optional: true,
214214
Elem: &schema.Resource{Schema: sourceInfo},
215215
},
216+
"republish_source": {
217+
Type: schema.TypeString,
218+
Description: "Republish messages to republish_destination",
219+
ForceNew: false,
220+
Optional: true,
221+
},
222+
"republish_destination": {
223+
Type: schema.TypeString,
224+
Description: "The destination to publish messages to",
225+
ForceNew: false,
226+
Optional: true,
227+
RequiredWith: []string{"republish_source"},
228+
},
229+
"republish_headers_only": {
230+
Type: schema.TypeBool,
231+
Description: "Republish only message headers, no bodies",
232+
ForceNew: false,
233+
Optional: true,
234+
},
216235
},
217236
}
218237
}
@@ -340,6 +359,13 @@ func resourceStreamRead(d *schema.ResourceData, m any) error {
340359
}
341360
}
342361
}
362+
363+
if str.IsRepublishing() {
364+
d.Set("republish_source", str.Republish().Source)
365+
d.Set("republish_destination", str.Republish().Destination)
366+
d.Set("republish_headers_only", str.Republish().HeadersOnly)
367+
}
368+
343369
return nil
344370
}
345371

jetstream/util.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,17 @@ func streamConfigFromResourceData(d *schema.ResourceData) (cfg api.StreamConfig,
162162
Placement: placement,
163163
}
164164

165+
repubSrc := d.Get("republish_source").(string)
166+
repubDest := d.Get("republish_destination").(string)
167+
repubHdrs := d.Get("republish_headers_only").(bool)
168+
if repubSrc != "" {
169+
stream.RePublish = &api.RePublish{
170+
Source: repubSrc,
171+
Destination: repubDest,
172+
HeadersOnly: repubHdrs,
173+
}
174+
}
175+
165176
if description, ok := d.GetOk("description"); ok {
166177
stream.Description = description.(string)
167178
}

0 commit comments

Comments
 (0)