Skip to content

Commit

Permalink
Merge pull request #5486 from garlick/loop_builtin
Browse files Browse the repository at this point in the history
make the loop:// connector built-in rather than a DSO
  • Loading branch information
mergify[bot] authored Oct 9, 2023
2 parents 7cb4155 + adc1bf5 commit e7e40ba
Show file tree
Hide file tree
Showing 30 changed files with 225 additions and 277 deletions.
5 changes: 2 additions & 3 deletions src/broker/test/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "src/common/libzmqutil/sockopt.h"
#include "src/common/libzmqutil/cert.h"
#include "src/common/libczmqcontainers/czmq_containers.h"
#include "src/common/libtestutil/util.h"
#include "src/common/libutil/stdlog.h"
#include "src/common/libutil/unlink_recursive.h"
#include "ccan/str/str.h"
Expand Down Expand Up @@ -709,8 +708,8 @@ int main (int argc, char *argv[])

if (!(logs = zlist_new ()))
BAIL_OUT ("zlist_new failed");
if (!(h = loopback_create (0)))
BAIL_OUT ("loopback_create failed");
if (!(h = flux_open ("loop://", 0)))
BAIL_OUT ("could not create loop handle");
if (flux_attr_set_cacheonly (h, "rank", "0") < 0)
BAIL_OUT ("flux_attr_set_cacheonly rank failed");
if (flux_attr_set_cacheonly (h, "hostlist", "test") < 0)
Expand Down
5 changes: 2 additions & 3 deletions src/broker/test/runat.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include "src/common/libtap/tap.h"
#include "src/common/libczmqcontainers/czmq_containers.h"
#include "src/common/libtestutil/util.h"
#include "src/common/libutil/stdlog.h"

#include "src/broker/runat.h"
Expand Down Expand Up @@ -361,8 +360,8 @@ int main (int argc, char *argv[])
BAIL_OUT ("zlist_new failed");
if (!(r = flux_reactor_create (FLUX_REACTOR_SIGCHLD)))
BAIL_OUT ("flux_reactor_create failed");
if (!(h = loopback_create (0)))
BAIL_OUT ("loopback_create failed");
if (!(h = flux_open ("loop://", 0)))
BAIL_OUT ("could not create loop handle");
if (flux_set_reactor (h, r) < 0)
BAIL_OUT ("flux_set_reactor failed");
if (flux_attr_set_cacheonly (h, "rank", "0") < 0)
Expand Down
1 change: 1 addition & 0 deletions src/common/libflux/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ libflux_la_SOURCES = \
flog.c \
attr.c \
handle.c \
connector_loop.c \
reactor.c \
reactor_private.h \
msg_handler.c \
Expand Down
16 changes: 11 additions & 5 deletions src/common/libflux/connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ typedef flux_t *(connector_init_f)(const char *uri,
flux_error_t *errp);

