Skip to content

Commit

Permalink
Merge pull request #6595 from chu11/kvs_refactor
Browse files Browse the repository at this point in the history
kvs: remove unused internal transaction request API code
  • Loading branch information
mergify[bot] authored Feb 3, 2025
2 parents a50142e + b74ab0f commit e9f7fcd
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 584 deletions.
2 changes: 1 addition & 1 deletion src/common/libkvs/kvs_txn_compact.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
*
* append "A"
* write "B"
* append "c"
* append "C"
*
* we cannot combine the appends of "A" and "C". In this scenario, we
* generate an EINVAL error to the caller, indicating that the
Expand Down
76 changes: 16 additions & 60 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1598,10 +1598,8 @@ static void lookup_plus_request_cb (flux_t *h,

static int finalize_transaction_req (treq_t *tr,
const flux_msg_t *req,
void *data)
struct kvs_cb_data *cbd)
{
struct kvs_cb_data *cbd = data;

if (cbd->errnum) {
if (flux_respond_error (cbd->ctx->h, req, cbd->errnum, NULL) < 0) {
flux_log_error (cbd->ctx->h,
Expand Down Expand Up @@ -1643,7 +1641,15 @@ static void finalize_transaction_bynames (struct kvs_ctx *ctx,
}
nameval = json_string_value (name);
if ((tr = treq_mgr_lookup_transaction (root->trm, nameval))) {
treq_iter_request_copies (tr, finalize_transaction_req, &cbd);
const flux_msg_t *msg = treq_get_request (tr);
if (!msg) {
flux_log (ctx->h,
LOG_ERR,
"%s: transaction without a request",
__FUNCTION__);
return;
}
finalize_transaction_req (tr, msg, &cbd);
if (treq_mgr_remove_transaction (root->trm, nameval) < 0) {
flux_log_error (ctx->h,
"%s: treq_mgr_remove_transaction",
Expand Down Expand Up @@ -1780,8 +1786,12 @@ static void commit_request_cb (flux_t *h,
goto error;
}

if (!(tr = treq_create_rank (ctx->rank, ctx->seq++, 1, flags))) {
flux_log_error (h, "%s: treq_create_rank", __FUNCTION__);
/* save copy of request, will be used later via
* finalize_transaction_bynames() to send error code to original
* send.
*/
if (!(tr = treq_create (msg, ctx->rank, ctx->seq++, flags))) {
flux_log_error (h, "%s: treq_create", __FUNCTION__);
goto error;
}
if (treq_mgr_add_transaction (root->trm, tr) < 0) {
Expand All @@ -1792,20 +1802,7 @@ static void commit_request_cb (flux_t *h,
goto error;
}

/* save copy of request, will be used later via
* finalize_transaction_bynames() to send error code to original
* send.
*/
if (treq_add_request_copy (tr, msg) < 0)
goto error;

if (ctx->rank == 0) {
/* we use this flag to indicate if a treq has been added to
* the ready queue. We don't need to call
* treq_count_reached() b/c this is a commit and nprocs is 1
*/
treq_mark_processed (tr);

if (kvstxn_mgr_add_transaction (root->ktm,
treq_get_name (tr),
ops,
Expand Down Expand Up @@ -2375,37 +2372,12 @@ static void namespace_create_request_cb (flux_t *h,
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}

static int root_remove_process_transactions (treq_t *tr, void *data)
{
struct kvs_cb_data *cbd = data;

/* Transactions that never reached their nprocs count will never
* finish, must alert them with ENOTSUP that namespace removed.
* Final call to treq_mgr_remove_transaction() done in
* finalize_transaction_bynames() */
if (!treq_get_processed (tr)) {
json_t *names = NULL;

if (!(names = json_pack ("[ s ]", treq_get_name (tr)))) {
errno = ENOMEM;
flux_log_error (cbd->ctx->h, "%s: json_pack", __FUNCTION__);
return -1;
}

finalize_transaction_bynames (cbd->ctx, cbd->root, names, ENOTSUP);
json_decref (names);
}
return 0;
}

static void start_root_remove (struct kvs_ctx *ctx, const char *ns)
{
struct kvsroot *root;

/* safe lookup, if root removal in process, let it continue */
if ((root = kvsroot_mgr_lookup_root_safe (ctx->krm, ns))) {
struct kvs_cb_data cbd = { .ctx = ctx, .root = root };

root->remove = true;

work_queue_remove (root);
Expand All @@ -2416,22 +2388,6 @@ static void start_root_remove (struct kvs_ctx *ctx, const char *ns)
* sync.
*/
kvs_wait_version_process (root, true);

/* Ready transactions will be processed and errors returned to
* callers via the code path in kvstxn_apply(). But not ready
* transactions must be dealt with separately here.
*
* Note that now that the root has been marked as removable,
* no new transactions can become ready in the future. Checks
* in commit_request_cb() and relaycommit_request_cb() ensure
* this.
*/

if (treq_mgr_iter_transactions (root->trm,
root_remove_process_transactions,
&cbd) < 0)
flux_log_error (ctx->h, "%s: treq_mgr_iter_transactions",
__FUNCTION__);
}
}

Expand Down
Loading

0 comments on commit e9f7fcd

Please sign in to comment.