Skip to content

Commit ff6e44b

Browse files
committed
Remove NatsSubject
Signed-off-by: James Thompson <[email protected]>
1 parent ec0d22d commit ff6e44b

File tree

11 files changed

+51
-246
lines changed

11 files changed

+51
-246
lines changed

src/NATS.Client.Core/NatsMessagingProps.cs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,7 @@ internal NatsMessagingProps(string subject)
77
{
88
}
99

10-
internal NatsMessagingProps(string subjectTemplate, string subjectId)
11-
: base(subjectTemplate, subjectId)
12-
{
13-
}
14-
15-
internal NatsMessagingProps(string subjectTemplate, Dictionary<string, object> properties)
16-
: base(subjectTemplate, properties)
17-
{
18-
}
19-
20-
public NatsSubject? ReplyTo { get; private set; } = null;
10+
public string? ReplyTo { get; private set; } = null;
2111

2212
internal int PayloadLength => TotalMessageLength - HeaderLength;
2313

@@ -29,16 +19,11 @@ internal NatsMessagingProps(string subjectTemplate, Dictionary<string, object> p
2919

3020
internal int TotalEnvelopeLength => TotalMessageLength + FramingLength;
3121

32-
public void SetReplyTo(string replyToTemplate, object replyToId)
33-
{
34-
SetReplyTo(new NatsSubject(replyToTemplate, "ReplyToId", replyToId));
35-
}
36-
3722
public void SetReplyTo(string? replyTo)
3823
{
3924
if (replyTo != null)
4025
{
41-
SetReplyTo(new NatsSubject(replyTo));
26+
ReplyTo = replyTo;
4227
}
4328
}
4429
}

src/NATS.Client.Core/NatsOperationProps.cs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,9 @@ namespace NATS.Client.Core;
33
public record NatsOperationProps
44
{
55
internal NatsOperationProps(string subject)
6-
: this(new NatsSubject(subject))
76
{
87
Subject = subject;
98
}
109

11-
internal NatsOperationProps(string subjectTemplate, string subjectId)
12-
: this(new NatsSubject(subjectTemplate, "SubjectId", subjectId))
13-
{
14-
}
15-
16-
internal NatsOperationProps(string subjectTemplate, Dictionary<string, object> properties)
17-
: this(new NatsSubject(subjectTemplate, properties))
18-
{
19-
}
20-
2110
public string Subject { get; private set; }
2211
}

src/NATS.Client.Core/NatsPublishProps.cs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,4 @@ public NatsPublishProps(string subject)
66
: base(subject)
77
{
88
}
9-
10-
public NatsPublishProps(string subjectTemplate, string subjectId)
11-
: base(subjectTemplate, subjectId)
12-
{
13-
}
14-
15-
public NatsPublishProps(string subjectTemplate, Dictionary<string, object> properties)
16-
: base(subjectTemplate, properties)
17-
{
18-
}
199
}

src/NATS.Client.Core/NatsSubject.cs

Lines changed: 0 additions & 66 deletions
This file was deleted.

src/NATS.Client.Core/NatsSubscriptionProps.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,6 @@ public NatsSubscriptionProps(int subscriptionId)
1414
SubscriptionId = subscriptionId;
1515
}
1616

17-
public NatsSubscriptionProps(string subjectTemplate, string subjectId, string? queueGroup = default)
18-
: base(subjectTemplate, subjectId)
19-
{
20-
QueueGroup = queueGroup;
21-
}
22-
2317
public int SubscriptionId { get; set; }
2418

2519
public string? QueueGroup { get; internal set; }

src/NATS.Client.JetStream/Internal/NatsJSConsume.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,12 +245,11 @@ internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter comm
245245
MinPending = _priorityGroup?.MinPending ?? 0,
246246
MinAckPending = _priorityGroup?.MinAckPending ?? 0,
247247
};
248-
var prop = new NatsPublishProps($"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}");
249-
prop.SetReplyTo(Subject);
250248
await commandWriter.PublishAsync(
251-
props: prop,
249+
subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}",
252250
value: request,
253251
headers: default,
252+
replyTo: Subject,
254253
serializer: NatsJSJsonSerializer<ConsumerGetnextRequest>.Default,
255254
cancellationToken: CancellationToken.None);
256255

src/NATS.Client.JetStream/Internal/NatsJSFetch.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,12 +192,11 @@ internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter comm
192192
MinAckPending = _priorityGroup?.MinAckPending ?? 0,
193193
};
194194

