Skip to content

Commit

Permalink
Merge pull request #15594 from idryzhov/mgmt-rpc
Browse files Browse the repository at this point in the history
mgmtd: implement YANG RPC/action support
  • Loading branch information
choppsv1 authored May 6, 2024
2 parents 216bac2 + cb88ce1 commit c54bc7a
Show file tree
Hide file tree
Showing 43 changed files with 1,404 additions and 300 deletions.
1 change: 1 addition & 0 deletions lib/mgmt.proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ message BeSubscribeReq {
repeated string config_xpaths = 2;
repeated string oper_xpaths = 3;
repeated string notif_xpaths = 4;
repeated string rpc_xpaths = 5;
}

message BeSubscribeReply {
Expand Down
143 changes: 143 additions & 0 deletions lib/mgmt_be_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,143 @@ static void be_client_handle_get_tree(struct mgmt_be_client *client,
be_client_send_tree_data_batch, args);
}

static void be_client_send_rpc_reply(struct mgmt_be_client *client,
uint64_t txn_id, uint64_t req_id,
uint8_t result_type,
struct lyd_node *output)
{
struct mgmt_msg_rpc_reply *rpc_reply_msg;
uint8_t **darrp;
LY_ERR err;
int ret = NB_OK;

rpc_reply_msg = mgmt_msg_native_alloc_msg(struct mgmt_msg_rpc_reply, 0,
MTYPE_MSG_NATIVE_RPC_REPLY);
rpc_reply_msg->refer_id = txn_id;
rpc_reply_msg->req_id = req_id;
rpc_reply_msg->code = MGMT_MSG_CODE_RPC_REPLY;
rpc_reply_msg->result_type = result_type;

if (output) {
darrp = mgmt_msg_native_get_darrp(rpc_reply_msg);
err = yang_print_tree_append(darrp, output, result_type,
LYD_PRINT_SHRINK);
lyd_free_all(output);
if (err) {
ret = NB_ERR;
goto done;
}
}

(void)be_client_send_native_msg(client, rpc_reply_msg,
mgmt_msg_native_get_msg_len(
rpc_reply_msg),
false);
done:
mgmt_msg_native_free_msg(rpc_reply_msg);
if (ret != NB_OK)
be_client_send_error(client, txn_id, req_id, false, -EINVAL,
"Can't format RPC reply");
}

/*
* Process the RPC request.
*/
static void be_client_handle_rpc(struct mgmt_be_client *client, uint64_t txn_id,
void *msgbuf, size_t msg_len)
{
struct mgmt_msg_rpc *rpc_msg = msgbuf;
struct nb_node *nb_node;
struct lyd_node *input, *output;
const char *xpath;
const char *data;
char errmsg[BUFSIZ] = { 0 };
LY_ERR err;
int ret;

debug_be_client("Received RPC request for client %s txn-id %" PRIu64
" req-id %" PRIu64,
client->name, txn_id, rpc_msg->req_id);

xpath = mgmt_msg_native_xpath_data_decode(rpc_msg, msg_len, data);
if (!xpath) {
be_client_send_error(client, txn_id, rpc_msg->req_id, false,
-EINVAL, "Corrupt RPC message");
return;
}

nb_node = nb_node_find(xpath);
if (!nb_node) {
be_client_send_error(client, txn_id, rpc_msg->req_id, false,
-EINVAL, "No schema found for RPC: %s",
xpath);
return;
}

if (!nb_node->cbs.rpc) {
be_client_send_error(client, txn_id, rpc_msg->req_id, false,
-EINVAL, "No RPC callback for: %s", xpath);
return;
}

if (data) {
err = yang_parse_rpc(xpath, rpc_msg->request_type, data, false,
&input);
if (err) {
be_client_send_error(client, txn_id, rpc_msg->req_id,
false, -EINVAL,
"Can't parse RPC data for: %s",
xpath);
return;
}
} else {
/*
* If there's no input data, create an empty input container.
* It is especially needed for actions, because their parents
* may hold necessary information.
*/
err = lyd_new_path2(NULL, ly_native_ctx, xpath, NULL, 0, 0, 0,
NULL, &input);
if (err) {
be_client_send_error(client, txn_id, rpc_msg->req_id,
false, -EINVAL,
"Can't create input node for RPC: %s",
xpath);
return;
}
}

err = lyd_new_path2(NULL, ly_native_ctx, xpath, NULL, 0, 0, 0, NULL,
&output);
if (err) {
lyd_free_all(input);
be_client_send_error(client, txn_id, rpc_msg->req_id, false,
-EINVAL,
"Can't create output node for RPC: %s",
xpath);
return;
}

ret = nb_callback_rpc(nb_node, xpath, input, output, errmsg,
sizeof(errmsg));
if (ret != NB_OK) {
lyd_free_all(input);
lyd_free_all(output);
be_client_send_error(client, txn_id, rpc_msg->req_id, false,
-EINVAL, "%s", errmsg);
return;
}

lyd_free_all(input);
if (!lyd_child(output)) {
lyd_free_all(output);
output = NULL;
}

be_client_send_rpc_reply(client, txn_id, rpc_msg->req_id,
rpc_msg->request_type, output);
}

