Skip to content

Commit 5f7701a

Browse files
committed
Now use records to pass operation properties around
1 parent 0f7166c commit 5f7701a

25 files changed

+526
-233
lines changed

src/NATS.Client.Core/Commands/CommandWriter.cs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -294,11 +294,11 @@ public ValueTask PongAsync(CancellationToken cancellationToken = default)
294294
return default;
295295
}
296296

297-
public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers, string? replyTo, INatsSerialize<T> serializer, CancellationToken cancellationToken)
297+
public ValueTask PublishAsync<T>(NatsPublishProps props, T? value, NatsHeaders? headers, INatsSerialize<T> serializer, CancellationToken cancellationToken)
298298
{
299299
if (_trace)
300300
{
301-
_logger.LogTrace(NatsLogEvents.Protocol, "PUB {Subject} {ReplyTo}", subject, replyTo);
301+
_logger.LogTrace(NatsLogEvents.Protocol, "PUB {Subject} {ReplyTo}", props.Subject, props.ReplyTo);
302302
}
303303

304304
NatsPooledBufferWriter<byte>? headersBuffer = null;
@@ -346,12 +346,12 @@ public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers,
346346
#pragma warning restore VSTHRD103
347347
#pragma warning restore CA2016
348348
{
349-
return PublishStateMachineAsync(false, subject, replyTo, headersBuffer, payloadBuffer, cancellationToken);
349+
return PublishStateMachineAsync(false, props, headersBuffer, payloadBuffer, cancellationToken);
350350
}
351351

352352
if (_flushTask.IsNotCompletedSuccessfully())
353353
{
354-
return PublishStateMachineAsync(true, subject, replyTo, headersBuffer, payloadBuffer, cancellationToken);
354+
return PublishStateMachineAsync(true, props, headersBuffer, payloadBuffer, cancellationToken);
355355
}
356356

357357
try
@@ -361,7 +361,7 @@ public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers,
361361
throw new ObjectDisposedException(nameof(CommandWriter));
362362
}
363363

364-
_protocolWriter.WritePublish(_pipeWriter, subject, replyTo, headersBuffer?.WrittenMemory, payloadBuffer.WrittenMemory);
364+
_protocolWriter.WritePublish(_pipeWriter, props, headersBuffer?.WrittenMemory, payloadBuffer.WrittenMemory);
365365
EnqueueCommand();
366366
}
367367
finally
@@ -381,11 +381,11 @@ public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers,
381381
return default;
382382
}
383383

384-
public ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int? maxMsgs, CancellationToken cancellationToken)
384+
public ValueTask SubscribeAsync(NatsSubscriptionProps props, int? maxMsgs, CancellationToken cancellationToken)
385385
{
386386
if (_trace)
387387
{
388-
_logger.LogTrace(NatsLogEvents.Protocol, "SUB {Subject} {QueueGroup} {MaxMsgs}", subject, queueGroup, maxMsgs);
388+
_logger.LogTrace(NatsLogEvents.Protocol, "SUB {Subject} {QueueGroup} {MaxMsgs}", props.Subject, props.QueueGroup, maxMsgs);
389389
}
390390

391391
#pragma warning disable CA2016
@@ -394,12 +394,12 @@ public ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int
394394
#pragma warning restore VSTHRD103
395395
#pragma warning restore CA2016
396396
{
397-
return SubscribeStateMachineAsync(false, sid, subject, queueGroup, maxMsgs, cancellationToken);
397+
return SubscribeStateMachineAsync(false, props, maxMsgs, cancellationToken);
398398
}
399399

400400
if (_flushTask.IsNotCompletedSuccessfully())
401401
{
402-
return SubscribeStateMachineAsync(true, sid, subject, queueGroup, maxMsgs, cancellationToken);
402+
return SubscribeStateMachineAsync(true, props, maxMsgs, cancellationToken);
403403
}
404404

405405
try
@@ -409,7 +409,7 @@ public ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int
409409
throw new ObjectDisposedException(nameof(CommandWriter));
410410
}
411411

