Skip to content

Commit

Permalink
Merge pull request #107 from garlick/interthread_cleanup
Browse files Browse the repository at this point in the history
simplify interthread channel implementation
  • Loading branch information
mergify[bot] authored Dec 14, 2024
2 parents 8d8d3b7 + 95dcec0 commit 0d603cc
Showing 1 changed file with 32 additions and 60 deletions.
92 changes: 32 additions & 60 deletions src/shell/plugins/interthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <pthread.h>
#include <jansson.h>
#include <flux/core.h>

Expand All @@ -31,8 +30,8 @@ struct handler {
};

struct interthread {
struct flux_msglist *queue;
pthread_mutex_t lock;
flux_t *send;
flux_t *recv;
flux_watcher_t *w;
struct handler handlers[MAX_HANDLERS];
int handler_count;
Expand Down Expand Up @@ -75,78 +74,55 @@ int interthread_send_pack (struct interthread *it,
if (rc < 0)
goto error;

pthread_mutex_lock (&it->lock);
rc = flux_msglist_append (it->queue, msg);
pthread_mutex_unlock (&it->lock);
if (rc < 0)
if (flux_send_new (it->send, &msg, 0) < 0)
goto error;

flux_msg_decref (msg);
return 0;
error:
flux_msg_decref (msg);
return -1;
}

/* Pop the next message from the interthread queue, or return NULL if
 * the queue is empty.
 * The mutex is held only for the pop itself; presumably the queue is
 * appended to from another thread (see interthread_send_pack) —
 * NOTE(review): confirm against the sender side.
 * On success the caller owns a reference on the returned message and
 * must release it with flux_msg_decref().
 */
const flux_msg_t *pop_queue_locked (struct interthread *it)
{
    const flux_msg_t *msg;

    pthread_mutex_lock (&it->lock);
    msg = flux_msglist_pop (it->queue);
    pthread_mutex_unlock (&it->lock);

    return msg;
}

/* Reactor watcher callback for it->recv (the receiving end of the
 * interthread:// channel): receive one message, optionally trace it,
 * and dispatch it to the first registered handler whose topic string
 * matches exactly.
 *
 * This span in the scraped diff interleaved the pre-merge (msglist +
 * pollfd) body with the post-merge (flux_recv) body; reconstructed
 * here as the post-merge implementation.
 *
 * One message is consumed per callback; the watcher remains armed, so
 * the reactor re-invokes this while messages are pending.
 * The message reference is released before returning in all paths.
 */
static void interthread_recv (flux_reactor_t *r,
                              flux_watcher_t *w,
                              int revents,
                              void *arg)
{
    struct interthread *it = arg;
    flux_msg_t *msg;
    const char *topic;
    int i;

    if (!(msg = flux_recv (it->recv, FLUX_MATCH_ANY, 0)))
        return;
    if (flux_msg_get_topic (msg, &topic) < 0) {
        shell_warn ("interthread receive decode error - message dropped");
        flux_msg_decref (msg);
        return;
    }
    if (it->trace_flag) {
        const char *payload;
        /* NOTE(review): traces the payload with %s, which assumes a
         * NUL-terminated string payload — confirm senders only use
         * string/JSON payloads on this channel.
         */
        if (flux_msg_get_payload (msg, (const void **)&payload, NULL) == 0)
            shell_trace ("pmix server %s %s", topic, payload);
    }
    /* Linear scan of the fixed handler table; first exact match wins. */
    for (i = 0; i < it->handler_count; i++) {
        if (!strcmp (topic, it->handlers[i].topic))
            break;
    }
    if (i < it->handler_count)
        it->handlers[i].cb (msg, it->handlers[i].arg);
    else
        shell_warn ("unhandled interthread topic %s", topic);
    flux_msg_decref (msg);
}

/* Destroy an interthread channel object created by interthread_create().
 * Safe to call with it == NULL (no-op).
 * Releases the reactor watcher and both ends of the interthread://
 * connection, then frees the container.
 * Preserves errno across the cleanup calls so a caller's pending error
 * code is not clobbered on an error path.
 *
 * (Reconstructed from the scraped diff, which interleaved the removed
 * msglist/mutex cleanup with the added flux_close calls; this is the
 * post-merge version, with the function's closing brace restored.)
 */
void interthread_destroy (struct interthread *it)
{
    if (it) {
        int saved_errno = errno;
        flux_watcher_destroy (it->w);
        flux_close (it->send);
        flux_close (it->recv);
        free (it);
        errno = saved_errno;
    }
}
Expand All @@ -156,20 +132,16 @@ struct interthread *interthread_create (flux_shell_t *shell)
{
flux_t *h = flux_shell_get_flux (shell);
struct interthread *it;
int fd;

if (!(it = calloc (1, sizeof (*it))))
return NULL;
pthread_mutex_init (&it->lock, NULL);
if (!(it->queue = flux_msglist_create ()))
goto error;
if ((fd = flux_msglist_pollfd (it->queue)) < 0)
goto error;
if (!(it->w = flux_fd_watcher_create (flux_get_reactor (h),
fd,
FLUX_POLLIN,
interthread_recv,
it)))
if (!(it->send = flux_open ("interthread://pmix", 0))
|| !(it->recv = flux_open ("interthread://pmix", 0))
|| !(it->w = flux_handle_watcher_create (flux_get_reactor (h),
it->recv,
FLUX_POLLIN,
interthread_recv,
it)))
goto error;
flux_watcher_start (it->w);
it->trace_flag = 1; // temporarily force this on
Expand Down

0 comments on commit 0d603cc

Please sign in to comment.