195-
var pubProps = new NatsPublishProps($"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}");
196-
pubProps.SetReplyTo(Subject);
197195
await commandWriter.PublishAsync(
198-
pubProps,
196+
subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}",
199197
value: request,
200198
headers: default,
199+
replyTo: Subject,
201200
serializer: NatsJSJsonSerializer<ConsumerGetnextRequest>.Default,
202201
cancellationToken: CancellationToken.None);
203202
}

src/NATS.Client.JetStream/NatsJSConsumer.cs

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ public async IAsyncEnumerable<NatsJSMsg<T>> FetchNoWaitAsync<T>(
287287
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
288288
public async ValueTask RefreshAsync(CancellationToken cancellationToken = default) =>
289289
Info = await _context.JSRequestResponseAsync<object, ConsumerInfo>(
290-
props: GetConsumerProps("INFO", _stream, _consumer),
290+
subject: $"{_context.Opts.Prefix}.CONSUMER.INFO.{_stream}.{_consumer}",
291291
request: null,
292292
cancellationToken).ConfigureAwait(false);
293293

@@ -473,25 +473,4 @@ private void ThrowIfDeleted()
473473
if (_deleted)
474474
throw new NatsJSException($"Consumer '{_stream}:{_consumer}' is deleted");
475475
}
476-
477-
private NatsPublishProps GetConsumerProps(string action, string stream, string? consumer = default)
478-
{
479-
var template = "{prefix}.{entity}.{action}.{stream}";
480-
var values = new Dictionary<string, object>()
481-
{
482-
{ "prefix", _context.Opts.Prefix },
483-
{ "entity", "CONSUMER" },
484-
{ "action", action },
485-
{ "stream", stream },
486-
};
487-
if (consumer != null)
488-
{
489-
template += ".{id}";
490-
values.Add("id", consumer);
491-
}
492-
493-
return new NatsPublishProps(
494-
template,
495-
values);
496-
}
497476
}

src/NATS.Client.JetStream/NatsJSContext.Consumers.cs

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ public async ValueTask<INatsJSConsumer> UpdateConsumerAsync(
6767
public async ValueTask<INatsJSConsumer> GetConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default)
6868
{
6969
ThrowIfInvalidStreamName(stream);
70-
var props = GetConsumerProps("INFO", stream, consumer);
71-
var response = await JSRequestResponseAsync<object, ConsumerInfo>(props, null, cancellationToken);
70+
var response = await JSRequestResponseAsync<object, ConsumerInfo>(
71+
subject: $"{Opts.Prefix}.CONSUMER.INFO.{stream}.{consumer}",
72+
request: null,
73+
cancellationToken);
7274
return new NatsJSConsumer(this, response);
7375
}
7476

