Skip to content

Commit 0f9211b

Browse files
committed
kvbm: offload engine + policies
Signed-off-by: Ryan Olson <[email protected]>
1 parent 3ee3642 commit 0f9211b

File tree

25 files changed

+2737
-277
lines changed

25 files changed

+2737
-277
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/bindings/kvbm/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/kvbm-config/src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ mod cache;
1010
mod discovery;
1111
mod nixl;
1212
mod nova;
13+
mod offload;
1314
mod rayon;
1415
mod tokio;
1516

@@ -19,6 +20,9 @@ pub use discovery::{
1920
};
2021
pub use nixl::NixlConfig;
2122
pub use nova::{NovaBackendConfig, NovaConfig};
23+
pub use offload::{
24+
OffloadConfig, PolicyType, PresenceFilterConfig, PresenceLfuFilterConfig, TierOffloadConfig,
25+
};
2226
pub use rayon::RayonConfig;
2327
pub use tokio::TokioConfig;
2428

@@ -69,6 +73,11 @@ pub struct KvbmConfig {
6973
#[validate(nested)]
7074
#[serde(default)]
7175
pub cache: CacheConfig,
76+
77+
/// Offload policy configuration (G1→G2, G2→G3 transitions).
78+
#[validate(nested)]
79+
#[serde(default)]
80+
pub offload: OffloadConfig,
7281
}
7382

