diff --git a/src/NATS.Client.JetStream/INatsJSContext.cs b/src/NATS.Client.JetStream/INatsJSContext.cs index 631fbcb39..4d9f35bdc 100644 --- a/src/NATS.Client.JetStream/INatsJSContext.cs +++ b/src/NATS.Client.JetStream/INatsJSContext.cs @@ -362,6 +362,7 @@ ValueTask PublishConcurrentAsync( /// /// The JetStream API subject to send the request to. /// The request message object. + /// Asserts whether the JetStream server that's responding supports this API level. /// A used to cancel the API call. /// The type of the request message. /// The type of the response message. @@ -369,6 +370,7 @@ ValueTask PublishConcurrentAsync( ValueTask JSRequestResponseAsync( string subject, TRequest? request, + NatsJSApiLevel apiLevel = default, CancellationToken cancellationToken = default) where TRequest : class where TResponse : class; diff --git a/src/NATS.Client.JetStream/NatsJSApiLevel.cs b/src/NATS.Client.JetStream/NatsJSApiLevel.cs new file mode 100644 index 000000000..2083a8d5b --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSApiLevel.cs @@ -0,0 +1,17 @@ +namespace NATS.Client.JetStream; + +public readonly struct NatsJSApiLevel +{ + public const string Header = "Nats-Required-Api-Level"; + public static readonly NatsJSApiLevel None = default; + public static readonly NatsJSApiLevel V1 = new(1); + public static readonly NatsJSApiLevel V2 = new(2); + + private readonly int _level = 0; + + internal NatsJSApiLevel(int level) => _level = level; + + public bool IsSet() => _level > 0; + + public string GetHeaderValue() => _level.ToString(); +} diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index c36c15b4b..5588c3664 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -289,6 +289,7 @@ public async ValueTask RefreshAsync(CancellationToken cancellationToken = defaul Info = await _context.JSRequestResponseAsync( subject: $"{_context.Opts.Prefix}.CONSUMER.INFO.{_stream}.{_consumer}", request: null, + apiLevel: default, cancellationToken).ConfigureAwait(false); internal async ValueTask> ConsumeInternalAsync(INatsDeserialize? serializer = default, NatsJSConsumeOpts? opts = default, CancellationToken cancellationToken = default) diff --git a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs index 4f8f7ab99..9332d29d6 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs @@ -70,6 +70,7 @@ public async ValueTask GetConsumerAsync(string stream, string c var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.CONSUMER.INFO.{stream}.{consumer}", request: null, + apiLevel: default, cancellationToken); return new NatsJSConsumer(this, response); } @@ -86,6 +87,7 @@ public async IAsyncEnumerable ListConsumersAsync( var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.CONSUMER.LIST.{stream}", new ConsumerListRequest { Offset = offset }, + apiLevel: default, cancellationToken); if (response.Consumers.Count == 0) @@ -112,6 +114,7 @@ public async IAsyncEnumerable ListConsumerNamesAsync( var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.CONSUMER.NAMES.{stream}", new ConsumerNamesRequest { Offset = offset }, + apiLevel: default, cancellationToken); if (response.Consumers.Count == 0) @@ -141,6 +144,7 @@ public async ValueTask DeleteConsumerAsync(string stream, string consumer, var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.CONSUMER.DELETE.{stream}.{consumer}", request: null, + apiLevel: default, cancellationToken); return response.Success; } @@ -163,6 +167,7 @@ public async ValueTask PauseConsumerAsync(string stream, var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.CONSUMER.PAUSE.{stream}.{consumer}", request: new ConsumerPauseRequest { PauseUntil = pauseUntil }, + apiLevel: default, cancellationToken); return response; } @@ -184,6 +189,7 @@ public async ValueTask ResumeConsumerAsync(string stream, string consumer, var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.CONSUMER.PAUSE.{stream}.{consumer}", request: null, + apiLevel: default, cancellationToken); return !response.IsPaused; } @@ -234,6 +240,7 @@ internal ValueTask CreateOrderedConsumerInternalAsync( return JSRequestResponseAsync( subject: subject, request, + apiLevel: default, cancellationToken); } @@ -282,6 +289,7 @@ private async ValueTask CreateOrUpdateConsumerInternalAsync( Config = config, Action = action, }, + apiLevel: default, cancellationToken); return new NatsJSConsumer(this, response); diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs index 135d33457..7dee53751 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs @@ -55,6 +55,7 @@ public async ValueTask CreateStreamAsync( var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.STREAM.CREATE.{config.Name}", config, + apiLevel: default, cancellationToken); return new NatsJSStream(this, response); } @@ -75,6 +76,7 @@ public async ValueTask CreateOrUpdateStreamAsync(StreamConfig con var response = await JSRequestAsync( subject: $"{Opts.Prefix}.STREAM.UPDATE.{config.Name}", request: config, + apiLevel: default, cancellationToken); if (response.Error is { Code: 404 }) @@ -104,6 +106,7 @@ public async ValueTask DeleteStreamAsync( var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.STREAM.DELETE.{stream}", request: null, + apiLevel: default, cancellationToken); return response.Success; } @@ -128,6 +131,7 @@ public async ValueTask PurgeStreamAsync( var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.STREAM.PURGE.{stream}", request: request, + apiLevel: default, cancellationToken); return response; } @@ -152,6 +156,7 @@ public async ValueTask DeleteMessageAsync( var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.STREAM.MSG.DELETE.{stream}", request: request, + apiLevel: default, cancellationToken); return response; } @@ -176,6 +181,7 @@ public async ValueTask GetStreamAsync( var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.STREAM.INFO.{stream}", request: request, + apiLevel: default, cancellationToken); return new NatsJSStream(this, response); } @@ -198,6 +204,7 @@ public async ValueTask UpdateStreamAsync( var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.STREAM.UPDATE.{request.Name}", request: request, + apiLevel: default, cancellationToken); return new NatsJSStream(this, response); } @@ -224,6 +231,7 @@ public async IAsyncEnumerable ListStreamsAsync( Offset = offset, Subject = subject, }, + apiLevel: default, cancellationToken); if (response.Streams.Count == 0) @@ -254,6 +262,7 @@ public async IAsyncEnumerable ListStreamNamesAsync(string? subject = def Subject = subject, Offset = offset, }, + apiLevel: default, cancellationToken); if (response.Streams == null || response.Streams.Count == 0) diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 79cb77ee1..744013130 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -45,6 +45,7 @@ public ValueTask GetAccountInfoAsync(CancellationToken canc JSRequestResponseAsync( subject: $"{Opts.Prefix}.INFO", request: null, + apiLevel: default, cancellationToken); /// @@ -337,11 +338,12 @@ public async ValueTask PublishConcurrentAsync( public async ValueTask JSRequestResponseAsync( string subject, TRequest? request, + NatsJSApiLevel apiLevel = default, CancellationToken cancellationToken = default) where TRequest : class where TResponse : class { - var response = await JSRequestAsync(subject, request, cancellationToken); + var response = await JSRequestAsync(subject, request, apiLevel, cancellationToken); response.EnsureSuccess(); return response.Response!; } @@ -373,11 +375,12 @@ internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArg internal async ValueTask> JSRequestAsync( string subject, TRequest? request, + NatsJSApiLevel apiLevel = default, CancellationToken cancellationToken = default) where TRequest : class where TResponse : class { - var result = await TryJSRequestAsync(subject, request, cancellationToken).ConfigureAwait(false); + var result = await TryJSRequestAsync(subject, request, apiLevel, cancellationToken).ConfigureAwait(false); if (!result.Success) { throw result.Error; @@ -389,6 +392,7 @@ internal async ValueTask> JSRequestAsync>> TryJSRequestAsync( string subject, TRequest? request, + NatsJSApiLevel apiLevel = default, CancellationToken cancellationToken = default) where TRequest : class where TResponse : class @@ -399,6 +403,16 @@ internal async ValueTask>> TryJSRequestAsyn // Validator.ValidateObject(request, new ValidationContext(request)); } + NatsHeaders? headers; + if (apiLevel.IsSet()) + { + headers = new NatsHeaders { [NatsJSApiLevel.Header] = apiLevel.GetHeaderValue() }; + } + else + { + headers = null; + } + if (Connection.Opts.RequestReplyMode == NatsRequestReplyMode.Direct) { NatsMsg> msg; @@ -407,7 +421,7 @@ internal async ValueTask>> TryJSRequestAsyn msg = await Connection.RequestAsync>( subject: subject, data: request, - headers: null, + headers: headers, replyOpts: new NatsSubOpts { Timeout = Opts.RequestTimeout }, requestSerializer: NatsJSJsonSerializer.Default, replySerializer: NatsJSJsonDocumentSerializer.Default, @@ -448,7 +462,7 @@ internal async ValueTask>> TryJSRequestAsyn await using var sub = await Connection.CreateRequestSubAsync>( subject: subject, data: request, - headers: default, + headers: headers, replyOpts: new NatsSubOpts { Timeout = Opts.RequestTimeout }, requestSerializer: NatsJSJsonSerializer.Default, replySerializer: NatsJSJsonDocumentSerializer.Default, diff --git a/src/NATS.Client.JetStream/NatsJSStream.cs b/src/NATS.Client.JetStream/NatsJSStream.cs index 54f8b63e0..a90156d32 100644 --- a/src/NATS.Client.JetStream/NatsJSStream.cs +++ b/src/NATS.Client.JetStream/NatsJSStream.cs @@ -169,6 +169,7 @@ public async ValueTask RefreshAsync(CancellationToken cancellationToken = defaul Info = await _context.JSRequestResponseAsync( subject: $"{_context.Opts.Prefix}.STREAM.INFO.{_name}", request: null, + apiLevel: default, cancellationToken).ConfigureAwait(false); public ValueTask> GetDirectAsync(StreamMsgGetRequest request, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) @@ -185,6 +186,7 @@ public ValueTask GetAsync(StreamMsgGetRequest request, Can _context.JSRequestResponseAsync( subject: $"{_context.Opts.Prefix}.STREAM.MSG.GET.{_name}", request: request, + apiLevel: default, cancellationToken); private void ThrowIfDeleted() diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index 2552f12ea..3695f5419 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -65,7 +65,7 @@ public async ValueTask CreateStoreAsync(NatsKVConfig config, Cance if (config.LimitMarkerTTL > TimeSpan.Zero) { - var info = await JetStreamContext.JSRequestResponseAsync("$JS.API.INFO", null, cancellationToken); + var info = await JetStreamContext.JSRequestResponseAsync("$JS.API.INFO", null, apiLevel: default, cancellationToken); if (info.Api.Level < 1) { throw new NatsKVException("API doesn't support LimitMarkerTTL"); diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 4a5bf0508..8ed212fbc 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -3,7 +3,6 @@ using System.Security.Cryptography; using System.Text.Json; using NATS.Client.Core; -using NATS.Client.Core.Internal; using NATS.Client.JetStream; using NATS.Client.JetStream.Internal; using NATS.Client.JetStream.Models; @@ -289,6 +288,7 @@ await JetStreamContext.JSRequestResponseAsync( subject: $"{JetStreamContext.Opts.Prefix}.STREAM.INFO.{_stream.Info.Config.Name}", request: null, + apiLevel: default, cancellationToken).ConfigureAwait(false); var config = info.Config; @@ -439,6 +440,7 @@ public async ValueTask SealAsync(CancellationToken cancellationToken = default) var response = await JetStreamContext.JSRequestResponseAsync( subject: $"{JetStreamContext.Opts.Prefix}.STREAM.UPDATE.{_stream.Info.Config.Name}", request: config, + apiLevel: default, cancellationToken); if (!response.Config.Sealed) diff --git a/tests/NATS.Client.JetStream.Tests/JetStreamApiSerializerTest.cs b/tests/NATS.Client.JetStream.Tests/JetStreamApiSerializerTest.cs index 2bce33d27..d832ed2cc 100644 --- a/tests/NATS.Client.JetStream.Tests/JetStreamApiSerializerTest.cs +++ b/tests/NATS.Client.JetStream.Tests/JetStreamApiSerializerTest.cs @@ -89,7 +89,7 @@ await Retry.Until( // Fake JS API requester for (var i = 0; i < 100; i++) { - await js.TryJSRequestAsync(apiSubject, null, cts.Token); + await js.TryJSRequestAsync(apiSubject, null, apiLevel: default, cts.Token); } ctsDone.Cancel(); diff --git a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs index 5f6438d00..494d83321 100644 --- a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs @@ -1,14 +1,21 @@ using NATS.Client.Core.Tests; +using NATS.Client.Core2.Tests; using NATS.Client.JetStream.Models; -using NATS.Client.Platform.Windows.Tests; +using NATS.Net; namespace NATS.Client.JetStream.Tests; +[Collection("nats-server")] public class JetStreamTest { private readonly ITestOutputHelper _output; + private readonly NatsServerFixture _server; - public JetStreamTest(ITestOutputHelper output) => _output = output; + public JetStreamTest(ITestOutputHelper output, NatsServerFixture server) + { + _output = output; + _server = server; + } [Theory] [InlineData("Invalid.DotName")] @@ -54,8 +61,7 @@ public async Task Stream_invalid_name_test(string? streamName) [InlineData(NatsRequestReplyMode.SharedInbox)] public async Task Create_stream_test(NatsRequestReplyMode mode) { - await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestTimeout = TimeSpan.FromSeconds(10), RequestReplyMode = mode }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestTimeout = TimeSpan.FromSeconds(10), RequestReplyMode = mode }); // Happy user { @@ -182,4 +188,56 @@ await js.CreateStreamAsync( Assert.Equal(10059, exception.Error.ErrCode); } } + + [Fact] + public async Task Enforce_api_level() + { + var proxy = _server.CreateProxy(); + await using var nats = new NatsConnection(new NatsOpts { Url = $"127.0.0.1:{proxy.Port}" }); + var js = nats.CreateJetStreamContext(); + + // Default is not to send the header + { + Assert.Equal("0", NatsJSApiLevel.None.GetHeaderValue()); + Assert.Equal("0", ((NatsJSApiLevel)default).GetHeaderValue()); + Assert.False(NatsJSApiLevel.None.IsSet()); + + await proxy.FlushFramesAsync(nats, clear: true, CancellationToken.None); + var response = await js.JSRequestResponseAsync( + subject: "$JS.API.STREAM.NAMES", + request: null, + apiLevel: default); + Assert.NotNull(response); + await proxy.FlushFramesAsync(nats, clear: false, CancellationToken.None); + Assert.Single(proxy.ClientFrames, f => f.Message.StartsWith("PUB")); + } + + // Server would ignore the header if API level header is not supported + { + await proxy.FlushFramesAsync(nats, clear: true, CancellationToken.None); + var response = await js.JSRequestResponseAsync( + subject: "$JS.API.STREAM.NAMES", + request: null, + apiLevel: NatsJSApiLevel.V1); + Assert.NotNull(response); + await proxy.FlushFramesAsync(nats, clear: false, CancellationToken.None); + + // Make sure the header is sent + Assert.Single(proxy.ClientFrames, f => f.Message.StartsWith("HPUB")); + Assert.Contains("Nats-Required-Api-Level: 1", proxy.ClientFrames.First(f => f.Message.StartsWith("HPUB")).Message); + } + + // Server would error if API level is not supported + if (nats.ServerInfo.VersionMajorMinorIsGreaterThenOrEqualTo(2, 12)) + { + await proxy.FlushFramesAsync(nats, clear: true, CancellationToken.None); + var exception = await Assert.ThrowsAsync(async () => await js.JSRequestResponseAsync( + subject: "$JS.API.STREAM.NAMES", + request: null, + apiLevel: new NatsJSApiLevel(int.MaxValue))); + Assert.Equal(412, exception.Error.Code); + Assert.Equal(10185, exception.Error.ErrCode); + Assert.Equal("JetStream minimum api level required", exception.Error.Description); + } + } } diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs index 407b2fb1a..e3581895d 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs @@ -518,7 +518,7 @@ public async Task TestMessageTTL(NatsRequestReplyMode mode) Assert.Equal("This store does not support TTL", exception.Message); // Check API version - var info = await js.JSRequestResponseAsync("$JS.API.INFO", null, cancellationToken); + var info = await js.JSRequestResponseAsync("$JS.API.INFO", null, apiLevel: default, cancellationToken); Assert.True(info.Api.Level >= 1); // Config validation diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs index 0302ccd24..ddebb042c 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs @@ -95,7 +95,7 @@ public class MockJsContext : INatsJSContext public string NewBaseInbox() => throw new NotImplementedException(); - public ValueTask JSRequestResponseAsync(string subject, TRequest? request, CancellationToken cancellationToken = default) + public ValueTask JSRequestResponseAsync(string subject, TRequest? request, NatsJSApiLevel apiLevel = default, CancellationToken cancellationToken = default) where TRequest : class where TResponse : class => throw new NotImplementedException(); diff --git a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs index 4e85085c6..dfe673dfb 100644 --- a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs @@ -95,7 +95,7 @@ public class MockJsContext : INatsJSContext public string NewBaseInbox() => throw new NotImplementedException(); - public ValueTask JSRequestResponseAsync(string subject, TRequest? request, CancellationToken cancellationToken = default) + public ValueTask JSRequestResponseAsync(string subject, TRequest? request, NatsJSApiLevel apiLevel = default, CancellationToken cancellationToken = default) where TRequest : class where TResponse : class => throw new NotImplementedException();