Skip to content

Commit

Permalink
chore(tonic): Reduce futures-util (#1457)
Browse files Browse the repository at this point in the history
* chore(tonic): Reduce futures-util

* chore(transport): Vendor futures_util::future::BoxFuture
  • Loading branch information
tottoto authored Aug 10, 2023
1 parent 4b7a47f commit 651851a
Show file tree
Hide file tree
Showing 12 changed files with 32 additions and 30 deletions.
3 changes: 2 additions & 1 deletion tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ codegen = ["dep:async-trait"]
gzip = ["dep:flate2"]
default = ["transport", "codegen", "prost"]
prost = ["dep:prost"]
tls = ["dep:rustls-pemfile", "transport", "dep:tokio-rustls", "dep:async-stream", "tokio/rt"]
tls = ["dep:rustls-pemfile", "transport", "dep:tokio-rustls", "tokio/rt"]
tls-roots = ["tls-roots-common", "dep:rustls-native-certs"]
tls-roots-common = ["tls"]
tls-webpki-roots = ["tls-roots-common", "dep:webpki-roots"]
transport = [
"dep:async-stream",
"dep:axum",
"channel",
"dep:h2",
Expand Down
16 changes: 7 additions & 9 deletions tonic/src/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use super::compression::{compress, CompressionEncoding, SingleMessageCompression
use super::{EncodeBuf, Encoder, DEFAULT_MAX_SEND_MESSAGE_SIZE, HEADER_SIZE};
use crate::{Code, Status};
use bytes::{BufMut, Bytes, BytesMut};
use futures_util::{ready, StreamExt, TryStream, TryStreamExt};
use futures_util::ready;
use http::HeaderMap;
use http_body::Body;
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio_stream::Stream;
use tokio_stream::{Stream, StreamExt};

pub(super) const BUFFER_SIZE: usize = 8 * 1024;

Expand All @@ -31,8 +31,7 @@ where
compression_encoding,
compression_override,
max_message_size,
)
.into_stream();
);

EncodeBody::new_server(stream)
}
Expand All @@ -53,8 +52,7 @@ where
compression_encoding,
SingleMessageCompressionOverride::default(),
max_message_size,
)
.into_stream();
);
EncodeBody::new_client(stream)
}

