Skip to content

Commit

Permalink
Optimize timers
Browse files Browse the repository at this point in the history
  • Loading branch information
Stjepan Glavina committed Apr 25, 2020
1 parent 2cd7f41 commit 51a1bf2
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 52 deletions.
6 changes: 4 additions & 2 deletions src/async_io.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Async I/O.
//! Abstraction over [epoll]/[kqueue]/[wepoll].
//!
//! TODO
//! [epoll]: https://en.wikipedia.org/wiki/Epoll
//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
//! [wepoll]: https://github.com/piscisaureus/wepoll
use std::future::Future;
use std::io::{self, Read, Write};
Expand Down
138 changes: 91 additions & 47 deletions src/reactor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! The reactor, async I/O, and timers.
//! The reactor notifying [`Async`][`crate::Async`] and [`Timer`][`crate::Timer`].
//!
//! TODO
//! There is a single global reactor that contains all registered I/O handles and timers. The
//! reactor is polled by the executor, i.e. the [`run()`][`crate::run()`] function.
#[cfg(not(any(
target_os = "linux", // epoll
Expand Down Expand Up @@ -29,6 +30,7 @@ use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

use crossbeam::queue::ArrayQueue;
#[cfg(unix)]
use nix::fcntl::{fcntl, FcntlArg, OFlag};
use once_cell::sync::Lazy;
Expand All @@ -39,12 +41,12 @@ use socket2::Socket;
use crate::io_event::IoEvent;
use crate::throttle;

/// The reactor driving I/O events and timers.
/// The reactor.
///
/// Every async I/O handle ("source") and every timer is registered here. Invocations of `run()`
/// poll the reactor to check for new events every now and then.
/// Every async I/O handle and every timer is registered here. Invocations of [`run()`] poll the
/// reactor to check for new events every now and then.
///
/// There is only one global instance of this type, accessible by `Reactor::get()`.
/// There is only one global instance of this type, accessible by [`Reactor::get()`].
pub(crate) struct Reactor {
/// Raw bindings to epoll/kqueue/wepoll.
sys: sys::Reactor,
Expand All @@ -62,6 +64,12 @@ pub(crate) struct Reactor {
/// timer.
timers: piper::Mutex<BTreeMap<(Instant, usize), Waker>>,

/// A queue of timer operations (insert and remove).
///
/// When inserting or removing a timer, we don't process it immediately - we just push it into
/// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
timer_ops: ArrayQueue<TimerOp>,

/// An I/O event that is triggered when a new earliest timer is registered.
///
/// This is used to wake up the thread waiting on the reactor, which would otherwise wait until
Expand All @@ -80,6 +88,7 @@ impl Reactor {
sources: piper::Mutex::new(Slab::new()),
events: piper::Lock::new(sys::Events::new()),
timers: piper::Mutex::new(BTreeMap::new()),
timer_ops: ArrayQueue::new(1000),
event: Lazy::new(|| IoEvent::new().expect("cannot create an `IoEvent`")),
});
&REACTOR
Expand Down Expand Up @@ -129,27 +138,30 @@ impl Reactor {
/// Registers a timer in the reactor.
///
/// Returns the inserted timer's ID.
pub fn insert_timer(&self, when: Instant, waker: Waker) -> usize {
let mut timers = self.timers.lock();

pub fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
// Generate a new timer ID.
static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);

// Insert this timer into the timers map.
timers.insert((when, id), waker);

// If this timer is now the earliest one, interrupt the reactor.
if timers.keys().next().map(|(when, _)| *when) == Some(when) {
self.event.notify();
// Push an insert operation.
while self.timer_ops.push(TimerOp::Insert(when, id, waker.clone())).is_err() {
// Fire timers to drain the queue.
self.fire_timers();
}

// Interrupt the reactor.
self.event.notify();

id
}

/// Deregisters a timer from the reactor.
pub fn remove_timer(&self, when: Instant, id: usize) {
self.timers.lock().remove(&(when, id));
// Push a remove operation.
while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
// Fire timers to drain the queue.
self.fire_timers();
}
}

/// Attempts to lock the reactor.
Expand All @@ -166,9 +178,55 @@ impl Reactor {
let events = self.events.lock().await;
ReactorLock { reactor, events }
}

/// Fires ready timers.
///
/// Returns the duration until the next timer before this method was called.
fn fire_timers(&self) -> Option<Duration> {
let mut timers = self.timers.lock();

// Process timer operations, but no more than the queue capacity because otherwise we could
// keep popping operations forever.
for _ in 0..self.timer_ops.capacity() {
match self.timer_ops.pop() {
Ok(TimerOp::Insert(when, id, waker)) => {
timers.insert((when, id), waker);
}
Ok(TimerOp::Remove(when, id)) => {
timers.remove(&(when, id));
}
Err(_) => break,
}
}

let now = Instant::now();

This comment has been minimized.

Copy link
@Byron

Byron Apr 28, 2020

Contributor

Would it be OK to bail out early in case there are no timers, i.e. if timers.is_empty() is true?

As I am very sure there is a very good reason not to bail out, and given the excellent in-code documentation to help people understand and learn, I would certainly love to see (or contribute) a comment to make clear why early bailout isn't possible.

This comment has been minimized.

Copy link
@ghost

ghost Apr 28, 2020

Hmm I think it'd be okay to bail out here if timers is empty :)

If the map is indeed empty, then the code below does basically nothing - the map is unchanged, no wakers are woken, and we return None.

I doubt that'd be a significant optimization, though. Note that no allocations should happen - the BTreeMap we split off and the Vec of wakers are empty, so they shouldn't allocate.

This comment has been minimized.

Copy link
@Byron

Byron Apr 29, 2020

Contributor

I absolutely agree that just by looking at code there is no way to know if bailing out early would make a difference, and rather keep it simple than optimizing prematurely.

What usually 'trips me off' is the usage of Instant::now() because of the code behind it:

Screenshot 2020-04-29 at 08 48 49

It looks like many major platforms will actually run into a Mutex, which seems like something to avoid at least according to my gut-feeling 😅.

In any case, Smol and aync-task and piper really, really make a huge difference in demystifying how things work and this is exactly what makes me want to try and migrate to Smol this weekend actually. And from reading the code thus far, I am nothing but encouraged that this is the right decision.

Love your work!

This comment has been minimized.

Copy link
@ghost

ghost Apr 29, 2020

I see - and after thinking a little bit about this, I'm pretty sure we could use a fuzzier version of Instant that is allowed to sometimes go backwards in time a little bit. Maybe we could replace it with a more efficient alternative!

Thanks for the kind works! <3 Let me know if I can help in any way. I appreciate this kind of feedback a lot!


// Split timers into ready and pending timers.
let pending = timers.split_off(&(now, 0));
let ready = mem::replace(&mut *timers, pending);

// Calculate the duration until the next event.
let dur = if ready.is_empty() {
// Duration until the next timer.
timers
.keys()
.next()
.map(|(when, _)| when.saturating_duration_since(now))
} else {
// Timers are about to fire right now.
Some(Duration::from_secs(0))
};

// Wake up tasks waiting on timers.
for (_, waker) in ready {
waker.wake();
}

dur
}
}

