Skip to content

Commit 77990f4

Browse files
committed
push based publishing
Signed-off-by: PeaBrane <[email protected]>
1 parent 4b7b430 commit 77990f4

File tree

2 files changed

+68
-45
lines changed

2 files changed

+68
-45
lines changed

lib/llm/src/kv_router/scheduler.rs

Lines changed: 1 addition & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,11 @@ use std::time::Duration;
1414
use tokio::sync::{RwLock, watch};
1515

1616
use super::KV_HIT_RATE_SUBJECT;
17-
use super::KV_METRICS_SUBJECT;
1817
use super::KvRouterConfig;
1918
use super::RouterConfigOverride;
2019
use super::WorkerSelector;
2120
use super::indexer::OverlapScores;
22-
use super::protocols::{ActiveLoad, DpRank, WorkerId, WorkerSelectionResult, WorkerWithDpRank};
21+
use super::protocols::{DpRank, WorkerId, WorkerSelectionResult, WorkerWithDpRank};
2322
use super::sequence::{ActiveSequencesMultiWorker, SequenceError};
2423

2524
use crate::tokens::SequenceHash;
@@ -288,47 +287,6 @@ impl KvScheduler {
288287
tracing::trace!("background endpoint subscriber shutting down");
289288
});
290289

291-
// Background task to publish active load metrics every 5 seconds
292-
let slots_metrics = slots.clone();
293-
let ns_metrics = component.namespace().clone();
294-
let metrics_cancel_token = component.drt().child_token();
295-
tokio::spawn(async move {
296-
let mut interval = tokio::time::interval(Duration::from_secs(5));
297-
tracing::trace!("active load metrics publishing task started");
298-
299-
loop {
300-
tokio::select! {
301-
_ = metrics_cancel_token.cancelled() => {
302-
tracing::trace!("active load metrics publishing task shutting down");
303-
break;
304-
}
305-
_ = interval.tick() => {
306-
// Query active tokens and blocks from all workers
307-
// Note: active_tokens is always tracked, but active_blocks may be empty
308-
// if --no-track-active-blocks is set
309-
let active_tokens = slots_metrics.active_tokens().await;
310-
let active_blocks = slots_metrics.active_blocks().await;
311-
312-
// Publish ActiveLoad for each worker/dp_rank (iterate over tokens since always tracked)
313-
for (worker, tokens) in active_tokens.iter() {
314-
let blocks = active_blocks.get(worker).copied();
315-
let active_load = ActiveLoad {
316-
worker_id: worker.worker_id,
317-
dp_rank: worker.dp_rank,
318-
active_decode_blocks: blocks.map(|b| b as u64),
319-
active_prefill_tokens: Some(*tokens as u64),
320-
};
321-
322-
if let Err(e) = ns_metrics.publish(KV_METRICS_SUBJECT, &active_load).await {
323-
tracing::warn!("Failed to publish active load metrics: {:?}", e);
324-
}
325-
}
326-
}
327-
}
328-
}
329-
tracing::trace!("active load metrics publishing task exiting");
330-
});
331-
332290
Ok(KvScheduler { request_tx, slots })
333291
}
334292

lib/llm/src/kv_router/sequence.rs

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@ use std::time::Duration;
3838
use tokio::time::Instant;
3939
use uuid::Uuid;
4040

41-
use super::protocols::{ActiveSequenceEvent, ActiveSequenceEventData, WorkerWithDpRank};
42-
use crate::kv_router::ACTIVE_SEQUENCES_SUBJECT;
41+
use super::protocols::{
42+
ActiveLoad, ActiveSequenceEvent, ActiveSequenceEventData, WorkerWithDpRank,
43+
};
44+
use crate::kv_router::{ACTIVE_SEQUENCES_SUBJECT, KV_METRICS_SUBJECT};
4345
use crate::local_model::runtime_config::ModelRuntimeConfig;
4446
use dynamo_runtime::CancellationToken;
4547

@@ -701,6 +703,9 @@ impl ActiveSequencesMultiWorker {
701703
self.request_to_worker.remove(expired_id);
702704
}
703705

