Skip to content

Commit

Permalink
Merge pull request #1064 from mkroening/simple-poll-event
Browse files Browse the repository at this point in the history
refactor(PollEvent): simplify flag propagation
  • Loading branch information
mkroening authored Feb 16, 2024
2 parents 294fabd + 6f6cf95 commit b8ef114
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 183 deletions.
31 changes: 9 additions & 22 deletions src/fd/eventfd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,37 +142,24 @@ impl ObjectInterface for EventFd {
}

async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut result: PollEvent = PollEvent::empty();
let guard = self.state.lock().await;

let mut available = PollEvent::empty();

if guard.counter < u64::MAX - 1 {
if event.contains(PollEvent::POLLOUT) {
result.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
result.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
result.insert(PollEvent::POLLWRBAND);
}
available.insert(PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND);
}

if guard.counter > 0 {
if event.contains(PollEvent::POLLIN) {
result.insert(PollEvent::POLLIN);
}
if event.contains(PollEvent::POLLRDNORM) {
result.insert(PollEvent::POLLRDNORM);
}
if event.contains(PollEvent::POLLRDBAND) {
result.insert(PollEvent::POLLRDBAND);
}
available.insert(PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDBAND);
}

drop(guard);

let ret = event & available;

future::poll_fn(|cx| {
if result.is_empty() {
if ret.is_empty() {
let mut pinned = core::pin::pin!(self.state.lock());
if let Poll::Ready(mut guard) = pinned.as_mut().poll(cx) {
if event.intersects(
Expand All @@ -186,13 +173,13 @@ impl ObjectInterface for EventFd {
guard.write_queue.push_back(cx.waker().clone());
Poll::Pending
} else {
Poll::Ready(Ok(result))
Poll::Ready(Ok(ret))
}
} else {
Poll::Pending
}
} else {
Poll::Ready(Ok(result))
Poll::Ready(Ok(ret))
}
})
.await
Expand Down
75 changes: 21 additions & 54 deletions src/fd/socket/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,35 +168,20 @@ impl Socket {
#[async_trait]
impl ObjectInterface for Socket {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut result: PollEvent = PollEvent::empty();

future::poll_fn(|cx| {
self.with(|socket| match socket.state() {
tcp::State::Closed | tcp::State::Closing | tcp::State::CloseWait => {
if event.contains(PollEvent::POLLOUT) {
result.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
result.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
result.insert(PollEvent::POLLWRBAND);
}
let available = PollEvent::POLLOUT
| PollEvent::POLLWRNORM | PollEvent::POLLWRBAND
| PollEvent::POLLIN | PollEvent::POLLRDNORM
| PollEvent::POLLRDBAND;

if event.contains(PollEvent::POLLIN) {
result.insert(PollEvent::POLLIN);
}
if event.contains(PollEvent::POLLRDNORM) {
result.insert(PollEvent::POLLRDNORM);
}
if event.contains(PollEvent::POLLRDBAND) {
result.insert(PollEvent::POLLRDBAND);
}
let ret = event & available;

if result.is_empty() {
if ret.is_empty() {
Poll::Ready(Ok(PollEvent::POLLHUP))
} else {
Poll::Ready(Ok(result))
Poll::Ready(Ok(ret))
}
}
tcp::State::FinWait1 | tcp::State::FinWait2 | tcp::State::TimeWait => {
Expand All @@ -208,49 +193,31 @@ impl ObjectInterface for Socket {
Poll::Pending
}
_ => {
if socket.may_recv() && self.listen.swap(false, Ordering::Relaxed) {
let mut available = PollEvent::empty();

if socket.can_recv()
|| socket.may_recv() && self.listen.swap(false, Ordering::Relaxed)
{
// In case, we just establish a fresh connection in non-blocking mode, we try to read data.
if event.contains(PollEvent::POLLIN) {
result.insert(PollEvent::POLLIN);
}
if event.contains(PollEvent::POLLRDNORM) {
result.insert(PollEvent::POLLRDNORM);
}
if event.contains(PollEvent::POLLRDBAND) {
result.insert(PollEvent::POLLRDBAND);
}
available.insert(
PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDBAND,
);
}

if socket.can_send() {
if event.contains(PollEvent::POLLOUT) {
result.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
result.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
result.insert(PollEvent::POLLWRBAND);
}
available.insert(
PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND,
);
}

if socket.can_recv() {
if event.contains(PollEvent::POLLIN) {
result.insert(PollEvent::POLLIN);
}
if event.contains(PollEvent::POLLRDNORM) {
result.insert(PollEvent::POLLRDNORM);
}
if event.contains(PollEvent::POLLRDBAND) {
result.insert(PollEvent::POLLRDBAND);
}
}
let ret = event & available;

if result.is_empty() {
if ret.is_empty() {
socket.register_recv_waker(cx.waker());
socket.register_send_waker(cx.waker());
Poll::Pending
} else {
Poll::Ready(Ok(result))
Poll::Ready(Ok(ret))
}
}
})
Expand Down
40 changes: 15 additions & 25 deletions src/fd/socket/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,44 +119,34 @@ impl Socket {
#[async_trait]
impl ObjectInterface for Socket {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut result: PollEvent = PollEvent::empty();

future::poll_fn(|cx| {
self.with(|socket| {
if socket.is_open() {
let ret = if socket.is_open() {
let mut avail = PollEvent::empty();

if socket.can_send() {
if event.contains(PollEvent::POLLOUT) {
result.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
result.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
result.insert(PollEvent::POLLWRBAND);
}
avail.insert(
PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND,
);
}

if socket.can_recv() {
if event.contains(PollEvent::POLLIN) {
result.insert(PollEvent::POLLIN);
}
if event.contains(PollEvent::POLLRDNORM) {
result.insert(PollEvent::POLLRDNORM);
}
if event.contains(PollEvent::POLLRDBAND) {
result.insert(PollEvent::POLLRDBAND);
}
avail.insert(
PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDBAND,
);
}

event & avail
} else {
result.insert(PollEvent::POLLNVAL);
}
PollEvent::POLLNVAL
};

if result.is_empty() {
if ret.is_empty() {
socket.register_recv_waker(cx.waker());
socket.register_send_waker(cx.waker());
Poll::Pending
} else {
Poll::Ready(Ok(result))
Poll::Ready(Ok(ret))
}
})
})
Expand Down
60 changes: 8 additions & 52 deletions src/fd/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,8 @@ pub struct GenericStdout;
#[async_trait]
impl ObjectInterface for GenericStdout {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut result: PollEvent = PollEvent::empty();

if event.contains(PollEvent::POLLOUT) {
result.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
result.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
result.insert(PollEvent::POLLWRBAND);
}

Ok(result)
let available = PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND;
Ok(event & available)
}

async fn async_write(&self, buf: &[u8]) -> Result<usize, IoError> {
Expand All @@ -119,19 +108,8 @@ pub struct GenericStderr;
#[async_trait]
impl ObjectInterface for GenericStderr {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut result: PollEvent = PollEvent::empty();

if event.contains(PollEvent::POLLOUT) {
result.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
result.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
result.insert(PollEvent::POLLWRBAND);
}

Ok(result)
let available = PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND;
Ok(event & available)
}

async fn async_write(&self, buf: &[u8]) -> Result<usize, IoError> {
Expand Down Expand Up @@ -165,19 +143,8 @@ pub struct UhyveStdout;
#[async_trait]
impl ObjectInterface for UhyveStdout {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut result: PollEvent = PollEvent::empty();

if event.contains(PollEvent::POLLOUT) {
result.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
result.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
result.insert(PollEvent::POLLWRBAND);
}

Ok(result)
let available = PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND;
Ok(event & available)
}

async fn async_write(&self, buf: &[u8]) -> Result<usize, IoError> {
Expand All @@ -200,19 +167,8 @@ pub struct UhyveStderr;
#[async_trait]
impl ObjectInterface for UhyveStderr {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut result: PollEvent = PollEvent::empty();

if event.contains(PollEvent::POLLOUT) {
result.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
result.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
result.insert(PollEvent::POLLWRBAND);
}

Ok(result)
let available = PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND;
Ok(event & available)
}

async fn async_write(&self, buf: &[u8]) -> Result<usize, IoError> {
Expand Down
41 changes: 11 additions & 30 deletions src/fs/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,16 @@ struct RomFileInterface {
#[async_trait]
impl ObjectInterface for RomFileInterface {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut result: PollEvent = PollEvent::empty();
let len = self.inner.read().await.data.len();
let pos = *self.pos.lock().await;

if event.contains(PollEvent::POLLIN) && pos < len {
result.insert(PollEvent::POLLIN);
}
if event.contains(PollEvent::POLLRDNORM) && pos < len {
result.insert(PollEvent::POLLRDNORM);
}
if event.contains(PollEvent::POLLRDBAND) && pos < len {
result.insert(PollEvent::POLLRDBAND);
}
let ret = if pos < len {
event.intersection(PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDBAND)
} else {
PollEvent::empty()
};

Ok(result)
Ok(ret)
}

async fn async_read(&self, buf: &mut [u8]) -> Result<usize, IoError> {
Expand Down Expand Up @@ -135,30 +130,16 @@ pub struct RamFileInterface {
#[async_trait]
impl ObjectInterface for RamFileInterface {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut result: PollEvent = PollEvent::empty();
let len = self.inner.read().await.data.len();
let pos = *self.pos.lock().await;

if event.contains(PollEvent::POLLIN) && pos < len {
result.insert(PollEvent::POLLIN);
}
if event.contains(PollEvent::POLLRDNORM) && pos < len {
result.insert(PollEvent::POLLRDNORM);
}
if event.contains(PollEvent::POLLRDBAND) && pos < len {
result.insert(PollEvent::POLLRDBAND);
}
if event.contains(PollEvent::POLLOUT) {
result.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
result.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
result.insert(PollEvent::POLLWRBAND);
let mut available = PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND;

if pos < len {
available.insert(PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDBAND);
}

Ok(result)
Ok(event & available)
}

async fn async_read(&self, buf: &mut [u8]) -> Result<usize, IoError> {
Expand Down

0 comments on commit b8ef114

Please sign in to comment.