7483
impl KvbmConfig {

lib/kvbm-config/src/offload.rs

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Offload policy configuration for KVBM.
5+
//!
6+
//! Defines configuration for offload policies that control which blocks
7+
//! are transferred between storage tiers (G1→G2, G2→G3).
8+
//!
9+
//! # Policy Types
10+
//!
11+
//! - `pass_all`: No filtering, all blocks pass
12+
//! - `presence`: Skip blocks already present in destination tier
13+
//! - `presence_lfu`: Presence check + LFU count threshold
14+
//!
15+
//! # Configuration
16+
//!
17+
//! Policies are configured per tier transition. Multiple policies in the
18+
//! `policies` list are applied in order with implicit AND logic (all must pass).
19+
//!
20+
//! ## JSON Example
21+
//!
22+
//! ```json
23+
//! {
24+
//! "offload": {
25+
//! "g1_to_g2": {
26+
//! "policies": ["presence"],
27+
//! "presence": {}
28+
//! },
29+
//! "g2_to_g3": {
30+
//! "policies": ["presence_lfu"],
31+
//! "presence_lfu": { "min_lfu_count": 8 }
32+
//! }
33+
//! }
34+
//! }
35+
//! ```
36+
37+
use serde::{Deserialize, Serialize};
38+
use validator::Validate;
39+
40+
/// Policy type enum for serialization.
41+
///
42+
/// Each variant corresponds to a policy implementation in the kvbm crate.
43+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
44+
#[serde(rename_all = "snake_case")]
45+
pub enum PolicyType {
46+
/// PassAllPolicy - no filtering, all blocks pass
47+
PassAll,
48+
/// PresenceFilter - skip blocks already in destination tier
49+
Presence,
50+
/// PresenceAndLFUFilter - presence check + LFU threshold
51+
PresenceLfu,
52+
}
53+
54+
/// Configuration for presence filter.
55+
///
56+
/// Currently has no parameters, but the struct exists for future extensibility
57+
/// and to maintain consistent configuration patterns.
58+
#[derive(Debug, Clone, Default, Serialize, Deserialize, Validate)]
59+
pub struct PresenceFilterConfig {}
60+
61+
/// Default LFU count threshold.
62+
fn default_min_lfu_count() -> u32 {
63+
8
64+
}
65+
66+
/// Configuration for presence + LFU filter.
67+
///
68+
/// Combines presence checking with an LFU (Least Frequently Used) count threshold.
69+
/// Only blocks with access count above the threshold are offloaded.
70+
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
71+
pub struct PresenceLfuFilterConfig {
72+
/// Minimum LFU count threshold for offload.
73+
///
74+
/// Blocks must have been accessed more than this many times to be
75+
/// considered for offload. This prevents offloading rarely-used blocks.
76+
///
77+
/// Default: 8
78+
#[serde(default = "default_min_lfu_count")]
79+
#[validate(range(min = 1))]
80+
pub min_lfu_count: u32,
81+
}
82+
83+
impl Default for PresenceLfuFilterConfig {
84+
fn default() -> Self {
85+
Self {
86+
min_lfu_count: default_min_lfu_count(),
87+
}
88+
}
89+
}
90+
91+
/// Configuration for a tier transition (e.g., G1→G2, G2→G3).
92+
///
93+
/// Defines which policies to apply when offloading blocks between tiers.
94+
/// Policies are evaluated in order with implicit AND logic - a block must
95+
/// pass ALL policies to be transferred.
96+
#[derive(Debug, Clone, Default, Serialize, Deserialize, Validate)]
97+
pub struct TierOffloadConfig {
98+
/// Ordered list of policies to apply (implicit AND).
99+
///
100+
/// If empty, defaults to pass-all behavior.
101+
/// Policies are evaluated in order; a block must pass all to be transferred.
102+
#[serde(default)]
103+
pub policies: Vec<PolicyType>,
104+
105+
/// Presence filter configuration.
106+
///
107+
/// Used when "presence" is in the policies list.
108+
#[serde(default)]
109+
#[validate(nested)]
110+
pub presence: PresenceFilterConfig,
111+
112+
/// Presence + LFU filter configuration.
113+
///
114+
/// Used when "presence_lfu" is in the policies list.
115+
#[serde(default)]
116+
#[validate(nested)]
117+
pub presence_lfu: PresenceLfuFilterConfig,
118+
}
119+
120+
/// Top-level offload configuration.
121+
///
122+
/// Groups policy configurations for each tier transition.
123+
#[derive(Debug, Clone, Default, Serialize, Deserialize, Validate)]
124+
pub struct OffloadConfig {
125+
/// G1 (GPU) → G2 (Host) offload policies.
126+
#[serde(default)]
127+
#[validate(nested)]
128+
pub g1_to_g2: TierOffloadConfig,
129+
130+
/// G2 (Host) → G3 (Disk) offload policies.
131+
#[serde(default)]
132+
#[validate(nested)]
133+
pub g2_to_g3: TierOffloadConfig,
134+
}
135+
136+
#[cfg(test)]
137+
mod tests {
138+
use super::*;
139+
140+
#[test]
141+
fn test_default_config() {
142+
let config = OffloadConfig::default();
143+
assert!(config.g1_to_g2.policies.is_empty());
144+
assert!(config.g2_to_g3.policies.is_empty());
145+
assert_eq!(config.g2_to_g3.presence_lfu.min_lfu_count, 8);
146+
}
147+
148+
#[test]
149+
fn test_policy_type_serde() {
150+
let json = r#"["pass_all", "presence", "presence_lfu"]"#;
151+
let policies: Vec<PolicyType> = serde_json::from_str(json).unwrap();
152+
assert_eq!(policies.len(), 3);
153+
assert_eq!(policies[0], PolicyType::PassAll);
154+
assert_eq!(policies[1], PolicyType::Presence);
155+
assert_eq!(policies[2], PolicyType::PresenceLfu);
156+
157+
// Roundtrip: serialize and re-parse, then compare values (not JSON text, since serde_json emits no spaces after commas)
158+
let serialized = serde_json::to_string(&policies).unwrap();
159+
let roundtrip: Vec<PolicyType> = serde_json::from_str(&serialized).unwrap();
160+
assert_eq!(policies, roundtrip);
161+
}
162+
163+
#[test]
164+
fn test_tier_config_serde() {
165+
let json = r#"{
166+
"policies": ["presence_lfu"],
167+
"presence_lfu": { "min_lfu_count": 16 }
168+
}"#;
169+
170+
let config: TierOffloadConfig = serde_json::from_str(json).unwrap();
171+
assert_eq!(config.policies.len(), 1);
172+
assert_eq!(config.policies[0], PolicyType::PresenceLfu);
173+
assert_eq!(config.presence_lfu.min_lfu_count, 16);
174+
}
175+
176+
#[test]
177+
fn test_offload_config_serde() {
178+
let json = r#"{
179+
"g1_to_g2": {
180+
"policies": ["presence"]
181+
},
182+
"g2_to_g3": {
183+
"policies": ["presence_lfu"],
184+
"presence_lfu": { "min_lfu_count": 4 }
185+
}
186+
}"#;
187+
188+
let config: OffloadConfig = serde_json::from_str(json).unwrap();
189+
assert_eq!(config.g1_to_g2.policies, vec![PolicyType::Presence]);
190+
assert_eq!(config.g2_to_g3.policies, vec![PolicyType::PresenceLfu]);
191+
assert_eq!(config.g2_to_g3.presence_lfu.min_lfu_count, 4);
192+
}
193+
194+
#[test]
195+
fn test_default_lfu_threshold() {
196+
let json = r#"{"policies": ["presence_lfu"]}"#;
197+
let config: TierOffloadConfig = serde_json::from_str(json).unwrap();
198+
// Should use default of 8
199+
assert_eq!(config.presence_lfu.min_lfu_count, 8);
200+
}
201+
202+
#[test]
203+
fn test_validation() {
204+
let config = OffloadConfig::default();
205+
assert!(config.validate().is_ok());
206+
207+
let config_with_lfu = OffloadConfig {
208+
g2_to_g3: TierOffloadConfig {
209+
policies: vec![PolicyType::PresenceLfu],
210+
presence_lfu: PresenceLfuFilterConfig { min_lfu_count: 1 },
211+
..Default::default()
212+
},
213+
..Default::default()
214+
};
215+
assert!(config_with_lfu.validate().is_ok());
216+
}
217+
}

lib/kvbm/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ aligned-vec = "0.6.4"
3636
bincode = { version = "2.0.1", features = ["serde", "derive"] }
3737
blake3 = { version = "1" }
3838
bytes = "1.10"
39+
crossbeam-queue = "0.3"
3940
derive-getters = "0.5"
4041
figment = { version = "0.10", features = ["env"] }
4142
lru = "0.16"