706+
// Publish ActiveLoad metrics for this worker
707+
self.publish_active_load_for_worker(worker).await;
708+
704709
Ok(())
705710
}
706711

@@ -744,6 +749,9 @@ impl ActiveSequencesMultiWorker {
744749

745750
self.request_to_worker.remove(request_id);
746751

752+
// Publish ActiveLoad metrics for this worker
753+
self.publish_active_load_for_worker(worker).await;
754+
747755
Ok(())
748756
}
749757

@@ -790,9 +798,66 @@ impl ActiveSequencesMultiWorker {
790798
})
791799
.map_err(|_| SequenceError::WorkerChannelClosed)?;
792800

801+
// Publish ActiveLoad metrics for this worker
802+
self.publish_active_load_for_worker(worker).await;
803+
793804
Ok(())
794805
}
795806

807+
/// Query a single worker for its active blocks and active tokens, then publish the
/// result as an `ActiveLoad` message on `KV_METRICS_SUBJECT` through this component's
/// namespace.
///
/// `worker` selects the entry in `self.senders` that receives the two queries; its
/// `worker_id` and `dp_rank` are copied into the published message.
///
/// This is best-effort and returns `()`: an unknown worker, a failed send of either
/// query, a failed response channel, or a publish error is logged with
/// `tracing::warn!` and is not reported to the caller. The publish itself is awaited
/// before this method returns.
808+
async fn publish_active_load_for_worker(&self, worker: WorkerWithDpRank) {
809+
let Some(sender) = self.senders.get(&worker) else {
810+
tracing::warn!("Worker {worker:?} not found when publishing ActiveLoad");
811+
return;
812+
};
813+
814+
// Query active blocks. The reply comes back on a oneshot channel that is not
// awaited yet, so the tokens query below is sent before either reply is read.
815+
let (blocks_tx, blocks_rx) = tokio::sync::oneshot::channel();
816+
if sender
817+
.send(UpdateSequences::ActiveBlocks { resp_tx: blocks_tx })
818+
.is_err()
819+
{
820+
tracing::warn!("Failed to send ActiveBlocks query to worker {worker:?}");
821+
return;
822+
}
823+
824+
// Query active tokens, using a second oneshot channel for the reply.
825+
let (tokens_tx, tokens_rx) = tokio::sync::oneshot::channel();
826+
if sender
827+
.send(UpdateSequences::ActiveTokens { resp_tx: tokens_tx })
828+
.is_err()
829+
{
830+
tracing::warn!("Failed to send ActiveTokens query to worker {worker:?}");
831+
return;
832+
}
833+
834+
// Await both responses together. If either receiver yields an error, log a
// warning and return without publishing anything.
835+
let (active_blocks, active_tokens) = match tokio::join!(blocks_rx, tokens_rx) {
836+
(Ok(blocks), Ok(tokens)) => (blocks, tokens),
837+
_ => {
838+
tracing::warn!("Failed to receive active blocks/tokens from worker {worker:?}");
839+
return;
840+
}
841+
};
842+
843+
// Publish ActiveLoad. Both load fields are always populated with `Some(..)` here.
// NOTE(review): the polling publisher this replaces sent `None` for
// `active_decode_blocks` when a worker had no active-blocks entry; confirm that
// consumers accept `Some(..)` in that situation.
844+
let active_load = ActiveLoad {
845+
worker_id: worker.worker_id,
846+
dp_rank: worker.dp_rank,
847+
active_decode_blocks: Some(active_blocks as u64),
848+
active_prefill_tokens: Some(active_tokens as u64),
849+
};
850+
851+
if let Err(e) = self
852+
.component
853+
.namespace()
854+
.publish(KV_METRICS_SUBJECT, &active_load)
855+
.await
856+
{
857+
tracing::warn!("Failed to publish ActiveLoad for worker {worker:?}: {e:?}");
858+
}
859+
}
860+
796861
/// Get the number of workers
797862
pub fn num_workers(&self) -> usize {
798863
self.senders.len()

0 commit comments

Comments
 (0)