Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add query & queryable token config #260

Merged
merged 16 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,20 @@
#define Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION 0
#endif

/**
* Enable queryables
*/
#ifndef Z_FEATURE_QUERYABLES
#define Z_FEATURE_QUERYABLES 1
#endif

/**
* Enable queries
*/
#ifndef Z_FEATURE_QUERIES
#define Z_FEATURE_QUERIES 1
#endif

/**
* Enable TCP links.
*/
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ typedef struct {
void *_zn; // FIXME: _z_session_t *zn;
} _z_queryable_t;

#if Z_FEATURE_QUERYABLES == 1
void _z_queryable_clear(_z_queryable_t *qbl);
void _z_queryable_free(_z_queryable_t **qbl);
#endif

#endif /* ZENOH_PICO_QUERY_NETAPI_H */
4 changes: 4 additions & 0 deletions include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ typedef struct {
_z_subscription_sptr_list_t *_remote_subscriptions;

// Session queryables
#if Z_FEATURE_QUERYABLES == 1
_z_questionable_sptr_list_t *_local_questionable;
#endif
#if Z_FEATURE_QUERIES == 1
_z_pending_query_list_t *_pending_queries;
#endif
} _z_session_t;

/**
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/session/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/core.h"

#if Z_FEATURE_QUERIES == 1
/*------------------ Query ------------------*/
_z_zint_t _z_get_query_id(_z_session_t *zn);

Expand All @@ -30,5 +31,6 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, _z_zint_t reply_context,
int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id);
void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
void _z_flush_pending_queries(_z_session_t *zn);
#endif

#endif /* ZENOH_PICO_SESSION_QUERY_H */
2 changes: 2 additions & 0 deletions include/zenoh-pico/session/queryable.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#define _Z_QUERYABLE_COMPLETE_DEFAULT false
#define _Z_QUERYABLE_DISTANCE_DEFAULT 0

#if Z_FEATURE_QUERYABLES == 1
/*------------------ Queryable ------------------*/
_z_questionable_sptr_t *_z_get_questionable_by_id(_z_session_t *zn, const _z_zint_t id);
_z_questionable_sptr_list_t *_z_get_questionable_by_key(_z_session_t *zn, const _z_keyexpr_t key);
Expand All @@ -30,5 +31,6 @@ _z_questionable_sptr_t *_z_register_questionable(_z_session_t *zn, _z_questionab
int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid);
void _z_unregister_questionable(_z_session_t *zn, _z_questionable_sptr_t *q);
void _z_flush_questionables(_z_session_t *zn);
#endif

#endif /* ZENOH_PICO_SESSION_QUERYABLE_H */
24 changes: 22 additions & 2 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,9 +427,12 @@ void z_queryable_drop(z_owned_queryable_t *val) { z_undeclare_queryable(val); }

OWNED_FUNCTIONS_PTR_INTERNAL(z_keyexpr_t, z_owned_keyexpr_t, keyexpr, _z_keyexpr_free, _z_keyexpr_copy)
OWNED_FUNCTIONS_PTR_INTERNAL(z_hello_t, z_owned_hello_t, hello, _z_hello_free, _z_owner_noop_copy)
OWNED_FUNCTIONS_PTR_INTERNAL(z_reply_t, z_owned_reply_t, reply, _z_reply_free, _z_owner_noop_copy)
OWNED_FUNCTIONS_PTR_INTERNAL(z_str_array_t, z_owned_str_array_t, str_array, _z_str_array_free, _z_owner_noop_copy)

#if Z_FEATURE_QUERIES == 1
OWNED_FUNCTIONS_PTR_INTERNAL(z_reply_t, z_owned_reply_t, reply, _z_reply_free, _z_owner_noop_copy)
#endif

#define OWNED_FUNCTIONS_CLOSURE(ownedtype, name) \
_Bool z_##name##_check(const ownedtype *val) { return val->call != NULL; } \
ownedtype *z_##name##_move(ownedtype *val) { return val; } \
Expand Down Expand Up @@ -654,14 +657,17 @@ typedef struct __z_reply_handler_wrapper_t {
} __z_reply_handler_wrapper_t;

void __z_reply_handler(_z_reply_t *reply, __z_reply_handler_wrapper_t *wrapped_ctx) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If queries are not supported, the entire reply handler can be ommitted.

