Skip to content

Commit 4b6d3fb

Browse files
committed
implementation of review
Signed-off-by: James Thompson <[email protected]>
1 parent a210123 commit 4b6d3fb

31 files changed

+186
-94
lines changed

src/NATS.Client.Core/Commands/CommandWriter.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ public ValueTask PublishAsync<T>(NatsPublishProps props, T? value, NatsHeaders?
381381
return default;
382382
}
383383

384-
public ValueTask SubscribeAsync(NatsSubscriptionProps props, int? maxMsgs, CancellationToken cancellationToken)
384+
public ValueTask SubscribeAsync(NatsSubscribeProps props, int? maxMsgs, CancellationToken cancellationToken)
385385
{
386386
if (_trace)
387387
{
@@ -420,7 +420,7 @@ public ValueTask SubscribeAsync(NatsSubscriptionProps props, int? maxMsgs, Cance
420420
return default;
421421
}
422422

423-
public ValueTask UnsubscribeAsync(NatsSubscriptionProps props, int? maxMsgs, CancellationToken cancellationToken)
423+
public ValueTask UnsubscribeAsync(NatsUnsubscribeProps props, int? maxMsgs, CancellationToken cancellationToken)
424424
{
425425
if (_trace)
426426
{
@@ -866,7 +866,7 @@ private async ValueTask PublishStateMachineAsync(bool lockHeld, NatsPublishProps
866866
}
867867
}
868868

869-
private async ValueTask SubscribeStateMachineAsync(bool lockHeld, NatsSubscriptionProps props, int? maxMsgs, CancellationToken cancellationToken)
869+
private async ValueTask SubscribeStateMachineAsync(bool lockHeld, NatsSubscribeProps props, int? maxMsgs, CancellationToken cancellationToken)
870870
{
871871
if (!lockHeld)
872872
{
@@ -903,7 +903,7 @@ private async ValueTask SubscribeStateMachineAsync(bool lockHeld, NatsSubscripti
903903
}
904904
}
905905

906-
private async ValueTask UnsubscribeStateMachineAsync(bool lockHeld, NatsSubscriptionProps props, int? maxMsgs, CancellationToken cancellationToken)
906+
private async ValueTask UnsubscribeStateMachineAsync(bool lockHeld, NatsUnsubscribeProps props, int? maxMsgs, CancellationToken cancellationToken)
907907
{
908908
if (!lockHeld)
909909
{

src/NATS.Client.Core/Commands/ProtocolWriter.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public void WritePublish(IBufferWriter<byte> writer, NatsPublishProps props, Rea
9393

9494
// https://docs.nats.io/reference/reference-protocols/nats-protocol#sub
9595
// SUB <subject> [queue group] <sid>
96-
public void WriteSubscribe(IBufferWriter<byte> writer, NatsSubscriptionProps props, int? maxMsgs)
96+
public void WriteSubscribe(IBufferWriter<byte> writer, NatsSubscribeProps props, int? maxMsgs)
9797
{
9898
var sid = props.SubscriptionId;
9999
var subject = props.Subject.ToString();
@@ -150,7 +150,7 @@ public void WriteSubscribe(IBufferWriter<byte> writer, NatsSubscriptionProps pro
150150

151151
// https://docs.nats.io/reference/reference-protocols/nats-protocol#unsub
152152
// UNSUB <sid> [max_msgs]
153-
public void WriteUnsubscribe(IBufferWriter<byte> writer, NatsSubscriptionProps probs, int? maxMessages)
153+
public void WriteUnsubscribe(IBufferWriter<byte> writer, NatsUnsubscribeProps probs, int? maxMessages)
154154
{
155155
var sid = probs.SubscriptionId;
156156

src/NATS.Client.Core/Internal/InboxSub.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ internal class InboxSub : NatsSubBase
1212

1313
public InboxSub(
1414
InboxSubBuilder inbox,
15-
NatsSubscriptionProps props,
15+
NatsSubscribeProps props,
1616
NatsSubOpts? opts,
1717
NatsConnection connection,
1818
INatsSubscriptionManager manager)
@@ -46,7 +46,7 @@ internal class InboxSubBuilder : INatsSubscriptionManager
4646

4747
public InboxSubBuilder(ILogger<InboxSubBuilder> logger) => _logger = logger;
4848

49-
public InboxSub Build(NatsSubscriptionProps props, NatsSubOpts? opts, NatsConnection connection, INatsSubscriptionManager manager)
49+
public InboxSub Build(NatsSubscribeProps props, NatsSubOpts? opts, NatsConnection connection, INatsSubscriptionManager manager)
5050
{
5151
return new InboxSub(this, props, opts, connection, manager);
5252
}

src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ private NatsProcessProps ParseMessageHeader(ReadOnlySpan<byte> msgHeader)
471471
msgHeader.Split(out var sidBytes, out msgHeader);
472472
msgHeader.Split(out var replyToOrSizeBytes, out msgHeader);
473473

474-
var props = new NatsProcessProps(Encoding.ASCII.GetString(subjectBytes), GetInt32(sidBytes), _connection.InboxPrefix)
474+
var props = new NatsProcessProps(Encoding.ASCII.GetString(subjectBytes), GetInt32(sidBytes))
475475
{
476476
FramingLength = framingLength,
477477
HeaderLength = 0,
@@ -526,7 +526,7 @@ private NatsProcessProps ParseHMessageHeader(ReadOnlySpan<byte> msgHeader)
526526
msgHeader.Split(out var replyToOrHeaderLenBytes, out msgHeader);
527527
msgHeader.Split(out var headerLenOrTotalLenBytes, out msgHeader);
528528

529-
var props = new NatsProcessProps(Encoding.ASCII.GetString(subjectBytes), GetInt32(sidBytes), _connection.InboxPrefix)
529+
var props = new NatsProcessProps(Encoding.ASCII.GetString(subjectBytes), GetInt32(sidBytes))
530530
{
531531
FramingLength = framingLength,
532532
};

src/NATS.Client.Core/Internal/SubscriptionManager.cs

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace NATS.Client.Core.Internal;
88

9-
internal record struct SidMetadata(NatsSubscriptionProps Properties, WeakReference<NatsSubBase> WeakReference);
9+
internal record struct SidMetadata(NatsSubscribeProps Properties, WeakReference<NatsSubBase> WeakReference);
1010

1111
internal sealed record SubscriptionMetadata(int Sid);
1212

@@ -41,7 +41,8 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)
4141
_cleanupInterval = _connection.Opts.SubscriptionCleanUpInterval;
4242
_timer = Task.Run(CleanupAsync);
4343
InboxSubBuilder = new InboxSubBuilder(connection.Opts.LoggerFactory.CreateLogger<InboxSubBuilder>());
44-
_inboxSubSentinel = new InboxSub(InboxSubBuilder, new NatsSubscriptionProps(nameof(_inboxSubSentinel), _connection.InboxPrefix), default, connection, this);
44+
var sid = GetNextSid();
45+
_inboxSubSentinel = new InboxSub(InboxSubBuilder, new NatsSubscribeProps(nameof(_inboxSubSentinel)) { SubscriptionId = sid }, default, connection, this);
4546
_inboxSub = _inboxSubSentinel;
4647
}
4748

@@ -51,7 +52,13 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)
5152

5253
public ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancellationToken)
5354
{
54-
var props = new NatsSubscriptionProps(sub.Subject, _connection.InboxPrefix, sub.QueueGroup);
55+
var props = sub.SubscriptionProps;
56+
57+
if (props.SubscriptionId == 0)
58+
{
59+
props.SubscriptionId = GetNextSid();
60+
}
61+
5562
if (Telemetry.HasListeners())
5663
{
5764
using var activity = Telemetry.StartSendActivity($"{_connection.SpanDestinationName(sub.Subject)} {Telemetry.Constants.SubscribeActivityName}", _connection, sub.Subject, null, null);
@@ -126,7 +133,7 @@ public ValueTask PublishToClientHandlersAsync(NatsProcessProps props, in ReadOnl
126133
{
127134
try
128135
{
129-
return _connection.UnsubscribeAsync(new NatsSubscriptionProps(props.SubscriptionId));
136+
return _connection.UnsubscribeAsync(props?.Subscription ?? new NatsUnsubscribeProps(props.SubscriptionId));
130137
}
131138
catch (Exception e)
132139
{
@@ -216,7 +223,7 @@ internal async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter
216223

217224
foreach (var (sub, sid) in subs)
218225
{
219-
await sub.WriteReconnectCommandsAsync(commandWriter, new NatsSubscriptionProps(sid)).ConfigureAwait(false);
226+
await sub.WriteReconnectCommandsAsync(commandWriter, sub.SubscriptionProps).ConfigureAwait(false);
220227

221228
if (_debug)
222229
{
@@ -225,7 +232,7 @@ internal async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter
225232
}
226233
}
227234

228-
internal INatsSubscriptionManager GetManagerFor(NatsSubscriptionProps props)
235+
internal INatsSubscriptionManager GetManagerFor(NatsSubscribeProps props)
229236
{
230237
if (props.IsInboxSubject(_connection.InboxPrefix))
231238
return InboxSubBuilder;
@@ -241,14 +248,14 @@ internal async Task InitializeInboxSubscriptionAsync(CancellationToken cancellat
241248
{
242249
if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel)
243250
{
244-
var inboxSubject = new NatsSubscriptionProps($"{_inboxPrefix}.*", _connection.InboxPrefix);
251+
var props = new NatsSubscribeProps($"{_inboxPrefix}.*");
252+
props.SubscriptionId = GetNextSid();
245253

246254
// We need to subscribe to the real inbox subject before we can register the internal subject.
247255
// We use 'default' options here since options provided by the user are for the internal subscription.
248256
// For example if the user provides a timeout, we don't want to timeout the real inbox subscription
249257
// since it must live duration of the connection.
250-
_inboxSub = InboxSubBuilder.Build(inboxSubject, opts: default, _connection, manager: this);
251-
var props = new NatsSubscriptionProps(_inboxSub.Subject, _connection.InboxPrefix, _inboxSub.QueueGroup);
258+
_inboxSub = InboxSubBuilder.Build(props, opts: default, _connection, manager: this);
252259
await SubscribeInternalAsync(
253260
props,
254261
_inboxSub,
@@ -262,16 +269,16 @@ await SubscribeInternalAsync(
262269
}
263270
}
264271

272+
internal int GetNextSid() => Interlocked.Increment(ref _sid);
273+
265274
private async ValueTask SubscribeInboxAsync(NatsSubBase sub, CancellationToken cancellationToken)
266275
{
267276
await InitializeInboxSubscriptionAsync(cancellationToken).ConfigureAwait(false);
268277
await InboxSubBuilder.RegisterAsync(sub).ConfigureAwait(false);
269278
}
270279

271-
private async ValueTask SubscribeInternalAsync(NatsSubscriptionProps props, NatsSubBase sub, CancellationToken cancellationToken)
280+
private async ValueTask SubscribeInternalAsync(NatsSubscribeProps props, NatsSubBase sub, CancellationToken cancellationToken)
272281
{
273-
props.SubscriptionId = GetNextSid();
274-
275282
if (sub is InboxSub)
276283
{
277284
Interlocked.Exchange(ref _inboxSid, props.SubscriptionId);
@@ -309,8 +316,6 @@ private async ValueTask SubscribeInternalAsync(NatsSubscriptionProps props, Nats
309316
}
310317
}
311318

312-
private int GetNextSid() => Interlocked.Increment(ref _sid);
313-
314319
private async Task CleanupAsync()
315320
{
316321
while (!_cts.Token.IsCancellationRequested)
@@ -352,7 +357,7 @@ private async ValueTask UnsubscribeSidsAsync(List<int> sids)
352357
try
353358
{
354359
_logger.LogWarning(NatsLogEvents.Subscription, "Unsubscribing orphan subscription {Sid}", sid);
355-
await _connection.UnsubscribeAsync(new NatsSubscriptionProps(sid)).ConfigureAwait(false);
360+
await _connection.UnsubscribeAsync(new NatsUnsubscribeProps(sid)).ConfigureAwait(false);
356361
}
357362
catch (Exception e)
358363
{

src/NATS.Client.Core/NatsConnection.Publish.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public ValueTask PublishAsync(string subject, NatsHeaders? headers = default, st
1515
/// <inheritdoc />
1616
public ValueTask PublishAsync<T>(string subject, T? data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize<T>? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
1717
{
18-
var props = opts?.Props ?? new NatsPublishProps(subject, InboxPrefix);
18+
var props = opts?.Props ?? new NatsPublishProps(subject);
1919
props.SetReplyTo(replyTo);
2020
return PublishAsync(props, data, headers, serializer, cancellationToken);
2121
}

src/NATS.Client.Core/NatsConnection.RequestReply.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public async ValueTask<NatsMsg<TReply>> RequestAsync<TRequest, TReply>(
3232
NatsSubOpts? replyOpts = default,
3333
CancellationToken cancellationToken = default)
3434
{
35-
var props = requestOpts?.Props ?? new NatsPublishProps(subject, InboxPrefix);
35+
var props = requestOpts?.Props ?? new NatsPublishProps(subject);
3636
if (Telemetry.HasListeners())
3737
{
3838
using var activity = Telemetry.StartSendActivity($"{SpanDestinationName(subject)} {Telemetry.Constants.RequestReplyActivityName}", this, subject, null);

src/NATS.Client.Core/NatsConnection.RequestSub.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@ public async ValueTask<NatsSub<TReply>> CreateRequestSubAsync<TRequest, TReply>(
1313
NatsSubOpts? replyOpts = default,
1414
CancellationToken cancellationToken = default)
1515
{
16-
var props = requestOpts?.Props ?? new NatsPublishProps(subject, InboxPrefix);
16+
var props = requestOpts?.Props ?? new NatsPublishProps(subject);
1717
props.SetReplyTo(NewInbox());
1818
replySerializer ??= Opts.SerializerRegistry.GetDeserializer<TReply>();
19-
var subProps = new NatsSubscriptionProps(props.Subject);
19+
var subProps = new NatsSubscribeProps(props.Subject);
20+
subProps.SubscriptionId = _subscriptionManager.GetNextSid();
2021
var sub = new NatsSub<TReply>(this, _subscriptionManager.InboxSubBuilder, subProps, replyOpts, replySerializer);
2122
await AddSubAsync(sub, cancellationToken).ConfigureAwait(false);
2223

@@ -37,7 +38,8 @@ internal async ValueTask<NatsSub<TReply>> CreateRequestSubAsync<TRequest, TReply
3738
CancellationToken cancellationToken = default)
3839
{
3940
replySerializer ??= Opts.SerializerRegistry.GetDeserializer<TReply>();
40-
var subProps = replyOpts?.Props ?? new NatsSubscriptionProps(props.Subject);
41+
var subProps = replyOpts?.Props ?? new NatsSubscribeProps(props.Subject);
42+
subProps.SubscriptionId = _subscriptionManager.GetNextSid();
4143
var sub = new NatsSub<TReply>(this, _subscriptionManager.InboxSubBuilder, subProps, replyOpts, replySerializer);
4244
await AddSubAsync(sub, cancellationToken).ConfigureAwait(false);
4345

src/NATS.Client.Core/NatsConnection.Subscribe.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ public partial class NatsConnection
99
public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
1010
{
1111
serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();
12-
var props = opts?.Props ?? new NatsSubscriptionProps(subject, InboxPrefix, queueGroup);
12+
var props = opts?.Props ?? new NatsSubscribeProps(subject, queueGroup);
13+
props.SubscriptionId = _subscriptionManager.GetNextSid();
1314
await using var sub = new NatsSub<T>(this, _subscriptionManager.GetManagerFor(props), props, opts, serializer, cancellationToken);
1415
await AddSubAsync(sub, cancellationToken: cancellationToken).ConfigureAwait(false);
1516

@@ -25,7 +26,8 @@ public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, stri
2526
public async ValueTask<INatsSub<T>> SubscribeCoreAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
2627
{
2728
serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();
28-
var props = opts?.Props ?? new NatsSubscriptionProps(subject, InboxPrefix, queueGroup);
29+
var props = opts?.Props ?? new NatsSubscribeProps(subject, queueGroup);
30+
props.SubscriptionId = _subscriptionManager.GetNextSid();
2931
var sub = new NatsSub<T>(this, _subscriptionManager.GetManagerFor(props), props, opts, serializer, cancellationToken);
3032
await AddSubAsync(sub, cancellationToken).ConfigureAwait(false);
3133
return sub;

src/NATS.Client.Core/NatsConnection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,10 +297,10 @@ internal void ResetPongCount()
297297
internal ValueTask PongAsync() => CommandWriter.PongAsync(CancellationToken.None);
298298

299299
// called only internally
300-
internal ValueTask SubscribeCoreAsync(NatsSubscriptionProps props, int? maxMsgs, CancellationToken cancellationToken)
300+
internal ValueTask SubscribeCoreAsync(NatsSubscribeProps props, int? maxMsgs, CancellationToken cancellationToken)
301301
=> CommandWriter.SubscribeAsync(props, maxMsgs, cancellationToken);
302302

303-
internal ValueTask UnsubscribeAsync(NatsSubscriptionProps props)
303+
internal ValueTask UnsubscribeAsync(NatsUnsubscribeProps props)
304304
{
305305
try
306306
{

0 commit comments

Comments
 (0)