feat(coordination): add cross-instance Coordinator for multi-instance consistency#2363
Open
magiccao wants to merge 3 commits into
Open
feat(coordination): add cross-instance Coordinator for multi-instance consistency#2363magiccao wants to merge 3 commits into
magiccao wants to merge 3 commits into
Conversation
PR Reviewer Guide 🔍(Review updated until commit ea14930)Here are some key observations to aid the review process:
|
PR Code Suggestions ✨No code suggestions found for the PR. |
added 2 commits
June 2, 2026 10:20
… consistency
Several process-local singletons (EmbeddingTaskTracker, RequestWaitTracker,
SemanticQueue coalesce versions, request stats) were backed by plain
dict + threading.Lock. On a single machine this is fine; across multiple
load-balanced instances the state silently diverges.
This PR introduces a Coordinator abstraction that unifies these behind a
small set of generic KV primitives:
- InProcessCoordinator (default): zero new dependencies, behaviour identical
to the existing singletons. Single-machine deployments are unaffected.
- RedisCoordinator (opt-in): maps each primitive onto an atomic Redis command
(INCRBY, SET NX EX, SADD, RPUSH, etc.) for cross-instance consistency.
Requires `pip install 'openviking[coordination]'`.
Configuration (ov.conf):
storage:
coordination:
backend: redis # or "memory" (default)
dsn: redis://host:6379 # or env OPENVIKING_COORD_DSN
key_prefix: "ov:coord:"
ttl_sec: 3600
Bug fix: atomic memory semantic dedupe (volcengine#769)
The previous get_int → check → set_int pattern in SemanticQueue was a
TOCTOU race: two instances could both read 0 and both enqueue the same
memory semantic task. Replaced with a single atomic set_if_absent()
(Redis SET NX EX; in-process monotonic-deadline check under lock).
Memory safety: amortized claim pruning
_claim_deadlines in InProcessCoordinator is swept in bulk once the map
exceeds a threshold, so one-shot mem_dedupe:* keys that are never
revisited cannot grow the map unboundedly.
Refactor: extract RequestQueueStats / RequestStatsAccumulator
The inline stats dict+lock duplicated across SemanticProcessor and
collection_schemas is unified into openviking/telemetry/request_queue_stats.py.
…d class path
- Introduce coordinator_factory.py with create_coordinator(config),
extracting the routing logic from StorageConfig.build_coordinator().
build_coordinator() is now a one-liner delegating to the factory,
mirroring the vectordb_adapters/factory.py pattern.
- Expand CoordinationConfig:
- backend: str now accepts a full dotted class path
(e.g. mycompany.module.CredisCoordinator) in addition to the
built-in "memory" and "redis" values, enabling custom coordinator
plugins without modifying OpenViking source.
- Nested redis sub-config (RedisCoordinationConfig) groups dsn,
key_prefix, and ttl_sec under [storage.coordination.redis].
- custom_params: Dict[str, Any] escape hatch passes arbitrary
configuration to third-party from_config() implementations.
- RedisCoordinator.__init__ now accepts a pre-built client object in
addition to a DSN string, so callers can inject any Redis-compatible
client (e.g. a proprietary SDK) without subclassing.
- Add 20 config/factory tests in tests/test_coordinator_config.py
and 3 client-injection tests in tests/misc/test_coordinator.py.
82d9a3a to
ff57950
Compare
… delivery Prevent queuefs event loss in multi-instance / Redis coordinator scenarios through a layered self-healing approach: * EmbeddingTaskTracker: owner-side poller watches shared remaining counter in Coordinator so the completion callback fires even when the final decrement lands on a different instance; exponential-backoff retry on transient coordinator errors; TTL refresh keeps keys alive during long-running tasks; deadline-triggered on_timeout callback for graceful self-healing. * RequestWaitTracker: two-phase wait (processing gate → status retrieval) with per-phase deadline; exponential backoff (0.25 s → 2 s cap) on coordinator errors; _touch_reg throttled to default_ttl_sec/4 to cap Redis call volume; cleanup() degrades gracefully to TTL-based expiry under distributed backend. * Adds embedding_completion_timeout_sec config knob (default 1800 s) for the distributed-only watchdog timeout. Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Persistent review updated to latest commit ea14930 |
PR Code Suggestions ✨No code suggestions found for the PR. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
Add a
Coordinatorabstraction that unifies several process-local singletons(
EmbeddingTaskTracker,RequestWaitTracker,SemanticQueuecoalesceversions, request stats) behind a shared backend, enabling consistent state
across multiple load-balanced server instances.
Distributed Fault-Tolerance Design
Single-instance deployments are unaffected (the default
memorybackend is athin wrapper over existing dict/lock patterns). For Redis-backed deployments, a
layered self-healing approach prevents queuefs event loss:
1. Owner-side completion poller (
EmbeddingTaskTracker)Under the distributed backend, the final
decrement()may land on anon-owner instance that holds neither the callback nor the event loop it
must run on. The registering instance launches a background poller that
watches the shared
remainingcounter in the Coordinator. When the counterreaches zero — regardless of which instance decremented it — the owner
fires the completion callback locally. The poller also holds the deadline
for the distributed-only watchdog timeout, triggering
on_timeoutforgraceful self-healing when embedding tasks stall.
2. TTL refresh (
_touch_reg)All Coordinator keys carry a configurable TTL so that a crash or restart
does not leave orphaned state. Long-running tasks extend their key TTLs
via a throttled
_touch_regcall (refresh interval =ttl_sec / 4)to prevent premature expiry without generating unbounded Redis traffic.
3. Exponential backoff on transient coordinator errors
Both
EmbeddingTaskTracker._watch_until_resolvedandRequestWaitTracker.wait_for_requestabsorb transient Redis errors andretry with exponential backoff (0.25 s → 0.5 s → 1.0 s → 2.0 s cap) rather
than propagating exceptions to callers or tight-looping under failure.
4. Two-phase request wait with grace period (
RequestWaitTracker)wait_for_requestsplits into a processing gate (waits for pending sets toempty) and a status retrieval phase. On transition to the status phase a
2-second grace window is added on top of any remaining deadline, ensuring
the final queue-status snapshot is readable even when the original timeout
is almost exhausted.
5. Graceful cleanup degradation
cleanup()under the distributed backend catches deletion errors and fallsback to TTL-based expiry with a warning log, avoiding cleanup failures from
blocking the request path.
Configuration
Coordination is configured under the
storage.coordinationkey inov.conf.The default
memorybackend requires no additional configuration.Redis backend
The
redis.dsnfield can also be omitted and supplied via theOPENVIKING_COORD_DSNenvironment variable (preferred for credentialmanagement).
Custom backend
Set
backendto a fully-qualified dotted class path. The class must expose afrom_config(cfg: CoordinationConfig) -> Coordinatorclassmethod. Arbitraryextra parameters can be passed through
custom_params.Key options
backend"memory""memory","redis", or dotted class pathredis.dsnOPENVIKING_COORD_DSNredis.key_prefix"ov:coord:"redis.ttl_sec36000disables expiryembedding_completion_timeout_sec1800custom_params{}from_config()for third-party backendsRelated Issue
N/A
Type of Change