Skip to content

Commit 8a1dae6

Browse files
authored
Add prioritized mode (#1011)
* Add prioritized mode for Consumers * Add support for consumer pinned priority policy * Fix build * Refactor Pin ID handling with reusable extension methods also added 503 handling to fetch * Test fixes * Test fixes
1 parent db9aacc commit 8a1dae6

22 files changed

+659
-22
lines changed

src/NATS.Client.JetStream/INatsJSConsumer.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,16 @@ IAsyncEnumerable<NatsJSMsg<T>> FetchAsync<T>(
8181
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
8282
ValueTask RefreshAsync(CancellationToken cancellationToken = default);
8383

84+
/// <summary>
85+
/// Unpins this consumer from the current pinned client.
86+
/// </summary>
87+
/// <param name="group">The priority group name to unpin.</param>
88+
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
89+
/// <returns>A task representing the asynchronous operation.</returns>
90+
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
91+
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
92+
ValueTask UnpinAsync(string group, CancellationToken cancellationToken = default);
93+
8494
/// <summary>
8595
/// Consume a set number of messages from the stream using this consumer.
8696
/// Returns immediately if no messages are available.

src/NATS.Client.JetStream/INatsJSContext.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,20 @@ IAsyncEnumerable<string> ListConsumerNamesAsync(
161161
/// <remarks>This feature is only available on NATS server v2.11 and later.</remarks>
162162
ValueTask<bool> ResumeConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default);
163163

164+
/// <summary>
165+
/// Unpin a consumer from the currently pinned client.
166+
/// </summary>
167+
/// <param name="stream">Stream name where consumer is associated to.</param>
168+
/// <param name="consumer">Consumer name to be unpinned.</param>
169+
/// <param name="group">The priority group name to unpin.</param>
170+
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
171+
/// <returns>A task representing the asynchronous operation.</returns>
172+
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
173+
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
174+
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
175+
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
176+
ValueTask UnpinConsumerAsync(string stream, string consumer, string group, CancellationToken cancellationToken = default);
177+
164178
/// <summary>
165179
/// Calls JetStream Account Info API.
166180
/// </summary>

src/NATS.Client.JetStream/INatsJSNotification.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,18 @@ public record NatsJSMessageSizeExceedsMaxBytesNotification : INatsJSNotification
3333
public string Name => "Message Size Exceeds MaxBytes";
3434
}
3535

36+
/// <summary>
37+
/// Notification sent when the server reports a pin ID mismatch for pinned client priority policy.
38+
/// </summary>
39+
/// <remarks>
40+
/// This notification indicates that the client's pin ID doesn't match what the server expects.
41+
/// The client should clear its pin ID and retry the request to get a new pin.
42+
/// </remarks>
43+
public record NatsJSPinIdMismatchNotification : INatsJSNotification
44+
{
45+
public static readonly NatsJSPinIdMismatchNotification Default = new();
46+
47+
public string Name => "Pin ID Mismatch";
48+
}
49+
3650
public record NatsJSProtocolNotification(string Name, int HeaderCode, string HeaderMessageText) : INatsJSNotification;

src/NATS.Client.JetStream/Internal/NatsJSConsume.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ internal class NatsJSConsume<TMsg> : NatsSubBase
3131
private readonly Timer _timer;
3232
private readonly Task _pullTask;
3333
private readonly NatsJSNotificationChannel? _notificationChannel;
34+
private readonly NatsJSConsumer? _jsConsumer;
3435

3536
private readonly long _maxMsgs;
3637
private readonly TimeSpan _expires;
@@ -61,6 +62,7 @@ public NatsJSConsume(
6162
INatsDeserialize<TMsg> serializer,
6263
NatsSubOpts? opts,
6364
NatsJSPriorityGroupOpts? priorityGroup,
65+
NatsJSConsumer? jsConsumer,
6466
CancellationToken cancellationToken)
6567
: base(context.Connection, context.Connection.SubscriptionManager, subject, queueGroup, opts)
6668
{
@@ -72,6 +74,7 @@ public NatsJSConsume(
7274
_consumer = consumer;
7375
_serializer = serializer;
7476
_priorityGroup = priorityGroup;
77+
_jsConsumer = jsConsumer;
7578

7679
if (notificationHandler is { } handler)
7780
{
@@ -245,6 +248,8 @@ internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter comm
245248
Group = _priorityGroup?.Group,
246249
MinPending = _priorityGroup?.MinPending ?? 0,
247250
MinAckPending = _priorityGroup?.MinAckPending ?? 0,
251+
Priority = _priorityGroup?.Priority ?? 0,
252+
Id = _jsConsumer?.GetPinId(),
248253
};
249254

250255
await commandWriter.PublishAsync(
@@ -354,6 +359,12 @@ protected override async ValueTask ReceiveInternalAsync(
354359
_notificationChannel?.Notify(NatsJSLeadershipChangeNotification.Default);
355360
ResetPending();
356361
}
362+
else if (headers.Code == 423)
363+
{
364+
_logger.LogDebug(NatsJSLogEvents.PinIdMismatch, "Pin ID Mismatch");
365+
NatsJSExtensionsInternal.HandlePinIdMismatch(_jsConsumer, _notificationChannel);
366+
ResetPending();
367+
}
357368
else if (headers.Code == 503)
358369
{
359370
_logger.LogDebug(NatsJSLogEvents.NoResponders, "503 no responders");
@@ -400,6 +411,8 @@ protected override async ValueTask ReceiveInternalAsync(
400411
_serializer),
401412
_context);
402413

414+
NatsJSExtensionsInternal.TrySetPinIdFromHeaders(msg.Headers, _jsConsumer);
415+
403416
// Stop feeding the user if we are disposed.
404417
// We need to exit as soon as possible.
405418
if (Volatile.Read(ref _disposed) == 0)
@@ -479,6 +492,8 @@ private void Pull(string origin, long batch, long maxBytes) => _pullRequests.Wri
479492
Group = _priorityGroup?.Group,
480493
MinPending = _priorityGroup?.MinPending ?? 0,
481494
MinAckPending = _priorityGroup?.MinAckPending ?? 0,
495+
Priority = _priorityGroup?.Priority ?? 0,
496+
Id = _jsConsumer?.GetPinId(),
482497
},
483498
Origin = origin,
484499
});

src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,38 @@ namespace NATS.Client.JetStream.Internal;
44

55
internal static class NatsJSExtensionsInternal
66
{
7+
private const string NatsPinIdHeader = "Nats-Pin-Id";
8+
79
public static long ToNanos(this TimeSpan timeSpan) => (long)(timeSpan.TotalMilliseconds * 1_000_000);
810

11+
/// <summary>
12+
/// Handles Pin ID mismatch (423) response by clearing the pin ID and notifying.
13+
/// </summary>
14+
/// <param name="jsConsumer">The consumer to clear the pin ID on.</param>
15+
/// <param name="notificationChannel">The notification channel to notify.</param>
16+
public static void HandlePinIdMismatch(NatsJSConsumer? jsConsumer, NatsJSNotificationChannel? notificationChannel)
17+
{
18+
jsConsumer?.SetPinId(null);
19+
notificationChannel?.Notify(NatsJSPinIdMismatchNotification.Default);
20+
}
21+
22+
/// <summary>
23+
/// Extracts and sets the Pin ID from message headers if present.
24+
/// </summary>
25+
/// <param name="headers">The message headers.</param>
26+
/// <param name="jsConsumer">The consumer to set the pin ID on.</param>
27+
public static void TrySetPinIdFromHeaders(NatsHeaders? headers, NatsJSConsumer? jsConsumer)
28+
{
29+
if (jsConsumer != null && headers != null && headers.TryGetValue(NatsPinIdHeader, out var pinIdValues))
30+
{
31+
var pinId = pinIdValues.ToString();
32+
if (!string.IsNullOrEmpty(pinId))
33+
{
34+
jsConsumer.SetPinId(pinId);
35+
}
36+
}
37+
}
38+
939
public static bool HasTerminalJSError(this NatsHeaders headers)
1040
{
1141
// terminal codes

src/NATS.Client.JetStream/Internal/NatsJSFetch.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ internal class NatsJSFetch<TMsg> : NatsSubBase
2222
private readonly Timer _hbTimer;
2323
private readonly Timer _expiresTimer;
2424
private readonly NatsJSNotificationChannel? _notificationChannel;
25+
private readonly NatsJSConsumer? _jsConsumer;
2526

2627
private readonly long _maxMsgs;
2728
private readonly long _maxBytes;
@@ -47,6 +48,7 @@ public NatsJSFetch(
4748
INatsDeserialize<TMsg> serializer,
4849
NatsSubOpts? opts,
4950
NatsJSPriorityGroupOpts? priorityGroup,
51+
NatsJSConsumer? jsConsumer,
5052
CancellationToken cancellationToken)
5153
: base(context.Connection, context.Connection.SubscriptionManager, subject, queueGroup, opts)
5254
{
@@ -57,6 +59,7 @@ public NatsJSFetch(
5759
_consumer = consumer;
5860
_serializer = serializer;
5961
_priorityGroup = priorityGroup;
62+
_jsConsumer = jsConsumer;
6063

6164
if (notificationHandler is { } handler)
6265
{
@@ -191,6 +194,8 @@ internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter comm
191194
Group = _priorityGroup?.Group,
192195
MinPending = _priorityGroup?.MinPending ?? 0,
193196
MinAckPending = _priorityGroup?.MinAckPending ?? 0,
197+
Priority = _priorityGroup?.Priority ?? 0,
198+
Id = _jsConsumer?.GetPinId(),
194199
};
195200

196201
await commandWriter.PublishAsync(
@@ -235,6 +240,17 @@ protected override async ValueTask ReceiveInternalAsync(
235240
else if (headers is { Code: 100, Message: NatsHeaders.Messages.IdleHeartbeat })
236241
{
237242
}
243+
else if (headers.Code == 423)
244+
{
245+
_logger.LogDebug(NatsJSLogEvents.PinIdMismatch, "Pin ID Mismatch");
246+
NatsJSExtensionsInternal.HandlePinIdMismatch(_jsConsumer, _notificationChannel);
247+
}
248+
else if (headers.Code == 503)
249+
{
250+
_logger.LogDebug(NatsJSLogEvents.NoResponders, "503 no responders");
251+
_notificationChannel?.Notify(NatsJSNoRespondersNotification.Default);
252+
EndSubscription(NatsSubEndReason.None);
253+
}
238254
else if (headers.HasTerminalJSError())
239255
{
240256
_userMsgs.Writer.TryComplete(new NatsJSProtocolException(headers.Code, headers.Message, headers.MessageText));
@@ -273,6 +289,8 @@ protected override async ValueTask ReceiveInternalAsync(
273289
_serializer),
274290
_context);
275291

292+
NatsJSExtensionsInternal.TrySetPinIdFromHeaders(msg.Headers, _jsConsumer);
293+
276294
_pendingMsgs--;
277295
_pendingBytes -= msg.Size;
278296

src/NATS.Client.JetStream/Models/ConsumerConfigPriorityPolicy.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,9 @@ public enum ConsumerConfigPriorityPolicy
1919
/// Messages overflow to the next available consumer.
2020
/// </summary>
2121
Overflow = 2,
22+
23+
/// <summary>
24+
/// Consumer is pinned to a specific client.
25+
/// </summary>
26+
PinnedClient = 3,
2227
}

src/NATS.Client.JetStream/Models/ConsumerInfo.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,12 @@ public record ConsumerInfo
126126
[System.Text.Json.Serialization.JsonPropertyName("push_bound")]
127127
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
128128
public bool PushBound { get; set; }
129+
130+
/// <summary>
131+
/// Information about the currently defined priority groups.
132+
/// </summary>
133+
/// <remarks>This feature is only available on NATS server v2.11 and later.</remarks>
134+
[System.Text.Json.Serialization.JsonPropertyName("priority_groups")]
135+
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
136+
public ICollection<PriorityGroupState>? PriorityGroups { get; set; }
129137
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
namespace NATS.Client.JetStream.Models;
2+
3+
/// <summary>
4+
/// A request to the JetStream $JS.API.CONSUMER.UNPIN API
5+
/// </summary>
6+
/// <remarks>This feature is only available on NATS server v2.11 and later.</remarks>
7+
internal record ConsumerUnpinRequest
8+
{
9+
/// <summary>
10+
/// The priority group name to unpin.
11+
/// </summary>
12+
[System.Text.Json.Serialization.JsonPropertyName("group")]
13+
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
14+
public string? Group { get; set; }
15+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
namespace NATS.Client.JetStream.Models;
2+
3+
/// <summary>
4+
/// A response from the JetStream $JS.API.CONSUMER.UNPIN API
5+
/// </summary>
6+
internal record ConsumerUnpinResponse
7+
{
8+
[System.Text.Json.Serialization.JsonPropertyName("success")]
9+
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
10+
public bool Success { get; set; }
11+
}

0 commit comments

Comments
 (0)