Skip to content

Commit

Permalink
Merge pull request #1067 from mkroening/ready!
Browse files Browse the repository at this point in the history
refactor(eventfd): use `ready!` to extract `Poll::Ready`
  • Loading branch information
stlankes authored Feb 18, 2024
2 parents aa01ba5 + cc45bd0 commit d0439ea
Showing 1 changed file with 45 additions and 57 deletions.
102 changes: 45 additions & 57 deletions src/fd/eventfd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use alloc::boxed::Box;
use alloc::collections::vec_deque::VecDeque;
use core::future::{self, Future};
use core::mem;
use core::task::{Poll, Waker};
use core::task::{ready, Poll, Waker};

use async_lock::Mutex;
use async_trait::async_trait;
Expand Down Expand Up @@ -64,38 +64,32 @@ impl ObjectInterface for EventFd {
future::poll_fn(|cx| {
if self.flags.contains(EventFlags::EFD_SEMAPHORE) {
let mut pinned = core::pin::pin!(self.state.lock());
if let Poll::Ready(mut guard) = pinned.as_mut().poll(cx) {
if guard.counter > 0 {
guard.counter -= 1;
let tmp = u64::to_ne_bytes(1);
buf[..len].copy_from_slice(&tmp);
if let Some(cx) = guard.write_queue.pop_front() {
cx.wake_by_ref();
}
Poll::Ready(Ok(len))
} else {
guard.read_queue.push_back(cx.waker().clone());
Poll::Pending
let mut guard = ready!(pinned.as_mut().poll(cx));
if guard.counter > 0 {
guard.counter -= 1;
let tmp = u64::to_ne_bytes(1);
buf[..len].copy_from_slice(&tmp);
if let Some(cx) = guard.write_queue.pop_front() {
cx.wake_by_ref();
}
Poll::Ready(Ok(len))
} else {
guard.read_queue.push_back(cx.waker().clone());
Poll::Pending
}
} else {
let mut pinned = core::pin::pin!(self.state.lock());
if let Poll::Ready(mut guard) = pinned.as_mut().poll(cx) {
let tmp = guard.counter;
if tmp > 0 {
guard.counter = 0;
buf[..len].copy_from_slice(&u64::to_ne_bytes(tmp));
if let Some(cx) = guard.read_queue.pop_front() {
cx.wake_by_ref();
}
Poll::Ready(Ok(len))
} else {
guard.read_queue.push_back(cx.waker().clone());
Poll::Pending
let mut guard = ready!(pinned.as_mut().poll(cx));
let tmp = guard.counter;
if tmp > 0 {
guard.counter = 0;
buf[..len].copy_from_slice(&u64::to_ne_bytes(tmp));
if let Some(cx) = guard.read_queue.pop_front() {
cx.wake_by_ref();
}
Poll::Ready(Ok(len))
} else {
guard.read_queue.push_back(cx.waker().clone());
Poll::Pending
}
}
Expand All @@ -114,27 +108,24 @@ impl ObjectInterface for EventFd {

future::poll_fn(|cx| {
let mut pinned = core::pin::pin!(self.state.lock());
if let Poll::Ready(mut guard) = pinned.as_mut().poll(cx) {
if u64::MAX - guard.counter > c {
guard.counter += c;
if self.flags.contains(EventFlags::EFD_SEMAPHORE) {
for _i in 0..c {
if let Some(cx) = guard.read_queue.pop_front() {
cx.wake_by_ref();
} else {
break;
}
let mut guard = ready!(pinned.as_mut().poll(cx));
if u64::MAX - guard.counter > c {
guard.counter += c;
if self.flags.contains(EventFlags::EFD_SEMAPHORE) {
for _i in 0..c {
if let Some(cx) = guard.read_queue.pop_front() {
cx.wake_by_ref();
} else {
break;
}
} else if let Some(cx) = guard.read_queue.pop_front() {
cx.wake_by_ref();
}

Poll::Ready(Ok(len))
} else {
guard.write_queue.push_back(cx.waker().clone());
Poll::Pending
} else if let Some(cx) = guard.read_queue.pop_front() {
cx.wake_by_ref();
}

Poll::Ready(Ok(len))
} else {
guard.write_queue.push_back(cx.waker().clone());
Poll::Pending
}
})
Expand All @@ -161,22 +152,19 @@ impl ObjectInterface for EventFd {
future::poll_fn(|cx| {
if ret.is_empty() {
let mut pinned = core::pin::pin!(self.state.lock());
if let Poll::Ready(mut guard) = pinned.as_mut().poll(cx) {
if event.intersects(
PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDNORM,
) {
guard.read_queue.push_back(cx.waker().clone());
Poll::Pending
} else if event.intersects(
PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND,
) {
guard.write_queue.push_back(cx.waker().clone());
Poll::Pending
} else {
Poll::Ready(Ok(ret))
}
} else {
let mut guard = ready!(pinned.as_mut().poll(cx));
if event
.intersects(PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDNORM)
{
guard.read_queue.push_back(cx.waker().clone());
Poll::Pending
} else if event
.intersects(PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND)
{
guard.write_queue.push_back(cx.waker().clone());
Poll::Pending
} else {
Poll::Ready(Ok(ret))
}
} else {
Poll::Ready(Ok(ret))
Expand Down

0 comments on commit d0439ea

Please sign in to comment.