From 0849589c86128907f74c7f722fcdcdfa2e0e2fe4 Mon Sep 17 00:00:00 2001 From: Christian Hopps Date: Mon, 30 Oct 2023 03:30:05 -0400 Subject: [PATCH] lib: Substitute poll APIs with epoll APIs in thread management routine This commit substitutes poll APIs with epoll APIs for better performace. Note that epoll APIs are only available for Linux. For BSD platforms which do not support epoll APIs, poll APIs are still used. Signed-off-by: Kaifei Peng <2534164734@qq.com> --- configure.ac | 9 + lib/event.c | 1046 +++++++++++++++++++++++++++++++++++++++++++++++- lib/frrevent.h | 101 +++++ 3 files changed, 1153 insertions(+), 3 deletions(-) diff --git a/configure.ac b/configure.ac index 9985cdcbf8a0..32b9e7c60738 100644 --- a/configure.ac +++ b/configure.ac @@ -1494,6 +1494,15 @@ AC_SUBST([LIBM]) AC_CHECK_FUNCS([ppoll], [ AC_DEFINE([HAVE_PPOLL], [1], [have Linux/BSD ppoll()]) ]) +AC_CHECK_FUNCS([epoll_wait], [ + AC_DEFINE([HAVE_EPOLL_WAIT], [1], [have Linux epoll_wait()]) +]) +AC_CHECK_FUNCS([epoll_pwait], [ + AC_DEFINE([HAVE_EPOLL_PWAIT], [1], [have Linux epoll_pwait()]) +]) +AC_CHECK_FUNCS([epoll_pwait2], [ + AC_DEFINE([HAVE_EPOLL_PWAIT2], [1], [have Linux epoll_pwait2()]) +]) AC_CHECK_FUNCS([pollts], [ AC_DEFINE([HAVE_POLLTS], [1], [have NetBSD pollts()]) ]) diff --git a/lib/event.c b/lib/event.c index d3bc0205ae98..23ca59424b5e 100644 --- a/lib/event.c +++ b/lib/event.c @@ -1,12 +1,15 @@ // SPDX-License-Identifier: GPL-2.0-or-later /* Thread management routine * Copyright (C) 1998, 2000 Kunihiro Ishiguro + * Portions: + * Copyright (C) 2023 Kaifei Peng */ /* #define DEBUG */ #include #include +#include #include "frrevent.h" #include "memory.h" @@ -60,11 +63,23 @@ DECLARE_HEAP(event_timer_list, struct event, timeritem, event_timer_cmp); #include #endif +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) +/* This macro is supposed to be called with m->mtx held */ #define AWAKEN(m) \ do { \ const unsigned char wakebyte = 0x01; \ + m->awakened = true; \ write(m->io_pipe[1], &wakebyte, 1); \ } while (0) +#else +#define AWAKEN(m) \ + do { \ + const unsigned char wakebyte = 0x01; \ + write(m->io_pipe[1], &wakebyte, 1); \ + } while (0) +#endif /* control variable for initializer */ static pthread_once_t init_once = PTHREAD_ONCE_INIT; @@ -373,6 +388,51 @@ DEFPY (service_walltime_warning, return CMD_SUCCESS; } +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) +static void show_thread_poll_helper(struct vty *vty, struct event_loop *m) +{ + const char *name = m->name ? m->name : "main"; + char underline[strlen(name) + 1]; + struct event *thread; + struct epoll_event *ev; + uint32_t i; + + memset(underline, '-', sizeof(underline)); + underline[sizeof(underline) - 1] = '\0'; + + vty_out(vty, "\nShowing epoll FD's for %s\n", name); + vty_out(vty, "----------------------%s\n", underline); + vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.event_count, + m->fd_limit); + for (i = 0; i < m->handler.event_count; i++) { + ev = &(m->handler.event_ptrs[i]->ev); + vty_out(vty, "\t%6d fd:%6d events:%2d\t\t", i, ev->data.fd, + ev->events); + + if (ev->events & EPOLLIN) { + thread = m->read[ev->data.fd]; + + if (!thread) + vty_out(vty, "ERROR "); + else + vty_out(vty, "%s ", thread->xref->funcname); + } else + vty_out(vty, " "); + + if (ev->events & EPOLLOUT) { + thread = m->write[ev->data.fd]; + + if (!thread) + vty_out(vty, "ERROR\n"); + else + vty_out(vty, "%s\n", thread->xref->funcname); + } else + vty_out(vty, "\n"); + } +} +#else static void show_thread_poll_helper(struct vty *vty, struct event_loop *m) { const char *name = m->name ? m->name : "main"; @@ -413,6 +473,7 @@ static void show_thread_poll_helper(struct vty *vty, struct event_loop *m) vty_out(vty, "\n"); } } +#endif DEFUN_NOSH (show_thread_poll, show_thread_poll_cmd, @@ -523,6 +584,12 @@ struct event_loop *event_master_create(const char *name) { struct event_loop *rv; struct rlimit limit; +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) + struct epoll_event pipe_read_ev; + nfds_t i; +#endif pthread_once(&init_once, &initializer); @@ -553,7 +620,7 @@ struct event_loop *event_master_create(const char *name) char tmhashname[strlen(name) + 32]; - snprintf(tmhashname, sizeof(tmhashname), "%s - threadmaster event hash", + snprintf(tmhashname, sizeof(tmhashname), "%s - event_loop event hash", name); cpu_records_init(rv->cpu_records); @@ -577,6 +644,50 @@ struct event_loop *event_master_create(const char *name) set_nonblocking(rv->io_pipe[0]); set_nonblocking(rv->io_pipe[1]); +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) + rv->awakened = false; + + /* Initialize data structures for epoll() */ + rv->handler.event_size = rv->fd_limit; + rv->handler.event_count = 0; + rv->handler.new_event_count = 0; + rv->handler.regular_revent_count = 0; + rv->handler.epoll_fd = epoll_create1(0); + rv->handler.events = + XCALLOC(MTYPE_EVENT_MASTER, sizeof(struct frr_epoll_event) * + rv->handler.event_size); + rv->handler.event_ptrs = + XCALLOC(MTYPE_EVENT_MASTER, sizeof(struct frr_epoll_event *) * + rv->handler.event_size); + rv->handler.new_events = + XCALLOC(MTYPE_EVENT_MASTER, sizeof(struct frr_epoll_event) * + rv->handler.event_size); + rv->handler.new_event_ptrs = + XCALLOC(MTYPE_EVENT_MASTER, sizeof(struct frr_epoll_event *) * + rv->handler.event_size); + rv->handler.revents = + XCALLOC(MTYPE_EVENT_MASTER, + sizeof(struct epoll_event) * rv->handler.event_size); + rv->handler.regular_revents = + XCALLOC(MTYPE_EVENT_MASTER, + sizeof(struct epoll_event) * rv->handler.event_size); + for (i = 0; i < rv->handler.event_size; i++) { + rv->handler.events[i].index = -1; + rv->handler.new_events[i].index = -1; + } + pipe_read_ev.data.fd = rv->io_pipe[0]; + pipe_read_ev.events = EPOLLIN; + if (-1 == epoll_ctl(rv->handler.epoll_fd, EPOLL_CTL_ADD, rv->io_pipe[0], + &pipe_read_ev)) { + flog_err( + EC_LIB_NO_THREAD, + "Attempting to call epoll_ctl to add io_pipe[0] but failed, fd: %d!", + rv->io_pipe[0]); + return NULL; + } +#else /* Initialize data structures for poll() */ rv->handler.pfdsize = rv->fd_limit; rv->handler.pfdcount = 0; @@ -584,6 +695,7 @@ struct event_loop *event_master_create(const char *name) sizeof(struct pollfd) * rv->handler.pfdsize); rv->handler.copy = XCALLOC(MTYPE_EVENT_MASTER, sizeof(struct pollfd) * rv->handler.pfdsize); +#endif /* add to list of threadmasters */ frr_with_mutex (&masters_mtx) { @@ -693,6 +805,12 @@ void event_master_free(struct event_loop *m) pthread_cond_destroy(&m->cancel_cond); close(m->io_pipe[0]); close(m->io_pipe[1]); +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) + epoll_ctl(m->handler.epoll_fd, EPOLL_CTL_DEL, m->io_pipe[0], NULL); + close(m->handler.epoll_fd); +#endif list_delete(&m->cancel_req); m->cancel_req = NULL; @@ -701,8 +819,19 @@ void event_master_free(struct event_loop *m) cpu_records_fini(m->cpu_records); XFREE(MTYPE_EVENT_MASTER, m->name); +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) + XFREE(MTYPE_EVENT_MASTER, m->handler.events); + XFREE(MTYPE_EVENT_MASTER, m->handler.event_ptrs); + XFREE(MTYPE_EVENT_MASTER, m->handler.new_events); + XFREE(MTYPE_EVENT_MASTER, m->handler.new_event_ptrs); + XFREE(MTYPE_EVENT_MASTER, m->handler.revents); + XFREE(MTYPE_EVENT_MASTER, m->handler.regular_revents); +#else XFREE(MTYPE_EVENT_MASTER, m->handler.pfds); XFREE(MTYPE_EVENT_MASTER, m->handler.copy); +#endif XFREE(MTYPE_EVENT_MASTER, m); } @@ -824,12 +953,181 @@ static void thread_free(struct event_loop *master, struct event *thread) XFREE(MTYPE_THREAD, thread); } +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) +/* + * flush new event requests to epoll_fd set and m->handler.events + * + * @REQUIRE m->mtx + */ +static void update_events(struct event_loop *m) +{ + struct frr_epoll_event *new_frr_ev, *existing_frr_ev; + struct epoll_event ev, *new_ev, *existing_ev; + bool has_existing; + int update_fd; + struct stat fd_stat; + + /* Flush m->handler.new_events to epoll_fd set and m->handler.events */ + for (nfds_t i = 0; i < m->handler.new_event_count; i++) { + new_frr_ev = m->handler.new_event_ptrs[i]; + new_ev = &new_frr_ev->ev; + update_fd = new_ev->data.fd; + existing_frr_ev = m->handler.events + update_fd; + existing_ev = &existing_frr_ev->ev; + has_existing = (existing_frr_ev->index >= 0); + + if (fstat(update_fd, &fd_stat) == -1) { + zlog_debug("[!] In %s: fstat failed, update_fd: %d", + __func__, update_fd); + } + + /* + * Construct ev which is passed to epoll_ctl call + * Note that ev.events should be new events OR'd with + * existing events + */ + ev.data.fd = update_fd; + ev.events = has_existing ? new_ev->events | existing_ev->events + : new_ev->events; + + /* Update epoll_fd set, .events and .event_ptrs */ + if (!has_existing) { + if (S_ISREG(fd_stat.st_mode)) { + /* + * Don't add regular file into epoll set, + * directly add it into regular_revents. + */ + m->handler + .regular_revents + [m->handler + .regular_revent_count] + .data.fd = update_fd; + m->handler + .regular_revents + [m->handler + .regular_revent_count] + .events = ev.events; + m->handler.regular_revent_count++; + } else if (-1 == epoll_ctl(m->handler.epoll_fd, + EPOLL_CTL_ADD, update_fd, + &ev)) { + if (errno == EEXIST) { + /* We are never supposed to reach here + */ + zlog_debug( + "[!] Attempting to add an existing fd into epoll_fd set. Existing fd: %d, existing_ev: %d, req_ev: %d", + update_fd, existing_ev->events, + new_ev->events); + if (-1 == epoll_ctl(m->handler.epoll_fd, + EPOLL_CTL_MOD, + update_fd, &ev)) { + zlog_debug( + "[!] Both attemption to add into and modify an epoll_event in epoll_fd set both failed. Existing fd: %d, existing_ev: %d, req_ev: %d", + update_fd, + existing_ev->events, + new_ev->events); + } + } + } + existing_ev->data.fd = update_fd; + existing_ev->events = ev.events; + existing_frr_ev->index = m->handler.event_count++; + m->handler.event_ptrs[existing_frr_ev->index] = + existing_frr_ev; + } else { + if (S_ISREG(fd_stat.st_mode)) { + /* + * Don't add regular file into epoll set, + * directly add it into regular_revents. + */ + m->handler + .regular_revents + [m->handler + .regular_revent_count] + .data.fd = update_fd; + m->handler + .regular_revents + [m->handler + .regular_revent_count] + .events = ev.events; + } else if (-1 == epoll_ctl( + m->handler.epoll_fd, + EPOLL_CTL_MOD, update_fd, + &ev) /* The fd is already in + * epoll_fd set, try + * modify it in epoll set + */ + && errno == 2) { + /* + * Errno == 2, indicating update_fd is a zombie + * fd (already closed fd) + * + * + * Why zombie fds occur? + * + * Fds are always added to epoll set by calling + * event_add_read_write. Intuitively, fds are + * supposed to be removed from epoll set + * explictly by calling event_cancel_*. + * + * However, we some fds are closed without + * calling event_cancel_*. In other words, + * these fds have been IMPLICITLY removed from + * epoll set. These implicitly removed fds left + * there entries in .event(_ptr)s and + * .new_event(_ptr)s. We can re-utilize these + * entries, but must update epoll set according + * to incoming new_event. + * + * Calling epoll_set to MOD a zombie fd, we get + * errno 2. Once an errno of 2 occurs, we + * encounter a zombie fd. We call epoll_ctl with + * EPOLL_CTL_ADD instead, to re-add the fd into + * epoll set. After this ADD, the fd points to + * new underlying file structure correctly + */ + if (-1 == epoll_ctl(m->handler.epoll_fd, + EPOLL_CTL_ADD, update_fd, + &ev)) { + zlog_debug( + "[!] Reutilizing a zombie event entry(fd: %d) failed! Existing_ev: %d, req_ev: %d", + update_fd, existing_ev->events, + new_ev->events); + } + } + existing_ev->data.fd = update_fd; + existing_ev->events = ev.events; + } + + /* Clear flushed *new_frr_ev and new_frr_ev */ + new_frr_ev->index = -1; + new_frr_ev->ev.data.fd = 0; + new_frr_ev->ev.events = 0; + } + + m->handler.new_event_count = 0; +} +#endif + static int fd_poll(struct event_loop *m, const struct timeval *timer_wait, bool *eintr_p) { sigset_t origsigs; unsigned char trash[64]; +#if !(defined(USE_EPOLL)) || \ + !(defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) nfds_t count = m->handler.copycount; +#endif + +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) + if (m->handler.event_count == 0) + return 0; +#endif /* * If timer_wait is null here, that means poll() should block @@ -862,12 +1160,27 @@ static int fd_poll(struct event_loop *m, const struct timeval *timer_wait, rcu_read_unlock(); rcu_assert_read_unlocked(); +#if !(defined(USE_EPOLL)) || \ + !(defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) /* add poll pipe poker */ assert(count + 1 < m->handler.pfdsize); m->handler.copy[count].fd = m->io_pipe[0]; m->handler.copy[count].events = POLLIN; m->handler.copy[count].revents = 0x00; +#endif +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) + /* We need to deal with a signal-handling race here: we + * don't want to miss a crucial signal, such as SIGTERM or SIGINT, + * that may arrive just before we enter epoll_wait(). We will block the + * key signals, then check whether any have arrived - if so, we return + * before calling epoll_wait(). If not, we'll re-enable the signals + * in the epoll_pwait() call. + */ +#else /* We need to deal with a signal-handling race here: we * don't want to miss a crucial signal, such as SIGTERM or SIGINT, * that may arrive just before we enter poll(). We will block the @@ -875,6 +1188,7 @@ static int fd_poll(struct event_loop *m, const struct timeval *timer_wait, * before calling poll(). If not, we'll re-enable the signals * in the ppoll() call. */ +#endif sigemptyset(&origsigs); if (m->handle_signals) { @@ -891,7 +1205,29 @@ static int fd_poll(struct event_loop *m, const struct timeval *timer_wait, pthread_sigmask(SIG_SETMASK, NULL, &origsigs); } -#if defined(HAVE_PPOLL) +#if defined(USE_EPOLL) && defined(HAVE_EPOLL_PWAIT2) + struct timespec ts, *tsp; + + if (timeout >= 0) { + ts.tv_sec = timeout / 1000; + ts.tv_nsec = (timeout % 1000) * 1000000; + tsp = &ts; + } else + tsp = NULL; + + num = epoll_pwait2(m->handler.epoll_fd, m->handler.revents, + m->handler.event_count, tsp, &origsigs); + pthread_sigmask(SIG_SETMASK, &origsigs, NULL); +#elif defined(USE_EPOLL) && defined(HAVE_EPOLL_PWAIT) + num = epoll_pwait(m->handler.epoll_fd, m->handler.revents, + m->handler.event_count, timeout, &origsigs); + pthread_sigmask(SIG_SETMASK, &origsigs, NULL); +#elif defined(USE_EPOLL) && defined(HAVE_EPOLL_WAIT) + /* Not ideal - there is a race after we restore the signal mask */ + pthread_sigmask(SIG_SETMASK, &origsigs, NULL); + num = epoll_wait(m->handler.epoll_fd, m->handler.revents, + m->handler.event_count, timeout); +#elif defined(HAVE_PPOLL) struct timespec ts, *tsp; if (timeout >= 0) { @@ -914,9 +1250,20 @@ static int fd_poll(struct event_loop *m, const struct timeval *timer_wait, if (num < 0 && errno == EINTR) *eintr_p = true; +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) + if (num > 0 && m->awakened) { + /* AWAKENed, exhaust io_pipe */ + while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0) + ; + m->awakened = false; + } +#else if (num > 0 && m->handler.copy[count].revents != 0 && num--) while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0) ; +#endif rcu_read_lock(); @@ -928,6 +1275,97 @@ void _event_add_read_write(const struct xref_eventsched *xref, struct event_loop *m, void (*func)(struct event *), void *arg, int fd, struct event **t_ptr) { +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) + struct frr_epoll_event *new_frr_ev; + struct epoll_event *new_ev; + bool has_new; + int dir = xref->event_type; + struct event *thread = NULL; + struct event **thread_array; + + if (dir == EVENT_READ) + frrtrace(9, frr_libfrr, schedule_read, m, xref->funcname, + xref->xref.file, xref->xref.line, t_ptr, fd, 0, arg, + 0); + else + frrtrace(9, frr_libfrr, schedule_write, m, xref->funcname, + xref->xref.file, xref->xref.line, t_ptr, fd, 0, arg, + 0); + + assert(fd >= 0); + if (fd >= m->fd_limit) + assert(!"Number of FD's open is greater than FRR currently configured to handle, aborting"); + + frr_with_mutex (&m->mtx) { + if (t_ptr && *t_ptr) + /* thread is already scheduled; don't reschedule */ + break; + + /* + * default: a new frr_epoll_event for the new request. + * Queue the new event at the end of .new_event_ptrs array + */ + nfds_t queuepos = m->handler.new_event_count; + + if (dir == EVENT_READ) + thread_array = m->read; + else + thread_array = m->write; + + new_frr_ev = m->handler.new_events + fd; + has_new = (new_frr_ev->index >= 0); + + /* + * if we already have a new_event for our file descriptor, find + * and use it + */ + if (has_new) { + queuepos = new_frr_ev->index; +#ifdef DEV_BUILD + zlog_debug("event_add_read_write: DEV_BUILD"); + /* + * What happens if we have a thread already + * created for this event? + */ + if (thread_array[fd]) + assert(!"Thread already scheduled for file descriptor"); +#endif + } + + thread = thread_get(m, dir, func, arg, xref); + + /* Record the new event requset in .new_events*/ + new_ev = &new_frr_ev->ev; + new_ev->data.fd = fd; + new_ev->events |= (dir == EVENT_READ ? EPOLLIN : EPOLLOUT); + /* Queue the new event request into .new_event_ptrs */ + m->handler.new_event_ptrs[queuepos] = new_frr_ev; + new_frr_ev->index = queuepos; + + if (queuepos == m->handler.new_event_count) + m->handler.new_event_count++; + + /* + * Queue the thread which will handle the new event request + * into thread_array + */ + if (thread) { + frr_with_mutex (&thread->mtx) { + thread->u.fd = fd; + thread_array[thread->u.fd] = thread; + } + + if (t_ptr) { + *t_ptr = thread; + thread->ref = t_ptr; + } + } + + AWAKEN(m); + } +#else int dir = xref->event_type; struct event *thread = NULL; struct event **thread_array; @@ -1003,6 +1441,7 @@ void _event_add_read_write(const struct xref_eventsched *xref, AWAKEN(m); } +#endif } static void _event_add_timer_timeval(const struct xref_eventsched *xref, @@ -1131,6 +1570,215 @@ void _event_add_event(const struct xref_eventsched *xref, struct event_loop *m, /* Thread cancellation ------------------------------------------------------ */ +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) +/** + * + * Remove the EPOLLIN/EPOLLOUT marks of the given file descriptor in + * master->handler.events and epoll_fd set. Also, Modify + * master->handler.event_ptrs accordingly. + * + * The event to be NOT'd is passed in the 'state' parameter. + * + * @param master + * @param fd + * @param state the event to cancel. One or more (OR'd together) of the + * following: + * - EPOLLIN + * - EPOLLOUT + */ +static void event_cancel_exist_rw(struct event_loop *master, int fd, + uint32_t state) +{ + struct frr_epoll_event *existing_frr_ev; + struct epoll_event *existing_ev; + bool has_existing = false, events_cleaned = false; + struct stat fd_stat; + nfds_t i_ptr, i; + + if (fstat(fd, &fd_stat) == -1) { + if (errno != 9) { + /* fd is closed if errno == 9 */ + zlog_debug( + "[!] In %s, fstat failed, fd: %d, errno: %d)", + __func__, fd, errno); + } + return; + } + + /* Cancel EPOLLHUP too just in case some bozo set it */ + state |= EPOLLHUP; + + /* Look for the index of the canceled event in .event_ptrs array */ + existing_frr_ev = master->handler.events + fd; + has_existing = (existing_frr_ev->index >= 0); + + if (!has_existing) { + zlog_debug( + "[!] Received cancellation request for an rw job nonexistent in fd_handler->events. (Probably exists in fd_handler->new_events)"); + zlog_debug("[!] threadmaster: %s | fd: %d", + master->name ? master->name : "", fd); + return; + } + i_ptr = existing_frr_ev->index; + + /* NOT out .events of corresponding frr_epoll_event */ + existing_ev = &existing_frr_ev->ev; + existing_ev->events &= ~(state); + + events_cleaned = !(existing_ev->events & (EPOLLIN | EPOLLOUT)); + + if (events_cleaned) { + /* remove fd from epoll_fd set */ + if (S_ISREG(fd_stat.st_mode)) { + for (i = 0; i < master->handler.regular_revent_count; + i++) { + if (master->handler.regular_revents[i] + .data.fd == fd) { + break; + } + } + if (i == master->handler.regular_revent_count) { + zlog_debug( + "[!] Received cancellation request for nonexistent regular rw job"); + zlog_debug("[!] threadmaster: %s | fd: %d", + master->name ? master->name : "", + fd); + return; + } + memmove(master->handler.regular_revents + i, + master->handler.regular_revents + i + 1, + (master->handler.regular_revent_count - i - 1) * + sizeof(struct epoll_event)); + master->handler.regular_revent_count--; + master->handler + .regular_revents[master->handler + .regular_revent_count] + .data.fd = 0; + master->handler + .regular_revents[master->handler + .regular_revent_count] + .events = 0; + } else if (-1 == epoll_ctl(master->handler.epoll_fd, + EPOLL_CTL_DEL, fd, existing_ev)) { + zlog_debug( + "[!] epoll_ctl(EPOLL_CTL_DEL) error while canceling existing rw events"); + zlog_debug("[!] threadmaster: %s | fd: %d", + master->name ? master->name : "", fd); + } + /* reset existing_frr_ev */ + existing_ev->data.fd = 0; + existing_ev->events = 0; + existing_frr_ev->index = -1; + /* remove the entry in .event_ptrs */ + for (i = i_ptr; i < master->handler.event_count - 1; i++) { + master->handler.event_ptrs[i] = + master->handler.event_ptrs[i + 1]; + master->handler.event_ptrs[i]->index = i; + } + master->handler.event_count--; + } else { + /* modify the events of fd in epoll_fd set */ + if (S_ISREG(fd_stat.st_mode)) { + for (i = 0; i < master->handler.regular_revent_count; + i++) { + if (master->handler.regular_revents[i] + .data.fd == fd) { + break; + } + } + if (i == master->handler.regular_revent_count) { + zlog_debug( + "[!] Received cancellation request for nonexistent regular rw job"); + zlog_debug("[!] threadmaster: %s | fd: %d", + master->name ? master->name : "", + fd); + return; + } + master->handler.regular_revents[i].events = + existing_ev->events; + } else if (-1 == epoll_ctl(master->handler.epoll_fd, + EPOLL_CTL_MOD, fd, existing_ev)) { + zlog_debug( + "[!] epoll_ctl(EPOLL_CTL_DEL) error while canceling existing rw events"); + zlog_debug("[!] threadmaster: %s | fd: %d", + master->name ? master->name : "", fd); + } + } +} + +/** + * + * Remove the EPOLLIN/EPOLLOUT marks of the given file descriptor in + * master->handler.new_events. Also, modify master->handler.new_event_ptrs + * accordingly. + * + * The event to be NOT'd is passed in the 'state' parameter. + * + * @param master + * @param fd + * @param state the event to cancel. One or more (OR'd together) of the + * following: + * - EPOLLIN + * - EPOLLOUT + */ +static void event_cancel_new_rw(struct event_loop *master, int fd, + uint32_t state) +{ + struct frr_epoll_event *new_frr_ev; + struct epoll_event *new_ev; + bool has_new = false, events_cleaned = false; + struct stat fd_stat; + nfds_t i_ptr, i; + + if (fstat(fd, &fd_stat) == -1) { + if (errno != 9) { + /* fd is closed if errno == 9 */ + zlog_debug( + "[!] In %s, fstat failed, fd: %d, errno: %d)", + __func__, fd, errno); + } + return; + } + + /* Cancel EPOLLHUP too just in case some bozo set it */ + state |= EPOLLHUP; + + /* Look for the index of the canceled event in .new_event_ptrs array */ + new_frr_ev = master->handler.new_events + fd; + has_new = (new_frr_ev->index >= 0); + + if (!has_new) { + /* + * Received cancellation request for an rw job nonexistent in + * fd_handler->new_events. + */ + return; + } + i_ptr = new_frr_ev->index; + + /* NOT out .events of corresponding frr_epoll_event */ + new_ev = &new_frr_ev->ev; + new_ev->events &= ~(state); + + events_cleaned = !(new_ev->events & (EPOLLIN | EPOLLOUT)); + + if (events_cleaned) { + /* reset new_frr_ev */ + new_ev->data.fd = 0; + new_ev->events = 0; + new_frr_ev->index = -1; + /* remove the entry in .new_event_ptrs */ + for (i = i_ptr; i < master->handler.new_event_count - 1; i++) { + master->handler.new_event_ptrs[i] = + master->handler.new_event_ptrs[i + 1]; + master->handler.new_event_ptrs[i]->index = i; + } + master->handler.new_event_count--; + } +} +#else /** * NOT's out the .events field of pollfd corresponding to the given file * descriptor. The event to be NOT'd is passed in the 'state' parameter. @@ -1208,6 +1856,7 @@ static void event_cancel_rw(struct event_loop *master, int fd, short state, master->handler.copy[master->handler.copycount].events = 0; } } +#endif /* * Process task cancellation given a task argument: iterate through the @@ -1219,7 +1868,14 @@ static void cancel_arg_helper(struct event_loop *master, struct event *t; nfds_t i; int fd; +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) + struct frr_epoll_event *frr_ev; + struct epoll_event *ev; +#else struct pollfd *pfd; +#endif /* We're only processing arg-based cancellations here. */ if (cr->eventobj == NULL) @@ -1249,6 +1905,74 @@ static void cancel_arg_helper(struct event_loop *master, return; /* Check the io tasks */ +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) + /* Traverse through master->handler.event_ptrs */ + for (i = 0; i < master->handler.event_count;) { + frr_ev = *(master->handler.event_ptrs + i); + ev = &frr_ev->ev; + + if (ev->events & EPOLLIN) + t = master->read[ev->data.fd]; + else + t = master->write[ev->data.fd]; + + if (t && t->arg == cr->eventobj) { + fd = ev->data.fd; + + /* Found a match to cancel: clean up fd arrays */ + event_cancel_exist_rw(master, fd, ev->events); + + /* Clean up thread arrays */ + master->read[fd] = NULL; + master->write[fd] = NULL; + + /* Clear caller's ref */ + if (t->ref) + *t->ref = NULL; + + thread_add_unuse(master, t); + + /* Don't increment 'i' since the cancellation will have + * removed the entry from the .event_ptrs array + */ + } else + i++; + } + /* Traverse through master->handler.new_event_ptrs */ + for (i = 0; i < master->handler.new_event_count;) { + frr_ev = *(master->handler.new_event_ptrs + i); + ev = &frr_ev->ev; + + if (ev->events & EPOLLIN) + t = master->read[ev->data.fd]; + else + t = master->write[ev->data.fd]; + + if (t && t->arg == cr->eventobj) { + fd = ev->data.fd; + + /* Found a match to cancel: clean up fd arrays */ + event_cancel_new_rw(master, fd, ev->events); + + /* Clean up thread arrays */ + master->read[fd] = NULL; + master->write[fd] = NULL; + + /* Clear caller's ref */ + if (t->ref) + *t->ref = NULL; + + thread_add_unuse(master, t); + + /* Don't increment 'i' since the cancellation will have + * removed the entry from the .new_event_ptrs array + */ + } else + i++; + } +#else for (i = 0; i < master->handler.pfdcount;) { pfd = master->handler.pfds + i; @@ -1279,6 +2003,7 @@ static void cancel_arg_helper(struct event_loop *master, } else i++; } +#endif /* Check the timer tasks */ t = event_timer_list_first(&master->timer); @@ -1342,11 +2067,25 @@ static void do_event_cancel(struct event_loop *master) /* Determine the appropriate queue to cancel the thread from */ switch (thread->type) { case EVENT_READ: +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) + event_cancel_exist_rw(master, thread->u.fd, EPOLLIN); + event_cancel_new_rw(master, thread->u.fd, EPOLLIN); +#else event_cancel_rw(master, thread->u.fd, POLLIN, -1); +#endif thread_array = master->read; break; case EVENT_WRITE: +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) + event_cancel_exist_rw(master, thread->u.fd, EPOLLOUT); + event_cancel_new_rw(master, thread->u.fd, EPOLLOUT); +#else event_cancel_rw(master, thread->u.fd, POLLOUT, -1); +#endif thread_array = master->write; break; case EVENT_TIMER: @@ -1556,6 +2295,179 @@ static struct event *thread_run(struct event_loop *m, struct event *thread, return fetch; } +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) +static int thread_process_io_helper(struct event_loop *m, struct event *thread, + uint32_t state, uint32_t actual_state, + int fd) +{ + struct event **thread_array; + struct frr_epoll_event *existing_frr_ev; + struct epoll_event *existing_ev; + bool has_existing = false; + struct stat fd_stat; + + if (fstat(fd, &fd_stat) == -1) + zlog_debug("[!] In %s, fstat failed. fd: %d)", __func__, fd); + + /* + * Clears the "state" bits of the .events field of the + * corresponding epoll_event object in m->handler.events, + * and then modify epoll_fd set accordingly (don't make + * deletion even if .events field are all cleaned). + * + * This cleans up a possible infinite loop where we refuse + * to respond to a epoll event but epoll is insistent that + * we should. + */ + existing_frr_ev = m->handler.events + fd; + has_existing = (existing_frr_ev->index >= 0); + + if (!has_existing) { + zlog_debug( + "[!] Unexpected processing of an I/O thread. fd: %d, state: %u, actual_state: %u", + fd, state, actual_state); + zlog_debug("[!] threadmaster: %s | fd: %d", + m->name ? m->name : "", fd); + return 0; + } + + existing_ev = &existing_frr_ev->ev; + existing_ev->events &= ~(state); + + /* modify the events of fd in epoll_fd set */ + if (!S_ISREG(fd_stat.st_mode) && + -1 == epoll_ctl(m->handler.epoll_fd, EPOLL_CTL_MOD, fd, + existing_ev)) { + zlog_debug( + "[!] epoll_ctl(EPOLL_CTL_MOD) error while processing an I/O thread, fd: %d, incoming_event: %d, actual_state: %d, errno: %d", + fd, state, actual_state, errno); + } + + if (!thread) { + if ((actual_state & (EPOLLERR | EPOLLHUP | EPOLLIN)) != + (EPOLLERR | EPOLLHUP)) + flog_err( + EC_LIB_NO_THREAD, + "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!", + fd, actual_state); + return 0; + } + + if (thread->type == EVENT_READ) + thread_array = m->read; + else + thread_array = m->write; + + thread_array[thread->u.fd] = NULL; + event_list_add_tail(&m->ready, thread); + thread->type = EVENT_READY; + + return 1; +} + +/** + * Process I/O events. + * + * Walks through m->handler.revents array looking for those epoll_event + * whose .events field has something interesting. + * + * Deletes any invalid file descriptors. + * + * @param m the thread master + * @param num the number of active file descriptors (return value of + * epoll_wait()) + */ +static void thread_process_io(struct event_loop *m, unsigned int num, + int regular) +{ + struct epoll_event *revents; + nfds_t j; + int fd; + struct stat fd_stat; + + if (regular == 1) { + revents = m->handler.regular_revents; + } else if (regular == 0) { + revents = m->handler.revents; + } else { + zlog_debug("[!] In %s, wrong regular arg value: %d", __func__, + regular); + return; + } + + for (nfds_t i = 0; i < num; ++i) { + fd = revents[i].data.fd; + if (fd == m->io_pipe[0] || fd == m->io_pipe[1]) + continue; + + /* + * We are including EPOLLERR here to do a READ event + * this is because the read should fail and the + * read function should handle it appropriately + */ + if (revents[i].events & + (EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR)) { + thread_process_io_helper(m, m->read[fd], EPOLLIN, + revents[i].events, fd); + } + if (revents[i].events & EPOLLOUT) { + thread_process_io_helper(m, m->write[fd], EPOLLOUT, + revents[i].events, fd); + } + + /* if one of our file descriptors is garbage, remove the fd + * from epoll_fd set, .event(_ptr)s and .new_event(_ptr)s + */ + if (revents[i].events & (EPOLLHUP | EPOLLERR)) { + /* remove fd from epoll_fd set */ + if (fstat(fd, &fd_stat) == -1) { + zlog_debug("[!] In %s, fstat failed, fd: %d", + __func__, fd); + } + if (S_ISREG(fd_stat.st_mode)) { + for (j = 0; j < m->handler.regular_revent_count; + j++) { + if (m->handler.regular_revents[j] + .data.fd == fd) { + break; + } + } + if (j == m->handler.regular_revent_count) { + zlog_debug( + "[!] Canceling nonexistent regular rw job"); + zlog_debug( + "[!] threadmaster: %s | fd: %d", + m->name ? m->name : "", fd); + return; + } + memmove(m->handler.regular_revents + j, + m->handler.regular_revents + j + 1, + (m->handler.regular_revent_count - j - + 1) * sizeof(struct epoll_event)); + m->handler.regular_revent_count--; + m->handler + .regular_revents + [m->handler + .regular_revent_count] + .data.fd = 0; + m->handler + .regular_revents + [m->handler + .regular_revent_count] + .events = 0; + } else if (-1 == epoll_ctl(m->handler.epoll_fd, + EPOLL_CTL_DEL, fd, NULL)) { + zlog_debug( + "[!] epoll_ctl(EPOLL_CTL_DEL) error handling EPOLLHUP/EPOLLERR"); + zlog_debug("threadmaster: %s | fd: %d", + m->name ? m->name : "", fd); + } + } + } +} +#else static int thread_process_io_helper(struct event_loop *m, struct event *thread, short state, short actual_state, int pos) { @@ -1660,6 +2572,7 @@ static void thread_process_io(struct event_loop *m, unsigned int num) } } } +#endif /* Add all timers that have popped to the ready list. */ static unsigned int thread_process_timers(struct event_loop *m, @@ -1717,8 +2630,133 @@ static unsigned int thread_process(struct event_list_head *list) return ready; } - /* Fetch next ready thread. */ +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) +struct event *event_fetch(struct event_loop *m, struct event *fetch) +{ + struct event *thread = NULL; + struct timeval now; + struct timeval zerotime = {0, 0}; + struct timeval tv; + struct timeval *tw = NULL; + bool eintr_p = false; + int num = 0; + + do { + /* Handle signals if any */ + if (m->handle_signals) + frr_sigevent_process(); + + pthread_mutex_lock(&m->mtx); + + /* Process any pending cancellation requests */ + do_event_cancel(m); + + /* + * Attempt to flush ready queue before going into poll(). + * This is performance-critical. Think twice before modifying. + */ + if ((thread = event_list_pop(&m->ready))) { + fetch = thread_run(m, thread, fetch); + if (fetch->ref) + *fetch->ref = NULL; + pthread_mutex_unlock(&m->mtx); + if (!m->ready_run_loop) + GETRUSAGE(&m->last_getrusage); + m->ready_run_loop = true; + break; + } + + m->ready_run_loop = false; + /* otherwise, tick through scheduling sequence */ + + /* + * Post events to ready queue. This must come before the + * following block since events should occur immediately + */ + thread_process(&m->event); + + /* + * If there are no tasks on the ready queue, we will poll() + * until a timer expires or we receive I/O, whichever comes + * first. The strategy for doing this is: + * + * - If there are events pending, set the poll() timeout to zero + * - If there are no events pending, but there are timers + * pending, set the timeout to the smallest remaining time on + * any timer. + * - If there are neither timers nor events pending, but there + * are file descriptors pending, block indefinitely in poll() + * - If nothing is pending, it's time for the application to die + * + * In every case except the last, we need to hit poll() at least + * once per loop to avoid starvation by events + */ + if (!event_list_count(&m->ready)) + tw = thread_timer_wait(&m->timer, &tv); + + if (event_list_count(&m->ready) || + (tw && !timercmp(tw, &zerotime, >))) + tw = &zerotime; + + if (!tw && m->handler.event_count == 0 && + m->handler.new_event_count == 0 && + m->handler.regular_revent_count == 0) { /* die */ + pthread_mutex_unlock(&m->mtx); + fetch = NULL; + break; + } + + /* Prepare the correct epoll_fd set for epoll_wait(). + * To this end, we call epoll_ctl() and modify m->handler.events + * according to m->handler.new_events. + */ + update_events(m); + + pthread_mutex_unlock(&m->mtx); + { + eintr_p = false; + num = fd_poll(m, tw, &eintr_p); + } + pthread_mutex_lock(&m->mtx); + + /* Handle any errors received in poll() */ + if (num < 0) { + if (eintr_p) { + pthread_mutex_unlock(&m->mtx); + /* loop around to signal handler */ + continue; + } + + /* else die */ + flog_err(EC_LIB_SYSTEM_CALL, "epoll_wait() error: %s", + safe_strerror(errno)); + + pthread_mutex_unlock(&m->mtx); + fetch = NULL; + break; + } + + /* Post timers to ready queue. */ + monotime(&now); + thread_process_timers(m, &now); + + /* Post I/O to ready queue. */ + if (m->handler.regular_revent_count > 0) + thread_process_io(m, m->handler.regular_revent_count, + 1); + if (num > 0) + thread_process_io(m, num, 0); + + pthread_mutex_unlock(&m->mtx); + + } while (!thread && m->spin); + + return fetch; +} +#else struct event *event_fetch(struct event_loop *m, struct event *fetch) { struct event *thread = NULL; @@ -1818,6 +2856,7 @@ struct event *event_fetch(struct event_loop *m, struct event *fetch) /* else die */ flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s", safe_strerror(errno)); + pthread_mutex_unlock(&m->mtx); fetch = NULL; break; @@ -1837,6 +2876,7 @@ struct event *event_fetch(struct event_loop *m, struct event *fetch) return fetch; } +#endif static unsigned long timeval_elapsed(struct timeval a, struct timeval b) { diff --git a/lib/frrevent.h b/lib/frrevent.h index 3f74df359bc2..81df5c5ca9b3 100644 --- a/lib/frrevent.h +++ b/lib/frrevent.h @@ -1,6 +1,8 @@ // SPDX-License-Identifier: GPL-2.0-or-later /* Event management routine header. * Copyright (C) 1998 Kunihiro Ishiguro + * Portions: + * Copyright (C) 2023 Kaifei Peng */ #ifndef _ZEBRA_THREAD_H @@ -9,6 +11,11 @@ #include #include #include +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) +#include +#endif #include "monotime.h" #include "frratomic.h" #include "typesafe.h" @@ -42,6 +49,94 @@ struct rusage_t { PREDECL_LIST(event_list); PREDECL_HEAP(event_timer_list); +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) +struct frr_epoll_event { + /* the index of this frr_epoll_event instance in .event_ptrs or + * .new_event_ptrs + */ + int index; // index should be -1 for a invalid frr_epoll_event + + struct epoll_event ev; +}; + +struct fd_handler { + /* number of pfd that fit in the allocated space of pfds. This is a + * constant and is the same for both pfds and copy. + */ + nfds_t event_size; + + /* the epoll fd set to monitor for i/o. This should be coordinated with + * the "events" field below + */ + int epoll_fd; + + /* Below we define two arrays, namely .events and .new_events. + * The function of two arrays are analogous to .pfds and .copy in old + * version fd_handler which uses poll(). + * + * .events hold existing monitored events that have been registered in + * kernel by epoll_ctl() calls. Similar to .copy array in old version, + * .events is updated just before every time fd_poll() is called. + * + * .new_events hold event update requests during the interval from last + * fd_poll() call to next fd_poll() call. .new_events functions as a + * holder of new event request, which is similar to .pfds array in old + * version + * + * Both array are indexed by fd for fast update. However, during + * cancel_arg_helper when canceling a thread, all frr_epoll_events + * are supposed to be traversed, which is inefficient. We define two + * compact arrays .event_ptrs and .new_event_ptrs for fast canceling. + * Only (new_)event_count number of elements in .(new_)event_ptr are + * valid, and each element refers a valid frr_epoll_event. + * + * frr_epoll_event arrays and ptr arrays are mutually referenced. An + * frr_epoll_event reference its corresponding ptr element by its + * .index field. An ptr reference its corresponding frr_epoll_event + * natually since it is a pointer. + */ + + + /* the buffer which stores monitored fds and corresponding events, + * indexed by fd + */ + struct frr_epoll_event *events; + /* a compact array of pointers to frr_epoll_event in .events array, + * This array is traversed if needed when doing do_event_cancel + */ + struct frr_epoll_event **event_ptrs; + /* number of valid frr_epoll_event in .events (number of valid pointers) + * in .events_ptrs + */ + nfds_t event_count; + + /* the temp buffer which stores requests of adding new fd into + * epoll_fd set, or modifying the events of existing fds. + * Indexed by fd + */ + struct frr_epoll_event *new_events; + /* a compact array of pointers to frr_epoll_event in .new_events array, + * This array is traversed if needed when doing do_event_cancel + */ + struct frr_epoll_event **new_event_ptrs; + /* number of valid frr_epoll_event in .new_events (number of valid + * pointers in .new_events_ptrs) + */ + nfds_t new_event_count; + + /* the buffer which stores the results of epoll_wait */ + struct epoll_event *revents; + /* Vtysh might redirect stdin/stdout to regular files, which can't be + * added to epoll set and need special treatment. I/O events from/to + * regular file will be directly added to regular_revents, not into + * epoll set + */ + struct epoll_event *regular_revents; + nfds_t regular_revent_count; +}; +#else struct fd_handler { /* number of pfd that fit in the allocated space of pfds. This is a * constant and is the same for both pfds and copy. @@ -58,6 +153,7 @@ struct fd_handler { /* number of pollfds stored in copy */ nfds_t copycount; }; +#endif struct xref_eventsched { struct xref xref; @@ -82,6 +178,11 @@ struct event_loop { pthread_cond_t cancel_cond; struct cpu_records_head cpu_records[1]; int io_pipe[2]; +#if defined(USE_EPOLL) && \ + (defined(HAVE_EPOLL_WAIT) || defined(HAVE_EPOLL_PWAIT) || \ + defined(HAVE_EPOLL_PWAIT2)) + bool awakened; +#endif int fd_limit; struct fd_handler handler; unsigned long alloc;