Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion crates/rmcp/src/transport/streamable_http_server/session.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
//! Session management for the Streamable HTTP transport.
//!
//! A *session* groups the logically related interactions between a single MCP
//! client and the server, starting from the `initialize` handshake. The server
//! assigns each session a unique [`SessionId`] (returned to the client via the
//! `Mcp-Session-Id` response header) and the client includes that ID on every
//! subsequent request.
//!
//! Two tool calls carrying the same session ID come from the same logical
//! session; different IDs mean different clients or conversations.
//!
//! # Implementations
//!
//! * [`local::LocalSessionManager`] — in-memory session store (default).
//! * [`never::NeverSessionManager`] — rejects all session operations, used
//! when stateful mode is disabled.
//!
//! # Custom session managers
//!
//! Implement the [`SessionManager`] trait to back sessions with a database,
//! Redis, or any other external store.

use futures::Stream;

pub use crate::transport::common::server_side_http::{ServerSseMessage, SessionId};
Expand All @@ -9,40 +31,66 @@ use crate::{
pub mod local;
pub mod never;

/// Controls how MCP sessions are created, validated, and closed.
///
/// The [`StreamableHttpService`](super::StreamableHttpService) calls into this
/// trait for every HTTP request that carries (or should carry) a session ID.
///
/// See the [module-level docs](self) for background on sessions.
pub trait SessionManager: Send + Sync + 'static {
type Error: std::error::Error + Send + 'static;
type Transport: crate::transport::Transport<RoleServer>;
/// Create a new session with the given id and configuration.

/// Create a new session and return its ID together with the transport
/// that will be used to exchange MCP messages within this session.
fn create_session(
&self,
) -> impl Future<Output = Result<(SessionId, Self::Transport), Self::Error>> + Send;

/// Forward the first message (the `initialize` request) to the session.
fn initialize_session(
&self,
id: &SessionId,
message: ClientJsonRpcMessage,
) -> impl Future<Output = Result<ServerJsonRpcMessage, Self::Error>> + Send;

/// Return `true` if a session with the given ID exists and is active.
fn has_session(&self, id: &SessionId)
-> impl Future<Output = Result<bool, Self::Error>> + Send;

/// Close and remove the session. Corresponds to an HTTP DELETE request
/// with `Mcp-Session-Id`.
fn close_session(&self, id: &SessionId)
-> impl Future<Output = Result<(), Self::Error>> + Send;

/// Route a client request into the session and return an SSE stream
/// carrying the server's response(s).
fn create_stream(
&self,
id: &SessionId,
message: ClientJsonRpcMessage,
) -> impl Future<
Output = Result<impl Stream<Item = ServerSseMessage> + Send + Sync + 'static, Self::Error>,
> + Send;

/// Accept a notification, response, or error message from the client
/// without producing a response stream.
fn accept_message(
&self,
id: &SessionId,
message: ClientJsonRpcMessage,
) -> impl Future<Output = Result<(), Self::Error>> + Send;

/// Create an SSE stream not tied to a specific client request (HTTP GET).
fn create_standalone_stream(
&self,
id: &SessionId,
) -> impl Future<
Output = Result<impl Stream<Item = ServerSseMessage> + Send + Sync + 'static, Self::Error>,
> + Send;

/// Resume an SSE stream from the given `Last-Event-ID`, replaying any
/// events the client missed.
fn resume(
&self,
id: &SessionId,
Expand Down
78 changes: 74 additions & 4 deletions crates/rmcp/src/transport/streamable_http_server/tower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,89 @@ impl Default for StreamableHttpServerConfig {
}
}

/// # Streamable Http Server
/// # Streamable HTTP server
///
/// ## Extract information from raw http request
/// An HTTP service that implements the
/// [Streamable HTTP transport](https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#streamable-http)
/// for MCP servers.
///
/// ## Session management
///
/// When [`StreamableHttpServerConfig::stateful_mode`] is `true` (the default),
/// the server creates a session for each client that sends an `initialize`
/// request. The session ID is returned in the `Mcp-Session-Id` response header
/// and the client must include it on all subsequent requests.
///
/// Two tool calls carrying the same `Mcp-Session-Id` come from the same logical
/// session (typically one conversation in an LLM client). Different session IDs
/// mean different sessions.
///
/// The [`SessionManager`] trait controls how sessions are stored and routed:
///
/// * [`LocalSessionManager`](super::session::local::LocalSessionManager) —
/// in-memory session store (default).
/// * [`NeverSessionManager`](super::session::never::NeverSessionManager) —
/// disables sessions entirely (stateless mode).
///
/// ## Accessing HTTP request data from tool handlers
///
/// The service consumes the request body but injects the remaining
/// [`http::request::Parts`] into [`crate::model::Extensions`], which is
/// accessible through [`crate::service::RequestContext`].
///
/// ### Reading the raw HTTP parts
///
/// The http service will consume the request body, however the rest part will be remain and injected into [`crate::model::Extensions`],
/// which you can get from [`crate::service::RequestContext`].
/// ```rust
/// use rmcp::handler::server::tool::Extension;
/// use http::request::Parts;
/// async fn my_tool(Extension(parts): Extension<Parts>) {
/// tracing::info!("http parts:{parts:?}")
/// }
/// ```
///
/// ### Reading the session ID inside a tool handler
///
/// ```rust,ignore
/// use rmcp::handler::server::tool::Extension;
/// use rmcp::service::RequestContext;
/// use rmcp::model::RoleServer;
///
/// #[tool(description = "session-aware tool")]
/// async fn my_tool(
/// &self,
/// Extension(parts): Extension<http::request::Parts>,
/// ) -> Result<CallToolResult, rmcp::ErrorData> {
/// if let Some(session_id) = parts.headers.get("mcp-session-id") {
/// tracing::info!(?session_id, "called from session");
/// }
/// // ...
/// # todo!()
/// }
/// ```
///
/// ### Accessing custom axum/tower extension state
///
/// State added via axum's `Extension` layer is available inside
/// `Parts.extensions`:
///
/// ```rust,ignore
/// use rmcp::service::RequestContext;
/// use rmcp::model::RoleServer;
///
/// #[derive(Clone)]
/// struct AppState { /* ... */ }
///
/// #[tool(description = "example")]
/// async fn my_tool(
/// &self,
/// ctx: RequestContext<RoleServer>,
/// ) -> Result<CallToolResult, rmcp::ErrorData> {
/// let parts = ctx.extensions.get::<http::request::Parts>().unwrap();
/// let state = parts.extensions.get::<AppState>().unwrap();
/// // use state...
/// # todo!()
/// }
/// ```
pub struct StreamableHttpService<S, M = super::session::local::LocalSessionManager> {
pub config: StreamableHttpServerConfig,
session_manager: Arc<M>,
Expand Down
17 changes: 17 additions & 0 deletions examples/servers/src/common/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,23 @@ impl Counter {
(a + b).to_string(),
)]))
}

/// Returns the `Mcp-Session-Id` of the current session (streamable HTTP only).
#[tool(description = "Get the session ID for this connection")]
fn get_session_id(&self, ctx: RequestContext<RoleServer>) -> Result<CallToolResult, McpError> {
let session_id = ctx
.extensions
.get::<axum::http::request::Parts>()
.and_then(|parts| parts.headers.get("mcp-session-id"))
.map(|v| v.to_str().unwrap_or("(non-ascii)").to_owned());

match session_id {
Some(id) => Ok(CallToolResult::success(vec![Content::text(id)])),
None => Ok(CallToolResult::success(vec![Content::text(
"no session (not running over streamable HTTP?)",
)])),
}
}
}

#[prompt_router]
Expand Down