/*
* Process the notification.
*/
Expand Down Expand Up @@ -975,6 +1112,9 @@ static void be_client_handle_native_msg(struct mgmt_be_client *client,
case MGMT_MSG_CODE_GET_TREE:
be_client_handle_get_tree(client, txn_id, msg, msg_len);
break;
case MGMT_MSG_CODE_RPC:
be_client_handle_rpc(client, txn_id, msg, msg_len);
break;
case MGMT_MSG_CODE_NOTIFY:
be_client_handle_notify(client, msg, msg_len);
break;
Expand Down Expand Up @@ -1040,6 +1180,9 @@ int mgmt_be_send_subscr_req(struct mgmt_be_client *client_ctx,
subscr_req.n_notif_xpaths = client_ctx->cbs.nnotif_xpaths;
subscr_req.notif_xpaths = (char **)client_ctx->cbs.notif_xpaths;

subscr_req.n_rpc_xpaths = client_ctx->cbs.nrpc_xpaths;
subscr_req.rpc_xpaths = (char **)client_ctx->cbs.rpc_xpaths;

mgmtd__be_message__init(&be_msg);
be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REQ;
be_msg.subscr_req = &subscr_req;
Expand Down
2 changes: 2 additions & 0 deletions lib/mgmt_be_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ struct mgmt_be_client_cbs {

const char **notif_xpaths;
uint nnotif_xpaths;
const char **rpc_xpaths;
uint nrpc_xpaths;
};

/***************************************************************
Expand Down
45 changes: 45 additions & 0 deletions lib/mgmt_fe_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,33 @@ int mgmt_fe_send_edit_req(struct mgmt_fe_client *client, uint64_t session_id,
return ret;
}

int mgmt_fe_send_rpc_req(struct mgmt_fe_client *client, uint64_t session_id,
uint64_t req_id, LYD_FORMAT request_type,
const char *xpath, const char *data)
{
struct mgmt_msg_rpc *msg;
int ret;

msg = mgmt_msg_native_alloc_msg(struct mgmt_msg_rpc, 0,
MTYPE_MSG_NATIVE_RPC);
msg->refer_id = session_id;
msg->req_id = req_id;
msg->code = MGMT_MSG_CODE_RPC;
msg->request_type = request_type;

mgmt_msg_native_xpath_encode(msg, xpath);
if (data)
mgmt_msg_native_append(msg, data, strlen(data) + 1);

debug_fe_client("Sending RPC_REQ session-id %" PRIu64 " req-id %" PRIu64
" xpath: %s",
session_id, req_id, xpath);

ret = mgmt_msg_native_send_msg(&client->client.conn, msg, false);
mgmt_msg_native_free_msg(msg);
return ret;
}

static int mgmt_fe_client_handle_msg(struct mgmt_fe_client *client,
Mgmtd__FeMessage *fe_msg)
{
Expand Down Expand Up @@ -534,6 +561,7 @@ static void fe_client_handle_native_msg(struct mgmt_fe_client *client,
struct mgmt_msg_notify_data *notify_msg;
struct mgmt_msg_tree_data *tree_msg;
struct mgmt_msg_edit_reply *edit_msg;
struct mgmt_msg_rpc_reply *rpc_msg;
struct mgmt_msg_error *err_msg;
const char *xpath = NULL;
const char *data = NULL;
Expand Down Expand Up @@ -608,6 +636,23 @@ static void fe_client_handle_native_msg(struct mgmt_fe_client *client,
session->user_ctx, msg->req_id,
xpath);
break;
case MGMT_MSG_CODE_RPC_REPLY:
if (!session->client->cbs.rpc_notify)
return;

rpc_msg = (typeof(rpc_msg))msg;
if (msg_len < sizeof(*rpc_msg)) {
log_err_fe_client("Corrupt rpc-reply msg recv");
return;
}
dlen = msg_len - sizeof(*rpc_msg);

session->client->cbs.rpc_notify(client, client->user_data,
session->client_id,
msg->refer_id,
session->user_ctx, msg->req_id,
dlen ? rpc_msg->data : NULL);
break;
case MGMT_MSG_CODE_NOTIFY:
if (!session->client->cbs.async_notification)
return;
Expand Down
35 changes: 35 additions & 0 deletions lib/mgmt_fe_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ struct mgmt_fe_client_cbs {
uintptr_t session_ctx, uint64_t req_id,
const char *xpath);

/* Called when RPC result is returned */
int (*rpc_notify)(struct mgmt_fe_client *client, uintptr_t user_data,
uint64_t client_id, uint64_t session_id,
uintptr_t session_ctx, uint64_t req_id,
const char *result);

/* Called with asynchronous notifications from backends */
int (*async_notification)(struct mgmt_fe_client *client,
uintptr_t user_data, uint64_t client_id,
Expand Down Expand Up @@ -454,6 +460,35 @@ extern int mgmt_fe_send_edit_req(struct mgmt_fe_client *client,
uint8_t flags, uint8_t operation,
const char *xpath, const char *data);

/*
* Send RPC request to MGMTD daemon.
*
* client
* Client object.
*
* session_id
* Client session ID.
*
* req_id
* Client request ID.
*
* result_type
* The LYD_FORMAT of the result.
*
* xpath
* the xpath of the RPC.
*
* data
* the data tree.
*
* Returns:
* 0 on success, otherwise msg_conn_send_msg() return values.
*/
extern int mgmt_fe_send_rpc_req(struct mgmt_fe_client *client,
uint64_t session_id, uint64_t req_id,
LYD_FORMAT request_type, const char *xpath,
const char *data);

/*
* Destroy library and cleanup everything.
*/
Expand Down
2 changes: 2 additions & 0 deletions lib/mgmt_msg_native.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_GET_DATA, "native get data msg");
DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_NOTIFY, "native get data msg");
DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_EDIT, "native edit msg");
DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_EDIT_REPLY, "native edit reply msg");
DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_RPC, "native RPC msg");
DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_RPC_REPLY, "native RPC reply msg");

