Skip to content

Commit 4b7b430

Browse files
committed
use u64 for tokens threshold
Signed-off-by: PeaBrane <[email protected]>
1 parent 09c3277 commit 4b7b430

File tree

10 files changed

+67
-109
lines changed

10 files changed

+67
-109
lines changed

components/src/dynamo/frontend/main.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,9 @@ def parse_args():
197197
)
198198
parser.add_argument(
199199
"--active-prefill-tokens-threshold",
200-
type=float,
200+
type=int,
201201
default=None,
202-
help="Threshold percentage for determining when a worker is considered busy based on prefill token utilization. Can exceed 1.0 since active prefill tokens include queued tokens. If not set, tokens-based busy detection is disabled.",
202+
help="Literal token count threshold for determining when a worker is considered busy based on prefill token utilization. When active prefill tokens exceed this threshold, the worker is marked as busy. If not set, tokens-based busy detection is disabled.",
203203
)
204204
parser.add_argument(
205205
"--model-name",

docs/router/kv_cache_routing.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ The main KV-aware routing arguments:
3333

3434
- `--active-decode-blocks-threshold`: Initial threshold (0.0-1.0) for determining when a worker is considered busy based on KV cache block utilization. When a worker's KV cache active blocks exceed this percentage of total blocks, it will be marked as busy and excluded from routing. If not set, blocks-based busy detection is disabled. This feature works with all routing modes (`--router-mode kv|round-robin|random`) as long as backend engines emit `ForwardPassMetrics`. The threshold can be dynamically updated at runtime via the `/busy_threshold` HTTP endpoint (see [Dynamic Threshold Configuration](#dynamic-threshold-configuration)).
3535

36-
- `--active-prefill-tokens-threshold`: Threshold for determining when a worker is considered busy based on prefill token utilization. Can exceed 1.0 since active prefill tokens include queued tokens (pending prefill work). If not set, tokens-based busy detection is disabled. When set, the router checks if active prefill tokens exceed `threshold * max_num_batch_tokens`. Generally, set this higher than 1.0 to account for queued requests.
36+
- `--active-prefill-tokens-threshold`: Literal token count threshold for determining when a worker is considered busy based on prefill token utilization. When active prefill tokens exceed this threshold, the worker is marked as busy. If not set, tokens-based busy detection is disabled.
3737

3838
- `--router-ttl`: Time-to-live in seconds for blocks in the router's local cache predictions. Blocks older than this duration will be automatically expired and removed from the router's radix tree. Defaults to 120.0 seconds when `--no-kv-events` is used. This helps manage memory usage by removing stale cache predictions that are unlikely to be accurate.
3939

@@ -594,8 +594,8 @@ The busy thresholds can be updated at runtime without restarting the frontend. T
594594
# Set both thresholds for a model
595595
curl -X POST http://localhost:8000/busy_threshold \
596596
-H "Content-Type: application/json" \
597-
-d '{"model": "meta-llama/Llama-2-7b-hf", "active_decode_blocks_threshold": 0.85, "active_prefill_tokens_threshold": 1.5}'
598-
# Response: {"model": "meta-llama/Llama-2-7b-hf", "active_decode_blocks_threshold": 0.85, "active_prefill_tokens_threshold": 1.5}
597+
-d '{"model": "meta-llama/Llama-2-7b-hf", "active_decode_blocks_threshold": 0.85, "active_prefill_tokens_threshold": 1000}'
598+
# Response: {"model": "meta-llama/Llama-2-7b-hf", "active_decode_blocks_threshold": 0.85, "active_prefill_tokens_threshold": 1000}
599599

600600
# Set only active decode blocks threshold
601601
curl -X POST http://localhost:8000/busy_threshold \
@@ -607,12 +607,12 @@ curl -X POST http://localhost:8000/busy_threshold \
607607
curl -X POST http://localhost:8000/busy_threshold \
608608
-H "Content-Type: application/json" \
609609
-d '{"model": "meta-llama/Llama-2-7b-hf"}'
610-
# Response: {"model": "meta-llama/Llama-2-7b-hf", "active_decode_blocks_threshold": 0.85, "active_prefill_tokens_threshold": 1.5}
610+
# Response: {"model": "meta-llama/Llama-2-7b-hf", "active_decode_blocks_threshold": 0.85, "active_prefill_tokens_threshold": 1000}
611611
# Or if not configured: {"model": "...", "active_decode_blocks_threshold": null, "active_prefill_tokens_threshold": null}
612612
```
613613

614614
**List all configured thresholds (GET):**
615615
```bash
616616
curl http://localhost:8000/busy_threshold
617-
# Response: {"thresholds": [{"model": "meta-llama/Llama-2-7b-hf", "active_decode_blocks_threshold": 0.85, "active_prefill_tokens_threshold": 1.5}]}
617+
# Response: {"thresholds": [{"model": "meta-llama/Llama-2-7b-hf", "active_decode_blocks_threshold": 0.85, "active_prefill_tokens_threshold": 1000}]}
618618
```

lib/bindings/c/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,8 +1033,8 @@ pub async fn create_worker_selection_pipeline_chat(
10331033

10341034
// Create worker monitor if busy_threshold is set
10351035
// Note: C bindings don't register with ModelManager, so HTTP endpoint won't see this
1036-
// C bindings only support active_decode_blocks_threshold for now (active_prefill_tokens_threshold defaults to 1000.0 = effectively disabled)
1037-
let worker_monitor = busy_threshold.map(|t| KvWorkerMonitor::new(client.clone(), t, 1000.0));
1036+
// C bindings only support active_decode_blocks_threshold for now (active_prefill_tokens_threshold defaults to 1000000 tokens = effectively disabled)
1037+
let worker_monitor = busy_threshold.map(|t| KvWorkerMonitor::new(client.clone(), t, 1000000));
10381038

10391039
let engine = build_routed_pipeline::<
10401040
NvCreateChatCompletionRequest,

lib/bindings/python/rust/llm/entrypoint.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ pub struct RouterConfig {
7979
kv_router_config: KvRouterConfig,
8080
/// Threshold for active decode blocks utilization (0.0-1.0)
8181
active_decode_blocks_threshold: Option<f64>,
82-
/// Threshold for active prefill tokens utilization (can exceed 1.0)
83-
active_prefill_tokens_threshold: Option<f64>,
82+
/// Threshold for active prefill tokens utilization (literal token count)
83+
active_prefill_tokens_threshold: Option<u64>,
8484
enforce_disagg: bool,
8585
}
8686

@@ -92,7 +92,7 @@ impl RouterConfig {
9292
mode: RouterMode,
9393
config: Option<KvRouterConfig>,
9494
active_decode_blocks_threshold: Option<f64>,
95-
active_prefill_tokens_threshold: Option<f64>,
95+
active_prefill_tokens_threshold: Option<u64>,
9696
enforce_disagg: bool,
9797
) -> Self {
9898
Self {

lib/llm/src/discovery/model_manager.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ impl ModelManager {
520520

521521
/// Get or set the active prefill tokens threshold for a model's worker monitor.
522522
///
523-
/// The threshold can exceed 1.0 since active prefill tokens include queued tokens.
523+
/// The threshold is a literal token count (not a percentage).
524524
///
525525
/// # Arguments
526526
///
@@ -529,12 +529,12 @@ impl ModelManager {
529529
///
530530
/// # Returns
531531
///
532-
/// The threshold value as f64, or `None` if no monitor exists for this model.
532+
/// The threshold value as u64, or `None` if no monitor exists for this model.
533533
pub fn active_prefill_tokens_threshold(
534534
&self,
535535
model: &str,
536-
threshold: Option<f64>,
537-
) -> Option<f64> {
536+
threshold: Option<u64>,
537+
) -> Option<u64> {
538538
let monitors = self.worker_monitors.read();
539539
let monitor = monitors.get(model)?;
540540

@@ -557,7 +557,7 @@ impl ModelManager {
557557
/// * `model` - The model name
558558
/// * `client` - The client for subscribing to KV metrics (only used if creating new)
559559
/// * `active_decode_blocks_threshold` - The initial/updated active decode blocks threshold value (0.0-1.0)
560-
/// * `active_prefill_tokens_threshold` - The initial/updated active prefill tokens threshold value (can exceed 1.0)
560+
/// * `active_prefill_tokens_threshold` - The initial/updated active prefill tokens threshold value (literal token count)
561561
///
562562
/// # Returns
563563
///
@@ -567,7 +567,7 @@ impl ModelManager {
567567
model: &str,
568568
client: Client,
569569
active_decode_blocks_threshold: f64,
570-
active_prefill_tokens_threshold: f64,
570+
active_prefill_tokens_threshold: u64,
571571
) -> KvWorkerMonitor {
572572
let mut monitors = self.worker_monitors.write();
573573

@@ -594,7 +594,7 @@ impl ModelManager {
594594
/// Lists all models that have worker monitors (and thus busy thresholds) configured.
595595
///
596596
/// Returns a vector of (model_name, active_decode_blocks_threshold, active_prefill_tokens_threshold) tuples.
597-
pub fn list_busy_thresholds(&self) -> Vec<(String, f64, f64)> {
597+
pub fn list_busy_thresholds(&self) -> Vec<(String, f64, u64)> {
598598
self.worker_monitors
599599
.read()
600600
.iter()

lib/llm/src/discovery/watcher.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -408,15 +408,15 @@ impl ModelWatcher {
408408
let worker_monitor = if self.router_config.active_decode_blocks_threshold.is_some()
409409
|| self.router_config.active_prefill_tokens_threshold.is_some()
410410
{
411-
// Default thresholds: active_decode_blocks=1.0 (disabled), active_prefill_tokens=1000.0 (effectively disabled)
411+
// Default thresholds: active_decode_blocks=1.0 (disabled), active_prefill_tokens=1000000 (effectively disabled)
412412
let active_decode_blocks = self
413413
.router_config
414414
.active_decode_blocks_threshold
415415
.unwrap_or(1.0);
416416
let active_prefill_tokens = self
417417
.router_config
418418
.active_prefill_tokens_threshold
419-
.unwrap_or(1000.0);
419+
.unwrap_or(1000000);
420420
Some(self.manager.get_or_create_worker_monitor(
421421
card.name(),
422422
client.clone(),

lib/llm/src/discovery/worker_monitor.rs

Lines changed: 20 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,20 @@ use tokio_stream::StreamExt;
1717
/// Scale factor for storing f64 thresholds as u32 (10000 = 4 decimal places)
1818
const THRESHOLD_SCALE: u32 = 10000;
1919

20-
/// Scale factor for storing f64 tokens threshold as u64 (values can exceed 1.0)
21-
const TOKENS_THRESHOLD_SCALE: u64 = 10000;
22-
2320
/// Worker load monitoring state per dp_rank
2421
#[derive(Clone, Debug, Default)]
2522
pub struct WorkerLoadState {
2623
pub active_decode_blocks: HashMap<u32, u64>,
2724
pub kv_total_blocks: HashMap<u32, u64>,
2825
pub active_prefill_tokens: HashMap<u32, u64>,
29-
pub max_num_batch_tokens: HashMap<u32, u64>,
3026
}
3127

3228
impl WorkerLoadState {
3329
/// Returns true if ALL dp_ranks are considered busy based on the dual-threshold logic:
3430
///
3531
/// For each dp_rank:
36-
/// 1. If `active_prefill_tokens` and `max_num_batch_tokens` are both available,
37-
/// check if tokens exceed threshold. If so, that dp_rank is busy.
32+
/// 1. If `active_prefill_tokens` is available, check if tokens exceed the literal threshold.
33+
/// If so, that dp_rank is busy.
3834
/// 2. If not, check if `active_decode_blocks` and `kv_total_blocks` are both available,
3935
/// and if blocks exceed threshold. If so, that dp_rank is busy.
4036
/// 3. If neither check can be performed (missing data), that dp_rank is considered free.
@@ -43,7 +39,7 @@ impl WorkerLoadState {
4339
pub fn is_busy(
4440
&self,
4541
active_decode_blocks_threshold: f64,
46-
active_prefill_tokens_threshold: f64,
42+
active_prefill_tokens_threshold: u64,
4743
) -> bool {
4844
// Get all dp_ranks we know about
4945
let all_dp_ranks: std::collections::HashSet<_> = self
@@ -60,13 +56,9 @@ impl WorkerLoadState {
6056

6157
// Check if ALL dp_ranks are busy
6258
all_dp_ranks.iter().all(|&dp_rank| {
63-
// First check: prefill tokens threshold
64-
// Skip if max_tokens is 0 (no capacity means threshold check is meaningless)
65-
if let (Some(&active_tokens), Some(&max_tokens)) = (
66-
self.active_prefill_tokens.get(&dp_rank),
67-
self.max_num_batch_tokens.get(&dp_rank),
68-
) && max_tokens > 0
69-
&& (active_tokens as f64) > (active_prefill_tokens_threshold * max_tokens as f64)
59+
// First check: prefill tokens threshold (literal token count)
60+
if let Some(&active_tokens) = self.active_prefill_tokens.get(&dp_rank)
61+
&& active_tokens > active_prefill_tokens_threshold
7062
{
7163
return true; // This dp_rank is busy due to tokens
7264
}
@@ -98,7 +90,7 @@ pub struct KvWorkerMonitor {
9890
worker_load_states: Arc<RwLock<HashMap<u64, WorkerLoadState>>>,
9991
/// Active decode blocks threshold stored as parts-per-10000 (e.g., 8500 = 0.85)
10092
active_decode_blocks_threshold: Arc<AtomicU32>,
101-
/// Active prefill tokens threshold stored as parts-per-10000 (can exceed 10000 for values > 1.0)
93+
/// Active prefill tokens threshold stored as literal token count (u64)
10294
active_prefill_tokens_threshold: Arc<AtomicU64>,
10395
/// Guard to ensure start_monitoring() only runs once across clones
10496
started: Arc<AtomicBool>,
@@ -107,15 +99,15 @@ pub struct KvWorkerMonitor {
10799
impl KvWorkerMonitor {
108100
/// Create a new worker monitor with the given thresholds.
109101
///
110-
/// - `active_decode_blocks_threshold` (0.0-1.0): Threshold for KV cache block utilization
111-
/// - `active_prefill_tokens_threshold` (can exceed 1.0): Threshold for prefill token utilization
102+
/// - `active_decode_blocks_threshold` (0.0-1.0): Threshold percentage for KV cache block utilization
103+
/// - `active_prefill_tokens_threshold`: Literal token count threshold for prefill token utilization
112104
///
113105
/// Both thresholds can be dynamically updated via `set_active_decode_blocks_threshold()` and
114106
/// `set_active_prefill_tokens_threshold()`.
115107
pub fn new(
116108
client: Client,
117109
active_decode_blocks_threshold: f64,
118-
active_prefill_tokens_threshold: f64,
110+
active_prefill_tokens_threshold: u64,
119111
) -> Self {
120112
Self {
121113
client,
@@ -124,7 +116,7 @@ impl KvWorkerMonitor {
124116
Self::active_decode_blocks_threshold_to_scaled(active_decode_blocks_threshold),
125117
)),
126118
active_prefill_tokens_threshold: Arc::new(AtomicU64::new(
127-
Self::active_prefill_tokens_threshold_to_scaled(active_prefill_tokens_threshold),
119+
active_prefill_tokens_threshold,
128120
)),
129121
started: Arc::new(AtomicBool::new(false)),
130122
}
@@ -142,18 +134,6 @@ impl KvWorkerMonitor {
142134
scaled as f64 / THRESHOLD_SCALE as f64
143135
}
144136

145-
/// Convert a f64 active prefill tokens threshold (can exceed 1.0) to scaled u64 for atomic storage.
146-
#[inline]
147-
fn active_prefill_tokens_threshold_to_scaled(threshold: f64) -> u64 {
148-
(threshold * TOKENS_THRESHOLD_SCALE as f64) as u64
149-
}
150-
151-
/// Convert a scaled u64 back to f64 active prefill tokens threshold.
152-
#[inline]
153-
fn scaled_to_active_prefill_tokens_threshold(scaled: u64) -> f64 {
154-
scaled as f64 / TOKENS_THRESHOLD_SCALE as f64
155-
}
156-
157137
/// Get the current active decode blocks threshold value as f64.
158138
pub fn active_decode_blocks_threshold(&self) -> f64 {
159139
Self::scaled_to_active_decode_blocks_threshold(
@@ -169,19 +149,15 @@ impl KvWorkerMonitor {
169149
);
170150
}
171151

172-
/// Get the current active prefill tokens threshold value as f64.
173-
pub fn active_prefill_tokens_threshold(&self) -> f64 {
174-
Self::scaled_to_active_prefill_tokens_threshold(
175-
self.active_prefill_tokens_threshold.load(Ordering::Relaxed),
176-
)
152+
/// Get the current active prefill tokens threshold value as u64.
153+
pub fn active_prefill_tokens_threshold(&self) -> u64 {
154+
self.active_prefill_tokens_threshold.load(Ordering::Relaxed)
177155
}
178156

179-
/// Set the active prefill tokens threshold value from f64.
180-
pub fn set_active_prefill_tokens_threshold(&self, threshold: f64) {
181-
self.active_prefill_tokens_threshold.store(
182-
Self::active_prefill_tokens_threshold_to_scaled(threshold),
183-
Ordering::Relaxed,
184-
);
157+
/// Set the active prefill tokens threshold value from u64.
158+
pub fn set_active_prefill_tokens_threshold(&self, threshold: u64) {
159+
self.active_prefill_tokens_threshold
160+
.store(threshold, Ordering::Relaxed);
185161
}
186162

187163
/// Get the worker load states for external access
@@ -244,7 +220,7 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
244220
let mut states = worker_load_states.write().unwrap();
245221
states.retain(|lease_id, _| runtime_configs.contains_key(lease_id));
246222

247-
// Update worker load states with total blocks and max batch tokens for all dp_ranks
223+
// Update worker load states with total blocks for all dp_ranks
248224
for (lease_id, runtime_config) in runtime_configs.iter() {
249225
let state = states.entry(*lease_id).or_default();
250226

@@ -254,13 +230,6 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
254230
state.kv_total_blocks.insert(dp_rank, total_blocks);
255231
}
256232
}
257-
258-
// Populate max_num_batch_tokens for all dp_ranks
259-
if let Some(max_tokens) = runtime_config.max_num_batched_tokens {
260-
for dp_rank in 0..runtime_config.data_parallel_size {
261-
state.max_num_batch_tokens.insert(dp_rank, max_tokens);
262-
}
263-
}
264233
}
265234
}
266235

@@ -294,9 +263,7 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
294263
let current_active_decode_blocks_threshold = Self::scaled_to_active_decode_blocks_threshold(
295264
active_decode_blocks_threshold.load(Ordering::Relaxed),
296265
);
297-
let current_active_prefill_tokens_threshold = Self::scaled_to_active_prefill_tokens_threshold(
298-
active_prefill_tokens_threshold.load(Ordering::Relaxed),
299-
);
266+
let current_active_prefill_tokens_threshold = active_prefill_tokens_threshold.load(Ordering::Relaxed);
300267

301268
// Recalculate all busy instances and update
302269
let states = worker_load_states.read().unwrap();

lib/llm/src/entrypoint.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ pub struct RouterConfig {
2323
pub kv_router_config: KvRouterConfig,
2424
/// Threshold for active decode blocks utilization (0.0-1.0)
2525
pub active_decode_blocks_threshold: Option<f64>,
26-
/// Threshold for active prefill tokens utilization (can exceed 1.0)
27-
pub active_prefill_tokens_threshold: Option<f64>,
26+
/// Threshold for active prefill tokens utilization (literal token count)
27+
pub active_prefill_tokens_threshold: Option<u64>,
2828
pub enforce_disagg: bool,
2929
}
3030

@@ -44,7 +44,7 @@ impl RouterConfig {
4444
self
4545
}
4646

47-
pub fn with_active_prefill_tokens_threshold(mut self, threshold: Option<f64>) -> Self {
47+
pub fn with_active_prefill_tokens_threshold(mut self, threshold: Option<u64>) -> Self {
4848
self.active_prefill_tokens_threshold = threshold;
4949
self
5050
}

0 commit comments

Comments
 (0)