diff --git a/Cargo.lock b/Cargo.lock index 417a3d1d5b..174759fa26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1602,6 +1602,19 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "console" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b430743a6eb14e9764d4260d4c0d8123087d504eeb9c48f2b2a5e810dd369df4" +dependencies = [ + "encode_unicode", + "libc", + "once_cell", + "unicode-width 0.2.2", + "windows-sys 0.61.2", +] + [[package]] name = "console-api" version = "0.8.1" @@ -2370,7 +2383,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "658bce805d770f407bc62102fca7c2c64ceef2fbcb2b8bd19d2765ce093980de" dependencies = [ - "console", + "console 0.15.11", "shell-words", "tempfile", "thiserror 1.0.69", @@ -2650,6 +2663,7 @@ dependencies = [ "bytes", "candle-core 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "chrono", + "clap 4.5.52", "criterion 0.3.6", "cudarc", "dashmap 5.5.3", @@ -2671,6 +2685,7 @@ dependencies = [ "hyper 1.8.1", "hyper-util", "image", + "indicatif 0.18.3", "insta", "itertools 0.14.0", "json-five", @@ -4050,8 +4065,8 @@ checksum = "629d8f3bbeda9d148036d6b0de0a3ab947abd08ce90626327fc3547a49d59d97" dependencies = [ "dirs", "futures", + "indicatif 0.17.11", "http 1.4.0", - "indicatif", "libc", "log", "num_cpus", @@ -4672,7 +4687,7 @@ version = "0.17.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235" dependencies = [ - "console", + "console 0.15.11", "number_prefix", "portable-atomic", "rayon", @@ -4680,6 +4695,19 @@ dependencies = [ "web-time", ] +[[package]] +name = "indicatif" +version = "0.18.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9375e112e4b463ec1b1c6c011953545c65a30164fbab5b581df32b3abf0dcb88" +dependencies = [ + "console 0.16.1", + "portable-atomic", + "unicode-width 0.2.2", + "unit-prefix", + "web-time", +] + [[package]] name = "inlinable_string" version = "0.1.15" @@ -4734,7 +4762,7 @@ version = "1.44.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5c943d4415edd8153251b6f197de5eb1640e56d84e8d9159bea190421c73698" dependencies = [ - "console", + "console 0.15.11", "globset", "once_cell", "pest", @@ -6135,7 +6163,7 @@ dependencies = [ "http 1.4.0", "image", "indexmap 2.12.1", - "indicatif", + "indicatif 0.17.11", "interprocess", "itertools 0.14.0", "libc", @@ -11546,6 +11574,12 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "unit-prefix" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81e544489bf3d8ef66c953931f56617f423cd4b5494be343d9b9d3dda037b9a3" + [[package]] name = "universal-hash" version = "0.5.1" diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml index 8bb5533e02..97da4b9c59 100644 --- a/lib/llm/Cargo.toml +++ b/lib/llm/Cargo.toml @@ -22,6 +22,7 @@ testing-cuda = ["dep:cudarc"] testing-nixl = ["dep:nixl-sys"] testing-etcd = [] block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:nix", "dep:aligned-vec"] +block-manager-bench = ["block-manager", "testing-full", "dep:clap", "dep:indicatif"] cuda = ["dep:cudarc"] integration = ["dynamo-runtime/integration"] media-nixl = ["dep:nixl-sys", "dep:dynamo-memory"] @@ -105,6 +106,10 @@ nixl-sys = { version = "=0.7.1", optional = true } cudarc = { workspace = true, optional = true } nix = { version = "0.26", optional = true } +# block_manager_bench +clap = { version = "4.5.49", features = ["derive"], optional = true } +indicatif = { version = "0.18.0", optional = true } + # protocols unicode-segmentation = "1.12" @@ -188,3 +193,8 @@ mockito = "1.7.0" [build-dependencies] tonic-build = { version = "0.13.1" } + +[[bin]] +name = "bench_local_transfer_v2" +path = "bin/bench_local_transfer_v2.rs" +required-features = ["block-manager-bench"] diff --git a/lib/llm/bin/bench_local_transfer_v2.rs b/lib/llm/bin/bench_local_transfer_v2.rs new file mode 100644 index 0000000000..b82a5b89b8 --- /dev/null +++ b/lib/llm/bin/bench_local_transfer_v2.rs @@ -0,0 +1,196 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Result; +use clap::Parser; + +use core::time::Duration; +use indicatif::ProgressIterator; +use std::time::Instant; + +use dynamo_llm::block_manager::v2::physical::{ + layout::LayoutConfig, + transfer::{ + BounceBufferSpec, NixlAgent, PhysicalLayout, StorageKind, TransferOptions, + TransportManager, executor::execute_transfer, + }, +}; + +use std::sync::Arc; + +#[derive(Parser)] +struct Args { + /// Amount of layers + #[clap(long, default_value_t = 24)] + num_layers: usize, + + /// Inner dimension + #[clap(long, default_value_t = 4096)] + inner_dim: usize, + + /// Block size + #[clap(long, default_value_t = 32)] + block_size: usize, + + /// Amount of blocks per pool + #[clap(long, default_value_t = 16)] + num_blocks: usize, + + /// Amount of blocks per transferred batch + #[clap(long, default_value_t = 4)] + blocks_per_batch: usize, + + /// Amount of pinned bounce buffer blocks + #[clap(long, default_value_t = 2)] + num_bounce_blocks: usize, + + /// Amount of iterations + #[clap(long, default_value_t = 100)] + iterations: usize, +} + +struct DummyBounceBufferSpec { + pub layout: PhysicalLayout, + pub block_ids: Vec, +} + +impl BounceBufferSpec for DummyBounceBufferSpec { + fn layout(&self) -> &PhysicalLayout { + &self.layout + } + fn block_ids(&self) -> &[usize] { + &self.block_ids + } +} + +#[tokio::main] +pub async fn main() -> Result<()> { + let args = Args::parse(); + + // let manager = build_manager(&args).await?; + + benchmark(&args).await?; + + Ok(()) +} + +fn build_layout( + agent: NixlAgent, + config: LayoutConfig, + storage_kind: StorageKind, +) -> PhysicalLayout { + let builder = PhysicalLayout::builder(agent) + .with_config(config) + .fully_contiguous(); + + match storage_kind { + StorageKind::System => builder.allocate_system().build().unwrap(), + StorageKind::Pinned => builder.allocate_pinned(false).build().unwrap(), + StorageKind::Device(device_id) => builder.allocate_device(device_id).build().unwrap(), + StorageKind::Disk(_) => builder.allocate_disk(None).build().unwrap(), + } +} + +fn get_bandwidth_gbs(latencies: Vec, args: &Args) -> f64 { + let total_bytes = + args.num_layers * args.inner_dim * args.block_size * args.blocks_per_batch * 2; + let mean = latencies.iter().sum::() / latencies.len() as u32; + + total_bytes as f64 / mean.as_nanos() as f64 +} + +async fn benchmark(args: &Args) -> Result<()> { + let agent = NixlAgent::require_backends("test_agent", &["POSIX", "GDS_MT"])?; + let src_dst_config = LayoutConfig::builder() + .num_blocks(args.num_blocks) + .num_layers(args.num_layers) + .outer_dim(2) + .page_size(args.block_size) + .inner_dim(args.inner_dim) + .dtype_width_bytes(2) + .build()?; + + let disk_layout = build_layout(agent.clone(), src_dst_config.clone(), StorageKind::Disk(0)); + let device_layout = build_layout( + agent.clone(), + src_dst_config.clone(), + StorageKind::Device(0), + ); + + let bounce_config = LayoutConfig::builder() + .num_blocks(args.num_bounce_blocks) + .num_layers(args.num_layers) + .outer_dim(2) + .page_size(args.block_size) + .inner_dim(args.inner_dim) + .dtype_width_bytes(2) + .build()?; + + let bounce_layout = build_layout(agent.clone(), bounce_config.clone(), StorageKind::Pinned); + + let ctx = TransportManager::builder() + .worker_id(0) + .nixl_agent(agent) + .cuda_device_id(0) + .build()?; + + let bounce_buffer_spec: Arc = Arc::new(DummyBounceBufferSpec { + layout: bounce_layout, + block_ids: (0..args.num_bounce_blocks).collect(), + }); + + let options = TransferOptions::builder() + .bounce_buffer(bounce_buffer_spec) + .build()?; + + anyhow::ensure!( + args.blocks_per_batch <= args.num_blocks, + "blocks_per_batch must be less than or equal to num_blocks" + ); + let blocks = (0..args.blocks_per_batch).collect::>(); + + for (src, dst, name) in vec![ + (disk_layout.clone(), device_layout.clone(), "disk_to_device"), + (device_layout, disk_layout, "device_to_disk"), + ] { + println!("Starting {} benchmark...", name); + + let mut latencies = Vec::new(); + for _ in (0..args.iterations).progress() { + let options_clone = options.clone(); + let start = Instant::now(); + execute_transfer( + &src, + &dst, + blocks.as_slice(), + blocks.as_slice(), + options_clone, + ctx.context(), + )? + .await?; + let end = Instant::now(); + let duration = end.duration_since(start); + latencies.push(duration); + } + + println!( + "{} bandwidth: {:?} GB/s", + name, + get_bandwidth_gbs(latencies, args) + ); + } + + Ok(()) +} diff --git a/lib/llm/src/block_manager/v2/physical/transfer/executor/mod.rs b/lib/llm/src/block_manager/v2/physical/transfer/executor/mod.rs index a3eeb36379..896956f20b 100644 --- a/lib/llm/src/block_manager/v2/physical/transfer/executor/mod.rs +++ b/lib/llm/src/block_manager/v2/physical/transfer/executor/mod.rs @@ -17,6 +17,7 @@ use anyhow::Result; use std::ops::Range; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use tokio::sync::Mutex; // Re-export the NIXL transfer builder for public use pub use nixl::NixlTransferBuilder; @@ -181,6 +182,64 @@ struct TwoHopTransferParams<'a> { ctx: &'a TransferContext, } +#[allow(clippy::too_many_arguments)] +async fn handle_buffered_transfer( + src: &PhysicalLayout, + bounce_layout: &PhysicalLayout, + dst: &PhysicalLayout, + src_block_ids: &[usize], + bounce_block_ids: &[usize], + dst_block_ids: &[usize], + first_strategy: TransferStrategy, + second_strategy: TransferStrategy, + layer_range: &Option>, + ctx: &TransferContext, +) -> Result<()> { + let bounce_groups = + &bounce_block_ids[0..std::cmp::min(src_block_ids.len(), bounce_block_ids.len())]; + let (bounce_group_0, bounce_group_1) = bounce_groups.split_at(bounce_groups.len() / 2); + let bounce_group_0 = bounce_group_0.to_vec(); + let bounce_group_1 = bounce_group_1.to_vec(); + + let src_dst_iter = Arc::new(Mutex::new(src_block_ids.iter().zip(dst_block_ids.iter()))); + + let transfer_task = async move |bounce_group: &[usize]| -> Result<()> { + loop { + let (src_ids, dst_ids): (Vec, Vec); + { + let mut x = src_dst_iter.lock().await; + (src_ids, dst_ids) = x.by_ref().take(bounce_group.len()).unzip(); + if src_ids.is_empty() { + break; + } + } + + execute_two_hop_transfer_chunk( + src, + bounce_layout, + dst, + &src_ids, + &bounce_group[0..src_ids.len()], + &dst_ids, + first_strategy, + second_strategy, + layer_range, + ctx, + ) + .await?; + } + + Ok(()) + }; + + let transfer_0 = transfer_task(&bounce_group_0); + let transfer_1 = transfer_task(&bounce_group_1); + + futures::future::try_join(transfer_0, transfer_1).await?; + + Ok(()) +} + fn execute_two_hop_transfer(params: TwoHopTransferParams) -> Result { let TwoHopTransferParams { src, @@ -223,22 +282,26 @@ fn execute_two_hop_transfer(params: TwoHopTransferParams) -> Result Result