diff --git a/src/modules/kvs-watch/kvs-watch.c b/src/modules/kvs-watch/kvs-watch.c index 76498daaa595..2c19a3ac576c 100644 --- a/src/modules/kvs-watch/kvs-watch.c +++ b/src/modules/kvs-watch/kvs-watch.c @@ -252,18 +252,27 @@ static int handle_initial_response (flux_t *h, NULL, &w->append_offset) < 0) { flux_log_error (h, "%s: treeobj_decode_val", __FUNCTION__); - return -1; + goto error_respond; } } if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) { - flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); + flux_log_error (h, + "%s: failed to respond to kvs-watch.lookup", + __FUNCTION__); return -1; } w->initial_rootseq = root_seq; w->responded = true; return 0; + +error_respond: + if (!w->mute) { + if (flux_respond_error (h, w->request, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); + } + return -1; } static int handle_compare_response (flux_t *h, @@ -277,7 +286,9 @@ static int handle_compare_response (flux_t *h, w->prev = json_incref (val); if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) { - flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); + flux_log_error (h, + "%s: failed to respond to kvs-watch.lookup", + __FUNCTION__); return -1; } @@ -293,7 +304,9 @@ static int handle_compare_response (flux_t *h, w->prev = json_incref (val); if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) { - flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); + flux_log_error (h, + "%s: failed to respond to kvs-watch.lookup", + __FUNCTION__); return -1; } } @@ -302,8 +315,8 @@ static int handle_compare_response (flux_t *h, } static int handle_append_response (flux_t *h, - struct watcher *w, - json_t *val) + struct watcher *w, + json_t *val) { if (!w->responded) { /* this is the first response case, store the first response @@ -313,11 +326,13 @@ static int handle_append_response (flux_t *h, NULL, &w->append_offset) < 0) { flux_log_error (h, "%s: treeobj_decode_val", __FUNCTION__); - return -1; + goto error_respond; } if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) { - flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); + flux_log_error (h, + "%s: failed to respond to kvs-watch.lookup", + __FUNCTION__); return -1; } @@ -332,7 +347,7 @@ static int handle_append_response (flux_t *h, &new_data, &new_offset) < 0) { flux_log_error (h, "%s: treeobj_decode_val", __FUNCTION__); - return -1; + goto error_respond; } /* check length to determine if append actually happened, note @@ -345,13 +360,13 @@ static int handle_append_response (flux_t *h, if (new_offset < w->append_offset) { free (new_data); errno = EINVAL; - return -1; + goto error_respond; } if (!(new_val = treeobj_create_val (new_data + w->append_offset, new_offset - w->append_offset))) { free (new_data); - return -1; + goto error_respond; } free (new_data); @@ -359,12 +374,21 @@ static int handle_append_response (flux_t *h, if (flux_respond_pack (h, w->request, "{ s:o }", "val", new_val) < 0) { json_decref (new_val); - flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); + flux_log_error (h, + "%s: failed to respond to kvs-watch.lookup", + __FUNCTION__); return -1; } } return 0; + +error_respond: + if (!w->mute) { + if (flux_respond_error (h, w->request, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); + } + return -1; } static int handle_normal_response (flux_t *h, @@ -372,7 +396,9 @@ static int handle_normal_response (flux_t *h, json_t *val) { if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) { - flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); + flux_log_error (h, + "%s: failed to respond to kvs-watch.lookup", + __FUNCTION__); return -1; } @@ -437,13 +463,11 @@ static void handle_lookup_response (flux_future_t *f, } if (handle_initial_response (h, w, val, root_seq) < 0) - goto error; + goto finished; } else { /* First check for ENOENT */ - if (!flux_rpc_get_unpack (f, "{ s:i s:i }", - "errno", &errnum, - "rootseq", &root_seq)) { + if (!flux_rpc_get_unpack (f, "{ s:i }", "errno", &errnum)) { assert (errnum == ENOENT); errno = errnum; goto error; @@ -463,15 +487,15 @@ static void handle_lookup_response (flux_future_t *f, if ((w->flags & FLUX_KVS_WATCH_FULL) || (w->flags & FLUX_KVS_WATCH_UNIQ)) { if (handle_compare_response (h, w, val) < 0) - goto error; + goto finished; } else if (w->flags & FLUX_KVS_WATCH_APPEND) { if (handle_append_response (h, w, val) < 0) - goto error; + goto finished; } else { if (handle_normal_response (h, w, val) < 0) - goto error; + goto finished; } } } @@ -481,6 +505,7 @@ static void handle_lookup_response (flux_future_t *f, if (flux_respond_error (h, w->request, errno, NULL) < 0) flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); } +finished: w->finished = true; } @@ -647,11 +672,15 @@ static void watcher_respond (struct ns_monitor *nsm, struct watcher *w) } /* flux_kvs_lookup (FLUX_KVS_WATCH) * - * Ordering note: KVS lookups can be returned out of order. KVS lookup - * futures are added to the w->lookups zlist in commit order here, and - * in lookup_continuation(), fulfilled futures are popped off the head - * of w->lookups until an unfulfilled future is encountered, so that - * responses are always returned to the watcher in commit order. + * Ordering note: KVS lookups can be returned out of order because + * they are processed asynchronously. For example, some values + * may be cached within the KVS while others are not. + * + * KVS lookup futures are added to the w->lookups zlist in commit + * order here, and in lookup_continuation(), fulfilled futures are + * popped off the head of w->lookups until an unfulfilled future + * is encountered, so that responses are always returned to the + * watcher in commit order. * * Security note: although the requester has already been authenticated * to access the namespace by check_authorization() above, we make the @@ -1061,7 +1090,9 @@ static void stats_cb (flux_t *h, flux_msg_handler_t *mh, "watchers", watchers, "namespace-count", (int)zhash_size (ctx->namespaces), "namespaces", stats) < 0) - flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); + flux_log_error (h, + "%s: failed to respond to kvs-watch.stats-get", + __FUNCTION__); json_decref (stats); return; nomem: