Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libflux: refactor reactor/watcher implementation #6494

Merged
merged 8 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/broker/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

#include "src/common/libzmqutil/msg_zsock.h"
#include "src/common/libzmqutil/sockopt.h"
#include "src/common/libzmqutil/reactor.h"
#include "src/common/libzmqutil/zwatcher.h"
#include "src/common/libzmqutil/zap.h"
#include "src/common/libzmqutil/cert.h"
#include "src/common/libzmqutil/monitor.h"
Expand Down
6 changes: 4 additions & 2 deletions src/cmd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,13 @@ flux_job_LDADD = \
$(top_builddir)/src/common/libczmqcontainers/libczmqcontainers.la \
$(top_builddir)/src/common/libdebugged/libdebugged.la \
$(top_builddir)/src/common/libterminus/libterminus.la \
$(fluxcmd_ldadd)
$(fluxcmd_ldadd) \
$(top_builddir)/src/common/libflux/libflux.la

flux_exec_LDADD = \
$(top_builddir)/src/common/libsubprocess/libsubprocess.la \
$(fluxcmd_ldadd)
$(fluxcmd_ldadd) \
$(top_builddir)/src/common/libflux/libflux.la

flux_terminus_LDADD = \
$(top_builddir)/src/common/libterminus/libterminus.la \
Expand Down
7 changes: 5 additions & 2 deletions src/common/libflux/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ fluxcoreinclude_HEADERS = \
handle.h \
connector.h \
reactor.h \
watcher.h \
msg_handler.h \
message.h \
msglist.h \
Expand Down Expand Up @@ -62,6 +63,10 @@ libflux_la_SOURCES = \
connector_local.c \
reactor.c \
reactor_private.h \
watcher.c \
watcher_private.h \
watcher_wrap.c \
hwatcher.c \
msg_handler.c \
message.c \
message_private.h \
Expand All @@ -79,8 +84,6 @@ libflux_la_SOURCES = \
module.c \
conf_private.h \
conf.c \
ev_flux.h \
ev_flux.c \
control.c \
future.c \
composite_future.c \
Expand Down
107 changes: 0 additions & 107 deletions src/common/libflux/ev_flux.c

This file was deleted.

37 changes: 0 additions & 37 deletions src/common/libflux/ev_flux.h

This file was deleted.

1 change: 1 addition & 0 deletions src/common/libflux/flux.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "message.h"
#include "handle.h"
#include "reactor.h"
#include "watcher.h"
#include "msg_handler.h"
#include "connector.h"
#include "msglist.h"
Expand Down
154 changes: 154 additions & 0 deletions src/common/libflux/hwatcher.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/************************************************************\
* Copyright 2014 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

/* hwatcher.c - reactor watcher for flux_t handle */

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <flux/core.h>

#include "src/common/libutil/errno_safe.h"

#include "watcher_private.h"

struct hwatcher {
flux_watcher_t *fd_w;
flux_watcher_t *prepare_w;
flux_watcher_t *idle_w;
flux_watcher_t *check_w;
flux_t *h;
int events;
};

static void hwatcher_start (flux_watcher_t *w)
{
struct hwatcher *hw = watcher_get_data (w);

flux_watcher_start (hw->prepare_w);
flux_watcher_start (hw->check_w);
}

static void hwatcher_stop (flux_watcher_t *w)
{
struct hwatcher *hw = watcher_get_data (w);

flux_watcher_stop (hw->prepare_w);
flux_watcher_stop (hw->check_w);
flux_watcher_stop (hw->fd_w);
flux_watcher_stop (hw->idle_w);
}

static bool hwatcher_is_active (flux_watcher_t *w)

Check warning on line 49 in src/common/libflux/hwatcher.c

View check run for this annotation

Codecov / codecov/patch

src/common/libflux/hwatcher.c#L49

Added line #L49 was not covered by tests
{
struct hwatcher *hw = watcher_get_data (w);

Check warning on line 51 in src/common/libflux/hwatcher.c

View check run for this annotation

Codecov / codecov/patch

src/common/libflux/hwatcher.c#L51

Added line #L51 was not covered by tests

return flux_watcher_is_active (hw->prepare_w);

Check warning on line 53 in src/common/libflux/hwatcher.c

View check run for this annotation

Codecov / codecov/patch

src/common/libflux/hwatcher.c#L53

Added line #L53 was not covered by tests
}