412-
_protocolWriter.WriteSubscribe(_pipeWriter, sid, subject, queueGroup, maxMsgs);
412+
_protocolWriter.WriteSubscribe(_pipeWriter, props, maxMsgs);
413413
EnqueueCommand();
414414
}
415415
finally
@@ -420,11 +420,11 @@ public ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int
420420
return default;
421421
}
422422

423-
public ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken cancellationToken)
423+
public ValueTask UnsubscribeAsync(NatsSubscriptionProps props, int? maxMsgs, CancellationToken cancellationToken)
424424
{
425425
if (_trace)
426426
{
427-
_logger.LogTrace(NatsLogEvents.Protocol, "UNSUB {Sid} {MaxMsgs}", sid, maxMsgs);
427+
_logger.LogTrace(NatsLogEvents.Protocol, "UNSUB {Sid} {MaxMsgs}", props.SubscriptionId, maxMsgs);
428428
}
429429

430430
#pragma warning disable CA2016
@@ -433,12 +433,12 @@ public ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken cance
433433
#pragma warning restore VSTHRD103
434434
#pragma warning restore CA2016
435435
{
436-
return UnsubscribeStateMachineAsync(false, sid, maxMsgs, cancellationToken);
436+
return UnsubscribeStateMachineAsync(false, props, maxMsgs, cancellationToken);
437437
}
438438

439439
if (_flushTask.IsNotCompletedSuccessfully())
440440
{
441-
return UnsubscribeStateMachineAsync(true, sid, maxMsgs, cancellationToken);
441+
return UnsubscribeStateMachineAsync(true, props, maxMsgs, cancellationToken);
442442
}
443443

444444
try
@@ -448,7 +448,7 @@ public ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken cance
448448
throw new ObjectDisposedException(nameof(CommandWriter));
449449
}
450450

451-
_protocolWriter.WriteUnsubscribe(_pipeWriter, sid, maxMsgs);
451+
_protocolWriter.WriteUnsubscribe(_pipeWriter, props, maxMsgs);
452452
EnqueueCommand();
453453
}
454454
finally
@@ -815,7 +815,7 @@ private async ValueTask PongStateMachineAsync(bool lockHeld, CancellationToken c
815815
#if !NETSTANDARD
816816
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
817817
#endif
818-
private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject, string? replyTo, NatsPooledBufferWriter<byte>? headersBuffer, NatsPooledBufferWriter<byte> payloadBuffer, CancellationToken cancellationToken)
818+
private async ValueTask PublishStateMachineAsync(bool lockHeld, NatsPublishProps props, NatsPooledBufferWriter<byte>? headersBuffer, NatsPooledBufferWriter<byte> payloadBuffer, CancellationToken cancellationToken)
819819
{
820820
try
821821
{
@@ -839,7 +839,7 @@ private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject,
839839
await _flushTask!.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false);
840840
}
841841

842-
_protocolWriter.WritePublish(_pipeWriter, subject, replyTo, headersBuffer?.WrittenMemory, payloadBuffer.WrittenMemory);
842+
_protocolWriter.WritePublish(_pipeWriter, props, headersBuffer?.WrittenMemory, payloadBuffer.WrittenMemory);
843843
EnqueueCommand();
844844
}
845845
catch (TimeoutException)
@@ -866,7 +866,7 @@ private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject,
866866
}
867867
}
868868

