Skip to content

Commit

Permalink
refactor(PollEvent): simplify flag propagation
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Kröning <[email protected]>
  • Loading branch information
mkroening authored and stlankes committed Feb 16, 2024
1 parent 0da021f commit 4f361ce
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 161 deletions.
25 changes: 6 additions & 19 deletions src/fd/eventfd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,35 +142,22 @@ impl ObjectInterface for EventFd {
}

async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut ret = PollEvent::empty();
let guard = self.state.lock().await;

let mut available = PollEvent::empty();

if guard.counter < u64::MAX - 1 {
if event.contains(PollEvent::POLLOUT) {
ret.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
ret.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
ret.insert(PollEvent::POLLWRBAND);
}
available.insert(PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND);
}

if guard.counter > 0 {
if event.contains(PollEvent::POLLIN) {
ret.insert(PollEvent::POLLIN);
}
if event.contains(PollEvent::POLLRDNORM) {
ret.insert(PollEvent::POLLRDNORM);
}
if event.contains(PollEvent::POLLRDBAND) {
ret.insert(PollEvent::POLLRDBAND);
}
available.insert(PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDBAND);
}

drop(guard);

let ret = event & available;

future::poll_fn(|cx| {
if ret.is_empty() {
let mut pinned = core::pin::pin!(self.state.lock());
Expand Down
53 changes: 15 additions & 38 deletions src/fd/socket/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,30 +168,15 @@ impl Socket {
#[async_trait]
impl ObjectInterface for Socket {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut ret = PollEvent::empty();

future::poll_fn(|cx| {
self.with(|socket| match socket.state() {
tcp::State::Closed | tcp::State::Closing | tcp::State::CloseWait => {
if event.contains(PollEvent::POLLOUT) {
ret.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
ret.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
ret.insert(PollEvent::POLLWRBAND);
}
let available = PollEvent::POLLOUT
| PollEvent::POLLWRNORM | PollEvent::POLLWRBAND
| PollEvent::POLLIN | PollEvent::POLLRDNORM
| PollEvent::POLLRDBAND;

if event.contains(PollEvent::POLLIN) {
ret.insert(PollEvent::POLLIN);
}
if event.contains(PollEvent::POLLRDNORM) {
ret.insert(PollEvent::POLLRDNORM);
}
if event.contains(PollEvent::POLLRDBAND) {
ret.insert(PollEvent::POLLRDBAND);
}
let ret = event & available;

if ret.is_empty() {
Poll::Ready(Ok(PollEvent::POLLHUP))
Expand All @@ -208,33 +193,25 @@ impl ObjectInterface for Socket {
Poll::Pending
}
_ => {
let mut available = PollEvent::empty();

if socket.can_recv()
|| socket.may_recv() && self.listen.swap(false, Ordering::Relaxed)
{
// In case, we just establish a fresh connection in non-blocking mode, we try to read data.
if event.contains(PollEvent::POLLIN) {
ret.insert(PollEvent::POLLIN);
}
if event.contains(PollEvent::POLLRDNORM) {
ret.insert(PollEvent::POLLRDNORM);
}
if event.contains(PollEvent::POLLRDBAND) {
ret.insert(PollEvent::POLLRDBAND);
}
available.insert(
PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDBAND,
);
}

if socket.can_send() {
if event.contains(PollEvent::POLLOUT) {
ret.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
ret.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
ret.insert(PollEvent::POLLWRBAND);
}
available.insert(
PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND,
);
}

let ret = event & available;

if ret.is_empty() {
socket.register_recv_waker(cx.waker());
socket.register_send_waker(cx.waker());
Expand Down
36 changes: 13 additions & 23 deletions src/fd/socket/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,37 +119,27 @@ impl Socket {
#[async_trait]
impl ObjectInterface for Socket {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut ret = PollEvent::empty();

future::poll_fn(|cx| {
self.with(|socket| {
if socket.is_open() {
let ret = if socket.is_open() {
let mut avail = PollEvent::empty();

if socket.can_send() {
if event.contains(PollEvent::POLLOUT) {
ret.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
ret.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
ret.insert(PollEvent::POLLWRBAND);
}
avail.insert(
PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND,
);
}

if socket.can_recv() {
if event.contains(PollEvent::POLLIN) {
ret.insert(PollEvent::POLLIN);
}
if event.contains(PollEvent::POLLRDNORM) {
ret.insert(PollEvent::POLLRDNORM);
}
if event.contains(PollEvent::POLLRDBAND) {
ret.insert(PollEvent::POLLRDBAND);
}
avail.insert(
PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDBAND,
);
}

event & avail
} else {
ret.insert(PollEvent::POLLNVAL);
}
PollEvent::POLLNVAL
};

if ret.is_empty() {
socket.register_recv_waker(cx.waker());
Expand Down
60 changes: 8 additions & 52 deletions src/fd/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,8 @@ pub struct GenericStdout;
#[async_trait]
impl ObjectInterface for GenericStdout {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut ret = PollEvent::empty();

if event.contains(PollEvent::POLLOUT) {
ret.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
ret.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
ret.insert(PollEvent::POLLWRBAND);
}

Ok(ret)
let available = PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND;
Ok(event & available)
}

async fn async_write(&self, buf: &[u8]) -> Result<usize, IoError> {
Expand All @@ -119,19 +108,8 @@ pub struct GenericStderr;
#[async_trait]
impl ObjectInterface for GenericStderr {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut ret = PollEvent::empty();

if event.contains(PollEvent::POLLOUT) {
ret.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
ret.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
ret.insert(PollEvent::POLLWRBAND);
}

Ok(ret)
let available = PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND;
Ok(event & available)
}

async fn async_write(&self, buf: &[u8]) -> Result<usize, IoError> {
Expand Down Expand Up @@ -165,19 +143,8 @@ pub struct UhyveStdout;
#[async_trait]
impl ObjectInterface for UhyveStdout {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut ret = PollEvent::empty();

if event.contains(PollEvent::POLLOUT) {
ret.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
ret.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
ret.insert(PollEvent::POLLWRBAND);
}

Ok(ret)
let available = PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND;
Ok(event & available)
}

async fn async_write(&self, buf: &[u8]) -> Result<usize, IoError> {
Expand All @@ -200,19 +167,8 @@ pub struct UhyveStderr;
#[async_trait]
impl ObjectInterface for UhyveStderr {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut ret = PollEvent::empty();

if event.contains(PollEvent::POLLOUT) {
ret.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
ret.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
ret.insert(PollEvent::POLLWRBAND);
}

Ok(ret)
let available = PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND;
Ok(event & available)
}

async fn async_write(&self, buf: &[u8]) -> Result<usize, IoError> {
Expand Down
39 changes: 10 additions & 29 deletions src/fs/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,14 @@ struct RomFileInterface {
#[async_trait]
impl ObjectInterface for RomFileInterface {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut ret = PollEvent::empty();
let len = self.inner.read().await.data.len();
let pos = *self.pos.lock().await;

if event.contains(PollEvent::POLLIN) && pos < len {
ret.insert(PollEvent::POLLIN);
}
if event.contains(PollEvent::POLLRDNORM) && pos < len {
ret.insert(PollEvent::POLLRDNORM);
}
if event.contains(PollEvent::POLLRDBAND) && pos < len {
ret.insert(PollEvent::POLLRDBAND);
}
let ret = if pos < len {
event.intersection(PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDBAND)
} else {
PollEvent::empty()
};

Ok(ret)
}
Expand Down Expand Up @@ -135,30 +130,16 @@ pub struct RamFileInterface {
#[async_trait]
impl ObjectInterface for RamFileInterface {
async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
let mut ret = PollEvent::empty();
let len = self.inner.read().await.data.len();
let pos = *self.pos.lock().await;

if event.contains(PollEvent::POLLIN) && pos < len {
ret.insert(PollEvent::POLLIN);
}
if event.contains(PollEvent::POLLRDNORM) && pos < len {
ret.insert(PollEvent::POLLRDNORM);
}
if event.contains(PollEvent::POLLRDBAND) && pos < len {
ret.insert(PollEvent::POLLRDBAND);
}
if event.contains(PollEvent::POLLOUT) {
ret.insert(PollEvent::POLLOUT);
}
if event.contains(PollEvent::POLLWRNORM) {
ret.insert(PollEvent::POLLWRNORM);
}
if event.contains(PollEvent::POLLWRBAND) {
ret.insert(PollEvent::POLLWRBAND);
let mut available = PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND;

if pos < len {
available.insert(PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDBAND);
}

Ok(ret)
Ok(event & available)
}

async fn async_read(&self, buf: &mut [u8]) -> Result<usize, IoError> {
Expand Down

0 comments on commit 4f361ce

Please sign in to comment.