Skip to content

Commit

Permalink
remove footgun from HttpServer listener
Browse files Browse the repository at this point in the history
it was possible to create a graceful executor for
an HttpServer that was used to create a listener,
but have it block the app shutdown
due to not being taken into account for the underlying
transport server

this also simplifies the API

closes #350
  • Loading branch information
GlenDC committed Nov 6, 2024
1 parent 47a3a5a commit 3bb1287
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 82 deletions.
5 changes: 2 additions & 3 deletions examples/http_form.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,9 @@ async fn main() {
let graceful = rama::graceful::Shutdown::default();

graceful.spawn_task_fn(|guard| async move {
let exec = Executor::graceful(guard.clone());
let exec = Executor::graceful(guard);
HttpServer::auto(exec)
.listen_graceful(
guard,
.listen(
"127.0.0.1:62002",
TraceLayer::new_for_http().
layer(
Expand Down
52 changes: 31 additions & 21 deletions examples/http_service_match.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use rama::{

/// Everything else we need is provided by the standard library, community crates or tokio.
use serde_json::json;
use std::time::Duration;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
Expand All @@ -51,26 +52,35 @@ async fn main() {
)
.init();

let addr = "127.0.0.1:62011";
tracing::info!("running service at: {addr}");
let exec = Executor::default();
HttpServer::auto(exec)
.listen(
addr,
TraceLayer::new_for_http()
.layer(
match_service!{
HttpMatcher::get("/") => Html(r##"<h1>Home</h1><a href="/echo">Echo Request</a>"##.to_owned()),
PathMatcher::new("/echo") => |req: Request| async move {
Json(json!({
"method": req.method().as_str(),
"path": req.uri().path(),
}))
},
_ => Redirect::temporary("/"),
}
),
)
let graceful = rama::graceful::Shutdown::default();

graceful.spawn_task_fn(|guard| async move {
let addr = "127.0.0.1:62011";
tracing::info!("running service at: {addr}");
let exec = Executor::graceful(guard);
HttpServer::auto(exec)
.listen(
addr,
TraceLayer::new_for_http()
.layer(
match_service!{
HttpMatcher::get("/") => Html(r##"<h1>Home</h1><a href="/echo">Echo Request</a>"##.to_owned()),
PathMatcher::new("/echo") => |req: Request| async move {
Json(json!({
"method": req.method().as_str(),
"path": req.uri().path(),
}))
},
_ => Redirect::temporary("/"),
}
),
)
.await
.unwrap();
});

graceful
.shutdown_with_limit(Duration::from_secs(30))
.await
.unwrap();
.expect("graceful shutdown");
}
4 changes: 1 addition & 3 deletions rama-core/src/rt/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@ impl Executor {
None => tokio::spawn(future),
}
}
}

impl Executor {
/// Get a reference to the shutdown guard,
/// if and only if the executor was created with [`Self::graceful`].
pub(crate) fn guard(&self) -> Option<&ShutdownGuard> {
pub fn guard(&self) -> Option<&ShutdownGuard> {
self.guard.as_ref()
}
}
89 changes: 34 additions & 55 deletions rama-http-backend/src/server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use tokio::net::ToSocketAddrs;
/// [`Service`]: rama_core::Service
pub struct HttpServer<B> {
builder: B,
guard: Option<ShutdownGuard>,
}

impl<B> fmt::Debug for HttpServer<B>
Expand All @@ -48,6 +49,7 @@ where
fn clone(&self) -> Self {
Self {
builder: self.builder.clone(),
guard: self.guard.clone(),
}
}
}
Expand All @@ -57,8 +59,30 @@ impl HttpServer<Http1ConnBuilder> {
pub fn http1() -> Self {
Self {
builder: Http1ConnBuilder::new(),
guard: None,
}
}

/// Set the guard that can be used by the [`HttpServer`]
/// in case it is turned into an http1 listener.
pub fn with_guard(mut self, guard: ShutdownGuard) -> Self {
self.guard = Some(guard);
self
}

/// Maybe set the guard that can be used by the [`HttpServer`]
/// in case it is turned into an http1 listener.
pub fn maybe_with_guard(mut self, guard: Option<ShutdownGuard>) -> Self {
self.guard = guard;
self
}

/// Set the guard that can be used by the [`HttpServer`]
/// in case it is turned into an http1 listener.
pub fn set_guard(&mut self, guard: ShutdownGuard) -> &mut Self {
self.guard = Some(guard);
self
}
}

impl HttpServer<Http1ConnBuilder> {
Expand Down Expand Up @@ -188,8 +212,10 @@ impl<'a> Http1Config<'a> {
impl HttpServer<H2ConnBuilder<HyperExecutor>> {
/// Create a new h2 `Builder` with default settings.
pub fn h2(exec: Executor) -> Self {
let guard = exec.guard().cloned();
Self {
builder: H2ConnBuilder::new(HyperExecutor(exec)),
guard,
}
}
}
Expand Down Expand Up @@ -333,8 +359,10 @@ impl<'a, E> H2Config<'a, E> {
impl HttpServer<AutoConnBuilder<HyperExecutor>> {
/// Create a new dual http/1.1 + h2 `Builder` with default settings.
pub fn auto(exec: Executor) -> Self {
let guard = exec.guard().cloned();
Self {
builder: AutoConnBuilder::new(HyperExecutor(exec)),
guard,
}
}
}
Expand Down Expand Up @@ -644,34 +672,12 @@ where
Response: IntoResponse + Send + 'static,
A: ToSocketAddrs,
{
TcpListener::bind(addr)
.await?
.serve(self.service(service))
.await;
Ok(())
}

/// Listen gracefully for connections on the given address, serving HTTP connections.
///
/// Same as [`Self::listen`], but it will respect the given [`ShutdownGuard`],
/// and also pass it to the service.
///
/// [`ShutdownGuard`]: rama_core::graceful::ShutdownGuard
pub async fn listen_graceful<S, Response, A>(
self,
guard: ShutdownGuard,
addr: A,
service: S,
) -> HttpServeResult
where
S: Service<(), Request, Response = Response, Error = Infallible>,
Response: IntoResponse + Send + 'static,
A: ToSocketAddrs,
{
TcpListener::bind(addr)
.await?
.serve_graceful(guard, self.service(service))
.await;
let tcp = TcpListener::bind(addr).await?;
let service = HttpService::new(self.builder, service);
match self.guard {
Some(guard) => tcp.serve_graceful(guard, service).await,
None => tcp.serve(service).await,
};
Ok(())
}

Expand Down Expand Up @@ -700,33 +706,6 @@ where
.await;
Ok(())
}

/// Listen gracefully for connections on the given address, serving HTTP connections.
///
/// Same as [`Self::listen_graceful`], but including the given state in the [`Service`]'s [`Context`].
///
/// [`Service`]: rama_core::Service
/// [`Context`]: rama_core::Context
pub async fn listen_graceful_with_state<State, S, Response, A>(
self,
guard: ShutdownGuard,
state: State,
addr: A,
service: S,
) -> HttpServeResult
where
State: Clone + Send + Sync + 'static,
S: Service<State, Request, Response = Response, Error = Infallible>,
Response: IntoResponse + Send + 'static,
A: ToSocketAddrs,
{
TcpListener::build_with_state(state)
.bind(addr)
.await?
.serve_graceful(guard, self.service(service))
.await;
Ok(())
}
}

/// A [`Service`] that can be used to serve IO Byte streams (e.g. a TCP Stream) as HTTP.
Expand Down

0 comments on commit 3bb1287

Please sign in to comment.