Skip to content

Commit

Permalink
simplify interthread channel implementation
Browse files Browse the repository at this point in the history
Problem: the interthread channel will be harder to maintain than
necessary due to its use of the obscure flux_msglist pollfd/pollevents
interface, an fd watcher, and a pthread mutex.

Now that we have the interthread:// connector in flux-core,
reimplement the interthread channel with back to back flux_t handles
and a message watcher.
  • Loading branch information
garlick committed Dec 14, 2024
1 parent 9f45072 commit 95dcec0
Showing 1 changed file with 32 additions and 55 deletions.
87 changes: 32 additions & 55 deletions src/shell/plugins/interthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <pthread.h>
#include <jansson.h>
#include <flux/core.h>

Expand All @@ -31,8 +30,8 @@ struct handler {
};

struct interthread {
struct flux_msglist *queue;
pthread_mutex_t lock;
flux_t *send;
flux_t *recv;
flux_watcher_t *w;
struct handler handlers[MAX_HANDLERS];
int handler_count;
Expand Down Expand Up @@ -75,73 +74,55 @@ int interthread_send_pack (struct interthread *it,
if (rc < 0)
goto error;

pthread_mutex_lock (&it->lock);
rc = flux_msglist_append (it->queue, msg);
pthread_mutex_unlock (&it->lock);
if (rc < 0)
if (flux_send_new (it->send, &msg, 0) < 0)
goto error;

flux_msg_decref (msg);
return 0;
error:
flux_msg_decref (msg);
return -1;
}

const flux_msg_t *pop_queue_locked (struct interthread *it)
{
const flux_msg_t *msg;

pthread_mutex_lock (&it->lock);
msg = flux_msglist_pop (it->queue);
pthread_mutex_unlock (&it->lock);

return msg;
}

static void interthread_recv (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
struct interthread *it = arg;
const flux_msg_t *msg;
flux_msg_t *msg;
const char *topic;
int i;

/* flux_msglist_pollfd() is edge triggered so when the reactor watcher
* is triggered, all available messages should be consumed.
*/
while ((msg = pop_queue_locked (it))) {
if (flux_msg_get_topic (msg, &topic) < 0) {
shell_warn ("interthread receive decode error - message dropped");
flux_msg_decref (msg);
continue;
}
if (it->trace_flag) {
const char *payload;
if (flux_msg_get_payload (msg, (const void **)&payload, NULL) == 0)
shell_trace ("pmix server %s %s", topic, payload);
}
for (i = 0; i < it->handler_count; i++) {
if (!strcmp (topic, it->handlers[i].topic))
break;
}
if (i < it->handler_count)
it->handlers[i].cb (msg, it->handlers[i].arg);
else
shell_warn ("unhandled interthread topic %s", topic);
if (!(msg = flux_recv (it->recv, FLUX_MATCH_ANY, 0)))
return;
if (flux_msg_get_topic (msg, &topic) < 0) {
shell_warn ("interthread receive decode error - message dropped");
flux_msg_decref (msg);
return;
}
if (it->trace_flag) {
const char *payload;
if (flux_msg_get_payload (msg, (const void **)&payload, NULL) == 0)
shell_trace ("pmix server %s %s", topic, payload);
}
for (i = 0; i < it->handler_count; i++) {
if (!strcmp (topic, it->handlers[i].topic))
break;
}
if (i < it->handler_count)
it->handlers[i].cb (msg, it->handlers[i].arg);
else
shell_warn ("unhandled interthread topic %s", topic);
flux_msg_decref (msg);
}

void interthread_destroy (struct interthread *it)
{
if (it) {
int saved_errno = errno;
flux_watcher_destroy (it->w);
flux_msglist_destroy (it->queue);
pthread_mutex_destroy (&it->lock);
flux_close (it->send);
flux_close (it->recv);
free (it);
errno = saved_errno;
}
Expand All @@ -151,20 +132,16 @@ struct interthread *interthread_create (flux_shell_t *shell)
{
flux_t *h = flux_shell_get_flux (shell);
struct interthread *it;
int fd;

if (!(it = calloc (1, sizeof (*it))))
return NULL;
pthread_mutex_init (&it->lock, NULL);
if (!(it->queue = flux_msglist_create ()))
goto error;
if ((fd = flux_msglist_pollfd (it->queue)) < 0)
goto error;
if (!(it->w = flux_fd_watcher_create (flux_get_reactor (h),
fd,
FLUX_POLLIN,
interthread_recv,
it)))
if (!(it->send = flux_open ("interthread://pmix", 0))
|| !(it->recv = flux_open ("interthread://pmix", 0))
|| !(it->w = flux_handle_watcher_create (flux_get_reactor (h),
it->recv,
FLUX_POLLIN,
interthread_recv,
it)))
goto error;
flux_watcher_start (it->w);
it->trace_flag = 1; // temporarily force this on
Expand Down

0 comments on commit 95dcec0

Please sign in to comment.