Skip to content

Commit 98b1176

Browse files
committed
Add futures-io
1 parent 3e5aa4b commit 98b1176

28 files changed

+3722
-6
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ members = [
44
"futures-core",
55
"futures-channel",
66
"futures-executor",
7+
"futures-io",
78
"futures-util",
89
"futures-sink",
910
]

LICENSE-MIT

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
Copyright (c) 2016 Alex Crichton
2+
Copyright (c) 2017 The Tokio Authors
23

34
Permission is hereby granted, free of charge, to any
45
person obtaining a copy of this software and associated

futures-io/Cargo.toml

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
[package]
2+
name = "futures-io"
3+
version = "0.2.0"
4+
authors = ["Alex Crichton <[email protected]>"]
5+
license = "MIT/Apache-2.0"
6+
repository = "https://github.com/alexcrichton/futures-rs"
7+
homepage = "https://github.com/alexcrichton/futures-rs"
8+
documentation = "https://docs.rs/futures-io"
9+
description = """
10+
The `AsyncRead` and `AsyncWrite` traits for the futures-rs library.
11+
"""
12+
13+
[features]
14+
std = ["futures-core/std", "iovec"]
15+
default = ["std"]
16+
17+
[dependencies]
18+
futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
19+
iovec = { version = "0.1", optional = true }

futures-io/src/lib.rs

