From c3d3e082399d93909055e880a38e34527340300f Mon Sep 17 00:00:00 2001 From: Taylor Cramer Date: Mon, 19 Feb 2018 13:41:28 -0800 Subject: [PATCH 1/2] Add futures-io --- Cargo.toml | 1 + LICENSE-MIT | 1 + futures-io/Cargo.toml | 19 + futures-io/src/lib.rs | 305 ++++++++ futures-util/Cargo.toml | 5 +- futures-util/src/io/allow_std.rs | 96 +++ futures-util/src/io/codec.rs | 373 ++++++++++ futures-util/src/io/codecs.rs | 124 ++++ futures-util/src/io/copy.rs | 97 +++ futures-util/src/io/flush.rs | 44 ++ futures-util/src/io/framed.rs | 257 +++++++ futures-util/src/io/framed_read.rs | 297 ++++++++ futures-util/src/io/framed_write.rs | 267 +++++++ futures-util/src/io/io.rs | 23 + futures-util/src/io/length_delimited.rs | 902 ++++++++++++++++++++++++ futures-util/src/io/lines.rs | 61 ++ futures-util/src/io/mod.rs | 159 +++++ futures-util/src/io/read.rs | 57 ++ futures-util/src/io/read_exact.rs | 83 +++ futures-util/src/io/read_to_end.rs | 120 ++++ futures-util/src/io/read_until.rs | 75 ++ futures-util/src/io/shutdown.rs | 44 ++ futures-util/src/io/split.rs | 68 ++ futures-util/src/io/window.rs | 117 +++ futures-util/src/io/write_all.rs | 86 +++ futures-util/src/lib.rs | 27 +- futures/Cargo.toml | 3 +- futures/src/lib.rs | 19 + 28 files changed, 3724 insertions(+), 6 deletions(-) create mode 100644 futures-io/Cargo.toml create mode 100644 futures-io/src/lib.rs create mode 100644 futures-util/src/io/allow_std.rs create mode 100644 futures-util/src/io/codec.rs create mode 100644 futures-util/src/io/codecs.rs create mode 100644 futures-util/src/io/copy.rs create mode 100644 futures-util/src/io/flush.rs create mode 100644 futures-util/src/io/framed.rs create mode 100644 futures-util/src/io/framed_read.rs create mode 100644 futures-util/src/io/framed_write.rs create mode 100644 futures-util/src/io/io.rs create mode 100644 futures-util/src/io/length_delimited.rs create mode 100644 futures-util/src/io/lines.rs create mode 100644 futures-util/src/io/mod.rs create mode 100644 futures-util/src/io/read.rs create mode 100644 futures-util/src/io/read_exact.rs create mode 100644 futures-util/src/io/read_to_end.rs create mode 100644 futures-util/src/io/read_until.rs create mode 100644 futures-util/src/io/shutdown.rs create mode 100644 futures-util/src/io/split.rs create mode 100644 futures-util/src/io/window.rs create mode 100644 futures-util/src/io/write_all.rs diff --git a/Cargo.toml b/Cargo.toml index b1a3ec5f7b..c027f57a3f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "futures-core", "futures-channel", "futures-executor", + "futures-io", "futures-util", "futures-sink", ] diff --git a/LICENSE-MIT b/LICENSE-MIT index 28e630cf40..8ad082ec4f 100644 --- a/LICENSE-MIT +++ b/LICENSE-MIT @@ -1,4 +1,5 @@ Copyright (c) 2016 Alex Crichton +Copyright (c) 2017 The Tokio Authors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated diff --git a/futures-io/Cargo.toml b/futures-io/Cargo.toml new file mode 100644 index 0000000000..4546f24a29 --- /dev/null +++ b/futures-io/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "futures-io" +version = "0.2.0" +authors = ["Alex Crichton "] +license = "MIT/Apache-2.0" +repository = "https://github.com/alexcrichton/futures-rs" +homepage = "https://github.com/alexcrichton/futures-rs" +documentation = "https://docs.rs/futures-io" +description = """ +The `AsyncRead` and `AsyncWrite` traits for the futures-rs library. +""" + +[features] +std = ["futures-core/std", "iovec"] +default = ["std"] + +[dependencies] +futures-core = { path = "../futures-core", version = "0.2.0", default-features = false } +iovec = { version = "0.1", optional = true } diff --git a/futures-io/src/lib.rs b/futures-io/src/lib.rs new file mode 100644 index 0000000000..e58a0cd95f --- /dev/null +++ b/futures-io/src/lib.rs @@ -0,0 +1,305 @@ +//! Asynchronous IO +//! +//! This crate contains the `AsyncRead` and `AsyncWrite` traits which allow +//! data to be read and written asynchronously. + +#![no_std] +#![deny(missing_docs, missing_debug_implementations)] +#![doc(html_root_url = "https://docs.rs/futures-io/0.2")] + +macro_rules! if_std { + ($($i:item)*) => ($( + #[cfg(feature = "std")] + $i + )*) +} + +if_std! { + extern crate futures_core; + extern crate iovec; + extern crate std; + + use futures_core::{Async, Poll, task}; + use std::boxed::Box; + use std::io as StdIo; + use std::ptr; + use std::vec::Vec; + + // Re-export IoVec for convenience + pub use iovec::IoVec; + + // Re-export io::Error so that users don't have to deal + // with conflicts when `use`ing `futures::io` and `std::io`. + pub use StdIo::Error as Error; + + /// A type used to conditionally initialize buffers passed to `AsyncRead` + /// methods. + #[derive(Debug)] + pub struct Initializer(bool); + + impl Initializer { + /// Returns a new `Initializer` which will zero out buffers. + #[inline] + pub fn zeroing() -> Initializer { + Initializer(true) + } + + /// Returns a new `Initializer` which will not zero out buffers. + /// + /// # Safety + /// + /// This method may only be called by `AsyncRead`ers which guarantee + /// that they will not read from the buffers passed to `AsyncRead` + /// methods, and that the return value of the method accurately reflects + /// the number of bytes that have been written to the head of the buffer. + #[inline] + pub unsafe fn nop() -> Initializer { + Initializer(false) + } + + /// Indicates if a buffer should be initialized. + #[inline] + pub fn should_initialize(&self) -> bool { + self.0 + } + + /// Initializes a buffer if necessary. + #[inline] + pub fn initialize(&self, buf: &mut [u8]) { + if self.should_initialize() { + unsafe { ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) } + } + } + } + + /// Objects which can be read asynchronously. + pub trait AsyncRead { + /// Determines if this `AsyncRead`er can work with buffers of + /// uninitialized memory. + /// + /// The default implementation returns an initializer which will zero + /// buffers. + /// + /// # Safety + /// + /// This method is `unsafe` because and `AsyncRead`er could otherwise + /// return a non-zeroing `Initializer` from another `AsyncRead` type + /// without an `unsafe` block. + #[inline] + unsafe fn initializer(&self) -> Initializer { + Initializer::zeroing() + } + + /// Attempt to read from the `AsyncRead` into `buf`. + /// + /// On success, returns `Ok(Async::Ready(num_bytes_read))`. + /// + /// If reading would block, this function returns `Ok(Async::Pending)` + /// and arranges for `cx.waker()` to receive a notification when the + /// object becomes readable or is closed. + fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context) + -> Poll; + + /// Attempt to read from the `AsyncRead` into `vec` using vectored + /// IO operations. This allows data to be read into multiple buffers + /// using a single operation. + /// + /// On success, returns `Ok(Async::Ready(num_bytes_read))`. + /// + /// By default, this method delegates to using `poll_read` on the first + /// buffer in `vec`. Objects which support vectored IO should override + /// this method. + /// + /// If reading would block, this function returns `Ok(Async::Pending)` + /// and arranges for `cx.waker()` to receive a notification when the + /// object becomes readable or is closed. + fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context) + -> Poll + { + if let Some(first_iovec) = vec.get_mut(0) { + self.poll_read(&mut *first_iovec, cx) + } else { + // `vec` is empty. + return Ok(Async::Ready(0)); + } + } + } + + /// Objects which can be written to asynchronously. + pub trait AsyncWrite { + /// Attempt to write bytes from `buf` into the object. + /// + /// On success, returns `Ok(Async::Ready(num_bytes_written))`. + /// + /// If writing would block, this function returns `Ok(Async::Pending)` + /// and arranges for `cx.waker()` to receive a notification when the + /// the object becomes writable or is closed. + fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context) + -> Poll; + + /// Attempt to write bytes from `vec` into the object using vectored + /// IO operations. This allows data from multiple buffers to be written + /// using a single operation. + /// + /// On success, returns `Ok(Async::Ready(num_bytes_written))`. + /// + /// By default, this method delegates to using `poll_write` on the first + /// buffer in `vec`. Objects which support vectored IO should override + /// this method. + /// + /// If writing would block, this function returns `Ok(Async::Pending)` + /// and arranges for `cx.waker()` to receive a notification when the + /// object becomes writable or is closed. + fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context) + -> Poll + { + if let Some(first_iovec) = vec.get(0) { + self.poll_write(&*first_iovec, cx) + } else { + // `vec` is empty. + return Ok(Async::Ready(0)); + } + } + + /// Attempt to flush the object, ensuring that all intermediately + /// buffered contents reach their destination. + /// + /// On success, returns `Ok(Async::Ready(()))`. + /// + /// If flushing is incomplete, this function returns `Ok(Async::Pending)` + /// and arranges for `cx.waker()` to receive a notification when the + /// object can make progress towards flushing. + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error>; + + /// Attempt to close the object. + /// + /// On success, returns `Ok(Async::Ready(()))`. + /// + /// If closing is incomplete, this function returns `Ok(Async::Pending)` + /// and arranges for `cx.waker()` to receive a notification when the + /// object can make progress towards closing. + fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error>; + } + + macro_rules! deref_async_read { + () => { + unsafe fn initializer(&self) -> Initializer { + (**self).initializer() + } + + fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context) + -> Poll + { + (**self).poll_read(buf, cx) + } + + fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context) + -> Poll + { + (**self).poll_vectored_read(vec, cx) + } + } + } + + impl AsyncRead for Box { + deref_async_read!(); + } + + impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T { + deref_async_read!(); + } + + /// `unsafe` because the `StdIo::Read` type must not access the buffer + /// before reading data into it. + macro_rules! unsafe_delegate_async_read_to_stdio { + () => { + unsafe fn initializer(&self) -> Initializer { + Initializer::nop() + } + + fn poll_read(&mut self, buf: &mut [u8], _: &mut task::Context) + -> Poll + { + Ok(Async::Ready(StdIo::Read::read(self, buf)?)) + } + } + } + + impl<'a> AsyncRead for &'a [u8] { + unsafe_delegate_async_read_to_stdio!(); + } + + impl AsyncRead for StdIo::Repeat { + unsafe_delegate_async_read_to_stdio!(); + } + + impl> AsyncRead for StdIo::Cursor { + unsafe_delegate_async_read_to_stdio!(); + } + + macro_rules! deref_async_write { + () => { + fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context) + -> Poll + { + (**self).poll_write(buf, cx) + } + + fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context) + -> Poll + { + (**self).poll_vectored_write(vec, cx) + } + + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error> { + (**self).poll_flush(cx) + } + + fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> { + (**self).poll_close(cx) + } + } + } + + impl AsyncWrite for Box { + deref_async_write!(); + } + + impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T { + deref_async_write!(); + } + + macro_rules! delegate_async_write_to_stdio { + () => { + fn poll_write(&mut self, buf: &[u8], _: &mut task::Context) + -> Poll + { + Ok(Async::Ready(StdIo::Write::write(self, buf)?)) + } + + fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Error> { + Ok(Async::Ready(StdIo::Write::flush(self)?)) + } + + fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> { + self.poll_flush(cx) + } + } + } + + impl<'a> AsyncWrite for StdIo::Cursor<&'a mut [u8]> { + delegate_async_write_to_stdio!(); + } + + impl AsyncWrite for StdIo::Cursor> { + delegate_async_write_to_stdio!(); + } + + impl AsyncWrite for StdIo::Cursor> { + delegate_async_write_to_stdio!(); + } + + impl AsyncWrite for StdIo::Sink { + delegate_async_write_to_stdio!(); + } +} diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 944f4e37f0..4176f663d4 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -11,12 +11,15 @@ Common utilities and extension traits for the futures-rs library. """ [features] -std = ["futures-core/std", "futures-sink/std"] +std = ["bytes", "log", "futures-core/std", "futures-io/std", "futures-sink/std"] default = ["std"] [dependencies] +bytes = { version = "0.4", optional = true } +log = { version = "0.4", optional = true } futures-core = { path = "../futures-core", version = "0.2.0", default-features = false } futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false } +futures-io = { path = "../futures-io", version = "0.2.0", default-features = false } futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false} [dev-dependencies] diff --git a/futures-util/src/io/allow_std.rs b/futures-util/src/io/allow_std.rs new file mode 100644 index 0000000000..dcbd820181 --- /dev/null +++ b/futures-util/src/io/allow_std.rs @@ -0,0 +1,96 @@ +use futures_core::{Async, Poll, task}; +use futures_io::{AsyncRead, AsyncWrite}; +use std::{fmt, io}; +use std::string::String; +use std::vec::Vec; + +/// A simple wrapper type which allows types which implement only +/// implement `std::io::Read` or `std::io::Write` +/// to be used in contexts which expect an `AsyncRead` or `AsyncWrite`. +/// +/// If these types issue an error with the kind `io::ErrorKind::WouldBlock`, +/// it is expected that they will notify the current task on readiness. +/// Synchronous `std` types should not issue errors of this kind and +/// are safe to use in this context. However, using these types with +/// `AllowStdIo` will cause the event loop to block, so they should be used +/// with care. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct AllowStdIo(T); + +impl AllowStdIo { + /// Creates a new `AllowStdIo` from an existing IO object. + pub fn new(io: T) -> Self { + AllowStdIo(io) + } + + /// Returns a reference to the contained IO object. + pub fn get_ref(&self) -> &T { + &self.0 + } + + /// Returns a mutable reference to the contained IO object. + pub fn get_mut(&mut self) -> &mut T { + &mut self.0 + } + + /// Consumes self and returns the contained IO object. + pub fn into_inner(self) -> T { + self.0 + } +} + +impl io::Write for AllowStdIo where T: io::Write { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + self.0.flush() + } + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + self.0.write_all(buf) + } + fn write_fmt(&mut self, fmt: fmt::Arguments) -> io::Result<()> { + self.0.write_fmt(fmt) + } +} + +impl AsyncWrite for AllowStdIo where T: io::Write { + fn poll_write(&mut self, buf: &[u8], _: &mut task::Context) + -> Poll + { + Ok(Async::Ready(io::Write::write(&mut self.0, buf)?)) + } + + fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), io::Error> { + Ok(Async::Ready(io::Write::flush(self)?)) + } + + fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> { + self.poll_flush(cx) + } +} + +impl io::Read for AllowStdIo where T: io::Read { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.0.read(buf) + } + // TODO: implement the `initializer` fn when it stabilizes. + // See rust-lang/rust #42788 + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + self.0.read_to_end(buf) + } + fn read_to_string(&mut self, buf: &mut String) -> io::Result { + self.0.read_to_string(buf) + } + fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> { + self.0.read_exact(buf) + } +} + +impl AsyncRead for AllowStdIo where T: io::Read { + fn poll_read(&mut self, buf: &mut [u8], _: &mut task::Context) + -> Poll + { + Ok(Async::Ready(io::Read::read(&mut self.0, buf)?)) + } +} diff --git a/futures-util/src/io/codec.rs b/futures-util/src/io/codec.rs new file mode 100644 index 0000000000..678d7f31f2 --- /dev/null +++ b/futures-util/src/io/codec.rs @@ -0,0 +1,373 @@ +//! Utilities for encoding and decoding frames. +//! +//! Contains adapters to go from streams of bytes, [`AsyncRead`] and +//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. +//! Framed streams are also known as [transports]. +//! +//! [`AsyncRead`]: # +//! [`AsyncWrite`]: # +//! [`Sink`]: # +//! [`Stream`]: # +//! [transports]: # + +pub use io::codecs::{BytesCodec, LinesCodec}; +pub use io::framed::{Framed, FramedParts}; +pub use io::framed_read::{FramedRead, Decoder}; +pub use io::framed_write::{FramedWrite, Encoder}; + +pub mod length_delimited { + //! Frame a stream of bytes based on a length prefix + //! + //! Many protocols delimit their frames by prefacing frame data with a + //! frame head that specifies the length of the frame. The + //! `length_delimited` module provides utilities for handling the length + //! based framing. This allows the consumer to work with entire frames + //! without having to worry about buffering or other framing logic. + //! + //! # Getting started + //! + //! If implementing a protocol from scratch, using length delimited framing + //! is an easy way to get started. [`Framed::new()`] will adapt a + //! full-duplex byte stream with a length delimited framer using default + //! configuration values. + //! + //! ``` + //! extern crate futures; + //! use futures::io::{AsyncRead, AsyncWrite}; + //! use futures::io::codec::length_delimited; + //! + //! fn bind_transport(io: T) + //! -> length_delimited::Framed + //! { + //! length_delimited::Framed::new(io) + //! } + //! + //! # fn main() {} + //! ``` + //! + //! The returned transport implements `Sink + Stream` for `BytesMut`. It + //! encodes the frame with a big-endian `u32` header denoting the frame + //! payload length: + //! + //! ```text + //! +----------+--------------------------------+ + //! | len: u32 | frame payload | + //! +----------+--------------------------------+ + //! ``` + //! + //! Specifically, given the following: + //! + //! ``` + //! # extern crate bytes; + //! # extern crate futures; + //! + //! use bytes::BytesMut; + //! use futures::{Sink, SinkExt, Future}; + //! use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; + //! use futures::io::codec::length_delimited; + //! use futures::executor::block_on; + //! + //! fn write_frame(io: T) { + //! let mut transport = length_delimited::Framed::new(io); + //! let frame = BytesMut::from("hello world"); + //! + //! block_on(transport.send(frame)).unwrap(); + //! } + //! # fn main() {} + //! ``` + //! + //! The encoded frame will look like this: + //! + //! ```text + //! +---- len: u32 ----+---- data ----+ + //! | \x00\x00\x00\x0b | hello world | + //! +------------------+--------------+ + //! ``` + //! + //! # Decoding + //! + //! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`], + //! such that each yielded [`BytesMut`] value contains the contents of an + //! entire frame. There are many configuration parameters enabling + //! [`FrameRead`] to handle a wide range of protocols. Here are some + //! examples that will cover the various options at a high level. + //! + //! ## Example 1 + //! + //! The following will parse a `u16` length field at offset 0, including the + //! frame head in the yielded `BytesMut`. + //! + //! ``` + //! # extern crate futures; + //! # use futures::io::AsyncRead; + //! # use futures::io::codec::length_delimited; + //! # fn bind_read(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(0) // default value + //! .length_field_length(2) + //! .length_adjustment(0) // default value + //! .num_skip(0) // Do not strip frame header + //! .new_read(io); + //! # } + //! # fn main() {} + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT DECODED + //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ + //! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world | + //! +----------+---------------+ +----------+---------------+ + //! ``` + //! + //! The value of the length field is 11 (`\x0B`) which represents the length + //! of the payload, `hello world`. By default, [`FramedRead`] assumes that + //! the length field represents the number of bytes that **follows** the + //! length field. Thus, the entire frame has a length of 13: 2 bytes for the + //! frame head + 11 bytes for the payload. + //! + //! ## Example 2 + //! + //! The following will parse a `u16` length field at offset 0, omitting the + //! frame head in the yielded `BytesMut`. + //! + //! ``` + //! # extern crate futures; + //! # use futures::io::AsyncRead; + //! # use futures::io::codec::length_delimited; + //! # fn bind_read(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(0) // default value + //! .length_field_length(2) + //! .length_adjustment(0) // default value + //! // `num_skip` is not needed, the default is to skip + //! .new_read(io); + //! # } + //! # fn main() {} + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT DECODED + //! +-- len ---+--- Payload ---+ +--- Payload ---+ + //! | \x00\x0B | Hello world | --> | Hello world | + //! +----------+---------------+ +---------------+ + //! ``` + //! + //! This is similar to the first example, the only difference is that the + //! frame head is **not** included in the yielded `BytesMut` value. + //! + //! ## Example 3 + //! + //! The following will parse a `u16` length field at offset 0, including the + //! frame head in the yielded `BytesMut`. In this case, the length field + //! **includes** the frame head length. + //! + //! ``` + //! # extern crate futures; + //! # use futures::io::AsyncRead; + //! # use futures::io::codec::length_delimited; + //! # fn bind_read(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(0) // default value + //! .length_field_length(2) + //! .length_adjustment(-2) // size of head + //! .num_skip(0) + //! .new_read(io); + //! # } + //! # fn main() {} + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT DECODED + //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ + //! | \x00\x0D | Hello world | --> | \x00\x0D | Hello world | + //! +----------+---------------+ +----------+---------------+ + //! ``` + //! + //! In most cases, the length field represents the length of the payload + //! only, as shown in the previous examples. However, in some protocols the + //! length field represents the length of the whole frame, including the + //! head. In such cases, we specify a negative `length_adjustment` to adjust + //! the value provided in the frame head to represent the payload length. + //! + //! ## Example 4 + //! + //! The following will parse a 3 byte length field at offset 0 in a 5 byte + //! frame head, including the frame head in the yielded `BytesMut`. + //! + //! ``` + //! # extern crate futures; + //! # use futures::io::AsyncRead; + //! # use futures::io::codec::length_delimited; + //! # fn bind_read(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(0) // default value + //! .length_field_length(3) + //! .length_adjustment(2) // remaining head + //! .num_skip(0) + //! .new_read(io); + //! # } + //! # fn main() {} + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT + //! +---- len -----+- head -+--- Payload ---+ + //! | \x00\x00\x0B | \xCAFE | Hello world | + //! +--------------+--------+---------------+ + //! + //! DECODED + //! +---- len -----+- head -+--- Payload ---+ + //! | \x00\x00\x0B | \xCAFE | Hello world | + //! +--------------+--------+---------------+ + //! ``` + //! + //! A more advanced example that shows a case where there is extra frame + //! head data between the length field and the payload. In such cases, it is + //! usually desirable to include the frame head as part of the yielded + //! `BytesMut`. This lets consumers of the length delimited framer to + //! process the frame head as needed. + //! + //! The positive `length_adjustment` value lets `FramedRead` factor in the + //! additional head into the frame length calculation. + //! + //! ## Example 5 + //! + //! The following will parse a `u16` length field at offset 1 of a 4 byte + //! frame head. The first byte and the length field will be omitted from the + //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be + //! included. + //! + //! ``` + //! # extern crate futures; + //! # use futures::io::AsyncRead; + //! # use futures::io::codec::length_delimited; + //! # fn bind_read(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(1) // length of hdr1 + //! .length_field_length(2) + //! .length_adjustment(1) // length of hdr2 + //! .num_skip(3) // length of hdr1 + LEN + //! .new_read(io); + //! # } + //! # fn main() {} + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT + //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ + //! | \xCA | \x00\x0B | \xFE | Hello world | + //! +--------+----------+--------+---------------+ + //! + //! DECODED + //! +- hdr2 -+--- Payload ---+ + //! | \xFE | Hello world | + //! +--------+---------------+ + //! ``` + //! + //! The length field is situated in the middle of the frame head. In this + //! case, the first byte in the frame head could be a version or some other + //! identifier that is not needed for processing. On the other hand, the + //! second half of the head is needed. + //! + //! `length_field_offset` indicates how many bytes to skip before starting + //! to read the length field. `length_adjustment` is the number of bytes to + //! skip starting at the end of the length field. In this case, it is the + //! second half of the head. + //! + //! ## Example 6 + //! + //! The following will parse a `u16` length field at offset 1 of a 4 byte + //! frame head. The first byte and the length field will be omitted from the + //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be + //! included. In this case, the length field **includes** the frame head + //! length. + //! + //! ``` + //! # extern crate futures; + //! # use futures::io::AsyncRead; + //! # use futures::io::codec::length_delimited; + //! # fn bind_read(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(1) // length of hdr1 + //! .length_field_length(2) + //! .length_adjustment(-3) // length of hdr1 + LEN, negative + //! .num_skip(3) + //! .new_read(io); + //! # } + //! # fn main() {} + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT + //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ + //! | \xCA | \x00\x0F | \xFE | Hello world | + //! +--------+----------+--------+---------------+ + //! + //! DECODED + //! +- hdr2 -+--- Payload ---+ + //! | \xFE | Hello world | + //! +--------+---------------+ + //! ``` + //! + //! Similar to the example above, the difference is that the length field + //! represents the length of the entire frame instead of just the payload. + //! The length of `hdr1` and `len` must be counted in `length_adjustment`. + //! Note that the length of `hdr2` does **not** need to be explicitly set + //! anywhere because it already is factored into the total frame length that + //! is read from the byte stream. + //! + //! # Encoding + //! + //! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`], + //! such that each submitted [`BytesMut`] is prefaced by a length field. + //! There are fewer configuration options than [`FramedRead`]. Given + //! protocols that have more complex frame heads, an encoder should probably + //! be written by hand using [`Encoder`]. + //! + //! Here is a simple example, given a `FramedWrite` with the following + //! configuration: + //! + //! ``` + //! # extern crate bytes; + //! # extern crate futures; + //! # use futures::io::AsyncWrite; + //! # use futures::io::codec::length_delimited; + //! # use bytes::BytesMut; + //! # fn write_frame(io: T) { + //! # let _: length_delimited::FramedWrite = + //! length_delimited::Builder::new() + //! .length_field_length(2) + //! .new_write(io); + //! # } + //! # fn main() {} + //! ``` + //! + //! A payload of `hello world` will be encoded as: + //! + //! ```text + //! +- len: u16 -+---- data ----+ + //! | \x00\x0b | hello world | + //! +------------+--------------+ + //! ``` + //! + //! [`FramedRead`]: struct.FramedRead.html + //! [`FramedWrite`]: struct.FramedWrite.html + //! [`AsyncRead`]: ../../trait.AsyncRead.html + //! [`AsyncWrite`]: ../../trait.AsyncWrite.html + //! [`Encoder`]: ../trait.Encoder.html + //! [`BytesMut`]: https://docs.rs/bytes/~0.4/bytes/struct.BytesMut.html + + pub use io::length_delimited::*; +} diff --git a/futures-util/src/io/codecs.rs b/futures-util/src/io/codecs.rs new file mode 100644 index 0000000000..8e6429124a --- /dev/null +++ b/futures-util/src/io/codecs.rs @@ -0,0 +1,124 @@ +use bytes::{Bytes, BufMut, BytesMut}; +use io::codec::{Encoder, Decoder}; +use std::{io, str}; +use std::string::{String, ToString}; + +/// A simple `Codec` implementation that just ships bytes around. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct BytesCodec(()); + +impl BytesCodec { + /// Creates a new `BytesCodec` for shipping around raw bytes. + pub fn new() -> BytesCodec { BytesCodec(()) } +} + +impl Decoder for BytesCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, io::Error> { + if buf.len() > 0 { + let len = buf.len(); + Ok(Some(buf.split_to(len))) + } else { + Ok(None) + } + } +} + +impl Encoder for BytesCodec { + type Item = Bytes; + type Error = io::Error; + + fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(data.len()); + buf.put(data); + Ok(()) + } +} + +/// A simple `Codec` implementation that splits up data into lines. +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct LinesCodec { + // Stored index of the next index to examine for a `\n` character. + // This is used to optimize searching. + // For example, if `decode` was called with `abc`, it would hold `3`, + // because that is the next index to examine. + // The next time `decode` is called with `abcde\n`, the method will + // only look at `de\n` before returning. + next_index: usize, +} + +impl LinesCodec { + /// Returns a `LinesCodec` for splitting up data into lines. + pub fn new() -> LinesCodec { + LinesCodec { next_index: 0 } + } +} + +fn utf8(buf: &[u8]) -> Result<&str, io::Error> { + str::from_utf8(buf).map_err(|_| + io::Error::new( + io::ErrorKind::InvalidData, + "Unable to decode input as UTF8")) +} + +fn without_carriage_return(s: &[u8]) -> &[u8] { + if let Some(&b'\r') = s.last() { + &s[..s.len() - 1] + } else { + s + } +} + +impl Decoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, io::Error> { + if let Some(newline_offset) = + buf[self.next_index..].iter().position(|b| *b == b'\n') + { + let newline_index = newline_offset + self.next_index; + let line = buf.split_to(newline_index + 1); + let line = &line[..line.len()-1]; + let line = without_carriage_return(line); + let line = utf8(line)?; + self.next_index = 0; + Ok(Some(line.to_string())) + } else { + self.next_index = buf.len(); + Ok(None) + } + } + + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result, io::Error> { + Ok(match self.decode(buf)? { + Some(frame) => Some(frame), + None => { + // No terminating newline - return remaining data, if any + if buf.is_empty() || buf == &b"\r"[..] { + None + } else { + let line = buf.take(); + let line = without_carriage_return(&line); + let line = utf8(line)?; + self.next_index = 0; + Some(line.to_string()) + } + } + }) + } +} + +impl Encoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(line.len() + 1); + buf.put(line); + buf.put_u8(b'\n'); + Ok(()) + } +} diff --git a/futures-util/src/io/copy.rs b/futures-util/src/io/copy.rs new file mode 100644 index 0000000000..8f6f5196a9 --- /dev/null +++ b/futures-util/src/io/copy.rs @@ -0,0 +1,97 @@ +use std::io; +use std::boxed::Box; + +use {Future, Poll, task}; + +use futures_io::{AsyncRead, AsyncWrite}; + +/// A future which will copy all data from a reader into a writer. +/// +/// Created by the [`copy`] function, this future will resolve to the number of +/// bytes copied or an error if one happens. +/// +/// [`copy`]: fn.copy.html +#[derive(Debug)] +pub struct Copy { + reader: Option, + read_done: bool, + writer: Option, + pos: usize, + cap: usize, + amt: u64, + buf: Box<[u8]>, +} + +/// Creates a future which represents copying all the bytes from one object to +/// another. +/// +/// The returned future will copy all the bytes read from `reader` into the +/// `writer` specified. This future will only complete once the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned and the `reader` and `writer` are +/// consumed. On error the error is returned and the I/O objects are consumed as +/// well. +pub fn copy(reader: R, writer: W) -> Copy + where R: AsyncRead, + W: AsyncWrite, +{ + Copy { + reader: Some(reader), + read_done: false, + writer: Some(writer), + amt: 0, + pos: 0, + cap: 0, + buf: Box::new([0; 2048]), + } +} + +impl Future for Copy + where R: AsyncRead, + W: AsyncWrite, +{ + type Item = (u64, R, W); + type Error = io::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll<(u64, R, W), io::Error> { + loop { + // If our buffer is empty, then we need to read some data to + // continue. + if self.pos == self.cap && !self.read_done { + let reader = self.reader.as_mut().unwrap(); + let n = try_ready!(reader.poll_read(&mut self.buf, cx)); + if n == 0 { + self.read_done = true; + } else { + self.pos = 0; + self.cap = n; + } + } + + // If our buffer has some data, let's write it out! + while self.pos < self.cap { + let writer = self.writer.as_mut().unwrap(); + let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap], cx)); + if i == 0 { + return Err(io::Error::new(io::ErrorKind::WriteZero, + "write zero byte into writer")); + } else { + self.pos += i; + self.amt += i as u64; + } + } + + // If we've written al the data and we've seen EOF, flush out the + // data and finish the transfer. + // done with the entire transfer. + if self.pos == self.cap && self.read_done { + try_ready!(self.writer.as_mut().unwrap().poll_flush(cx)); + let reader = self.reader.take().unwrap(); + let writer = self.writer.take().unwrap(); + return Ok((self.amt, reader, writer).into()) + } + } + } +} diff --git a/futures-util/src/io/flush.rs b/futures-util/src/io/flush.rs new file mode 100644 index 0000000000..6b7c8b6288 --- /dev/null +++ b/futures-util/src/io/flush.rs @@ -0,0 +1,44 @@ +use std::io; + +use {Poll, Future, Async, task}; + +use futures_io::AsyncWrite; + +/// A future used to fully flush an I/O object. +/// +/// Resolves to the underlying I/O object once the flush operation is complete. +/// +/// Created by the [`flush`] function. +/// +/// [`flush`]: fn.flush.html +#[derive(Debug)] +pub struct Flush { + a: Option, +} + +/// Creates a future which will entirely flush an I/O object and then yield the +/// object itself. +/// +/// This function will consume the object provided if an error happens, and +/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling +/// a retry if `WouldBlock` is seen along the way. +pub fn flush(a: A) -> Flush + where A: AsyncWrite, +{ + Flush { + a: Some(a), + } +} + +impl Future for Flush + where A: AsyncWrite, +{ + type Item = A; + type Error = io::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll { + try_ready!(self.a.as_mut().unwrap().poll_flush(cx)); + Ok(Async::Ready(self.a.take().unwrap())) + } +} + diff --git a/futures-util/src/io/framed.rs b/futures-util/src/io/framed.rs new file mode 100644 index 0000000000..0bb9573181 --- /dev/null +++ b/futures-util/src/io/framed.rs @@ -0,0 +1,257 @@ +use std::io::{self, Read, Write}; +use std::fmt; + +use futures_io::{AsyncRead, AsyncWrite, Initializer, IoVec}; +use io::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2, Decoder}; +use io::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2, Encoder}; + +use {Stream, Sink, Poll, task}; +use bytes::{BytesMut}; + +/// A unified `Stream` and `Sink` interface to an underlying I/O object, using +/// the `Encoder` and `Decoder` traits to encode and decode frames. +/// +/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter. +pub struct Framed { + inner: FramedRead2>>, +} + +pub struct Fuse(pub T, pub U); + +pub fn framed(inner: T, codec: U) -> Framed + where T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, +{ + Framed { + inner: framed_read2(framed_write2(Fuse(inner, codec))), + } +} + +impl Framed { + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// This objects takes a stream and a readbuffer and a writebuffer. These field + /// can be obtained from an existing `Framed` with the `into_parts` method. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + pub fn from_parts(parts: FramedParts, codec: U) -> Framed + { + Framed { + inner: framed_read2_with_buffer(framed_write2_with_buffer(Fuse(parts.inner, codec), parts.writebuf), parts.readbuf), + } + } + + /// Returns a reference to the underlying I/O stream wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.get_ref().get_ref().0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.get_mut().get_mut().0 + } + + /// Consumes the `Frame`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.into_inner().into_inner().0 + } + + /// Consumes the `Frame`, returning its underlying I/O stream and the buffer + /// with unprocessed data. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_parts(self) -> FramedParts { + let (inner, readbuf) = self.inner.into_parts(); + let (inner, writebuf) = inner.into_parts(); + FramedParts { inner: inner.0, readbuf, writebuf } + } + + /// Consumes the `Frame`, returning its underlying I/O stream and the buffer + /// with unprocessed data, and also the current codec state. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + /// + /// Note that this function will be removed once the codec has been + /// integrated into `FramedParts` in a new version (see + /// [#53](https://github.com/tokio-rs/tokio-io/pull/53)). + pub fn into_parts_and_codec(self) -> (FramedParts, U) { + let (inner, readbuf) = self.inner.into_parts(); + let (inner, writebuf) = inner.into_parts(); + (FramedParts { inner: inner.0, readbuf, writebuf }, inner.1) + } +} + +impl Stream for Framed + where T: AsyncRead, + U: Decoder, +{ + type Item = U::Item; + type Error = U::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { + self.inner.poll(cx) + } +} + +impl Sink for Framed + where T: AsyncWrite, + U: Encoder, + U::Error: From, +{ + type SinkItem = U::Item; + type SinkError = U::Error; + + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + self.inner.get_mut().poll_ready(cx) + } + + fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> { + self.inner.get_mut().start_send(item) + } + + fn start_close(&mut self) -> Result<(), Self::SinkError> { + self.inner.get_mut().start_close() + } + + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + self.inner.get_mut().poll_flush(cx) + } +} + +impl fmt::Debug for Framed + where T: fmt::Debug, + U: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Framed") + .field("io", &self.inner.get_ref().get_ref().0) + .field("codec", &self.inner.get_ref().get_ref().1) + .finish() + } +} + +// ===== impl Fuse ===== + +impl Read for Fuse { + fn read(&mut self, dst: &mut [u8]) -> io::Result { + self.0.read(dst) + } +} + +impl AsyncRead for Fuse { + unsafe fn initializer(&self) -> Initializer { + self.0.initializer() + } + + fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context) + -> Poll + { + self.0.poll_read(buf, cx) + } + + fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context) + -> Poll + { + self.0.poll_vectored_read(vec, cx) + } +} + +impl Write for Fuse { + fn write(&mut self, src: &[u8]) -> io::Result { + self.0.write(src) + } + + fn flush(&mut self) -> io::Result<()> { + self.0.flush() + } +} + +impl AsyncWrite for Fuse { + fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context) + -> Poll + { + self.0.poll_write(buf, cx) + } + + fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context) + -> Poll + { + self.0.poll_vectored_write(vec, cx) + } + + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> { + self.0.poll_flush(cx) + } + + fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> { + self.0.poll_close(cx) + } +} + +impl Decoder for Fuse { + type Item = U::Item; + type Error = U::Error; + + fn decode(&mut self, buffer: &mut BytesMut) -> Result, Self::Error> { + self.1.decode(buffer) + } + + fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result, Self::Error> { + self.1.decode_eof(buffer) + } +} + +impl Encoder for Fuse { + type Item = U::Item; + type Error = U::Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + self.1.encode(item, dst) + } +} + +/// `FramedParts` contains an export of the data of a Framed transport. +/// It can be used to construct a new `Framed` with a different codec. +/// It contains all current buffers and the inner transport. +#[derive(Debug)] +pub struct FramedParts +{ + /// The inner transport used to read bytes to and write bytes to + pub inner: T, + /// The buffer with read but unprocessed data. + pub readbuf: BytesMut, + /// A buffer with unprocessed data which are not written yet. + pub writebuf: BytesMut +} diff --git a/futures-util/src/io/framed_read.rs b/futures-util/src/io/framed_read.rs new file mode 100644 index 0000000000..1764961e95 --- /dev/null +++ b/futures-util/src/io/framed_read.rs @@ -0,0 +1,297 @@ +use std::{fmt, io}; + +use futures_io::AsyncRead; +use io::AsyncReadExt; +use io::framed::Fuse; + +use {Async, Poll, Stream, Sink, task}; +use bytes::BytesMut; + +/// Decoding of frames via buffers. +/// +/// This trait is used when constructing an instance of `Framed` or +/// `FramedRead`. An implementation of `Decoder` takes a byte stream that has +/// already been buffered in `src` and decodes the data into a stream of +/// `Self::Item` frames. +/// +/// Implementations are able to track state on `self`, which enables +/// implementing stateful streaming parsers. In many cases, though, this type +/// will simply be a unit struct (e.g. `struct HttpDecoder`). +pub trait Decoder { + /// The type of decoded frames. + type Item; + + /// The type of unrecoverable frame decoding errors. + /// + /// If an individual message is ill-formed but can be ignored without + /// interfering with the processing of future messages, it may be more + /// useful to report the failure as an `Item`. + /// + /// `From` is required in the interest of making `Error` suitable + /// for returning directly from a `FramedRead`, and to enable the default + /// implementation of `decode_eof` to yield an `io::Error` when the decoder + /// fails to consume all available data. + /// + /// Note that implementors of this trait can simply indicate `type Error = + /// io::Error` to use I/O errors as this type. + type Error: From; + + /// Attempts to decode a frame from the provided buffer of bytes. + /// + /// This method is called by `FramedRead` whenever bytes are ready to be + /// parsed. The provided buffer of bytes is what's been read so far, and + /// this instance of `Decode` can determine whether an entire frame is in + /// the buffer and is ready to be returned. + /// + /// If an entire frame is available, then this instance will remove those + /// bytes from the buffer provided and return them as a decoded + /// frame. Note that removing bytes from the provided buffer doesn't always + /// necessarily copy the bytes, so this should be an efficient operation in + /// most circumstances. + /// + /// If the bytes look valid, but a frame isn't fully available yet, then + /// `Ok(None)` is returned. This indicates to the `Framed` instance that + /// it needs to read some more bytes before calling this method again. + /// + /// Note that the bytes provided may be empty. If a previous call to + /// `decode` consumed all the bytes in the buffer then `decode` will be + /// called again until it returns `None`, indicating that more bytes need to + /// be read. + /// + /// Finally, if the bytes in the buffer are malformed then an error is + /// returned indicating why. This informs `Framed` that the stream is now + /// corrupt and should be terminated. + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error>; + + /// A default method available to be called when there are no more bytes + /// available to be read from the underlying I/O. + /// + /// This method defaults to calling `decode` and returns an error if + /// `Ok(None)` is returned while there is unconsumed data in `buf`. + /// Typically this doesn't need to be implemented unless the framing + /// protocol differs near the end of the stream. + /// + /// Note that the `buf` argument may be empty. If a previous call to + /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be + /// called again until it returns `None`, indicating that there are no more + /// frames to yield. This behavior enables returning finalization frames + /// that may not be based on inbound data. + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + match try!(self.decode(buf)) { + Some(frame) => Ok(Some(frame)), + None => { + if buf.is_empty() { + Ok(None) + } else { + Err(io::Error::new(io::ErrorKind::Other, + "bytes remaining on stream").into()) + } + } + } + } +} + +/// A `Stream` of messages decoded from an `AsyncRead`. +pub struct FramedRead { + inner: FramedRead2>, +} + +pub struct FramedRead2 { + inner: T, + eof: bool, + is_readable: bool, + buffer: BytesMut, +} + +const INITIAL_CAPACITY: usize = 8 * 1024; + +// ===== impl FramedRead ===== + +impl FramedRead + where T: AsyncRead, + D: Decoder, +{ + /// Creates a new `FramedRead` with the given `decoder`. + pub fn new(inner: T, decoder: D) -> FramedRead { + FramedRead { + inner: framed_read2(Fuse(inner, decoder)), + } + } +} + +impl FramedRead { + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.inner.0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.inner.0 + } + + /// Consumes the `FramedRead`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.inner.0 + } + + /// Returns a reference to the underlying decoder. + pub fn decoder(&self) -> &D { + &self.inner.inner.1 + } + + /// Returns a mutable reference to the underlying decoder. + pub fn decoder_mut(&mut self) -> &mut D { + &mut self.inner.inner.1 + } +} + +impl Stream for FramedRead + where T: AsyncRead, + D: Decoder, +{ + type Item = D::Item; + type Error = D::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { + self.inner.poll(cx) + } +} + +impl Sink for FramedRead + where T: Sink, +{ + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + self.inner.inner.0.poll_ready(cx) + } + + fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> { + self.inner.inner.0.start_send(item) + } + + fn start_close(&mut self) -> Result<(), Self::SinkError> { + self.inner.inner.0.start_close() + } + + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + self.inner.inner.0.poll_flush(cx) + } +} + +impl fmt::Debug for FramedRead + where T: fmt::Debug, + D: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FramedRead") + .field("inner", &self.inner.inner.0) + .field("decoder", &self.inner.inner.1) + .field("eof", &self.inner.eof) + .field("is_readable", &self.inner.is_readable) + .field("buffer", &self.inner.buffer) + .finish() + } +} + +// ===== impl FramedRead2 ===== + +pub fn framed_read2(inner: T) -> FramedRead2 { + FramedRead2 { + inner, + eof: false, + is_readable: false, + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + } +} + +pub fn framed_read2_with_buffer(inner: T, mut buf: BytesMut) -> FramedRead2 { + if buf.capacity() < INITIAL_CAPACITY { + let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); + buf.reserve(bytes_to_reserve); + } + FramedRead2 { + inner, + eof: false, + is_readable: buf.len() > 0, + buffer: buf, + } +} + +impl FramedRead2 { + pub fn get_ref(&self) -> &T { + &self.inner + } + + pub fn into_inner(self) -> T { + self.inner + } + + pub fn into_parts(self) -> (T, BytesMut) { + (self.inner, self.buffer) + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl Stream for FramedRead2 + where T: AsyncRead + Decoder, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { + loop { + // Repeatedly call `decode` or `decode_eof` as long as it is + // "readable". Readable is defined as not having returned `None`. If + // the upstream has returned EOF, and the decoder is no longer + // readable, it can be assumed that the decoder will never become + // readable again, at which point the stream is terminated. + if self.is_readable { + if self.eof { + let frame = self.inner.decode_eof(&mut self.buffer)?; + return Ok(Async::Ready(frame)); + } + + trace!("attempting to decode a frame"); + + if let Some(frame) = self.inner.decode(&mut self.buffer)? { + trace!("frame decoded from buffer"); + return Ok(Async::Ready(Some(frame))); + } + + self.is_readable = false; + } + + assert!(!self.eof); + + // Otherwise, try to read more data and try again. Make sure we've + // got room for at least one byte to read to ensure that we don't + // get a spurious 0 that looks like EOF + self.buffer.reserve(1); + if 0 == try_ready!(self.inner.read_buf(&mut self.buffer, cx)) { + self.eof = true; + } + + self.is_readable = true; + } + } +} diff --git a/futures-util/src/io/framed_write.rs b/futures-util/src/io/framed_write.rs new file mode 100644 index 0000000000..3ac15f6ea3 --- /dev/null +++ b/futures-util/src/io/framed_write.rs @@ -0,0 +1,267 @@ +use std::io::{self, Read}; +use std::fmt; + +use futures_io::{AsyncRead, AsyncWrite, Initializer, IoVec}; +use io::codec::Decoder; +use io::framed::Fuse; + +use {Async, Poll, Stream, Sink, task}; +use bytes::BytesMut; + +/// Trait of helper objects to write out messages as bytes, for use with +/// `FramedWrite`. +pub trait Encoder { + /// The type of items consumed by the `Encoder` + type Item; + + /// The type of encoding errors. + /// + /// `FramedWrite` requires `Encoder`s errors to implement `From` + /// in the interest letting it return `Error`s directly. + type Error: From; + + /// Encodes a frame into the buffer provided. + /// + /// This method will encode `item` into the byte buffer provided by `dst`. + /// The `dst` provided is an internal buffer of the `Framed` instance and + /// will be written out when possible. + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) + -> Result<(), Self::Error>; +} + +/// A `Sink` of frames encoded to an `AsyncWrite`. +pub struct FramedWrite { + inner: FramedWrite2>, +} + +pub struct FramedWrite2 { + inner: T, + buffer: BytesMut, + do_close: bool, +} + +const INITIAL_CAPACITY: usize = 8 * 1024; +const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; + +impl FramedWrite + where T: AsyncWrite, + E: Encoder, +{ + /// Creates a new `FramedWrite` with the given `encoder`. + pub fn new(inner: T, encoder: E) -> FramedWrite { + FramedWrite { + inner: framed_write2(Fuse(inner, encoder)), + } + } +} + +impl FramedWrite { + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.inner.0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.inner.0 + } + + /// Consumes the `FramedWrite`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.inner.0 + } + + /// Returns a reference to the underlying decoder. + pub fn encoder(&self) -> &E { + &self.inner.inner.1 + } + + /// Returns a mutable reference to the underlying decoder. + pub fn encoder_mut(&mut self) -> &mut E { + &mut self.inner.inner.1 + } +} + +impl Sink for FramedWrite + where T: AsyncWrite, + E: Encoder, +{ + type SinkItem = E::Item; + type SinkError = E::Error; + + delegate_sink!(inner); +} + +impl Stream for FramedWrite + where T: Stream, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { + self.inner.inner.0.poll(cx) + } +} + +impl fmt::Debug for FramedWrite + where T: fmt::Debug, + U: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FramedWrite") + .field("inner", &self.inner.get_ref().0) + .field("encoder", &self.inner.get_ref().1) + .field("buffer", &self.inner.buffer) + .finish() + } +} + +// ===== impl FramedWrite2 ===== + +pub fn framed_write2(inner: T) -> FramedWrite2 { + FramedWrite2 { + inner, + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + do_close: false, + } +} + +pub fn framed_write2_with_buffer(inner: T, mut buf: BytesMut) -> FramedWrite2 { + if buf.capacity() < INITIAL_CAPACITY { + let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); + buf.reserve(bytes_to_reserve); + } + FramedWrite2 { + inner, + buffer: buf, + do_close: false, + } +} + +impl FramedWrite2 { + pub fn get_ref(&self) -> &T { + &self.inner + } + + pub fn into_inner(self) -> T { + self.inner + } + + pub fn into_parts(self) -> (T, BytesMut) { + (self.inner, self.buffer) + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl Sink for FramedWrite2 + where T: AsyncWrite + Encoder, +{ + type SinkItem = T::Item; + type SinkError = T::Error; + + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's + // *still* over 8KiB, then apply backpressure (reject the send). + if self.buffer.len() < BACKPRESSURE_BOUNDARY { + return Ok(Async::Ready(())); + } + + self.poll_flush(cx)?; + + if self.buffer.len() >= BACKPRESSURE_BOUNDARY { + Ok(Async::Pending) + } else { + Ok(Async::Ready(())) + } + } + + fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> { + self.inner.encode(item, &mut self.buffer) + } + + fn start_close(&mut self) -> Result<(), Self::SinkError> { + self.do_close = true; + Ok(()) + } + + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + trace!("flushing framed transport"); + + while !self.buffer.is_empty() { + trace!("writing; remaining={}", self.buffer.len()); + + let n = try_ready!(self.inner.poll_write(&self.buffer, cx)); + + if n == 0 { + return Err(io::Error::new(io::ErrorKind::WriteZero, "failed to + write frame to transport").into()); + } + + // TODO: Add a way to `bytes` to do this w/o returning the drained + // data. + let _ = self.buffer.split_to(n); + } + + try_ready!(self.inner.poll_flush(cx)); + + if self.do_close { + self.inner.poll_close(cx).map_err(Into::into) + } else { + Ok(Async::Ready(())) + } + } +} + +impl Decoder for FramedWrite2 { + type Item = T::Item; + type Error = T::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, T::Error> { + self.inner.decode(src) + } + + fn decode_eof(&mut self, src: &mut BytesMut) -> Result, T::Error> { + self.inner.decode_eof(src) + } +} + +impl Read for FramedWrite2 { + fn read(&mut self, dst: &mut [u8]) -> io::Result { + self.inner.read(dst) + } +} + +impl AsyncRead for FramedWrite2 { + unsafe fn initializer(&self) -> Initializer { + self.inner.initializer() + } + + fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context) + -> Poll + { + self.inner.poll_read(buf, cx) + } + + fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context) + -> Poll + { + self.inner.poll_vectored_read(vec, cx) + } +} diff --git a/futures-util/src/io/io.rs b/futures-util/src/io/io.rs new file mode 100644 index 0000000000..bcd58c486c --- /dev/null +++ b/futures-util/src/io/io.rs @@ -0,0 +1,23 @@ +//! I/O conveniences when working with primitives in `tokio-core` +//! +//! Contains various combinators to work with I/O objects and type definitions +//! as well. +//! +//! A description of the high-level I/O combinators can be [found online] in +//! addition to a description of the [low level details]. +//! +//! [found online]: https://tokio.rs/docs/getting-started/core/ +//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/ + +pub use io::allow_std::AllowStdIo; +pub use io::copy::{copy, Copy}; +pub use io::flush::{flush, Flush}; +//pub use io::lines::{lines, Lines}; +pub use io::read::{read, Read}; +pub use io::read_exact::{read_exact, ReadExact}; +pub use io::read_to_end::{read_to_end, ReadToEnd}; +//pub use io::read_until::{read_until, ReadUntil}; +pub use io::shutdown::{shutdown, Shutdown}; +pub use io::split::{ReadHalf, WriteHalf}; +pub use io::window::Window; +pub use io::write_all::{write_all, WriteAll}; diff --git a/futures-util/src/io/length_delimited.rs b/futures-util/src/io/length_delimited.rs new file mode 100644 index 0000000000..a17a65e0af --- /dev/null +++ b/futures-util/src/io/length_delimited.rs @@ -0,0 +1,902 @@ +use bytes::{Buf, BufMut, BytesMut, IntoBuf, BigEndian, LittleEndian}; +use bytes::buf::Chain; + +use {Async, Stream, Sink, Poll, task}; +use io::{AsyncWriteExt, codec}; + +use futures_io::{AsyncRead, AsyncWrite, Initializer, IoVec}; + +use std::{cmp, fmt}; +use std::error::Error as StdError; +use std::io::{self, Cursor}; + +/// Configure length delimited `FramedRead`, `FramedWrite`, and `Framed` values. +/// +/// `Builder` enables constructing configured length delimited framers. Note +/// that not all configuration settings apply to both encoding and decoding. See +/// the documentation for specific methods for more detail. +#[derive(Debug, Clone, Copy)] +pub struct Builder { + // Maximum frame length + max_frame_len: usize, + + // Number of bytes representing the field length + length_field_len: usize, + + // Number of bytes in the header before the length field + length_field_offset: usize, + + // Adjust the length specified in the header field by this amount + length_adjustment: isize, + + // Total number of bytes to skip before reading the payload, if not set, + // `length_field_len + length_field_offset` + num_skip: Option, + + // Length field byte order (little or big endian) + length_field_is_big_endian: bool, +} + +/// Adapts a byte stream into a unified `Stream` and `Sink` that works over +/// entire frame values. +/// +/// See [module level] documentation for more detail. +/// +/// [module level]: index.html +pub struct Framed { + inner: FramedRead>, +} + +/// Adapts a byte stream to a `Stream` yielding entire frame values. +/// +/// See [module level] documentation for more detail. +/// +/// [module level]: index.html +#[derive(Debug)] +pub struct FramedRead { + inner: codec::FramedRead, +} + +/// An error when the number of bytes read is more than max frame length. +pub struct FrameTooBig { + _priv: (), +} + +#[derive(Debug)] +struct Decoder { + // Configuration values + builder: Builder, + + // Read state + state: DecodeState, +} + +#[derive(Debug, Clone, Copy)] +enum DecodeState { + Head, + Data(usize), +} + +/// Adapts a byte stream to a `Sink` accepting entire frame values. +/// +/// See [module level] documentation for more detail. +/// +/// [module level]: index.html +pub struct FramedWrite { + // I/O type + inner: T, + + // Configuration values + builder: Builder, + + // Current frame being written + frame: Option, B::Buf>>, + + do_close: bool, +} + +// ===== impl Framed ===== + +impl Framed { + /// Creates a new `Framed` with default configuration values. + pub fn new(inner: T) -> Framed { + Builder::new().new_framed(inner) + } +} + +impl Framed { + /// Returns a reference to the underlying I/O stream wrapped by `Framed`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + self.inner.get_ref().get_ref() + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Framed`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut().get_mut() + } + + /// Consumes the `Framed`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn into_inner(self) -> T { + self.inner.into_inner().into_inner() + } +} + +impl Stream for Framed { + type Item = BytesMut; + type Error = io::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll, io::Error> { + self.inner.poll(cx) + } +} + +impl Sink for Framed { + type SinkItem = B; + type SinkError = io::Error; + + delegate_sink!(inner); +} + +impl fmt::Debug for Framed + where T: fmt::Debug, + B::Buf: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Framed") + .field("inner", &self.inner) + .finish() + } +} + +// ===== impl FramedRead ===== + +impl FramedRead { + /// Creates a new `FramedRead` with default configuration values. + pub fn new(inner: T) -> FramedRead { + Builder::new().new_read(inner) + } +} + +impl FramedRead { + /// Returns the current max frame setting + /// + /// This is the largest size this codec will accept from the wire. Larger + /// frames will be rejected. + pub fn max_frame_length(&self) -> usize { + self.inner.decoder().builder.max_frame_len + } + + /// Updates the max frame setting. + /// + /// The change takes effect the next time a frame is decoded. In other + /// words, if a frame is currently in process of being decoded with a frame + /// size greater than `val` but less than the max frame length in effect + /// before calling this function, then the frame will be allowed. + pub fn set_max_frame_length(&mut self, val: usize) { + self.inner.decoder_mut().builder.max_frame_length(val); + } + + /// Returns a reference to the underlying I/O stream wrapped by `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + self.inner.get_ref() + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut() + } + + /// Consumes the `FramedRead`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn into_inner(self) -> T { + self.inner.into_inner() + } +} + +impl Stream for FramedRead { + type Item = BytesMut; + type Error = io::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll, io::Error> { + self.inner.poll(cx) + } +} + +impl Sink for FramedRead { + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + delegate_sink!(inner); +} + +impl io::Write for FramedRead { + fn write(&mut self, src: &[u8]) -> io::Result { + self.inner.get_mut().write(src) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.get_mut().flush() + } +} + +impl AsyncWrite for FramedRead { + fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context) + -> Poll + { + self.inner.get_mut().poll_write(buf, cx) + } + + fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context) + -> Poll + { + self.inner.get_mut().poll_vectored_write(vec, cx) + } + + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> { + self.inner.get_mut().poll_flush(cx) + } + + fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> { + self.inner.get_mut().poll_close(cx) + } + +} + +// ===== impl Decoder ====== + +impl Decoder { + fn decode_head(&mut self, src: &mut BytesMut) -> io::Result> { + let head_len = self.builder.num_head_bytes(); + let field_len = self.builder.length_field_len; + + if src.len() < head_len { + // Not enough data + return Ok(None); + } + + let n = { + let mut src = Cursor::new(&mut *src); + + // Skip the required bytes + src.advance(self.builder.length_field_offset); + + // match endianess + let n = if self.builder.length_field_is_big_endian { + src.get_uint::(field_len) + } else { + src.get_uint::(field_len) + }; + + if n > self.builder.max_frame_len as u64 { + return Err(io::Error::new(io::ErrorKind::InvalidData, FrameTooBig { + _priv: (), + })); + } + + // The check above ensures there is no overflow + let n = n as usize; + + // Adjust `n` with bounds checking + let n = if self.builder.length_adjustment < 0 { + n.checked_sub(-self.builder.length_adjustment as usize) + } else { + n.checked_add(self.builder.length_adjustment as usize) + }; + + // Error handling + match n { + Some(n) => n, + None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "provided length would overflow after adjustment")), + } + }; + + let num_skip = self.builder.get_num_skip(); + + if num_skip > 0 { + let _ = src.split_to(num_skip); + } + + // Ensure that the buffer has enough space to read the incoming + // payload + src.reserve(n); + + return Ok(Some(n)); + } + + fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result> { + // At this point, the buffer has already had the required capacity + // reserved. All there is to do is read. + if src.len() < n { + return Ok(None); + } + + Ok(Some(src.split_to(n))) + } +} + +impl codec::Decoder for Decoder { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> io::Result> { + let n = match self.state { + DecodeState::Head => { + match self.decode_head(src)? { + Some(n) => { + self.state = DecodeState::Data(n); + n + } + None => return Ok(None), + } + } + DecodeState::Data(n) => n, + }; + + match self.decode_data(n, src)? { + Some(data) => { + // Update the decode state + self.state = DecodeState::Head; + + // Make sure the buffer has enough space to read the next head + src.reserve(self.builder.num_head_bytes()); + + Ok(Some(data)) + } + None => Ok(None), + } + } +} + +// ===== impl FramedWrite ===== + +impl FramedWrite { + /// Creates a new `FramedWrite` with default configuration values. + pub fn new(inner: T) -> FramedWrite { + Builder::new().new_write(inner) + } +} + +impl FramedWrite { + /// Returns the current max frame setting + /// + /// This is the largest size this codec will write to the wire. Larger + /// frames will be rejected. + pub fn max_frame_length(&self) -> usize { + self.builder.max_frame_len + } + + /// Updates the max frame setting. + /// + /// The change takes effect the next time a frame is encoded. In other + /// words, if a frame is currently in process of being encoded with a frame + /// size greater than `val` but less than the max frame length in effect + /// before calling this function, then the frame will be allowed. + pub fn set_max_frame_length(&mut self, val: usize) { + self.builder.max_frame_length(val); + } + + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consumes the `FramedWrite`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn into_inner(self) -> T { + self.inner + } +} + +impl FramedWrite { + // If there is a buffered frame, try to write it to `T` + fn do_write(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> { + if self.frame.is_none() { + return Ok(Async::Ready(())); + } + + loop { + let frame = self.frame.as_mut().unwrap(); + try_ready!(self.inner.write_buf(frame, cx)); + + if !frame.has_remaining() { + break; + } + } + + self.frame = None; + + Ok(Async::Ready(())) + } + + fn set_frame(&mut self, buf: B::Buf) -> io::Result<()> { + let mut head = BytesMut::with_capacity(8); + let n = buf.remaining(); + + if n > self.builder.max_frame_len { + return Err(io::Error::new(io::ErrorKind::InvalidInput, FrameTooBig { + _priv: (), + })); + } + + // Adjust `n` with bounds checking + let n = if self.builder.length_adjustment < 0 { + n.checked_add(-self.builder.length_adjustment as usize) + } else { + n.checked_sub(self.builder.length_adjustment as usize) + }; + + // Error handling + let n = match n { + Some(n) => n, + None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "provided length would overflow after adjustment")), + }; + + if self.builder.length_field_is_big_endian { + head.put_uint::(n as u64, self.builder.length_field_len); + } else { + head.put_uint::(n as u64, self.builder.length_field_len); + } + + debug_assert!(self.frame.is_none()); + + self.frame = Some(head.into_buf().chain(buf)); + + Ok(()) + } +} + +impl Sink for FramedWrite { + type SinkItem = B; + type SinkError = io::Error; + + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + self.do_write(cx) + } + + fn start_send(&mut self, item: B) -> Result<(), Self::SinkError> { + self.set_frame(item.into_buf()) + } + + fn start_close(&mut self) -> Result<(), Self::SinkError> { + self.do_close = true; + Ok(()) + } + + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + // Write any buffered frame to T + try_ready!(self.do_write(cx)); + + // Try flushing the underlying IO + self.inner.poll_close(cx) + } +} + +impl Stream for FramedWrite { + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll, T::Error> { + self.inner.poll(cx) + } +} + +impl io::Read for FramedWrite { + fn read(&mut self, dst: &mut [u8]) -> io::Result { + self.get_mut().read(dst) + } +} + +impl AsyncRead for FramedWrite { + unsafe fn initializer(&self) -> Initializer { + self.get_ref().initializer() + } + + fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context) + -> Poll + { + self.get_mut().poll_read(buf, cx) + } + + fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context) + -> Poll + { + self.get_mut().poll_vectored_read(vec, cx) + } +} + +impl fmt::Debug for FramedWrite + where T: fmt::Debug, + B::Buf: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FramedWrite") + .field("inner", &self.inner) + .field("builder", &self.builder) + .field("frame", &self.frame) + .finish() + } +} + +// ===== impl Builder ===== + +impl Builder { + /// Creates a new length delimited framer builder with default configuration + /// values. + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// # use futures::io::AsyncRead; + /// use futures::io::codec::length_delimited::Builder; + /// + /// # fn bind_read(io: T) { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_read(io); + /// # } + /// # fn main() {} + /// ``` + pub fn new() -> Builder { + Builder { + // Default max frame length of 8MB + max_frame_len: 8 * 1_024 * 1_024, + + // Default byte length of 4 + length_field_len: 4, + + // Default to the header field being at the start of the header. + length_field_offset: 0, + + length_adjustment: 0, + + // Total number of bytes to skip before reading the payload, if not set, + // `length_field_len + length_field_offset` + num_skip: None, + + // Default to reading the length field in network (big) endian. + length_field_is_big_endian: true, + } + } + + /// Read the length field as a big endian integer + /// + /// This is the default setting. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// # use futures::io::AsyncRead; + /// use futures::io::codec::length_delimited::Builder; + /// + /// # fn bind_read(io: T) { + /// Builder::new() + /// .big_endian() + /// .new_read(io); + /// # } + /// # fn main() {} + /// ``` + pub fn big_endian(&mut self) -> &mut Self { + self.length_field_is_big_endian = true; + self + } + + /// Read the length field as a little endian integer + /// + /// The default setting is big endian. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// # use futures::io::AsyncRead; + /// use futures::io::codec::length_delimited::Builder; + /// + /// # fn bind_read(io: T) { + /// Builder::new() + /// .little_endian() + /// .new_read(io); + /// # } + /// # fn main() {} + /// ``` + pub fn little_endian(&mut self) -> &mut Self { + self.length_field_is_big_endian = false; + self + } + + /// Sets the max frame length + /// + /// This configuration option applies to both encoding and decoding. The + /// default value is 8MB. + /// + /// When decoding, the length field read from the byte stream is checked + /// against this setting **before** any adjustments are applied. When + /// encoding, the length of the submitted payload is checked against this + /// setting. + /// + /// When frames exceed the max length, an `io::Error` with the custom value + /// of the `FrameTooBig` type will be returned. + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// # use futures::io::AsyncRead; + /// use futures::io::codec::length_delimited::Builder; + /// + /// # fn bind_read(io: T) { + /// Builder::new() + /// .max_frame_length(8 * 1024) + /// .new_read(io); + /// # } + /// # fn main() {} + /// ``` + pub fn max_frame_length(&mut self, val: usize) -> &mut Self { + self.max_frame_len = val; + self + } + + /// Sets the number of bytes used to represent the length field + /// + /// The default value is `4`. The max value is `8`. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// # use futures::io::AsyncRead; + /// use futures::io::codec::length_delimited::Builder; + /// + /// # fn bind_read(io: T) { + /// Builder::new() + /// .length_field_length(4) + /// .new_read(io); + /// # } + /// # fn main() {} + /// ``` + pub fn length_field_length(&mut self, val: usize) -> &mut Self { + assert!(val > 0 && val <= 8, "invalid length field length"); + self.length_field_len = val; + self + } + + /// Sets the number of bytes in the header before the length field + /// + /// This configuration option only applies to decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// # use futures::io::AsyncRead; + /// use futures::io::codec::length_delimited::Builder; + /// + /// # fn bind_read(io: T) { + /// Builder::new() + /// .length_field_offset(1) + /// .new_read(io); + /// # } + /// # fn main() {} + /// ``` + pub fn length_field_offset(&mut self, val: usize) -> &mut Self { + self.length_field_offset = val; + self + } + + /// Delta between the payload length specified in the header and the real + /// payload length + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// # use futures::io::AsyncRead; + /// use futures::io::codec::length_delimited::Builder; + /// + /// # fn bind_read(io: T) { + /// Builder::new() + /// .length_adjustment(-2) + /// .new_read(io); + /// # } + /// # fn main() {} + /// ``` + pub fn length_adjustment(&mut self, val: isize) -> &mut Self { + self.length_adjustment = val; + self + } + + /// Sets the number of bytes to skip before reading the payload + /// + /// Default value is `length_field_len + length_field_offset` + /// + /// This configuration option only applies to decoding + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// # use futures::io::AsyncRead; + /// use futures::io::codec::length_delimited::Builder; + /// + /// # fn bind_read(io: T) { + /// Builder::new() + /// .num_skip(4) + /// .new_read(io); + /// # } + /// # fn main() {} + /// ``` + pub fn num_skip(&mut self, val: usize) -> &mut Self { + self.num_skip = Some(val); + self + } + + /// Create a configured length delimited `FramedRead` + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// # use futures::io::AsyncRead; + /// use futures::io::codec::length_delimited::Builder; + /// + /// # fn bind_read(io: T) { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_read(io); + /// # } + /// # fn main() {} + /// ``` + pub fn new_read(&self, upstream: T) -> FramedRead + where T: AsyncRead, + { + FramedRead { + inner: codec::FramedRead::new(upstream, Decoder { + builder: *self, + state: DecodeState::Head, + }), + } + } + + /// Create a configured length delimited `FramedWrite` + /// + /// # Examples + /// + /// ``` + /// # extern crate bytes; + /// # extern crate futures; + /// # use futures::io::AsyncWrite; + /// # use futures::io::codec::length_delimited; + /// # use bytes::BytesMut; + /// # fn write_frame(io: T) { + /// # let _: length_delimited::FramedWrite = + /// length_delimited::Builder::new() + /// .length_field_length(2) + /// .new_write(io); + /// # } + /// # fn main() {} + /// ``` + pub fn new_write(&self, inner: T) -> FramedWrite + where T: AsyncWrite, + B: IntoBuf, + { + FramedWrite { + inner: inner, + builder: *self, + frame: None, + do_close: false, + } + } + + /// Create a configured length delimited `Framed` + /// + /// # Examples + /// + /// ``` + /// # extern crate bytes; + /// # extern crate futures; + /// # use futures::io::{AsyncRead, AsyncWrite}; + /// # use futures::io::codec::length_delimited; + /// # use bytes::BytesMut; + /// # fn write_frame(io: T) { + /// # let _: length_delimited::Framed = + /// length_delimited::Builder::new() + /// .length_field_length(2) + /// .new_framed(io); + /// # } + /// # fn main() {} + /// ``` + pub fn new_framed(&self, inner: T) -> Framed + where T: AsyncRead + AsyncWrite, + B: IntoBuf + { + let inner = self.new_read(self.new_write(inner)); + Framed { inner: inner } + } + + fn num_head_bytes(&self) -> usize { + let num = self.length_field_offset + self.length_field_len; + cmp::max(num, self.num_skip.unwrap_or(0)) + } + + fn get_num_skip(&self) -> usize { + self.num_skip.unwrap_or(self.length_field_offset + self.length_field_len) + } +} + + +// ===== impl FrameTooBig ===== + +impl fmt::Debug for FrameTooBig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FrameTooBig") + .finish() + } +} + +impl fmt::Display for FrameTooBig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(self.description()) + } +} + +impl StdError for FrameTooBig { + fn description(&self) -> &str { + "frame size too big" + } +} diff --git a/futures-util/src/io/lines.rs b/futures-util/src/io/lines.rs new file mode 100644 index 0000000000..37620762dd --- /dev/null +++ b/futures-util/src/io/lines.rs @@ -0,0 +1,61 @@ +use std::io::{self, BufRead}; +use std::mem; +use std::string::String; + +use {Poll, Stream}; + +use io::AsyncRead; + +/// Combinator created by the top-level `lines` method which is a stream over +/// the lines of text on an I/O object. +#[derive(Debug)] +pub struct Lines { + io: A, + line: String, +} + +/// Creates a new stream from the I/O object given representing the lines of +/// input that are found on `A`. +/// +/// This method takes an asynchronous I/O object, `a`, and returns a `Stream` of +/// lines that the object contains. The returned stream will reach its end once +/// `a` reaches EOF. +pub fn lines(a: A) -> Lines + where A: AsyncRead + BufRead, +{ + Lines { + io: a, + line: String::new(), + } +} + +impl Lines { + /// Returns the underlying I/O object. + /// + /// Note that this may lose data already read into internal buffers. It's + /// recommended to only call this once the stream has reached its end. + pub fn into_inner(self) -> A { + self.io + } +} + +impl Stream for Lines + where A: AsyncRead + BufRead, +{ + type Item = String; + type Error = io::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll, io::Error> { + let n = try_ready!(self.io.read_line(&mut self.line)); + if n == 0 && self.line.len() == 0 { + return Ok(None.into()) + } + if self.line.ends_with("\n") { + self.line.pop(); + if self.line.ends_with("\r") { + self.line.pop(); + } + } + Ok(Some(mem::replace(&mut self.line, String::new())).into()) + } +} diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs new file mode 100644 index 0000000000..df57dbbeee --- /dev/null +++ b/futures-util/src/io/mod.rs @@ -0,0 +1,159 @@ +//! Core I/O traits and combinators. +//! +//! A description of the high-level I/O combinators can be [found online] in +//! addition to a description of the [low level details]. +//! +//! [found online]: https://tokio.rs/docs/getting-started/core/ +//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/ + +use std::io as std_io; + +use futures_core::{Async, Poll, task}; +pub use futures_io::{AsyncRead, AsyncWrite, IoVec}; +use bytes::{Buf, BufMut}; + +pub mod io; +pub mod codec; + +mod allow_std; +mod codecs; +mod copy; +mod flush; +mod framed; +mod framed_read; +mod framed_write; +mod length_delimited; +// Requires "BufRead" +// mod lines; +mod read; +mod read_exact; +mod read_to_end; +// TODO: resolve. Temporary disabled because it requires "BufRead", +// which does not have an async equivalent. +// mod read_until; +mod shutdown; +mod split; +mod window; +mod write_all; + +use self::codec::{Decoder, Encoder, Framed}; +use self::split::{ReadHalf, WriteHalf}; + +/// An extension trait which adds utility methods to `AsyncRead` types. +pub trait AsyncReadExt: AsyncRead { + /// Pull some bytes from this source into the specified `Buf`, returning + /// how many bytes were read. + /// + /// The `buf` provided will have bytes read into it and the internal cursor + /// will be advanced if any bytes were read. Note that this method typically + /// will not reallocate the buffer provided. + fn read_buf(&mut self, buf: &mut B, cx: &mut task::Context) + -> Poll + where Self: Sized, + { + if !buf.has_remaining_mut() { + return Ok(Async::Ready(0)); + } + + unsafe { + let n = { + // The `IoVec` type can't have a 0-length size, so we create a bunch + // of dummy versions on the stack with 1 length which we'll quickly + // overwrite. + let b1: &mut [u8] = &mut [0]; + let b2: &mut [u8] = &mut [0]; + let b3: &mut [u8] = &mut [0]; + let b4: &mut [u8] = &mut [0]; + let b5: &mut [u8] = &mut [0]; + let b6: &mut [u8] = &mut [0]; + let b7: &mut [u8] = &mut [0]; + let b8: &mut [u8] = &mut [0]; + let b9: &mut [u8] = &mut [0]; + let b10: &mut [u8] = &mut [0]; + let b11: &mut [u8] = &mut [0]; + let b12: &mut [u8] = &mut [0]; + let b13: &mut [u8] = &mut [0]; + let b14: &mut [u8] = &mut [0]; + let b15: &mut [u8] = &mut [0]; + let b16: &mut [u8] = &mut [0]; + let mut bufs: [&mut IoVec; 16] = [ + b1.into(), b2.into(), b3.into(), b4.into(), + b5.into(), b6.into(), b7.into(), b8.into(), + b9.into(), b10.into(), b11.into(), b12.into(), + b13.into(), b14.into(), b15.into(), b16.into(), + ]; + let n = buf.bytes_vec_mut(&mut bufs); + try_ready!(self.poll_vectored_read(&mut bufs[..n], cx)) + }; + + buf.advance_mut(n); + Ok(Async::Ready(n)) + } + } + + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + fn framed(self, codec: T) -> Framed + where Self: AsyncWrite + Sized, + { + framed::framed(self, codec) + } + + /// Helper method for splitting this read/write object into two halves. + /// + /// The two halves returned implement the `Read` and `Write` traits, + /// respectively. + fn split(self) -> (ReadHalf, WriteHalf) + where Self: AsyncWrite + Sized, + { + split::split(self) + } +} + +impl AsyncReadExt for T {} + +/// An extension trait which adds utility methods to `AsyncWrite` types. +pub trait AsyncWriteExt: AsyncWrite { + /// Write a `Buf` into this value, returning how many bytes were written. + /// + /// Note that this method will advance the `buf` provided automatically by + /// the number of bytes written. + fn write_buf(&mut self, buf: &mut B, cx: &mut task::Context) + -> Poll + where Self: Sized, + { + if !buf.has_remaining() { + return Ok(Async::Ready(0)); + } + + let n = { + // The `IoVec` type can't have a zero-length size, so create a dummy + // version from a 1-length slice which we'll overwrite with the + // `bytes_vec` method. + static DUMMY: &[u8] = &[0]; + let iovec = <&IoVec>::from(DUMMY); + let mut bufs = [iovec; 64]; + let n = buf.bytes_vec(&mut bufs); + try_ready!(self.poll_vectored_write(&bufs[..n], cx)) + }; + buf.advance(n); + Ok(Async::Ready(n)) + } +} + +impl AsyncWriteExt for T {} diff --git a/futures-util/src/io/read.rs b/futures-util/src/io/read.rs new file mode 100644 index 0000000000..65b8720938 --- /dev/null +++ b/futures-util/src/io/read.rs @@ -0,0 +1,57 @@ +use std::io; +use std::mem; + +use {Future, Poll, task}; + +use io::AsyncRead; + +#[derive(Debug)] +enum State { + Pending { + rd: R, + buf: T, + }, + Empty, +} + +/// Tries to read some bytes directly into the given `buf` in asynchronous +/// manner, returning a future type. +/// +/// The returned future will resolve to both the I/O stream and the buffer +/// as well as the number of bytes read once the read operation is completed. +pub fn read(rd: R, buf: T) -> Read + where R: AsyncRead, + T: AsMut<[u8]> +{ + Read { state: State::Pending { rd: rd, buf: buf } } +} + +/// A future which can be used to easily read available number of bytes to fill +/// a buffer. +/// +/// Created by the [`read`] function. +#[derive(Debug)] +pub struct Read { + state: State, +} + +impl Future for Read + where R: AsyncRead, + T: AsMut<[u8]> +{ + type Item = (R, T, usize); + type Error = io::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll<(R, T, usize), io::Error> { + let nread = match self.state { + State::Pending { ref mut rd, ref mut buf } => + try_ready!(rd.poll_read(&mut buf.as_mut()[..], cx)), + State::Empty => panic!("poll a Read after it's done"), + }; + + match mem::replace(&mut self.state, State::Empty) { + State::Pending { rd, buf } => Ok((rd, buf, nread).into()), + State::Empty => panic!("invalid internal state"), + } + } +} diff --git a/futures-util/src/io/read_exact.rs b/futures-util/src/io/read_exact.rs new file mode 100644 index 0000000000..18318d2e72 --- /dev/null +++ b/futures-util/src/io/read_exact.rs @@ -0,0 +1,83 @@ +use std::io; +use std::mem; + +use {Poll, Future, task}; + +use io::AsyncRead; + +/// A future which can be used to easily read exactly enough bytes to fill +/// a buffer. +/// +/// Created by the [`read_exact`] function. +/// +/// [`read_exact`]: fn.read_exact.html +#[derive(Debug)] +pub struct ReadExact { + state: State, +} + +#[derive(Debug)] +enum State { + Reading { + a: A, + buf: T, + pos: usize, + }, + Empty, +} + +/// Creates a future which will read exactly enough bytes to fill `buf`, +/// returning an error if EOF is hit sooner. +/// +/// The returned future will resolve to both the I/O stream as well as the +/// buffer once the read operation is completed. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all data read from the stream appended to +/// the buffer. +pub fn read_exact(a: A, buf: T) -> ReadExact + where A: AsyncRead, + T: AsMut<[u8]>, +{ + ReadExact { + state: State::Reading { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn eof() -> io::Error { + io::Error::new(io::ErrorKind::UnexpectedEof, "early eof") +} + +impl Future for ReadExact + where A: AsyncRead, + T: AsMut<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll<(A, T), io::Error> { + match self.state { + State::Reading { ref mut a, ref mut buf, ref mut pos } => { + let buf = buf.as_mut(); + while *pos < buf.len() { + let n = try_ready!(a.poll_read(&mut buf[*pos..], cx)); + *pos += n; + if n == 0 { + return Err(eof()) + } + } + } + State::Empty => panic!("poll a ReadExact after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf, .. } => Ok((a, buf).into()), + State::Empty => panic!(), + } + } +} diff --git a/futures-util/src/io/read_to_end.rs b/futures-util/src/io/read_to_end.rs new file mode 100644 index 0000000000..bdf3c0071f --- /dev/null +++ b/futures-util/src/io/read_to_end.rs @@ -0,0 +1,120 @@ +use std::io::{self, ErrorKind}; +use std::mem; +use std::vec::Vec; + +use {Async, Poll, Future, task}; + +use io::AsyncRead; + +/// A future which can be used to easily read the entire contents of a stream +/// into a vector. +/// +/// Created by the [`read_to_end`] function. +/// +/// [`read_to_end`]: fn.read_to_end.html +#[derive(Debug)] +pub struct ReadToEnd { + state: State, +} + +#[derive(Debug)] +enum State { + Reading { + a: A, + buf: Vec, + }, + Empty, +} + +/// Creates a future which will read all the bytes associated with the I/O +/// object `A` into the buffer provided. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all data read from the stream appended to +/// the buffer. +pub fn read_to_end(a: A, buf: Vec) -> ReadToEnd + where A: AsyncRead, +{ + ReadToEnd { + state: State::Reading { + a: a, + buf: buf, + } + } +} + +struct Guard<'a> { buf: &'a mut Vec, len: usize } + +impl<'a> Drop for Guard<'a> { + fn drop(&mut self) { + unsafe { self.buf.set_len(self.len); } + } +} + +// This uses an adaptive system to extend the vector when it fills. We want to +// avoid paying to allocate and zero a huge chunk of memory if the reader only +// has 4 bytes while still making large reads if the reader does have a ton +// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every +// time is 4,500 times (!) slower than this if the reader has a very small +// amount of data to return. +// +// Because we're extending the buffer with uninitialized data for trusted +// readers, we need to make sure to truncate that if any of this panics. +fn read_to_end_internal(r: &mut R, buf: &mut Vec, cx: &mut task::Context) + -> Poll +{ + let start_len = buf.len(); + let mut g = Guard { len: buf.len(), buf: buf }; + let ret; + loop { + if g.len == g.buf.len() { + unsafe { + g.buf.reserve(32); + let capacity = g.buf.capacity(); + g.buf.set_len(capacity); + r.initializer().initialize(&mut g.buf[g.len..]); + } + } + + match r.poll_read(&mut g.buf[g.len..], cx) { + Ok(Async::Ready(0)) => { + ret = Ok(Async::Ready(g.len - start_len)); + break; + } + Ok(Async::Ready(n)) => g.len += n, + Ok(Async::Pending) => return Ok(Async::Pending), + Err(ref e) if e.kind() == ErrorKind::Interrupted => {} + Err(e) => { + ret = Err(e); + break; + } + } + } + + ret +} + +impl Future for ReadToEnd + where A: AsyncRead, +{ + type Item = (A, Vec); + type Error = io::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll<(A, Vec), io::Error> { + match self.state { + State::Reading { ref mut a, ref mut buf } => { + // If we get `Ok`, then we know the stream hit EOF and we're done. If we + // hit "would block" then all the read data so far is in our buffer, and + // otherwise we propagate errors + try_ready!(read_to_end_internal(a, buf, cx)); + }, + State::Empty => panic!("poll ReadToEnd after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf } => Ok((a, buf).into()), + State::Empty => unreachable!(), + } + } +} diff --git a/futures-util/src/io/read_until.rs b/futures-util/src/io/read_until.rs new file mode 100644 index 0000000000..71ebf09c1b --- /dev/null +++ b/futures-util/src/io/read_until.rs @@ -0,0 +1,75 @@ +use std::io::{self, BufRead}; +use std::mem; +use std::vec::Vec; + +use {Poll, Future}; + +use io::AsyncRead; + +/// A future which can be used to easily read the contents of a stream into a +/// vector until the delimiter is reached. +/// +/// Created by the [`read_until`] function. +/// +/// [`read_until`]: fn.read_until.html +#[derive(Debug)] +pub struct ReadUntil { + state: State, +} + +#[derive(Debug)] +enum State { + Reading { + a: A, + byte: u8, + buf: Vec, + }, + Empty, +} + +/// Creates a future which will read all the bytes associated with the I/O +/// object `A` into the buffer provided until the delimiter `byte` is reached. +/// This method is the async equivalent to [`BufRead::read_until`]. +/// +/// In case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all bytes up to, and including, the delimiter +/// (if found). +/// +/// [`BufRead::read_until`]: https://doc.rust-lang.org/std/io/trait.BufRead.html#method.read_until +pub fn read_until(a: A, byte: u8, buf: Vec) -> ReadUntil + where A: AsyncRead + BufRead, +{ + ReadUntil { + state: State::Reading { + a: a, + byte: byte, + buf: buf, + } + } +} + +impl Future for ReadUntil + where A: AsyncRead + BufRead +{ + type Item = (A, Vec); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, Vec), io::Error> { + match self.state { + State::Reading { ref mut a, byte, ref mut buf } => { + // If we get `Ok(n)`, then we know the stream hit EOF or the delimiter. + // and just return it, as we are finished. + // If we hit "would block" then all the read data so far + // is in our buffer, and otherwise we propagate errors. + try_ready!(a.read_until(byte, buf)); + }, + State::Empty => panic!("poll ReadUntil after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, byte: _, buf } => Ok((a, buf).into()), + State::Empty => unreachable!(), + } + } +} diff --git a/futures-util/src/io/shutdown.rs b/futures-util/src/io/shutdown.rs new file mode 100644 index 0000000000..6e06c8dc66 --- /dev/null +++ b/futures-util/src/io/shutdown.rs @@ -0,0 +1,44 @@ +use std::io; + +use {Poll, Future, Async, task}; + +use AsyncWrite; + +/// A future used to fully shutdown an I/O object. +/// +/// Resolves to the underlying I/O object once the shutdown operation is +/// complete. +/// +/// Created by the [`shutdown`] function. +/// +/// [`shutdown`]: fn.shutdown.html +#[derive(Debug)] +pub struct Shutdown { + a: Option, +} + +/// Creates a future which will entirely shutdown an I/O object and then yield +/// the object itself. +/// +/// This function will consume the object provided if an error happens, and +/// otherwise it will repeatedly call `shutdown` until it sees `Ok(())`, +/// scheduling a retry if `WouldBlock` is seen along the way. +pub fn shutdown(a: A) -> Shutdown + where A: AsyncWrite, +{ + Shutdown { + a: Some(a), + } +} + +impl Future for Shutdown + where A: AsyncWrite, +{ + type Item = A; + type Error = io::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll { + try_ready!(self.a.as_mut().unwrap().poll_close(cx)); + Ok(Async::Ready(self.a.take().unwrap())) + } +} diff --git a/futures-util/src/io/split.rs b/futures-util/src/io/split.rs new file mode 100644 index 0000000000..e9a27f9afe --- /dev/null +++ b/futures-util/src/io/split.rs @@ -0,0 +1,68 @@ +use std::io; + +use {Async, Poll, task}; +use lock::BiLock; + +use futures_io::{AsyncRead, AsyncWrite, Error, IoVec}; + +/// The readable half of an object returned from `AsyncRead::split`. +#[derive(Debug)] +pub struct ReadHalf { + handle: BiLock, +} + +/// The writable half of an object returned from `AsyncRead::split`. +#[derive(Debug)] +pub struct WriteHalf { + handle: BiLock, +} + +fn lock_and_then(lock: &BiLock, cx: &mut task::Context, f: F) -> Result, E> + where F: FnOnce(&mut T, &mut task::Context) -> Result, E> +{ + match lock.poll_lock(cx) { + Async::Ready(ref mut l) => f(l, cx), + Async::Pending => Ok(Async::Pending), + } +} + +pub fn split(t: T) -> (ReadHalf, WriteHalf) { + let (a, b) = BiLock::new(t); + (ReadHalf { handle: a }, WriteHalf { handle: b }) +} + +impl AsyncRead for ReadHalf { + fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context) + -> Poll + { + lock_and_then(&self.handle, cx, |l, cx| l.poll_read(buf, cx)) + } + + fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context) + -> Poll + { + lock_and_then(&self.handle, cx, |l, cx| l.poll_vectored_read(vec, cx)) + } +} + +impl AsyncWrite for WriteHalf { + fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context) + -> Poll + { + lock_and_then(&self.handle, cx, |l, cx| l.poll_write(buf, cx)) + } + + fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context) + -> Poll + { + lock_and_then(&self.handle, cx, |l, cx| l.poll_vectored_write(vec, cx)) + } + + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error> { + lock_and_then(&self.handle, cx, |l, cx| l.poll_flush(cx)) + } + + fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> { + lock_and_then(&self.handle, cx, |l, cx| l.poll_close(cx)) + } +} diff --git a/futures-util/src/io/window.rs b/futures-util/src/io/window.rs new file mode 100644 index 0000000000..4ded9ad403 --- /dev/null +++ b/futures-util/src/io/window.rs @@ -0,0 +1,117 @@ +use std::ops; + +/// A owned window around an underlying buffer. +/// +/// Normally slices work great for considering sub-portions of a buffer, but +/// unfortunately a slice is a *borrowed* type in Rust which has an associated +/// lifetime. When working with future and async I/O these lifetimes are not +/// always appropriate, and are sometimes difficult to store in tasks. This +/// type strives to fill this gap by providing an "owned slice" around an +/// underlying buffer of bytes. +/// +/// A `Window` wraps an underlying buffer, `T`, and has configurable +/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation +/// that this type carries. +/// +/// This type can be particularly useful when working with the `write_all` +/// combinator in this crate. Data can be sliced via `Window`, consumed by +/// `write_all`, and then earned back once the write operation finishes through +/// the `into_inner` method on this type. +#[derive(Debug)] +pub struct Window { + inner: T, + range: ops::Range, +} + +impl> Window { + /// Creates a new window around the buffer `t` defaulting to the entire + /// slice. + /// + /// Further methods can be called on the returned `Window` to alter the + /// window into the data provided. + pub fn new(t: T) -> Window { + Window { + range: 0..t.as_ref().len(), + inner: t, + } + } + + /// Gets a shared reference to the underlying buffer inside of this + /// `Window`. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Gets a mutable reference to the underlying buffer inside of this + /// `Window`. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consumes this `Window`, returning the underlying buffer. + pub fn into_inner(self) -> T { + self.inner + } + + /// Returns the starting index of this window into the underlying buffer + /// `T`. + pub fn start(&self) -> usize { + self.range.start + } + + /// Returns the end index of this window into the underlying buffer + /// `T`. + pub fn end(&self) -> usize { + self.range.end + } + + /// Changes the starting index of this window to the index specified. + /// + /// Returns the windows back to chain multiple calls to this method. + /// + /// # Panics + /// + /// This method will panic if `start` is out of bounds for the underlying + /// slice or if it comes after the `end` configured in this window. + pub fn set_start(&mut self, start: usize) -> &mut Window { + assert!(start <= self.inner.as_ref().len()); + assert!(start <= self.range.end); + self.range.start = start; + self + } + + /// Changes the end index of this window to the index specified. + /// + /// Returns the windows back to chain multiple calls to this method. + /// + /// # Panics + /// + /// This method will panic if `end` is out of bounds for the underlying + /// slice or if it comes before the `start` configured in this window. + pub fn set_end(&mut self, end: usize) -> &mut Window { + assert!(end <= self.inner.as_ref().len()); + assert!(self.range.start <= end); + self.range.end = end; + self + } + + // TODO: how about a generic set() method along the lines of: + // + // buffer.set(..3) + // .set(0..2) + // .set(4..) + // + // etc. +} + +impl> AsRef<[u8]> for Window { + fn as_ref(&self) -> &[u8] { + &self.inner.as_ref()[self.range.start..self.range.end] + } +} + +impl> AsMut<[u8]> for Window { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.inner.as_mut()[self.range.start..self.range.end] + } +} diff --git a/futures-util/src/io/write_all.rs b/futures-util/src/io/write_all.rs new file mode 100644 index 0000000000..a24be2524b --- /dev/null +++ b/futures-util/src/io/write_all.rs @@ -0,0 +1,86 @@ +use std::io; +use std::mem; + +use {Poll, Future, task}; + +use AsyncWrite; + +/// A future used to write the entire contents of some data to a stream. +/// +/// This is created by the [`write_all`] top-level method. +/// +/// [`write_all`]: fn.write_all.html +#[derive(Debug)] +pub struct WriteAll { + state: State, +} + +#[derive(Debug)] +enum State { + Writing { + a: A, + buf: T, + pos: usize, + }, + Empty, +} + +/// Creates a future that will write the entire contents of the buffer `buf` to +/// the stream `a` provided. +/// +/// The returned future will not return until all the data has been written, and +/// the future will resolve to the stream as well as the buffer (for reuse if +/// needed). +/// +/// Any error which happens during writing will cause both the stream and the +/// buffer to get destroyed. +/// +/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should +/// be broadly applicable to accepting data which can be converted to a slice. +/// The `Window` struct is also available in this crate to provide a different +/// window into a slice if necessary. +pub fn write_all(a: A, buf: T) -> WriteAll + where A: AsyncWrite, + T: AsRef<[u8]>, +{ + WriteAll { + state: State::Writing { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn zero_write() -> io::Error { + io::Error::new(io::ErrorKind::WriteZero, "zero-length write") +} + +impl Future for WriteAll + where A: AsyncWrite, + T: AsRef<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll<(A, T), io::Error> { + match self.state { + State::Writing { ref mut a, ref buf, ref mut pos } => { + let buf = buf.as_ref(); + while *pos < buf.len() { + let n = try_ready!(a.poll_write(&buf[*pos..], cx)); + *pos += n; + if n == 0 { + return Err(zero_write()) + } + } + } + State::Empty => panic!("poll a WriteAll after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Writing { a, buf, .. } => Ok((a, buf).into()), + State::Empty => panic!(), + } + } +} diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index a6efc7a2b9..d6be5899f1 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -6,8 +6,14 @@ #[macro_use] extern crate futures_core; +extern crate futures_io; extern crate futures_sink; +#[cfg(feature = "std")] +use futures_core::{Async, Future, Poll, Stream, task}; +#[cfg(feature = "std")] +use futures_sink::Sink; + macro_rules! if_std { ($($i:item)*) => ($( #[cfg(feature = "std")] @@ -15,6 +21,16 @@ macro_rules! if_std { )*) } +if_std! { + extern crate bytes; + #[macro_use] + extern crate log; +} + +#[cfg(feature = "std")] +#[macro_use] +extern crate std; + macro_rules! delegate_sink { ($field:ident) => { fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { @@ -35,16 +51,17 @@ macro_rules! delegate_sink { } } -#[macro_use] -#[cfg(feature = "std")] -extern crate std; - #[cfg(feature = "std")] pub mod lock; pub mod future; pub use future::FutureExt; +#[cfg(feature = "std")] +pub mod io; +#[cfg(feature = "std")] +pub use io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt}; + pub mod stream; pub use stream::StreamExt; @@ -54,4 +71,6 @@ pub use sink::SinkExt; pub mod prelude { //! Prelude with common traits from the `futures-util` crate. pub use {FutureExt, StreamExt, SinkExt}; + #[cfg(feature = "std")] + pub use {AsyncReadExt, AsyncWriteExt}; } diff --git a/futures/Cargo.toml b/futures/Cargo.toml index ac636d59de..b666ec6402 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -22,9 +22,10 @@ appveyor = { repository = "alexcrichton/futures-rs" } futures-core = { path = "../futures-core", version = "0.2.0", default-features = false } futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false } futures-executor = { path = "../futures-executor", version = "0.2.0", default-features = false } +futures-io = { path = "../futures-io", version = "0.2.0", default-features = false } futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false } futures-util = { path = "../futures-util", version = "0.2.0", default-features = false } [features] -std = ["futures-core/std", "futures-executor/std", "futures-sink/std", "futures-util/std"] +std = ["futures-core/std", "futures-executor/std", "futures-io/std", "futures-sink/std", "futures-util/std"] default = ["std"] diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 38e51753fd..817536f3f0 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -166,6 +166,7 @@ extern crate futures_core; extern crate futures_channel; extern crate futures_executor; +extern crate futures_io; extern crate futures_sink; extern crate futures_util; @@ -223,6 +224,16 @@ pub mod future { pub use futures_util::future::*; } +#[cfg(feature = "std")] +pub mod io { + //! IO + //! + //! This module contains the `AsyncRead` and `AsyncWrite` traits, as well + //! as a number of combinators and extensions for using them. + pub use futures_io::*; + pub use futures_util::io::*; +} + pub mod prelude { //! A "prelude" for crates using the `futures` crate. //! @@ -253,6 +264,14 @@ pub mod prelude { StreamExt, SinkExt, }; + + #[cfg(feature = "std")] + pub use futures_util::{ + AsyncRead, + AsyncWrite, + AsyncReadExt, + AsyncWriteExt, + }; } pub mod sink { From 00f8f370731dc1b12b71cf9f3ab988dd8f392ae2 Mon Sep 17 00:00:00 2001 From: Taylor Cramer Date: Tue, 20 Feb 2018 17:35:10 -0800 Subject: [PATCH 2/2] Make cx always the first argument --- futures-channel/src/mpsc/mod.rs | 6 ++--- futures-io/src/lib.rs | 36 ++++++++++++------------- futures-util/src/io/allow_std.rs | 4 +-- futures-util/src/io/copy.rs | 4 +-- futures-util/src/io/framed.rs | 16 +++++------ futures-util/src/io/framed_read.rs | 2 +- futures-util/src/io/framed_write.rs | 10 +++---- futures-util/src/io/length_delimited.rs | 18 ++++++------- futures-util/src/io/mod.rs | 8 +++--- futures-util/src/io/read.rs | 2 +- futures-util/src/io/read_exact.rs | 2 +- futures-util/src/io/read_to_end.rs | 10 +++---- futures-util/src/io/split.rs | 16 +++++------ futures-util/src/io/write_all.rs | 2 +- 14 files changed, 68 insertions(+), 68 deletions(-) diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index 7160458129..f362e38de7 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -407,7 +407,7 @@ impl Sender { } // The channel has capacity to accept the message, so send it - self.do_send(Some(msg), None) + self.do_send(None, Some(msg)) .map_err(|ChannelClosed(v)| { TryChannelClosed { kind: TryChannelClosedKind::Disconnected(v.unwrap()), @@ -419,12 +419,12 @@ impl Sender { /// This function should only be called after `poll_ready` has responded /// that the channel is ready to receive a message. pub fn start_send(&mut self, msg: T) -> Result<(), ChannelClosed> { - self.do_send(Some(msg), None) + self.do_send(None, Some(msg)) } // Do the send without failing // None means close - fn do_send(&mut self, msg: Option, cx: Option<&mut task::Context>) + fn do_send(&mut self, cx: Option<&mut task::Context>, msg: Option) -> Result<(), ChannelClosed> { // First, increment the number of messages contained by the channel. diff --git a/futures-io/src/lib.rs b/futures-io/src/lib.rs index e58a0cd95f..91e1518b2d 100644 --- a/futures-io/src/lib.rs +++ b/futures-io/src/lib.rs @@ -97,7 +97,7 @@ if_std! { /// If reading would block, this function returns `Ok(Async::Pending)` /// and arranges for `cx.waker()` to receive a notification when the /// object becomes readable or is closed. - fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context) + fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8]) -> Poll; /// Attempt to read from the `AsyncRead` into `vec` using vectored @@ -113,11 +113,11 @@ if_std! { /// If reading would block, this function returns `Ok(Async::Pending)` /// and arranges for `cx.waker()` to receive a notification when the /// object becomes readable or is closed. - fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context) + fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec]) -> Poll { - if let Some(first_iovec) = vec.get_mut(0) { - self.poll_read(&mut *first_iovec, cx) + if let Some(ref mut first_iovec) = vec.get_mut(0) { + self.poll_read(cx, first_iovec) } else { // `vec` is empty. return Ok(Async::Ready(0)); @@ -134,7 +134,7 @@ if_std! { /// If writing would block, this function returns `Ok(Async::Pending)` /// and arranges for `cx.waker()` to receive a notification when the /// the object becomes writable or is closed. - fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context) + fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8]) -> Poll; /// Attempt to write bytes from `vec` into the object using vectored @@ -150,11 +150,11 @@ if_std! { /// If writing would block, this function returns `Ok(Async::Pending)` /// and arranges for `cx.waker()` to receive a notification when the /// object becomes writable or is closed. - fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context) + fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec]) -> Poll { - if let Some(first_iovec) = vec.get(0) { - self.poll_write(&*first_iovec, cx) + if let Some(ref first_iovec) = vec.get(0) { + self.poll_write(cx, &*first_iovec) } else { // `vec` is empty. return Ok(Async::Ready(0)); @@ -187,16 +187,16 @@ if_std! { (**self).initializer() } - fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context) + fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8]) -> Poll { - (**self).poll_read(buf, cx) + (**self).poll_read(cx, buf) } - fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context) + fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec]) -> Poll { - (**self).poll_vectored_read(vec, cx) + (**self).poll_vectored_read(cx, vec) } } } @@ -217,7 +217,7 @@ if_std! { Initializer::nop() } - fn poll_read(&mut self, buf: &mut [u8], _: &mut task::Context) + fn poll_read(&mut self, _: &mut task::Context, buf: &mut [u8]) -> Poll { Ok(Async::Ready(StdIo::Read::read(self, buf)?)) @@ -239,16 +239,16 @@ if_std! { macro_rules! deref_async_write { () => { - fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context) + fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8]) -> Poll { - (**self).poll_write(buf, cx) + (**self).poll_write(cx, buf) } - fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context) + fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec]) -> Poll { - (**self).poll_vectored_write(vec, cx) + (**self).poll_vectored_write(cx, vec) } fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error> { @@ -271,7 +271,7 @@ if_std! { macro_rules! delegate_async_write_to_stdio { () => { - fn poll_write(&mut self, buf: &[u8], _: &mut task::Context) + fn poll_write(&mut self, _: &mut task::Context, buf: &[u8]) -> Poll { Ok(Async::Ready(StdIo::Write::write(self, buf)?)) diff --git a/futures-util/src/io/allow_std.rs b/futures-util/src/io/allow_std.rs index dcbd820181..a957c6e450 100644 --- a/futures-util/src/io/allow_std.rs +++ b/futures-util/src/io/allow_std.rs @@ -55,7 +55,7 @@ impl io::Write for AllowStdIo where T: io::Write { } impl AsyncWrite for AllowStdIo where T: io::Write { - fn poll_write(&mut self, buf: &[u8], _: &mut task::Context) + fn poll_write(&mut self, _: &mut task::Context, buf: &[u8]) -> Poll { Ok(Async::Ready(io::Write::write(&mut self.0, buf)?)) @@ -88,7 +88,7 @@ impl io::Read for AllowStdIo where T: io::Read { } impl AsyncRead for AllowStdIo where T: io::Read { - fn poll_read(&mut self, buf: &mut [u8], _: &mut task::Context) + fn poll_read(&mut self, _: &mut task::Context, buf: &mut [u8]) -> Poll { Ok(Async::Ready(io::Read::read(&mut self.0, buf)?)) diff --git a/futures-util/src/io/copy.rs b/futures-util/src/io/copy.rs index 8f6f5196a9..f2da754dd4 100644 --- a/futures-util/src/io/copy.rs +++ b/futures-util/src/io/copy.rs @@ -61,7 +61,7 @@ impl Future for Copy // continue. if self.pos == self.cap && !self.read_done { let reader = self.reader.as_mut().unwrap(); - let n = try_ready!(reader.poll_read(&mut self.buf, cx)); + let n = try_ready!(reader.poll_read(cx, &mut self.buf)); if n == 0 { self.read_done = true; } else { @@ -73,7 +73,7 @@ impl Future for Copy // If our buffer has some data, let's write it out! while self.pos < self.cap { let writer = self.writer.as_mut().unwrap(); - let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap], cx)); + let i = try_ready!(writer.poll_write(cx, &self.buf[self.pos..self.cap])); if i == 0 { return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero byte into writer")); diff --git a/futures-util/src/io/framed.rs b/futures-util/src/io/framed.rs index 0bb9573181..4e12918fac 100644 --- a/futures-util/src/io/framed.rs +++ b/futures-util/src/io/framed.rs @@ -175,16 +175,16 @@ impl AsyncRead for Fuse { self.0.initializer() } - fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context) + fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8]) -> Poll { - self.0.poll_read(buf, cx) + self.0.poll_read(cx, buf) } - fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context) + fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec]) -> Poll { - self.0.poll_vectored_read(vec, cx) + self.0.poll_vectored_read(cx, vec) } } @@ -199,16 +199,16 @@ impl Write for Fuse { } impl AsyncWrite for Fuse { - fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context) + fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8]) -> Poll { - self.0.poll_write(buf, cx) + self.0.poll_write(cx, buf) } - fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context) + fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec]) -> Poll { - self.0.poll_vectored_write(vec, cx) + self.0.poll_vectored_write(cx, vec) } fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> { diff --git a/futures-util/src/io/framed_read.rs b/futures-util/src/io/framed_read.rs index 1764961e95..b6771db8f2 100644 --- a/futures-util/src/io/framed_read.rs +++ b/futures-util/src/io/framed_read.rs @@ -287,7 +287,7 @@ impl Stream for FramedRead2 // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF self.buffer.reserve(1); - if 0 == try_ready!(self.inner.read_buf(&mut self.buffer, cx)) { + if 0 == try_ready!(self.inner.read_buf(cx, &mut self.buffer)) { self.eof = true; } diff --git a/futures-util/src/io/framed_write.rs b/futures-util/src/io/framed_write.rs index 3ac15f6ea3..206836c301 100644 --- a/futures-util/src/io/framed_write.rs +++ b/futures-util/src/io/framed_write.rs @@ -207,7 +207,7 @@ impl Sink for FramedWrite2 while !self.buffer.is_empty() { trace!("writing; remaining={}", self.buffer.len()); - let n = try_ready!(self.inner.poll_write(&self.buffer, cx)); + let n = try_ready!(self.inner.poll_write(cx, &self.buffer)); if n == 0 { return Err(io::Error::new(io::ErrorKind::WriteZero, "failed to @@ -253,15 +253,15 @@ impl AsyncRead for FramedWrite2 { self.inner.initializer() } - fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context) + fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8]) -> Poll { - self.inner.poll_read(buf, cx) + self.inner.poll_read(cx, buf) } - fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context) + fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec]) -> Poll { - self.inner.poll_vectored_read(vec, cx) + self.inner.poll_vectored_read(cx, vec) } } diff --git a/futures-util/src/io/length_delimited.rs b/futures-util/src/io/length_delimited.rs index a17a65e0af..0363d062b1 100644 --- a/futures-util/src/io/length_delimited.rs +++ b/futures-util/src/io/length_delimited.rs @@ -245,16 +245,16 @@ impl io::Write for FramedRead { } impl AsyncWrite for FramedRead { - fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context) + fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8]) -> Poll { - self.inner.get_mut().poll_write(buf, cx) + self.inner.get_mut().poll_write(cx, buf) } - fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context) + fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec]) -> Poll { - self.inner.get_mut().poll_vectored_write(vec, cx) + self.inner.get_mut().poll_vectored_write(cx, vec) } fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> { @@ -439,7 +439,7 @@ impl FramedWrite { loop { let frame = self.frame.as_mut().unwrap(); - try_ready!(self.inner.write_buf(frame, cx)); + try_ready!(self.inner.write_buf(cx, frame)); if !frame.has_remaining() { break; @@ -534,16 +534,16 @@ impl AsyncRead for FramedWrite { self.get_ref().initializer() } - fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context) + fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8]) -> Poll { - self.get_mut().poll_read(buf, cx) + self.get_mut().poll_read(cx, buf) } - fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context) + fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec]) -> Poll { - self.get_mut().poll_vectored_read(vec, cx) + self.get_mut().poll_vectored_read(cx, vec) } } diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index df57dbbeee..9fe887b1a5 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -47,7 +47,7 @@ pub trait AsyncReadExt: AsyncRead { /// The `buf` provided will have bytes read into it and the internal cursor /// will be advanced if any bytes were read. Note that this method typically /// will not reallocate the buffer provided. - fn read_buf(&mut self, buf: &mut B, cx: &mut task::Context) + fn read_buf(&mut self, cx: &mut task::Context, buf: &mut B) -> Poll where Self: Sized, { @@ -83,7 +83,7 @@ pub trait AsyncReadExt: AsyncRead { b13.into(), b14.into(), b15.into(), b16.into(), ]; let n = buf.bytes_vec_mut(&mut bufs); - try_ready!(self.poll_vectored_read(&mut bufs[..n], cx)) + try_ready!(self.poll_vectored_read(cx, &mut bufs[..n])) }; buf.advance_mut(n); @@ -133,7 +133,7 @@ pub trait AsyncWriteExt: AsyncWrite { /// /// Note that this method will advance the `buf` provided automatically by /// the number of bytes written. - fn write_buf(&mut self, buf: &mut B, cx: &mut task::Context) + fn write_buf(&mut self, cx: &mut task::Context, buf: &mut B) -> Poll where Self: Sized, { @@ -149,7 +149,7 @@ pub trait AsyncWriteExt: AsyncWrite { let iovec = <&IoVec>::from(DUMMY); let mut bufs = [iovec; 64]; let n = buf.bytes_vec(&mut bufs); - try_ready!(self.poll_vectored_write(&bufs[..n], cx)) + try_ready!(self.poll_vectored_write(cx, &bufs[..n])) }; buf.advance(n); Ok(Async::Ready(n)) diff --git a/futures-util/src/io/read.rs b/futures-util/src/io/read.rs index 65b8720938..a6e764f797 100644 --- a/futures-util/src/io/read.rs +++ b/futures-util/src/io/read.rs @@ -45,7 +45,7 @@ impl Future for Read fn poll(&mut self, cx: &mut task::Context) -> Poll<(R, T, usize), io::Error> { let nread = match self.state { State::Pending { ref mut rd, ref mut buf } => - try_ready!(rd.poll_read(&mut buf.as_mut()[..], cx)), + try_ready!(rd.poll_read(cx, &mut buf.as_mut()[..])), State::Empty => panic!("poll a Read after it's done"), }; diff --git a/futures-util/src/io/read_exact.rs b/futures-util/src/io/read_exact.rs index 18318d2e72..bfe69b10d1 100644 --- a/futures-util/src/io/read_exact.rs +++ b/futures-util/src/io/read_exact.rs @@ -65,7 +65,7 @@ impl Future for ReadExact State::Reading { ref mut a, ref mut buf, ref mut pos } => { let buf = buf.as_mut(); while *pos < buf.len() { - let n = try_ready!(a.poll_read(&mut buf[*pos..], cx)); + let n = try_ready!(a.poll_read(cx, &mut buf[*pos..])); *pos += n; if n == 0 { return Err(eof()) diff --git a/futures-util/src/io/read_to_end.rs b/futures-util/src/io/read_to_end.rs index bdf3c0071f..04d8df181b 100644 --- a/futures-util/src/io/read_to_end.rs +++ b/futures-util/src/io/read_to_end.rs @@ -38,8 +38,8 @@ pub fn read_to_end(a: A, buf: Vec) -> ReadToEnd { ReadToEnd { state: State::Reading { - a: a, - buf: buf, + a, + buf, } } } @@ -61,7 +61,7 @@ impl<'a> Drop for Guard<'a> { // // Because we're extending the buffer with uninitialized data for trusted // readers, we need to make sure to truncate that if any of this panics. -fn read_to_end_internal(r: &mut R, buf: &mut Vec, cx: &mut task::Context) +fn read_to_end_internal(r: &mut R, cx: &mut task::Context, buf: &mut Vec) -> Poll { let start_len = buf.len(); @@ -77,7 +77,7 @@ fn read_to_end_internal(r: &mut R, buf: &mut Vec, cx: } } - match r.poll_read(&mut g.buf[g.len..], cx) { + match r.poll_read(cx, &mut g.buf[g.len..]) { Ok(Async::Ready(0)) => { ret = Ok(Async::Ready(g.len - start_len)); break; @@ -107,7 +107,7 @@ impl Future for ReadToEnd // If we get `Ok`, then we know the stream hit EOF and we're done. If we // hit "would block" then all the read data so far is in our buffer, and // otherwise we propagate errors - try_ready!(read_to_end_internal(a, buf, cx)); + try_ready!(read_to_end_internal(a, cx, buf)); }, State::Empty => panic!("poll ReadToEnd after it's done"), } diff --git a/futures-util/src/io/split.rs b/futures-util/src/io/split.rs index e9a27f9afe..69fb872b03 100644 --- a/futures-util/src/io/split.rs +++ b/futures-util/src/io/split.rs @@ -32,30 +32,30 @@ pub fn split(t: T) -> (ReadHalf, WriteHalf) { } impl AsyncRead for ReadHalf { - fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context) + fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8]) -> Poll { - lock_and_then(&self.handle, cx, |l, cx| l.poll_read(buf, cx)) + lock_and_then(&self.handle, cx, |l, cx| l.poll_read(cx, buf)) } - fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context) + fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec]) -> Poll { - lock_and_then(&self.handle, cx, |l, cx| l.poll_vectored_read(vec, cx)) + lock_and_then(&self.handle, cx, |l, cx| l.poll_vectored_read(cx, vec)) } } impl AsyncWrite for WriteHalf { - fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context) + fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8]) -> Poll { - lock_and_then(&self.handle, cx, |l, cx| l.poll_write(buf, cx)) + lock_and_then(&self.handle, cx, |l, cx| l.poll_write(cx, buf)) } - fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context) + fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec]) -> Poll { - lock_and_then(&self.handle, cx, |l, cx| l.poll_vectored_write(vec, cx)) + lock_and_then(&self.handle, cx, |l, cx| l.poll_vectored_write(cx, vec)) } fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error> { diff --git a/futures-util/src/io/write_all.rs b/futures-util/src/io/write_all.rs index a24be2524b..dd0f4e4c9c 100644 --- a/futures-util/src/io/write_all.rs +++ b/futures-util/src/io/write_all.rs @@ -68,7 +68,7 @@ impl Future for WriteAll State::Writing { ref mut a, ref buf, ref mut pos } => { let buf = buf.as_ref(); while *pos < buf.len() { - let n = try_ready!(a.poll_write(&buf[*pos..], cx)); + let n = try_ready!(a.poll_write(cx, &buf[*pos..])); *pos += n; if n == 0 { return Err(zero_write())