Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions docs/guides/request_plane.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,15 @@ export DYN_REQUEST_PLANE=tcp

# Optional: Configure TCP server host and port
export DYN_TCP_RPC_HOST=0.0.0.0 # Optional: bind host (auto-detected if unset)
export DYN_TCP_RPC_PORT=9999 # Default port
# export DYN_TCP_RPC_PORT=9999 # Optional: specify a fixed port

# Run your Dynamo service
DYN_REQUEST_PLANE=tcp python -m dynamo.frontend --http-port=8000 &
DYN_REQUEST_PLANE=tcp python -m dynamo.vllm --model Qwen/Qwen3-0.6B
```

**Note:** By default, TCP uses an OS-assigned free port (port 0). This is ideal for environments where multiple services may run on the same machine or when you want to avoid port conflicts. If you need a specific port (e.g., for firewall rules), set `DYN_TCP_RPC_PORT` explicitly.

**When to use TCP:**
- Simple deployments with direct service-to-service communication (e.g. frontend to backend)
- Minimal infrastructure requirements (no NATS needed)
Expand All @@ -124,7 +126,7 @@ DYN_REQUEST_PLANE=tcp python -m dynamo.vllm --model Qwen/Qwen3-0.6B

Additional TCP-specific environment variables:
- `DYN_TCP_RPC_HOST`: Server host address (default: auto-detected)
- `DYN_TCP_RPC_PORT`: Server port (default: 9999)
- `DYN_TCP_RPC_PORT`: Server port. If not set, the OS assigns a free port automatically (recommended for most deployments). Set explicitly only if you need a specific port for firewall rules.
- `DYN_TCP_MAX_MESSAGE_SIZE`: Maximum message size for TCP client (default: 32MB)
- `DYN_TCP_REQUEST_TIMEOUT`: Request timeout for TCP client (default: 10 seconds)
- `DYN_TCP_POOL_SIZE`: Connection pool size for TCP client (default: 50)
Expand Down Expand Up @@ -228,7 +230,7 @@ Request plane configuration is loaded from environment variables at startup and

1. Stop your Dynamo services
2. Set environment variable `DYN_REQUEST_PLANE=tcp`
3. Optionally configure TCP-specific settings (`DYN_TCP_RPC_PORT`, etc.)
3. Optionally configure TCP-specific settings (e.g., `DYN_TCP_RPC_HOST`). Note: `DYN_TCP_RPC_PORT` is optional; if not set, an OS-assigned free port is used automatically.
4. Restart your services


Expand Down Expand Up @@ -279,7 +281,7 @@ curl http://localhost:8000/v1/chat/completions \
**Symptoms:** Server fails to start due to "address already in use"

**Solutions:**
- TCP default port: 9999 (adjust environment variable `DYN_TCP_RPC_PORT`)
- TCP: By default, TCP uses an OS-assigned free port, so port conflicts should be rare. If you explicitly set `DYN_TCP_RPC_PORT` to a specific port and get conflicts, either change the port or remove the setting to use automatic port assignment.
- HTTP default port: 8888 (adjust environment variable `DYN_HTTP_RPC_PORT`)

## Performance Considerations
Expand Down
3 changes: 1 addition & 2 deletions lib/llm/src/discovery/model_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,11 @@ impl ModelManager {
// Register router via discovery mechanism
let discovery = endpoint.component().drt().discovery();
let instance_id = discovery.instance_id();
let request_plane_mode = endpoint.drt().request_plane();

// Build transport for router endpoint based on request plane mode
// Use KV_ROUTER_COMPONENT as the component name to distinguish from the generate endpoint's component
let router_endpoint_id = router_endpoint_id(endpoint.id().namespace);
let transport = build_transport_type(request_plane_mode, &router_endpoint_id, instance_id);
let transport = build_transport_type(endpoint, &router_endpoint_id, instance_id).await?;

let discovery_spec = DiscoverySpec::Endpoint {
namespace: router_endpoint_id.namespace.clone(),
Expand Down
100 changes: 67 additions & 33 deletions lib/runtime/src/component/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,30 +89,6 @@ impl EndpointConfigBuilder {
let request_plane_mode = endpoint.drt().request_plane();
tracing::info!("Endpoint starting with request plane mode: {request_plane_mode}",);

// Register health check target in SystemHealth if provided
if let Some(health_check_payload) = &health_check_payload {
// Build transport based on request plane mode
let transport = build_transport_type(request_plane_mode, &endpoint_id, connection_id);

let instance = Instance {
component: endpoint_id.component.clone(),
endpoint: endpoint_id.name.clone(),
namespace: endpoint_id.namespace.clone(),
instance_id: connection_id,
transport,
};
tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target");
let guard = system_health.lock();
guard.register_health_check_target(
&endpoint.name,
instance,
health_check_payload.clone(),
);
if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) {
handler.set_endpoint_health_check_notifier(notifier)?;
}
}

// Register with graceful shutdown tracker if needed
if graceful_shutdown {
tracing::debug!(
Expand All @@ -137,9 +113,33 @@ impl EndpointConfigBuilder {
let component_name_for_task = endpoint_id.component.clone();
let endpoint_name_for_task = endpoint_id.name.clone();

// Get the unified request plane server (works for all transport types)
// Get the unified request plane server
let server = endpoint.drt().request_plane_server().await?;

// Register health check target in SystemHealth if provided
if let Some(health_check_payload) = &health_check_payload {
// Build transport based on request plane mode
let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;

let instance = Instance {
component: endpoint_id.component.clone(),
endpoint: endpoint_id.name.clone(),
namespace: endpoint_id.namespace.clone(),
instance_id: connection_id,
transport,
};
tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target");
let guard = system_health.lock();
guard.register_health_check_target(
&endpoint.name,
instance,
health_check_payload.clone(),
);
if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) {
handler.set_endpoint_health_check_notifier(notifier)?;
}
}

tracing::info!(
endpoint = %endpoint_name_for_task,
transport = server.transport_name(),
Expand Down Expand Up @@ -198,7 +198,7 @@ impl EndpointConfigBuilder {
let discovery = endpoint.drt().discovery();

// Build transport for discovery service based on request plane mode
let transport = build_transport_type(request_plane_mode, &endpoint_id, connection_id);
let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;

let discovery_spec = crate::discovery::DiscoverySpec::Endpoint {
namespace: endpoint_id.namespace.clone(),
Expand Down Expand Up @@ -232,11 +232,14 @@ impl EndpointConfigBuilder {
/// - HTTP: Uses full URL path including endpoint name (e.g., http://host:port/v1/rpc/endpoint_name)
/// - TCP: Includes endpoint name for routing (e.g., host:port/endpoint_name)
/// - NATS: Uses subject-based addressing (unique per endpoint)
pub fn build_transport_type(
///
/// # Errors
/// Returns an error if TCP mode is used but the TCP server hasn't been started yet.
fn build_transport_type_inner(
mode: RequestPlaneMode,
endpoint_id: &EndpointId,
connection_id: u64,
) -> TransportType {
) -> Result<TransportType> {
match mode {
RequestPlaneMode::Http => {
let http_host = crate::utils::get_http_rpc_host_from_env();
Expand All @@ -252,23 +255,54 @@ pub fn build_transport_type(
endpoint_id.name
);

TransportType::Http(http_endpoint)
Ok(TransportType::Http(http_endpoint))
}
RequestPlaneMode::Tcp => {
    let tcp_host = crate::utils::get_tcp_rpc_host_from_env();
    // A valid, explicitly configured port is used as-is. Only when the variable is
    // unset or unparsable do we ask for the port the TCP server actually bound to.
    //
    // The lookup must stay lazy: `unwrap_or(f()?)` evaluates `f()?` before
    // `unwrap_or` runs, so a failure of the lookup would propagate even when a
    // fixed port is configured, reintroducing the init-ordering dependency that
    // the fixed-port path is meant to avoid.
    let tcp_port = match std::env::var("DYN_TCP_RPC_PORT")
        .ok()
        .and_then(|p| p.parse::<u16>().ok())
    {
        Some(port) => port,
        None => crate::pipeline::network::manager::get_actual_tcp_rpc_port()?,
    };

    // Include endpoint name for proper TCP routing
    // TCP client parses this format and adds x-endpoint-path header for server-side routing
    let tcp_endpoint = format!("{}:{}/{}", tcp_host, tcp_port, endpoint_id.name);

    Ok(TransportType::Tcp(tcp_endpoint))
}
RequestPlaneMode::Nats => {
TransportType::Nats(nats::instance_subject(endpoint_id, connection_id))
RequestPlaneMode::Nats => Ok(TransportType::Nats(nats::instance_subject(
endpoint_id,
connection_id,
))),
}
}

/// Resolve the transport descriptor for an endpoint, initializing the request plane
/// server first when the address depends on it.
///
/// In TCP mode without a parsable `DYN_TCP_RPC_PORT`, the port is only known once the
/// server has bound, so the request plane server is awaited before the address is built.
/// In every other case the transport is built directly from the mode and identifiers.
///
/// # Errors
/// Propagates any error from initializing the request plane server or from the
/// internal transport builder.
pub async fn build_transport_type(
    endpoint: &Endpoint,
    endpoint_id: &EndpointId,
    connection_id: u64,
) -> Result<TransportType> {
    let mode = endpoint.drt().request_plane();

    // The environment is consulted only in TCP mode (short-circuit `&&`); a missing or
    // unparsable value means the port will be assigned at bind time.
    let needs_bound_port = mode == RequestPlaneMode::Tcp
        && std::env::var("DYN_TCP_RPC_PORT")
            .ok()
            .and_then(|raw| raw.parse::<u16>().ok())
            .is_none();

    if needs_bound_port {
        // Only the side effect (server initialized) matters here; the handle is dropped.
        let _ = endpoint.drt().request_plane_server().await?;
    }

    build_transport_type_inner(mode, endpoint_id, connection_id)
}
123 changes: 90 additions & 33 deletions lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::pipeline::network::PushWorkHandler;
use anyhow::Result;
use bytes::Bytes;
use dashmap::DashMap;
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
Expand All @@ -36,7 +36,10 @@ fn get_max_message_size() -> usize {
/// Shared TCP server that handles multiple endpoints on a single port
///
/// One listener serves every registered endpoint; incoming connections are handed the
/// shared handler map so requests can be dispatched to the matching endpoint handler.
pub struct SharedTcpServer {
/// Endpoint handlers shared with every connection task.
/// NOTE(review): presumably keyed by endpoint name — confirm against `register_endpoint`.
handlers: Arc<DashMap<String, Arc<EndpointHandler>>>,
/// The address to bind to (may have port 0 for OS-assigned port)
bind_addr: SocketAddr,
/// The actual bound address (populated after bind_and_start, contains actual port)
/// `None` until the listener has been bound.
actual_addr: RwLock<Option<SocketAddr>>,
/// Cancelling this token makes the accept loop log a shutdown message and return.
cancellation_token: CancellationToken,
}

Expand All @@ -55,11 +58,83 @@ impl SharedTcpServer {
pub fn new(bind_addr: SocketAddr, cancellation_token: CancellationToken) -> Arc<Self> {
    // Nothing is bound here: `bind_addr` is only the requested address, and
    // `actual_addr` stays empty until `bind_and_start` records the real one
    // (which differs from the request when port 0 asks the OS for a free port).
    let server = Self {
        handlers: Arc::new(DashMap::new()),
        bind_addr,
        actual_addr: RwLock::new(None),
        cancellation_token,
    };
    Arc::new(server)
}

/// Bind the server and start accepting connections.
///
/// This method binds to the configured address first, then starts the accept loop.
/// If the configured port is 0, the OS will assign a free port.
/// The actual bound address is stored and can be retrieved via `actual_address()`.
///
/// Returns the actual bound address (useful when port 0 was specified).
pub async fn bind_and_start(self: Arc<Self>) -> Result<SocketAddr> {
tracing::info!("Binding TCP server to {}", self.bind_addr);

let listener = TcpListener::bind(&self.bind_addr).await?;
let actual_addr = listener.local_addr()?;

tracing::info!(
requested = %self.bind_addr,
actual = %actual_addr,
"TCP server bound successfully"
);

// Store the actual bound address
*self.actual_addr.write() = Some(actual_addr);

// Start accepting connections in a background task
let server = self.clone();
tokio::spawn(async move {
server.accept_loop(listener).await;
});

Ok(actual_addr)
}

/// Address the listener is bound to, if binding has happened.
///
/// Yields `None` until `bind_and_start` has stored the bound address.
pub fn actual_address(&self) -> Option<SocketAddr> {
    let bound = self.actual_addr.read();
    *bound
}

/// Accept connections on an already-bound listener until cancellation.
///
/// Each accepted connection is served on its own spawned task; accept failures are
/// logged and the loop keeps going. The loop ends when the cancellation token fires.
async fn accept_loop(self: Arc<Self>, listener: TcpListener) {
    let shutdown = self.cancellation_token.clone();

    loop {
        tokio::select! {
            incoming = listener.accept() => {
                let (stream, peer_addr) = match incoming {
                    Ok(connection) => connection,
                    Err(e) => {
                        tracing::error!("Failed to accept TCP connection: {}", e);
                        continue;
                    }
                };
                tracing::trace!("Accepted TCP connection from {}", peer_addr);

                // Each connection task gets its own handle to the shared handler map.
                let handlers = Arc::clone(&self.handlers);
                tokio::spawn(async move {
                    if let Err(e) = Self::handle_connection(stream, handlers).await {
                        tracing::error!("TCP connection error: {}", e);
                    }
                });
            }
            _ = shutdown.cancelled() => {
                tracing::info!("SharedTcpServer received cancellation signal, shutting down");
                return;
            }
        }
    }
}

#[allow(clippy::too_many_arguments)]
pub async fn register_endpoint(
&self,
Expand Down Expand Up @@ -93,7 +168,7 @@ impl SharedTcpServer {
tracing::info!(
"Registered endpoint '{}' with shared TCP server on {}",
endpoint_name,
self.bind_addr
self.actual_address().unwrap_or(self.bind_addr)
);

Ok(())
Expand Down Expand Up @@ -129,37 +204,16 @@ impl SharedTcpServer {
}
}

/// Start the server (legacy method - prefer bind_and_start for new code).
///
/// This method is kept for backwards compatibility. It binds and starts
/// the server but doesn't return the actual bound address.
pub async fn start(self: Arc<Self>) -> Result<()> {
tracing::info!("Starting shared TCP server on {}", self.bind_addr);

let listener = TcpListener::bind(&self.bind_addr).await?;
let cancellation_token = self.cancellation_token.clone();

loop {
tokio::select! {
accept_result = listener.accept() => {
match accept_result {
Ok((stream, peer_addr)) => {
tracing::trace!("Accepted TCP connection from {}", peer_addr);

let handlers = self.handlers.clone();
tokio::spawn(async move {
if let Err(e) = Self::handle_connection(stream, handlers).await {
tracing::debug!("TCP connection error: {}", e);
}
});
}
Err(e) => {
tracing::error!("Failed to accept TCP connection: {}", e);
}
}
}
_ = cancellation_token.cancelled() => {
tracing::info!("SharedTcpServer received cancellation signal, shutting down");
return Ok(());
}
}
}
let cancel_token = self.cancellation_token.clone();
self.bind_and_start().await?;
// Wait for cancellation (the accept loop runs in background)
cancel_token.cancelled().await;
Ok(())
}

async fn handle_connection(
Expand Down Expand Up @@ -378,7 +432,10 @@ impl super::unified_server::RequestPlaneServer for SharedTcpServer {
}

fn address(&self) -> String {
format!("tcp://{}:{}", self.bind_addr.ip(), self.bind_addr.port())
// Return actual bound address if available (after bind_and_start),
// otherwise fall back to configured bind address
let addr = self.actual_address().unwrap_or(self.bind_addr);
format!("tcp://{}:{}", addr.ip(), addr.port())
}

fn transport_name(&self) -> &'static str {
Expand Down
Loading
Loading