From 78bf6d4a5d00222be9c1b278b92770b2628a2e20 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 10 Oct 2023 16:37:20 -0700 Subject: [PATCH] interthread: add op->send_new() Problem: the interthread connector plugin does not implement the send_new() operation, which can reduce message copying. Add a send_new() operation. --- src/common/libflux/connector_interthread.c | 41 ++++++++++++++-------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/src/common/libflux/connector_interthread.c b/src/common/libflux/connector_interthread.c index 06a910b04698..1f9425b59ebe 100644 --- a/src/common/libflux/connector_interthread.c +++ b/src/common/libflux/connector_interthread.c @@ -31,8 +31,10 @@ #include #include "src/common/libutil/errprintf.h" +#include "src/common/libutil/aux.h" #include "ccan/list/list.h" #include "ccan/str/str.h" +#include "message_private.h" // for access to msg->aux struct msglist_safe { struct flux_msglist *queue; @@ -253,30 +255,40 @@ static int router_process (flux_msg_t *msg, const char *name) return 0; } -static int op_send (void *impl, const flux_msg_t *msg, int flags) +static int op_send_new (void *impl, flux_msg_t **msg, int flags) { struct interthread_ctx *ctx = impl; - flux_msg_t *cpy; struct flux_msg_cred cred; - int rc = -1; - if (!(cpy = flux_msg_copy (msg, true))) + if (flux_msg_get_cred (*msg, &cred) < 0) return -1; - if (flux_msg_get_cred (cpy, &cred) < 0) - goto done; if (cred.userid == FLUX_USERID_UNKNOWN && cred.rolemask == FLUX_ROLE_NONE) { - if (flux_msg_set_cred (cpy, ctx->cred) < 0) - goto done; + if (flux_msg_set_cred (*msg, ctx->cred) < 0) + return -1; } if (ctx->router) { - if (router_process (cpy, ctx->router) < 0) - goto done; + if (router_process (*msg, ctx->router) < 0) + return -1; } - rc = msglist_safe_append (ctx->send, cpy); -done: - flux_msg_destroy (cpy); - return rc; + aux_destroy (&(*msg)->aux); + if (msglist_safe_append (ctx->send, *msg) < 0) + return -1; + flux_msg_destroy (*msg); + *msg = NULL; + return 0; +} + +static int op_send (void *impl, const flux_msg_t *msg, int flags) +{ + flux_msg_t *cpy; + + if (!(cpy = flux_msg_copy (msg, true)) + || op_send_new (impl, &cpy, flags)) { + flux_msg_destroy (cpy); + return -1; + } + return 0; } static flux_msg_t *op_recv (void *impl, int flags) @@ -383,6 +395,7 @@ static const struct flux_handle_ops handle_ops = { .pollfd = op_pollfd, .pollevents = op_pollevents, .send = op_send, + .send_new = op_send_new, .recv = op_recv, .setopt = op_setopt, .impl_destroy = op_fini,