diff --git a/src/shell/plugins/interthread.c b/src/shell/plugins/interthread.c index d1dcae7..bd109b1 100644 --- a/src/shell/plugins/interthread.c +++ b/src/shell/plugins/interthread.c @@ -14,7 +14,6 @@ #if HAVE_CONFIG_H #include "config.h" #endif -#include #include #include @@ -31,8 +30,8 @@ struct handler { }; struct interthread { - struct flux_msglist *queue; - pthread_mutex_t lock; + flux_t *send; + flux_t *recv; flux_watcher_t *w; struct handler handlers[MAX_HANDLERS]; int handler_count; @@ -75,64 +74,46 @@ int interthread_send_pack (struct interthread *it, if (rc < 0) goto error; - pthread_mutex_lock (&it->lock); - rc = flux_msglist_append (it->queue, msg); - pthread_mutex_unlock (&it->lock); - if (rc < 0) + if (flux_send_new (it->send, &msg, 0) < 0) goto error; - flux_msg_decref (msg); return 0; error: flux_msg_decref (msg); return -1; } -const flux_msg_t *pop_queue_locked (struct interthread *it) -{ - const flux_msg_t *msg; - - pthread_mutex_lock (&it->lock); - msg = flux_msglist_pop (it->queue); - pthread_mutex_unlock (&it->lock); - - return msg; -} - static void interthread_recv (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { struct interthread *it = arg; - const flux_msg_t *msg; + flux_msg_t *msg; const char *topic; int i; - /* flux_msglist_pollfd() is edge triggered so when the reactor watcher - * is triggered, all available messages should be consumed. - */ - while ((msg = pop_queue_locked (it))) { - if (flux_msg_get_topic (msg, &topic) < 0) { - shell_warn ("interthread receive decode error - message dropped"); - flux_msg_decref (msg); - continue; - } - if (it->trace_flag) { - const char *payload; - if (flux_msg_get_payload (msg, (const void **)&payload, NULL) == 0) - shell_trace ("pmix server %s %s", topic, payload); - } - for (i = 0; i < it->handler_count; i++) { - if (!strcmp (topic, it->handlers[i].topic)) - break; - } - if (i < it->handler_count) - it->handlers[i].cb (msg, it->handlers[i].arg); - else - shell_warn ("unhandled interthread topic %s", topic); + if (!(msg = flux_recv (it->recv, FLUX_MATCH_ANY, 0))) + return; + if (flux_msg_get_topic (msg, &topic) < 0) { + shell_warn ("interthread receive decode error - message dropped"); flux_msg_decref (msg); + return; + } + if (it->trace_flag) { + const char *payload; + if (flux_msg_get_payload (msg, (const void **)&payload, NULL) == 0) + shell_trace ("pmix server %s %s", topic, payload); + } + for (i = 0; i < it->handler_count; i++) { + if (!strcmp (topic, it->handlers[i].topic)) + break; } + if (i < it->handler_count) + it->handlers[i].cb (msg, it->handlers[i].arg); + else + shell_warn ("unhandled interthread topic %s", topic); + flux_msg_decref (msg); } void interthread_destroy (struct interthread *it) @@ -140,8 +121,8 @@ void interthread_destroy (struct interthread *it) if (it) { int saved_errno = errno; flux_watcher_destroy (it->w); - flux_msglist_destroy (it->queue); - pthread_mutex_destroy (&it->lock); + flux_close (it->send); + flux_close (it->recv); free (it); errno = saved_errno; } @@ -151,20 +132,16 @@ struct interthread *interthread_create (flux_shell_t *shell) { flux_t *h = flux_shell_get_flux (shell); struct interthread *it; - int fd; if (!(it = calloc (1, sizeof (*it)))) return NULL; - pthread_mutex_init (&it->lock, NULL); - if (!(it->queue = flux_msglist_create ())) - goto error; - if ((fd = flux_msglist_pollfd (it->queue)) < 0) - goto error; - if (!(it->w = flux_fd_watcher_create (flux_get_reactor (h), - fd, - FLUX_POLLIN, - interthread_recv, - it))) + if (!(it->send = flux_open ("interthread://pmix", 0)) + || !(it->recv = flux_open ("interthread://pmix", 0)) + || !(it->w = flux_handle_watcher_create (flux_get_reactor (h), + it->recv, + FLUX_POLLIN, + interthread_recv, + it))) goto error; flux_watcher_start (it->w); it->trace_flag = 1; // temporarily force this on