feat(server): implement Clone for GracefulShutdown#136
feat(server): implement Clone for GracefulShutdown#136tottoto wants to merge 1 commit intohyperium:masterfrom
Conversation
Not that I'm against adding this, but knowing the use cases would help. There's a part of me that worries if having multiple clones could be confusing when |
|
hyperium/tonic#1788 is a use case of this change. |
32d1c48 to
eb6167a
Compare
3e306ec to
d3b844d
Compare
d3b844d to
670aec7
Compare
|
@seanmonstar, when you need to Personally the amount of code it takes to do your own graceful shutdown is very little, so I will likely keep using async fn main_loop<S>(
listener: tokio::net::TcpListener,
acceptor: tokio_rustls::TlsAcceptor,
app: S,
server: hyper::server::conn::http2::Builder<hyper_util::rt::TokioExecutor>,
sender: &hyper_util::server::graceful::GracefulShutdown,
) -> core::convert::Infallible
where
S: hyper::service::Service<
axum::extract::Request<hyper::body::Incoming>,
Response = axum::response::Response<axum::body::Body>,
Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
Error = core::convert::Infallible,
> + core::clone::Clone
+ core::marker::Send
+ 'static,
{
loop {
if let Ok(val) = listener.accept().await {
// This can take too long; thus should be done inside `tokio::spawn`;
// however we can't since that `move`s `sender`.
if let Ok(io) = acceptor.accept(val.0).await {
let con = sender.watch(server.serve_connection(hyper_util::rt::TokioIo::new(io), app.clone()));
tokio::spawn(async move {
let _r: Result<(), hyper::Error> = con.await;
});
}
}
}
}If async fn main_loop<S>(
listener: tokio::net::TcpListener,
acceptor: tokio_rustls::TlsAcceptor,
app: S,
server: hyper::server::conn::http2::Builder<hyper_util::rt::TokioExecutor>,
sender: hyper_util::server::graceful::GracefulShutdown,
) -> core::convert::Infallible
where
S: hyper::service::Service<
axum::extract::Request<hyper::body::Incoming>,
Response = axum::response::Response<axum::body::Body>,
Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
Error = core::convert::Infallible,
> + core::clone::Clone
+ core::marker::Send
+ 'static,
{
loop {
if let Ok(val) = listener.accept().await {
let tls = acceptor.clone();
let http = app.clone();
let builder = server.clone();
let shutdown = sender.clone();
tokio::spawn(async move {
if let Ok(io) = tls.accept(val.0).await {
let _r: Result<(), hyper::Error> = shutdown.watch(builder.serve_connection(hyper_util::rt::TokioIo::new(io), http)).await;
}
});
}
}
}"Hand-rolled" graceful shutdown: struct ShutdownConnection<S, F>
where
S: hyper::service::Service<
axum::extract::Request<hyper::body::Incoming>,
Response = axum::response::Response<axum::body::Body>,
Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
Error = core::convert::Infallible,
>,
{
con: hyper::server::conn::http2::Connection<
hyper_util::rt::TokioIo<tokio_rustls::server::TlsStream<tokio::net::TcpStream>>,
S,
hyper_util::rt::TokioExecutor,
>,
signal: F,
shutting_down: bool,
}
impl<S, F> ShutdownConnection<S, F>
where
S: hyper::service::Service<
axum::extract::Request<hyper::body::Incoming>,
Response = axum::response::Response<axum::body::Body>,
Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
Error = core::convert::Infallible,
>,
{
fn new(
con: hyper::server::conn::http2::Connection<
hyper_util::rt::TokioIo<tokio_rustls::server::TlsStream<tokio::net::TcpStream>>,
S,
hyper_util::rt::TokioExecutor,
>,
signal: F,
) -> Self {
Self {
con,
signal,
shutting_down: false,
}
}
}
impl<S, F> core::future::Future for ShutdownConnection<S, F>
where
S: hyper::service::Service<
axum::extract::Request<hyper::body::Incoming>,
Response = axum::response::Response<axum::body::Body>,
Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
Error = core::convert::Infallible,
>,
F: core::future::Future<Output = Result<(), tokio::sync::watch::error::RecvError>>
+ core::marker::Unpin,
{
type Output = Result<(), hyper::Error>;
fn poll(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
if !self.shutting_down && core::pin::Pin::new(&mut self.signal).poll(cx).is_ready() {
self.shutting_down = true;
core::pin::Pin::new(&mut self.con).graceful_shutdown();
}
core::pin::Pin::new(&mut self.con).poll(cx)
}
}
async fn main_loop<S>(
listener: tokio::net::TcpListener,
acceptor: tokio_rustls::TlsAcceptor,
app: S,
server: hyper::server::conn::http2::Builder<hyper_util::rt::TokioExecutor>,
sender: &tokio::sync::watch::Sender<()>,
) -> core::convert::Infallible
where
S: hyper::service::Service<
axum::extract::Request<hyper::body::Incoming>,
Response = axum::response::Response<axum::body::Body>,
Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
Error = core::convert::Infallible,
> + core::clone::Clone
+ core::marker::Send
+ 'static,
{
loop {
if let Ok(val) = listener.accept().await {
let tls = acceptor.clone();
let http = app.clone();
let builder = server.clone();
let mut rx = sender.subscribe();
tokio::spawn(async move {
if let Ok(io) = tls.accept(val.0).await {
let _r: Result<(), hyper::Error> = ShutdownConnection::new(
builder.serve_connection(hyper_util::rt::TokioIo::new(io), http),
core::pin::pin!(rx.changed()),
)
.await;
}
});
}
}
} |
|
I'm certainly sympathetic to the use case. I've bumped into a couple times, too. But something always made me worry. I finally spent some time understanding my concern, and wrote up a different solution: #182 |
I like that approach more. It is essentially my "hand-rolled" implementation in library/type form. |
Implements Clone for GracefulShutdown. This allows users to use GracefulShutdown in other functions.