Waiting for events…
Create, update, or delete orders and products to see live events here.
- {#if paused}
+ {#if eventStream.paused}
Event capture is paused
{/if}
{:else}
- {#each events as event (event.id)}
+ {#each eventStream.events as event (event.id)}
{formatTime(event.timestamp)}
@@ -147,7 +97,7 @@
{/if}
- {#if paused}
+ {#if eventStream.paused}
diff --git a/samples/CleanArchitectureSample/src/Web/src/routes/orders/+page.svelte b/samples/CleanArchitectureSample/src/Web/src/routes/orders/+page.svelte
index 5695c460..11c7570d 100644
--- a/samples/CleanArchitectureSample/src/Web/src/routes/orders/+page.svelte
+++ b/samples/CleanArchitectureSample/src/Web/src/routes/orders/+page.svelte
@@ -72,15 +72,14 @@
}
}
- // Reload orders whenever the user navigates to this page (including back from edit/create)
- // Reload on SPA navigations back to this page
- afterNavigate((nav) => {
- if (nav.from) loadOrders();
+ // Reload orders on SPA navigations back to this page
+ afterNavigate(({ type }) => {
+ // Skip the initial navigation — onMount handles the first load
+ if (type === 'enter') return;
+ loadOrders();
});
onMount(() => {
- // Initial data load — afterNavigate may miss the first render when
- // the layout delays mounting children (e.g. auth check)
loadOrders();
const unsubCreated = eventStream.onOrderCreated((event) => {
diff --git a/samples/CleanArchitectureSample/src/Web/src/routes/products/+page.svelte b/samples/CleanArchitectureSample/src/Web/src/routes/products/+page.svelte
index cfdf7a1d..5fedf56a 100644
--- a/samples/CleanArchitectureSample/src/Web/src/routes/products/+page.svelte
+++ b/samples/CleanArchitectureSample/src/Web/src/routes/products/+page.svelte
@@ -72,15 +72,14 @@
}
}
- // Reload products whenever the user navigates to this page (including back from edit/create)
- // Reload on SPA navigations back to this page
- afterNavigate((nav) => {
- if (nav.from) loadProducts();
+ // Reload products on SPA navigations back to this page
+ afterNavigate(({ type }) => {
+ // Skip the initial navigation — onMount handles the first load
+ if (type === 'enter') return;
+ loadProducts();
});
onMount(() => {
- // Initial data load — afterNavigate may miss the first render when
- // the layout delays mounting children (e.g. auth check)
loadProducts();
const unsubCreated = eventStream.onProductCreated((event) => {
diff --git a/samples/CleanArchitectureSample/src/Web/src/routes/queues/+page.svelte b/samples/CleanArchitectureSample/src/Web/src/routes/queues/+page.svelte
new file mode 100644
index 00000000..86103421
--- /dev/null
+++ b/samples/CleanArchitectureSample/src/Web/src/routes/queues/+page.svelte
@@ -0,0 +1,358 @@
+
+
+
+ Queue Dashboard - Clean Architecture Sample
+
+
+
+
+
+
+
Queue Dashboard
+
Monitor queue workers, job progress, and manage running jobs.
+
+
+
+
+
+
+
+ {#if error}
+
{error}
+ {/if}
+
+ {#if loading}
+
+
+
+ {:else if workers.length === 0}
+
+
No queue workers registered
+
Queue handlers will appear here when the application starts.
+
+ {:else}
+
+
+
+
+
+ | Queue |
+ Throughput |
+ Queued |
+ In Flight |
+ Dead Letter |
+
+
+
+ {#each workers as worker}
+ selectQueue(worker.queueName)}
+ >
+ |
+ {worker.queueName}
+ |
+
+
+
+
+
+
+ |
+ {worker.activeCount.toLocaleString()} |
+ {worker.inFlightCount.toLocaleString()} |
+ {worker.deadLetterCount.toLocaleString()} |
+
+ {/each}
+
+
+
+
+
+ {#if selectedQueue}
+ {@const queueWorker = workers.find((w) => w.queueName === selectedQueue)}
+
+
+
+
{selectedQueue}
+
+ Retry: {queueWorker?.retryPolicy} · Max attempts: {queueWorker?.maxAttempts}
+
+
+
+
+
+ {#if !queueWorker?.trackProgress}
+
+
Progress tracking is not enabled for this queue.
+
Add TrackProgress = true to the [Queue] attribute.
+
+ {:else if jobsLoading && !dashboard}
+
+
+
+ {:else if dashboard && (dashboard.queuedCount > 0 || dashboard.activeJobs.length > 0 || dashboard.recentJobs.length > 0)}
+
+ {#if dashboard.queuedCount > 0}
+
+ Queued
+ {dashboard.queuedCount.toLocaleString()} job{dashboard.queuedCount === 1 ? '' : 's'} waiting
+
+ {/if}
+
+
+ {#if dashboard.activeJobs.length > 0}
+
+ {#each dashboard.activeJobs as job (job.jobId)}
+
+
+
+
+ {job.status}
+
+ {#if job.attempt > 1}
+
+ Retry #{job.attempt - 1}
+
+ {/if}
+ {job.jobId.slice(0, 12)}…
+
+
+
+ {formatTime(job.createdUtc)}
+ {#if job.startedUtc}
+ · {formatDuration(job.startedUtc, job.completedUtc)}
+ {/if}
+
+
+
+
+
+
+ {job.progressMessage ?? ''}
+ {job.progress}%
+
+
+
+
+ {/each}
+
+ {/if}
+
+
+ {#if dashboard.recentJobs.length > 0}
+ {#if dashboard.activeJobs.length > 0}
+
+ {/if}
+
+ {#each dashboard.recentJobs as job (job.jobId)}
+
+
+
+
+ {job.status}
+
+ {#if job.attempt > 1}
+
+ Retry #{job.attempt - 1}
+
+ {/if}
+ {job.jobId.slice(0, 12)}…
+
+
+ {formatTime(job.createdUtc)}
+ {#if job.startedUtc}
+ · {formatDuration(job.startedUtc, job.completedUtc)}
+ {/if}
+
+
+
+ {#if job.status === 'Completed'}
+
+
+ {job.progressMessage ?? ''}
+ {job.progress}%
+
+
+
+ {/if}
+
+ {#if job.errorMessage}
+
+ {job.errorMessage}
+
+ {/if}
+
+ {/each}
+
+ {/if}
+ {:else}
+
+ No tracked jobs yet. Enqueue a message to see jobs here.
+
+ {/if}
+
+ {/if}
+ {/if}
+
diff --git a/samples/CleanArchitectureSample/src/Web/vite.config.ts b/samples/CleanArchitectureSample/src/Web/vite.config.ts
index 63f0c8b8..3efc4cea 100644
--- a/samples/CleanArchitectureSample/src/Web/vite.config.ts
+++ b/samples/CleanArchitectureSample/src/Web/vite.config.ts
@@ -6,46 +6,83 @@ import path from 'path';
import child_process from 'child_process';
import { env } from 'process';
-const baseFolder =
- env.APPDATA !== undefined && env.APPDATA !== ''
- ? `${env.APPDATA}/ASP.NET/https`
- : `${env.HOME}/.aspnet/https`;
+// When running under Aspire, WithHttpsDeveloperCertificate() handles HTTPS automatically.
+// Only generate certs manually for standalone `npm run dev`.
+const isAspire = !!env.PORT;
+let httpsConfig: { key: Buffer; cert: Buffer } | undefined;
-const certificateName = "web.frontend";
-const certFilePath = path.join(baseFolder, `${certificateName}.pem`);
-const keyFilePath = path.join(baseFolder, `${certificateName}.key`);
+if (!isAspire) {
+ const baseFolder =
+ env.APPDATA !== undefined && env.APPDATA !== ''
+ ? `${env.APPDATA}/ASP.NET/https`
+ : `${env.HOME}/.aspnet/https`;
-if (!fs.existsSync(baseFolder)) {
- fs.mkdirSync(baseFolder, { recursive: true });
+ const certificateName = "web.frontend";
+ const certFilePath = path.join(baseFolder, `${certificateName}.pem`);
+ const keyFilePath = path.join(baseFolder, `${certificateName}.key`);
+
+ if (!fs.existsSync(baseFolder)) {
+ fs.mkdirSync(baseFolder, { recursive: true });
+ }
+
+ if (!fs.existsSync(certFilePath) || !fs.existsSync(keyFilePath)) {
+ if (0 !== child_process.spawnSync('dotnet', [
+ 'dev-certs',
+ 'https',
+ '--export-path',
+ certFilePath,
+ '--format',
+ 'Pem',
+ '--no-password',
+ ], { stdio: 'inherit', }).status) {
+ throw new Error("Could not create certificate.");
+ }
+ }
+
+ httpsConfig = {
+ key: fs.readFileSync(keyFilePath),
+ cert: fs.readFileSync(certFilePath),
+ };
+}
+
+function firstDefined(values: Array): string | undefined {
+ return values.find(v => typeof v === 'string' && v.trim().length > 0);
}
-if (!fs.existsSync(certFilePath) || !fs.existsSync(keyFilePath)) {
- if (0 !== child_process.spawnSync('dotnet', [
- 'dev-certs',
- 'https',
- '--export-path',
- certFilePath,
- '--format',
- 'Pem',
- '--no-password',
- ], { stdio: 'inherit', }).status) {
- throw new Error("Could not create certificate.");
+function getAspireApiEndpoint(): string | undefined {
+ const entries = Object.entries(env);
+
+ // Aspire service discovery variables for referenced services, e.g. SERVICES__API__HTTPS__0
+ const httpsEntry = entries.find(([key, value]) =>
+ /^services__api__https__\d+$/i.test(key) && typeof value === 'string' && value.length > 0);
+ if (httpsEntry?.[1]) {
+ return httpsEntry[1];
+ }
+
+ const httpEntry = entries.find(([key, value]) =>
+ /^services__api__http__\d+$/i.test(key) && typeof value === 'string' && value.length > 0);
+ if (httpEntry?.[1]) {
+ return httpEntry[1];
}
+
+ return undefined;
}
-// Backend URL - uses ASPNETCORE_HTTPS_PORT or ASPNETCORE_URLS environment variable
-const target = env.ASPNETCORE_HTTPS_PORT ? `https://localhost:${env.ASPNETCORE_HTTPS_PORT}` :
- env.ASPNETCORE_URLS ? env.ASPNETCORE_URLS.split(';')[0] : 'https://localhost:58702';
+// Backend URL for the Vite dev-server proxy (server-side only, never bundled into client code).
+const target = firstDefined([
+ env.API_PROXY_TARGET,
+ getAspireApiEndpoint(),
+ env.ASPNETCORE_HTTPS_PORT ? `https://localhost:${env.ASPNETCORE_HTTPS_PORT}` : undefined,
+ env.ASPNETCORE_URLS ? env.ASPNETCORE_URLS.split(';')[0] : undefined,
+ 'https://localhost:5099'
+]);
export default defineConfig({
plugins: [tailwindcss(), sveltekit()],
server: {
- port: 5173,
- strictPort: true,
- https: {
- key: fs.readFileSync(keyFilePath),
- cert: fs.readFileSync(certFilePath),
- },
+ port: env.PORT ? Number(env.PORT) : 5173,
+ strictPort: false,
+ https: httpsConfig,
proxy: {
// Proxy API requests to the backend
'^/api': {
@@ -61,11 +98,6 @@ export default defineConfig({
'^/scalar': {
target,
secure: false
- },
- // Proxy SSE event stream
- '^/events/stream': {
- target,
- secure: false
}
}
}
diff --git a/src/Foundatio.Mediator.Abstractions/Foundatio.Mediator.Abstractions.csproj b/src/Foundatio.Mediator.Abstractions/Foundatio.Mediator.Abstractions.csproj
index 112376e2..fe3292fa 100644
--- a/src/Foundatio.Mediator.Abstractions/Foundatio.Mediator.Abstractions.csproj
+++ b/src/Foundatio.Mediator.Abstractions/Foundatio.Mediator.Abstractions.csproj
@@ -9,7 +9,7 @@
-
+
diff --git a/src/Foundatio.Mediator.Distributed.Aws/AwsServiceExtensions.cs b/src/Foundatio.Mediator.Distributed.Aws/AwsServiceExtensions.cs
new file mode 100644
index 00000000..09d00cc6
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed.Aws/AwsServiceExtensions.cs
@@ -0,0 +1,159 @@
+using Amazon.Runtime;
+using Amazon.SimpleNotificationService;
+using Amazon.SQS;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+
+namespace Foundatio.Mediator.Distributed.Aws;
+
+///
+/// Extension methods for configuring AWS SQS/SNS transports on .
+///
+public static class AwsBuilderExtensions
+{
+ ///
+ /// Configures both SQS queues and SNS/SQS pub/sub as the distributed transports.
+ /// When is set, the SQS and SNS SDK
+ /// clients are automatically registered; otherwise you must register
+ /// IAmazonSQS and IAmazonSimpleNotificationService before calling this.
+ ///
+ /// The mediator builder.
+ /// Optional configuration for .
+ /// The mediator builder for chaining.
+ ///
+ ///
+ /// // LocalStack / dev — SDK clients auto-registered
+ /// services.AddMediator()
+ /// .AddDistributedQueues()
+ /// .AddDistributedNotifications()
+ /// .UseAws(aws => aws.ServiceUrl = "http://localhost:4566");
+ ///
+ /// // Production — SDK clients pre-registered via AddAWSService
+ /// services.AddAWSService<IAmazonSQS>();
+ /// services.AddAWSService<IAmazonSimpleNotificationService>();
+ /// services.AddMediator()
+ /// .AddDistributedQueues()
+ /// .AddDistributedNotifications()
+ /// .UseAws(aws => aws.Queues.AutoCreateQueues = false);
+ ///
+ ///
+ public static IMediatorBuilder UseAws(
+ this IMediatorBuilder builder,
+ Action? configure = null)
+ {
+ var options = new AwsTransportOptions();
+ configure?.Invoke(options);
+
+ if (!string.IsNullOrEmpty(options.ServiceUrl))
+ RegisterSdkClients(builder.Services, options);
+
+ builder.UseAwsQueues(opts =>
+ {
+ opts.AutoCreateQueues = options.Queues.AutoCreateQueues;
+ opts.WaitTimeSeconds = options.Queues.WaitTimeSeconds;
+ });
+
+ builder.UseAwsNotifications(opts =>
+ {
+ opts.TopicName = options.Notifications.TopicName;
+ opts.TopicArn = options.Notifications.TopicArn;
+ opts.AutoCreate = options.Notifications.AutoCreate;
+ opts.QueuePrefix = options.Notifications.QueuePrefix;
+ opts.WaitTimeSeconds = options.Notifications.WaitTimeSeconds;
+ opts.CleanupOnDispose = options.Notifications.CleanupOnDispose;
+ });
+
+ return builder;
+ }
+
+ ///
+ /// Registers as the implementation.
+ /// Requires IAmazonSQS to be registered in DI.
+ ///
+ /// The mediator builder.
+ /// Optional configuration for .
+ /// The mediator builder for chaining.
+ ///
+ ///
+ /// services.AddAWSService<IAmazonSQS>();
+ /// services.AddMediator()
+ /// .AddDistributedQueues()
+ /// .UseAwsQueues(opts => opts.AutoCreateQueues = false);
+ ///
+ ///
+ public static IMediatorBuilder UseAwsQueues(
+ this IMediatorBuilder builder,
+ Action? configure = null)
+ {
+ var services = builder.Services;
+ var options = new SqsQueueClientOptions();
+ configure?.Invoke(options);
+
+ services.AddSingleton(options);
+ services.AddSingleton();
+
+ return builder;
+ }
+
+ ///
+ /// Registers as the implementation.
+ /// Requires IAmazonSimpleNotificationService and IAmazonSQS to be registered in DI.
+ ///
+ /// The mediator builder.
+ /// Optional configuration for .
+ /// The mediator builder for chaining.
+ ///
+ ///
+ /// services.AddAWSService<IAmazonSQS>();
+ /// services.AddAWSService<IAmazonSimpleNotificationService>();
+ /// services.AddMediator()
+ /// .AddDistributedNotifications()
+ /// .UseAwsNotifications();
+ ///
+ ///
+ public static IMediatorBuilder UseAwsNotifications(
+ this IMediatorBuilder builder,
+ Action? configure = null)
+ {
+ var services = builder.Services;
+ var options = new SqsPubSubClientOptions();
+ configure?.Invoke(options);
+
+ services.AddSingleton(options);
+ services.AddSingleton(sp => new SqsPubSubClient(
+ sp.GetRequiredService(),
+ sp.GetRequiredService(),
+ options,
+ sp.GetRequiredService(),
+ sp.GetRequiredService>()));
+
+ return builder;
+ }
+
+ private static void RegisterSdkClients(IServiceCollection services, AwsTransportOptions options)
+ {
+ var credentials = options.Credentials
+ ?? new BasicAWSCredentials("test", "test");
+
+ if (!services.Any(sd => sd.ServiceType == typeof(IAmazonSQS)))
+ {
+ var sqsConfig = new AmazonSQSConfig
+ {
+ ServiceURL = options.ServiceUrl,
+ AuthenticationRegion = options.Region
+ };
+ services.AddSingleton(_ => new AmazonSQSClient(credentials, sqsConfig));
+ }
+
+ if (!services.Any(sd => sd.ServiceType == typeof(IAmazonSimpleNotificationService)))
+ {
+ var snsConfig = new AmazonSimpleNotificationServiceConfig
+ {
+ ServiceURL = options.ServiceUrl,
+ AuthenticationRegion = options.Region
+ };
+ services.AddSingleton(
+ _ => new AmazonSimpleNotificationServiceClient(credentials, snsConfig));
+ }
+ }
+}
diff --git a/src/Foundatio.Mediator.Distributed.Aws/AwsTransportOptions.cs b/src/Foundatio.Mediator.Distributed.Aws/AwsTransportOptions.cs
new file mode 100644
index 00000000..b66ff2ce
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed.Aws/AwsTransportOptions.cs
@@ -0,0 +1,41 @@
+using Amazon.Runtime;
+
+namespace Foundatio.Mediator.Distributed.Aws;
+
+///
+/// Unified options for configuring both SQS queues and SNS/SQS pub/sub transports.
+/// Use with for a single-call configuration
+/// that registers both queue and notification transports.
+///
+public class AwsTransportOptions
+{
+ ///
+ /// The AWS service URL (e.g. "http://localhost:4566" for LocalStack).
+ /// When set, the SQS and SNS SDK clients are automatically registered with this endpoint.
+ /// When null, you must register IAmazonSQS and IAmazonSimpleNotificationService
+ /// in DI before calling UseAws().
+ ///
+ public string? ServiceUrl { get; set; }
+
+ ///
+ /// The AWS region to use when is set. Default is "us-east-1".
+ ///
+ public string Region { get; set; } = "us-east-1";
+
+ ///
+ /// Optional AWS credentials. When null and is set,
+ /// dummy credentials ("test"/"test") are used (suitable for LocalStack).
+ /// When is not set, this is ignored (SDK clients must be pre-registered).
+ ///
+ public AWSCredentials? Credentials { get; set; }
+
+ ///
+ /// Options for the SQS queue client. See .
+ ///
+ public SqsQueueClientOptions Queues { get; set; } = new();
+
+ ///
+ /// Options for the SNS/SQS pub/sub client. See .
+ ///
+ public SqsPubSubClientOptions Notifications { get; set; } = new();
+}
diff --git a/src/Foundatio.Mediator.Distributed.Aws/Foundatio.Mediator.Distributed.Aws.csproj b/src/Foundatio.Mediator.Distributed.Aws/Foundatio.Mediator.Distributed.Aws.csproj
new file mode 100644
index 00000000..04ce5e42
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed.Aws/Foundatio.Mediator.Distributed.Aws.csproj
@@ -0,0 +1,18 @@
+
+
+
+
+
+ net10.0
+ latest
+ enable
+ Foundatio.Mediator.Distributed.Aws
+
+
+
+
+
+
+
+
+
diff --git a/src/Foundatio.Mediator.Distributed.Aws/SqsPubSubClient.cs b/src/Foundatio.Mediator.Distributed.Aws/SqsPubSubClient.cs
new file mode 100644
index 00000000..f39f4a68
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed.Aws/SqsPubSubClient.cs
@@ -0,0 +1,415 @@
+using System.Collections.Concurrent;
+using System.Text.Json;
+using Amazon.SimpleNotificationService;
+using Amazon.SimpleNotificationService.Model;
+using Amazon.SQS;
+using Amazon.SQS.Model;
+using Microsoft.Extensions.Logging;
+
+namespace Foundatio.Mediator.Distributed.Aws;
+
+///
+/// implementation using SNS for fan-out publishing and
+/// per-node SQS queues for subscription. Each subscriber creates a dedicated SQS queue
+/// subscribed to the SNS topic, enabling true pub/sub fan-out across nodes.
+///
+public sealed class SqsPubSubClient : IPubSubClient, IAsyncDisposable
+{
+ private readonly IAmazonSimpleNotificationService _sns;
+ private readonly IAmazonSQS _sqs;
+ private readonly SqsPubSubClientOptions _options;
+ private readonly string _hostId;
+ private readonly string? _resourcePrefix;
+ private readonly ILogger _logger;
+ private readonly ConcurrentDictionary _topicArnCache = new();
+ private readonly ConcurrentDictionary _subscriptionSetupCache = new();
+ private readonly ConcurrentBag _activeSubscriptions = [];
+ private readonly SemaphoreSlim _queueSetupLock = new(1, 1);
+ private (string QueueName, string QueueUrl, string QueueArn)? _sharedQueue;
+
+ public SqsPubSubClient(
+ IAmazonSimpleNotificationService sns,
+ IAmazonSQS sqs,
+ SqsPubSubClientOptions options,
+ DistributedNotificationOptions notificationOptions,
+ ILogger logger)
+ {
+ _sns = sns;
+ _sqs = sqs;
+ _options = options;
+ _hostId = notificationOptions.HostId;
+ _resourcePrefix = notificationOptions.ResourcePrefix;
+ _logger = logger;
+ }
+
+ ///
+ public async Task PublishAsync(string topic, IReadOnlyList messages, CancellationToken cancellationToken = default)
+ {
+ var topicArn = await GetOrCreateTopicArnAsync(topic, cancellationToken).ConfigureAwait(false);
+
+ foreach (var message in messages)
+ {
+ // Wrap body + headers into a single JSON envelope for SNS
+ var envelope = new MessageEnvelope
+ {
+ Body = Convert.ToBase64String(message.Body.Span),
+ Headers = message.Headers is not null ? new Dictionary(message.Headers) : null
+ };
+
+ var json = JsonSerializer.Serialize(envelope);
+
+ await _sns.PublishAsync(new PublishRequest
+ {
+ TopicArn = topicArn,
+ Message = json
+ }, cancellationToken).ConfigureAwait(false);
+ }
+ }
+
+ ///
+ public async Task SubscribeAsync(string topic, Func handler, CancellationToken cancellationToken = default)
+ {
+ var setup = await EnsureSubscriptionSetupAsync(topic, cancellationToken).ConfigureAwait(false);
+
+ // Start polling the SQS queue
+ var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ var pollTask = Task.Run(async () =>
+ {
+ await PollQueueAsync(setup.QueueUrl, handler, cts.Token).ConfigureAwait(false);
+ }, cts.Token);
+
+ var handle = new SubscriptionHandle(
+ setup.SubscriptionArn, setup.TopicArn, cts, pollTask,
+ _sns, _options, _logger);
+
+ _activeSubscriptions.Add(handle);
+
+ return handle;
+ }
+
+ ///
+ /// Ensures per-node subscription infrastructure is created for a topic.
+ /// Ensures the shared per-node SQS queue exists (created once, cached).
+ ///
+ private async Task<(string QueueName, string QueueUrl, string QueueArn)> EnsureSharedQueueAsync(CancellationToken cancellationToken)
+ {
+ var current = _sharedQueue;
+ if (current is not null)
+ return current.Value;
+
+ await _queueSetupLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+ try
+ {
+ current = _sharedQueue;
+ if (current is not null)
+ return current.Value;
+
+ var queuePrefix = string.IsNullOrEmpty(_resourcePrefix)
+ ? _options.QueuePrefix
+ : $"{_resourcePrefix}-{_options.QueuePrefix}";
+ var queueName = $"{queuePrefix}-{_hostId}";
+ var stepSw = System.Diagnostics.Stopwatch.StartNew();
+
+ var createResponse = await _sqs.CreateQueueAsync(new CreateQueueRequest
+ {
+ QueueName = queueName
+ }, cancellationToken).ConfigureAwait(false);
+
+ _logger.LogDebug("EnsureSharedQueue: CreateQueue completed in {ElapsedMs}ms", stepSw.ElapsedMilliseconds);
+ stepSw.Restart();
+
+ // Get the queue ARN — needed for the SNS subscription policy
+ var queueAttrs = await _sqs.GetQueueAttributesAsync(new GetQueueAttributesRequest
+ {
+ QueueUrl = createResponse.QueueUrl,
+ AttributeNames = ["QueueArn"]
+ }, cancellationToken).ConfigureAwait(false);
+
+ _logger.LogDebug("EnsureSharedQueue: GetQueueAttributes completed in {ElapsedMs}ms", stepSw.ElapsedMilliseconds);
+
+ _sharedQueue = (queueName, createResponse.QueueUrl, queueAttrs.QueueARN);
+ return _sharedQueue.Value;
+ }
+ finally
+ {
+ _queueSetupLock.Release();
+ }
+ }
+
+ ///
+ /// Ensures per-node subscription infrastructure is created for a topic.
+ /// Creates the SNS topic, reuses the shared per-node SQS queue, sets the queue policy,
+ /// and subscribes the queue to the topic. Results are cached so subsequent
+ /// calls (including from ) make no API calls.
+ ///
+ private async Task EnsureSubscriptionSetupAsync(string topic, CancellationToken cancellationToken)
+ {
+ if (_subscriptionSetupCache.TryGetValue(topic, out var cached))
+ return cached;
+
+ var topicArn = await GetOrCreateTopicArnAsync(topic, cancellationToken).ConfigureAwait(false);
+ var queue = await EnsureSharedQueueAsync(cancellationToken).ConfigureAwait(false);
+
+ // Subscribe the SQS queue to the SNS topic
+ var subscribeResponse = await _sns.SubscribeAsync(new SubscribeRequest
+ {
+ TopicArn = topicArn,
+ Protocol = "sqs",
+ Endpoint = queue.QueueArn,
+ Attributes = new Dictionary
+ {
+ // Enable raw message delivery so we get the message directly without SNS wrapper
+ ["RawMessageDelivery"] = "true"
+ }
+ }, cancellationToken).ConfigureAwait(false);
+ var subscriptionArn = subscribeResponse.SubscriptionArn;
+
+ _logger.LogInformation(
+ "Subscribed to SNS topic {TopicArn} via SQS queue {QueueName} (subscription={SubscriptionArn})",
+ topicArn, queue.QueueName, subscriptionArn);
+
+ var setup = new SubscriptionSetup(topicArn, queue.QueueName, queue.QueueUrl, subscriptionArn);
+ _subscriptionSetupCache[topic] = setup;
+ return setup;
+ }
+
+ private async Task PollQueueAsync(string queueUrl, Func handler, CancellationToken cancellationToken)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ try
+ {
+ var response = await _sqs.ReceiveMessageAsync(new ReceiveMessageRequest
+ {
+ QueueUrl = queueUrl,
+ MaxNumberOfMessages = 10,
+ WaitTimeSeconds = _options.WaitTimeSeconds
+ }, cancellationToken).ConfigureAwait(false);
+
+ if (response.Messages is not { Count: > 0 })
+ continue;
+
+ foreach (var sqsMessage in response.Messages)
+ {
+ try
+ {
+ var envelope = JsonSerializer.Deserialize(sqsMessage.Body);
+ if (envelope is null)
+ continue;
+
+ var body = Convert.FromBase64String(envelope.Body);
+ var headers = envelope.Headers is not null
+ ? new Dictionary(envelope.Headers)
+ : new Dictionary();
+
+ var message = new PubSubMessage
+ {
+ Body = body,
+ Headers = headers
+ };
+
+ await handler(message, cancellationToken).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ return;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error processing bus message from SQS queue");
+ }
+
+ // Delete processed message
+ try
+ {
+ await _sqs.DeleteMessageAsync(new DeleteMessageRequest
+ {
+ QueueUrl = queueUrl,
+ ReceiptHandle = sqsMessage.ReceiptHandle
+ }, cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Failed to delete processed message from SQS queue");
+ }
+ }
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ break;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error polling SQS queue, retrying...");
+ try
+ {
+ await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException) { break; }
+ }
+ }
+ }
+
+ private async Task GetOrCreateTopicArnAsync(string topic, CancellationToken cancellationToken)
+ {
+ if (!string.IsNullOrEmpty(_options.TopicArn))
+ return _options.TopicArn;
+
+ if (_topicArnCache.TryGetValue(topic, out var cached))
+ return cached;
+
+ var sw = System.Diagnostics.Stopwatch.StartNew();
+
+ if (_options.AutoCreate)
+ {
+ var response = await _sns.CreateTopicAsync(new CreateTopicRequest
+ {
+ Name = topic
+ }, cancellationToken).ConfigureAwait(false);
+
+ _logger.LogDebug("CreateTopic {Topic} completed in {ElapsedMs}ms", topic, sw.ElapsedMilliseconds);
+
+ _topicArnCache[topic] = response.TopicArn;
+ return response.TopicArn;
+ }
+
+ // Find existing topic
+ var findResponse = await _sns.FindTopicAsync(topic).ConfigureAwait(false);
+ _logger.LogDebug("FindTopic {Topic} completed in {ElapsedMs}ms", topic, sw.ElapsedMilliseconds);
+
+ if (findResponse?.TopicArn is null)
+ throw new InvalidOperationException($"SNS topic '{topic}' not found and AutoCreate is disabled.");
+
+ _topicArnCache[topic] = findResponse.TopicArn;
+ return findResponse.TopicArn;
+ }
+
+ ///
+ public async Task EnsureTopicsAsync(IReadOnlyList topics, CancellationToken cancellationToken = default)
+ {
+ if (topics.Count == 0)
+ return;
+
+ var sw = System.Diagnostics.Stopwatch.StartNew();
+
+ // 1. Create the shared per-node SQS queue and all SNS topics in parallel
+ var queueTask = EnsureSharedQueueAsync(cancellationToken);
+ var topicTasks = topics.Select(t => GetOrCreateTopicArnAsync(t.Name, cancellationToken)).ToArray();
+
+ await Task.WhenAll(topicTasks).ConfigureAwait(false);
+ var queue = await queueTask.ConfigureAwait(false);
+
+ _logger.LogInformation("EnsureTopics: queue + topics created in {ElapsedMs}ms", sw.ElapsedMilliseconds);
+
+ var topicArns = topicTasks.Select(t => t.Result).ToList();
+
+ // 2. Set a single SQS policy allowing ALL SNS topics to send messages
+ var arnList = string.Join("\", \"", topicArns);
+ var policy = $$"""
+ {
+ "Version": "2012-10-17",
+ "Statement": [{
+ "Effect": "Allow",
+ "Principal": {"Service": "sns.amazonaws.com"},
+ "Action": "sqs:SendMessage",
+ "Resource": "{{queue.QueueArn}}",
+ "Condition": {
+ "ArnEquals": { "aws:SourceArn": ["{{arnList}}"] }
+ }
+ }]
+ }
+ """;
+
+ await _sqs.SetQueueAttributesAsync(new SetQueueAttributesRequest
+ {
+ QueueUrl = queue.QueueUrl,
+ Attributes = new Dictionary
+ {
+ ["Policy"] = policy
+ }
+ }, cancellationToken).ConfigureAwait(false);
+
+ _logger.LogInformation("EnsureTopics: policy set in {ElapsedMs}ms", sw.ElapsedMilliseconds);
+
+ // 3. Subscribe the queue to all topics in parallel
+ await Task.WhenAll(topics.Select(topic => EnsureSubscriptionSetupAsync(topic.Name, cancellationToken))).ConfigureAwait(false);
+
+ _logger.LogInformation("EnsureTopics: complete in {ElapsedMs}ms ({Count} topics)", sw.ElapsedMilliseconds, topics.Count);
+ }
+
+ private record SubscriptionSetup(string TopicArn, string QueueName, string QueueUrl, string SubscriptionArn);
+
+ public async ValueTask DisposeAsync()
+ {
+ foreach (var handle in _activeSubscriptions)
+ await handle.DisposeAsync().ConfigureAwait(false);
+
+ // Clean up the shared per-node SQS queue if configured.
+ // Individual SubscriptionHandle disposals unsubscribe from SNS, but the
+ // shared queue is owned by this client and must be deleted here.
+ if (_sharedQueue is not null && _options.CleanupOnDispose)
+ {
+ try
+ {
+ await _sqs.DeleteQueueAsync(new DeleteQueueRequest
+ {
+ QueueUrl = _sharedQueue.Value.QueueUrl
+ }).ConfigureAwait(false);
+
+ _logger.LogInformation("Deleted shared per-node SQS queue {QueueName}", _sharedQueue.Value.QueueName);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex, "Failed to delete shared per-node SQS queue {QueueName}", _sharedQueue.Value.QueueName);
+ }
+ }
+ }
+
+ private sealed class MessageEnvelope
+ {
+ public string Body { get; set; } = string.Empty;
+ public Dictionary? Headers { get; set; }
+ }
+
+ private sealed class SubscriptionHandle(
+ string subscriptionArn,
+ string topicArn,
+ CancellationTokenSource cts,
+ Task pollTask,
+ IAmazonSimpleNotificationService sns,
+ SqsPubSubClientOptions options,
+ ILogger logger) : IAsyncDisposable
+ {
+ private int _disposed;
+
+ public async ValueTask DisposeAsync()
+ {
+ if (Interlocked.Exchange(ref _disposed, 1) != 0)
+ return;
+
+ // Stop polling
+ await cts.CancelAsync().ConfigureAwait(false);
+ try { await pollTask.ConfigureAwait(false); } catch (OperationCanceledException) { }
+ cts.Dispose();
+
+ if (!options.CleanupOnDispose)
+ return;
+
+ // Unsubscribe from SNS
+ try
+ {
+ await sns.UnsubscribeAsync(new UnsubscribeRequest
+ {
+ SubscriptionArn = subscriptionArn
+ }).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ logger.LogWarning(ex, "Failed to unsubscribe {SubscriptionArn} from SNS topic {TopicArn}",
+ subscriptionArn, topicArn);
+ }
+
+ // Shared per-node SQS queue is cleaned up by SqsPubSubClient.DisposeAsync()
+ }
+ }
+}
diff --git a/src/Foundatio.Mediator.Distributed.Aws/SqsPubSubClientOptions.cs b/src/Foundatio.Mediator.Distributed.Aws/SqsPubSubClientOptions.cs
new file mode 100644
index 00000000..9735bc58
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed.Aws/SqsPubSubClientOptions.cs
@@ -0,0 +1,43 @@
+namespace Foundatio.Mediator.Distributed.Aws;
+
+///
+/// Options for configuring the SNS+SQS pub/sub client.
+///
+public class SqsPubSubClientOptions
+{
+ ///
+ /// The SNS topic name. This is used to create or look up the topic.
+ /// When is set, this is ignored.
+ /// Default is "distributed-notifications".
+ ///
+ public string TopicName { get; set; } = "distributed-notifications";
+
+ ///
+ /// When set, the topic ARN is used directly instead of creating/looking up by name.
+ ///
+ public string? TopicArn { get; set; }
+
+ ///
+ /// When true, the SNS topic and per-node SQS queue are automatically created if they
+ /// do not exist. Default is true. Disable in production where infrastructure is
+ /// provisioned via IaC.
+ ///
+ public bool AutoCreate { get; set; } = true;
+
+ ///
+ /// Prefix for the per-node SQS queue name. The queue is named
+ /// {QueuePrefix}-{HostId}. Default is "notifications".
+ ///
+ public string QueuePrefix { get; set; } = "notifications";
+
+ ///
+ /// SQS long-poll wait time in seconds. Default is 20 (maximum).
+ ///
+ public int WaitTimeSeconds { get; set; } = 20;
+
+ ///
+ /// When true, the per-node SQS queue and SNS subscription are deleted on dispose.
+ /// Default is true.
+ ///
+ public bool CleanupOnDispose { get; set; } = true;
+}
diff --git a/src/Foundatio.Mediator.Distributed.Aws/SqsQueueClient.cs b/src/Foundatio.Mediator.Distributed.Aws/SqsQueueClient.cs
new file mode 100644
index 00000000..91275b4d
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed.Aws/SqsQueueClient.cs
@@ -0,0 +1,360 @@
+using System.Collections.Concurrent;
+using System.Globalization;
+using Amazon.SQS;
+using Amazon.SQS.Model;
+using Microsoft.Extensions.Logging;
+
+namespace Foundatio.Mediator.Distributed.Aws;
+
+///
+/// implementation backed by Amazon SQS.
+/// Headers are mapped to SQS MessageAttributes. Body is sent as the MessageBody string
+/// (base64-encoded from the raw bytes).
+///
+public sealed class SqsQueueClient : IQueueClient
+{
+ private readonly IAmazonSQS _sqs;
+ private readonly SqsQueueClientOptions _options;
+ private readonly TimeProvider _timeProvider;
+ private readonly ILogger _logger;
+ private readonly ConcurrentDictionary _queueUrlCache = new();
+ private readonly ConcurrentDictionary _dlqNotFound = new();
+
+ public SqsQueueClient(IAmazonSQS sqs, SqsQueueClientOptions? options = null, TimeProvider? timeProvider = null, ILogger? logger = null)
+ {
+ _sqs = sqs;
+ _options = options ?? new SqsQueueClientOptions();
+ _timeProvider = timeProvider ?? TimeProvider.System;
+ _logger = logger ?? Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance;
+ }
+
+ ///
+ /// SQS allows a maximum of 10 message attributes per message.
+ ///
+ private const int MaxSqsMessageAttributes = 10;
+
+ public async Task SendAsync(string queueName, IReadOnlyList entries, CancellationToken cancellationToken = default)
+ {
+ if (entries.Count == 0)
+ return;
+
+ var queueUrl = await GetQueueUrlAsync(queueName, cancellationToken).ConfigureAwait(false);
+
+ // SQS batch limit is 10 messages
+ for (int i = 0; i < entries.Count; i += 10)
+ {
+ var batch = new SendMessageBatchRequest
+ {
+ QueueUrl = queueUrl,
+ Entries = []
+ };
+
+ var end = Math.Min(i + 10, entries.Count);
+ for (int j = i; j < end; j++)
+ {
+ var entry = entries[j];
+
+ ValidateHeaderCount(entry.Headers, queueName);
+
+ var batchEntry = new SendMessageBatchRequestEntry
+ {
+ Id = j.ToString(CultureInfo.InvariantCulture),
+ MessageBody = Convert.ToBase64String(entry.Body.Span),
+ MessageAttributes = new Dictionary()
+ };
+
+ if (entry.Headers is { Count: > 0 })
+ {
+ foreach (var (key, value) in entry.Headers)
+ {
+ batchEntry.MessageAttributes[key] = new MessageAttributeValue
+ {
+ DataType = "String",
+ StringValue = value
+ };
+ }
+ }
+
+ batch.Entries.Add(batchEntry);
+ }
+
+ var response = await _sqs.SendMessageBatchAsync(batch, cancellationToken).ConfigureAwait(false);
+
+ if (response.Failed is { Count: > 0 })
+ {
+ var first = response.Failed[0];
+ throw new InvalidOperationException(
+ $"Failed to send {response.Failed.Count} message(s) to SQS queue '{queueName}': [{first.Code}] {first.Message}");
+ }
+ }
+ }
+
+ public async Task> ReceiveAsync(string queueName, int maxCount, CancellationToken cancellationToken = default)
+ {
+ var queueUrl = await GetQueueUrlAsync(queueName, cancellationToken).ConfigureAwait(false);
+
+ // SQS maximum is 10 messages per receive
+ var request = new ReceiveMessageRequest
+ {
+ QueueUrl = queueUrl,
+ MaxNumberOfMessages = Math.Min(maxCount, 10),
+ WaitTimeSeconds = _options.WaitTimeSeconds,
+ MessageSystemAttributeNames = ["ApproximateReceiveCount", "SentTimestamp"],
+ MessageAttributeNames = ["All"]
+ };
+
+ var response = await _sqs.ReceiveMessageAsync(request, cancellationToken).ConfigureAwait(false);
+
+ if (response.Messages is not { Count: > 0 })
+ return [];
+
+ var now = _timeProvider.GetUtcNow();
+ var results = new List(response.Messages.Count);
+
+ foreach (var sqsMessage in response.Messages)
+ {
+ var headers = new Dictionary();
+ if (sqsMessage.MessageAttributes is { Count: > 0 })
+ {
+ foreach (var (key, attr) in sqsMessage.MessageAttributes)
+ headers[key] = attr.StringValue;
+ }
+
+ int dequeueCount = 1;
+ if (sqsMessage.Attributes?.TryGetValue("ApproximateReceiveCount", out var receiveCountStr) == true
+ && int.TryParse(receiveCountStr, out var parsed))
+ dequeueCount = parsed;
+
+ var enqueuedAt = now;
+ if (sqsMessage.Attributes?.TryGetValue("SentTimestamp", out var sentTimestampStr) == true
+ && long.TryParse(sentTimestampStr, out var epochMs))
+ enqueuedAt = DateTimeOffset.FromUnixTimeMilliseconds(epochMs);
+
+ results.Add(new QueueMessage
+ {
+ Id = sqsMessage.MessageId,
+ Body = Convert.FromBase64String(sqsMessage.Body),
+ Headers = headers,
+ QueueName = queueName,
+ DequeueCount = dequeueCount,
+ EnqueuedAt = enqueuedAt,
+ DequeuedAt = now,
+ NativeMessage = sqsMessage // Carry the full SQS message for ReceiptHandle access
+ });
+ }
+
+ return results;
+ }
+
+ public async Task CompleteAsync(QueueMessage message, CancellationToken cancellationToken = default)
+ {
+ var queueUrl = await GetQueueUrlAsync(message.QueueName, cancellationToken).ConfigureAwait(false);
+ var sqsMessage = GetNativeMessage(message);
+
+ await _sqs.DeleteMessageAsync(new DeleteMessageRequest
+ {
+ QueueUrl = queueUrl,
+ ReceiptHandle = sqsMessage.ReceiptHandle
+ }, cancellationToken).ConfigureAwait(false);
+ }
+
+ public async Task AbandonAsync(QueueMessage message, TimeSpan delay = default, CancellationToken cancellationToken = default)
+ {
+ var queueUrl = await GetQueueUrlAsync(message.QueueName, cancellationToken).ConfigureAwait(false);
+ var sqsMessage = GetNativeMessage(message);
+
+ var visibilityTimeout = Math.Max(0, (int)Math.Ceiling(delay.TotalSeconds));
+ await _sqs.ChangeMessageVisibilityAsync(new ChangeMessageVisibilityRequest
+ {
+ QueueUrl = queueUrl,
+ ReceiptHandle = sqsMessage.ReceiptHandle,
+ VisibilityTimeout = visibilityTimeout
+ }, cancellationToken).ConfigureAwait(false);
+ }
+
+ public async Task DeadLetterAsync(QueueMessage message, string reason, CancellationToken cancellationToken = default)
+ {
+ var dlqName = $"{message.QueueName}-dead-letter";
+
+ // Build a new entry with original body + headers + dead-letter metadata
+ var headers = new Dictionary(message.Headers)
+ {
+ [MessageHeaders.DeadLetterReason] = reason,
+ [MessageHeaders.DeadLetteredAt] = _timeProvider.GetUtcNow().ToString("O"),
+ [MessageHeaders.OriginalQueueName] = message.QueueName,
+ [MessageHeaders.DeadLetterDequeueCount] = message.DequeueCount.ToString()
+ };
+
+ var entry = new QueueEntry
+ {
+ Body = message.Body,
+ Headers = headers
+ };
+
+ // Send to DLQ then complete the original message
+ await SendAsync(dlqName, [entry], cancellationToken).ConfigureAwait(false);
+ _dlqNotFound.TryRemove(dlqName, out _);
+ await CompleteAsync(message, cancellationToken).ConfigureAwait(false);
+ }
+
+ public async Task RenewTimeoutAsync(QueueMessage message, TimeSpan extension, CancellationToken cancellationToken = default)
+ {
+ var queueUrl = await GetQueueUrlAsync(message.QueueName, cancellationToken).ConfigureAwait(false);
+ var sqsMessage = GetNativeMessage(message);
+
+ try
+ {
+ await _sqs.ChangeMessageVisibilityAsync(new ChangeMessageVisibilityRequest
+ {
+ QueueUrl = queueUrl,
+ ReceiptHandle = sqsMessage.ReceiptHandle,
+ VisibilityTimeout = (int)Math.Ceiling(extension.TotalSeconds)
+ }, cancellationToken).ConfigureAwait(false);
+ }
+ catch (ReceiptHandleIsInvalidException ex)
+ {
+ // Receipt handle expired or message already completed/deleted (e.g., leftover from a previous run).
+ // This is not fatal — the message is already gone from the queue.
+ _logger.LogDebug(ex, "Receipt handle invalid for message {MessageId} on {QueueName}, message may have been completed or expired",
+ message.Id, message.QueueName);
+ }
+ catch (MessageNotInflightException ex)
+ {
+ _logger.LogDebug(ex, "Message {MessageId} on {QueueName} is not in-flight, visibility timeout change skipped",
+ message.Id, message.QueueName);
+ }
+ catch (AmazonSQSException ex) when (ex.Message.Contains("does not exist or is not available", StringComparison.OrdinalIgnoreCase))
+ {
+ // LocalStack may throw a generic AmazonSQSException instead of the specific types above.
+ _logger.LogDebug(ex, "Receipt handle for message {MessageId} on {QueueName} is no longer valid, visibility timeout change skipped",
+ message.Id, message.QueueName);
+ }
+ }
+
+ private async Task GetQueueUrlAsync(string queueName, CancellationToken cancellationToken)
+ {
+ if (_queueUrlCache.TryGetValue(queueName, out var cached))
+ return cached;
+
+ var sw = System.Diagnostics.Stopwatch.StartNew();
+
+ if (_options.AutoCreateQueues)
+ {
+ var createResponse = await _sqs.CreateQueueAsync(new CreateQueueRequest
+ {
+ QueueName = queueName
+ }, cancellationToken).ConfigureAwait(false);
+
+ _logger.LogDebug("CreateQueue {QueueName} completed in {ElapsedMs}ms", queueName, sw.ElapsedMilliseconds);
+
+ _queueUrlCache[queueName] = createResponse.QueueUrl;
+ return createResponse.QueueUrl;
+ }
+
+ var response = await _sqs.GetQueueUrlAsync(new GetQueueUrlRequest
+ {
+ QueueName = queueName
+ }, cancellationToken).ConfigureAwait(false);
+
+ _logger.LogDebug("GetQueueUrl {QueueName} completed in {ElapsedMs}ms", queueName, sw.ElapsedMilliseconds);
+
+ _queueUrlCache[queueName] = response.QueueUrl;
+ return response.QueueUrl;
+ }
+
+ ///
+ public async Task EnsureQueuesAsync(IReadOnlyList queues, CancellationToken cancellationToken = default)
+ {
+ var sw = System.Diagnostics.Stopwatch.StartNew();
+
+ await Task.WhenAll(queues.Select(q => GetQueueUrlAsync(q.Name, cancellationToken))).ConfigureAwait(false);
+
+ _logger.LogInformation("EnsureQueues: {Count} queues ready in {ElapsedMs}ms", queues.Count, sw.ElapsedMilliseconds);
+ }
+
+ ///
+ public async Task> GetQueueStatsAsync(IReadOnlyList queueNames, CancellationToken cancellationToken = default)
+ {
+ var results = new List(queueNames.Count);
+ foreach (var queueName in queueNames)
+ {
+ var queueUrl = await GetQueueUrlAsync(queueName, cancellationToken).ConfigureAwait(false);
+
+ var response = await _sqs.GetQueueAttributesAsync(new GetQueueAttributesRequest
+ {
+ QueueUrl = queueUrl,
+ AttributeNames = ["ApproximateNumberOfMessages", "ApproximateNumberOfMessagesNotVisible"]
+ }, cancellationToken).ConfigureAwait(false);
+
+ long activeCount = 0;
+ if (response.Attributes.TryGetValue("ApproximateNumberOfMessages", out var activeStr)
+ && long.TryParse(activeStr, out var parsedActive))
+ activeCount = parsedActive;
+
+ long inFlightCount = 0;
+ if (response.Attributes.TryGetValue("ApproximateNumberOfMessagesNotVisible", out var inFlightStr)
+ && long.TryParse(inFlightStr, out var parsedInFlight))
+ inFlightCount = parsedInFlight;
+
+ // Try to get dead-letter queue stats (DLQ is created lazily on first dead-letter)
+ long deadLetterCount = 0;
+ var dlqName = $"{queueName}-dead-letter";
+ // Skip lookup if we already know the DLQ doesn't exist.
+ // The negative cache is cleared when DeadLetterAsync creates the queue.
+ if (!_dlqNotFound.ContainsKey(dlqName))
+ {
+ try
+ {
+ string dlqUrl;
+ if (_queueUrlCache.TryGetValue(dlqName, out var cachedDlqUrl))
+ {
+ dlqUrl = cachedDlqUrl;
+ }
+ else
+ {
+ var dlqResponse = await _sqs.GetQueueUrlAsync(new GetQueueUrlRequest { QueueName = dlqName }, cancellationToken).ConfigureAwait(false);
+ dlqUrl = dlqResponse.QueueUrl;
+ _queueUrlCache[dlqName] = dlqUrl;
+ }
+
+ var dlqAttrs = await _sqs.GetQueueAttributesAsync(new GetQueueAttributesRequest
+ {
+ QueueUrl = dlqUrl,
+ AttributeNames = ["ApproximateNumberOfMessages"]
+ }, cancellationToken).ConfigureAwait(false);
+
+ if (dlqAttrs.Attributes.TryGetValue("ApproximateNumberOfMessages", out var dlqStr)
+ && long.TryParse(dlqStr, out var parsedDlq))
+ deadLetterCount = parsedDlq;
+ }
+ catch
+ {
+ // DLQ doesn't exist yet — remember so we don't retry on every poll
+ _dlqNotFound[dlqName] = true;
+ }
+ }
+
+ results.Add(new QueueStats
+ {
+ QueueName = queueName,
+ ActiveCount = activeCount,
+ InFlightCount = inFlightCount,
+ DeadLetterCount = deadLetterCount
+ });
+ }
+
+ return results;
+ }
+
+ private static Message GetNativeMessage(QueueMessage message)
+ => message.NativeMessage as Message
+ ?? throw new InvalidOperationException(
+ "QueueMessage.NativeMessage is not an SQS Message. This QueueMessage was not created by SqsQueueClient.");
+
+ private static void ValidateHeaderCount(Dictionary? headers, string queueName)
+ {
+ if (headers is { Count: > MaxSqsMessageAttributes })
+ throw new InvalidOperationException(
+ $"Message for queue '{queueName}' has {headers.Count} headers, but SQS allows a maximum of {MaxSqsMessageAttributes} message attributes.");
+ }
+}
diff --git a/src/Foundatio.Mediator.Distributed.Aws/SqsQueueClientOptions.cs b/src/Foundatio.Mediator.Distributed.Aws/SqsQueueClientOptions.cs
new file mode 100644
index 00000000..f8f9d797
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed.Aws/SqsQueueClientOptions.cs
@@ -0,0 +1,20 @@
+namespace Foundatio.Mediator.Distributed.Aws;
+
+///
+/// Options for configuring the SQS queue client.
+///
+public class SqsQueueClientOptions
+{
+ ///
+ /// When true, queues are automatically created if they do not exist.
+ /// Default is true (convenient for dev/test). Disable in production where
+ /// queues are provisioned via IaC.
+ ///
+ public bool AutoCreateQueues { get; set; } = true;
+
+ ///
+ /// SQS long-poll wait time in seconds. Default is 20 (maximum).
+ /// Set to 0 for short polling.
+ ///
+ public int WaitTimeSeconds { get; set; } = 20;
+}
diff --git a/src/Foundatio.Mediator.Distributed.Redis/Foundatio.Mediator.Distributed.Redis.csproj b/src/Foundatio.Mediator.Distributed.Redis/Foundatio.Mediator.Distributed.Redis.csproj
new file mode 100644
index 00000000..cec91da0
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed.Redis/Foundatio.Mediator.Distributed.Redis.csproj
@@ -0,0 +1,17 @@
+
+
+
+
+
+ net10.0
+ latest
+ enable
+ Foundatio.Mediator.Distributed.Redis
+
+
+
+
+
+
+
+
diff --git a/src/Foundatio.Mediator.Distributed.Redis/RedisJobStateStoreOptions.cs b/src/Foundatio.Mediator.Distributed.Redis/RedisJobStateStoreOptions.cs
new file mode 100644
index 00000000..173e741c
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed.Redis/RedisJobStateStoreOptions.cs
@@ -0,0 +1,29 @@
+namespace Foundatio.Mediator.Distributed.Redis;
+
+///
+/// Options for configuring .
+///
+public class RedisJobStateStoreOptions
+{
+ ///
+ /// Key prefix for all Redis keys. Default is "fm:jobs".
+ ///
+ public string KeyPrefix { get; set; } = "fm:jobs";
+
+ ///
+ /// Optional prefix applied before for app-level scoping.
+ /// When set, all Redis keys become "{ResourcePrefix}:{KeyPrefix}:...".
+ /// When null or empty (default), only is used.
+ ///
+ ///
+ /// Use this to isolate multiple applications sharing the same Redis instance
+ /// (e.g., "myapp" produces keys like "myapp:fm:jobs:...").
+ ///
+ public string? ResourcePrefix { get; set; }
+
+ ///
+ /// Default TTL for terminal job states (Completed, Failed, Cancelled).
+ /// Default is 24 hours. Set to null to disable auto-expiry.
+ ///
+ public TimeSpan? DefaultExpiry { get; set; } = TimeSpan.FromHours(24);
+}
diff --git a/src/Foundatio.Mediator.Distributed.Redis/RedisQueueJobStateStore.cs b/src/Foundatio.Mediator.Distributed.Redis/RedisQueueJobStateStore.cs
new file mode 100644
index 00000000..aa69dc1c
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed.Redis/RedisQueueJobStateStore.cs
@@ -0,0 +1,358 @@
+using System.Globalization;
+using StackExchange.Redis;
+
+namespace Foundatio.Mediator.Distributed.Redis;
+
+///
+/// Redis-backed implementation of .
+/// Each job is stored as a Redis Hash. Per-queue job lists are maintained as sorted sets
+/// scored by creation time for efficient pagination. Cancellation uses a separate key.
+///
+public sealed class RedisQueueJobStateStore : IQueueJobStateStore
+{
+ private readonly IConnectionMultiplexer _redis;
+ private readonly RedisJobStateStoreOptions _options;
+ private readonly TimeProvider _timeProvider;
+ private readonly string _keyPrefix;
+
+ public RedisQueueJobStateStore(IConnectionMultiplexer redis, RedisJobStateStoreOptions? options = null, TimeProvider? timeProvider = null)
+ {
+ _redis = redis;
+ _options = options ?? new RedisJobStateStoreOptions();
+ _timeProvider = timeProvider ?? TimeProvider.System;
+ _keyPrefix = string.IsNullOrEmpty(_options.ResourcePrefix)
+ ? _options.KeyPrefix
+ : $"{_options.ResourcePrefix}:{_options.KeyPrefix}";
+ }
+
+ public async Task SetJobStateAsync(QueueJobState state, TimeSpan? expiry = null, CancellationToken cancellationToken = default)
+ {
+ var db = _redis.GetDatabase();
+ var key = JobKey(state.JobId);
+ var score = state.CreatedUtc.ToUnixTimeMilliseconds();
+
+ // Read old status before overwriting (for status set migration)
+ var oldStatusValue = await db.HashGetAsync(key, "Status").ConfigureAwait(false);
+
+ var entries = new HashEntry[]
+ {
+ new("JobId", state.JobId),
+ new("QueueName", state.QueueName),
+ new("MessageType", state.MessageType),
+ new("Status", ((int)state.Status).ToString(CultureInfo.InvariantCulture)),
+ new("Progress", state.Progress.ToString(CultureInfo.InvariantCulture)),
+ new("ProgressMessage", state.ProgressMessage ?? string.Empty),
+ new("CreatedUtc", score.ToString(CultureInfo.InvariantCulture)),
+ new("StartedUtc", state.StartedUtc?.ToUnixTimeMilliseconds().ToString(CultureInfo.InvariantCulture) ?? string.Empty),
+ new("CompletedUtc", state.CompletedUtc?.ToUnixTimeMilliseconds().ToString(CultureInfo.InvariantCulture) ?? string.Empty),
+ new("ErrorMessage", state.ErrorMessage ?? string.Empty),
+ new("LastUpdatedUtc", state.LastUpdatedUtc.ToUnixTimeMilliseconds().ToString(CultureInfo.InvariantCulture))
+ };
+
+ await db.HashSetAsync(key, entries).ConfigureAwait(false);
+
+ // Add to the per-queue sorted set (scored by creation timestamp for ordering)
+ var queueSetKey = QueueSetKey(state.QueueName);
+ await db.SortedSetAddAsync(queueSetKey, state.JobId, score).ConfigureAwait(false);
+
+ // Maintain per-status sorted sets
+ // Remove from old status set if status changed
+ if (!oldStatusValue.IsNullOrEmpty && int.TryParse(oldStatusValue.ToString(), out var oldStatusInt))
+ {
+ var oldStatus = (QueueJobStatus)oldStatusInt;
+ if (oldStatus != state.Status)
+ await db.SortedSetRemoveAsync(StatusSetKey(state.QueueName, oldStatus), state.JobId).ConfigureAwait(false);
+ }
+
+ // Add to current status set
+ await db.SortedSetAddAsync(StatusSetKey(state.QueueName, state.Status), state.JobId, score).ConfigureAwait(false);
+
+ // Set TTL on all keys
+ var ttl = expiry ?? _options.DefaultExpiry;
+ if (ttl.HasValue)
+ {
+ await db.KeyExpireAsync(key, ttl.Value).ConfigureAwait(false);
+ await db.KeyExpireAsync(queueSetKey, ttl.Value).ConfigureAwait(false);
+ await db.KeyExpireAsync(StatusSetKey(state.QueueName, state.Status), ttl.Value).ConfigureAwait(false);
+ }
+ }
+
+ public async Task GetJobStateAsync(string jobId, CancellationToken cancellationToken = default)
+ {
+ var db = _redis.GetDatabase();
+ var entries = await db.HashGetAllAsync(JobKey(jobId)).ConfigureAwait(false);
+
+ if (entries.Length == 0)
+ return null;
+
+ return ParseJobState(entries);
+ }
+
+ public async Task UpdateJobStatusAsync(string jobId, QueueJobStatus status, DateTimeOffset? startedUtc = null, DateTimeOffset? completedUtc = null, string? errorMessage = null, int? progress = null, int? attempt = null, TimeSpan? expiry = null, CancellationToken cancellationToken = default)
+ {
+ var db = _redis.GetDatabase();
+ var key = JobKey(jobId);
+ var now = _timeProvider.GetUtcNow();
+
+ // Read queue name, created score, and current status for sorted set migration.
+ var fields = await db.HashGetAsync(key, ["QueueName", "CreatedUtc", "Status"]).ConfigureAwait(false);
+ if (fields[0].IsNullOrEmpty)
+ return; // Job doesn't exist
+
+ var queueName = fields[0].ToString();
+ var createdScore = long.TryParse(fields[1].ToString(), out var cs) ? cs : 0L;
+ var oldStatusInt = int.TryParse(fields[2].ToString(), out var osi) ? osi : -1;
+ var oldStatus = (QueueJobStatus)oldStatusInt;
+
+ // Build hash field updates
+ var updates = new List
+ {
+ new("Status", ((int)status).ToString(CultureInfo.InvariantCulture)),
+ new("LastUpdatedUtc", now.ToUnixTimeMilliseconds().ToString(CultureInfo.InvariantCulture))
+ };
+
+ if (startedUtc.HasValue)
+ updates.Add(new("StartedUtc", startedUtc.Value.ToUnixTimeMilliseconds().ToString(CultureInfo.InvariantCulture)));
+ if (completedUtc.HasValue)
+ updates.Add(new("CompletedUtc", completedUtc.Value.ToUnixTimeMilliseconds().ToString(CultureInfo.InvariantCulture)));
+ if (errorMessage is not null)
+ updates.Add(new("ErrorMessage", errorMessage));
+ if (progress.HasValue)
+ updates.Add(new("Progress", progress.Value.ToString(CultureInfo.InvariantCulture)));
+ if (attempt.HasValue)
+ updates.Add(new("Attempt", attempt.Value.ToString(CultureInfo.InvariantCulture)));
+
+ var ttl = expiry ?? _options.DefaultExpiry;
+ var newStatusSetKey = StatusSetKey(queueName, status);
+
+ // All writes in a single MULTI/EXEC transaction — atomic without Lua
+ var txn = db.CreateTransaction();
+ txn.AddCondition(Condition.KeyExists(key));
+
+ _ = txn.HashSetAsync(key, updates.ToArray());
+
+ if (oldStatus != status)
+ _ = txn.SortedSetRemoveAsync(StatusSetKey(queueName, oldStatus), jobId);
+ _ = txn.SortedSetAddAsync(newStatusSetKey, jobId, createdScore);
+
+ if (ttl.HasValue)
+ {
+ _ = txn.KeyExpireAsync(key, ttl.Value);
+ _ = txn.KeyExpireAsync(newStatusSetKey, ttl.Value);
+ }
+
+ await txn.ExecuteAsync().ConfigureAwait(false);
+ }
+
+ public async Task UpdateJobProgressAsync(string jobId, int progress, string? progressMessage = null, TimeSpan? expiry = null, CancellationToken cancellationToken = default)
+ {
+ var db = _redis.GetDatabase();
+ var key = JobKey(jobId);
+
+ if (!await db.KeyExistsAsync(key).ConfigureAwait(false))
+ return;
+
+ var now = _timeProvider.GetUtcNow();
+ var updates = new HashEntry[]
+ {
+ new("Progress", progress.ToString(CultureInfo.InvariantCulture)),
+ new("ProgressMessage", progressMessage ?? string.Empty),
+ new("LastUpdatedUtc", now.ToUnixTimeMilliseconds().ToString(CultureInfo.InvariantCulture))
+ };
+
+ await db.HashSetAsync(key, updates).ConfigureAwait(false);
+
+ var ttl = expiry ?? _options.DefaultExpiry;
+ if (ttl.HasValue)
+ await db.KeyExpireAsync(key, ttl.Value).ConfigureAwait(false);
+ }
+
+ public async Task RequestCancellationAsync(string jobId, CancellationToken cancellationToken = default)
+ {
+ var db = _redis.GetDatabase();
+ var key = JobKey(jobId);
+
+ // Check if job exists and is in a non-terminal state
+ var statusValue = await db.HashGetAsync(key, "Status").ConfigureAwait(false);
+ if (statusValue.IsNullOrEmpty)
+ return false;
+
+ if (int.TryParse(statusValue.ToString(), out var statusInt))
+ {
+ var status = (QueueJobStatus)statusInt;
+ if (status is QueueJobStatus.Completed or QueueJobStatus.Failed or QueueJobStatus.Cancelled)
+ return false;
+ }
+
+ // Set cancellation flag
+ var cancelKey = CancelKey(jobId);
+ await db.StringSetAsync(cancelKey, "1").ConfigureAwait(false);
+
+ // Match the job's TTL
+ var jobTtl = await db.KeyTimeToLiveAsync(key).ConfigureAwait(false);
+ if (jobTtl.HasValue)
+ await db.KeyExpireAsync(cancelKey, jobTtl.Value).ConfigureAwait(false);
+
+ return true;
+ }
+
+ public Task IsCancellationRequestedAsync(string jobId, CancellationToken cancellationToken = default)
+ {
+ var db = _redis.GetDatabase();
+ return db.KeyExistsAsync(CancelKey(jobId));
+ }
+
+ public async Task RemoveJobStateAsync(string jobId, CancellationToken cancellationToken = default)
+ {
+ var db = _redis.GetDatabase();
+ var key = JobKey(jobId);
+
+ // Read queue name and status before deleting so we can clean up sorted sets
+ var fields = await db.HashGetAsync(key, ["QueueName", "Status"]).ConfigureAwait(false);
+ var queueName = fields[0];
+ var statusValue = fields[1];
+
+ await db.KeyDeleteAsync(key).ConfigureAwait(false);
+ await db.KeyDeleteAsync(CancelKey(jobId)).ConfigureAwait(false);
+
+ if (!queueName.IsNullOrEmpty)
+ {
+ var qn = queueName.ToString();
+ await db.SortedSetRemoveAsync(QueueSetKey(qn), jobId).ConfigureAwait(false);
+
+ // Remove from per-status sorted set
+ if (!statusValue.IsNullOrEmpty && int.TryParse(statusValue.ToString(), out var statusInt))
+ await db.SortedSetRemoveAsync(StatusSetKey(qn, (QueueJobStatus)statusInt), jobId).ConfigureAwait(false);
+ }
+ }
+
+ public Task IncrementCounterAsync(string queueName, string counterName, long value = 1, CancellationToken cancellationToken = default)
+ {
+ var db = _redis.GetDatabase();
+ var bucketKey = CounterBucketKey(queueName, _timeProvider.GetUtcNow());
+ var task = db.HashIncrementAsync(bucketKey, counterName, value);
+
+ // Auto-expire each hourly bucket after 48h so old buckets clean themselves up
+ _ = db.KeyExpireAsync(bucketKey, TimeSpan.FromHours(48), ExpireWhen.HasNoExpiry);
+
+ return task;
+ }
+
+ public async Task GetCounterStatsAsync(string queueName, TimeSpan? window = null, CancellationToken cancellationToken = default)
+ {
+ var db = _redis.GetDatabase();
+ var now = _timeProvider.GetUtcNow();
+ var effectiveWindow = window ?? TimeSpan.FromHours(24);
+ var startHour = TruncateToHour(now - effectiveWindow);
+ var endHour = TruncateToHour(now);
+
+ // Build list of bucket keys to query
+ var hours = new List();
+ for (var hour = startHour; hour <= endHour; hour = hour.AddHours(1))
+ hours.Add(hour);
+
+ // Pipeline all bucket reads in a single round-trip
+ var batch = db.CreateBatch();
+ var tasks = new Task[hours.Count];
+ for (int i = 0; i < hours.Count; i++)
+ tasks[i] = batch.HashGetAllAsync(CounterBucketKey(queueName, hours[i]));
+ batch.Execute();
+
+ var totals = new Dictionary();
+ var buckets = new List(hours.Count);
+
+ for (int i = 0; i < hours.Count; i++)
+ {
+ var entries = await tasks[i].ConfigureAwait(false);
+ var counters = new Dictionary(entries.Length);
+
+ foreach (var entry in entries)
+ {
+ if (entry.Value.TryParse(out long val))
+ {
+ var name = entry.Name.ToString();
+ counters[name] = val;
+ totals[name] = totals.GetValueOrDefault(name) + val;
+ }
+ }
+
+ buckets.Add(new CounterBucket { Hour = hours[i], Counters = counters });
+ }
+
+ return new QueueCounterStats { Totals = totals, Buckets = buckets };
+ }
+
+ private string JobKey(string jobId) => $"{_keyPrefix}:{jobId}";
+ private string CancelKey(string jobId) => $"{_keyPrefix}:{jobId}:cancel";
+ private string QueueSetKey(string queueName) => $"{_keyPrefix}:queues:{queueName}";
+ private string StatusSetKey(string queueName, QueueJobStatus status) => $"{_keyPrefix}:queues:{queueName}:status:{(int)status}";
+ private string CounterBucketKey(string queueName, DateTimeOffset timestamp) => $"{_keyPrefix}:counters:{queueName}:{TruncateToHour(timestamp):yyyy-MM-ddTHH}";
+
+ private static DateTimeOffset TruncateToHour(DateTimeOffset timestamp)
+ => new(timestamp.Year, timestamp.Month, timestamp.Day, timestamp.Hour, 0, 0, TimeSpan.Zero);
+
+ public async Task> GetJobsByStatusAsync(string queueName, QueueJobStatus status, int skip = 0, int take = 50, CancellationToken cancellationToken = default)
+ {
+ var db = _redis.GetDatabase();
+ var setKey = StatusSetKey(queueName, status);
+
+ // O(take) — read only the page we need from the sorted set (newest first)
+ var members = await db.SortedSetRangeByRankAsync(setKey, skip, skip + take - 1, Order.Descending).ConfigureAwait(false);
+
+ if (members.Length == 0)
+ return [];
+
+ // Pipeline all hash reads
+ var batch = db.CreateBatch();
+ var tasks = new Task[members.Length];
+ for (int i = 0; i < members.Length; i++)
+ tasks[i] = batch.HashGetAllAsync(JobKey(members[i].ToString()));
+ batch.Execute();
+
+ var results = new List(members.Length);
+ for (int i = 0; i < tasks.Length; i++)
+ {
+ var entries = await tasks[i].ConfigureAwait(false);
+ if (entries.Length > 0)
+ results.Add(ParseJobState(entries));
+ }
+
+ return results;
+ }
+
+ public Task GetJobCountByStatusAsync(string queueName, QueueJobStatus status, CancellationToken cancellationToken = default)
+ {
+ var db = _redis.GetDatabase();
+ return db.SortedSetLengthAsync(StatusSetKey(queueName, status));
+ }
+
+ private static QueueJobState ParseJobState(HashEntry[] entries)
+ {
+ var dict = entries.ToDictionary(e => e.Name.ToString(), e => e.Value.ToString());
+
+ return new QueueJobState
+ {
+ JobId = dict.GetValueOrDefault("JobId") ?? string.Empty,
+ QueueName = dict.GetValueOrDefault("QueueName") ?? string.Empty,
+ MessageType = dict.GetValueOrDefault("MessageType") ?? string.Empty,
+ Status = int.TryParse(dict.GetValueOrDefault("Status"), out var s) ? (QueueJobStatus)s : QueueJobStatus.Queued,
+ Progress = int.TryParse(dict.GetValueOrDefault("Progress"), out var p) ? p : 0,
+ ProgressMessage = NullIfEmpty(dict.GetValueOrDefault("ProgressMessage")),
+ CreatedUtc = ParseDateTimeOffset(dict.GetValueOrDefault("CreatedUtc")),
+ StartedUtc = ParseNullableDateTimeOffset(dict.GetValueOrDefault("StartedUtc")),
+ CompletedUtc = ParseNullableDateTimeOffset(dict.GetValueOrDefault("CompletedUtc")),
+ ErrorMessage = NullIfEmpty(dict.GetValueOrDefault("ErrorMessage")),
+ Attempt = int.TryParse(dict.GetValueOrDefault("Attempt"), out var a) ? a : 0,
+ LastUpdatedUtc = ParseDateTimeOffset(dict.GetValueOrDefault("LastUpdatedUtc"))
+ };
+ }
+
+ private static DateTimeOffset ParseDateTimeOffset(string? value)
+ => long.TryParse(value, out var ms) ? DateTimeOffset.FromUnixTimeMilliseconds(ms) : DateTimeOffset.MinValue;
+
+ private static DateTimeOffset? ParseNullableDateTimeOffset(string? value)
+ => string.IsNullOrEmpty(value) ? null : long.TryParse(value, out var ms) ? DateTimeOffset.FromUnixTimeMilliseconds(ms) : null;
+
+ private static string? NullIfEmpty(string? value)
+ => string.IsNullOrEmpty(value) ? null : value;
+}
diff --git a/src/Foundatio.Mediator.Distributed.Redis/RedisServiceExtensions.cs b/src/Foundatio.Mediator.Distributed.Redis/RedisServiceExtensions.cs
new file mode 100644
index 00000000..dc8e1a29
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed.Redis/RedisServiceExtensions.cs
@@ -0,0 +1,43 @@
+using Microsoft.Extensions.DependencyInjection;
+using StackExchange.Redis;
+
+namespace Foundatio.Mediator.Distributed.Redis;
+
+///
+/// Extension methods for configuring Redis-backed services on .
+///
+public static class RedisBuilderExtensions
+{
+ ///
+ /// Registers a Redis-backed for tracking queue job state.
+ /// Requires an to be registered in DI.
+ ///
+ /// The mediator builder.
+ /// Optional configuration callback.
+ /// The mediator builder for chaining.
+ ///
+ ///
+ /// services.AddSingleton<IConnectionMultiplexer>(ConnectionMultiplexer.Connect("localhost"));
+ /// services.AddMediator()
+ /// .AddDistributedQueues()
+ /// .UseRedisJobState();
+ ///
+ ///
+ public static IMediatorBuilder UseRedisJobState(
+ this IMediatorBuilder builder,
+ Action? configure = null)
+ {
+ var services = builder.Services;
+ var options = new RedisJobStateStoreOptions();
+ configure?.Invoke(options);
+
+ services.AddSingleton(options);
+ services.AddSingleton(sp =>
+ new RedisQueueJobStateStore(
+ sp.GetRequiredService(),
+ sp.GetService(),
+ sp.GetService()));
+
+ return builder;
+ }
+}
diff --git a/src/Foundatio.Mediator.Distributed/AssemblyInfo.cs b/src/Foundatio.Mediator.Distributed/AssemblyInfo.cs
new file mode 100644
index 00000000..cde18f69
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed/AssemblyInfo.cs
@@ -0,0 +1,3 @@
+using Foundatio.Mediator;
+
+[assembly: FoundatioModule]
diff --git a/src/Foundatio.Mediator.Distributed/DistributedContext.cs b/src/Foundatio.Mediator.Distributed/DistributedContext.cs
new file mode 100644
index 00000000..8cb56354
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed/DistributedContext.cs
@@ -0,0 +1,34 @@
+namespace Foundatio.Mediator.Distributed;
+
+///
+/// Ambient context that indicates the current execution scope originated from
+/// distributed infrastructure (e.g., a pub/sub bus or remote queue).
+/// Middleware such as checks this to avoid
+/// re-enqueueing messages that have already been dispatched through shared infrastructure.
+///
+public static class DistributedContext
+{
+ private static readonly AsyncLocal _isNotification = new();
+
+ ///
+ /// Gets whether the current execution scope is processing a notification
+ /// received from the distributed bus.
+ ///
+ public static bool IsNotification => _isNotification.Value;
+
+ ///
+ /// Enters a notification scope. The returned
+ /// restores the previous value when disposed.
+ ///
+ public static IDisposable BeginNotificationScope()
+ {
+ var previous = _isNotification.Value;
+ _isNotification.Value = true;
+ return new NotificationScope(previous);
+ }
+
+ private sealed class NotificationScope(bool previous) : IDisposable
+ {
+ public void Dispose() => _isNotification.Value = previous;
+ }
+}
diff --git a/src/Foundatio.Mediator.Distributed/DistributedInfrastructureInitializer.cs b/src/Foundatio.Mediator.Distributed/DistributedInfrastructureInitializer.cs
new file mode 100644
index 00000000..0749e2fc
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed/DistributedInfrastructureInitializer.cs
@@ -0,0 +1,127 @@
+using System.Diagnostics;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace Foundatio.Mediator.Distributed;
+
+///
+/// Hosted service that pre-creates all queues and topics in the background.
+/// Workers and publishers await
+/// before using infrastructure, so the app can start accepting requests immediately.
+///
+internal sealed class DistributedInfrastructureInitializer(
+ IQueueClient? queueClient,
+ IPubSubClient? pubSubClient,
+ DistributedInfrastructureOptions options,
+ DistributedInfrastructureReady ready,
+ ILogger logger) : IHostedService
+{
+ public Task StartAsync(CancellationToken cancellationToken)
+ {
+ if (options.QueueNames.Count == 0 && options.TopicNames.Count == 0)
+ {
+ ready.SetReady();
+ return Task.CompletedTask;
+ }
+
+ // Fire and forget — workers await ready.WaitAsync() before polling
+ _ = InitializeAsync(cancellationToken);
+ return Task.CompletedTask;
+ }
+
+ private async Task InitializeAsync(CancellationToken cancellationToken)
+ {
+ try
+ {
+ var sw = Stopwatch.StartNew();
+ using var activity = MediatorActivitySource.Instance.StartActivity("Mediator Infrastructure Setup");
+
+ // Warm up the transport connections with one real call per transport.
+ // The first AWS SDK call absorbs DNS resolution, TLS handshake, and
+ // endpoint discovery (~20s against cold LocalStack). Creating one queue
+ // and one topic first means the parallel batch below gets warm connections.
+ var warmUpTasks = new List(2);
+ if (options.QueueNames.Count > 0 && queueClient is not null)
+ warmUpTasks.Add(queueClient.EnsureQueuesAsync([options.QueueNames[0]], cancellationToken));
+ if (options.TopicNames.Count > 0 && pubSubClient is not null)
+ warmUpTasks.Add(pubSubClient.EnsureTopicsAsync([options.TopicNames[0]], cancellationToken));
+ await Task.WhenAll(warmUpTasks).ConfigureAwait(false);
+ logger.LogInformation("Transport connections warm in {ElapsedMs}ms", sw.ElapsedMilliseconds);
+
+ // Now create the remaining queues/topics with warm connections
+ var tasks = new List(2);
+
+ if (options.QueueNames.Count > 1 && queueClient is not null)
+ {
+ var remaining = options.QueueNames.Skip(1).ToList();
+ logger.LogInformation("Ensuring remaining {Count} queue(s) exist: {Queues}", remaining.Count, remaining);
+ tasks.Add(queueClient.EnsureQueuesAsync(remaining, cancellationToken));
+ }
+
+ // Topic was already fully set up (queue + subscribe) during warm-up
+ // so we only need to process additional topics if there are more than one.
+ if (options.TopicNames.Count > 1 && pubSubClient is not null)
+ {
+ var remaining = options.TopicNames.Skip(1).ToList();
+ logger.LogInformation("Ensuring remaining {Count} topic(s) exist: {Topics}", remaining.Count, remaining);
+ tasks.Add(pubSubClient.EnsureTopicsAsync(remaining, cancellationToken));
+ }
+
+ if (tasks.Count > 0)
+ await Task.WhenAll(tasks).ConfigureAwait(false);
+
+ logger.LogInformation("Distributed infrastructure ready in {ElapsedMs}ms", sw.ElapsedMilliseconds);
+ ready.SetReady();
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, "Failed to initialize distributed infrastructure");
+ ready.SetFailed(ex);
+ }
+ }
+
+ public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
+}
+
+///
+/// Signals that distributed infrastructure (queues, topics) has been created.
+/// Workers await before starting their polling loops.
+///
+public sealed class DistributedInfrastructureReady
+{
+ private readonly TaskCompletionSource _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ ///
+ /// Blocks until infrastructure is ready or throws if initialization failed.
+ ///
+ public Task WaitAsync(CancellationToken cancellationToken = default)
+ {
+ if (_tcs.Task.IsCompleted)
+ return _tcs.Task;
+
+ return WaitWithCancellationAsync(cancellationToken);
+ }
+
+ private async Task WaitWithCancellationAsync(CancellationToken cancellationToken)
+ {
+ var cancelTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ using var reg = cancellationToken.Register(() => cancelTcs.TrySetCanceled(cancellationToken));
+ await Task.WhenAny(_tcs.Task, cancelTcs.Task).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
+ await _tcs.Task.ConfigureAwait(false); // propagate any failure
+ }
+
+ internal void SetReady() => _tcs.TrySetResult();
+
+ internal void SetFailed(Exception ex) => _tcs.TrySetException(ex);
+}
+
+///
+/// Collects queue and topic names during service registration for use by
+/// at startup.
+///
+internal sealed class DistributedInfrastructureOptions
+{
+ public List QueueNames { get; } = [];
+ public List TopicNames { get; } = [];
+}
diff --git a/src/Foundatio.Mediator.Distributed/DistributedNotificationAttribute.cs b/src/Foundatio.Mediator.Distributed/DistributedNotificationAttribute.cs
new file mode 100644
index 00000000..0807ea17
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed/DistributedNotificationAttribute.cs
@@ -0,0 +1,15 @@
+namespace Foundatio.Mediator.Distributed;
+
+///
+/// Marks a notification type for distributed fan-out via pub/sub.
+/// This is an alternative to implementing —
+/// use this attribute when you cannot or prefer not to modify the type hierarchy.
+///
+///
+///
+/// [DistributedNotification]
+/// public record OrderCreated(string OrderId, DateTime CreatedAt);
+///
+///
+[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct, AllowMultiple = false, Inherited = true)]
+public sealed class DistributedNotificationAttribute : Attribute;
diff --git a/src/Foundatio.Mediator.Distributed/DistributedNotificationOptions.cs b/src/Foundatio.Mediator.Distributed/DistributedNotificationOptions.cs
new file mode 100644
index 00000000..af8c9c7f
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed/DistributedNotificationOptions.cs
@@ -0,0 +1,165 @@
+using System.Collections.Concurrent;
+using System.Reflection;
+using System.Text.Json;
+using System.Threading.Channels;
+
+namespace Foundatio.Mediator.Distributed;
+
+///
+/// Options for configuring distributed notification fan-out.
+///
+public class DistributedNotificationOptions
+{
+ ///
+ /// Unique identifier for this host instance. Messages received from the bus
+ /// with a matching host ID are skipped to prevent double-processing.
+ /// Defaults to a new GUID if not set.
+ ///
+ public string HostId { get; set; } = Guid.NewGuid().ToString("N");
+
+ ///
+ /// The topic name used for publishing and subscribing to distributed notifications.
+ /// Defaults to "distributed-notifications".
+ ///
+ public string Topic { get; set; } = "distributed-notifications";
+
+ ///
+ /// Custom JSON serializer options for notification serialization/deserialization.
+ /// When null, is used.
+ ///
+ public JsonSerializerOptions? JsonSerializerOptions { get; set; }
+
+ ///
+ /// Maximum capacity of the outbound subscription buffer.
+ /// When full, the behavior is controlled by .
+ /// Default is 1000.
+ ///
+ public int MaxCapacity { get; set; } = 1000;
+
+ ///
+ /// Behavior when the outbound subscription buffer is full.
+ /// Default is to provide backpressure
+ /// and avoid dropping notifications.
+ ///
+ public BoundedChannelFullMode FullMode { get; set; } = BoundedChannelFullMode.Wait;
+
+ ///
+ /// Optional prefix applied to topic and per-node queue names for app-level scoping.
+ /// When set, topic names become "{ResourcePrefix}-{Topic}".
+ /// When null or empty (default), names are used as-is.
+ ///
+ ///
+ /// Use this to isolate multiple applications sharing the same infrastructure
+ /// (e.g., "myapp" produces topic "myapp-distributed-notifications").
+ ///
+ public string? ResourcePrefix { get; set; }
+
+ ///
+ /// When true, all notification types are distributed via pub/sub,
+ /// regardless of whether they implement
+ /// or are decorated with .
+ /// Default is false.
+ ///
+ public bool IncludeAllNotifications { get; set; }
+
+ ///
+ /// Optional predicate evaluated for notification types that are not already included
+ /// by , ,
+ /// or explicit calls. Return true to distribute the type.
+ ///
+ ///
+ ///
+ /// opts.MessageFilter = type => type.Namespace?.StartsWith("MyApp.Events") == true;
+ ///
+ ///
+ public Func? MessageFilter { get; set; }
+
+ ///
+ /// Explicitly includes a notification type for distributed fan-out.
+ /// Use this when the type cannot implement
+ /// or be decorated with .
+ ///
+ /// The notification type to distribute.
+ /// This options instance for chaining.
+ public DistributedNotificationOptions Include()
+ {
+ IncludedTypes.Add(typeof(T));
+ _shouldDistributeCache.TryRemove(typeof(T), out _);
+ return this;
+ }
+
+ ///
+ /// Explicitly includes a notification type for distributed fan-out.
+ ///
+ /// The notification type to distribute.
+ /// This options instance for chaining.
+ public DistributedNotificationOptions Include(Type type)
+ {
+ IncludedTypes.Add(type);
+ _shouldDistributeCache.TryRemove(type, out _);
+ return this;
+ }
+
+ ///
+ /// Scans the assembly containing and includes all
+ /// public notification types (classes and structs) for distributed fan-out.
+ /// A type is considered a notification if it implements .
+ ///
+ /// A type whose assembly will be scanned.
+ /// This options instance for chaining.
+ public DistributedNotificationOptions IncludeNotificationsFromAssemblyOf()
+ {
+ foreach (var type in typeof(T).Assembly.GetExportedTypes())
+ {
+ if (typeof(INotification).IsAssignableFrom(type) && type is { IsAbstract: false, IsInterface: false })
+ {
+ IncludedTypes.Add(type);
+ _shouldDistributeCache.TryRemove(type, out _);
+ }
+ }
+
+ return this;
+ }
+
+ ///
+ /// Types explicitly added via or .
+ ///
+ internal HashSet IncludedTypes { get; } = [];
+
+ private readonly ConcurrentDictionary _shouldDistributeCache = new();
+
+ ///
+ /// Determines whether a given message type should be distributed via pub/sub.
+ /// Evaluation order: explicit includes → →
+ /// → →
+ /// .
+ ///
+ ///
+ /// Results are cached per type to avoid repeated reflection on the hot path.
+ ///
+ public bool ShouldDistribute(Type messageType)
+ {
+ return _shouldDistributeCache.GetOrAdd(messageType, static (type, self) =>
+ {
+ if (self.IncludedTypes.Contains(type))
+ return true;
+
+ if (typeof(IDistributedNotification).IsAssignableFrom(type))
+ return true;
+
+ if (type.GetCustomAttribute() is not null)
+ return true;
+
+ if (self.MessageFilter is not null)
+ return self.MessageFilter(type);
+
+ return self.IncludeAllNotifications;
+ }, this);
+ }
+
+ ///
+ /// Returns with applied when configured.
+ ///
+ internal string EffectiveTopic =>
+ string.IsNullOrEmpty(ResourcePrefix) ? Topic : $"{ResourcePrefix}-{Topic}";
+}
diff --git a/src/Foundatio.Mediator.Distributed/DistributedNotificationWorker.cs b/src/Foundatio.Mediator.Distributed/DistributedNotificationWorker.cs
new file mode 100644
index 00000000..7414e11a
--- /dev/null
+++ b/src/Foundatio.Mediator.Distributed/DistributedNotificationWorker.cs
@@ -0,0 +1,287 @@
+using System.Collections.Concurrent;
+using System.Diagnostics;
+using System.Text.Json;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace Foundatio.Mediator.Distributed;
+
+///
+/// Background service that bridges locally published distributed notifications
+/// to a remote (outbound) and re-publishes inbound bus messages
+/// to the local mediator.
+///
+///
+/// Outbound loop: uses mediator.SubscribeAsync<MessageContext<object>>()
+/// to tap into all locally published notifications, filters to types that should be distributed
+/// (via ), then serializes and publishes
+/// them to the pub/sub client. Messages that arrived from the bus (tracked by reference identity
+/// in ) are skipped to prevent re-broadcast loops.
+///
+/// Inbound loop: subscribes to the bus topic and, for each received message,
+/// checks the header. If it matches this host's ID the
+/// message is skipped (self-delivery). Otherwise the message is deserialized, added to the
+/// set, and published locally via mediator.PublishAsync().
+/// The reference set entry is removed in a finally block.
+///
+public sealed class DistributedNotificationWorker : BackgroundService
+{
+ private readonly IServiceScopeFactory _scopeFactory;
+ private readonly IPubSubClient _bus;
+ private readonly DistributedNotificationOptions _options;
+ private readonly JsonSerializerOptions _jsonOptions;
+ private readonly ILogger _logger;
+ private readonly MessageTypeResolver? _typeResolver;
+
+ ///
+ /// Tracks notification objects that arrived from the bus and are currently being
+ /// re-published locally. The outbound loop checks this set by reference identity
+ /// and skips any match, preventing infinite re-broadcast.
+ ///
+ private readonly ConcurrentDictionary