Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 41 additions & 33 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ mod accept {
}

mod run {
use tracing::instrument::WithSubscriber;

pub trait Run {
#[allow(async_fn_in_trait)]
async fn run<F, A>(server: super::Server<F, A, Self>)
Expand Down Expand Up @@ -318,25 +320,28 @@ mod run {
};
let svc = crate::service(server.filter.clone());
let svc = hyper_util::service::TowerToHyperService::new(svc);
tokio::spawn(async move {
let io = match accepting.await {
Ok(io) => io,
Err(err) => {
tracing::debug!("server accept error: {:?}", err);
return;
tokio::spawn(
async move {
let io = match accepting.await {
Ok(io) => io,
Err(err) => {
tracing::debug!("server accept error: {:?}", err);
return;
}
};
if let Err(err) = hyper_util::server::conn::auto::Builder::new(
hyper_util::rt::TokioExecutor::new(),
)
.http1()
.pipeline_flush(pipeline)
.serve_connection_with_upgrades(io, svc)
.await
{
tracing::error!("server connection error: {:?}", err)
}
};
if let Err(err) = hyper_util::server::conn::auto::Builder::new(
hyper_util::rt::TokioExecutor::new(),
)
.http1()
.pipeline_flush(pipeline)
.serve_connection_with_upgrades(io, svc)
.await
{
tracing::error!("server connection error: {:?}", err)
}
});
.with_current_subscriber(),
);
}
}
}
Expand Down Expand Up @@ -377,24 +382,27 @@ mod run {
let svc = crate::service(server.filter.clone());
let svc = hyper_util::service::TowerToHyperService::new(svc);
let watcher = graceful_util.watcher();
tokio::spawn(async move {
let io = match accepting.await {
Ok(io) => io,
Err(err) => {
tracing::debug!("server accepting error: {:?}", err);
return;
tokio::spawn(
async move {
let io = match accepting.await {
Ok(io) => io,
Err(err) => {
tracing::debug!("server accepting error: {:?}", err);
return;
}
};
let mut hyper = hyper_util::server::conn::auto::Builder::new(
hyper_util::rt::TokioExecutor::new(),
);
hyper.http1().pipeline_flush(pipeline);
let conn = hyper.serve_connection_with_upgrades(io, svc);
let conn = watcher.watch(conn);
if let Err(err) = conn.await {
tracing::error!("server connection error: {:?}", err)
}
};
let mut hyper = hyper_util::server::conn::auto::Builder::new(
hyper_util::rt::TokioExecutor::new(),
);
hyper.http1().pipeline_flush(pipeline);
let conn = hyper.serve_connection_with_upgrades(io, svc);
let conn = watcher.watch(conn);
if let Err(err) = conn.await {
tracing::error!("server connection error: {:?}", err)
}
});
.with_current_subscriber(),
);
}

drop(server.acceptor); // close listener
Expand Down