From 0a6fe3394b509965c3a882cb5ccaa1c67133a066 Mon Sep 17 00:00:00 2001 From: Sebastian Reimers Date: Mon, 18 Sep 2023 10:09:14 +0200 Subject: [PATCH] main: fd_listen fhs alloc rewrite (#805) --- include/re_main.h | 7 +- src/main/main.c | 409 +++++++++++++++++++++++++++----------------- src/mqueue/mqueue.c | 6 +- src/tcp/tcp.c | 49 ++++-- src/udp/udp.c | 21 ++- 5 files changed, 304 insertions(+), 188 deletions(-) diff --git a/include/re_main.h b/include/re_main.h index f9595b36f..23b664fa0 100644 --- a/include/re_main.h +++ b/include/re_main.h @@ -7,6 +7,7 @@ #include "re_async.h" struct re; +struct re_fhs; enum { #ifndef FD_READ @@ -35,10 +36,10 @@ typedef void (fd_h)(int flags, void *arg); typedef void (re_signal_h)(int sig); -int fd_listen(re_sock_t fd, int flags, fd_h *fh, void *arg); -void fd_close(re_sock_t fd); +int fd_listen(struct re_fhs **fhs, re_sock_t fd, int flags, fd_h *fh, + void *arg); +struct re_fhs *fd_close(struct re_fhs *fhs); int fd_setsize(int maxfds); -void fd_debug(void); int libre_init(void); void libre_close(void); diff --git a/src/main/main.c b/src/main/main.c index 385c3b0ef..1e6af642b 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -2,7 +2,7 @@ * @file main.c Main polling routine * * Copyright (C) 2010 Creytiv.com - * Copyright (C) 2020-2022 Sebastian Reimers + * Copyright (C) Sebastian Reimers */ #include #ifdef HAVE_SYS_TIME_H @@ -57,8 +57,8 @@ /** Main loop values */ enum { RE_THREAD_WORKERS = 4, - MAX_BLOCKING = 500, /**< Maximum time spent in handler in [ms] */ -#if defined (FD_SETSIZE) + MAX_BLOCKING = 500, /**< Maximum time spent in handler in [ms] */ +#if defined(FD_SETSIZE) DEFAULT_MAXFDS = FD_SETSIZE #else DEFAULT_MAXFDS = 128 @@ -66,7 +66,8 @@ enum { }; /** File descriptor handler struct */ -struct fhs { +struct re_fhs { + int index; re_sock_t fd; /**< File Descriptor */ int flags; /**< Polling flags (Read, Write, etc.) */ fd_h* fh; /**< Event handler */ @@ -75,14 +76,16 @@ struct fhs { /** Polling loop data */ struct re { - struct fhs *fhs; /** File descriptor handler set */ int maxfds; /**< Maximum number of polling fds */ int nfds; /**< Number of active file descriptors */ enum poll_method method; /**< The current polling method */ RE_ATOMIC bool polling; /**< Is polling flag */ int sig; /**< Last caught signal */ struct tmrl *tmrl; /**< List of timers */ - + struct mbuf *fhsld; /**< fhs delete list */ +#ifdef HAVE_SELECT + struct re_fhs **fhsl; /**< Select fhs pointer list */ +#endif #ifdef HAVE_EPOLL struct epoll_event *events; /**< Event set for epoll() */ int epfd; /**< epoll control file descriptor */ @@ -106,14 +109,33 @@ static once_flag flag = ONCE_FLAG_INIT; static void poll_close(struct re *re); +static void fhsld_flush(struct re *re) +{ + if (!re->fhsld) + return; + + re->fhsld->pos = 0; + + while (re->fhsld->pos < re->fhsld->end) { + intptr_t p = mbuf_read_ptr(re->fhsld); + mem_deref((void *)p); + } + + re->fhsld->pos = 0; + re->fhsld->end = 0; +} + + static void re_destructor(void *arg) { struct re *re = arg; poll_close(re); + fhsld_flush(re); mem_deref(re->mutex); mem_deref(re->async); mem_deref(re->tmrl); + mem_deref(re->fhsld); } @@ -141,7 +163,14 @@ int re_alloc(struct re **rep) if (!re) return ENOMEM; + re->fhsld = mbuf_alloc(64 * sizeof(void *)); + if (!re->fhsld) { + err = ENOMEM; + goto out; + } + err = mutex_alloc_tp(&re->mutex, mtx_recursive); + if (err) { DEBUG_WARNING("thread_init: mtx_init error\n"); goto out; @@ -165,7 +194,6 @@ int re_alloc(struct re **rep) re->kqfd = -1; #endif - out: if (err) mem_deref(re); @@ -222,38 +250,6 @@ static inline void re_unlock(struct re *re) } -#ifdef WIN32 -/** - * This code emulates POSIX numbering. There is no locking, - * so zero thread-safety. - * - * @param re Poll state - * @param fd File descriptor - * - * @return fhs index if success, otherwise -1 - */ -static int lookup_fd_index(struct re* re, re_sock_t fd) { - int i; - - for (i = 0; i < re->nfds; i++) { - if (!re->fhs[i].fh) - continue; - - if (re->fhs[i].fd == fd) - return i; - } - - /* if nothing is found a linear search for the first - * zeroed handler */ - for (i = 0; i < re->maxfds; i++) { - if (!re->fhs[i].fh) - return i; - } - - return -1; -} -#endif - #if MAIN_DEBUG /** * Call the application event handler @@ -262,33 +258,77 @@ static int lookup_fd_index(struct re* re, re_sock_t fd) { * @param i File descriptor handler index * @param flags Event flags */ -static void fd_handler(struct re *re, int i, int flags) +static void fd_handler(struct re_fhs *fhs, int flags) { const uint64_t tick = tmr_jiffies(); uint32_t diff; - DEBUG_INFO("event on fd=%d index=%d (flags=0x%02x)...\n", - re->fhs[i].fd, i, flags); + DEBUG_INFO("event on fd=%d (flags=0x%02x)...\n", fhs->fd, flags); - re->fhs[i].fh(flags, re->fhs[i].arg); + fhs->fh(flags, fhs->arg); diff = (uint32_t)(tmr_jiffies() - tick); if (diff > MAX_BLOCKING) { DEBUG_WARNING("long async blocking: %u>%u ms (h=%p arg=%p)\n", diff, MAX_BLOCKING, - re->fhs[i].fh, re->fhs[i].arg); + fhs->fh, fhs->arg); } } #endif +#ifdef HAVE_SELECT +static int set_select_fds(struct re *re, struct re_fhs *fhs) +{ + int i = -1; + + if (!re || !fhs) + return EINVAL; + + if (fhs->index != -1) { + i = fhs->index; + } + else { + /* if nothing is found a linear search for the first + * zeroed handler */ + for (int j = 0; j < re->maxfds; j++) { + if (!re->fhsl[j]) { + i = j; + break; + } + } + } + + if (i == -1) + return ERANGE; + + if (fhs->flags) { + re->fhsl[i] = fhs; + fhs->index = i; + } + else { + re->fhsl[i] = NULL; + fhs->index = -1; + } + + return 0; +} +#endif + + #ifdef HAVE_EPOLL -static int set_epoll_fds(struct re *re, re_sock_t fd, int flags) +static int set_epoll_fds(struct re *re, struct re_fhs *fhs) { struct epoll_event event; int err = 0; + if (!re || !fhs) + return EINVAL; + + re_sock_t fd = fhs->fd; + int flags = fhs->flags; + if (re->epfd < 0) return EBADFD; @@ -297,7 +337,7 @@ static int set_epoll_fds(struct re *re, re_sock_t fd, int flags) DEBUG_INFO("set_epoll_fds: fd=%d flags=0x%02x\n", fd, flags); if (flags) { - event.data.fd = fd; + event.data.ptr = fhs; if (flags & FD_READ) event.events |= EPOLLIN; @@ -343,11 +383,17 @@ static int set_epoll_fds(struct re *re, re_sock_t fd, int flags) #ifdef HAVE_KQUEUE -static int set_kqueue_fds(struct re *re, re_sock_t fd, int flags) +static int set_kqueue_fds(struct re *re, struct re_fhs *fhs) { struct kevent kev[2]; int r, n = 0; + if (!fhs) + return EINVAL; + + re_sock_t fd = fhs->fd; + int flags = fhs->flags; + memset(kev, 0, sizeof(kev)); /* always delete the events */ @@ -358,11 +404,11 @@ static int set_kqueue_fds(struct re *re, re_sock_t fd, int flags) memset(kev, 0, sizeof(kev)); if (flags & FD_WRITE) { - EV_SET(&kev[n], fd, EVFILT_WRITE, EV_ADD, 0, 0, 0); + EV_SET(&kev[n], fd, EVFILT_WRITE, EV_ADD, 0, 0, fhs); ++n; } if (flags & FD_READ) { - EV_SET(&kev[n], fd, EVFILT_READ, EV_ADD, 0, 0, 0); + EV_SET(&kev[n], fd, EVFILT_READ, EV_ADD, 0, 0, fhs); ++n; } @@ -393,6 +439,17 @@ static int poll_init(struct re *re) switch (re->method) { +#ifdef HAVE_SELECT + case METHOD_SELECT: + if (re->fhsl) + return 0; + + re->fhsl = mem_zalloc(re->maxfds * sizeof(void *), NULL); + if (!re->fhsl) + return ENOMEM; + break; +#endif + #ifdef HAVE_EPOLL case METHOD_EPOLL: if (!re->events) { @@ -438,6 +495,8 @@ static int poll_init(struct re *re) #endif default: + DEBUG_WARNING("poll init: no method\n"); + return EINVAL; break; } return 0; @@ -452,8 +511,13 @@ static void poll_close(struct re *re) DEBUG_INFO("poll close\n"); - re->fhs = mem_deref(re->fhs); re->maxfds = 0; + re->nfds = 0; + re->method = METHOD_NULL; + +#ifdef HAVE_SELECT + re->fhsl = mem_deref(re->fhsl); +#endif #ifdef HAVE_EPOLL DEBUG_INFO("poll_close: epfd=%d\n", re->epfd); @@ -507,6 +571,8 @@ static int poll_setup(struct re *re) /** * Listen for events on a file descriptor * + * @param fhs File descriptor handler struct pointer (don't use mem_deref(), + * use fd_close() instead) * @param fd File descriptor * @param flags Wanted event flags * @param fh Event handler @@ -514,18 +580,20 @@ static int poll_setup(struct re *re) * * @return 0 if success, otherwise errorcode */ -int fd_listen(re_sock_t fd, int flags, fd_h *fh, void *arg) +int fd_listen(struct re_fhs **fhsp, re_sock_t fd, int flags, fd_h fh, + void *arg) { struct re *re = re_get(); + struct re_fhs *fhs; int err = 0; - int i; if (!re) { DEBUG_WARNING("fd_listen: re not ready\n"); return EINVAL; } - DEBUG_INFO("fd_listen: fd=%d flags=0x%02x\n", fd, flags); + if (!fhsp || !flags || !fh) + return EINVAL; #ifndef RELEASE err = re_thread_check(true); @@ -538,68 +606,67 @@ int fd_listen(re_sock_t fd, int flags, fd_h *fh, void *arg) return EBADF; } - if (flags || fh) { - err = poll_setup(re); - if (err) - return err; - } + err = poll_setup(re); + if (err) + return err; -#ifdef WIN32 - /* Windows file descriptors do not follow POSIX standard ranges. */ - i = lookup_fd_index(re, fd); - if (i < 0) { - DEBUG_WARNING("fd_listen: fd=%d - no free fd_index\n", fd); - return EMFILE; - } -#else - i = fd; -#endif + fhs = *fhsp; + if (!fhs) { + fhs = mem_zalloc(sizeof(struct re_fhs), NULL); + if (!fhs) + return ENOMEM; - if (i >= re->maxfds) { - if (flags) { - DEBUG_WARNING("fd_listen: fd=%d flags=0x%02x" - " - Max %d fds\n", - fd, flags, re->maxfds); - } - return EMFILE; - } + fhs->fd = fd; + fhs->index = -1; - /* Update fh set */ - if (re->fhs) { - re->fhs[i].fd = fd; - re->fhs[i].flags = flags; - re->fhs[i].fh = fh; - re->fhs[i].arg = arg; + DEBUG_INFO("fd_listen/new: fd=%d flags=0x%02x\n", fd, flags); + + ++re->nfds; + } + else { + if (unlikely(fhs->fd != fd)) { + DEBUG_WARNING("fd_listen: fhs reuse conflict %d\n", + fd); + return EBADF; + } + DEBUG_INFO("fd_listen/update: fd=%d flags=0x%02x\n", fd, + flags); } - re->nfds = max(re->nfds, i+1); + fhs->flags = flags; + fhs->fh = fh; + fhs->arg = arg; switch (re->method) { - +#ifdef HAVE_SELECT + case METHOD_SELECT: + err = set_select_fds(re, fhs); + break; +#endif #ifdef HAVE_EPOLL case METHOD_EPOLL: - if (re->epfd < 0) - return EBADFD; - err = set_epoll_fds(re, fd, flags); + err = set_epoll_fds(re, fhs); break; #endif #ifdef HAVE_KQUEUE case METHOD_KQUEUE: - err = set_kqueue_fds(re, fd, flags); + err = set_kqueue_fds(re, fhs); break; #endif default: + err = ENOTSUP; break; } if (err) { - if (flags && fh) { - fd_close(fd); - DEBUG_WARNING("fd_listen: fd=%d flags=0x%02x (%m)\n", - fd, flags, err); - } + mem_deref(fhs); + DEBUG_WARNING("fd_listen err: fd=%d flags=0x%02x (%m)\n", fd, + flags, err); + } + else { + *fhsp = fhs; } return err; @@ -607,13 +674,58 @@ int fd_listen(re_sock_t fd, int flags, fd_h *fh, void *arg) /** - * Stop listening for events on a file descriptor + * Stop and destruct listening for events on a file descriptor * - * @param fd File descriptor + * @param fhs File descriptor handler struct pointer + * + * @return always NULL */ -void fd_close(re_sock_t fd) +struct re_fhs *fd_close(struct re_fhs *fhs) { - (void)fd_listen(fd, 0, NULL, NULL); + struct re *re = re_get(); + int err = 0; + + if (!fhs || !re) + return NULL; + + fhs->flags = 0; + fhs->fh = NULL; + fhs->arg = NULL; + + switch (re->method) { +#ifdef HAVE_SELECT + case METHOD_SELECT: + err = set_select_fds(re, fhs); + break; +#endif +#ifdef HAVE_EPOLL + case METHOD_EPOLL: + err = set_epoll_fds(re, fhs); + break; +#endif + +#ifdef HAVE_KQUEUE + case METHOD_KQUEUE: + err = set_kqueue_fds(re, fhs); + break; +#endif + + default: + err = ENOTSUP; + break; + } + + if (err) { + DEBUG_WARNING("fd_close err: fd=%d (%m)\n", fhs->fd, err); + } + else { + DEBUG_INFO("fd_close: fd=%d\n", fhs->fd); + } + + mbuf_write_ptr(re->fhsld, (intptr_t)fhs); + --re->nfds; + + return NULL; } @@ -627,7 +739,9 @@ void fd_close(re_sock_t fd) static int fd_poll(struct re *re) { const uint64_t to = tmr_next_timeout(re->tmrl); - int i, n, index; + int i, n; + int nfds = re->nfds; + struct re_fhs *fhs = NULL; #ifdef HAVE_SELECT fd_set rfds, wfds, efds; #endif @@ -640,33 +754,48 @@ static int fd_poll(struct re *re) #ifdef HAVE_SELECT case METHOD_SELECT: { struct timeval tv; + int max_fd_plus_1 = 0; + int cfds = 0; /* Clear and update fd sets */ FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds); - for (i=0; infds; i++) { - re_sock_t fd = re->fhs[i].fd; - if (!re->fhs[i].fh) + for (i = 0; cfds < nfds; i++) { + fhs = re->fhsl[i]; + + if (!fhs || !fhs->fh) continue; - if (re->fhs[i].flags & FD_READ) + ++cfds; + + re_sock_t fd = fhs->fd; + if (fhs->flags & FD_READ) FD_SET(fd, &rfds); - if (re->fhs[i].flags & FD_WRITE) + if (fhs->flags & FD_WRITE) FD_SET(fd, &wfds); - if (re->fhs[i].flags & FD_EXCEPT) + if (fhs->flags & FD_EXCEPT) FD_SET(fd, &efds); + +/* not needed on WIN32 since select nfds arg is ignored */ +#if !defined(WIN32) + max_fd_plus_1 = max(max_fd_plus_1, fd + 1); +#endif } + nfds = re->maxfds; + #ifdef WIN32 tv.tv_sec = (long) to / 1000; #else tv.tv_sec = (time_t) to / 1000; #endif tv.tv_usec = (uint32_t) (to % 1000) * 1000; + re_unlock(re); - n = select(re->nfds, &rfds, &wfds, &efds, to ? &tv : NULL); + n = select(max_fd_plus_1, &rfds, &wfds, &efds, + to ? &tv : NULL); re_lock(re); } break; @@ -705,7 +834,7 @@ static int fd_poll(struct re *re) return RE_ERRNO_SOCK; /* Check for events */ - for (i=0; (n > 0) && (i < re->nfds); i++) { + for (i=0; (n > 0) && (i < nfds); i++) { re_sock_t fd; int flags = 0; @@ -713,7 +842,11 @@ static int fd_poll(struct re *re) #ifdef HAVE_SELECT case METHOD_SELECT: - fd = re->fhs[i].fd; + fhs = re->fhsl[i]; + if (!fhs) + break; + + fd = fhs->fd; if (FD_ISSET(fd, &rfds)) flags |= FD_READ; if (FD_ISSET(fd, &wfds)) @@ -724,7 +857,8 @@ static int fd_poll(struct re *re) #endif #ifdef HAVE_EPOLL case METHOD_EPOLL: - fd = re->events[i].data.fd; + fhs = re->events[i].data.ptr; + fd = fhs->fd; if (re->events[i].events & EPOLLIN) flags |= FD_READ; @@ -746,11 +880,7 @@ static int fd_poll(struct re *re) struct kevent *kev = &re->evlist[i]; fd = (int)kev->ident; - - if (fd >= re->maxfds) { - DEBUG_WARNING("large fd=%d\n", fd); - break; - } + fhs = kev->udata; if (kev->filter == EVFILT_READ) flags |= FD_READ; @@ -783,17 +913,12 @@ static int fd_poll(struct re *re) if (!flags) continue; -#ifdef WIN32 - index = i; -#else - index = fd; -#endif - if (re->fhs[index].fh) { + if (fhs && fhs->fh) { #if MAIN_DEBUG - fd_handler(re, index, flags); + fd_handler(fhs, flags); #else - re->fhs[index].fh(flags, re->fhs[index].arg); + fhs->fh(flags, fhs->arg); #endif } @@ -801,6 +926,9 @@ static int fd_poll(struct re *re) --n; } + /* Delayed fhs deref to avoid dangling fhs pointers */ + fhsld_flush(re); + return 0; } @@ -826,7 +954,6 @@ int fd_setsize(int maxfds) } if (!maxfds) { - fd_debug(); poll_close(re); return 0; } @@ -852,48 +979,10 @@ int fd_setsize(int maxfds) if (!re->maxfds) re->maxfds = maxfds; - if (!re->fhs) { - DEBUG_INFO("fd_setsize: maxfds=%d, allocating %u bytes\n", - re->maxfds, re->maxfds * sizeof(*re->fhs)); - - re->fhs = mem_zalloc(re->maxfds * sizeof(*re->fhs), NULL); - if (!re->fhs) - return ENOMEM; - } - return 0; } -/** - * Print all file descriptors in-use - */ -void fd_debug(void) -{ - const struct re *re = re_get(); - int i; - - if (!re) { - DEBUG_WARNING("fd_debug: re not ready\n"); - return; - } - - if (!re->fhs) - return; - - for (i=0; infds; i++) { - - if (!re->fhs[i].flags) - continue; - - (void)re_fprintf(stderr, - "fd %d in use: flags=%x fh=%p arg=%p\n", - i, re->fhs[i].flags, re->fhs[i].fh, - re->fhs[i].arg); - } -} - - #ifdef HAVE_SIGNAL /* Thread-safe signal handling */ static void signal_handler(int sig) diff --git a/src/mqueue/mqueue.c b/src/mqueue/mqueue.c index 9c3a70a6f..214c0c94c 100644 --- a/src/mqueue/mqueue.c +++ b/src/mqueue/mqueue.c @@ -34,6 +34,7 @@ */ struct mqueue { re_sock_t pfd[2]; + struct re_fhs *fhs; mqueue_h *h; void *arg; }; @@ -50,7 +51,7 @@ static void destructor(void *arg) struct mqueue *q = arg; if (q->pfd[0] != RE_BAD_SOCK) { - fd_close(q->pfd[0]); + q->fhs = fd_close(q->fhs); (void)close(q->pfd[0]); } if (q->pfd[1] != RE_BAD_SOCK) @@ -108,6 +109,7 @@ int mqueue_alloc(struct mqueue **mqp, mqueue_h *h, void *arg) if (!mq) return ENOMEM; + mq->fhs = NULL; mq->h = h; mq->arg = arg; @@ -125,7 +127,7 @@ int mqueue_alloc(struct mqueue **mqp, mqueue_h *h, void *arg) if (err) goto out; - err = fd_listen(mq->pfd[0], FD_READ, event_handler, mq); + err = fd_listen(&mq->fhs, mq->pfd[0], FD_READ, event_handler, mq); if (err) goto out; diff --git a/src/tcp/tcp.c b/src/tcp/tcp.c index cbc06f355..889713e51 100644 --- a/src/tcp/tcp.c +++ b/src/tcp/tcp.c @@ -49,6 +49,7 @@ enum { /** Defines a listening TCP socket */ struct tcp_sock { + struct re_fhs *fhs; re_sock_t fd; /**< Listening file descriptor */ re_sock_t fdc; /**< Cached connection file descriptor */ tcp_conn_h *connh; /**< TCP Connect handler */ @@ -61,6 +62,7 @@ struct tcp_sock { struct tcp_conn { struct list helpers; /**< List of TCP-helpers */ struct list sendq; /**< Sending queue */ + struct re_fhs *fhs; re_sock_t fdc; /**< Connection file descriptor */ tcp_estab_h *estabh; /**< Connection established handler */ tcp_send_h *sendh; /**< Data send handler */ @@ -130,7 +132,7 @@ static void sock_destructor(void *data) struct tcp_sock *ts = data; if (ts->fd != RE_BAD_SOCK) { - fd_close(ts->fd); + ts->fhs = fd_close(ts->fhs); (void)close(ts->fd); } if (ts->fdc != RE_BAD_SOCK) @@ -138,6 +140,22 @@ static void sock_destructor(void *data) } +static struct tcp_sock *sock_constructor(void) +{ + struct tcp_sock *ts; + + ts = mem_zalloc(sizeof(*ts), sock_destructor); + if (!ts) + return NULL; + + ts->fhs = NULL; + ts->fd = RE_BAD_SOCK; + ts->fdc = RE_BAD_SOCK; + + return ts; +} + + static void conn_destructor(void *data) { struct tcp_conn *tc = data; @@ -146,7 +164,7 @@ static void conn_destructor(void *data) list_flush(&tc->sendq); if (tc->fdc != RE_BAD_SOCK) { - fd_close(tc->fdc); + tc->fhs = fd_close(tc->fhs); (void)close(tc->fdc); } } @@ -180,7 +198,7 @@ static int enqueue(struct tcp_conn *tc, struct mbuf *mb) if (!tc->sendq.head && !tc->sendh) { - err = fd_listen(tc->fdc, FD_READ | FD_WRITE, + err = fd_listen(&tc->fhs, tc->fdc, FD_READ | FD_WRITE, tcp_recv_handler, tc); if (err) return err; @@ -253,7 +271,7 @@ static void conn_close(struct tcp_conn *tc, int err) /* Stop polling */ if (tc->fdc != RE_BAD_SOCK) { - fd_close(tc->fdc); + tc->fhs = fd_close(tc->fhs); (void)close(tc->fdc); tc->fdc = RE_BAD_SOCK; } @@ -323,7 +341,7 @@ static void tcp_recv_handler(int flags, void *arg) if (!tc->sendq.head && !tc->sendh) { - err = fd_listen(tc->fdc, FD_READ, + err = fd_listen(&tc->fhs, tc->fdc, FD_READ, tcp_recv_handler, tc); if (err) { conn_close(tc, err); @@ -339,7 +357,8 @@ static void tcp_recv_handler(int flags, void *arg) tc->connected = true; - err = fd_listen(tc->fdc, FD_READ, tcp_recv_handler, tc); + err = fd_listen(&tc->fhs, tc->fdc, FD_READ, tcp_recv_handler, + tc); if (err) { DEBUG_WARNING("recv handler: fd_listen(): %m\n", err); conn_close(tc, err); @@ -458,6 +477,7 @@ static struct tcp_conn *conn_alloc(tcp_estab_h *eh, tcp_recv_h *rh, list_init(&tc->helpers); + tc->fhs = NULL; tc->fdc = RE_BAD_SOCK; tc->rxsz = TCP_RXSZ_DEFAULT; tc->txqsz_max = TCP_TXQSZ_DEFAULT; @@ -555,7 +575,7 @@ int tcp_sock_alloc_fd(struct tcp_sock **tsp, re_sock_t fd, tcp_conn_h *ch, if (!tsp || fd == RE_BAD_SOCK) return EINVAL; - ts = mem_zalloc(sizeof(*ts), sock_destructor); + ts = sock_constructor(); if (!ts) return ENOMEM; @@ -566,7 +586,7 @@ int tcp_sock_alloc_fd(struct tcp_sock **tsp, re_sock_t fd, tcp_conn_h *ch, *tsp = ts; - return fd_listen(ts->fd, FD_READ, tcp_conn_handler, ts); + return fd_listen(&ts->fhs, ts->fd, FD_READ, tcp_conn_handler, ts); } @@ -592,7 +612,7 @@ int tcp_sock_alloc(struct tcp_sock **tsp, const struct sa *local, if (!tsp) return EINVAL; - ts = mem_zalloc(sizeof(*ts), sock_destructor); + ts = sock_constructor(); if (!ts) return ENOMEM; @@ -684,7 +704,7 @@ struct tcp_sock *tcp_sock_dup(struct tcp_sock *tso) if (!tso) return NULL; - ts = mem_zalloc(sizeof(*ts), sock_destructor); + ts = sock_constructor(); if (!ts) return NULL; @@ -789,7 +809,7 @@ int tcp_sock_listen(struct tcp_sock *ts, int backlog) return err; } - return fd_listen(ts->fd, FD_READ, tcp_conn_handler, ts); + return fd_listen(&ts->fhs, ts->fd, FD_READ, tcp_conn_handler, ts); } @@ -822,7 +842,7 @@ int tcp_accept(struct tcp_conn **tcp, struct tcp_sock *ts, tcp_estab_h *eh, tc->fdc = ts->fdc; ts->fdc = RE_BAD_SOCK; - err = fd_listen(tc->fdc, FD_READ | FD_WRITE | FD_EXCEPT, + err = fd_listen(&tc->fhs, tc->fdc, FD_READ | FD_WRITE | FD_EXCEPT, tcp_recv_handler, tc); if (err) { DEBUG_WARNING("accept: fd_listen(): %m\n", err); @@ -1097,7 +1117,7 @@ int tcp_conn_connect(struct tcp_conn *tc, const struct sa *peer) if (err) return err; - return fd_listen(tc->fdc, FD_READ | FD_WRITE | FD_EXCEPT, + return fd_listen(&tc->fhs, tc->fdc, FD_READ | FD_WRITE | FD_EXCEPT, tcp_recv_handler, tc); } @@ -1222,7 +1242,8 @@ int tcp_set_send(struct tcp_conn *tc, tcp_send_h *sendh) if (tc->sendq.head || !sendh) return 0; - return fd_listen(tc->fdc, FD_READ | FD_WRITE, tcp_recv_handler, tc); + return fd_listen(&tc->fhs, tc->fdc, FD_READ | FD_WRITE, + tcp_recv_handler, tc); } diff --git a/src/udp/udp.c b/src/udp/udp.c index 71096b6b3..496e7581d 100644 --- a/src/udp/udp.c +++ b/src/udp/udp.c @@ -65,6 +65,7 @@ struct udp_sock { udp_recv_h *rh; /**< Receive handler */ udp_error_h *eh; /**< Error handler */ void *arg; /**< Handler argument */ + struct re_fhs *fhs; re_sock_t fd; /**< Socket file descriptor */ bool conn; /**< Connected socket flag */ size_t rxsz; /**< Maximum receive chunk size */ @@ -133,7 +134,7 @@ static void udp_destructor(void *data) #endif if (RE_BAD_SOCK != us->fd) { - fd_close(us->fd); + us->fhs = fd_close(us->fhs); (void)close(us->fd); } } @@ -228,6 +229,9 @@ static int udp_alloc(struct udp_sock **usp) list_init(&us->helpers); + us->fhs = NULL; + us->fd = RE_BAD_SOCK; + err = mutex_alloc(&us->lock); if (err) { mem_deref(us); @@ -261,12 +265,13 @@ int udp_listen(struct udp_sock **usp, const struct sa *local, char serv[6] = "0"; int af, error, err = 0; + if (!usp) + return EINVAL; + err = udp_alloc(&us); if (err) return err; - us->fd = RE_BAD_SOCK; - if (local) { af = sa_af(local); (void)re_snprintf(addr, sizeof(addr), "%H", @@ -378,7 +383,6 @@ int udp_alloc_sockless(struct udp_sock **usp, if (err) return err; - us->fd = RE_BAD_SOCK; us->sendh = sendh; us->rh = recvh ? recvh : dummy_udp_recv_handler; us->arg = arg; @@ -396,7 +400,7 @@ int udp_alloc_fd(struct udp_sock **usp, re_sock_t fd, struct udp_sock *us; int err; - if (!usp || fd==RE_BAD_SOCK) + if (!usp || fd == RE_BAD_SOCK) return EINVAL; err = udp_alloc(&us); @@ -434,8 +438,6 @@ int udp_open(struct udp_sock **usp, int af) if (err) return err; - us->fd = RE_BAD_SOCK; - fd = socket(af, SOCK_DGRAM, IPPROTO_UDP); if (fd == RE_BAD_SOCK) { err = RE_ERRNO_SOCK; @@ -756,7 +758,8 @@ int udp_thread_attach(struct udp_sock *us) return EINVAL; if (RE_BAD_SOCK != us->fd) { - err = fd_listen(us->fd, FD_READ, udp_read_handler, us); + err = fd_listen(&us->fhs, us->fd, FD_READ, udp_read_handler, + us); if (err) goto out; } @@ -780,7 +783,7 @@ void udp_thread_detach(struct udp_sock *us) return; if (RE_BAD_SOCK != us->fd) - fd_close(us->fd); + us->fhs = fd_close(us->fhs); }