fix: improve reindex reliability with distributed locking and observability#271
Close a concurrency gap where VersionedIndex.ReindexAsync() bypassed the
distributed lock used by ReindexWorkItemHandler, allowing simultaneous
reindex operations against the same index. Unify both paths under a single
alias-keyed lock (reindex:{alias}) that serializes all reindex operations
for a logical index.
Key changes:
Concurrency safety:
- VersionedIndex.ReindexAsync() now acquires the same distributed lock as
the work-item handler before starting a reindex
- Lock is auto-renewed on every progress callback to support multi-hour
reindexes of large indexes
- Simplified lock key from reindex:{alias}:{old}:{new} to reindex:{alias}
to prevent overlapping sequential version transitions (v1->v2 must finish
before v2->v3 can start)
- Updated IsLockedAsync check in ConfigureIndexInternalAsync to match new key
- Exposed ILockProvider on IElasticConfiguration interface
Observability improvements:
- Include TaskId, OldIndex, NewIndex in MAX_STATUS_FAILS error log
- Replace .ServerError (often null) with .GetErrorMessage() for all refresh
and count response warnings
- Add explicit "reindex abandoned" log with full context (TaskId, StatusFails,
LastProgress, TotalDocs, Elapsed) when polling is aborted
- Include target index name in periodic progress messages for log correlation
- Change completion progress message from null to "Reindex complete"
Server-side task cancellation:
- Add TryCancelTaskAsync helper that calls _client.Tasks.CancelAsync when
reindex monitoring is abandoned (MAX_STATUS_FAILS, timeout, cancellation)
- Prevents orphaned Elasticsearch reindex tasks from consuming resources
Unique index name validation:
- AddIndex() now throws ArgumentException if an index with the same Name
(case-insensitive) is already registered
- Catches misconfiguration at startup rather than at runtime
Tests:
- ReindexAsync_ConcurrentCalls_OnlyOneCompletes
- ReindexAsync_LockPreventsHandlerAndDirectConcurrency
- ReindexAsync_AliasLockBlocksSubsequentVersionTransition
- ReindexAsync_WhenCancelled_DoesNotCompleteReindex
- AddIndex_WithDuplicateIndexName_ThrowsArgumentException
- AddIndex_WithUniqueIndexNames_Succeeds
- AddIndex_WithDuplicateNameDifferentCase_ThrowsArgumentException
Documentation:
- Added Concurrency Safety section to docs/guide/index-management.md
Pull request overview
This PR aims to make Elasticsearch reindexing safer and easier to operate in distributed deployments by unifying all reindex triggers under the same alias-keyed distributed lock and improving logging/error context when monitoring or cancellation occurs.
Changes:
- Unifies reindex concurrency control by using a single distributed lock key format (`reindex:{alias}`) across both direct reindex calls and work-item processing, and exposes `LockProvider` via `IElasticConfiguration`.
- Improves reindex observability (more contextual logs, consistent error messages via `GetErrorMessage()`, clearer progress messages) and adds best-effort server-side task cancellation when monitoring aborts.
- Fails fast on duplicate index registrations (case-insensitive) and adds tests for lock behavior and duplicate index validation.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| tests/Foundatio.Repositories.Elasticsearch.Tests/ReindexTests.cs | Adds coverage for alias-keyed locking behavior, concurrency, and cancellation behavior. |
| tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs | Adds tests ensuring duplicate index names are rejected (case-insensitive). |
| src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs | Enhances error logging, progress messaging, and adds task cancellation attempts on abort. |
| src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs | Switches work-item lock key to alias-only (reindex:{alias}). |
| src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs | Adds distributed lock acquisition + renewal around VersionedIndex.ReindexAsync(). |
| src/Foundatio.Repositories.Elasticsearch/Configuration/IElasticConfiguration.cs | Adds ILockProvider LockProvider to the public configuration interface. |
| src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs | Implements LockProvider property, enforces unique index names, and updates reindex lock key checks. |
| docs/guide/index-management.md | Documents the alias-keyed lock strategy, renewal behavior, and duplicate name validation. |
Comments suppressed due to low confidence (1)
src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs:230
- If `progressCallbackAsync` throws (including `OperationCanceledException` from caller-provided callbacks), `InternalReindexAsync` will exit before reaching the `!taskSuccess` block, so `TryCancelTaskAsync` won't run and the server-side reindex task may continue indefinitely. Consider wrapping progress callback invocations in try/catch/finally so that any abort due to callback exceptions still attempts task cancellation (then rethrows).
sw.Restart();
lastProgress = lastCompleted;
string lastMessage = $"[{workItem.NewIndex}] Total: {status.Task.Status.Total:N0} Completed: {lastCompleted:N0} VersionConflicts: {status.Task.Status.VersionConflicts:N0}";
await progressCallbackAsync(CalculateProgress(status.Task.Status.Total, lastCompleted, startProgress, endProgress), lastMessage).AnyContext();
if (status.Completed && response?.Error == null)
{
| var reindexWorkItem = CreateReindexWorkItem(currentVersion); | ||
| string lockKey = String.Concat("reindex:", Name); | ||
|
|
||
| await using var reindexLock = await Configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(20), TimeSpan.FromMinutes(30)).AnyContext(); |
Fixed. Added null check after AcquireAsync — throws InvalidOperationException with a clear message if the lock can't be acquired within the 30-minute timeout.
| return; | ||
|
|
||
| var reindexWorkItem = CreateReindexWorkItem(currentVersion); | ||
| string lockKey = String.Concat("reindex:", Name); |
Fixed. Moved `GetCurrentVersionAsync()` and `CreateReindexWorkItem()` to after lock acquisition, with a second version check. If another instance completed the reindex while we waited for the lock, we now detect that and return early.
| @@ -144,7 +145,7 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func<int, string?, Task | |||
| } | |||
Fixed. Switched to `.GetErrorMessage()` for the delete response progress message, consistent with all other error reporting in this PR.
… acquire, use GetErrorMessage for delete
| if (progressCallbackAsync is not null) | ||
| await progressCallbackAsync(progress, message).AnyContext(); |
Fixed. Added a fallback `_logger.LogInformation(...)` in the wrappedCallback when `progressCallbackAsync` is null, matching the same format the reindexer's default would use. This ensures lock renewal still happens while progress is logged.
|
|
||
| - **Lock key**: `reindex:{alias}` (e.g., `reindex:employees`) | ||
| - **Lock TTL**: 20 minutes, auto-renewed during long-running operations | ||
| - Both direct (`ElasticConfiguration.ReindexAsync`) and work-item (`ReindexWorkItemHandler`) paths use the same lock |
Fixed: updated the reference to `VersionedIndex.ReindexAsync()`.
…ce to VersionedIndex.ReindexAsync
| if (_frozenIndexes.IsValueCreated) | ||
| throw new InvalidOperationException("Can't add indexes after the list has been frozen."); | ||
|
|
||
| if (_indexes.Any(i => i.Name.Equals(index.Name, StringComparison.OrdinalIgnoreCase))) |
Fixed: now using `String.Equals(i.Name, index.Name, StringComparison.OrdinalIgnoreCase)`.
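The duplicate-name check discussed in this thread can be sketched in isolation. This is a minimal sketch, not the code from `ElasticConfiguration`: `IndexRegistry` and its string-only `AddIndex` are hypothetical stand-ins, and only the case-insensitive `String.Equals` comparison and the `ArgumentException` come from the PR.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the index list kept by the configuration class.
public sealed class IndexRegistry
{
    private readonly List<string> _names = new();

    public int Count => _names.Count;

    public void AddIndex(string name)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);

        // The static String.Equals overload is null-safe on both sides,
        // unlike calling .Equals on a possibly-null instance.
        if (_names.Exists(n => String.Equals(n, name, StringComparison.OrdinalIgnoreCase)))
            throw new ArgumentException($"An index named '{name}' is already registered.", nameof(name));

        _names.Add(name);
    }
}
```

With this sketch, registering `employees` and then `Employees` fails on the second call, which is the startup-time failure the PR aims for.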
| var refreshResponse = await _client.Indices.RefreshAsync(Indices.All).AnyContext(); | ||
| _logger.LogRequest(refreshResponse); | ||
| if (!refreshResponse.IsValid) | ||
| _logger.LogWarning("Failed to refresh indices before second reindex pass: {Error}", refreshResponse.ServerError); |
All the messages in this method are very generic. Do we have a log scope for all contextual data like index names, and/or can we also update the messages to at least name the indices?
Fixed: added a log scope at the top of `ReindexAsync` with OldIndex, NewIndex, and Alias. Also updated all warning messages in the method to include the index names explicitly.
Added log scope with OldIndex, NewIndex, and Alias at the start of ReindexAsync. All messages within the method now automatically include this context.
| workItem.OldIndex, workItem.NewIndex, result.Task.FullyQualifiedId, statusGetFails, lastProgress, totalDocs, sw.Elapsed); | ||
| } | ||
|
|
||
| await TryCancelTaskAsync(result.Task, workItem.OldIndex, workItem.NewIndex).AnyContext(); |
What is Task here? It sounds like a TPL task, when it's actually the Elasticsearch TaskId.
Renamed all log templates from `{TaskId}` to `{ReindexTaskId}` to make it clear these reference the Elasticsearch Tasks API task ID, not a TPL Task.
Why wasn't this renamed?
Done: renamed the parameter from taskId to reindexTaskId in TryCancelTaskAsync.
Renamed to {ReindexTaskId} throughout to differentiate from TPL Tasks.
| _logger.LogErrorRequest(indexResponse, "Error indexing document {Index}/{Id}", workItem.NewIndex + "-error", gr.Id); | ||
| } | ||
|
|
||
| private async Task TryCancelTaskAsync(TaskId taskId, string oldIndex, string newIndex) |
Added XML doc comments to `TryCancelTaskAsync`.
| if (response.IsValid) | ||
| _logger.LogInformation("Cancelled reindex task {TaskId} for {OldIndex} -> {NewIndex}", taskId.FullyQualifiedId, oldIndex, newIndex); | ||
| else | ||
| _logger.LogWarning("Failed to cancel reindex task {TaskId}: {Error}", taskId.FullyQualifiedId, response.GetErrorMessage()); |
Fixed: all cancel log messages now include {OldIndex} -> {NewIndex}.
| _logger.LogErrorRequest(indexResponse, "Error indexing document {Index}/{Id}", workItem.NewIndex + "-error", gr.Id); | ||
| } | ||
|
|
||
| private async Task TryCancelTaskAsync(TaskId taskId, string oldIndex, string newIndex) |
really feels like we need log scopes in elastic reindexer, with taskid, index names etc.
Done — added a BeginScope at the top of ReindexAsync with OldIndex, NewIndex, and Alias. These properties are attached to every log message within the method automatically.
Added using var _ = _logger.BeginScope(...) with OldIndex, NewIndex, Alias at method entry. All log messages within ReindexAsync automatically get this context.
| var configuration = new MyAppElasticConfiguration(queue, cache, messageBus, Log); | ||
|
|
||
| // Act | ||
| var act = () => configuration.AddIndex(new EmployeeIndex(configuration)); |
This is horrible and not idiomatic.
Fixed: tests now use `_configuration` from the base class directly instead of creating standalone instances.
Rewrote. Tests now create a fresh MyAppElasticConfiguration instance using DI objects from the base class, with standard Assert.Throws pattern.
| var cache = new InMemoryCacheClient(); | ||
| var messageBus = new InMemoryMessageBus(); | ||
| var queue = new InMemoryQueue<WorkItemData>(); |
Can you get these from DI? That would be nice for all tests added here, as we want the logger, time provider, serializer, and resilience flowed through...
Fixed: tests now use `_configuration` from `ElasticRepositoryTestBase`, which already flows the logger, time provider, cache, message bus, and queue through DI.
Done — tests now use _workItemQueue, _cache, _messageBus, and Log from the ElasticRepositoryTestBase base class which flows logger, time provider, etc. through DI.
| await progressCallbackAsync(CalculateProgress(status.Task.Status.Total, lastCompleted, startProgress, endProgress), lastMessage).AnyContext(); | ||
|
|
TryCancelTaskAsync is a best-effort cleanup that fires in the !taskSuccess block. If a progress callback throws, the exception propagates up the call stack and the reindex is already dead — the server-side task will still be running but will eventually idle-timeout. This is acceptable for a reliability improvement PR; a future enhancement could add cancellation in the exception path if needed.
| public override Task<ILock?> GetWorkItemLockAsync(object workItem, CancellationToken cancellationToken = default) | ||
| { | ||
| if (workItem is not ReindexWorkItem reindexWorkItem) | ||
| return Task.FromResult<ILock?>(null); | ||
|
|
||
| return _lockProvider.TryAcquireAsync(String.Join(":", "reindex", reindexWorkItem.Alias, reindexWorkItem.OldIndex, reindexWorkItem.NewIndex), TimeSpan.FromMinutes(20), cancellationToken); | ||
| return _lockProvider.TryAcquireAsync(String.Concat("reindex:", reindexWorkItem.Alias), TimeSpan.FromMinutes(20), cancellationToken); | ||
| } |
We should throw based on the alias name, that should be a required prop no?
Addressed — ElasticReindexer.GetLockName(alias) now throws ArgumentException.ThrowIfNullOrEmpty(alias). The handler returns null (no lock) if alias is missing, since the work item might not have one set.
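The lock-name helper mentioned in this reply can be sketched as below. The real body of `ElasticReindexer.GetLockName` is not shown in the thread, so `ReindexLockNames` is a hypothetical stand-in that assumes the `reindex:{alias}` format described in the PR and the `ArgumentException.ThrowIfNullOrEmpty` guard quoted above (available in .NET 7 and later).

```csharp
using System;

// Hypothetical stand-in; the PR's helper lives on ElasticReindexer.
public static class ReindexLockNames
{
    public static string GetLockName(string alias)
    {
        // Throws ArgumentNullException for null and ArgumentException for "".
        ArgumentException.ThrowIfNullOrEmpty(alias);

        // The key depends only on the alias, so a v1->v2 and a v2->v3
        // transition for the same logical index contend for one lock.
        return String.Concat("reindex:", alias);
    }
}
```

Because the old and new index names are not part of the key, `GetLockName("employees")` yields `reindex:employees` regardless of which version transition is running.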
…eview.0.12 Updated the Foundatio.Extensions.Hosting, Foundatio, Foundatio.JsonNet, and Foundatio.TestHarness package references to the latest preview version for improved compatibility and features.
| return; | ||
|
|
||
| string lockKey = ElasticReindexer.GetLockName(Name); | ||
| await using var reindexLock = await Configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(20), TimeSpan.FromMinutes(30)).AnyContext(); |
should this be TryAcquire so it doesn't throw?
AcquireAsync with a timeout returns null on timeout (doesn't throw). The null check on line 256 handles the timeout gracefully with a clear error message. TryAcquire would have the same semantics but with a zero timeout — we actually want to wait up to 30 minutes for other reindexes to finish.
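The acquire-then-recheck flow described in this thread can be sketched without any Foundatio dependency. Everything below is an assumption made for illustration: `InMemoryLockProvider` is a single-process stand-in (not Foundatio's `ILockProvider`), and `ReindexSketch.ReindexAsync` with its delegate parameters is hypothetical. What it mirrors from the discussion is that a timed acquire yields null on timeout, the caller turns that into an `InvalidOperationException`, and the version is checked again once the lock is held.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical single-process stand-in for a distributed lock provider.
public sealed class InMemoryLockProvider
{
    private readonly ConcurrentDictionary<string, SemaphoreSlim> _gates = new();

    // Returns null when the lock is not obtained within acquireTimeout.
    public async Task<IAsyncDisposable?> AcquireAsync(string key, TimeSpan acquireTimeout)
    {
        var gate = _gates.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
        return await gate.WaitAsync(acquireTimeout) ? new Releaser(gate) : null;
    }

    private sealed class Releaser : IAsyncDisposable
    {
        private readonly SemaphoreSlim _gate;
        private int _released;

        public Releaser(SemaphoreSlim gate) => _gate = gate;

        public ValueTask DisposeAsync()
        {
            // Release at most once, even if disposed twice.
            if (Interlocked.Exchange(ref _released, 1) == 0)
                _gate.Release();
            return ValueTask.CompletedTask;
        }
    }
}

public static class ReindexSketch
{
    // Returns true if the reindex ran, false if it was no longer needed.
    public static async Task<bool> ReindexAsync(
        InMemoryLockProvider locks, string alias,
        Func<Task<int>> getCurrentVersionAsync, int targetVersion,
        Func<Task> runReindexAsync, TimeSpan acquireTimeout)
    {
        // await using tolerates a null resource; nothing is disposed in that case.
        await using var reindexLock = await locks.AcquireAsync("reindex:" + alias, acquireTimeout);
        if (reindexLock is null)
            throw new InvalidOperationException($"Unable to acquire reindex lock for {alias}.");

        // Second version check: another instance may have finished while we waited.
        if (await getCurrentVersionAsync() >= targetVersion)
            return false;

        await runReindexAsync();
        return true;
    }
}
```

A zero-timeout try-acquire would skip the wait entirely, which is why the reply prefers a timed acquire here.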
| } | ||
| catch (Exception ex) | ||
| { | ||
| _logger.LogWarning(ex, "Failed to renew reindex lock for {IndexName}", Name); |
what about logging message
| _logger.LogWarning(ex, "Failed to renew reindex lock for {IndexName}", Name); | |
| _logger.LogWarning(ex, "Failed to renew reindex lock for {IndexName}: {Message}", Name, ex.Message); |
Added — now includes ex.Message in the log template.
| } | ||
|
|
||
| if (progressCallbackAsync is not null) | ||
| await progressCallbackAsync(progress, message).AnyContext(); |
this is a user defined callback, we should have catch around this with logging?
Added try/catch around the progress callback. OperationCanceledException is re-thrown (to support cancellation flow control), other exceptions are logged and swallowed.
| ["OldIndex"] = workItem.OldIndex, | ||
| ["NewIndex"] = workItem.NewIndex, | ||
| ["Alias"] = workItem.Alias ?? "" |
Fixed — using nameof(workItem.OldIndex), nameof(workItem.NewIndex), nameof(workItem.Alias) for the scope keys.
| /// <summary> | ||
| /// Attempts to cancel the Elasticsearch server-side reindex task. Best-effort — failures are logged but not propagated. | ||
| /// </summary> | ||
| private async Task TryCancelTaskAsync(TaskId taskId, string oldIndex, string newIndex) |
this should be renamed.
Fixed — renamed the parameter from taskId to reindexTaskId.
| { | ||
| try | ||
| { | ||
| using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); |
is there a linked dispose token we can use too?
This method is only called from best-effort cleanup paths (after the main reindex has already failed/been abandoned). There's no meaningful parent token to link to at that point — the reindex CancellationToken has already been triggered or the loop has broken. The 30s timeout is a safety net to avoid blocking shutdown if the cluster is unreachable.
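The best-effort shape described here, a bounded timeout with failures logged rather than propagated, can be sketched as follows. The `cancelAsync` delegate is a hypothetical stand-in for the Elasticsearch Tasks Cancel call and `log` stands in for the logger; neither is the PR's actual signature.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BestEffortCancel
{
    // Returns true only when the cancel call reports success within the timeout.
    public static async Task<bool> TryCancelAsync(
        Func<CancellationToken, Task<bool>> cancelAsync, // stand-in for the client call
        TimeSpan timeout,
        Action<string> log)
    {
        try
        {
            // Own timeout instead of a linked token: the caller's token may
            // already be cancelled by the time cleanup runs.
            using var cts = new CancellationTokenSource(timeout);
            bool cancelled = await cancelAsync(cts.Token);
            log(cancelled ? "Cancelled reindex task" : "Failed to cancel reindex task");
            return cancelled;
        }
        catch (Exception ex)
        {
            // Best-effort: log and swallow so cleanup never masks the original failure.
            log($"Error cancelling reindex task: {ex.Message}");
            return false;
        }
    }
}
```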
| try | ||
| { | ||
| using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); | ||
| var response = await _client.Tasks.CancelAsync(c => c.TaskId(taskId), cts.Token).AnyContext(); |
why are we not logging the elastic request out using the helpers?
Fixed — added _logger.LogRequest(response) to the cancel path.
| if (workItem is not ReindexWorkItem reindexWorkItem || String.IsNullOrEmpty(reindexWorkItem.Alias)) | ||
| return Task.FromResult<ILock?>(null); | ||
|
|
||
| return _lockProvider.TryAcquireAsync(String.Join(":", "reindex", reindexWorkItem.Alias, reindexWorkItem.OldIndex, reindexWorkItem.NewIndex), TimeSpan.FromMinutes(20), cancellationToken); | ||
| return _lockProvider.TryAcquireAsync(ElasticReindexer.GetLockName(reindexWorkItem.Alias), TimeSpan.FromMinutes(20), cancellationToken); |
Returning null when alias is missing is intentional — it means the work item runs without a lock. This preserves backward compatibility for non-aliased reindexes. In practice, all VersionedIndex reindexes populate the alias from Name which is always set. If alias is somehow missing, the reindex still runs (just without concurrency protection), which is safer than blocking it entirely.
| workItem.OldIndex, workItem.NewIndex, result.Task.FullyQualifiedId, statusGetFails, lastProgress, totalDocs, sw.Elapsed); | ||
| } | ||
|
|
||
| await TryCancelTaskAsync(result.Task, workItem.OldIndex, workItem.NewIndex).AnyContext(); |
Same as the other TryCancelTaskAsync comment — when the exception propagates from the polling loop, we don't get to the !taskSuccess block. The server-side task will idle-timeout. Adding cancellation in every catch path would add complexity for marginal benefit. Accepted for now.
- Use LogRequest only on success, LogErrorRequest on failure in TryCancelTaskAsync
- Make ReindexWorkItem.Alias required since all callers already set it
- Remove null guard in ReindexWorkItemHandler (Alias is now required)
- Add Alias = Name in base Index.ReindexAsync for non-versioned indexes
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs:223
- On task-status polling failures, the code `continue;`s without any delay/backoff. If Elasticsearch is temporarily unhealthy this becomes a tight loop (CPU/log spam and repeated status calls) until `MAX_STATUS_FAILS` is hit. Add a small delay/backoff (and honor `cancellationToken`) before retrying after an invalid status response.
break;
}
continue;
}
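One way the suggested delay could look is sketched below. This illustrates the reviewer's suggestion only; the thread does not show whether it was adopted. `PollUntilCompleteAsync`, the `getStatusAsync` delegate (null meaning an invalid status response), and both delay parameters are hypothetical names, and the failure counter is deliberately never reset, which is an assumption about the original loop.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class StatusPolling
{
    // Returns true when the task completes, false when maxStatusFails is reached.
    public static async Task<bool> PollUntilCompleteAsync(
        Func<CancellationToken, Task<bool?>> getStatusAsync, // null = invalid status response
        int maxStatusFails,
        TimeSpan retryDelay,
        TimeSpan pollInterval,
        CancellationToken cancellationToken)
    {
        int statusFails = 0;
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();

            bool? completed = await getStatusAsync(cancellationToken);
            if (completed is null)
            {
                if (++statusFails >= maxStatusFails)
                    return false;

                // Wait before retrying instead of spinning; the token
                // lets shutdown interrupt the wait.
                await Task.Delay(retryDelay, cancellationToken);
                continue;
            }

            if (completed.Value)
                return true;

            await Task.Delay(pollInterval, cancellationToken);
        }
    }
}
```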
The try/catch was swallowing ApplicationException from the progress callback, which broke CanResumeReindexAsync test that relies on throwing to abort. Lock renewal remains defensive; callback exceptions are intentional signals.
If we can't renew the lock, continuing risks running without concurrency protection. Better to abort and let a retry handle it safely.
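The behavior these two commits settle on can be sketched as a wrapper around the progress callback: renew the lock first and let a renewal failure abort, then invoke the caller's callback and let its exceptions propagate. `WrapProgress` and the `renewLockAsync` and `log` delegates are hypothetical; the PR's actual wrapper sits inside `VersionedIndex.ReindexAsync()` and is not shown in full.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

public static class ProgressWrapping
{
    public static Func<int, string?, Task> WrapProgress(
        Func<Task> renewLockAsync,                       // stand-in for lock renewal
        Func<int, string?, Task>? progressCallbackAsync, // caller-supplied, may be null
        Action<string> log)
    {
        return async (progress, message) =>
        {
            // No try/catch: if renewal fails, the exception aborts the reindex
            // rather than letting it continue without concurrency protection.
            await renewLockAsync();

            if (progressCallbackAsync is not null)
                // Callback exceptions are intentional signals (for example, to
                // abort a reindex), so they are not caught here either.
                await progressCallbackAsync(progress, message);
            else
                log($"Reindex progress {progress}%: {message}");
        };
    }
}
```

Renewing on every progress report ties the lock's lifetime to forward progress, so a stalled reindex eventually lets the lock expire.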
Summary
Closes a concurrency gap where `VersionedIndex.ReindexAsync()` bypassed the distributed lock used by `ReindexWorkItemHandler`, allowing simultaneous reindex operations against the same index from multiple service instances. This could lead to data corruption, orphaned Elasticsearch tasks, and confusing error logs.
This PR unifies both reindex paths (direct call and work-item handler) under a single alias-keyed distributed lock and improves observability for diagnosing reindex issues in production.
Changes
1. Concurrency Safety — Alias-Keyed Distributed Lock
Problem: Two code paths can trigger a reindex: `VersionedIndex.ReindexAsync()` (called during `ConfigureIndexesAsync`) and `ReindexWorkItemHandler` (processes queued work items). Only the handler acquired a lock, so the direct path could run concurrently with the handler or with other instances calling `ReindexAsync()` simultaneously.
Fix:
- `VersionedIndex.ReindexAsync()` now acquires the same distributed lock before starting
- Lock key simplified from `reindex:{alias}:{oldIndex}:{newIndex}` to `reindex:{alias}`
- `ILockProvider LockProvider` exposed on `IElasticConfiguration` interface
2. Observability Improvements
Problem: When reindex monitoring fails (e.g., status polling errors, abandoned tasks), logs lacked sufficient context to diagnose root causes. Error messages were often null or missing critical identifiers.
Fix:
- `MAX_STATUS_FAILS` error now includes TaskId, OldIndex, and NewIndex
- `.ServerError` references (which can be null even on failure) replaced with `.GetErrorMessage()`, which always returns a meaningful string
- Completion progress message changed from `null` to `"Reindex complete"`
3. Server-Side Task Cancellation
Problem: When reindex monitoring is abandoned (MAX_STATUS_FAILS reached, timeout, or cancellation), the Elasticsearch server-side reindex task continues running indefinitely, consuming cluster resources.
Fix:
- Added `TryCancelTaskAsync` helper that calls the Elasticsearch Tasks Cancel API
4. Unique Index Name Validation
Problem: Multiple indexes could be registered with the same name (alias), leading to conflicts during reindex, alias management, and cache invalidation that are extremely difficult to diagnose at runtime.
Fix:
- `ElasticConfiguration.AddIndex()` now throws `ArgumentException` if an index with the same Name (case-insensitive) is already registered
Breaking Changes
- `IElasticConfiguration` interface now includes `ILockProvider LockProvider { get; }`; implementations must expose their lock provider
- `AddIndex()` now throws if duplicate index names are detected (previously silently accepted duplicates)
- Lock key format changed from `reindex:{alias}:{old}:{new}` to `reindex:{alias}`; if external code checks for specific lock keys, it will need updating
Test Plan
All tests pass (561 total, 555 succeeded, 6 skipped):
- `ReindexAsync_ConcurrentCalls_OnlyOneCompletes`: verifies only one reindex succeeds when called concurrently, alias points to correct index, all documents migrated
- `ReindexAsync_LockPreventsHandlerAndDirectConcurrency`: proves holding the lock blocks a reindex from completing
- `ReindexAsync_AliasLockBlocksSubsequentVersionTransition`: proves alias-keyed lock blocks cross-version reindexes (v1→v3 blocked while lock held)
- `ReindexAsync_WhenCancelled_DoesNotCompleteReindex`: verifies progress callback exception stops the reindex and preserves the original index
- `AddIndex_WithDuplicateIndexName_ThrowsArgumentException`: validates duplicate name detection
- `AddIndex_WithUniqueIndexNames_Succeeds`: validates unique names are accepted
- `AddIndex_WithDuplicateNameDifferentCase_ThrowsArgumentException`: validates case-insensitive duplicate detection