Skip to content

Latest commit

 

History

History
334 lines (291 loc) · 9.65 KB

File metadata and controls

334 lines (291 loc) · 9.65 KB

Kqueue

Since both Epoll and Kqueue is re implementing the same basic logic as we did for IOCP I'll let the code speak for itself. I have commented the code pretty extensively to clarify some interesting differences, though.

use crate::{Events, Interests, Token};
use std::io::{self, IoSliceMut, Read, Write};
use std::net;
use std::os::unix::io::{AsRawFd, RawFd};
use std::ptr;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

pub type Source = std::os::unix::io::RawFd;

pub struct Registrator {
    kq: Source,
    is_poll_dead: Arc<AtomicBool>,
}

impl Registrator {
    pub fn register(
        &self,
        stream: &TcpStream,
        token: usize,
        interests: Interests,
    ) -> io::Result<()> {
        if self.is_poll_dead.load(Ordering::SeqCst) {
            return Err(io::Error::new(
                io::ErrorKind::Interrupted,
                "Poll instance closed.",
            ));
        }

        let fd = stream.as_raw_fd();
        if interests.is_readable() {
            // We register the id (or most oftenly referred to as a Token) to the `udata` field
            // if the `Kevent`
            let event = ffi::Event::new_read_event(fd, token as u64);
            let event = [event];
            kevent(self.kq, &event, &mut [], 0, None)?;
        };

        if interests.is_writable() {
            unimplemented!();
        }

        Ok(())
    }

    pub fn close_loop(&self) -> io::Result<()> {
        // We set already here that the Poll instance is dead since this will be the last
        // event it will handle
        if self
            .is_poll_dead
            .compare_and_swap(false, true, Ordering::SeqCst)
        {
            return Err(io::Error::new(
                io::ErrorKind::Interrupted,
                "Poll instance closed.",
            ));
        }
        let event = ffi::Event::new_wakeup_event();
        let event = [event];
        kevent(self.kq, &event, &mut [], 0, None)?;

        Ok(())
    }
}

#[derive(Debug)]
pub struct Selector {
    kq: Source,
}

impl Selector {
    pub fn new() -> io::Result<Self> {
        Ok(Selector { kq: kqueue()? })
    }

    /// This function blocks and waits until an event has been received. It never times out.
    pub fn select(&self, events: &mut Events, timeout_ms: Option<i32>) -> io::Result<()> {
        // TODO: get n_events from self
        let n_events = events.capacity() as i32;
        events.clear();
        kevent(self.kq, &[], events, n_events, timeout_ms).map(|n_events| {
            // This is safe because `syscall_kevent` ensures that `n_events` are
            // assigned. We could check for a valid token for each event to verify so this is
            // just a performance optimization used in `mio` and copied here.
            unsafe { events.set_len(n_events as usize) };
        })
    }

    pub fn registrator(&self, is_poll_dead: Arc<AtomicBool>) -> Registrator {
        Registrator {
            kq: self.kq,
            is_poll_dead,
        }
    }
}

impl Drop for Selector {
    fn drop(&mut self) {
        match close(self.kq) {
            Ok(..) => (),
            Err(e) => {
                if !std::thread::panicking() {
                    panic!(e);
                }
            }
        }
    }
}

pub type Event = ffi::Kevent;
impl Event {
    pub fn id(&self) -> Token {
        self.udata as usize
    }
}

pub struct TcpStream {
    inner: net::TcpStream,
}

impl TcpStream {
    pub fn connect(adr: impl net::ToSocketAddrs) -> io::Result<Self> {
        // actually we should set this to non-blocking before we call connect which is not something
        // we get from the stdlib but could do with a syscall. Let's skip that step in this example.
        // In other words this will block shortly establishing a connection to the remote server
        let stream = net::TcpStream::connect(adr)?;
        stream.set_nonblocking(true)?;

        Ok(TcpStream { inner: stream })
    }
}

impl Read for TcpStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // If we let the socket operate non-blocking we could get an error of kind `WouldBlock`,
        // that means there is more data to read but we would block if we waited for it to arrive.
        // The right thing to do is to re-register the event, getting notified once more
        // data is available. We'll not do that in our implementation since we're making an example
        // and instead we make the socket blocking again while we read from it
        self.inner.set_nonblocking(false)?;
        (&self.inner).read(buf)
    }

    /// Copies data to fill each buffer in order, with the final buffer possibly only beeing
    /// partially filled. Now as we'll see this is like it's made for our use case when abstracting
    /// over IOCP AND epoll/kqueue (since we need to buffer anyways).
    ///
    /// IoSliceMut is like `&mut [u8]` but it's guaranteed to be ABI compatible with the `iovec`
    /// type on unix platforms and `WSABUF` on Windows. Perfect for us.
    fn read_vectored(&mut self, bufs: &mut [IoSliceMut]) -> io::Result<usize> {
        (&self.inner).read_vectored(bufs)
    }
}

