diff --git a/examples/unix/c11/z_querier.c b/examples/unix/c11/z_querier.c index 39222e783..594fd6c3c 100644 --- a/examples/unix/c11/z_querier.c +++ b/examples/unix/c11/z_querier.c @@ -19,6 +19,17 @@ #if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1 && defined Z_FEATURE_UNSTABLE_API +#if Z_FEATURE_MATCHING == 1 +void matching_status_handler(const z_matching_status_t *matching_status, void *arg) { + (void)arg; + if (matching_status->matching) { + printf("Querier has matching queryable.\n"); + } else { + printf("Querier has NO MORE matching queryables.\n"); + } +} +#endif + int main(int argc, char **argv) { const char *selector = "demo/example/**"; const char *mode = "client"; @@ -27,9 +38,10 @@ int main(int argc, char **argv) { const char *value = NULL; int n = INT_MAX; int timeout_ms = 0; + bool add_matching_listener = false; int opt; - while ((opt = getopt(argc, argv, "s:e:m:v:l:n:t:")) != -1) { + while ((opt = getopt(argc, argv, "s:e:m:v:l:n:t:a")) != -1) { switch (opt) { case 's': selector = optarg; @@ -52,6 +64,9 @@ int main(int argc, char **argv) { case 't': timeout_ms = atoi(optarg); break; + case 'a': + add_matching_listener = true; + break; case '?': if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l' || optopt == 'n' || optopt == 't') { @@ -115,6 +130,17 @@ int main(int argc, char **argv) { exit(-1); } + if (add_matching_listener) { +#if Z_FEATURE_MATCHING == 1 + z_owned_closure_matching_status_t callback; + z_closure(&callback, matching_status_handler, NULL, NULL); + z_querier_declare_background_matching_listener(z_loan(querier), z_move(callback)); +#else + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_MATCHING but this example requires it.\n"); + return -2; +#endif + } + printf("Press CTRL-C to quit...\n"); char buf[256]; for (int idx = 0; idx != n; ++idx) { diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index dd6d28429..4c45a09d5 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -1843,6 +1843,55 @@ z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *paramete * .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release. */ const z_loaned_keyexpr_t *z_querier_keyexpr(const z_loaned_querier_t *querier); + +#if Z_FEATURE_MATCHING == 1 +/** + * Declares a matching listener, registering a callback for notifying queryables matching the given querier key + * expression and target. The callback will be run in the background until the corresponding querier is dropped. + * + * Parameters: + * querier: A querier to associate with matching listener. + * callback: A closure that will be called every time the matching status of the querier changes (If last + * queryable disconnects or when the first queryable connects). + * + * Return: + * ``0`` if put operation is successful, ``negative value`` otherwise. + * + * .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release. + */ +z_result_t z_querier_declare_background_matching_listener(const z_loaned_querier_t *querier, + z_moved_closure_matching_status_t *callback); +/** + * Constructs matching listener, registering a callback for notifying queryables matching with a given querier's + * key expression and target. + * + * Parameters: + * querier: A querier to associate with matching listener. + * matching_listener: An uninitialized memory location where matching listener will be constructed. The matching + * listener's callback will be automatically dropped when the querier is dropped. + * callback: A closure that will be called every time the matching status of the querier changes (If last + * queryable disconnects or when the first queryable connects). + * + * Return: + * ``0`` if put operation is successful, ``negative value`` otherwise. + * + * .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release. + */ +z_result_t z_querier_declare_matching_listener(const z_loaned_querier_t *querier, + z_owned_matching_listener_t *matching_listener, + z_moved_closure_matching_status_t *callback); +/** + * Gets querier matching status - i.e. if there are any queryables matching its key expression and target. + * + * Return: + * ``0`` if put operation is successful, ``negative value`` otherwise. + * + * .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release. + */ +z_result_t z_querier_get_matching_status(const z_loaned_querier_t *querier, z_matching_status_t *matching_status); + +#endif // Z_FEATURE_MATCHING == 1 + #endif // Z_FEATURE_UNSTABLE_API /** diff --git a/include/zenoh-pico/net/matching.h b/include/zenoh-pico/net/matching.h index d5ab26c97..0112508e6 100644 --- a/include/zenoh-pico/net/matching.h +++ b/include/zenoh-pico/net/matching.h @@ -29,7 +29,7 @@ typedef struct _z_matching_listener_t { #if Z_FEATURE_MATCHING == 1 _z_matching_listener_t _z_matching_listener_declare(_z_session_rc_t *zn, const _z_keyexpr_t *key, _z_zint_t entity_id, - _z_closure_matching_status_t callback); + uint8_t interest_type_flag, _z_closure_matching_status_t callback); z_result_t _z_matching_listener_entity_undeclare(_z_session_t *zn, _z_zint_t entity_id); z_result_t _z_matching_listener_undeclare(_z_matching_listener_t *listener); // Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes. @@ -37,8 +37,8 @@ static inline _z_matching_listener_t _z_matching_listener_null(void) { return (_ static inline bool _z_matching_listener_check(const _z_matching_listener_t *matching_listener) { return !_Z_RC_IS_NULL(&matching_listener->_zn); } -void _z_matching_listener_clear(_z_matching_listener_t *pub); -void _z_matching_listener_free(_z_matching_listener_t **pub); +void _z_matching_listener_clear(_z_matching_listener_t *listener); +void _z_matching_listener_free(_z_matching_listener_t **listener); #endif // Z_FEATURE_MATCHING == 1 #ifdef __cplusplus diff --git a/include/zenoh-pico/session/matching.h b/include/zenoh-pico/session/matching.h index e878a8d4b..c7e767728 100644 --- a/include/zenoh-pico/session/matching.h +++ b/include/zenoh-pico/session/matching.h @@ -36,6 +36,9 @@ typedef struct { } _z_closure_matching_status_t; #if Z_FEATURE_MATCHING == 1 + +#define _Z_MATCHING_LISTENER_CTX_NULL_ID 0xFFFFFFFF + typedef struct _z_matching_listener_ctx_t { uint32_t decl_id; _z_closure_matching_status_t callback; diff --git a/src/api/api.c b/src/api/api.c index 3a9083c3e..c3750c5ab 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1122,8 +1122,8 @@ z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *pub z_owned_matching_listener_t *matching_listener, z_moved_closure_matching_status_t *callback) { _z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&publisher->_zn); - _z_matching_listener_t listener = - _z_matching_listener_declare(&sess_rc, &publisher->_key, publisher->_id, callback->_this._val); + _z_matching_listener_t listener = _z_matching_listener_declare(&sess_rc, &publisher->_key, publisher->_id, + _Z_INTEREST_FLAG_SUBSCRIBERS, callback->_this._val); _z_session_rc_drop(&sess_rc); z_internal_closure_matching_status_null(&callback->_this); @@ -1135,8 +1135,6 @@ z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *pub z_result_t z_publisher_get_matching_status(const z_loaned_publisher_t *publisher, z_matching_status_t *matching_status) { - // Ideally this should be implemented as a real request to the router, but this works much faster. - // And it works as long as filtering is enabled along with interest matching_status->matching = publisher->_filter.ctx->state != WRITE_FILTER_ACTIVE; return _Z_RES_OK; } @@ -1353,6 +1351,37 @@ z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *paramete const z_loaned_keyexpr_t *z_querier_keyexpr(const z_loaned_querier_t *querier) { return (const z_loaned_keyexpr_t *)&querier->_key; } + +#if Z_FEATURE_MATCHING == 1 +z_result_t z_querier_declare_background_matching_listener(const z_loaned_querier_t *querier, + z_moved_closure_matching_status_t *callback) { + z_owned_matching_listener_t listener; + _Z_RETURN_IF_ERR(z_querier_declare_matching_listener(querier, &listener, callback)); + _z_matching_listener_clear(&listener._val); + return _Z_RES_OK; +} +z_result_t z_querier_declare_matching_listener(const z_loaned_querier_t *querier, + z_owned_matching_listener_t *matching_listener, + z_moved_closure_matching_status_t *callback) { + _z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&querier->_zn); + _z_matching_listener_t listener = _z_matching_listener_declare(&sess_rc, &querier->_key, querier->_id, + _Z_INTEREST_FLAG_QUERYABLES, callback->_this._val); + _z_session_rc_drop(&sess_rc); + + z_internal_closure_matching_status_null(&callback->_this); + + matching_listener->_val = listener; + + return _z_matching_listener_check(&listener) ? _Z_RES_OK : _Z_ERR_GENERIC; +} + +z_result_t z_querier_get_matching_status(const z_loaned_querier_t *querier, z_matching_status_t *matching_status) { + matching_status->matching = querier->_filter.ctx->state != WRITE_FILTER_ACTIVE; + return _Z_RES_OK; +} + +#endif // Z_FEATURE_MATCHING == 1 + #endif // Z_FEATURE_UNSTABLE_API bool z_reply_is_ok(const z_loaned_reply_t *reply) { return reply->data._tag != _Z_REPLY_TAG_ERROR; } diff --git a/src/net/matching.c b/src/net/matching.c index e2c7c8ada..e47142253 100644 --- a/src/net/matching.c +++ b/src/net/matching.c @@ -20,6 +20,7 @@ #include "zenoh-pico/api/types.h" #include "zenoh-pico/net/primitives.h" #include "zenoh-pico/net/session.h" +#include "zenoh-pico/protocol/definitions/interest.h" #include "zenoh-pico/session/matching.h" #include "zenoh-pico/session/resource.h" #include "zenoh-pico/utils/logging.h" @@ -28,18 +29,21 @@ #if Z_FEATURE_MATCHING == 1 static void _z_matching_listener_callback(const _z_interest_msg_t *msg, void *arg) { _z_matching_listener_ctx_t *ctx = (_z_matching_listener_ctx_t *)arg; - switch (msg->type) { - case _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER: { - ctx->decl_id = msg->id; - z_matching_status_t status = {.matching = true}; - z_closure_matching_status_call(&ctx->callback, &status); + case _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER: + case _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE: { + if (ctx->decl_id == _Z_MATCHING_LISTENER_CTX_NULL_ID) { + ctx->decl_id = msg->id; + z_matching_status_t status = {.matching = true}; + z_closure_matching_status_call(&ctx->callback, &status); + } break; } - case _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER: { + case _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER: + case _Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE: { if (ctx->decl_id == msg->id) { - ctx->decl_id = 0; + ctx->decl_id = _Z_MATCHING_LISTENER_CTX_NULL_ID; z_matching_status_t status = {.matching = false}; z_closure_matching_status_call(&ctx->callback, &status); } @@ -52,9 +56,9 @@ static void _z_matching_listener_callback(const _z_interest_msg_t *msg, void *ar } _z_matching_listener_t _z_matching_listener_declare(_z_session_rc_t *zn, const _z_keyexpr_t *key, _z_zint_t entity_id, - _z_closure_matching_status_t callback) { - uint8_t flags = _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_FUTURE | - _Z_INTEREST_FLAG_AGGREGATE; + uint8_t interest_type_flag, _z_closure_matching_status_t callback) { + uint8_t flags = interest_type_flag | _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_FUTURE | + _Z_INTEREST_FLAG_AGGREGATE | _Z_INTEREST_FLAG_CURRENT; _z_matching_listener_t ret = _z_matching_listener_null(); _z_matching_listener_ctx_t *ctx = _z_matching_listener_ctx_new(callback); diff --git a/src/net/primitives.c b/src/net/primitives.c index 191999ed4..bed512126 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -541,6 +541,9 @@ z_result_t _z_undeclare_querier(_z_querier_t *querier) { if (querier == NULL || _Z_RC_IS_NULL(&querier->_zn)) { return _Z_ERR_ENTITY_UNKNOWN; } +#if Z_FEATURE_MATCHING == 1 + _z_matching_listener_entity_undeclare(_Z_RC_IN_VAL(&querier->_zn), querier->_id); +#endif _z_write_filter_destroy(_Z_RC_IN_VAL(&querier->_zn), &querier->_filter); _z_undeclare_resource(_Z_RC_IN_VAL(&querier->_zn), querier->_key._id); return _Z_RES_OK; diff --git a/src/session/matching.c b/src/session/matching.c index ddeb425ca..1fc7a3afc 100644 --- a/src/session/matching.c +++ b/src/session/matching.c @@ -18,7 +18,7 @@ _z_matching_listener_ctx_t *_z_matching_listener_ctx_new(_z_closure_matching_status_t callback) { _z_matching_listener_ctx_t *ctx = z_malloc(sizeof(_z_matching_listener_ctx_t)); - ctx->decl_id = 0; + ctx->decl_id = _Z_MATCHING_LISTENER_CTX_NULL_ID; ctx->callback = callback; return ctx; diff --git a/tests/z_api_matching_test.c b/tests/z_api_matching_test.c index f77a726c8..4824930b7 100644 --- a/tests/z_api_matching_test.c +++ b/tests/z_api_matching_test.c @@ -25,6 +25,16 @@ #undef NDEBUG #include +static const char* PUB_EXPR = "zenoh-pico/matching/test/val"; +static const char* SUB_EXPR = "zenoh-pico/matching/test/*"; +static const char* SUB_EXPR_WRONG = "zenoh-pico/matching/test_wrong/*"; + +static const char* QUERIABLE_EXPR = "zenoh-pico/matching/query_test/val"; +static const char* QUERY_EXPR = "zenoh-pico/matching/query_test/*"; + +static unsigned long DEFAULT_TIMEOUT_S = 10; +static int SUBSCRIBER_TESTS_COUNT = 3; + typedef enum { NONE, MATCH, UNMATCH, DROP } context_state_t; typedef struct context_t { @@ -44,7 +54,7 @@ static void _context_drop(context_t* c) { z_mutex_drop(z_mutex_move(&c->m)); } -static void _context_wait(context_t* c, context_state_t state, int timeout_s) { +static void _context_wait(context_t* c, context_state_t state, unsigned long timeout_s) { z_mutex_lock(z_mutex_loan_mut(&c->m)); if (c->state != state) { printf("Waiting for state %d...\n", state); @@ -90,10 +100,6 @@ static void _context_notify(context_t* c, context_state_t state) { } \ } -const char* pub_expr = "zenoh-pico/matching/test/val"; -const char* sub_expr = "zenoh-pico/matching/test/*"; -const char* sub_expr_wrong = "zenoh-pico/matching/test_wrong/*"; - void on_receive(const z_matching_status_t* s, void* context) { context_t* c = (context_t*)context; _context_notify(c, s->matching ? MATCH : UNMATCH); @@ -104,8 +110,8 @@ void on_drop(void* context) { _context_notify(c, DROP); } -void test_matching_sub(bool background) { - printf("test_matching_sub: background=%d\n", background); +void test_matching_publisher_sub(bool background) { + printf("test_publisher_matching_sub: background=%d\n", background); context_t context = {0}; _context_init(&context); @@ -115,8 +121,8 @@ void test_matching_sub(bool background) { z_config_default(&c1); z_config_default(&c2); z_view_keyexpr_t k_sub, k_pub; - z_view_keyexpr_from_str(&k_sub, sub_expr); - z_view_keyexpr_from_str(&k_pub, pub_expr); + z_view_keyexpr_from_str(&k_sub, SUB_EXPR); + z_view_keyexpr_from_str(&k_pub, PUB_EXPR); assert_ok(z_open(&s1, z_config_move(&c1), NULL)); assert_ok(z_open(&s2, z_config_move(&c2), NULL)); @@ -141,21 +147,92 @@ void test_matching_sub(bool background) { z_closure_matching_status_move(&closure))); } - z_owned_subscriber_t sub; - z_owned_closure_sample_t callback; - z_closure_sample(&callback, NULL, NULL, NULL); - assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k_sub), - z_closure_sample_move(&callback), NULL)); + for (int i = 0; i != SUBSCRIBER_TESTS_COUNT; i++) { + z_owned_subscriber_t sub; + z_owned_closure_sample_t callback; + z_closure_sample(&callback, NULL, NULL, NULL); + assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k_sub), + z_closure_sample_move(&callback), NULL)); - _context_wait(&context, MATCH, 10); + _context_wait(&context, MATCH, DEFAULT_TIMEOUT_S); - z_subscriber_drop(z_subscriber_move(&sub)); + z_subscriber_drop(z_subscriber_move(&sub)); - _context_wait(&context, UNMATCH, 10); + _context_wait(&context, UNMATCH, DEFAULT_TIMEOUT_S); + } z_publisher_drop(z_publisher_move(&pub)); - _context_wait(&context, DROP, 10); + _context_wait(&context, DROP, DEFAULT_TIMEOUT_S); + + if (!background) { + z_matching_listener_drop(z_matching_listener_move(&matching_listener)); + } + assert_ok(zp_stop_read_task(z_loan_mut(s1))); + assert_ok(zp_stop_read_task(z_loan_mut(s2))); + assert_ok(zp_stop_lease_task(z_loan_mut(s1))); + assert_ok(zp_stop_lease_task(z_loan_mut(s2))); + + z_session_drop(z_session_move(&s1)); + z_session_drop(z_session_move(&s2)); + + _context_drop(&context); +} + +void test_matching_querier_sub(bool background) { + printf("test_matching_querier_sub: background=%d\n", background); + + context_t context = {0}; + _context_init(&context); + + z_owned_session_t s1, s2; + z_owned_config_t c1, c2; + z_config_default(&c1); + z_config_default(&c2); + z_view_keyexpr_t k_queryable, k_querier; + z_view_keyexpr_from_str(&k_queryable, QUERIABLE_EXPR); + z_view_keyexpr_from_str(&k_querier, QUERY_EXPR); + + assert_ok(z_open(&s1, z_config_move(&c1), NULL)); + assert_ok(z_open(&s2, z_config_move(&c2), NULL)); + + assert_ok(zp_start_read_task(z_loan_mut(s1), NULL)); + assert_ok(zp_start_read_task(z_loan_mut(s2), NULL)); + assert_ok(zp_start_lease_task(z_loan_mut(s1), NULL)); + assert_ok(zp_start_lease_task(z_loan_mut(s2), NULL)); + + z_owned_querier_t querier; + assert_ok(z_declare_querier(z_session_loan(&s1), &querier, z_view_keyexpr_loan(&k_querier), NULL)); + + z_owned_closure_matching_status_t closure; + z_closure_matching_status(&closure, on_receive, on_drop, (void*)(&context)); + + z_owned_matching_listener_t matching_listener; + if (background) { + assert_ok(z_querier_declare_background_matching_listener(z_querier_loan(&querier), + z_closure_matching_status_move(&closure))); + } else { + assert_ok(z_querier_declare_matching_listener(z_querier_loan(&querier), &matching_listener, + z_closure_matching_status_move(&closure))); + } + + for (int i = 0; i != SUBSCRIBER_TESTS_COUNT; i++) { + z_owned_queryable_t queryable; + z_owned_closure_query_t callback; + z_closure_query(&callback, NULL, NULL, NULL); + assert_ok(z_declare_queryable(z_session_loan(&s2), &queryable, z_view_keyexpr_loan(&k_queryable), + z_closure_query_move(&callback), NULL)); + + _context_wait(&context, MATCH, DEFAULT_TIMEOUT_S); + + z_queryable_drop(z_queryable_move(&queryable)); + + _context_wait(&context, UNMATCH, DEFAULT_TIMEOUT_S); + } + + z_querier_drop(z_querier_move(&querier)); + + _context_wait(&context, DROP, DEFAULT_TIMEOUT_S); if (!background) { z_matching_listener_drop(z_matching_listener_move(&matching_listener)); @@ -172,11 +249,11 @@ void test_matching_sub(bool background) { _context_drop(&context); } -static void _check_status(z_owned_publisher_t* pub, bool expected) { +static void _check_publisher_status(z_owned_publisher_t* pub, bool expected) { z_matching_status_t status; status.matching = !expected; z_clock_t clock = z_clock_now(); - while (status.matching != expected && z_clock_elapsed_s(&clock) < 10) { + while (status.matching != expected && z_clock_elapsed_s(&clock) < DEFAULT_TIMEOUT_S) { assert_ok(z_publisher_get_matching_status(z_publisher_loan(pub), &status)); z_sleep_ms(100); } @@ -186,15 +263,17 @@ static void _check_status(z_owned_publisher_t* pub, bool expected) { } } -void test_matching_get(void) { +void test_matching_publisher_get(void) { + printf("test_matching_publisher_get\n"); + z_owned_session_t s1, s2; z_owned_config_t c1, c2; z_config_default(&c1); z_config_default(&c2); z_view_keyexpr_t k_sub, k_pub, k_sub_wrong; - z_view_keyexpr_from_str(&k_sub, sub_expr); - z_view_keyexpr_from_str(&k_pub, pub_expr); - z_view_keyexpr_from_str(&k_sub_wrong, sub_expr_wrong); + z_view_keyexpr_from_str(&k_sub, SUB_EXPR); + z_view_keyexpr_from_str(&k_pub, PUB_EXPR); + z_view_keyexpr_from_str(&k_sub_wrong, SUB_EXPR_WRONG); assert_ok(z_open(&s1, z_config_move(&c1), NULL)); assert_ok(z_open(&s2, z_config_move(&c2), NULL)); @@ -208,7 +287,7 @@ void test_matching_get(void) { assert_ok(z_declare_publisher(z_session_loan(&s1), &pub, z_view_keyexpr_loan(&k_pub), NULL)); z_sleep_s(1); - _check_status(&pub, false); + _check_publisher_status(&pub, false); z_owned_subscriber_t sub_wrong; z_owned_closure_sample_t callback_wrong; @@ -217,7 +296,7 @@ void test_matching_get(void) { z_closure_sample_move(&callback_wrong), NULL)); z_sleep_s(1); - _check_status(&pub, false); + _check_publisher_status(&pub, false); z_owned_subscriber_t sub; z_owned_closure_sample_t callback; @@ -225,11 +304,11 @@ void test_matching_get(void) { assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k_sub), z_closure_sample_move(&callback), NULL)); - _check_status(&pub, true); + _check_publisher_status(&pub, true); z_subscriber_drop(z_subscriber_move(&sub)); - _check_status(&pub, false); + _check_publisher_status(&pub, false); z_publisher_drop(z_publisher_move(&pub)); z_subscriber_drop(z_subscriber_move(&sub_wrong)); @@ -243,12 +322,89 @@ void test_matching_get(void) { z_session_drop(z_session_move(&s2)); } +static void _check_querier_status(z_owned_querier_t* querier, bool expected) { + z_matching_status_t status; + status.matching = !expected; + z_clock_t clock = z_clock_now(); + while (status.matching != expected && z_clock_elapsed_s(&clock) < DEFAULT_TIMEOUT_S) { + assert_ok(z_querier_get_matching_status(z_querier_loan(querier), &status)); + z_sleep_ms(100); + } + if (status.matching != expected) { + fprintf(stderr, "Expected matching status %d, got %d\n", expected, status.matching); + assert(false); + } +} + +void test_matching_querier_get(void) { + printf("test_matching_querier_get\n"); + + z_owned_session_t s1, s2; + z_owned_config_t c1, c2; + z_config_default(&c1); + z_config_default(&c2); + z_view_keyexpr_t k_sub, k_querier, k_querier_wrong; + z_view_keyexpr_from_str(&k_sub, QUERIABLE_EXPR); + z_view_keyexpr_from_str(&k_querier, QUERY_EXPR); + z_view_keyexpr_from_str(&k_querier_wrong, SUB_EXPR_WRONG); + + assert_ok(z_open(&s1, z_config_move(&c1), NULL)); + assert_ok(z_open(&s2, z_config_move(&c2), NULL)); + + assert_ok(zp_start_read_task(z_loan_mut(s1), NULL)); + assert_ok(zp_start_read_task(z_loan_mut(s2), NULL)); + assert_ok(zp_start_lease_task(z_loan_mut(s1), NULL)); + assert_ok(zp_start_lease_task(z_loan_mut(s2), NULL)); + + z_owned_querier_t querier; + assert_ok(z_declare_querier(z_session_loan(&s1), &querier, z_view_keyexpr_loan(&k_querier), NULL)); + z_sleep_s(1); + + _check_querier_status(&querier, false); + + z_owned_queryable_t queryable_wrong; + z_owned_closure_query_t callback_wrong; + z_closure_query(&callback_wrong, NULL, NULL, NULL); + assert_ok(z_declare_queryable(z_session_loan(&s2), &queryable_wrong, z_view_keyexpr_loan(&k_querier_wrong), + z_closure_query_move(&callback_wrong), NULL)); + z_sleep_s(1); + + _check_querier_status(&querier, false); + + z_owned_queryable_t queryable; + z_owned_closure_query_t callback; + z_closure_query(&callback, NULL, NULL, NULL); + assert_ok(z_declare_queryable(z_session_loan(&s2), &queryable, z_view_keyexpr_loan(&k_sub), + z_closure_query_move(&callback), NULL)); + + _check_querier_status(&querier, true); + + z_queryable_drop(z_queryable_move(&queryable)); + + _check_querier_status(&querier, false); + + z_querier_drop(z_querier_move(&querier)); + z_queryable_drop(z_queryable_move(&queryable_wrong)); + + assert_ok(zp_stop_read_task(z_loan_mut(s1))); + assert_ok(zp_stop_read_task(z_loan_mut(s2))); + assert_ok(zp_stop_lease_task(z_loan_mut(s1))); + assert_ok(zp_stop_lease_task(z_loan_mut(s2))); + + z_session_drop(z_session_move(&s1)); + z_session_drop(z_session_move(&s2)); +} + int main(int argc, char** argv) { (void)argc; (void)argv; - test_matching_sub(true); - test_matching_sub(false); - test_matching_get(); + test_matching_publisher_sub(true); + test_matching_publisher_sub(false); + test_matching_publisher_get(); + + test_matching_querier_sub(true); + test_matching_querier_sub(false); + test_matching_querier_get(); } #else