Skip to content

Commit af50d8b

Browse files
authored
Add Nats-Expected-Last-Subject-Sequence-Subject (#1007)
1 parent 1d4bd3b commit af50d8b

File tree

3 files changed

+73
-0
lines changed

3 files changed

+73
-0
lines changed

src/NATS.Client.JetStream/NatsJSContext.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,12 @@ public async ValueTask<NatsResult<PubAckResponse>> TryPublishAsync<T>(
160160
headers ??= new NatsHeaders();
161161
headers["Nats-Expected-Last-Subject-Sequence"] = opts.ExpectedLastSubjectSequence.ToString();
162162
}
163+
164+
if (opts.ExpectedLastSubjectSequenceSubject != null)
165+
{
166+
headers ??= new NatsHeaders();
167+
headers["Nats-Expected-Last-Subject-Sequence-Subject"] = opts.ExpectedLastSubjectSequenceSubject;
168+
}
163169
}
164170

165171
opts ??= NatsJSPubOpts.Default;
@@ -301,6 +307,12 @@ public async ValueTask<NatsJSPublishConcurrentFuture> PublishConcurrentAsync<T>(
301307
headers ??= new NatsHeaders();
302308
headers["Nats-Expected-Last-Subject-Sequence"] = opts.ExpectedLastSubjectSequence.ToString();
303309
}
310+
311+
if (opts.ExpectedLastSubjectSequenceSubject != null)
312+
{
313+
headers ??= new NatsHeaders();
314+
headers["Nats-Expected-Last-Subject-Sequence-Subject"] = opts.ExpectedLastSubjectSequenceSubject;
315+
}
304316
}
305317

306318
opts ??= NatsJSPubOpts.Default;

src/NATS.Client.JetStream/NatsJSOpts.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,9 @@ public record NatsJSPubOpts : NatsPubOpts
238238
// lss *uint64 // Expected last sequence per subject
239239
public ulong? ExpectedLastSubjectSequence { get; init; }
240240

241+
// Expected last sequence subject filter (allows wildcard subjects)
242+
public string? ExpectedLastSubjectSequenceSubject { get; init; }
243+
241244
// Publish retries for NoResponders err.
242245
// rwait time.Duration // Retry wait between attempts
243246
public TimeSpan RetryWaitBetweenAttempts { get; init; } = TimeSpan.FromMilliseconds(250);

tests/NATS.Client.JetStream.Tests/PublishTest.cs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,64 @@ public async Task Publish_test(NatsRequestReplyMode mode)
180180
}
181181
}
182182

183+
[SkipIfNatsServer(versionEarlierThan: "2.12")]
184+
public async Task Publish_expected_last_subject_sequence_subject_test()
185+
{
186+
await using var nats = new NatsConnection(new NatsOpts
187+
{
188+
Url = _server.Url,
189+
ConnectTimeout = TimeSpan.FromSeconds(10),
190+
});
191+
await nats.ConnectRetryAsync();
192+
var prefix = _server.GetNextId();
193+
194+
var js = new NatsJSContext(nats);
195+
196+
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
197+
198+
await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.>" }, cts.Token);
199+
200+
// Publish to subject1
201+
var ack1 = await js.PublishAsync(
202+
subject: $"{prefix}s1.filter.subject1",
203+
data: 1,
204+
cancellationToken: cts.Token);
205+
Assert.Null(ack1.Error);
206+
207+
// Publish to subject2
208+
var ack2 = await js.PublishAsync(
209+
subject: $"{prefix}s1.filter.subject2",
210+
data: 2,
211+
cancellationToken: cts.Token);
212+
Assert.Null(ack2.Error);
213+
214+
// Publish to subject1 again, using ExpectedLastSubjectSequenceSubject to check against subject2's last sequence
215+
var ack3 = await js.PublishAsync(
216+
subject: $"{prefix}s1.filter.subject1",
217+
data: 3,
218+
opts: new NatsJSPubOpts
219+
{
220+
ExpectedLastSubjectSequence = ack2.Seq,
221+
ExpectedLastSubjectSequenceSubject = $"{prefix}s1.filter.subject2",
222+
},
223+
cancellationToken: cts.Token);
224+
Assert.Null(ack3.Error);
225+
226+
// Publish with stale sequence for subject2 should fail
227+
var ack4 = await js.PublishAsync(
228+
subject: $"{prefix}s1.filter.subject1",
229+
data: 4,
230+
opts: new NatsJSPubOpts
231+
{
232+
ExpectedLastSubjectSequence = ack1.Seq, // stale sequence
233+
ExpectedLastSubjectSequenceSubject = $"{prefix}s1.filter.subject2",
234+
},
235+
cancellationToken: cts.Token);
236+
Assert.Equal(400, ack4.Error?.Code);
237+
Assert.Equal(10071, ack4.Error?.ErrCode);
238+
Assert.Matches(@"wrong last sequence: \d+", ack4.Error?.Description);
239+
}
240+
183241
[Theory]
184242
[InlineData(NatsRequestReplyMode.Direct)]
185243
[InlineData(NatsRequestReplyMode.SharedInbox)]

0 commit comments

Comments
 (0)