+305
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
//! Asynchronous IO
2+
//!
3+
//! This crate contains the `AsyncRead` and `AsyncWrite` traits which allow
4+
//! data to be read and written asynchronously.
5+
6+
#![no_std]
7+
#![deny(missing_docs, missing_debug_implementations)]
8+
#![doc(html_root_url = "https://docs.rs/futures-io/0.2")]
9+
10+
macro_rules! if_std {
11+
($($i:item)*) => ($(
12+
#[cfg(feature = "std")]
13+
$i
14+
)*)
15+
}
16+
17+
if_std! {
18+
extern crate futures_core;
19+
extern crate iovec;
20+
extern crate std;
21+
22+
use futures_core::{Async, Poll, task};
23+
use std::boxed::Box;
24+
use std::io as StdIo;
25+
use std::ptr;
26+
use std::vec::Vec;
27+
28+
// Re-export IoVec for convenience
29+
pub use iovec::IoVec;
30+
31+
// Re-export io::Error so that users don't have to deal
32+
// with conflicts when `use`ing `futures::io` and `std::io`.
33+
pub use StdIo::Error as Error;
34+
35+
/// A type used to conditionally initialize buffers passed to `AsyncRead`
36+
/// methods.
37+
#[derive(Debug)]
38+
pub struct Initializer(bool);
39+
40+
impl Initializer {
41+
/// Returns a new `Initializer` which will zero out buffers.
42+
#[inline]
43+
pub fn zeroing() -> Initializer {
44+
Initializer(true)
45+
}
46+
47+
/// Returns a new `Initializer` which will not zero out buffers.
48+
///
49+
/// # Safety
50+
///
51+
/// This method may only be called by `AsyncRead`ers which guarantee
52+
/// that they will not read from the buffers passed to `AsyncRead`
53+
/// methods, and that the return value of the method accurately reflects
54+
/// the number of bytes that have been written to the head of the buffer.
55+
#[inline]
56+
pub unsafe fn nop() -> Initializer {
57+
Initializer(false)
58+
}
59+
60+
/// Indicates if a buffer should be initialized.
61+
#[inline]
62+
pub fn should_initialize(&self) -> bool {
63+
self.0
64+
}
65+
66+
/// Initializes a buffer if necessary.
67+
#[inline]
68+
pub fn initialize(&self, buf: &mut [u8]) {
69+
if self.should_initialize() {
70+
unsafe { ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) }
71+
}
72+
}
73+
}
74+
75+
/// Objects which can be read asynchronously.
76+
pub trait AsyncRead {
77+
/// Determines if this `AsyncRead`er can work with buffers of
78+
/// uninitialized memory.
79+
///
80+
/// The default implementation returns an initializer which will zero
81+
/// buffers.
82+
///
83+
/// # Safety
84+
///
85+
/// This method is `unsafe` because and `AsyncRead`er could otherwise
86+
/// return a non-zeroing `Initializer` from another `AsyncRead` type
87+
/// without an `unsafe` block.
88+
#[inline]
89+
unsafe fn initializer(&self) -> Initializer {
90+
Initializer::zeroing()
91+
}
92+
93+
/// Attempt to read from the `AsyncRead` into `buf`.
94+
///
95+
/// On success, returns `Ok(Async::Ready(num_bytes_read))`.
96+
///
97+
/// If reading would block, this function returns `Ok(Async::Pending)`
98+
/// and arranges for `cx.waker()` to receive a notification when the
99+
/// object becomes readable or is closed.
100+
fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context)
101+
-> Poll<usize, Error>;
102+
103+
/// Attempt to read from the `AsyncRead` into `vec` using vectored
104+
/// IO operations. This allows data to be read into multiple buffers
105+
/// using a single operation.
106+
///
107+
/// On success, returns `Ok(Async::Ready(num_bytes_read))`.
108+
///
109+
/// By default, this method delegates to using `poll_read` on the first
110+
/// buffer in `vec`. Objects which support vectored IO should override
111+
/// this method.
112+
///
113+
/// If reading would block, this function returns `Ok(Async::Pending)`
114+
/// and arranges for `cx.waker()` to receive a notification when the
115+
/// object becomes readable or is closed.
116+
fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context)
117+
-> Poll<usize, Error>
118+
{
119+
if let Some(first_iovec) = vec.get_mut(0) {
120+
self.poll_read(&mut *first_iovec, cx)
121+
} else {
122+
// `vec` is empty.
123+
return Ok(Async::Ready(0));
124+
}
125+
}
126+
}
127+
128+
/// Objects which can be written to asynchronously.
129+
pub trait AsyncWrite {
130+
/// Attempt to write bytes from `buf` into the object.
131+
///
132+
/// On success, returns `Ok(Async::Ready(num_bytes_written))`.
133+
///
134+
/// If writing would block, this function returns `Ok(Async::Pending)`
135+
/// and arranges for `cx.waker()` to receive a notification when the
136+
/// the object becomes writable or is closed.
137+
fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context)
138+
-> Poll<usize, Error>;
139+
140+
/// Attempt to write bytes from `vec` into the object using vectored
141+
/// IO operations. This allows data from multiple buffers to be written
142+
/// using a single operation.
143+
///
144+
/// On success, returns `Ok(Async::Ready(num_bytes_written))`.
145+
///
146+
/// By default, this method delegates to using `poll_write` on the first
147+
/// buffer in `vec`. Objects which support vectored IO should override
148+
/// this method.
149+
///
150+
/// If writing would block, this function returns `Ok(Async::Pending)`
151+
/// and arranges for `cx.waker()` to receive a notification when the
152+
/// object becomes writable or is closed.
153+
fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context)
154+
-> Poll<usize, Error>
155+
{
156+
if let Some(first_iovec) = vec.get(0) {
157+
self.poll_write(&*first_iovec, cx)
158+
} else {
159+
// `vec` is empty.
160+
return Ok(Async::Ready(0));
161+
}
162+
}
163+
164+
/// Attempt to flush the object, ensuring that all intermediately
165+
/// buffered contents reach their destination.
166+
///
167+
/// On success, returns `Ok(Async::Ready(()))`.
168+
///
169+
/// If flushing is incomplete, this function returns `Ok(Async::Pending)`
170+
/// and arranges for `cx.waker()` to receive a notification when the
171+
/// object can make progress towards flushing.
172+
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error>;
173+
174+
/// Attempt to close the object.
175+
///
176+
/// On success, returns `Ok(Async::Ready(()))`.
177+
///
178+
/// If closing is incomplete, this function returns `Ok(Async::Pending)`
179+
/// and arranges for `cx.waker()` to receive a notification when the
180+
/// object can make progress towards closing.
181+
fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error>;
182+
}
183+
184+
macro_rules! deref_async_read {
185+
() => {
186+
unsafe fn initializer(&self) -> Initializer {
187+
(**self).initializer()
188+
}
189+
190+
fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context)
191+
-> Poll<usize, Error>
192+
{
193+
(**self).poll_read(buf, cx)
194+
}
195+
196+
fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context)
197+
-> Poll<usize, Error>
198+
{
199+
(**self).poll_vectored_read(vec, cx)
200+
}
201+
}
202+
}
203+
204+
impl<T: ?Sized + AsyncRead> AsyncRead for Box<T> {
205+
deref_async_read!();
206+
}
207+
208+
impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T {
209+
deref_async_read!();
210+
}
211+
212+
/// `unsafe` because the `StdIo::Read` type must not access the buffer
213+
/// before reading data into it.
214+
macro_rules! unsafe_delegate_async_read_to_stdio {
215+
() => {
216+
unsafe fn initializer(&self) -> Initializer {
217+
Initializer::nop()
218+
}
219+
220+
fn poll_read(&mut self, buf: &mut [u8], _: &mut task::Context)
221+
-> Poll<usize, Error>
222+
{
223+
Ok(Async::Ready(StdIo::Read::read(self, buf)?))
224+
}
225+
}
226+
}
227+
228+
impl<'a> AsyncRead for &'a [u8] {
229+
unsafe_delegate_async_read_to_stdio!();
230+
}
231+
232+
impl AsyncRead for StdIo::Repeat {
233+
unsafe_delegate_async_read_to_stdio!();
234+
}
235+
236+
impl<T: AsRef<[u8]>> AsyncRead for StdIo::Cursor<T> {
237+
unsafe_delegate_async_read_to_stdio!();
238+
}
239+
240+
macro_rules! deref_async_write {
241+
() => {
242+
fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context)
243+
-> Poll<usize, Error>
244+
{
245+
(**self).poll_write(buf, cx)
246+
}
247+
248+
fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context)
249+
-> Poll<usize, Error>
250+
{
251+
(**self).poll_vectored_write(vec, cx)
252+
}
253+
254+
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
255+
(**self).poll_flush(cx)
256+
}
257+
258+
fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
259+
(**self).poll_close(cx)
260+
}
261+
}
262+
}
263+
264+
impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> {
265+
deref_async_write!();
266+
}
267+
268+
impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T {
269+
deref_async_write!();
270+
}
271+
272+
macro_rules! delegate_async_write_to_stdio {
273+
() => {
274+
fn poll_write(&mut self, buf: &[u8], _: &mut task::Context)
275+
-> Poll<usize, Error>
276+
{
277+
Ok(Async::Ready(StdIo::Write::write(self, buf)?))
278+
}
279+
280+
fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Error> {
281+
Ok(Async::Ready(StdIo::Write::flush(self)?))
282+
}
283+
284+
fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
285+
self.poll_flush(cx)
286+
}
287+
}
288+
}
289+
290+
impl<'a> AsyncWrite for StdIo::Cursor<&'a mut [u8]> {
291+
delegate_async_write_to_stdio!();
292+
}
293+
294+
impl AsyncWrite for StdIo::Cursor<Vec<u8>> {
295+
delegate_async_write_to_stdio!();
296+
}
297+
298+
impl AsyncWrite for StdIo::Cursor<Box<[u8]>> {
299+
delegate_async_write_to_stdio!();
300+
}
301+
302+
impl AsyncWrite for StdIo::Sink {
303+
delegate_async_write_to_stdio!();
304+
}
305+
}

futures-util/Cargo.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,15 @@ Common utilities and extension traits for the futures-rs library.
1111
"""
1212

1313
[features]
14-
std = ["futures-core/std", "futures-sink/std"]
14+
std = ["bytes", "log", "futures-core/std", "futures-io/std", "futures-sink/std"]
1515
default = ["std"]
1616

1717
[dependencies]
18+
bytes = { version = "0.4", optional = true }
19+
log = { version = "0.4", optional = true }
1820
futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
1921
futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false }
22+
futures-io = { path = "../futures-io", version = "0.2.0", default-features = false }
2023
futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false}
2124

2225
[dev-dependencies]

0 commit comments

Comments
 (0)