Skip to content

Commit

Permalink
Merge pull request #6458 from chu11/kvs_watch_cleanup
Browse files Browse the repository at this point in the history
kvs-watch: misc cleanup
  • Loading branch information
mergify[bot] authored Nov 22, 2024
2 parents af5feae + 0047178 commit 8015f88
Showing 1 changed file with 57 additions and 26 deletions.
83 changes: 57 additions & 26 deletions src/modules/kvs-watch/kvs-watch.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,18 +252,27 @@ static int handle_initial_response (flux_t *h,
NULL,
&w->append_offset) < 0) {
flux_log_error (h, "%s: treeobj_decode_val", __FUNCTION__);
return -1;
goto error_respond;
}
}

if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
flux_log_error (h,
"%s: failed to respond to kvs-watch.lookup",
__FUNCTION__);
return -1;
}

w->initial_rootseq = root_seq;
w->responded = true;
return 0;

error_respond:
if (!w->mute) {
if (flux_respond_error (h, w->request, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}
return -1;
}

static int handle_compare_response (flux_t *h,
Expand All @@ -277,7 +286,9 @@ static int handle_compare_response (flux_t *h,
w->prev = json_incref (val);

if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
flux_log_error (h,
"%s: failed to respond to kvs-watch.lookup",
__FUNCTION__);
return -1;
}

Expand All @@ -293,7 +304,9 @@ static int handle_compare_response (flux_t *h,
w->prev = json_incref (val);

if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
flux_log_error (h,
"%s: failed to respond to kvs-watch.lookup",
__FUNCTION__);
return -1;
}
}
Expand All @@ -302,8 +315,8 @@ static int handle_compare_response (flux_t *h,
}

static int handle_append_response (flux_t *h,
struct watcher *w,
json_t *val)
struct watcher *w,
json_t *val)
{
if (!w->responded) {
/* this is the first response case, store the first response
Expand All @@ -313,11 +326,13 @@ static int handle_append_response (flux_t *h,
NULL,
&w->append_offset) < 0) {
flux_log_error (h, "%s: treeobj_decode_val", __FUNCTION__);
return -1;
goto error_respond;
}

if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
flux_log_error (h,
"%s: failed to respond to kvs-watch.lookup",
__FUNCTION__);
return -1;
}

Expand All @@ -332,7 +347,7 @@ static int handle_append_response (flux_t *h,
&new_data,
&new_offset) < 0) {
flux_log_error (h, "%s: treeobj_decode_val", __FUNCTION__);
return -1;
goto error_respond;
}

/* check length to determine if append actually happened, note
Expand All @@ -345,34 +360,45 @@ static int handle_append_response (flux_t *h,
if (new_offset < w->append_offset) {
free (new_data);
errno = EINVAL;
return -1;
goto error_respond;
}

if (!(new_val = treeobj_create_val (new_data + w->append_offset,
new_offset - w->append_offset))) {
free (new_data);
return -1;
goto error_respond;
}

free (new_data);
w->append_offset = new_offset;

if (flux_respond_pack (h, w->request, "{ s:o }", "val", new_val) < 0) {
json_decref (new_val);
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
flux_log_error (h,
"%s: failed to respond to kvs-watch.lookup",
__FUNCTION__);
return -1;
}
}

return 0;

error_respond:
if (!w->mute) {
if (flux_respond_error (h, w->request, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}
return -1;
}

static int handle_normal_response (flux_t *h,
struct watcher *w,
json_t *val)
{
if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
flux_log_error (h,
"%s: failed to respond to kvs-watch.lookup",
__FUNCTION__);
return -1;
}

Expand Down Expand Up @@ -437,13 +463,11 @@ static void handle_lookup_response (flux_future_t *f,
}

if (handle_initial_response (h, w, val, root_seq) < 0)
goto error;
goto finished;
}
else {
/* First check for ENOENT */
if (!flux_rpc_get_unpack (f, "{ s:i s:i }",
"errno", &errnum,
"rootseq", &root_seq)) {
if (!flux_rpc_get_unpack (f, "{ s:i }", "errno", &errnum)) {
assert (errnum == ENOENT);
errno = errnum;
goto error;
Expand All @@ -463,15 +487,15 @@ static void handle_lookup_response (flux_future_t *f,
if ((w->flags & FLUX_KVS_WATCH_FULL)
|| (w->flags & FLUX_KVS_WATCH_UNIQ)) {
if (handle_compare_response (h, w, val) < 0)
goto error;
goto finished;
}
else if (w->flags & FLUX_KVS_WATCH_APPEND) {
if (handle_append_response (h, w, val) < 0)
goto error;
goto finished;
}
else {
if (handle_normal_response (h, w, val) < 0)
goto error;
goto finished;
}
}
}
Expand All @@ -481,6 +505,7 @@ static void handle_lookup_response (flux_future_t *f,
if (flux_respond_error (h, w->request, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}
finished:
w->finished = true;
}

Expand Down Expand Up @@ -647,11 +672,15 @@ static void watcher_respond (struct ns_monitor *nsm, struct watcher *w)
}
/* flux_kvs_lookup (FLUX_KVS_WATCH)
*
* Ordering note: KVS lookups can be returned out of order. KVS lookup
* futures are added to the w->lookups zlist in commit order here, and
* in lookup_continuation(), fulfilled futures are popped off the head
* of w->lookups until an unfulfilled future is encountered, so that
* responses are always returned to the watcher in commit order.
* Ordering note: KVS lookups can be returned out of order because
* they are processed asynchronously. For example, some values
* may be cached within the KVS while others are not.
*
* KVS lookup futures are added to the w->lookups zlist in commit
* order here, and in lookup_continuation(), fulfilled futures are
* popped off the head of w->lookups until an unfulfilled future
* is encountered, so that responses are always returned to the
* watcher in commit order.
*
* Security note: although the requester has already been authenticated
* to access the namespace by check_authorization() above, we make the
Expand Down Expand Up @@ -1061,7 +1090,9 @@ static void stats_cb (flux_t *h, flux_msg_handler_t *mh,
"watchers", watchers,
"namespace-count", (int)zhash_size (ctx->namespaces),
"namespaces", stats) < 0)
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
flux_log_error (h,
"%s: failed to respond to kvs-watch.stats-get",
__FUNCTION__);
json_decref (stats);
return;
nomem:
Expand Down

0 comments on commit 8015f88

Please sign in to comment.