Skip to content

Commit

Permalink
lib: Create a helper function for io read operations
Browse files Browse the repository at this point in the history
Currently when io is ready inside of the event system
the first FD received is always preferred as the ones
that are handled first.  This leads to results where
events associated with these first FD's are always handled
first.

In anticipation of a change to make this more fair
let's abstract the function handler.

Signed-off-by: Donald Sharp <[email protected]>
  • Loading branch information
donaldsharp committed Nov 2, 2023
1 parent ad44b54 commit f655340
Showing 1 changed file with 55 additions and 49 deletions.
104 changes: 55 additions & 49 deletions lib/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -1594,6 +1594,60 @@ static int thread_process_io_helper(struct event_loop *m, struct event *thread,
return 1;
}

static inline void thread_process_io_inner_loop(struct event_loop *m,
unsigned int num,
struct pollfd *pfds, nfds_t *i,
uint32_t *ready)
{
/* no event for current fd? immediately continue */
if (pfds[*i].revents == 0)
return;

*ready = *ready + 1;

/*
* Unless someone has called event_cancel from another
* pthread, the only thing that could have changed in
* m->handler.pfds while we were asleep is the .events
* field in a given pollfd. Barring event_cancel() that
* value should be a superset of the values we have in our
* copy, so there's no need to update it. Similarily,
* barring deletion, the fd should still be a valid index
* into the master's pfds.
*
* We are including POLLERR here to do a READ event
* this is because the read should fail and the
* read function should handle it appropriately
*/
if (pfds[*i].revents & (POLLIN | POLLHUP | POLLERR)) {
thread_process_io_helper(m, m->read[pfds[*i].fd], POLLIN,
pfds[*i].revents, *i);
}
if (pfds[*i].revents & POLLOUT)
thread_process_io_helper(m, m->write[pfds[*i].fd], POLLOUT,
pfds[*i].revents, *i);

/*
* if one of our file descriptors is garbage, remove the same
* from both pfds + update sizes and index
*/
if (pfds[*i].revents & POLLNVAL) {
memmove(m->handler.pfds + *i, m->handler.pfds + *i + 1,
(m->handler.pfdcount - *i - 1) * sizeof(struct pollfd));
m->handler.pfdcount--;
m->handler.pfds[m->handler.pfdcount].fd = 0;
m->handler.pfds[m->handler.pfdcount].events = 0;

memmove(pfds + *i, pfds + *i + 1,
(m->handler.copycount - *i - 1) * sizeof(struct pollfd));
m->handler.copycount--;
m->handler.copy[m->handler.copycount].fd = 0;
m->handler.copy[m->handler.copycount].events = 0;

*i = *i - 1;
}
}

/**
* Process I/O events.
*
Expand All @@ -1609,55 +1663,7 @@ static void thread_process_io(struct event_loop *m, unsigned int num)
struct pollfd *pfds = m->handler.copy;

for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
/* no event for current fd? immediately continue */
if (pfds[i].revents == 0)
continue;

ready++;

/*
* Unless someone has called event_cancel from another
* pthread, the only thing that could have changed in
* m->handler.pfds while we were asleep is the .events
* field in a given pollfd. Barring event_cancel() that
* value should be a superset of the values we have in our
* copy, so there's no need to update it. Similarily,
* barring deletion, the fd should still be a valid index
* into the master's pfds.
*
* We are including POLLERR here to do a READ event
* this is because the read should fail and the
* read function should handle it appropriately
*/
if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
pfds[i].revents, i);
}
if (pfds[i].revents & POLLOUT)
thread_process_io_helper(m, m->write[pfds[i].fd],
POLLOUT, pfds[i].revents, i);

/*
* if one of our file descriptors is garbage, remove the same
* from both pfds + update sizes and index
*/
if (pfds[i].revents & POLLNVAL) {
memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
(m->handler.pfdcount - i - 1)
* sizeof(struct pollfd));
m->handler.pfdcount--;
m->handler.pfds[m->handler.pfdcount].fd = 0;
m->handler.pfds[m->handler.pfdcount].events = 0;

memmove(pfds + i, pfds + i + 1,
(m->handler.copycount - i - 1)
* sizeof(struct pollfd));
m->handler.copycount--;
m->handler.copy[m->handler.copycount].fd = 0;
m->handler.copy[m->handler.copycount].events = 0;

i--;
}
thread_process_io_inner_loop(m, num, pfds, &i, &ready);
}
}

Expand Down

0 comments on commit f655340

Please sign in to comment.