Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mgmtd: implement YANG RPC/action support #15594

Merged
merged 10 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/mgmt.proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ message BeSubscribeReq {
repeated string config_xpaths = 2;
repeated string oper_xpaths = 3;
repeated string notif_xpaths = 4;
repeated string rpc_xpaths = 5;
}

message BeSubscribeReply {
Expand Down
143 changes: 143 additions & 0 deletions lib/mgmt_be_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,143 @@ static void be_client_handle_get_tree(struct mgmt_be_client *client,
be_client_send_tree_data_batch, args);
}

static void be_client_send_rpc_reply(struct mgmt_be_client *client,
uint64_t txn_id, uint64_t req_id,
uint8_t result_type,
struct lyd_node *output)
{
struct mgmt_msg_rpc_reply *rpc_reply_msg;
uint8_t **darrp;
LY_ERR err;
int ret = NB_OK;

rpc_reply_msg = mgmt_msg_native_alloc_msg(struct mgmt_msg_rpc_reply, 0,
MTYPE_MSG_NATIVE_RPC_REPLY);
rpc_reply_msg->refer_id = txn_id;
rpc_reply_msg->req_id = req_id;
rpc_reply_msg->code = MGMT_MSG_CODE_RPC_REPLY;
rpc_reply_msg->result_type = result_type;

if (output) {
darrp = mgmt_msg_native_get_darrp(rpc_reply_msg);
err = yang_print_tree_append(darrp, output, result_type,
LYD_PRINT_SHRINK);
lyd_free_all(output);
if (err) {
ret = NB_ERR;
goto done;
}
}

(void)be_client_send_native_msg(client, rpc_reply_msg,
mgmt_msg_native_get_msg_len(
rpc_reply_msg),
false);
done:
mgmt_msg_native_free_msg(rpc_reply_msg);
if (ret != NB_OK)
be_client_send_error(client, txn_id, req_id, false, -EINVAL,
"Can't format RPC reply");
}

/*
* Process the RPC request.
*/
static void be_client_handle_rpc(struct mgmt_be_client *client, uint64_t txn_id,
void *msgbuf, size_t msg_len)
{
struct mgmt_msg_rpc *rpc_msg = msgbuf;
struct nb_node *nb_node;
struct lyd_node *input, *output;
const char *xpath;
const char *data;
char errmsg[BUFSIZ] = { 0 };
LY_ERR err;
int ret;

debug_be_client("Received RPC request for client %s txn-id %" PRIu64
" req-id %" PRIu64,
client->name, txn_id, rpc_msg->req_id);

xpath = mgmt_msg_native_xpath_data_decode(rpc_msg, msg_len, data);
if (!xpath) {
be_client_send_error(client, txn_id, rpc_msg->req_id, false,
-EINVAL, "Corrupt RPC message");
return;
}

nb_node = nb_node_find(xpath);
if (!nb_node) {
be_client_send_error(client, txn_id, rpc_msg->req_id, false,
-EINVAL, "No schema found for RPC: %s",
xpath);
return;
}

if (!nb_node->cbs.rpc) {
be_client_send_error(client, txn_id, rpc_msg->req_id, false,
-EINVAL, "No RPC callback for: %s", xpath);
return;
}

if (data) {
err = yang_parse_rpc(xpath, rpc_msg->request_type, data, false,
&input);
if (err) {
be_client_send_error(client, txn_id, rpc_msg->req_id,
false, -EINVAL,
"Can't parse RPC data for: %s",
xpath);
return;
}
} else {
/*
* If there's no input data, create an empty input container.
* It is especially needed for actions, because their parents
* may hold necessary information.
*/
err = lyd_new_path2(NULL, ly_native_ctx, xpath, NULL, 0, 0, 0,
NULL, &input);
if (err) {
be_client_send_error(client, txn_id, rpc_msg->req_id,
false, -EINVAL,
"Can't create input node for RPC: %s",
xpath);
return;
}
}

err = lyd_new_path2(NULL, ly_native_ctx, xpath, NULL, 0, 0, 0, NULL,
&output);
if (err) {
lyd_free_all(input);
be_client_send_error(client, txn_id, rpc_msg->req_id, false,
-EINVAL,
"Can't create output node for RPC: %s",
xpath);
return;
}

ret = nb_callback_rpc(nb_node, xpath, input, output, errmsg,
sizeof(errmsg));
if (ret != NB_OK) {
lyd_free_all(input);
lyd_free_all(output);
be_client_send_error(client, txn_id, rpc_msg->req_id, false,
-EINVAL, "%s", errmsg);
return;
}

lyd_free_all(input);
if (!lyd_child(output)) {
lyd_free_all(output);
output = NULL;
}

be_client_send_rpc_reply(client, txn_id, rpc_msg->req_id,
rpc_msg->request_type, output);
}

