diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index ef3222fce..55b082541 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -27,11 +27,12 @@ codegen = ["dep:async-trait"] gzip = ["dep:flate2"] default = ["transport", "codegen", "prost"] prost = ["dep:prost"] -tls = ["dep:rustls-pemfile", "transport", "dep:tokio-rustls", "dep:async-stream", "tokio/rt"] +tls = ["dep:rustls-pemfile", "transport", "dep:tokio-rustls", "tokio/rt"] tls-roots = ["tls-roots-common", "dep:rustls-native-certs"] tls-roots-common = ["tls"] tls-webpki-roots = ["tls-roots-common", "dep:webpki-roots"] transport = [ + "dep:async-stream", "dep:axum", "channel", "dep:h2", diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index 07544ab9b..1d482901b 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -2,7 +2,7 @@ use super::compression::{compress, CompressionEncoding, SingleMessageCompression use super::{EncodeBuf, Encoder, DEFAULT_MAX_SEND_MESSAGE_SIZE, HEADER_SIZE}; use crate::{Code, Status}; use bytes::{BufMut, Bytes, BytesMut}; -use futures_util::{ready, StreamExt, TryStream, TryStreamExt}; +use futures_util::ready; use http::HeaderMap; use http_body::Body; use pin_project::pin_project; @@ -10,7 +10,7 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tokio_stream::Stream; +use tokio_stream::{Stream, StreamExt}; pub(super) const BUFFER_SIZE: usize = 8 * 1024; @@ -31,8 +31,7 @@ where compression_encoding, compression_override, max_message_size, - ) - .into_stream(); + ); EncodeBody::new_server(stream) } @@ -53,8 +52,7 @@ where compression_encoding, SingleMessageCompressionOverride::default(), max_message_size, - ) - .into_stream(); + ); EncodeBody::new_client(stream) } @@ -64,7 +62,7 @@ fn encode( compression_encoding: Option, compression_override: SingleMessageCompressionOverride, max_message_size: Option, -) -> impl TryStream +) -> impl Stream> where T: Encoder, U: Stream>, @@ -251,8 +249,8 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - let mut self_proj = self.project(); - match ready!(self_proj.inner.try_poll_next_unpin(cx)) { + let self_proj = self.project(); + match ready!(self_proj.inner.poll_next(cx)) { Some(Ok(d)) => Some(Ok(d)).into(), Some(Err(status)) => match self_proj.state.role { Role::Client => Some(Err(status)).into(), diff --git a/tonic/src/codegen.rs b/tonic/src/codegen.rs index 70a216309..0697d9e21 100644 --- a/tonic/src/codegen.rs +++ b/tonic/src/codegen.rs @@ -1,7 +1,6 @@ //! Codegen exports used by `tonic-build`. pub use async_trait::async_trait; -pub use futures_util::future::{ok, poll_fn, Ready}; pub use tokio_stream; pub use std::future::Future; diff --git a/tonic/src/server/grpc.rs b/tonic/src/server/grpc.rs index fdb422fd0..e8c13b83d 100644 --- a/tonic/src/server/grpc.rs +++ b/tonic/src/server/grpc.rs @@ -7,9 +7,9 @@ use crate::{ server::{ClientStreamingService, ServerStreamingService, StreamingService, UnaryService}, Code, Request, Status, }; -use futures_util::{TryStream, TryStreamExt}; use http_body::Body; use std::fmt; +use tokio_stream::{Stream, StreamExt}; macro_rules! t { ($result:expr) => { @@ -428,7 +428,7 @@ where max_message_size: Option, ) -> http::Response where - B: TryStream + Send + 'static, + B: Stream> + Send + 'static, { let response = match response { Ok(r) => r, @@ -453,7 +453,7 @@ where let body = encode_server( self.codec.encoder(), - body.into_stream(), + body, accept_encoding, compression_override, max_message_size, diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index b2fbdec1e..1eabed27b 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -188,7 +188,7 @@ impl Channel { D: Discover + Unpin + Send + 'static, D::Error: Into, D::Key: Hash + Send + Clone, - E: Executor> + Send + Sync + 'static, + E: Executor> + Send + Sync + 'static, { let svc = Balance::new(discover); diff --git a/tonic/src/transport/mod.rs b/tonic/src/transport/mod.rs index 82065b8d4..1b3b19d83 100644 --- a/tonic/src/transport/mod.rs +++ b/tonic/src/transport/mod.rs @@ -118,5 +118,4 @@ pub use self::server::ServerTlsConfig; #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] pub use self::tls::Identity; -type BoxFuture = - std::pin::Pin> + Send + 'static>>; +type BoxFuture<'a, T> = std::pin::Pin + Send + 'a>>; diff --git a/tonic/src/transport/server/incoming.rs b/tonic/src/transport/server/incoming.rs index 818319715..09711eee7 100644 --- a/tonic/src/transport/server/incoming.rs +++ b/tonic/src/transport/server/incoming.rs @@ -14,7 +14,7 @@ use tokio::{ io::{AsyncRead, AsyncWrite}, net::TcpListener, }; -use tokio_stream::Stream; +use tokio_stream::{Stream, StreamExt}; #[cfg(not(feature = "tls"))] pub(crate) fn tcp_incoming( @@ -25,8 +25,13 @@ where IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IE: Into, { - use futures_util::stream::TryStreamExt; - incoming.err_into().map_ok(ServerIo::new_io) + async_stream::try_stream! { + futures_util::pin_mut!(incoming); + + while let Some(item) = incoming.next().await { + yield item.map(ServerIo::new_io)? + } + } } #[cfg(feature = "tls")] @@ -81,8 +86,6 @@ async fn select( where IE: Into, { - use tokio_stream::StreamExt; - if tasks.is_empty() { return match incoming.try_next().await { Ok(Some(stream)) => SelectOutput::Incoming(stream), diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index 51421fc56..2ac31f14c 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -36,7 +36,7 @@ use self::recover_error::RecoverError; use super::service::{GrpcTimeout, ServerIo}; use crate::body::BoxBody; use bytes::Bytes; -use futures_util::{future, ready}; +use futures_util::ready; use http::{Request, Response}; use http_body::Body as _; use hyper::{server::accept, Body}; @@ -44,7 +44,7 @@ use pin_project::pin_project; use std::{ convert::Infallible, fmt, - future::Future, + future::{self, Future}, marker::PhantomData, net::SocketAddr, pin::Pin, diff --git a/tonic/src/transport/service/add_origin.rs b/tonic/src/transport/service/add_origin.rs index ca259b9ac..fdd7d3f9e 100644 --- a/tonic/src/transport/service/add_origin.rs +++ b/tonic/src/transport/service/add_origin.rs @@ -1,4 +1,4 @@ -use futures_util::future::BoxFuture; +use crate::transport::BoxFuture; use http::uri::Authority; use http::uri::Scheme; use http::{Request, Uri}; diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index a12f2d6a9..46a88dda5 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -1,6 +1,8 @@ -use super::super::BoxFuture; use super::{grpc_timeout::GrpcTimeout, reconnect::Reconnect, AddOrigin, UserAgent}; -use crate::{body::BoxBody, transport::Endpoint}; +use crate::{ + body::BoxBody, + transport::{BoxFuture, Endpoint}, +}; use http::Uri; use hyper::client::conn::Builder; use hyper::client::connect::Connection as HyperConnection; @@ -100,7 +102,7 @@ impl Connection { impl Service for Connection { type Response = Response; type Error = crate::Error; - type Future = BoxFuture; + type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { Service::poll_ready(&mut self.inner, cx).map_err(Into::into) diff --git a/tonic/src/transport/service/connector.rs b/tonic/src/transport/service/connector.rs index 67b6e90f5..b672f3bf3 100644 --- a/tonic/src/transport/service/connector.rs +++ b/tonic/src/transport/service/connector.rs @@ -64,7 +64,7 @@ where { type Response = BoxedIo; type Error = crate::Error; - type Future = BoxFuture; + type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { MakeConnection::poll_ready(&mut self.inner, cx).map_err(Into::into) diff --git a/tonic/src/transport/service/executor.rs b/tonic/src/transport/service/executor.rs index 2b8a7e10f..de3cfbe6e 100644 --- a/tonic/src/transport/service/executor.rs +++ b/tonic/src/transport/service/executor.rs @@ -1,4 +1,4 @@ -use futures_util::future::BoxFuture; +use crate::transport::BoxFuture; use std::{future::Future, sync::Arc}; pub(crate) use hyper::rt::Executor;