Skip to content

Commit b71c161

Browse files
committed
test fix
Signed-off-by: James Thompson <[email protected]>
1 parent 4b6d3fb commit b71c161

File tree

4 files changed

+27
-44
lines changed

4 files changed

+27
-44
lines changed

src/NATS.Client.Core/Internal/SubscriptionManager.cs

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ internal sealed class SubscriptionManager : INatsSubscriptionManager, IAsyncDisp
1717
private readonly bool _debug;
1818
private readonly object _gate = new();
1919
private readonly NatsConnection _connection;
20-
private readonly string _inboxPrefix;
2120
private readonly ConcurrentDictionary<int, SidMetadata> _bySid = new();
2221
private readonly ConditionalWeakTable<NatsSubBase, SubscriptionMetadata> _bySub = new();
2322
private readonly CancellationTokenSource _cts;
@@ -33,16 +32,14 @@ internal sealed class SubscriptionManager : INatsSubscriptionManager, IAsyncDisp
3332
public SubscriptionManager(NatsConnection connection, string inboxPrefix)
3433
{
3534
_connection = connection;
36-
_inboxPrefix = inboxPrefix;
3735
_logger = _connection.Opts.LoggerFactory.CreateLogger<SubscriptionManager>();
3836
_debug = _logger.IsEnabled(LogLevel.Debug);
3937
_trace = _logger.IsEnabled(LogLevel.Trace);
4038
_cts = new CancellationTokenSource();
4139
_cleanupInterval = _connection.Opts.SubscriptionCleanUpInterval;
4240
_timer = Task.Run(CleanupAsync);
4341
InboxSubBuilder = new InboxSubBuilder(connection.Opts.LoggerFactory.CreateLogger<InboxSubBuilder>());
44-
var sid = GetNextSid();
45-
_inboxSubSentinel = new InboxSub(InboxSubBuilder, new NatsSubscribeProps(nameof(_inboxSubSentinel)) { SubscriptionId = sid }, default, connection, this);
42+
_inboxSubSentinel = new InboxSub(InboxSubBuilder, new NatsSubscribeProps(nameof(_inboxSubSentinel)), default, connection, this);
4643
_inboxSub = _inboxSubSentinel;
4744
}
4845

@@ -52,19 +49,12 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)
5249