lib/kvbm/src/v2/distributed/leader/instance.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,27 @@ impl InstanceLeader {
309309
self.g3_manager.as_ref()
310310
}
311311

312+
/// Get the block registry.
313+
pub fn registry(&self) -> &BlockRegistry {
314+
&self.registry
315+
}
316+
317+
/// Get the tokio runtime handle from Nova.
318+
///
319+
/// This handle should be used for spawning background tasks that need to
320+
/// run on the KVBM runtime's executor (e.g., offload engine pipelines).
321+
pub fn runtime(&self) -> tokio::runtime::Handle {
322+
self.nova.runtime().clone()
323+
}
324+
325+
/// Check if a parallel_worker is configured.
326+
///
327+
/// The parallel_worker is required for local transfer operations
328+
/// (e.g., offloading blocks between tiers).
329+
pub fn has_parallel_worker(&self) -> bool {
330+
self.parallel_worker.is_some()
331+
}
332+
312333
/// Add a remote leader to the search list.
313334
///
314335
/// Remote leaders are queried during `find_matches_with_options` when

lib/kvbm/src/v2/distributed/worker/nova/client.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ impl WorkerTransfers for NovaWorkerClient {
2828
) -> Result<TransferCompleteNotification> {
2929
// Create a single local event for this operation
3030
let event = self.nova.events().new_event()?;
31-
let handle = event.handle();
32-
let awaiter = self.nova.events().awaiter(handle)?;
31+
let awaiter = self.nova.events().awaiter(event.handle())?;
3332

3433
// Convert to serializable options
3534
// TODO: Extract bounce buffer handle if present in options.bounce_buffer
@@ -53,13 +52,13 @@ impl WorkerTransfers for NovaWorkerClient {
5352

5453
// Spawn a task for the remote instance
5554
let nova = self.nova.clone();
56-
let bytes = bytes.clone();
5755
let remote_instance = self.remote;
5856

57+
// Use unary (not am_sync) to wait for transfer completion
5958
self.nova.tracker().spawn_on(
6059
async move {
6160
let result = nova
62-
.am_sync("kvbm.worker.local_transfer")?
61+
.unary("kvbm.worker.local_transfer")?
6362
.raw_payload(bytes)
6463
.instance(remote_instance)
6564
.send()

lib/kvbm/src/v2/distributed/worker/nova/mod.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,38 @@
11
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
//! Nova-based RPC implementation for distributed worker communication.
5+
//!
6+
//! # RPC Pattern Guidelines
7+
//!
8+
//! This module uses only two Nova RPC patterns:
9+
//!
10+
//! 1. **`am_send` (fire-and-forget)**: Use when no response is needed.
11+
//! - Client sends message and returns immediately
12+
//! - Handler processes asynchronously, no response sent back
13+
//! - Use `NovaHandler::am_handler` or `am_handler_async`
14+
//!
15+
//! 2. **`unary` (request-response)**: Use when waiting for completion.
16+
//! - Client sends request and awaits response
17+
//! - Handler returns `Ok(Some(Bytes))` or `Ok(None)` which is sent back
18+
//! - Use `NovaHandler::unary_handler` or `unary_handler_async`
19+
//!
20+
//! # Why Not `am_sync`?
21+
//!
22+
//! We avoid `am_sync` due to observed issues where it does not reliably
23+
//! receive completion signals when paired with `am_handler_async`. While
24+
//! `am_sync` should theoretically behave like `unary` (both await completion),
25+
//! in practice pairing an `am_sync` client with an `am_handler_async` handler caused
26+
//! indefinite blocking during RDMA transfer tests.
27+
//!
28+
//! The root cause appears to be a mismatch in how responses are routed:
29+
//! - `am_handler_async` returns `Result<()>` - the return value is NOT sent back
30+
//! - `unary_handler_async` returns `Result<Option<Bytes>>` - the return value IS sent back
31+
//!
32+
//! Until the `am_sync` completion path is validated, prefer the simpler and
33+
//! more predictable patterns: `am_send` for fire-and-forget, `unary` for
34+
//! request-response.
35+
436
mod client;
537
mod service;
638

lib/kvbm/src/v2/distributed/worker/nova/service.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ impl NovaWorkerService {
6060
fn register_local_transfer_handler(&self) -> Result<()> {
6161
let worker = self.worker.clone();
6262

63-
let handler = NovaHandler::am_handler_async("kvbm.worker.local_transfer", move |ctx| {
63+
// Use unary_handler_async for explicit response (client waits for transfer completion)
64+
let handler = NovaHandler::unary_handler_async("kvbm.worker.local_transfer", move |ctx| {
6465
let worker = worker.clone();
6566

6667
async move {
@@ -85,7 +86,8 @@ impl NovaWorkerService {
8586
// Await the transfer completion
8687
notification.await?;
8788

88-
Ok(())
89+
// Return empty response to signal success
90+
Ok(Some(Bytes::new()))
8991
}
9092
})
9193
.build();

0 commit comments

Comments
 (0)