869-
private async ValueTask SubscribeStateMachineAsync(bool lockHeld, int sid, string subject, string? queueGroup, int? maxMsgs, CancellationToken cancellationToken)
869+
private async ValueTask SubscribeStateMachineAsync(bool lockHeld, NatsSubscriptionProps props, int? maxMsgs, CancellationToken cancellationToken)
870870
{
871871
if (!lockHeld)
872872
{
@@ -888,7 +888,7 @@ private async ValueTask SubscribeStateMachineAsync(bool lockHeld, int sid, strin
888888
await _flushTask!.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false);
889889
}
890890

891-
_protocolWriter.WriteSubscribe(_pipeWriter, sid, subject, queueGroup, maxMsgs);
891+
_protocolWriter.WriteSubscribe(_pipeWriter, props, maxMsgs);
892892
EnqueueCommand();
893893
}
894894
catch (TimeoutException)
@@ -903,7 +903,7 @@ private async ValueTask SubscribeStateMachineAsync(bool lockHeld, int sid, strin
903903
}
904904
}
905905

906-
private async ValueTask UnsubscribeStateMachineAsync(bool lockHeld, int sid, int? maxMsgs, CancellationToken cancellationToken)
906+
private async ValueTask UnsubscribeStateMachineAsync(bool lockHeld, NatsSubscriptionProps props, int? maxMsgs, CancellationToken cancellationToken)
907907
{
908908
if (!lockHeld)
909909
{
@@ -925,7 +925,7 @@ private async ValueTask UnsubscribeStateMachineAsync(bool lockHeld, int sid, int
925925
await _flushTask!.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false);
926926
}
927927

928-
_protocolWriter.WriteUnsubscribe(_pipeWriter, sid, maxMsgs);
928+
_protocolWriter.WriteUnsubscribe(_pipeWriter, props, maxMsgs);
929929
EnqueueCommand();
930930
}
931931
catch (TimeoutException)