#if Z_FEATURE_QUERIES == 1
z_owned_reply_t oreply = {._value = reply};

wrapped_ctx->user_call(&oreply, wrapped_ctx->ctx);
z_reply_drop(&oreply); // user_call is allowed to take ownership of the reply by setting oreply._value to NULL
#endif
}

int8_t z_get(z_session_t zs, z_keyexpr_t keyexpr, const char *parameters, z_owned_closure_reply_t *callback,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If queries are disabled, I would completely omit any related APIs.

In doing so, the user will have an explicit error at compilation. Otherwise, it will become harder for users (and us) to debug non-compliant behavior.

Also, you can save some extra bytes in the final binary.

const z_get_options_t *options) {
#if Z_FEATURE_QUERIES == 1
int8_t ret = _Z_RES_OK;

void *ctx = callback->context;
Expand Down Expand Up @@ -694,6 +700,9 @@ int8_t z_get(z_session_t zs, z_keyexpr_t keyexpr, const char *parameters, z_owne

ret = _z_query(zs._val, keyexpr, parameters, opt.target, opt.consolidation.mode, opt.value, __z_reply_handler,
wrapped_ctx, callback->drop, ctx);
#else
int8_t ret = _Z_ERR_GENERIC; // Not supported
#endif
return ret;
}

Expand Down Expand Up @@ -896,6 +905,7 @@ z_queryable_options_t z_queryable_options_default(void) {

z_owned_queryable_t z_declare_queryable(z_session_t zs, z_keyexpr_t keyexpr, z_owned_closure_query_t *callback,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If queryables are disabled, I would completely omit any related APIs.

In doing so, the user will have an explicit error at compilation. Otherwise, it will become harder for users (and us) to debug non-compliant behavior.

Also, you can save some extra bytes in the final binary.

const z_queryable_options_t *options) {
#if Z_FEATURE_QUERYABLES == 1
void *ctx = callback->context;
callback->context = NULL;

Expand Down Expand Up @@ -923,14 +933,20 @@ z_owned_queryable_t z_declare_queryable(z_session_t zs, z_keyexpr_t keyexpr, z_o

return (z_owned_queryable_t){
._value = _z_declare_queryable(zs._val, key, opt.complete, callback->call, callback->drop, ctx)};
#else
return (z_owned_queryable_t){._value = NULL};
#endif
}

int8_t z_undeclare_queryable(z_owned_queryable_t *queryable) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If queryables are disabled, I would completely omit any related APIs.

In doing so, the user will have an explicit error at compilation. Otherwise, it will become harder for users (and us) to debug non-compliant behavior.

Also, you can save some extra bytes in the final binary.

#if Z_FEATURE_QUERYABLES == 1
int8_t ret = _Z_RES_OK;

ret = _z_undeclare_queryable(queryable->_value);
_z_queryable_free(&queryable->_value);

#else
int8_t ret = _Z_ERR_GENERIC;
#endif
return ret;
}

Expand All @@ -940,6 +956,7 @@ z_query_reply_options_t z_query_reply_options_default(void) {

int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const uint8_t *payload, size_t payload_len,
const z_query_reply_options_t *options) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If queryables are disabled, I would completely omit any related APIs.

In doing so, the user will have an explicit error at compilation. Otherwise, it will become harder for users (and us) to debug non-compliant behavior.

Also, you can save some extra bytes in the final binary.

#if Z_FEATURE_QUERYABLES == 1
z_query_reply_options_t opts = options == NULL ? z_query_reply_options_default() : *options;
_z_value_t value = {.payload =
{
Expand All @@ -949,6 +966,9 @@ int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const ui
},
.encoding = {.prefix = opts.encoding.prefix, .suffix = opts.encoding.suffix}};
return _z_send_reply(query, keyexpr, value);
#else
return _Z_ERR_GENERIC;
#endif
}

_Bool z_reply_is_ok(const z_owned_reply_t *reply) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If queryables are disabled, I would completely omit any related APIs.

In doing so, the user will have an explicit error at compilation. Otherwise, it will become harder for users (and us) to debug non-compliant behavior.

Also, you can save some extra bytes in the final binary.

Expand Down
80 changes: 42 additions & 38 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) {
return ret;
}