/*
* Process the notification.
*/
Expand Down Expand Up @@ -975,6 +1112,9 @@ static void be_client_handle_native_msg(struct mgmt_be_client *client,
case MGMT_MSG_CODE_GET_TREE:
be_client_handle_get_tree(client, txn_id, msg, msg_len);
break;
case MGMT_MSG_CODE_RPC:
be_client_handle_rpc(client, txn_id, msg, msg_len);
break;
case MGMT_MSG_CODE_NOTIFY:
be_client_handle_notify(client, msg, msg_len);
break;
Expand Down Expand Up @@ -1040,6 +1180,9 @@ int mgmt_be_send_subscr_req(struct mgmt_be_client *client_ctx,
subscr_req.n_notif_xpaths = client_ctx->cbs.nnotif_xpaths;
subscr_req.notif_xpaths = (char **)client_ctx->cbs.notif_xpaths;

subscr_req.n_rpc_xpaths = client_ctx->cbs.nrpc_xpaths;
subscr_req.rpc_xpaths = (char **)client_ctx->cbs.rpc_xpaths;

mgmtd__be_message__init(&be_msg);
be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REQ;
be_msg.subscr_req = &subscr_req;
Expand Down
2 changes: 2 additions & 0 deletions lib/mgmt_be_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ struct mgmt_be_client_cbs {

const char **notif_xpaths;
uint nnotif_xpaths;
const char **rpc_xpaths;
uint nrpc_xpaths;
};

/***************************************************************
Expand Down
45 changes: 45 additions & 0 deletions lib/mgmt_fe_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,33 @@ int mgmt_fe_send_edit_req(struct mgmt_fe_client *client, uint64_t session_id,
return ret;
}

int mgmt_fe_send_rpc_req(struct mgmt_fe_client *client, uint64_t session_id,
uint64_t req_id, LYD_FORMAT request_type,
const char *xpath, const char *data)
{
struct mgmt_msg_rpc *msg;
int ret;

msg = mgmt_msg_native_alloc_msg(struct mgmt_msg_rpc, 0,
MTYPE_MSG_NATIVE_RPC);
msg->refer_id = session_id;
msg->req_id = req_id;
msg->code = MGMT_MSG_CODE_RPC;
msg->request_type = request_type;

mgmt_msg_native_xpath_encode(msg, xpath);
if (data)
mgmt_msg_native_append(msg, data, strlen(data) + 1);

debug_fe_client("Sending RPC_REQ session-id %" PRIu64 " req-id %" PRIu64
" xpath: %s",
session_id, req_id, xpath);

ret = mgmt_msg_native_send_msg(&client->client.conn, msg, false);
mgmt_msg_native_free_msg(msg);
return ret;
}

static int mgmt_fe_client_handle_msg(struct mgmt_fe_client *client,
Mgmtd__FeMessage *fe_msg)
{
Expand Down Expand Up @@ -534,6 +561,7 @@ static void fe_client_handle_native_msg(struct mgmt_fe_client *client,
struct mgmt_msg_notify_data *notify_msg;
struct mgmt_msg_tree_data *tree_msg;
struct mgmt_msg_edit_reply *edit_msg;
struct mgmt_msg_rpc_reply *rpc_msg;
struct mgmt_msg_error *err_msg;
const char *xpath = NULL;
const char *data = NULL;
Expand Down Expand Up @@ -608,6 +636,23 @@ static void fe_client_handle_native_msg(struct mgmt_fe_client *client,
session->user_ctx, msg->req_id,
xpath);
break;
case MGMT_MSG_CODE_RPC_REPLY:
if (!session->client->cbs.rpc_notify)
return;

rpc_msg = (typeof(rpc_msg))msg;
if (msg_len < sizeof(*rpc_msg)) {
log_err_fe_client("Corrupt rpc-reply msg recv");
return;
}
dlen = msg_len - sizeof(*rpc_msg);

session->client->cbs.rpc_notify(client, client->user_data,
session->client_id,
msg->refer_id,
session->user_ctx, msg->req_id,
dlen ? rpc_msg->data : NULL);
break;
case MGMT_MSG_CODE_NOTIFY:
if (!session->client->cbs.async_notification)
return;
Expand Down
35 changes: 35 additions & 0 deletions lib/mgmt_fe_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ struct mgmt_fe_client_cbs {
uintptr_t session_ctx, uint64_t req_id,
const char *xpath);

/* Called when RPC result is returned */
int (*rpc_notify)(struct mgmt_fe_client *client, uintptr_t user_data,
uint64_t client_id, uint64_t session_id,
uintptr_t session_ctx, uint64_t req_id,
const char *result);

/* Called with asynchronous notifications from backends */
int (*async_notification)(struct mgmt_fe_client *client,
uintptr_t user_data, uint64_t client_id,
Expand Down Expand Up @@ -454,6 +460,35 @@ extern int mgmt_fe_send_edit_req(struct mgmt_fe_client *client,
uint8_t flags, uint8_t operation,
const char *xpath, const char *data);

/*
* Send RPC request to MGMTD daemon.
*
* client
* Client object.
*
* session_id
* Client session ID.
*
* req_id
* Client request ID.
*
* result_type
* The LYD_FORMAT of the result.
*
* xpath
* the xpath of the RPC.
*
* data
* the data tree.
*
* Returns:
* 0 on success, otherwise msg_conn_send_msg() return values.
*/
extern int mgmt_fe_send_rpc_req(struct mgmt_fe_client *client,
uint64_t session_id, uint64_t req_id,
LYD_FORMAT request_type, const char *xpath,
const char *data);

/*
* Destroy library and cleanup everything.
*/
Expand Down
2 changes: 2 additions & 0 deletions lib/mgmt_msg_native.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_GET_DATA, "native get data msg");
DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_NOTIFY, "native get data msg");
DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_EDIT, "native edit msg");
DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_EDIT_REPLY, "native edit reply msg");
DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_RPC, "native RPC msg");
DEFINE_MTYPE(MSG_NATIVE, MSG_NATIVE_RPC_REPLY, "native RPC reply msg");