/// Polls the reactor for I/O events and wakes up tasks.
/// A lock on the reactor.
pub(crate) struct ReactorLock<'a> {
reactor: &'a Reactor,
events: piper::LockGuard<sys::Events>,
Expand All @@ -187,32 +245,12 @@ impl ReactorLock<'_> {

/// Processes new events, optionally blocking until the first event.
fn react(&mut self, block: bool) -> io::Result<()> {
let timeout = {
let mut timers = self.reactor.timers.lock();
let now = Instant::now();

// Split timers into ready and pending timers.
let pending = timers.split_off(&(now, 0));
let ready = mem::replace(&mut *timers, pending);

// Calculate the timeout for waiting on I/O events.
let timeout = if ready.is_empty() && block {
// Block until the next timer.
timers
.keys()
.next()
.map(|(when, _)| when.saturating_duration_since(now))
} else {
// Don't block.
Some(Duration::from_secs(0))
};

// Wake up tasks waiting on timers.
for (_, waker) in ready {
waker.wake();
}

timeout
// Fire timers and compute the timeout for blocking on I/O events.
let next_timer = self.reactor.fire_timers();
let timeout = if block {
next_timer
} else {
Some(Duration::from_secs(0))
};

// Block on I/O events.
Expand Down Expand Up @@ -246,12 +284,18 @@ impl ReactorLock<'_> {
// The syscall was interrupted.
Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),

// A real error occureed.
// An actual error occureed.
Err(err) => Err(err),
}
}
}

/// A single timer operation.
enum TimerOp {
Insert(Instant, usize, Waker),
Remove(Instant, usize),
}

/// A registered source of I/O events.
#[derive(Debug)]
pub(crate) struct Source {
Expand Down Expand Up @@ -319,7 +363,7 @@ fn io_err(err: nix::Error) -> io::Error {
}
}

/// Bindings to epoll (Linux, Android, illumos).
/// Raw bindings to epoll (Linux, Android, illumos).
#[cfg(any(target_os = "linux", target_os = "android", target_os = "illumos"))]
mod sys {
use std::convert::TryInto;
Expand Down Expand Up @@ -377,7 +421,7 @@ mod sys {
}
}

/// Bindings to kqueue (macOS, iOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD).
/// Raw bindings to kqueue (macOS, iOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD).
#[cfg(any(
target_os = "macos",
target_os = "ios",
Expand Down Expand Up @@ -473,7 +517,7 @@ mod sys {
}
}

/// Bindings to wepoll (Windows).
/// Raw bindings to wepoll (Windows).
#[cfg(target_os = "windows")]
mod sys {
use std::io;
Expand Down
5 changes: 2 additions & 3 deletions src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::reactor::Reactor;
/// ) -> io::Result<T> {
/// futures::select! {
/// t = f.fuse() => t,
/// _ = Timer::after(dur).fuse() => Err(io::Error::from(io::ErrorKind::TimedOut)),
/// _ = Timer::after(dur).fuse() => Err(io::ErrorKind::TimedOut.into()),
/// }
/// }
///
Expand Down Expand Up @@ -126,8 +126,7 @@ impl Future for Timer {
} else {
if self.id.is_none() {
// Register the timer in the reactor.
let waker = cx.waker().clone();
self.id = Some(Reactor::get().insert_timer(self.when, waker));
self.id = Some(Reactor::get().insert_timer(self.when, cx.waker()));
}
Poll::Pending
}
Expand Down

0 comments on commit 51a1bf2

Please sign in to comment.