#if Z_FEATURE_QUERYABLES == 1
/*------------------ Queryable Declaration ------------------*/
_z_queryable_t *_z_declare_queryable(_z_session_t *zn, _z_keyexpr_t keyexpr, _Bool complete,
_z_questionable_handler_t callback, _z_drop_handler_t dropper, void *arg) {
Expand Down Expand Up @@ -302,6 +303,47 @@ int8_t _z_send_reply(const z_query_t *query, _z_keyexpr_t keyexpr, const _z_valu

return ret;
}
#endif

#if Z_FEATURE_QUERIES == 1
/*------------------ Query ------------------*/
int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, const z_query_target_t target,
const z_consolidation_mode_t consolidation, _z_value_t value, _z_reply_handler_t callback,
void *arg_call, _z_drop_handler_t dropper, void *arg_drop) {
int8_t ret = _Z_RES_OK;

// Create the pending query object
_z_pending_query_t *pq = (_z_pending_query_t *)z_malloc(sizeof(_z_pending_query_t));
if (pq != NULL) {
pq->_id = _z_get_query_id(zn);
pq->_key = _z_get_expanded_key_from_key(zn, &keyexpr);
pq->_parameters = _z_str_clone(parameters);
pq->_target = target;
pq->_consolidation = consolidation;
pq->_anykey = (strstr(pq->_parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true;
pq->_callback = callback;
pq->_dropper = dropper;
pq->_pending_replies = NULL;
pq->_call_arg = arg_call;
pq->_drop_arg = arg_drop;

ret = _z_register_pending_query(zn, pq); // Add the pending query to the current session
if (ret == _Z_RES_OK) {
_z_bytes_t params = _z_bytes_wrap((uint8_t *)pq->_parameters, strlen(pq->_parameters));
_z_zenoh_message_t z_msg = _z_msg_make_query(&keyexpr, &params, pq->_id, pq->_consolidation, &value);

if (_z_send_n_msg(zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
_z_unregister_pending_query(zn, pq);
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}
} else {
_z_pending_query_clear(pq);
}
}

return ret;
}
#endif

/*------------------ Write ------------------*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, const size_t len,
Expand Down Expand Up @@ -353,44 +395,6 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *pay
return ret;
}

/*------------------ Query ------------------*/
int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, const z_query_target_t target,
const z_consolidation_mode_t consolidation, _z_value_t value, _z_reply_handler_t callback,
void *arg_call, _z_drop_handler_t dropper, void *arg_drop) {
int8_t ret = _Z_RES_OK;

// Create the pending query object
_z_pending_query_t *pq = (_z_pending_query_t *)z_malloc(sizeof(_z_pending_query_t));
if (pq != NULL) {
pq->_id = _z_get_query_id(zn);
pq->_key = _z_get_expanded_key_from_key(zn, &keyexpr);
pq->_parameters = _z_str_clone(parameters);
pq->_target = target;
pq->_consolidation = consolidation;
pq->_anykey = (strstr(pq->_parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true;
pq->_callback = callback;
pq->_dropper = dropper;
pq->_pending_replies = NULL;
pq->_call_arg = arg_call;
pq->_drop_arg = arg_drop;

ret = _z_register_pending_query(zn, pq); // Add the pending query to the current session
if (ret == _Z_RES_OK) {
_z_bytes_t params = _z_bytes_wrap((uint8_t *)pq->_parameters, strlen(pq->_parameters));
_z_zenoh_message_t z_msg = _z_msg_make_query(&keyexpr, &params, pq->_id, pq->_consolidation, &value);

if (_z_send_n_msg(zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
_z_unregister_pending_query(zn, pq);
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}
} else {
_z_pending_query_clear(pq);
}
}

return ret;
}

/*------------------ Pull ------------------*/
int8_t _z_subscriber_pull(const _z_subscriber_t *sub) {
int8_t ret = _Z_RES_OK;
Expand Down
2 changes: 2 additions & 0 deletions src/net/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "zenoh-pico/session/query.h"

#if Z_FEATURE_QUERYABLES == 1
void _z_queryable_clear(_z_queryable_t *qbl) {
// Nothing to clear
(void)(qbl);
Expand All @@ -28,3 +29,4 @@ void _z_queryable_free(_z_queryable_t **qbl) {
*qbl = NULL;
}
}
#endif
2 changes: 2 additions & 0 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/utils/logging.h"

#if Z_FEATURE_QUERIES == 1
_z_reply_t *_z_reply_alloc_and_move(_z_reply_t *_reply) {
_z_reply_t *reply = (_z_reply_t *)z_malloc(sizeof(_z_reply_t));
if (reply != NULL) {
Expand Down Expand Up @@ -290,3 +291,4 @@ void _z_flush_pending_queries(_z_session_t *zn) {
_z_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
}
#endif
4 changes: 4 additions & 0 deletions src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "zenoh-pico/session/utils.h"
#include "zenoh-pico/utils/logging.h"

#if Z_FEATURE_QUERYABLES == 1

_Bool _z_questionable_eq(const _z_questionable_t *one, const _z_questionable_t *two) { return one->_id == two->_id; }

void _z_questionable_clear(_z_questionable_t *qle) {
Expand Down Expand Up @@ -224,3 +226,5 @@ void _z_flush_questionables(_z_session_t *zn) {
_z_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
}

#endif
12 changes: 12 additions & 0 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,12 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
_z_n_msg_request_t req = msg->_body._request;
switch (req._tag) {
case _Z_REQUEST_QUERY: {
#if Z_FEATURE_QUERYABLES == 1
_z_msg_query_t *query = &req._body._query;
ret = _z_trigger_queryables(zn, query, req._key, req._rid);
#else
_Z_DEBUG("_Z_REQUEST_QUERY dropped, queryables not supported\n");
#endif
} break;
case _Z_REQUEST_PUT: {
_z_msg_put_t put = req._body._put;
Expand Down Expand Up @@ -127,9 +131,13 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
_z_n_msg_response_t response = msg->_body._response;
switch (response._tag) {
case _Z_RESPONSE_BODY_REPLY: {
#if Z_FEATURE_QUERIES == 1
_z_msg_reply_t reply = response._body._reply;
ret = _z_trigger_query_reply_partial(zn, response._request_id, response._key, reply._value.payload,
reply._value.encoding, Z_SAMPLE_KIND_PUT, reply._timestamp);
#else
_Z_DEBUG("_Z_RESPONSE_BODY_REPLY dropped, queries not supported\n");
#endif
} break;
case _Z_RESPONSE_BODY_ERR: {
// @TODO: expose errors to the user
Expand All @@ -154,9 +162,13 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
}
} break;
case _Z_N_RESPONSE_FINAL: {
#if Z_FEATURE_QUERIES == 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Response and ResponseFinal are not limited to queries/queryables.

_Z_DEBUG("Handling _Z_N_RESPONSE_FINAL\n");
_z_zint_t id = msg->_body._response_final._request_id;
_z_trigger_query_reply_final(zn, id);
#else
_Z_DEBUG("_Z_N_RESPONSE_FINAL dropped, queries not supported\n");
#endif
} break;
}
_z_msg_clear(msg);
Expand Down
9 changes: 9 additions & 0 deletions src/session/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,12 @@ int8_t _z_session_init(_z_session_t *zn, _z_id_t *zid) {
zn->_remote_resources = NULL;
zn->_local_subscriptions = NULL;
zn->_remote_subscriptions = NULL;
#if Z_FEATURE_QUERYABLES == 1
zn->_local_questionable = NULL;
#endif
#if Z_FEATURE_QUERIES == 1
zn->_pending_queries = NULL;
#endif

#if Z_FEATURE_MULTI_THREAD == 1
ret = _z_mutex_init(&zn->_mutex_inner);
Expand Down Expand Up @@ -100,8 +104,13 @@ void _z_session_clear(_z_session_t *zn) {
// Clean up the entities
_z_flush_resources(zn);
_z_flush_subscriptions(zn);

#if Z_FEATURE_QUERYABLES == 1
_z_flush_questionables(zn);
#endif
#if Z_FEATURE_QUERIES == 1
_z_flush_pending_queries(zn);
#endif

#if Z_FEATURE_MULTI_THREAD == 1
_z_mutex_free(&zn->_mutex_inner);
Expand Down
Loading