Skip to content

Commit

Permalink
mgmtd, lib: remove batch ids from all messages
Browse files Browse the repository at this point in the history
Batch IDs are only used to verify that all messages were received and
processed by a backend. That check is unnecessary because we use a reliable
stream transport: messages can't be dropped or received out of order.

This commit also fixes a possible race condition that can happen if
one backend processes messages more slowly than the other backends.

Signed-off-by: Igor Ryzhov <[email protected]>
  • Loading branch information
idryzhov committed Nov 12, 2023
1 parent 19bcca4 commit b3b5951
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 301 deletions.
10 changes: 4 additions & 6 deletions lib/mgmt.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,14 @@ message BeTxnReply {

message BeCfgDataCreateReq {
required uint64 txn_id = 1;
required uint64 batch_id = 2;
repeated YangCfgDataReq data_req = 3;
required bool end_of_data = 4;
repeated YangCfgDataReq data_req = 2;
required bool end_of_data = 3;
}

message BeCfgDataCreateReply {
required uint64 txn_id = 1;
required uint64 batch_id = 2;
required bool success = 3;
optional string error_if_any = 4;
required bool success = 2;
optional string error_if_any = 3;
}

message BeCfgDataApplyReq {
Expand Down
73 changes: 18 additions & 55 deletions lib/mgmt_be_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ struct mgmt_be_txn_req {

PREDECL_LIST(mgmt_be_batches);
struct mgmt_be_batch_ctx {
/* Batch-Id as assigned by MGMTD */
uint64_t batch_id;

struct mgmt_be_txn_req txn_req;

uint32_t flags;
Expand Down Expand Up @@ -128,37 +125,15 @@ static int mgmt_be_client_send_msg(struct mgmt_be_client *client_ctx,
}

static struct mgmt_be_batch_ctx *
mgmt_be_find_batch_by_id(struct mgmt_be_txn_ctx *txn,
uint64_t batch_id)
mgmt_be_batch_create(struct mgmt_be_txn_ctx *txn)
{
struct mgmt_be_batch_ctx *batch = NULL;

FOREACH_BE_TXN_BATCH_IN_LIST (txn, batch) {
if (batch->batch_id == batch_id)
return batch;
}
batch = XCALLOC(MTYPE_MGMTD_BE_BATCH, sizeof(struct mgmt_be_batch_ctx));

return NULL;
}
mgmt_be_batches_add_tail(&txn->cfg_batches, batch);

static struct mgmt_be_batch_ctx *
mgmt_be_batch_create(struct mgmt_be_txn_ctx *txn, uint64_t batch_id)
{
struct mgmt_be_batch_ctx *batch = NULL;

batch = mgmt_be_find_batch_by_id(txn, batch_id);
if (!batch) {
batch = XCALLOC(MTYPE_MGMTD_BE_BATCH,
sizeof(struct mgmt_be_batch_ctx));
assert(batch);

batch->batch_id = batch_id;
mgmt_be_batches_add_tail(&txn->cfg_batches, batch);

MGMTD_BE_CLIENT_DBG("Added new batch-id: %" PRIu64
" to transaction",
batch_id);
}
MGMTD_BE_CLIENT_DBG("Added new batch to transaction");

return batch;
}
Expand Down Expand Up @@ -335,16 +310,14 @@ static int mgmt_be_process_txn_req(struct mgmt_be_client *client_ctx,
}

static int mgmt_be_send_cfgdata_create_reply(struct mgmt_be_client *client_ctx,
uint64_t txn_id, uint64_t batch_id,
bool success,
uint64_t txn_id, bool success,
const char *error_if_any)
{
Mgmtd__BeMessage be_msg;
Mgmtd__BeCfgDataCreateReply cfgdata_reply;

mgmtd__be_cfg_data_create_reply__init(&cfgdata_reply);
cfgdata_reply.txn_id = (uint64_t)txn_id;
cfgdata_reply.batch_id = (uint64_t)batch_id;
cfgdata_reply.success = success;
if (error_if_any)
cfgdata_reply.error_if_any = (char *)error_if_any;
Expand All @@ -353,9 +326,8 @@ static int mgmt_be_send_cfgdata_create_reply(struct mgmt_be_client *client_ctx,
be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REPLY;
be_msg.cfg_data_reply = &cfgdata_reply;

MGMTD_BE_CLIENT_DBG("Sending CFGDATA_CREATE_REPLY txn-id: %" PRIu64
" batch-id: %" PRIu64,
txn_id, batch_id);
MGMTD_BE_CLIENT_DBG("Sending CFGDATA_CREATE_REPLY txn-id: %" PRIu64,
txn_id);

return mgmt_be_client_send_msg(client_ctx, &be_msg);
}
Expand Down Expand Up @@ -432,9 +404,8 @@ static int mgmt_be_txn_cfg_prepare(struct mgmt_be_txn_ctx *txn)
err_buf[sizeof(err_buf) - 1] = 0;
MGMTD_BE_CLIENT_ERR(
"Failed to update configs for txn-id: %" PRIu64
" batch-id: %" PRIu64
" to candidate, err: '%s'",
txn->txn_id, batch->batch_id, err_buf);
txn->txn_id, err_buf);
return -1;
}
gettimeofday(&edit_nb_cfg_end, NULL);
Expand Down Expand Up @@ -497,9 +468,6 @@ static int mgmt_be_txn_cfg_prepare(struct mgmt_be_txn_ctx *txn)
client_ctx->num_prep_nb_cfg++;

FOREACH_BE_TXN_BATCH_IN_LIST (txn, batch) {
mgmt_be_send_cfgdata_create_reply(
client_ctx, txn->txn_id, batch->batch_id,
error ? false : true, error ? err_buf : NULL);
if (!error) {
SET_FLAG(batch->flags,
MGMTD_BE_BATCH_FLAGS_CFG_PREPARED);
Expand All @@ -508,6 +476,9 @@ static int mgmt_be_txn_cfg_prepare(struct mgmt_be_txn_ctx *txn)
}
}

mgmt_be_send_cfgdata_create_reply(client_ctx, txn->txn_id,
error ? false : true, error ? err_buf : NULL);

MGMTD_BE_CLIENT_DBG(
"Avg-nb-edit-duration %lu uSec, nb-prep-duration %lu (avg: %lu) uSec, batch size %u",
client_ctx->avg_edit_nb_cfg_tm, prep_nb_cfg_tm,
Expand All @@ -524,7 +495,6 @@ static int mgmt_be_txn_cfg_prepare(struct mgmt_be_txn_ctx *txn)
*/
static int mgmt_be_update_setcfg_in_batch(struct mgmt_be_client *client_ctx,
struct mgmt_be_txn_ctx *txn,
uint64_t batch_id,
Mgmtd__YangCfgDataReq *cfg_req[],
int num_req)
{
Expand All @@ -533,17 +503,13 @@ static int mgmt_be_update_setcfg_in_batch(struct mgmt_be_client *client_ctx,
int index;
struct nb_cfg_change *cfg_chg;

batch = mgmt_be_batch_create(txn, batch_id);
if (!batch) {
MGMTD_BE_CLIENT_ERR("Batch create failed!");
return -1;
}
batch = mgmt_be_batch_create(txn);
assert(batch);

txn_req = &batch->txn_req;
txn_req->event = MGMTD_BE_TXN_PROC_SETCFG;
MGMTD_BE_CLIENT_DBG("Created SETCFG request for batch-id: %" PRIu64
" txn-id: %" PRIu64 " cfg-items:%d",
batch_id, txn->txn_id, num_req);
MGMTD_BE_CLIENT_DBG("Created SETCFG request for txn-id: %" PRIu64
" cfg-items:%d", txn->txn_id, num_req);

txn_req->req.set_cfg.num_cfg_changes = num_req;
for (index = 0; index < num_req; index++) {
Expand Down Expand Up @@ -577,7 +543,7 @@ static int mgmt_be_update_setcfg_in_batch(struct mgmt_be_client *client_ctx,
}

static int mgmt_be_process_cfgdata_req(struct mgmt_be_client *client_ctx,
uint64_t txn_id, uint64_t batch_id,
uint64_t txn_id,
Mgmtd__YangCfgDataReq *cfg_req[],
int num_req, bool end_of_data)
{
Expand All @@ -587,8 +553,7 @@ static int mgmt_be_process_cfgdata_req(struct mgmt_be_client *client_ctx,
if (!txn)
goto failed;

mgmt_be_update_setcfg_in_batch(client_ctx, txn, batch_id, cfg_req,
num_req);
mgmt_be_update_setcfg_in_batch(client_ctx, txn, cfg_req, num_req);

if (txn && end_of_data) {
MGMTD_BE_CLIENT_DBG("End of data; CFG_PREPARE_REQ processing");
Expand Down Expand Up @@ -719,13 +684,11 @@ static int mgmt_be_client_handle_msg(struct mgmt_be_client *client_ctx,
break;
case MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REQ:
MGMTD_BE_CLIENT_DBG("Got CFG_DATA_REQ txn-id: %" PRIu64
" batch-id: %" PRIu64 " end-of-data %u",
" end-of-data %u",
be_msg->cfg_data_req->txn_id,
be_msg->cfg_data_req->batch_id,
be_msg->cfg_data_req->end_of_data);
mgmt_be_process_cfgdata_req(
client_ctx, be_msg->cfg_data_req->txn_id,
be_msg->cfg_data_req->batch_id,
be_msg->cfg_data_req->data_req,
be_msg->cfg_data_req->n_data_req,
be_msg->cfg_data_req->end_of_data);
Expand Down
13 changes: 5 additions & 8 deletions mgmtd/mgmt_be_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,8 @@ mgmt_be_adapter_handle_msg(struct mgmt_be_client_adapter *adapter,
case MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REPLY:
MGMTD_BE_ADAPTER_DBG(
"Got CFGDATA_REPLY from '%s' txn-id %" PRIx64
" batch-id %" PRIu64 " err:'%s'",
adapter->name, be_msg->cfg_data_reply->txn_id,
be_msg->cfg_data_reply->batch_id,
" err:'%s'", adapter->name,
be_msg->cfg_data_reply->txn_id,
be_msg->cfg_data_reply->error_if_any
? be_msg->cfg_data_reply->error_if_any
: "None");
Expand All @@ -390,7 +389,6 @@ mgmt_be_adapter_handle_msg(struct mgmt_be_client_adapter *adapter,
*/
mgmt_txn_notify_be_cfgdata_reply(
be_msg->cfg_data_reply->txn_id,
be_msg->cfg_data_reply->batch_id,
be_msg->cfg_data_reply->success,
be_msg->cfg_data_reply->error_if_any, adapter);
break;
Expand Down Expand Up @@ -461,15 +459,14 @@ int mgmt_be_send_txn_req(struct mgmt_be_client_adapter *adapter,
}

int mgmt_be_send_cfgdata_req(struct mgmt_be_client_adapter *adapter,
uint64_t txn_id, uint64_t batch_id,
uint64_t txn_id,
Mgmtd__YangCfgDataReq **cfgdata_reqs,
size_t num_reqs, bool end_of_data)
{
Mgmtd__BeMessage be_msg;
Mgmtd__BeCfgDataCreateReq cfgdata_req;

mgmtd__be_cfg_data_create_req__init(&cfgdata_req);
cfgdata_req.batch_id = batch_id;
cfgdata_req.txn_id = txn_id;
cfgdata_req.data_req = cfgdata_reqs;
cfgdata_req.n_data_req = num_reqs;
Expand All @@ -481,8 +478,8 @@ int mgmt_be_send_cfgdata_req(struct mgmt_be_client_adapter *adapter,

MGMTD_BE_ADAPTER_DBG(
"Sending CFGDATA_CREATE_REQ to '%s' txn-id: %" PRIu64
" batch-id: %" PRIu64,
adapter->name, txn_id, batch_id);
" last: %s",
adapter->name, txn_id, end_of_data ? "yes" : "no");

return mgmt_be_adapter_send_msg(adapter, &be_msg);
}
Expand Down
5 changes: 1 addition & 4 deletions mgmtd/mgmt_be_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,6 @@ extern int mgmt_be_send_txn_req(struct mgmt_be_client_adapter *adapter,
* txn_id
* Unique transaction identifier.
*
* batch_id
* Request batch ID.
*
* cfgdata_reqs
* An array of pointers to Mgmtd__YangCfgDataReq.
*
Expand All @@ -182,7 +179,7 @@ extern int mgmt_be_send_txn_req(struct mgmt_be_client_adapter *adapter,
* 0 on success, -1 on failure.
*/
extern int mgmt_be_send_cfgdata_req(struct mgmt_be_client_adapter *adapter,
uint64_t txn_id, uint64_t batch_id,
uint64_t txn_id,
Mgmtd__YangCfgDataReq **cfgdata_reqs,
size_t num_reqs, bool end_of_data);

Expand Down
Loading

0 comments on commit b3b5951

Please sign in to comment.