diff --git a/docs/guides/request_plane.md b/docs/guides/request_plane.md index 0533477222..9cc4f74131 100644 --- a/docs/guides/request_plane.md +++ b/docs/guides/request_plane.md @@ -108,13 +108,15 @@ export DYN_REQUEST_PLANE=tcp # Optional: Configure TCP server host and port export DYN_TCP_RPC_HOST=0.0.0.0 # Default host -export DYN_TCP_RPC_PORT=9999 # Default port +# export DYN_TCP_RPC_PORT=9999 # Optional: specify a fixed port # Run your Dynamo service DYN_REQUEST_PLANE=tcp python -m dynamo.frontend --http-port=8000 & DYN_REQUEST_PLANE=tcp python -m dynamo.vllm --model Qwen/Qwen3-0.6B ``` +**Note:** By default, TCP uses an OS-assigned free port (port 0). This is ideal for environments where multiple services may run on the same machine or when you want to avoid port conflicts. If you need a specific port (e.g., for firewall rules), set `DYN_TCP_RPC_PORT` explicitly. + **When to use TCP:** - Simple deployments with direct service-to-service communication (e.g. frontend to backend) - Minimal infrastructure requirements (no NATS needed) @@ -124,7 +126,7 @@ DYN_REQUEST_PLANE=tcp python -m dynamo.vllm --model Qwen/Qwen3-0.6B Additional TCP-specific environment variables: - `DYN_TCP_RPC_HOST`: Server host address (default: auto-detected) -- `DYN_TCP_RPC_PORT`: Server port (default: 9999) +- `DYN_TCP_RPC_PORT`: Server port. If not set, the OS assigns a free port automatically (recommended for most deployments). Set explicitly only if you need a specific port for firewall rules. - `DYN_TCP_MAX_MESSAGE_SIZE`: Maximum message size for TCP client (default: 32MB) - `DYN_TCP_REQUEST_TIMEOUT`: Request timeout for TCP client (default: 10 seconds) - `DYN_TCP_POOL_SIZE`: Connection pool size for TCP client (default: 50) @@ -228,7 +230,7 @@ Request plane configuration is loaded from environment variables at startup and 1. Stop your Dynamo services 2. Set environment variable `DYN_REQUEST_PLANE=tcp` -3. Optionally configure TCP-specific settings (`DYN_TCP_RPC_PORT`, etc.) +3. Optionally configure TCP-specific settings (e.g., `DYN_TCP_RPC_HOST`). Note: `DYN_TCP_RPC_PORT` is optional; if not set, an OS-assigned free port is used automatically. 4. Restart your services @@ -279,7 +281,7 @@ curl http://localhost:8000/v1/chat/completions \ **Symptoms:** Server fails to start due to "address already in use" **Solutions:** -- TCP default port: 9999 (adjust environment variable `DYN_TCP_RPC_PORT`) +- TCP: By default, TCP uses an OS-assigned free port, so port conflicts should be rare. If you explicitly set `DYN_TCP_RPC_PORT` to a specific port and get conflicts, either change the port or remove the setting to use automatic port assignment. - HTTP default port: 8888 (adjust environment variable `DYN_HTTP_RPC_PORT`) ## Performance Considerations diff --git a/lib/llm/src/discovery/model_manager.rs b/lib/llm/src/discovery/model_manager.rs index 6ba4afda14..b6b998b230 100644 --- a/lib/llm/src/discovery/model_manager.rs +++ b/lib/llm/src/discovery/model_manager.rs @@ -330,12 +330,11 @@ impl ModelManager { // Register router via discovery mechanism let discovery = endpoint.component().drt().discovery(); let instance_id = discovery.instance_id(); - let request_plane_mode = endpoint.drt().request_plane(); // Build transport for router endpoint based on request plane mode // Use KV_ROUTER_COMPONENT as the component name to distinguish from the generate endpoint's component let router_endpoint_id = router_endpoint_id(endpoint.id().namespace); - let transport = build_transport_type(request_plane_mode, &router_endpoint_id, instance_id); + let transport = build_transport_type(endpoint, &router_endpoint_id, instance_id).await?; let discovery_spec = DiscoverySpec::Endpoint { namespace: router_endpoint_id.namespace.clone(), diff --git a/lib/runtime/src/component/endpoint.rs b/lib/runtime/src/component/endpoint.rs index 9e0826d3a1..c1fc285d44 100644 --- a/lib/runtime/src/component/endpoint.rs +++ b/lib/runtime/src/component/endpoint.rs @@ -89,30 +89,6 @@ impl EndpointConfigBuilder { let request_plane_mode = endpoint.drt().request_plane(); tracing::info!("Endpoint starting with request plane mode: {request_plane_mode}",); - // Register health check target in SystemHealth if provided - if let Some(health_check_payload) = &health_check_payload { - // Build transport based on request plane mode - let transport = build_transport_type(request_plane_mode, &endpoint_id, connection_id); - - let instance = Instance { - component: endpoint_id.component.clone(), - endpoint: endpoint_id.name.clone(), - namespace: endpoint_id.namespace.clone(), - instance_id: connection_id, - transport, - }; - tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target"); - let guard = system_health.lock(); - guard.register_health_check_target( - &endpoint.name, - instance, - health_check_payload.clone(), - ); - if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) { - handler.set_endpoint_health_check_notifier(notifier)?; - } - } - // Register with graceful shutdown tracker if needed if graceful_shutdown { tracing::debug!( @@ -137,9 +113,33 @@ impl EndpointConfigBuilder { let component_name_for_task = endpoint_id.component.clone(); let endpoint_name_for_task = endpoint_id.name.clone(); - // Get the unified request plane server (works for all transport types) + // Get the unified request plane server let server = endpoint.drt().request_plane_server().await?; + // Register health check target in SystemHealth if provided + if let Some(health_check_payload) = &health_check_payload { + // Build transport based on request plane mode + let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?; + + let instance = Instance { + component: endpoint_id.component.clone(), + endpoint: endpoint_id.name.clone(), + namespace: endpoint_id.namespace.clone(), + instance_id: connection_id, + transport, + }; + tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target"); + let guard = system_health.lock(); + guard.register_health_check_target( + &endpoint.name, + instance, + health_check_payload.clone(), + ); + if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) { + handler.set_endpoint_health_check_notifier(notifier)?; + } + } + tracing::info!( endpoint = %endpoint_name_for_task, transport = server.transport_name(), @@ -198,7 +198,7 @@ impl EndpointConfigBuilder { let discovery = endpoint.drt().discovery(); // Build transport for discovery service based on request plane mode - let transport = build_transport_type(request_plane_mode, &endpoint_id, connection_id); + let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?; let discovery_spec = crate::discovery::DiscoverySpec::Endpoint { namespace: endpoint_id.namespace.clone(), @@ -232,11 +232,14 @@ impl EndpointConfigBuilder { /// - HTTP: Uses full URL path including endpoint name (e.g., http://host:port/v1/rpc/endpoint_name) /// - TCP: Includes endpoint name for routing (e.g., host:port/endpoint_name) /// - NATS: Uses subject-based addressing (unique per endpoint) -pub fn build_transport_type( +/// +/// # Errors +/// Returns an error if TCP mode is used but the TCP server hasn't been started yet. +fn build_transport_type_inner( mode: RequestPlaneMode, endpoint_id: &EndpointId, connection_id: u64, -) -> TransportType { +) -> Result { match mode { RequestPlaneMode::Http => { let http_host = crate::utils::get_http_rpc_host_from_env(); @@ -252,23 +255,54 @@ pub fn build_transport_type( endpoint_id.name ); - TransportType::Http(http_endpoint) + Ok(TransportType::Http(http_endpoint)) } RequestPlaneMode::Tcp => { let tcp_host = crate::utils::get_tcp_rpc_host_from_env(); + // If a fixed port is explicitly configured, use it directly (no init ordering dependency). + // Otherwise, use the actual bound port (set by TCP server after binding when port 0 is used). let tcp_port = std::env::var("DYN_TCP_RPC_PORT") .ok() .and_then(|p| p.parse::().ok()) - .unwrap_or(9999); + .unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?); // Include endpoint name for proper TCP routing // TCP client parses this format and adds x-endpoint-path header for server-side routing let tcp_endpoint = format!("{}:{}/{}", tcp_host, tcp_port, endpoint_id.name); - TransportType::Tcp(tcp_endpoint) + Ok(TransportType::Tcp(tcp_endpoint)) } - RequestPlaneMode::Nats => { - TransportType::Nats(nats::instance_subject(endpoint_id, connection_id)) + RequestPlaneMode::Nats => Ok(TransportType::Nats(nats::instance_subject( + endpoint_id, + connection_id, + ))), + } +} + +/// Build transport type, ensuring TCP server is initialized when needed. +/// +/// In TCP mode with an OS-assigned port (`DYN_TCP_RPC_PORT` unset or invalid), the server must bind +/// before we can construct a correct transport address. This helper ensures that initialization +/// occurs, then delegates to the internal builder. +pub async fn build_transport_type( + endpoint: &Endpoint, + endpoint_id: &EndpointId, + connection_id: u64, +) -> Result { + let mode = endpoint.drt().request_plane(); + + if mode == RequestPlaneMode::Tcp { + // Only force server init when we *don't* have a valid explicit port. + let has_fixed_port = std::env::var("DYN_TCP_RPC_PORT") + .ok() + .and_then(|p| p.parse::().ok()) + .is_some(); + + if !has_fixed_port { + // Ensure request plane server is initialized before building transport. + let _ = endpoint.drt().request_plane_server().await?; } } + + build_transport_type_inner(mode, endpoint_id, connection_id) } diff --git a/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs b/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs index 2b9d880fc2..c7f16999bb 100644 --- a/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs +++ b/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs @@ -11,7 +11,7 @@ use crate::pipeline::network::PushWorkHandler; use anyhow::Result; use bytes::Bytes; use dashmap::DashMap; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; @@ -36,7 +36,10 @@ fn get_max_message_size() -> usize { /// Shared TCP server that handles multiple endpoints on a single port pub struct SharedTcpServer { handlers: Arc>>, + /// The address to bind to (may have port 0 for OS-assigned port) bind_addr: SocketAddr, + /// The actual bound address (populated after bind_and_start, contains actual port) + actual_addr: RwLock>, cancellation_token: CancellationToken, } @@ -55,11 +58,83 @@ impl SharedTcpServer { pub fn new(bind_addr: SocketAddr, cancellation_token: CancellationToken) -> Arc { Arc::new(Self { handlers: Arc::new(DashMap::new()), + // address we requested to bind to. bind_addr, + // actual address after free port assignment (if DYN_TCP_RPC_PORT is not specified) + actual_addr: RwLock::new(None), cancellation_token, }) } + /// Bind the server and start accepting connections. + /// + /// This method binds to the configured address first, then starts the accept loop. + /// If the configured port is 0, the OS will assign a free port. + /// The actual bound address is stored and can be retrieved via `actual_address()`. + /// + /// Returns the actual bound address (useful when port 0 was specified). + pub async fn bind_and_start(self: Arc) -> Result { + tracing::info!("Binding TCP server to {}", self.bind_addr); + + let listener = TcpListener::bind(&self.bind_addr).await?; + let actual_addr = listener.local_addr()?; + + tracing::info!( + requested = %self.bind_addr, + actual = %actual_addr, + "TCP server bound successfully" + ); + + // Store the actual bound address + *self.actual_addr.write() = Some(actual_addr); + + // Start accepting connections in a background task + let server = self.clone(); + tokio::spawn(async move { + server.accept_loop(listener).await; + }); + + Ok(actual_addr) + } + + /// Get the actual bound address (after bind_and_start has been called). + /// + /// Returns None if the server hasn't been started yet. + pub fn actual_address(&self) -> Option { + *self.actual_addr.read() + } + + /// Internal accept loop - runs after binding + async fn accept_loop(self: Arc, listener: TcpListener) { + let cancellation_token = self.cancellation_token.clone(); + + loop { + tokio::select! { + accept_result = listener.accept() => { + match accept_result { + Ok((stream, peer_addr)) => { + tracing::trace!("Accepted TCP connection from {}", peer_addr); + + let handlers = self.handlers.clone(); + tokio::spawn(async move { + if let Err(e) = Self::handle_connection(stream, handlers).await { + tracing::error!("TCP connection error: {}", e); + } + }); + } + Err(e) => { + tracing::error!("Failed to accept TCP connection: {}", e); + } + } + } + _ = cancellation_token.cancelled() => { + tracing::info!("SharedTcpServer received cancellation signal, shutting down"); + return; + } + } + } + } + #[allow(clippy::too_many_arguments)] pub async fn register_endpoint( &self, @@ -93,7 +168,7 @@ impl SharedTcpServer { tracing::info!( "Registered endpoint '{}' with shared TCP server on {}", endpoint_name, - self.bind_addr + self.actual_address().unwrap_or(self.bind_addr) ); Ok(()) @@ -129,37 +204,16 @@ impl SharedTcpServer { } } + /// Start the server (legacy method - prefer bind_and_start for new code). + /// + /// This method is kept for backwards compatibility. It binds and starts + /// the server but doesn't return the actual bound address. pub async fn start(self: Arc) -> Result<()> { - tracing::info!("Starting shared TCP server on {}", self.bind_addr); - - let listener = TcpListener::bind(&self.bind_addr).await?; - let cancellation_token = self.cancellation_token.clone(); - - loop { - tokio::select! { - accept_result = listener.accept() => { - match accept_result { - Ok((stream, peer_addr)) => { - tracing::trace!("Accepted TCP connection from {}", peer_addr); - - let handlers = self.handlers.clone(); - tokio::spawn(async move { - if let Err(e) = Self::handle_connection(stream, handlers).await { - tracing::debug!("TCP connection error: {}", e); - } - }); - } - Err(e) => { - tracing::error!("Failed to accept TCP connection: {}", e); - } - } - } - _ = cancellation_token.cancelled() => { - tracing::info!("SharedTcpServer received cancellation signal, shutting down"); - return Ok(()); - } - } - } + let cancel_token = self.cancellation_token.clone(); + self.bind_and_start().await?; + // Wait for cancellation (the accept loop runs in background) + cancel_token.cancelled().await; + Ok(()) } async fn handle_connection( @@ -378,7 +432,10 @@ impl super::unified_server::RequestPlaneServer for SharedTcpServer { } fn address(&self) -> String { - format!("tcp://{}:{}", self.bind_addr.ip(), self.bind_addr.port()) + // Return actual bound address if available (after bind_and_start), + // otherwise fall back to configured bind address + let addr = self.actual_address().unwrap_or(self.bind_addr); + format!("tcp://{}:{}", addr.ip(), addr.port()) } fn transport_name(&self) -> &'static str { diff --git a/lib/runtime/src/pipeline/network/manager.rs b/lib/runtime/src/pipeline/network/manager.rs index 4a85809890..c0d952721d 100644 --- a/lib/runtime/src/pipeline/network/manager.rs +++ b/lib/runtime/src/pipeline/network/manager.rs @@ -19,8 +19,36 @@ use crate::distributed::RequestPlaneMode; use anyhow::Result; use async_once_cell::OnceCell; use std::sync::Arc; +use std::sync::OnceLock; use tokio_util::sync::CancellationToken; +/// Global storage for the actual TCP RPC port after binding. +/// Uses OnceLock since the port is set once when the server binds and never changes. +static ACTUAL_TCP_RPC_PORT: OnceLock = OnceLock::new(); + +/// Get the actual TCP RPC port that the server is listening on. +pub fn get_actual_tcp_rpc_port() -> anyhow::Result { + ACTUAL_TCP_RPC_PORT.get().copied().ok_or_else(|| { + tracing::error!( + "TCP RPC port not set - request_plane_server() must be called before get_actual_tcp_rpc_port()" + ); + anyhow::anyhow!( + "TCP RPC port not initialized. This is not expected." + ) + }) +} + +/// Set the actual TCP RPC port (called internally after server binds). +fn set_actual_tcp_rpc_port(port: u16) { + if let Err(existing) = ACTUAL_TCP_RPC_PORT.set(port) { + tracing::warn!( + existing_port = existing, + new_port = port, + "TCP RPC port already set, ignoring new value" + ); + } +} + /// Network configuration loaded from environment variables #[derive(Clone)] struct NetworkConfig { @@ -31,7 +59,8 @@ struct NetworkConfig { // TCP server configuration tcp_host: String, - tcp_port: u16, + /// TCP port to bind to. If None, the OS will assign a free port. + tcp_port: Option, // HTTP client configuration http_client_config: super::egress::http_router::Http2Config, @@ -60,12 +89,12 @@ impl NetworkConfig { .unwrap_or_else(|_| "/v1/rpc".to_string()), // TCP server configuration + // If DYN_TCP_RPC_PORT is set, use that port; otherwise None means OS will assign a free port tcp_host: std::env::var("DYN_TCP_RPC_HOST") .unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env()), tcp_port: std::env::var("DYN_TCP_RPC_PORT") .ok() - .and_then(|p| p.parse().ok()) - .unwrap_or(9999), + .and_then(|p| p.parse().ok()), // HTTP client configuration (reads DYN_HTTP2_* env vars) http_client_config: super::egress::http_router::Http2Config::from_env(), @@ -140,12 +169,35 @@ impl NetworkManager { ) -> Self { let config = NetworkConfig::from_env(nats_client); - tracing::info!( - %mode, - http_port = config.http_port, - tcp_port = config.tcp_port, - "Initializing NetworkManager" - ); + match mode { + RequestPlaneMode::Http => { + tracing::info!( + %mode, + host = %config.http_host, + port = config.http_port, + rpc_root = %config.http_rpc_root, + "Initializing NetworkManager with HTTP request plane" + ); + } + RequestPlaneMode::Tcp => { + let port_display = config + .tcp_port + .map(|p| p.to_string()) + .unwrap_or_else(|| "OS-assigned".to_string()); + tracing::info!( + %mode, + host = %config.tcp_host, + port = %port_display, + "Initializing NetworkManager with TCP request plane" + ); + } + RequestPlaneMode::Nats => { + tracing::info!( + %mode, + "Initializing NetworkManager with NATS request plane" + ); + } + } Self { mode, @@ -250,24 +302,31 @@ impl NetworkManager { async fn create_tcp_server(&self) -> Result> { use super::ingress::shared_tcp_endpoint::SharedTcpServer; - let bind_addr = format!("{}:{}", self.config.tcp_host, self.config.tcp_port) + // Use configured port if specified, otherwise use port 0 (OS assigns free port) + let port = self.config.tcp_port.unwrap_or(0); + let bind_addr = format!("{}:{}", self.config.tcp_host, port) .parse() .map_err(|e| anyhow::anyhow!("Invalid TCP bind address: {}", e))?; tracing::info!( bind_addr = %bind_addr, + port_source = if self.config.tcp_port.is_some() { "DYN_TCP_RPC_PORT" } else { "OS-assigned" }, "Creating TCP request plane server" ); let server = SharedTcpServer::new(bind_addr, self.cancellation_token.clone()); - // Start server in background - let server_clone = server.clone(); - tokio::spawn(async move { - if let Err(e) = server_clone.start().await { - tracing::error!("TCP request plane server error: {}", e); - } - }); + // Bind and start server, getting the actual bound address + let actual_addr = server.clone().bind_and_start().await?; + + // Store the actual bound port globally so build_transport_type() can access it + set_actual_tcp_rpc_port(actual_addr.port()); + + tracing::info!( + actual_addr = %actual_addr, + actual_port = actual_addr.port(), + "TCP request plane server started" + ); Ok(server as Arc) }