diff --git a/src/modules/content/checkpoint.c b/src/modules/content/checkpoint.c index b0d0b0c878a3..a4e08df9a820 100644 --- a/src/modules/content/checkpoint.c +++ b/src/modules/content/checkpoint.c @@ -194,6 +194,83 @@ static int checkpoint_put_forward (struct content_checkpoint *checkpoint, return -1; } +static void checkpoint_content_flush_continuation (flux_future_t *f, void *arg) +{ + struct content_checkpoint *checkpoint = arg; + const flux_msg_t *msg = flux_future_aux_get (f, "msg"); + const char *key; + json_t *value; + int flags = 0; + const char *errstr = NULL; + + assert (msg); + + if (flux_rpc_get (f, NULL) < 0) { + errstr = "error flushing content"; + goto error; + } + + if (flux_request_unpack (msg, + NULL, + "{s:s s:o s?i}", + "key", &key, + "value", &value, + "flags", &flags) < 0) + goto error; + + if (checkpoint_put_forward (checkpoint, + msg, + key, + value, + &errstr) < 0) + goto error; + + flux_future_destroy (f); + return; + +error: + if (flux_respond_error (checkpoint->h, msg, errno, errstr) < 0) + flux_log_error (checkpoint->h, + "error responding to checkpoint-put request"); + flux_future_destroy (f); +} + +static int checkpoint_content_flush (struct content_checkpoint *checkpoint, + const flux_msg_t *msg, + const char **errstr) +{ + const char *topic = "content.flush"; + uint32_t rank; + flux_future_t *f = NULL; + + if (flux_get_rank (checkpoint->h, &rank) < 0) { + (*errstr) = "error retrieving rank"; + return -1; + } + + if (!(f = flux_rpc (checkpoint->h, topic, NULL, rank, 0)) + || flux_future_then (f, + -1, + checkpoint_content_flush_continuation, + checkpoint) < 0) + goto error; + + if (flux_future_aux_set (f, + "msg", + (void *)flux_msg_incref (msg), + (flux_free_f)flux_msg_decref) < 0) { + flux_msg_decref (msg); + goto error; + } + + return 0; + +error: + (*errstr) = "error starting content.flush RPC"; + flux_future_destroy (f); + return -1; +} + void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { @@ -216,11 +293,9 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh, "value", &value) < 0) goto error; - if (checkpoint_put_forward (checkpoint, - msg, - key, - value, - &errstr) < 0) + if (checkpoint_content_flush (checkpoint, + msg, + &errstr) < 0) goto error; return;