Skip to content

Commit

Permalink
interthread: add op->send_new()
Browse files Browse the repository at this point in the history
Problem: the interthread connector plugin does not implement
the send_new() operation, which can reduce message copying.

Add a send_new() operation.
  • Loading branch information
garlick committed Oct 12, 2023
1 parent 3dddbed commit 78bf6d4
Showing 1 changed file with 27 additions and 14 deletions.
41 changes: 27 additions & 14 deletions src/common/libflux/connector_interthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
#include <flux/core.h>

#include "src/common/libutil/errprintf.h"
#include "src/common/libutil/aux.h"
#include "ccan/list/list.h"
#include "ccan/str/str.h"
#include "message_private.h" // for access to msg->aux

struct msglist_safe {
struct flux_msglist *queue;
Expand Down Expand Up @@ -253,30 +255,40 @@ static int router_process (flux_msg_t *msg, const char *name)
return 0;
}

static int op_send (void *impl, const flux_msg_t *msg, int flags)
static int op_send_new (void *impl, flux_msg_t **msg, int flags)
{
struct interthread_ctx *ctx = impl;
flux_msg_t *cpy;
struct flux_msg_cred cred;
int rc = -1;

if (!(cpy = flux_msg_copy (msg, true)))
if (flux_msg_get_cred (*msg, &cred) < 0)
return -1;
if (flux_msg_get_cred (cpy, &cred) < 0)
goto done;
if (cred.userid == FLUX_USERID_UNKNOWN
&& cred.rolemask == FLUX_ROLE_NONE) {
if (flux_msg_set_cred (cpy, ctx->cred) < 0)
goto done;
if (flux_msg_set_cred (*msg, ctx->cred) < 0)
return -1;
}
if (ctx->router) {
if (router_process (cpy, ctx->router) < 0)
goto done;
if (router_process (*msg, ctx->router) < 0)
return -1;
}
rc = msglist_safe_append (ctx->send, cpy);
done:
flux_msg_destroy (cpy);
return rc;
aux_destroy (&(*msg)->aux);
if (msglist_safe_append (ctx->send, *msg) < 0)
return -1;
flux_msg_destroy (*msg);
*msg = NULL;
return 0;
}

static int op_send (void *impl, const flux_msg_t *msg, int flags)
{
flux_msg_t *cpy;

if (!(cpy = flux_msg_copy (msg, true))
|| op_send_new (impl, &cpy, flags)) {
flux_msg_destroy (cpy);
return -1;

Check warning on line 289 in src/common/libflux/connector_interthread.c

View check run for this annotation

Codecov / codecov/patch

src/common/libflux/connector_interthread.c#L288-L289

Added lines #L288 - L289 were not covered by tests
}
return 0;
}

static flux_msg_t *op_recv (void *impl, int flags)
Expand Down Expand Up @@ -383,6 +395,7 @@ static const struct flux_handle_ops handle_ops = {
.pollfd = op_pollfd,
.pollevents = op_pollevents,
.send = op_send,
.send_new = op_send_new,
.recv = op_recv,
.setopt = op_setopt,
.impl_destroy = op_fini,
Expand Down

0 comments on commit 78bf6d4

Please sign in to comment.