diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index edf941522..87d0aba7c 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -160,6 +160,12 @@ public async ValueTask> TryPublishAsync( headers ??= new NatsHeaders(); headers["Nats-Expected-Last-Subject-Sequence"] = opts.ExpectedLastSubjectSequence.ToString(); } + + if (opts.ExpectedLastSubjectSequenceSubject != null) + { + headers ??= new NatsHeaders(); + headers["Nats-Expected-Last-Subject-Sequence-Subject"] = opts.ExpectedLastSubjectSequenceSubject; + } } opts ??= NatsJSPubOpts.Default; @@ -301,6 +307,12 @@ public async ValueTask PublishConcurrentAsync( headers ??= new NatsHeaders(); headers["Nats-Expected-Last-Subject-Sequence"] = opts.ExpectedLastSubjectSequence.ToString(); } + + if (opts.ExpectedLastSubjectSequenceSubject != null) + { + headers ??= new NatsHeaders(); + headers["Nats-Expected-Last-Subject-Sequence-Subject"] = opts.ExpectedLastSubjectSequenceSubject; + } } opts ??= NatsJSPubOpts.Default; diff --git a/src/NATS.Client.JetStream/NatsJSOpts.cs b/src/NATS.Client.JetStream/NatsJSOpts.cs index dc40439f5..f51730dba 100644 --- a/src/NATS.Client.JetStream/NatsJSOpts.cs +++ b/src/NATS.Client.JetStream/NatsJSOpts.cs @@ -238,6 +238,9 @@ public record NatsJSPubOpts : NatsPubOpts // lss *uint64 // Expected last sequence per subject public ulong? ExpectedLastSubjectSequence { get; init; } + // Expected last sequence subject filter (allows wildcard subjects) + public string? ExpectedLastSubjectSequenceSubject { get; init; } + // Publish retries for NoResponders err. // rwait time.Duration // Retry wait between attempts public TimeSpan RetryWaitBetweenAttempts { get; init; } = TimeSpan.FromMilliseconds(250); diff --git a/tests/NATS.Client.JetStream.Tests/PublishTest.cs b/tests/NATS.Client.JetStream.Tests/PublishTest.cs index d40e392a8..b350a1b52 100644 --- a/tests/NATS.Client.JetStream.Tests/PublishTest.cs +++ b/tests/NATS.Client.JetStream.Tests/PublishTest.cs @@ -180,6 +180,64 @@ public async Task Publish_test(NatsRequestReplyMode mode) } } + [SkipIfNatsServer(versionEarlierThan: "2.12")] + public async Task Publish_expected_last_subject_sequence_subject_test() + { + await using var nats = new NatsConnection(new NatsOpts + { + Url = _server.Url, + ConnectTimeout = TimeSpan.FromSeconds(10), + }); + await nats.ConnectRetryAsync(); + var prefix = _server.GetNextId(); + + var js = new NatsJSContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.>" }, cts.Token); + + // Publish to subject1 + var ack1 = await js.PublishAsync( + subject: $"{prefix}s1.filter.subject1", + data: 1, + cancellationToken: cts.Token); + Assert.Null(ack1.Error); + + // Publish to subject2 + var ack2 = await js.PublishAsync( + subject: $"{prefix}s1.filter.subject2", + data: 2, + cancellationToken: cts.Token); + Assert.Null(ack2.Error); + + // Publish to subject1 again, using ExpectedLastSubjectSequenceSubject to check against subject2's last sequence + var ack3 = await js.PublishAsync( + subject: $"{prefix}s1.filter.subject1", + data: 3, + opts: new NatsJSPubOpts + { + ExpectedLastSubjectSequence = ack2.Seq, + ExpectedLastSubjectSequenceSubject = $"{prefix}s1.filter.subject2", + }, + cancellationToken: cts.Token); + Assert.Null(ack3.Error); + + // Publish with stale sequence for subject2 should fail + var ack4 = await js.PublishAsync( + subject: $"{prefix}s1.filter.subject1", + data: 4, + opts: new NatsJSPubOpts + { + ExpectedLastSubjectSequence = ack1.Seq, // stale sequence + ExpectedLastSubjectSequenceSubject = $"{prefix}s1.filter.subject2", + }, + cancellationToken: cts.Token); + Assert.Equal(400, ack4.Error?.Code); + Assert.Equal(10071, ack4.Error?.ErrCode); + Assert.Matches(@"wrong last sequence: \d+", ack4.Error?.Description); + } + [Theory] [InlineData(NatsRequestReplyMode.Direct)] [InlineData(NatsRequestReplyMode.SharedInbox)]