From 63844b33a67e22d08cb25d9216f2de5cc24848e3 Mon Sep 17 00:00:00 2001 From: Biswa Panda Date: Wed, 10 Dec 2025 14:11:10 -0800 Subject: [PATCH 1/7] test: graceful shutdown for tcp req plane --- .../network/ingress/shared_tcp_endpoint.rs | 213 ++++++++++++++++++ 1 file changed, 213 insertions(+) diff --git a/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs b/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs index 7ed5ea6e3d..604ad256c9 100644 --- a/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs +++ b/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs @@ -391,3 +391,216 @@ impl super::unified_server::RequestPlaneServer for SharedTcpServer { true } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::pipeline::error::PipelineError; + use async_trait::async_trait; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::time::Duration; + use tokio::time::Instant; + + /// Mock handler that simulates slow request processing for testing + struct SlowMockHandler { + /// Tracks if a request is currently being processed + request_in_flight: Arc, + /// Notifies when request processing starts + request_started: Arc, + /// Notifies when request processing completes + request_completed: Arc, + /// Duration to simulate request processing + processing_duration: Duration, + } + + impl SlowMockHandler { + fn new(processing_duration: Duration) -> Self { + Self { + request_in_flight: Arc::new(AtomicBool::new(false)), + request_started: Arc::new(Notify::new()), + request_completed: Arc::new(Notify::new()), + processing_duration, + } + } + } + + #[async_trait] + impl PushWorkHandler for SlowMockHandler { + async fn handle_payload(&self, _payload: Bytes) -> Result<(), PipelineError> { + self.request_in_flight.store(true, Ordering::SeqCst); + self.request_started.notify_one(); + + tracing::debug!( + "SlowMockHandler: Request started, sleeping for {:?}", + self.processing_duration + ); + + // Simulate slow request processing + tokio::time::sleep(self.processing_duration).await; + + tracing::debug!("SlowMockHandler: Request completed"); + + self.request_in_flight.store(false, Ordering::SeqCst); + self.request_completed.notify_one(); + Ok(()) + } + + fn add_metrics( + &self, + _endpoint: &crate::component::Endpoint, + _metrics_labels: Option<&[(&str, &str)]>, + ) -> Result<()> { + Ok(()) + } + } + + #[tokio::test] + async fn test_graceful_shutdown_waits_for_inflight_tcp_requests() { + // Initialize tracing for test debugging + let _ = tracing_subscriber::fmt() + .with_test_writer() + .with_max_level(tracing::Level::DEBUG) + .try_init(); + + let cancellation_token = CancellationToken::new(); + let bind_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + + // Create SharedTcpServer + let server = SharedTcpServer::new(bind_addr, cancellation_token.clone()); + + // Create a handler that takes 1s to process requests + let handler = Arc::new(SlowMockHandler::new(Duration::from_secs(1))); + let request_started = handler.request_started.clone(); + let request_completed = handler.request_completed.clone(); + let request_in_flight = handler.request_in_flight.clone(); + + // Register endpoint + let endpoint_path = "test_endpoint".to_string(); + let system_health = Arc::new(Mutex::new(SystemHealth::new( + crate::HealthStatus::Ready, + vec![], + "/health".to_string(), + "/live".to_string(), + ))); + + server + .register_endpoint( + endpoint_path.clone(), + handler.clone() as Arc, + 1, + "test_namespace".to_string(), + "test_component".to_string(), + "test_endpoint".to_string(), + system_health, + ) + .await + .expect("Failed to register endpoint"); + + tracing::debug!("Endpoint registered"); + + // Get the endpoint handler to simulate request processing + let endpoint_handler = server + .handlers + .get(&endpoint_path) + .expect("Handler should be registered") + .clone(); + + // Spawn a task that simulates an inflight request + let request_task = tokio::spawn({ + let handler = handler.clone(); + async move { + let payload = Bytes::from("test payload"); + handler.handle_payload(payload).await + } + }); + + // Increment inflight counter manually to simulate the request being tracked + endpoint_handler.inflight.fetch_add(1, Ordering::SeqCst); + + // Wait for request to start processing + tokio::select! { + _ = request_started.notified() => { + tracing::debug!("Request processing started"); + } + _ = tokio::time::sleep(Duration::from_secs(2)) => { + panic!("Timeout waiting for request to start"); + } + } + + // Verify request is in flight + assert!( + request_in_flight.load(Ordering::SeqCst), + "Request should be in flight" + ); + + // Now unregister the endpoint while request is inflight + let unregister_start = Instant::now(); + tracing::debug!("Starting unregister_endpoint with inflight request"); + + // Spawn unregister in a separate task so we can monitor its behavior + let unregister_task = tokio::spawn({ + let server = server.clone(); + let endpoint_path = endpoint_path.clone(); + async move { + server.unregister_endpoint(&endpoint_path, "test_endpoint").await; + Instant::now() + } + }); + + // Give unregister a moment to remove handler and start waiting + tokio::time::sleep(Duration::from_millis(50)).await; + + // Verify that unregister_endpoint hasn't returned yet (it should be waiting) + assert!( + !unregister_task.is_finished(), + "unregister_endpoint should still be waiting for inflight request" + ); + + tracing::debug!("Verified unregister is waiting, now waiting for request to complete"); + + // Wait for the request to complete + tokio::select! { + _ = request_completed.notified() => { + tracing::debug!("Request completed"); + } + _ = tokio::time::sleep(Duration::from_secs(2)) => { + panic!("Timeout waiting for request to complete"); + } + } + + // Decrement inflight counter and notify (simulating what the real code does) + endpoint_handler.inflight.fetch_sub(1, Ordering::SeqCst); + endpoint_handler.notify.notify_one(); + + // Now wait for unregister to complete + let unregister_end = tokio::time::timeout(Duration::from_secs(2), unregister_task) + .await + .expect("unregister_endpoint should complete after inflight request finishes") + .expect("unregister task should not panic"); + + let unregister_duration = unregister_end - unregister_start; + + tracing::debug!("unregister_endpoint completed in {:?}", unregister_duration); + + // Verify unregister_endpoint waited for the inflight request + assert!( + unregister_duration >= Duration::from_secs(1), + "unregister_endpoint should have waited ~1s for inflight request, but only took {:?}", + unregister_duration + ); + + // Verify request completed successfully + assert!( + !request_in_flight.load(Ordering::SeqCst), + "Request should have completed" + ); + + // Wait for request task to finish + request_task + .await + .expect("Request task should complete") + .expect("Request should succeed"); + + tracing::info!("Test passed: unregister_endpoint properly waited for inflight TCP request"); + } +} From b1d15bbda1768b6889504f8a1184de93a75bc85e Mon Sep 17 00:00:00 2001 From: Biswa Panda Date: Wed, 10 Dec 2025 14:15:02 -0800 Subject: [PATCH 2/7] fmt --- .../src/pipeline/network/ingress/shared_tcp_endpoint.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs b/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs index 604ad256c9..2b9d880fc2 100644 --- a/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs +++ b/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs @@ -542,7 +542,9 @@ mod tests { let server = server.clone(); let endpoint_path = endpoint_path.clone(); async move { - server.unregister_endpoint(&endpoint_path, "test_endpoint").await; + server + .unregister_endpoint(&endpoint_path, "test_endpoint") + .await; Instant::now() } }); From b83e955835a1dc8fab7ddebcf90a997aa1a5fa22 Mon Sep 17 00:00:00 2001 From: Biswa Panda Date: Thu, 11 Dec 2025 01:26:49 -0800 Subject: [PATCH 3/7] wip --- docs/guides/request_plane.md | 10 +- lib/runtime/src/component/endpoint.rs | 8 +- .../network/ingress/shared_tcp_endpoint.rs | 117 +++++++++++++----- lib/runtime/src/pipeline/network/manager.rs | 51 ++++++-- 4 files changed, 134 insertions(+), 52 deletions(-) diff --git a/docs/guides/request_plane.md b/docs/guides/request_plane.md index 0533477222..9cc4f74131 100644 --- a/docs/guides/request_plane.md +++ b/docs/guides/request_plane.md @@ -108,13 +108,15 @@ export DYN_REQUEST_PLANE=tcp # Optional: Configure TCP server host and port export DYN_TCP_RPC_HOST=0.0.0.0 # Default host -export DYN_TCP_RPC_PORT=9999 # Default port +# export DYN_TCP_RPC_PORT=9999 # Optional: specify a fixed port # Run your Dynamo service DYN_REQUEST_PLANE=tcp python -m dynamo.frontend --http-port=8000 & DYN_REQUEST_PLANE=tcp python -m dynamo.vllm --model Qwen/Qwen3-0.6B ``` +**Note:** By default, TCP uses an OS-assigned free port (port 0). This is ideal for environments where multiple services may run on the same machine or when you want to avoid port conflicts. If you need a specific port (e.g., for firewall rules), set `DYN_TCP_RPC_PORT` explicitly. + **When to use TCP:** - Simple deployments with direct service-to-service communication (e.g. frontend to backend) - Minimal infrastructure requirements (no NATS needed) @@ -124,7 +126,7 @@ DYN_REQUEST_PLANE=tcp python -m dynamo.vllm --model Qwen/Qwen3-0.6B Additional TCP-specific environment variables: - `DYN_TCP_RPC_HOST`: Server host address (default: auto-detected) -- `DYN_TCP_RPC_PORT`: Server port (default: 9999) +- `DYN_TCP_RPC_PORT`: Server port. If not set, the OS assigns a free port automatically (recommended for most deployments). Set explicitly only if you need a specific port for firewall rules. - `DYN_TCP_MAX_MESSAGE_SIZE`: Maximum message size for TCP client (default: 32MB) - `DYN_TCP_REQUEST_TIMEOUT`: Request timeout for TCP client (default: 10 seconds) - `DYN_TCP_POOL_SIZE`: Connection pool size for TCP client (default: 50) @@ -228,7 +230,7 @@ Request plane configuration is loaded from environment variables at startup and 1. Stop your Dynamo services 2. Set environment variable `DYN_REQUEST_PLANE=tcp` -3. Optionally configure TCP-specific settings (`DYN_TCP_RPC_PORT`, etc.) +3. Optionally configure TCP-specific settings (e.g., `DYN_TCP_RPC_HOST`). Note: `DYN_TCP_RPC_PORT` is optional; if not set, an OS-assigned free port is used automatically. 4. Restart your services @@ -279,7 +281,7 @@ curl http://localhost:8000/v1/chat/completions \ **Symptoms:** Server fails to start due to "address already in use" **Solutions:** -- TCP default port: 9999 (adjust environment variable `DYN_TCP_RPC_PORT`) +- TCP: By default, TCP uses an OS-assigned free port, so port conflicts should be rare. If you explicitly set `DYN_TCP_RPC_PORT` to a specific port and get conflicts, either change the port or remove the setting to use automatic port assignment. - HTTP default port: 8888 (adjust environment variable `DYN_HTTP_RPC_PORT`) ## Performance Considerations diff --git a/lib/runtime/src/component/endpoint.rs b/lib/runtime/src/component/endpoint.rs index 9e0826d3a1..c55ed85cb3 100644 --- a/lib/runtime/src/component/endpoint.rs +++ b/lib/runtime/src/component/endpoint.rs @@ -256,10 +256,10 @@ pub fn build_transport_type( } RequestPlaneMode::Tcp => { let tcp_host = crate::utils::get_tcp_rpc_host_from_env(); - let tcp_port = std::env::var("DYN_TCP_RPC_PORT") - .ok() - .and_then(|p| p.parse::().ok()) - .unwrap_or(9999); + // Get the actual TCP port from the global reference (set by TCP server after binding). + // If user explicitly set DYN_TCP_RPC_PORT, that value is used during binding. + // If port was OS-assigned (0), this returns the actual assigned port. + let tcp_port = crate::pipeline::network::manager::get_actual_tcp_rpc_port(); // Include endpoint name for proper TCP routing // TCP client parses this format and adds x-endpoint-path header for server-side routing diff --git a/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs b/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs index 2b9d880fc2..c81626e80e 100644 --- a/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs +++ b/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs @@ -36,7 +36,10 @@ fn get_max_message_size() -> usize { /// Shared TCP server that handles multiple endpoints on a single port pub struct SharedTcpServer { handlers: Arc>>, + /// The address to bind to (may have port 0 for OS-assigned port) bind_addr: SocketAddr, + /// The actual bound address (populated after bind_and_start, contains actual port) + actual_addr: std::sync::RwLock>, cancellation_token: CancellationToken, } @@ -56,10 +59,80 @@ impl SharedTcpServer { Arc::new(Self { handlers: Arc::new(DashMap::new()), bind_addr, + actual_addr: std::sync::RwLock::new(None), cancellation_token, }) } + /// Bind the server and start accepting connections. + /// + /// This method binds to the configured address first, then starts the accept loop. + /// If the configured port is 0, the OS will assign a free port. + /// The actual bound address is stored and can be retrieved via `actual_address()`. + /// + /// Returns the actual bound address (useful when port 0 was specified). + pub async fn bind_and_start(self: Arc) -> Result { + tracing::info!("Binding TCP server to {}", self.bind_addr); + + let listener = TcpListener::bind(&self.bind_addr).await?; + let actual_addr = listener.local_addr()?; + + tracing::info!( + requested = %self.bind_addr, + actual = %actual_addr, + "TCP server bound successfully" + ); + + // Store the actual bound address + *self.actual_addr.write().unwrap() = Some(actual_addr); + + // Start accepting connections in a background task + let server = self.clone(); + tokio::spawn(async move { + server.accept_loop(listener).await; + }); + + Ok(actual_addr) + } + + /// Get the actual bound address (after bind_and_start has been called). + /// + /// Returns None if the server hasn't been started yet. + pub fn actual_address(&self) -> Option { + *self.actual_addr.read().unwrap() + } + + /// Internal accept loop - runs after binding + async fn accept_loop(self: Arc, listener: TcpListener) { + let cancellation_token = self.cancellation_token.clone(); + + loop { + tokio::select! { + accept_result = listener.accept() => { + match accept_result { + Ok((stream, peer_addr)) => { + tracing::trace!("Accepted TCP connection from {}", peer_addr); + + let handlers = self.handlers.clone(); + tokio::spawn(async move { + if let Err(e) = Self::handle_connection(stream, handlers).await { + tracing::debug!("TCP connection error: {}", e); + } + }); + } + Err(e) => { + tracing::error!("Failed to accept TCP connection: {}", e); + } + } + } + _ = cancellation_token.cancelled() => { + tracing::info!("SharedTcpServer received cancellation signal, shutting down"); + return; + } + } + } + } + #[allow(clippy::too_many_arguments)] pub async fn register_endpoint( &self, @@ -129,37 +202,16 @@ impl SharedTcpServer { } } + /// Start the server (legacy method - prefer bind_and_start for new code). + /// + /// This method is kept for backwards compatibility. It binds and starts + /// the server but doesn't return the actual bound address. pub async fn start(self: Arc) -> Result<()> { - tracing::info!("Starting shared TCP server on {}", self.bind_addr); - - let listener = TcpListener::bind(&self.bind_addr).await?; - let cancellation_token = self.cancellation_token.clone(); - - loop { - tokio::select! { - accept_result = listener.accept() => { - match accept_result { - Ok((stream, peer_addr)) => { - tracing::trace!("Accepted TCP connection from {}", peer_addr); - - let handlers = self.handlers.clone(); - tokio::spawn(async move { - if let Err(e) = Self::handle_connection(stream, handlers).await { - tracing::debug!("TCP connection error: {}", e); - } - }); - } - Err(e) => { - tracing::error!("Failed to accept TCP connection: {}", e); - } - } - } - _ = cancellation_token.cancelled() => { - tracing::info!("SharedTcpServer received cancellation signal, shutting down"); - return Ok(()); - } - } - } + let cancel_token = self.cancellation_token.clone(); + self.bind_and_start().await?; + // Wait for cancellation (the accept loop runs in background) + cancel_token.cancelled().await; + Ok(()) } async fn handle_connection( @@ -378,7 +430,10 @@ impl super::unified_server::RequestPlaneServer for SharedTcpServer { } fn address(&self) -> String { - format!("tcp://{}:{}", self.bind_addr.ip(), self.bind_addr.port()) + // Return actual bound address if available (after bind_and_start), + // otherwise fall back to configured bind address + let addr = self.actual_address().unwrap_or(self.bind_addr); + format!("tcp://{}:{}", addr.ip(), addr.port()) } fn transport_name(&self) -> &'static str { diff --git a/lib/runtime/src/pipeline/network/manager.rs b/lib/runtime/src/pipeline/network/manager.rs index 4a85809890..ff1726c9dc 100644 --- a/lib/runtime/src/pipeline/network/manager.rs +++ b/lib/runtime/src/pipeline/network/manager.rs @@ -19,8 +19,25 @@ use crate::distributed::RequestPlaneMode; use anyhow::Result; use async_once_cell::OnceCell; use std::sync::Arc; +use std::sync::OnceLock; use tokio_util::sync::CancellationToken; +/// Global storage for the actual TCP RPC port after binding. +/// Uses OnceLock since the port is set once when the server binds and never changes. +static ACTUAL_TCP_RPC_PORT: OnceLock = OnceLock::new(); + +/// Get the actual TCP RPC port that the server is listening on. +/// Returns 0 if the TCP server hasn't been started yet. +pub fn get_actual_tcp_rpc_port() -> u16 { + ACTUAL_TCP_RPC_PORT.get().copied().unwrap_or(0) +} + +/// Set the actual TCP RPC port (called internally after server binds). +fn set_actual_tcp_rpc_port(port: u16) { + // Ignore if already set (shouldn't happen in normal operation) + let _ = ACTUAL_TCP_RPC_PORT.set(port); +} + /// Network configuration loaded from environment variables #[derive(Clone)] struct NetworkConfig { @@ -31,7 +48,8 @@ struct NetworkConfig { // TCP server configuration tcp_host: String, - tcp_port: u16, + /// TCP port to bind to. If None, the OS will assign a free port. + tcp_port: Option, // HTTP client configuration http_client_config: super::egress::http_router::Http2Config, @@ -60,12 +78,12 @@ impl NetworkConfig { .unwrap_or_else(|_| "/v1/rpc".to_string()), // TCP server configuration + // If DYN_TCP_RPC_PORT is set, use that port; otherwise None means OS will assign a free port tcp_host: std::env::var("DYN_TCP_RPC_HOST") .unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env()), tcp_port: std::env::var("DYN_TCP_RPC_PORT") .ok() - .and_then(|p| p.parse().ok()) - .unwrap_or(9999), + .and_then(|p| p.parse().ok()), // HTTP client configuration (reads DYN_HTTP2_* env vars) http_client_config: super::egress::http_router::Http2Config::from_env(), @@ -143,8 +161,8 @@ impl NetworkManager { tracing::info!( %mode, http_port = config.http_port, - tcp_port = config.tcp_port, - "Initializing NetworkManager" + tcp_port = ?config.tcp_port, + "Initializing NetworkManager (tcp_port=None means OS will assign free port)" ); Self { @@ -250,24 +268,31 @@ impl NetworkManager { async fn create_tcp_server(&self) -> Result> { use super::ingress::shared_tcp_endpoint::SharedTcpServer; - let bind_addr = format!("{}:{}", self.config.tcp_host, self.config.tcp_port) + // Use configured port if specified, otherwise use port 0 (OS assigns free port) + let port = self.config.tcp_port.unwrap_or(0); + let bind_addr = format!("{}:{}", self.config.tcp_host, port) .parse() .map_err(|e| anyhow::anyhow!("Invalid TCP bind address: {}", e))?; tracing::info!( bind_addr = %bind_addr, + port_source = if self.config.tcp_port.is_some() { "DYN_TCP_RPC_PORT" } else { "OS-assigned" }, "Creating TCP request plane server" ); let server = SharedTcpServer::new(bind_addr, self.cancellation_token.clone()); - // Start server in background - let server_clone = server.clone(); - tokio::spawn(async move { - if let Err(e) = server_clone.start().await { - tracing::error!("TCP request plane server error: {}", e); - } - }); + // Bind and start server, getting the actual bound address + let actual_addr = server.clone().bind_and_start().await?; + + // Store the actual bound port globally so build_transport_type() can access it + set_actual_tcp_rpc_port(actual_addr.port()); + + tracing::info!( + actual_addr = %actual_addr, + actual_port = actual_addr.port(), + "TCP request plane server started" + ); Ok(server as Arc) } From cc2cda426220e071502b7b49fb8647a59c16bcbe Mon Sep 17 00:00:00 2001 From: Biswa Panda Date: Thu, 11 Dec 2025 13:42:35 -0800 Subject: [PATCH 4/7] update --- lib/llm/src/discovery/model_manager.rs | 2 +- lib/runtime/src/component/endpoint.rs | 70 ++++++++++--------- .../network/ingress/shared_tcp_endpoint.rs | 6 +- lib/runtime/src/pipeline/network/manager.rs | 56 ++++++++++++--- 4 files changed, 87 insertions(+), 47 deletions(-) diff --git a/lib/llm/src/discovery/model_manager.rs b/lib/llm/src/discovery/model_manager.rs index 6ba4afda14..c3ab41f787 100644 --- a/lib/llm/src/discovery/model_manager.rs +++ b/lib/llm/src/discovery/model_manager.rs @@ -335,7 +335,7 @@ impl ModelManager { // Build transport for router endpoint based on request plane mode // Use KV_ROUTER_COMPONENT as the component name to distinguish from the generate endpoint's component let router_endpoint_id = router_endpoint_id(endpoint.id().namespace); - let transport = build_transport_type(request_plane_mode, &router_endpoint_id, instance_id); + let transport = build_transport_type(request_plane_mode, &router_endpoint_id, instance_id)?; let discovery_spec = DiscoverySpec::Endpoint { namespace: router_endpoint_id.namespace.clone(), diff --git a/lib/runtime/src/component/endpoint.rs b/lib/runtime/src/component/endpoint.rs index c55ed85cb3..c77c5161fc 100644 --- a/lib/runtime/src/component/endpoint.rs +++ b/lib/runtime/src/component/endpoint.rs @@ -89,30 +89,6 @@ impl EndpointConfigBuilder { let request_plane_mode = endpoint.drt().request_plane(); tracing::info!("Endpoint starting with request plane mode: {request_plane_mode}",); - // Register health check target in SystemHealth if provided - if let Some(health_check_payload) = &health_check_payload { - // Build transport based on request plane mode - let transport = build_transport_type(request_plane_mode, &endpoint_id, connection_id); - - let instance = Instance { - component: endpoint_id.component.clone(), - endpoint: endpoint_id.name.clone(), - namespace: endpoint_id.namespace.clone(), - instance_id: connection_id, - transport, - }; - tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target"); - let guard = system_health.lock(); - guard.register_health_check_target( - &endpoint.name, - instance, - health_check_payload.clone(), - ); - if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) { - handler.set_endpoint_health_check_notifier(notifier)?; - } - } - // Register with graceful shutdown tracker if needed if graceful_shutdown { tracing::debug!( @@ -137,9 +113,33 @@ impl EndpointConfigBuilder { let component_name_for_task = endpoint_id.component.clone(); let endpoint_name_for_task = endpoint_id.name.clone(); - // Get the unified request plane server (works for all transport types) + // Get the unified request plane server let server = endpoint.drt().request_plane_server().await?; + // Register health check target in SystemHealth if provided + if let Some(health_check_payload) = &health_check_payload { + // Build transport based on request plane mode + let transport = build_transport_type(request_plane_mode, &endpoint_id, connection_id)?; + + let instance = Instance { + component: endpoint_id.component.clone(), + endpoint: endpoint_id.name.clone(), + namespace: endpoint_id.namespace.clone(), + instance_id: connection_id, + transport, + }; + tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target"); + let guard = system_health.lock(); + guard.register_health_check_target( + &endpoint.name, + instance, + health_check_payload.clone(), + ); + if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) { + handler.set_endpoint_health_check_notifier(notifier)?; + } + } + tracing::info!( endpoint = %endpoint_name_for_task, transport = server.transport_name(), @@ -198,7 +198,7 @@ impl EndpointConfigBuilder { let discovery = endpoint.drt().discovery(); // Build transport for discovery service based on request plane mode - let transport = build_transport_type(request_plane_mode, &endpoint_id, connection_id); + let transport = build_transport_type(request_plane_mode, &endpoint_id, connection_id)?; let discovery_spec = crate::discovery::DiscoverySpec::Endpoint { namespace: endpoint_id.namespace.clone(), @@ -232,11 +232,14 @@ impl EndpointConfigBuilder { /// - HTTP: Uses full URL path including endpoint name (e.g., http://host:port/v1/rpc/endpoint_name) /// - TCP: Includes endpoint name for routing (e.g., host:port/endpoint_name) /// - NATS: Uses subject-based addressing (unique per endpoint) +/// +/// # Errors +/// Returns an error if TCP mode is used but the TCP server hasn't been started yet. pub fn build_transport_type( mode: RequestPlaneMode, endpoint_id: &EndpointId, connection_id: u64, -) -> TransportType { +) -> Result { match mode { RequestPlaneMode::Http => { let http_host = crate::utils::get_http_rpc_host_from_env(); @@ -252,23 +255,24 @@ pub fn build_transport_type( endpoint_id.name ); - TransportType::Http(http_endpoint) + Ok(TransportType::Http(http_endpoint)) } RequestPlaneMode::Tcp => { let tcp_host = crate::utils::get_tcp_rpc_host_from_env(); // Get the actual TCP port from the global reference (set by TCP server after binding). // If user explicitly set DYN_TCP_RPC_PORT, that value is used during binding. // If port was OS-assigned (0), this returns the actual assigned port. - let tcp_port = crate::pipeline::network::manager::get_actual_tcp_rpc_port(); + let tcp_port = crate::pipeline::network::manager::get_actual_tcp_rpc_port()?; // Include endpoint name for proper TCP routing // TCP client parses this format and adds x-endpoint-path header for server-side routing let tcp_endpoint = format!("{}:{}/{}", tcp_host, tcp_port, endpoint_id.name); - TransportType::Tcp(tcp_endpoint) - } - RequestPlaneMode::Nats => { - TransportType::Nats(nats::instance_subject(endpoint_id, connection_id)) + Ok(TransportType::Tcp(tcp_endpoint)) } + RequestPlaneMode::Nats => Ok(TransportType::Nats(nats::instance_subject( + endpoint_id, + connection_id, + ))), } } diff --git a/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs b/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs index c81626e80e..4301726424 100644 --- a/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs +++ b/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs @@ -58,7 +58,9 @@ impl SharedTcpServer { pub fn new(bind_addr: SocketAddr, cancellation_token: CancellationToken) -> Arc { Arc::new(Self { handlers: Arc::new(DashMap::new()), + // address we requested to bind to. bind_addr, + // actual address after free port assignment (if DYN_TCP_RPC_PORT is not specified) actual_addr: std::sync::RwLock::new(None), cancellation_token, }) @@ -116,7 +118,7 @@ impl SharedTcpServer { let handlers = self.handlers.clone(); tokio::spawn(async move { if let Err(e) = Self::handle_connection(stream, handlers).await { - tracing::debug!("TCP connection error: {}", e); + tracing::error!("TCP connection error: {}", e); } }); } @@ -166,7 +168,7 @@ impl SharedTcpServer { tracing::info!( "Registered endpoint '{}' with shared TCP server on {}", endpoint_name, - self.bind_addr + self.actual_address().unwrap_or(self.bind_addr) ); Ok(()) diff --git a/lib/runtime/src/pipeline/network/manager.rs b/lib/runtime/src/pipeline/network/manager.rs index ff1726c9dc..c0d952721d 100644 --- a/lib/runtime/src/pipeline/network/manager.rs +++ b/lib/runtime/src/pipeline/network/manager.rs @@ -27,15 +27,26 @@ use tokio_util::sync::CancellationToken; static ACTUAL_TCP_RPC_PORT: OnceLock = OnceLock::new(); /// Get the actual TCP RPC port that the server is listening on. -/// Returns 0 if the TCP server hasn't been started yet. -pub fn get_actual_tcp_rpc_port() -> u16 { - ACTUAL_TCP_RPC_PORT.get().copied().unwrap_or(0) +pub fn get_actual_tcp_rpc_port() -> anyhow::Result { + ACTUAL_TCP_RPC_PORT.get().copied().ok_or_else(|| { + tracing::error!( + "TCP RPC port not set - request_plane_server() must be called before get_actual_tcp_rpc_port()" + ); + anyhow::anyhow!( + "TCP RPC port not initialized. This is not expected." + ) + }) } /// Set the actual TCP RPC port (called internally after server binds). fn set_actual_tcp_rpc_port(port: u16) { - // Ignore if already set (shouldn't happen in normal operation) - let _ = ACTUAL_TCP_RPC_PORT.set(port); + if let Err(existing) = ACTUAL_TCP_RPC_PORT.set(port) { + tracing::warn!( + existing_port = existing, + new_port = port, + "TCP RPC port already set, ignoring new value" + ); + } } /// Network configuration loaded from environment variables @@ -158,12 +169,35 @@ impl NetworkManager { ) -> Self { let config = NetworkConfig::from_env(nats_client); - tracing::info!( - %mode, - http_port = config.http_port, - tcp_port = ?config.tcp_port, - "Initializing NetworkManager (tcp_port=None means OS will assign free port)" - ); + match mode { + RequestPlaneMode::Http => { + tracing::info!( + %mode, + host = %config.http_host, + port = config.http_port, + rpc_root = %config.http_rpc_root, + "Initializing NetworkManager with HTTP request plane" + ); + } + RequestPlaneMode::Tcp => { + let port_display = config + .tcp_port + .map(|p| p.to_string()) + .unwrap_or_else(|| "OS-assigned".to_string()); + tracing::info!( + %mode, + host = %config.tcp_host, + port = %port_display, + "Initializing NetworkManager with TCP request plane" + ); + } + RequestPlaneMode::Nats => { + tracing::info!( + %mode, + "Initializing NetworkManager with NATS request plane" + ); + } + } Self { mode, From ecef6288fdf14cb11ca144ef68436251341cdb52 Mon Sep 17 00:00:00 2001 From: Biswa Panda Date: Thu, 11 Dec 2025 14:10:37 -0800 Subject: [PATCH 5/7] handle model manager init sequence --- lib/llm/src/discovery/model_manager.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/llm/src/discovery/model_manager.rs b/lib/llm/src/discovery/model_manager.rs index c3ab41f787..45f3786483 100644 --- a/lib/llm/src/discovery/model_manager.rs +++ b/lib/llm/src/discovery/model_manager.rs @@ -332,6 +332,11 @@ impl ModelManager { let instance_id = discovery.instance_id(); let request_plane_mode = endpoint.drt().request_plane(); + // Ensure request plane server is initialized before building transport. + // For TCP mode, this sets the actual port (which may be OS-assigned). + // This must happen before build_transport_type() which reads the port. + let _ = endpoint.drt().request_plane_server().await?; + // Build transport for router endpoint based on request plane mode // Use KV_ROUTER_COMPONENT as the component name to distinguish from the generate endpoint's component let router_endpoint_id = router_endpoint_id(endpoint.id().namespace); From 52e0ad5b2538581398dfbc8e33d06dfa4e73fb10 Mon Sep 17 00:00:00 2001 From: Biswa Panda Date: Thu, 11 Dec 2025 14:11:11 -0800 Subject: [PATCH 6/7] update --- .../pipeline/network/ingress/shared_tcp_endpoint.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs b/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs index 4301726424..c7f16999bb 100644 --- a/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs +++ b/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs @@ -11,7 +11,7 @@ use crate::pipeline::network::PushWorkHandler; use anyhow::Result; use bytes::Bytes; use dashmap::DashMap; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; @@ -39,7 +39,7 @@ pub struct SharedTcpServer { /// The address to bind to (may have port 0 for OS-assigned port) bind_addr: SocketAddr, /// The actual bound address (populated after bind_and_start, contains actual port) - actual_addr: std::sync::RwLock>, + actual_addr: RwLock>, cancellation_token: CancellationToken, } @@ -61,7 +61,7 @@ impl SharedTcpServer { // address we requested to bind to. bind_addr, // actual address after free port assignment (if DYN_TCP_RPC_PORT is not specified) - actual_addr: std::sync::RwLock::new(None), + actual_addr: RwLock::new(None), cancellation_token, }) } @@ -86,7 +86,7 @@ impl SharedTcpServer { ); // Store the actual bound address - *self.actual_addr.write().unwrap() = Some(actual_addr); + *self.actual_addr.write() = Some(actual_addr); // Start accepting connections in a background task let server = self.clone(); @@ -101,7 +101,7 @@ impl SharedTcpServer { /// /// Returns None if the server hasn't been started yet. pub fn actual_address(&self) -> Option { - *self.actual_addr.read().unwrap() + *self.actual_addr.read() } /// Internal accept loop - runs after binding From e02bb11240b53747ba48d370e0e60569286e1c90 Mon Sep 17 00:00:00 2001 From: Biswa Panda Date: Thu, 11 Dec 2025 15:05:44 -0800 Subject: [PATCH 7/7] refactor --- lib/llm/src/discovery/model_manager.rs | 8 +---- lib/runtime/src/component/endpoint.rs | 44 ++++++++++++++++++++++---- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/lib/llm/src/discovery/model_manager.rs b/lib/llm/src/discovery/model_manager.rs index 45f3786483..b6b998b230 100644 --- a/lib/llm/src/discovery/model_manager.rs +++ b/lib/llm/src/discovery/model_manager.rs @@ -330,17 +330,11 @@ impl ModelManager { // Register router via discovery mechanism let discovery = endpoint.component().drt().discovery(); let instance_id = discovery.instance_id(); - let request_plane_mode = endpoint.drt().request_plane(); - - // Ensure request plane server is initialized before building transport. - // For TCP mode, this sets the actual port (which may be OS-assigned). - // This must happen before build_transport_type() which reads the port. - let _ = endpoint.drt().request_plane_server().await?; // Build transport for router endpoint based on request plane mode // Use KV_ROUTER_COMPONENT as the component name to distinguish from the generate endpoint's component let router_endpoint_id = router_endpoint_id(endpoint.id().namespace); - let transport = build_transport_type(request_plane_mode, &router_endpoint_id, instance_id)?; + let transport = build_transport_type(endpoint, &router_endpoint_id, instance_id).await?; let discovery_spec = DiscoverySpec::Endpoint { namespace: router_endpoint_id.namespace.clone(), diff --git a/lib/runtime/src/component/endpoint.rs b/lib/runtime/src/component/endpoint.rs index c77c5161fc..c1fc285d44 100644 --- a/lib/runtime/src/component/endpoint.rs +++ b/lib/runtime/src/component/endpoint.rs @@ -119,7 +119,7 @@ impl EndpointConfigBuilder { // Register health check target in SystemHealth if provided if let Some(health_check_payload) = &health_check_payload { // Build transport based on request plane mode - let transport = build_transport_type(request_plane_mode, &endpoint_id, connection_id)?; + let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?; let instance = Instance { component: endpoint_id.component.clone(), @@ -198,7 +198,7 @@ impl EndpointConfigBuilder { let discovery = endpoint.drt().discovery(); // Build transport for discovery service based on request plane mode - let transport = build_transport_type(request_plane_mode, &endpoint_id, connection_id)?; + let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?; let discovery_spec = crate::discovery::DiscoverySpec::Endpoint { namespace: endpoint_id.namespace.clone(), @@ -235,7 +235,7 @@ impl EndpointConfigBuilder { /// /// # Errors /// Returns an error if TCP mode is used but the TCP server hasn't been started yet. -pub fn build_transport_type( +fn build_transport_type_inner( mode: RequestPlaneMode, endpoint_id: &EndpointId, connection_id: u64, @@ -259,10 +259,12 @@ pub fn build_transport_type( } RequestPlaneMode::Tcp => { let tcp_host = crate::utils::get_tcp_rpc_host_from_env(); - // Get the actual TCP port from the global reference (set by TCP server after binding). - // If user explicitly set DYN_TCP_RPC_PORT, that value is used during binding. - // If port was OS-assigned (0), this returns the actual assigned port. - let tcp_port = crate::pipeline::network::manager::get_actual_tcp_rpc_port()?; + // If a fixed port is explicitly configured, use it directly (no init ordering dependency). + // Otherwise, use the actual bound port (set by TCP server after binding when port 0 is used). + let tcp_port = std::env::var("DYN_TCP_RPC_PORT") + .ok() + .and_then(|p| p.parse::().ok()) + .unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?); // Include endpoint name for proper TCP routing // TCP client parses this format and adds x-endpoint-path header for server-side routing @@ -276,3 +278,31 @@ pub fn build_transport_type( ))), } } + +/// Build transport type, ensuring TCP server is initialized when needed. +/// +/// In TCP mode with an OS-assigned port (`DYN_TCP_RPC_PORT` unset or invalid), the server must bind +/// before we can construct a correct transport address. This helper ensures that initialization +/// occurs, then delegates to the internal builder. +pub async fn build_transport_type( + endpoint: &Endpoint, + endpoint_id: &EndpointId, + connection_id: u64, +) -> Result { + let mode = endpoint.drt().request_plane(); + + if mode == RequestPlaneMode::Tcp { + // Only force server init when we *don't* have a valid explicit port. + let has_fixed_port = std::env::var("DYN_TCP_RPC_PORT") + .ok() + .and_then(|p| p.parse::().ok()) + .is_some(); + + if !has_fixed_port { + // Ensure request plane server is initialized before building transport. + let _ = endpoint.drt().request_plane_server().await?; + } + } + + build_transport_type_inner(mode, endpoint_id, connection_id) +}