Skip to content

Commit c45deaa

Browse files
authored
Merge branch 'main' into feature/dependency-extraction-DYN-1235
2 parents c421945 + 227846f commit c45deaa

File tree

29 files changed

+880
-431
lines changed

29 files changed

+880
-431
lines changed

docs/guides/run_kvbm_in_trtllm.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ export DYN_KVBM_DISK_CACHE_GB=8
5858
export DYN_KVBM_LEADER_WORKER_INIT_TIMEOUT_SECS=1200
5959
```
6060

61+
> [!NOTE]
62+
> When disk offloading is enabled, to extend SSD lifespan, disk offload filtering would be enabled by default. The current policy is only offloading KV blocks from CPU to disk if the blocks have frequency equal or more than `2`. Frequency is determined via doubling on cache hit (init with 1) and decrement by 1 on each time decay step.
63+
>
64+
> To disable disk offload filtering, set `DYN_KVBM_DISABLE_DISK_OFFLOAD_FILTER` to true or 1.
65+
6166
```bash
6267
# write an example LLM API config
6368
# Note: Disable partial reuse "enable_partial_reuse: false" in the LLM API config’s "kv_connector_config" to increase offloading cache hits.

docs/guides/run_kvbm_in_vllm.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ cd $DYNAMO_HOME/components/backends/vllm
6161
> [!NOTE]
6262
> `DYN_KVBM_CPU_CACHE_GB` must be set and `DYN_KVBM_DISK_CACHE_GB` is optional.
6363
64+
> [!NOTE]
65+
> When disk offloading is enabled, to extend SSD lifespan, disk offload filtering would be enabled by default. The current policy is only offloading KV blocks from CPU to disk if the blocks have frequency equal or more than `2`. Frequency is determined via doubling on cache hit (init with 1) and decrement by 1 on each time decay step.
66+
>
67+
> To disable disk offload filtering, set `DYN_KVBM_DISABLE_DISK_OFFLOAD_FILTER` to true or 1.
68+
6469
### Sample Request
6570
```bash
6671
# make a request to verify vLLM with KVBM is started up correctly

lib/bindings/python/rust/llm/block_manager.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@ use anyhow::Result;
66
use dynamo_llm::block_manager::block::{
77
data::logical::distributed_leader_worker::DistributedLeaderWorkerResources, locality::Logical,
88
};
9+
use dynamo_llm::block_manager::offload::filter::FrequencyFilter;
910
use dynamo_llm::block_manager::{BasicMetadata, BlockParallelismStrategy};
11+
1012
use pyo3::PyResult;
13+
use std::time::Duration;
1114
use tokio_util::sync::CancellationToken;
1215

1316
mod controller;
@@ -94,13 +97,34 @@ impl BlockManager {
9497

9598
if leader.num_host_blocks() > 0 {
9699
tracing::info!("Using {} host blocks", leader.num_host_blocks());
97-
config = config.host_layout(
100+
101+
let mut host_layout_config =
98102
dynamo_llm::block_manager::KvManagerLayoutConfig::builder()
99103
.num_blocks(leader.num_host_blocks())
100-
.logical(Some(BlockParallelismStrategy::LeaderWorkerSharded))
101-
.build()
102-
.map_err(to_pyerr)?,
103-
);
104+
.logical(Some(BlockParallelismStrategy::LeaderWorkerSharded));
105+
106+
if leader.num_disk_blocks() > 0 {
107+
// Check if disk offload filter is disabled via environment variable
108+
let disable_filter = std::env::var("DYN_KVBM_DISABLE_DISK_OFFLOAD_FILTER")
109+
.map(|v| v == "true" || v == "1")
110+
.unwrap_or(false);
111+
112+
if !disable_filter {
113+
// TODO: These values seem plausible for most use cases, but we need to figure out a better way to configure them.
114+
let frequency_filter = FrequencyFilter::new(
115+
2,
116+
Duration::from_secs(600),
117+
1e6 as usize,
118+
cancel_token.child_token(),
119+
rt.inner().runtime().primary().clone(),
120+
)
121+
.map_err(to_pyerr)?;
122+
host_layout_config =
123+
host_layout_config.offload_filter(Some(Arc::new(frequency_filter)));
124+
}
125+
}
126+
127+
config = config.host_layout(host_layout_config.build().map_err(to_pyerr)?);
104128
}
105129

106130
if leader.num_disk_blocks() > 0 {

lib/llm/src/block_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub use block::{
3333
pub use config::*;
3434

3535
pub use layout::{LayoutConfig, LayoutConfigBuilder, LayoutError, LayoutType, nixl::NixlLayout};
36-
pub use offload::request::BlockResult;
36+
pub use offload::{filter::OffloadFilter, request::BlockResult};
3737
pub use pool::{BlockPool, ManagedBlockPool};
3838
pub use storage::{
3939
DeviceStorage, DiskStorage, PinnedStorage, Storage, StorageAllocator,

lib/llm/src/block_manager/block/factory.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ pub mod logical;
66

77
pub use local::LocalBlockDataFactory;
88

9-
use crate::block_manager::LayoutConfig;
9+
use crate::block_manager::{LayoutConfig, OffloadFilter};
1010

1111
use super::*;
1212

@@ -47,6 +47,9 @@ pub trait BlockFactory<S: Storage, L: LocalityProvider> {
4747

4848
/// Get the layout configuration information
4949
fn layout_config(&self) -> &LayoutConfig;
50+
51+
/// Get the offload filter for this factory
52+
fn offload_filter(&self) -> Option<Arc<dyn OffloadFilter>>;
5053
}
5154

5255
/// Extension trait for factories that can produce all blocks at once

lib/llm/src/block_manager/block/factory/local.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,21 @@ pub struct LocalBlockDataFactory<S: Storage> {
88
layout: Arc<dyn BlockLayout<StorageType = S>>,
99
block_set_idx: usize,
1010
worker_id: WorkerID,
11+
offload_filter: Option<Arc<dyn OffloadFilter>>,
1112
}
1213

1314
impl<S: Storage> LocalBlockDataFactory<S> {
1415
pub fn new(
1516
layout: Arc<dyn BlockLayout<StorageType = S>>,
1617
block_set_idx: usize,
1718
worker_id: WorkerID,
19+
offload_filter: Option<Arc<dyn OffloadFilter>>,
1820
) -> Self {
1921
Self {
2022
layout,
2123
block_set_idx,
2224
worker_id,
25+
offload_filter,
2326
}
2427
}
2528
}
@@ -46,6 +49,10 @@ impl<S: Storage> BlockFactory<S, locality::Local> for LocalBlockDataFactory<S> {
4649
fn layout_config(&self) -> &LayoutConfig {
4750
self.layout.config()
4851
}
52+
53+
fn offload_filter(&self) -> Option<Arc<dyn OffloadFilter>> {
54+
self.offload_filter.clone()
55+
}
4956
}
5057

5158
impl<S: Storage> IntoBlocks<S, locality::Local> for LocalBlockDataFactory<S> {}

lib/llm/src/block_manager/block/factory/logical.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use super::*;
5-
use crate::block_manager::locality::{Logical, LogicalBlockData, LogicalResources};
5+
use crate::block_manager::{
6+
OffloadFilter,
7+
locality::{Logical, LogicalBlockData, LogicalResources},
8+
};
69

710
#[derive(Debug)]
811
pub struct LogicalBlockFactory<S: Storage, R: LogicalResources> {
@@ -12,6 +15,7 @@ pub struct LogicalBlockFactory<S: Storage, R: LogicalResources> {
1215
resources: Arc<R>,
1316
storage_type: StorageType,
1417
storage: std::marker::PhantomData<S>,
18+
offload_filter: Option<Arc<dyn OffloadFilter>>,
1519
}
1620

1721
impl<S: Storage, R: LogicalResources> LogicalBlockFactory<S, R> {
@@ -21,6 +25,7 @@ impl<S: Storage, R: LogicalResources> LogicalBlockFactory<S, R> {
2125
worker_id: WorkerID,
2226
resources: Arc<R>,
2327
storage_type: StorageType,
28+
offload_filter: Option<Arc<dyn OffloadFilter>>,
2429
) -> Self {
2530
Self {
2631
layout_config,
@@ -29,6 +34,7 @@ impl<S: Storage, R: LogicalResources> LogicalBlockFactory<S, R> {
2934
resources,
3035
storage_type,
3136
storage: std::marker::PhantomData,
37+
offload_filter,
3238
}
3339
}
3440
}
@@ -57,6 +63,10 @@ impl<S: Storage, R: LogicalResources> BlockFactory<S, Logical<R>> for LogicalBlo
5763
fn layout_config(&self) -> &LayoutConfig {
5864
&self.layout_config
5965
}
66+
67+
fn offload_filter(&self) -> Option<Arc<dyn OffloadFilter>> {
68+
self.offload_filter.clone()
69+
}
6070
}
6171

6272
impl<S: Storage, R: LogicalResources> IntoBlocks<S, Logical<R>> for LogicalBlockFactory<S, R> {}
@@ -89,6 +99,7 @@ mod tests {
8999
TEST_WORKER_ID,
90100
Arc::new(NullResources),
91101
StorageType::Pinned,
102+
None,
92103
);
93104

94105
let block_data = factory.create_block_data(0).unwrap();

lib/llm/src/block_manager/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ pub struct KvManagerLayoutConfig<S: Storage + NixlRegisterableStorage> {
116116
/// The type of block parallelism strategy to use
117117
#[builder(default)]
118118
pub logical: Option<BlockParallelismStrategy>,
119+
120+
/// The offload filter to use (if any).
121+
/// This dictates which blocks will be offloaded to the next-lowest cache level.
122+
#[builder(default = "None")]
123+
pub offload_filter: Option<Arc<dyn OffloadFilter>>,
119124
}
120125

121126
impl<S: Storage + NixlRegisterableStorage> KvManagerLayoutConfig<S> {

lib/llm/src/block_manager/controller.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ pub enum ResetResponse {
9595
ResetBlocks(ResetBlocksResponse),
9696
}
9797

98-
#[cfg(all(test, feature = "testing-full"))]
98+
#[cfg(all(test, feature = "testing-etcd", feature = "testing-full"))]
9999
mod tests {
100100
use crate::tokens::Tokens;
101101

lib/llm/src/block_manager/offload.rs

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,12 @@ use super::pool::{BlockPool, BlockPoolError};
4242
use super::storage::{Cuda, Storage};
4343
use super::{DeviceStorage, DiskStorage, KvManagerModelConfig, PinnedStorage};
4444
use nixl_sys::Agent as NixlAgent;
45-
use std::sync::Arc;
45+
use std::sync::{
46+
Arc,
47+
atomic::{AtomicU64, Ordering},
48+
};
4649
use tokio::runtime::Handle;
4750
use tokio::sync::{
48-
Mutex,
4951
mpsc::{self, error::TryRecvError},
5052
oneshot,
5153
};
@@ -56,12 +58,16 @@ use std::any::Any;
5658

5759
use std::collections::BTreeSet;
5860

61+
pub mod filter;
5962
mod pending;
6063
pub mod request;
6164

65+
use filter::OffloadFilter;
6266
use pending::{LocalTransferManager, PendingTransfer, TransferBatcher, TransferManager};
6367
use request::{BlockResult, OffloadRequest, OffloadRequestKey, OnboardRequest};
6468

69+
use derive_builder::Builder;
70+
use derive_getters::Getters;
6571
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
6672

6773
pub const MAX_CONCURRENT_TRANSFERS: usize = 4;
@@ -94,16 +100,18 @@ pub struct OffloadManager<Locality: LocalityProvider, Metadata: BlockMetadata> {
94100
mpsc::UnboundedSender<OnboardRequest<DiskStorage, DeviceStorage, Locality, Metadata>>,
95101

96102
/// An incrementing counter for offloaded blocks. Within the same priority, blocks with lower tick values are processed first.
97-
tick: Arc<Mutex<u64>>,
103+
tick: Arc<AtomicU64>,
98104
}
99105

100106
impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
101107
OffloadManager<Locality, Metadata>
102108
{
109+
#[allow(clippy::too_many_arguments)]
103110
pub fn new(
104111
disk: Option<Arc<dyn BlockPool<DiskStorage, Locality, Metadata>>>,
105112
host: Option<Arc<dyn BlockPool<PinnedStorage, Locality, Metadata>>>,
106113
device: Option<Arc<dyn BlockPool<DeviceStorage, Locality, Metadata>>>,
114+
filters: OffloadFilters,
107115
config: OffloadManagerConfig,
108116
) -> Result<Arc<Self>> {
109117
let (device_offload_tx, device_offload_rx) = mpsc::unbounded_channel();
@@ -120,7 +128,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
120128
host_offload_tx,
121129
host_onboard_tx,
122130
disk_onboard_tx,
123-
tick: Arc::new(Mutex::new(0)),
131+
tick: Arc::new(AtomicU64::new(0)),
124132
});
125133

126134
let cuda_ctx = Cuda::device_or_create(0)?;
@@ -163,6 +171,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
163171
&config.async_rt_handle,
164172
config.cancellation_token.clone(),
165173
)),
174+
filters.device.clone(),
166175
device_metrics.clone(),
167176
config.cancellation_token.clone(),
168177
);
@@ -199,6 +208,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
199208
&config.async_rt_handle,
200209
config.cancellation_token.clone(),
201210
)),
211+
filters.host.clone(),
202212
host_metrics.clone(),
203213
config.cancellation_token.clone(),
204214
);
@@ -276,6 +286,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
276286
target_pool: Option<Arc<dyn BlockPool<Target, Locality, Metadata>>>,
277287
mut offload_rx: mpsc::UnboundedReceiver<OffloadRequest<Source, Locality, Metadata>>,
278288
transfer_manager: Arc<dyn TransferManager<Source, Target, Locality, Metadata>>,
289+
offload_filter: Option<Arc<dyn OffloadFilter>>,
279290
pool_metrics: Arc<PoolMetrics>,
280291
cancellation_token: CancellationToken,
281292
) -> Result<()> {
@@ -331,6 +342,12 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
331342
continue;
332343
}
333344

345+
if let Some(offload_filter) = offload_filter.as_ref()
346+
&& !offload_filter.should_offload(request.sequence_hash)
347+
{
348+
continue;
349+
}
350+
334351
let target_block = 'target_block: {
335352
if let Ok(blocks) = target_pool.allocate_blocks(1).await
336353
&& let Some(block) = blocks.into_iter().next()
@@ -443,14 +460,11 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
443460
}
444461
}
445462

446-
let mut tick = self.tick.lock().await;
463+
let tick = self.tick.fetch_add(1, Ordering::Relaxed);
447464
let key = OffloadRequestKey {
448465
priority,
449-
timestamp: *tick,
466+
timestamp: tick,
450467
};
451-
// Increment a counter for each block. Within the same priority, blocks with lower counter values are processed first.
452-
*tick += 1;
453-
drop(tick);
454468

455469
// This can get called by all pools, regardless of whether or not they have a place to offload to.
456470
// Because of this, we need to check the block type here.
@@ -584,6 +598,47 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
584598
}
585599
}
586600

601+
#[derive(Debug, Clone, Getters, Builder)]
602+
#[builder(pattern = "owned", build_fn(validate = "Self::validate"))]
603+
pub struct OffloadFilters {
604+
#[builder(default)]
605+
device: Option<Arc<dyn OffloadFilter>>,
606+
#[builder(default)]
607+
host: Option<Arc<dyn OffloadFilter>>,
608+
#[builder(default)]
609+
disk: Option<Arc<dyn OffloadFilter>>,
610+
}
611+
612+
impl OffloadFilters {
613+
pub fn builder() -> OffloadFiltersBuilder {
614+
OffloadFiltersBuilder::default()
615+
}
616+
}
617+
618+
impl OffloadFiltersBuilder {
619+
pub fn validate(&self) -> Result<(), String> {
620+
if let Some(disk) = self.disk.as_ref()
621+
&& disk.is_some()
622+
{
623+
return Err("Disk offload filter is not supported.".to_string());
624+
}
625+
626+
let host_is_none = if let Some(host) = self.host.as_ref() {
627+
host.is_none()
628+
} else {
629+
true
630+
};
631+
632+
if host_is_none {
633+
tracing::warn!(
634+
"Host to Disk offload filter is not provided. All blocks in host will be offloaded to disk. This may result in excessive disk offloading and accelerated SSD degradation."
635+
);
636+
}
637+
638+
Ok(())
639+
}
640+
}
641+
587642
#[cfg(all(test, feature = "testing-cuda"))]
588643
mod tests {
589644
use super::*;
@@ -771,6 +826,7 @@ mod tests {
771826
disk_pool.clone(),
772827
host_pool.clone(),
773828
device_pool.clone(),
829+
OffloadFilters::builder().build()?,
774830
config,
775831
)?;
776832

0 commit comments

Comments
 (0)