diff --git a/.gitignore b/.gitignore index b92df279..cfc2ba8b 100644 --- a/.gitignore +++ b/.gitignore @@ -60,3 +60,4 @@ _NCrunch_* **/.idea/**/contentModel.xml **/.idea/**/modules.xml **/.idea/copilot/chatSessions/ +.idea/ diff --git a/docs/guide/index-management.md b/docs/guide/index-management.md index 32ba431d..fc50563b 100644 --- a/docs/guide/index-management.md +++ b/docs/guide/index-management.md @@ -851,3 +851,30 @@ AddReindexScript(2, @" - [Migrations](/guide/migrations) - Document migrations - [Jobs](/guide/jobs) - Index maintenance jobs - [Elasticsearch Setup](/guide/elasticsearch-setup) - Connection configuration + +## Concurrency Safety + +Reindexing is protected by a distributed lock keyed on the index alias to prevent concurrent reindex operations from corrupting data. + +### Lock Strategy + +- **Lock key**: `reindex:{alias}` (e.g., `reindex:employees`) +- **Lock TTL**: 20 minutes, auto-renewed during long-running operations +- Both direct (`VersionedIndex.ReindexAsync`) and work-item (`ReindexWorkItemHandler`) paths use the same lock +- Only one reindex per logical index can run at a time — subsequent version transitions wait for the current one to complete + +### Why Alias-Only Keys + +Using the alias as the lock key ensures that sequential version transitions (v1→v2, then v2→v3) cannot overlap. If v2→v3 started before v1→v2 completed, v3 would contain incomplete data from v2. + +### Lock Renewal for Long-Running Reindexes + +For indexes with millions of documents that take hours to reindex, the lock is automatically renewed on every progress callback (every 1-10 seconds during the polling loop). This prevents lock expiration during legitimate long-running operations. + +### Crash Recovery + +If an instance crashes mid-reindex, the lock expires after 20 minutes. Another instance can then retry the reindex. `VersionedIndex.ReindexAsync()` is resume-safe — it picks up from the last document using timestamp-based or ID-based range queries. + +### Unique Index Names + +`ElasticConfiguration.AddIndex()` enforces unique index names (case-insensitive). Registering two indexes with the same alias throws an `ArgumentException` at startup, preventing conflicts before they can cause data corruption. diff --git a/samples/Foundatio.SampleApp/Server/Foundatio.SampleApp.Server.csproj b/samples/Foundatio.SampleApp/Server/Foundatio.SampleApp.Server.csproj index ef7a8895..f5594f50 100644 --- a/samples/Foundatio.SampleApp/Server/Foundatio.SampleApp.Server.csproj +++ b/samples/Foundatio.SampleApp/Server/Foundatio.SampleApp.Server.csproj @@ -7,7 +7,7 @@ - + diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs index bcd08a0b..e99f9b9e 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs @@ -13,6 +13,7 @@ using Foundatio.Parsers.ElasticQueries; using Foundatio.Queues; using Foundatio.Repositories.Elasticsearch.CustomFields; +using Foundatio.Repositories.Elasticsearch.Jobs; using Foundatio.Repositories.Elasticsearch.Queries.Builders; using Foundatio.Repositories.Extensions; using Foundatio.Resilience; @@ -88,6 +89,7 @@ protected virtual void ConfigureSettings(ConnectionSettings settings) public IElasticClient Client => _client.Value; public ICacheClient Cache { get; } public IMessageBus MessageBus { get; } + public ILockProvider LockProvider => _lockProvider; public ILoggerFactory LoggerFactory { get; } public IResiliencePolicyProvider ResiliencePolicyProvider { get; } public IResiliencePolicy ResiliencePolicy { get; } @@ -116,6 +118,9 @@ public void AddIndex(IIndex index) if (_frozenIndexes.IsValueCreated) throw new InvalidOperationException("Can't add indexes after the list has been frozen."); + if (_indexes.Any(i => String.Equals(i.Name, index.Name, StringComparison.OrdinalIgnoreCase))) + throw new ArgumentException($"An index with name '{index.Name}' has already been registered.", nameof(index)); + _indexes.Add(index); } @@ -187,8 +192,7 @@ private async Task ConfigureIndexInternalAsync(IIndex idx, bool beginReindexingO throw new InvalidOperationException("Must specify work item queue and lock provider in order to migrate index versions."); var reindexWorkItem = versionedIndex.CreateReindexWorkItem(currentVersion); - bool isReindexing = await _lockProvider.IsLockedAsync(String.Join(":", "reindex", reindexWorkItem.Alias, - reindexWorkItem.OldIndex, reindexWorkItem.NewIndex)).AnyContext(); + bool isReindexing = await _lockProvider.IsLockedAsync(ElasticReindexer.GetLockName(versionedIndex.Name)).AnyContext(); if (isReindexing) return; diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/IElasticConfiguration.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/IElasticConfiguration.cs index f520dbe4..72a2dba7 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/IElasticConfiguration.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/IElasticConfiguration.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Threading.Tasks; using Foundatio.Caching; +using Foundatio.Lock; using Foundatio.Messaging; using Foundatio.Parsers.ElasticQueries; using Foundatio.Repositories.Elasticsearch.CustomFields; @@ -17,6 +18,7 @@ public interface IElasticConfiguration : IDisposable IElasticClient Client { get; } ICacheClient Cache { get; } IMessageBus MessageBus { get; } + ILockProvider LockProvider { get; } ILoggerFactory LoggerFactory { get; } IResiliencePolicyProvider ResiliencePolicyProvider { get; } TimeProvider TimeProvider { get; set; } diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/Index.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/Index.cs index 2dc85fab..5b454815 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/Index.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/Index.cs @@ -351,6 +351,7 @@ public virtual Task ReindexAsync(Func? progressCallbackAsync { OldIndex = Name, NewIndex = Name, + Alias = Name, DeleteOld = false, TimestampField = GetTimeStampField() }; diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs index 5b3d635f..98a8008c 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs @@ -5,6 +5,7 @@ using System.Linq.Expressions; using System.Text; using System.Threading.Tasks; +using Foundatio.Lock; using Foundatio.Parsers.ElasticQueries; using Foundatio.Parsers.ElasticQueries.Extensions; using Foundatio.Repositories.Elasticsearch.Extensions; @@ -250,9 +251,33 @@ public override async Task ReindexAsync(Func? progressCallba if (currentVersion < 0 || currentVersion >= Version) return; + string lockKey = ElasticReindexer.GetLockName(Name); + await using var reindexLock = await Configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(20), TimeSpan.FromMinutes(30)).AnyContext(); + if (reindexLock is null) + throw new InvalidOperationException($"Unable to acquire reindex lock for '{Name}' after 30 minutes."); + + currentVersion = await GetCurrentVersionAsync().AnyContext(); + if (currentVersion < 0 || currentVersion >= Version) + return; + var reindexWorkItem = CreateReindexWorkItem(currentVersion); + + Func wrappedCallback = async (progress, message) => + { + await reindexLock.RenewAsync().AnyContext(); + + if (progressCallbackAsync is not null) + { + await progressCallbackAsync(progress, message).AnyContext(); + } + else + { + _logger.LogInformation("Reindex Progress {Progress:F1}%: {Message}", progress, message); + } + }; + var reindexer = new ElasticReindexer(Configuration.Client, _logger); - await reindexer.ReindexAsync(reindexWorkItem, progressCallbackAsync).AnyContext(); + await reindexer.ReindexAsync(reindexWorkItem, wrappedCallback).AnyContext(); } public override async Task MaintainAsync(bool includeOptionalTasks = true) diff --git a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs index d14af9d1..8d07bfc5 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs @@ -6,7 +6,7 @@ public record ReindexWorkItem { public required string OldIndex { get; init; } public required string NewIndex { get; init; } - public string? Alias { get; init; } + public required string Alias { get; init; } public string? Script { get; init; } public bool DeleteOld { get; set; } public string? TimestampField { get; init; } diff --git a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs index e0699dc2..7b40395f 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs @@ -25,7 +25,7 @@ public ReindexWorkItemHandler(IElasticClient client, ILockProvider lockProvider, if (workItem is not ReindexWorkItem reindexWorkItem) return Task.FromResult(null); - return _lockProvider.TryAcquireAsync(String.Join(":", "reindex", reindexWorkItem.Alias, reindexWorkItem.OldIndex, reindexWorkItem.NewIndex), TimeSpan.FromMinutes(20), cancellationToken); + return _lockProvider.TryAcquireAsync(ElasticReindexer.GetLockName(reindexWorkItem.Alias), TimeSpan.FromMinutes(20), cancellationToken); } public override Task HandleItemAsync(WorkItemContext context) diff --git a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs index 38fa38ed..99349e0c 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs @@ -5,6 +5,7 @@ using System.Threading; using System.Threading.Tasks; using Elasticsearch.Net; +using Foundatio.Parsers.ElasticQueries.Extensions; using Foundatio.Repositories.Elasticsearch.Extensions; using Foundatio.Repositories.Elasticsearch.Jobs; using Foundatio.Repositories.Extensions; @@ -27,6 +28,16 @@ public class ElasticReindexer private const string ID_FIELD = "id"; private const int MAX_STATUS_FAILS = 10; + /// + /// Returns the distributed lock resource name for serializing reindex operations on the given alias. + /// + public static string GetLockName(string alias) + { + ArgumentException.ThrowIfNullOrEmpty(alias); + + return String.Concat("reindex:", alias); + } + public ElasticReindexer(IElasticClient client, ILogger? logger = null) : this(client, TimeProvider.System, logger) { } @@ -62,7 +73,14 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func + { + [nameof(workItem.OldIndex)] = workItem.OldIndex, + [nameof(workItem.NewIndex)] = workItem.NewIndex, + [nameof(workItem.Alias)] = workItem.Alias + }); + + _logger.LogInformation("Received reindex work item for {OldIndex} -> {NewIndex}", workItem.OldIndex, workItem.NewIndex); var startTime = _timeProvider.GetUtcNow().UtcDateTime.AddSeconds(-1); await progressCallbackAsync(0, "Starting reindex...").AnyContext(); var firstPassResult = await InternalReindexAsync(workItem, progressCallbackAsync, 0, 90, workItem.StartUtc).AnyContext(); @@ -96,7 +114,7 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func {NewIndex}: {Error}", workItem.OldIndex, workItem.NewIndex, refreshResponse.GetErrorMessage()); ReindexResult? secondPassResult = null; if (!String.IsNullOrEmpty(workItem.TimestampField)) @@ -117,17 +135,17 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func {NewIndex}: {Error}", workItem.OldIndex, workItem.NewIndex, refreshResponse.GetErrorMessage()); var newDocCountResponse = await _client.CountAsync(d => d.Index(workItem.NewIndex)).AnyContext(); _logger.LogRequest(newDocCountResponse); if (!newDocCountResponse.IsValid) - _logger.LogWarning("Failed to get new index doc count: {Error}", newDocCountResponse.ServerError); + _logger.LogWarning("Failed to get new index doc count for {NewIndex}: {Error}", workItem.NewIndex, newDocCountResponse.GetErrorMessage()); var oldDocCountResponse = await _client.CountAsync(d => d.Index(workItem.OldIndex)).AnyContext(); _logger.LogRequest(oldDocCountResponse); if (!oldDocCountResponse.IsValid) - _logger.LogWarning("Failed to get old index doc count: {Error}", oldDocCountResponse.ServerError); + _logger.LogWarning("Failed to get old index doc count for {OldIndex}: {Error}", workItem.OldIndex, oldDocCountResponse.GetErrorMessage()); await progressCallbackAsync(98, $"Old Docs: {oldDocCountResponse.Count} New Docs: {newDocCountResponse.Count}").AnyContext(); if (newDocCountResponse.IsValid && oldDocCountResponse.IsValid && newDocCountResponse.Count >= oldDocCountResponse.Count) @@ -135,16 +153,16 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func InternalReindexAsync(ReindexWorkItem workItem, Func progressCallbackAsync, int startProgress = 0, int endProgress = 100, DateTime? startTime = null, CancellationToken cancellationToken = default) @@ -173,7 +191,7 @@ private async Task InternalReindexAsync(ReindexWorkItem workItem, return response; }, cancellationToken).AnyContext(); - _logger.LogInformation("Reindex Task Id: {TaskId}", result.Task.FullyQualifiedId); + _logger.LogInformation("Reindex Task Id: {ReindexTaskId}", result.Task.FullyQualifiedId); _logger.LogRequest(result); long totalDocs = result.Total; @@ -196,7 +214,8 @@ private async Task InternalReindexAsync(ReindexWorkItem workItem, if (statusGetFails > MAX_STATUS_FAILS) { - _logger.LogError("Failed to get the status {FailureCount} times in a row", MAX_STATUS_FAILS); + _logger.LogError("Failed to get the status {FailureCount} times in a row for reindex task {ReindexTaskId} reindexing {OldIndex} -> {NewIndex}", + statusGetFails, result.Task.FullyQualifiedId, workItem.OldIndex, workItem.NewIndex); break; } @@ -221,7 +240,7 @@ private async Task InternalReindexAsync(ReindexWorkItem workItem, sw.Restart(); lastProgress = lastCompleted; - string lastMessage = $"Total: {status.Task.Status.Total:N0} Completed: {lastCompleted:N0} VersionConflicts: {status.Task.Status.VersionConflicts:N0}"; + string lastMessage = $"[{workItem.NewIndex}] Total: {status.Task.Status.Total:N0} Completed: {lastCompleted:N0} VersionConflicts: {status.Task.Status.VersionConflicts:N0}"; await progressCallbackAsync(CalculateProgress(status.Task.Status.Total, lastCompleted, startProgress, endProgress), lastMessage).AnyContext(); if (status.Completed && response?.Error == null) @@ -245,6 +264,22 @@ private async Task InternalReindexAsync(ReindexWorkItem workItem, } while (!cancellationToken.IsCancellationRequested); sw.Stop(); + if (!taskSuccess) + { + if (cancellationToken.IsCancellationRequested) + { + _logger.LogWarning("Reindex cancelled for {OldIndex} -> {NewIndex}. ReindexTaskId: {ReindexTaskId}, LastProgress: {LastProgress}, TotalDocs: {TotalDocs}, Elapsed: {Elapsed}", + workItem.OldIndex, workItem.NewIndex, result.Task.FullyQualifiedId, lastProgress, totalDocs, sw.Elapsed); + } + else + { + _logger.LogError("Reindex abandoned for {OldIndex} -> {NewIndex}. ReindexTaskId: {ReindexTaskId}, StatusFails: {StatusFails}, LastProgress: {LastProgress}, TotalDocs: {TotalDocs}, Elapsed: {Elapsed}", + workItem.OldIndex, workItem.NewIndex, result.Task.FullyQualifiedId, statusGetFails, lastProgress, totalDocs, sw.Elapsed); + } + + await TryCancelTaskAsync(result.Task, workItem.OldIndex, workItem.NewIndex).AnyContext(); + } + long failures = 0; if (lastReindexResponse?.Failures != null && lastReindexResponse.Failures.Count > 0) { @@ -318,6 +353,31 @@ private async Task HandleFailureAsync(ReindexWorkItem workItem, BulkIndexByScrol _logger.LogErrorRequest(indexResponse, "Error indexing document {Index}/{Id}", workItem.NewIndex + "-error", gr.Id); } + /// + /// Attempts to cancel the Elasticsearch server-side reindex task. Best-effort — failures are logged but not propagated. + /// + private async Task TryCancelTaskAsync(TaskId reindexTaskId, string oldIndex, string newIndex) + { + try + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var response = await _client.Tasks.CancelAsync(c => c.TaskId(reindexTaskId), cts.Token).AnyContext(); + if (response.IsValid) + { + _logger.LogRequest(response); + _logger.LogInformation("Cancelled reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", reindexTaskId.FullyQualifiedId, oldIndex, newIndex); + } + else + { + _logger.LogErrorRequest(response, "Failed to cancel reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", reindexTaskId.FullyQualifiedId, oldIndex, newIndex); + } + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Exception cancelling reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", reindexTaskId.FullyQualifiedId, oldIndex, newIndex); + } + } + private async Task> GetIndexAliasesAsync(string index) { var aliasesResponse = await _client.Indices.GetAliasAsync(index).AnyContext(); diff --git a/src/Foundatio.Repositories/Foundatio.Repositories.csproj b/src/Foundatio.Repositories/Foundatio.Repositories.csproj index bf59cf1f..aae2b03e 100644 --- a/src/Foundatio.Repositories/Foundatio.Repositories.csproj +++ b/src/Foundatio.Repositories/Foundatio.Repositories.csproj @@ -1,9 +1,9 @@ - + - + diff --git a/tests/Directory.Build.props b/tests/Directory.Build.props index dd1bc7b0..7a4567aa 100644 --- a/tests/Directory.Build.props +++ b/tests/Directory.Build.props @@ -12,7 +12,7 @@ - + diff --git a/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs b/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs index ee6d5cac..137163fb 100644 --- a/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs +++ b/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs @@ -5,6 +5,7 @@ using Exceptionless.DateTimeExtensions; using Foundatio.Repositories.Elasticsearch.Configuration; using Foundatio.Repositories.Elasticsearch.Extensions; +using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Configuration; using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Configuration.Indexes; using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Models; using Foundatio.Repositories.Exceptions; @@ -1532,4 +1533,36 @@ public async Task MaintainIndexesAsync_WhenMarkerExists_PreservesMarker() // Assert Assert.True(HasConfigureIndexesCacheMarker()); } + + [Fact] + public void AddIndex_WithDuplicateIndexName_ThrowsArgumentException() + { + // Arrange — fresh config so the index list isn't frozen + var config = new MyAppElasticConfiguration(_workItemQueue, _cache, _messageBus, Log); + + // Act & Assert + var ex = Assert.Throws(() => config.AddIndex(new EmployeeIndex(config))); + Assert.Contains("employees", ex.Message); + } + + [Fact] + public void AddIndex_WithUniqueIndexNames_Succeeds() + { + // Arrange — fresh config registers many unique indexes in its constructor + var config = new MyAppElasticConfiguration(_workItemQueue, _cache, _messageBus, Log); + + // Assert + Assert.True(config.Indexes.Count > 2); + } + + [Fact] + public void AddIndex_WithDuplicateNameDifferentCase_ThrowsArgumentException() + { + // Arrange — fresh config so the index list isn't frozen + var config = new MyAppElasticConfiguration(_workItemQueue, _cache, _messageBus, Log); + + // Act & Assert — VersionedEmployeeIndex uses the same "employees" alias + var ex = Assert.Throws(() => config.AddIndex(new VersionedEmployeeIndex(config, 1))); + Assert.Contains("employees", ex.Message); + } } diff --git a/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs b/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs index 30623d57..cdd6163b 100644 --- a/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs +++ b/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs @@ -1,12 +1,15 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Elasticsearch.Net; using Foundatio.AsyncEx; +using Foundatio.Lock; using Foundatio.Parsers.ElasticQueries.Extensions; using Foundatio.Repositories.Elasticsearch.Configuration; using Foundatio.Repositories.Elasticsearch.Extensions; +using Foundatio.Repositories.Elasticsearch.Jobs; using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Configuration.Indexes; using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Models; using Foundatio.Repositories.Utility; @@ -958,6 +961,148 @@ public async Task RemoveFieldScript_WithNestedField_RemovesNestedFieldDuringRein Assert.Equal("{\"keepField\":\"keepValue\"}", json); } + [Fact] + public async Task ReindexAsync_ConcurrentCalls_OnlyOneCompletes() + { + // Arrange + var version1Index = new VersionedEmployeeIndex(_configuration, 1); + await version1Index.DeleteAsync(); + var version2Index = new VersionedEmployeeIndex(_configuration, 2); + await version2Index.DeleteAsync(); + + await version1Index.ConfigureAsync(); + IEmployeeRepository repo = new EmployeeRepository(_configuration); + await repo.AddAsync(EmployeeGenerator.GenerateEmployees(100), o => o.ImmediateConsistency()); + + await version2Index.ConfigureAsync(); + + // Act + var task1 = version2Index.ReindexAsync(); + var task2 = version2Index.ReindexAsync(); + await Task.WhenAll(task1, task2); + + // Assert + var aliasResponse = await _client.Indices.GetAliasAsync(version2Index.Name, ct: TestCancellationToken); + Assert.True(aliasResponse.IsValid); + Assert.Single(aliasResponse.Indices); + Assert.Equal(version2Index.VersionedName, aliasResponse.Indices.First().Key); + + var countResponse = await _client.CountAsync(d => d.Index(version2Index.VersionedName), TestCancellationToken); + Assert.True(countResponse.IsValid); + Assert.Equal(100, countResponse.Count); + + Assert.False((await _client.Indices.ExistsAsync(version1Index.VersionedName, ct: TestCancellationToken)).Exists); + } + + [Fact] + public async Task ReindexAsync_LockPreventsHandlerAndDirectConcurrency() + { + // Arrange + var version1Index = new VersionedEmployeeIndex(_configuration, 1); + await version1Index.DeleteAsync(); + var version2Index = new VersionedEmployeeIndex(_configuration, 2); + await version2Index.DeleteAsync(); + + await version1Index.ConfigureAsync(); + IEmployeeRepository repo = new EmployeeRepository(_configuration); + await repo.AddAsync(EmployeeGenerator.GenerateEmployees(100), o => o.ImmediateConsistency()); + await version2Index.ConfigureAsync(); + + string lockKey = ElasticReindexer.GetLockName(version2Index.Name); + await using var externalLock = await _configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(1), TestCancellationToken); + Assert.NotNull(externalLock); + + // Act - Start reindex on a background thread and verify it blocks + var reindexStarted = new TaskCompletionSource(); + var reindexTask = Task.Run(async () => + { + reindexStarted.SetResult(true); + await version2Index.ReindexAsync(); + }, TestCancellationToken); + await reindexStarted.Task; + await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); + + // Assert - Lock prevented reindex from completing + Assert.False(reindexTask.IsCompleted); + + // Cleanup - Release lock and wait for reindex to complete, then delete + await externalLock.ReleaseAsync(); + await reindexTask; + await version2Index.DeleteAsync(); + } + + [Fact] + public async Task ReindexAsync_AliasLockBlocksSubsequentVersionTransition() + { + // Arrange + var version1Index = new VersionedEmployeeIndex(_configuration, 1); + await version1Index.DeleteAsync(); + var version2Index = new VersionedEmployeeIndex(_configuration, 2); + await version2Index.DeleteAsync(); + var version3Index = new VersionedEmployeeIndex(_configuration, 3); + await version3Index.DeleteAsync(); + + await version1Index.ConfigureAsync(); + IEmployeeRepository repo = new EmployeeRepository(_configuration); + await repo.AddAsync(EmployeeGenerator.GenerateEmployees(100), o => o.ImmediateConsistency()); + await version2Index.ConfigureAsync(); + await version3Index.ConfigureAsync(); + + string lockKey = ElasticReindexer.GetLockName(version1Index.Name); + await using var reindexLock = await _configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(1), TestCancellationToken); + Assert.NotNull(reindexLock); + + // Act - Start reindex on a background thread and verify it blocks + var reindexStarted = new TaskCompletionSource(); + var reindexTask = Task.Run(async () => + { + reindexStarted.SetResult(true); + await version3Index.ReindexAsync(); + }, TestCancellationToken); + await reindexStarted.Task; + await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); + + // Assert - Lock prevented reindex from completing; data unchanged + Assert.False(reindexTask.IsCompleted); + var countResponse = await _client.CountAsync(d => d.Index(version1Index.VersionedName), TestCancellationToken); + Assert.True(countResponse.IsValid); + Assert.Equal(100, countResponse.Count); + + // Cleanup - Release lock and wait for reindex to complete, then delete + await reindexLock.ReleaseAsync(); + await reindexTask; + await version3Index.DeleteAsync(); + } + + [Fact] + public async Task ReindexAsync_WhenCancelled_DoesNotCompleteReindex() + { + // Arrange + var version1Index = new VersionedEmployeeIndex(_configuration, 1); + await version1Index.DeleteAsync(); + var version2Index = new VersionedEmployeeIndex(_configuration, 2); + await version2Index.DeleteAsync(); + + await version1Index.ConfigureAsync(); + IEmployeeRepository repo = new EmployeeRepository(_configuration); + await repo.AddAsync(EmployeeGenerator.GenerateEmployees(5000), o => o.ImmediateConsistency()); + + await version2Index.ConfigureAsync(); + + // Act + var reindexTask = version2Index.ReindexAsync((progress, message) => + { + _logger.LogInformation("Reindex Progress {Progress}%: {Message}", progress, message); + if (progress > 5) + throw new OperationCanceledException("Test cancellation"); + return Task.CompletedTask; + }); + + // Assert + await Assert.ThrowsAnyAsync(() => reindexTask); + Assert.True((await _client.Indices.ExistsAsync(version1Index.VersionedName, ct: TestCancellationToken)).Exists); + } + private static string GetExpectedEmployeeDailyAliases(IIndex index, DateTime utcNow, DateTime indexDateUtc) { double totalDays = utcNow.Date.Subtract(indexDateUtc.Date).TotalDays;