src/NATS.Client.Core/Commands/ProtocolWriter.cs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,10 @@ public void WritePong(IBufferWriter<byte> writer)
7777
// or
7878
// https://docs.nats.io/reference/reference-protocols/nats-protocol#hpub
7979
// HPUB <subject> [reply-to] <#header bytes> <#total bytes>\r\n[headers]\r\n\r\n[payload]\r\n
80-
public void WritePublish(IBufferWriter<byte> writer, string subject, string? replyTo, ReadOnlyMemory<byte>? headers, ReadOnlyMemory<byte> payload)
80+
public void WritePublish(IBufferWriter<byte> writer, NatsPublishProps props, ReadOnlyMemory<byte>? headers, ReadOnlyMemory<byte> payload)
8181
{
82+
var subject = props.Subject;
83+
var replyTo = props.ReplyTo;
8284
if (headers == null)
8385
{
8486
WritePub(writer, subject, replyTo, payload);
@@ -91,8 +93,12 @@ public void WritePublish(IBufferWriter<byte> writer, string subject, string? rep
9193

9294
// https://docs.nats.io/reference/reference-protocols/nats-protocol#sub
9395
// SUB <subject> [queue group] <sid>
94-
public void WriteSubscribe(IBufferWriter<byte> writer, int sid, string subject, string? queueGroup, int? maxMsgs)
96+
public void WriteSubscribe(IBufferWriter<byte> writer, NatsSubscriptionProps props, int? maxMsgs)
9597
{
98+
var sid = props.SubscriptionId;
99+
var subject = props.Subject;
100+
var queueGroup = props.QueueGroup;
101+
96102
// 'SUB ' + subject +' '+ sid +'\r\n'
97103
var ctrlLen = SubSpaceLength + _subjectEncoding.GetByteCount(subject) + 1 + MaxIntStringLength + NewLineLength;
98104

@@ -138,14 +144,16 @@ public void WriteSubscribe(IBufferWriter<byte> writer, int sid, string subject,
138144
// between our SUB and UNSUB calls.
139145
if (maxMsgs != null)
140146
{
141-
WriteUnsubscribe(writer, sid, maxMsgs);
147+
WriteUnsubscribe(writer, props, maxMsgs);
142148
}
143149
}
144150

145151
// https://docs.nats.io/reference/reference-protocols/nats-protocol#unsub
146152
// UNSUB <sid> [max_msgs]
147-
public void WriteUnsubscribe(IBufferWriter<byte> writer, int sid, int? maxMessages)
153+
public void WriteUnsubscribe(IBufferWriter<byte> writer, NatsSubscriptionProps probs, int? maxMessages)
148154
{
155+
var sid = probs.SubscriptionId;
156+
149157
// 'UNSUB ' + sid +'\r\n'
150158
var ctrlLen = UnsubSpaceLength + MaxIntStringLength + NewLineLength;
151159
if (maxMessages != null)

src/NATS.Client.Core/Internal/InboxSub.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,22 @@ internal class InboxSub : NatsSubBase
1212

1313
public InboxSub(
1414
InboxSubBuilder inbox,
15-
string subject,
15+
NatsSubscriptionProps props,
1616
NatsSubOpts? opts,
1717
NatsConnection connection,
1818
INatsSubscriptionManager manager)
19-
: base(connection, manager, subject, queueGroup: default, opts)
19+
: base(connection, manager, props, opts)
2020
{
2121
_inbox = inbox;
2222
_connection = connection;
2323
}
2424

2525
// Avoid base class error handling since inboxed subscribers will be responsible for that.
26-
public override ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer) =>
27-
_inbox.ReceivedAsync(subject, replyTo, headersBuffer, payloadBuffer, _connection);
26+
public override ValueTask ReceiveAsync(NatsProcessProps props, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer) =>
27+
_inbox.ReceivedAsync(props, headersBuffer, payloadBuffer, _connection);
2828

2929
// Not used. Dummy implementation to keep base happy.
30-
protected override ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer)
30+
protected override ValueTask ReceiveInternalAsync(NatsProcessProps props, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer)
3131
=> default;
3232

3333
protected override void TryComplete()
@@ -46,9 +46,9 @@ internal class InboxSubBuilder : INatsSubscriptionManager
4646

4747
public InboxSubBuilder(ILogger<InboxSubBuilder> logger) => _logger = logger;
4848

49-
public InboxSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, INatsSubscriptionManager manager)
49+
public InboxSub Build(NatsSubscriptionProps props, NatsSubOpts? opts, NatsConnection connection, INatsSubscriptionManager manager)
5050
{
51-
return new InboxSub(this, subject, opts, connection, manager);
51+
return new InboxSub(this, props, opts, connection, manager);
5252
}
5353

5454
public ValueTask RegisterAsync(NatsSubBase sub)
@@ -119,11 +119,11 @@ public ValueTask RegisterAsync(NatsSubBase sub)
119119
return sub.ReadyAsync();
120120
}
121121

122-
public async ValueTask ReceivedAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer, NatsConnection connection)
122+
public async ValueTask ReceivedAsync(NatsProcessProps props, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer, NatsConnection connection)
123123
{
124-
if (!_bySubject.TryGetValue(subject, out var subTable))
124+
if (!_bySubject.TryGetValue(props.Subject, out var subTable))
125125
{
126-
_logger.LogWarning(NatsLogEvents.InboxSubscription, "Unregistered message inbox received for {Subject}", subject);
126+
_logger.LogWarning(NatsLogEvents.InboxSubscription, "Unregistered message inbox received for {Subject}", props.Subject);
127127
return;
128128
}
129129

@@ -132,13 +132,13 @@ public async ValueTask ReceivedAsync(string subject, string? replyTo, ReadOnlySe
132132
{
133133
if (weakReference.TryGetTarget(out var sub))
134134
{
135-
await sub.ReceiveAsync(subject, replyTo, headersBuffer, payloadBuffer).ConfigureAwait(false);
135+
await sub.ReceiveAsync(props, headersBuffer, payloadBuffer).ConfigureAwait(false);
136136
}
137137
}
138138
#else
139139
foreach (var (sub, _) in subTable)
140140
{
141-
await sub.ReceiveAsync(subject, replyTo, headersBuffer, payloadBuffer).ConfigureAwait(false);
141+
await sub.ReceiveAsync(props, headersBuffer, payloadBuffer).ConfigureAwait(false);
142142
}
143143
#endif
144144
}

0 commit comments

Comments
 (0)