From ec01bec05b25419eed61c6914e1344451b164dbc Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Fri, 8 May 2026 18:35:02 -0500 Subject: [PATCH 01/13] fix: improve reindex reliability with distributed locking and observability Close a concurrency gap where VersionedIndex.ReindexAsync() bypassed the distributed lock used by ReindexWorkItemHandler, allowing simultaneous reindex operations against the same index. Unify both paths under a single alias-keyed lock (reindex:{alias}) that serializes all reindex operations for a logical index. Key changes: Concurrency safety: - VersionedIndex.ReindexAsync() now acquires the same distributed lock as the work-item handler before starting a reindex - Lock is auto-renewed on every progress callback to support multi-hour reindexes of large indexes - Simplified lock key from reindex:{alias}:{old}:{new} to reindex:{alias} to prevent overlapping sequential version transitions (v1->v2 must finish before v2->v3 can start) - Updated IsLockedAsync check in ConfigureIndexInternalAsync to match new key - Exposed ILockProvider on IElasticConfiguration interface Observability improvements: - Include TaskId, OldIndex, NewIndex in MAX_STATUS_FAILS error log - Replace .ServerError (often null) with .GetErrorMessage() for all refresh and count response warnings - Add explicit "reindex abandoned" log with full context (TaskId, StatusFails, LastProgress, TotalDocs, Elapsed) when polling is aborted - Include target index name in periodic progress messages for log correlation - Change completion progress message from null to "Reindex complete" Server-side task cancellation: - Add TryCancelTaskAsync helper that calls _client.Tasks.CancelAsync when reindex monitoring is abandoned (MAX_STATUS_FAILS, timeout, cancellation) - Prevents orphaned Elasticsearch reindex tasks from consuming resources Unique index name validation: - AddIndex() now throws ArgumentException if an index with the same Name (case-insensitive) is already registered - Catches misconfiguration at startup rather than at runtime Tests: - ReindexAsync_ConcurrentCalls_OnlyOneCompletes - ReindexAsync_LockPreventsHandlerAndDirectConcurrency - ReindexAsync_AliasLockBlocksSubsequentVersionTransition - ReindexAsync_WhenCancelled_DoesNotCompleteReindex - AddIndex_WithDuplicateIndexName_ThrowsArgumentException - AddIndex_WithUniqueIndexNames_Succeeds - AddIndex_WithDuplicateNameDifferentCase_ThrowsArgumentException Documentation: - Added Concurrency Safety section to docs/guide/index-management.md --- docs/guide/index-management.md | 27 ++++ .../Configuration/ElasticConfiguration.cs | 7 +- .../Configuration/IElasticConfiguration.cs | 2 + .../Configuration/VersionedIndex.cs | 14 +- .../Jobs/ReindexWorkItemHandler.cs | 2 +- .../Repositories/ElasticReindexer.cs | 50 +++++- .../IndexTests.cs | 52 +++++++ .../ReindexTests.cs | 144 ++++++++++++++++++ 8 files changed, 286 insertions(+), 12 deletions(-) diff --git a/docs/guide/index-management.md b/docs/guide/index-management.md index 32ba431d..7823d1ce 100644 --- a/docs/guide/index-management.md +++ b/docs/guide/index-management.md @@ -851,3 +851,30 @@ AddReindexScript(2, @" - [Migrations](/guide/migrations) - Document migrations - [Jobs](/guide/jobs) - Index maintenance jobs - [Elasticsearch Setup](/guide/elasticsearch-setup) - Connection configuration + +## Concurrency Safety + +Reindexing is protected by a distributed lock keyed on the index alias to prevent concurrent reindex operations from corrupting data. + +### Lock Strategy + +- **Lock key**: `reindex:{alias}` (e.g., `reindex:employees`) +- **Lock TTL**: 20 minutes, auto-renewed during long-running operations +- Both direct (`ElasticConfiguration.ReindexAsync`) and work-item (`ReindexWorkItemHandler`) paths use the same lock +- Only one reindex per logical index can run at a time — subsequent version transitions wait for the current one to complete + +### Why Alias-Only Keys + +Using the alias as the lock key ensures that sequential version transitions (v1→v2, then v2→v3) cannot overlap. If v2→v3 started before v1→v2 completed, v3 would contain incomplete data from v2. + +### Lock Renewal for Long-Running Reindexes + +For indexes with millions of documents that take hours to reindex, the lock is automatically renewed on every progress callback (every 1-10 seconds during the polling loop). This prevents lock expiration during legitimate long-running operations. + +### Crash Recovery + +If an instance crashes mid-reindex, the lock expires after 20 minutes. Another instance can then retry the reindex. `VersionedIndex.ReindexAsync()` is resume-safe — it picks up from the last document using timestamp-based or ID-based range queries. + +### Unique Index Names + +`ElasticConfiguration.AddIndex()` enforces unique index names (case-insensitive). Registering two indexes with the same alias throws an `ArgumentException` at startup, preventing conflicts before they can cause data corruption. diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs index bcd08a0b..7f300222 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs @@ -88,6 +88,7 @@ protected virtual void ConfigureSettings(ConnectionSettings settings) public IElasticClient Client => _client.Value; public ICacheClient Cache { get; } public IMessageBus MessageBus { get; } + public ILockProvider LockProvider => _lockProvider; public ILoggerFactory LoggerFactory { get; } public IResiliencePolicyProvider ResiliencePolicyProvider { get; } public IResiliencePolicy ResiliencePolicy { get; } @@ -116,6 +117,9 @@ public void AddIndex(IIndex index) if (_frozenIndexes.IsValueCreated) throw new InvalidOperationException("Can't add indexes after the list has been frozen."); + if (_indexes.Any(i => i.Name.Equals(index.Name, StringComparison.OrdinalIgnoreCase))) + throw new ArgumentException($"An index with name '{index.Name}' has already been registered.", nameof(index)); + _indexes.Add(index); } @@ -187,8 +191,7 @@ private async Task ConfigureIndexInternalAsync(IIndex idx, bool beginReindexingO throw new InvalidOperationException("Must specify work item queue and lock provider in order to migrate index versions."); var reindexWorkItem = versionedIndex.CreateReindexWorkItem(currentVersion); - bool isReindexing = await _lockProvider.IsLockedAsync(String.Join(":", "reindex", reindexWorkItem.Alias, - reindexWorkItem.OldIndex, reindexWorkItem.NewIndex)).AnyContext(); + bool isReindexing = await _lockProvider.IsLockedAsync(String.Concat("reindex:", reindexWorkItem.Alias)).AnyContext(); if (isReindexing) return; diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/IElasticConfiguration.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/IElasticConfiguration.cs index f520dbe4..72a2dba7 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/IElasticConfiguration.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/IElasticConfiguration.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Threading.Tasks; using Foundatio.Caching; +using Foundatio.Lock; using Foundatio.Messaging; using Foundatio.Parsers.ElasticQueries; using Foundatio.Repositories.Elasticsearch.CustomFields; @@ -17,6 +18,7 @@ public interface IElasticConfiguration : IDisposable IElasticClient Client { get; } ICacheClient Cache { get; } IMessageBus MessageBus { get; } + ILockProvider LockProvider { get; } ILoggerFactory LoggerFactory { get; } IResiliencePolicyProvider ResiliencePolicyProvider { get; } TimeProvider TimeProvider { get; set; } diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs index 5b3d635f..bcd8161b 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs @@ -5,6 +5,7 @@ using System.Linq.Expressions; using System.Text; using System.Threading.Tasks; +using Foundatio.Lock; using Foundatio.Parsers.ElasticQueries; using Foundatio.Parsers.ElasticQueries.Extensions; using Foundatio.Repositories.Elasticsearch.Extensions; @@ -251,8 +252,19 @@ public override async Task ReindexAsync(Func? progressCallba return; var reindexWorkItem = CreateReindexWorkItem(currentVersion); + string lockKey = String.Concat("reindex:", Name); + + await using var reindexLock = await Configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(20), TimeSpan.FromMinutes(30)).AnyContext(); + + Func wrappedCallback = async (progress, message) => + { + await reindexLock.RenewAsync().AnyContext(); + if (progressCallbackAsync is not null) + await progressCallbackAsync(progress, message).AnyContext(); + }; + var reindexer = new ElasticReindexer(Configuration.Client, _logger); - await reindexer.ReindexAsync(reindexWorkItem, progressCallbackAsync).AnyContext(); + await reindexer.ReindexAsync(reindexWorkItem, wrappedCallback).AnyContext(); } public override async Task MaintainAsync(bool includeOptionalTasks = true) diff --git a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs index e0699dc2..0b1feeaf 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs @@ -25,7 +25,7 @@ public ReindexWorkItemHandler(IElasticClient client, ILockProvider lockProvider, if (workItem is not ReindexWorkItem reindexWorkItem) return Task.FromResult(null); - return _lockProvider.TryAcquireAsync(String.Join(":", "reindex", reindexWorkItem.Alias, reindexWorkItem.OldIndex, reindexWorkItem.NewIndex), TimeSpan.FromMinutes(20), cancellationToken); + return _lockProvider.TryAcquireAsync(String.Concat("reindex:", reindexWorkItem.Alias), TimeSpan.FromMinutes(20), cancellationToken); } public override Task HandleItemAsync(WorkItemContext context) diff --git a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs index 38fa38ed..071a1e63 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs @@ -5,6 +5,7 @@ using System.Threading; using System.Threading.Tasks; using Elasticsearch.Net; +using Foundatio.Parsers.ElasticQueries.Extensions; using Foundatio.Repositories.Elasticsearch.Extensions; using Foundatio.Repositories.Elasticsearch.Jobs; using Foundatio.Repositories.Extensions; @@ -96,7 +97,7 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func(d => d.Index(workItem.NewIndex)).AnyContext(); _logger.LogRequest(newDocCountResponse); if (!newDocCountResponse.IsValid) - _logger.LogWarning("Failed to get new index doc count: {Error}", newDocCountResponse.ServerError); + _logger.LogWarning("Failed to get new index doc count: {Error}", newDocCountResponse.GetErrorMessage()); var oldDocCountResponse = await _client.CountAsync(d => d.Index(workItem.OldIndex)).AnyContext(); _logger.LogRequest(oldDocCountResponse); if (!oldDocCountResponse.IsValid) - _logger.LogWarning("Failed to get old index doc count: {Error}", oldDocCountResponse.ServerError); + _logger.LogWarning("Failed to get old index doc count: {Error}", oldDocCountResponse.GetErrorMessage()); await progressCallbackAsync(98, $"Old Docs: {oldDocCountResponse.Count} New Docs: {newDocCountResponse.Count}").AnyContext(); if (newDocCountResponse.IsValid && oldDocCountResponse.IsValid && newDocCountResponse.Count >= oldDocCountResponse.Count) @@ -135,7 +136,7 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func InternalReindexAsync(ReindexWorkItem workItem, Func progressCallbackAsync, int startProgress = 0, int endProgress = 100, DateTime? startTime = null, CancellationToken cancellationToken = default) @@ -196,7 +197,8 @@ private async Task InternalReindexAsync(ReindexWorkItem workItem, if (statusGetFails > MAX_STATUS_FAILS) { - _logger.LogError("Failed to get the status {FailureCount} times in a row", MAX_STATUS_FAILS); + _logger.LogError("Failed to get the status {FailureCount} times in a row for task {TaskId} reindexing {OldIndex} -> {NewIndex}", + MAX_STATUS_FAILS, result.Task.FullyQualifiedId, workItem.OldIndex, workItem.NewIndex); break; } @@ -221,7 +223,7 @@ private async Task InternalReindexAsync(ReindexWorkItem workItem, sw.Restart(); lastProgress = lastCompleted; - string lastMessage = $"Total: {status.Task.Status.Total:N0} Completed: {lastCompleted:N0} VersionConflicts: {status.Task.Status.VersionConflicts:N0}"; + string lastMessage = $"[{workItem.NewIndex}] Total: {status.Task.Status.Total:N0} Completed: {lastCompleted:N0} VersionConflicts: {status.Task.Status.VersionConflicts:N0}"; await progressCallbackAsync(CalculateProgress(status.Task.Status.Total, lastCompleted, startProgress, endProgress), lastMessage).AnyContext(); if (status.Completed && response?.Error == null) @@ -245,6 +247,22 @@ private async Task InternalReindexAsync(ReindexWorkItem workItem, } while (!cancellationToken.IsCancellationRequested); sw.Stop(); + if (!taskSuccess) + { + if (cancellationToken.IsCancellationRequested) + { + _logger.LogWarning("Reindex cancelled for {OldIndex} -> {NewIndex}. TaskId: {TaskId}, LastProgress: {LastProgress}, TotalDocs: {TotalDocs}, Elapsed: {Elapsed}", + workItem.OldIndex, workItem.NewIndex, result.Task.FullyQualifiedId, lastProgress, totalDocs, sw.Elapsed); + } + else + { + _logger.LogError("Reindex abandoned for {OldIndex} -> {NewIndex}. TaskId: {TaskId}, StatusFails: {StatusFails}, LastProgress: {LastProgress}, TotalDocs: {TotalDocs}, Elapsed: {Elapsed}", + workItem.OldIndex, workItem.NewIndex, result.Task.FullyQualifiedId, statusGetFails, lastProgress, totalDocs, sw.Elapsed); + } + + await TryCancelTaskAsync(result.Task, workItem.OldIndex, workItem.NewIndex).AnyContext(); + } + long failures = 0; if (lastReindexResponse?.Failures != null && lastReindexResponse.Failures.Count > 0) { @@ -318,6 +336,22 @@ private async Task HandleFailureAsync(ReindexWorkItem workItem, BulkIndexByScrol _logger.LogErrorRequest(indexResponse, "Error indexing document {Index}/{Id}", workItem.NewIndex + "-error", gr.Id); } + private async Task TryCancelTaskAsync(TaskId taskId, string oldIndex, string newIndex) + { + try + { + var response = await _client.Tasks.CancelAsync(c => c.TaskId(taskId)).AnyContext(); + if (response.IsValid) + _logger.LogInformation("Cancelled reindex task {TaskId} for {OldIndex} -> {NewIndex}", taskId.FullyQualifiedId, oldIndex, newIndex); + else + _logger.LogWarning("Failed to cancel reindex task {TaskId}: {Error}", taskId.FullyQualifiedId, response.GetErrorMessage()); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Exception cancelling reindex task {TaskId}", taskId.FullyQualifiedId); + } + } + private async Task> GetIndexAliasesAsync(string index) { var aliasesResponse = await _client.Indices.GetAliasAsync(index).AnyContext(); diff --git a/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs b/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs index ee6d5cac..18b4be66 100644 --- a/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs +++ b/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs @@ -3,8 +3,13 @@ using System.Linq; using System.Threading.Tasks; using Exceptionless.DateTimeExtensions; +using Foundatio.Caching; +using Foundatio.Jobs; +using Foundatio.Messaging; +using Foundatio.Queues; using Foundatio.Repositories.Elasticsearch.Configuration; using Foundatio.Repositories.Elasticsearch.Extensions; +using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Configuration; using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Configuration.Indexes; using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Models; using Foundatio.Repositories.Exceptions; @@ -1532,4 +1537,51 @@ public async Task MaintainIndexesAsync_WhenMarkerExists_PreservesMarker() // Assert Assert.True(HasConfigureIndexesCacheMarker()); } + + [Fact] + public void AddIndex_WithDuplicateIndexName_ThrowsArgumentException() + { + // Arrange + var cache = new InMemoryCacheClient(); + var messageBus = new InMemoryMessageBus(); + var queue = new InMemoryQueue(); + var configuration = new MyAppElasticConfiguration(queue, cache, messageBus, Log); + + // Act + var act = () => configuration.AddIndex(new EmployeeIndex(configuration)); + + // Assert + var ex = Assert.Throws(act); + Assert.Contains("employees", ex.Message); + } + + [Fact] + public void AddIndex_WithUniqueIndexNames_Succeeds() + { + // Arrange + var cache = new InMemoryCacheClient(); + var messageBus = new InMemoryMessageBus(); + var queue = new InMemoryQueue(); + var configuration = new MyAppElasticConfiguration(queue, cache, messageBus, Log); + + // Act & Assert -- configuration already has many indexes added in its constructor + Assert.True(configuration.Indexes.Count > 2); + } + + [Fact] + public void AddIndex_WithDuplicateNameDifferentCase_ThrowsArgumentException() + { + // Arrange + var cache = new InMemoryCacheClient(); + var messageBus = new InMemoryMessageBus(); + var queue = new InMemoryQueue(); + var configuration = new MyAppElasticConfiguration(queue, cache, messageBus, Log); + + // Act -- VersionedEmployeeIndex uses the same "employees" alias as EmployeeIndex + var act = () => configuration.AddIndex(new VersionedEmployeeIndex(configuration, 1)); + + // Assert + var ex = Assert.Throws(act); + Assert.Contains("employees", ex.Message); + } } diff --git a/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs b/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs index 30623d57..e25eda5b 100644 --- a/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs +++ b/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs @@ -1,9 +1,11 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Elasticsearch.Net; using Foundatio.AsyncEx; +using Foundatio.Lock; using Foundatio.Parsers.ElasticQueries.Extensions; using Foundatio.Repositories.Elasticsearch.Configuration; using Foundatio.Repositories.Elasticsearch.Extensions; @@ -958,6 +960,148 @@ public async Task RemoveFieldScript_WithNestedField_RemovesNestedFieldDuringRein Assert.Equal("{\"keepField\":\"keepValue\"}", json); } + [Fact] + public async Task ReindexAsync_ConcurrentCalls_OnlyOneCompletes() + { + // Arrange + var version1Index = new VersionedEmployeeIndex(_configuration, 1); + await version1Index.DeleteAsync(); + var version2Index = new VersionedEmployeeIndex(_configuration, 2); + await version2Index.DeleteAsync(); + + await version1Index.ConfigureAsync(); + IEmployeeRepository repo = new EmployeeRepository(_configuration); + await repo.AddAsync(EmployeeGenerator.GenerateEmployees(100), o => o.ImmediateConsistency()); + + await version2Index.ConfigureAsync(); + + // Act + var task1 = version2Index.ReindexAsync(); + var task2 = version2Index.ReindexAsync(); + await Task.WhenAll(task1, task2); + + // Assert + var aliasResponse = await _client.Indices.GetAliasAsync(version2Index.Name, ct: TestCancellationToken); + Assert.True(aliasResponse.IsValid); + Assert.Single(aliasResponse.Indices); + Assert.Equal(version2Index.VersionedName, aliasResponse.Indices.First().Key); + + var countResponse = await _client.CountAsync(d => d.Index(version2Index.VersionedName), TestCancellationToken); + Assert.True(countResponse.IsValid); + Assert.Equal(100, countResponse.Count); + + Assert.False((await _client.Indices.ExistsAsync(version1Index.VersionedName, ct: TestCancellationToken)).Exists); + } + + [Fact] + public async Task ReindexAsync_LockPreventsHandlerAndDirectConcurrency() + { + // Arrange + var version1Index = new VersionedEmployeeIndex(_configuration, 1); + await version1Index.DeleteAsync(); + var version2Index = new VersionedEmployeeIndex(_configuration, 2); + await version2Index.DeleteAsync(); + + await version1Index.ConfigureAsync(); + IEmployeeRepository repo = new EmployeeRepository(_configuration); + await repo.AddAsync(EmployeeGenerator.GenerateEmployees(100), o => o.ImmediateConsistency()); + await version2Index.ConfigureAsync(); + + string lockKey = String.Concat("reindex:", version2Index.Name); + await using var externalLock = await _configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(1), TestCancellationToken); + Assert.NotNull(externalLock); + + // Act - Start reindex on a background thread and verify it blocks + var reindexStarted = new TaskCompletionSource(); + var reindexTask = Task.Run(async () => + { + reindexStarted.SetResult(true); + await version2Index.ReindexAsync(); + }, TestCancellationToken); + await reindexStarted.Task; + await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); + + // Assert - Lock prevented reindex from completing + Assert.False(reindexTask.IsCompleted); + + // Cleanup - Release lock and wait for reindex to complete, then delete + await externalLock.ReleaseAsync(); + await reindexTask; + await version2Index.DeleteAsync(); + } + + [Fact] + public async Task ReindexAsync_AliasLockBlocksSubsequentVersionTransition() + { + // Arrange + var version1Index = new VersionedEmployeeIndex(_configuration, 1); + await version1Index.DeleteAsync(); + var version2Index = new VersionedEmployeeIndex(_configuration, 2); + await version2Index.DeleteAsync(); + var version3Index = new VersionedEmployeeIndex(_configuration, 3); + await version3Index.DeleteAsync(); + + await version1Index.ConfigureAsync(); + IEmployeeRepository repo = new EmployeeRepository(_configuration); + await repo.AddAsync(EmployeeGenerator.GenerateEmployees(100), o => o.ImmediateConsistency()); + await version2Index.ConfigureAsync(); + await version3Index.ConfigureAsync(); + + string lockKey = String.Concat("reindex:", version1Index.Name); + await using var reindexLock = await _configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(1), TestCancellationToken); + Assert.NotNull(reindexLock); + + // Act - Start reindex on a background thread and verify it blocks + var reindexStarted = new TaskCompletionSource(); + var reindexTask = Task.Run(async () => + { + reindexStarted.SetResult(true); + await version3Index.ReindexAsync(); + }, TestCancellationToken); + await reindexStarted.Task; + await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); + + // Assert - Lock prevented reindex from completing; data unchanged + Assert.False(reindexTask.IsCompleted); + var countResponse = await _client.CountAsync(d => d.Index(version1Index.VersionedName), TestCancellationToken); + Assert.True(countResponse.IsValid); + Assert.Equal(100, countResponse.Count); + + // Cleanup - Release lock and wait for reindex to complete, then delete + await reindexLock.ReleaseAsync(); + await reindexTask; + await version3Index.DeleteAsync(); + } + + [Fact] + public async Task ReindexAsync_WhenCancelled_DoesNotCompleteReindex() + { + // Arrange + var version1Index = new VersionedEmployeeIndex(_configuration, 1); + await version1Index.DeleteAsync(); + var version2Index = new VersionedEmployeeIndex(_configuration, 2); + await version2Index.DeleteAsync(); + + await version1Index.ConfigureAsync(); + IEmployeeRepository repo = new EmployeeRepository(_configuration); + await repo.AddAsync(EmployeeGenerator.GenerateEmployees(5000), o => o.ImmediateConsistency()); + + await version2Index.ConfigureAsync(); + + // Act + var reindexTask = version2Index.ReindexAsync((progress, message) => + { + _logger.LogInformation("Reindex Progress {Progress}%: {Message}", progress, message); + if (progress > 5) + throw new OperationCanceledException("Test cancellation"); + return Task.CompletedTask; + }); + + // Assert + await Assert.ThrowsAnyAsync(() => reindexTask); + Assert.True((await _client.Indices.ExistsAsync(version1Index.VersionedName, ct: TestCancellationToken)).Exists); + } + private static string GetExpectedEmployeeDailyAliases(IIndex index, DateTime utcNow, DateTime indexDateUtc) { double totalDays = utcNow.Date.Subtract(indexDateUtc.Date).TotalDays; From ede49205d0c692b6c708f3005b61af9526ba430e Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Fri, 8 May 2026 19:19:56 -0500 Subject: [PATCH 02/13] =?UTF-8?q?fix:=20address=20Copilot=20review=20?= =?UTF-8?q?=E2=80=94=20null=20lock=20guard,=20re-check=20version=20after?= =?UTF-8?q?=20acquire,=20use=20GetErrorMessage=20for=20delete?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Configuration/VersionedIndex.cs | 10 ++++++++-- .../Repositories/ElasticReindexer.cs | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs index bcd8161b..faba4c85 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs @@ -251,10 +251,16 @@ public override async Task ReindexAsync(Func? progressCallba if (currentVersion < 0 || currentVersion >= Version) return; - var reindexWorkItem = CreateReindexWorkItem(currentVersion); string lockKey = String.Concat("reindex:", Name); - await using var reindexLock = await Configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(20), TimeSpan.FromMinutes(30)).AnyContext(); + if (reindexLock is null) + throw new InvalidOperationException($"Unable to acquire reindex lock for '{Name}' after 30 minutes."); + + currentVersion = await GetCurrentVersionAsync().AnyContext(); + if (currentVersion < 0 || currentVersion >= Version) + return; + + var reindexWorkItem = CreateReindexWorkItem(currentVersion); Func wrappedCallback = async (progress, message) => { diff --git a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs index 071a1e63..07875c9c 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs @@ -141,7 +141,7 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func Date: Fri, 8 May 2026 19:30:49 -0500 Subject: [PATCH 03/13] fix: add default progress logging in wrappedCallback, fix doc reference to VersionedIndex.ReindexAsync --- docs/guide/index-management.md | 2 +- .../Configuration/VersionedIndex.cs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/guide/index-management.md b/docs/guide/index-management.md index 7823d1ce..fc50563b 100644 --- a/docs/guide/index-management.md +++ b/docs/guide/index-management.md @@ -860,7 +860,7 @@ Reindexing is protected by a distributed lock keyed on the index alias to preven - **Lock key**: `reindex:{alias}` (e.g., `reindex:employees`) - **Lock TTL**: 20 minutes, auto-renewed during long-running operations -- Both direct (`ElasticConfiguration.ReindexAsync`) and work-item (`ReindexWorkItemHandler`) paths use the same lock +- Both direct (`VersionedIndex.ReindexAsync`) and work-item (`ReindexWorkItemHandler`) paths use the same lock - Only one reindex per logical index can run at a time — subsequent version transitions wait for the current one to complete ### Why Alias-Only Keys diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs index faba4c85..060888ad 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs @@ -267,6 +267,8 @@ public override async Task ReindexAsync(Func? progressCallba await reindexLock.RenewAsync().AnyContext(); if (progressCallbackAsync is not null) await progressCallbackAsync(progress, message).AnyContext(); + else + _logger.LogInformation("Reindex Progress {Progress:F1}%: {Message}", progress, message); }; var reindexer = new ElasticReindexer(Configuration.Client, _logger); From 58cced243fe2abb49c5193a298ec3f42446a6ea8 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Fri, 8 May 2026 20:42:54 -0500 Subject: [PATCH 04/13] chore(deps): update Foundatio package references to version 13.0.1-preview.0.12 Updated the Foundatio.Extensions.Hosting, Foundatio, Foundatio.JsonNet, and Foundatio.TestHarness package references to the latest preview version for improved compatibility and features. --- .../Server/Foundatio.SampleApp.Server.csproj | 2 +- src/Foundatio.Repositories/Foundatio.Repositories.csproj | 4 ++-- tests/Directory.Build.props | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/samples/Foundatio.SampleApp/Server/Foundatio.SampleApp.Server.csproj b/samples/Foundatio.SampleApp/Server/Foundatio.SampleApp.Server.csproj index ef7a8895..f5594f50 100644 --- a/samples/Foundatio.SampleApp/Server/Foundatio.SampleApp.Server.csproj +++ b/samples/Foundatio.SampleApp/Server/Foundatio.SampleApp.Server.csproj @@ -7,7 +7,7 @@ - + diff --git a/src/Foundatio.Repositories/Foundatio.Repositories.csproj b/src/Foundatio.Repositories/Foundatio.Repositories.csproj index bf59cf1f..aae2b03e 100644 --- a/src/Foundatio.Repositories/Foundatio.Repositories.csproj +++ b/src/Foundatio.Repositories/Foundatio.Repositories.csproj @@ -1,9 +1,9 @@ - + - + diff --git a/tests/Directory.Build.props b/tests/Directory.Build.props index dd1bc7b0..7a4567aa 100644 --- a/tests/Directory.Build.props +++ b/tests/Directory.Build.props @@ -12,7 +12,7 @@ - + From 8b3fff474065b8d86f05ed4116eb321ea43543c5 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Fri, 8 May 2026 20:43:53 -0500 Subject: [PATCH 05/13] fix: address PR feedback - static lock key, log scopes, cancel timeout, test cleanup --- .../Configuration/ElasticConfiguration.cs | 5 ++- .../Configuration/VersionedIndex.cs | 12 +++++- .../Jobs/ReindexWorkItem.cs | 12 +++++- .../Jobs/ReindexWorkItemHandler.cs | 4 +- .../Repositories/ElasticReindexer.cs | 39 +++++++++++------- .../IndexTests.cs | 40 +++++-------------- .../ReindexTests.cs | 5 ++- 7 files changed, 63 insertions(+), 54 deletions(-) diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs index 7f300222..ef58ef0b 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs @@ -13,6 +13,7 @@ using Foundatio.Parsers.ElasticQueries; using Foundatio.Queues; using Foundatio.Repositories.Elasticsearch.CustomFields; +using Foundatio.Repositories.Elasticsearch.Jobs; using Foundatio.Repositories.Elasticsearch.Queries.Builders; using Foundatio.Repositories.Extensions; using Foundatio.Resilience; @@ -117,7 +118,7 @@ public void AddIndex(IIndex index) if (_frozenIndexes.IsValueCreated) throw new InvalidOperationException("Can't add indexes after the list has been frozen."); - if (_indexes.Any(i => i.Name.Equals(index.Name, StringComparison.OrdinalIgnoreCase))) + if (_indexes.Any(i => String.Equals(i.Name, index.Name, StringComparison.OrdinalIgnoreCase))) throw new ArgumentException($"An index with name '{index.Name}' has already been registered.", nameof(index)); _indexes.Add(index); @@ -191,7 +192,7 @@ private async Task ConfigureIndexInternalAsync(IIndex idx, bool beginReindexingO throw new InvalidOperationException("Must specify work item queue and lock provider in order to migrate index versions."); var reindexWorkItem = versionedIndex.CreateReindexWorkItem(currentVersion); - bool isReindexing = await _lockProvider.IsLockedAsync(String.Concat("reindex:", reindexWorkItem.Alias)).AnyContext(); + bool isReindexing = await _lockProvider.IsLockedAsync(ReindexWorkItem.GetLockName(versionedIndex.Name)).AnyContext(); if (isReindexing) return; diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs index 060888ad..a22039e0 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs @@ -251,7 +251,7 @@ public override async Task ReindexAsync(Func? progressCallba if (currentVersion < 0 || currentVersion >= Version) return; - string lockKey = String.Concat("reindex:", Name); + string lockKey = ReindexWorkItem.GetLockName(Name); await using var reindexLock = await Configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(20), TimeSpan.FromMinutes(30)).AnyContext(); if (reindexLock is null) throw new InvalidOperationException($"Unable to acquire reindex lock for '{Name}' after 30 minutes."); @@ -264,7 +264,15 @@ public override async Task ReindexAsync(Func? progressCallba Func wrappedCallback = async (progress, message) => { - await reindexLock.RenewAsync().AnyContext(); + try + { + await reindexLock.RenewAsync().AnyContext(); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to renew reindex lock for {IndexName}", Name); + } + if (progressCallbackAsync is not null) await progressCallbackAsync(progress, message).AnyContext(); else diff --git a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs index d14af9d1..02706d79 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs @@ -1,4 +1,4 @@ -using System; +using System; namespace Foundatio.Repositories.Elasticsearch.Jobs; @@ -11,4 +11,14 @@ public record ReindexWorkItem public bool DeleteOld { get; set; } public string? TimestampField { get; init; } public DateTime? StartUtc { get; init; } + + /// + /// Returns the distributed lock resource name for serializing reindex operations on the given alias. + /// + public static string GetLockName(string alias) + { + ArgumentException.ThrowIfNullOrEmpty(alias); + + return String.Concat("reindex:", alias); + } } diff --git a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs index 0b1feeaf..dd617342 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs @@ -22,10 +22,10 @@ public ReindexWorkItemHandler(IElasticClient client, ILockProvider lockProvider, public override Task GetWorkItemLockAsync(object workItem, CancellationToken cancellationToken = default) { - if (workItem is not ReindexWorkItem reindexWorkItem) + if (workItem is not ReindexWorkItem reindexWorkItem || String.IsNullOrEmpty(reindexWorkItem.Alias)) return Task.FromResult(null); - return _lockProvider.TryAcquireAsync(String.Concat("reindex:", reindexWorkItem.Alias), TimeSpan.FromMinutes(20), cancellationToken); + return _lockProvider.TryAcquireAsync(ReindexWorkItem.GetLockName(reindexWorkItem.Alias), TimeSpan.FromMinutes(20), cancellationToken); } public override Task HandleItemAsync(WorkItemContext context) diff --git a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs index 07875c9c..e992b647 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs @@ -63,7 +63,14 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func + { + ["OldIndex"] = workItem.OldIndex, + ["NewIndex"] = workItem.NewIndex, + ["Alias"] = workItem.Alias ?? "" + }); + + _logger.LogInformation("Received reindex work item for {OldIndex} -> {NewIndex}", workItem.OldIndex, workItem.NewIndex); var startTime = _timeProvider.GetUtcNow().UtcDateTime.AddSeconds(-1); await progressCallbackAsync(0, "Starting reindex...").AnyContext(); var firstPassResult = await InternalReindexAsync(workItem, progressCallbackAsync, 0, 90, workItem.StartUtc).AnyContext(); @@ -97,7 +104,7 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func {NewIndex}: {Error}", workItem.OldIndex, workItem.NewIndex, refreshResponse.GetErrorMessage()); ReindexResult? secondPassResult = null; if (!String.IsNullOrEmpty(workItem.TimestampField)) @@ -118,17 +125,17 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func {NewIndex}: {Error}", workItem.OldIndex, workItem.NewIndex, refreshResponse.GetErrorMessage()); var newDocCountResponse = await _client.CountAsync(d => d.Index(workItem.NewIndex)).AnyContext(); _logger.LogRequest(newDocCountResponse); if (!newDocCountResponse.IsValid) - _logger.LogWarning("Failed to get new index doc count: {Error}", newDocCountResponse.GetErrorMessage()); + _logger.LogWarning("Failed to get new index doc count for {NewIndex}: {Error}", workItem.NewIndex, newDocCountResponse.GetErrorMessage()); var oldDocCountResponse = await _client.CountAsync(d => d.Index(workItem.OldIndex)).AnyContext(); _logger.LogRequest(oldDocCountResponse); if (!oldDocCountResponse.IsValid) - _logger.LogWarning("Failed to get old index doc count: {Error}", oldDocCountResponse.GetErrorMessage()); + _logger.LogWarning("Failed to get old index doc count for {OldIndex}: {Error}", workItem.OldIndex, oldDocCountResponse.GetErrorMessage()); await progressCallbackAsync(98, $"Old Docs: {oldDocCountResponse.Count} New Docs: {newDocCountResponse.Count}").AnyContext(); if (newDocCountResponse.IsValid && oldDocCountResponse.IsValid && newDocCountResponse.Count >= oldDocCountResponse.Count) @@ -174,7 +181,7 @@ private async Task InternalReindexAsync(ReindexWorkItem workItem, return response; }, cancellationToken).AnyContext(); - _logger.LogInformation("Reindex Task Id: {TaskId}", result.Task.FullyQualifiedId); + _logger.LogInformation("Reindex Task Id: {ReindexTaskId}", result.Task.FullyQualifiedId); _logger.LogRequest(result); long totalDocs = result.Total; @@ -197,8 +204,8 @@ private async Task InternalReindexAsync(ReindexWorkItem workItem, if (statusGetFails > MAX_STATUS_FAILS) { - _logger.LogError("Failed to get the status {FailureCount} times in a row for task {TaskId} reindexing {OldIndex} -> {NewIndex}", - MAX_STATUS_FAILS, result.Task.FullyQualifiedId, workItem.OldIndex, workItem.NewIndex); + _logger.LogError("Failed to get the status {FailureCount} times in a row for reindex task {ReindexTaskId} reindexing {OldIndex} -> {NewIndex}", + statusGetFails, result.Task.FullyQualifiedId, workItem.OldIndex, workItem.NewIndex); break; } @@ -251,12 +258,12 @@ private async Task InternalReindexAsync(ReindexWorkItem workItem, { if (cancellationToken.IsCancellationRequested) { - _logger.LogWarning("Reindex cancelled for {OldIndex} -> {NewIndex}. TaskId: {TaskId}, LastProgress: {LastProgress}, TotalDocs: {TotalDocs}, Elapsed: {Elapsed}", + _logger.LogWarning("Reindex cancelled for {OldIndex} -> {NewIndex}. ReindexTaskId: {ReindexTaskId}, LastProgress: {LastProgress}, TotalDocs: {TotalDocs}, Elapsed: {Elapsed}", workItem.OldIndex, workItem.NewIndex, result.Task.FullyQualifiedId, lastProgress, totalDocs, sw.Elapsed); } else { - _logger.LogError("Reindex abandoned for {OldIndex} -> {NewIndex}. TaskId: {TaskId}, StatusFails: {StatusFails}, LastProgress: {LastProgress}, TotalDocs: {TotalDocs}, Elapsed: {Elapsed}", + _logger.LogError("Reindex abandoned for {OldIndex} -> {NewIndex}. ReindexTaskId: {ReindexTaskId}, StatusFails: {StatusFails}, LastProgress: {LastProgress}, TotalDocs: {TotalDocs}, Elapsed: {Elapsed}", workItem.OldIndex, workItem.NewIndex, result.Task.FullyQualifiedId, statusGetFails, lastProgress, totalDocs, sw.Elapsed); } @@ -336,19 +343,23 @@ private async Task HandleFailureAsync(ReindexWorkItem workItem, BulkIndexByScrol _logger.LogErrorRequest(indexResponse, "Error indexing document {Index}/{Id}", workItem.NewIndex + "-error", gr.Id); } + /// + /// Attempts to cancel the Elasticsearch server-side reindex task. Best-effort — failures are logged but not propagated. + /// private async Task TryCancelTaskAsync(TaskId taskId, string oldIndex, string newIndex) { try { - var response = await _client.Tasks.CancelAsync(c => c.TaskId(taskId)).AnyContext(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var response = await _client.Tasks.CancelAsync(c => c.TaskId(taskId), cts.Token).AnyContext(); if (response.IsValid) - _logger.LogInformation("Cancelled reindex task {TaskId} for {OldIndex} -> {NewIndex}", taskId.FullyQualifiedId, oldIndex, newIndex); + _logger.LogInformation("Cancelled reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", taskId.FullyQualifiedId, oldIndex, newIndex); else - _logger.LogWarning("Failed to cancel reindex task {TaskId}: {Error}", taskId.FullyQualifiedId, response.GetErrorMessage()); + _logger.LogWarning("Failed to cancel reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}: {Error}", taskId.FullyQualifiedId, oldIndex, newIndex, response.GetErrorMessage()); } catch (Exception ex) { - _logger.LogWarning(ex, "Exception cancelling reindex task {TaskId}", taskId.FullyQualifiedId); + _logger.LogWarning(ex, "Exception cancelling reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", taskId.FullyQualifiedId, oldIndex, newIndex); } } diff --git a/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs b/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs index 18b4be66..ded28ff5 100644 --- a/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs +++ b/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs @@ -3,10 +3,6 @@ using System.Linq; using System.Threading.Tasks; using Exceptionless.DateTimeExtensions; -using Foundatio.Caching; -using Foundatio.Jobs; -using Foundatio.Messaging; -using Foundatio.Queues; using Foundatio.Repositories.Elasticsearch.Configuration; using Foundatio.Repositories.Elasticsearch.Extensions; using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Configuration; @@ -1541,47 +1537,29 @@ public async Task MaintainIndexesAsync_WhenMarkerExists_PreservesMarker() [Fact] public void AddIndex_WithDuplicateIndexName_ThrowsArgumentException() { - // Arrange - var cache = new InMemoryCacheClient(); - var messageBus = new InMemoryMessageBus(); - var queue = new InMemoryQueue(); - var configuration = new MyAppElasticConfiguration(queue, cache, messageBus, Log); - - // Act - var act = () => configuration.AddIndex(new EmployeeIndex(configuration)); + // Arrange — _configuration already has EmployeeIndex registered - // Assert - var ex = Assert.Throws(act); + // Act & Assert + var ex = Assert.Throws(() => _configuration.AddIndex(new EmployeeIndex(_configuration))); Assert.Contains("employees", ex.Message); } [Fact] public void AddIndex_WithUniqueIndexNames_Succeeds() { - // Arrange - var cache = new InMemoryCacheClient(); - var messageBus = new InMemoryMessageBus(); - var queue = new InMemoryQueue(); - var configuration = new MyAppElasticConfiguration(queue, cache, messageBus, Log); + // Arrange & Act — _configuration registers many unique indexes in its constructor - // Act & Assert -- configuration already has many indexes added in its constructor - Assert.True(configuration.Indexes.Count > 2); + // Assert + Assert.True(_configuration.Indexes.Count > 2); } [Fact] public void AddIndex_WithDuplicateNameDifferentCase_ThrowsArgumentException() { - // Arrange - var cache = new InMemoryCacheClient(); - var messageBus = new InMemoryMessageBus(); - var queue = new InMemoryQueue(); - var configuration = new MyAppElasticConfiguration(queue, cache, messageBus, Log); + // Arrange — _configuration already has EmployeeIndex ("employees") registered - // Act -- VersionedEmployeeIndex uses the same "employees" alias as EmployeeIndex - var act = () => configuration.AddIndex(new VersionedEmployeeIndex(configuration, 1)); - - // Assert - var ex = Assert.Throws(act); + // Act & Assert — VersionedEmployeeIndex uses the same "employees" alias + var ex = Assert.Throws(() => _configuration.AddIndex(new VersionedEmployeeIndex(_configuration, 1))); Assert.Contains("employees", ex.Message); } } diff --git a/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs b/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs index e25eda5b..1dfb50a9 100644 --- a/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs +++ b/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs @@ -9,6 +9,7 @@ using Foundatio.Parsers.ElasticQueries.Extensions; using Foundatio.Repositories.Elasticsearch.Configuration; using Foundatio.Repositories.Elasticsearch.Extensions; +using Foundatio.Repositories.Elasticsearch.Jobs; using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Configuration.Indexes; using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Models; using Foundatio.Repositories.Utility; @@ -1007,7 +1008,7 @@ public async Task ReindexAsync_LockPreventsHandlerAndDirectConcurrency() await repo.AddAsync(EmployeeGenerator.GenerateEmployees(100), o => o.ImmediateConsistency()); await version2Index.ConfigureAsync(); - string lockKey = String.Concat("reindex:", version2Index.Name); + string lockKey = ReindexWorkItem.GetLockName(version2Index.Name); await using var externalLock = await _configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(1), TestCancellationToken); Assert.NotNull(externalLock); @@ -1047,7 +1048,7 @@ public async Task ReindexAsync_AliasLockBlocksSubsequentVersionTransition() await version2Index.ConfigureAsync(); await version3Index.ConfigureAsync(); - string lockKey = String.Concat("reindex:", version1Index.Name); + string lockKey = ReindexWorkItem.GetLockName(version1Index.Name); await using var reindexLock = await _configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(1), TestCancellationToken); Assert.NotNull(reindexLock); From ef04b9ac431e83c4f904a3a3228e9070ba1c25e4 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Fri, 8 May 2026 20:46:05 -0500 Subject: [PATCH 06/13] refactor: move GetLockName from ReindexWorkItem record to ElasticReindexer --- .../Configuration/ElasticConfiguration.cs | 2 +- .../Configuration/VersionedIndex.cs | 2 +- .../Jobs/ReindexWorkItem.cs | 12 +----------- .../Jobs/ReindexWorkItemHandler.cs | 2 +- .../Repositories/ElasticReindexer.cs | 10 ++++++++++ .../ReindexTests.cs | 4 ++-- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs index ef58ef0b..e99f9b9e 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs @@ -192,7 +192,7 @@ private async Task ConfigureIndexInternalAsync(IIndex idx, bool beginReindexingO throw new InvalidOperationException("Must specify work item queue and lock provider in order to migrate index versions."); var reindexWorkItem = versionedIndex.CreateReindexWorkItem(currentVersion); - bool isReindexing = await _lockProvider.IsLockedAsync(ReindexWorkItem.GetLockName(versionedIndex.Name)).AnyContext(); + bool isReindexing = await _lockProvider.IsLockedAsync(ElasticReindexer.GetLockName(versionedIndex.Name)).AnyContext(); if (isReindexing) return; diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs index a22039e0..16733476 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs @@ -251,7 +251,7 @@ public override async Task ReindexAsync(Func? progressCallba if (currentVersion < 0 || currentVersion >= Version) return; - string lockKey = ReindexWorkItem.GetLockName(Name); + string lockKey = ElasticReindexer.GetLockName(Name); await using var reindexLock = await Configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(20), TimeSpan.FromMinutes(30)).AnyContext(); if (reindexLock is null) throw new InvalidOperationException($"Unable to acquire reindex lock for '{Name}' after 30 minutes."); diff --git a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs index 02706d79..d14af9d1 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs @@ -1,4 +1,4 @@ -using System; +using System; namespace Foundatio.Repositories.Elasticsearch.Jobs; @@ -11,14 +11,4 @@ public record ReindexWorkItem public bool DeleteOld { get; set; } public string? TimestampField { get; init; } public DateTime? StartUtc { get; init; } - - /// - /// Returns the distributed lock resource name for serializing reindex operations on the given alias. - /// - public static string GetLockName(string alias) - { - ArgumentException.ThrowIfNullOrEmpty(alias); - - return String.Concat("reindex:", alias); - } } diff --git a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs index dd617342..32a1a5c5 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs @@ -25,7 +25,7 @@ public ReindexWorkItemHandler(IElasticClient client, ILockProvider lockProvider, if (workItem is not ReindexWorkItem reindexWorkItem || String.IsNullOrEmpty(reindexWorkItem.Alias)) return Task.FromResult(null); - return _lockProvider.TryAcquireAsync(ReindexWorkItem.GetLockName(reindexWorkItem.Alias), TimeSpan.FromMinutes(20), cancellationToken); + return _lockProvider.TryAcquireAsync(ElasticReindexer.GetLockName(reindexWorkItem.Alias), TimeSpan.FromMinutes(20), cancellationToken); } public override Task HandleItemAsync(WorkItemContext context) diff --git a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs index e992b647..5ed39c55 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs @@ -28,6 +28,16 @@ public class ElasticReindexer private const string ID_FIELD = "id"; private const int MAX_STATUS_FAILS = 10; + /// + /// Returns the distributed lock resource name for serializing reindex operations on the given alias. + /// + public static string GetLockName(string alias) + { + ArgumentException.ThrowIfNullOrEmpty(alias); + + return String.Concat("reindex:", alias); + } + public ElasticReindexer(IElasticClient client, ILogger? logger = null) : this(client, TimeProvider.System, logger) { } diff --git a/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs b/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs index 1dfb50a9..cdd6163b 100644 --- a/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs +++ b/tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs @@ -1008,7 +1008,7 @@ public async Task ReindexAsync_LockPreventsHandlerAndDirectConcurrency() await repo.AddAsync(EmployeeGenerator.GenerateEmployees(100), o => o.ImmediateConsistency()); await version2Index.ConfigureAsync(); - string lockKey = ReindexWorkItem.GetLockName(version2Index.Name); + string lockKey = ElasticReindexer.GetLockName(version2Index.Name); await using var externalLock = await _configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(1), TestCancellationToken); Assert.NotNull(externalLock); @@ -1048,7 +1048,7 @@ public async Task ReindexAsync_AliasLockBlocksSubsequentVersionTransition() await version2Index.ConfigureAsync(); await version3Index.ConfigureAsync(); - string lockKey = ReindexWorkItem.GetLockName(version1Index.Name); + string lockKey = ElasticReindexer.GetLockName(version1Index.Name); await using var reindexLock = await _configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(1), TestCancellationToken); Assert.NotNull(reindexLock); From c3737dd9040ccea906a641a21c9039a4edead059 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Fri, 8 May 2026 20:56:16 -0500 Subject: [PATCH 07/13] fix: use fresh config in AddIndex tests to avoid frozen list --- .../IndexTests.cs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs b/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs index ded28ff5..137163fb 100644 --- a/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs +++ b/tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs @@ -1537,29 +1537,32 @@ public async Task MaintainIndexesAsync_WhenMarkerExists_PreservesMarker() [Fact] public void AddIndex_WithDuplicateIndexName_ThrowsArgumentException() { - // Arrange — _configuration already has EmployeeIndex registered + // Arrange — fresh config so the index list isn't frozen + var config = new MyAppElasticConfiguration(_workItemQueue, _cache, _messageBus, Log); // Act & Assert - var ex = Assert.Throws(() => _configuration.AddIndex(new EmployeeIndex(_configuration))); + var ex = Assert.Throws(() => config.AddIndex(new EmployeeIndex(config))); Assert.Contains("employees", ex.Message); } [Fact] public void AddIndex_WithUniqueIndexNames_Succeeds() { - // Arrange & Act — _configuration registers many unique indexes in its constructor + // Arrange — fresh config registers many unique indexes in its constructor + var config = new MyAppElasticConfiguration(_workItemQueue, _cache, _messageBus, Log); // Assert - Assert.True(_configuration.Indexes.Count > 2); + Assert.True(config.Indexes.Count > 2); } [Fact] public void AddIndex_WithDuplicateNameDifferentCase_ThrowsArgumentException() { - // Arrange — _configuration already has EmployeeIndex ("employees") registered + // Arrange — fresh config so the index list isn't frozen + var config = new MyAppElasticConfiguration(_workItemQueue, _cache, _messageBus, Log); // Act & Assert — VersionedEmployeeIndex uses the same "employees" alias - var ex = Assert.Throws(() => _configuration.AddIndex(new VersionedEmployeeIndex(_configuration, 1))); + var ex = Assert.Throws(() => config.AddIndex(new VersionedEmployeeIndex(config, 1))); Assert.Contains("employees", ex.Message); } } From f6c42d4927cff5bafedd9271ead1b0b59ea6f704 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Fri, 8 May 2026 20:59:51 -0500 Subject: [PATCH 08/13] refactor: rename taskId parameter to reindexTaskId in TryCancelTaskAsync --- .../Repositories/ElasticReindexer.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs index 5ed39c55..772ccd2f 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs @@ -356,20 +356,20 @@ private async Task HandleFailureAsync(ReindexWorkItem workItem, BulkIndexByScrol /// /// Attempts to cancel the Elasticsearch server-side reindex task. Best-effort — failures are logged but not propagated. /// - private async Task TryCancelTaskAsync(TaskId taskId, string oldIndex, string newIndex) + private async Task TryCancelTaskAsync(TaskId reindexTaskId, string oldIndex, string newIndex) { try { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - var response = await _client.Tasks.CancelAsync(c => c.TaskId(taskId), cts.Token).AnyContext(); + var response = await _client.Tasks.CancelAsync(c => c.TaskId(reindexTaskId), cts.Token).AnyContext(); if (response.IsValid) - _logger.LogInformation("Cancelled reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", taskId.FullyQualifiedId, oldIndex, newIndex); + _logger.LogInformation("Cancelled reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", reindexTaskId.FullyQualifiedId, oldIndex, newIndex); else - _logger.LogWarning("Failed to cancel reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}: {Error}", taskId.FullyQualifiedId, oldIndex, newIndex, response.GetErrorMessage()); + _logger.LogWarning("Failed to cancel reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}: {Error}", reindexTaskId.FullyQualifiedId, oldIndex, newIndex, response.GetErrorMessage()); } catch (Exception ex) { - _logger.LogWarning(ex, "Exception cancelling reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", taskId.FullyQualifiedId, oldIndex, newIndex); + _logger.LogWarning(ex, "Exception cancelling reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", reindexTaskId.FullyQualifiedId, oldIndex, newIndex); } } From 518af9e992363dbaea3eee38f5ef2e8be4540560 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Fri, 8 May 2026 21:02:08 -0500 Subject: [PATCH 09/13] fix: address PR feedback - nameof, LogRequest, progress callback try/catch --- .../Configuration/VersionedIndex.cs | 17 ++++++++++++++++- .../Repositories/ElasticReindexer.cs | 7 ++++--- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs index 16733476..43596fa5 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs @@ -274,9 +274,24 @@ public override async Task ReindexAsync(Func? progressCallba } if (progressCallbackAsync is not null) - await progressCallbackAsync(progress, message).AnyContext(); + { + try + { + await progressCallbackAsync(progress, message).AnyContext(); + } + catch (OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Progress callback threw for {IndexName}", Name); + } + } else + { _logger.LogInformation("Reindex Progress {Progress:F1}%: {Message}", progress, message); + } }; var reindexer = new ElasticReindexer(Configuration.Client, _logger); diff --git a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs index 772ccd2f..56e45d94 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs @@ -75,9 +75,9 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func { - ["OldIndex"] = workItem.OldIndex, - ["NewIndex"] = workItem.NewIndex, - ["Alias"] = workItem.Alias ?? "" + [nameof(workItem.OldIndex)] = workItem.OldIndex, + [nameof(workItem.NewIndex)] = workItem.NewIndex, + [nameof(workItem.Alias)] = workItem.Alias ?? "" }); _logger.LogInformation("Received reindex work item for {OldIndex} -> {NewIndex}", workItem.OldIndex, workItem.NewIndex); @@ -362,6 +362,7 @@ private async Task TryCancelTaskAsync(TaskId reindexTaskId, string oldIndex, str { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); var response = await _client.Tasks.CancelAsync(c => c.TaskId(reindexTaskId), cts.Token).AnyContext(); + _logger.LogRequest(response); if (response.IsValid) _logger.LogInformation("Cancelled reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", reindexTaskId.FullyQualifiedId, oldIndex, newIndex); else From e0202ff5f88eba4e79d127784d8f05e99ea94338 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Fri, 8 May 2026 21:09:05 -0500 Subject: [PATCH 10/13] fix: use LogErrorRequest for cancel failures, make Alias required - Use LogRequest only on success, LogErrorRequest on failure in TryCancelTaskAsync - Make ReindexWorkItem.Alias required since all callers already set it - Remove null guard in ReindexWorkItemHandler (Alias is now required) - Add Alias = Name in base Index.ReindexAsync for non-versioned indexes --- .../.idea.Foundatio.Repositories/.idea/indexLayout.xml | 8 ++++++++ .../.idea/projectSettingsUpdater.xml | 8 ++++++++ .idea/.idea.Foundatio.Repositories/.idea/vcs.xml | 6 ++++++ .../Configuration/Index.cs | 1 + .../Jobs/ReindexWorkItem.cs | 2 +- .../Jobs/ReindexWorkItemHandler.cs | 2 +- .../Repositories/ElasticReindexer.cs | 10 +++++++--- 7 files changed, 32 insertions(+), 5 deletions(-) create mode 100644 .idea/.idea.Foundatio.Repositories/.idea/indexLayout.xml create mode 100644 .idea/.idea.Foundatio.Repositories/.idea/projectSettingsUpdater.xml create mode 100644 .idea/.idea.Foundatio.Repositories/.idea/vcs.xml diff --git a/.idea/.idea.Foundatio.Repositories/.idea/indexLayout.xml b/.idea/.idea.Foundatio.Repositories/.idea/indexLayout.xml new file mode 100644 index 00000000..7b08163c --- /dev/null +++ b/.idea/.idea.Foundatio.Repositories/.idea/indexLayout.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/.idea.Foundatio.Repositories/.idea/projectSettingsUpdater.xml b/.idea/.idea.Foundatio.Repositories/.idea/projectSettingsUpdater.xml new file mode 100644 index 00000000..ef20cb08 --- /dev/null +++ b/.idea/.idea.Foundatio.Repositories/.idea/projectSettingsUpdater.xml @@ -0,0 +1,8 @@ + + + + + \ No newline at end of file diff --git a/.idea/.idea.Foundatio.Repositories/.idea/vcs.xml b/.idea/.idea.Foundatio.Repositories/.idea/vcs.xml new file mode 100644 index 00000000..35eb1ddf --- /dev/null +++ b/.idea/.idea.Foundatio.Repositories/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/Index.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/Index.cs index 2dc85fab..5b454815 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/Index.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/Index.cs @@ -351,6 +351,7 @@ public virtual Task ReindexAsync(Func? progressCallbackAsync { OldIndex = Name, NewIndex = Name, + Alias = Name, DeleteOld = false, TimestampField = GetTimeStampField() }; diff --git a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs index d14af9d1..8d07bfc5 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs @@ -6,7 +6,7 @@ public record ReindexWorkItem { public required string OldIndex { get; init; } public required string NewIndex { get; init; } - public string? Alias { get; init; } + public required string Alias { get; init; } public string? Script { get; init; } public bool DeleteOld { get; set; } public string? TimestampField { get; init; } diff --git a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs index 32a1a5c5..7b40395f 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs @@ -22,7 +22,7 @@ public ReindexWorkItemHandler(IElasticClient client, ILockProvider lockProvider, public override Task GetWorkItemLockAsync(object workItem, CancellationToken cancellationToken = default) { - if (workItem is not ReindexWorkItem reindexWorkItem || String.IsNullOrEmpty(reindexWorkItem.Alias)) + if (workItem is not ReindexWorkItem reindexWorkItem) return Task.FromResult(null); return _lockProvider.TryAcquireAsync(ElasticReindexer.GetLockName(reindexWorkItem.Alias), TimeSpan.FromMinutes(20), cancellationToken); diff --git a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs index 56e45d94..99349e0c 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs @@ -77,7 +77,7 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func {NewIndex}", workItem.OldIndex, workItem.NewIndex); @@ -362,11 +362,15 @@ private async Task TryCancelTaskAsync(TaskId reindexTaskId, string oldIndex, str { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); var response = await _client.Tasks.CancelAsync(c => c.TaskId(reindexTaskId), cts.Token).AnyContext(); - _logger.LogRequest(response); if (response.IsValid) + { + _logger.LogRequest(response); _logger.LogInformation("Cancelled reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", reindexTaskId.FullyQualifiedId, oldIndex, newIndex); + } else - _logger.LogWarning("Failed to cancel reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}: {Error}", reindexTaskId.FullyQualifiedId, oldIndex, newIndex, response.GetErrorMessage()); + { + _logger.LogErrorRequest(response, "Failed to cancel reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", reindexTaskId.FullyQualifiedId, oldIndex, newIndex); + } } catch (Exception ex) { From 97ad9c8287f7af1d7a0597a8a5fdfd3e88fd914e Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Fri, 8 May 2026 21:11:41 -0500 Subject: [PATCH 11/13] chore: remove .idea files from tracking --- .idea/.idea.Foundatio.Repositories/.idea/indexLayout.xml | 8 -------- .../.idea/projectSettingsUpdater.xml | 8 -------- .idea/.idea.Foundatio.Repositories/.idea/vcs.xml | 6 ------ 3 files changed, 22 deletions(-) delete mode 100644 .idea/.idea.Foundatio.Repositories/.idea/indexLayout.xml delete mode 100644 .idea/.idea.Foundatio.Repositories/.idea/projectSettingsUpdater.xml delete mode 100644 .idea/.idea.Foundatio.Repositories/.idea/vcs.xml diff --git a/.idea/.idea.Foundatio.Repositories/.idea/indexLayout.xml b/.idea/.idea.Foundatio.Repositories/.idea/indexLayout.xml deleted file mode 100644 index 7b08163c..00000000 --- a/.idea/.idea.Foundatio.Repositories/.idea/indexLayout.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/.idea.Foundatio.Repositories/.idea/projectSettingsUpdater.xml b/.idea/.idea.Foundatio.Repositories/.idea/projectSettingsUpdater.xml deleted file mode 100644 index ef20cb08..00000000 --- a/.idea/.idea.Foundatio.Repositories/.idea/projectSettingsUpdater.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - \ No newline at end of file diff --git a/.idea/.idea.Foundatio.Repositories/.idea/vcs.xml b/.idea/.idea.Foundatio.Repositories/.idea/vcs.xml deleted file mode 100644 index 35eb1ddf..00000000 --- a/.idea/.idea.Foundatio.Repositories/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file From 69f372dd44d491dae68f825ee16e9827006d1002 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Fri, 8 May 2026 21:20:57 -0500 Subject: [PATCH 12/13] fix: let progress callback exceptions propagate to allow reindex abort The try/catch was swallowing ApplicationException from the progress callback, which broke CanResumeReindexAsync test that relies on throwing to abort. Lock renewal remains defensive; callback exceptions are intentional signals. --- .gitignore | 1 + .../Configuration/VersionedIndex.cs | 13 +------------ 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index b92df279..cfc2ba8b 100644 --- a/.gitignore +++ b/.gitignore @@ -60,3 +60,4 @@ _NCrunch_* **/.idea/**/contentModel.xml **/.idea/**/modules.xml **/.idea/copilot/chatSessions/ +.idea/ diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs index 43596fa5..35091e55 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs @@ -275,18 +275,7 @@ public override async Task ReindexAsync(Func? progressCallba if (progressCallbackAsync is not null) { - try - { - await progressCallbackAsync(progress, message).AnyContext(); - } - catch (OperationCanceledException) - { - throw; - } - catch (Exception ex) - { - _logger.LogWarning(ex, "Progress callback threw for {IndexName}", Name); - } + await progressCallbackAsync(progress, message).AnyContext(); } else { From 3591d4b09cf6608af8a7af1720211d8923b0742d Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Fri, 8 May 2026 21:22:36 -0500 Subject: [PATCH 13/13] fix: let lock renewal failure propagate to abort reindex If we can't renew the lock, continuing risks running without concurrency protection. Better to abort and let a retry handle it safely. --- .../Configuration/VersionedIndex.cs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs index 35091e55..98a8008c 100644 --- a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs +++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs @@ -264,14 +264,7 @@ public override async Task ReindexAsync(Func? progressCallba Func wrappedCallback = async (progress, message) => { - try - { - await reindexLock.RenewAsync().AnyContext(); - } - catch (Exception ex) - { - _logger.LogWarning(ex, "Failed to renew reindex lock for {IndexName}", Name); - } + await reindexLock.RenewAsync().AnyContext(); if (progressCallbackAsync is not null) {