Skip to content

Commit

Permalink
Implement querier (#858)
Browse files Browse the repository at this point in the history
* Add querier API

* Add querier example

* Build fix

* Mark querier API as unstable

* Querier implementation

* Fix modular build

* Add test and update example

* Add docs

* Add querier session check
  • Loading branch information
sashacmc authored Jan 24, 2025
1 parent d085699 commit d076875
Show file tree
Hide file tree
Showing 13 changed files with 599 additions and 8 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ set(Z_FEATURE_UNICAST_TRANSPORT 1 CACHE STRING "Toggle unicast transport")
set(Z_FEATURE_RAWETH_TRANSPORT 0 CACHE STRING "Toggle raw ethernet transport")
set(Z_FEATURE_TCP_NODELAY 1 CACHE STRING "Toggle TCP_NODELAY")
set(Z_FEATURE_LOCAL_SUBSCRIBER 0 CACHE STRING "Toggle local subscriptions")
set(Z_FEATURE_PUBLISHER_SESSION_CHECK 1 CACHE STRING "Toggle publisher session check")
set(Z_FEATURE_SESSION_CHECK 1 CACHE STRING "Toggle publisher/querier session check")
set(Z_FEATURE_BATCHING 1 CACHE STRING "Toggle batching")
set(Z_FEATURE_MATCHING 1 CACHE STRING "Toggle matching feature")
set(Z_FEATURE_RX_CACHE 0 CACHE STRING "Toggle RX_CACHE")
Expand Down
40 changes: 40 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,46 @@ See details at :ref:`owned_types_concept`
.. c:function:: void z_reply_clone(z_owned_reply_t * dst, const z_loaned_reply_t * reply)
.. c:function:: const z_loaned_reply_t * z_reply_loan(const z_owned_reply_t * reply)
Querier
=======
Represents a Zenoh Querier entity.
Types
-----
See details at :ref:`owned_types_concept`
.. c:type:: z_owned_querier_t
.. c:type:: z_loaned_querier_t
.. c:type:: z_moved_querier_t
Option Types
------------
.. autoctype:: types.h::z_querier_options_t
.. autoctype:: types.h::z_querier_get_options_t
Constants
---------
Functions
---------
.. autocfunction:: primitives.h::z_declare_querier
.. autocfunction:: primitives.h::z_undeclare_querier
.. autocfunction:: primitives.h::z_querier_get
.. autocfunction:: primitives.h::z_querier_keyexpr
.. autocfunction:: primitives.h::z_querier_options_default
.. autocfunction:: primitives.h::z_querier_get_options_default
Ownership Functions
-------------------
See details at :ref:`owned_types_concept`
.. c:function:: const z_loaned_querier_t * z_querier_loan(const z_owned_querier_t * closure)
.. c:function:: void z_querier_drop(z_moved_querier_t * closure)
Scouting
========
Expand Down
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ if(UNIX)
add_example(z_get_channel unix/c11/z_get_channel.c)
add_example(z_get_attachment unix/c11/z_get_attachment.c)
add_example(z_get_liveliness unix/c11/z_get_liveliness.c)
add_example(z_querier unix/c11/z_querier.c)
add_example(z_queryable unix/c11/z_queryable.c)
add_example(z_queryable_channel unix/c11/z_queryable_channel.c)
add_example(z_queryable_attachment unix/c11/z_queryable_attachment.c)
Expand Down
174 changes: 174 additions & 0 deletions examples/unix/c11/z_querier.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
//
// Copyright (c) 2025 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <limits.h>
#include <stddef.h>
#include <stdio.h>
#include <unistd.h>
#include <zenoh-pico.h>

#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1 && defined Z_FEATURE_UNSTABLE_API

int main(int argc, char **argv) {
const char *selector = "demo/example/**";
const char *mode = "client";
const char *clocator = NULL;
const char *llocator = NULL;
const char *value = NULL;
int n = INT_MAX;
int timeout_ms = 0;

int opt;
while ((opt = getopt(argc, argv, "s:e:m:v:l:n:t:")) != -1) {
switch (opt) {
case 's':
selector = optarg;
break;
case 'e':
clocator = optarg;
break;
case 'm':
mode = optarg;
break;
case 'l':
llocator = optarg;
break;
case 'v':
value = optarg;
break;
case 'n':
n = atoi(optarg);
break;
case 't':
timeout_ms = atoi(optarg);
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l' ||
optopt == 'n' || optopt == 't') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
return 1;
default:
return -1;
}
}

z_owned_config_t config;
z_config_default(&config);
zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode);
if (clocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator);
}
if (llocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator);
}

