Skip to content

Commit ec0d22d

Browse files
committed
Implementation of review feedback
Signed-off-by: James Thompson <[email protected]>
1 parent 8ff8b41 commit ec0d22d

22 files changed

+102
-150
lines changed

src/NATS.Client.Core/Internal/ReplyTask.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public ReplyTask<TReply> CreateReplyTask<TReply>(INatsDeserialize<TReply>? deser
9595
{
9696
deserializer ??= _serializerRegistry.GetDeserializer<TReply>();
9797
var id = Interlocked.Increment(ref _nextId);
98-
98+
9999
string subject;
100100
if (_allocSubject)
101101
{
@@ -126,7 +126,7 @@ public ReplyTask<TReply> CreateReplyTask<TReply>(INatsDeserialize<TReply>? deser
126126

127127
public bool TrySetResult(NatsProcessProps props, in ReadOnlySequence<byte> payloadBuffer, in ReadOnlySequence<byte>? headersBuffer)
128128
{
129-
if (_replies.TryGetValue(props.SubjectNumber, out var rt))
129+
if (_replies.TryGetValue(props.SubjectNumber(_connection.InboxPrefix), out var rt))
130130
{
131131
rt.SetResult(props, payloadBuffer, headersBuffer);
132132
return true;

src/NATS.Client.Core/Internal/SubscriptionManager.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancellationT
5757
using var activity = Telemetry.StartSendActivity($"{_connection.SpanDestinationName(sub.Subject)} {Telemetry.Constants.SubscribeActivityName}", _connection, sub.Subject, null, null);
5858
try
5959
{
60-
if (props.Subject.IsInbox)
60+
if (props.IsInboxSubject(_connection.InboxPrefix))
6161
{
6262
if (sub.QueueGroup != null)
6363
{
@@ -76,7 +76,7 @@ public ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancellationT
7676
}
7777
}
7878

79-
if (props.Subject.IsInbox)
79+
if (props.IsInboxSubject(_connection.InboxPrefix))
8080
{
8181
if (sub.QueueGroup != null)
8282
{
@@ -126,7 +126,7 @@ public ValueTask PublishToClientHandlersAsync(NatsProcessProps props, in ReadOnl
126126
{
127127
try
128128
{
129-
return _connection.UnsubscribeAsync(new NatsSubscriptionProps(props.SubscriptionId, props.Subject.InboxPrefix));
129+
return _connection.UnsubscribeAsync(new NatsSubscriptionProps(props.SubscriptionId));
130130
}
131131
catch (Exception e)
132132
{
@@ -181,7 +181,7 @@ public ValueTask RemoveAsync(NatsSubBase sub)
181181
_logger.LogDebug(NatsLogEvents.Subscription, "Removing subscription {Subject}/{Sid}", sub.Subject, subMetadata.Sid);
182182
}
183183

184-
return _connection.UnsubscribeAsync(sub.SubscriptionProps(_connection.InboxPrefix));
184+
return _connection.UnsubscribeAsync(sub.SubscriptionProps);
185185
}
186186

187187
/// <summary>
@@ -216,7 +216,7 @@ internal async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter
216216

217217
foreach (var (sub, sid) in subs)
218218
{
219-
await sub.WriteReconnectCommandsAsync(commandWriter, new NatsSubscriptionProps(sid, _connection.InboxPrefix)).ConfigureAwait(false);
219+
await sub.WriteReconnectCommandsAsync(commandWriter, new NatsSubscriptionProps(sid)).ConfigureAwait(false);
220220

221221
if (_debug)
222222
{
@@ -227,7 +227,7 @@ internal async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter
227227

228228
internal INatsSubscriptionManager GetManagerFor(NatsSubscriptionProps props)
229229
{
230-
if (props.Subject.IsInbox)
230+
if (props.IsInboxSubject(_connection.InboxPrefix))
231231
return InboxSubBuilder;
232232
return this;
233233
}
@@ -352,7 +352,7 @@ private async ValueTask UnsubscribeSidsAsync(List<int> sids)
352352
try
353353
{
354354
_logger.LogWarning(NatsLogEvents.Subscription, "Unsubscribing orphan subscription {Sid}", sid);
355-
await _connection.UnsubscribeAsync(new NatsSubscriptionProps(sid, _connection.InboxPrefix)).ConfigureAwait(false);
355+
await _connection.UnsubscribeAsync(new NatsSubscriptionProps(sid)).ConfigureAwait(false);
356356
}
357357
catch (Exception e)
358358
{

src/NATS.Client.Core/NatsConnection.RequestReply.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public async ValueTask<NatsMsg<TReply>> RequestAsync<TRequest, TReply>(
3232
NatsSubOpts? replyOpts = default,
3333
CancellationToken cancellationToken = default)
3434
{
35-
NatsPublishProps props = requestOpts?.Props ?? new NatsPublishProps(subject, InboxPrefix);
35+
var props = requestOpts?.Props ?? new NatsPublishProps(subject, InboxPrefix);
3636
if (Telemetry.HasListeners())
3737
{
3838
using var activity = Telemetry.StartSendActivity($"{SpanDestinationName(subject)} {Telemetry.Constants.RequestReplyActivityName}", this, subject, null);

src/NATS.Client.Core/NatsConnection.RequestSub.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ internal async ValueTask<NatsSub<TReply>> CreateRequestSubAsync<TRequest, TReply
3737
CancellationToken cancellationToken = default)
3838
{
3939
replySerializer ??= Opts.SerializerRegistry.GetDeserializer<TReply>();
40-
var subProps = new NatsSubscriptionProps(props.Subject);
40+
var subProps = replyOpts?.Props ?? new NatsSubscriptionProps(props.Subject);
4141
var sub = new NatsSub<TReply>(this, _subscriptionManager.InboxSubBuilder, subProps, replyOpts, replySerializer);
4242
await AddSubAsync(sub, cancellationToken).ConfigureAwait(false);
4343

src/NATS.Client.Core/NatsConnection.Subscribe.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public partial class NatsConnection
99
public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
1010
{
1111
serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();
12-
var props = new NatsSubscriptionProps(subject, InboxPrefix, queueGroup);
12+
var props = opts?.Props ?? new NatsSubscriptionProps(subject, InboxPrefix, queueGroup);
1313
await using var sub = new NatsSub<T>(this, _subscriptionManager.GetManagerFor(props), props, opts, serializer, cancellationToken);
1414
await AddSubAsync(sub, cancellationToken: cancellationToken).ConfigureAwait(false);
1515

@@ -25,7 +25,7 @@ public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, stri
2525
public async ValueTask<INatsSub<T>> SubscribeCoreAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
2626
{
2727
serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();
28-
var props = new NatsSubscriptionProps(subject, InboxPrefix, queueGroup);
28+
var props = opts?.Props ?? new NatsSubscriptionProps(subject, InboxPrefix, queueGroup);
2929
var sub = new NatsSub<T>(this, _subscriptionManager.GetManagerFor(props), props, opts, serializer, cancellationToken);
3030
await AddSubAsync(sub, cancellationToken).ConfigureAwait(false);
3131
return sub;

src/NATS.Client.Core/NatsConnection.cs

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -277,32 +277,12 @@ internal ValueTask PublishToClientHandlersAsync(NatsProcessProps props, in ReadO
277277
{
278278
if (Opts.RequestReplyMode == NatsRequestReplyMode.Direct)
279279
{
280-
// Direct mode, check if the subject is an inbox
281-
// and if so, check if the subject is a reply to a request
282-
// by checking if the subject length is less than two NUIDs + dots
283-
// e.g. _INBOX.Hu5HPpWesrJhvQq2NG3YJ6.Hu5HPpWesrJhvQq2NG3YLw
284-
// vs. _INBOX.Hu5HPpWesrJhvQq2NG3YJ6.1234
285-
// otherwise, it's not a reply in direct mode.
286-
if (_subscriptionManager.InboxSid == props.SubscriptionId && props.Subject.ToString().Length < InboxPrefix.Length + 1 + 22 + 1 + 22)
280+
if (_subscriptionManager.InboxSid == props.SubscriptionId && props.IsReplyToRequest(InboxPrefix))
287281
{
288-
var idString = props.Subject.ToString().AsSpan().Slice(InboxPrefix.Length + 1)
289-
#if NETSTANDARD2_0
290-
.ToString()
291-
#endif
292-
;
293-
294-
if (long.TryParse(idString, out var id))
282+
if (_replyTaskFactory.TrySetResult(props, payloadBuffer, headersBuffer))
295283
{
296-
if (_replyTaskFactory.TrySetResult(props, payloadBuffer, headersBuffer))
297-
{
298-
return default;
299-
}
300-
301-
// if we can't set the result, either the task is already timed out or
302-
// it's not a reply to a request.
284+
return default;
303285
}
304-
305-
// if we can't parse the id, it's not a reply.
306286
}
307287
}
308288

src/NATS.Client.Core/NatsMessagingProps.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,18 @@ namespace NATS.Client.Core;
22

33
public record NatsMessagingProps : NatsOperationProps
44
{
5-
internal NatsMessagingProps(string subject, string inboxPrefix)
6-
: base(subject, inboxPrefix)
5+
internal NatsMessagingProps(string subject)
6+
: base(subject)
77
{
88
}
99

10-
internal NatsMessagingProps(string subjectTemplate, string subjectId, string inboxPrefix)
11-
: base(subjectTemplate, subjectId, inboxPrefix)
10+
internal NatsMessagingProps(string subjectTemplate, string subjectId)
11+
: base(subjectTemplate, subjectId)
1212
{
1313
}
1414

15-
internal NatsMessagingProps(string subjectTemplate, Dictionary<string, object> properties, string inboxPrefix)
16-
: base(subjectTemplate, properties, inboxPrefix)
15+
internal NatsMessagingProps(string subjectTemplate, Dictionary<string, object> properties)
16+
: base(subjectTemplate, properties)
1717
{
1818
}
1919

@@ -31,14 +31,14 @@ internal NatsMessagingProps(string subjectTemplate, Dictionary<string, object> p
3131

3232
public void SetReplyTo(string replyToTemplate, object replyToId)
3333
{
34-
ReplyTo = new NatsSubject(replyToTemplate, "ReplyToId", replyToId, Subject.InboxPrefix);
34+
SetReplyTo(new NatsSubject(replyToTemplate, "ReplyToId", replyToId));
3535
}
3636

3737
public void SetReplyTo(string? replyTo)
3838
{
3939
if (replyTo != null)
4040
{
41-
ReplyTo = new NatsSubject(replyTo);
41+
SetReplyTo(new NatsSubject(replyTo));
4242
}
4343
}
4444
}

src/NATS.Client.Core/NatsOperationProps.cs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,21 @@ namespace NATS.Client.Core;
22

33
public record NatsOperationProps
44
{
5-
public NatsOperationProps(string subject, string inboxPrefix)
6-
: this(new NatsSubject(subject, inboxPrefix))
7-
{
8-
}
9-
10-
public NatsOperationProps(string subjectTemplate, string subjectId, string inboxPrefix)
11-
: this(new NatsSubject(subjectTemplate, "SubjectId", subjectId, inboxPrefix))
5+
internal NatsOperationProps(string subject)
6+
: this(new NatsSubject(subject))
127
{
8+
Subject = subject;
139
}
1410

15-
public NatsOperationProps(string subjectTemplate, Dictionary<string, object> properties, string inboxPrefix)
16-
: this(new NatsSubject(subjectTemplate, properties, inboxPrefix))
11+
internal NatsOperationProps(string subjectTemplate, string subjectId)
12+
: this(new NatsSubject(subjectTemplate, "SubjectId", subjectId))
1713
{
1814
}
1915

20-
public NatsOperationProps(NatsSubject subject)
16+
internal NatsOperationProps(string subjectTemplate, Dictionary<string, object> properties)
17+
: this(new NatsSubject(subjectTemplate, properties))
2118
{
22-
Subject = subject;
2319
}
2420

25-
public NatsSubject Subject { get; private set; }
21+
public string Subject { get; private set; }
2622
}

src/NATS.Client.Core/NatsProcessProps.cs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,39 +4,38 @@ public record NatsProcessProps : NatsMessagingProps
44
{
55
private long _subjectValue = 0;
66

7-
internal NatsProcessProps(string subject, int subscriptionId, string inboxPrefix)
8-
: base(subject, inboxPrefix) => SubscriptionId = subscriptionId;
7+
internal NatsProcessProps(string subject, int subscriptionId)
8+
: base(subject)
9+
{
10+
SubscriptionId = subscriptionId;
11+
}
912

1013
public int SubscriptionId { get; internal set; }
1114

1215
public NatsSubscriptionProps? Subscription { get; internal set; } = null;
1316

14-
// check if the subject is a reply to a request
15-
// by checking if the subject length is less than two NUIDs + dots
17+
// check if the subject is a reply to a request by checking if the subject length is less than two NUIDs + dots
1618
// e.g. _INBOX.Hu5HPpWesrJhvQq2NG3YJ6.Hu5HPpWesrJhvQq2NG3YLw
1719
// vs. _INBOX.Hu5HPpWesrJhvQq2NG3YJ6.1234
1820
// otherwise, it's not a reply in direct mode.
19-
internal bool IsDirectReply => Subject.ToString().Length < Subject.InboxPrefix?.Length + 1 + 22 + 1 + 22;
21+
internal bool IsReplyToRequest(string inboxPrefix) => IsInboxSubject(inboxPrefix) && Subject.ToString().Length < inboxPrefix.Length + 1 + 22 + 1 + 22;
2022

21-
internal long SubjectNumber
23+
internal long SubjectNumber(string inboxPrefix)
2224
{
23-
get
25+
if (IsInboxSubject(inboxPrefix) && _subjectValue == 0)
2426
{
25-
if (Subject.IsInbox && _subjectValue == 0)
26-
{
27-
var idString = Subject.ToString().AsSpan().Slice((Subject.InboxPrefix?.Length ?? 0) + 1)
27+
var idString = Subject.ToString().AsSpan().Slice((inboxPrefix?.Length ?? 0) + 1)
2828
#if NETSTANDARD2_0
29-
.ToString()
29+
.ToString()
3030
#endif
31-
;
31+
;
3232

33-
if (long.TryParse(idString, out var id))
34-
{
35-
_subjectValue = id;
36-
}
33+
if (long.TryParse(idString, out var id))
34+
{
35+
_subjectValue = id;
3736
}
38-
39-
return _subjectValue;
4037
}
38+
39+
return _subjectValue;
4140
}
4241
}

src/NATS.Client.Core/NatsPublishProps.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,18 @@ namespace NATS.Client.Core;
22

33
public record NatsPublishProps : NatsMessagingProps
44
{
5-
public NatsPublishProps(string subject, string inboxPrefix = "UNKNOWN")
6-
: base(subject, inboxPrefix)
5+
public NatsPublishProps(string subject)
6+
: base(subject)
77
{
88
}
99

10-
public NatsPublishProps(string subjectTemplate, string subjectId, string inboxPrefix = "UNKNOWN")
11-
: base(subjectTemplate, subjectId, inboxPrefix)
10+
public NatsPublishProps(string subjectTemplate, string subjectId)
11+
: base(subjectTemplate, subjectId)
1212
{
1313
}
1414

15-
internal NatsPublishProps(string subjectTemplate, Dictionary<string, object> properties, string inboxPrefix = "UNKNOWN")
16-
: base(subjectTemplate, properties, inboxPrefix)
15+
public NatsPublishProps(string subjectTemplate, Dictionary<string, object> properties)
16+
: base(subjectTemplate, properties)
1717
{
1818
}
1919
}

0 commit comments

Comments
 (0)