From 9bd061c5611fdddd2e7778211ed41327baebfa73 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 20 Mar 2024 09:40:47 +0100 Subject: [PATCH] Protocol cleanup (#374) * Remove ACK message * Cleanup pull and subscriber * Remove Put/Del from ResponseBody * Cleanup flags * Comment unusued variables in pull examples * Fix alignment test --- examples/arduino/z_pull.ino | 49 +++---- examples/espidf/z_pull.c | 48 +++---- examples/freertos_plus_tcp/z_pull.c | 42 +++--- examples/mbed/z_pull.cpp | 46 ++++--- examples/unix/c11/z_get.c | 2 +- examples/unix/c11/z_pull.c | 55 ++++---- examples/unix/c99/z_get.c | 2 +- examples/unix/c99/z_pull.c | 56 ++++---- examples/windows/z_get.c | 2 +- examples/windows/z_pull.c | 52 ++++---- examples/zephyr/z_pull.c | 44 ++++--- include/zenoh-pico/api/constants.h | 10 -- include/zenoh-pico/api/macros.h | 13 -- include/zenoh-pico/api/primitives.h | 66 ---------- include/zenoh-pico/api/types.h | 25 ---- include/zenoh-pico/net/primitives.h | 11 -- include/zenoh-pico/net/session.h | 1 - include/zenoh-pico/net/subscribe.h | 12 +- include/zenoh-pico/protocol/codec/core.h | 4 - include/zenoh-pico/protocol/codec/message.h | 13 +- include/zenoh-pico/protocol/core.h | 18 --- .../protocol/definitions/declarations.h | 3 +- .../zenoh-pico/protocol/definitions/message.h | 47 ------- .../zenoh-pico/protocol/definitions/network.h | 15 +-- include/zenoh-pico/session/subscription.h | 3 - src/api/api.c | 45 +------ src/net/primitives.c | 22 +--- src/net/subscribe.c | 14 +- src/protocol/codec/declarations.c | 6 +- src/protocol/codec/message.c | 90 ------------- src/protocol/codec/network.c | 34 ----- src/protocol/definitions/declarations.c | 7 +- src/protocol/definitions/network.c | 51 +------- src/session/rx.c | 37 ------ src/session/subscription.c | 3 - src/session/utils.c | 1 - tests/z_api_alignment_test.c | 33 +---- tests/z_api_null_drop_test.c | 7 - tests/z_msgcodec_test.c | 123 +----------------- 39 files changed, 250 insertions(+), 862 deletions(-) diff --git a/examples/arduino/z_pull.ino b/examples/arduino/z_pull.ino index 7dc5600e8..17b7b3722 100644 --- a/examples/arduino/z_pull.ino +++ b/examples/arduino/z_pull.ino @@ -34,20 +34,22 @@ #define KEYEXPR "demo/example/**" -z_owned_pull_subscriber_t sub; +// @TODO +// z_owned_pull_subscriber_t sub; -void data_handler(const z_sample_t *sample, void *arg) { - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - std::string val((const char *)sample->payload.start, sample->payload.len); +// @TODO +// void data_handler(const z_sample_t *sample, void *arg) { +// z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); +// std::string val((const char *)sample->payload.start, sample->payload.len); - Serial.print(" >> [Subscription listener] Received ("); - Serial.print(z_str_loan(&keystr)); - Serial.print(", "); - Serial.print(val.c_str()); - Serial.println(")"); +// Serial.print(" >> [Subscription listener] Received ("); +// Serial.print(z_str_loan(&keystr)); +// Serial.print(", "); +// Serial.print(val.c_str()); +// Serial.println(")"); - z_str_drop(z_str_move(&keystr)); -} +// z_str_drop(z_str_move(&keystr)); +// } void setup() { // Initialize Serial for debug @@ -91,23 +93,26 @@ void setup() { Serial.print("Declaring Subscriber on "); Serial.print(KEYEXPR); Serial.println(" ..."); - z_owned_closure_sample_t callback = z_closure_sample(data_handler, NULL, NULL); - sub = z_declare_pull_subscriber(z_session_loan(&s), z_keyexpr(KEYEXPR), z_closure_sample_move(&callback), NULL); - if (!z_pull_subscriber_check(&sub)) { - Serial.println("Unable to declare subscriber."); - while (1) { - ; - } - } - Serial.println("OK"); - Serial.println("Zenoh setup finished!"); + // @TODO + // z_owned_closure_sample_t callback = z_closure_sample(data_handler, NULL, NULL); + // @TODO + // sub = z_declare_pull_subscriber(z_session_loan(&s), z_keyexpr(KEYEXPR), z_closure_sample_move(&callback), NULL); + // if (!z_pull_subscriber_check(&sub)) { + // Serial.println("Unable to declare subscriber."); + // while (1) { + // ; + // } + // } + // Serial.println("OK"); + // Serial.println("Zenoh setup finished!"); + Serial.println("Pull Subscriber not supported... exiting"); delay(300); } void loop() { delay(5000); - z_subscriber_pull(z_pull_subscriber_loan(&sub)); + // z_subscriber_pull(z_pull_subscriber_loan(&sub)); } #else void setup() { diff --git a/examples/espidf/z_pull.c b/examples/espidf/z_pull.c index 5f42b2852..1f092d8eb 100644 --- a/examples/espidf/z_pull.c +++ b/examples/espidf/z_pull.c @@ -100,12 +100,13 @@ void wifi_init_sta(void) { vEventGroupDelete(s_event_group_handler); } -void data_handler(const z_sample_t* sample, void* arg) { - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, - sample->payload.start); - z_drop(z_move(keystr)); -} +// @TODO +// void data_handler(const z_sample_t* sample, void* arg) { +// z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); +// printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, +// sample->payload.start); +// z_drop(z_move(keystr)); +// } void app_main() { esp_err_t ret = nvs_flash_init(); @@ -144,23 +145,26 @@ void app_main() { zp_start_read_task(z_loan(s), NULL); zp_start_lease_task(z_loan(s), NULL); + // @TODO + // z_owned_closure_sample_t callback = z_closure(data_handler); printf("Declaring Subscriber on '%s'...", KEYEXPR); - z_owned_closure_sample_t callback = z_closure(data_handler); - z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL); - if (!z_check(sub)) { - printf("Unable to declare subscriber.\n"); - exit(-1); - } - printf("OK!\n"); - - while (1) { - sleep(5); - printf("Pulling data from '%s'...\n", KEYEXPR); - z_subscriber_pull(z_loan(sub)); - } - - printf("Closing Zenoh Session..."); - z_undeclare_pull_subscriber(z_move(sub)); + // @TODO + // z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL); + // if (!z_check(sub)) { + // printf("Unable to declare subscriber.\n"); + // exit(-1); + // } + // printf("OK!\n"); + + // while (1) { + // sleep(5); + // printf("Pulling data from '%s'...\n", KEYEXPR); + // z_subscriber_pull(z_loan(sub)); + // } + + // printf("Closing Zenoh Session..."); + // z_undeclare_pull_subscriber(z_move(sub)); + printf("Pull Subscriber not supported... exiting\n"); // Stop the receive and the session lease loop for zenoh-pico zp_stop_read_task(z_loan(s)); diff --git a/examples/freertos_plus_tcp/z_pull.c b/examples/freertos_plus_tcp/z_pull.c index b8f5c5ee8..b4da92a31 100644 --- a/examples/freertos_plus_tcp/z_pull.c +++ b/examples/freertos_plus_tcp/z_pull.c @@ -28,13 +28,14 @@ #define KEYEXPR "demo/example/**" -void data_handler(const z_sample_t *sample, void *ctx) { - (void)(ctx); - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, - sample->payload.start); - z_drop(z_move(keystr)); -} +// @TODO +// void data_handler(const z_sample_t *sample, void *ctx) { +// (void)(ctx); +// z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); +// printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, +// sample->payload.start); +// z_drop(z_move(keystr)); +// } void app_main(void) { z_owned_config_t config = z_config_default(); @@ -57,21 +58,24 @@ void app_main(void) { return; } - z_owned_closure_sample_t callback = z_closure(data_handler); + // @TODO + // z_owned_closure_sample_t callback = z_closure(data_handler); printf("Declaring Subscriber on '%s'...\n", KEYEXPR); - z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL); - if (!z_check(sub)) { - printf("Unable to declare subscriber.\n"); - return; - } + // @TODO + // z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL); + // if (!z_check(sub)) { + // printf("Unable to declare subscriber.\n"); + // return; + // } - while (1) { - zp_sleep_s(5); - printf("Pulling data from '%s'...\n", KEYEXPR); - z_subscriber_pull(z_loan(sub)); - } + // while (1) { + // zp_sleep_s(5); + // printf("Pulling data from '%s'...\n", KEYEXPR); + // z_subscriber_pull(z_loan(sub)); + // } - z_undeclare_pull_subscriber(z_move(sub)); + // z_undeclare_pull_subscriber(z_move(sub)); + printf("Pull Subscriber not supported... exiting\n"); // Stop read and lease tasks for zenoh-pico zp_stop_read_task(z_loan(s)); diff --git a/examples/mbed/z_pull.cpp b/examples/mbed/z_pull.cpp index 4bcd33ed5..aae56316e 100644 --- a/examples/mbed/z_pull.cpp +++ b/examples/mbed/z_pull.cpp @@ -30,12 +30,13 @@ #define KEYEXPR "demo/example/**" -void data_handler(const z_sample_t *sample, void *arg) { - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len, - sample->payload.start); - z_str_drop(z_str_move(&keystr)); -} +// @TODO +// void data_handler(const z_sample_t *sample, void *arg) { +// z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); +// printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len, +// sample->payload.start); +// z_str_drop(z_str_move(&keystr)); +// } int main(int argc, char **argv) { randLIB_seed_random(); @@ -64,24 +65,27 @@ int main(int argc, char **argv) { zp_start_read_task(z_session_loan(&s), NULL); zp_start_lease_task(z_session_loan(&s), NULL); + // @TODO + // z_owned_closure_sample_t callback = z_closure_sample(data_handler, NULL, NULL); printf("Declaring Subscriber on '%s'...", KEYEXPR); - z_owned_closure_sample_t callback = z_closure_sample(data_handler, NULL, NULL); - z_owned_pull_subscriber_t sub = - z_declare_pull_subscriber(z_session_loan(&s), z_keyexpr(KEYEXPR), z_closure_sample_move(&callback), NULL); - if (!z_pull_subscriber_check(&sub)) { - printf("Unable to declare subscriber.\n"); - exit(-1); - } - printf("OK!\n"); + // @TODO + // z_owned_pull_subscriber_t sub = + // z_declare_pull_subscriber(z_session_loan(&s), z_keyexpr(KEYEXPR), z_closure_sample_move(&callback), NULL); + // if (!z_pull_subscriber_check(&sub)) { + // printf("Unable to declare subscriber.\n"); + // exit(-1); + // } + // printf("OK!\n"); - while (1) { - zp_sleep_s(5); - printf("Pulling data from '%s'...\n", KEYEXPR); - z_subscriber_pull(z_pull_subscriber_loan(&sub)); - } + // while (1) { + // zp_sleep_s(5); + // printf("Pulling data from '%s'...\n", KEYEXPR); + // z_subscriber_pull(z_pull_subscriber_loan(&sub)); + // } - printf("Closing Zenoh Session..."); - z_undeclare_pull_subscriber(z_pull_subscriber_move(&sub)); + // printf("Closing Zenoh Session..."); + // z_undeclare_pull_subscriber(z_pull_subscriber_move(&sub)); + printf("Pull Subscriber not supported... exiting\n"); // Stop the receive and the session lease loop for zenoh-pico zp_stop_read_task(z_session_loan(&s)); diff --git a/examples/unix/c11/z_get.c b/examples/unix/c11/z_get.c index ca45ec493..eb7543f21 100644 --- a/examples/unix/c11/z_get.c +++ b/examples/unix/c11/z_get.c @@ -102,7 +102,7 @@ int main(int argc, char **argv) { return -1; } - printf("Enter any key to pull data or 'q' to quit...\n"); + printf("Enter any key to get data or 'q' to quit...\n"); char c = '\0'; while (1) { fflush(stdin); diff --git a/examples/unix/c11/z_pull.c b/examples/unix/c11/z_pull.c index c6f710bfb..a52a23b0d 100644 --- a/examples/unix/c11/z_pull.c +++ b/examples/unix/c11/z_pull.c @@ -19,13 +19,14 @@ #include #if Z_FEATURE_SUBSCRIPTION == 1 -void data_handler(const z_sample_t *sample, void *ctx) { - (void)(ctx); - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, - sample->payload.start); - z_drop(z_move(keystr)); -} +// @TODO +// void data_handler(const z_sample_t *sample, void *ctx) { +// (void)(ctx); +// z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); +// printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, +// sample->payload.start); +// z_drop(z_move(keystr)); +// } int main(int argc, char **argv) { const char *keyexpr = "demo/example/**"; @@ -71,27 +72,31 @@ int main(int argc, char **argv) { return -1; } - z_owned_closure_sample_t callback = z_closure(data_handler); + // @TODO + // z_owned_closure_sample_t callback = z_closure(data_handler); printf("Declaring Subscriber on '%s'...\n", keyexpr); - z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL); - if (!z_check(sub)) { - printf("Unable to declare subscriber.\n"); - return -1; - } - printf("Enter any key to pull data or 'q' to quit...\n"); - char c = '\0'; - while (1) { - fflush(stdin); - int ret = scanf("%c", &c); - (void)ret; // Remove unused result warning - if (c == 'q') { - break; - } - z_subscriber_pull(z_loan(sub)); - } + // @TODO + // z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL); + // if (!z_check(sub)) { + // printf("Unable to declare subscriber.\n"); + // return -1; + // } + + // printf("Enter any key to pull data or 'q' to quit...\n"); + // char c = '\0'; + // while (1) { + // fflush(stdin); + // int ret = scanf("%c", &c); + // (void)ret; // Remove unused result warning + // if (c == 'q') { + // break; + // } + // z_subscriber_pull(z_loan(sub)); + // } - z_undeclare_pull_subscriber(z_move(sub)); + // z_undeclare_pull_subscriber(z_move(sub)); + printf("Pull Subscriber not supported... exiting\n"); // Stop read and lease tasks for zenoh-pico zp_stop_read_task(z_loan(s)); diff --git a/examples/unix/c99/z_get.c b/examples/unix/c99/z_get.c index 46121e59e..bfb1dacc1 100644 --- a/examples/unix/c99/z_get.c +++ b/examples/unix/c99/z_get.c @@ -102,7 +102,7 @@ int main(int argc, char **argv) { return -1; } - printf("Enter any key to pull data or 'q' to quit...\n"); + printf("Enter any key to get data or 'q' to quit...\n"); char c = '\0'; while (1) { fflush(stdin); diff --git a/examples/unix/c99/z_pull.c b/examples/unix/c99/z_pull.c index efe2ba8bf..72cc35216 100644 --- a/examples/unix/c99/z_pull.c +++ b/examples/unix/c99/z_pull.c @@ -19,13 +19,14 @@ #include #if Z_FEATURE_SUBSCRIPTION == 1 -void data_handler(const z_sample_t *sample, void *ctx) { - (void)(ctx); - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len, - sample->payload.start); - z_str_drop(z_str_move(&keystr)); -} +// @TODO +// void data_handler(const z_sample_t *sample, void *ctx) { +// (void)(ctx); +// z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); +// printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len, +// sample->payload.start); +// z_str_drop(z_str_move(&keystr)); +// } int main(int argc, char **argv) { const char *keyexpr = "demo/example/**"; @@ -71,28 +72,31 @@ int main(int argc, char **argv) { return -1; } - z_owned_closure_sample_t callback = z_closure_sample(data_handler, NULL, NULL); + // @TODO + // z_owned_closure_sample_t callback = z_closure_sample(data_handler, NULL, NULL); printf("Declaring Subscriber on '%s'...\n", keyexpr); - z_owned_pull_subscriber_t sub = - z_declare_pull_subscriber(z_session_loan(&s), z_keyexpr(keyexpr), z_closure_sample_move(&callback), NULL); - if (!z_pull_subscriber_check(&sub)) { - printf("Unable to declare subscriber.\n"); - return -1; - } + // @TODO + // z_owned_pull_subscriber_t sub = + // z_declare_pull_subscriber(z_session_loan(&s), z_keyexpr(keyexpr), z_closure_sample_move(&callback), NULL); + // if (!z_pull_subscriber_check(&sub)) { + // printf("Unable to declare subscriber.\n"); + // return -1; + // } - printf("Enter any key to pull data or 'q' to quit...\n"); - char c = '\0'; - while (1) { - fflush(stdin); - int ret = scanf("%c", &c); - (void)ret; // Clear unused result warning - if (c == 'q') { - break; - } - z_subscriber_pull(z_pull_subscriber_loan(&sub)); - } + // printf("Enter any key to pull data or 'q' to quit...\n"); + // char c = '\0'; + // while (1) { + // fflush(stdin); + // int ret = scanf("%c", &c); + // (void)ret; // Clear unused result warning + // if (c == 'q') { + // break; + // } + // z_subscriber_pull(z_pull_subscriber_loan(&sub)); + // } - z_undeclare_pull_subscriber(z_pull_subscriber_move(&sub)); + // z_undeclare_pull_subscriber(z_pull_subscriber_move(&sub)); + printf("Pull Subscriber not supported... exiting\n"); // Stop read and lease tasks for zenoh-pico zp_stop_read_task(z_session_loan(&s)); diff --git a/examples/windows/z_get.c b/examples/windows/z_get.c index 2815656dc..7843f600b 100644 --- a/examples/windows/z_get.c +++ b/examples/windows/z_get.c @@ -67,7 +67,7 @@ int main(int argc, char **argv) { return -1; } - printf("Enter any key to pull data or 'q' to quit...\n"); + printf("Enter any key to get data or 'q' to quit...\n"); char c = '\0'; while (1) { fflush(stdin); diff --git a/examples/windows/z_pull.c b/examples/windows/z_pull.c index 512b2bb79..515b5e618 100644 --- a/examples/windows/z_pull.c +++ b/examples/windows/z_pull.c @@ -18,13 +18,14 @@ #include #if Z_FEATURE_SUBSCRIPTION == 1 -void data_handler(const z_sample_t *sample, void *ctx) { - (void)(ctx); - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, - sample->payload.start); - z_drop(z_move(keystr)); -} +// @TODO +// void data_handler(const z_sample_t *sample, void *ctx) { +// (void)(ctx); +// z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); +// printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, +// sample->payload.start); +// z_drop(z_move(keystr)); +// } int main(int argc, char **argv) { (void)(argc); @@ -51,26 +52,29 @@ int main(int argc, char **argv) { return -1; } - z_owned_closure_sample_t callback = z_closure(data_handler); + // @TODO + // z_owned_closure_sample_t callback = z_closure(data_handler); printf("Declaring Subscriber on '%s'...\n", keyexpr); - z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL); - if (!z_check(sub)) { - printf("Unable to declare subscriber.\n"); - return -1; - } + // @TODO + // z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL); + // if (!z_check(sub)) { + // printf("Unable to declare subscriber.\n"); + // return -1; + // } - printf("Enter any key to pull data or 'q' to quit...\n"); - char c = '\0'; - while (1) { - fflush(stdin); - scanf("%c", &c); - if (c == 'q') { - break; - } - z_subscriber_pull(z_loan(sub)); - } + // printf("Enter any key to pull data or 'q' to quit...\n"); + // char c = '\0'; + // while (1) { + // fflush(stdin); + // scanf("%c", &c); + // if (c == 'q') { + // break; + // } + // z_subscriber_pull(z_loan(sub)); + // } - z_undeclare_pull_subscriber(z_move(sub)); + // z_undeclare_pull_subscriber(z_move(sub)); + printf("Pull Subscriber not supported... exiting\n"); // Stop read and lease tasks for zenoh-pico zp_stop_read_task(z_loan(s)); diff --git a/examples/zephyr/z_pull.c b/examples/zephyr/z_pull.c index 61bed75d0..20e4109b5 100644 --- a/examples/zephyr/z_pull.c +++ b/examples/zephyr/z_pull.c @@ -29,12 +29,13 @@ #define KEYEXPR "demo/example/**" -void data_handler(const z_sample_t *sample, void *arg) { - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, - sample->payload.start); - z_drop(z_move(keystr)); -} +// @TODO +// void data_handler(const z_sample_t *sample, void *arg) { +// z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); +// printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, +// sample->payload.start); +// z_drop(z_move(keystr)); +// } int main(int argc, char **argv) { sleep(5); @@ -59,23 +60,26 @@ int main(int argc, char **argv) { zp_start_read_task(z_loan(s), NULL); zp_start_lease_task(z_loan(s), NULL); + // @TODO + // z_owned_closure_sample_t callback = z_closure(data_handler); printf("Declaring Subscriber on '%s'...", KEYEXPR); - z_owned_closure_sample_t callback = z_closure(data_handler); - z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL); - if (!z_check(sub)) { - printf("Unable to declare subscriber.\n"); - exit(-1); - } - printf("OK!\n"); + // @TODO + // z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL); + // if (!z_check(sub)) { + // printf("Unable to declare subscriber.\n"); + // exit(-1); + // } + // printf("OK!\n"); - while (1) { - sleep(5); - printf("Pulling data from '%s'...\n", KEYEXPR); - z_subscriber_pull(z_loan(sub)); - } + // while (1) { + // sleep(5); + // printf("Pulling data from '%s'...\n", KEYEXPR); + // z_subscriber_pull(z_loan(sub)); + // } - printf("Closing Zenoh Session..."); - z_undeclare_pull_subscriber(z_move(sub)); + // printf("Closing Zenoh Session..."); + // z_undeclare_pull_subscriber(z_move(sub)); + printf("Pull Subscriber not supported... exiting\n"); // Stop the receive and the session lease loop for zenoh-pico zp_stop_read_task(z_loan(s)); diff --git a/include/zenoh-pico/api/constants.h b/include/zenoh-pico/api/constants.h index 9e9e095ca..a43286225 100644 --- a/include/zenoh-pico/api/constants.h +++ b/include/zenoh-pico/api/constants.h @@ -214,16 +214,6 @@ typedef enum { } z_priority_t; #define Z_PRIORITY_DEFAULT Z_PRIORITY_DATA -/** - * Subscription mode values. - * - * Enumerators: - * Z_SUBMODE_PUSH: Defines the subscription with a push paradigm. - * Z_SUBMODE_PULL: Defines the subscription with a pull paradigm. - */ -typedef enum { Z_SUBMODE_PUSH = 0, Z_SUBMODE_PULL = 1 } z_submode_t; -#define Z_SUBMODE_DEFAULT Z_SUBMODE_PUSH - /** * Query target values. * diff --git a/include/zenoh-pico/api/macros.h b/include/zenoh-pico/api/macros.h index c7cb700a9..0a99bf736 100644 --- a/include/zenoh-pico/api/macros.h +++ b/include/zenoh-pico/api/macros.h @@ -38,7 +38,6 @@ z_owned_config_t : z_config_loan, \ z_owned_scouting_config_t : z_scouting_config_loan, \ z_owned_session_t : z_session_loan, \ - z_owned_pull_subscriber_t : z_pull_subscriber_loan, \ z_owned_publisher_t : z_publisher_loan, \ z_owned_reply_t : z_reply_loan, \ z_owned_hello_t : z_hello_loan, \ @@ -57,7 +56,6 @@ z_owned_scouting_config_t * : z_scouting_config_drop, \ z_owned_session_t * : z_session_drop, \ z_owned_subscriber_t * : z_subscriber_drop, \ - z_owned_pull_subscriber_t * : z_pull_subscriber_drop, \ z_owned_publisher_t * : z_publisher_drop, \ z_owned_queryable_t * : z_queryable_drop, \ z_owned_reply_t * : z_reply_drop, \ @@ -83,7 +81,6 @@ z_owned_keyexpr_t * : z_keyexpr_null, \ z_owned_config_t * : z_config_null, \ z_owned_scouting_config_t * : z_scouting_config_null, \ - z_owned_pull_subscriber_t * : z_pull_subscriber_null, \ z_owned_subscriber_t * : z_subscriber_null, \ z_owned_queryable_t * : z_queryable_null, \ z_owned_reply_t * : z_reply_null, \ @@ -113,7 +110,6 @@ z_owned_scouting_config_t : z_scouting_config_check, \ z_owned_session_t : z_session_check, \ z_owned_subscriber_t : z_subscriber_check, \ - z_owned_pull_subscriber_t : z_pull_subscriber_check, \ z_owned_publisher_t : z_publisher_check, \ z_owned_queryable_t : z_queryable_check, \ z_owned_reply_t : z_reply_check, \ @@ -152,7 +148,6 @@ z_owned_scouting_config_t : z_scouting_config_move, \ z_owned_session_t : z_session_move, \ z_owned_subscriber_t : z_subscriber_move, \ - z_owned_pull_subscriber_t : z_pull_subscriber_move, \ z_owned_publisher_t : z_publisher_move, \ z_owned_queryable_t : z_queryable_move, \ z_owned_reply_t : z_reply_move, \ @@ -180,7 +175,6 @@ z_owned_config_t : z_config_clone, \ z_owned_session_t : z_session_clone, \ z_owned_subscriber_t : z_subscriber_clone, \ - z_owned_pull_subscriber_t : z_pull_subscriber_clone, \ z_owned_publisher_t : z_publisher_clone, \ z_owned_queryable_t : z_queryable_clone, \ z_owned_reply_t : z_reply_clone, \ @@ -201,7 +195,6 @@ z_owned_keyexpr_t * : z_keyexpr_null, \ z_owned_config_t * : z_config_null, \ z_owned_scouting_config_t * : z_scouting_config_null, \ - z_owned_pull_subscriber_t * : z_pull_subscriber_null, \ z_owned_subscriber_t * : z_subscriber_null, \ z_owned_queryable_t * : z_queryable_null, \ z_owned_reply_t * : z_reply_null, \ @@ -242,7 +235,6 @@ template<> struct zenoh_loan_type{ typedef z_session_t type; template<> struct zenoh_loan_type{ typedef z_keyexpr_t type; }; template<> struct zenoh_loan_type{ typedef z_config_t type; }; template<> struct zenoh_loan_type{ typedef z_publisher_t type; }; -template<> struct zenoh_loan_type{ typedef z_pull_subscriber_t type; }; template<> struct zenoh_loan_type{ typedef z_hello_t type; }; template<> struct zenoh_loan_type{ typedef const char* type; }; @@ -250,7 +242,6 @@ template<> inline z_session_t z_loan(const z_owned_session_t& x) { return z_sess template<> inline z_keyexpr_t z_loan(const z_owned_keyexpr_t& x) { return z_keyexpr_loan(&x); } template<> inline z_config_t z_loan(const z_owned_config_t& x) { return z_config_loan(&x); } template<> inline z_publisher_t z_loan(const z_owned_publisher_t& x) { return z_publisher_loan(&x); } -template<> inline z_pull_subscriber_t z_loan(const z_owned_pull_subscriber_t& x) { return z_pull_subscriber_loan(&x); } template<> inline z_hello_t z_loan(const z_owned_hello_t& x) { return z_hello_loan(&x); } template<> inline const char* z_loan(const z_owned_str_t& x) { return z_str_loan(&x); } @@ -262,7 +253,6 @@ template<> struct zenoh_drop_type { typedef int8_t type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; -template<> struct zenoh_drop_type { typedef int8_t type; }; template<> struct zenoh_drop_type { typedef int8_t type; }; template<> struct zenoh_drop_type { typedef int8_t type; }; template<> struct zenoh_drop_type { typedef void type; }; @@ -279,7 +269,6 @@ template<> inline int8_t z_drop(z_owned_publisher_t* v) { return z_undeclare_pub template<> inline void z_drop(z_owned_keyexpr_t* v) { z_keyexpr_drop(v); } template<> inline void z_drop(z_owned_config_t* v) { z_config_drop(v); } template<> inline void z_drop(z_owned_scouting_config_t* v) { z_scouting_config_drop(v); } -template<> inline int8_t z_drop(z_owned_pull_subscriber_t* v) { return z_undeclare_pull_subscriber(v); } template<> inline int8_t z_drop(z_owned_subscriber_t* v) { return z_undeclare_subscriber(v); } template<> inline int8_t z_drop(z_owned_queryable_t* v) { return z_undeclare_queryable(v); } template<> inline void z_drop(z_owned_reply_t* v) { z_reply_drop(v); } @@ -296,7 +285,6 @@ inline void z_null(z_owned_publisher_t& v) { v = z_publisher_null(); } inline void z_null(z_owned_keyexpr_t& v) { v = z_keyexpr_null(); } inline void z_null(z_owned_config_t& v) { v = z_config_null(); } inline void z_null(z_owned_scouting_config_t& v) { v = z_scouting_config_null(); } -inline void z_null(z_owned_pull_subscriber_t& v) { v = z_pull_subscriber_null(); } inline void z_null(z_owned_subscriber_t& v) { v = z_subscriber_null(); } inline void z_null(z_owned_queryable_t& v) { v = z_queryable_null(); } inline void z_null(z_owned_reply_t& v) { v = z_reply_null(); } @@ -316,7 +304,6 @@ inline bool z_check(const z_owned_config_t& v) { return z_config_check(&v); } inline bool z_check(const z_owned_scouting_config_t& v) { return z_scouting_config_check(&v); } inline bool z_check(const z_bytes_t& v) { return z_bytes_check(&v); } inline bool z_check(const z_owned_subscriber_t& v) { return z_subscriber_check(&v); } -inline bool z_check(const z_owned_pull_subscriber_t& v) { return z_pull_subscriber_check(&v); } inline bool z_check(const z_owned_queryable_t& v) { return z_queryable_check(&v); } inline bool z_check(const z_owned_reply_t& v) { return z_reply_check(&v); } inline bool z_check(const z_owned_hello_t& v) { return z_hello_check(&v); } diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index f661ca4cb..a72878b99 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -687,7 +687,6 @@ _OWNED_FUNCTIONS(z_config_t, z_owned_config_t, config) _OWNED_FUNCTIONS(z_scouting_config_t, z_owned_scouting_config_t, scouting_config) _OWNED_FUNCTIONS(z_session_t, z_owned_session_t, session) _OWNED_FUNCTIONS(z_subscriber_t, z_owned_subscriber_t, subscriber) -_OWNED_FUNCTIONS(z_pull_subscriber_t, z_owned_pull_subscriber_t, pull_subscriber) _OWNED_FUNCTIONS(z_publisher_t, z_owned_publisher_t, publisher) _OWNED_FUNCTIONS(z_queryable_t, z_owned_queryable_t, queryable) _OWNED_FUNCTIONS(z_hello_t, z_owned_hello_t, hello) @@ -1190,71 +1189,6 @@ int8_t z_undeclare_subscriber(z_owned_subscriber_t *sub); * */ z_owned_keyexpr_t z_subscriber_keyexpr(z_subscriber_t sub); - -/** - * Constructs the default values for the pull subscriber entity. - * - * Returns: - * Returns the constructed :c:type:`z_pull_subscriber_options_t`. - */ -z_pull_subscriber_options_t z_pull_subscriber_options_default(void); - -/** - * Declares a pull subscriber for the given keyexpr. - * - * Data can be pulled with this subscriber with the help of the - * :c:func:`z_pull` function. Received data is processed by means of callbacks. - * - * Like most ``z_owned_X_t`` types, you may obtain an instance of :c:type:`z_owned_pull_subscriber_t` by loaning it - * using ``z_pull_subscriber_loan(&val)``. The ``z_loan(val)`` macro, available if your compiler supports C11's - * ``_Generic``, is equivalent to writing ``z_pull_subscriber_loan(&val)``. - * - * Like all ``z_owned_X_t``, an instance will be destroyed by any function which takes a mutable pointer to said - * instance, as this implies the instance's inners were moved. To make this fact more obvious when reading your code, - * consider using ``z_move(val)`` instead of ``&val`` as the argument. After a ``z_move``, ``val`` will still exist, but - * will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your ``val`` - * is valid. - * - * To check if ``val`` is still valid, you may use ``z_pull_subscriber_check(&val)`` or ``z_check(val)`` if your - * compiler supports ``_Generic``, which will return ``true`` if ``val`` is valid, or ``false`` otherwise. - * - * Parameters: - * zs: A loaned instance of the the :c:type:`z_session_t` where to declare the subscriber. - * keyexpr: A loaned instance of :c:type:`z_keyexpr_t` to associate with the subscriber. - * callback: A moved instance of :c:type:`z_owned_closure_sample_t` containing the callbacks to be called and the - * context to pass to them. options: The options to apply to the pull subscriber. If ``NULL`` is passed, the default - * options will be applied. - * - * Returns: - * A :c:type:`z_owned_pull_subscriber_t` with either a valid subscriber or a failing subscriber. - * Should the pull subscriber be invalid, ``z_check(val)`` ing the returned value will return ``false``. - */ -z_owned_pull_subscriber_t z_declare_pull_subscriber(z_session_t zs, z_keyexpr_t keyexpr, - z_owned_closure_sample_t *callback, - const z_pull_subscriber_options_t *options); - -/** - * Undeclares the pull subscriber generated by a call to :c:func:`z_declare_pull_subscriber`. - * - * Parameters: - * sub: A moved instance of :c:type:`z_owned_pull_subscriber_t` to undeclare. - * - * Returns: - * Returns ``0`` if the undeclare pull subscriber operation is successful, or a ``negative value`` otherwise. - */ -int8_t z_undeclare_pull_subscriber(z_owned_pull_subscriber_t *sub); - -/** - * Pulls data for :c:type:`z_owned_pull_subscriber_t`. The pulled data will be provided - * by calling the **callback** function provided to the :c:func:`z_declare_pull_subscriber` function. - * - * Parameters: - * sub: A loaned instance of :c:type:`z_pull_subscriber_t` from where to pull the data. - * - * Returns: - * Returns ``0`` if the pull operation is successful, or a ``negative value`` otherwise. - */ -int8_t z_subscriber_pull(const z_pull_subscriber_t sub); #endif /** diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index d7bb29451..6932536e4 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -149,20 +149,6 @@ typedef struct { } z_subscriber_t; _OWNED_TYPE_PTR(_z_subscriber_t, subscriber) -/** - * Represents a Zenoh Pull Subscriber entity. - * - * Operations over :c:type:`z_pull_subscriber_t` must be done using the provided functions: - * - * - :c:func:`z_declare_pull_subscriber` - * - :c:func:`z_undeclare_pull_subscriber` - * - :c:func:`z_subscriber_pull` - */ -typedef struct { - _z_pull_subscriber_t *_val; -} z_pull_subscriber_t; -_OWNED_TYPE_PTR(_z_pull_subscriber_t, pull_subscriber) - /** * Represents a Zenoh Publisher entity. * @@ -233,17 +219,6 @@ typedef struct { z_reliability_t reliability; } z_subscriber_options_t; -/** - * Represents the set of options that can be applied to a pull subscriber, - * upon its declaration via :c:func:`z_declare_pull_subscriber`. - * - * Members: - * z_reliability_t reliability: The subscription reliability. - */ -typedef struct { - z_reliability_t reliability; -} z_pull_subscriber_options_t; - /** * Represents the replies consolidation to apply on replies to a :c:func:`z_get`. * diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index 1d95bac7c..ae8980ceb 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -156,17 +156,6 @@ _z_subscriber_t *_z_declare_subscriber(_z_session_rc_t *zn, _z_keyexpr_t keyexpr * 0 if success, or a negative value identifying the error. */ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub); - -/** - * Pull data for a pull mode :c:type:`_z_subscriber_t`. The pulled data will be provided - * by calling the **callback** function provided to the :c:func:`_z_declare_subscriber` function. - * - * Parameters: - * sub: The :c:type:`_z_subscriber_t` to pull from. - * Returns: - * ``0`` in case of success, ``-1`` in case of failure. - */ -int8_t _z_subscriber_pull(const _z_subscriber_t *sub); #endif #if Z_FEATURE_QUERYABLE == 1 diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index 8d68188b4..822bbf094 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -41,7 +41,6 @@ typedef struct _z_session_t { // Session counters uint16_t _resource_id; uint32_t _entity_id; - _z_zint_t _pull_id; _z_zint_t _query_id; _z_zint_t _interest_id; diff --git a/include/zenoh-pico/net/subscribe.h b/include/zenoh-pico/net/subscribe.h index e2aafe540..bc98a9972 100644 --- a/include/zenoh-pico/net/subscribe.h +++ b/include/zenoh-pico/net/subscribe.h @@ -28,8 +28,6 @@ typedef struct { _z_session_rc_t _zn; } _z_subscriber_t; -typedef _z_subscriber_t _z_pull_subscriber_t; - #if Z_FEATURE_SUBSCRIPTION == 1 /** * Create a default subscription info for a push subscriber. @@ -37,15 +35,7 @@ typedef _z_subscriber_t _z_pull_subscriber_t; * Returns: * A :c:type:`_z_subinfo_t` containing the created subscription info. */ -_z_subinfo_t _z_subinfo_push_default(void); - -/** - * Create a default subscription info for a pull subscriber. - * - * Returns: - * A :c:type:`_z_subinfo_t` containing the created subscription info. - */ -_z_subinfo_t _z_subinfo_pull_default(void); +_z_subinfo_t _z_subinfo_default(void); void _z_subscriber_clear(_z_subscriber_t *sub); void _z_subscriber_free(_z_subscriber_t **sub); diff --git a/include/zenoh-pico/protocol/codec/core.h b/include/zenoh-pico/protocol/codec/core.h index 4919b0e01..13cb2d06a 100644 --- a/include/zenoh-pico/protocol/codec/core.h +++ b/include/zenoh-pico/protocol/codec/core.h @@ -76,10 +76,6 @@ int8_t _z_zbuf_read_exact(_z_zbuf_t *zbf, uint8_t *dest, size_t length); int8_t _z_str_encode(_z_wbuf_t *buf, const char *s); int8_t _z_str_decode(char **str, _z_zbuf_t *buf); -int8_t _z_period_encode(_z_wbuf_t *wbf, const _z_period_t *m); -int8_t _z_period_decode(_z_period_t *p, _z_zbuf_t *zbf); -int8_t _z_period_decode_na(_z_period_t *p, _z_zbuf_t *zbf); - int8_t _z_keyexpr_encode(_z_wbuf_t *buf, _Bool has_suffix, const _z_keyexpr_t *ke); int8_t _z_keyexpr_decode(_z_keyexpr_t *ke, _z_zbuf_t *buf, _Bool has_suffix); diff --git a/include/zenoh-pico/protocol/codec/message.h b/include/zenoh-pico/protocol/codec/message.h index 89080475a..1ad2de4ba 100644 --- a/include/zenoh-pico/protocol/codec/message.h +++ b/include/zenoh-pico/protocol/codec/message.h @@ -18,27 +18,18 @@ #include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/protocol/iobuf.h" -int8_t _z_push_body_encode(_z_wbuf_t *buf, const _z_push_body_t *ts); -int8_t _z_push_body_decode(_z_push_body_t *ts, _z_zbuf_t *buf, uint8_t header); +int8_t _z_push_body_encode(_z_wbuf_t *wbf, const _z_push_body_t *pshb); +int8_t _z_push_body_decode(_z_push_body_t *body, _z_zbuf_t *zbf, uint8_t header); int8_t _z_query_encode(_z_wbuf_t *wbf, const _z_msg_query_t *query); int8_t _z_query_decode(_z_msg_query_t *query, _z_zbuf_t *zbf, uint8_t header); -int8_t _z_pull_encode(_z_wbuf_t *wbf, const _z_msg_pull_t *pull); -int8_t _z_pull_decode(_z_msg_pull_t *pull, _z_zbuf_t *zbf, uint8_t header); - int8_t _z_reply_encode(_z_wbuf_t *wbf, const _z_msg_reply_t *reply); int8_t _z_reply_decode(_z_msg_reply_t *reply, _z_zbuf_t *zbf, uint8_t header); int8_t _z_err_encode(_z_wbuf_t *wbf, const _z_msg_err_t *err); int8_t _z_err_decode(_z_msg_err_t *err, _z_zbuf_t *zbf, uint8_t header); -int8_t _z_ack_encode(_z_wbuf_t *wbf, const _z_msg_ack_t *ack); -int8_t _z_ack_decode(_z_msg_ack_t *ack, _z_zbuf_t *zbf, uint8_t header); - -int8_t _z_push_body_encode(_z_wbuf_t *wbf, const _z_push_body_t *pshb); -int8_t _z_push_body_decode(_z_push_body_t *body, _z_zbuf_t *zbf, uint8_t header); - int8_t _z_put_encode(_z_wbuf_t *wbf, const _z_msg_put_t *put); int8_t _z_put_decode(_z_msg_put_t *put, _z_zbuf_t *zbf, uint8_t header); diff --git a/include/zenoh-pico/protocol/core.h b/include/zenoh-pico/protocol/core.h index 2eaa3e495..113566319 100644 --- a/include/zenoh-pico/protocol/core.h +++ b/include/zenoh-pico/protocol/core.h @@ -272,33 +272,15 @@ typedef struct { _z_zint_t n; } _z_target_complete_body_t; -/** - * The subscription period. - * - * Members: - * unsigned int origin: - * unsigned int period: - * unsigned int duration: - */ -typedef struct { - unsigned int origin; - unsigned int period; - unsigned int duration; -} _z_period_t; - /** * Informations to be passed to :c:func:`_z_declare_subscriber` to configure the created * :c:type:`_z_subscription_rc_t`. * * Members: - * _z_period_t *period: The subscription period. * z_reliability_t reliability: The subscription reliability. - * _z_submode_t mode: The subscription mode. */ typedef struct { - _z_period_t period; z_reliability_t reliability; - z_submode_t mode; } _z_subinfo_t; typedef struct { diff --git a/include/zenoh-pico/protocol/definitions/declarations.h b/include/zenoh-pico/protocol/definitions/declarations.h index 1df9c60f7..4e0bae713 100644 --- a/include/zenoh-pico/protocol/definitions/declarations.h +++ b/include/zenoh-pico/protocol/definitions/declarations.h @@ -34,7 +34,6 @@ typedef struct { _z_keyexpr_t _keyexpr; uint32_t _id; struct { - _Bool _pull_mode; _Bool _reliable; } _ext_subinfo; } _z_decl_subscriber_t; @@ -130,7 +129,7 @@ void _z_decl_fix_mapping(_z_declaration_t* msg, uint16_t mapping); _z_declaration_t _z_make_decl_keyexpr(uint16_t id, _Z_MOVE(_z_keyexpr_t) key); _z_declaration_t _z_make_undecl_keyexpr(uint16_t id); -_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, _Bool reliable, _Bool pull_mode); +_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, _Bool reliable); _z_declaration_t _z_make_undecl_subscriber(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key); _z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint32_t distance, uint8_t complete); diff --git a/include/zenoh-pico/protocol/definitions/message.h b/include/zenoh-pico/protocol/definitions/message.h index 83d8f61ba..f1bfed6b5 100644 --- a/include/zenoh-pico/protocol/definitions/message.h +++ b/include/zenoh-pico/protocol/definitions/message.h @@ -24,25 +24,13 @@ #define _Z_MID_Z_QUERY 0x03 #define _Z_MID_Z_REPLY 0x04 #define _Z_MID_Z_ERR 0x05 -#define _Z_MID_Z_ACK 0x06 -#define _Z_MID_Z_PULL 0x07 -#define _Z_MID_Z_LINK_STATE_LIST 0x10 /* Zenoh message flags */ #define _Z_FLAG_Z_Z 0x80 -#define _Z_FLAG_Z_B 0x40 // 1 << 6 | QueryPayload if B==1 then QueryPayload is present #define _Z_FLAG_Z_D 0x20 // 1 << 5 | Dropping if D==1 then the message can be dropped -#define _Z_FLAG_Z_F \ - 0x20 // 1 << 5 | Final if F==1 then this is the final message (e.g., ReplyContext, Pull) -#define _Z_FLAG_Z_I 0x40 // 1 << 6 | DataInfo if I==1 then DataInfo is present #define _Z_FLAG_Z_K 0x80 // 1 << 7 | ResourceKey if K==1 then keyexpr is string -#define _Z_FLAG_Z_N 0x40 // 1 << 6 | MaxSamples if N==1 then the MaxSamples is indicated -#define _Z_FLAG_Z_P 0x20 // 1 << 7 | Period if P==1 then a period is present -#define _Z_FLAG_Z_Q 0x40 // 1 << 6 | QueryableKind if Q==1 then the queryable kind is present #define _Z_FLAG_Z_R \ 0x20 // 1 << 5 | Reliable if R==1 then it concerns the reliable channel, best-effort otherwise -#define _Z_FLAG_Z_S 0x40 // 1 << 6 | SubMode if S==1 then the declaration SubMode is indicated -#define _Z_FLAG_Z_T 0x20 // 1 << 5 | QueryTarget if T==1 then the query target is present #define _Z_FLAG_Z_X 0x00 // Unused flags are set to zero #define _Z_FRAG_BUFF_BASE_SIZE 128 // Arbitrary base size of the buffer to encode a fragment message header @@ -70,41 +58,6 @@ typedef struct { } _z_msg_err_t; void _z_msg_err_clear(_z_msg_err_t *err); -/// Flags: -/// - T: Timestamp If T==1 then the timestamp if present -/// - X: Reserved -/// - Z: Extension If Z==1 then at least one extension is present -/// -/// 7 6 5 4 3 2 1 0 -/// +-+-+-+-+-+-+-+-+ -/// |Z|X|T| ACK | -/// +-+-+-+---------+ -/// ~ ts: ~ if T==1 -/// +---------------+ -/// ~ [err_exts] ~ if Z==1 -/// +---------------+ -typedef struct { - _z_timestamp_t _timestamp; - _z_source_info_t _ext_source_info; -} _z_msg_ack_t; -#define _Z_FLAG_Z_A_T 0x20 - -/// Flags: -/// - T: Timestamp If T==1 then the timestamp if present -/// - X: Reserved -/// - Z: Extension If Z==1 then at least one extension is present -/// -/// 7 6 5 4 3 2 1 0 -/// +-+-+-+-+-+-+-+-+ -/// |Z|X|X| PULL | -/// +---------------+ -/// ~ [pull_exts] ~ if Z==1 -/// +---------------+ -typedef struct { - _z_source_info_t _ext_source_info; -} _z_msg_pull_t; -static inline void _z_msg_pull_clear(_z_msg_pull_t *pull) { (void)pull; } - typedef struct { _z_timestamp_t _timestamp; _z_source_info_t _source_info; diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index 689102dbe..94d7736c8 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -99,12 +99,15 @@ typedef struct { z_query_target_t _ext_target; uint32_t _ext_budget; uint32_t _ext_timeout_ms; - enum { _Z_REQUEST_QUERY, _Z_REQUEST_PUT, _Z_REQUEST_DEL, _Z_REQUEST_PULL } _tag; + enum { + _Z_REQUEST_QUERY, + _Z_REQUEST_PUT, + _Z_REQUEST_DEL, + } _tag; union { _z_msg_query_t _query; _z_msg_put_t _put; _z_msg_del_t _del; - _z_msg_pull_t _pull; } _body; } _z_n_msg_request_t; typedef struct { @@ -180,16 +183,10 @@ typedef struct { enum { _Z_RESPONSE_BODY_REPLY, _Z_RESPONSE_BODY_ERR, - _Z_RESPONSE_BODY_ACK, - _Z_RESPONSE_BODY_PUT, - _Z_RESPONSE_BODY_DEL, } _tag; union { _z_msg_reply_t _reply; _z_msg_err_t _err; - _z_msg_ack_t _ack; - _z_msg_put_t _put; - _z_msg_del_t _del; } _body; } _z_n_msg_response_t; void _z_n_msg_response_clear(_z_n_msg_response_t *msg); @@ -222,7 +219,6 @@ _Z_ELEM_DEFINE(_z_network_message, _z_network_message_t, _z_noop_size, _z_n_msg_ _Z_VEC_DEFINE(_z_network_message, _z_network_message_t) void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping); -_z_network_message_t _z_msg_make_pull(_z_keyexpr_t key, _z_zint_t pull_id); _z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_bytes_t) parameters, _z_zint_t qid, z_consolidation_mode_t consolidation, _Z_MOVE(_z_value_t) value #if Z_FEATURE_ATTACHMENT == 1 @@ -231,7 +227,6 @@ _z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_byt #endif ); _z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body); -_z_network_message_t _z_n_msg_make_ack(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key); _z_network_message_t _z_n_msg_make_response_final(_z_zint_t rid); _z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration); _z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body); diff --git a/include/zenoh-pico/session/subscription.h b/include/zenoh-pico/session/subscription.h index f13ffe05f..b84091f2c 100644 --- a/include/zenoh-pico/session/subscription.h +++ b/include/zenoh-pico/session/subscription.h @@ -39,9 +39,6 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co ); void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub); void _z_flush_subscriptions(_z_session_t *zn); - -/*------------------ Pull ------------------*/ -_z_zint_t _z_get_pull_id(_z_session_t *zn); #endif #endif /* INCLUDE_ZENOH_PICO_SESSION_SUBSCRIPTION_H */ diff --git a/src/api/api.c b/src/api/api.c index 9a64be720..1369c82b1 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -976,18 +976,10 @@ OWNED_FUNCTIONS_PTR_COMMON(z_subscriber_t, z_owned_subscriber_t, subscriber) OWNED_FUNCTIONS_PTR_CLONE(z_subscriber_t, z_owned_subscriber_t, subscriber, _z_owner_noop_copy) void z_subscriber_drop(z_owned_subscriber_t *val) { z_undeclare_subscriber(val); } -OWNED_FUNCTIONS_PTR_COMMON(z_pull_subscriber_t, z_owned_pull_subscriber_t, pull_subscriber) -OWNED_FUNCTIONS_PTR_CLONE(z_pull_subscriber_t, z_owned_pull_subscriber_t, pull_subscriber, _z_owner_noop_copy) -void z_pull_subscriber_drop(z_owned_pull_subscriber_t *val) { z_undeclare_pull_subscriber(val); } - z_subscriber_options_t z_subscriber_options_default(void) { return (z_subscriber_options_t){.reliability = Z_RELIABILITY_DEFAULT}; } -z_pull_subscriber_options_t z_pull_subscriber_options_default(void) { - return (z_pull_subscriber_options_t){.reliability = Z_RELIABILITY_DEFAULT}; -} - z_owned_subscriber_t z_declare_subscriber(z_session_t zs, z_keyexpr_t keyexpr, z_owned_closure_sample_t *callback, const z_subscriber_options_t *options) { void *ctx = callback->context; @@ -1023,7 +1015,7 @@ z_owned_subscriber_t z_declare_subscriber(z_session_t zs, z_keyexpr_t keyexpr, z } } - _z_subinfo_t subinfo = _z_subinfo_push_default(); + _z_subinfo_t subinfo = _z_subinfo_default(); if (options != NULL) { subinfo.reliability = options->reliability; } @@ -1035,30 +1027,6 @@ z_owned_subscriber_t z_declare_subscriber(z_session_t zs, z_keyexpr_t keyexpr, z return (z_owned_subscriber_t){._value = sub}; } -z_owned_pull_subscriber_t z_declare_pull_subscriber(z_session_t zs, z_keyexpr_t keyexpr, - z_owned_closure_sample_t *callback, - const z_pull_subscriber_options_t *options) { - (void)(options); - - void *ctx = callback->context; - callback->context = NULL; - - z_keyexpr_t key = keyexpr; - _z_resource_t *r = _z_get_resource_by_key(&zs._val.in->val, &keyexpr); - if (r == NULL) { - uint16_t id = _z_declare_resource(&zs._val.in->val, keyexpr); - key = _z_rid_with_suffix(id, NULL); - } - - _z_subinfo_t subinfo = _z_subinfo_pull_default(); - if (options != NULL) { - subinfo.reliability = options->reliability; - } - - return (z_owned_pull_subscriber_t){ - ._value = _z_declare_subscriber(&zs._val, key, subinfo, callback->call, callback->drop, ctx)}; -} - int8_t z_undeclare_subscriber(z_owned_subscriber_t *sub) { int8_t ret = _Z_RES_OK; @@ -1068,17 +1036,6 @@ int8_t z_undeclare_subscriber(z_owned_subscriber_t *sub) { return ret; } -int8_t z_undeclare_pull_subscriber(z_owned_pull_subscriber_t *sub) { - int8_t ret = _Z_RES_OK; - - _z_undeclare_subscriber(sub->_value); - _z_subscriber_free(&sub->_value); - - return ret; -} - -int8_t z_subscriber_pull(const z_pull_subscriber_t sub) { return _z_subscriber_pull(sub._val); } - z_owned_keyexpr_t z_subscriber_keyexpr(z_subscriber_t sub) { z_owned_keyexpr_t ret = z_keyexpr_null(); uint32_t lookup = sub._val->_entity_id; diff --git a/src/net/primitives.c b/src/net/primitives.c index 2aa5b7920..7d459350a 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -212,8 +212,8 @@ _z_subscriber_t *_z_declare_subscriber(_z_session_rc_t *zn, _z_keyexpr_t keyexpr return NULL; } // Build the declare message to send on the wire - _z_declaration_t declaration = _z_make_decl_subscriber( - &keyexpr, s._id, sub_info.reliability == Z_RELIABILITY_RELIABLE, sub_info.mode == Z_SUBMODE_PULL); + _z_declaration_t declaration = + _z_make_decl_subscriber(&keyexpr, s._id, sub_info.reliability == Z_RELIABILITY_RELIABLE); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); if (_z_send_n_msg(&zn->in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { _z_unregister_subscription(&zn->in->val, _Z_RESOURCE_IS_LOCAL, sp_s); @@ -254,24 +254,6 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) { _z_session_rc_drop(&sub->_zn); return _Z_RES_OK; } - -/*------------------ Pull ------------------*/ -int8_t _z_subscriber_pull(const _z_subscriber_t *sub) { - int8_t ret = _Z_RES_OK; - - _z_subscription_rc_t *s = _z_get_subscription_by_id(&sub->_zn.in->val, _Z_RESOURCE_IS_LOCAL, sub->_entity_id); - if (s != NULL) { - _z_zint_t pull_id = _z_get_pull_id(&sub->_zn.in->val); - _z_zenoh_message_t z_msg = _z_msg_make_pull(_z_keyexpr_alias(s->in->val._key), pull_id); - if (_z_send_n_msg(&sub->_zn.in->val, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - ret = _Z_ERR_TRANSPORT_TX_FAILED; - } - } else { - ret = _Z_ERR_ENTITY_UNKNOWN; - } - - return ret; -} #endif #if Z_FEATURE_QUERYABLE == 1 diff --git a/src/net/subscribe.c b/src/net/subscribe.c index 316966897..23c7faddb 100644 --- a/src/net/subscribe.c +++ b/src/net/subscribe.c @@ -14,19 +14,9 @@ #include "zenoh-pico/net/subscribe.h" #if Z_FEATURE_SUBSCRIPTION == 1 -_z_subinfo_t _z_subinfo_push_default(void) { +_z_subinfo_t _z_subinfo_default(void) { _z_subinfo_t si; - si.reliability = Z_RELIABILITY_RELIABLE; - si.mode = Z_SUBMODE_PUSH; - si.period = (_z_period_t){.origin = 0, .period = 0, .duration = 0}; - return si; -} - -_z_subinfo_t _z_subinfo_pull_default(void) { - _z_subinfo_t si; - si.reliability = Z_RELIABILITY_RELIABLE; - si.mode = Z_SUBMODE_PULL; - si.period = (_z_period_t){.origin = 0, .period = 0, .duration = 0}; + si.reliability = Z_RELIABILITY_BEST_EFFORT; return si; } diff --git a/src/protocol/codec/declarations.c b/src/protocol/codec/declarations.c index 2a44e49c9..d9d86d684 100644 --- a/src/protocol/codec/declarations.c +++ b/src/protocol/codec/declarations.c @@ -77,12 +77,11 @@ int8_t _z_decl_commons_encode(_z_wbuf_t *wbf, uint8_t header, _Bool has_extensio } int8_t _z_decl_subscriber_encode(_z_wbuf_t *wbf, const _z_decl_subscriber_t *decl) { uint8_t header = _Z_DECL_SUBSCRIBER_MID; - _Bool has_submode_ext = decl->_ext_subinfo._pull_mode || decl->_ext_subinfo._reliable; + _Bool has_submode_ext = decl->_ext_subinfo._reliable; _Z_RETURN_IF_ERR(_z_decl_commons_encode(wbf, header, has_submode_ext, decl->_id, decl->_keyexpr)); if (has_submode_ext) { _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZINT | 0x01)); - _Z_RETURN_IF_ERR( - _z_uint8_encode(wbf, (decl->_ext_subinfo._pull_mode ? 2 : 0) | (decl->_ext_subinfo._reliable ? 1 : 0))); + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, (decl->_ext_subinfo._reliable ? 1 : 0))); } return _Z_RES_OK; @@ -301,7 +300,6 @@ int8_t _z_decl_subscriber_decode_extensions(_z_msg_ext_t *extension, void *ctx) _z_decl_subscriber_t *decl = (_z_decl_subscriber_t *)ctx; switch (extension->_header) { case _Z_MSG_EXT_ENC_ZINT | 0x01: { - decl->_ext_subinfo._pull_mode = _Z_HAS_FLAG(extension->_body._zint._val, 2); decl->_ext_subinfo._reliable = _Z_HAS_FLAG(extension->_body._zint._val, 1); } break; default: diff --git a/src/protocol/codec/message.c b/src/protocol/codec/message.c index 240ba4dce..7dd176d87 100644 --- a/src/protocol/codec/message.c +++ b/src/protocol/codec/message.c @@ -617,96 +617,6 @@ int8_t _z_err_decode(_z_msg_err_t *err, _z_zbuf_t *zbf, uint8_t header) { return _Z_RES_OK; } -int8_t _z_ack_encode(_z_wbuf_t *wbf, const _z_msg_ack_t *ack) { - int8_t ret = _Z_RES_OK; - uint8_t header = _Z_MID_Z_ACK; - _Bool has_ts = _z_timestamp_check(&ack->_timestamp); - _Bool has_sinfo_ext = _z_id_check(ack->_ext_source_info._id) || ack->_ext_source_info._source_sn != 0 || - ack->_ext_source_info._entity_id != 0; - if (has_ts) { - header |= _Z_FLAG_Z_A_T; - } - if (has_sinfo_ext) { - header |= _Z_FLAG_Z_Z; - } - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); - if (has_ts) { - _Z_RETURN_IF_ERR(_z_timestamp_encode(wbf, &ack->_timestamp)); - } - if (has_sinfo_ext) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x01)); - _Z_RETURN_IF_ERR(_z_source_info_encode_ext(wbf, &ack->_ext_source_info)); - } - return ret; -} -int8_t _z_ack_decode_extension(_z_msg_ext_t *extension, void *ctx) { - int8_t ret = _Z_RES_OK; - _z_msg_ack_t *ack = (_z_msg_ack_t *)ctx; - switch (_Z_EXT_FULL_ID(extension->_header)) { - case _Z_MSG_EXT_ENC_ZBUF | 0x01: { - _z_zbuf_t zbf = _z_zbytes_as_zbuf(extension->_body._zbuf._val); - ret = _z_source_info_decode(&ack->_ext_source_info, &zbf); - break; - } - default: - if (_Z_HAS_FLAG(extension->_header, _Z_MSG_EXT_FLAG_M)) { - ret = _z_msg_ext_unknown_error(extension, 0x0b); - } - } - return ret; -} -int8_t _z_ack_decode(_z_msg_ack_t *ack, _z_zbuf_t *zbf, uint8_t header) { - int8_t ret = _Z_RES_OK; - *ack = (_z_msg_ack_t){0}; - if (_Z_HAS_FLAG(header, _Z_FLAG_Z_A_T)) { - _Z_RETURN_IF_ERR(_z_timestamp_decode(&ack->_timestamp, zbf)); - } - if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { - ret = _z_msg_ext_decode_iter(zbf, _z_ack_decode_extension, ack); - } - return ret; -} - -int8_t _z_pull_encode(_z_wbuf_t *wbf, const _z_msg_pull_t *pull) { - int8_t ret = _Z_RES_OK; - uint8_t header = _Z_MID_Z_PULL; - _Bool has_info_ext = _z_id_check(pull->_ext_source_info._id) || pull->_ext_source_info._source_sn != 0 || - pull->_ext_source_info._entity_id != 0; - if (has_info_ext) { - header |= _Z_FLAG_Z_Z; - } - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); - if (has_info_ext) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x01)); - _Z_RETURN_IF_ERR(_z_source_info_encode_ext(wbf, &pull->_ext_source_info)); - } - return ret; -} -int8_t _z_pull_decode_extension(_z_msg_ext_t *extension, void *ctx) { - int8_t ret = _Z_RES_OK; - _z_msg_pull_t *pull = (_z_msg_pull_t *)ctx; - switch (_Z_EXT_FULL_ID(extension->_header)) { - case _Z_MSG_EXT_ENC_ZBUF | 0x01: { - _z_zbuf_t zbf = _z_zbytes_as_zbuf(extension->_body._zbuf._val); - ret = _z_source_info_decode(&pull->_ext_source_info, &zbf); - break; - } - default: - if (_Z_HAS_FLAG(extension->_header, _Z_MSG_EXT_FLAG_M)) { - ret = _z_msg_ext_unknown_error(extension, 0x0c); - } - } - return ret; -} -int8_t _z_pull_decode(_z_msg_pull_t *pull, _z_zbuf_t *zbf, uint8_t header) { - *pull = (_z_msg_pull_t){0}; - int8_t ret = _Z_RES_OK; - if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { - ret = _z_msg_ext_decode_iter(zbf, _z_pull_decode_extension, pull); - } - return ret; -} - /*=============================*/ /* Scouting Messages */ /*=============================*/ diff --git a/src/protocol/codec/network.c b/src/protocol/codec/network.c index e957cb8d9..5074b8a2e 100644 --- a/src/protocol/codec/network.c +++ b/src/protocol/codec/network.c @@ -162,9 +162,6 @@ int8_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg) { case _Z_REQUEST_DEL: { _Z_RETURN_IF_ERR(_z_del_encode(wbf, &msg->_body._del)); } break; - case _Z_REQUEST_PULL: { - _Z_RETURN_IF_ERR(_z_pull_encode(wbf, &msg->_body._pull)); - } break; } return ret; } @@ -233,10 +230,6 @@ int8_t _z_request_decode(_z_n_msg_request_t *msg, _z_zbuf_t *zbf, const uint8_t msg->_tag = _Z_REQUEST_DEL; _Z_RETURN_IF_ERR(_z_del_decode(&msg->_body._del, zbf, zheader)); } break; - case _Z_MID_Z_PULL: { - msg->_tag = _Z_REQUEST_PULL; - _Z_RETURN_IF_ERR(_z_pull_decode(&msg->_body._pull, zbf, zheader)); - } break; default: return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; } @@ -305,18 +298,6 @@ int8_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) { _Z_RETURN_IF_ERR(_z_err_encode(wbf, &msg->_body._err)); break; } - case _Z_RESPONSE_BODY_ACK: { - _Z_RETURN_IF_ERR(_z_ack_encode(wbf, &msg->_body._ack)); - break; - } - case _Z_RESPONSE_BODY_PUT: { - _Z_RETURN_IF_ERR(_z_put_encode(wbf, &msg->_body._put)); - break; - } - case _Z_RESPONSE_BODY_DEL: { - _Z_RETURN_IF_ERR(_z_del_encode(wbf, &msg->_body._del)); - break; - } } return ret; @@ -379,21 +360,6 @@ int8_t _z_response_decode(_z_n_msg_response_t *msg, _z_zbuf_t *zbf, uint8_t head ret = _z_err_decode(&msg->_body._err, zbf, inner_header); break; } - case _Z_MID_Z_ACK: { - msg->_tag = _Z_RESPONSE_BODY_ACK; - ret = _z_ack_decode(&msg->_body._ack, zbf, inner_header); - break; - } - case _Z_MID_Z_PUT: { - msg->_tag = _Z_RESPONSE_BODY_PUT; - ret = _z_put_decode(&msg->_body._put, zbf, inner_header); - break; - } - case _Z_MID_Z_DEL: { - msg->_tag = _Z_RESPONSE_BODY_DEL; - ret = _z_del_decode(&msg->_body._del, zbf, inner_header); - break; - } default: { _Z_ERROR("Unknown N_MID: %d", _Z_MID(inner_header)); ret = _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; diff --git a/src/protocol/definitions/declarations.c b/src/protocol/definitions/declarations.c index 56f004a18..709899619 100644 --- a/src/protocol/definitions/declarations.c +++ b/src/protocol/definitions/declarations.c @@ -69,12 +69,11 @@ _z_declaration_t _z_make_decl_keyexpr(uint16_t id, _Z_MOVE(_z_keyexpr_t) key) { _z_declaration_t _z_make_undecl_keyexpr(uint16_t id) { return (_z_declaration_t){._tag = _Z_UNDECL_KEXPR, ._body = {._undecl_kexpr = {._id = id}}}; } -_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, _Bool reliable, _Bool pull_mode) { +_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, _Bool reliable) { return (_z_declaration_t){ ._tag = _Z_DECL_SUBSCRIBER, - ._body = {._decl_subscriber = {._id = id, - ._keyexpr = _z_keyexpr_steal(key), - ._ext_subinfo = {._pull_mode = pull_mode, ._reliable = reliable}}}}; + ._body = {._decl_subscriber = { + ._id = id, ._keyexpr = _z_keyexpr_steal(key), ._ext_subinfo = {._reliable = reliable}}}}; } _z_declaration_t _z_make_undecl_subscriber(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) { diff --git a/src/protocol/definitions/network.c b/src/protocol/definitions/network.c index 5f26ecf58..9836113cb 100644 --- a/src/protocol/definitions/network.c +++ b/src/protocol/definitions/network.c @@ -56,9 +56,6 @@ void _z_n_msg_request_clear(_z_n_msg_request_t *msg) { case _Z_REQUEST_DEL: { _z_msg_del_clear(&msg->_body._del); } break; - case _Z_REQUEST_PULL: { - _z_msg_pull_clear(&msg->_body._pull); - } break; } } @@ -100,16 +97,6 @@ void _z_n_msg_response_clear(_z_n_msg_response_t *msg) { _z_msg_err_clear(&msg->_body._err); break; } - case _Z_RESPONSE_BODY_ACK: { - break; - } - case _Z_RESPONSE_BODY_PUT: { - _z_msg_put_clear(&msg->_body._put); - break; - } - case _Z_RESPONSE_BODY_DEL: { - break; - } } } @@ -143,30 +130,7 @@ void _z_n_msg_free(_z_network_message_t **msg) { *msg = NULL; } } -_z_network_message_t _z_msg_make_pull(_z_keyexpr_t key, _z_zint_t pull_id) { - _z_network_message_t ret = { - ._tag = _Z_N_REQUEST, - ._body = - { - ._request = - { - ._rid = pull_id, - ._key = key, - ._tag = _Z_REQUEST_PULL, - ._body = - { - ._pull = {._ext_source_info = _z_source_info_null()}, - }, - ._ext_budget = 0, - ._ext_qos = _Z_N_QOS_DEFAULT, - ._ext_target = Z_QUERY_TARGET_BEST_MATCHING, - ._ext_timestamp = _z_timestamp_null(), - ._ext_timeout_ms = 0, - }, - }, - }; - return ret; -} + _z_zenoh_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_bytes_t) parameters, _z_zint_t qid, z_consolidation_mode_t consolidation, _Z_MOVE(_z_value_t) value #if Z_FEATURE_ATTACHMENT == 1 @@ -244,18 +208,7 @@ _z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) ke }; } -_z_network_message_t _z_n_msg_make_ack(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key) { - return (_z_network_message_t){ - ._tag = _Z_N_RESPONSE, - ._body._response = - { - ._tag = _Z_RESPONSE_BODY_ACK, - ._request_id = rid, - ._key = _z_keyexpr_steal(key), - ._body._ack = {._timestamp = _z_timestamp_null(), ._ext_source_info = _z_source_info_null()}, - }, - }; -} + void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping) { switch (msg->_tag) { case _Z_N_DECLARE: { diff --git a/src/session/rx.c b/src/session/rx.c index fae8d2a36..451ed1551 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -115,8 +115,6 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint ); #endif if (ret == _Z_RES_OK) { - _z_network_message_t ack = _z_n_msg_make_ack(req._rid, &req._key); - ret = _z_send_n_msg(zn, &ack, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); _z_network_message_t final = _z_n_msg_make_response_final(req._rid); ret |= _z_send_n_msg(zn, &final, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); } @@ -133,15 +131,10 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint ); #endif if (ret == _Z_RES_OK) { - _z_network_message_t ack = _z_n_msg_make_ack(req._rid, &req._key); - ret = _z_send_n_msg(zn, &ack, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); _z_network_message_t final = _z_n_msg_make_response_final(req._rid); ret |= _z_send_n_msg(zn, &final, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); } } break; - case _Z_REQUEST_PULL: { - // @TODO: define behaviour - } break; } } break; case _Z_N_RESPONSE: { @@ -159,36 +152,6 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint _Z_ERROR("Received Err for query %zu: message=%.*s", response._request_id, (int)payload.len, payload.start); } break; - case _Z_RESPONSE_BODY_ACK: { - // @TODO: implement ACKs for puts/dels - } break; - case _Z_RESPONSE_BODY_PUT: { - _z_msg_put_t put = response._body._put; -#if Z_FEATURE_SUBSCRIPTION == 1 -#if Z_FEATURE_ATTACHMENT == 1 - z_attachment_t att = _z_encoded_as_attachment(&put._attachment); -#endif - ret = _z_trigger_subscriptions(zn, response._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT, - put._commons._timestamp -#if Z_FEATURE_ATTACHMENT == 1 - , - att -#endif - ); -#endif - } break; - case _Z_RESPONSE_BODY_DEL: { - _z_msg_del_t del = response._body._del; -#if Z_FEATURE_SUBSCRIPTION == 1 - ret = _z_trigger_subscriptions(zn, response._key, _z_bytes_empty(), z_encoding_default(), - Z_SAMPLE_KIND_DELETE, del._commons._timestamp -#if Z_FEATURE_ATTACHMENT == 1 - , - z_attachment_null() -#endif - ); -#endif - } break; } } break; case _Z_N_RESPONSE_FINAL: { diff --git a/src/session/subscription.c b/src/session/subscription.c index 68e0cb300..bbe38d100 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -37,9 +37,6 @@ void _z_subscription_clear(_z_subscription_t *sub) { _z_keyexpr_clear(&sub->_key); } -/*------------------ Pull ------------------*/ -_z_zint_t _z_get_pull_id(_z_session_t *zn) { return zn->_pull_id++; } - _z_subscription_rc_t *__z_get_subscription_by_id(_z_subscription_rc_list_t *subs, const _z_zint_t id) { _z_subscription_rc_t *ret = NULL; diff --git a/src/session/utils.c b/src/session/utils.c index 0791f4e47..d5d7fe1d7 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -57,7 +57,6 @@ int8_t _z_session_init(_z_session_t *zn, _z_id_t *zid) { zn->_entity_id = 1; zn->_resource_id = 1; zn->_query_id = 1; - zn->_pull_id = 1; // Initialize the data structs zn->_local_resources = NULL; diff --git a/tests/z_api_alignment_test.c b/tests/z_api_alignment_test.c index f95fb38e3..84a3a53df 100644 --- a/tests/z_api_alignment_test.c +++ b/tests/z_api_alignment_test.c @@ -308,6 +308,7 @@ int main(int argc, char **argv) { printf("Session delete..."); z_delete_options_t _ret_delete_opt = z_delete_options_default(); + _ret_delete_opt.congestion_control = Z_CONGESTION_CONTROL_BLOCK; _ret_int8 = z_delete(z_loan(s1), z_loan(_ret_expr), &_ret_delete_opt); assert_eq(_ret_int8, 0); printf("Ok\n"); @@ -322,17 +323,6 @@ int main(int argc, char **argv) { assert(!z_check(_ret_expr)); printf("Ok\n"); - _ret_int8 = z_undeclare_subscriber(z_move(_ret_sub)); - assert_eq(_ret_int8, 0); - - printf("Declaring Pull Subscriber..."); - z_owned_closure_sample_t _ret_closure_sample2 = z_closure(data_handler, NULL, &ls1); - z_pull_subscriber_options_t _ret_psub_opt = z_pull_subscriber_options_default(); - z_owned_pull_subscriber_t _ret_psub = - z_declare_pull_subscriber(z_loan(s2), z_keyexpr(keyexpr_str), z_move(_ret_closure_sample2), &_ret_psub_opt); - assert(z_check(_ret_psub)); - printf("Ok\n"); - printf("Declaring Publisher..."); z_publisher_options_t _ret_pub_opt = z_publisher_options_default(); _ret_pub_opt.congestion_control = Z_CONGESTION_CONTROL_BLOCK; @@ -348,13 +338,6 @@ int main(int argc, char **argv) { assert_eq(_ret_int8, 0); printf("Ok\n"); - sleep(SLEEP); - - printf("Pull Subscriber Pulling data..."); - _ret_int8 = z_subscriber_pull(z_loan(_ret_psub)); - assert_eq(_ret_int8, 0); - printf("Ok\n"); - sleep(SLEEP); assert_eq(datas, 3); @@ -364,13 +347,6 @@ int main(int argc, char **argv) { assert_eq(_ret_int8, 0); printf("Ok\n"); - sleep(SLEEP); - - printf("Pull Subscriber Pulling data..."); - _ret_int8 = z_subscriber_pull(z_loan(_ret_psub)); - assert_eq(_ret_int8, 0); - printf("Ok\n"); - sleep(SLEEP); assert_eq(datas, 4); @@ -380,9 +356,12 @@ int main(int argc, char **argv) { assert(!z_check(_ret_pub)); printf("Ok\n"); - printf("Undeclaring Pull Subscriber..."); - _ret_int8 = z_undeclare_pull_subscriber(z_move(_ret_psub)); + sleep(SLEEP); + + printf("Undeclaring Subscriber..."); + _ret_int8 = z_undeclare_subscriber(z_move(_ret_sub)); assert_eq(_ret_int8, 0); + assert(!z_check(_ret_sub)); printf("Ok\n"); sleep(SLEEP); diff --git a/tests/z_api_null_drop_test.c b/tests/z_api_null_drop_test.c index a3c0f74cf..51d65328d 100644 --- a/tests/z_api_null_drop_test.c +++ b/tests/z_api_null_drop_test.c @@ -81,15 +81,10 @@ int main(void) { assert(!z_check(publisher_null_2)); #endif #if Z_FEATURE_SUBSCRIPTION == 1 - z_owned_pull_subscriber_t pull_subscriber_null_1 = z_pull_subscriber_null(); z_owned_subscriber_t subscriber_null_1 = z_subscriber_null(); - assert(!z_check(pull_subscriber_null_1)); assert(!z_check(subscriber_null_1)); - z_owned_pull_subscriber_t pull_subscriber_null_2; z_owned_subscriber_t subscriber_null_2; - z_null(&pull_subscriber_null_2); z_null(&subscriber_null_2); - assert(!z_check(pull_subscriber_null_2)); assert(!z_check(subscriber_null_2)); #endif #if Z_FEATURE_QUERYABLE == 1 @@ -150,9 +145,7 @@ int main(void) { z_drop(z_move(publisher_null_2)); #endif #if Z_FEATURE_SUBSCRIPTION == 1 - z_drop(z_move(pull_subscriber_null_1)); z_drop(z_move(subscriber_null_1)); - z_drop(z_move(pull_subscriber_null_2)); z_drop(z_move(subscriber_null_2)); #endif #if Z_FEATURE_QUERYABLE == 1 diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index 0de0b7320..23e1e5f8d 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -600,37 +600,15 @@ void timestamp_field(void) { /*------------------ SubInfo field ------------------*/ _z_subinfo_t gen_subinfo(void) { _z_subinfo_t sm; - sm.mode = gen_bool() ? Z_SUBMODE_PUSH : Z_SUBMODE_PULL; sm.reliability = gen_bool() ? Z_RELIABILITY_RELIABLE : Z_RELIABILITY_BEST_EFFORT; - if (gen_bool()) { - sm.period.origin = gen_uint(); - sm.period.period = gen_uint(); - sm.period.duration = gen_uint(); - } else { - sm.period.origin = 0; - sm.period.period = 0; - sm.period.duration = 0; - } return sm; } void assert_eq_subinfo(_z_subinfo_t *left, _z_subinfo_t *right) { printf("SubInfo -> "); - printf("Mode (%u:%u), ", left->mode, right->mode); - assert(left->mode == right->mode); - printf("Reliable (%u:%u), ", left->reliability, right->reliability); assert(left->reliability == right->reliability); - - printf("Period ("); - printf("<%u:%u,%u>", left->period.origin, left->period.period, left->period.duration); - printf(":"); - printf("<%u:%u,%u>", right->period.origin, right->period.period, right->period.duration); - printf(")"); - assert(left->period.origin == right->period.origin); - assert(left->period.period == right->period.period); - assert(left->period.duration == right->period.duration); } /*------------------ ResKey field ------------------*/ @@ -745,15 +723,13 @@ _z_decl_subscriber_t gen_subscriber_declaration(void) { _z_subinfo_t subinfo = gen_subinfo(); _z_decl_subscriber_t e_sd = {._keyexpr = gen_keyexpr(), ._id = (uint32_t)gen_uint64(), - ._ext_subinfo = {._pull_mode = subinfo.mode == Z_SUBMODE_PULL, - ._reliable = subinfo.reliability == Z_RELIABILITY_RELIABLE}}; + ._ext_subinfo = {._reliable = subinfo.reliability == Z_RELIABILITY_RELIABLE}}; return e_sd; } void assert_eq_subscriber_declaration(const _z_decl_subscriber_t *left, const _z_decl_subscriber_t *right) { assert_eq_keyexpr(&left->_keyexpr, &right->_keyexpr); assert(left->_id == right->_id); - assert(left->_ext_subinfo._pull_mode == right->_ext_subinfo._pull_mode); assert(left->_ext_subinfo._reliable == right->_ext_subinfo._reliable); } @@ -1131,33 +1107,6 @@ void push_body_message(void) { _z_wbuf_clear(&wbf); } -/*------------------ Pull message ------------------*/ -_z_msg_pull_t gen_pull_message(void) { return (_z_msg_pull_t){._ext_source_info = _z_source_info_null()}; } - -void assert_eq_pull_message(_z_msg_pull_t *left, _z_msg_pull_t *right) { - assert_eq_source_info(&left->_ext_source_info, &right->_ext_source_info); -} - -void pull_message(void) { - printf("\n>> Pull message\n"); - _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); - _z_msg_pull_t e_pull_msg = gen_pull_message(); - - assert(_z_pull_encode(&wbf, &e_pull_msg) == _Z_RES_OK); - - _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); - uint8_t header = _z_zbuf_read(&zbf); - - _z_msg_pull_t d_pull_msg; - assert(_z_pull_decode(&d_pull_msg, &zbf, header) == _Z_RES_OK); - assert_eq_pull_message(&e_pull_msg, &d_pull_msg); - - _z_msg_pull_clear(&e_pull_msg); - _z_msg_pull_clear(&d_pull_msg); - _z_zbuf_clear(&zbf); - _z_wbuf_clear(&wbf); -} - _z_msg_query_t gen_query(void) { return (_z_msg_query_t){ ._consolidation = (gen_uint8() % 4) - 1, @@ -1222,32 +1171,6 @@ void err_message(void) { _z_wbuf_clear(&wbf); } -_z_msg_ack_t gen_ack(void) { - return (_z_msg_ack_t){ - ._ext_source_info = gen_bool() ? gen_source_info() : _z_source_info_null(), - ._timestamp = gen_timestamp(), - }; -} - -void assert_eq_ack(const _z_msg_ack_t *left, const _z_msg_ack_t *right) { - assert_eq_timestamp(&left->_timestamp, &right->_timestamp); - assert_eq_source_info(&left->_ext_source_info, &right->_ext_source_info); -} - -void ack_message(void) { - printf("\n>> Ack message\n"); - _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); - _z_msg_ack_t expected = gen_ack(); - assert(_z_ack_encode(&wbf, &expected) == _Z_RES_OK); - _z_msg_ack_t decoded; - _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); - uint8_t header = _z_zbuf_read(&zbf); - assert(_Z_RES_OK == _z_ack_decode(&decoded, &zbf, header)); - assert_eq_ack(&expected, &decoded); - _z_zbuf_clear(&zbf); - _z_wbuf_clear(&wbf); -} - _z_msg_reply_t gen_reply(void) { return (_z_msg_reply_t){ ._consolidation = (gen_uint8() % 4) - 1, @@ -1318,16 +1241,12 @@ _z_n_msg_request_t gen_request(void) { ._ext_budget = gen_bool() ? (uint32_t)gen_uint64() : 0, ._ext_timeout_ms = gen_bool() ? (uint32_t)gen_uint64() : 0, }; - switch (gen_uint8() % 4) { + switch (gen_uint8() % 2) { case 0: { request._tag = _Z_REQUEST_QUERY; request._body._query = gen_query(); } break; - case 1: { - request._tag = _Z_REQUEST_PULL; - request._body._pull = - (_z_msg_pull_t){._ext_source_info = gen_bool() ? gen_source_info() : _z_source_info_null()}; - } break; + case 1: default: { _z_push_body_t body = gen_push_body(); if (body._is_put) { @@ -1363,9 +1282,6 @@ void assert_eq_request(const _z_n_msg_request_t *left, const _z_n_msg_request_t assert_eq_push_body(&(_z_push_body_t){._is_put = false, ._body._del = left->_body._del}, &(_z_push_body_t){._is_put = false, ._body._del = right->_body._del}); } break; - case _Z_REQUEST_PULL: { - assert_eq_source_info(&left->_body._pull._ext_source_info, &right->_body._pull._ext_source_info); - } break; } } @@ -1394,29 +1310,16 @@ _z_n_msg_response_t gen_response(void) { ._ext_timestamp = gen_bool() ? gen_timestamp() : _z_timestamp_null(), ._ext_responder = {._eid = gen_uint16(), ._zid = gen_zid()}, }; - switch (gen_uint() % 5) { + switch (gen_uint() % 2) { case 0: { - ret._tag = _Z_RESPONSE_BODY_ACK; - ret._body._ack = gen_ack(); - } break; - case 1: { ret._tag = _Z_RESPONSE_BODY_ERR; ret._body._err = gen_err(); } break; - case 2: { + case 1: + default: { ret._tag = _Z_RESPONSE_BODY_REPLY; ret._body._reply = gen_reply(); } break; - default: { - _z_push_body_t body = gen_push_body(); - if (body._is_put) { - ret._tag = _Z_RESPONSE_BODY_PUT; - ret._body._put = body._body._put; - } else { - ret._tag = _Z_RESPONSE_BODY_DEL; - ret._body._del = body._body._del; - } - } break; } return ret; } @@ -1435,17 +1338,7 @@ void assert_eq_response(const _z_n_msg_response_t *left, const _z_n_msg_response } break; case _Z_RESPONSE_BODY_ERR: { assert_eq_err(&left->_body._err, &right->_body._err); - } break; - case _Z_RESPONSE_BODY_ACK: { - assert_eq_ack(&left->_body._ack, &right->_body._ack); - } break; - case _Z_RESPONSE_BODY_PUT: { - assert_eq_push_body(&(_z_push_body_t){._is_put = true, ._body._put = left->_body._put}, - &(_z_push_body_t){._is_put = true, ._body._put = right->_body._put}); - } break; - case _Z_RESPONSE_BODY_DEL: { - assert_eq_push_body(&(_z_push_body_t){._is_put = false, ._body._del = left->_body._del}, - &(_z_push_body_t){._is_put = false, ._body._del = right->_body._del}); + } break; } } @@ -1877,10 +1770,8 @@ int main(void) { // Zenoh messages declare_message(); push_body_message(); - pull_message(); query_message(); err_message(); - ack_message(); reply_message(); // Network messages