printf("Opening session...\n");
z_owned_session_t s;
if (z_open(&s, z_move(config), NULL) < 0) {
printf("Unable to open session!\n");
return -1;
}

// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_session_drop(z_session_move(&s));
return -1;
}

const char *ke = selector;
size_t ke_len = strlen(ke);
const char *params = strchr(selector, '?');
if (params != NULL) {
ke_len = params - ke;
params += 1;
}

z_view_keyexpr_t keyexpr;
if (z_view_keyexpr_from_substr(&keyexpr, ke, ke_len) < 0) {
printf("%.*s is not a valid key expression", (int)ke_len, ke);
exit(-1);
}

printf("Declaring Querier on '%s'...\n", ke);
z_owned_querier_t querier;

z_querier_options_t opts;
z_querier_options_default(&opts);
opts.timeout_ms = timeout_ms;

if (z_declare_querier(z_loan(s), &querier, z_loan(keyexpr), &opts) < 0) {
printf("Unable to declare Querier for key expression!\n");
exit(-1);
}

printf("Press CTRL-C to quit...\n");
char buf[256];
for (int idx = 0; idx != n; ++idx) {
z_sleep_s(1);
sprintf(buf, "[%4d] %s", idx, value ? value : "");
printf("Querying '%s' with payload '%s'...\n", selector, buf);
z_querier_get_options_t get_options;
z_querier_get_options_default(&get_options);

if (value != NULL) {
z_owned_bytes_t payload;
z_bytes_copy_from_str(&payload, buf);
get_options.payload = z_move(payload);
}

z_owned_fifo_handler_reply_t handler;
z_owned_closure_reply_t closure;
z_fifo_channel_reply_new(&closure, &handler, 16);

z_querier_get(z_loan(querier), params, z_move(closure), &get_options);

z_owned_reply_t reply;
for (z_result_t res = z_recv(z_loan(handler), &reply); res == Z_OK; res = z_recv(z_loan(handler), &reply)) {
if (z_reply_is_ok(z_loan(reply))) {
const z_loaned_sample_t *sample = z_reply_ok(z_loan(reply));

z_view_string_t keystr;
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr);

z_owned_string_t replystr;
z_bytes_to_string(z_sample_payload(sample), &replystr);

printf(">> Received ('%.*s': '%.*s')\n", (int)z_string_len(z_loan(keystr)),
z_string_data(z_loan(keystr)), (int)z_string_len(z_loan(replystr)),
z_string_data(z_loan(replystr)));
z_drop(z_move(replystr));
} else {
printf(">> Received an error\n");
}
z_drop(z_move(reply));
}
z_drop(z_move(handler));
}

z_drop(z_move(querier));
z_drop(z_move(s));

