Commit 27207c9

kvbm: offload update; moved to distributed, updated policies
Signed-off-by: Ryan Olson <[email protected]>
1 parent: 0f9211b

File tree: 30 files changed (+1581, -222 lines)


lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/worker.py

Lines changed: 5 additions & 13 deletions
@@ -80,7 +80,7 @@ def __init__(
        self.runtime = KvbmRuntime.build_worker(self.kvbm_override_config)

        # Create the Rust ConnectorWorker that handles NIXL registration
-       self.connector_worker = ConnectorWorker(self.runtime)
+       self.worker = ConnectorWorker(self.runtime)

        # Store peer info for handshake
        instance_id, worker_addr = self.runtime.peer_info()

@@ -149,7 +149,7 @@ def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]) -> None:
        # This caches tensor state for deferred NIXL registration
        # The actual NIXL registration happens when the leader triggers
        # initialization via bind_connector_metadata()
-       self.connector_worker.register_kv_caches(
+       self.worker.register_kv_caches(
            tensors,
            num_device_blocks,
            page_size,

@@ -171,7 +171,7 @@ def bind_connector_metadata(self, data: bytes) -> None:
        """
        Bind connector metadata from the leader.
        """
-       self.connector_worker.bind_connector_metadata(data)
+       self.worker.bind_connector_metadata(data)

    def clear_connector_metadata(self) -> None:
        """

@@ -222,19 +222,11 @@ def get_finished(
        Returns:
            (None, None): No finished sends/receives
        """
-       # Just acknowledge the finished requests
-       # Since our leader's request_finished() always returns False,
-       # these requests have already had their blocks freed
-       if len(finished_req_ids) > 0:
-           print(
-               f"SchedulerConnectorWorker.get_finished() acknowledging {len(finished_req_ids)} finished requests"
-           )
-
-       return (None, None)
+       return self.worker.get_finished()

    def get_block_ids_with_load_errors(self) -> set[int]:
        """Returns empty set - no load errors tracked."""
-       return set()
+       return self.worker.get_failed_onboarding()

    def get_handshake_metadata(self) -> KVConnectorHandshakeMetadata:
        """

lib/bindings/kvbm/src/v2/connector/worker/mod.rs

Lines changed: 6 additions & 7 deletions
@@ -3,6 +3,7 @@

//! Python bindings for the v2 connector worker.

+use std::collections::HashSet;
use std::sync::Arc;

use pyo3::prelude::*;

@@ -139,14 +140,8 @@ impl PyConnectorWorker {
    /// Returns:
    ///     tuple: (Optional[set[str]], Optional[set[str]]) for (offload_ids, onboard_ids)
    ///     Returns None for each set if there are no completed requests of that type.
-   #[pyo3(name = "get_finished")]
    #[allow(clippy::type_complexity)]
-   pub fn py_get_finished(
-       &self,
-   ) -> PyResult<(
-       Option<std::collections::HashSet<String>>,
-       Option<std::collections::HashSet<String>>,
-   )> {
+   pub fn get_finished(&self) -> PyResult<(Option<HashSet<String>>, Option<HashSet<String>>)> {
        let (offload_ids, onboard_ids) = self.inner.get_finished();

        let offload = if offload_ids.is_empty() {

@@ -162,4 +157,8 @@ impl PyConnectorWorker {

        Ok((offload, onboard))
    }
+
+   pub fn get_failed_onboarding(&self) -> PyResult<HashSet<usize>> {
+       Ok(self.inner.get_failed_onboarding())
+   }
}
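
Renaming py_get_finished to get_finished makes the #[pyo3(name = "get_finished")] attribute unnecessary: PyO3 exposes a #[pymethods] function to Python under its Rust name by default, and the attribute is only needed when the Python-facing name should differ. A minimal sketch of that behavior (the Demo type is illustrative, not part of these bindings):

use pyo3::prelude::*;

// Illustrative #[pyclass]; not part of the kvbm bindings.
#[pyclass]
struct Demo;

#[pymethods]
impl Demo {
    /// Exposed to Python as `get_finished` simply because that is the Rust name.
    fn get_finished(&self) -> (Option<Vec<String>>, Option<Vec<String>>) {
        (None, None)
    }

    /// Exposed under a different Python name only because of the attribute.
    #[pyo3(name = "finished_ids")]
    fn get_finished_internal(&self) -> Vec<String> {
        Vec::new()
    }
}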

lib/kvbm/src/v2/distributed/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@
// pub mod cohort;

pub mod leader;
+pub mod offload;
pub mod worker;

pub mod parallelism;

lib/kvbm/src/v2/integrations/offload/batch.rs renamed to lib/kvbm/src/v2/distributed/offload/batch.rs

Lines changed: 6 additions & 0 deletions
@@ -16,6 +16,7 @@ use crate::v2::logical::blocks::BlockMetadata;
use crate::v2::{BlockId, SequenceHash};

use super::handle::TransferId;
+use super::pending::PendingGuard;
use super::queue::CancellableQueue;
use super::source::SourceBlock;

@@ -73,6 +74,11 @@ pub struct QueuedBlock<T: BlockMetadata> {
    pub source: SourceBlock<T>,
    /// Transfer state for completion tracking
    pub state: Arc<std::sync::Mutex<TransferState>>,
+   /// RAII guard that removes this block from pending set on drop.
+   ///
+   /// This ensures duplicate prevention tracking is automatically cleaned up
+   /// when the block completes transfer, is cancelled, or errors out.
+   pub pending_guard: Option<PendingGuard>,
}

impl<T: BlockMetadata> std::fmt::Debug for QueuedBlock<T> {
File renamed without changes.
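
The new pending_guard field ties duplicate-prevention bookkeeping to the QueuedBlock lifetime: whichever way the block leaves the pipeline (completed, cancelled, or errored), dropping the guard removes its entry from the pending set. The PendingGuard/PendingTracker internals live in the new pending module and are not shown in this commit; the sketch below illustrates the RAII pattern under assumed types (a shared HashSet keyed by an integer block ID):

use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical pending-set tracker (the real PendingTracker is not shown in this diff).
#[derive(Default, Clone)]
struct Tracker {
    pending: Arc<Mutex<HashSet<u64>>>,
}

impl Tracker {
    /// Returns a guard if the block was not already pending, None for a duplicate.
    fn try_insert(&self, block_id: u64) -> Option<Guard> {
        if self.pending.lock().unwrap().insert(block_id) {
            Some(Guard { tracker: self.clone(), block_id })
        } else {
            None
        }
    }
}

/// RAII guard: removal happens in Drop, so every exit path cleans up the pending entry.
struct Guard {
    tracker: Tracker,
    block_id: u64,
}

impl Drop for Guard {
    fn drop(&mut self) {
        self.tracker.pending.lock().unwrap().remove(&self.block_id);
    }
}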

lib/kvbm/src/v2/integrations/offload/engine.rs renamed to lib/kvbm/src/v2/distributed/offload/engine.rs

Lines changed: 107 additions & 3 deletions
@@ -33,15 +33,17 @@ use std::sync::Arc;

use anyhow::Result;
use dashmap::DashMap;
+use tokio::task::JoinHandle;

use crate::v2::distributed::leader::InstanceLeader;
use crate::v2::logical::LogicalLayoutHandle;
-use crate::v2::logical::blocks::{BlockMetadata, BlockRegistry};
+use crate::v2::logical::blocks::{BlockMetadata, BlockRegistry, WeakBlock};
use crate::v2::logical::manager::BlockManager;
use crate::v2::{BlockId, G1, G2, G3, G4};

use super::handle::{TransferHandle, TransferId, TransferState};
-use super::pipeline::{Pipeline, PipelineConfig};
+use super::pipeline::{ChainOutput, ChainOutputRx, Pipeline, PipelineConfig, PipelineInput};
+use super::queue::CancellableQueue;
use super::source::SourceBlocks;

/// Central coordinator for offload pipelines.

@@ -62,6 +64,8 @@ pub struct OffloadEngine {
    g2_to_g4: Option<Pipeline<G2, G4>>,
    /// Active transfer tracking
    transfers: Arc<DashMap<TransferId, Arc<std::sync::Mutex<TransferState>>>>,
+   /// Chain router task handle (routes G1→G2 output to downstream pipelines)
+   _chain_router_handle: Option<JoinHandle<()>>,
}

impl OffloadEngine {

@@ -276,7 +280,7 @@ impl OffloadEngineBuilder {
        let runtime = self.runtime.unwrap_or_else(|| self.leader.runtime());

        // Build G1→G2 pipeline if configured
-       let g1_to_g2 = if let Some(config) = self.g1_to_g2_config {
+       let mut g1_to_g2 = if let Some(config) = self.g1_to_g2_config {
            let g1_manager = self
                .g1_manager
                .ok_or_else(|| anyhow::anyhow!("G1 manager required for G1→G2 pipeline"))?;

@@ -346,17 +350,117 @@ impl OffloadEngineBuilder {
            None
        };

+       // Wire up auto-chaining from G1→G2 to downstream G2→G3/G2→G4 pipelines
+       let chain_router_handle = if let Some(ref mut g1_to_g2_pipeline) = g1_to_g2 {
+           if g1_to_g2_pipeline.auto_chain() {
+               if let Some(chain_rx) = g1_to_g2_pipeline.take_chain_rx() {
+                   // Get references to downstream pipeline queues
+                   let g2_to_g3_queue = g2_to_g3.as_ref().map(|p| p.eval_queue.clone());
+                   let g2_to_g4_queue = g2_to_g4.as_ref().map(|p| p.eval_queue.clone());
+
+                   // Only spawn if there's at least one downstream pipeline
+                   if g2_to_g3_queue.is_some() || g2_to_g4_queue.is_some() {
+                       tracing::debug!(
+                           has_g2_to_g3 = g2_to_g3_queue.is_some(),
+                           has_g2_to_g4 = g2_to_g4_queue.is_some(),
+                           "Spawning chain router for G1→G2 auto-chaining"
+                       );
+                       Some(runtime.spawn(chain_router_task(
+                           chain_rx,
+                           g2_to_g3_queue,
+                           g2_to_g4_queue,
+                       )))
+                   } else {
+                       tracing::debug!(
+                           "G1→G2 auto_chain enabled but no downstream pipelines configured"
+                       );
+                       None
+                   }
+               } else {
+                   None
+               }
+           } else {
+               None
+           }
+       } else {
+           None
+       };
+
        Ok(OffloadEngine {
            leader: self.leader,
            registry,
            g1_to_g2,
            g2_to_g3,
            g2_to_g4,
            transfers: Arc::new(DashMap::new()),
+           _chain_router_handle: chain_router_handle,
        })
    }
}

+/// Routes chain output from G1→G2 to downstream G2→G3 and G2→G4 pipelines.
+///
+/// Blocks are converted to WeakBlocks for best-effort offloading - if they're
+/// evicted before the downstream pipeline processes them, that's acceptable.
+/// This enables graceful degradation under memory pressure.
+async fn chain_router_task(
+   mut chain_rx: ChainOutputRx<G2>,
+   g2_to_g3_queue: Option<Arc<CancellableQueue<PipelineInput<G2>>>>,
+   g2_to_g4_queue: Option<Arc<CancellableQueue<PipelineInput<G2>>>>,
+) {
+   while let Some(output) = chain_rx.recv().await {
+       let ChainOutput {
+           transfer_id,
+           blocks,
+           state,
+       } = output;
+
+       if blocks.is_empty() {
+           continue;
+       }
+
+       // Convert strong blocks to weak blocks for best-effort downstream processing
+       // This allows blocks to be evicted if memory pressure requires it
+       let weak_blocks: Vec<WeakBlock<G2>> =
+           blocks.iter().map(|block| block.downgrade()).collect();
+
+       // Drop strong references - blocks can now be evicted if needed
+       drop(blocks);
+
+       tracing::debug!(
+           %transfer_id,
+           num_blocks = weak_blocks.len(),
+           "Routing chain output to downstream pipelines as WeakBlocks"
+       );
+
+       // Enqueue to G2→G3 if available
+       if let Some(ref queue) = g2_to_g3_queue {
+           let input = PipelineInput {
+               transfer_id,
+               source: SourceBlocks::Weak(weak_blocks.clone()),
+               state: state.clone(),
+           };
+           if !queue.push(transfer_id, input) {
+               tracing::debug!(%transfer_id, "G2→G3 chain enqueue skipped (cancelled)");
+           }
+       }
+
+       // Enqueue to G2→G4 if available
+       if let Some(ref queue) = g2_to_g4_queue {
+           let input = PipelineInput {
+               transfer_id,
+               source: SourceBlocks::Weak(weak_blocks),
+               state,
+           };
+           if !queue.push(transfer_id, input) {
+               tracing::debug!(%transfer_id, "G2→G4 chain enqueue skipped (cancelled)");
+           }
+       }
+   }
+
+   tracing::debug!("Chain router task shutting down");
+}
+
#[cfg(test)]
mod tests {
    use super::*;
File renamed without changes.
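
The downgrade-then-drop step in chain_router_task is the standard strong-to-weak reference pattern: downstream pipelines hold only weak handles, so a block evicted under memory pressure simply fails to resolve rather than staying pinned in G2. A plain std::sync::Arc/Weak sketch of the same idea (assuming WeakBlock behaves analogously, which this diff does not spell out):

use std::sync::{Arc, Weak};

fn main() {
    // Strong handle produced by the G1→G2 stage (a String stands in for a registered block).
    let block: Arc<String> = Arc::new("block contents".to_string());

    // Downstream stages keep only a weak handle, so they never pin the block in memory.
    let weak: Weak<String> = Arc::downgrade(&block);

    // Dropping the strong handle is what allows eviction to reclaim the block.
    drop(block);

    // Best-effort consumption: upgrade() returns None once the block is gone.
    match weak.upgrade() {
        Some(b) => println!("offloading {b}"),
        None => println!("block evicted before downstream offload; skipping"),
    }
}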

lib/kvbm/src/v2/integrations/offload/mod.rs renamed to lib/kvbm/src/v2/distributed/offload/mod.rs

Lines changed: 4 additions & 2 deletions
@@ -43,7 +43,7 @@
//! # Example
//!
//! ```ignore
-//! use kvbm::v2::integrations::offload::{
+//! use kvbm::v2::distributed::offload::{
//!     OffloadEngine, PipelineBuilder, PresenceFilter, PresenceAndLFUFilter,
//! };
//!

@@ -79,6 +79,7 @@ mod batch;
mod cancel;
mod engine;
mod handle;
+mod pending;
mod pipeline;
mod policy;
mod queue;

@@ -88,11 +89,12 @@ mod source;
pub use cancel::{CancelConfirmation, CancelState, CancellationToken};
pub use engine::{OffloadEngine, OffloadEngineBuilder};
pub use handle::{TransferHandle, TransferId, TransferResult, TransferStatus};
+pub use pending::{PendingGuard, PendingTracker};
pub use pipeline::{Pipeline, PipelineBuilder, PipelineConfig};
pub use policy::{
    AllOfPolicy, AnyOfPolicy, BoxFuture, EvalContext, OffloadPolicy, PassAllPolicy,
    PolicyBatchFuture, PolicyFuture, PresenceAndLFUFilter, PresenceFilter, async_batch_result,
-   async_result, sync_batch_result, sync_result,
+   async_result, create_policy_from_config, sync_batch_result, sync_result,
};
pub use queue::CancellableQueue;
pub use source::{ExternalBlock, SourceBlock, SourceBlocks};
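
The policy exports now include create_policy_from_config alongside the existing combinators, but neither its signature nor its config type appears in this commit. As a loose illustration only, a config-driven policy factory generally has the following shape; every type below is hypothetical and only the function name mirrors the new export:

use std::sync::Arc;

/// Hypothetical policy trait and config, standing in for kvbm's OffloadPolicy and its
/// (unshown) configuration; none of these definitions come from the commit.
trait Policy: Send + Sync {
    fn should_offload(&self, frequency: u64, already_present: bool) -> bool;
}

struct PassAll;
impl Policy for PassAll {
    fn should_offload(&self, _frequency: u64, _already_present: bool) -> bool {
        true
    }
}

struct PresenceAndLfu {
    min_frequency: u64,
}
impl Policy for PresenceAndLfu {
    fn should_offload(&self, frequency: u64, already_present: bool) -> bool {
        !already_present && frequency >= self.min_frequency
    }
}

enum PolicyConfig {
    PassAll,
    PresenceAndLfu { min_frequency: u64 },
}

/// Shape of a create_policy_from_config-style factory: map a declarative config
/// onto a shared policy object the pipeline can evaluate.
fn create_policy_from_config(config: PolicyConfig) -> Arc<dyn Policy> {
    match config {
        PolicyConfig::PassAll => Arc::new(PassAll),
        PolicyConfig::PresenceAndLfu { min_frequency } => Arc::new(PresenceAndLfu { min_frequency }),
    }
}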
