diff --git a/src/cmd/Makefile.am b/src/cmd/Makefile.am index 9b273a78696e..fae885cc524e 100644 --- a/src/cmd/Makefile.am +++ b/src/cmd/Makefile.am @@ -175,6 +175,7 @@ flux_start_LDADD = \ flux_job_LDADD = \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux/watcher.lo \ $(top_builddir)/src/shell/libmpir.la \ $(top_builddir)/src/common/libjob/libjob.la \ $(top_builddir)/src/common/libutil/libutil.la \ @@ -185,6 +186,7 @@ flux_job_LDADD = \ flux_exec_LDADD = \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux/watcher.lo \ $(fluxcmd_ldadd) flux_terminus_LDADD = \ diff --git a/src/common/libflux/Makefile.am b/src/common/libflux/Makefile.am index 074c7237cfea..f48e7c38620e 100644 --- a/src/common/libflux/Makefile.am +++ b/src/common/libflux/Makefile.am @@ -25,6 +25,7 @@ fluxcoreinclude_HEADERS = \ handle.h \ connector.h \ reactor.h \ + watcher.h \ msg_handler.h \ message.h \ msglist.h \ @@ -62,6 +63,9 @@ libflux_la_SOURCES = \ connector_local.c \ reactor.c \ reactor_private.h \ + watcher.c \ + watcher_private.h \ + watcher_wrap.c \ hwatcher.c \ msg_handler.c \ message.c \ diff --git a/src/common/libflux/flux.h b/src/common/libflux/flux.h index eb5ab5d9e8fd..acd2b1a28d55 100644 --- a/src/common/libflux/flux.h +++ b/src/common/libflux/flux.h @@ -15,6 +15,7 @@ #include "message.h" #include "handle.h" #include "reactor.h" +#include "watcher.h" #include "msg_handler.h" #include "connector.h" #include "msglist.h" diff --git a/src/common/libflux/hwatcher.c b/src/common/libflux/hwatcher.c index c68f1a9e1568..3c662a874896 100644 --- a/src/common/libflux/hwatcher.c +++ b/src/common/libflux/hwatcher.c @@ -17,7 +17,7 @@ #include "src/common/libutil/errno_safe.h" -#include "reactor_private.h" +#include "watcher_private.h" struct hwatcher { flux_watcher_t *fd_w; diff --git a/src/common/libflux/reactor.c b/src/common/libflux/reactor.c index 4b398ce19520..29f2f11f2ebc 100644 --- a/src/common/libflux/reactor.c +++ b/src/common/libflux/reactor.c @@ -19,11 +19,15 @@ #include #include "src/common/libev/ev.h" -#include "src/common/libutil/log.h" -#include "src/common/libutil/fdutils.h" #include "reactor_private.h" +struct flux_reactor { + struct ev_loop *loop; + int usecount; + unsigned int errflag:1; +}; + static int valid_flags (int flags, int valid) { if ((flags & ~valid)) { @@ -136,635 +140,9 @@ void flux_reactor_active_decref (flux_reactor_t *r) ev_unref (r->loop); } -/** - ** Watchers - **/ - -void flux_watcher_set_priority (flux_watcher_t *w, int priority) -{ - if (w) { - if (w->ops->set_priority) - w->ops->set_priority (w, priority); - } -} - -void flux_watcher_start (flux_watcher_t *w) -{ - if (w) { - if (w->ops->start) - w->ops->start (w); - } -} - -void flux_watcher_stop (flux_watcher_t *w) -{ - if (w) { - if (w->ops->stop) - w->ops->stop (w); - } -} - -void flux_watcher_destroy (flux_watcher_t *w) -{ - if (w) { - if (w->ops->stop) - w->ops->stop (w); - if (w->ops->destroy) - w->ops->destroy (w); - flux_reactor_decref (w->r); - free (w); - } -} - -static void safe_stop_cb (struct ev_loop *loop, ev_prepare *pw, int revents) -{ - flux_watcher_stop ((flux_watcher_t *)pw->data); - ev_prepare_stop (loop, pw); - free (pw); -} - -/* Stop a watcher in the next ev_prepare callback. To be used from periodics - * reschedule callback or other ev callbacks in which it is documented - * unsafe to modify the ev_loop or any watcher. - */ -static void watcher_stop_safe (flux_watcher_t *w) -{ - if (w) { - ev_prepare *pw = calloc (1, sizeof (*pw)); - if (!pw) /* On ENOMEM, we just have to give up */ - return; - ev_prepare_init (pw, safe_stop_cb); - pw->data = w; - ev_prepare_start (w->r->loop, pw); - } -} - -/* This is_active() callback works for "native" libev watchers, where - * w->data points to a struct ev_TYPE. - */ -static bool wrap_ev_active (flux_watcher_t *w) -{ - return ev_is_active (w->data); -} - -/* file descriptors - */ - -static void fd_start (flux_watcher_t *w) -{ - ev_io_start (w->r->loop, (ev_io *)w->data); -} - -static void fd_stop (flux_watcher_t *w) -{ - ev_io_stop (w->r->loop, (ev_io *)w->data); -} - -static void fd_cb (struct ev_loop *loop, ev_io *iow, int revents) -{ - struct flux_watcher *w = iow->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops fd_watcher = { - .start = fd_start, - .stop = fd_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - -flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, - int fd, - int events, - flux_watcher_f cb, - void *arg) -{ - ev_io *iow; - flux_watcher_t *w; - - if (!(w = watcher_create (r, sizeof (*iow), &fd_watcher, cb, arg))) - return NULL; - iow = watcher_get_data (w); - ev_io_init (iow, fd_cb, fd, events_to_libev (events) & ~EV_ERROR); - iow->data = w; - - return w; -} - -int flux_fd_watcher_get_fd (flux_watcher_t *w) -{ - assert (watcher_get_ops (w) == &fd_watcher); - ev_io *iow = w->data; - return iow->fd; -} - -/* Timer - */ - -static void timer_start (flux_watcher_t *w) -{ - ev_timer_start (w->r->loop, (ev_timer *)w->data); -} - -static void timer_stop (flux_watcher_t *w) -{ - ev_timer_stop (w->r->loop, (ev_timer *)w->data); -} - -static void timer_cb (struct ev_loop *loop, ev_timer *tw, int revents) -{ - struct flux_watcher *w = tw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops timer_watcher = { - .start = timer_start, - .stop = timer_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - -flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, - double after, - double repeat, - flux_watcher_f cb, - void *arg) -{ - ev_timer *tw; - flux_watcher_t *w; - if (after < 0 || repeat < 0) { - errno = EINVAL; - return NULL; - } - if (!(w = watcher_create (r, sizeof (*tw), &timer_watcher, cb, arg))) - return NULL; - tw = watcher_get_data (w); - ev_timer_init (tw, timer_cb, after, repeat); - tw->data = w; - - return w; -} - -void flux_timer_watcher_reset (flux_watcher_t *w, double after, double repeat) -{ - assert (watcher_get_ops (w) == &timer_watcher); - ev_timer *tw = w->data; - ev_timer_set (tw, after, repeat); -} - -void flux_timer_watcher_again (flux_watcher_t *w) -{ - assert (watcher_get_ops (w) == &timer_watcher); - ev_timer *tw = w->data; - struct ev_loop *loop = w->r->loop; - ev_timer_again (loop, tw); -} - -/* Periodic - */ -struct f_periodic { - struct flux_watcher *w; - ev_periodic evp; - flux_reschedule_f reschedule_cb; -}; - -static void periodic_start (flux_watcher_t *w) -{ - struct f_periodic *fp = w->data; - ev_periodic_start (w->r->loop, &fp->evp); -} - -static void periodic_stop (flux_watcher_t *w) -{ - struct f_periodic *fp = w->data; - ev_periodic_stop (w->r->loop, &fp->evp); -} - -static bool periodic_is_active (flux_watcher_t *w) -{ - struct f_periodic *fp = w->data; - return ev_is_active (&fp->evp); -} - -static void periodic_cb (struct ev_loop *loop, ev_periodic *pw, int revents) -{ - struct f_periodic *fp = pw->data; - struct flux_watcher *w = fp->w; - if (w->fn) - fp->w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static ev_tstamp periodic_reschedule_cb (ev_periodic *pw, ev_tstamp now) -{ - ev_tstamp rc; - struct f_periodic *fp = pw->data; - assert (fp->reschedule_cb != NULL); - rc = (ev_tstamp) fp->reschedule_cb (fp->w, (double) now, fp->w->arg); - if (rc < now) { - /* User reschedule cb returned time in the past. The watcher will - * be stopped, but not here (changing loop is not allowed in a - * libev reschedule cb. flux_watcher_stop_safe() will stop it in - * a prepare callback. - * Return time far in the future to ensure we aren't called again. - */ - watcher_stop_safe (fp->w); - return (now + 1e99); - } - return rc; -} - -static struct flux_watcher_ops periodic_watcher = { - .start = periodic_start, - .stop = periodic_stop, - .destroy = NULL, - .is_active = periodic_is_active, -}; - -flux_watcher_t *flux_periodic_watcher_create (flux_reactor_t *r, - double offset, - double interval, - flux_reschedule_f reschedule_cb, - flux_watcher_f cb, - void *arg) -{ - flux_watcher_t *w; - struct f_periodic *fp; - size_t size = sizeof (*fp); - if (offset < 0 || interval < 0) { - errno = EINVAL; - return NULL; - } - if (!(w = watcher_create (r, size, &periodic_watcher, cb, arg))) - return NULL; - fp = watcher_get_data (w); - fp->evp.data = fp; - fp->w = w; - fp->reschedule_cb = reschedule_cb; - - ev_periodic_init (&fp->evp, - periodic_cb, - offset, - interval, - reschedule_cb ? periodic_reschedule_cb : NULL); - - return w; -} - -void flux_periodic_watcher_reset (flux_watcher_t *w, - double next, - double interval, - flux_reschedule_f reschedule_cb) -{ - struct f_periodic *fp = w->data; - struct ev_loop *loop = w->r->loop; - assert (watcher_get_ops (w) == &periodic_watcher); - fp->reschedule_cb = reschedule_cb; - ev_periodic_set (&fp->evp, - next, - interval, - reschedule_cb ? periodic_reschedule_cb : NULL); - ev_periodic_again (loop, &fp->evp); -} - -double flux_watcher_next_wakeup (flux_watcher_t *w) -{ - if (watcher_get_ops (w) == &periodic_watcher) { - struct f_periodic *fp = w->data; - return ((double) ev_periodic_at (&fp->evp)); - } - else if (watcher_get_ops (w) == &timer_watcher) { - ev_timer *tw = w->data; - struct ev_loop *loop = w->r->loop; - return ((double) (ev_now (loop) + ev_timer_remaining (loop, tw))); - } - errno = EINVAL; - return (-1.); -} - -/* Prepare - */ -static void prepare_start (flux_watcher_t *w) -{ - ev_prepare_start (w->r->loop, (ev_prepare *)w->data); -} - -static void prepare_stop (flux_watcher_t *w) -{ - ev_prepare_stop (w->r->loop, (ev_prepare *)w->data); -} - -static void prepare_cb (struct ev_loop *loop, ev_prepare *pw, int revents) -{ - struct flux_watcher *w = pw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops prepare_watcher = { - .start = prepare_start, - .stop = prepare_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - -flux_watcher_t *flux_prepare_watcher_create (flux_reactor_t *r, - flux_watcher_f cb, - void *arg) +void *reactor_get_loop (flux_reactor_t *r) { - ev_prepare *pw; - flux_watcher_t *w; - - if (!(w = watcher_create (r, sizeof (*pw), &prepare_watcher, cb, arg))) - return NULL; - pw = watcher_get_data (w); - ev_prepare_init (pw, prepare_cb); - pw->data = w; - - return w; -} - -/* Check - */ - -static void check_set_priority (flux_watcher_t *w, int priority) -{ - ev_set_priority ((ev_check *)w->data, priority); -} - -static void check_start (flux_watcher_t *w) -{ - ev_check_start (w->r->loop, (ev_check *)w->data); -} - -static void check_stop (flux_watcher_t *w) -{ - ev_check_stop (w->r->loop, (ev_check *)w->data); -} - -static void check_cb (struct ev_loop *loop, ev_check *cw, int revents) -{ - struct flux_watcher *w = cw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops check_watcher = { - .set_priority = check_set_priority, - .start = check_start, - .stop = check_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - -flux_watcher_t *flux_check_watcher_create (flux_reactor_t *r, - flux_watcher_f cb, - void *arg) -{ - ev_check *cw; - flux_watcher_t *w; - - if (!(w = watcher_create (r, sizeof (*cw), &check_watcher, cb, arg))) - return NULL; - cw = watcher_get_data (w); - ev_check_init (cw, check_cb); - cw->data = w; - - return w; -} - -/* Idle - */ - -static void idle_start (flux_watcher_t *w) -{ - ev_idle_start (w->r->loop, (ev_idle *)w->data); -} - -static void idle_stop (flux_watcher_t *w) -{ - ev_idle_stop (w->r->loop, (ev_idle *)w->data); -} - -static void idle_cb (struct ev_loop *loop, ev_idle *iw, int revents) -{ - struct flux_watcher *w = iw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops idle_watcher = { - .start = idle_start, - .stop = idle_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - -flux_watcher_t *flux_idle_watcher_create (flux_reactor_t *r, - flux_watcher_f cb, - void *arg) -{ - ev_idle *iw; - flux_watcher_t *w; - - if (!(w = watcher_create (r, sizeof (*iw), &idle_watcher, cb, arg))) - return NULL; - iw = watcher_get_data (w); - ev_idle_init (iw, idle_cb); - iw->data = w; - - return w; -} - -/* Child - */ - -static void child_start (flux_watcher_t *w) -{ - ev_child_start (w->r->loop, (ev_child *)w->data); -} - -static void child_stop (flux_watcher_t *w) -{ - ev_child_stop (w->r->loop, (ev_child *)w->data); -} - -static void child_cb (struct ev_loop *loop, ev_child *cw, int revents) -{ - struct flux_watcher *w = cw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops child_watcher = { - .start = child_start, - .stop = child_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - - -flux_watcher_t *flux_child_watcher_create (flux_reactor_t *r, - int pid, - bool trace, - flux_watcher_f cb, - void *arg) -{ - flux_watcher_t *w; - ev_child *cw; - - if (!ev_is_default_loop (r->loop)) { - errno = EINVAL; - return NULL; - } - if (!(w = watcher_create (r, sizeof (*cw), &child_watcher, cb, arg))) - return NULL; - cw = watcher_get_data (w); - ev_child_init (cw, child_cb, pid, trace ? 1 : 0); - cw->data = w; - - return w; -} - -int flux_child_watcher_get_rpid (flux_watcher_t *w) -{ - if (watcher_get_ops (w) != &child_watcher) { - errno = EINVAL; - return -1; - } - ev_child *cw = w->data; - return cw->rpid; -} - -int flux_child_watcher_get_rstatus (flux_watcher_t *w) -{ - if (watcher_get_ops (w) != &child_watcher) { - errno = EINVAL; - return -1; - } - ev_child *cw = w->data; - return cw->rstatus; -} - -/* Signal - */ - -static void signal_start (flux_watcher_t *w) -{ - ev_signal_start (w->r->loop, (ev_signal *)w->data); -} - -static void signal_stop (flux_watcher_t *w) -{ - ev_signal_stop (w->r->loop, (ev_signal *)w->data); -} - -static void signal_cb (struct ev_loop *loop, ev_signal *sw, int revents) -{ - struct flux_watcher *w = sw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops signal_watcher = { - .start = signal_start, - .stop = signal_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - -flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, - int signum, - flux_watcher_f cb, - void *arg) -{ - flux_watcher_t *w; - ev_signal *sw; - - if (!(w = watcher_create (r, sizeof (*sw), &signal_watcher, cb, arg))) - return NULL; - sw = watcher_get_data (w); - ev_signal_init (sw, signal_cb, signum); - sw->data = w; - - return w; -} - -int flux_signal_watcher_get_signum (flux_watcher_t *w) -{ - if (watcher_get_ops (w) != &signal_watcher) { - errno = EINVAL; - return (-1); - } - ev_signal *sw = w->data; - return sw->signum; -} - -/* Stat - */ - -static void stat_start (flux_watcher_t *w) -{ - ev_stat_start (w->r->loop, (ev_stat *)w->data); -} - -static void stat_stop (flux_watcher_t *w) -{ - ev_stat_stop (w->r->loop, (ev_stat *)w->data); -} - -static void stat_cb (struct ev_loop *loop, ev_stat *sw, int revents) -{ - struct flux_watcher *w = sw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops stat_watcher = { - .start = stat_start, - .stop = stat_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - -flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, - const char *path, - double interval, - flux_watcher_f cb, - void *arg) -{ - flux_watcher_t *w; - ev_stat *sw; - - if (!(w = watcher_create (r, sizeof (*sw), &stat_watcher, cb, arg))) - return NULL; - sw = watcher_get_data (w); - ev_stat_init (sw, stat_cb, path, interval); - sw->data = w; - - return w; -} - -void flux_stat_watcher_get_rstat (flux_watcher_t *w, - struct stat *stat, - struct stat *prev) -{ - ev_stat *sw = w->data; - assert (watcher_get_ops (w) == &stat_watcher); - if (stat) - *stat = sw->attr; - if (prev) - *prev = sw->prev; -} - -bool flux_watcher_is_active (flux_watcher_t *w) -{ - if (w) { - if (w->ops->is_active) - return w->ops->is_active (w); - } - return false; + return r ? r->loop : NULL; } /* diff --git a/src/common/libflux/reactor.h b/src/common/libflux/reactor.h index 3c8105cbfde7..bce296edd74e 100644 --- a/src/common/libflux/reactor.h +++ b/src/common/libflux/reactor.h @@ -19,9 +19,6 @@ extern "C" { #endif -/* Reactor - */ - /* Flags for flux_reactor_run() */ enum { @@ -58,128 +55,6 @@ double flux_reactor_time (void); void flux_reactor_active_incref (flux_reactor_t *r); void flux_reactor_active_decref (flux_reactor_t *r); - -/* Watchers - */ - -typedef void (*flux_watcher_f)(flux_reactor_t *r, - flux_watcher_t *w, - int revents, - void *arg); - -/* Set the watcher priority. The range is [-2:2] (default 0). - * Higher priority watchers run first. - * This is a no-op if the underlying watcher doesn't support it. - * If the priority is out of range, the max or min value is set. - * The priority should only be set when the watcher is stopped. - * Currently only the check watcher supports it. - */ -void flux_watcher_set_priority (flux_watcher_t *w, int priority); - -void flux_watcher_start (flux_watcher_t *w); -void flux_watcher_stop (flux_watcher_t *w); -void flux_watcher_destroy (flux_watcher_t *w); -double flux_watcher_next_wakeup (flux_watcher_t *w); -bool flux_watcher_is_active (flux_watcher_t *w); - -/* flux_t handle - */ - -flux_watcher_t *flux_handle_watcher_create (flux_reactor_t *r, - flux_t *h, - int events, - flux_watcher_f cb, - void *arg); -flux_t *flux_handle_watcher_get_flux (flux_watcher_t *w); - -/* file descriptor - */ - -flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, - int fd, int events, - flux_watcher_f cb, - void *arg); -int flux_fd_watcher_get_fd (flux_watcher_t *w); - -/* timer - */ - -flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, - double after, - double repeat, - flux_watcher_f cb, - void *arg); - -void flux_timer_watcher_reset (flux_watcher_t *w, double after, double repeat); - -void flux_timer_watcher_again (flux_watcher_t *w); - -/* periodic - */ - -typedef double (*flux_reschedule_f) (flux_watcher_t *w, double now, void *arg); - -flux_watcher_t *flux_periodic_watcher_create (flux_reactor_t *r, - double offset, - double interval, - flux_reschedule_f reschedule_cb, - flux_watcher_f cb, - void *arg); - -void flux_periodic_watcher_reset (flux_watcher_t *w, - double next_wakeup, - double interval, - flux_reschedule_f reschedule_cb); - - -/* prepare/check/idle - */ - -flux_watcher_t *flux_prepare_watcher_create (flux_reactor_t *r, - flux_watcher_f cb, - void *arg); - -flux_watcher_t *flux_check_watcher_create (flux_reactor_t *r, - flux_watcher_f cb, - void *arg); - -flux_watcher_t *flux_idle_watcher_create (flux_reactor_t *r, - flux_watcher_f cb, - void *arg); - -/* child - */ - -flux_watcher_t *flux_child_watcher_create (flux_reactor_t *r, - int pid, - bool trace, - flux_watcher_f cb, - void *arg); -int flux_child_watcher_get_rpid (flux_watcher_t *w); -int flux_child_watcher_get_rstatus (flux_watcher_t *w); - -/* signal - */ - -flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, - int signum, - flux_watcher_f cb, - void *arg); - -int flux_signal_watcher_get_signum (flux_watcher_t *w); - -/* stat - */ - -flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, - const char *path, - double interval, - flux_watcher_f cb, - void *arg); -void flux_stat_watcher_get_rstat (flux_watcher_t *w, - struct stat *stat, - struct stat *prev); - #ifdef __cplusplus } #endif diff --git a/src/common/libflux/reactor_private.h b/src/common/libflux/reactor_private.h index e89cfc4a8666..10711f710e50 100644 --- a/src/common/libflux/reactor_private.h +++ b/src/common/libflux/reactor_private.h @@ -11,103 +11,10 @@ #ifndef _FLUX_CORE_REACTOR_PRIVATE_H #define _FLUX_CORE_REACTOR_PRIVATE_H -#include "src/common/libev/ev.h" #include "reactor.h" -struct flux_watcher_ops { - void (*set_priority) (flux_watcher_t *w, int priority); - void (*start) (flux_watcher_t *w); - void (*stop) (flux_watcher_t *w); - void (*destroy) (flux_watcher_t *w); - bool (*is_active) (flux_watcher_t *w); -}; - -struct flux_reactor { - struct ev_loop *loop; - int usecount; - unsigned int errflag:1; -}; - -struct flux_watcher { - flux_reactor_t *r; - flux_watcher_f fn; - void *arg; - struct flux_watcher_ops *ops; - void *data; -}; - -static inline int events_to_libev (int events) -{ - int e = 0; - if (events & FLUX_POLLIN) - e |= EV_READ; - if (events & FLUX_POLLOUT) - e |= EV_WRITE; - if (events & FLUX_POLLERR) - e |= EV_ERROR; - return e; -} - -static inline int libev_to_events (int events) -{ - int e = 0; - if (events & EV_READ) - e |= FLUX_POLLIN; - if (events & EV_WRITE) - e |= FLUX_POLLOUT; - if (events & EV_ERROR) - e |= FLUX_POLLERR; - return e; -} - -/* Create a custom watcher on reactor 'r' with 'data_size' bytes reserved - * for the implementor, implementation operations in 'ops' and user - * watcher callback and data 'fn' and 'arg'. - * - * Caller retrieves pointer to allocated implementation data with - * flux_watcher_data (w). - */ -static inline flux_watcher_t *watcher_create (flux_reactor_t *r, - size_t data_size, - struct flux_watcher_ops *ops, - flux_watcher_f fn, - void *arg) -{ - struct flux_watcher *w = calloc (1, sizeof (*w) + data_size); - if (!w) - return NULL; - w->r = r; - w->ops = ops; - w->data = w + 1; - w->fn = fn; - w->arg = arg; - flux_reactor_incref (r); - return w; -} - -/* Return pointer to implementation data reserved by watcher object 'w'. - */ -static inline void *watcher_get_data (flux_watcher_t *w) -{ - if (w) - return w->data; - return NULL; -} - -/* Return pointer to flux_watcher_ops structure for this watcher. - */ -static inline struct flux_watcher_ops *watcher_get_ops (flux_watcher_t *w) -{ - if (w) - return w->ops; - return NULL; -} - -static inline void watcher_call (flux_watcher_t *w, int revents) -{ - if (w->fn) - w->fn (w->r, w, revents, w->arg); -} +/* retrieve underlying loop implementation - for watcher_wrap.c only */ +void *reactor_get_loop (flux_reactor_t *r); #endif /* !_FLUX_CORE_REACTOR_PRIVATE_H */ diff --git a/src/common/libflux/watcher.c b/src/common/libflux/watcher.c new file mode 100644 index 000000000000..e6bc66c2454e --- /dev/null +++ b/src/common/libflux/watcher.c @@ -0,0 +1,123 @@ +/************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include + +#include "reactor_private.h" +#include "watcher_private.h" + +struct flux_watcher { + flux_reactor_t *r; + flux_watcher_f fn; + void *arg; + struct flux_watcher_ops *ops; + void *data; +}; + +flux_watcher_t *watcher_create (flux_reactor_t *r, + size_t data_size, + struct flux_watcher_ops *ops, + flux_watcher_f fn, + void *arg) +{ + struct flux_watcher *w = calloc (1, sizeof (*w) + data_size); + if (!w) + return NULL; + w->r = r; + w->ops = ops; + w->data = w + 1; + w->fn = fn; + w->arg = arg; + flux_reactor_incref (r); + return w; +} + +void *watcher_get_data (flux_watcher_t *w) +{ + if (w) + return w->data; + return NULL; +} + +struct flux_watcher_ops *watcher_get_ops (flux_watcher_t *w) +{ + if (w) + return w->ops; + return NULL; +} + +void watcher_call (flux_watcher_t *w, int revents) +{ + if (w->fn) + w->fn (w->r, w, revents, w->arg); +} + +void *watcher_get_arg (flux_watcher_t *w) +{ + if (w) + return w->arg; + return NULL; +} + +flux_reactor_t *watcher_get_reactor (flux_watcher_t *w) +{ + return w ? w->r : NULL; +} + +void flux_watcher_set_priority (flux_watcher_t *w, int priority) +{ + if (w) { + if (w->ops->set_priority) + w->ops->set_priority (w, priority); + } +} + +void flux_watcher_start (flux_watcher_t *w) +{ + if (w) { + if (w->ops->start) + w->ops->start (w); + } +} + +void flux_watcher_stop (flux_watcher_t *w) +{ + if (w) { + if (w->ops->stop) + w->ops->stop (w); + } +} + +bool flux_watcher_is_active (flux_watcher_t *w) +{ + if (w) { + if (w->ops->is_active) + return w->ops->is_active (w); + } + return false; +} + +void flux_watcher_destroy (flux_watcher_t *w) +{ + if (w) { + if (w->ops->stop) + w->ops->stop (w); + if (w->ops->destroy) + w->ops->destroy (w); + flux_reactor_decref (w->r); + free (w); + } +} + +// vi:ts=4 sw=4 expandtab diff --git a/src/common/libflux/watcher.h b/src/common/libflux/watcher.h new file mode 100644 index 000000000000..0ebe83842fe6 --- /dev/null +++ b/src/common/libflux/watcher.h @@ -0,0 +1,146 @@ +/************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_CORE_WATCHER_H +#define _FLUX_CORE_WATCHER_H + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void (*flux_watcher_f)(flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg); + +/* Set the watcher priority. The range is [-2:2] (default 0). + * Higher priority watchers run first. + * This is a no-op if the underlying watcher doesn't support it. + * If the priority is out of range, the max or min value is set. + * The priority should only be set when the watcher is stopped. + * Currently only the check watcher supports it. + */ +void flux_watcher_set_priority (flux_watcher_t *w, int priority); + +void flux_watcher_start (flux_watcher_t *w); +void flux_watcher_stop (flux_watcher_t *w); +void flux_watcher_destroy (flux_watcher_t *w); +double flux_watcher_next_wakeup (flux_watcher_t *w); +bool flux_watcher_is_active (flux_watcher_t *w); + +/* flux_t handle + */ + +flux_watcher_t *flux_handle_watcher_create (flux_reactor_t *r, + flux_t *h, + int events, + flux_watcher_f cb, + void *arg); +flux_t *flux_handle_watcher_get_flux (flux_watcher_t *w); + +/* file descriptor + */ + +flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, + int fd, int events, + flux_watcher_f cb, + void *arg); +int flux_fd_watcher_get_fd (flux_watcher_t *w); + +/* timer + */ + +flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, + double after, + double repeat, + flux_watcher_f cb, + void *arg); + +void flux_timer_watcher_reset (flux_watcher_t *w, double after, double repeat); + +void flux_timer_watcher_again (flux_watcher_t *w); + +/* periodic + */ + +typedef double (*flux_reschedule_f) (flux_watcher_t *w, double now, void *arg); + +flux_watcher_t *flux_periodic_watcher_create (flux_reactor_t *r, + double offset, + double interval, + flux_reschedule_f reschedule_cb, + flux_watcher_f cb, + void *arg); + +void flux_periodic_watcher_reset (flux_watcher_t *w, + double next_wakeup, + double interval, + flux_reschedule_f reschedule_cb); + + +/* prepare/check/idle + */ + +flux_watcher_t *flux_prepare_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg); + +flux_watcher_t *flux_check_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg); + +flux_watcher_t *flux_idle_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg); + +/* child + */ + +flux_watcher_t *flux_child_watcher_create (flux_reactor_t *r, + int pid, + bool trace, + flux_watcher_f cb, + void *arg); +int flux_child_watcher_get_rpid (flux_watcher_t *w); +int flux_child_watcher_get_rstatus (flux_watcher_t *w); + +/* signal + */ + +flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, + int signum, + flux_watcher_f cb, + void *arg); + +int flux_signal_watcher_get_signum (flux_watcher_t *w); + +/* stat + */ + +flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, + const char *path, + double interval, + flux_watcher_f cb, + void *arg); +void flux_stat_watcher_get_rstat (flux_watcher_t *w, + struct stat *stat, + struct stat *prev); + +#ifdef __cplusplus +} +#endif + +#endif /* !_FLUX_CORE_WATCHER_H */ + +// vi:ts=4 sw=4 expandtab diff --git a/src/common/libflux/watcher_private.h b/src/common/libflux/watcher_private.h new file mode 100644 index 000000000000..fae4c229bfda --- /dev/null +++ b/src/common/libflux/watcher_private.h @@ -0,0 +1,57 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* private interfaces for "subclassing" watcher.c */ + +#ifndef _FLUX_CORE_WATCHER_PRIVATE_H +#define _FLUX_CORE_WATCHER_PRIVATE_H + +#include "reactor.h" + +struct flux_watcher_ops { + void (*set_priority) (flux_watcher_t *w, int priority); + void (*start) (flux_watcher_t *w); + void (*stop) (flux_watcher_t *w); + void (*destroy) (flux_watcher_t *w); + bool (*is_active) (flux_watcher_t *w); +}; + +/* Create a custom watcher on reactor 'r' with 'data_size' bytes reserved + * for the implementor, implementation operations in 'ops' and user + * watcher callback and data 'fn' and 'arg'. + * + * Caller retrieves pointer to allocated implementation data with + * flux_watcher_data (w). + */ +flux_watcher_t *watcher_create (flux_reactor_t *r, + size_t data_size, + struct flux_watcher_ops *ops, + flux_watcher_f fn, + void *arg); + +/* Return pointer to implementation data reserved by watcher object 'w'. + */ +void *watcher_get_data (flux_watcher_t *w); + +/* Return pointer to flux_watcher_ops structure for this watcher. + */ +struct flux_watcher_ops *watcher_get_ops (flux_watcher_t *w); + +void watcher_call (flux_watcher_t *w, int revents); + +flux_reactor_t *watcher_get_reactor (flux_watcher_t *w); + +void *watcher_get_arg (flux_watcher_t *w); + +#endif /* !_FLUX_CORE_WATCHER_PRIVATE_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/common/libflux/watcher_wrap.c b/src/common/libflux/watcher_wrap.c new file mode 100644 index 000000000000..42299c6a276f --- /dev/null +++ b/src/common/libflux/watcher_wrap.c @@ -0,0 +1,670 @@ +/************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* watcher_wrap.c - wrapped libev watchers */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include + +#include "src/common/libev/ev.h" + +#include "reactor_private.h" +#include "watcher_private.h" + +static inline int events_to_libev (int events) +{ + int e = 0; + if (events & FLUX_POLLIN) + e |= EV_READ; + if (events & FLUX_POLLOUT) + e |= EV_WRITE; + if (events & FLUX_POLLERR) + e |= EV_ERROR; + return e; +} + +static inline int libev_to_events (int events) +{ + int e = 0; + if (events & EV_READ) + e |= FLUX_POLLIN; + if (events & EV_WRITE) + e |= FLUX_POLLOUT; + if (events & EV_ERROR) + e |= FLUX_POLLERR; + return e; +} + +static void watcher_call_ev (flux_watcher_t *w, int revents) +{ + watcher_call (w, libev_to_events (revents)); +} + +static struct ev_loop *watcher_get_ev (flux_watcher_t *w) +{ + return reactor_get_loop (watcher_get_reactor (w)); +} + +static void safe_stop_cb (struct ev_loop *loop, ev_prepare *pw, int revents) +{ + flux_watcher_stop ((flux_watcher_t *)pw->data); + ev_prepare_stop (loop, pw); + free (pw); +} + +/* Stop a watcher in the next ev_prepare callback. To be used from periodics + * reschedule callback or other ev callbacks in which it is documented + * unsafe to modify the ev_loop or any watcher. + */ +static void watcher_stop_safe (flux_watcher_t *w) +{ + if (w) { + ev_prepare *pw = calloc (1, sizeof (*pw)); + if (!pw) /* On ENOMEM, we just have to give up */ + return; + ev_prepare_init (pw, safe_stop_cb); + pw->data = w; + ev_prepare_start (watcher_get_ev (w), pw); + } +} + +/* This is_active() callback works for "native" libev watchers, where + * watcher data points to a struct ev_TYPE. + */ +static bool wrap_ev_active (flux_watcher_t *w) +{ + return ev_is_active (watcher_get_data (w)); +} + +/* file descriptors + */ + +static void fd_start (flux_watcher_t *w) +{ + ev_io *iow = watcher_get_data (w); + struct ev_loop *loop = watcher_get_ev (w); + ev_io_start (loop, iow); +} + +static void fd_stop (flux_watcher_t *w) +{ + ev_io *iow = watcher_get_data (w); + struct ev_loop *loop = watcher_get_ev (w); + ev_io_stop (loop, iow); +} + +static void fd_cb (struct ev_loop *loop, ev_io *iow, int revents) +{ + struct flux_watcher *w = iow->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops fd_watcher = { + .start = fd_start, + .stop = fd_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + +flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, + int fd, + int events, + flux_watcher_f cb, + void *arg) +{ + ev_io *iow; + flux_watcher_t *w; + + if (!(w = watcher_create (r, sizeof (*iow), &fd_watcher, cb, arg))) + return NULL; + iow = watcher_get_data (w); + ev_io_init (iow, fd_cb, fd, events_to_libev (events) & ~EV_ERROR); + iow->data = w; + + return w; +} + +int flux_fd_watcher_get_fd (flux_watcher_t *w) +{ + assert (watcher_get_ops (w) == &fd_watcher); + ev_io *iow = watcher_get_data (w); + return iow->fd; +} + +/* Timer + */ + +static void timer_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + struct ev_timer *tw = watcher_get_data (w); + ev_timer_start (loop, tw); +} + +static void timer_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + struct ev_timer *tw = watcher_get_data (w); + ev_timer_stop (loop, tw); +} + +static void timer_cb (struct ev_loop *loop, ev_timer *tw, int revents) +{ + struct flux_watcher *w = tw->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops timer_watcher = { + .start = timer_start, + .stop = timer_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + +flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, + double after, + double repeat, + flux_watcher_f cb, + void *arg) +{ + ev_timer *tw; + flux_watcher_t *w; + if (after < 0 || repeat < 0) { + errno = EINVAL; + return NULL; + } + if (!(w = watcher_create (r, sizeof (*tw), &timer_watcher, cb, arg))) + return NULL; + tw = watcher_get_data (w); + ev_timer_init (tw, timer_cb, after, repeat); + tw->data = w; + + return w; +} + +void flux_timer_watcher_reset (flux_watcher_t *w, double after, double repeat) +{ + assert (watcher_get_ops (w) == &timer_watcher); + ev_timer *tw = watcher_get_data (w); + ev_timer_set (tw, after, repeat); +} + +void flux_timer_watcher_again (flux_watcher_t *w) +{ + assert (watcher_get_ops (w) == &timer_watcher); + struct ev_loop *loop = watcher_get_ev (w); + ev_timer *tw = watcher_get_data (w); + ev_timer_again (loop, tw); +} + +/* Periodic + */ +struct f_periodic { + struct flux_watcher *w; + ev_periodic evp; + flux_reschedule_f reschedule_cb; +}; + +static void periodic_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + struct f_periodic *fp = watcher_get_data (w); + ev_periodic_start (loop, &fp->evp); +} + +static void periodic_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + struct f_periodic *fp = watcher_get_data (w); + ev_periodic_stop (loop, &fp->evp); +} + +static bool periodic_is_active (flux_watcher_t *w) +{ + struct f_periodic *fp = watcher_get_data (w); + return ev_is_active (&fp->evp); +} + +static void periodic_cb (struct ev_loop *loop, ev_periodic *pw, int revents) +{ + struct f_periodic *fp = pw->data; + struct flux_watcher *w = fp->w; + watcher_call_ev (w, revents); +} + +static ev_tstamp periodic_reschedule_cb (ev_periodic *pw, ev_tstamp now) +{ + ev_tstamp rc; + struct f_periodic *fp = pw->data; + assert (fp->reschedule_cb != NULL); + rc = (ev_tstamp)fp->reschedule_cb (fp->w, + (double)now, + watcher_get_arg (fp->w)); + if (rc < now) { + /* User reschedule cb returned time in the past. The watcher will + * be stopped, but not here (changing loop is not allowed in a + * libev reschedule cb. flux_watcher_stop_safe() will stop it in + * a prepare callback. + * Return time far in the future to ensure we aren't called again. + */ + watcher_stop_safe (fp->w); + return (now + 1e99); + } + return rc; +} + +static struct flux_watcher_ops periodic_watcher = { + .start = periodic_start, + .stop = periodic_stop, + .destroy = NULL, + .is_active = periodic_is_active, +}; + +flux_watcher_t *flux_periodic_watcher_create (flux_reactor_t *r, + double offset, + double interval, + flux_reschedule_f reschedule_cb, + flux_watcher_f cb, + void *arg) +{ + flux_watcher_t *w; + struct f_periodic *fp; + size_t size = sizeof (*fp); + if (offset < 0 || interval < 0) { + errno = EINVAL; + return NULL; + } + if (!(w = watcher_create (r, size, &periodic_watcher, cb, arg))) + return NULL; + fp = watcher_get_data (w); + fp->evp.data = fp; + fp->w = w; + fp->reschedule_cb = reschedule_cb; + + ev_periodic_init (&fp->evp, + periodic_cb, + offset, + interval, + reschedule_cb ? periodic_reschedule_cb : NULL); + + return w; +} + +void flux_periodic_watcher_reset (flux_watcher_t *w, + double next, + double interval, + flux_reschedule_f reschedule_cb) +{ + struct ev_loop *loop = watcher_get_ev (w); + struct f_periodic *fp = watcher_get_data (w); + assert (watcher_get_ops (w) == &periodic_watcher); + fp->reschedule_cb = reschedule_cb; + ev_periodic_set (&fp->evp, + next, + interval, + reschedule_cb ? periodic_reschedule_cb : NULL); + ev_periodic_again (loop, &fp->evp); +} + +double flux_watcher_next_wakeup (flux_watcher_t *w) +{ + if (watcher_get_ops (w) == &periodic_watcher) { + struct f_periodic *fp = watcher_get_data (w); + return ((double) ev_periodic_at (&fp->evp)); + } + else if (watcher_get_ops (w) == &timer_watcher) { + ev_timer *tw = watcher_get_data (w); + struct ev_loop *loop = watcher_get_ev (w); + return ((double) (ev_now (loop) + ev_timer_remaining (loop, tw))); + } + errno = EINVAL; + return (-1.); +} + +/* Prepare + */ +static void prepare_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_prepare *pw = watcher_get_data (w); + ev_prepare_start (loop, pw); +} + +static void prepare_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_prepare *pw = watcher_get_data (w); + ev_prepare_stop (loop, pw); +} + +static void prepare_cb (struct ev_loop *loop, ev_prepare *pw, int revents) +{ + struct flux_watcher *w = pw->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops prepare_watcher = { + .start = prepare_start, + .stop = prepare_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + +flux_watcher_t *flux_prepare_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg) +{ + ev_prepare *pw; + flux_watcher_t *w; + + if (!(w = watcher_create (r, sizeof (*pw), &prepare_watcher, cb, arg))) + return NULL; + pw = watcher_get_data (w); + ev_prepare_init (pw, prepare_cb); + pw->data = w; + + return w; +} + +/* Check + */ + +static void check_set_priority (flux_watcher_t *w, int priority) +{ + ev_check *cw = watcher_get_data (w); + ev_set_priority (cw, priority); +} + +static void check_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_check *cw = watcher_get_data (w); + ev_check_start (loop, cw); +} + +static void check_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_check *cw = watcher_get_data (w); + ev_check_stop (loop, cw); +} + +static void check_cb (struct ev_loop *loop, ev_check *cw, int revents) +{ + struct flux_watcher *w = cw->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops check_watcher = { + .set_priority = check_set_priority, + .start = check_start, + .stop = check_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + +flux_watcher_t *flux_check_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg) +{ + ev_check *cw; + flux_watcher_t *w; + + if (!(w = watcher_create (r, sizeof (*cw), &check_watcher, cb, arg))) + return NULL; + cw = watcher_get_data (w); + ev_check_init (cw, check_cb); + cw->data = w; + + return w; +} + +/* Idle + */ + +static void idle_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_idle *iw = watcher_get_data (w); + ev_idle_start (loop, iw); +} + +static void idle_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_idle *iw = watcher_get_data (w); + ev_idle_stop (loop, iw); +} + +static void idle_cb (struct ev_loop *loop, ev_idle *iw, int revents) +{ + struct flux_watcher *w = iw->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops idle_watcher = { + .start = idle_start, + .stop = idle_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + +flux_watcher_t *flux_idle_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg) +{ + ev_idle *iw; + flux_watcher_t *w; + + if (!(w = watcher_create (r, sizeof (*iw), &idle_watcher, cb, arg))) + return NULL; + iw = watcher_get_data (w); + ev_idle_init (iw, idle_cb); + iw->data = w; + + return w; +} + +/* Child + */ + +static void child_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_child *cw = watcher_get_data (w); + ev_child_start (loop, cw); +} + +static void child_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_child *cw = watcher_get_data (w); + ev_child_stop (loop, cw); +} + +static void child_cb (struct ev_loop *loop, ev_child *cw, int revents) +{ + struct flux_watcher *w = cw->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops child_watcher = { + .start = child_start, + .stop = child_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + + +flux_watcher_t *flux_child_watcher_create (flux_reactor_t *r, + int pid, + bool trace, + flux_watcher_f cb, + void *arg) +{ + flux_watcher_t *w; + ev_child *cw; + + if (!ev_is_default_loop (reactor_get_loop (r))) { + errno = EINVAL; + return NULL; + } + if (!(w = watcher_create (r, sizeof (*cw), &child_watcher, cb, arg))) + return NULL; + cw = watcher_get_data (w); + ev_child_init (cw, child_cb, pid, trace ? 1 : 0); + cw->data = w; + + return w; +} + +int flux_child_watcher_get_rpid (flux_watcher_t *w) +{ + if (watcher_get_ops (w) != &child_watcher) { + errno = EINVAL; + return -1; + } + ev_child *cw = watcher_get_data (w); + return cw->rpid; +} + +int flux_child_watcher_get_rstatus (flux_watcher_t *w) +{ + if (watcher_get_ops (w) != &child_watcher) { + errno = EINVAL; + return -1; + } + ev_child *cw = watcher_get_data (w); + return cw->rstatus; +} + +/* Signal + */ + +static void signal_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_signal *sw = watcher_get_data (w); + ev_signal_start (loop, sw); +} + +static void signal_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_signal *sw = watcher_get_data (w); + ev_signal_stop (loop, sw); +} + +static void signal_cb (struct ev_loop *loop, ev_signal *sw, int revents) +{ + struct flux_watcher *w = sw->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops signal_watcher = { + .start = signal_start, + .stop = signal_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + +flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, + int signum, + flux_watcher_f cb, + void *arg) +{ + flux_watcher_t *w; + ev_signal *sw; + + if (!(w = watcher_create (r, sizeof (*sw), &signal_watcher, cb, arg))) + return NULL; + sw = watcher_get_data (w); + ev_signal_init (sw, signal_cb, signum); + sw->data = w; + + return w; +} + +int flux_signal_watcher_get_signum (flux_watcher_t *w) +{ + if (watcher_get_ops (w) != &signal_watcher) { + errno = EINVAL; + return (-1); + } + ev_signal *sw = watcher_get_data (w); + return sw->signum; +} + +/* Stat + */ + +static void stat_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_stat *sw = watcher_get_data (w); + ev_stat_start (loop, sw); +} + +static void stat_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_stat *sw = watcher_get_data (w); + ev_stat_stop (loop, sw); +} + +static void stat_cb (struct ev_loop *loop, ev_stat *sw, int revents) +{ + struct flux_watcher *w = sw->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops stat_watcher = { + .start = stat_start, + .stop = stat_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + +flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, + const char *path, + double interval, + flux_watcher_f cb, + void *arg) +{ + flux_watcher_t *w; + ev_stat *sw; + + if (!(w = watcher_create (r, sizeof (*sw), &stat_watcher, cb, arg))) + return NULL; + sw = watcher_get_data (w); + ev_stat_init (sw, stat_cb, path, interval); + sw->data = w; + + return w; +} + +void flux_stat_watcher_get_rstat (flux_watcher_t *w, + struct stat *stat, + struct stat *prev) +{ + ev_stat *sw = watcher_get_data (w); + assert (watcher_get_ops (w) == &stat_watcher); + if (stat) + *stat = sw->attr; + if (prev) + *prev = sw->prev; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/common/libsubprocess/Makefile.am b/src/common/libsubprocess/Makefile.am index 1e3ae4318e63..de256e12f1d4 100644 --- a/src/common/libsubprocess/Makefile.am +++ b/src/common/libsubprocess/Makefile.am @@ -79,6 +79,7 @@ test_ldadd = \ $(top_builddir)/src/common/libtestutil/libtestutil.la \ $(top_builddir)/src/common/libtap/libtap.la \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux/watcher.lo \ $(top_builddir)/src/common/libflux-core.la \ $(top_builddir)/src/common/libflux-internal.la diff --git a/src/common/libsubprocess/fbuf_watcher.c b/src/common/libsubprocess/fbuf_watcher.c index 2443248d4a7c..4da92f7e1bd8 100644 --- a/src/common/libsubprocess/fbuf_watcher.c +++ b/src/common/libsubprocess/fbuf_watcher.c @@ -20,7 +20,7 @@ #include "src/common/libutil/log.h" #include "src/common/libutil/fdutils.h" #include "src/common/libutil/errno_safe.h" -#include "src/common/libflux/reactor_private.h" +#include "src/common/libflux/watcher_private.h" #include "fbuf_watcher.h" #include "fbuf.h" diff --git a/src/common/libzmqutil/zwatcher.c b/src/common/libzmqutil/zwatcher.c index d9451302a160..c3dd62cdb481 100644 --- a/src/common/libzmqutil/zwatcher.c +++ b/src/common/libzmqutil/zwatcher.c @@ -38,7 +38,7 @@ #include #include -#include "src/common/libflux/reactor_private.h" +#include "src/common/libflux/watcher_private.h" #include "src/common/libutil/errno_safe.h" #include "zwatcher.h" diff --git a/src/modules/Makefile.am b/src/modules/Makefile.am index f1a1d7bd3e20..9ce77c97ebeb 100644 --- a/src/modules/Makefile.am +++ b/src/modules/Makefile.am @@ -142,6 +142,7 @@ job_exec_la_LIBADD = \ $(builddir)/job-exec/libjob-exec.la \ $(top_builddir)/src/common/libjob/libjob.la \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux/watcher.lo \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libflux-core.la \ $(JANSSON_LIBS) @@ -206,6 +207,7 @@ job_manager_la_LIBADD = \ $(builddir)/job-manager/libjob-manager.la \ $(top_builddir)/src/common/libjob/libjob.la \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux/watcher.lo \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libflux-core.la \ $(top_builddir)/src/common/libflux-optparse.la \ @@ -282,6 +284,7 @@ sdexec_la_LDFLAGS = $(fluxmod_ldflags) -module sdbus_la_SOURCES = sdbus_la_LIBADD = \ $(builddir)/sdbus/libsdbus.la \ + $(top_builddir)/src/common/libflux/watcher.lo \ $(top_builddir)/src/common/libflux-core.la \ $(top_builddir)/src/common/libflux-internal.la \ $(JANSSON_LIBS) diff --git a/src/modules/job-manager/Makefile.am b/src/modules/job-manager/Makefile.am index cf7d8923ccc7..7b2047436c3d 100644 --- a/src/modules/job-manager/Makefile.am +++ b/src/modules/job-manager/Makefile.am @@ -111,6 +111,7 @@ plugins_perilog_la_SOURCES = \ plugins/perilog.c plugins_perilog_la_LIBADD = \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux/watcher.lo \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/librlist/librlist.la \ $(top_builddir)/src/common/libjob/libjob.la @@ -131,6 +132,7 @@ test_ldadd = \ libjob-manager.la \ $(top_builddir)/src/common/libtap/libtap.la \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux/watcher.lo \ $(top_builddir)/src/common/librlist/librlist.la \ $(top_builddir)/src/common/libjob/libjob.la \ $(top_builddir)/src/common/libflux-core.la \ diff --git a/src/modules/sdbus/watcher.c b/src/modules/sdbus/watcher.c index 940e9bdfe5e2..022dc35fd568 100644 --- a/src/modules/sdbus/watcher.c +++ b/src/modules/sdbus/watcher.c @@ -21,7 +21,7 @@ #include #include -#include "src/common/libflux/reactor_private.h" +#include "src/common/libflux/watcher_private.h" #include "watcher.h" diff --git a/src/shell/Makefile.am b/src/shell/Makefile.am index 57c51d26cbd6..f3c2ead366b4 100644 --- a/src/shell/Makefile.am +++ b/src/shell/Makefile.am @@ -116,6 +116,7 @@ flux_shell_LDADD = \ $(top_builddir)/src/common/libflux-taskmap.la \ $(top_builddir)/src/common/libflux-idset.la \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux/watcher.lo \ $(top_builddir)/src/common/libpmi/libpmi_server.la \ $(top_builddir)/src/common/libpmi/libpmi_common.la \ $(top_builddir)/src/common/libczmqcontainers/libczmqcontainers.la \ diff --git a/t/Makefile.am b/t/Makefile.am index 2c05706d82e1..5a6e1636e310 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -749,6 +749,7 @@ reactor_reactorcat_CPPFLAGS = $(test_cppflags) reactor_reactorcat_LDADD = $(test_ldadd) reactor_reactorcat_LDFLAGS = \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux/watcher.lo \ $(test_ldflags) rexec_rexec_SOURCES = rexec/rexec.c @@ -756,6 +757,7 @@ rexec_rexec_CPPFLAGS = $(test_cppflags) rexec_rexec_LDADD = $(test_ldadd) rexec_rexec_LDFLAGS = \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux/watcher.lo \ $(test_ldflags) rexec_rexec_count_stdout_SOURCES = rexec/rexec_count_stdout.c @@ -768,6 +770,7 @@ rexec_rexec_getline_CPPFLAGS = $(test_cppflags) rexec_rexec_getline_LDADD = $(test_ldadd) rexec_rexec_getline_LDFLAGS = \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux/watcher.lo \ $(test_ldflags) ingest_job_manager_la_SOURCES = ingest/job-manager.c