Skip to content

Commit

Permalink
Add API and example for matching subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jan 8, 2025
1 parent 2514041 commit 0d7400c
Show file tree
Hide file tree
Showing 10 changed files with 515 additions and 194 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ set(Z_FEATURE_TCP_NODELAY 1 CACHE STRING "Toggle TCP_NODELAY")
set(Z_FEATURE_LOCAL_SUBSCRIBER 0 CACHE STRING "Toggle local subscriptions")
set(Z_FEATURE_PUBLISHER_SESSION_CHECK 1 CACHE STRING "Toggle publisher session check")
set(Z_FEATURE_BATCHING 1 CACHE STRING "Toggle batching")
set(Z_FEATURE_MATCHING 1 CACHE STRING "Toggle matching feature")
set(Z_FEATURE_RX_CACHE 0 CACHE STRING "Toggle RX_CACHE")
set(Z_FEATURE_AUTO_RECONNECT 1 CACHE STRING "Toggle automatic reconnection")

Expand Down
31 changes: 28 additions & 3 deletions examples/unix/c11/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,19 @@
#include <unistd.h>
#include <zenoh-pico.h>

#include "zenoh-pico/system/platform.h"

#if Z_FEATURE_PUBLICATION == 1

#if defined(Z_FEATURE_MATCHING)
void matching_status_handler(const z_matching_status_t *matching_status, void *arg) {
(void)arg;
if (matching_status->matching) {
printf("Publisher has matching subscribers.\n");
} else {
printf("Publisher has NO MORE matching subscribers.\n");
}
}
#endif

