diff --git a/src/async_io.rs b/src/async_io.rs index ce294108..298bb4d0 100644 --- a/src/async_io.rs +++ b/src/async_io.rs @@ -1,6 +1,8 @@ -//! Async I/O. +//! Abstraction over [epoll]/[kqueue]/[wepoll]. //! -//! TODO +//! [epoll]: https://en.wikipedia.org/wiki/Epoll +//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue +//! [wepoll]: https://github.com/piscisaureus/wepoll use std::future::Future; use std::io::{self, Read, Write}; diff --git a/src/reactor.rs b/src/reactor.rs index 3d5f6064..e99d1ec8 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -1,6 +1,7 @@ -//! The reactor, async I/O, and timers. +//! The reactor notifying [`Async`][`crate::Async`] and [`Timer`][`crate::Timer`]. //! -//! TODO +//! There is a single global reactor that contains all registered I/O handles and timers. The +//! reactor is polled by the executor, i.e. the [`run()`][`crate::run()`] function. #[cfg(not(any( target_os = "linux", // epoll @@ -29,6 +30,7 @@ use std::sync::Arc; use std::task::{Context, Poll, Waker}; use std::time::{Duration, Instant}; +use crossbeam::queue::ArrayQueue; #[cfg(unix)] use nix::fcntl::{fcntl, FcntlArg, OFlag}; use once_cell::sync::Lazy; @@ -39,12 +41,12 @@ use socket2::Socket; use crate::io_event::IoEvent; use crate::throttle; -/// The reactor driving I/O events and timers. +/// The reactor. /// -/// Every async I/O handle ("source") and every timer is registered here. Invocations of `run()` -/// poll the reactor to check for new events every now and then. +/// Every async I/O handle and every timer is registered here. Invocations of [`run()`] poll the +/// reactor to check for new events every now and then. /// -/// There is only one global instance of this type, accessible by `Reactor::get()`. +/// There is only one global instance of this type, accessible by [`Reactor::get()`]. pub(crate) struct Reactor { /// Raw bindings to epoll/kqueue/wepoll. sys: sys::Reactor, @@ -62,6 +64,12 @@ pub(crate) struct Reactor { /// timer. timers: piper::Mutex>, + /// A queue of timer operations (insert and remove). + /// + /// When inserting or removing a timer, we don't process it immediately - we just push it into + /// this queue. Timers actually get processed when the queue fills up or the reactor is polled. + timer_ops: ArrayQueue, + /// An I/O event that is triggered when a new earliest timer is registered. /// /// This is used to wake up the thread waiting on the reactor, which would otherwise wait until @@ -80,6 +88,7 @@ impl Reactor { sources: piper::Mutex::new(Slab::new()), events: piper::Lock::new(sys::Events::new()), timers: piper::Mutex::new(BTreeMap::new()), + timer_ops: ArrayQueue::new(1000), event: Lazy::new(|| IoEvent::new().expect("cannot create an `IoEvent`")), }); &REACTOR @@ -129,27 +138,30 @@ impl Reactor { /// Registers a timer in the reactor. /// /// Returns the inserted timer's ID. - pub fn insert_timer(&self, when: Instant, waker: Waker) -> usize { - let mut timers = self.timers.lock(); - + pub fn insert_timer(&self, when: Instant, waker: &Waker) -> usize { // Generate a new timer ID. static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1); let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed); - // Insert this timer into the timers map. - timers.insert((when, id), waker); - - // If this timer is now the earliest one, interrupt the reactor. - if timers.keys().next().map(|(when, _)| *when) == Some(when) { - self.event.notify(); + // Push an insert operation. + while self.timer_ops.push(TimerOp::Insert(when, id, waker.clone())).is_err() { + // Fire timers to drain the queue. + self.fire_timers(); } + // Interrupt the reactor. + self.event.notify(); + id } /// Deregisters a timer from the reactor. pub fn remove_timer(&self, when: Instant, id: usize) { - self.timers.lock().remove(&(when, id)); + // Push a remove operation. + while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() { + // Fire timers to drain the queue. + self.fire_timers(); + } } /// Attempts to lock the reactor. @@ -166,9 +178,55 @@ impl Reactor { let events = self.events.lock().await; ReactorLock { reactor, events } } + + /// Fires ready timers. + /// + /// Returns the duration until the next timer before this method was called. + fn fire_timers(&self) -> Option { + let mut timers = self.timers.lock(); + + // Process timer operations, but no more than the queue capacity because otherwise we could + // keep popping operations forever. + for _ in 0..self.timer_ops.capacity() { + match self.timer_ops.pop() { + Ok(TimerOp::Insert(when, id, waker)) => { + timers.insert((when, id), waker); + } + Ok(TimerOp::Remove(when, id)) => { + timers.remove(&(when, id)); + } + Err(_) => break, + } + } + + let now = Instant::now(); + + // Split timers into ready and pending timers. + let pending = timers.split_off(&(now, 0)); + let ready = mem::replace(&mut *timers, pending); + + // Calculate the duration until the next event. + let dur = if ready.is_empty() { + // Duration until the next timer. + timers + .keys() + .next() + .map(|(when, _)| when.saturating_duration_since(now)) + } else { + // Timers are about to fire right now. + Some(Duration::from_secs(0)) + }; + + // Wake up tasks waiting on timers. + for (_, waker) in ready { + waker.wake(); + } + + dur + } } -/// Polls the reactor for I/O events and wakes up tasks. +/// A lock on the reactor. pub(crate) struct ReactorLock<'a> { reactor: &'a Reactor, events: piper::LockGuard, @@ -187,32 +245,12 @@ impl ReactorLock<'_> { /// Processes new events, optionally blocking until the first event. fn react(&mut self, block: bool) -> io::Result<()> { - let timeout = { - let mut timers = self.reactor.timers.lock(); - let now = Instant::now(); - - // Split timers into ready and pending timers. - let pending = timers.split_off(&(now, 0)); - let ready = mem::replace(&mut *timers, pending); - - // Calculate the timeout for waiting on I/O events. - let timeout = if ready.is_empty() && block { - // Block until the next timer. - timers - .keys() - .next() - .map(|(when, _)| when.saturating_duration_since(now)) - } else { - // Don't block. - Some(Duration::from_secs(0)) - }; - - // Wake up tasks waiting on timers. - for (_, waker) in ready { - waker.wake(); - } - - timeout + // Fire timers and compute the timeout for blocking on I/O events. + let next_timer = self.reactor.fire_timers(); + let timeout = if block { + next_timer + } else { + Some(Duration::from_secs(0)) }; // Block on I/O events. @@ -246,12 +284,18 @@ impl ReactorLock<'_> { // The syscall was interrupted. Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()), - // A real error occureed. + // An actual error occureed. Err(err) => Err(err), } } } +/// A single timer operation. +enum TimerOp { + Insert(Instant, usize, Waker), + Remove(Instant, usize), +} + /// A registered source of I/O events. #[derive(Debug)] pub(crate) struct Source { @@ -319,7 +363,7 @@ fn io_err(err: nix::Error) -> io::Error { } } -/// Bindings to epoll (Linux, Android, illumos). +/// Raw bindings to epoll (Linux, Android, illumos). #[cfg(any(target_os = "linux", target_os = "android", target_os = "illumos"))] mod sys { use std::convert::TryInto; @@ -377,7 +421,7 @@ mod sys { } } -/// Bindings to kqueue (macOS, iOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD). +/// Raw bindings to kqueue (macOS, iOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD). #[cfg(any( target_os = "macos", target_os = "ios", @@ -473,7 +517,7 @@ mod sys { } } -/// Bindings to wepoll (Windows). +/// Raw bindings to wepoll (Windows). #[cfg(target_os = "windows")] mod sys { use std::io; diff --git a/src/timer.rs b/src/timer.rs index e49d1ad8..bd9f9407 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -43,7 +43,7 @@ use crate::reactor::Reactor; /// ) -> io::Result { /// futures::select! { /// t = f.fuse() => t, -/// _ = Timer::after(dur).fuse() => Err(io::Error::from(io::ErrorKind::TimedOut)), +/// _ = Timer::after(dur).fuse() => Err(io::ErrorKind::TimedOut.into()), /// } /// } /// @@ -126,8 +126,7 @@ impl Future for Timer { } else { if self.id.is_none() { // Register the timer in the reactor. - let waker = cx.waker().clone(); - self.id = Some(Reactor::get().insert_timer(self.when, waker)); + self.id = Some(Reactor::get().insert_timer(self.when, cx.waker())); } Poll::Pending }