Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/NATS.Client.JetStream/INatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,15 @@ ValueTask<NatsJSPublishConcurrentFuture> PublishConcurrentAsync<T>(
/// </summary>
/// <param name="subject">The JetStream API subject to send the request to.</param>
/// <param name="request">The request message object.</param>
/// <param name="apiLevel">Asserts whether the JetStream server that's responding supports this API level.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="TRequest">The type of the request message.</typeparam>
/// <typeparam name="TResponse">The type of the response message.</typeparam>
/// <returns>A task representing the asynchronous operation, with a result of type <typeparamref name="TResponse"/>.</returns>
ValueTask<TResponse> JSRequestResponseAsync<TRequest, TResponse>(
string subject,
TRequest? request,
NatsJSApiLevel apiLevel = default,
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class;
Expand Down
17 changes: 17 additions & 0 deletions src/NATS.Client.JetStream/NatsJSApiLevel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace NATS.Client.JetStream;

public readonly struct NatsJSApiLevel
{
public const string Header = "Nats-Required-Api-Level";
public static readonly NatsJSApiLevel None = default;
public static readonly NatsJSApiLevel V1 = new(1);
public static readonly NatsJSApiLevel V2 = new(2);

private readonly int _level = 0;

internal NatsJSApiLevel(int level) => _level = level;

public bool IsSet() => _level > 0;

public string GetHeaderValue() => _level.ToString();
}
1 change: 1 addition & 0 deletions src/NATS.Client.JetStream/NatsJSConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ public async ValueTask RefreshAsync(CancellationToken cancellationToken = defaul
Info = await _context.JSRequestResponseAsync<object, ConsumerInfo>(
subject: $"{_context.Opts.Prefix}.CONSUMER.INFO.{_stream}.{_consumer}",
request: null,
apiLevel: default,
cancellationToken).ConfigureAwait(false);

internal async ValueTask<NatsJSConsume<T>> ConsumeInternalAsync<T>(INatsDeserialize<T>? serializer = default, NatsJSConsumeOpts? opts = default, CancellationToken cancellationToken = default)
Expand Down
8 changes: 8 additions & 0 deletions src/NATS.Client.JetStream/NatsJSContext.Consumers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public async ValueTask<INatsJSConsumer> GetConsumerAsync(string stream, string c
var response = await JSRequestResponseAsync<object, ConsumerInfo>(
subject: $"{Opts.Prefix}.CONSUMER.INFO.{stream}.{consumer}",
request: null,
apiLevel: default,
cancellationToken);
return new NatsJSConsumer(this, response);
}
Expand All @@ -86,6 +87,7 @@ public async IAsyncEnumerable<INatsJSConsumer> ListConsumersAsync(
var response = await JSRequestResponseAsync<ConsumerListRequest, ConsumerListResponse>(
subject: $"{Opts.Prefix}.CONSUMER.LIST.{stream}",
new ConsumerListRequest { Offset = offset },
apiLevel: default,
cancellationToken);

if (response.Consumers.Count == 0)
Expand All @@ -112,6 +114,7 @@ public async IAsyncEnumerable<string> ListConsumerNamesAsync(
var response = await JSRequestResponseAsync<ConsumerNamesRequest, ConsumerNamesResponse>(
subject: $"{Opts.Prefix}.CONSUMER.NAMES.{stream}",
new ConsumerNamesRequest { Offset = offset },
apiLevel: default,
cancellationToken);

if (response.Consumers.Count == 0)
Expand Down Expand Up @@ -141,6 +144,7 @@ public async ValueTask<bool> DeleteConsumerAsync(string stream, string consumer,
var response = await JSRequestResponseAsync<object, ConsumerDeleteResponse>(
subject: $"{Opts.Prefix}.CONSUMER.DELETE.{stream}.{consumer}",
request: null,
apiLevel: default,
cancellationToken);
return response.Success;
}
Expand All @@ -163,6 +167,7 @@ public async ValueTask<ConsumerPauseResponse> PauseConsumerAsync(string stream,
var response = await JSRequestResponseAsync<ConsumerPauseRequest, ConsumerPauseResponse>(
subject: $"{Opts.Prefix}.CONSUMER.PAUSE.{stream}.{consumer}",
request: new ConsumerPauseRequest { PauseUntil = pauseUntil },
apiLevel: default,
cancellationToken);
return response;
}
Expand All @@ -184,6 +189,7 @@ public async ValueTask<bool> ResumeConsumerAsync(string stream, string consumer,
var response = await JSRequestResponseAsync<object, ConsumerPauseResponse>(
subject: $"{Opts.Prefix}.CONSUMER.PAUSE.{stream}.{consumer}",
request: null,
apiLevel: default,
cancellationToken);
return !response.IsPaused;
}
Expand Down Expand Up @@ -234,6 +240,7 @@ internal ValueTask<ConsumerInfo> CreateOrderedConsumerInternalAsync(
return JSRequestResponseAsync<ConsumerCreateRequest, ConsumerInfo>(
subject: subject,
request,
apiLevel: default,
cancellationToken);
}

Expand Down Expand Up @@ -282,6 +289,7 @@ private async ValueTask<NatsJSConsumer> CreateOrUpdateConsumerInternalAsync(
Config = config,
Action = action,
},
apiLevel: default,
cancellationToken);

return new NatsJSConsumer(this, response);
Expand Down
9 changes: 9 additions & 0 deletions src/NATS.Client.JetStream/NatsJSContext.Streams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public async ValueTask<INatsJSStream> CreateStreamAsync(
var response = await JSRequestResponseAsync<StreamConfig, StreamInfo>(
subject: $"{Opts.Prefix}.STREAM.CREATE.{config.Name}",
config,
apiLevel: default,
cancellationToken);
return new NatsJSStream(this, response);
}
Expand All @@ -75,6 +76,7 @@ public async ValueTask<INatsJSStream> CreateOrUpdateStreamAsync(StreamConfig con
var response = await JSRequestAsync<StreamConfig, StreamUpdateResponse>(
subject: $"{Opts.Prefix}.STREAM.UPDATE.{config.Name}",
request: config,
apiLevel: default,
cancellationToken);

if (response.Error is { Code: 404 })
Expand Down Expand Up @@ -104,6 +106,7 @@ public async ValueTask<bool> DeleteStreamAsync(
var response = await JSRequestResponseAsync<object, StreamMsgDeleteResponse>(
subject: $"{Opts.Prefix}.STREAM.DELETE.{stream}",
request: null,
apiLevel: default,
cancellationToken);
return response.Success;
}
Expand All @@ -128,6 +131,7 @@ public async ValueTask<StreamPurgeResponse> PurgeStreamAsync(
var response = await JSRequestResponseAsync<StreamPurgeRequest, StreamPurgeResponse>(
subject: $"{Opts.Prefix}.STREAM.PURGE.{stream}",
request: request,
apiLevel: default,
cancellationToken);
return response;
}
Expand All @@ -152,6 +156,7 @@ public async ValueTask<StreamMsgDeleteResponse> DeleteMessageAsync(
var response = await JSRequestResponseAsync<StreamMsgDeleteRequest, StreamMsgDeleteResponse>(
subject: $"{Opts.Prefix}.STREAM.MSG.DELETE.{stream}",
request: request,
apiLevel: default,
cancellationToken);
return response;
}
Expand All @@ -176,6 +181,7 @@ public async ValueTask<INatsJSStream> GetStreamAsync(
var response = await JSRequestResponseAsync<StreamInfoRequest, StreamInfoResponse>(
subject: $"{Opts.Prefix}.STREAM.INFO.{stream}",
request: request,
apiLevel: default,
cancellationToken);
return new NatsJSStream(this, response);
}
Expand All @@ -198,6 +204,7 @@ public async ValueTask<NatsJSStream> UpdateStreamAsync(
var response = await JSRequestResponseAsync<StreamConfig, StreamUpdateResponse>(
subject: $"{Opts.Prefix}.STREAM.UPDATE.{request.Name}",
request: request,
apiLevel: default,
cancellationToken);
return new NatsJSStream(this, response);
}
Expand All @@ -224,6 +231,7 @@ public async IAsyncEnumerable<INatsJSStream> ListStreamsAsync(
Offset = offset,
Subject = subject,
},
apiLevel: default,
cancellationToken);

if (response.Streams.Count == 0)
Expand Down Expand Up @@ -254,6 +262,7 @@ public async IAsyncEnumerable<string> ListStreamNamesAsync(string? subject = def
Subject = subject,
Offset = offset,
},
apiLevel: default,
cancellationToken);

if (response.Streams == null || response.Streams.Count == 0)
Expand Down
22 changes: 18 additions & 4 deletions src/NATS.Client.JetStream/NatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public ValueTask<AccountInfoResponse> GetAccountInfoAsync(CancellationToken canc
JSRequestResponseAsync<object, AccountInfoResponse>(
subject: $"{Opts.Prefix}.INFO",
request: null,
apiLevel: default,
cancellationToken);

/// <summary>
Expand Down Expand Up @@ -337,11 +338,12 @@ public async ValueTask<NatsJSPublishConcurrentFuture> PublishConcurrentAsync<T>(
public async ValueTask<TResponse> JSRequestResponseAsync<TRequest, TResponse>(
string subject,
TRequest? request,
NatsJSApiLevel apiLevel = default,
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class
{
var response = await JSRequestAsync<TRequest, TResponse>(subject, request, cancellationToken);
var response = await JSRequestAsync<TRequest, TResponse>(subject, request, apiLevel, cancellationToken);
response.EnsureSuccess();
return response.Response!;
}
Expand Down Expand Up @@ -373,11 +375,12 @@ internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArg
internal async ValueTask<NatsJSResponse<TResponse>> JSRequestAsync<TRequest, TResponse>(
string subject,
TRequest? request,
NatsJSApiLevel apiLevel = default,
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class
{
var result = await TryJSRequestAsync<TRequest, TResponse>(subject, request, cancellationToken).ConfigureAwait(false);
var result = await TryJSRequestAsync<TRequest, TResponse>(subject, request, apiLevel, cancellationToken).ConfigureAwait(false);
if (!result.Success)
{
throw result.Error;
Expand All @@ -389,6 +392,7 @@ internal async ValueTask<NatsJSResponse<TResponse>> JSRequestAsync<TRequest, TRe
internal async ValueTask<NatsResult<NatsJSResponse<TResponse>>> TryJSRequestAsync<TRequest, TResponse>(
string subject,
TRequest? request,
NatsJSApiLevel apiLevel = default,
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class
Expand All @@ -399,6 +403,16 @@ internal async ValueTask<NatsResult<NatsJSResponse<TResponse>>> TryJSRequestAsyn
// Validator.ValidateObject(request, new ValidationContext(request));
}

NatsHeaders? headers;
if (apiLevel.IsSet())
{
headers = new NatsHeaders { [NatsJSApiLevel.Header] = apiLevel.GetHeaderValue() };
}
else
{
headers = null;
}

if (Connection.Opts.RequestReplyMode == NatsRequestReplyMode.Direct)
{
NatsMsg<NatsJSApiResult<TResponse>> msg;
Expand All @@ -407,7 +421,7 @@ internal async ValueTask<NatsResult<NatsJSResponse<TResponse>>> TryJSRequestAsyn
msg = await Connection.RequestAsync<TRequest, NatsJSApiResult<TResponse>>(
subject: subject,
data: request,
headers: null,
headers: headers,
replyOpts: new NatsSubOpts { Timeout = Opts.RequestTimeout },
requestSerializer: NatsJSJsonSerializer<TRequest>.Default,
replySerializer: NatsJSJsonDocumentSerializer<TResponse>.Default,
Expand Down Expand Up @@ -448,7 +462,7 @@ internal async ValueTask<NatsResult<NatsJSResponse<TResponse>>> TryJSRequestAsyn
await using var sub = await Connection.CreateRequestSubAsync<TRequest, NatsJSApiResult<TResponse>>(
subject: subject,
data: request,
headers: default,
headers: headers,
replyOpts: new NatsSubOpts { Timeout = Opts.RequestTimeout },
requestSerializer: NatsJSJsonSerializer<TRequest>.Default,
replySerializer: NatsJSJsonDocumentSerializer<TResponse>.Default,
Expand Down
2 changes: 2 additions & 0 deletions src/NATS.Client.JetStream/NatsJSStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ public async ValueTask RefreshAsync(CancellationToken cancellationToken = defaul
Info = await _context.JSRequestResponseAsync<object, StreamInfoResponse>(
subject: $"{_context.Opts.Prefix}.STREAM.INFO.{_name}",
request: null,
apiLevel: default,
cancellationToken).ConfigureAwait(false);

public ValueTask<NatsMsg<T>> GetDirectAsync<T>(StreamMsgGetRequest request, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
Expand All @@ -185,6 +186,7 @@ public ValueTask<StreamMsgGetResponse> GetAsync(StreamMsgGetRequest request, Can
_context.JSRequestResponseAsync<StreamMsgGetRequest, StreamMsgGetResponse>(
subject: $"{_context.Opts.Prefix}.STREAM.MSG.GET.{_name}",
request: request,
apiLevel: default,
cancellationToken);

private void ThrowIfDeleted()
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.KeyValueStore/NatsKVContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public async ValueTask<INatsKVStore> CreateStoreAsync(NatsKVConfig config, Cance

if (config.LimitMarkerTTL > TimeSpan.Zero)
{
var info = await JetStreamContext.JSRequestResponseAsync<object, AccountInfoResponse>("$JS.API.INFO", null, cancellationToken);
var info = await JetStreamContext.JSRequestResponseAsync<object, AccountInfoResponse>("$JS.API.INFO", null, apiLevel: default, cancellationToken);
if (info.Api.Level < 1)
{
throw new NatsKVException("API doesn't support LimitMarkerTTL");
Expand Down
4 changes: 3 additions & 1 deletion src/NATS.Client.ObjectStore/NatsObjStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.Security.Cryptography;
using System.Text.Json;
using NATS.Client.Core;
using NATS.Client.Core.Internal;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Internal;
using NATS.Client.JetStream.Models;
Expand Down Expand Up @@ -289,6 +288,7 @@ await JetStreamContext.JSRequestResponseAsync<StreamPurgeRequest, StreamPurgeRes
{
Filter = GetChunkSubject(info.Nuid),
},
apiLevel: default,
cancellationToken);
}
catch (NatsJSApiException e)
Expand Down Expand Up @@ -431,6 +431,7 @@ public async ValueTask SealAsync(CancellationToken cancellationToken = default)
var info = await JetStreamContext.JSRequestResponseAsync<object, StreamInfoResponse>(
subject: $"{JetStreamContext.Opts.Prefix}.STREAM.INFO.{_stream.Info.Config.Name}",
request: null,
apiLevel: default,
cancellationToken).ConfigureAwait(false);

var config = info.Config;
Expand All @@ -439,6 +440,7 @@ public async ValueTask SealAsync(CancellationToken cancellationToken = default)
var response = await JetStreamContext.JSRequestResponseAsync<StreamConfig, StreamUpdateResponse>(
subject: $"{JetStreamContext.Opts.Prefix}.STREAM.UPDATE.{_stream.Info.Config.Name}",
request: config,
apiLevel: default,
cancellationToken);

if (!response.Config.Sealed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ await Retry.Until(
// Fake JS API requester
for (var i = 0; i < 100; i++)
{
await js.TryJSRequestAsync<object, AccountInfoResponse>(apiSubject, null, cts.Token);
await js.TryJSRequestAsync<object, AccountInfoResponse>(apiSubject, null, apiLevel: default, cts.Token);
}

ctsDone.Cancel();
Expand Down
Loading
Loading