int main(int argc, char **argv) {
const char *keyexpr = "demo/example/zenoh-pico-pub";
char *const default_value = "Pub from Pico!";
Expand All @@ -31,9 +41,10 @@ int main(int argc, char **argv) {
char *clocator = NULL;
char *llocator = NULL;
int n = 2147483647; // max int value by default
bool add_matching_listener = false;

int opt;
while ((opt = getopt(argc, argv, "k:v:e:m:l:n:")) != -1) {
while ((opt = getopt(argc, argv, "k:v:e:m:l:n:a")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
Expand All @@ -53,6 +64,9 @@ int main(int argc, char **argv) {
case 'n':
n = atoi(optarg);
break;
case 'a':
add_matching_listener = true;
break;
case '?':
if (optopt == 'k' || optopt == 'v' || optopt == 'e' || optopt == 'm' || optopt == 'l' ||
optopt == 'n') {
Expand Down Expand Up @@ -104,6 +118,17 @@ int main(int argc, char **argv) {
return -1;
}

if (add_matching_listener) {
#if defined(Z_FEATURE_MATCHING)
z_owned_closure_matching_status_t callback;
z_closure(&callback, matching_status_handler, NULL, NULL);
z_publisher_declare_background_matching_listener(z_loan(pub), z_move(callback));
#else
printf("ERROR: Zenoh pico was compiled without Z_FEATURE_MATCHING but this example requires it.\n");
return -2;
#endif
}

// Publish data
printf("Press CTRL-C to quit...\n");
char buf[256];
Expand Down
436 changes: 247 additions & 189 deletions include/zenoh-pico/api/macros.h

Large diffs are not rendered by default.

61 changes: 60 additions & 1 deletion include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1111,13 +1111,24 @@ z_result_t z_closure_zid(z_owned_closure_zid_t *closure, z_closure_zid_callback_
*/
void z_closure_zid_call(const z_loaned_closure_zid_t *closure, const z_id_t *id);

/**
* Calls a matching status closure.
*
* Parameters:
* closure: Pointer to the :c:type:`z_loaned_closure_matching_status_t` to call.
* status: Pointer to the :c:type:`z_matching_status_t` to pass to the closure.
*/
void z_closure_matching_status_call(const z_loaned_closure_matching_status_t *closure,
const z_matching_status_t *status);

/**************** Loans ****************/
_Z_OWNED_FUNCTIONS_DEF(string)
_Z_OWNED_FUNCTIONS_DEF(keyexpr)
_Z_OWNED_FUNCTIONS_DEF(config)
_Z_OWNED_FUNCTIONS_NO_COPY_DEF(session)
_Z_OWNED_FUNCTIONS_NO_COPY_DEF(subscriber)
_Z_OWNED_FUNCTIONS_NO_COPY_DEF(publisher)
_Z_OWNED_FUNCTIONS_NO_COPY_DEF(matching_listener)
_Z_OWNED_FUNCTIONS_NO_COPY_DEF(queryable)
_Z_OWNED_FUNCTIONS_DEF(hello)
_Z_OWNED_FUNCTIONS_DEF(reply)
Expand All @@ -1135,6 +1146,7 @@ _Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_query)
_Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_reply)
_Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_hello)
_Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_zid)
_Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_matching_status)

_Z_VIEW_FUNCTIONS_DEF(keyexpr)
_Z_VIEW_FUNCTIONS_DEF(string)
Expand Down Expand Up @@ -1661,7 +1673,54 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher
* The keyexpr wrapped as a :c:type:`z_loaned_keyexpr_t`.
*/
const z_loaned_keyexpr_t *z_publisher_keyexpr(const z_loaned_publisher_t *publisher);
#endif

#if Z_FEATURE_MATCHING == 1
/**
* Declares a matching listener, registering a callback for notifying subscribers matching with a given publisher.
* The callback will be run in the background until the corresponding publisher is dropped.
*
* Parameters:
* publisher: A publisher to associate with matching listener.
* callback: A closure that will be called every time the matching status of the publisher changes (If last subscriber
* disconnects or when the first subscriber connects).
*
* Return:
* ``0`` if execution was successful, ``negative value`` otherwise.
*
* .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
*/
z_result_t z_publisher_declare_background_matching_listener(const z_loaned_publisher_t *publisher,
z_moved_closure_matching_status_t *callback);
/**
* Constructs matching listener, registering a callback for notifying subscribers matching with a given publisher.
*
* Parameters:
* publisher: A publisher to associate with matching listener.
* matching_listener: An uninitialized memory location where matching listener will be constructed. The matching
* listener's callback will be automatically dropped when the publisher is dropped. callback: A closure that will be
* called every time the matching status of the publisher changes (If last subscriber disconnects or when the first
* subscriber connects).
*
* Return:
* ``0`` if execution was successful, ``negative value`` otherwise.
*
* .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
*/
z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *publisher,
z_owned_matching_listener_t *matching_listener,
z_moved_closure_matching_status_t *callback);
/**
* Gets publisher matching status - i.e. if there are any subscribers matching its key expression.
*
* Return:
* ``0`` if execution was successful, ``negative value`` otherwise.
*
* .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
*/
z_result_t z_publisher_get_matching_status(const z_loaned_publisher_t *publisher, z_matching_status_t *matching_status);
#endif // Z_FEATURE_MATCHING == 1

#endif // Z_FEATURE_PUBLICATION == 1

#if Z_FEATURE_QUERY == 1
/**
Expand Down
31 changes: 31 additions & 0 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "zenoh-pico/collections/slice.h"
#include "zenoh-pico/collections/string.h"
#include "zenoh-pico/net/encoding.h"
#include "zenoh-pico/net/matching.h"
#include "zenoh-pico/net/publish.h"
#include "zenoh-pico/net/query.h"
#include "zenoh-pico/net/reply.h"
Expand Down Expand Up @@ -110,6 +111,11 @@ _Z_OWNED_TYPE_VALUE(_z_subscriber_t, subscriber)
*/
_Z_OWNED_TYPE_VALUE(_z_publisher_t, publisher)

/**
* Represents a Zenoh Matching listener entity.
*/
_Z_OWNED_TYPE_VALUE(_z_matching_listener_t, matching_listener)

/**
* Represents a Zenoh Queryable entity.
*/
Expand All @@ -130,6 +136,16 @@ _Z_OWNED_TYPE_VALUE(_z_encoding_t, encoding)
*/
_Z_OWNED_TYPE_VALUE(_z_value_t, reply_err)

#if defined(Z_FEATURE_UNSTABLE_API)
/**
* A struct that indicates if there exist Subscribers matching the Publisher's key expression or Queryables matching
* Querier's key expression and target.
*/
typedef struct {
bool matching; // true if there exist matching Zenoh entities, false otherwise.
} z_matching_status_t;
#endif

/**
* Represents the configuration used to configure a subscriber upon declaration :c:func:`z_declare_subscriber`.
*/
Expand Down Expand Up @@ -476,6 +492,21 @@ typedef struct {
*/
_Z_OWNED_TYPE_VALUE(_z_closure_zid_t, closure_zid)

#if defined(Z_FEATURE_UNSTABLE_API)
typedef void (*z_closure_matching_status_callback_t)(const z_matching_status_t *status, void *arg);

typedef struct {
void *context;
z_closure_matching_status_callback_t call;
z_closure_drop_callback_t drop;
} _z_closure_matching_status_t;

/**
* Represents the matching status callback closure.
*/
_Z_OWNED_TYPE_VALUE(_z_closure_matching_status_t, closure_matching_status)
#endif

#ifdef __cplusplus
}
#endif
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#define Z_FEATURE_LOCAL_SUBSCRIBER 0
#define Z_FEATURE_PUBLISHER_SESSION_CHECK 1
#define Z_FEATURE_BATCHING 1
#define Z_FEATURE_MATCHING 1
#define Z_FEATURE_RX_CACHE 0
#define Z_FEATURE_AUTO_RECONNECT 1
// End of CMake generation
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#define Z_FEATURE_LOCAL_SUBSCRIBER @Z_FEATURE_LOCAL_SUBSCRIBER@
#define Z_FEATURE_PUBLISHER_SESSION_CHECK @Z_FEATURE_PUBLISHER_SESSION_CHECK@
#define Z_FEATURE_BATCHING @Z_FEATURE_BATCHING@
#define Z_FEATURE_MATCHING @Z_FEATURE_MATCHING@
#define Z_FEATURE_RX_CACHE @Z_FEATURE_RX_CACHE@
#define Z_FEATURE_AUTO_RECONNECT @Z_FEATURE_AUTO_RECONNECT@
// End of CMake generation
Expand Down
59 changes: 59 additions & 0 deletions include/zenoh-pico/net/matching.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//
// Copyright (c) 2025 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#ifndef INCLUDE_ZENOH_PICO_NET_MATCHING_H
#define INCLUDE_ZENOH_PICO_NET_MATCHING_H

#include "zenoh-pico/net/filtering.h"
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/core.h"

#ifdef __cplusplus
extern "C" {
#endif

/**
* Return type when declaring a matching.
*/
typedef struct _z_matching_listener_t {
_z_keyexpr_t _key;
_z_session_weak_t _zn;
/*
_z_zint_t _id;
_z_encoding_t _encoding;
z_congestion_control_t _congestion_control;
z_priority_t _priority;
z_reliability_t reliability;
bool _is_express;
#if Z_FEATURE_INTEREST == 1
_z_write_filter_t _filter;
#endif
*/
} _z_matching_listener_t;

#if Z_FEATURE_PUBLICATION == 1
// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
static inline _z_matching_listener_t _z_matching_listener_null(void) { return (_z_matching_listener_t){0}; }
static inline bool _z_matching_listener_check(const _z_matching_listener_t *matching_listener) {
return !_Z_RC_IS_NULL(&matching_listener->_zn);
}
void _z_matching_listener_clear(_z_matching_listener_t *pub);
void _z_matching_listener_free(_z_matching_listener_t **pub);
#endif

#ifdef __cplusplus
}
#endif

#endif /* INCLUDE_ZENOH_PICO_NET_MATCHING_H */
51 changes: 50 additions & 1 deletion src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,13 @@ void z_closure_zid_call(const z_loaned_closure_zid_t *closure, const z_id_t *id)
}
}