@@ -79,11 +81,10 @@ public async IAsyncEnumerable<INatsJSConsumer> ListConsumersAsync(
7981
{
8082
ThrowIfInvalidStreamName(stream);
8183
var offset = 0;
82-
var props = GetConsumerProps("LIST", stream);
8384
while (!cancellationToken.IsCancellationRequested)
8485
{
8586
var response = await JSRequestResponseAsync<ConsumerListRequest, ConsumerListResponse>(
86-
props: props,
87+
subject: $"{Opts.Prefix}.CONSUMER.LIST.{stream}",
8788
new ConsumerListRequest { Offset = offset },
8889
cancellationToken);
8990

@@ -106,11 +107,10 @@ public async IAsyncEnumerable<string> ListConsumerNamesAsync(
106107
{
107108
ThrowIfInvalidStreamName(stream);
108109
var offset = 0;
109-
var props = GetConsumerProps("NAMES", stream);
110110
while (!cancellationToken.IsCancellationRequested)
111111
{
112112
var response = await JSRequestResponseAsync<ConsumerNamesRequest, ConsumerNamesResponse>(
113-
props: props,
113+
subject: $"{Opts.Prefix}.CONSUMER.NAMES.{stream}",
114114
new ConsumerNamesRequest { Offset = offset },
115115
cancellationToken);
116116

@@ -138,8 +138,10 @@ public async IAsyncEnumerable<string> ListConsumerNamesAsync(
138138
public async ValueTask<bool> DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default)
139139
{
140140
ThrowIfInvalidStreamName(stream);
141-
var props = GetConsumerProps("DELETE", stream, consumer);
142-
var response = await JSRequestResponseAsync<object, ConsumerDeleteResponse>(props, null, cancellationToken);
141+
var response = await JSRequestResponseAsync<object, ConsumerDeleteResponse>(
142+
subject: $"{Opts.Prefix}.CONSUMER.DELETE.{stream}.{consumer}",
143+
request: null,
144+
cancellationToken);
143145
return response.Success;
144146
}
145147

@@ -159,7 +161,7 @@ public async ValueTask<ConsumerPauseResponse> PauseConsumerAsync(string stream,
159161
{
160162
ThrowIfInvalidStreamName(stream);
161163
var response = await JSRequestResponseAsync<ConsumerPauseRequest, ConsumerPauseResponse>(
162-
props: GetConsumerProps("PAUSE", stream, consumer),
164+
subject: $"{Opts.Prefix}.CONSUMER.PAUSE.{stream}.{consumer}",
163165
request: new ConsumerPauseRequest { PauseUntil = pauseUntil },
164166
cancellationToken);
165167
return response;
@@ -179,8 +181,10 @@ public async ValueTask<ConsumerPauseResponse> PauseConsumerAsync(string stream,
179181
public async ValueTask<bool> ResumeConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default)
180182
{
181183
ThrowIfInvalidStreamName(stream);
182-
var props = GetConsumerProps("PAUSE", stream, consumer);
183-
var response = await JSRequestResponseAsync<object, ConsumerPauseResponse>(props, null, cancellationToken);
184+
var response = await JSRequestResponseAsync<object, ConsumerPauseResponse>(
185+
subject: $"{Opts.Prefix}.CONSUMER.PAUSE.{stream}.{consumer}",
186+
request: null,
187+
cancellationToken);
184188
return !response.IsPaused;
185189
}
186190

@@ -225,9 +229,12 @@ internal ValueTask<ConsumerInfo> CreateOrderedConsumerInternalAsync(
225229
}
226230

227231
var name = Nuid.NewNuid();
228-
var props = GetConsumerProps("CREATE", stream, name);
232+
var subject = $"{Opts.Prefix}.CONSUMER.CREATE.{stream}.{name}";
229233

230-
return JSRequestResponseAsync<ConsumerCreateRequest, ConsumerInfo>(props, request, cancellationToken);
234+
return JSRequestResponseAsync<ConsumerCreateRequest, ConsumerInfo>(
235+
subject: subject,
236+
request,
237+
cancellationToken);
231238
}
232239

233240
private async ValueTask<NatsJSConsumer> CreateOrUpdateConsumerInternalAsync(
@@ -236,12 +243,16 @@ private async ValueTask<NatsJSConsumer> CreateOrUpdateConsumerInternalAsync(
236243
ConsumerCreateAction action,
237244
CancellationToken cancellationToken)
238245
{
239-
var props = GetConsumerProps("CREATE", stream, config.Name);
246+
var subject = $"{Opts.Prefix}.CONSUMER.CREATE.{stream}";
247+
248+
if (!string.IsNullOrWhiteSpace(config.Name))
249+
{
250+
subject += $".{config.Name}";
251+
}
240252

241-
if (!string.IsNullOrWhiteSpace(config.FilterSubject) && config.FilterSubject != null)
253+
if (!string.IsNullOrWhiteSpace(config.FilterSubject))
242254
{
243-
props.Subject.Values.Add("filterSubject", config.FilterSubject);
244-
props.Subject.Template += ".filterSubject";
255+
subject += $".{config.FilterSubject}";
245256
}
246257

247258
// ADR-42: In the initial implementation we should limit PriorityGroups to one per consumer only
@@ -264,7 +275,7 @@ private async ValueTask<NatsJSConsumer> CreateOrUpdateConsumerInternalAsync(
264275
}
265276

266277
var response = await JSRequestResponseAsync<ConsumerCreateRequest, ConsumerInfo>(
267-
props: props,
278+
subject: subject,
268279
new ConsumerCreateRequest
269280
{
270281
StreamName = stream,
@@ -275,23 +286,4 @@ private async ValueTask<NatsJSConsumer> CreateOrUpdateConsumerInternalAsync(
275286

276287
return new NatsJSConsumer(this, response);
277288
}
278-
279-
private NatsPublishProps GetConsumerProps(string action, string stream, string? consumer = default)
280-
{
281-
var template = "{prefix}.{entity}.{action}.{stream}";
282-
var values = new Dictionary<string, object>()
283-
{
284-
{ "prefix", Opts.Prefix },
285-
{ "entity", "CONSUMER" },
286-
{ "action", action },
287-
{ "stream", stream },
288-
};
289-
if (consumer != null)
290-
{
291-
template += ".{id}";
292-
values.Add("id", consumer);
293-
}
294-
295-
return new NatsPublishProps(template, values);
296-
}
297289
}

0 commit comments

Comments
 (0)