static void hwatcher_destroy (flux_watcher_t *w)
{
struct hwatcher *hw = watcher_get_data (w);
if (hw) {
flux_watcher_destroy (hw->prepare_w);
flux_watcher_destroy (hw->check_w);
flux_watcher_destroy (hw->fd_w);
flux_watcher_destroy (hw->idle_w);
}
}

static void hwatcher_prepare_cb (flux_reactor_t *r,
flux_watcher_t *prepare_w,
int prepare_revents,
void *arg)
{
flux_watcher_t *w = arg;
struct hwatcher *hw = watcher_get_data (w);
int hevents;

if ((hevents = flux_pollevents (hw->h)) < 0)
hevents = FLUX_POLLERR;

Check warning on line 77 in src/common/libflux/hwatcher.c

View check run for this annotation

Codecov / codecov/patch

src/common/libflux/hwatcher.c#L77

Added line #L77 was not covered by tests

if ((hevents & hw->events))
flux_watcher_start (hw->idle_w);
else
flux_watcher_start (hw->fd_w);
}

static void hwatcher_check_cb (flux_reactor_t *r,
flux_watcher_t *check_w,
int check_revents,
void *arg)
{
flux_watcher_t *w = arg;
struct hwatcher *hw = watcher_get_data (w);
int hevents;
int revents;

flux_watcher_stop (hw->fd_w);
flux_watcher_stop (hw->idle_w);

if ((hevents = flux_pollevents (hw->h)) < 0)
hevents = FLUX_POLLERR;

Check warning on line 99 in src/common/libflux/hwatcher.c

View check run for this annotation

Codecov / codecov/patch

src/common/libflux/hwatcher.c#L99

Added line #L99 was not covered by tests
revents = (hevents & hw->events);

if (revents)
watcher_call (w, revents);
}

static struct flux_watcher_ops hwatcher_ops = {
.start = hwatcher_start,
.stop = hwatcher_stop,
.is_active = hwatcher_is_active,
.destroy = hwatcher_destroy,
};

flux_watcher_t *flux_handle_watcher_create (flux_reactor_t *r,
flux_t *h,
int events,
flux_watcher_f cb,
void *arg)
{
struct hwatcher *hw;
flux_watcher_t *w;
if (!(w = watcher_create (r, sizeof (*hw), &hwatcher_ops, cb, arg)))
return NULL;
hw = watcher_get_data (w);
hw->events = events | FLUX_POLLERR;
hw->h = h;

if (!(hw->prepare_w = flux_prepare_watcher_create (r,
hwatcher_prepare_cb,
w))
|| !(hw->check_w = flux_check_watcher_create (r, hwatcher_check_cb, w))
|| !(hw->idle_w = flux_idle_watcher_create (r, NULL, NULL)))
goto error;

Check warning on line 132 in src/common/libflux/hwatcher.c

View check run for this annotation

Codecov / codecov/patch

src/common/libflux/hwatcher.c#L132

Added line #L132 was not covered by tests

int fd;
if ((fd = flux_pollfd (h)) < 0
|| !(hw->fd_w = flux_fd_watcher_create (r, fd, FLUX_POLLIN, NULL, w)))
goto error;

Check warning on line 137 in src/common/libflux/hwatcher.c

View check run for this annotation

Codecov / codecov/patch

src/common/libflux/hwatcher.c#L137

Added line #L137 was not covered by tests
return w;
error:
ERRNO_SAFE_WRAP (flux_watcher_destroy, w);
return NULL;

Check warning on line 141 in src/common/libflux/hwatcher.c

View check run for this annotation

Codecov / codecov/patch

src/common/libflux/hwatcher.c#L139-L141

Added lines #L139 - L141 were not covered by tests
}

flux_t *flux_handle_watcher_get_flux (flux_watcher_t *w)
{
if (watcher_get_ops (w) != &hwatcher_ops) {
errno = EINVAL;
return NULL;

Check warning on line 148 in src/common/libflux/hwatcher.c

View check run for this annotation

Codecov / codecov/patch

src/common/libflux/hwatcher.c#L147-L148

Added lines #L147 - L148 were not covered by tests
}
struct hwatcher *hw = watcher_get_data (w);
return hw->h;
}

// vi:ts=4 sw=4 expandtab
Loading
Loading