Expand All @@ -64,7 +62,7 @@ fn encode<T, U>(
compression_encoding: Option<CompressionEncoding>,
compression_override: SingleMessageCompressionOverride,
max_message_size: Option<usize>,
) -> impl TryStream<Ok = Bytes, Error = Status>
) -> impl Stream<Item = Result<Bytes, Status>>
where
T: Encoder<Error = Status>,
U: Stream<Item = Result<T::Item, Status>>,
Expand Down Expand Up @@ -251,8 +249,8 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let mut self_proj = self.project();
match ready!(self_proj.inner.try_poll_next_unpin(cx)) {
let self_proj = self.project();
match ready!(self_proj.inner.poll_next(cx)) {
Some(Ok(d)) => Some(Ok(d)).into(),
Some(Err(status)) => match self_proj.state.role {
Role::Client => Some(Err(status)).into(),
Expand Down
1 change: 0 additions & 1 deletion tonic/src/codegen.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Codegen exports used by `tonic-build`.
pub use async_trait::async_trait;
pub use futures_util::future::{ok, poll_fn, Ready};
pub use tokio_stream;

pub use std::future::Future;
Expand Down
6 changes: 3 additions & 3 deletions tonic/src/server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use crate::{
server::{ClientStreamingService, ServerStreamingService, StreamingService, UnaryService},
Code, Request, Status,
};
use futures_util::{TryStream, TryStreamExt};
use http_body::Body;
use std::fmt;
use tokio_stream::{Stream, StreamExt};

macro_rules! t {
($result:expr) => {
Expand Down Expand Up @@ -428,7 +428,7 @@ where
max_message_size: Option<usize>,
) -> http::Response<BoxBody>
where
B: TryStream<Ok = T::Encode, Error = Status> + Send + 'static,
B: Stream<Item = Result<T::Encode, Status>> + Send + 'static,
{
let response = match response {
Ok(r) => r,
Expand All @@ -453,7 +453,7 @@ where

let body = encode_server(
self.codec.encoder(),
body.into_stream(),
body,
accept_encoding,
compression_override,
max_message_size,
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl Channel {
D: Discover<Service = Connection> + Unpin + Send + 'static,
D::Error: Into<crate::Error>,
D::Key: Hash + Send + Clone,
E: Executor<futures_util::future::BoxFuture<'static, ()>> + Send + Sync + 'static,
E: Executor<crate::transport::BoxFuture<'static, ()>> + Send + Sync + 'static,
{
let svc = Balance::new(discover);

Expand Down
3 changes: 1 addition & 2 deletions tonic/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,4 @@ pub use self::server::ServerTlsConfig;
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use self::tls::Identity;

type BoxFuture<T, E> =
std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send + 'static>>;
type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
13 changes: 8 additions & 5 deletions tonic/src/transport/server/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpListener,
};
use tokio_stream::Stream;
use tokio_stream::{Stream, StreamExt};

#[cfg(not(feature = "tls"))]
pub(crate) fn tcp_incoming<IO, IE, L>(
Expand All @@ -25,8 +25,13 @@ where
IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
IE: Into<crate::Error>,
{
use futures_util::stream::TryStreamExt;
incoming.err_into().map_ok(ServerIo::new_io)
async_stream::try_stream! {
futures_util::pin_mut!(incoming);

while let Some(item) = incoming.next().await {
yield item.map(ServerIo::new_io)?
}
}
}

#[cfg(feature = "tls")]
Expand Down Expand Up @@ -81,8 +86,6 @@ async fn select<IO: 'static, IE>(
where
IE: Into<crate::Error>,
{
use tokio_stream::StreamExt;

if tasks.is_empty() {
return match incoming.try_next().await {
Ok(Some(stream)) => SelectOutput::Incoming(stream),
Expand Down
4 changes: 2 additions & 2 deletions tonic/src/transport/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ use self::recover_error::RecoverError;
use super::service::{GrpcTimeout, ServerIo};
use crate::body::BoxBody;
use bytes::Bytes;
use futures_util::{future, ready};
use futures_util::ready;
use http::{Request, Response};
use http_body::Body as _;
use hyper::{server::accept, Body};
use pin_project::pin_project;
use std::{
convert::Infallible,
fmt,
future::Future,
future::{self, Future},
marker::PhantomData,
net::SocketAddr,
pin::Pin,
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/service/add_origin.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures_util::future::BoxFuture;
use crate::transport::BoxFuture;
use http::uri::Authority;
use http::uri::Scheme;
use http::{Request, Uri};
Expand Down
8 changes: 5 additions & 3 deletions tonic/src/transport/service/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use super::super::BoxFuture;
use super::{grpc_timeout::GrpcTimeout, reconnect::Reconnect, AddOrigin, UserAgent};
use crate::{body::BoxBody, transport::Endpoint};
use crate::{
body::BoxBody,
transport::{BoxFuture, Endpoint},
};
use http::Uri;
use hyper::client::conn::Builder;
use hyper::client::connect::Connection as HyperConnection;
Expand Down Expand Up @@ -100,7 +102,7 @@ impl Connection {
impl Service<Request> for Connection {
type Response = Response;
type Error = crate::Error;
type Future = BoxFuture<Self::Response, Self::Error>;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Service::poll_ready(&mut self.inner, cx).map_err(Into::into)
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/service/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where
{
type Response = BoxedIo;
type Error = crate::Error;
type Future = BoxFuture<Self::Response, Self::Error>;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
MakeConnection::poll_ready(&mut self.inner, cx).map_err(Into::into)
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/service/executor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures_util::future::BoxFuture;
use crate::transport::BoxFuture;
use std::{future::Future, sync::Arc};

pub(crate) use hyper::rt::Executor;
Expand Down

0 comments on commit 651851a

Please sign in to comment.