diff --git a/crates/rmcp/src/transport/streamable_http_server/session.rs b/crates/rmcp/src/transport/streamable_http_server/session.rs index d9c5c7f4..9cf4d0db 100644 --- a/crates/rmcp/src/transport/streamable_http_server/session.rs +++ b/crates/rmcp/src/transport/streamable_http_server/session.rs @@ -1,3 +1,25 @@ +//! Session management for the Streamable HTTP transport. +//! +//! A *session* groups the logically related interactions between a single MCP +//! client and the server, starting from the `initialize` handshake. The server +//! assigns each session a unique [`SessionId`] (returned to the client via the +//! `Mcp-Session-Id` response header) and the client includes that ID on every +//! subsequent request. +//! +//! Two tool calls carrying the same session ID come from the same logical +//! session; different IDs mean different clients or conversations. +//! +//! # Implementations +//! +//! * [`local::LocalSessionManager`] — in-memory session store (default). +//! * [`never::NeverSessionManager`] — rejects all session operations, used +//! when stateful mode is disabled. +//! +//! # Custom session managers +//! +//! Implement the [`SessionManager`] trait to back sessions with a database, +//! Redis, or any other external store. + use futures::Stream; pub use crate::transport::common::server_side_http::{ServerSseMessage, SessionId}; @@ -9,22 +31,40 @@ use crate::{ pub mod local; pub mod never; +/// Controls how MCP sessions are created, validated, and closed. +/// +/// The [`StreamableHttpService`](super::StreamableHttpService) calls into this +/// trait for every HTTP request that carries (or should carry) a session ID. +/// +/// See the [module-level docs](self) for background on sessions. pub trait SessionManager: Send + Sync + 'static { type Error: std::error::Error + Send + 'static; type Transport: crate::transport::Transport; - /// Create a new session with the given id and configuration. + + /// Create a new session and return its ID together with the transport + /// that will be used to exchange MCP messages within this session. fn create_session( &self, ) -> impl Future> + Send; + + /// Forward the first message (the `initialize` request) to the session. fn initialize_session( &self, id: &SessionId, message: ClientJsonRpcMessage, ) -> impl Future> + Send; + + /// Return `true` if a session with the given ID exists and is active. fn has_session(&self, id: &SessionId) -> impl Future> + Send; + + /// Close and remove the session. Corresponds to an HTTP DELETE request + /// with `Mcp-Session-Id`. fn close_session(&self, id: &SessionId) -> impl Future> + Send; + + /// Route a client request into the session and return an SSE stream + /// carrying the server's response(s). fn create_stream( &self, id: &SessionId, @@ -32,17 +72,25 @@ pub trait SessionManager: Send + Sync + 'static { ) -> impl Future< Output = Result + Send + Sync + 'static, Self::Error>, > + Send; + + /// Accept a notification, response, or error message from the client + /// without producing a response stream. fn accept_message( &self, id: &SessionId, message: ClientJsonRpcMessage, ) -> impl Future> + Send; + + /// Create an SSE stream not tied to a specific client request (HTTP GET). fn create_standalone_stream( &self, id: &SessionId, ) -> impl Future< Output = Result + Send + Sync + 'static, Self::Error>, > + Send; + + /// Resume an SSE stream from the given `Last-Event-ID`, replaying any + /// events the client missed. fn resume( &self, id: &SessionId, diff --git a/crates/rmcp/src/transport/streamable_http_server/tower.rs b/crates/rmcp/src/transport/streamable_http_server/tower.rs index 37d4a008..15831775 100644 --- a/crates/rmcp/src/transport/streamable_http_server/tower.rs +++ b/crates/rmcp/src/transport/streamable_http_server/tower.rs @@ -55,12 +55,38 @@ impl Default for StreamableHttpServerConfig { } } -/// # Streamable Http Server +/// # Streamable HTTP server /// -/// ## Extract information from raw http request +/// An HTTP service that implements the +/// [Streamable HTTP transport](https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#streamable-http) +/// for MCP servers. +/// +/// ## Session management +/// +/// When [`StreamableHttpServerConfig::stateful_mode`] is `true` (the default), +/// the server creates a session for each client that sends an `initialize` +/// request. The session ID is returned in the `Mcp-Session-Id` response header +/// and the client must include it on all subsequent requests. +/// +/// Two tool calls carrying the same `Mcp-Session-Id` come from the same logical +/// session (typically one conversation in an LLM client). Different session IDs +/// mean different sessions. +/// +/// The [`SessionManager`] trait controls how sessions are stored and routed: +/// +/// * [`LocalSessionManager`](super::session::local::LocalSessionManager) — +/// in-memory session store (default). +/// * [`NeverSessionManager`](super::session::never::NeverSessionManager) — +/// disables sessions entirely (stateless mode). +/// +/// ## Accessing HTTP request data from tool handlers +/// +/// The service consumes the request body but injects the remaining +/// [`http::request::Parts`] into [`crate::model::Extensions`], which is +/// accessible through [`crate::service::RequestContext`]. +/// +/// ### Reading the raw HTTP parts /// -/// The http service will consume the request body, however the rest part will be remain and injected into [`crate::model::Extensions`], -/// which you can get from [`crate::service::RequestContext`]. /// ```rust /// use rmcp::handler::server::tool::Extension; /// use http::request::Parts; @@ -68,6 +94,50 @@ impl Default for StreamableHttpServerConfig { /// tracing::info!("http parts:{parts:?}") /// } /// ``` +/// +/// ### Reading the session ID inside a tool handler +/// +/// ```rust,ignore +/// use rmcp::handler::server::tool::Extension; +/// use rmcp::service::RequestContext; +/// use rmcp::model::RoleServer; +/// +/// #[tool(description = "session-aware tool")] +/// async fn my_tool( +/// &self, +/// Extension(parts): Extension, +/// ) -> Result { +/// if let Some(session_id) = parts.headers.get("mcp-session-id") { +/// tracing::info!(?session_id, "called from session"); +/// } +/// // ... +/// # todo!() +/// } +/// ``` +/// +/// ### Accessing custom axum/tower extension state +/// +/// State added via axum's `Extension` layer is available inside +/// `Parts.extensions`: +/// +/// ```rust,ignore +/// use rmcp::service::RequestContext; +/// use rmcp::model::RoleServer; +/// +/// #[derive(Clone)] +/// struct AppState { /* ... */ } +/// +/// #[tool(description = "example")] +/// async fn my_tool( +/// &self, +/// ctx: RequestContext, +/// ) -> Result { +/// let parts = ctx.extensions.get::().unwrap(); +/// let state = parts.extensions.get::().unwrap(); +/// // use state... +/// # todo!() +/// } +/// ``` pub struct StreamableHttpService { pub config: StreamableHttpServerConfig, session_manager: Arc, diff --git a/examples/servers/src/common/counter.rs b/examples/servers/src/common/counter.rs index 9ca043f8..e92b142a 100644 --- a/examples/servers/src/common/counter.rs +++ b/examples/servers/src/common/counter.rs @@ -136,6 +136,23 @@ impl Counter { (a + b).to_string(), )])) } + + /// Returns the `Mcp-Session-Id` of the current session (streamable HTTP only). + #[tool(description = "Get the session ID for this connection")] + fn get_session_id(&self, ctx: RequestContext) -> Result { + let session_id = ctx + .extensions + .get::() + .and_then(|parts| parts.headers.get("mcp-session-id")) + .map(|v| v.to_str().unwrap_or("(non-ascii)").to_owned()); + + match session_id { + Some(id) => Ok(CallToolResult::success(vec![Content::text(id)])), + None => Ok(CallToolResult::success(vec![Content::text( + "no session (not running over streamable HTTP?)", + )])), + } + } } #[prompt_router]