void z_closure_matching_status_call(const z_loaned_closure_matching_status_t *closure,
const z_matching_status_t *status) {
if (closure->call != NULL) {
(closure->call)(status, closure->context);
}
}

bool _z_config_check(const _z_config_t *config) { return !_z_str_intmap_is_empty(config); }
_z_config_t _z_config_null(void) { return _z_str_intmap_make(); }
z_result_t _z_config_copy(_z_config_t *dst, const _z_config_t *src) {
Expand Down Expand Up @@ -570,6 +577,8 @@ _Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_query, _z_closure_query_callback_t, z_cl
_Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_reply, _z_closure_reply_callback_t, z_closure_drop_callback_t)
_Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_hello, z_closure_hello_callback_t, z_closure_drop_callback_t)
_Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_zid, z_closure_zid_callback_t, z_closure_drop_callback_t)
_Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_matching_status, z_closure_matching_status_callback_t,
z_closure_drop_callback_t)

/************* Primitives **************/
typedef struct __z_hello_handler_wrapper_t {
Expand Down Expand Up @@ -1096,7 +1105,36 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher
const z_loaned_keyexpr_t *z_publisher_keyexpr(const z_loaned_publisher_t *publisher) {
return (const z_loaned_keyexpr_t *)&publisher->_key;
}
#endif

#if Z_FEATURE_MATCHING == 1
z_result_t z_publisher_declare_background_matching_listener(const z_loaned_publisher_t *publisher,
z_moved_closure_matching_status_t *callback) {
(void)publisher;
(void)callback;
// TODO(sashacmc): Implement
return _Z_RES_OK;
}

z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *publisher,
z_owned_matching_listener_t *matching_listener,
z_moved_closure_matching_status_t *callback) {
(void)publisher;
(void)matching_listener;
(void)callback;
// TODO(sashacmc): Implement
return _Z_RES_OK;
}

z_result_t z_publisher_get_matching_status(const z_loaned_publisher_t *publisher,
z_matching_status_t *matching_status) {
(void)publisher;
(void)matching_status;
// TODO(sashacmc): Implement
return _Z_RES_OK;
}
#endif // Z_FEATURE_MATCHING == 1

#endif // Z_FEATURE_PUBLICATION == 1

#if Z_FEATURE_QUERY == 1
bool _z_reply_check(const _z_reply_t *reply) {
Expand Down Expand Up @@ -1519,6 +1557,17 @@ z_result_t zp_batch_stop(const z_loaned_session_t *zs) {
}
#endif

#ifdef Z_FEATURE_MATCHING
void _z_matching_listener_drop(_z_matching_listener_t *listener) {
//_z_undeclare_matching_listener(pub);
_z_matching_listener_clear(listener);
}

_Z_OWNED_FUNCTIONS_VALUE_NO_COPY_IMPL(_z_matching_listener_t, matching_listener, _z_matching_listener_check,
_z_matching_listener_null, _z_matching_listener_drop)

#endif

/**************** Tasks ****************/
void zp_task_read_options_default(zp_task_read_options_t *options) {
#if Z_FEATURE_MULTI_THREAD == 1
Expand Down
Loading

0 comments on commit 0d7400c

Please sign in to comment.