diff --git a/Cargo.toml b/Cargo.toml
index b1a3ec5f7b..c027f57a3f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,6 +4,7 @@ members = [
   "futures-core",
   "futures-channel",
   "futures-executor",
+  "futures-io",
   "futures-util",
   "futures-sink",
 ]
diff --git a/LICENSE-MIT b/LICENSE-MIT
index 28e630cf40..8ad082ec4f 100644
--- a/LICENSE-MIT
+++ b/LICENSE-MIT
@@ -1,4 +1,5 @@
 Copyright (c) 2016 Alex Crichton
+Copyright (c) 2017 The Tokio Authors
 
 Permission is hereby granted, free of charge, to any
 person obtaining a copy of this software and associated
diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs
index 7160458129..f362e38de7 100644
--- a/futures-channel/src/mpsc/mod.rs
+++ b/futures-channel/src/mpsc/mod.rs
@@ -407,7 +407,7 @@ impl<T> Sender<T> {
         }
 
         // The channel has capacity to accept the message, so send it
-        self.do_send(Some(msg), None)
+        self.do_send(None, Some(msg))
             .map_err(|ChannelClosed(v)| {
                 TryChannelClosed {
                     kind: TryChannelClosedKind::Disconnected(v.unwrap()),
@@ -419,12 +419,12 @@ impl<T> Sender<T> {
     /// This function should only be called after `poll_ready` has responded
     /// that the channel is ready to receive a message.
     pub fn start_send(&mut self, msg: T) -> Result<(), ChannelClosed<T>> {
-        self.do_send(Some(msg), None)
+        self.do_send(None, Some(msg))
     }
 
     // Do the send without failing
     // None means close
-    fn do_send(&mut self, msg: Option<T>, cx: Option<&mut task::Context>)
+    fn do_send(&mut self, cx: Option<&mut task::Context>, msg: Option<T>)
         -> Result<(), ChannelClosed<T>>
     {
         // First, increment the number of messages contained by the channel.
diff --git a/futures-io/Cargo.toml b/futures-io/Cargo.toml
new file mode 100644
index 0000000000..4546f24a29
--- /dev/null
+++ b/futures-io/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "futures-io"
+version = "0.2.0"
+authors = ["Alex Crichton <alex@alexcrichton.com>"]
+license = "MIT/Apache-2.0"
+repository = "https://github.com/alexcrichton/futures-rs"
+homepage = "https://github.com/alexcrichton/futures-rs"
+documentation = "https://docs.rs/futures-io"
+description = """
+The `AsyncRead` and `AsyncWrite` traits for the futures-rs library.
+"""
+
+[features]
+std = ["futures-core/std", "iovec"]
+default = ["std"]
+
+[dependencies]
+futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
+iovec = { version = "0.1", optional = true }
diff --git a/futures-io/src/lib.rs b/futures-io/src/lib.rs
new file mode 100644
index 0000000000..91e1518b2d
--- /dev/null
+++ b/futures-io/src/lib.rs
@@ -0,0 +1,305 @@
+//! Asynchronous IO
+//!
+//! This crate contains the `AsyncRead` and `AsyncWrite` traits which allow
+//! data to be read and written asynchronously.
+
+#![no_std]
+#![deny(missing_docs, missing_debug_implementations)]
+#![doc(html_root_url = "https://docs.rs/futures-io/0.2")]
+
+macro_rules! if_std {
+    ($($i:item)*) => ($(
+        #[cfg(feature = "std")]
+        $i
+    )*)
+}
+
+if_std! {
+    extern crate futures_core;
+    extern crate iovec;
+    extern crate std;
+
+    use futures_core::{Async, Poll, task};
+    use std::boxed::Box;
+    use std::io as StdIo;
+    use std::ptr;
+    use std::vec::Vec;
+
+    // Re-export IoVec for convenience
+    pub use iovec::IoVec;
+
+    // Re-export io::Error so that users don't have to deal
+    // with conflicts when `use`ing `futures::io` and `std::io`.
+    pub use StdIo::Error as Error;
+
+    /// A type used to conditionally initialize buffers passed to `AsyncRead`
+    /// methods.
+    #[derive(Debug)]
+    pub struct Initializer(bool);
+
+    impl Initializer {
+        /// Returns a new `Initializer` which will zero out buffers.
+        #[inline]
+        pub fn zeroing() -> Initializer {
+            Initializer(true)
+        }
+
+        /// Returns a new `Initializer` which will not zero out buffers.
+        ///
+        /// # Safety
+        ///
+        /// This method may only be called by `AsyncRead`ers which guarantee
+        /// that they will not read from the buffers passed to `AsyncRead`
+        /// methods, and that the return value of the method accurately reflects
+        /// the number of bytes that have been written to the head of the buffer.
+        #[inline]
+        pub unsafe fn nop() -> Initializer {
+            Initializer(false)
+        }
+
+        /// Indicates if a buffer should be initialized.
+        #[inline]
+        pub fn should_initialize(&self) -> bool {
+            self.0
+        }
+
+        /// Initializes a buffer if necessary.
+        #[inline]
+        pub fn initialize(&self, buf: &mut [u8]) {
+            if self.should_initialize() {
+                unsafe { ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) }
+            }
+        }
+    }
+
+    /// Objects which can be read asynchronously.
+    pub trait AsyncRead {
+        /// Determines if this `AsyncRead`er can work with buffers of
+        /// uninitialized memory.
+        ///
+        /// The default implementation returns an initializer which will zero
+        /// buffers.
+        ///
+        /// # Safety
+        ///
+        /// This method is `unsafe` because and `AsyncRead`er could otherwise
+        /// return a non-zeroing `Initializer` from another `AsyncRead` type
+        /// without an `unsafe` block.
+        #[inline]
+        unsafe fn initializer(&self) -> Initializer {
+            Initializer::zeroing()
+        }
+
+        /// Attempt to read from the `AsyncRead` into `buf`.
+        ///
+        /// On success, returns `Ok(Async::Ready(num_bytes_read))`.
+        ///
+        /// If reading would block, this function returns `Ok(Async::Pending)`
+        /// and arranges for `cx.waker()` to receive a notification when the
+        /// object becomes readable or is closed.
+        fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8])
+            -> Poll<usize, Error>;
+
+        /// Attempt to read from the `AsyncRead` into `vec` using vectored
+        /// IO operations. This allows data to be read into multiple buffers
+        /// using a single operation.
+        ///
+        /// On success, returns `Ok(Async::Ready(num_bytes_read))`.
+        ///
+        /// By default, this method delegates to using `poll_read` on the first
+        /// buffer in `vec`. Objects which support vectored IO should override
+        /// this method.
+        ///
+        /// If reading would block, this function returns `Ok(Async::Pending)`
+        /// and arranges for `cx.waker()` to receive a notification when the
+        /// object becomes readable or is closed.
+        fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec])
+            -> Poll<usize, Error>
+        {
+            if let Some(ref mut first_iovec) = vec.get_mut(0) {
+                self.poll_read(cx, first_iovec)
+            } else {
+                // `vec` is empty.
+                return Ok(Async::Ready(0));
+            }
+        }
+    }
+
+    /// Objects which can be written to asynchronously.
+    pub trait AsyncWrite {
+        /// Attempt to write bytes from `buf` into the object.
+        ///
+        /// On success, returns `Ok(Async::Ready(num_bytes_written))`.
+        ///
+        /// If writing would block, this function returns `Ok(Async::Pending)`
+        /// and arranges for `cx.waker()` to receive a notification when the
+        /// the object becomes writable or is closed.
+        fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8])
+            -> Poll<usize, Error>;
+
+        /// Attempt to write bytes from `vec` into the object using vectored
+        /// IO operations. This allows data from multiple buffers to be written
+        /// using a single operation.
+        ///
+        /// On success, returns `Ok(Async::Ready(num_bytes_written))`.
+        ///
+        /// By default, this method delegates to using `poll_write` on the first
+        /// buffer in `vec`. Objects which support vectored IO should override
+        /// this method.
+        ///
+        /// If writing would block, this function returns `Ok(Async::Pending)`
+        /// and arranges for `cx.waker()` to receive a notification when the
+        /// object becomes writable or is closed.
+        fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec])
+            -> Poll<usize, Error>
+        {
+            if let Some(ref first_iovec) = vec.get(0) {
+                self.poll_write(cx, &*first_iovec)
+            } else {
+                // `vec` is empty.
+                return Ok(Async::Ready(0));
+            }
+        }
+
+        /// Attempt to flush the object, ensuring that all intermediately
+        /// buffered contents reach their destination.
+        ///
+        /// On success, returns `Ok(Async::Ready(()))`.
+        ///
+        /// If flushing is incomplete, this function returns `Ok(Async::Pending)`
+        /// and arranges for `cx.waker()` to receive a notification when the
+        /// object can make progress towards flushing.
+        fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error>;
+
+        /// Attempt to close the object.
+        ///
+        /// On success, returns `Ok(Async::Ready(()))`.
+        ///
+        /// If closing is incomplete, this function returns `Ok(Async::Pending)`
+        /// and arranges for `cx.waker()` to receive a notification when the
+        /// object can make progress towards closing.
+        fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error>;
+    }
+
+    macro_rules! deref_async_read {
+        () => {
+            unsafe fn initializer(&self) -> Initializer {
+                (**self).initializer()
+            }
+
+            fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8])
+                -> Poll<usize, Error>
+            {
+                (**self).poll_read(cx, buf)
+            }
+
+            fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec])
+                -> Poll<usize, Error>
+            {
+                (**self).poll_vectored_read(cx, vec)
+            }
+        }
+    }
+
+    impl<T: ?Sized + AsyncRead> AsyncRead for Box<T> {
+        deref_async_read!();
+    }
+
+    impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T {
+        deref_async_read!();
+    }
+
+    /// `unsafe` because the `StdIo::Read` type must not access the buffer
+    /// before reading data into it.
+    macro_rules! unsafe_delegate_async_read_to_stdio {
+        () => {
+            unsafe fn initializer(&self) -> Initializer {
+                Initializer::nop()
+            }
+
+            fn poll_read(&mut self, _: &mut task::Context, buf: &mut [u8])
+                -> Poll<usize, Error>
+            {
+                Ok(Async::Ready(StdIo::Read::read(self, buf)?))
+            }
+        }
+    }
+
+    impl<'a> AsyncRead for &'a [u8] {
+        unsafe_delegate_async_read_to_stdio!();
+    }
+
+    impl AsyncRead for StdIo::Repeat {
+        unsafe_delegate_async_read_to_stdio!();
+    }
+
+    impl<T: AsRef<[u8]>> AsyncRead for StdIo::Cursor<T> {
+        unsafe_delegate_async_read_to_stdio!();
+    }
+
+    macro_rules! deref_async_write {
+        () => {
+            fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8])
+                -> Poll<usize, Error>
+            {
+                (**self).poll_write(cx, buf)
+            }
+
+            fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec])
+                -> Poll<usize, Error>
+            {
+                (**self).poll_vectored_write(cx, vec)
+            }
+
+            fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
+                (**self).poll_flush(cx)
+            }
+
+            fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
+                (**self).poll_close(cx)
+            }
+        }
+    }
+
+    impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> { 
+        deref_async_write!();
+    }
+
+    impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T {
+        deref_async_write!();
+    }
+
+    macro_rules! delegate_async_write_to_stdio {
+        () => {
+            fn poll_write(&mut self, _: &mut task::Context, buf: &[u8])
+                -> Poll<usize, Error>
+            {
+                Ok(Async::Ready(StdIo::Write::write(self, buf)?))
+            }
+
+            fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Error> {
+                Ok(Async::Ready(StdIo::Write::flush(self)?))
+            }
+
+            fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
+                self.poll_flush(cx)
+            }
+        }
+    }
+
+    impl<'a> AsyncWrite for StdIo::Cursor<&'a mut [u8]> {
+        delegate_async_write_to_stdio!();
+    }
+
+    impl AsyncWrite for StdIo::Cursor<Vec<u8>> {
+        delegate_async_write_to_stdio!();
+    }
+
+    impl AsyncWrite for StdIo::Cursor<Box<[u8]>> {
+        delegate_async_write_to_stdio!();
+    }
+
+    impl AsyncWrite for StdIo::Sink {
+        delegate_async_write_to_stdio!();
+    }
+}
diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml
index 944f4e37f0..4176f663d4 100644
--- a/futures-util/Cargo.toml
+++ b/futures-util/Cargo.toml
@@ -11,12 +11,15 @@ Common utilities and extension traits for the futures-rs library.
 """
 
 [features]
-std = ["futures-core/std", "futures-sink/std"]
+std = ["bytes", "log", "futures-core/std", "futures-io/std", "futures-sink/std"]
 default = ["std"]
 
 [dependencies]
+bytes = { version = "0.4", optional = true }
+log = { version = "0.4", optional = true }
 futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
 futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false }
+futures-io = { path = "../futures-io", version = "0.2.0", default-features = false }
 futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false}
 
 [dev-dependencies]
diff --git a/futures-util/src/io/allow_std.rs b/futures-util/src/io/allow_std.rs
new file mode 100644
index 0000000000..a957c6e450
--- /dev/null
+++ b/futures-util/src/io/allow_std.rs
@@ -0,0 +1,96 @@
+use futures_core::{Async, Poll, task};
+use futures_io::{AsyncRead, AsyncWrite};
+use std::{fmt, io};
+use std::string::String;
+use std::vec::Vec;
+
+/// A simple wrapper type which allows types which implement only
+/// implement `std::io::Read` or `std::io::Write`
+/// to be used in contexts which expect an `AsyncRead` or `AsyncWrite`.
+///
+/// If these types issue an error with the kind `io::ErrorKind::WouldBlock`,
+/// it is expected that they will notify the current task on readiness.
+/// Synchronous `std` types should not issue errors of this kind and
+/// are safe to use in this context. However, using these types with
+/// `AllowStdIo` will cause the event loop to block, so they should be used
+/// with care.
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct AllowStdIo<T>(T);
+
+impl<T> AllowStdIo<T> {
+    /// Creates a new `AllowStdIo` from an existing IO object.
+    pub fn new(io: T) -> Self {
+        AllowStdIo(io)
+    }
+
+    /// Returns a reference to the contained IO object.
+    pub fn get_ref(&self) -> &T {
+        &self.0
+    }
+
+    /// Returns a mutable reference to the contained IO object.
+    pub fn get_mut(&mut self) -> &mut T {
+        &mut self.0
+    }
+
+    /// Consumes self and returns the contained IO object.
+    pub fn into_inner(self) -> T {
+        self.0
+    }
+}
+
+impl<T> io::Write for AllowStdIo<T> where T: io::Write {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.0.write(buf)
+    }
+    fn flush(&mut self) -> io::Result<()> {
+        self.0.flush()
+    }
+    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
+        self.0.write_all(buf)
+    }
+    fn write_fmt(&mut self, fmt: fmt::Arguments) -> io::Result<()> {
+        self.0.write_fmt(fmt)
+    }
+}
+
+impl<T> AsyncWrite for AllowStdIo<T> where T: io::Write {
+    fn poll_write(&mut self, _: &mut task::Context, buf: &[u8])
+        -> Poll<usize, io::Error>
+    {
+        Ok(Async::Ready(io::Write::write(&mut self.0, buf)?))
+    }
+
+    fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), io::Error> {
+        Ok(Async::Ready(io::Write::flush(self)?))
+    }
+
+    fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
+        self.poll_flush(cx)
+    }
+}
+
+impl<T> io::Read for AllowStdIo<T> where T: io::Read {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        self.0.read(buf)
+    }
+    // TODO: implement the `initializer` fn when it stabilizes.
+    // See rust-lang/rust #42788
+    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
+        self.0.read_to_end(buf)
+    }
+    fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
+        self.0.read_to_string(buf)
+    }
+    fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
+        self.0.read_exact(buf)
+    }
+}
+
+impl<T> AsyncRead for AllowStdIo<T> where T: io::Read {
+    fn poll_read(&mut self, _: &mut task::Context, buf: &mut [u8])
+        -> Poll<usize, io::Error>
+    {
+        Ok(Async::Ready(io::Read::read(&mut self.0, buf)?))
+    }
+}
diff --git a/futures-util/src/io/codec.rs b/futures-util/src/io/codec.rs
new file mode 100644
index 0000000000..678d7f31f2
--- /dev/null
+++ b/futures-util/src/io/codec.rs
@@ -0,0 +1,373 @@
+//! Utilities for encoding and decoding frames.
+//!
+//! Contains adapters to go from streams of bytes, [`AsyncRead`] and
+//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
+//! Framed streams are also known as [transports].
+//!
+//! [`AsyncRead`]: #
+//! [`AsyncWrite`]: #
+//! [`Sink`]: #
+//! [`Stream`]: #
+//! [transports]: #
+
+pub use io::codecs::{BytesCodec, LinesCodec};
+pub use io::framed::{Framed, FramedParts};
+pub use io::framed_read::{FramedRead, Decoder};
+pub use io::framed_write::{FramedWrite, Encoder};
+
+pub mod length_delimited {
+    //! Frame a stream of bytes based on a length prefix
+    //!
+    //! Many protocols delimit their frames by prefacing frame data with a
+    //! frame head that specifies the length of the frame. The
+    //! `length_delimited` module provides utilities for handling the length
+    //! based framing. This allows the consumer to work with entire frames
+    //! without having to worry about buffering or other framing logic.
+    //!
+    //! # Getting started
+    //!
+    //! If implementing a protocol from scratch, using length delimited framing
+    //! is an easy way to get started. [`Framed::new()`] will adapt a
+    //! full-duplex byte stream with a length delimited framer using default
+    //! configuration values.
+    //!
+    //! ```
+    //! extern crate futures;
+    //! use futures::io::{AsyncRead, AsyncWrite};
+    //! use futures::io::codec::length_delimited;
+    //!
+    //! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T)
+    //!     -> length_delimited::Framed<T>
+    //! {
+    //!     length_delimited::Framed::new(io)
+    //! }
+    //!
+    //! # fn main() {}
+    //! ```
+    //!
+    //! The returned transport implements `Sink + Stream` for `BytesMut`. It
+    //! encodes the frame with a big-endian `u32` header denoting the frame
+    //! payload length:
+    //!
+    //! ```text
+    //! +----------+--------------------------------+
+    //! | len: u32 |          frame payload         |
+    //! +----------+--------------------------------+
+    //! ```
+    //!
+    //! Specifically, given the following:
+    //!
+    //! ```
+    //! # extern crate bytes;
+    //! # extern crate futures;
+    //!
+    //! use bytes::BytesMut;
+    //! use futures::{Sink, SinkExt, Future};
+    //! use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
+    //! use futures::io::codec::length_delimited;
+    //! use futures::executor::block_on;
+    //!
+    //! fn write_frame<T: AsyncRead + AsyncWrite>(io: T) {
+    //!     let mut transport = length_delimited::Framed::new(io);
+    //!     let frame = BytesMut::from("hello world");
+    //!
+    //!     block_on(transport.send(frame)).unwrap();
+    //! }
+    //! # fn main() {}
+    //! ```
+    //!
+    //! The encoded frame will look like this:
+    //!
+    //! ```text
+    //! +---- len: u32 ----+---- data ----+
+    //! | \x00\x00\x00\x0b |  hello world |
+    //! +------------------+--------------+
+    //! ```
+    //!
+    //! # Decoding
+    //!
+    //! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`],
+    //! such that each yielded [`BytesMut`] value contains the contents of an
+    //! entire frame. There are many configuration parameters enabling
+    //! [`FrameRead`] to handle a wide range of protocols. Here are some
+    //! examples that will cover the various options at a high level.
+    //!
+    //! ## Example 1
+    //!
+    //! The following will parse a `u16` length field at offset 0, including the
+    //! frame head in the yielded `BytesMut`.
+    //!
+    //! ```
+    //! # extern crate futures;
+    //! # use futures::io::AsyncRead;
+    //! # use futures::io::codec::length_delimited;
+    //! # fn bind_read<T: AsyncRead>(io: T) {
+    //! length_delimited::Builder::new()
+    //!     .length_field_offset(0) // default value
+    //!     .length_field_length(2)
+    //!     .length_adjustment(0)   // default value
+    //!     .num_skip(0) // Do not strip frame header
+    //!     .new_read(io);
+    //! # }
+    //! # fn main() {}
+    //! ```
+    //!
+    //! The following frame will be decoded as such:
+    //!
+    //! ```text
+    //!          INPUT                           DECODED
+    //! +-- len ---+--- Payload ---+     +-- len ---+--- Payload ---+
+    //! | \x00\x0B |  Hello world  | --> | \x00\x0B |  Hello world  |
+    //! +----------+---------------+     +----------+---------------+
+    //! ```
+    //!
+    //! The value of the length field is 11 (`\x0B`) which represents the length
+    //! of the payload, `hello world`. By default, [`FramedRead`] assumes that
+    //! the length field represents the number of bytes that **follows** the
+    //! length field. Thus, the entire frame has a length of 13: 2 bytes for the
+    //! frame head + 11 bytes for the payload.
+    //!
+    //! ## Example 2
+    //!
+    //! The following will parse a `u16` length field at offset 0, omitting the
+    //! frame head in the yielded `BytesMut`.
+    //!
+    //! ```
+    //! # extern crate futures;
+    //! # use futures::io::AsyncRead;
+    //! # use futures::io::codec::length_delimited;
+    //! # fn bind_read<T: AsyncRead>(io: T) {
+    //! length_delimited::Builder::new()
+    //!     .length_field_offset(0) // default value
+    //!     .length_field_length(2)
+    //!     .length_adjustment(0)   // default value
+    //!     // `num_skip` is not needed, the default is to skip
+    //!     .new_read(io);
+    //! # }
+    //! # fn main() {}
+    //! ```
+    //!
+    //! The following frame will be decoded as such:
+    //!
+    //! ```text
+    //!          INPUT                        DECODED
+    //! +-- len ---+--- Payload ---+     +--- Payload ---+
+    //! | \x00\x0B |  Hello world  | --> |  Hello world  |
+    //! +----------+---------------+     +---------------+
+    //! ```
+    //!
+    //! This is similar to the first example, the only difference is that the
+    //! frame head is **not** included in the yielded `BytesMut` value.
+    //!
+    //! ## Example 3
+    //!
+    //! The following will parse a `u16` length field at offset 0, including the
+    //! frame head in the yielded `BytesMut`. In this case, the length field
+    //! **includes** the frame head length.
+    //!
+    //! ```
+    //! # extern crate futures;
+    //! # use futures::io::AsyncRead;
+    //! # use futures::io::codec::length_delimited;
+    //! # fn bind_read<T: AsyncRead>(io: T) {
+    //! length_delimited::Builder::new()
+    //!     .length_field_offset(0) // default value
+    //!     .length_field_length(2)
+    //!     .length_adjustment(-2)  // size of head
+    //!     .num_skip(0)
+    //!     .new_read(io);
+    //! # }
+    //! # fn main() {}
+    //! ```
+    //!
+    //! The following frame will be decoded as such:
+    //!
+    //! ```text
+    //!          INPUT                           DECODED
+    //! +-- len ---+--- Payload ---+     +-- len ---+--- Payload ---+
+    //! | \x00\x0D |  Hello world  | --> | \x00\x0D |  Hello world  |
+    //! +----------+---------------+     +----------+---------------+
+    //! ```
+    //!
+    //! In most cases, the length field represents the length of the payload
+    //! only, as shown in the previous examples. However, in some protocols the
+    //! length field represents the length of the whole frame, including the
+    //! head. In such cases, we specify a negative `length_adjustment` to adjust
+    //! the value provided in the frame head to represent the payload length.
+    //!
+    //! ## Example 4
+    //!
+    //! The following will parse a 3 byte length field at offset 0 in a 5 byte
+    //! frame head, including the frame head in the yielded `BytesMut`.
+    //!
+    //! ```
+    //! # extern crate futures;
+    //! # use futures::io::AsyncRead;
+    //! # use futures::io::codec::length_delimited;
+    //! # fn bind_read<T: AsyncRead>(io: T) {
+    //! length_delimited::Builder::new()
+    //!     .length_field_offset(0) // default value
+    //!     .length_field_length(3)
+    //!     .length_adjustment(2)  // remaining head
+    //!     .num_skip(0)
+    //!     .new_read(io);
+    //! # }
+    //! # fn main() {}
+    //! ```
+    //!
+    //! The following frame will be decoded as such:
+    //!
+    //! ```text
+    //!                  INPUT
+    //! +---- len -----+- head -+--- Payload ---+
+    //! | \x00\x00\x0B | \xCAFE |  Hello world  |
+    //! +--------------+--------+---------------+
+    //!
+    //!                  DECODED
+    //! +---- len -----+- head -+--- Payload ---+
+    //! | \x00\x00\x0B | \xCAFE |  Hello world  |
+    //! +--------------+--------+---------------+
+    //! ```
+    //!
+    //! A more advanced example that shows a case where there is extra frame
+    //! head data between the length field and the payload. In such cases, it is
+    //! usually desirable to include the frame head as part of the yielded
+    //! `BytesMut`. This lets consumers of the length delimited framer to
+    //! process the frame head as needed.
+    //!
+    //! The positive `length_adjustment` value lets `FramedRead` factor in the
+    //! additional head into the frame length calculation.
+    //!
+    //! ## Example 5
+    //!
+    //! The following will parse a `u16` length field at offset 1 of a 4 byte
+    //! frame head. The first byte and the length field will be omitted from the
+    //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be
+    //! included.
+    //!
+    //! ```
+    //! # extern crate futures;
+    //! # use futures::io::AsyncRead;
+    //! # use futures::io::codec::length_delimited;
+    //! # fn bind_read<T: AsyncRead>(io: T) {
+    //! length_delimited::Builder::new()
+    //!     .length_field_offset(1) // length of hdr1
+    //!     .length_field_length(2)
+    //!     .length_adjustment(1)  // length of hdr2
+    //!     .num_skip(3) // length of hdr1 + LEN
+    //!     .new_read(io);
+    //! # }
+    //! # fn main() {}
+    //! ```
+    //!
+    //! The following frame will be decoded as such:
+    //!
+    //! ```text
+    //!                  INPUT
+    //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
+    //! |  \xCA  | \x00\x0B |  \xFE  |  Hello world  |
+    //! +--------+----------+--------+---------------+
+    //!
+    //!          DECODED
+    //! +- hdr2 -+--- Payload ---+
+    //! |  \xFE  |  Hello world  |
+    //! +--------+---------------+
+    //! ```
+    //!
+    //! The length field is situated in the middle of the frame head. In this
+    //! case, the first byte in the frame head could be a version or some other
+    //! identifier that is not needed for processing. On the other hand, the
+    //! second half of the head is needed.
+    //!
+    //! `length_field_offset` indicates how many bytes to skip before starting
+    //! to read the length field.  `length_adjustment` is the number of bytes to
+    //! skip starting at the end of the length field. In this case, it is the
+    //! second half of the head.
+    //!
+    //! ## Example 6
+    //!
+    //! The following will parse a `u16` length field at offset 1 of a 4 byte
+    //! frame head. The first byte and the length field will be omitted from the
+    //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be
+    //! included. In this case, the length field **includes** the frame head
+    //! length.
+    //!
+    //! ```
+    //! # extern crate futures;
+    //! # use futures::io::AsyncRead;
+    //! # use futures::io::codec::length_delimited;
+    //! # fn bind_read<T: AsyncRead>(io: T) {
+    //! length_delimited::Builder::new()
+    //!     .length_field_offset(1) // length of hdr1
+    //!     .length_field_length(2)
+    //!     .length_adjustment(-3)  // length of hdr1 + LEN, negative
+    //!     .num_skip(3)
+    //!     .new_read(io);
+    //! # }
+    //! # fn main() {}
+    //! ```
+    //!
+    //! The following frame will be decoded as such:
+    //!
+    //! ```text
+    //!                  INPUT
+    //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
+    //! |  \xCA  | \x00\x0F |  \xFE  |  Hello world  |
+    //! +--------+----------+--------+---------------+
+    //!
+    //!          DECODED
+    //! +- hdr2 -+--- Payload ---+
+    //! |  \xFE  |  Hello world  |
+    //! +--------+---------------+
+    //! ```
+    //!
+    //! Similar to the example above, the difference is that the length field
+    //! represents the length of the entire frame instead of just the payload.
+    //! The length of `hdr1` and `len` must be counted in `length_adjustment`.
+    //! Note that the length of `hdr2` does **not** need to be explicitly set
+    //! anywhere because it already is factored into the total frame length that
+    //! is read from the byte stream.
+    //!
+    //! # Encoding
+    //!
+    //! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`],
+    //! such that each submitted [`BytesMut`] is prefaced by a length field.
+    //! There are fewer configuration options than [`FramedRead`]. Given
+    //! protocols that have more complex frame heads, an encoder should probably
+    //! be written by hand using [`Encoder`].
+    //!
+    //! Here is a simple example, given a `FramedWrite` with the following
+    //! configuration:
+    //!
+    //! ```
+    //! # extern crate bytes;
+    //! # extern crate futures;
+    //! # use futures::io::AsyncWrite;
+    //! # use futures::io::codec::length_delimited;
+    //! # use bytes::BytesMut;
+    //! # fn write_frame<T: AsyncWrite>(io: T) {
+    //! # let _: length_delimited::FramedWrite<T, BytesMut> =
+    //! length_delimited::Builder::new()
+    //!     .length_field_length(2)
+    //!     .new_write(io);
+    //! # }
+    //! # fn main() {}
+    //! ```
+    //!
+    //! A payload of `hello world` will be encoded as:
+    //!
+    //! ```text
+    //! +- len: u16 -+---- data ----+
+    //! |  \x00\x0b  |  hello world |
+    //! +------------+--------------+
+    //! ```
+    //!
+    //! [`FramedRead`]: struct.FramedRead.html
+    //! [`FramedWrite`]: struct.FramedWrite.html
+    //! [`AsyncRead`]: ../../trait.AsyncRead.html
+    //! [`AsyncWrite`]: ../../trait.AsyncWrite.html
+    //! [`Encoder`]: ../trait.Encoder.html
+    //! [`BytesMut`]: https://docs.rs/bytes/~0.4/bytes/struct.BytesMut.html
+
+    pub use io::length_delimited::*;
+}
diff --git a/futures-util/src/io/codecs.rs b/futures-util/src/io/codecs.rs
new file mode 100644
index 0000000000..8e6429124a
--- /dev/null
+++ b/futures-util/src/io/codecs.rs
@@ -0,0 +1,124 @@
+use bytes::{Bytes, BufMut, BytesMut};
+use io::codec::{Encoder, Decoder};
+use std::{io, str};
+use std::string::{String, ToString};
+
+/// A simple `Codec` implementation that just ships bytes around.
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct BytesCodec(());
+
+impl BytesCodec {
+    /// Creates a new `BytesCodec` for shipping around raw bytes.
+    pub fn new() -> BytesCodec { BytesCodec(())  }
+}
+
+impl Decoder for BytesCodec {
+    type Item = BytesMut;
+    type Error = io::Error;
+
+    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
+        if buf.len() > 0 {
+            let len = buf.len();
+            Ok(Some(buf.split_to(len)))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+impl Encoder for BytesCodec {
+    type Item = Bytes;
+    type Error = io::Error;
+
+    fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> {
+        buf.reserve(data.len());
+        buf.put(data);
+        Ok(())
+    }
+}
+
+/// A simple `Codec` implementation that splits up data into lines.
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct LinesCodec {
+    // Stored index of the next index to examine for a `\n` character.
+    // This is used to optimize searching.
+    // For example, if `decode` was called with `abc`, it would hold `3`,
+    // because that is the next index to examine.
+    // The next time `decode` is called with `abcde\n`, the method will
+    // only look at `de\n` before returning.
+    next_index: usize,
+}
+
+impl LinesCodec {
+    /// Returns a `LinesCodec` for splitting up data into lines.
+    pub fn new() -> LinesCodec {
+        LinesCodec { next_index: 0 }
+    }
+}
+
+fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
+    str::from_utf8(buf).map_err(|_|
+        io::Error::new(
+            io::ErrorKind::InvalidData,
+            "Unable to decode input as UTF8"))
+}
+
+fn without_carriage_return(s: &[u8]) -> &[u8] {
+    if let Some(&b'\r') = s.last() {
+        &s[..s.len() - 1]
+    } else {
+        s
+    }
+}
+
+impl Decoder for LinesCodec {
+    type Item = String;
+    type Error = io::Error;
+
+    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
+        if let Some(newline_offset) =
+            buf[self.next_index..].iter().position(|b| *b == b'\n')
+        {
+            let newline_index = newline_offset + self.next_index;
+            let line = buf.split_to(newline_index + 1);
+            let line = &line[..line.len()-1];
+            let line = without_carriage_return(line);
+            let line = utf8(line)?;
+            self.next_index = 0;
+            Ok(Some(line.to_string()))
+        } else {
+            self.next_index = buf.len();
+            Ok(None)
+        }
+    }
+
+    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
+        Ok(match self.decode(buf)? {
+            Some(frame) => Some(frame),
+            None => {
+                // No terminating newline - return remaining data, if any
+                if buf.is_empty() || buf == &b"\r"[..] {
+                    None
+                } else {
+                    let line = buf.take();
+                    let line = without_carriage_return(&line);
+                    let line = utf8(line)?;
+                    self.next_index = 0;
+                    Some(line.to_string())
+                }
+            }
+        })
+    }
+}
+
+impl Encoder for LinesCodec {
+    type Item = String;
+    type Error = io::Error;
+
+    fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> {
+        buf.reserve(line.len() + 1);
+        buf.put(line);
+        buf.put_u8(b'\n');
+        Ok(())
+    }
+}
diff --git a/futures-util/src/io/copy.rs b/futures-util/src/io/copy.rs
new file mode 100644
index 0000000000..f2da754dd4
--- /dev/null
+++ b/futures-util/src/io/copy.rs
@@ -0,0 +1,97 @@
+use std::io;
+use std::boxed::Box;
+
+use {Future, Poll, task};
+
+use futures_io::{AsyncRead, AsyncWrite};
+
+/// A future which will copy all data from a reader into a writer.
+///
+/// Created by the [`copy`] function, this future will resolve to the number of
+/// bytes copied or an error if one happens.
+///
+/// [`copy`]: fn.copy.html
+#[derive(Debug)]
+pub struct Copy<R, W> {
+    reader: Option<R>,
+    read_done: bool,
+    writer: Option<W>,
+    pos: usize,
+    cap: usize,
+    amt: u64,
+    buf: Box<[u8]>,
+}
+
+/// Creates a future which represents copying all the bytes from one object to
+/// another.
+///
+/// The returned future will copy all the bytes read from `reader` into the
+/// `writer` specified. This future will only complete once the `reader` has hit
+/// EOF and all bytes have been written to and flushed from the `writer`
+/// provided.
+///
+/// On success the number of bytes is returned and the `reader` and `writer` are
+/// consumed. On error the error is returned and the I/O objects are consumed as
+/// well.
+pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
+    where R: AsyncRead,
+          W: AsyncWrite,
+{
+    Copy {
+        reader: Some(reader),
+        read_done: false,
+        writer: Some(writer),
+        amt: 0,
+        pos: 0,
+        cap: 0,
+        buf: Box::new([0; 2048]),
+    }
+}
+
+impl<R, W> Future for Copy<R, W>
+    where R: AsyncRead,
+          W: AsyncWrite,
+{
+    type Item = (u64, R, W);
+    type Error = io::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<(u64, R, W), io::Error> {
+        loop {
+            // If our buffer is empty, then we need to read some data to
+            // continue.
+            if self.pos == self.cap && !self.read_done {
+                let reader = self.reader.as_mut().unwrap();
+                let n = try_ready!(reader.poll_read(cx, &mut self.buf));
+                if n == 0 {
+                    self.read_done = true;
+                } else {
+                    self.pos = 0;
+                    self.cap = n;
+                }
+            }
+
+            // If our buffer has some data, let's write it out!
+            while self.pos < self.cap {
+                let writer = self.writer.as_mut().unwrap();
+                let i = try_ready!(writer.poll_write(cx, &self.buf[self.pos..self.cap]));
+                if i == 0 {
+                    return Err(io::Error::new(io::ErrorKind::WriteZero,
+                                              "write zero byte into writer"));
+                } else {
+                    self.pos += i;
+                    self.amt += i as u64;
+                }
+            }
+
+            // If we've written al the data and we've seen EOF, flush out the
+            // data and finish the transfer.
+            // done with the entire transfer.
+            if self.pos == self.cap && self.read_done {
+                try_ready!(self.writer.as_mut().unwrap().poll_flush(cx));
+                let reader = self.reader.take().unwrap();
+                let writer = self.writer.take().unwrap();
+                return Ok((self.amt, reader, writer).into())
+            }
+        }
+    }
+}
diff --git a/futures-util/src/io/flush.rs b/futures-util/src/io/flush.rs
new file mode 100644
index 0000000000..6b7c8b6288
--- /dev/null
+++ b/futures-util/src/io/flush.rs
@@ -0,0 +1,44 @@
+use std::io;
+
+use {Poll, Future, Async, task};
+
+use futures_io::AsyncWrite;
+
+/// A future used to fully flush an I/O object.
+///
+/// Resolves to the underlying I/O object once the flush operation is complete.
+///
+/// Created by the [`flush`] function.
+///
+/// [`flush`]: fn.flush.html
+#[derive(Debug)]
+pub struct Flush<A> {
+    a: Option<A>,
+}
+
+/// Creates a future which will entirely flush an I/O object and then yield the
+/// object itself.
+///
+/// This function will consume the object provided if an error happens, and
+/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling
+/// a retry if `WouldBlock` is seen along the way.
+pub fn flush<A>(a: A) -> Flush<A>
+    where A: AsyncWrite,
+{
+    Flush {
+        a: Some(a),
+    }
+}
+
+impl<A> Future for Flush<A>
+    where A: AsyncWrite,
+{
+    type Item = A;
+    type Error = io::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<A, io::Error> {
+        try_ready!(self.a.as_mut().unwrap().poll_flush(cx));
+        Ok(Async::Ready(self.a.take().unwrap()))
+    }
+}
+
diff --git a/futures-util/src/io/framed.rs b/futures-util/src/io/framed.rs
new file mode 100644
index 0000000000..4e12918fac
--- /dev/null
+++ b/futures-util/src/io/framed.rs
@@ -0,0 +1,257 @@
+use std::io::{self, Read, Write};
+use std::fmt;
+
+use futures_io::{AsyncRead, AsyncWrite, Initializer, IoVec};
+use io::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2, Decoder};
+use io::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2, Encoder};
+
+use {Stream, Sink, Poll, task};
+use bytes::{BytesMut};
+
+/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
+/// the `Encoder` and `Decoder` traits to encode and decode frames.
+///
+/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter.
+pub struct Framed<T, U> {
+    inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
+}
+
+pub struct Fuse<T, U>(pub T, pub U);
+
+pub fn framed<T, U>(inner: T, codec: U) -> Framed<T, U>
+    where T: AsyncRead + AsyncWrite,
+          U: Decoder + Encoder,
+{
+    Framed {
+        inner: framed_read2(framed_write2(Fuse(inner, codec))),
+    }
+}
+
+impl<T, U> Framed<T, U> {
+    /// Provides a `Stream` and `Sink` interface for reading and writing to this
+    /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
+    ///
+    /// Raw I/O objects work with byte sequences, but higher-level code usually
+    /// wants to batch these into meaningful chunks, called "frames". This
+    /// method layers framing on top of an I/O object, by using the `Codec`
+    /// traits to handle encoding and decoding of messages frames. Note that
+    /// the incoming and outgoing frame types may be distinct.
+    ///
+    /// This function returns a *single* object that is both `Stream` and
+    /// `Sink`; grouping this into a single object is often useful for layering
+    /// things like gzip or TLS, which require both read and write access to the
+    /// underlying object.
+    ///
+    /// This objects takes a stream and a readbuffer and a writebuffer. These field
+    /// can be obtained from an existing `Framed` with the `into_parts` method.
+    ///
+    /// If you want to work more directly with the streams and sink, consider
+    /// calling `split` on the `Framed` returned by this method, which will
+    /// break them into separate objects, allowing them to interact more easily.
+    pub fn from_parts(parts: FramedParts<T>, codec: U) -> Framed<T, U>
+    {
+        Framed {
+            inner: framed_read2_with_buffer(framed_write2_with_buffer(Fuse(parts.inner, codec), parts.writebuf), parts.readbuf),
+        }
+    }
+
+    /// Returns a reference to the underlying I/O stream wrapped by
+    /// `Frame`.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise
+    /// being worked with.
+    pub fn get_ref(&self) -> &T {
+        &self.inner.get_ref().get_ref().0
+    }
+
+    /// Returns a mutable reference to the underlying I/O stream wrapped by
+    /// `Frame`.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise
+    /// being worked with.
+    pub fn get_mut(&mut self) -> &mut T {
+        &mut self.inner.get_mut().get_mut().0
+    }
+
+    /// Consumes the `Frame`, returning its underlying I/O stream.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise
+    /// being worked with.
+    pub fn into_inner(self) -> T {
+        self.inner.into_inner().into_inner().0
+    }
+
+    /// Consumes the `Frame`, returning its underlying I/O stream and the buffer
+    /// with unprocessed data.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise
+    /// being worked with.
+    pub fn into_parts(self) -> FramedParts<T> {
+        let (inner, readbuf) = self.inner.into_parts();
+	    let (inner, writebuf) = inner.into_parts();
+        FramedParts { inner: inner.0, readbuf, writebuf }
+    }
+
+    /// Consumes the `Frame`, returning its underlying I/O stream and the buffer
+    /// with unprocessed data, and also the current codec state.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise
+    /// being worked with.
+    ///
+    /// Note that this function will be removed once the codec has been
+    /// integrated into `FramedParts` in a new version (see
+    /// [#53](https://github.com/tokio-rs/tokio-io/pull/53)).
+    pub fn into_parts_and_codec(self) -> (FramedParts<T>, U) {
+        let (inner, readbuf) = self.inner.into_parts();
+        let (inner, writebuf) = inner.into_parts();
+        (FramedParts { inner: inner.0, readbuf, writebuf }, inner.1)
+    }
+}
+
+impl<T, U> Stream for Framed<T, U>
+    where T: AsyncRead,
+          U: Decoder,
+{
+    type Item = U::Item;
+    type Error = U::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
+        self.inner.poll(cx)
+    }
+}
+
+impl<T, U> Sink for Framed<T, U>
+    where T: AsyncWrite,
+          U: Encoder,
+          U::Error: From<io::Error>,
+{
+    type SinkItem = U::Item;
+    type SinkError = U::Error;
+
+    fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
+        self.inner.get_mut().poll_ready(cx)
+    }
+
+    fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
+        self.inner.get_mut().start_send(item)
+    }
+
+    fn start_close(&mut self) -> Result<(), Self::SinkError> {
+        self.inner.get_mut().start_close()
+    }
+
+    fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
+        self.inner.get_mut().poll_flush(cx)
+    }
+}
+
+impl<T, U> fmt::Debug for Framed<T, U>
+    where T: fmt::Debug,
+          U: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("Framed")
+         .field("io", &self.inner.get_ref().get_ref().0)
+         .field("codec", &self.inner.get_ref().get_ref().1)
+         .finish()
+    }
+}
+
+// ===== impl Fuse =====
+
+impl<T: Read, U> Read for Fuse<T, U> {
+    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+        self.0.read(dst)
+    }
+}
+
+impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> {
+    unsafe fn initializer(&self) -> Initializer {
+        self.0.initializer()
+    }
+
+    fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8])
+        -> Poll<usize, io::Error>
+    {
+        self.0.poll_read(cx, buf)
+    }
+
+    fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec])
+        -> Poll<usize, io::Error>
+    {
+        self.0.poll_vectored_read(cx, vec)
+    }
+}
+
+impl<T: Write, U> Write for Fuse<T, U> {
+    fn write(&mut self, src: &[u8]) -> io::Result<usize> {
+        self.0.write(src)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.0.flush()
+    }
+}
+
+impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> {
+    fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8])
+        -> Poll<usize, io::Error>
+    {
+        self.0.poll_write(cx, buf)
+    }
+
+    fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec])
+        -> Poll<usize, io::Error>
+    {
+        self.0.poll_vectored_write(cx, vec)
+    }
+
+    fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
+        self.0.poll_flush(cx)
+    }
+
+    fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
+        self.0.poll_close(cx)
+    }
+}
+
+impl<T, U: Decoder> Decoder for Fuse<T, U> {
+    type Item = U::Item;
+    type Error = U::Error;
+
+    fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+        self.1.decode(buffer)
+    }
+
+    fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+        self.1.decode_eof(buffer)
+    }
+}
+
+impl<T, U: Encoder> Encoder for Fuse<T, U> {
+    type Item = U::Item;
+    type Error = U::Error;
+
+    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
+        self.1.encode(item, dst)
+    }
+}
+
+/// `FramedParts` contains an export of the data of a Framed transport.
+/// It can be used to construct a new `Framed` with a different codec.
+/// It contains all current buffers and the inner transport.
+#[derive(Debug)]
+pub struct FramedParts<T>
+{
+    /// The inner transport used to read bytes to and write bytes to
+    pub inner: T,
+    /// The buffer with read but unprocessed data.
+    pub readbuf: BytesMut,
+    /// A buffer with unprocessed data which are not written yet.
+    pub writebuf: BytesMut
+}
diff --git a/futures-util/src/io/framed_read.rs b/futures-util/src/io/framed_read.rs
new file mode 100644
index 0000000000..b6771db8f2
--- /dev/null
+++ b/futures-util/src/io/framed_read.rs
@@ -0,0 +1,297 @@
+use std::{fmt, io};
+
+use futures_io::AsyncRead;
+use io::AsyncReadExt;
+use io::framed::Fuse;
+
+use {Async, Poll, Stream, Sink, task};
+use bytes::BytesMut;
+
+/// Decoding of frames via buffers.
+///
+/// This trait is used when constructing an instance of `Framed` or
+/// `FramedRead`. An implementation of `Decoder` takes a byte stream that has
+/// already been buffered in `src` and decodes the data into a stream of
+/// `Self::Item` frames.
+///
+/// Implementations are able to track state on `self`, which enables
+/// implementing stateful streaming parsers. In many cases, though, this type
+/// will simply be a unit struct (e.g. `struct HttpDecoder`).
+pub trait Decoder {
+    /// The type of decoded frames.
+    type Item;
+
+    /// The type of unrecoverable frame decoding errors.
+    ///
+    /// If an individual message is ill-formed but can be ignored without
+    /// interfering with the processing of future messages, it may be more
+    /// useful to report the failure as an `Item`.
+    ///
+    /// `From<io::Error>` is required in the interest of making `Error` suitable
+    /// for returning directly from a `FramedRead`, and to enable the default
+    /// implementation of `decode_eof` to yield an `io::Error` when the decoder
+    /// fails to consume all available data.
+    ///
+    /// Note that implementors of this trait can simply indicate `type Error =
+    /// io::Error` to use I/O errors as this type.
+    type Error: From<io::Error>;
+
+    /// Attempts to decode a frame from the provided buffer of bytes.
+    ///
+    /// This method is called by `FramedRead` whenever bytes are ready to be
+    /// parsed.  The provided buffer of bytes is what's been read so far, and
+    /// this instance of `Decode` can determine whether an entire frame is in
+    /// the buffer and is ready to be returned.
+    ///
+    /// If an entire frame is available, then this instance will remove those
+    /// bytes from the buffer provided and return them as a decoded
+    /// frame. Note that removing bytes from the provided buffer doesn't always
+    /// necessarily copy the bytes, so this should be an efficient operation in
+    /// most circumstances.
+    ///
+    /// If the bytes look valid, but a frame isn't fully available yet, then
+    /// `Ok(None)` is returned. This indicates to the `Framed` instance that
+    /// it needs to read some more bytes before calling this method again.
+    ///
+    /// Note that the bytes provided may be empty. If a previous call to
+    /// `decode` consumed all the bytes in the buffer then `decode` will be
+    /// called again until it returns `None`, indicating that more bytes need to
+    /// be read.
+    ///
+    /// Finally, if the bytes in the buffer are malformed then an error is
+    /// returned indicating why. This informs `Framed` that the stream is now
+    /// corrupt and should be terminated.
+    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
+
+    /// A default method available to be called when there are no more bytes
+    /// available to be read from the underlying I/O.
+    ///
+    /// This method defaults to calling `decode` and returns an error if
+    /// `Ok(None)` is returned while there is unconsumed data in `buf`.
+    /// Typically this doesn't need to be implemented unless the framing
+    /// protocol differs near the end of the stream.
+    ///
+    /// Note that the `buf` argument may be empty. If a previous call to
+    /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be
+    /// called again until it returns `None`, indicating that there are no more
+    /// frames to yield. This behavior enables returning finalization frames
+    /// that may not be based on inbound data.
+    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+        match try!(self.decode(buf)) {
+            Some(frame) => Ok(Some(frame)),
+            None => {
+                if buf.is_empty() {
+                    Ok(None)
+                } else {
+                    Err(io::Error::new(io::ErrorKind::Other,
+                                       "bytes remaining on stream").into())
+                }
+            }
+        }
+    }
+}
+
+/// A `Stream` of messages decoded from an `AsyncRead`.
+pub struct FramedRead<T, D> {
+    inner: FramedRead2<Fuse<T, D>>,
+}
+
+pub struct FramedRead2<T> {
+    inner: T,
+    eof: bool,
+    is_readable: bool,
+    buffer: BytesMut,
+}
+
+const INITIAL_CAPACITY: usize = 8 * 1024;
+
+// ===== impl FramedRead =====
+
+impl<T, D> FramedRead<T, D>
+    where T: AsyncRead,
+          D: Decoder,
+{
+    /// Creates a new `FramedRead` with the given `decoder`.
+    pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
+        FramedRead {
+            inner: framed_read2(Fuse(inner, decoder)),
+        }
+    }
+}
+
+impl<T, D> FramedRead<T, D> {
+    /// Returns a reference to the underlying I/O stream wrapped by
+    /// `FramedRead`.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise
+    /// being worked with.
+    pub fn get_ref(&self) -> &T {
+        &self.inner.inner.0
+    }
+
+    /// Returns a mutable reference to the underlying I/O stream wrapped by
+    /// `FramedRead`.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise
+    /// being worked with.
+    pub fn get_mut(&mut self) -> &mut T {
+        &mut self.inner.inner.0
+    }
+
+    /// Consumes the `FramedRead`, returning its underlying I/O stream.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise
+    /// being worked with.
+    pub fn into_inner(self) -> T {
+        self.inner.inner.0
+    }
+
+    /// Returns a reference to the underlying decoder.
+    pub fn decoder(&self) -> &D {
+        &self.inner.inner.1
+    }
+
+    /// Returns a mutable reference to the underlying decoder.
+    pub fn decoder_mut(&mut self) -> &mut D {
+        &mut self.inner.inner.1
+    }
+}
+
+impl<T, D> Stream for FramedRead<T, D>
+    where T: AsyncRead,
+          D: Decoder,
+{
+    type Item = D::Item;
+    type Error = D::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
+        self.inner.poll(cx)
+    }
+}
+
+impl<T, D> Sink for FramedRead<T, D>
+    where T: Sink,
+{
+    type SinkItem = T::SinkItem;
+    type SinkError = T::SinkError;
+
+    fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
+        self.inner.inner.0.poll_ready(cx)
+    }
+
+    fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
+        self.inner.inner.0.start_send(item)
+    }
+
+    fn start_close(&mut self) -> Result<(), Self::SinkError> {
+        self.inner.inner.0.start_close()
+    }
+
+    fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
+        self.inner.inner.0.poll_flush(cx)
+    }
+}
+
+impl<T, D> fmt::Debug for FramedRead<T, D>
+    where T: fmt::Debug,
+          D: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("FramedRead")
+            .field("inner", &self.inner.inner.0)
+            .field("decoder", &self.inner.inner.1)
+            .field("eof", &self.inner.eof)
+            .field("is_readable", &self.inner.is_readable)
+            .field("buffer", &self.inner.buffer)
+            .finish()
+    }
+}
+
+// ===== impl FramedRead2 =====
+
+pub fn framed_read2<T>(inner: T) -> FramedRead2<T> {
+    FramedRead2 {
+        inner,
+        eof: false,
+        is_readable: false,
+        buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
+    }
+}
+
+pub fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> {
+    if buf.capacity() < INITIAL_CAPACITY {
+        let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
+        buf.reserve(bytes_to_reserve);
+    }
+    FramedRead2 {
+        inner,
+        eof: false,
+        is_readable: buf.len() > 0,
+        buffer: buf,
+    }
+}
+
+impl<T> FramedRead2<T> {
+    pub fn get_ref(&self) -> &T {
+        &self.inner
+    }
+
+    pub fn into_inner(self) -> T {
+        self.inner
+    }
+
+    pub fn into_parts(self) -> (T, BytesMut) {
+        (self.inner, self.buffer)
+    }
+
+    pub fn get_mut(&mut self) -> &mut T {
+        &mut self.inner
+    }
+}
+
+impl<T> Stream for FramedRead2<T>
+    where T: AsyncRead + Decoder,
+{
+    type Item = T::Item;
+    type Error = T::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
+        loop {
+            // Repeatedly call `decode` or `decode_eof` as long as it is
+            // "readable". Readable is defined as not having returned `None`. If
+            // the upstream has returned EOF, and the decoder is no longer
+            // readable, it can be assumed that the decoder will never become
+            // readable again, at which point the stream is terminated.
+            if self.is_readable {
+                if self.eof {
+                    let frame = self.inner.decode_eof(&mut self.buffer)?;
+                    return Ok(Async::Ready(frame));
+                }
+
+                trace!("attempting to decode a frame");
+
+                if let Some(frame) = self.inner.decode(&mut self.buffer)? {
+                    trace!("frame decoded from buffer");
+                    return Ok(Async::Ready(Some(frame)));
+                }
+
+                self.is_readable = false;
+            }
+
+            assert!(!self.eof);
+
+            // Otherwise, try to read more data and try again. Make sure we've
+            // got room for at least one byte to read to ensure that we don't
+            // get a spurious 0 that looks like EOF
+            self.buffer.reserve(1);
+            if 0 == try_ready!(self.inner.read_buf(cx, &mut self.buffer)) {
+                self.eof = true;
+            }
+
+            self.is_readable = true;
+        }
+    }
+}
diff --git a/futures-util/src/io/framed_write.rs b/futures-util/src/io/framed_write.rs
new file mode 100644
index 0000000000..206836c301
--- /dev/null
+++ b/futures-util/src/io/framed_write.rs
@@ -0,0 +1,267 @@
+use std::io::{self, Read};
+use std::fmt;
+
+use futures_io::{AsyncRead, AsyncWrite, Initializer, IoVec};
+use io::codec::Decoder;
+use io::framed::Fuse;
+
+use {Async, Poll, Stream, Sink, task};
+use bytes::BytesMut;
+
+/// Trait of helper objects to write out messages as bytes, for use with
+/// `FramedWrite`.
+pub trait Encoder {
+    /// The type of items consumed by the `Encoder`
+    type Item;
+
+    /// The type of encoding errors.
+    ///
+    /// `FramedWrite` requires `Encoder`s errors to implement `From<io::Error>`
+    /// in the interest letting it return `Error`s directly.
+    type Error: From<io::Error>;
+
+    /// Encodes a frame into the buffer provided.
+    ///
+    /// This method will encode `item` into the byte buffer provided by `dst`.
+    /// The `dst` provided is an internal buffer of the `Framed` instance and
+    /// will be written out when possible.
+    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut)
+              -> Result<(), Self::Error>;
+}
+
+/// A `Sink` of frames encoded to an `AsyncWrite`.
+pub struct FramedWrite<T, E> {
+    inner: FramedWrite2<Fuse<T, E>>,
+}
+
+pub struct FramedWrite2<T> {
+    inner: T,
+    buffer: BytesMut,
+    do_close: bool,
+}
+
+const INITIAL_CAPACITY: usize = 8 * 1024;
+const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
+
+impl<T, E> FramedWrite<T, E>
+    where T: AsyncWrite,
+          E: Encoder,
+{
+    /// Creates a new `FramedWrite` with the given `encoder`.
+    pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
+        FramedWrite {
+            inner: framed_write2(Fuse(inner, encoder)),
+        }
+    }
+}
+
+impl<T, E> FramedWrite<T, E> {
+    /// Returns a reference to the underlying I/O stream wrapped by
+    /// `FramedWrite`.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise
+    /// being worked with.
+    pub fn get_ref(&self) -> &T {
+        &self.inner.inner.0
+    }
+
+    /// Returns a mutable reference to the underlying I/O stream wrapped by
+    /// `FramedWrite`.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise
+    /// being worked with.
+    pub fn get_mut(&mut self) -> &mut T {
+        &mut self.inner.inner.0
+    }
+
+    /// Consumes the `FramedWrite`, returning its underlying I/O stream.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise
+    /// being worked with.
+    pub fn into_inner(self) -> T {
+        self.inner.inner.0
+    }
+
+    /// Returns a reference to the underlying decoder.
+    pub fn encoder(&self) -> &E {
+        &self.inner.inner.1
+    }
+
+    /// Returns a mutable reference to the underlying decoder.
+    pub fn encoder_mut(&mut self) -> &mut E {
+        &mut self.inner.inner.1
+    }
+}
+
+impl<T, E> Sink for FramedWrite<T, E>
+    where T: AsyncWrite,
+          E: Encoder,
+{
+    type SinkItem = E::Item;
+    type SinkError = E::Error;
+
+    delegate_sink!(inner);
+}
+
+impl<T, D> Stream for FramedWrite<T, D>
+    where T: Stream,
+{
+    type Item = T::Item;
+    type Error = T::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
+        self.inner.inner.0.poll(cx)
+    }
+}
+
+impl<T, U> fmt::Debug for FramedWrite<T, U>
+    where T: fmt::Debug,
+          U: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("FramedWrite")
+         .field("inner", &self.inner.get_ref().0)
+         .field("encoder", &self.inner.get_ref().1)
+         .field("buffer", &self.inner.buffer)
+         .finish()
+    }
+}
+
+// ===== impl FramedWrite2 =====
+
+pub fn framed_write2<T>(inner: T) -> FramedWrite2<T> {
+    FramedWrite2 {
+        inner,
+        buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
+        do_close: false,
+    }
+}
+
+pub fn framed_write2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedWrite2<T> {
+    if buf.capacity() < INITIAL_CAPACITY {
+        let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
+        buf.reserve(bytes_to_reserve);
+    }
+    FramedWrite2 {
+        inner,
+        buffer: buf,
+        do_close: false,
+    }
+}
+
+impl<T> FramedWrite2<T> {
+    pub fn get_ref(&self) -> &T {
+        &self.inner
+    }
+
+    pub fn into_inner(self) -> T {
+        self.inner
+    }
+
+    pub fn into_parts(self) -> (T, BytesMut) {
+        (self.inner, self.buffer)
+    }
+
+    pub fn get_mut(&mut self) -> &mut T {
+        &mut self.inner
+    }
+}
+
+impl<T> Sink for FramedWrite2<T>
+    where T: AsyncWrite + Encoder,
+{
+    type SinkItem = T::Item;
+    type SinkError = T::Error;
+
+    fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
+        // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's
+        // *still* over 8KiB, then apply backpressure (reject the send).
+        if self.buffer.len() < BACKPRESSURE_BOUNDARY {
+            return Ok(Async::Ready(()));
+        }
+
+        self.poll_flush(cx)?;
+
+        if self.buffer.len() >= BACKPRESSURE_BOUNDARY {
+            Ok(Async::Pending)
+        } else {
+            Ok(Async::Ready(()))
+        }
+    }
+
+    fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
+        self.inner.encode(item, &mut self.buffer)
+    }
+
+    fn start_close(&mut self) -> Result<(), Self::SinkError> {
+        self.do_close = true;
+        Ok(())
+    }
+
+    fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
+        trace!("flushing framed transport");
+
+        while !self.buffer.is_empty() {
+            trace!("writing; remaining={}", self.buffer.len());
+
+            let n = try_ready!(self.inner.poll_write(cx, &self.buffer));
+
+            if n == 0 {
+                return Err(io::Error::new(io::ErrorKind::WriteZero, "failed to
+                                          write frame to transport").into());
+            }
+
+            // TODO: Add a way to `bytes` to do this w/o returning the drained
+            // data.
+            let _ = self.buffer.split_to(n);
+        }
+
+        try_ready!(self.inner.poll_flush(cx));
+
+        if self.do_close {
+            self.inner.poll_close(cx).map_err(Into::into)
+        } else {
+            Ok(Async::Ready(()))
+        }
+    }
+}
+
+impl<T: Decoder> Decoder for FramedWrite2<T> {
+    type Item = T::Item;
+    type Error = T::Error;
+
+    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> {
+        self.inner.decode(src)
+    }
+
+    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> {
+        self.inner.decode_eof(src)
+    }
+}
+
+impl<T: Read> Read for FramedWrite2<T> {
+    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+        self.inner.read(dst)
+    }
+}
+
+impl<T: AsyncRead> AsyncRead for FramedWrite2<T> {
+    unsafe fn initializer(&self) -> Initializer {
+        self.inner.initializer()
+    }
+
+    fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8])
+        -> Poll<usize, io::Error>
+    {
+        self.inner.poll_read(cx, buf)
+    }
+
+    fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec])
+        -> Poll<usize, io::Error>
+    {
+        self.inner.poll_vectored_read(cx, vec)
+    }
+}
diff --git a/futures-util/src/io/io.rs b/futures-util/src/io/io.rs
new file mode 100644
index 0000000000..bcd58c486c
--- /dev/null
+++ b/futures-util/src/io/io.rs
@@ -0,0 +1,23 @@
+//! I/O conveniences when working with primitives in `tokio-core`
+//!
+//! Contains various combinators to work with I/O objects and type definitions
+//! as well.
+//!
+//! A description of the high-level I/O combinators can be [found online] in
+//! addition to a description of the [low level details].
+//!
+//! [found online]: https://tokio.rs/docs/getting-started/core/
+//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/
+
+pub use io::allow_std::AllowStdIo;
+pub use io::copy::{copy, Copy};
+pub use io::flush::{flush, Flush};
+//pub use io::lines::{lines, Lines};
+pub use io::read::{read, Read};
+pub use io::read_exact::{read_exact, ReadExact};
+pub use io::read_to_end::{read_to_end, ReadToEnd};
+//pub use io::read_until::{read_until, ReadUntil};
+pub use io::shutdown::{shutdown, Shutdown};
+pub use io::split::{ReadHalf, WriteHalf};
+pub use io::window::Window;
+pub use io::write_all::{write_all, WriteAll};
diff --git a/futures-util/src/io/length_delimited.rs b/futures-util/src/io/length_delimited.rs
new file mode 100644
index 0000000000..0363d062b1
--- /dev/null
+++ b/futures-util/src/io/length_delimited.rs
@@ -0,0 +1,902 @@
+use bytes::{Buf, BufMut, BytesMut, IntoBuf, BigEndian, LittleEndian};
+use bytes::buf::Chain;
+
+use {Async, Stream, Sink, Poll, task};
+use io::{AsyncWriteExt, codec};
+
+use futures_io::{AsyncRead, AsyncWrite, Initializer, IoVec};
+
+use std::{cmp, fmt};
+use std::error::Error as StdError;
+use std::io::{self, Cursor};
+
+/// Configure length delimited `FramedRead`, `FramedWrite`, and `Framed` values.
+///
+/// `Builder` enables constructing configured length delimited framers. Note
+/// that not all configuration settings apply to both encoding and decoding. See
+/// the documentation for specific methods for more detail.
+#[derive(Debug, Clone, Copy)]
+pub struct Builder {
+    // Maximum frame length
+    max_frame_len: usize,
+
+    // Number of bytes representing the field length
+    length_field_len: usize,
+
+    // Number of bytes in the header before the length field
+    length_field_offset: usize,
+
+    // Adjust the length specified in the header field by this amount
+    length_adjustment: isize,
+
+    // Total number of bytes to skip before reading the payload, if not set,
+    // `length_field_len + length_field_offset`
+    num_skip: Option<usize>,
+
+    // Length field byte order (little or big endian)
+    length_field_is_big_endian: bool,
+}
+
+/// Adapts a byte stream into a unified `Stream` and `Sink` that works over
+/// entire frame values.
+///
+/// See [module level] documentation for more detail.
+///
+/// [module level]: index.html
+pub struct Framed<T, B: IntoBuf = BytesMut> {
+    inner: FramedRead<FramedWrite<T, B>>,
+}
+
+/// Adapts a byte stream to a `Stream` yielding entire frame values.
+///
+/// See [module level] documentation for more detail.
+///
+/// [module level]: index.html
+#[derive(Debug)]
+pub struct FramedRead<T> {
+    inner: codec::FramedRead<T, Decoder>,
+}
+
+/// An error when the number of bytes read is more than max frame length.
+pub struct FrameTooBig {
+    _priv: (),
+}
+
+#[derive(Debug)]
+struct Decoder {
+    // Configuration values
+    builder: Builder,
+
+    // Read state
+    state: DecodeState,
+}
+
+#[derive(Debug, Clone, Copy)]
+enum DecodeState {
+    Head,
+    Data(usize),
+}
+
+/// Adapts a byte stream to a `Sink` accepting entire frame values.
+///
+/// See [module level] documentation for more detail.
+///
+/// [module level]: index.html
+pub struct FramedWrite<T, B: IntoBuf = BytesMut> {
+    // I/O type
+    inner: T,
+
+    // Configuration values
+    builder: Builder,
+
+    // Current frame being written
+    frame: Option<Chain<Cursor<BytesMut>, B::Buf>>,
+
+    do_close: bool,
+}
+
+// ===== impl Framed =====
+
+impl<T: AsyncRead + AsyncWrite, B: IntoBuf> Framed<T, B> {
+    /// Creates a new `Framed` with default configuration values.
+    pub fn new(inner: T) -> Framed<T, B> {
+        Builder::new().new_framed(inner)
+    }
+}
+
+impl<T, B: IntoBuf> Framed<T, B> {
+    /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise
+    /// being worked with.
+    pub fn get_ref(&self) -> &T {
+        self.inner.get_ref().get_ref()
+    }
+
+    /// Returns a mutable reference to the underlying I/O stream wrapped by
+    /// `Framed`.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise being
+    /// worked with.
+    pub fn get_mut(&mut self) -> &mut T {
+        self.inner.get_mut().get_mut()
+    }
+
+    /// Consumes the `Framed`, returning its underlying I/O stream.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise being
+    /// worked with.
+    pub fn into_inner(self) -> T {
+        self.inner.into_inner().into_inner()
+    }
+}
+
+impl<T: AsyncRead, B: IntoBuf> Stream for Framed<T, B> {
+    type Item = BytesMut;
+    type Error = io::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<BytesMut>, io::Error> {
+        self.inner.poll(cx)
+    }
+}
+
+impl<T: AsyncWrite, B: IntoBuf> Sink for Framed<T, B> {
+    type SinkItem = B;
+    type SinkError = io::Error;
+
+    delegate_sink!(inner);
+}
+
+impl<T, B: IntoBuf> fmt::Debug for Framed<T, B>
+    where T: fmt::Debug,
+          B::Buf: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("Framed")
+            .field("inner", &self.inner)
+            .finish()
+    }
+}
+
+// ===== impl FramedRead =====
+
+impl<T: AsyncRead> FramedRead<T> {
+    /// Creates a new `FramedRead` with default configuration values.
+    pub fn new(inner: T) -> FramedRead<T> {
+        Builder::new().new_read(inner)
+    }
+}
+
+impl<T> FramedRead<T> {
+    /// Returns the current max frame setting
+    ///
+    /// This is the largest size this codec will accept from the wire. Larger
+    /// frames will be rejected.
+    pub fn max_frame_length(&self) -> usize {
+        self.inner.decoder().builder.max_frame_len
+    }
+
+    /// Updates the max frame setting.
+    ///
+    /// The change takes effect the next time a frame is decoded. In other
+    /// words, if a frame is currently in process of being decoded with a frame
+    /// size greater than `val` but less than the max frame length in effect
+    /// before calling this function, then the frame will be allowed.
+    pub fn set_max_frame_length(&mut self, val: usize) {
+        self.inner.decoder_mut().builder.max_frame_length(val);
+    }
+
+    /// Returns a reference to the underlying I/O stream wrapped by `FramedRead`.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise
+    /// being worked with.
+    pub fn get_ref(&self) -> &T {
+        self.inner.get_ref()
+    }
+
+    /// Returns a mutable reference to the underlying I/O stream wrapped by
+    /// `FramedRead`.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise being
+    /// worked with.
+    pub fn get_mut(&mut self) -> &mut T {
+        self.inner.get_mut()
+    }
+
+    /// Consumes the `FramedRead`, returning its underlying I/O stream.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise being
+    /// worked with.
+    pub fn into_inner(self) -> T {
+        self.inner.into_inner()
+    }
+}
+
+impl<T: AsyncRead> Stream for FramedRead<T> {
+    type Item = BytesMut;
+    type Error = io::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<BytesMut>, io::Error> {
+        self.inner.poll(cx)
+    }
+}
+
+impl<T: Sink> Sink for FramedRead<T> {
+    type SinkItem = T::SinkItem;
+    type SinkError = T::SinkError;
+
+    delegate_sink!(inner);
+}
+
+impl<T: io::Write> io::Write for FramedRead<T> {
+    fn write(&mut self, src: &[u8]) -> io::Result<usize> {
+        self.inner.get_mut().write(src)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.inner.get_mut().flush()
+    }
+}
+
+impl<T: AsyncWrite> AsyncWrite for FramedRead<T> {
+    fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8])
+        -> Poll<usize, io::Error>
+    {
+        self.inner.get_mut().poll_write(cx, buf)
+    }
+
+    fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec])
+        -> Poll<usize, io::Error>
+    {
+        self.inner.get_mut().poll_vectored_write(cx, vec)
+    }
+
+    fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
+        self.inner.get_mut().poll_flush(cx)
+    }
+
+    fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
+        self.inner.get_mut().poll_close(cx)
+    }
+
+}
+
+// ===== impl Decoder ======
+
+impl Decoder {
+    fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> {
+        let head_len = self.builder.num_head_bytes();
+        let field_len = self.builder.length_field_len;
+
+        if src.len() < head_len {
+            // Not enough data
+            return Ok(None);
+        }
+
+        let n = {
+            let mut src = Cursor::new(&mut *src);
+
+            // Skip the required bytes
+            src.advance(self.builder.length_field_offset);
+
+            // match endianess
+            let n = if self.builder.length_field_is_big_endian {
+                src.get_uint::<BigEndian>(field_len)
+            } else {
+                src.get_uint::<LittleEndian>(field_len)
+            };
+
+            if n > self.builder.max_frame_len as u64 {
+                return Err(io::Error::new(io::ErrorKind::InvalidData, FrameTooBig {
+                    _priv: (),
+                }));
+            }
+
+            // The check above ensures there is no overflow
+            let n = n as usize;
+
+            // Adjust `n` with bounds checking
+            let n = if self.builder.length_adjustment < 0 {
+                n.checked_sub(-self.builder.length_adjustment as usize)
+            } else {
+                n.checked_add(self.builder.length_adjustment as usize)
+            };
+
+            // Error handling
+            match n {
+                Some(n) => n,
+                None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "provided length would overflow after adjustment")),
+            }
+        };
+
+        let num_skip = self.builder.get_num_skip();
+
+        if num_skip > 0 {
+            let _ = src.split_to(num_skip);
+        }
+
+        // Ensure that the buffer has enough space to read the incoming
+        // payload
+        src.reserve(n);
+
+        return Ok(Some(n));
+    }
+
+    fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result<Option<BytesMut>> {
+        // At this point, the buffer has already had the required capacity
+        // reserved. All there is to do is read.
+        if src.len() < n {
+            return Ok(None);
+        }
+
+        Ok(Some(src.split_to(n)))
+    }
+}
+
+impl codec::Decoder for Decoder {
+    type Item = BytesMut;
+    type Error = io::Error;
+
+    fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> {
+        let n = match self.state {
+            DecodeState::Head => {
+                match self.decode_head(src)? {
+                    Some(n) => {
+                        self.state = DecodeState::Data(n);
+                        n
+                    }
+                    None => return Ok(None),
+                }
+            }
+            DecodeState::Data(n) => n,
+        };
+
+        match self.decode_data(n, src)? {
+            Some(data) => {
+                // Update the decode state
+                self.state = DecodeState::Head;
+
+                // Make sure the buffer has enough space to read the next head
+                src.reserve(self.builder.num_head_bytes());
+
+                Ok(Some(data))
+            }
+            None => Ok(None),
+        }
+    }
+}
+
+// ===== impl FramedWrite =====
+
+impl<T: AsyncWrite, B: IntoBuf> FramedWrite<T, B> {
+    /// Creates a new `FramedWrite` with default configuration values.
+    pub fn new(inner: T) -> FramedWrite<T, B> {
+        Builder::new().new_write(inner)
+    }
+}
+
+impl<T, B: IntoBuf> FramedWrite<T, B> {
+    /// Returns the current max frame setting
+    ///
+    /// This is the largest size this codec will write to the wire. Larger
+    /// frames will be rejected.
+    pub fn max_frame_length(&self) -> usize {
+        self.builder.max_frame_len
+    }
+
+    /// Updates the max frame setting.
+    ///
+    /// The change takes effect the next time a frame is encoded. In other
+    /// words, if a frame is currently in process of being encoded with a frame
+    /// size greater than `val` but less than the max frame length in effect
+    /// before calling this function, then the frame will be allowed.
+    pub fn set_max_frame_length(&mut self, val: usize) {
+        self.builder.max_frame_length(val);
+    }
+
+    /// Returns a reference to the underlying I/O stream wrapped by
+    /// `FramedWrite`.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise
+    /// being worked with.
+    pub fn get_ref(&self) -> &T {
+        &self.inner
+    }
+
+    /// Returns a mutable reference to the underlying I/O stream wrapped by
+    /// `FramedWrite`.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise being
+    /// worked with.
+    pub fn get_mut(&mut self) -> &mut T {
+        &mut self.inner
+    }
+
+    /// Consumes the `FramedWrite`, returning its underlying I/O stream.
+    ///
+    /// Note that care should be taken to not tamper with the underlying stream
+    /// of data coming in as it may corrupt the stream of frames otherwise being
+    /// worked with.
+    pub fn into_inner(self) -> T {
+        self.inner
+    }
+}
+
+impl<T: AsyncWrite, B: IntoBuf> FramedWrite<T, B> {
+    // If there is a buffered frame, try to write it to `T`
+    fn do_write(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
+        if self.frame.is_none() {
+            return Ok(Async::Ready(()));
+        }
+
+        loop {
+            let frame = self.frame.as_mut().unwrap();
+            try_ready!(self.inner.write_buf(cx, frame));
+
+            if !frame.has_remaining() {
+                break;
+            }
+        }
+
+        self.frame = None;
+
+        Ok(Async::Ready(()))
+    }
+
+    fn set_frame(&mut self, buf: B::Buf) -> io::Result<()> {
+        let mut head = BytesMut::with_capacity(8);
+        let n = buf.remaining();
+
+        if n > self.builder.max_frame_len {
+            return Err(io::Error::new(io::ErrorKind::InvalidInput, FrameTooBig {
+                _priv: (),
+            }));
+        }
+
+        // Adjust `n` with bounds checking
+        let n = if self.builder.length_adjustment < 0 {
+            n.checked_add(-self.builder.length_adjustment as usize)
+        } else {
+            n.checked_sub(self.builder.length_adjustment as usize)
+        };
+
+        // Error handling
+        let n = match n {
+            Some(n) => n,
+            None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "provided length would overflow after adjustment")),
+        };
+
+        if self.builder.length_field_is_big_endian {
+            head.put_uint::<BigEndian>(n as u64, self.builder.length_field_len);
+        } else {
+            head.put_uint::<LittleEndian>(n as u64, self.builder.length_field_len);
+        }
+
+        debug_assert!(self.frame.is_none());
+
+        self.frame = Some(head.into_buf().chain(buf));
+
+        Ok(())
+    }
+}
+
+impl<T: AsyncWrite, B: IntoBuf> Sink for FramedWrite<T, B> {
+    type SinkItem = B;
+    type SinkError = io::Error;
+
+    fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
+        self.do_write(cx)
+    }
+
+    fn start_send(&mut self, item: B) -> Result<(), Self::SinkError> {
+        self.set_frame(item.into_buf())
+    }
+
+    fn start_close(&mut self) -> Result<(), Self::SinkError> {
+        self.do_close = true;
+        Ok(())
+    }
+
+    fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
+        // Write any buffered frame to T
+        try_ready!(self.do_write(cx));
+
+        // Try flushing the underlying IO
+        self.inner.poll_close(cx)
+    }
+}
+
+impl<T: Stream, B: IntoBuf> Stream for FramedWrite<T, B> {
+    type Item = T::Item;
+    type Error = T::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<T::Item>, T::Error> {
+        self.inner.poll(cx)
+    }
+}
+
+impl<T: io::Read, B: IntoBuf> io::Read for FramedWrite<T, B> {
+    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+        self.get_mut().read(dst)
+    }
+}
+
+impl<T: AsyncRead, U: IntoBuf> AsyncRead for FramedWrite<T, U> {
+    unsafe fn initializer(&self) -> Initializer {
+        self.get_ref().initializer()
+    }
+
+    fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8])
+        -> Poll<usize, io::Error>
+    {
+        self.get_mut().poll_read(cx, buf)
+    }
+
+    fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec])
+        -> Poll<usize, io::Error>
+    {
+        self.get_mut().poll_vectored_read(cx, vec)
+    }
+}
+
+impl<T, B: IntoBuf> fmt::Debug for FramedWrite<T, B>
+    where T: fmt::Debug,
+          B::Buf: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("FramedWrite")
+            .field("inner", &self.inner)
+            .field("builder", &self.builder)
+            .field("frame", &self.frame)
+            .finish()
+    }
+}
+
+// ===== impl Builder =====
+
+impl Builder {
+    /// Creates a new length delimited framer builder with default configuration
+    /// values.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # extern crate futures;
+    /// # use futures::io::AsyncRead;
+    /// use futures::io::codec::length_delimited::Builder;
+    ///
+    /// # fn bind_read<T: AsyncRead>(io: T) {
+    /// Builder::new()
+    ///     .length_field_offset(0)
+    ///     .length_field_length(2)
+    ///     .length_adjustment(0)
+    ///     .num_skip(0)
+    ///     .new_read(io);
+    /// # }
+    /// # fn main() {}
+    /// ```
+    pub fn new() -> Builder {
+        Builder {
+            // Default max frame length of 8MB
+            max_frame_len: 8 * 1_024 * 1_024,
+
+            // Default byte length of 4
+            length_field_len: 4,
+
+            // Default to the header field being at the start of the header.
+            length_field_offset: 0,
+
+            length_adjustment: 0,
+
+            // Total number of bytes to skip before reading the payload, if not set,
+            // `length_field_len + length_field_offset`
+            num_skip: None,
+
+            // Default to reading the length field in network (big) endian.
+            length_field_is_big_endian: true,
+        }
+    }
+
+    /// Read the length field as a big endian integer
+    ///
+    /// This is the default setting.
+    ///
+    /// This configuration option applies to both encoding and decoding.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # extern crate futures;
+    /// # use futures::io::AsyncRead;
+    /// use futures::io::codec::length_delimited::Builder;
+    ///
+    /// # fn bind_read<T: AsyncRead>(io: T) {
+    /// Builder::new()
+    ///     .big_endian()
+    ///     .new_read(io);
+    /// # }
+    /// # fn main() {}
+    /// ```
+    pub fn big_endian(&mut self) -> &mut Self {
+        self.length_field_is_big_endian = true;
+        self
+    }
+
+    /// Read the length field as a little endian integer
+    ///
+    /// The default setting is big endian.
+    ///
+    /// This configuration option applies to both encoding and decoding.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # extern crate futures;
+    /// # use futures::io::AsyncRead;
+    /// use futures::io::codec::length_delimited::Builder;
+    ///
+    /// # fn bind_read<T: AsyncRead>(io: T) {
+    /// Builder::new()
+    ///     .little_endian()
+    ///     .new_read(io);
+    /// # }
+    /// # fn main() {}
+    /// ```
+    pub fn little_endian(&mut self) -> &mut Self {
+        self.length_field_is_big_endian = false;
+        self
+    }
+
+    /// Sets the max frame length
+    ///
+    /// This configuration option applies to both encoding and decoding. The
+    /// default value is 8MB.
+    ///
+    /// When decoding, the length field read from the byte stream is checked
+    /// against this setting **before** any adjustments are applied. When
+    /// encoding, the length of the submitted payload is checked against this
+    /// setting.
+    ///
+    /// When frames exceed the max length, an `io::Error` with the custom value
+    /// of the `FrameTooBig` type will be returned.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # extern crate futures;
+    /// # use futures::io::AsyncRead;
+    /// use futures::io::codec::length_delimited::Builder;
+    ///
+    /// # fn bind_read<T: AsyncRead>(io: T) {
+    /// Builder::new()
+    ///     .max_frame_length(8 * 1024)
+    ///     .new_read(io);
+    /// # }
+    /// # fn main() {}
+    /// ```
+    pub fn max_frame_length(&mut self, val: usize) -> &mut Self {
+        self.max_frame_len = val;
+        self
+    }
+
+    /// Sets the number of bytes used to represent the length field
+    ///
+    /// The default value is `4`. The max value is `8`.
+    ///
+    /// This configuration option applies to both encoding and decoding.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # extern crate futures;
+    /// # use futures::io::AsyncRead;
+    /// use futures::io::codec::length_delimited::Builder;
+    ///
+    /// # fn bind_read<T: AsyncRead>(io: T) {
+    /// Builder::new()
+    ///     .length_field_length(4)
+    ///     .new_read(io);
+    /// # }
+    /// # fn main() {}
+    /// ```
+    pub fn length_field_length(&mut self, val: usize) -> &mut Self {
+        assert!(val > 0 && val <= 8, "invalid length field length");
+        self.length_field_len = val;
+        self
+    }
+
+    /// Sets the number of bytes in the header before the length field
+    ///
+    /// This configuration option only applies to decoding.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # extern crate futures;
+    /// # use futures::io::AsyncRead;
+    /// use futures::io::codec::length_delimited::Builder;
+    ///
+    /// # fn bind_read<T: AsyncRead>(io: T) {
+    /// Builder::new()
+    ///     .length_field_offset(1)
+    ///     .new_read(io);
+    /// # }
+    /// # fn main() {}
+    /// ```
+    pub fn length_field_offset(&mut self, val: usize) -> &mut Self {
+        self.length_field_offset = val;
+        self
+    }
+
+    /// Delta between the payload length specified in the header and the real
+    /// payload length
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # extern crate futures;
+    /// # use futures::io::AsyncRead;
+    /// use futures::io::codec::length_delimited::Builder;
+    ///
+    /// # fn bind_read<T: AsyncRead>(io: T) {
+    /// Builder::new()
+    ///     .length_adjustment(-2)
+    ///     .new_read(io);
+    /// # }
+    /// # fn main() {}
+    /// ```
+    pub fn length_adjustment(&mut self, val: isize) -> &mut Self {
+        self.length_adjustment = val;
+        self
+    }
+
+    /// Sets the number of bytes to skip before reading the payload
+    ///
+    /// Default value is `length_field_len + length_field_offset`
+    ///
+    /// This configuration option only applies to decoding
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # extern crate futures;
+    /// # use futures::io::AsyncRead;
+    /// use futures::io::codec::length_delimited::Builder;
+    ///
+    /// # fn bind_read<T: AsyncRead>(io: T) {
+    /// Builder::new()
+    ///     .num_skip(4)
+    ///     .new_read(io);
+    /// # }
+    /// # fn main() {}
+    /// ```
+    pub fn num_skip(&mut self, val: usize) -> &mut Self {
+        self.num_skip = Some(val);
+        self
+    }
+
+    /// Create a configured length delimited `FramedRead`
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # extern crate futures;
+    /// # use futures::io::AsyncRead;
+    /// use futures::io::codec::length_delimited::Builder;
+    ///
+    /// # fn bind_read<T: AsyncRead>(io: T) {
+    /// Builder::new()
+    ///     .length_field_offset(0)
+    ///     .length_field_length(2)
+    ///     .length_adjustment(0)
+    ///     .num_skip(0)
+    ///     .new_read(io);
+    /// # }
+    /// # fn main() {}
+    /// ```
+    pub fn new_read<T>(&self, upstream: T) -> FramedRead<T>
+        where T: AsyncRead,
+    {
+        FramedRead {
+            inner: codec::FramedRead::new(upstream, Decoder {
+                builder: *self,
+                state: DecodeState::Head,
+            }),
+        }
+    }
+
+    /// Create a configured length delimited `FramedWrite`
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # extern crate bytes;
+    /// # extern crate futures;
+    /// # use futures::io::AsyncWrite;
+    /// # use futures::io::codec::length_delimited;
+    /// # use bytes::BytesMut;
+    /// # fn write_frame<T: AsyncWrite>(io: T) {
+    /// # let _: length_delimited::FramedWrite<T, BytesMut> =
+    /// length_delimited::Builder::new()
+    ///     .length_field_length(2)
+    ///     .new_write(io);
+    /// # }
+    /// # fn main() {}
+    /// ```
+    pub fn new_write<T, B>(&self, inner: T) -> FramedWrite<T, B>
+        where T: AsyncWrite,
+              B: IntoBuf,
+    {
+        FramedWrite {
+            inner: inner,
+            builder: *self,
+            frame: None,
+            do_close: false,
+        }
+    }
+
+    /// Create a configured length delimited `Framed`
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # extern crate bytes;
+    /// # extern crate futures;
+    /// # use futures::io::{AsyncRead, AsyncWrite};
+    /// # use futures::io::codec::length_delimited;
+    /// # use bytes::BytesMut;
+    /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) {
+    /// # let _: length_delimited::Framed<T, BytesMut> =
+    /// length_delimited::Builder::new()
+    ///     .length_field_length(2)
+    ///     .new_framed(io);
+    /// # }
+    /// # fn main() {}
+    /// ```
+    pub fn new_framed<T, B>(&self, inner: T) -> Framed<T, B>
+        where T: AsyncRead + AsyncWrite,
+              B: IntoBuf
+    {
+        let inner = self.new_read(self.new_write(inner));
+        Framed { inner: inner }
+    }
+
+    fn num_head_bytes(&self) -> usize {
+        let num = self.length_field_offset + self.length_field_len;
+        cmp::max(num, self.num_skip.unwrap_or(0))
+    }
+
+    fn get_num_skip(&self) -> usize {
+        self.num_skip.unwrap_or(self.length_field_offset + self.length_field_len)
+    }
+}
+
+
+// ===== impl FrameTooBig =====
+
+impl fmt::Debug for FrameTooBig {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("FrameTooBig")
+            .finish()
+    }
+}
+
+impl fmt::Display for FrameTooBig {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.write_str(self.description())
+    }
+}
+
+impl StdError for FrameTooBig {
+    fn description(&self) -> &str {
+        "frame size too big"
+    }
+}
diff --git a/futures-util/src/io/lines.rs b/futures-util/src/io/lines.rs
new file mode 100644
index 0000000000..37620762dd
--- /dev/null
+++ b/futures-util/src/io/lines.rs
@@ -0,0 +1,61 @@
+use std::io::{self, BufRead};
+use std::mem;
+use std::string::String;
+
+use {Poll, Stream};
+
+use io::AsyncRead;
+
+/// Combinator created by the top-level `lines` method which is a stream over
+/// the lines of text on an I/O object.
+#[derive(Debug)]
+pub struct Lines<A> {
+    io: A,
+    line: String,
+}
+
+/// Creates a new stream from the I/O object given representing the lines of
+/// input that are found on `A`.
+///
+/// This method takes an asynchronous I/O object, `a`, and returns a `Stream` of
+/// lines that the object contains. The returned stream will reach its end once
+/// `a` reaches EOF.
+pub fn lines<A>(a: A) -> Lines<A>
+    where A: AsyncRead + BufRead,
+{
+    Lines {
+        io: a,
+        line: String::new(),
+    }
+}
+
+impl<A> Lines<A> {
+    /// Returns the underlying I/O object.
+    ///
+    /// Note that this may lose data already read into internal buffers. It's
+    /// recommended to only call this once the stream has reached its end.
+    pub fn into_inner(self) -> A {
+        self.io
+    }
+}
+
+impl<A> Stream for Lines<A>
+    where A: AsyncRead + BufRead,
+{
+    type Item = String;
+    type Error = io::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<String>, io::Error> {
+        let n = try_ready!(self.io.read_line(&mut self.line));
+        if n == 0 && self.line.len() == 0 {
+            return Ok(None.into())
+        }
+        if self.line.ends_with("\n") {
+            self.line.pop();
+            if self.line.ends_with("\r") {
+                self.line.pop();
+            }
+        }
+        Ok(Some(mem::replace(&mut self.line, String::new())).into())
+    }
+}
diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs
new file mode 100644
index 0000000000..9fe887b1a5
--- /dev/null
+++ b/futures-util/src/io/mod.rs
@@ -0,0 +1,159 @@
+//! Core I/O traits and combinators.
+//!
+//! A description of the high-level I/O combinators can be [found online] in
+//! addition to a description of the [low level details].
+//!
+//! [found online]: https://tokio.rs/docs/getting-started/core/
+//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/
+
+use std::io as std_io;
+
+use futures_core::{Async, Poll, task};
+pub use futures_io::{AsyncRead, AsyncWrite, IoVec};
+use bytes::{Buf, BufMut};
+
+pub mod io;
+pub mod codec;
+
+mod allow_std;
+mod codecs;
+mod copy;
+mod flush;
+mod framed;
+mod framed_read;
+mod framed_write;
+mod length_delimited;
+// Requires "BufRead"
+// mod lines;
+mod read;
+mod read_exact;
+mod read_to_end;
+// TODO: resolve. Temporary disabled because it requires "BufRead",
+// which does not have an async equivalent.
+// mod read_until;
+mod shutdown;
+mod split;
+mod window;
+mod write_all;
+
+use self::codec::{Decoder, Encoder, Framed};
+use self::split::{ReadHalf, WriteHalf};
+
+/// An extension trait which adds utility methods to `AsyncRead` types.
+pub trait AsyncReadExt: AsyncRead {
+    /// Pull some bytes from this source into the specified `Buf`, returning
+    /// how many bytes were read.
+    ///
+    /// The `buf` provided will have bytes read into it and the internal cursor
+    /// will be advanced if any bytes were read. Note that this method typically
+    /// will not reallocate the buffer provided.
+    fn read_buf<B: BufMut>(&mut self, cx: &mut task::Context, buf: &mut B)
+        -> Poll<usize, std_io::Error>
+        where Self: Sized,
+    {
+        if !buf.has_remaining_mut() {
+            return Ok(Async::Ready(0));
+        }
+
+        unsafe {
+            let n = {
+                // The `IoVec` type can't have a 0-length size, so we create a bunch
+                // of dummy versions on the stack with 1 length which we'll quickly
+                // overwrite.
+                let b1: &mut [u8] = &mut [0];
+                let b2: &mut [u8] = &mut [0];
+                let b3: &mut [u8] = &mut [0];
+                let b4: &mut [u8] = &mut [0];
+                let b5: &mut [u8] = &mut [0];
+                let b6: &mut [u8] = &mut [0];
+                let b7: &mut [u8] = &mut [0];
+                let b8: &mut [u8] = &mut [0];
+                let b9: &mut [u8] = &mut [0];
+                let b10: &mut [u8] = &mut [0];
+                let b11: &mut [u8] = &mut [0];
+                let b12: &mut [u8] = &mut [0];
+                let b13: &mut [u8] = &mut [0];
+                let b14: &mut [u8] = &mut [0];
+                let b15: &mut [u8] = &mut [0];
+                let b16: &mut [u8] = &mut [0];
+                let mut bufs: [&mut IoVec; 16] = [
+                    b1.into(), b2.into(), b3.into(), b4.into(),
+                    b5.into(), b6.into(), b7.into(), b8.into(),
+                    b9.into(), b10.into(), b11.into(), b12.into(),
+                    b13.into(), b14.into(), b15.into(), b16.into(),
+                ];
+                let n = buf.bytes_vec_mut(&mut bufs);
+                try_ready!(self.poll_vectored_read(cx, &mut bufs[..n]))
+            };
+
+            buf.advance_mut(n);
+            Ok(Async::Ready(n))
+        }
+    }
+
+    /// Provides a `Stream` and `Sink` interface for reading and writing to this
+    /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
+    ///
+    /// Raw I/O objects work with byte sequences, but higher-level code usually
+    /// wants to batch these into meaningful chunks, called "frames". This
+    /// method layers framing on top of an I/O object, by using the `Codec`
+    /// traits to handle encoding and decoding of messages frames. Note that
+    /// the incoming and outgoing frame types may be distinct.
+    ///
+    /// This function returns a *single* object that is both `Stream` and
+    /// `Sink`; grouping this into a single object is often useful for layering
+    /// things like gzip or TLS, which require both read and write access to the
+    /// underlying object.
+    ///
+    /// If you want to work more directly with the streams and sink, consider
+    /// calling `split` on the `Framed` returned by this method, which will
+    /// break them into separate objects, allowing them to interact more easily.
+    fn framed<T: Encoder + Decoder>(self, codec: T) -> Framed<Self, T>
+        where Self: AsyncWrite + Sized,
+    {
+        framed::framed(self, codec)
+    }
+
+    /// Helper method for splitting this read/write object into two halves.
+    ///
+    /// The two halves returned implement the `Read` and `Write` traits,
+    /// respectively.
+    fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
+        where Self: AsyncWrite + Sized,
+    {
+        split::split(self)
+    }
+}
+
+impl<T: AsyncRead + ?Sized> AsyncReadExt for T {}
+
+/// An extension trait which adds utility methods to `AsyncWrite` types.
+pub trait AsyncWriteExt: AsyncWrite {
+    /// Write a `Buf` into this value, returning how many bytes were written.
+    ///
+    /// Note that this method will advance the `buf` provided automatically by
+    /// the number of bytes written.
+    fn write_buf<B: Buf>(&mut self, cx: &mut task::Context, buf: &mut B)
+        -> Poll<usize, std_io::Error>
+        where Self: Sized,
+    {
+        if !buf.has_remaining() {
+            return Ok(Async::Ready(0));
+        }
+
+        let n = {
+            // The `IoVec` type can't have a zero-length size, so create a dummy
+            // version from a 1-length slice which we'll overwrite with the
+            // `bytes_vec` method.
+            static DUMMY: &[u8] = &[0];
+            let iovec = <&IoVec>::from(DUMMY);
+            let mut bufs = [iovec; 64];
+            let n = buf.bytes_vec(&mut bufs);
+            try_ready!(self.poll_vectored_write(cx, &bufs[..n]))
+        };
+        buf.advance(n);
+        Ok(Async::Ready(n))
+    }
+}
+
+impl<T: AsyncWrite + ?Sized> AsyncWriteExt for T {}
diff --git a/futures-util/src/io/read.rs b/futures-util/src/io/read.rs
new file mode 100644
index 0000000000..a6e764f797
--- /dev/null
+++ b/futures-util/src/io/read.rs
@@ -0,0 +1,57 @@
+use std::io;
+use std::mem;
+
+use {Future, Poll, task};
+
+use io::AsyncRead;
+
+#[derive(Debug)]
+enum State<R, T> {
+    Pending {
+        rd: R,
+        buf: T,
+    },
+    Empty,
+}
+
+/// Tries to read some bytes directly into the given `buf` in asynchronous
+/// manner, returning a future type.
+///
+/// The returned future will resolve to both the I/O stream and the buffer
+/// as well as the number of bytes read once the read operation is completed.
+pub fn read<R, T>(rd: R, buf: T) -> Read<R, T>
+    where R: AsyncRead,
+          T: AsMut<[u8]>
+{
+    Read { state: State::Pending { rd: rd, buf: buf } }
+}
+
+/// A future which can be used to easily read available number of bytes to fill
+/// a buffer.
+///
+/// Created by the [`read`] function.
+#[derive(Debug)]
+pub struct Read<R, T> {
+    state: State<R, T>,
+}
+
+impl<R, T> Future for Read<R, T>
+    where R: AsyncRead,
+          T: AsMut<[u8]>
+{
+    type Item = (R, T, usize);
+    type Error = io::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<(R, T, usize), io::Error> {
+        let nread = match self.state {
+            State::Pending { ref mut rd, ref mut buf } =>
+                try_ready!(rd.poll_read(cx, &mut buf.as_mut()[..])),
+            State::Empty => panic!("poll a Read after it's done"),
+        };
+
+        match mem::replace(&mut self.state, State::Empty) {
+            State::Pending { rd, buf } => Ok((rd, buf, nread).into()),
+            State::Empty => panic!("invalid internal state"),
+        }
+    }
+}
diff --git a/futures-util/src/io/read_exact.rs b/futures-util/src/io/read_exact.rs
new file mode 100644
index 0000000000..bfe69b10d1
--- /dev/null
+++ b/futures-util/src/io/read_exact.rs
@@ -0,0 +1,83 @@
+use std::io;
+use std::mem;
+
+use {Poll, Future, task};
+
+use io::AsyncRead;
+
+/// A future which can be used to easily read exactly enough bytes to fill
+/// a buffer.
+///
+/// Created by the [`read_exact`] function.
+///
+/// [`read_exact`]: fn.read_exact.html
+#[derive(Debug)]
+pub struct ReadExact<A, T> {
+    state: State<A, T>,
+}
+
+#[derive(Debug)]
+enum State<A, T> {
+    Reading {
+        a: A,
+        buf: T,
+        pos: usize,
+    },
+    Empty,
+}
+
+/// Creates a future which will read exactly enough bytes to fill `buf`,
+/// returning an error if EOF is hit sooner.
+///
+/// The returned future will resolve to both the I/O stream as well as the
+/// buffer once the read operation is completed.
+///
+/// In the case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success the object will be destroyed and
+/// the buffer will be returned, with all data read from the stream appended to
+/// the buffer.
+pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T>
+    where A: AsyncRead,
+          T: AsMut<[u8]>,
+{
+    ReadExact {
+        state: State::Reading {
+            a: a,
+            buf: buf,
+            pos: 0,
+        },
+    }
+}
+
+fn eof() -> io::Error {
+    io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")
+}
+
+impl<A, T> Future for ReadExact<A, T>
+    where A: AsyncRead,
+          T: AsMut<[u8]>,
+{
+    type Item = (A, T);
+    type Error = io::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<(A, T), io::Error> {
+        match self.state {
+            State::Reading { ref mut a, ref mut buf, ref mut pos } => {
+                let buf = buf.as_mut();
+                while *pos < buf.len() {
+                    let n = try_ready!(a.poll_read(cx, &mut buf[*pos..]));
+                    *pos += n;
+                    if n == 0 {
+                        return Err(eof())
+                    }
+                }
+            }
+            State::Empty => panic!("poll a ReadExact after it's done"),
+        }
+
+        match mem::replace(&mut self.state, State::Empty) {
+            State::Reading { a, buf, .. } => Ok((a, buf).into()),
+            State::Empty => panic!(),
+        }
+    }
+}
diff --git a/futures-util/src/io/read_to_end.rs b/futures-util/src/io/read_to_end.rs
new file mode 100644
index 0000000000..04d8df181b
--- /dev/null
+++ b/futures-util/src/io/read_to_end.rs
@@ -0,0 +1,120 @@
+use std::io::{self, ErrorKind};
+use std::mem;
+use std::vec::Vec;
+
+use {Async, Poll, Future, task};
+
+use io::AsyncRead;
+
+/// A future which can be used to easily read the entire contents of a stream
+/// into a vector.
+///
+/// Created by the [`read_to_end`] function.
+///
+/// [`read_to_end`]: fn.read_to_end.html
+#[derive(Debug)]
+pub struct ReadToEnd<A> {
+    state: State<A>,
+}
+
+#[derive(Debug)]
+enum State<A> {
+    Reading {
+        a: A,
+        buf: Vec<u8>,
+    },
+    Empty,
+}
+
+/// Creates a future which will read all the bytes associated with the I/O
+/// object `A` into the buffer provided.
+///
+/// In the case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success the object will be destroyed and
+/// the buffer will be returned, with all data read from the stream appended to
+/// the buffer.
+pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A>
+    where A: AsyncRead,
+{
+    ReadToEnd {
+        state: State::Reading {
+            a,
+            buf,
+        }
+    }
+}
+
+struct Guard<'a> { buf: &'a mut Vec<u8>, len: usize }
+
+impl<'a> Drop for Guard<'a> {
+    fn drop(&mut self) {
+        unsafe { self.buf.set_len(self.len); }
+    }
+}
+
+// This uses an adaptive system to extend the vector when it fills. We want to
+// avoid paying to allocate and zero a huge chunk of memory if the reader only
+// has 4 bytes while still making large reads if the reader does have a ton
+// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
+// time is 4,500 times (!) slower than this if the reader has a very small
+// amount of data to return.
+//
+// Because we're extending the buffer with uninitialized data for trusted
+// readers, we need to make sure to truncate that if any of this panics.
+fn read_to_end_internal<R: AsyncRead + ?Sized>(r: &mut R, cx: &mut task::Context, buf: &mut Vec<u8>)
+    -> Poll<usize, io::Error>
+{
+    let start_len = buf.len();
+    let mut g = Guard { len: buf.len(), buf: buf };
+    let ret;
+    loop {
+        if g.len == g.buf.len() {
+            unsafe {
+                g.buf.reserve(32);
+                let capacity = g.buf.capacity();
+                g.buf.set_len(capacity);
+                r.initializer().initialize(&mut g.buf[g.len..]);
+            }
+        }
+
+        match r.poll_read(cx, &mut g.buf[g.len..]) {
+            Ok(Async::Ready(0)) => {
+                ret = Ok(Async::Ready(g.len - start_len));
+                break;
+            }
+            Ok(Async::Ready(n)) => g.len += n,
+            Ok(Async::Pending) => return Ok(Async::Pending),
+            Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
+            Err(e) => {
+                ret = Err(e);
+                break;
+            }
+        }
+    }
+
+    ret
+}
+
+impl<A> Future for ReadToEnd<A>
+    where A: AsyncRead,
+{
+    type Item = (A, Vec<u8>);
+    type Error = io::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<(A, Vec<u8>), io::Error> {
+        match self.state {
+            State::Reading { ref mut a, ref mut buf } => {
+                // If we get `Ok`, then we know the stream hit EOF and we're done. If we
+                // hit "would block" then all the read data so far is in our buffer, and
+                // otherwise we propagate errors
+                try_ready!(read_to_end_internal(a, cx, buf));
+            },
+            State::Empty => panic!("poll ReadToEnd after it's done"),
+        }
+
+        match mem::replace(&mut self.state, State::Empty) {
+            State::Reading { a, buf } => Ok((a, buf).into()),
+            State::Empty => unreachable!(),
+        }
+    }
+}
diff --git a/futures-util/src/io/read_until.rs b/futures-util/src/io/read_until.rs
new file mode 100644
index 0000000000..71ebf09c1b
--- /dev/null
+++ b/futures-util/src/io/read_until.rs
@@ -0,0 +1,75 @@
+use std::io::{self, BufRead};
+use std::mem;
+use std::vec::Vec;
+
+use {Poll, Future};
+
+use io::AsyncRead;
+
+/// A future which can be used to easily read the contents of a stream into a
+/// vector until the delimiter is reached.
+///
+/// Created by the [`read_until`] function.
+///
+/// [`read_until`]: fn.read_until.html
+#[derive(Debug)]
+pub struct ReadUntil<A> {
+    state: State<A>,
+}
+
+#[derive(Debug)]
+enum State<A> {
+    Reading {
+        a: A,
+        byte: u8,
+        buf: Vec<u8>,
+    },
+    Empty,
+}
+
+/// Creates a future which will read all the bytes associated with the I/O
+/// object `A` into the buffer provided until the delimiter `byte` is reached.
+/// This method is the async equivalent to [`BufRead::read_until`].
+///
+/// In case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success the object will be destroyed and
+/// the buffer will be returned, with all bytes up to, and including, the delimiter
+/// (if found).
+///
+/// [`BufRead::read_until`]: https://doc.rust-lang.org/std/io/trait.BufRead.html#method.read_until
+pub fn read_until<A>(a: A, byte: u8, buf: Vec<u8>) -> ReadUntil<A>
+    where A: AsyncRead + BufRead,
+{
+    ReadUntil {
+        state: State::Reading {
+            a: a,
+            byte: byte,
+            buf: buf,
+        }
+    }
+}
+
+impl<A> Future for ReadUntil<A>
+    where A: AsyncRead + BufRead
+{
+    type Item = (A, Vec<u8>);
+    type Error = io::Error;
+
+    fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> {
+        match self.state {
+            State::Reading { ref mut a, byte, ref mut buf } => {
+                // If we get `Ok(n)`, then we know the stream hit EOF or the delimiter.
+                // and just return it, as we are finished.
+                // If we hit "would block" then all the read data so far
+                // is in our buffer, and otherwise we propagate errors.
+                try_ready!(a.read_until(byte, buf));
+            },
+            State::Empty => panic!("poll ReadUntil after it's done"),
+        }
+
+        match mem::replace(&mut self.state, State::Empty) {
+            State::Reading { a, byte: _, buf } => Ok((a, buf).into()),
+            State::Empty => unreachable!(),
+        }
+    }
+}
diff --git a/futures-util/src/io/shutdown.rs b/futures-util/src/io/shutdown.rs
new file mode 100644
index 0000000000..6e06c8dc66
--- /dev/null
+++ b/futures-util/src/io/shutdown.rs
@@ -0,0 +1,44 @@
+use std::io;
+
+use {Poll, Future, Async, task};
+
+use AsyncWrite;
+
+/// A future used to fully shutdown an I/O object.
+///
+/// Resolves to the underlying I/O object once the shutdown operation is
+/// complete.
+///
+/// Created by the [`shutdown`] function.
+///
+/// [`shutdown`]: fn.shutdown.html
+#[derive(Debug)]
+pub struct Shutdown<A> {
+    a: Option<A>,
+}
+
+/// Creates a future which will entirely shutdown an I/O object and then yield
+/// the object itself.
+///
+/// This function will consume the object provided if an error happens, and
+/// otherwise it will repeatedly call `shutdown` until it sees `Ok(())`,
+/// scheduling a retry if `WouldBlock` is seen along the way.
+pub fn shutdown<A>(a: A) -> Shutdown<A>
+    where A: AsyncWrite,
+{
+    Shutdown {
+        a: Some(a),
+    }
+}
+
+impl<A> Future for Shutdown<A>
+    where A: AsyncWrite,
+{
+    type Item = A;
+    type Error = io::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<A, io::Error> {
+        try_ready!(self.a.as_mut().unwrap().poll_close(cx));
+        Ok(Async::Ready(self.a.take().unwrap()))
+    }
+}
diff --git a/futures-util/src/io/split.rs b/futures-util/src/io/split.rs
new file mode 100644
index 0000000000..69fb872b03
--- /dev/null
+++ b/futures-util/src/io/split.rs
@@ -0,0 +1,68 @@
+use std::io;
+
+use {Async, Poll, task};
+use lock::BiLock;
+
+use futures_io::{AsyncRead, AsyncWrite, Error, IoVec};
+
+/// The readable half of an object returned from `AsyncRead::split`.
+#[derive(Debug)]
+pub struct ReadHalf<T> {
+    handle: BiLock<T>,
+}
+
+/// The writable half of an object returned from `AsyncRead::split`.
+#[derive(Debug)]
+pub struct WriteHalf<T> {
+    handle: BiLock<T>,
+}
+
+fn lock_and_then<T, U, E, F>(lock: &BiLock<T>, cx: &mut task::Context, f: F) -> Result<Async<U>, E>
+    where F: FnOnce(&mut T, &mut task::Context) -> Result<Async<U>, E>
+{
+    match lock.poll_lock(cx) {
+        Async::Ready(ref mut l) => f(l, cx),
+        Async::Pending => Ok(Async::Pending),
+    }
+}
+
+pub fn split<T: AsyncRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) {
+    let (a, b) = BiLock::new(t);
+    (ReadHalf { handle: a }, WriteHalf { handle: b })
+}
+
+impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
+    fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8])
+        -> Poll<usize, io::Error>
+    {
+        lock_and_then(&self.handle, cx, |l, cx| l.poll_read(cx, buf))
+    }
+
+    fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec])
+        -> Poll<usize, io::Error>
+    {
+        lock_and_then(&self.handle, cx, |l, cx| l.poll_vectored_read(cx, vec))
+    }
+}
+
+impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
+    fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8])
+        -> Poll<usize, Error>
+    {
+        lock_and_then(&self.handle, cx, |l, cx| l.poll_write(cx, buf))
+    }
+
+    fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec])
+        -> Poll<usize, Error>
+    {
+        lock_and_then(&self.handle, cx, |l, cx| l.poll_vectored_write(cx, vec))
+    }
+
+    fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
+        lock_and_then(&self.handle, cx, |l, cx| l.poll_flush(cx))
+    }
+
+    fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
+        lock_and_then(&self.handle, cx, |l, cx| l.poll_close(cx))
+    }
+}
diff --git a/futures-util/src/io/window.rs b/futures-util/src/io/window.rs
new file mode 100644
index 0000000000..4ded9ad403
--- /dev/null
+++ b/futures-util/src/io/window.rs
@@ -0,0 +1,117 @@
+use std::ops;
+
+/// A owned window around an underlying buffer.
+///
+/// Normally slices work great for considering sub-portions of a buffer, but
+/// unfortunately a slice is a *borrowed* type in Rust which has an associated
+/// lifetime. When working with future and async I/O these lifetimes are not
+/// always appropriate, and are sometimes difficult to store in tasks. This
+/// type strives to fill this gap by providing an "owned slice" around an
+/// underlying buffer of bytes.
+///
+/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable
+/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation
+/// that this type carries.
+///
+/// This type can be particularly useful when working with the `write_all`
+/// combinator in this crate. Data can be sliced via `Window`, consumed by
+/// `write_all`, and then earned back once the write operation finishes through
+/// the `into_inner` method on this type.
+#[derive(Debug)]
+pub struct Window<T> {
+    inner: T,
+    range: ops::Range<usize>,
+}
+
+impl<T: AsRef<[u8]>> Window<T> {
+    /// Creates a new window around the buffer `t` defaulting to the entire
+    /// slice.
+    ///
+    /// Further methods can be called on the returned `Window<T>` to alter the
+    /// window into the data provided.
+    pub fn new(t: T) -> Window<T> {
+        Window {
+            range: 0..t.as_ref().len(),
+            inner: t,
+        }
+    }
+
+    /// Gets a shared reference to the underlying buffer inside of this
+    /// `Window`.
+    pub fn get_ref(&self) -> &T {
+        &self.inner
+    }
+
+    /// Gets a mutable reference to the underlying buffer inside of this
+    /// `Window`.
+    pub fn get_mut(&mut self) -> &mut T {
+        &mut self.inner
+    }
+
+    /// Consumes this `Window`, returning the underlying buffer.
+    pub fn into_inner(self) -> T {
+        self.inner
+    }
+
+    /// Returns the starting index of this window into the underlying buffer
+    /// `T`.
+    pub fn start(&self) -> usize {
+        self.range.start
+    }
+
+    /// Returns the end index of this window into the underlying buffer
+    /// `T`.
+    pub fn end(&self) -> usize {
+        self.range.end
+    }
+
+    /// Changes the starting index of this window to the index specified.
+    ///
+    /// Returns the windows back to chain multiple calls to this method.
+    ///
+    /// # Panics
+    ///
+    /// This method will panic if `start` is out of bounds for the underlying
+    /// slice or if it comes after the `end` configured in this window.
+    pub fn set_start(&mut self, start: usize) -> &mut Window<T> {
+        assert!(start <= self.inner.as_ref().len());
+        assert!(start <= self.range.end);
+        self.range.start = start;
+        self
+    }
+
+    /// Changes the end index of this window to the index specified.
+    ///
+    /// Returns the windows back to chain multiple calls to this method.
+    ///
+    /// # Panics
+    ///
+    /// This method will panic if `end` is out of bounds for the underlying
+    /// slice or if it comes before the `start` configured in this window.
+    pub fn set_end(&mut self, end: usize) -> &mut Window<T> {
+        assert!(end <= self.inner.as_ref().len());
+        assert!(self.range.start <= end);
+        self.range.end = end;
+        self
+    }
+
+    // TODO: how about a generic set() method along the lines of:
+    //
+    //       buffer.set(..3)
+    //             .set(0..2)
+    //             .set(4..)
+    //
+    // etc.
+}
+
+impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> {
+    fn as_ref(&self) -> &[u8] {
+        &self.inner.as_ref()[self.range.start..self.range.end]
+    }
+}
+
+impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> {
+    fn as_mut(&mut self) -> &mut [u8] {
+        &mut self.inner.as_mut()[self.range.start..self.range.end]
+    }
+}
diff --git a/futures-util/src/io/write_all.rs b/futures-util/src/io/write_all.rs
new file mode 100644
index 0000000000..dd0f4e4c9c
--- /dev/null
+++ b/futures-util/src/io/write_all.rs
@@ -0,0 +1,86 @@
+use std::io;
+use std::mem;
+
+use {Poll, Future, task};
+
+use AsyncWrite;
+
+/// A future used to write the entire contents of some data to a stream.
+///
+/// This is created by the [`write_all`] top-level method.
+///
+/// [`write_all`]: fn.write_all.html
+#[derive(Debug)]
+pub struct WriteAll<A, T> {
+    state: State<A, T>,
+}
+
+#[derive(Debug)]
+enum State<A, T> {
+    Writing {
+        a: A,
+        buf: T,
+        pos: usize,
+    },
+    Empty,
+}
+
+/// Creates a future that will write the entire contents of the buffer `buf` to
+/// the stream `a` provided.
+///
+/// The returned future will not return until all the data has been written, and
+/// the future will resolve to the stream as well as the buffer (for reuse if
+/// needed).
+///
+/// Any error which happens during writing will cause both the stream and the
+/// buffer to get destroyed.
+///
+/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should
+/// be broadly applicable to accepting data which can be converted to a slice.
+/// The `Window` struct is also available in this crate to provide a different
+/// window into a slice if necessary.
+pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T>
+    where A: AsyncWrite,
+          T: AsRef<[u8]>,
+{
+    WriteAll {
+        state: State::Writing {
+            a: a,
+            buf: buf,
+            pos: 0,
+        },
+    }
+}
+
+fn zero_write() -> io::Error {
+    io::Error::new(io::ErrorKind::WriteZero, "zero-length write")
+}
+
+impl<A, T> Future for WriteAll<A, T>
+    where A: AsyncWrite,
+          T: AsRef<[u8]>,
+{
+    type Item = (A, T);
+    type Error = io::Error;
+
+    fn poll(&mut self, cx: &mut task::Context) -> Poll<(A, T), io::Error> {
+        match self.state {
+            State::Writing { ref mut a, ref buf, ref mut pos } => {
+                let buf = buf.as_ref();
+                while *pos < buf.len() {
+                    let n = try_ready!(a.poll_write(cx, &buf[*pos..]));
+                    *pos += n;
+                    if n == 0 {
+                        return Err(zero_write())
+                    }
+                }
+            }
+            State::Empty => panic!("poll a WriteAll after it's done"),
+        }
+
+        match mem::replace(&mut self.state, State::Empty) {
+            State::Writing { a, buf, .. } => Ok((a, buf).into()),
+            State::Empty => panic!(),
+        }
+    }
+}
diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs
index a6efc7a2b9..d6be5899f1 100644
--- a/futures-util/src/lib.rs
+++ b/futures-util/src/lib.rs
@@ -6,8 +6,14 @@
 
 #[macro_use]
 extern crate futures_core;
+extern crate futures_io;
 extern crate futures_sink;
 
+#[cfg(feature = "std")]
+use futures_core::{Async, Future, Poll, Stream, task};
+#[cfg(feature = "std")]
+use futures_sink::Sink;
+
 macro_rules! if_std {
     ($($i:item)*) => ($(
         #[cfg(feature = "std")]
@@ -15,6 +21,16 @@ macro_rules! if_std {
     )*)
 }
 
+if_std! {
+    extern crate bytes;
+    #[macro_use]
+    extern crate log;
+}
+
+#[cfg(feature = "std")]
+#[macro_use]
+extern crate std;
+
 macro_rules! delegate_sink {
     ($field:ident) => {
         fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
@@ -35,16 +51,17 @@ macro_rules! delegate_sink {
     }
 }
 
-#[macro_use]
-#[cfg(feature = "std")]
-extern crate std;
-
 #[cfg(feature = "std")]
 pub mod lock;
 
 pub mod future;
 pub use future::FutureExt;
 
+#[cfg(feature = "std")]
+pub mod io;
+#[cfg(feature = "std")]
+pub use io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt};
+
 pub mod stream;
 pub use stream::StreamExt;
 
@@ -54,4 +71,6 @@ pub use sink::SinkExt;
 pub mod prelude {
     //! Prelude with common traits from the `futures-util` crate.
     pub use {FutureExt, StreamExt, SinkExt};
+    #[cfg(feature = "std")]
+    pub use {AsyncReadExt, AsyncWriteExt};
 }
diff --git a/futures/Cargo.toml b/futures/Cargo.toml
index ac636d59de..b666ec6402 100644
--- a/futures/Cargo.toml
+++ b/futures/Cargo.toml
@@ -22,9 +22,10 @@ appveyor = { repository = "alexcrichton/futures-rs" }
 futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
 futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false }
 futures-executor = { path = "../futures-executor", version = "0.2.0", default-features = false }
+futures-io = { path = "../futures-io", version = "0.2.0", default-features = false }
 futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false }
 futures-util = { path = "../futures-util", version = "0.2.0", default-features = false }
 
 [features]
-std = ["futures-core/std", "futures-executor/std", "futures-sink/std", "futures-util/std"]
+std = ["futures-core/std", "futures-executor/std", "futures-io/std", "futures-sink/std", "futures-util/std"]
 default = ["std"]
diff --git a/futures/src/lib.rs b/futures/src/lib.rs
index 38e51753fd..817536f3f0 100644
--- a/futures/src/lib.rs
+++ b/futures/src/lib.rs
@@ -166,6 +166,7 @@
 extern crate futures_core;
 extern crate futures_channel;
 extern crate futures_executor;
+extern crate futures_io;
 extern crate futures_sink;
 extern crate futures_util;
 
@@ -223,6 +224,16 @@ pub mod future {
     pub use futures_util::future::*;
 }
 
+#[cfg(feature = "std")]
+pub mod io {
+    //! IO
+    //!
+    //! This module contains the `AsyncRead` and `AsyncWrite` traits, as well
+    //! as a number of combinators and extensions for using them.
+    pub use futures_io::*;
+    pub use futures_util::io::*;
+}
+
 pub mod prelude {
 	//! A "prelude" for crates using the `futures` crate.
 	//!
@@ -253,6 +264,14 @@ pub mod prelude {
 		StreamExt,
         SinkExt,
     };
+
+    #[cfg(feature = "std")]
+    pub use futures_util::{
+        AsyncRead,
+        AsyncWrite,
+        AsyncReadExt,
+        AsyncWriteExt,
+    };
 }
 
 pub mod sink {