Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions lib/kvbm/src/v2/distributed/cohort/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ pub struct TransferOptionsWire {
pub layer_range: Option<Range<usize>>,
/// NIXL write notification value delivered after RDMA write completes
pub nixl_write_notification: Option<u64>,
/// Tensor parallelism rank for distributed transfers
pub tp_rank: Option<u32>,
/// Total number of tensor parallel workers
pub tp_size: Option<u32>,
}

impl From<TransferOptionsWire> for TransferOptions {
Expand All @@ -171,6 +175,8 @@ impl From<TransferOptionsWire> for TransferOptions {
layer_range: wire.layer_range,
nixl_write_notification: wire.nixl_write_notification,
bounce_buffer: None,
tp_rank: wire.tp_rank,
tp_size: wire.tp_size,
}
}
}
Expand Down
24 changes: 4 additions & 20 deletions lib/kvbm/src/v2/distributed/worker/nova/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,8 @@ impl WorkerTransfers for NovaWorkerClient {
let event = self.nova.events().new_event()?;
let awaiter = self.nova.events().awaiter(event.handle())?;

// Convert to serializable options
// TODO: Extract bounce buffer handle if present in options.bounce_buffer
let options = SerializableTransferOptions {
layer_range: options.layer_range,
nixl_write_notification: options.nixl_write_notification,
bounce_buffer_handle: None,
bounce_buffer_block_ids: None,
};
// Convert to serializable options (uses From<TransferOptions> impl)
let options: SerializableTransferOptions = options.into();

// Create the message
let message = LocalTransferMessage {
Expand Down Expand Up @@ -85,12 +79,7 @@ impl WorkerTransfers for NovaWorkerClient {
let event = self.nova.events().new_event()?;
let awaiter = self.nova.events().awaiter(event.handle())?;

let options = SerializableTransferOptions {
layer_range: options.layer_range,
nixl_write_notification: options.nixl_write_notification,
bounce_buffer_handle: None,
bounce_buffer_block_ids: None,
};
let options: SerializableTransferOptions = options.into();

let message = RemoteOnboardMessage {
src,
Expand Down Expand Up @@ -135,12 +124,7 @@ impl WorkerTransfers for NovaWorkerClient {
let event = self.nova.events().new_event()?;
let awaiter = self.nova.events().awaiter(event.handle())?;

let options = SerializableTransferOptions {
layer_range: options.layer_range,
nixl_write_notification: options.nixl_write_notification,
bounce_buffer_handle: None,
bounce_buffer_block_ids: None,
};
let options: SerializableTransferOptions = options.into();

let message = RemoteOffloadMessage {
src,
Expand Down
8 changes: 7 additions & 1 deletion lib/kvbm/src/v2/distributed/worker/nova/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ use dynamo_nova::Nova;
use serde::{Deserialize, Serialize};

// Serializable transfer options for remote operations
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Default)]
struct SerializableTransferOptions {
layer_range: Option<std::ops::Range<usize>>,
nixl_write_notification: Option<u64>,
bounce_buffer_handle: Option<LayoutHandle>,
bounce_buffer_block_ids: Option<Vec<BlockId>>,
tp_rank: Option<u32>,
tp_size: Option<u32>,
}

impl From<SerializableTransferOptions> for TransferOptions {
Expand All @@ -64,6 +66,8 @@ impl From<SerializableTransferOptions> for TransferOptions {
nixl_write_notification: opts.nixl_write_notification,
// bounce_buffer requires TransportManager to resolve handle to layout
bounce_buffer: None,
tp_rank: opts.tp_rank,
tp_size: opts.tp_size,
}
}
}
Expand Down Expand Up @@ -94,6 +98,8 @@ impl From<TransferOptions> for SerializableTransferOptions {
nixl_write_notification: opts.nixl_write_notification,
bounce_buffer_handle,
bounce_buffer_block_ids,
tp_rank: opts.tp_rank,
tp_size: opts.tp_size,
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions lib/kvbm/src/v2/physical/layout/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,10 @@ where
// Disk storage needs POSIX for regular I/O OR GDS for GPU direct I/O
agent.has_backend("POSIX") || agent.has_backend("GDS_MT")
}
StorageKind::Object(_) => {
// Object storage goes through NIXL's OBJ plugin, so register only when that backend is loaded
agent.has_backend("OBJ")
}
};

if !should_register {
Expand Down Expand Up @@ -752,6 +756,7 @@ fn derive_nixl_metadata(agent: &NixlAgent, entries: &[MemoryEntry]) -> Result<Ni
StorageKind::Pinned => (MemType::Dram, 0),
StorageKind::Device(id) => (MemType::Vram, id as u64),
StorageKind::Disk(id) => (MemType::File, id),
StorageKind::Object(_) => (MemType::Object, 0),
};
Ok(NixlMetadata::new(
agent.name().to_string(),
Expand Down
179 changes: 178 additions & 1 deletion lib/kvbm/src/v2/physical/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use crate::v2::physical::transfer::TransferContext;
use crate::v2::physical::transfer::context::TransferCompleteNotification;
use crate::v2::physical::transfer::executor::TransferOptionsInternal;
use crate::v2::physical::transfer::options::TransferOptions;
use crate::v2::distributed::worker::RemoteDescriptor;
use anyhow::{Result, anyhow, bail};
use dynamo_memory::StorageKind;
use dynamo_memory::{ObjectStorage, StorageKind};
use dynamo_memory::nixl::NixlAgent;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU16, Ordering};
Expand Down Expand Up @@ -253,6 +254,7 @@ impl TransferManager {
layer_range,
nixl_write_notification,
bounce_buffer,
..
} = options;

let mut internal_options = TransferOptionsInternal::builder();
Expand Down Expand Up @@ -284,6 +286,181 @@ impl TransferManager {
)
}

/// Offload local blocks to a remote destination (G4 offload).
///
/// The destination is selected by the `RemoteDescriptor` variant:
///
/// * `RemoteDescriptor::Layout` - the call is forwarded unchanged to
///   `execute_transfer`, i.e. a regular layout-to-layout transfer.
/// * `RemoteDescriptor::Object` - the source blocks are written to object
///   storage via `execute_object_offload`, pairing the i-th source block with
///   the i-th destination key.
///
/// # Arguments
/// * `src_handle` - Handle of the local source layout
/// * `src_blocks` - IDs of the blocks to read from the source layout
/// * `dst_descriptor` - Where the blocks go (remote layout or object keys)
/// * `options` - Transfer options, passed through to the selected path
///
/// # Errors
/// For object destinations, fails when the number of source blocks differs
/// from the number of destination keys. Errors from the selected transfer
/// path are propagated as-is.
///
/// # Returns
/// A notification handle that can be awaited for transfer completion
pub fn execute_g4_offload(
    &self,
    src_handle: LayoutHandle,
    src_blocks: &[BlockId],
    dst_descriptor: RemoteDescriptor,
    options: TransferOptions,
) -> Result<TransferCompleteNotification> {
    // Layout destinations need no extra handling: hand them straight to the
    // standard transfer path and keep only the object keys for the code below.
    let keys = match dst_descriptor {
        RemoteDescriptor::Layout { handle, block_ids } => {
            return self.execute_transfer(src_handle, src_blocks, handle, &block_ids, options);
        }
        RemoteDescriptor::Object { keys } => keys,
    };

    // Every source block needs exactly one destination key.
    let (block_count, key_count) = (src_blocks.len(), keys.len());
    if block_count != key_count {
        bail!(
            "Block count mismatch: {} source blocks, {} destination keys",
            block_count,
            key_count
        );
    }

    self.execute_object_offload(src_handle, src_blocks, &keys, options)
}

/// Internal helper that writes local blocks to object storage.
///
/// Transfers blocks from a local layout to object storage using NIXL's OBJ plugin.
/// The i-th entry of `src_blocks` is written to the object derived from the i-th
/// entry of `dst_keys`.
///
/// Assumes the source layout is fully contiguous, i.e. each block is one contiguous
/// region starting at `memory_region(block_id, 0, 0)`. This is NOT validated here.
///
/// The flow is:
/// 1. Derive one object key per destination hash and register an object storage
///    slot for it with NIXL
/// 2. Build source/destination transfer descriptors (one per block)
/// 3. Create and post the transfer request
///
/// Only `options.tp_rank` is read; the remaining transfer options are not consulted
/// by this path, so whole blocks are always transferred.
///
/// # Errors
/// Fails when the block and key counts differ, the source handle is unknown, the
/// NIXL OBJ backend is not loaded, or any NIXL registration/transfer call fails.
///
/// # Panics
/// Panics if the layout registry lock is poisoned.
fn execute_object_offload(
    &self,
    src_handle: LayoutHandle,
    src_blocks: &[BlockId],
    dst_keys: &[crate::v2::SequenceHash],
    options: TransferOptions,
) -> Result<TransferCompleteNotification> {
    use dynamo_memory::nixl::{MemType, XferDescList, XferOp};

    // The descriptor loop below pairs blocks and keys with `zip`, which would
    // silently truncate on a length mismatch. Reject it explicitly instead.
    if src_blocks.len() != dst_keys.len() {
        bail!(
            "Block count mismatch: {} source blocks, {} destination keys",
            src_blocks.len(),
            dst_keys.len()
        );
    }

    // Get source layout from registry; the read guard is released at the end of
    // this scope so it is not held across the NIXL calls.
    let src_layout = {
        let registry = self.registry.read().unwrap();
        registry
            .get_layout(src_handle)
            .ok_or_else(|| anyhow!("invalid source handle: {}", src_handle))?
            .clone()
    };

    let nixl_agent = self.context.nixl_agent();

    // Validate NIXL has OBJ backend loaded
    if !nixl_agent.has_backend("OBJ") {
        bail!("NIXL OBJ backend not available for object storage transfers");
    }

    let src_layout_inner = src_layout.layout();
    let config = src_layout_inner.config();

    // Size in bytes of one contiguous block:
    // num_layers * outer_dim * page_size * inner_dim * dtype_width_bytes
    let block_size = config.num_layers
        * config.outer_dim
        * config.page_size
        * config.inner_dim
        * config.dtype_width_bytes;

    // Get source NIXL metadata
    let src_metadata = src_layout.nixl_metadata();
    let src_mem_type = src_metadata.mem_type();
    let src_device_id = src_metadata.device_id();

    // Get TP rank for object key generation (default to 0 if not specified)
    let tp_rank = options.tp_rank.unwrap_or(0);

    // Object key format: "{sequence_hash}_{tp_rank}", unique across both sequences
    // and tensor parallel ranks. Computed once so registration and descriptor
    // building are guaranteed to use the same key.
    let object_keys: Vec<String> = dst_keys
        .iter()
        .map(|dst_key| format!("{}_{}", dst_key.as_u128(), tp_rank))
        .collect();

    // Step 1: Register one object storage slot per key. The registration is what
    // lets the OBJ backend resolve a descriptor's device_id back to its object key.
    // Dropping a registration handle deregisters the slot, so both vectors are kept
    // alive until the request has been posted.
    let mut obj_storages = Vec::with_capacity(object_keys.len());
    let mut registration_handles = Vec::with_capacity(object_keys.len());
    for object_key in &object_keys {
        let obj_storage = ObjectStorage::new("", object_key, block_size)
            .map_err(|e| anyhow!("Failed to create object storage: {:?}", e))?;
        let handle = nixl_agent
            .register_memory(&obj_storage, None)
            .map_err(|e| anyhow!("Failed to register object storage: {:?}", e))?;
        obj_storages.push(obj_storage);
        registration_handles.push(handle);
    }

    // Step 2: Build transfer descriptor lists
    let mut src_dl = XferDescList::new(src_mem_type)?;
    let mut dst_dl = XferDescList::new(MemType::Object)?;

    // Add one descriptor per block (contiguous memory assumption)
    for (&src_block_id, object_key) in src_blocks.iter().zip(&object_keys) {
        // Base address of the block (first layer, first outer dim)
        let src_region = src_layout.memory_region(src_block_id, 0, 0)?;
        let device_id = ObjectStorage::device_id(object_key);

        // Source: the full contiguous block
        src_dl.add_desc(src_region.addr(), block_size, src_device_id);

        // Destination: offset 0 of the object registered under `device_id`
        dst_dl.add_desc(0, block_size, device_id);
    }

    // Step 3: Create transfer request
    // For local plugin transfers (loopback), use the agent's own name
    let agent_name = nixl_agent.name();
    let xfer_req = nixl_agent.create_xfer_req(
        XferOp::Write,
        &src_dl,
        &dst_dl,
        &agent_name,
        None,
    )?;

    // Post transfer request and handle completion
    let still_pending = nixl_agent.post_xfer_req(&xfer_req, None)?;

    // Release the registrations now that the request has been posted.
    // NOTE(review): this also happens while the transfer is still pending. Confirm
    // that the OBJ backend no longer needs the registration once the request is
    // posted; otherwise these handles must be tied to the completion notification.
    drop(registration_handles);
    drop(obj_storages);

    if still_pending {
        // Register for async completion via status polling
        Ok(self.context.register_nixl_status(xfer_req))
    } else {
        // Transfer completed synchronously
        Ok(TransferCompleteNotification::completed())
    }
}

pub fn execute_g4_onboard(
&self,
src_descriptors: &[RemoteDescriptor],
dst_handle: LayoutHandle,
dst_blocks: &[BlockId],
options: TransferOptions,
) -> Result<TransferCompleteNotification> {
todo!("implement G4 onboard")
}

// ===== Query Methods =====

/// Get the worker ID for this manager.
Expand Down
3 changes: 3 additions & 0 deletions lib/kvbm/src/v2/physical/transfer/checksum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ fn compute_single_block_checksum(
file.read_exact(&mut system_region)?;
hasher.update(system_region.as_slice());
}
StorageKind::Object(_) => {
unimplemented!("Object storage checksums not yet supported")
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/kvbm/src/v2/physical/transfer/executor/nixl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,10 @@ impl<'a> NixlTransferBuilder<'a, Set, Set, Set, Set, Set> {
}

// Add to source descriptor list
src_dl.add_desc(src_region.addr(), src_region.size(), src_device_id)?;
src_dl.add_desc(src_region.addr(), src_region.size(), src_device_id);

// Add to destination descriptor list
dst_dl.add_desc(dst_region.addr(), dst_region.size(), dst_device_id)?;
dst_dl.add_desc(dst_region.addr(), dst_region.size(), dst_device_id);
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions lib/kvbm/src/v2/physical/transfer/fill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ pub fn fill_blocks(
file.sync_all()?;
file.flush()?;
}
StorageKind::Object(_) => {
unimplemented!("Object storage fill not yet supported")
}
}
}
}
Expand Down
Loading
Loading