diff --git a/src/modules/job-info/guest_watch.c b/src/modules/job-info/guest_watch.c index 782abe8591e0..f007d47d8dbc 100644 --- a/src/modules/job-info/guest_watch.c +++ b/src/modules/job-info/guest_watch.c @@ -639,24 +639,20 @@ static void guest_namespace_watch_continuation (flux_future_t *f, void *arg) if (flux_respond_pack (ctx->h, gw->msg, "{s:s}", "event", event) < 0) { flux_log_error (ctx->h, "%s: flux_respond_pack", __FUNCTION__); - goto error_cancel; + + /* If we haven't sent a cancellation yet, must do so so that + * the future's matchtag will eventually be freed */ + if (!gw->eventlog_watch_canceled) + (void) send_eventlog_watch_cancel (gw, + gw->guest_namespace_watch_f, + false); + goto cleanup; } gw->offset += strlen (event); flux_future_reset (f); return; -error_cancel: - /* If we haven't sent a cancellation yet, must do so so that - * the future's matchtag will eventually be freed */ - if (!gw->eventlog_watch_canceled) { - int save_errno = errno; - (void) send_eventlog_watch_cancel (gw, - gw->guest_namespace_watch_f, - false); - errno = save_errno; - } - error: if (flux_respond_error (ctx->h, gw->msg, errno, NULL) < 0) flux_log_error (ctx->h, "%s: flux_respond_error", __FUNCTION__); @@ -706,9 +702,9 @@ static int main_namespace_lookup (struct guest_watch_ctx *gw) goto error; if (!(gw->main_namespace_lookup_f = flux_rpc_message (gw->ctx->h, - msg, - FLUX_NODEID_ANY, - 0))) { + msg, + FLUX_NODEID_ANY, + 0))) { flux_log_error (gw->ctx->h, "%s: flux_rpc_message", __FUNCTION__); goto error; } @@ -763,7 +759,7 @@ static void main_namespace_lookup_continuation (flux_future_t *f, void *arg) "event", tok, toklen) < 0) { flux_log_error (ctx->h, "%s: flux_respond_pack", __FUNCTION__); - goto error; + goto cleanup; } } diff --git a/src/modules/job-info/update.c b/src/modules/job-info/update.c index a607791d21e9..cb2a3d4557eb 100644 --- a/src/modules/job-info/update.c +++ b/src/modules/job-info/update.c @@ -157,17 +157,20 @@ static void eventlog_continuation (flux_future_t *f, void *arg) if (flux_job_event_watch_get (f, &s) < 0) { flux_log_error (ctx->h, "%s: flux_job_event_watch_get", __FUNCTION__); - goto error_cancel; + eventlog_watch_cancel (uc); + goto cleanup; } if (!(event = eventlog_entry_decode (s))) { flux_log_error (uc->ctx->h, "%s: eventlog_entry_decode", __FUNCTION__); - goto error_cancel; + eventlog_watch_cancel (uc); + goto cleanup; } if (eventlog_entry_parse (event, NULL, &name, &context) < 0) { flux_log_error (uc->ctx->h, "%s: eventlog_entry_decode", __FUNCTION__); - goto error_cancel; + eventlog_watch_cancel (uc); + goto cleanup; } if (context && streq (name, uc->update_name)) { @@ -196,7 +199,8 @@ static void eventlog_continuation (flux_future_t *f, void *arg) "{s:O}", uc->key, uc->update_object) < 0) { flux_log_error (ctx->h, "%s: flux_respond", __FUNCTION__); - goto error_cancel; + eventlog_watch_cancel (uc); + goto cleanup; } msg = flux_msglist_next (uc->msglist); } @@ -207,11 +211,6 @@ static void eventlog_continuation (flux_future_t *f, void *arg) json_decref (event); return; -error_cancel: - /* Must do so so that the future's matchtag will eventually be - * freed */ - eventlog_watch_cancel (uc); - error: msg = flux_msglist_first (uc->msglist); while (msg) { @@ -360,7 +359,7 @@ static void lookup_continuation (flux_future_t *f, void *arg) if (flux_respond_pack (uc->ctx->h, msg, "{s:O}", uc->key, uc->update_object) < 0) { flux_log_error (ctx->h, "%s: flux_respond", __FUNCTION__); - goto error; + goto cleanup; } next: @@ -520,7 +519,7 @@ void update_watch_cb (flux_t *h, "{s:O}", uc->key, uc->update_object) < 0) { flux_log_error (ctx->h, "%s: flux_respond", __FUNCTION__); - goto error; + goto cleanup; } } /* if uc->update_object has not been set, the initial lookup @@ -536,6 +535,7 @@ void update_watch_cb (flux_t *h, error: if (flux_respond_error (h, msg, errno, errmsg) < 0) flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +cleanup: free (index_key); } diff --git a/src/modules/job-info/watch.c b/src/modules/job-info/watch.c index e0ccf3966bd9..36877a8a9111 100644 --- a/src/modules/job-info/watch.c +++ b/src/modules/job-info/watch.c @@ -252,8 +252,15 @@ static void watch_continuation (flux_future_t *f, void *arg) } if (!w->allow) { - if (eventlog_allow (ctx, w->msg, w->id, s) < 0) - goto error_cancel; + if (eventlog_allow (ctx, w->msg, w->id, s) < 0) { + if (!w->kvs_watch_canceled) { + if (flux_kvs_lookup_cancel (w->watch_f) < 0) + flux_log_error (ctx->h, + "%s: flux_kvs_lookup_cancel", + __FUNCTION__); + } + goto cleanup; + } w->allow = true; } @@ -266,7 +273,13 @@ static void watch_continuation (flux_future_t *f, void *arg) flux_log_error (ctx->h, "%s: flux_respond_pack", __FUNCTION__); - goto error_cancel; + if (!w->kvs_watch_canceled) { + if (flux_kvs_lookup_cancel (w->watch_f) < 0) + flux_log_error (ctx->h, + "%s: flux_kvs_lookup_cancel", + __FUNCTION__); + } + goto cleanup; } /* When watching the main job eventlog, we return ENODATA back @@ -294,18 +307,6 @@ static void watch_continuation (flux_future_t *f, void *arg) flux_future_reset (f); return; -error_cancel: - /* If we haven't sent a cancellation yet, must do so so that - * the future's matchtag will eventually be freed */ - if (!w->kvs_watch_canceled) { - int save_errno = errno; - if (flux_kvs_lookup_cancel (w->watch_f) < 0) - flux_log_error (ctx->h, - "%s: flux_kvs_lookup_cancel", - __FUNCTION__); - errno = save_errno; - } - error: if (flux_respond_error (ctx->h, w->msg, errno, errmsg) < 0) flux_log_error (ctx->h, "%s: flux_respond_error", __FUNCTION__);