diff --git a/examples/Cargo.toml b/examples/Cargo.toml index e04868826..deab25fd4 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -280,7 +280,7 @@ tower = ["dep:hyper", "dep:hyper-util", "dep:tower", "dep:http"] json-codec = ["dep:serde", "dep:serde_json", "dep:bytes"] compression = ["tonic/gzip"] tls = ["tonic/tls"] -tls-rustls = ["dep:hyper", "dep:hyper-util", "dep:hyper-rustls", "dep:tower", "tower-http/util", "tower-http/add-extension", "dep:rustls-pemfile", "dep:tokio-rustls"] +tls-rustls = ["dep:hyper", "dep:hyper-util", "dep:hyper-rustls", "dep:tower", "tower-http/util", "tower-http/add-extension", "dep:rustls-pemfile", "dep:tokio-rustls", "dep:pin-project", "dep:http-body-util"] dynamic-load-balance = ["dep:tower"] timeout = ["tokio/time", "dep:tower"] tls-client-auth = ["tonic/tls"] @@ -315,7 +315,7 @@ http = { version = "1", optional = true } http-body = { version = "1", optional = true } http-body-util = { version = "0.1", optional = true } hyper = { version = "1", optional = true } -hyper-util = { version = "0.1", optional = true } +hyper-util = { version = ">=0.1.4, <0.2", optional = true } listenfd = { version = "1.0", optional = true } bytes = { version = "1", optional = true } h2 = { version = "0.3", optional = true } @@ -323,6 +323,7 @@ tokio-rustls = { version = "0.26", optional = true, features = ["ring", "tls12"] hyper-rustls = { version = "0.27.0", features = ["http2", "ring", "tls12"], optional = true, default-features = false } rustls-pemfile = { version = "2.0.0", optional = true } tower-http = { version = "0.5", optional = true } +pin-project = { version = "1.0.11", optional = true } [build-dependencies] tonic-build = { path = "../tonic-build", features = ["prost"] } diff --git a/examples/src/tls_rustls/server.rs b/examples/src/tls_rustls/server.rs index 5630edfa1..0fb31f8fa 100644 --- a/examples/src/tls_rustls/server.rs +++ b/examples/src/tls_rustls/server.rs @@ -2,6 +2,7 @@ pub mod pb { tonic::include_proto!("/grpc.examples.unaryecho"); } +use http_body_util::BodyExt; use hyper::server::conn::http2::Builder; use hyper_util::rt::{TokioExecutor, TokioIo}; use pb::{EchoRequest, EchoResponse}; @@ -14,8 +15,8 @@ use tokio_rustls::{ }, TlsAcceptor, }; -use tonic::transport::server::TowerToHyperService; -use tonic::{transport::Server, Request, Response, Status}; +use tonic::{body::BoxBody, transport::Server, Request, Response, Status}; +use tower::{BoxError, ServiceExt}; use tower_http::ServiceBuilderExt; #[tokio::main] @@ -122,3 +123,70 @@ impl pb::echo_server::Echo for EchoServer { Ok(Response::new(EchoResponse { message })) } } + +/// An adaptor which converts a [`tower::Service`] to a [`hyper::service::Service`]. +/// +/// The [`hyper::service::Service`] trait is used by hyper to handle incoming requests, +/// and does not support the `poll_ready` method that is used by tower services. +/// +/// This is provided here because the equivalent adaptor in hyper-util does not support +/// tonic::body::BoxBody bodies. +#[derive(Debug, Clone)] +struct TowerToHyperService { + service: S, +} + +impl TowerToHyperService { + /// Create a new `TowerToHyperService` from a tower service. + fn new(service: S) -> Self { + Self { service } + } +} + +impl hyper::service::Service> for TowerToHyperService +where + S: tower::Service> + Clone, + S::Error: Into + 'static, +{ + type Response = S::Response; + type Error = BoxError; + type Future = TowerToHyperServiceFuture>; + + fn call(&self, req: hyper::Request) -> Self::Future { + let req = req.map(|incoming| { + incoming + .map_err(|err| Status::from_error(err.into())) + .boxed_unsync() + }); + TowerToHyperServiceFuture { + future: self.service.clone().oneshot(req), + } + } +} + +/// Future returned by [`TowerToHyperService`]. +#[derive(Debug)] +#[pin_project::pin_project] +struct TowerToHyperServiceFuture +where + S: tower::Service, +{ + #[pin] + future: tower::util::Oneshot, +} + +impl std::future::Future for TowerToHyperServiceFuture +where + S: tower::Service, + S::Error: Into + 'static, +{ + type Output = Result; + + #[inline] + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + self.project().future.poll(cx).map_err(Into::into) + } +} diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index fb63058ad..3fa406c77 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -10,14 +10,9 @@ mod tls; mod unix; use tokio_stream::StreamExt as _; -use tower::util::BoxCloneService; -use tower::util::Oneshot; -use tower::ServiceExt; -use tracing::debug; -use tracing::trace; +use tracing::{debug, trace}; -pub use super::service::Routes; -pub use super::service::RoutesBuilder; +pub use super::service::{Routes, RoutesBuilder}; pub use conn::{Connected, TcpConnectInfo}; use hyper_util::rt::{TokioExecutor, TokioIo}; @@ -43,19 +38,17 @@ use crate::transport::Error; use self::recover_error::RecoverError; use super::service::{GrpcTimeout, ServerIo}; -use crate::body::boxed; -use crate::body::BoxBody; +use crate::body::{boxed, BoxBody}; use crate::server::NamedService; use bytes::Bytes; use http::{Request, Response}; use http_body_util::BodyExt; use hyper::body::Incoming; use pin_project::pin_project; -use std::future::poll_fn; use std::{ convert::Infallible, fmt, - future::{self, Future}, + future::{self, poll_fn, Future}, marker::PhantomData, net::SocketAddr, pin::{pin, Pin}, @@ -69,8 +62,8 @@ use tower::{ layer::util::{Identity, Stack}, layer::Layer, limit::concurrency::ConcurrencyLimitLayer, - util::Either, - Service, ServiceBuilder, + util::{BoxCloneService, Either, Oneshot}, + Service, ServiceBuilder, ServiceExt, }; type BoxHttpBody = crate::body::BoxBody; @@ -673,30 +666,15 @@ type ConnectionBuilder = hyper_util::server::conn::auto::Builder; /// The [`hyper::service::Service`] trait is used by hyper to handle incoming requests, /// and does not support the `poll_ready` method that is used by tower services. #[derive(Debug, Copy, Clone)] -pub struct TowerToHyperService { +pub(crate) struct TowerToHyperService { service: S, } impl TowerToHyperService { /// Create a new `TowerToHyperService` from a tower service. - pub fn new(service: S) -> Self { + pub(crate) fn new(service: S) -> Self { Self { service } } - - /// Extract the inner tower service. - pub fn into_inner(self) -> S { - self.service - } - - /// Get a reference to the inner tower service. - pub fn as_inner(&self) -> &S { - &self.service - } - - /// Get a mutable reference to the inner tower service. - pub fn as_inner_mut(&mut self) -> &mut S { - &mut self.service - } } impl hyper::service::Service> for TowerToHyperService @@ -719,7 +697,7 @@ where /// Future returned by [`TowerToHyperService`]. #[derive(Debug)] #[pin_project] -pub struct TowerToHyperServiceFuture +pub(crate) struct TowerToHyperServiceFuture where S: tower_service::Service, {