int vmgmt_msg_native_send_error(struct msg_conn *conn, uint64_t sess_or_txn_id,
uint64_t req_id, bool short_circuit_ok,
Expand Down
45 changes: 44 additions & 1 deletion lib/mgmt_msg_native.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ DECLARE_MTYPE(MSG_NATIVE_GET_DATA);
DECLARE_MTYPE(MSG_NATIVE_NOTIFY);
DECLARE_MTYPE(MSG_NATIVE_EDIT);
DECLARE_MTYPE(MSG_NATIVE_EDIT_REPLY);
DECLARE_MTYPE(MSG_NATIVE_RPC);
DECLARE_MTYPE(MSG_NATIVE_RPC_REPLY);

/*
* Native message codes
Expand All @@ -163,6 +165,8 @@ DECLARE_MTYPE(MSG_NATIVE_EDIT_REPLY);
#define MGMT_MSG_CODE_NOTIFY 4
#define MGMT_MSG_CODE_EDIT 5
#define MGMT_MSG_CODE_EDIT_REPLY 6
#define MGMT_MSG_CODE_RPC 7
#define MGMT_MSG_CODE_RPC_REPLY 8

/*
* Datastores
Expand Down Expand Up @@ -377,6 +381,42 @@ _Static_assert(sizeof(struct mgmt_msg_edit_reply) ==
offsetof(struct mgmt_msg_edit_reply, data),
"Size mismatch");

/**
* struct mgmt_msg_rpc - RPC/action request.
*
* @request_type: ``LYD_FORMAT`` for the @data.
* @data: the xpath followed by the tree data for the operation.
*/
struct mgmt_msg_rpc {
struct mgmt_msg_header;
uint8_t request_type;
uint8_t resv2[7];

alignas(8) char data[];
};

_Static_assert(sizeof(struct mgmt_msg_rpc) ==
offsetof(struct mgmt_msg_rpc, data),
"Size mismatch");

/**
* struct mgmt_msg_rpc_reply - RPC/action reply.
*
* @result_type: ``LYD_FORMAT`` for the @data.
* @data: the tree data for the reply.
*/
struct mgmt_msg_rpc_reply {
struct mgmt_msg_header;
uint8_t result_type;
uint8_t resv2[7];

alignas(8) char data[];
};

_Static_assert(sizeof(struct mgmt_msg_rpc_reply) ==
offsetof(struct mgmt_msg_rpc_reply, data),
"Size mismatch");

/*
* Validate that the message ends in a NUL terminating byte
*/
Expand Down Expand Up @@ -569,7 +609,10 @@ extern int vmgmt_msg_native_send_error(struct msg_conn *conn,
const char *__s = NULL; \
if (msg->vsplit && msg->vsplit <= __len && \
msg->data[msg->vsplit - 1] == 0) { \
(__data) = msg->data + msg->vsplit; \
if (msg->vsplit < __len) \
(__data) = msg->data + msg->vsplit; \
else \
(__data) = NULL; \
__s = msg->data; \
} \
__s; \
Expand Down
Loading

0 comments on commit c54bc7a

Please sign in to comment.