Skip to content

Commit 32486d5

Browse files
authored
Merge branch 'main' into feat/add-logprobs-support-sglang-backend
2 parents 85ff86c + 3324994 commit 32486d5

File tree

35 files changed

+3025
-256
lines changed

35 files changed

+3025
-256
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

components/src/dynamo/frontend/main.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,16 @@ def parse_args():
190190
help="Enforce disaggregated prefill-decode. When set, unactivated prefill router will return an error instead of falling back to decode-only mode.",
191191
)
192192
parser.add_argument(
193-
"--busy-threshold",
193+
"--active-decode-blocks-threshold",
194194
type=float,
195195
default=None,
196-
help="Threshold (0.0-1.0) for determining when a worker is considered busy based on KV cache usage. If not set, busy detection is disabled.",
196+
help="Threshold percentage (0.0-1.0) for determining when a worker is considered busy based on KV cache block utilization. If not set, blocks-based busy detection is disabled.",
197+
)
198+
parser.add_argument(
199+
"--active-prefill-tokens-threshold",
200+
type=int,
201+
default=None,
202+
help="Literal token count threshold for determining when a worker is considered busy based on prefill token utilization. When active prefill tokens exceed this threshold, the worker is marked as busy. If not set, tokens-based busy detection is disabled.",
197203
)
198204
parser.add_argument(
199205
"--model-name",
@@ -316,7 +322,11 @@ def signal_handler():
316322
"http_port": flags.http_port,
317323
"kv_cache_block_size": flags.kv_cache_block_size,
318324
"router_config": RouterConfig(
319-
router_mode, kv_router_config, flags.busy_threshold, flags.enforce_disagg
325+
router_mode,
326+
kv_router_config,
327+
active_decode_blocks_threshold=flags.active_decode_blocks_threshold,
328+
active_prefill_tokens_threshold=flags.active_prefill_tokens_threshold,
329+
enforce_disagg=flags.enforce_disagg,
320330
),
321331
}
322332

components/src/dynamo/mocker/args.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ def create_temp_engine_args_file(args) -> Path:
113113
else None,
114114
"is_prefill": getattr(args, "is_prefill_worker", None),
115115
"is_decode": getattr(args, "is_decode_worker", None),
116+
"enable_local_indexer": getattr(args, "enable_local_indexer", None),
116117
}
117118

118119
# Remove None values to only include explicitly set arguments
@@ -284,6 +285,12 @@ def parse_args():
284285
default=False,
285286
help="Mark this as a decode worker which does not publish KV events and skips prefill cost estimation (default: False)",
286287
)
288+
parser.add_argument(
289+
"--enable-local-indexer",
290+
action="store_true",
291+
default=False,
292+
help="Enable worker-local KV indexer for tracking this worker's own KV cache state (default: False)",
293+
)
287294
parser.add_argument(
288295
"--store-kv",
289296
type=str,

components/src/dynamo/planner/utils/perf_interpolation.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
from typing import Optional
2121

2222
import numpy as np
23-
import scipy
2423

2524
from dynamo.runtime.logging import configure_dynamo_logging
2625

@@ -80,6 +79,9 @@ def __init__(
8079
self.min_isl = min(self.prefill_isl)
8180
self.max_isl = max(self.prefill_isl)
8281

82+
# Lazy import scipy only when interpolation is actually needed
83+
import scipy.interpolate
84+
8385
# perform 1d interpolation
8486
self.ttft_interpolator = scipy.interpolate.interp1d(
8587
self.prefill_isl, self.prefill_ttft, kind="cubic"
@@ -151,6 +153,9 @@ def __init__(
151153
self.yi = np.linspace(0, max(self.y_context_length), resolution)
152154
self.X, self.Y = np.meshgrid(self.xi, self.yi)
153155

156+
# Lazy import scipy only when interpolation is actually needed
157+
import scipy.interpolate
158+
154159
# perform 2d interpolation with fallback for NaN values
155160
self.itl_interpolator = scipy.interpolate.griddata(
156161
(self.x_kv_usage, self.y_context_length),

components/src/dynamo/vllm/args.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class Config:
4040
custom_jinja_template: Optional[str] = None
4141
store_kv: str
4242
request_plane: str
43+
enable_local_indexer: bool = False
4344

4445
# mirror vLLM
4546
model: str
@@ -204,6 +205,13 @@ def parse_args() -> Config:
204205
default=os.environ.get("DYN_REQUEST_PLANE", "nats"),
205206
help="Determines how requests are distributed from routers to workers. 'tcp' is fastest [nats|http|tcp]",
206207
)
208+
parser.add_argument(
209+
"--enable-local-indexer",
210+
type=str,
211+
choices=["true", "false"],
212+
default=os.environ.get("DYN_LOCAL_INDEXER", "false"),
213+
help="Enable worker-local KV indexer for tracking this worker's own KV cache state (can also be toggled with env var DYN_LOCAL_INDEXER).",
214+
)
207215
parser.add_argument(
208216
"--use-vllm-tokenizer",
209217
action="store_true",
@@ -214,6 +222,7 @@ def parse_args() -> Config:
214222

215223
parser = AsyncEngineArgs.add_cli_args(parser)
216224
args = parser.parse_args()
225+
args.enable_local_indexer = str(args.enable_local_indexer).lower() == "true"
217226
engine_args = AsyncEngineArgs.from_cli_args(args)
218227

219228
# Workaround for vLLM GIL contention bug with NIXL connector when using UniProcExecutor.
@@ -312,6 +321,7 @@ def parse_args() -> Config:
312321
config.mm_prompt_template = args.mm_prompt_template
313322
config.store_kv = args.store_kv
314323
config.request_plane = args.request_plane
324+
config.enable_local_indexer = args.enable_local_indexer
315325
config.use_vllm_tokenizer = args.use_vllm_tokenizer
316326

317327
# Validate custom Jinja template file exists if provided

components/src/dynamo/vllm/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ def setup_kv_event_publisher(
224224
worker_id=generate_endpoint.connection_id(),
225225
kv_block_size=vllm_config.cache_config.block_size,
226226
zmq_endpoint=zmq_endpoint,
227+
enable_local_indexer=config.enable_local_indexer,
227228
)
228229
kv_publisher = ZmqKvEventPublisher(component=component, config=zmq_config)
229230
kv_publishers.append(kv_publisher)
@@ -336,6 +337,7 @@ async def register_vllm_model(
336337
runtime_config.total_kv_blocks = runtime_values["num_gpu_blocks"]
337338
runtime_config.max_num_seqs = runtime_values["max_num_seqs"]
338339
runtime_config.max_num_batched_tokens = runtime_values["max_num_batched_tokens"]
340+
runtime_config.enable_local_indexer = config.enable_local_indexer
339341

340342
# Add tool/reasoning parsers for decode models
341343
if model_type != ModelType.Prefill:

docs/router/kv_cache_routing.md

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ The main KV-aware routing arguments:
3131

3232
- `--no-track-active-blocks`: Disables tracking of active blocks (blocks being used for ongoing generation/decode phases). By default, the router tracks active blocks for load balancing. Disable this when routing to workers that only perform prefill (no decode phase), as tracking decode load is not relevant. This reduces router overhead and simplifies state management.
3333

34-
- `--busy-threshold`: Initial threshold (0.0-1.0) for determining when a worker is considered busy based on KV cache usage. When a worker's KV cache active blocks exceed this percentage of total blocks, it will be marked as busy and excluded from routing. If not set, busy detection is disabled. This feature works with all routing modes (`--router-mode kv|round-robin|random`) as long as backend engines emit `ForwardPassMetrics`. The threshold can be dynamically updated at runtime via the `/busy_threshold` HTTP endpoint (see [Dynamic Threshold Configuration](#dynamic-threshold-configuration)).
34+
- `--active-decode-blocks-threshold`: Initial threshold (0.0-1.0) for determining when a worker is considered busy based on KV cache block utilization. When a worker's active KV cache blocks exceed this fraction of its total blocks, it is marked as busy and excluded from routing. If not set, blocks-based busy detection is disabled. This feature works with all routing modes (`--router-mode kv|round-robin|random`) as long as backend engines emit `ForwardPassMetrics`. The threshold can be dynamically updated at runtime via the `/busy_threshold` HTTP endpoint (see [Dynamic Threshold Configuration](#dynamic-threshold-configuration)).
35+
36+
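- `--active-prefill-tokens-threshold`: Literal token count threshold (an absolute number of tokens, not a 0.0-1.0 ratio) for determining when a worker is considered busy based on prefill token utilization. When a worker's active prefill tokens exceed this threshold, the worker is marked as busy. If not set, tokens-based busy detection is disabled. This threshold can also be updated at runtime via the `/busy_threshold` HTTP endpoint (see [Dynamic Threshold Configuration](#dynamic-threshold-configuration)).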
3537

3638
- `--router-ttl`: Time-to-live in seconds for blocks in the router's local cache predictions. Blocks older than this duration will be automatically expired and removed from the router's radix tree. Defaults to 120.0 seconds when `--no-kv-events` is used. This helps manage memory usage by removing stale cache predictions that are unlikely to be accurate.
3739

@@ -585,28 +587,32 @@ See [KV Router Architecture](../router/README.md) for performance tuning details
585587

586588
## Dynamic Threshold Configuration
587589

588-
The busy threshold can be updated at runtime without restarting the frontend. The frontend exposes HTTP endpoints at `/busy_threshold`:
590+
The busy thresholds can be updated at runtime without restarting the frontend. The frontend exposes a `/busy_threshold` HTTP endpoint that accepts both POST and GET requests:
589591

590-
**Get or set a model's threshold (POST):**
592+
**Get or set a model's thresholds (POST):**
591593
```bash
592-
# Set threshold for a model
594+
# Set both thresholds for a model
593595
curl -X POST http://localhost:8000/busy_threshold \
594596
-H "Content-Type: application/json" \
595-
-d '{"model": "meta-llama/Llama-2-7b-hf", "threshold": 0.85}'
596-
# Response: {"model": "meta-llama/Llama-2-7b-hf", "threshold": 0.85}
597+
-d '{"model": "meta-llama/Llama-2-7b-hf", "active_decode_blocks_threshold": 0.85, "active_prefill_tokens_threshold": 1000}'
598+
# Response: {"model": "meta-llama/Llama-2-7b-hf", "active_decode_blocks_threshold": 0.85, "active_prefill_tokens_threshold": 1000}
597599

598-
# Get current threshold (omit threshold field)
600+
# Set only active decode blocks threshold
601+
curl -X POST http://localhost:8000/busy_threshold \
602+
-H "Content-Type: application/json" \
603+
-d '{"model": "meta-llama/Llama-2-7b-hf", "active_decode_blocks_threshold": 0.85}'
604+
# Response: {"model": "meta-llama/Llama-2-7b-hf", "active_decode_blocks_threshold": 0.85, "active_prefill_tokens_threshold": <current_value>}
605+
606+
# Get current thresholds (omit threshold fields)
599607
curl -X POST http://localhost:8000/busy_threshold \
600608
-H "Content-Type: application/json" \
601609
-d '{"model": "meta-llama/Llama-2-7b-hf"}'
602-
# Response: {"model": "meta-llama/Llama-2-7b-hf", "threshold": 0.85}
603-
# Or if not configured: {"model": "...", "threshold": null}
610+
# Response: {"model": "meta-llama/Llama-2-7b-hf", "active_decode_blocks_threshold": 0.85, "active_prefill_tokens_threshold": 1000}
611+
# Or if not configured: {"model": "...", "active_decode_blocks_threshold": null, "active_prefill_tokens_threshold": null}
604612
```
605613

606614
**List all configured thresholds (GET):**
607615
```bash
608616
curl http://localhost:8000/busy_threshold
609-
# Response: {"thresholds": [{"model": "meta-llama/Llama-2-7b-hf", "threshold": 0.85}]}
617+
# Response: {"thresholds": [{"model": "meta-llama/Llama-2-7b-hf", "active_decode_blocks_threshold": 0.85, "active_prefill_tokens_threshold": 1000}]}
610618
```
611-
612-
This allows you to tune the busy threshold based on observed system behavior without service interruption.

lib/bindings/c/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -966,7 +966,9 @@ pub async fn create_worker_selection_pipeline_chat(
966966
let router_config = dynamo_llm::entrypoint::RouterConfig {
967967
router_mode,
968968
kv_router_config: kv_router_config.unwrap_or_default(),
969-
busy_threshold,
969+
// C bindings only support active_decode_blocks_threshold for now (via busy_threshold param)
970+
active_decode_blocks_threshold: busy_threshold,
971+
active_prefill_tokens_threshold: None,
970972
enforce_disagg: false,
971973
};
972974
let watcher = ModelWatcher::new(
@@ -1031,7 +1033,8 @@ pub async fn create_worker_selection_pipeline_chat(
10311033

10321034
// Create worker monitor if busy_threshold is set
10331035
// Note: C bindings don't register with ModelManager, so HTTP endpoint won't see this
1034-
let worker_monitor = busy_threshold.map(|t| KvWorkerMonitor::new(client.clone(), t));
1036+
// C bindings only support active_decode_blocks_threshold for now (active_prefill_tokens_threshold defaults to 1000000 tokens = effectively disabled)
1037+
let worker_monitor = busy_threshold.map(|t| KvWorkerMonitor::new(client.clone(), t, 1000000));
10351038

10361039
let engine = build_routed_pipeline::<
10371040
NvCreateChatCompletionRequest,

lib/bindings/python/rust/llm/entrypoint.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,24 +77,29 @@ impl KvRouterConfig {
7777
pub struct RouterConfig {
7878
router_mode: RouterMode,
7979
kv_router_config: KvRouterConfig,
80-
busy_threshold: Option<f64>,
80+
/// Threshold for active decode blocks utilization (0.0-1.0)
81+
active_decode_blocks_threshold: Option<f64>,
82+
/// Threshold for active prefill tokens utilization (literal token count)
83+
active_prefill_tokens_threshold: Option<u64>,
8184
enforce_disagg: bool,
8285
}
8386

8487
#[pymethods]
8588
impl RouterConfig {
8689
#[new]
87-
#[pyo3(signature = (mode, config=None, busy_threshold=None, enforce_disagg=false))]
90+
#[pyo3(signature = (mode, config=None, active_decode_blocks_threshold=None, active_prefill_tokens_threshold=None, enforce_disagg=false))]
8891
pub fn new(
8992
mode: RouterMode,
9093
config: Option<KvRouterConfig>,
91-
busy_threshold: Option<f64>,
94+
active_decode_blocks_threshold: Option<f64>,
95+
active_prefill_tokens_threshold: Option<u64>,
9296
enforce_disagg: bool,
9397
) -> Self {
9498
Self {
9599
router_mode: mode,
96100
kv_router_config: config.unwrap_or_default(),
97-
busy_threshold,
101+
active_decode_blocks_threshold,
102+
active_prefill_tokens_threshold,
98103
enforce_disagg,
99104
}
100105
}
@@ -105,7 +110,8 @@ impl From<RouterConfig> for RsRouterConfig {
105110
RsRouterConfig {
106111
router_mode: rc.router_mode.into(),
107112
kv_router_config: rc.kv_router_config.inner,
108-
busy_threshold: rc.busy_threshold,
113+
active_decode_blocks_threshold: rc.active_decode_blocks_threshold,
114+
active_prefill_tokens_threshold: rc.active_prefill_tokens_threshold,
109115
enforce_disagg: rc.enforce_disagg,
110116
}
111117
}

lib/bindings/python/rust/llm/kv.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use rs::traits::events::EventSubscriber;
2121
use tracing;
2222

2323
use llm_rs::kv_router::protocols::*;
24-
use llm_rs::kv_router::publisher::{KvEventSourceConfig, create_stored_blocks};
24+
use llm_rs::kv_router::publisher::{KvEventSourceConfig, create_stored_blocks, start_zmq_listener};
2525
use llm_rs::protocols::common::{OutputOptions, SamplingOptions, StopConditions};
2626

2727
#[pyfunction]
@@ -106,6 +106,9 @@ pub struct ZmqKvEventPublisherConfig {
106106
pub zmq_endpoint: String,
107107
#[pyo3(get, set)]
108108
pub zmq_topic: String,
109+
#[pyo3(get, set)]
110+
pub enable_local_indexer: bool, // whether the underlying KvEventPublisher publishes to
111+
// both global and worker-local KvIndexers
109112
}
110113

111114
#[pymethods]
@@ -115,19 +118,22 @@ impl ZmqKvEventPublisherConfig {
115118
worker_id,
116119
kv_block_size,
117120
zmq_endpoint = "tcp://127.0.0.1:5557".to_string(),
118-
zmq_topic = "".to_string()
121+
zmq_topic = "".to_string(),
122+
enable_local_indexer = false
119123
))]
120124
pub fn new(
121125
worker_id: WorkerId,
122126
kv_block_size: usize,
123127
zmq_endpoint: String,
124128
zmq_topic: String,
129+
enable_local_indexer: bool,
125130
) -> Self {
126131
Self {
127132
worker_id,
128133
kv_block_size,
129134
zmq_endpoint,
130135
zmq_topic,
136+
enable_local_indexer,
131137
}
132138
}
133139
}
@@ -141,13 +147,14 @@ pub(crate) struct ZmqKvEventPublisher {
141147
impl ZmqKvEventPublisher {
142148
#[new]
143149
fn new(component: Component, config: ZmqKvEventPublisherConfig) -> PyResult<Self> {
144-
let inner = llm_rs::kv_router::publisher::KvEventPublisher::new(
150+
let inner = llm_rs::kv_router::publisher::KvEventPublisher::new_with_local_indexer(
145151
component.inner,
146152
config.kv_block_size as u32,
147153
Some(KvEventSourceConfig::Zmq {
148154
endpoint: config.zmq_endpoint,
149155
topic: config.zmq_topic,
150156
}),
157+
config.enable_local_indexer,
151158
)
152159
.map_err(to_pyerr)?;
153160
Ok(Self { inner })
@@ -179,7 +186,7 @@ impl ZmqKvEventListener {
179186
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<KvCacheEvent>();
180187
let shutdown_token = tokio_util::sync::CancellationToken::new();
181188

182-
tokio::spawn(llm_rs::kv_router::publisher::start_zmq_listener(
189+
tokio::spawn(start_zmq_listener(
183190
zmq_endpoint,
184191
zmq_topic,
185192
tx,

0 commit comments

Comments
 (0)