From ec0a806ab06f3f7c21be59451ea271e68f0753b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Thu, 16 Jun 2022 12:46:22 +0100 Subject: [PATCH] frame_write: Use tokio_util::io::framed_write --- Cargo.toml | 2 +- src/codec/framed_write.rs | 46 ++++----------------------------------- 2 files changed, 5 insertions(+), 43 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bc138805..b69007cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ members = [ futures-core = { version = "0.3", default-features = false } futures-sink = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false } -tokio-util = { version = "0.7.1", features = ["codec"] } +tokio-util = { version = "0.7.3", features = ["io", "codec"] } tokio = { version = "1", features = ["io-util"] } bytes = "1" http = "0.2" diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index 4b1b4acc..c6d13be6 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -4,11 +4,12 @@ use crate::frame::{self, Frame, FrameSize}; use crate::hpack; use bytes::{Buf, BufMut, BytesMut}; +use tokio_util::io::poll_write_buf; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use std::io::{self, Cursor, IoSlice}; +use std::io::{self, Cursor}; // A macro to get around a method needing to borrow &mut self macro_rules! limited_write_buf { @@ -44,9 +45,6 @@ struct Encoder { /// Max frame size, this is specified by the peer max_frame_size: FrameSize, - - /// Whether or not the wrapped `AsyncWrite` supports vectored IO. - is_write_vectored: bool, } #[derive(Debug)] @@ -76,7 +74,6 @@ where B: Buf, { pub fn new(inner: T) -> FramedWrite { - let is_write_vectored = inner.is_write_vectored(); FramedWrite { inner, encoder: Encoder { @@ -85,7 +82,6 @@ where next: None, last_data_frame: None, max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE, - is_write_vectored, }, } } @@ -126,21 +122,11 @@ where Some(Next::Data(ref mut frame)) => { tracing::trace!(queued_data_frame = true); let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut()); - ready!(write( - &mut self.inner, - self.encoder.is_write_vectored, - &mut buf, - cx, - ))? + ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))?; } _ => { tracing::trace!(queued_data_frame = false); - ready!(write( - &mut self.inner, - self.encoder.is_write_vectored, - &mut self.encoder.buf, - cx, - ))? + ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut self.encoder.buf))?; } } } @@ -165,30 +151,6 @@ where } } -fn write( - writer: &mut T, - is_write_vectored: bool, - buf: &mut B, - cx: &mut Context<'_>, -) -> Poll> -where - T: AsyncWrite + Unpin, - B: Buf, -{ - // TODO(eliza): when tokio-util 0.5.1 is released, this - // could just use `poll_write_buf`... - const MAX_IOVS: usize = 64; - let n = if is_write_vectored { - let mut bufs = [IoSlice::new(&[]); MAX_IOVS]; - let cnt = buf.chunks_vectored(&mut bufs); - ready!(Pin::new(writer).poll_write_vectored(cx, &bufs[..cnt]))? - } else { - ready!(Pin::new(writer).poll_write(cx, buf.chunk()))? - }; - buf.advance(n); - Ok(()).into() -} - #[must_use] enum ControlFlow { Continue,