diff --git a/.gitignore b/.gitignore
index b92df279..cfc2ba8b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,3 +60,4 @@ _NCrunch_*
**/.idea/**/contentModel.xml
**/.idea/**/modules.xml
**/.idea/copilot/chatSessions/
+.idea/
diff --git a/docs/guide/index-management.md b/docs/guide/index-management.md
index 32ba431d..fc50563b 100644
--- a/docs/guide/index-management.md
+++ b/docs/guide/index-management.md
@@ -851,3 +851,30 @@ AddReindexScript(2, @"
- [Migrations](/guide/migrations) - Document migrations
- [Jobs](/guide/jobs) - Index maintenance jobs
- [Elasticsearch Setup](/guide/elasticsearch-setup) - Connection configuration
+
+## Concurrency Safety
+
+Reindexing is protected by a distributed lock keyed on the index alias to prevent concurrent reindex operations from corrupting data.
+
+### Lock Strategy
+
+- **Lock key**: `reindex:{alias}` (e.g., `reindex:employees`)
+- **Lock TTL**: 20 minutes, auto-renewed during long-running operations
+- Both direct (`VersionedIndex.ReindexAsync`) and work-item (`ReindexWorkItemHandler`) paths use the same lock
+- Only one reindex per logical index can run at a time — subsequent version transitions wait for the current one to complete
+
+### Why Alias-Only Keys
+
+Using the alias as the lock key ensures that sequential version transitions (v1→v2, then v2→v3) cannot overlap. If v2→v3 started before v1→v2 completed, v3 would contain incomplete data from v2.
+
+### Lock Renewal for Long-Running Reindexes
+
+For indexes with millions of documents that take hours to reindex, the lock is automatically renewed on every progress callback (every 1-10 seconds during the polling loop). This prevents lock expiration during legitimate long-running operations.
+
+### Crash Recovery
+
+If an instance crashes mid-reindex, the lock expires after 20 minutes. Another instance can then retry the reindex. `VersionedIndex.ReindexAsync()` is resume-safe — it picks up from the last document using timestamp-based or ID-based range queries.
+
+### Unique Index Names
+
+`ElasticConfiguration.AddIndex()` enforces unique index names (case-insensitive). Registering two indexes with the same alias throws an `ArgumentException` at startup, preventing conflicts before they can cause data corruption.
diff --git a/samples/Foundatio.SampleApp/Server/Foundatio.SampleApp.Server.csproj b/samples/Foundatio.SampleApp/Server/Foundatio.SampleApp.Server.csproj
index ef7a8895..f5594f50 100644
--- a/samples/Foundatio.SampleApp/Server/Foundatio.SampleApp.Server.csproj
+++ b/samples/Foundatio.SampleApp/Server/Foundatio.SampleApp.Server.csproj
@@ -7,7 +7,7 @@
-
+
diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs
index bcd08a0b..e99f9b9e 100644
--- a/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs
+++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/ElasticConfiguration.cs
@@ -13,6 +13,7 @@
using Foundatio.Parsers.ElasticQueries;
using Foundatio.Queues;
using Foundatio.Repositories.Elasticsearch.CustomFields;
+using Foundatio.Repositories.Elasticsearch.Jobs;
using Foundatio.Repositories.Elasticsearch.Queries.Builders;
using Foundatio.Repositories.Extensions;
using Foundatio.Resilience;
@@ -88,6 +89,7 @@ protected virtual void ConfigureSettings(ConnectionSettings settings)
public IElasticClient Client => _client.Value;
public ICacheClient Cache { get; }
public IMessageBus MessageBus { get; }
+ public ILockProvider LockProvider => _lockProvider;
public ILoggerFactory LoggerFactory { get; }
public IResiliencePolicyProvider ResiliencePolicyProvider { get; }
public IResiliencePolicy ResiliencePolicy { get; }
@@ -116,6 +118,9 @@ public void AddIndex(IIndex index)
if (_frozenIndexes.IsValueCreated)
throw new InvalidOperationException("Can't add indexes after the list has been frozen.");
+ if (_indexes.Any(i => String.Equals(i.Name, index.Name, StringComparison.OrdinalIgnoreCase)))
+ throw new ArgumentException($"An index with name '{index.Name}' has already been registered.", nameof(index));
+
_indexes.Add(index);
}
@@ -187,8 +192,7 @@ private async Task ConfigureIndexInternalAsync(IIndex idx, bool beginReindexingO
throw new InvalidOperationException("Must specify work item queue and lock provider in order to migrate index versions.");
var reindexWorkItem = versionedIndex.CreateReindexWorkItem(currentVersion);
- bool isReindexing = await _lockProvider.IsLockedAsync(String.Join(":", "reindex", reindexWorkItem.Alias,
- reindexWorkItem.OldIndex, reindexWorkItem.NewIndex)).AnyContext();
+ bool isReindexing = await _lockProvider.IsLockedAsync(ElasticReindexer.GetLockName(versionedIndex.Name)).AnyContext();
if (isReindexing)
return;
diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/IElasticConfiguration.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/IElasticConfiguration.cs
index f520dbe4..72a2dba7 100644
--- a/src/Foundatio.Repositories.Elasticsearch/Configuration/IElasticConfiguration.cs
+++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/IElasticConfiguration.cs
@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Foundatio.Caching;
+using Foundatio.Lock;
using Foundatio.Messaging;
using Foundatio.Parsers.ElasticQueries;
using Foundatio.Repositories.Elasticsearch.CustomFields;
@@ -17,6 +18,7 @@ public interface IElasticConfiguration : IDisposable
IElasticClient Client { get; }
ICacheClient Cache { get; }
IMessageBus MessageBus { get; }
+ ILockProvider LockProvider { get; }
ILoggerFactory LoggerFactory { get; }
IResiliencePolicyProvider ResiliencePolicyProvider { get; }
TimeProvider TimeProvider { get; set; }
diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/Index.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/Index.cs
index 2dc85fab..5b454815 100644
--- a/src/Foundatio.Repositories.Elasticsearch/Configuration/Index.cs
+++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/Index.cs
@@ -351,6 +351,7 @@ public virtual Task ReindexAsync(Func? progressCallbackAsync
{
OldIndex = Name,
NewIndex = Name,
+ Alias = Name,
DeleteOld = false,
TimestampField = GetTimeStampField()
};
diff --git a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs
index 5b3d635f..98a8008c 100644
--- a/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs
+++ b/src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs
@@ -5,6 +5,7 @@
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
+using Foundatio.Lock;
using Foundatio.Parsers.ElasticQueries;
using Foundatio.Parsers.ElasticQueries.Extensions;
using Foundatio.Repositories.Elasticsearch.Extensions;
@@ -250,9 +251,33 @@ public override async Task ReindexAsync(Func? progressCallba
if (currentVersion < 0 || currentVersion >= Version)
return;
+ string lockKey = ElasticReindexer.GetLockName(Name);
+ await using var reindexLock = await Configuration.LockProvider.AcquireAsync(lockKey, TimeSpan.FromMinutes(20), TimeSpan.FromMinutes(30)).AnyContext();
+ if (reindexLock is null)
+ throw new InvalidOperationException($"Unable to acquire reindex lock for '{Name}' after 30 minutes.");
+
+ currentVersion = await GetCurrentVersionAsync().AnyContext();
+ if (currentVersion < 0 || currentVersion >= Version)
+ return;
+
var reindexWorkItem = CreateReindexWorkItem(currentVersion);
+
+ Func wrappedCallback = async (progress, message) =>
+ {
+ await reindexLock.RenewAsync().AnyContext();
+
+ if (progressCallbackAsync is not null)
+ {
+ await progressCallbackAsync(progress, message).AnyContext();
+ }
+ else
+ {
+ _logger.LogInformation("Reindex Progress {Progress:F1}%: {Message}", progress, message);
+ }
+ };
+
var reindexer = new ElasticReindexer(Configuration.Client, _logger);
- await reindexer.ReindexAsync(reindexWorkItem, progressCallbackAsync).AnyContext();
+ await reindexer.ReindexAsync(reindexWorkItem, wrappedCallback).AnyContext();
}
public override async Task MaintainAsync(bool includeOptionalTasks = true)
diff --git a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs
index d14af9d1..8d07bfc5 100644
--- a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs
+++ b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItem.cs
@@ -6,7 +6,7 @@ public record ReindexWorkItem
{
public required string OldIndex { get; init; }
public required string NewIndex { get; init; }
- public string? Alias { get; init; }
+ public required string Alias { get; init; }
public string? Script { get; init; }
public bool DeleteOld { get; set; }
public string? TimestampField { get; init; }
diff --git a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs
index e0699dc2..7b40395f 100644
--- a/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs
+++ b/src/Foundatio.Repositories.Elasticsearch/Jobs/ReindexWorkItemHandler.cs
@@ -25,7 +25,7 @@ public ReindexWorkItemHandler(IElasticClient client, ILockProvider lockProvider,
if (workItem is not ReindexWorkItem reindexWorkItem)
return Task.FromResult(null);
- return _lockProvider.TryAcquireAsync(String.Join(":", "reindex", reindexWorkItem.Alias, reindexWorkItem.OldIndex, reindexWorkItem.NewIndex), TimeSpan.FromMinutes(20), cancellationToken);
+ return _lockProvider.TryAcquireAsync(ElasticReindexer.GetLockName(reindexWorkItem.Alias), TimeSpan.FromMinutes(20), cancellationToken);
}
public override Task HandleItemAsync(WorkItemContext context)
diff --git a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs
index 38fa38ed..99349e0c 100644
--- a/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs
+++ b/src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticReindexer.cs
@@ -5,6 +5,7 @@
using System.Threading;
using System.Threading.Tasks;
using Elasticsearch.Net;
+using Foundatio.Parsers.ElasticQueries.Extensions;
using Foundatio.Repositories.Elasticsearch.Extensions;
using Foundatio.Repositories.Elasticsearch.Jobs;
using Foundatio.Repositories.Extensions;
@@ -27,6 +28,16 @@ public class ElasticReindexer
private const string ID_FIELD = "id";
private const int MAX_STATUS_FAILS = 10;
+ ///
+ /// Returns the distributed lock resource name for serializing reindex operations on the given alias.
+ ///
+ public static string GetLockName(string alias)
+ {
+ ArgumentException.ThrowIfNullOrEmpty(alias);
+
+ return String.Concat("reindex:", alias);
+ }
+
public ElasticReindexer(IElasticClient client, ILogger? logger = null) : this(client, TimeProvider.System, logger)
{
}
@@ -62,7 +73,14 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func
+ {
+ [nameof(workItem.OldIndex)] = workItem.OldIndex,
+ [nameof(workItem.NewIndex)] = workItem.NewIndex,
+ [nameof(workItem.Alias)] = workItem.Alias
+ });
+
+ _logger.LogInformation("Received reindex work item for {OldIndex} -> {NewIndex}", workItem.OldIndex, workItem.NewIndex);
var startTime = _timeProvider.GetUtcNow().UtcDateTime.AddSeconds(-1);
await progressCallbackAsync(0, "Starting reindex...").AnyContext();
var firstPassResult = await InternalReindexAsync(workItem, progressCallbackAsync, 0, 90, workItem.StartUtc).AnyContext();
@@ -96,7 +114,7 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func {NewIndex}: {Error}", workItem.OldIndex, workItem.NewIndex, refreshResponse.GetErrorMessage());
ReindexResult? secondPassResult = null;
if (!String.IsNullOrEmpty(workItem.TimestampField))
@@ -117,17 +135,17 @@ public async Task ReindexAsync(ReindexWorkItem workItem, Func {NewIndex}: {Error}", workItem.OldIndex, workItem.NewIndex, refreshResponse.GetErrorMessage());
var newDocCountResponse = await _client.CountAsync