-
-
Notifications
You must be signed in to change notification settings - Fork 27
fix: improve reindex reliability with distributed locking and observability #271
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ec01bec
ede4920
58bc74d
58cced2
8b3fff4
ef04b9a
c3737dd
f6c42d4
518af9e
e0202ff
97ad9c8
69f372d
3591d4b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -60,3 +60,4 @@ _NCrunch_* | |
| **/.idea/**/contentModel.xml | ||
| **/.idea/**/modules.xml | ||
| **/.idea/copilot/chatSessions/ | ||
| .idea/ | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| using System.Linq.Expressions; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
| using Foundatio.Lock; | ||
| using Foundatio.Parsers.ElasticQueries; | ||
| using Foundatio.Parsers.ElasticQueries.Extensions; | ||
| using Foundatio.Repositories.Elasticsearch.Extensions; | ||
|
|
@@ -250,9 +251,33 @@ public override async Task ReindexAsync(Func<int, string?, Task>? progressCallba | |
| if (currentVersion < 0 || currentVersion >= Version) | ||
| return; | ||
|
|
||
| string lockKey = ElasticReindexer.GetLockName(Name); | ||
| await using var reindexLock = await Configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(20), TimeSpan.FromMinutes(30)).AnyContext(); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be TryAcquire so it doesn't throw?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AcquireAsync with a timeout returns null on timeout (doesn't throw). The null check on line 256 handles the timeout gracefully with a clear error message. TryAcquire would have the same semantics but with a zero timeout — we actually want to wait up to 30 minutes for other reindexes to finish. |
||
| if (reindexLock is null) | ||
| throw new InvalidOperationException($"Unable to acquire reindex lock for '{Name}' after 30 minutes."); | ||
|
|
||
| currentVersion = await GetCurrentVersionAsync().AnyContext(); | ||
| if (currentVersion < 0 || currentVersion >= Version) | ||
| return; | ||
|
|
||
| var reindexWorkItem = CreateReindexWorkItem(currentVersion); | ||
|
|
||
| Func<int, string?, Task> wrappedCallback = async (progress, message) => | ||
| { | ||
| await reindexLock.RenewAsync().AnyContext(); | ||
|
|
||
| if (progressCallbackAsync is not null) | ||
| { | ||
| await progressCallbackAsync(progress, message).AnyContext(); | ||
| } | ||
| else | ||
| { | ||
| _logger.LogInformation("Reindex Progress {Progress:F1}%: {Message}", progress, message); | ||
| } | ||
| }; | ||
|
|
||
| var reindexer = new ElasticReindexer(Configuration.Client, _logger); | ||
| await reindexer.ReindexAsync(reindexWorkItem, progressCallbackAsync).AnyContext(); | ||
| await reindexer.ReindexAsync(reindexWorkItem, wrappedCallback).AnyContext(); | ||
| } | ||
|
|
||
| public override async Task MaintainAsync(bool includeOptionalTasks = true) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Elasticsearch.Net; | ||
| using Foundatio.Parsers.ElasticQueries.Extensions; | ||
| using Foundatio.Repositories.Elasticsearch.Extensions; | ||
| using Foundatio.Repositories.Elasticsearch.Jobs; | ||
| using Foundatio.Repositories.Extensions; | ||
|
|
@@ -27,6 +28,16 @@ public class ElasticReindexer | |
| private const string ID_FIELD = "id"; | ||
| private const int MAX_STATUS_FAILS = 10; | ||
|
|
||
| /// <summary> | ||
| /// Returns the distributed lock resource name for serializing reindex operations on the given alias. | ||
| /// </summary> | ||
| public static string GetLockName(string alias) | ||
| { | ||
| ArgumentException.ThrowIfNullOrEmpty(alias); | ||
|
|
||
| return String.Concat("reindex:", alias); | ||
| } | ||
|
|
||
| public ElasticReindexer(IElasticClient client, ILogger? logger = null) : this(client, TimeProvider.System, logger) | ||
| { | ||
| } | ||
|
|
@@ -62,7 +73,14 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func<int, string?, Task | |
| }; | ||
| } | ||
|
|
||
| _logger.LogInformation("Received reindex work item for new index: {NewIndex}", workItem.NewIndex); | ||
| using var _ = _logger.BeginScope(new Dictionary<string, object> | ||
| { | ||
| [nameof(workItem.OldIndex)] = workItem.OldIndex, | ||
| [nameof(workItem.NewIndex)] = workItem.NewIndex, | ||
| [nameof(workItem.Alias)] = workItem.Alias | ||
| }); | ||
|
|
||
| _logger.LogInformation("Received reindex work item for {OldIndex} -> {NewIndex}", workItem.OldIndex, workItem.NewIndex); | ||
| var startTime = _timeProvider.GetUtcNow().UtcDateTime.AddSeconds(-1); | ||
| await progressCallbackAsync(0, "Starting reindex...").AnyContext(); | ||
| var firstPassResult = await InternalReindexAsync(workItem, progressCallbackAsync, 0, 90, workItem.StartUtc).AnyContext(); | ||
|
|
@@ -96,7 +114,7 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func<int, string?, Task | |
| var refreshResponse = await _client.Indices.RefreshAsync(Indices.All).AnyContext(); | ||
| _logger.LogRequest(refreshResponse); | ||
| if (!refreshResponse.IsValid) | ||
| _logger.LogWarning("Failed to refresh indices before second reindex pass: {Error}", refreshResponse.ServerError); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All these messages in this method are very generic, do we have log scope for all contextual data like index names, and or can we also update the messages to at least say the indicies?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed — added a log scope at the top of \ReindexAsync\ with OldIndex, NewIndex, and Alias. Also updated all warning messages in the method to include the index names explicitly.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added log scope with OldIndex, NewIndex, and Alias at the start of ReindexAsync. All messages within the method now automatically include this context. |
||
| _logger.LogWarning("Failed to refresh indices before second reindex pass for {OldIndex} -> {NewIndex}: {Error}", workItem.OldIndex, workItem.NewIndex, refreshResponse.GetErrorMessage()); | ||
|
|
||
| ReindexResult? secondPassResult = null; | ||
| if (!String.IsNullOrEmpty(workItem.TimestampField)) | ||
|
|
@@ -117,34 +135,34 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func<int, string?, Task | |
| refreshResponse = await _client.Indices.RefreshAsync(Indices.All).AnyContext(); | ||
| _logger.LogRequest(refreshResponse); | ||
| if (!refreshResponse.IsValid) | ||
| _logger.LogWarning("Failed to refresh indices before doc count comparison: {Error}", refreshResponse.ServerError); | ||
| _logger.LogWarning("Failed to refresh indices before doc count comparison for {OldIndex} -> {NewIndex}: {Error}", workItem.OldIndex, workItem.NewIndex, refreshResponse.GetErrorMessage()); | ||
|
|
||
| var newDocCountResponse = await _client.CountAsync<object>(d => d.Index(workItem.NewIndex)).AnyContext(); | ||
| _logger.LogRequest(newDocCountResponse); | ||
| if (!newDocCountResponse.IsValid) | ||
| _logger.LogWarning("Failed to get new index doc count: {Error}", newDocCountResponse.ServerError); | ||
| _logger.LogWarning("Failed to get new index doc count for {NewIndex}: {Error}", workItem.NewIndex, newDocCountResponse.GetErrorMessage()); | ||
|
|
||
| var oldDocCountResponse = await _client.CountAsync<object>(d => d.Index(workItem.OldIndex)).AnyContext(); | ||
| _logger.LogRequest(oldDocCountResponse); | ||
| if (!oldDocCountResponse.IsValid) | ||
| _logger.LogWarning("Failed to get old index doc count: {Error}", oldDocCountResponse.ServerError); | ||
| _logger.LogWarning("Failed to get old index doc count for {OldIndex}: {Error}", workItem.OldIndex, oldDocCountResponse.GetErrorMessage()); | ||
|
|
||
| await progressCallbackAsync(98, $"Old Docs: {oldDocCountResponse.Count} New Docs: {newDocCountResponse.Count}").AnyContext(); | ||
| if (newDocCountResponse.IsValid && oldDocCountResponse.IsValid && newDocCountResponse.Count >= oldDocCountResponse.Count) | ||
| { | ||
| var deleteIndexResponse = await _client.Indices.DeleteAsync(Indices.Index(workItem.OldIndex)).AnyContext(); | ||
| _logger.LogRequest(deleteIndexResponse); | ||
| if (!deleteIndexResponse.IsValid) | ||
| _logger.LogWarning("Failed to delete old index {OldIndex}: {Error}", workItem.OldIndex, deleteIndexResponse.ServerError); | ||
| _logger.LogWarning("Failed to delete old index {OldIndex}: {Error}", workItem.OldIndex, deleteIndexResponse.GetErrorMessage()); | ||
|
|
||
| if (deleteIndexResponse.IsValid) | ||
| await progressCallbackAsync(99, $"Deleted index: {workItem.OldIndex}").AnyContext(); | ||
| else | ||
| await progressCallbackAsync(99, $"Failed to delete old index {workItem.OldIndex}: {deleteIndexResponse.ServerError}").AnyContext(); | ||
| await progressCallbackAsync(99, $"Failed to delete old index {workItem.OldIndex}: {deleteIndexResponse.GetErrorMessage()}").AnyContext(); | ||
| } | ||
|
Comment on lines
153
to
162
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. Switched to .GetErrorMessage()\ for the delete response progress message, consistent with all other error reporting in this PR. |
||
| } | ||
|
|
||
| await progressCallbackAsync(100, null).AnyContext(); | ||
| await progressCallbackAsync(100, "Reindex complete").AnyContext(); | ||
| } | ||
|
|
||
| private async Task<ReindexResult> InternalReindexAsync(ReindexWorkItem workItem, Func<int, string?, Task> progressCallbackAsync, int startProgress = 0, int endProgress = 100, DateTime? startTime = null, CancellationToken cancellationToken = default) | ||
|
|
@@ -173,7 +191,7 @@ private async Task<ReindexResult> InternalReindexAsync(ReindexWorkItem workItem, | |
| return response; | ||
| }, cancellationToken).AnyContext(); | ||
|
|
||
| _logger.LogInformation("Reindex Task Id: {TaskId}", result.Task.FullyQualifiedId); | ||
| _logger.LogInformation("Reindex Task Id: {ReindexTaskId}", result.Task.FullyQualifiedId); | ||
| _logger.LogRequest(result); | ||
| long totalDocs = result.Total; | ||
|
|
||
|
|
@@ -196,7 +214,8 @@ private async Task<ReindexResult> InternalReindexAsync(ReindexWorkItem workItem, | |
|
|
||
| if (statusGetFails > MAX_STATUS_FAILS) | ||
| { | ||
| _logger.LogError("Failed to get the status {FailureCount} times in a row", MAX_STATUS_FAILS); | ||
| _logger.LogError("Failed to get the status {FailureCount} times in a row for reindex task {ReindexTaskId} reindexing {OldIndex} -> {NewIndex}", | ||
| statusGetFails, result.Task.FullyQualifiedId, workItem.OldIndex, workItem.NewIndex); | ||
| break; | ||
| } | ||
|
|
||
|
|
@@ -221,7 +240,7 @@ private async Task<ReindexResult> InternalReindexAsync(ReindexWorkItem workItem, | |
| sw.Restart(); | ||
| lastProgress = lastCompleted; | ||
|
|
||
| string lastMessage = $"Total: {status.Task.Status.Total:N0} Completed: {lastCompleted:N0} VersionConflicts: {status.Task.Status.VersionConflicts:N0}"; | ||
| string lastMessage = $"[{workItem.NewIndex}] Total: {status.Task.Status.Total:N0} Completed: {lastCompleted:N0} VersionConflicts: {status.Task.Status.VersionConflicts:N0}"; | ||
| await progressCallbackAsync(CalculateProgress(status.Task.Status.Total, lastCompleted, startProgress, endProgress), lastMessage).AnyContext(); | ||
|
|
||
|
Comment on lines
244
to
245
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TryCancelTaskAsync is a best-effort cleanup that fires in the !taskSuccess block. If a progress callback throws, the exception propagates up the call stack and the reindex is already dead — the server-side task will still be running but will eventually idle-timeout. This is acceptable for a reliability improvement PR; a future enhancement could add cancellation in the exception path if needed. |
||
| if (status.Completed && response?.Error == null) | ||
|
|
@@ -245,6 +264,22 @@ private async Task<ReindexResult> InternalReindexAsync(ReindexWorkItem workItem, | |
| } while (!cancellationToken.IsCancellationRequested); | ||
| sw.Stop(); | ||
|
|
||
| if (!taskSuccess) | ||
| { | ||
| if (cancellationToken.IsCancellationRequested) | ||
| { | ||
| _logger.LogWarning("Reindex cancelled for {OldIndex} -> {NewIndex}. ReindexTaskId: {ReindexTaskId}, LastProgress: {LastProgress}, TotalDocs: {TotalDocs}, Elapsed: {Elapsed}", | ||
| workItem.OldIndex, workItem.NewIndex, result.Task.FullyQualifiedId, lastProgress, totalDocs, sw.Elapsed); | ||
| } | ||
| else | ||
| { | ||
| _logger.LogError("Reindex abandoned for {OldIndex} -> {NewIndex}. ReindexTaskId: {ReindexTaskId}, StatusFails: {StatusFails}, LastProgress: {LastProgress}, TotalDocs: {TotalDocs}, Elapsed: {Elapsed}", | ||
| workItem.OldIndex, workItem.NewIndex, result.Task.FullyQualifiedId, statusGetFails, lastProgress, totalDocs, sw.Elapsed); | ||
| } | ||
|
|
||
| await TryCancelTaskAsync(result.Task, workItem.OldIndex, workItem.NewIndex).AnyContext(); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is Task here? It makes it sound like a tpl task, when it's the Elastic TaskId
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Renamed all log templates from {TaskId}\ to {ReindexTaskId}\ to make it clear these reference the Elasticsearch Tasks API task ID, not a TPL Task.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why wasn't this renamed?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done — renamed the parameter from taskId to reindexTaskId in TryCancelTaskAsync.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Renamed to {ReindexTaskId} throughout to differentiate from TPL Tasks.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as the other TryCancelTaskAsync comment — when the exception propagates from the polling loop, we don't get to the !taskSuccess block. The server-side task will idle-timeout. Adding cancellation in every catch path would add complexity for marginal benefit. Accepted for now. |
||
| } | ||
|
|
||
| long failures = 0; | ||
| if (lastReindexResponse?.Failures != null && lastReindexResponse.Failures.Count > 0) | ||
| { | ||
|
|
@@ -318,6 +353,31 @@ private async Task HandleFailureAsync(ReindexWorkItem workItem, BulkIndexByScrol | |
| _logger.LogErrorRequest(indexResponse, "Error indexing document {Index}/{Id}", workItem.NewIndex + "-error", gr.Id); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Attempts to cancel the Elasticsearch server-side reindex task. Best-effort — failures are logged but not propagated. | ||
| /// </summary> | ||
| private async Task TryCancelTaskAsync(TaskId reindexTaskId, string oldIndex, string newIndex) | ||
| { | ||
| try | ||
| { | ||
| using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there a linked dispose token we can use too?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method is only called from best-effort cleanup paths (after the main reindex has already failed/been abandoned). There's no meaningful parent token to link to at that point — the reindex CancellationToken has already been triggered or the loop has broken. The 30s timeout is a safety net to avoid blocking shutdown if the cluster is unreachable. |
||
| var response = await _client.Tasks.CancelAsync(c => c.TaskId(reindexTaskId), cts.Token).AnyContext(); | ||
| if (response.IsValid) | ||
| { | ||
| _logger.LogRequest(response); | ||
| _logger.LogInformation("Cancelled reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", reindexTaskId.FullyQualifiedId, oldIndex, newIndex); | ||
| } | ||
| else | ||
| { | ||
| _logger.LogErrorRequest(response, "Failed to cancel reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", reindexTaskId.FullyQualifiedId, oldIndex, newIndex); | ||
| } | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| _logger.LogWarning(ex, "Exception cancelling reindex task {ReindexTaskId} for {OldIndex} -> {NewIndex}", reindexTaskId.FullyQualifiedId, oldIndex, newIndex); | ||
| } | ||
|
github-code-quality[bot] marked this conversation as resolved.
Fixed
github-code-quality[bot] marked this conversation as resolved.
Fixed
github-code-quality[bot] marked this conversation as resolved.
Fixed
niemyjski marked this conversation as resolved.
Dismissed
|
||
| } | ||
|
|
||
| private async Task<List<string>> GetIndexAliasesAsync(string index) | ||
| { | ||
| var aliasesResponse = await _client.Indices.GetAliasAsync(index).AnyContext(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,9 @@ | ||
| <Project Sdk="Microsoft.NET.Sdk"> | ||
| <ItemGroup> | ||
| <PackageReference Include="Foundatio" Version="13.0.1-preview.0.11" Condition="'$(ReferenceFoundatioSource)' == '' OR '$(ReferenceFoundatioSource)' == 'false'" /> | ||
| <PackageReference Include="Foundatio" Version="13.0.1-preview.0.12" Condition="'$(ReferenceFoundatioSource)' == '' OR '$(ReferenceFoundatioSource)' == 'false'" /> | ||
| <ProjectReference Include="$(FoundatioProjectsDir)Foundatio\src\Foundatio\Foundatio.csproj" Condition="'$(ReferenceFoundatioSource)' == 'true'" /> | ||
|
|
||
| <PackageReference Include="Foundatio.JsonNet" Version="13.0.1-preview.0.11" Condition="'$(ReferenceFoundatioSource)' == '' OR '$(ReferenceFoundatioSource)' == 'false'" /> | ||
| <PackageReference Include="Foundatio.JsonNet" Version="13.0.1-preview.0.12" Condition="'$(ReferenceFoundatioSource)' == '' OR '$(ReferenceFoundatioSource)' == 'false'" /> | ||
| <ProjectReference Include="$(FoundatioProjectsDir)Foundatio\src\Foundatio.JsonNet\Foundatio.JsonNet.csproj" Condition="'$(ReferenceFoundatioSource)' == 'true'" /> | ||
| </ItemGroup> | ||
| </Project> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Added null check after AcquireAsync — throws InvalidOperationException with a clear message if the lock can't be acquired within the 30-minute timeout.