return 0;
}
#else
int main(void) {
printf(
"ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires "
"them.\n");
return -2;
}
#endif
22 changes: 22 additions & 0 deletions include/zenoh-pico/api/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
z_owned_session_t : z_session_loan, \
z_owned_subscriber_t : z_subscriber_loan, \
z_owned_publisher_t : z_publisher_loan, \
z_owned_querier_t : z_querier_loan, \
z_owned_matching_listener_t : z_matching_listener_loan, \
z_owned_queryable_t : z_queryable_loan, \
z_owned_liveliness_token_t : z_liveliness_token_loan, \
Expand Down Expand Up @@ -82,6 +83,7 @@
z_owned_config_t : z_config_loan_mut, \
z_owned_session_t : z_session_loan_mut, \
z_owned_publisher_t : z_publisher_loan_mut, \
z_owned_querier_t : z_querier_loan_mut, \
z_owned_matching_listener_t : z_matching_listener_loan_mut, \
z_owned_queryable_t : z_queryable_loan_mut, \
z_owned_liveliness_token_t : z_liveliness_token_loan_mut, \
Expand Down Expand Up @@ -116,6 +118,7 @@
z_moved_session_t* : z_session_drop, \
z_moved_subscriber_t* : z_subscriber_drop, \
z_moved_publisher_t* : z_publisher_drop, \
z_moved_querier_t* : z_querier_drop, \
z_moved_matching_listener_t* : z_matching_listener_drop, \
z_moved_queryable_t* : z_queryable_drop, \
z_moved_liveliness_token_t* : z_liveliness_token_drop, \
Expand Down Expand Up @@ -165,6 +168,7 @@
z_owned_session_t : z_internal_session_check, \
z_owned_subscriber_t : z_internal_subscriber_check, \
z_owned_publisher_t : z_internal_publisher_check, \
z_owned_querier_t : z_internal_querier_check, \
z_owned_matching_listener_t : z_internal_matching_listener_check, \
z_owned_queryable_t : z_internal_queryable_check, \
z_owned_liveliness_token_t : z_internal_liveliness_token_check, \
Expand Down Expand Up @@ -237,6 +241,7 @@
z_owned_session_t : z_session_move, \
z_owned_subscriber_t : z_subscriber_move, \
z_owned_publisher_t : z_publisher_move, \
z_owned_querier_t : z_querier_move, \
z_owned_matching_listener_t: z_matching_listener_move, \
z_owned_queryable_t : z_queryable_move, \
z_owned_liveliness_token_t : z_liveliness_token_move, \
Expand Down Expand Up @@ -298,6 +303,7 @@
z_owned_keyexpr_t *: z_keyexpr_take, \
z_owned_mutex_t *: z_mutex_take, \
z_owned_publisher_t *: z_publisher_take, \
z_owned_querier_t *: z_querier_take, \
z_owned_matching_listener_t *: z_matching_listener_take, \
z_owned_query_t *: z_query_take, \
z_owned_queryable_t *: z_queryable_take, \
Expand Down Expand Up @@ -351,6 +357,7 @@
#define z_internal_null(x) _Generic((x), \
z_owned_session_t * : z_internal_session_null, \
z_owned_publisher_t * : z_internal_publisher_null, \
z_owned_querier_t * : z_internal_querier_null, \
z_owned_matching_listener_t * : z_internal_matching_listener_null, \
z_owned_keyexpr_t * : z_internal_keyexpr_null, \
z_owned_config_t * : z_internal_config_null, \
Expand Down Expand Up @@ -410,6 +417,7 @@ inline const z_loaned_config_t* z_loan(const z_owned_config_t& x) { return z_con
inline const z_loaned_session_t* z_loan(const z_owned_session_t& x) { return z_session_loan(&x); }
inline const z_loaned_subscriber_t* z_loan(const z_owned_subscriber_t& x) { return z_subscriber_loan(&x); }
inline const z_loaned_publisher_t* z_loan(const z_owned_publisher_t& x) { return z_publisher_loan(&x); }
inline const z_loaned_querier_t* z_loan(const z_owned_querier_t& x) { return z_querier_loan(&x); }
inline const z_loaned_matching_listener_t* z_loan(const z_owned_matching_listener_t& x) { return z_matching_listener_loan(&x); }
inline const z_loaned_queryable_t* z_loan(const z_owned_queryable_t& x) { return z_queryable_loan(&x); }
inline const z_loaned_liveliness_token_t* z_loan(const z_owned_liveliness_token_t& x) { return z_liveliness_token_loan(&x); }
Expand Down Expand Up @@ -449,6 +457,7 @@ inline z_loaned_keyexpr_t* z_loan_mut(z_view_keyexpr_t& x) { return z_view_keyex
inline z_loaned_config_t* z_loan_mut(z_owned_config_t& x) { return z_config_loan_mut(&x); }
inline z_loaned_session_t* z_loan_mut(z_owned_session_t& x) { return z_session_loan_mut(&x); }
inline z_loaned_publisher_t* z_loan_mut(z_owned_publisher_t& x) { return z_publisher_loan_mut(&x); }
inline z_loaned_querier_t* z_loan_mut(z_owned_querier_t& x) { return z_querier_loan_mut(&x); }
inline z_loaned_matching_listener_t* z_loan_mut(z_owned_matching_listener_t& x) { return z_matching_listener_loan_mut(&x); }
inline z_loaned_queryable_t* z_loan_mut(z_owned_queryable_t& x) { return z_queryable_loan_mut(&x); }
inline z_loaned_liveliness_token_t* z_loan_mut(z_owned_liveliness_token_t& x) { return z_liveliness_token_loan_mut(&x); }
Expand All @@ -474,6 +483,7 @@ inline ze_loaned_serializer_t* z_loan_mut(ze_owned_serializer_t& x) { return ze_
// z_drop definition
inline void z_drop(z_moved_session_t* v) { z_session_drop(v); }
inline void z_drop(z_moved_publisher_t* v) { z_publisher_drop(v); }
inline void z_drop(z_moved_querier_t* v) { z_querier_drop(v); }
inline void z_drop(z_moved_matching_listener_t* v) { z_matching_listener_drop(v); }
inline void z_drop(z_moved_keyexpr_t* v) { z_keyexpr_drop(v); }
inline void z_drop(z_moved_config_t* v) { z_config_drop(v); }
Expand Down Expand Up @@ -510,6 +520,7 @@ inline void z_drop(ze_moved_serializer_t* v) { ze_serializer_drop(v); }
// z_internal_null definition
inline void z_internal_null(z_owned_session_t* v) { z_internal_session_null(v); }
inline void z_internal_null(z_owned_publisher_t* v) { z_internal_publisher_null(v); }
inline void z_internal_null(z_owned_querier_t* v) { z_internal_querier_null(v); }
inline void z_internal_null(z_owned_matching_listener_t* v) { z_internal_matching_listener_null(v); }
inline void z_internal_null(z_owned_keyexpr_t* v) { z_internal_keyexpr_null(v); }
inline void z_internal_null(z_owned_config_t* v) { z_internal_config_null(v); }
Expand Down Expand Up @@ -542,6 +553,7 @@ inline void z_internal_null(ze_owned_serializer_t* v) { return ze_internal_seria
// z_internal_check definition
inline bool z_internal_check(const z_owned_session_t& v) { return z_internal_session_check(&v); }
inline bool z_internal_check(const z_owned_publisher_t& v) { return z_internal_publisher_check(&v); }
inline bool z_internal_check(const z_owned_querier_t& v) { return z_internal_querier_check(&v); }
inline bool z_internal_check(const z_owned_matching_listener_t& v) { return z_internal_matching_listener_check(&v); }
inline bool z_internal_check(const z_owned_keyexpr_t& v) { return z_internal_keyexpr_check(&v); }
inline bool z_internal_check(const z_owned_config_t& v) { return z_internal_config_check(&v); }
Expand Down Expand Up @@ -690,6 +702,7 @@ inline z_moved_reply_err_t* z_move(z_owned_reply_err_t& x) { return z_reply_err_
inline z_moved_hello_t* z_move(z_owned_hello_t& x) { return z_hello_move(&x); }
inline z_moved_keyexpr_t* z_move(z_owned_keyexpr_t& x) { return z_keyexpr_move(&x); }
inline z_moved_publisher_t* z_move(z_owned_publisher_t& x) { return z_publisher_move(&x); }
inline z_moved_querier_t* z_move(z_owned_querier_t& x) { return z_querier_move(&x); }
inline z_moved_matching_listener_t* z_move(z_owned_matching_listener_t& x) { return z_matching_listener_move(&x); }
inline z_moved_query_t* z_move(z_owned_query_t& x) { return z_query_move(&x); }
inline z_moved_queryable_t* z_move(z_owned_queryable_t& x) { return z_queryable_move(&x); }
Expand Down Expand Up @@ -717,6 +730,7 @@ inline ze_moved_serializer_t* z_move(ze_owned_serializer_t& x) { return ze_seria
// z_take definition
inline void z_take(z_owned_session_t* this_, z_moved_session_t* v) { return z_session_take(this_, v); }
inline void z_take(z_owned_publisher_t* this_, z_moved_publisher_t* v) { return z_publisher_take(this_, v); }
inline void z_take(z_owned_querier_t* this_, z_moved_querier_t* v) { return z_querier_take(this_, v); }
inline void z_take(z_owned_matching_listener_t* this_, z_moved_matching_listener_t* v) {
return z_matching_listener_take(this_, v);
}
Expand Down Expand Up @@ -847,6 +861,14 @@ struct z_owned_to_loaned_type_t<z_owned_publisher_t> {
typedef z_loaned_publisher_t type;
};
template <>
struct z_loaned_to_owned_type_t<z_loaned_querier_t> {
typedef z_owned_querier_t type;
};
template <>
struct z_owned_to_loaned_type_t<z_owned_querier_t> {
typedef z_loaned_querier_t type;
};
template <>
struct z_loaned_to_owned_type_t<z_loaned_matching_listener_t> {
typedef z_owned_matching_listener_t type;
};
Expand Down
Loading

0 comments on commit d076875

Please sign in to comment.