python/sglang/srt/entrypoints/EngineBase.py (1 addition, 0 deletions)
@@ -29,6 +29,7 @@ def generate(
bootstrap_port: Optional[Union[List[int], int]] = None,
bootstrap_room: Optional[Union[List[int], int]] = None,
data_parallel_rank: Optional[int] = None,
decode_dp_rank: Optional[int] = None,
rid: Optional[Union[List[str], str]] = None,
) -> Union[Dict, Iterator[Dict]]:
"""Generate outputs based on given inputs."""
python/sglang/srt/entrypoints/engine.py (4 additions, 0 deletions)
@@ -184,6 +184,7 @@ def generate(
bootstrap_port: Optional[Union[List[int], int]] = None,
bootstrap_room: Optional[Union[List[int], int]] = None,
data_parallel_rank: Optional[int] = None,
decode_dp_rank: Optional[int] = None,
rid: Optional[Union[List[str], str]] = None,
) -> Union[Dict, Iterator[Dict]]:
"""
@@ -219,6 +220,7 @@ def generate(
bootstrap_port=bootstrap_port,
bootstrap_room=bootstrap_room,
data_parallel_rank=data_parallel_rank,
decode_dp_rank=decode_dp_rank,
rid=rid,
)
generator = self.tokenizer_manager.generate_request(obj, None)
@@ -266,6 +268,7 @@ async def async_generate(
bootstrap_port: Optional[Union[List[int], int]] = None,
bootstrap_room: Optional[Union[List[int], int]] = None,
data_parallel_rank: Optional[int] = None,
decode_dp_rank: Optional[int] = None,
rid: Optional[Union[List[str], str]] = None,
) -> Union[Dict, AsyncIterator[Dict]]:
"""
@@ -303,6 +306,7 @@ async def async_generate(
bootstrap_port=bootstrap_port,
bootstrap_room=bootstrap_room,
data_parallel_rank=data_parallel_rank,
decode_dp_rank=decode_dp_rank,
rid=rid,
)
generator = self.tokenizer_manager.generate_request(obj, None)
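Note: a minimal usage sketch of the new parameter on the offline engine. The model path and sampling values below are placeholders; only data_parallel_rank and decode_dp_rank come from this diff.

import sglang as sgl

# Sketch only: pin the prefill-side DP rank and the decode-side DP rank for a
# PD-disaggregated deployment. A prefill-mode controller reads
# data_parallel_rank; a decode-mode controller prefers decode_dp_rank.
engine = sgl.Engine(model_path="meta-llama/Llama-3.1-8B-Instruct")  # placeholder model

out = engine.generate(
    prompt="Hello, world",
    sampling_params={"max_new_tokens": 16},
    data_parallel_rank=0,  # existing field, used by the prefill side
    decode_dp_rank=1,      # new field, used by the decode side
)
print(out)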
python/sglang/srt/entrypoints/openai/protocol.py (4 additions, 0 deletions)
@@ -530,6 +530,10 @@ class ChatCompletionRequest(BaseModel):
bootstrap_port: Optional[Union[List[Optional[int]], int]] = None
bootstrap_room: Optional[Union[List[int], int]] = None

# For data parallel rank routing
data_parallel_rank: Optional[int] = None
decode_dp_rank: Optional[int] = None

# OpenAI/SGLang default sampling parameters
_DEFAULT_SAMPLING_PARAMS = {
"temperature": 1.0,
python/sglang/srt/entrypoints/openai/serving_chat.py (3 additions, 0 deletions)
@@ -191,6 +191,9 @@ def _convert_to_internal_request(
bootstrap_host=request.bootstrap_host,
bootstrap_port=request.bootstrap_port,
bootstrap_room=request.bootstrap_room,
# For data parallel rank routing
data_parallel_rank=request.data_parallel_rank,
decode_dp_rank=request.decode_dp_rank,
return_hidden_states=request.return_hidden_states,
rid=request.rid,
extra_key=self._compute_extra_key(request),
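Note: the same two fields are accepted as top-level keys on the OpenAI-compatible chat endpoint via ChatCompletionRequest. A hedged sketch of a raw HTTP call; the server URL, port, and model name are placeholders.

import requests

resp = requests.post(
    "http://localhost:30000/v1/chat/completions",  # placeholder server URL
    json={
        "model": "default",  # placeholder model name
        "messages": [{"role": "user", "content": "Hello"}],
        "data_parallel_rank": 0,  # DP rank consumed by the prefill side
        "decode_dp_rank": 1,      # DP rank consumed by the decode side (new)
    },
)
print(resp.json())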
python/sglang/srt/managers/data_parallel_controller.py (20 additions, 5 deletions)
@@ -454,11 +454,26 @@ def launch_tensor_parallel_group(
self.max_req_input_len = scheduler_info[0]["max_req_input_len"]

def maybe_external_dp_rank_routing(self, req: Req):
if req.data_parallel_rank is not None:
logger.debug(f"Direct routing to DP rank {req.data_parallel_rank}")
self.workers[req.data_parallel_rank].send_pyobj(req)
return True
return False
if self.server_args.disaggregation_mode == "prefill":
if req.data_parallel_rank is not None:
logger.debug(
f"Prefill direct routing to DP rank {req.data_parallel_rank}"
)
self.workers[req.data_parallel_rank].send_pyobj(req)
return True
return False
else:
if req.decode_dp_rank is not None:
logger.debug(f"Decode direct routing to DP rank {req.decode_dp_rank}")
self.workers[req.decode_dp_rank].send_pyobj(req)
return True
if req.data_parallel_rank is not None:
logger.debug(
f"Decode direct routing to DP rank {req.data_parallel_rank}, by data parallel rank"
)
self.workers[req.data_parallel_rank].send_pyobj(req)
return True
return False

def round_robin_scheduler(self, req: Req):
if self.maybe_external_dp_rank_routing(req):
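Note: the routing change above gives decode-mode controllers a dedicated override: decode_dp_rank wins when set, with data_parallel_rank as a fallback, while prefill-mode controllers keep using data_parallel_rank only. A self-contained sketch of that precedence (pick_dp_rank is an illustrative helper, not part of the codebase):

from typing import Optional

def pick_dp_rank(
    disaggregation_mode: str,
    data_parallel_rank: Optional[int],
    decode_dp_rank: Optional[int],
) -> Optional[int]:
    # Mirrors maybe_external_dp_rank_routing: prefill uses data_parallel_rank;
    # decode prefers decode_dp_rank and falls back to data_parallel_rank.
    if disaggregation_mode == "prefill":
        return data_parallel_rank
    if decode_dp_rank is not None:
        return decode_dp_rank
    return data_parallel_rank

assert pick_dp_rank("prefill", 0, 1) == 0    # prefill ignores decode_dp_rank
assert pick_dp_rank("decode", 0, 1) == 1     # decode prefers decode_dp_rank
assert pick_dp_rank("decode", 0, None) == 0  # decode falls back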
python/sglang/srt/managers/io_struct.py (7 additions, 0 deletions)
@@ -203,6 +203,8 @@ class GenerateReqInput(BaseReq):
# For data parallel rank routing
data_parallel_rank: Optional[int] = None

decode_dp_rank: Optional[int] = None

# For background responses (OpenAI responses API)
background: bool = False

@@ -620,6 +622,9 @@ def __getitem__(self, i):
data_parallel_rank=(
self.data_parallel_rank if self.data_parallel_rank is not None else None
),
decode_dp_rank=(
self.decode_dp_rank if self.decode_dp_rank is not None else None
),
conversation_id=self.conversation_id,
priority=self.priority,
extra_key=self.extra_key,
@@ -678,6 +683,8 @@ class TokenizedGenerateReqInput(BaseReq):
# For data parallel rank routing
data_parallel_rank: Optional[int] = None

decode_dp_rank: Optional[int] = None

# Priority for the request
priority: Optional[int] = None

python/sglang/srt/managers/schedule_batch.py (2 additions, 0 deletions)
@@ -457,6 +457,7 @@ def __init__(
bootstrap_room: Optional[int] = None,
disagg_mode: Optional[DisaggregationMode] = None,
data_parallel_rank: Optional[int] = None,
decode_dp_rank: Optional[int] = None,
vocab_size: Optional[int] = None,
priority: Optional[int] = None,
metrics_collector: Optional[SchedulerMetricsCollector] = None,
@@ -664,6 +665,7 @@ def __init__(

# For data parallel rank routing
self.data_parallel_rank: Optional[int] = data_parallel_rank
self.decode_dp_rank: Optional[int] = decode_dp_rank

# the start index of the sent kv cache
# We want to send it chunk by chunk for chunked prefill.
python/sglang/srt/managers/scheduler.py (1 addition, 0 deletions)
@@ -1284,6 +1284,7 @@ def handle_generate_request(
bootstrap_room=recv_req.bootstrap_room,
disagg_mode=self.disaggregation_mode,
data_parallel_rank=recv_req.data_parallel_rank,
decode_dp_rank=recv_req.decode_dp_rank,
vocab_size=self.model_config.vocab_size,
priority=recv_req.priority,
metrics_collector=(
python/sglang/srt/managers/tokenizer_manager.py (1 addition, 0 deletions)
@@ -815,6 +815,7 @@ def _create_tokenized_object(
custom_logit_processor=obj.custom_logit_processor,
return_hidden_states=obj.return_hidden_states,
data_parallel_rank=obj.data_parallel_rank,
decode_dp_rank=obj.decode_dp_rank,
priority=obj.priority,
extra_key=obj.extra_key,
)
sgl-router/benches/request_processing.rs (1 addition, 0 deletions)
@@ -58,6 +58,7 @@ fn default_generate_request() -> GenerateRequest {
bootstrap_room: None,
bootstrap_pair_key: None,
data_parallel_rank: None,
decode_dp_rank: None,
background: false,
conversation_id: None,
priority: None,
sgl-router/bindings/python/sglang_router/router.py (2 additions, 0 deletions)
@@ -76,6 +76,7 @@ class Router:
port: Port number to bind the router server. Default: 3001
worker_startup_timeout_secs: Timeout in seconds for worker startup. Default: 300
worker_startup_check_interval: Interval in seconds between checks for worker initialization. Default: 10
worker_load_check_interval: Interval in seconds between worker load checks. Default: 10
cache_threshold: Cache threshold (0.0-1.0) for cache-aware routing. Routes to cached worker
if the match rate exceeds threshold, otherwise routes to the worker with the smallest
tree. Default: 0.5
@@ -88,6 +89,7 @@ class Router:
max_payload_size: Maximum payload size in bytes. Default: 256MB
max_tree_size: Maximum size of the approximation tree for cache-aware routing. Default: 2^24
dp_aware: Enable data parallelism aware schedule. Default: False
dp_minimum_tokens_scheduler: Enable the minimum-tokens scheduler for data parallel groups. Default: False
enable_igw: Enable IGW (Inference-Gateway) mode for multi-model support. When enabled,
the router can manage multiple models simultaneously with per-model load balancing
policies. Default: False
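Note: a hedged sketch of constructing the router with the two new options documented above. The worker URLs are placeholders and the remaining constructor defaults are assumed.

from sglang_router import Router

router = Router(
    worker_urls=["http://localhost:30001", "http://localhost:30002"],  # placeholders
    worker_load_check_interval=5,      # poll worker load every 5 seconds
    dp_minimum_tokens_scheduler=True,  # enable the minimum-tokens DP scheduler
)
router.start()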
sgl-router/bindings/python/sglang_router/router_args.py (13 additions, 0 deletions)
@@ -28,6 +28,7 @@ class RouterArgs:
decode_policy: Optional[str] = None # Specific policy for decode nodes in PD mode
worker_startup_timeout_secs: int = 600
worker_startup_check_interval: int = 30
worker_load_check_interval: int = 10
cache_threshold: float = 0.3
balance_abs_threshold: int = 64
balance_rel_threshold: float = 1.5
@@ -36,6 +37,7 @@
max_payload_size: int = 512 * 1024 * 1024 # 512MB default for large batches
bucket_adjust_interval_secs: int = 5
dp_aware: bool = False
dp_minimum_tokens_scheduler: bool = False
enable_igw: bool = False # Enable IGW (Inter-Gateway) mode for multi-model support
api_key: Optional[str] = None
log_dir: Optional[str] = None
@@ -217,6 +219,12 @@ def add_cli_args(
default=RouterArgs.worker_startup_check_interval,
help="Interval in seconds between checks for worker startup",
)
parser.add_argument(
f"--{prefix}worker-load-check-interval",
type=int,
default=RouterArgs.worker_load_check_interval,
help="Interval in seconds between checks for worker startup",
)
parser.add_argument(
f"--{prefix}cache-threshold",
type=float,
@@ -264,6 +272,11 @@
action="store_true",
help="Enable data parallelism aware schedule",
)
parser.add_argument(
f"--{prefix}dp-minimum-tokens-scheduler",
action="store_true",
help="Enable minimum tokens scheduler for data parallel group",
)
parser.add_argument(
f"--{prefix}enable-igw",
action="store_true",
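Note: RouterArgs is a dataclass, so the same two options can also be set programmatically. A small sketch, assuming all other fields keep their defaults:

from sglang_router.router_args import RouterArgs

args = RouterArgs(
    worker_load_check_interval=5,
    dp_minimum_tokens_scheduler=True,
)
print(args.worker_load_check_interval, args.dp_minimum_tokens_scheduler)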
sgl-router/bindings/python/src/lib.rs (10 additions, 0 deletions)
@@ -157,13 +157,15 @@ struct Router {
policy: PolicyType,
worker_startup_timeout_secs: u64,
worker_startup_check_interval: u64,
worker_load_check_interval: u64,
cache_threshold: f32,
balance_abs_threshold: usize,
balance_rel_threshold: f32,
eviction_interval_secs: u64,
max_tree_size: usize,
max_payload_size: usize,
dp_aware: bool,
dp_minimum_tokens_scheduler: bool,
api_key: Option<String>,
log_dir: Option<String>,
log_level: Option<String>,
@@ -342,6 +344,7 @@ impl Router {
.request_timeout_secs(self.request_timeout_secs)
.worker_startup_timeout_secs(self.worker_startup_timeout_secs)
.worker_startup_check_interval_secs(self.worker_startup_check_interval)
.worker_load_check_interval_secs(self.worker_load_check_interval)
.max_concurrent_requests(self.max_concurrent_requests)
.queue_size(self.queue_size)
.queue_timeout_secs(self.queue_timeout_secs)
@@ -397,6 +400,7 @@ impl Router {
self.client_key_path.as_ref(),
)
.add_ca_certificates(self.ca_cert_paths.clone())
.dp_minimum_tokens_scheduler(self.dp_minimum_tokens_scheduler)
.build()
}
}
@@ -411,13 +415,15 @@ impl Router {
port = 3001,
worker_startup_timeout_secs = 600,
worker_startup_check_interval = 30,
worker_load_check_interval = 10,
cache_threshold = 0.3,
balance_abs_threshold = 64,
balance_rel_threshold = 1.5,
eviction_interval_secs = 120,
max_tree_size = 2usize.pow(26),
max_payload_size = 512 * 1024 * 1024,
dp_aware = false,
dp_minimum_tokens_scheduler = false,
api_key = None,
log_dir = None,
log_level = None,
@@ -486,13 +492,15 @@ impl Router {
port: u16,
worker_startup_timeout_secs: u64,
worker_startup_check_interval: u64,
worker_load_check_interval: u64,
cache_threshold: f32,
balance_abs_threshold: usize,
balance_rel_threshold: f32,
eviction_interval_secs: u64,
max_tree_size: usize,
max_payload_size: usize,
dp_aware: bool,
dp_minimum_tokens_scheduler: bool,
api_key: Option<String>,
log_dir: Option<String>,
log_level: Option<String>,
@@ -574,13 +582,15 @@ impl Router {
policy,
worker_startup_timeout_secs,
worker_startup_check_interval,
worker_load_check_interval,
cache_threshold,
balance_abs_threshold,
balance_rel_threshold,
eviction_interval_secs,
max_tree_size,
max_payload_size,
dp_aware,
dp_minimum_tokens_scheduler,
api_key,
log_dir,
log_level,
sgl-router/src/app_context.rs (4 additions, 1 deletion)
@@ -426,6 +426,9 @@ impl AppContextBuilder {
/// Create policy registry
fn with_policy_registry(mut self, config: &RouterConfig) -> Self {
self.policy_registry = Some(Arc::new(PolicyRegistry::new(config.policy.clone())));
if config.dp_minimum_tokens_scheduler {
self.policy_registry.as_ref().unwrap().enable_dp_minimum_tokens_scheduler();
}
self
}

@@ -457,7 +460,7 @@ impl AppContextBuilder {
.expect("policy_registry must be set")
.clone(),
client.clone(),
config.worker_startup_check_interval_secs,
config.worker_load_check_interval_secs,
)));
self
}
sgl-router/src/config/builder.rs (9 additions, 0 deletions)
@@ -181,6 +181,10 @@ impl RouterConfigBuilder {
self
}

pub fn worker_load_check_interval_secs(mut self, interval: u64) -> Self {
self.config.worker_load_check_interval_secs = interval;
self
}
// ==================== Rate Limiting ====================

pub fn max_concurrent_requests(mut self, max: i32) -> Self {
@@ -434,6 +438,11 @@ impl RouterConfigBuilder {
self
}

pub fn dp_minimum_tokens_scheduler(mut self, enable: bool) -> Self {
self.config.dp_minimum_tokens_scheduler = enable;
self
}

// ==================== Option Setters ====================
// Accept Option<T> and only set if Some

sgl-router/src/config/types.rs (4 additions, 0 deletions)
@@ -19,7 +19,9 @@ pub struct RouterConfig {
pub request_timeout_secs: u64,
pub worker_startup_timeout_secs: u64,
pub worker_startup_check_interval_secs: u64,
pub worker_load_check_interval_secs: u64,
pub dp_aware: bool,
pub dp_minimum_tokens_scheduler: bool,
pub api_key: Option<String>,
pub discovery: Option<DiscoveryConfig>,
pub metrics: Option<MetricsConfig>,
@@ -471,7 +473,9 @@ impl Default for RouterConfig {
request_timeout_secs: 1800, // 30 minutes
worker_startup_timeout_secs: 600,
worker_startup_check_interval_secs: 30,
worker_load_check_interval_secs: 10,
dp_aware: false,
dp_minimum_tokens_scheduler: false,
api_key: None,
discovery: None,
metrics: None,