int vmgmt_msg_native_send_error(struct msg_conn *conn, uint64_t sess_or_txn_id,
uint64_t req_id, bool short_circuit_ok,
Expand Down
45 changes: 44 additions & 1 deletion lib/mgmt_msg_native.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ DECLARE_MTYPE(MSG_NATIVE_GET_DATA);
DECLARE_MTYPE(MSG_NATIVE_NOTIFY);
DECLARE_MTYPE(MSG_NATIVE_EDIT);
DECLARE_MTYPE(MSG_NATIVE_EDIT_REPLY);
DECLARE_MTYPE(MSG_NATIVE_RPC);
DECLARE_MTYPE(MSG_NATIVE_RPC_REPLY);

/*
* Native message codes
Expand All @@ -163,6 +165,8 @@ DECLARE_MTYPE(MSG_NATIVE_EDIT_REPLY);
#define MGMT_MSG_CODE_NOTIFY 4
#define MGMT_MSG_CODE_EDIT 5
#define MGMT_MSG_CODE_EDIT_REPLY 6
#define MGMT_MSG_CODE_RPC 7
#define MGMT_MSG_CODE_RPC_REPLY 8

/*
* Datastores
Expand Down Expand Up @@ -377,6 +381,42 @@ _Static_assert(sizeof(struct mgmt_msg_edit_reply) ==
offsetof(struct mgmt_msg_edit_reply, data),
"Size mismatch");

/**
* struct mgmt_msg_rpc - RPC/action request.
*
* @request_type: ``LYD_FORMAT`` for the @data.
* @data: the xpath followed by the tree data for the operation.
*/
struct mgmt_msg_rpc {
struct mgmt_msg_header;
uint8_t request_type;
uint8_t resv2[7];

alignas(8) char data[];
};

_Static_assert(sizeof(struct mgmt_msg_rpc) ==
offsetof(struct mgmt_msg_rpc, data),
"Size mismatch");

/**
* struct mgmt_msg_rpc_reply - RPC/action reply.
*
* @result_type: ``LYD_FORMAT`` for the @data.
* @data: the tree data for the reply.
*/
struct mgmt_msg_rpc_reply {
struct mgmt_msg_header;
uint8_t result_type;
uint8_t resv2[7];

alignas(8) char data[];
};

_Static_assert(sizeof(struct mgmt_msg_rpc_reply) ==
offsetof(struct mgmt_msg_rpc_reply, data),
"Size mismatch");

/*
* Validate that the message ends in a NUL terminating byte
*/
Expand Down Expand Up @@ -569,7 +609,10 @@ extern int vmgmt_msg_native_send_error(struct msg_conn *conn,
const char *__s = NULL; \
if (msg->vsplit && msg->vsplit <= __len && \
msg->data[msg->vsplit - 1] == 0) { \
(__data) = msg->data + msg->vsplit; \
if (msg->vsplit < __len) \
(__data) = msg->data + msg->vsplit; \
else \
(__data) = NULL; \
__s = msg->data; \
} \
__s; \
Expand Down
Loading
Loading