Skip to content

Commit

Permalink
Improve graceful shutdown with draining
Browse files Browse the repository at this point in the history
In-progress connections now prevent the server from returning until they are complete, ensuing that graceful termination can still happen.
  • Loading branch information
alexrudy committed Apr 2, 2024
1 parent 8c69cff commit d469de5
Showing 1 changed file with 43 additions and 20 deletions.
63 changes: 43 additions & 20 deletions tonic/src/transport/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use tokio_stream::StreamExt as _;
use tower::util::BoxCloneService;
use tower::util::Oneshot;
use tower::ServiceExt;
use tracing::debug;
use tracing::trace;

#[cfg(feature = "tls")]
Expand Down Expand Up @@ -563,14 +564,14 @@ impl<L> Server<L> {

tokio::pin!(incoming);

let graceful = signal.is_some();
let sig = Fuse { inner: signal };
tokio::pin!(sig);

loop {
tokio::select! {
_ = &mut sig => {
trace!("signal received, shutting down");
drop(signal_rx);
break;
},
io = incoming.next() => {
Expand All @@ -580,7 +581,9 @@ impl<L> Server<L> {
trace!("error accepting connection: {:#}", e);
continue;
},
None => break,
None => {
break
},
};

trace!("connection accepted");
Expand All @@ -595,11 +598,23 @@ impl<L> Server<L> {
.map_err(super::Error::from_source)?;
let hyper_svc = TowerToHyperService::new(req_svc);

serve_connection(io, hyper_svc, builder.clone(), signal_tx.clone());
serve_connection(io, hyper_svc, builder.clone(), graceful.then(|| signal_rx.clone()));
}
}
}

if graceful {
let _ = signal_tx.send(());
drop(signal_rx);
trace!(
"waiting for {} connections to close",
signal_tx.receiver_count()
);

// Wait for all connections to close
signal_tx.closed().await;
}

Ok(())
}
}
Expand All @@ -610,7 +625,7 @@ fn serve_connection<IO, S>(
io: ServerIo<IO>,
hyper_svc: TowerToHyperService<S>,
builder: hyper_util::server::conn::auto::Builder<TokioExecutor>,
watcher: Arc<tokio::sync::watch::Sender<()>>,
mut watcher: Option<tokio::sync::watch::Receiver<()>>,
) where
S: Service<Request, Response = Response> + Clone + Send + 'static,
S::Future: Send + 'static,
Expand All @@ -619,30 +634,32 @@ fn serve_connection<IO, S>(
IO::ConnectInfo: Clone + Send + Sync + 'static,
{
tokio::spawn(async move {
let sig = Fuse {
inner: Some(watcher.closed()),
};
{
let sig = Fuse {
inner: watcher.as_mut().map(|w| w.changed()),
};

tokio::pin!(sig);
tokio::pin!(sig);

let conn = builder.serve_connection(TokioIo::new(io), hyper_svc);
tokio::pin!(conn);
let conn = builder.serve_connection(TokioIo::new(io), hyper_svc);
tokio::pin!(conn);

loop {
tokio::select! {
rv = &mut conn => {
if let Err(err) = rv {
trace!("failed serving connection: {:#}", err);
loop {
tokio::select! {
rv = &mut conn => {
if let Err(err) = rv {
debug!("failed serving connection: {:#}", err);
}
break;
},
_ = &mut sig => {
conn.as_mut().graceful_shutdown();
}
break;
},
_ = &mut sig => {
trace!("signal received, shutting down");
conn.as_mut().graceful_shutdown();
}
}
}

drop(watcher);
trace!("connection closed");
});
}
Expand Down Expand Up @@ -1057,6 +1074,12 @@ struct Fuse<F> {
inner: Option<F>,
}

impl<F> Fuse<F> {
fn is_terminated(self: &Pin<&mut Self>) -> bool {
self.inner.is_none()
}
}

impl<F> Future for Fuse<F>
where
F: Future,
Expand Down

0 comments on commit d469de5

Please sign in to comment.