impl Write for TcpStream {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.inner.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

impl AsRawFd for TcpStream {
    fn as_raw_fd(&self) -> RawFd {
        self.inner.as_raw_fd()
    }
}

mod ffi {
    use super::*;

    pub const EVFILT_READ: i16 = -1;
    pub const EVFILT_TIMER: i16 = -7;
    pub const EV_ADD: u16 = 0x1;
    pub const EV_ENABLE: u16 = 0x4;
    pub const EV_ONESHOT: u16 = 0x10;
    pub const EV_CLEAR: u16 = 0x20;

    // To be able to pass in a timeout to `Kqueue`we need to use 
    // a timespec struct to pass in the information
    #[derive(Debug)]
    #[repr(C)]
    pub(super) struct Timespec {
        /// Seconds
        tv_sec: isize,
        /// Nanoseconds     
        v_nsec: usize,
    }

    impl Timespec {
         /// Convenience function so that we can easily create a `timespec` struct
         /// from milliseconds. We won't support granularity smaller than ms 
         /// in our library even though we could on macos.
         pub fn from_millis(milliseconds: i32) -> Self {
            let seconds = milliseconds / 1000;
            let nanoseconds = (milliseconds % 1000) * 1000 * 1000;
            Timespec {
                tv_sec: seconds as isize,
                v_nsec: nanoseconds as usize,
            }
        }
    }

    pub type Event = Kevent;
    impl Event {
        pub fn new_read_event(fd: RawFd, id: u64) -> Self {
            Event {
                ident: fd as u64,
                filter: EVFILT_READ,
                flags: EV_ADD | EV_ENABLE | EV_ONESHOT,
                fflags: 0,
                data: 0,
                udata: id,
            }
        }

        pub fn new_wakeup_event() -> Self {
            Event {
                ident: 0,
                filter: EVFILT_TIMER,
                flags: EV_ADD | EV_ENABLE | EV_CLEAR,
                fflags: 0,
                // data is where our timeout will be set but we want to timeout immideately
                data: 0,
                udata: 0, // TODO: see if windows needs u32...
            }
        }

        pub fn zero() -> Self {
            Event {
                ident: 0,
                filter: 0,
                flags: 0,
                fflags: 0,
                data: 0,
                udata: 0,
            }
        }
    }

    // https://github.com/rust-lang/libc/blob/c8aa8ec72d631bc35099bcf5d634cf0a0b841be0/src/unix/bsd/apple/mod.rs#L497
    // https://github.com/rust-lang/libc/blob/c8aa8ec72d631bc35099bcf5d634cf0a0b841be0/src/unix/bsd/apple/mod.rs#L207
    #[derive(Debug, Clone, Default)]
    #[repr(C)]
    pub struct Kevent {
        pub ident: u64,
        pub filter: i16,
        pub flags: u16,
        pub fflags: u32,
        pub data: i64,
        pub udata: u64,
    }

    impl Kevent {
        pub fn token(&self) -> Option<Token> {
            // we have no realiable way of checking if this value is initialized or not but need
            // an option to be compatible with windows.
            Some(self.udata as usize)
        }
    }

    #[link(name = "c")]
    extern "C" {
        /// Returns: positive: file descriptor, negative: error
        pub(super) fn kqueue() -> i32;
        /// Returns: nothing, all non zero return values is an error
        /// If the time limit expires, then kevent() returns 0
        pub(super) fn kevent(
            kq: i32,
            changelist: *const Kevent,
            nchanges: i32,
            eventlist: *mut Kevent,
            nevents: i32,
            timeout: *const Timespec,
        ) -> i32;

        pub fn close(d: i32) -> i32;
    }
}

pub fn kqueue() -> io::Result<i32> {
    let fd = unsafe { ffi::kqueue() };
    if fd < 0 {
        return Err(io::Error::last_os_error());
    }
    Ok(fd)
}

pub fn kevent(
    kq: RawFd,
    cl: &[ffi::Kevent],
    el: &mut [ffi::Kevent],
    n_events: i32,
    timeout_ms: Option<i32>,
) -> io::Result<usize> {
    let res = unsafe {
        let kq = kq as i32;
        let cl_len = cl.len() as i32;

        let timeout = timeout_ms.map(ffi::Timespec::from_millis);

        let timeout: *const ffi::Timespec = match &timeout {
            Some(n) => n,
            None => ptr::null(),
        };

        ffi::kevent(kq, cl.as_ptr(), cl_len, el.as_mut_ptr(), n_events, timeout)
    };
    if res < 0 {
        return Err(io::Error::last_os_error());
    }

    Ok(res as usize)
}

pub fn close(fd: RawFd) -> io::Result<()> {
    let res = unsafe { ffi::close(fd) };
    if res < 0 {
        Err(io::Error::last_os_error())
    } else {
        Ok(())
    }
}