Skip to content

Commit e4f34cf

Browse files
authored
Merge pull request #1553 from ripienaar/durable_and_direct
Use durables for interest and wq streams as well
2 parents c2db7ef + c63b41c commit e4f34cf

File tree

4 files changed

+56
-12
lines changed

4 files changed

+56
-12
lines changed

cli/sub_command.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,8 +368,12 @@ func (c *subCmd) validateInputs(ctx context.Context, nc *nats.Conn, mgr *jsm.Man
368368
return err
369369
}
370370

371+
if c.direct && c.durable != "" {
372+
return fmt.Errorf("cannot use direct get when a durable name is supplied")
373+
}
374+
371375
config := c.streamObj.CachedInfo().Config
372-
c.direct = c.direct || config.Retention == jetstream.WorkQueuePolicy || config.Retention == jetstream.InterestPolicy
376+
c.direct = c.durable == "" && (c.direct || config.Retention == jetstream.WorkQueuePolicy || config.Retention == jetstream.InterestPolicy)
373377

374378
if c.direct {
375379
if len(c.subjects) > 1 {

go.mod

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@ require (
1515
github.com/google/go-cmp v0.7.0
1616
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
1717
github.com/gosuri/uiprogress v0.0.1
18-
github.com/jedib0t/go-pretty/v6 v6.6.9
18+
github.com/jedib0t/go-pretty/v6 v6.7.0
1919
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
2020
github.com/klauspost/compress v1.18.1
2121
github.com/nats-io/jsm.go v0.3.1-0.20251103105609-f4ade0fffe41
2222
github.com/nats-io/jwt/v2 v2.8.0
23-
github.com/nats-io/nats-server/v2 v2.12.1
24-
github.com/nats-io/nats.go v1.47.0
23+
github.com/nats-io/nats-server/v2 v2.12.1-0.20251107115401-d458bfe6142f
24+
github.com/nats-io/nats.go v1.47.1-0.20251107121445-bc5748093780
2525
github.com/nats-io/nkeys v0.4.11
2626
github.com/nats-io/nuid v1.0.1
2727
github.com/prometheus/client_golang v1.23.2
@@ -67,6 +67,7 @@ require (
6767
github.com/shopspring/decimal v1.4.0 // indirect
6868
github.com/spf13/cast v1.10.0 // indirect
6969
github.com/synadia-io/orbit.go/natsext v0.1.1 // indirect
70+
go.uber.org/automaxprocs v1.6.0 // indirect
7071
go.yaml.in/yaml/v2 v2.4.3 // indirect
7172
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect
7273
golang.org/x/net v0.46.0 // indirect

go.sum

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec h1:qv2VnGeEQHchGaZ/u
7272
github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec/go.mod h1:Q48J4R4DvxnHolD5P8pOtXigYlRuPLGl6moFx3ulM68=
7373
github.com/huandu/xstrings v1.5.0 h1:2ag3IFq9ZDANvthTwTiqSSZLjDc+BedvHPAp5tJy2TI=
7474
github.com/huandu/xstrings v1.5.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
75-
github.com/jedib0t/go-pretty/v6 v6.6.9 h1:PQecJLK3L8ODuVyMe2223b61oRJjrKnmXAncbWTv9MY=
76-
github.com/jedib0t/go-pretty/v6 v6.6.9/go.mod h1:YwC5CE4fJ1HFUDeivSV1r//AmANFHyqczZk+U6BDALU=
75+
github.com/jedib0t/go-pretty/v6 v6.7.0 h1:DanoN1RnjXTwDN+B8yqtixXzXqNBCs2Vxo2ARsnrpsY=
76+
github.com/jedib0t/go-pretty/v6 v6.7.0/go.mod h1:YwC5CE4fJ1HFUDeivSV1r//AmANFHyqczZk+U6BDALU=
7777
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
7878
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
7979
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
@@ -109,20 +109,18 @@ github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zx
109109
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
110110
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
111111
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
112-
github.com/nats-io/jsm.go v0.3.1-0.20251022125454-e0d9a790a753 h1:IphLag06ffJzMcXjdLUsz3pZ8HDcNOdH22vgkKA7F+s=
113-
github.com/nats-io/jsm.go v0.3.1-0.20251022125454-e0d9a790a753/go.mod h1:PsAEriiIt8f8/G6qaDt24J913QEOwsds4dKE4MW4rgc=
114-
github.com/nats-io/jsm.go v0.3.1-0.20251103100619-589746489329 h1:w0vdlBCgW38rcR7dytXuDq5nj+pmgfeGml3no7F/I0A=
115-
github.com/nats-io/jsm.go v0.3.1-0.20251103100619-589746489329/go.mod h1:vIEz/CZ61EktDu/5XkH46bnN1ku1j+pStDwwEPhy8so=
116-
github.com/nats-io/jsm.go v0.3.1-0.20251103104243-0be8e60862a2 h1:U8rkI4XHISmdIZgPDWVDIqnW0absA6IL7/gMo8WfLVk=
117-
github.com/nats-io/jsm.go v0.3.1-0.20251103104243-0be8e60862a2/go.mod h1:vIEz/CZ61EktDu/5XkH46bnN1ku1j+pStDwwEPhy8so=
118112
github.com/nats-io/jsm.go v0.3.1-0.20251103105609-f4ade0fffe41 h1:S81sizZiW7pauAwQfkdza92THqKudq3n2sk7+WsFyHU=
119113
github.com/nats-io/jsm.go v0.3.1-0.20251103105609-f4ade0fffe41/go.mod h1:vIEz/CZ61EktDu/5XkH46bnN1ku1j+pStDwwEPhy8so=
120114
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
121115
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
116+
github.com/nats-io/nats-server/v2 v2.12.1-0.20251107115401-d458bfe6142f h1:bnAueejTYz9myWfGSNtlfhBgBfwhLTE/2/obZiGeIbk=
117+
github.com/nats-io/nats-server/v2 v2.12.1-0.20251107115401-d458bfe6142f/go.mod h1:44O2kuwyjKoAsqUL0v5dg4VEi/36Fr52/WlTjnhuWeY=
122118
github.com/nats-io/nats-server/v2 v2.12.1 h1:0tRrc9bzyXEdBLcHr2XEjDzVpUxWx64aZBm7Rl1QDrA=
123119
github.com/nats-io/nats-server/v2 v2.12.1/go.mod h1:OEaOLmu/2e6J9LzUt2OuGjgNem4EpYApO5Rpf26HDs8=
124120
github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM=
125121
github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
122+
github.com/nats-io/nats.go v1.47.1-0.20251107121445-bc5748093780 h1:1W5T0ga8fpDklzjM4YrqBhPDUEnmQ+909t1SoVN+Fwk=
123+
github.com/nats-io/nats.go v1.47.1-0.20251107121445-bc5748093780/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
126124
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
127125
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
128126
github.com/nats-io/nsc/v2 v2.12.0 h1:YCs8axEfQkbVLZDuYF4V6aJvPrDmlKJe8mWV+Rgqrzo=

nats/tests/sub_command_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,19 @@ func TestJetStreamSubscribe(t *testing.T) {
799799
})
800800
})
801801

802+
t.Run("--durable and --direct", func(t *testing.T) {
803+
withJSServer(t, func(t *testing.T, srv *server.Server, nc *nats.Conn, mgr *jsm.Manager) error {
804+
createDefaultTestStream(t, mgr, 1)
805+
806+
_, err := runNatsCliWithInput(t, "", fmt.Sprintf("--server='%s' sub --stream TEST_STREAM --durable TEST --direct", srv.ClientURL()))
807+
if !strings.Contains(err.Error(), "cannot use direct get when a durable name is supplied") {
808+
t.Fatalf("expected durable+direct error, got %v", err)
809+
}
810+
811+
return nil
812+
})
813+
})
814+
802815
t.Run("--durable with push", func(t *testing.T) {
803816
withJSServer(t, func(t *testing.T, srv *server.Server, nc *nats.Conn, mgr *jsm.Manager) error {
804817
createDefaultTestStream(t, mgr, 1)
@@ -829,6 +842,34 @@ func TestJetStreamSubscribe(t *testing.T) {
829842
})
830843
})
831844

845+
t.Run("--durable with interest streams", func(t *testing.T) {
846+
withJSServer(t, func(t *testing.T, srv *server.Server, nc *nats.Conn, mgr *jsm.Manager) error {
847+
createDefaultTestStream(t, mgr, 1, jsm.InterestRetention())
848+
849+
js, err := jetstream.New(nc)
850+
checkErr(t, err, "unable to create jetstream context")
851+
852+
_, err = js.CreateConsumer(context.TODO(), "TEST_STREAM", jetstream.ConsumerConfig{
853+
Durable: "TEST_PUSH",
854+
AckPolicy: jetstream.AckExplicitPolicy,
855+
DeliverSubject: nats.NewInbox(),
856+
DeliverGroup: "X",
857+
})
858+
checkErr(t, err, "unable to create consumer")
859+
860+
_, err = js.PublishMsg(context.TODO(), defaultTestMsg)
861+
checkErr(t, err, "unable to publish message")
862+
863+
output := string(runNatsCli(t, fmt.Sprintf("--server='%s' sub --stream TEST_STREAM --durable=TEST_PUSH --last --count=1", srv.ClientURL())))
864+
if !expectMatchLine(t, output, "Subscribing to JetStream Stream \"TEST_STREAM\" using existing push consumer \"TEST_PUSH\"") ||
865+
!expectMatchLine(t, output, primaryTestMsgData) {
866+
t.Errorf("unexpected response: %s", output)
867+
}
868+
869+
return nil
870+
})
871+
})
872+
832873
t.Run("--headers-only", func(t *testing.T) {
833874
withJSServer(t, func(t *testing.T, srv *server.Server, nc *nats.Conn, mgr *jsm.Manager) error {
834875
createDefaultTestStream(t, mgr, 1)

0 commit comments

Comments
 (0)