struct flux_handle_ops {
int (*setopt)(void *impl, const char *option,
const void *val, size_t len);
int (*getopt)(void *impl, const char *option,
void *val, size_t len);
int (*setopt)(void *impl,
const char *option,
const void *val,
size_t len);
int (*getopt)(void *impl,
const char *option,
void *val,
size_t len);
int (*pollfd)(void *impl);
int (*pollevents)(void *impl);
int (*send)(void *impl, const flux_msg_t *msg, int flags);
Expand All @@ -37,7 +41,9 @@ struct flux_handle_ops {
void (*impl_destroy)(void *impl);
};

flux_t *flux_handle_create (void *impl, const struct flux_handle_ops *ops, int flags);
flux_t *flux_handle_create (void *impl,
const struct flux_handle_ops *ops,
int flags);
void flux_handle_destroy (flux_t *hp);

#ifdef __cplusplus
Expand Down
58 changes: 55 additions & 3 deletions src/connectors/loop/loop.c → src/common/libflux/connector_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include <flux/core.h>

#include "ccan/str/str.h"

typedef struct {
flux_t *h;

Expand Down Expand Up @@ -87,6 +89,56 @@ static flux_msg_t *op_recv (void *impl, int flags)
return msg;
}

static int op_getopt (void *impl, const char *option, void *val, size_t size)
{
loop_ctx_t *c = impl;

if (streq (option, FLUX_OPT_TESTING_USERID)) {
if (size != sizeof (c->cred.userid) || !val)
goto error;
memcpy (val, &c->cred.userid, size);
}
else if (streq (option, FLUX_OPT_TESTING_ROLEMASK)) {
if (size != sizeof (c->cred.rolemask) || !val)
goto error;
memcpy (val, &c->cred.rolemask, size);
}
else
goto error;
return 0;
error:
errno = EINVAL;
return -1;
}

static int op_setopt (void *impl,
const char *option,
const void *val,
size_t size)
{
loop_ctx_t *c = impl;
size_t val_size;

if (streq (option, FLUX_OPT_TESTING_USERID)) {
val_size = sizeof (c->cred.userid);
if (size != val_size || !val)
goto error;
memcpy (&c->cred.userid, val, val_size);
}
else if (streq (option, FLUX_OPT_TESTING_ROLEMASK)) {
val_size = sizeof (c->cred.rolemask);
if (size != val_size || !val)
goto error;
memcpy (&c->cred.rolemask, val, val_size);
}
else
goto error;
return 0;
error:
errno = EINVAL;
return -1;
}

static void op_fini (void *impl)
{
loop_ctx_t *c = impl;
Expand All @@ -95,7 +147,7 @@ static void op_fini (void *impl)
free (c);
}

flux_t *connector_init (const char *path, int flags, flux_error_t *errp)
flux_t *connector_loop_init (const char *path, int flags, flux_error_t *errp)
{
loop_ctx_t *c = malloc (sizeof (*c));
if (!c) {
Expand Down Expand Up @@ -129,8 +181,8 @@ static const struct flux_handle_ops handle_ops = {
.pollevents = op_pollevents,
.send = op_send,
.recv = op_recv,
.getopt = NULL,
.setopt = NULL,
.getopt = op_getopt,
.setopt = op_setopt,
.impl_destroy = op_fini,
};

Expand Down
59 changes: 48 additions & 11 deletions src/common/libflux/handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "src/common/libutil/errno_safe.h"
#include "src/common/libutil/monotime.h"
#include "src/common/libutil/errprintf.h"
#include "ccan/array_size/array_size.h"
#include "ccan/str/str.h"

#include "handle.h"
#include "reactor.h"
Expand Down Expand Up @@ -60,7 +62,7 @@ struct profiling_context {
};
#endif

struct flux_handle_struct {
struct flux_handle {
flux_t *parent; // if FLUX_O_CLONE, my parent
struct aux_item *aux;
int usecount;
Expand All @@ -85,6 +87,17 @@ struct flux_handle_struct {
struct rpc_track *tracker;
};

struct builtin_connector {
const char *scheme;
connector_init_f *init;
};

flux_t *connector_loop_init (const char *uri, int flags, flux_error_t *errp);

static struct builtin_connector builtin_connectors[] = {
{ "loop", &connector_loop_init },
};

static void handle_trace (flux_t *h, const char *fmt, ...)
__attribute__ ((format (printf, 2, 3)));

Expand Down Expand Up @@ -208,6 +221,14 @@ static void profiling_msg_snapshot (flux_t *h,

#endif

static connector_init_f *find_connector_builtin (const char *scheme)
{
for (int i = 0; i < ARRAY_SIZE (builtin_connectors); i++)
if (streq (scheme, builtin_connectors[i].scheme))
return builtin_connectors[i].init;
return NULL;
}

static char *find_file (const char *name, const char *searchpath)
{
char *path;
Expand All @@ -224,9 +245,9 @@ static char *find_file (const char *name, const char *searchpath)
return path;
}

static connector_init_f *find_connector (const char *scheme,
void **dsop,
flux_error_t *errp)
static connector_init_f *find_connector_dso (const char *scheme,
void **dsop,
flux_error_t *errp)
{
char name[PATH_MAX];
const char *searchpath = getenv ("FLUX_CONNECTOR_PATH");
Expand Down Expand Up @@ -316,14 +337,16 @@ flux_t *flux_open_ex (const char *uri, int flags, flux_error_t *errp)
*path = '\0';
path = strtrim (path + 3, " \t");
}
if (!(connector_init = find_connector (scheme, &dso, errp)))
if (!(connector_init = find_connector_builtin (scheme))
&& !(connector_init = find_connector_dso (scheme, &dso, errp)))
goto error;
if (getenv ("FLUX_HANDLE_TRACE"))
flags |= FLUX_O_TRACE;
if (getenv ("FLUX_HANDLE_MATCHDEBUG"))
flags |= FLUX_O_MATCHDEBUG;
if (!(h = connector_init (path, flags, errp))) {
ERRNO_SAFE_WRAP (dlclose, dso);
if (dso)
ERRNO_SAFE_WRAP (dlclose, dso);
goto error;
}
h->dso = dso;
Expand All @@ -332,14 +355,18 @@ flux_t *flux_open_ex (const char *uri, int flags, flux_error_t *errp)
#endif
if ((s = getenv ("FLUX_HANDLE_USERID"))) {
uint32_t userid = strtoul (s, NULL, 10);
if (flux_opt_set (h, FLUX_OPT_TESTING_USERID, &userid,
sizeof (userid)) < 0)
if (flux_opt_set (h,
FLUX_OPT_TESTING_USERID,
&userid,
sizeof (userid)) < 0)
goto error_handle;
}
if ((s = getenv ("FLUX_HANDLE_ROLEMASK"))) {
uint32_t rolemask = strtoul (s, NULL, 0);
if (flux_opt_set (h, FLUX_OPT_TESTING_ROLEMASK, &rolemask,
sizeof (rolemask)) < 0)
if (flux_opt_set (h,
FLUX_OPT_TESTING_ROLEMASK,
&rolemask,
sizeof (rolemask)) < 0)
goto error_handle;
}
free (scheme);
Expand Down Expand Up @@ -369,7 +396,9 @@ void flux_close (flux_t *h)
flux_handle_destroy (h);
}

flux_t *flux_handle_create (void *impl, const struct flux_handle_ops *ops, int flags)
flux_t *flux_handle_create (void *impl,
const struct flux_handle_ops *ops,
int flags)
{
flux_t *h;

Expand Down Expand Up @@ -549,6 +578,10 @@ int flux_flags_get (flux_t *h)

int flux_opt_get (flux_t *h, const char *option, void *val, size_t len)
{
if (!h || !option) {
errno = EINVAL;
return -1;
}
h = lookup_clone_ancestor (h);
if (!h->ops->getopt) {
errno = EINVAL;
Expand All @@ -559,6 +592,10 @@ int flux_opt_get (flux_t *h, const char *option, void *val, size_t len)

int flux_opt_set (flux_t *h, const char *option, const void *val, size_t len)
{
if (!h || !option) {
errno = EINVAL;
return -1;
}
h = lookup_clone_ancestor (h);
if (!h->ops->setopt) {
errno = EINVAL;
Expand Down
2 changes: 1 addition & 1 deletion src/common/libflux/handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
extern "C" {
#endif

typedef struct flux_handle_struct flux_t;
typedef struct flux_handle flux_t;

typedef struct {
int request_tx;
Expand Down
5 changes: 2 additions & 3 deletions src/common/libflux/test/conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "src/common/libflux/conf_private.h"

#include "src/common/libtap/tap.h"
#include "src/common/libtestutil/util.h"
#include "ccan/str/str.h"

const char *t1 = \
Expand Down Expand Up @@ -351,8 +350,8 @@ void test_in_handle (void)

/* create test handle
*/
if (!(h = loopback_create (0)))
BAIL_OUT ("loopback_create failed");
if (!(h = flux_open ("loop://", 0)))
BAIL_OUT ("failed to create loop handle");

/* create test config
*/
Expand Down
5 changes: 2 additions & 3 deletions src/common/libflux/test/disconnect.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include <flux/core.h>

#include "src/common/libtap/tap.h"
#include "src/common/libtestutil/util.h"

flux_msg_t *create_request (int sender,
uint32_t rolemask,
Expand Down Expand Up @@ -142,8 +141,8 @@ void check_cancel (void)
int count;
uint32_t matchtag;

if (!(h = loopback_create (0)))
BAIL_OUT ("could not create loopback handle");
if (!(h = flux_open ("loop://", 0)))
BAIL_OUT ("failed to create loop handle");

/* populate list of requests with unique senders
*/
Expand Down
5 changes: 2 additions & 3 deletions src/common/libflux/test/dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "src/common/libutil/xzmalloc.h"
#include "src/common/libtap/tap.h"
#include "src/common/libtestutil/util.h"

int cb2_called;
void cb2 (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg)
Expand Down Expand Up @@ -524,8 +523,8 @@ int main (int argc, char *argv[])

plan (NO_PLAN);

if (!(h = loopback_create (0)))
BAIL_OUT ("can't continue without loopback handle");
if (!(h = flux_open ("loop://", 0)))
BAIL_OUT ("can't continue without loop handle");
ok ((r = flux_get_reactor (h)) != NULL,
"handle created reactor on demand");

Expand Down
8 changes: 4 additions & 4 deletions src/common/libflux/test/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ void test_subscribe_badparams (void)
{
flux_t *h;

if (!(h = loopback_create (0)))
BAIL_OUT ("loopback_create failed");
if (!(h = flux_open ("loop://", 0)))
BAIL_OUT ("could not create loop handle");

errno = 0;
ok (flux_event_subscribe_ex (NULL, "foo", 0) == NULL && errno == EINVAL,
Expand Down Expand Up @@ -288,8 +288,8 @@ void test_subscribe_nosub (void)
{
flux_t *h;

if (!(h = loopback_create (FLUX_O_TEST_NOSUB)))
BAIL_OUT ("loopback_create failed");
if (!(h = flux_open ("loop://", FLUX_O_TEST_NOSUB)))
BAIL_OUT ("could not create loop handle");

ok (flux_event_subscribe (h, "foo") == 0,
"flux_event_subscribe succeeds in loopback with TEST_NOSUB flag");
Expand Down
Loading

0 comments on commit e7e40ba

Please sign in to comment.