5350
public ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancellationToken)
5451
{
55-
var props = sub.SubscriptionProps;
56-
57-
if (props.SubscriptionId == 0)
58-
{
59-
props.SubscriptionId = GetNextSid();
60-
}
61-
6252
if (Telemetry.HasListeners())
6353
{
6454
using var activity = Telemetry.StartSendActivity($"{_connection.SpanDestinationName(sub.Subject)} {Telemetry.Constants.SubscribeActivityName}", _connection, sub.Subject, null, null);
6555
try
6656
{
67-
if (props.IsInboxSubject(_connection.InboxPrefix))
57+
if (sub.SubscriptionProps.IsInboxSubject(_connection.InboxPrefix))
6858
{
6959
if (sub.QueueGroup != null)
7060
{
@@ -74,7 +64,7 @@ public ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancellationT
7464
return SubscribeInboxAsync(sub, cancellationToken);
7565
}
7666

77-
return SubscribeInternalAsync(props, sub, cancellationToken);
67+
return SubscribeInternalAsync(sub, cancellationToken);
7868
}
7969
catch (Exception ex)
8070
{
@@ -83,7 +73,7 @@ public ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancellationT
8373
}
8474
}
8575

86-
if (props.IsInboxSubject(_connection.InboxPrefix))
76+
if (sub.SubscriptionProps.IsInboxSubject(_connection.InboxPrefix))
8777
{
8878
if (sub.QueueGroup != null)
8979
{
@@ -93,7 +83,7 @@ public ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancellationT
9383
return SubscribeInboxAsync(sub, cancellationToken);
9484
}
9585

96-
return SubscribeInternalAsync(props, sub, cancellationToken);
86+
return SubscribeInternalAsync(sub, cancellationToken);
9787
}
9888

9989
public ValueTask PublishToClientHandlersAsync(NatsProcessProps props, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer)
@@ -239,7 +229,7 @@ internal INatsSubscriptionManager GetManagerFor(NatsSubscribeProps props)
239229
return this;
240230
}
241231

242-
internal async Task InitializeInboxSubscriptionAsync(CancellationToken cancellationToken)
232+
internal async Task InitializeInboxSubscriptionAsync(NatsSubscribeProps props, CancellationToken cancellationToken = default)
243233
{
244234
if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel)
245235
{
@@ -248,18 +238,12 @@ internal async Task InitializeInboxSubscriptionAsync(CancellationToken cancellat
248238
{
249239
if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel)
250240
{
251-
var props = new NatsSubscribeProps($"{_inboxPrefix}.*");
252-
props.SubscriptionId = GetNextSid();
253-
254241
// We need to subscribe to the real inbox subject before we can register the internal subject.
255242
// We use 'default' options here since options provided by the user are for the internal subscription.
256243
// For example if the user provides a timeout, we don't want to timeout the real inbox subscription
257244
// since it must live duration of the connection.
258245
_inboxSub = InboxSubBuilder.Build(props, opts: default, _connection, manager: this);
259-
await SubscribeInternalAsync(
260-
props,
261-
_inboxSub,
262-
cancellationToken).ConfigureAwait(false);
246+
await SubscribeInternalAsync(_inboxSub, cancellationToken).ConfigureAwait(false);
263247
}
264248
}
265249
finally
@@ -273,40 +257,45 @@ await SubscribeInternalAsync(
273257

274258
private async ValueTask SubscribeInboxAsync(NatsSubBase sub, CancellationToken cancellationToken)
275259
{
276-
await InitializeInboxSubscriptionAsync(cancellationToken).ConfigureAwait(false);
260+
await InitializeInboxSubscriptionAsync(sub.SubscriptionProps, cancellationToken).ConfigureAwait(false);
277261
await InboxSubBuilder.RegisterAsync(sub).ConfigureAwait(false);
278262
}
279263

280-
private async ValueTask SubscribeInternalAsync(NatsSubscribeProps props, NatsSubBase sub, CancellationToken cancellationToken)
264+
private async ValueTask SubscribeInternalAsync(NatsSubBase sub, CancellationToken cancellationToken)
281265
{
266+
if (sub.SubscriptionProps.UnsetSubscriptionId)
267+
{
268+
sub.SubscriptionProps.SubscriptionId = GetNextSid();
269+
}
270+
282271
if (sub is InboxSub)
283272
{
284-
Interlocked.Exchange(ref _inboxSid, props.SubscriptionId);
273+
Interlocked.Exchange(ref _inboxSid, sub.SubscriptionProps.SubscriptionId);
285274
}
286275

287276
if (_debug)
288277
{
289-
_logger.LogDebug(NatsLogEvents.Subscription, "New subscription {Subject}/{Sid}", sub.Subject, props.SubscriptionId);
278+
_logger.LogDebug(NatsLogEvents.Subscription, "New subscription {Subject}/{Sid}", sub.Subject, sub.SubscriptionProps.SubscriptionId);
290279
}
291280

292281
lock (_gate)
293282
{
294-
_bySid[props.SubscriptionId] = new SidMetadata(Properties: props, WeakReference: new WeakReference<NatsSubBase>(sub));
283+
_bySid[sub.SubscriptionProps.SubscriptionId] = new SidMetadata(Properties: sub.SubscriptionProps, WeakReference: new WeakReference<NatsSubBase>(sub));
295284
#if NETSTANDARD2_0
296285
lock (_bySub)
297286
{
298287
if (_bySub.TryGetValue(sub, out _))
299288
_bySub.Remove(sub);
300-
_bySub.Add(sub, new SubscriptionMetadata(Sid: props.SubscriptionId));
289+
_bySub.Add(sub, new SubscriptionMetadata(Sid: sub.SubscriptionProps.SubscriptionId));
301290
}
302291
#else
303-
_bySub.AddOrUpdate(sub, new SubscriptionMetadata(Sid: props.SubscriptionId));
292+
_bySub.AddOrUpdate(sub, new SubscriptionMetadata(Sid: sub.SubscriptionProps.SubscriptionId));
304293
#endif
305294
}
306295

307296
try
308297
{
309-
await _connection.SubscribeCoreAsync(props, sub.Opts?.MaxMsgs, cancellationToken).ConfigureAwait(false);
298+
await _connection.SubscribeCoreAsync(sub.SubscriptionProps, sub.Opts?.MaxMsgs, cancellationToken).ConfigureAwait(false);
310299
await sub.ReadyAsync().ConfigureAwait(false);
311300
}
312301
catch

src/NATS.Client.Core/NatsConnection.RequestSub.cs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,7 @@ public async ValueTask<NatsSub<TReply>> CreateRequestSubAsync<TRequest, TReply>(
1515
{
1616
var props = requestOpts?.Props ?? new NatsPublishProps(subject);
1717
props.SetReplyTo(NewInbox());
18-
replySerializer ??= Opts.SerializerRegistry.GetDeserializer<TReply>();
19-
var subProps = new NatsSubscribeProps(props.Subject);
20-
subProps.SubscriptionId = _subscriptionManager.GetNextSid();
21-
var sub = new NatsSub<TReply>(this, _subscriptionManager.InboxSubBuilder, subProps, replyOpts, replySerializer);
22-
await AddSubAsync(sub, cancellationToken).ConfigureAwait(false);
23-
24-
requestSerializer ??= Opts.SerializerRegistry.GetSerializer<TRequest>();
25-
await PublishAsync(props, data, headers, requestSerializer, cancellationToken).ConfigureAwait(false);
26-
27-
return sub;
18+
return await CreateRequestSubAsync(props, data, headers, requestSerializer, replySerializer, replyOpts, cancellationToken).ConfigureAwait(false);
2819
}
2920

3021
/// <inheritdoc />

src/NATS.Client.Core/NatsConnection.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ public async ValueTask ConnectAsync()
199199

200200
if (Opts.RequestReplyMode == NatsRequestReplyMode.Direct)
201201
{
202-
await _subscriptionManager.InitializeInboxSubscriptionAsync(_disposedCts.Token).ConfigureAwait(false);
202+
var props = new NatsSubscribeProps($"{InboxPrefix}.*");
203+
await _subscriptionManager.InitializeInboxSubscriptionAsync(props, _disposedCts.Token).ConfigureAwait(false);
203204
}
204205
}
205206

@@ -277,7 +278,7 @@ internal ValueTask PublishToClientHandlersAsync(NatsProcessProps props, in ReadO
277278
{
278279
if (Opts.RequestReplyMode == NatsRequestReplyMode.Direct)
279280
{
280-
if (_subscriptionManager.InboxSid == props.SubscriptionId && props.IsReplyToRequest(InboxPrefix))
281+
if (_subscriptionManager.InboxSid == props.Subscription?.SubscriptionId && props.IsReplyToRequest(InboxPrefix))
281282
{
282283
if (_replyTaskFactory.TrySetResult(props, payloadBuffer, headersBuffer))
283284
{

src/NATS.Client.Core/NatsUnsubscribeProps.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@ public NatsUnsubscribeProps(int subscriptionId)
1616
internal NatsUnsubscribeProps(string subject)
1717
: base(subject)
1818
{
19-
SubscriptionId = 0;
19+
SubscriptionId = -1;
2020
}
2121

2222
/// <summary>
2323
/// A unique numeric subscription ID, generated by the client.
2424
/// </summary>
2525
public int SubscriptionId { get; set; }
26+
27+
internal bool UnsetSubscriptionId => SubscriptionId != -1;
2628
}

0 commit comments

Comments
 (0)