diff --git a/.github/workflows/style.yaml b/.github/workflows/style.yaml index d20c770a..1581e8c7 100644 --- a/.github/workflows/style.yaml +++ b/.github/workflows/style.yaml @@ -19,6 +19,6 @@ jobs: steps: - uses: actions/checkout@v4 - name: uncrustify - run: /ros_entrypoint.sh ament_uncrustify --exclude rmw_zenoh_cpp/src/detail/ordered_hash.hpp rmw_zenoh_cpp/src/detail/ordered_map.hpp rmw_zenoh_cpp/ + run: /ros_entrypoint.sh ament_uncrustify rmw_zenoh_cpp/ --exclude rmw_zenoh_cpp/src/detail/ordered_hash.hpp rmw_zenoh_cpp/src/detail/ordered_map.hpp - name: cpplint - run: /ros_entrypoint.sh ament_cpplint --exclude rmw_zenoh_cpp/src/detail/ordered_hash.hpp rmw_zenoh_cpp/src/detail/ordered_map.hpp rmw_zenoh_cpp/ + run: /ros_entrypoint.sh ament_cpplint rmw_zenoh_cpp/ --exclude rmw_zenoh_cpp/src/detail/ordered_hash.hpp rmw_zenoh_cpp/src/detail/ordered_map.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 7c4a27eb..89b5a598 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -22,10 +22,6 @@ find_package(rosidl_typesupport_fastrtps_c REQUIRED) find_package(rosidl_typesupport_fastrtps_cpp REQUIRED) find_package(rmw REQUIRED) find_package(zenoh_c_vendor REQUIRED) -find_package(zenohc_debug QUIET) -if(NOT zenohc_debug_FOUND) - find_package(zenohc REQUIRED) -endif() add_library(rmw_zenoh_cpp SHARED src/detail/attachment_helpers.cpp diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index 3ba55dfa..f55c8434 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -13,38 +13,104 @@ /// Which endpoints to connect to. E.g. tcp/localhost:7447. /// By configuring the endpoints, it is possible to tell zenoh which router/peer to connect to at startup. + /// For TCP/UDP on Linux, it is possible additionally specify the interface to be connected to: + /// E.g. tcp/192.168.0.1:7447#iface=eth0, for connect only if the IP address is reachable via the interface eth0 connect: { + /// timeout waiting for all endpoints connected (0: no retry, -1: infinite timeout) + /// Accepts a single value (e.g. timeout_ms: 0) + /// or different values for router, peer and client (e.g. timeout_ms: { router: -1, peer: -1, client: 0 }). + timeout_ms: { router: -1, peer: -1, client: 0 }, + + /// The list of endpoints to connect to. + /// Accepts a single list (e.g. endpoints: ["tcp/10.10.10.10:7447", "tcp/11.11.11.11:7447"]) + /// or different lists for router, peer and client (e.g. endpoints: { router: ["tcp/10.10.10.10:7447"], peer: ["tcp/11.11.11.11:7447"] }). + /// + /// See https://docs.rs/zenoh/latest/zenoh/config/struct.EndPoint.html endpoints: [ // "/
" ], + + /// Global connect configuration, + /// Accepts a single value or different values for router, peer and client. + /// The configuration can also be specified for the separate endpoint + /// it will override the global one + /// E.g. tcp/192.168.0.1:7447#retry_period_init_ms=20000;retry_period_max_ms=10000" + + /// exit from application, if timeout exceed + exit_on_failure: { router: false, peer: false, client: true }, + /// connect establishing retry configuration + retry: { + /// initial wait timeout until next connect try + period_init_ms: 1000, + /// maximum wait timeout until next connect try + period_max_ms: 4000, + /// increase factor for the next timeout until nexti connect try + period_increase_factor: 2, + }, }, - /// Which endpoints to listen on. E.g. tcp/localhost:7447. + /// Which endpoints to listen on. E.g. tcp/0.0.0.0:7447. /// By configuring the endpoints, it is possible to tell zenoh which are the endpoints that other routers, /// peers, or client can use to establish a zenoh session. + /// For TCP/UDP on Linux, it is possible additionally specify the interface to be listened to: + /// E.g. tcp/0.0.0.0:7447#iface=eth0, for listen connection only on eth0 listen: { + /// timeout waiting for all listen endpoints (0: no retry, -1: infinite timeout) + /// Accepts a single value (e.g. timeout_ms: 0) + /// or different values for router, peer and client (e.g. timeout_ms: { router: -1, peer: -1, client: 0 }). + timeout_ms: 0, + + /// The list of endpoints to listen on. + /// Accepts a single list (e.g. endpoints: ["tcp/[::]:7447", "udp/[::]:7447"]) + /// or different lists for router, peer and client (e.g. endpoints: { router: ["tcp/[::]:7447"], peer: ["tcp/[::]:0"] }). + /// + /// See https://docs.rs/zenoh/latest/zenoh/config/struct.EndPoint.html endpoints: [ "tcp/[::]:7447" ], + + /// Global listen configuration, + /// Accepts a single value or different values for router, peer and client. + /// The configuration can also be specified for the separate endpoint + /// it will override the global one + /// E.g. tcp/192.168.0.1:7447#exit_on_failure=false;retry_period_max_ms=1000" + + /// exit from application, if timeout exceed + exit_on_failure: true, + /// listen retry configuration + retry: { + /// initial wait timeout until next try + period_init_ms: 1000, + /// maximum wait timeout until next try + period_max_ms: 4000, + /// increase factor for the next timeout until next try + period_increase_factor: 2, + }, }, + /// Configure the scouting mechanisms and their behaviours scouting: { - /// In client mode, the period dedicated to scouting for a router before failing + /// In client mode, the period in milliseconds dedicated to scouting for a router before failing. timeout: 3000, - /// In peer mode, the period dedicated to scouting remote peers before attempting other operations - delay: 1, + /// In peer mode, the maximum period in milliseconds dedicated to scouting remote peers before attempting other operations. + delay: 500, /// The multicast scouting configuration. multicast: { /// Whether multicast scouting is enabled or not + /// + /// ROS setting: disable multicast discovery by default enabled: false, /// The socket which should be used for multicast scouting address: "224.0.0.224:7446", /// The network interface which should be used for multicast scouting interface: "auto", // If not set or set to "auto" the interface if picked automatically + /// The time-to-live on multicast scouting packets + ttl: 1, /// Which type of Zenoh instances to automatically establish sessions with upon discovery on UDP multicast. - /// Accepts a single value or different values for router, peer and client. - /// Each value is bit-or-like combinations of "peer", "router" and "client". - autoconnect: { router: "", peer: "router|peer" }, + /// Accepts a single value (e.g. autoconnect: ["router", "peer"]) + /// or different values for router, peer and client (e.g. autoconnect: { router: [], peer: ["router", "peer"] }). + /// Each value is a list of: "peer", "router" and/or "client". + autoconnect: { router: [], peer: ["router", "peer"] }, /// Whether or not to listen for scout messages on UDP multicast and reply to them. listen: true, }, @@ -52,16 +118,17 @@ gossip: { /// Whether gossip scouting is enabled or not enabled: true, - /// When true, gossip scouting informations are propagated multiple hops to all nodes in the local network. - /// When false, gossip scouting informations are only propagated to the next hop. + /// When true, gossip scouting information are propagated multiple hops to all nodes in the local network. + /// When false, gossip scouting information are only propagated to the next hop. /// Activating multihop gossip implies more scouting traffic and a lower scalability. /// It mostly makes sense when using "linkstate" routing mode where all nodes in the subsystem don't have /// direct connectivity with each other. multihop: false, /// Which type of Zenoh instances to automatically establish sessions with upon discovery on gossip. - /// Accepts a single value or different values for router, peer and client. - /// Each value is bit-or-like combinations of "peer", "router" and "client". - autoconnect: { router: "", peer: "router|peer" }, + /// Accepts a single value (e.g. autoconnect: ["router", "peer"]) + /// or different values for router, peer and client (e.g. autoconnect: { router: [], peer: ["router", "peer"] }). + /// Each value is a list of: "peer", "router" and/or "client". + autoconnect: { router: [], peer: ["router", "peer"] }, }, }, @@ -69,9 +136,10 @@ timestamping: { /// Whether data messages should be timestamped if not already. /// Accepts a single boolean value or different values for router, peer and client. - /// PublicationCache which is required for transient_local durability - /// only works when time-stamping is enabled. - enabled: { router: true, peer: true, client: false }, + /// + /// ROS setting: PublicationCache which is required for transient_local durability + /// only works when time-stamping is enabled. + enabled: { router: true, peer: true, client: true }, /// Whether data messages with timestamps in the future should be dropped or not. /// If set to false (default), messages with timestamps in the future are retimestamped. /// Timestamps are ignored if timestamping is disabled. @@ -110,7 +178,116 @@ // ], // }, - /// Configure internal transport parameters + // /// The downsampling declaration. + // downsampling: [ + // { + // /// A list of network interfaces messages will be processed on, the rest will be passed as is. + // interfaces: [ "wlan0" ], + // /// Data flow messages will be processed on. ("egress" or "ingress") + // flow: "egress", + // /// A list of downsampling rules: key_expression and the maximum frequency in Hertz + // rules: [ + // { key_expr: "demo/example/zenoh-rs-pub", freq: 0.1 }, + // ], + // }, + // ], + + // /// Configure access control (ACL) rules + // access_control: { + // /// [true/false] acl will be activated only if this is set to true + // "enabled": false, + // /// [deny/allow] default permission is deny (even if this is left empty or not specified) + // "default_permission": "deny", + // /// Rule set for permissions allowing or denying access to key-expressions + // "rules": + // [ + // { + // /// Id has to be unique within the rule set + // "id": "rule1", + // "messages": [ + // "put", "delete", "declare_subscriber", + // "query", "reply", "declare_queryable", + // ], + // "flows":["egress","ingress"], + // "permission": "allow", + // "key_exprs": [ + // "test/demo" + // ], + // }, + // { + // "id": "rule2", + // "messages": [ + // "put", "delete", "declare_subscriber", + // "query", "reply", "declare_queryable", + // ], + // "flows":["ingress"], + // "permission": "allow", + // "key_exprs": [ + // "**" + // ], + // }, + // ], + // /// List of combinations of subjects. + // /// + // /// If a subject property (i.e. username, certificate common name or interface) is empty + // /// it is interpreted as a wildcard. Moreover, a subject property cannot be an empty list. + // "subjects": + // [ + // { + // /// Id has to be unique within the subjects list + // "id": "subject1", + // /// Subjects can be interfaces + // "interfaces": [ + // "lo0", + // "en0", + // ], + // /// Subjects can be cert_common_names when using TLS or Quic + // "cert_common_names": [ + // "example.zenoh.io" + // ], + // /// Subjects can be usernames when using user/password authentication + // "usernames": [ + // "zenoh-example" + // ], + // /// This instance translates internally to this filter: + // /// (interface="lo0" && cert_common_name="example.zenoh.io" && username="zenoh-example") || + // /// (interface="en0" && cert_common_name="example.zenoh.io" && username="zenoh-example") + // }, + // { + // "id": "subject2", + // "interfaces": [ + // "lo0", + // "en0", + // ], + // "cert_common_names": [ + // "example2.zenoh.io" + // ], + // /// This instance translates internally to this filter: + // /// (interface="lo0" && cert_common_name="example2.zenoh.io") || + // /// (interface="en0" && cert_common_name="example2.zenoh.io") + // }, + // { + // "id": "subject3", + // /// An empty subject combination is a wildcard + // }, + // ], + // /// The policies list associates rules to subjects + // "policies": + // [ + // /// Each policy associates one or multiple rules to one or multiple subject combinations + // { + // /// Rules and Subjects are identified with their unique IDs declared above + // "rules": ["rule1"], + // "subjects": ["subject1", "subject2"], + // }, + // { + // "rules": ["rule2"], + // "subjects": ["subject3"], + // }, + // ] + //}, + + /// Configure internal transport parameters transport: { unicast: { /// Timeout in milliseconds when opening a link @@ -128,6 +305,8 @@ /// NOTE: Currently, the LowLatency transport doesn't preserve QoS prioritization. /// NOTE: Due to the note above, 'lowlatency' is incompatible with 'qos' option, so in order to /// enable 'lowlatency' you need to explicitly disable 'qos'. + /// NOTE: LowLatency transport does not support the fragmentation, so the message size should be + /// smaller than the tx batch_size. lowlatency: false, /// Enables QoS on unicast communications. qos: { @@ -140,12 +319,34 @@ enabled: false, }, }, + /// WARNING: multicast communication does not perform any negotiation upon group joining. + /// Because of that, it is important that all transport parameters are the same to make + /// sure all your nodes in the system can communicate. One common parameter to configure + /// is "transport/link/tx/batch_size" since its default value depends on the actual platform + /// when operating on multicast. + /// E.g., the batch size on Linux and Windows is 65535 bytes, on Mac OS X is 9216, and anything else is 8192. + multicast: { + /// JOIN message transmission interval in milliseconds. + join_interval: 2500, + /// Maximum number of multicast sessions. + max_sessions: 1000, + /// Enables QoS on multicast communication. + /// Default to false for Zenoh-to-Zenoh-Pico out-of-the-box compatibility. + qos: { + enabled: false, + }, + /// Enables compression on multicast communication. + /// Default to false for Zenoh-to-Zenoh-Pico out-of-the-box compatibility. + compression: { + enabled: false, + }, + }, link: { - /// An optional whitelist of protocols to be used for accepting and opening sessions. - /// If not configured, all the supported protocols are automatically whitelisted. - /// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream"] - /// For example, to only enable "tls" and "quic": - // protocols: ["tls", "quic"], + /// An optional whitelist of protocols to be used for accepting and opening sessions. If not + /// configured, all the supported protocols are automatically whitelisted. The supported + /// protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"] For + /// example, to only enable "tls" and "quic": protocols: ["tls", "quic"], + /// /// Configure the zenoh TX parameters of a link tx: { /// The resolution in bits to be used for the message sequence numbers. @@ -157,8 +358,10 @@ /// Number of keep-alive messages in a link lease duration. If no data is sent, keep alive /// messages will be sent at the configured time interval. /// NOTE: In order to consider eventual packet loss and transmission latency and jitter, - /// set the actual keep_alive timeout to one fourth of the lease time. - /// This is in-line with the ITU-T G.8013/Y.1731 specification on continous connectivity + /// set the actual keep_alive interval to one fourth of the lease time: i.e. send + /// 4 keep_alive messages in a lease period. Changing the lease time will have the + /// keep_alive messages sent more or less often. + /// This is in-line with the ITU-T G.8013/Y.1731 specification on continuous connectivity /// check which considers a link as failed when no messages are received in 3.5 times the /// target interval. keep_alive: 4, @@ -169,6 +372,7 @@ /// Each zenoh link has a transmission queue that can be configured queue: { /// The size of each priority queue indicates the number of batches a given queue can contain. + /// NOTE: the number of batches in each priority must be included between 1 and 16. Different values will result in an error. /// The amount of memory being allocated for each queue is then SIZE_XXX * BATCH_SIZE. /// In the case of the transport link MTU being smaller than the ZN_BATCH_SIZE, /// then amount of memory being allocated for each queue is SIZE_XXX * LINK_MTU. @@ -183,19 +387,37 @@ data_low: 4, background: 4, }, - /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress. - /// Higher values lead to a more aggressive batching but it will introduce additional latency. - backoff: 100, + /// Congestion occurs when the queue is empty (no available batch). + congestion_control: { + /// Behavior pushing CongestionControl::Drop messages to the queue. + drop: { + /// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available. + wait_before_drop: 1000, + }, + /// Behavior pushing CongestionControl::Block messages to the queue. + block: { + /// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message + /// if still no batch is available. + wait_before_close: 5000000, + }, + }, + /// Perform batching of messages if they are smaller of the batch_size + batching: { + /// Perform adaptive batching of messages if they are smaller of the batch_size. + /// When the network is detected to not be fast enough to transmit every message individually, many small messages may be + /// batched together and sent all at once on the wire reducing the overall network overhead. This is typically of a high-throughput + /// scenario mainly composed of small messages. In other words, batching is activated by the network back-pressure. + enabled: true, + /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens. + time_limit: 1, + }, }, - // Number of threads dedicated to transmission - // By default, the number of threads is calculated as follows: 1 + ((#cores - 1) / 4) - // threads: 1, }, /// Configure the zenoh RX parameters of a link rx: { /// Receiving buffer size in bytes for each link - /// The default the rx_buffer_size value is the same as the default batch size: 65335. - /// For very high throughput scenarios, the rx_buffer_size can be increased to accomodate + /// The default the rx_buffer_size value is the same as the default batch size: 65535. + /// For very high throughput scenarios, the rx_buffer_size can be increased to accommodate /// more in-flight data. This is particularly relevant when dealing with large messages. /// E.g. for 16MiB rx_buffer_size set the value to: 16777216. buffer_size: 65535, @@ -211,29 +433,36 @@ /// or the client's keys and certificates, depending on the node's mode. If not specified /// on router mode then the default WebPKI certificates are used instead. root_ca_certificate: null, - /// Path to the TLS server private key - server_private_key: null, - /// Path to the TLS server public certificate - server_certificate: null, - /// Client authentication, if true enables mTLS (mutual authentication) - client_auth: false, - /// Path to the TLS client private key - client_private_key: null, - /// Path to the TLS client public certificate - client_certificate: null, - // Whether or not to use server name verification, if set to false zenoh will disregard the common names of the certificates when verifying servers. + /// Path to the TLS listening side private key + listen_private_key: null, + /// Path to the TLS listening side public certificate + listen_certificate: null, + /// Enables mTLS (mutual authentication), client authentication + enable_mtls: false, + /// Path to the TLS connecting side private key + connect_private_key: null, + /// Path to the TLS connecting side certificate + connect_certificate: null, + // Whether or not to verify the matching between hostname/dns and certificate when connecting, + // if set to false zenoh will disregard the common names of the certificates when verifying servers. // This could be dangerous because your CA can have signed a server cert for foo.com, that's later being used to host a server at baz.com. If you wan't your // ca to verify that the server at baz.com is actually baz.com, let this be true (default). - server_name_verification: null, + verify_name_on_connect: true, }, }, - /// Shared memory configuration + /// Shared memory configuration. + /// NOTE: shared memory can be used only if zenoh is compiled with "shared-memory" feature, otherwise + /// settings in this section have no effect. shared_memory: { + /// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate + /// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + /// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + /// + /// ROS setting: disabled by default until fully tested enabled: false, }, - /// Access control configuration auth: { - /// The configuration of authentification. + /// The configuration of authentication. /// A password implies a username is required. usrpwd: { user: null, @@ -255,7 +484,9 @@ /// Configure the Admin Space /// Unstable: this configuration part works as advertised, but may change in a future release adminspace: { - // read and/or write permissions on the admin space + /// Enables the admin space + enabled: true, + /// read and/or write permissions on the admin space permissions: { read: true, write: false, diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 index 6c0ec556..daffd790 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 @@ -13,41 +13,109 @@ /// Which endpoints to connect to. E.g. tcp/localhost:7447. /// By configuring the endpoints, it is possible to tell zenoh which router/peer to connect to at startup. - /// ROS setting: By default connect to the Zenoh router on localhost on port 7447. + /// For TCP/UDP on Linux, it is possible additionally specify the interface to be connected to: + /// E.g. tcp/192.168.0.1:7447#iface=eth0, for connect only if the IP address is reachable via the interface eth0 connect: { + /// timeout waiting for all endpoints connected (0: no retry, -1: infinite timeout) + /// Accepts a single value (e.g. timeout_ms: 0) + /// or different values for router, peer and client (e.g. timeout_ms: { router: -1, peer: -1, client: 0 }). + timeout_ms: { router: -1, peer: -1, client: 0 }, + + /// The list of endpoints to connect to. + /// Accepts a single list (e.g. endpoints: ["tcp/10.10.10.10:7447", "tcp/11.11.11.11:7447"]) + /// or different lists for router, peer and client (e.g. endpoints: { router: ["tcp/10.10.10.10:7447"], peer: ["tcp/11.11.11.11:7447"] }). + /// + /// See https://docs.rs/zenoh/latest/zenoh/config/struct.EndPoint.html + /// + /// ROS setting: By default connect to the Zenoh router on localhost on port 7447. endpoints: [ - "tcp/localhost:7447", + "tcp/localhost:7447" ], + + /// Global connect configuration, + /// Accepts a single value or different values for router, peer and client. + /// The configuration can also be specified for the separate endpoint + /// it will override the global one + /// E.g. tcp/192.168.0.1:7447#retry_period_init_ms=20000;retry_period_max_ms=10000" + + /// exit from application, if timeout exceed + exit_on_failure: { router: false, peer: false, client: true }, + /// connect establishing retry configuration + retry: { + /// initial wait timeout until next connect try + period_init_ms: 1000, + /// maximum wait timeout until next connect try + period_max_ms: 4000, + /// increase factor for the next timeout until nexti connect try + period_increase_factor: 2, + }, }, - /// Which endpoints to listen on. E.g. tcp/localhost:7447. + /// Which endpoints to listen on. E.g. tcp/0.0.0.0:7447. /// By configuring the endpoints, it is possible to tell zenoh which are the endpoints that other routers, /// peers, or client can use to establish a zenoh session. - /// ROS setting: By default accept incoming connections only from localhost (i.e. from colocalized Nodes). - /// All communications with other hosts are routed by the Zenoh router. + /// For TCP/UDP on Linux, it is possible additionally specify the interface to be listened to: + /// E.g. tcp/0.0.0.0:7447#iface=eth0, for listen connection only on eth0 listen: { + /// timeout waiting for all listen endpoints (0: no retry, -1: infinite timeout) + /// Accepts a single value (e.g. timeout_ms: 0) + /// or different values for router, peer and client (e.g. timeout_ms: { router: -1, peer: -1, client: 0 }). + timeout_ms: 0, + + /// The list of endpoints to listen on. + /// Accepts a single list (e.g. endpoints: ["tcp/[::]:7447", "udp/[::]:7447"]) + /// or different lists for router, peer and client (e.g. endpoints: { router: ["tcp/[::]:7447"], peer: ["tcp/[::]:0"] }). + /// + /// See https://docs.rs/zenoh/latest/zenoh/config/struct.EndPoint.html + /// + /// ROS setting: By default accept incoming connections only from localhost (i.e. from colocalized Nodes). + /// All communications with other hosts are routed by the Zenoh router. endpoints: [ - "tcp/localhost:0", + "tcp/localhost:0" ], + + /// Global listen configuration, + /// Accepts a single value or different values for router, peer and client. + /// The configuration can also be specified for the separate endpoint + /// it will override the global one + /// E.g. tcp/192.168.0.1:7447#exit_on_failure=false;retry_period_max_ms=1000" + + /// exit from application, if timeout exceed + exit_on_failure: true, + /// listen retry configuration + retry: { + /// initial wait timeout until next try + period_init_ms: 1000, + /// maximum wait timeout until next try + period_max_ms: 4000, + /// increase factor for the next timeout until next try + period_increase_factor: 2, + }, }, + /// Configure the scouting mechanisms and their behaviours scouting: { - /// In client mode, the period dedicated to scouting for a router before failing + /// In client mode, the period in milliseconds dedicated to scouting for a router before failing. timeout: 3000, - /// In peer mode, the period dedicated to scouting remote peers before attempting other operations - delay: 1, + /// In peer mode, the maximum period in milliseconds dedicated to scouting remote peers before attempting other operations. + delay: 500, /// The multicast scouting configuration. multicast: { /// Whether multicast scouting is enabled or not + /// + /// ROS setting: disable multicast discovery by default enabled: false, /// The socket which should be used for multicast scouting address: "224.0.0.224:7446", /// The network interface which should be used for multicast scouting interface: "auto", // If not set or set to "auto" the interface if picked automatically + /// The time-to-live on multicast scouting packets + ttl: 1, /// Which type of Zenoh instances to automatically establish sessions with upon discovery on UDP multicast. - /// Accepts a single value or different values for router, peer and client. - /// Each value is bit-or-like combinations of "peer", "router" and "client". - autoconnect: { router: "", peer: "router|peer" }, + /// Accepts a single value (e.g. autoconnect: ["router", "peer"]) + /// or different values for router, peer and client (e.g. autoconnect: { router: [], peer: ["router", "peer"] }). + /// Each value is a list of: "peer", "router" and/or "client". + autoconnect: { router: [], peer: ["router", "peer"] }, /// Whether or not to listen for scout messages on UDP multicast and reply to them. listen: true, }, @@ -55,16 +123,17 @@ gossip: { /// Whether gossip scouting is enabled or not enabled: true, - /// When true, gossip scouting informations are propagated multiple hops to all nodes in the local network. - /// When false, gossip scouting informations are only propagated to the next hop. + /// When true, gossip scouting information are propagated multiple hops to all nodes in the local network. + /// When false, gossip scouting information are only propagated to the next hop. /// Activating multihop gossip implies more scouting traffic and a lower scalability. /// It mostly makes sense when using "linkstate" routing mode where all nodes in the subsystem don't have /// direct connectivity with each other. multihop: false, /// Which type of Zenoh instances to automatically establish sessions with upon discovery on gossip. - /// Accepts a single value or different values for router, peer and client. - /// Each value is bit-or-like combinations of "peer", "router" and "client". - autoconnect: { router: "", peer: "router|peer" }, + /// Accepts a single value (e.g. autoconnect: ["router", "peer"]) + /// or different values for router, peer and client (e.g. autoconnect: { router: [], peer: ["router", "peer"] }). + /// Each value is a list of: "peer", "router" and/or "client". + autoconnect: { router: [], peer: ["router", "peer"] }, }, }, @@ -72,9 +141,10 @@ timestamping: { /// Whether data messages should be timestamped if not already. /// Accepts a single boolean value or different values for router, peer and client. - /// PublicationCache which is required for transient_local durability - /// only works when time-stamping is enabled. - enabled: { router: true, peer: true, client: false }, + /// + /// ROS setting: PublicationCache which is required for transient_local durability + /// only works when time-stamping is enabled. + enabled: { router: true, peer: true, client: true }, /// Whether data messages with timestamps in the future should be dropped or not. /// If set to false (default), messages with timestamps in the future are retimestamped. /// Timestamps are ignored if timestamping is disabled. @@ -113,7 +183,116 @@ // ], // }, - /// Configure internal transport parameters + // /// The downsampling declaration. + // downsampling: [ + // { + // /// A list of network interfaces messages will be processed on, the rest will be passed as is. + // interfaces: [ "wlan0" ], + // /// Data flow messages will be processed on. ("egress" or "ingress") + // flow: "egress", + // /// A list of downsampling rules: key_expression and the maximum frequency in Hertz + // rules: [ + // { key_expr: "demo/example/zenoh-rs-pub", freq: 0.1 }, + // ], + // }, + // ], + + // /// Configure access control (ACL) rules + // access_control: { + // /// [true/false] acl will be activated only if this is set to true + // "enabled": false, + // /// [deny/allow] default permission is deny (even if this is left empty or not specified) + // "default_permission": "deny", + // /// Rule set for permissions allowing or denying access to key-expressions + // "rules": + // [ + // { + // /// Id has to be unique within the rule set + // "id": "rule1", + // "messages": [ + // "put", "delete", "declare_subscriber", + // "query", "reply", "declare_queryable", + // ], + // "flows":["egress","ingress"], + // "permission": "allow", + // "key_exprs": [ + // "test/demo" + // ], + // }, + // { + // "id": "rule2", + // "messages": [ + // "put", "delete", "declare_subscriber", + // "query", "reply", "declare_queryable", + // ], + // "flows":["ingress"], + // "permission": "allow", + // "key_exprs": [ + // "**" + // ], + // }, + // ], + // /// List of combinations of subjects. + // /// + // /// If a subject property (i.e. username, certificate common name or interface) is empty + // /// it is interpreted as a wildcard. Moreover, a subject property cannot be an empty list. + // "subjects": + // [ + // { + // /// Id has to be unique within the subjects list + // "id": "subject1", + // /// Subjects can be interfaces + // "interfaces": [ + // "lo0", + // "en0", + // ], + // /// Subjects can be cert_common_names when using TLS or Quic + // "cert_common_names": [ + // "example.zenoh.io" + // ], + // /// Subjects can be usernames when using user/password authentication + // "usernames": [ + // "zenoh-example" + // ], + // /// This instance translates internally to this filter: + // /// (interface="lo0" && cert_common_name="example.zenoh.io" && username="zenoh-example") || + // /// (interface="en0" && cert_common_name="example.zenoh.io" && username="zenoh-example") + // }, + // { + // "id": "subject2", + // "interfaces": [ + // "lo0", + // "en0", + // ], + // "cert_common_names": [ + // "example2.zenoh.io" + // ], + // /// This instance translates internally to this filter: + // /// (interface="lo0" && cert_common_name="example2.zenoh.io") || + // /// (interface="en0" && cert_common_name="example2.zenoh.io") + // }, + // { + // "id": "subject3", + // /// An empty subject combination is a wildcard + // }, + // ], + // /// The policies list associates rules to subjects + // "policies": + // [ + // /// Each policy associates one or multiple rules to one or multiple subject combinations + // { + // /// Rules and Subjects are identified with their unique IDs declared above + // "rules": ["rule1"], + // "subjects": ["subject1", "subject2"], + // }, + // { + // "rules": ["rule2"], + // "subjects": ["subject3"], + // }, + // ] + //}, + + /// Configure internal transport parameters transport: { unicast: { /// Timeout in milliseconds when opening a link @@ -131,6 +310,8 @@ /// NOTE: Currently, the LowLatency transport doesn't preserve QoS prioritization. /// NOTE: Due to the note above, 'lowlatency' is incompatible with 'qos' option, so in order to /// enable 'lowlatency' you need to explicitly disable 'qos'. + /// NOTE: LowLatency transport does not support the fragmentation, so the message size should be + /// smaller than the tx batch_size. lowlatency: false, /// Enables QoS on unicast communications. qos: { @@ -143,12 +324,34 @@ enabled: false, }, }, + /// WARNING: multicast communication does not perform any negotiation upon group joining. + /// Because of that, it is important that all transport parameters are the same to make + /// sure all your nodes in the system can communicate. One common parameter to configure + /// is "transport/link/tx/batch_size" since its default value depends on the actual platform + /// when operating on multicast. + /// E.g., the batch size on Linux and Windows is 65535 bytes, on Mac OS X is 9216, and anything else is 8192. + multicast: { + /// JOIN message transmission interval in milliseconds. + join_interval: 2500, + /// Maximum number of multicast sessions. + max_sessions: 1000, + /// Enables QoS on multicast communication. + /// Default to false for Zenoh-to-Zenoh-Pico out-of-the-box compatibility. + qos: { + enabled: false, + }, + /// Enables compression on multicast communication. + /// Default to false for Zenoh-to-Zenoh-Pico out-of-the-box compatibility. + compression: { + enabled: false, + }, + }, link: { - /// An optional whitelist of protocols to be used for accepting and opening sessions. - /// If not configured, all the supported protocols are automatically whitelisted. - /// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream"] - /// For example, to only enable "tls" and "quic": - // protocols: ["tls", "quic"], + /// An optional whitelist of protocols to be used for accepting and opening sessions. If not + /// configured, all the supported protocols are automatically whitelisted. The supported + /// protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"] For + /// example, to only enable "tls" and "quic": protocols: ["tls", "quic"], + /// /// Configure the zenoh TX parameters of a link tx: { /// The resolution in bits to be used for the message sequence numbers. @@ -160,8 +363,10 @@ /// Number of keep-alive messages in a link lease duration. If no data is sent, keep alive /// messages will be sent at the configured time interval. /// NOTE: In order to consider eventual packet loss and transmission latency and jitter, - /// set the actual keep_alive timeout to one fourth of the lease time. - /// This is in-line with the ITU-T G.8013/Y.1731 specification on continous connectivity + /// set the actual keep_alive interval to one fourth of the lease time: i.e. send + /// 4 keep_alive messages in a lease period. Changing the lease time will have the + /// keep_alive messages sent more or less often. + /// This is in-line with the ITU-T G.8013/Y.1731 specification on continuous connectivity /// check which considers a link as failed when no messages are received in 3.5 times the /// target interval. keep_alive: 4, @@ -172,6 +377,7 @@ /// Each zenoh link has a transmission queue that can be configured queue: { /// The size of each priority queue indicates the number of batches a given queue can contain. + /// NOTE: the number of batches in each priority must be included between 1 and 16. Different values will result in an error. /// The amount of memory being allocated for each queue is then SIZE_XXX * BATCH_SIZE. /// In the case of the transport link MTU being smaller than the ZN_BATCH_SIZE, /// then amount of memory being allocated for each queue is SIZE_XXX * LINK_MTU. @@ -186,19 +392,37 @@ data_low: 4, background: 4, }, - /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress. - /// Higher values lead to a more aggressive batching but it will introduce additional latency. - backoff: 100, + /// Congestion occurs when the queue is empty (no available batch). + congestion_control: { + /// Behavior pushing CongestionControl::Drop messages to the queue. + drop: { + /// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available. + wait_before_drop: 1000, + }, + /// Behavior pushing CongestionControl::Block messages to the queue. + block: { + /// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message + /// if still no batch is available. + wait_before_close: 5000000, + }, + }, + /// Perform batching of messages if they are smaller of the batch_size + batching: { + /// Perform adaptive batching of messages if they are smaller of the batch_size. + /// When the network is detected to not be fast enough to transmit every message individually, many small messages may be + /// batched together and sent all at once on the wire reducing the overall network overhead. This is typically of a high-throughput + /// scenario mainly composed of small messages. In other words, batching is activated by the network back-pressure. + enabled: true, + /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens. + time_limit: 1, + }, }, - // Number of threads dedicated to transmission - // By default, the number of threads is calculated as follows: 1 + ((#cores - 1) / 4) - // threads: 1, }, /// Configure the zenoh RX parameters of a link rx: { /// Receiving buffer size in bytes for each link - /// The default the rx_buffer_size value is the same as the default batch size: 65335. - /// For very high throughput scenarios, the rx_buffer_size can be increased to accomodate + /// The default the rx_buffer_size value is the same as the default batch size: 65535. + /// For very high throughput scenarios, the rx_buffer_size can be increased to accommodate /// more in-flight data. This is particularly relevant when dealing with large messages. /// E.g. for 16MiB rx_buffer_size set the value to: 16777216. buffer_size: 65535, @@ -214,29 +438,36 @@ /// or the client's keys and certificates, depending on the node's mode. If not specified /// on router mode then the default WebPKI certificates are used instead. root_ca_certificate: null, - /// Path to the TLS server private key - server_private_key: null, - /// Path to the TLS server public certificate - server_certificate: null, - /// Client authentication, if true enables mTLS (mutual authentication) - client_auth: false, - /// Path to the TLS client private key - client_private_key: null, - /// Path to the TLS client public certificate - client_certificate: null, - // Whether or not to use server name verification, if set to false zenoh will disregard the common names of the certificates when verifying servers. + /// Path to the TLS listening side private key + listen_private_key: null, + /// Path to the TLS listening side public certificate + listen_certificate: null, + /// Enables mTLS (mutual authentication), client authentication + enable_mtls: false, + /// Path to the TLS connecting side private key + connect_private_key: null, + /// Path to the TLS connecting side certificate + connect_certificate: null, + // Whether or not to verify the matching between hostname/dns and certificate when connecting, + // if set to false zenoh will disregard the common names of the certificates when verifying servers. // This could be dangerous because your CA can have signed a server cert for foo.com, that's later being used to host a server at baz.com. If you wan't your // ca to verify that the server at baz.com is actually baz.com, let this be true (default). - server_name_verification: null, + verify_name_on_connect: true, }, }, - /// Shared memory configuration + /// Shared memory configuration. + /// NOTE: shared memory can be used only if zenoh is compiled with "shared-memory" feature, otherwise + /// settings in this section have no effect. shared_memory: { + /// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate + /// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + /// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + /// + /// ROS setting: disabled by default until fully tested enabled: false, }, - /// Access control configuration auth: { - /// The configuration of authentification. + /// The configuration of authentication. /// A password implies a username is required. usrpwd: { user: null, @@ -258,10 +489,13 @@ /// Configure the Admin Space /// Unstable: this configuration part works as advertised, but may change in a future release adminspace: { - // read and/or write permissions on the admin space + /// Enables the admin space + enabled: true, + /// read and/or write permissions on the admin space permissions: { read: true, write: false, }, }, + } diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp index 7733c918..4fc96805 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp @@ -16,7 +16,9 @@ #include #include -#include +#include +#include +#include #include "rmw/types.h" @@ -24,75 +26,82 @@ namespace rmw_zenoh_cpp { -//============================================================================== -bool get_gid_from_attachment( - const z_attachment_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE]) -{ - if (!z_check(*attachment)) { - return false; - } - z_bytes_t index = z_attachment_get(*attachment, z_bytes_new("source_gid")); - if (!z_check(index)) { - return false; - } - - if (index.len != RMW_GID_STORAGE_SIZE) { - return false; - } +attachment_data_t::attachment_data_t( + const int64_t _sequence_number, + const int64_t _source_timestamp, + const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]) +{ + sequence_number = _sequence_number; + source_timestamp = _source_timestamp; + memcpy(source_gid, _source_gid, RMW_GID_STORAGE_SIZE); +} - memcpy(gid, index.start, index.len); +attachment_data_t::attachment_data_t(attachment_data_t && data) +{ + sequence_number = std::move(data.sequence_number); + source_timestamp = std::move(data.source_timestamp); + memcpy(source_gid, data.source_gid, RMW_GID_STORAGE_SIZE); +} - return true; +void attachment_data_t::serialize_to_zbytes(z_owned_bytes_t * attachment) +{ + ze_owned_serializer_t serializer; + ze_serializer_empty(&serializer); + ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number"); + ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number); + ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp"); + ze_serializer_serialize_int64(z_loan_mut(serializer), this->source_timestamp); + ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid"); + ze_serializer_serialize_buf(z_loan_mut(serializer), this->source_gid, RMW_GID_STORAGE_SIZE); + ze_serializer_finish(z_move(serializer), attachment); } -//============================================================================== -int64_t get_int64_from_attachment( - const z_attachment_t * const attachment, const std::string & name) +attachment_data_t::attachment_data_t(const z_loaned_bytes_t * attachment) { - if (!z_check(*attachment)) { - // A valid request must have had an attachment - return -1; + ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment); + z_owned_string_t key; + + // Deserialize the sequence_number + ze_deserializer_deserialize_string(&deserializer, &key); + if (std::string_view( + z_string_data(z_loan(key)), + z_string_len(z_loan(key))) != "sequence_number") + { + throw std::runtime_error("sequence_number is not found in the attachment."); } - - z_bytes_t index = z_attachment_get(*attachment, z_bytes_new(name.c_str())); - if (!z_check(index)) { - return -1; + z_drop(z_move(key)); + if (ze_deserializer_deserialize_int64(&deserializer, &this->sequence_number)) { + throw std::runtime_error("Failed to deserialize the sequence_number."); } - if (index.len < 1) { - return -1; + // Deserialize the source_timestamp + ze_deserializer_deserialize_string(&deserializer, &key); + if (std::string_view( + z_string_data(z_loan(key)), + z_string_len(z_loan(key))) != "source_timestamp") + { + throw std::runtime_error("source_timestamp is not found in the attachment"); } - - if (index.len > 19) { - // The number was larger than we expected - return -1; + z_drop(z_move(key)); + if (ze_deserializer_deserialize_int64(&deserializer, &this->source_timestamp)) { + throw std::runtime_error("Failed to deserialize the source_timestamp."); } - // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. - // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. - char int64_str[20]; - - memcpy(int64_str, index.start, index.len); - int64_str[index.len] = '\0'; - - errno = 0; - char * endptr; - int64_t num = strtol(int64_str, &endptr, 10); - if (num == 0) { - // This is an error regardless; the client should never send this - return -1; - } else if (endptr == int64_str) { - // No values were converted, this is an error - return -1; - } else if (*endptr != '\0') { - // There was junk after the number - return -1; - } else if (errno != 0) { - // Some other error occurred, which may include overflow or underflow - return -1; + // Deserialize the source_gid + ze_deserializer_deserialize_string(&deserializer, &key); + if (std::string_view(z_string_data(z_loan(key)), z_string_len(z_loan(key))) != "source_gid") { + throw std::runtime_error("Invalid attachment: the key source_gid is not found"); } - - return num; + z_drop(z_move(key)); + z_owned_slice_t slice; + if (ze_deserializer_deserialize_slice(&deserializer, &slice)) { + throw std::runtime_error("Failed to deserialize the source_gid."); + } + if (z_slice_len(z_loan(slice)) != RMW_GID_STORAGE_SIZE) { + throw std::runtime_error("The length of source_gid mismatched."); + } + memcpy(this->source_gid, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice))); + z_drop(z_move(slice)); } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp index 034b6a49..7d7af015 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp @@ -17,19 +17,27 @@ #include -#include - #include "rmw/types.h" namespace rmw_zenoh_cpp { -//============================================================================== -bool get_gid_from_attachment( - const z_attachment_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE]); -//============================================================================== -int64_t get_int64_from_attachment( - const z_attachment_t * const attachment, const std::string & name); +class attachment_data_t final +{ +public: + explicit attachment_data_t( + const int64_t _sequence_number, + const int64_t _source_timestamp, + const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]); + explicit attachment_data_t(const z_loaned_bytes_t *); + explicit attachment_data_t(attachment_data_t && data); + + int64_t sequence_number; + int64_t source_timestamp; + uint8_t source_gid[RMW_GID_STORAGE_SIZE]; + + void serialize_to_zbytes(z_owned_bytes_t *); +}; } // namespace rmw_zenoh_cpp #endif // DETAIL__ATTACHMENT_HELPERS_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 776d343d..b622e06d 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -612,9 +612,10 @@ void GraphCache::parse_del( if (entity->type() == EntityType::Node) { // Node - // The liveliness tokens to remove pub/subs should be received before the one to remove a node - // given the reliability QoS for liveliness subs. However, if we find any pubs/subs present in - // the node below, we should update the count in graph_topics_. + // When destroying a node, Zenoh does not guarantee that liveliness tokens to remove pub/subs + // arrive before the one to remove the node from the graph despite un-registering those entities + // earlier. In such scenarios, if we find any pub/subs present in the node, we reduce their + // counts in graph_topics_. const GraphNodePtr graph_node = node_it->second; if (!graph_node->pubs_.empty() || !graph_node->subs_.empty() || diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index c3c232c4..52c8ec7f 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -374,14 +374,11 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) ///============================================================================= std::string zid_to_str(const z_id_t & id) { - std::stringstream ss; - ss << std::hex; - for (std::size_t i = 0; i < sizeof(id.id); i++) { - // By Zenoh convention a z_id_t is a little endian u128. - const std::size_t le_idx = sizeof(id.id) - 1 - i; - ss << static_cast(id.id[le_idx]); - } - return ss.str(); + z_owned_string_t z_str; + z_id_to_string(&id, &z_str); + std::string str(z_string_data(z_loan(z_str)), z_string_len(z_loan(z_str))); + z_drop(z_move(z_str)); + return str; } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 7bfe4c16..588419ab 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -44,7 +44,7 @@ namespace { ///============================================================================= -void client_data_handler(z_owned_reply_t * reply, void * data) +void client_data_handler(z_loaned_reply_t * reply, void * data) { auto client_data = static_cast(data); if (client_data == nullptr) { @@ -59,21 +59,20 @@ void client_data_handler(z_owned_reply_t * reply, void * data) return; } - if (!z_reply_check(reply)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_reply_check returned False" - ); - return; - } if (!z_reply_is_ok(reply)) { - z_value_t err = z_reply_err(reply); + const z_loaned_reply_err_t * err = z_reply_err(reply); + const z_loaned_bytes_t * err_payload = z_reply_err_payload(err); + + z_owned_string_t err_str; + z_bytes_to_string(err_payload, &err_str); + RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", client_data->topic_info().topic_keyexpr_.c_str(), - (int)err.payload.len, - err.payload.start); + static_cast(z_string_len(z_loan(err_str))), + z_string_data(z_loan(err_str))); + z_drop(z_move(err_str)); return; } @@ -81,11 +80,10 @@ void client_data_handler(z_owned_reply_t * reply, void * data) std::chrono::nanoseconds::rep received_timestamp = std::chrono::system_clock::now().time_since_epoch().count(); + z_owned_reply_t owned_reply; + z_reply_clone(&owned_reply, reply); client_data->add_new_reply( std::make_unique(reply, received_timestamp)); - - // Since we took ownership of the reply, null it out here - *reply = z_reply_null(); } ///============================================================================= @@ -109,7 +107,7 @@ namespace rmw_zenoh_cpp { ///============================================================================= std::shared_ptr ClientData::make( - z_session_t session, + std::shared_ptr session, const rmw_node_t * const node, const rmw_client_t * client, liveliness::NodeInfo node_info, @@ -171,7 +169,7 @@ std::shared_ptr ClientData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + z_info_zid(session->loan()), std::to_string(node_id), std::to_string(service_id), liveliness::EntityType::Client, @@ -196,6 +194,7 @@ std::shared_ptr ClientData::make( node, client, entity, + session, request_members, response_members, request_type_support, @@ -207,6 +206,7 @@ std::shared_ptr ClientData::make( return nullptr; } + client_data->initialized_ = true; return client_data; } @@ -215,6 +215,7 @@ ClientData::ClientData( const rmw_node_t * rmw_node, const rmw_client_t * rmw_client, std::shared_ptr entity, + std::shared_ptr sess, const void * request_type_support_impl, const void * response_type_support_impl, std::shared_ptr request_type_support, @@ -222,6 +223,7 @@ ClientData::ClientData( : rmw_node_(rmw_node), rmw_client_(rmw_client), entity_(std::move(entity)), + sess_(std::move(sess)), request_type_support_impl_(request_type_support_impl), response_type_support_impl_(response_type_support_impl), request_type_support_(request_type_support), @@ -229,35 +231,35 @@ ClientData::ClientData( wait_set_data_(nullptr), sequence_number_(1), is_shutdown_(false), + initialized_(false), num_in_flight_(0) { // Do nothing. } ///============================================================================= -bool ClientData::init(z_session_t session) +bool ClientData::init(std::shared_ptr session) { - this->keyexpr_ = - z_keyexpr_new(this->entity_->topic_info().value().topic_keyexpr_.c_str()); - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [this]() { - z_drop(z_move(this->keyexpr_)); - }); - if (!z_check(z_loan(this->keyexpr_))) { + std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_; + if (z_keyexpr_from_str(&this->keyexpr_, topic_keyexpr.c_str()) != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return false; } - - this->token_ = zc_liveliness_declare_token( - session, - z_keyexpr(this->entity_->liveliness_keyexpr().c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( + auto free_ros_keyexpr = rcpputils::make_scope_exit( [this]() { - z_drop(z_move(this->token_)); + z_drop(z_move(this->keyexpr_)); }); - if (!z_check(this->token_)) { + + std::string liveliness_keyexpr = this->entity_->liveliness_keyexpr(); + z_view_keyexpr_t liveliness_ke; + z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); + if (z_liveliness_declare_token( + session->loan(), + &this->token_, + z_loan(liveliness_ke), + NULL + ) != Z_OK) + { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the client."); @@ -265,8 +267,6 @@ bool ClientData::init(z_session_t session) } free_ros_keyexpr.cancel(); - free_token.cancel(); - return true; } @@ -277,13 +277,6 @@ liveliness::TopicInfo ClientData::topic_info() const return entity_->topic_info().value(); } -///============================================================================= -bool ClientData::liveliness_is_valid() const -{ - std::lock_guard lock(mutex_); - return zc_liveliness_token_check(&token_); -} - ///============================================================================= void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const { @@ -301,14 +294,15 @@ void ClientData::add_new_reply(std::unique_ptr reply) reply_queue_.size() >= adapted_qos_profile.depth) { // Log warning if message is discarded due to hitting the queue depth - z_owned_str_t keystr = z_keyexpr_to_string(z_loan(keyexpr_)); + z_view_string_t keystr; + z_keyexpr_as_view_string(z_loan(keyexpr_), &keystr); RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Query queue depth of %ld reached, discarding oldest Query " - "for client for %s", + "for client for %.*s", adapted_qos_profile.depth, - z_loan(keystr)); - z_drop(z_move(keystr)); + static_cast(z_string_len(z_loan(keystr))), + z_string_data(z_loan(keystr))); reply_queue_.pop_front(); } reply_queue_.emplace_back(std::move(reply)); @@ -338,16 +332,18 @@ rmw_ret_t ClientData::take_response( std::unique_ptr latest_reply = std::move(reply_queue_.front()); reply_queue_.pop_front(); - std::optional sample = latest_reply->get_sample(); - if (!sample) { + if (!latest_reply->get_sample().has_value()) { RMW_SET_ERROR_MSG("invalid reply sample"); return RMW_RET_ERROR; } + const z_loaned_sample_t * sample = latest_reply->get_sample().value(); // Object that manages the raw buffer + z_owned_slice_t payload; + z_bytes_to_slice(z_sample_payload(sample), &payload); eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(sample->payload.start)), - sample->payload.len); + reinterpret_cast(const_cast(z_slice_data(z_loan(payload)))), + z_slice_len(z_loan(payload))); // Object that serializes the data rmw_zenoh_cpp::Cdr deser(fastbuffer); @@ -361,28 +357,21 @@ rmw_ret_t ClientData::take_response( } // Fill in the request_header - request_header->request_id.sequence_number = - rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "sequence_number"); + rmw_zenoh_cpp::attachment_data_t attachment(z_sample_attachment(sample)); + request_header->request_id.sequence_number = attachment.sequence_number; if (request_header->request_id.sequence_number < 0) { RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); return RMW_RET_ERROR; } - request_header->source_timestamp = - rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "source_timestamp"); + request_header->source_timestamp = attachment.source_timestamp; if (request_header->source_timestamp < 0) { RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); return RMW_RET_ERROR; } - if (!rmw_zenoh_cpp::get_gid_from_attachment( - &sample->attachment, - request_header->request_id.writer_guid)) - { - RMW_SET_ERROR_MSG("Could not get client gid from attachment"); - return RMW_RET_ERROR; - } - + memcpy(request_header->request_id.writer_guid, attachment.source_gid, RMW_GID_STORAGE_SIZE); request_header->received_timestamp = latest_reply->get_received_timestamp(); + z_drop(z_move(payload)); *taken = true; return RMW_RET_OK; @@ -403,6 +392,7 @@ rmw_ret_t ClientData::send_request( if (context_impl == nullptr) { return RMW_RET_INVALID_ARGUMENT; } + sess_ = context_impl->session(); size_t max_data_length = ( request_type_support_->get_estimated_serialized_size( @@ -436,28 +426,16 @@ rmw_ret_t ClientData::send_request( *sequence_id = sequence_number_++; // Send request - z_get_options_t opts = z_get_options_default(); - z_owned_bytes_map_t map = create_map_and_set_sequence_num( - *sequence_id, - [this](z_owned_bytes_map_t * map, const char * key) - { - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - entity_->copy_gid(local_gid); - z_bytes_t gid_bytes; - gid_bytes.len = RMW_GID_STORAGE_SIZE; - gid_bytes.start = local_gid; - z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); - }); - if (!z_check(map)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - auto always_free_attachment_map = rcpputils::make_scope_exit( - [&map]() { - z_bytes_map_drop(z_move(map)); - }); + z_get_options_t opts; + z_get_options_default(&opts); + z_owned_bytes_t attachment; + uint8_t local_gid[RMW_GID_STORAGE_SIZE]; + entity_->copy_gid(local_gid); + rmw_zenoh_cpp::create_map_and_set_sequence_num( + &attachment, *sequence_id, + local_gid); + opts.attachment = z_move(attachment); - opts.attachment = z_bytes_map_as_attachment(&map); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; // The default timeout for a z_get query is 10 seconds and if a response is not received within // this window, the queryable will return an invalid reply. However, it is common for actions, @@ -468,14 +446,19 @@ rmw_ret_t ClientData::send_request( // which optimizes bandwidth. The default is "None", which imples replies may come in any order // and any number. opts.consolidation = z_query_consolidation_latest(); - opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; + + z_owned_bytes_t payload; + z_bytes_copy_from_buf( + &payload, reinterpret_cast(request_bytes), data_length); + opts.payload = z_move(payload); + // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, // capture shared_from_this() instead of this. num_in_flight_++; - z_owned_closure_reply_t zn_closure_reply = - z_closure(client_data_handler, client_data_drop, this); + z_owned_closure_reply_t zn_closure_reply; + z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this); z_get( - context_impl->session(), + sess_->loan(), z_loan(keyexpr_), "", z_move(zn_closure_reply), &opts); @@ -530,19 +513,18 @@ bool ClientData::detach_condition_and_queue_is_empty() ///============================================================================= void ClientData::_shutdown() { - if (is_shutdown_) { + if (is_shutdown_ || !initialized_) { return; } // Unregister this node from the ROS graph. - if (zc_liveliness_token_check(&token_)) { - zc_liveliness_undeclare_token(z_move(token_)); - } - if (z_check(z_loan(keyexpr_))) { - z_drop(z_move(keyexpr_)); - } + z_liveliness_undeclare_token(z_move(token_)); + + z_drop(z_move(keyexpr_)); + sess_.reset(); is_shutdown_ = true; + initialized_ = false; } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index eda4bf9c..6bb4c3d8 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -47,7 +47,7 @@ class ClientData final : public std::enable_shared_from_this public: // Make a shared_ptr of ClientData. static std::shared_ptr make( - z_session_t session, + std::shared_ptr session, const rmw_node_t * const node, const rmw_client_t * client, liveliness::NodeInfo node_info, @@ -60,9 +60,6 @@ class ClientData final : public std::enable_shared_from_this // Get a copy of the TopicInfo of this ClientData. liveliness::TopicInfo topic_info() const; - // Returns true if liveliness token is still valid. - bool liveliness_is_valid() const; - // Copy the GID of this ClientData into an rmw_gid_t. void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; @@ -113,13 +110,14 @@ class ClientData final : public std::enable_shared_from_this const rmw_node_t * rmw_node, const rmw_client_t * client, std::shared_ptr entity, + std::shared_ptr sess, const void * request_type_support_impl, const void * response_type_support_impl, std::shared_ptr request_type_support, std::shared_ptr response_type_support); // Initialize the Zenoh objects for this entity. - bool init(z_session_t session); + bool init(std::shared_ptr session); // Shutdown this client (the mutex is expected to be held by the caller). void _shutdown(); @@ -131,10 +129,12 @@ class ClientData final : public std::enable_shared_from_this const rmw_client_t * rmw_client_; // The Entity generated for the service. std::shared_ptr entity_; + // A shared session. + std::shared_ptr sess_; // An owned keyexpression. z_owned_keyexpr_t keyexpr_; // Liveliness token for the service. - zc_owned_liveliness_token_t token_; + z_owned_liveliness_token_t token_; // Type support fields. const void * request_type_support_impl_; const void * response_type_support_impl_; @@ -150,6 +150,8 @@ class ClientData final : public std::enable_shared_from_this size_t sequence_number_; // Shutdown flag. bool is_shutdown_; + // Whether the object has ever successfully been initialized. + bool initialized_; size_t num_in_flight_; }; using ClientDataPtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 50b29038..cae27896 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -36,6 +36,7 @@ #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" +#include "zenoh_utils.hpp" // Megabytes of SHM to reserve. // TODO(clalancette): Make this configurable, or get it from the configuration @@ -48,7 +49,7 @@ static std::mutex data_to_data_shared_ptr_map_mutex; static std::unordered_map> data_to_data_shared_ptr_map; -static void graph_sub_data_handler(const z_sample_t * sample, void * data); +static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data); // Bundle all class members into a data struct which can be passed as a // weak ptr to various threads for thread-safe access without capturing @@ -78,20 +79,26 @@ class rmw_context_impl_s::Data final } // Check if shm is enabled. - z_owned_str_t shm_enabled = zc_config_get(z_loan(config), "transport/shared_memory/enabled"); + z_owned_string_t shm_enabled; + zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled); auto always_free_shm_enabled = rcpputils::make_scope_exit( [&shm_enabled]() { z_drop(z_move(shm_enabled)); }); // Initialize the zenoh session. - session_ = z_open(z_move(config)); - if (!z_session_check(&session_)) { + z_owned_session_t raw_session; + if (z_open(&raw_session, z_move(config), NULL) != Z_OK) { + RMW_SET_ERROR_MSG("Error setting up zenoh session."); throw std::runtime_error("Error setting up zenoh session."); } + if (session_ != nullptr) { + session_.reset(); + } + session_ = std::make_shared(raw_session); auto close_session = rcpputils::make_scope_exit( - [this]() { - z_close(z_move(session_)); + [&raw_session]() { + z_close(z_loan_mut(raw_session), NULL); }); // Verify if the zenoh router is running if configured. @@ -101,7 +108,7 @@ class rmw_context_impl_s::Data final uint64_t connection_attempts = 0; constexpr std::chrono::milliseconds sleep_time(1000); constexpr int64_t ticks_between_print(std::chrono::milliseconds(1000) / sleep_time); - while ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session_))) != RMW_RET_OK) { + while ((ret = rmw_zenoh_cpp::zenoh_router_check(session_->loan())) != RMW_RET_OK) { if ((connection_attempts % ticks_between_print) == 0) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", @@ -116,14 +123,14 @@ class rmw_context_impl_s::Data final } // Initialize the graph cache. - const z_id_t zid = z_info_zid(z_loan(session_)); + const z_id_t zid = z_info_zid(session_->loan()); graph_cache_ = std::make_shared(zid); // Setup liveliness subscriptions for discovery. std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id); // Query router/liveliness participants to get graph information before the session was started. // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive - // replies for the zc_liveliness_get() call. This is necessary as if the `bound` + // replies for the z_liveliness_get() call. This is necessary as if the `bound` // is too low, the channel may starve the zenoh executor of its threads which // would lead to deadlocks when trying to receive replies and block the // execution here. @@ -136,60 +143,56 @@ class rmw_context_impl_s::Data final // the code will be simpler, and if we're just going to spin over the non-blocking // reads until we obtain responses, we'll just be hogging CPU time by convincing // the OS that we're doing actual work when it could instead park the thread. - z_owned_reply_channel_t channel = zc_reply_fifo_new(0); - zc_liveliness_get( - z_loan(session_), z_keyexpr(liveliness_str.c_str()), - z_move(channel.send), NULL); - z_owned_reply_t reply = z_reply_null(); - for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); - call_success = z_call(channel.recv, &reply)) - { - if (!call_success) { - continue; - } - if (z_reply_is_ok(&reply)) { - z_sample_t sample = z_reply_ok(&reply); - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + z_owned_fifo_handler_reply_t handler; + z_owned_closure_reply_t closure; + z_fifo_channel_reply_new(&closure, &handler, SIZE_MAX - 1); + + z_view_keyexpr_t keyexpr; + z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str()); + z_liveliness_get( + session_->loan(), z_loan(keyexpr), + z_move(closure), NULL); + z_owned_reply_t reply; + while (z_recv(z_loan(handler), &reply) == Z_OK) { + if (z_reply_is_ok(z_loan(reply))) { + const z_loaned_sample_t * sample = z_reply_ok(z_loan(reply)); + z_view_string_t keystr; + z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); + std::string liveliness_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); // Ignore tokens from the same session to avoid race conditions from this // query and the liveliness subscription. - graph_cache_->parse_put(z_loan(keystr), true); - z_drop(z_move(keystr)); + graph_cache_->parse_put(std::move(liveliness_str), true); } else { RMW_ZENOH_LOG_DEBUG_NAMED( "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n"); } + z_drop(z_move(reply)); } - z_drop(z_move(reply)); - z_drop(z_move(channel)); + z_drop(z_move(handler)); // Initialize the shm manager if shared_memory is enabled in the config. - shm_manager_ = std::nullopt; - if (shm_enabled._cstr != nullptr && - strcmp(shm_enabled._cstr, "true") == 0) + shm_provider_ = std::nullopt; + if (strncmp( + z_string_data(z_loan(shm_enabled)), + "true", + z_string_len(z_loan(shm_enabled))) == 0) { - char idstr[sizeof(zid.id) * 2 + 1]; // 2 bytes for each byte of the id, plus the trailing \0 - static constexpr size_t max_size_of_each = 3; // 2 for each byte, plus the trailing \0 - for (size_t i = 0; i < sizeof(zid.id); ++i) { - snprintf(idstr + 2 * i, max_size_of_each, "%02x", zid.id[i]); - } - idstr[sizeof(zid.id) * 2] = '\0'; - // TODO(yadunund): Can we get the size of the shm from the config even though it's not - // a standard parameter? - shm_manager_ = - zc_shm_manager_new( - z_loan(session_), - idstr, - SHM_BUFFER_SIZE_MB * 1024 * 1024); - if (!shm_manager_.has_value() || - !zc_shm_manager_check(&shm_manager_.value())) - { - throw std::runtime_error("Unable to create shm manager."); + // TODO(yuyuan): determine the default alignment of SHM + z_alloc_alignment_t alignment = {5}; + z_owned_memory_layout_t layout; + z_memory_layout_new(&layout, SHM_BUFFER_SIZE_MB * 1024 * 1024, alignment); + + z_owned_shm_provider_t provider; + if (z_posix_shm_provider_new(&provider, z_loan(layout)) != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create an SHM provider."); + throw std::runtime_error("Unable to create an SHM provider."); } + shm_provider_ = provider; } - auto free_shm_manager = rcpputils::make_scope_exit( + auto free_shm_provider = rcpputils::make_scope_exit( [this]() { - if (shm_manager_.has_value()) { - z_drop(z_move(shm_manager_.value())); + if (shm_provider_.has_value()) { + z_drop(z_move(shm_provider_.value())); } }); @@ -199,27 +202,23 @@ class rmw_context_impl_s::Data final // Setup the liveliness subscriber to receives updates from the ROS graph // and update the graph cache. - auto sub_options = zc_liveliness_subscriber_options_null(); - z_owned_closure_sample_t callback = z_closure( - graph_sub_data_handler, nullptr, this); - graph_subscriber_ = zc_liveliness_declare_subscriber( - z_loan(session_), - z_keyexpr(liveliness_str.c_str()), - z_move(callback), - &sub_options); - zc_liveliness_subscriber_options_drop(z_move(sub_options)); - auto undeclare_z_sub = rcpputils::make_scope_exit( - [this]() { - z_undeclare_subscriber(z_move(this->graph_subscriber_)); - }); - if (!z_check(graph_subscriber_)) { + z_liveliness_subscriber_options_t sub_options; + z_liveliness_subscriber_options_default(&sub_options); + z_owned_closure_sample_t callback; + z_closure(&callback, graph_sub_data_handler, nullptr, this); + z_view_keyexpr_t liveliness_ke; + z_view_keyexpr_from_str(&liveliness_ke, liveliness_str.c_str()); + if (z_liveliness_declare_subscriber( + session_->loan(), + &graph_subscriber_, z_loan(liveliness_ke), + z_move(callback), &sub_options) != Z_OK) + { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); throw std::runtime_error("Unable to subscribe to ROS graph updates."); } close_session.cancel(); - free_shm_manager.cancel(); - undeclare_z_sub.cancel(); + free_shm_provider.cancel(); } // Shutdown the Zenoh session. @@ -233,8 +232,8 @@ class rmw_context_impl_s::Data final } z_undeclare_subscriber(z_move(graph_subscriber_)); - if (shm_manager_.has_value()) { - z_drop(z_move(shm_manager_.value())); + if (shm_provider_.has_value()) { + z_drop(z_move(shm_provider_.value())); } is_shutdown_ = true; @@ -242,11 +241,9 @@ class rmw_context_impl_s::Data final // to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler(). } - // Close the zenoh session - if (z_close(z_move(session_)) < 0) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); - return RMW_RET_ERROR; - } + // Drop the shared session. + session_.reset(); + return RMW_RET_OK; } @@ -256,16 +253,16 @@ class rmw_context_impl_s::Data final return enclave_; } - z_session_t session() const + std::shared_ptr session() const { std::lock_guard lock(mutex_); - return z_loan(session_); + return session_; } - std::optional & shm_manager() + std::optional & shm_provider() { std::lock_guard lock(mutex_); - return shm_manager_; + return shm_provider_; } rmw_guard_condition_t * graph_guard_condition() @@ -289,7 +286,7 @@ class rmw_context_impl_s::Data final bool session_is_valid() const { std::lock_guard lock(mutex_); - return z_check(session_); + return !z_session_is_closed(session_->loan()); } std::shared_ptr graph_cache() @@ -310,7 +307,7 @@ class rmw_context_impl_s::Data final } // Check that the Zenoh session is still valid. - if (!z_check(session_)) { + if (!session_is_valid()) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create NodeData as Zenoh session is invalid."); @@ -320,7 +317,7 @@ class rmw_context_impl_s::Data final auto node_data = rmw_zenoh_cpp::NodeData::make( node, this->get_next_entity_id(), - z_loan(session_), + session_->loan(), domain_id_, ns, node_name, @@ -396,11 +393,11 @@ class rmw_context_impl_s::Data final std::size_t domain_id_; // Enclave, name used to find security artifacts in a sros2 keystore. std::string enclave_; - // An owned session. - z_owned_session_t session_; + // A shared session. + std::shared_ptr session_{nullptr}; // An optional SHM manager that is initialized of SHM is enabled in the // zenoh session config. - std::optional shm_manager_; + std::optional shm_provider_; // Graph cache. std::shared_ptr graph_cache_; // ROS graph liveliness subscriber. @@ -419,13 +416,10 @@ class rmw_context_impl_s::Data final }; ///============================================================================= -static void graph_sub_data_handler(const z_sample_t * sample, void * data) +static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data) { - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - auto free_keystr = rcpputils::make_scope_exit( - [&keystr]() { - z_drop(z_move(keystr)); - }); + z_view_string_t keystr; + z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); auto data_ptr = static_cast(data); if (data_ptr == nullptr) { @@ -448,7 +442,8 @@ static void graph_sub_data_handler(const z_sample_t * sample, void * data) } // Update the graph cache. - data_shared_ptr->update_graph_cache(sample->kind, keystr._cstr); + std::string liveliness_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); + data_shared_ptr->update_graph_cache(z_sample_kind(sample), std::move(liveliness_str)); } ///============================================================================= @@ -475,15 +470,15 @@ std::string rmw_context_impl_s::enclave() const } ///============================================================================= -z_session_t rmw_context_impl_s::session() const +std::shared_ptr rmw_context_impl_s::session() const { return data_->session(); } ///============================================================================= -std::optional & rmw_context_impl_s::shm_manager() +std::optional & rmw_context_impl_s::shm_provider() { - return data_->shm_manager(); + return data_->shm_provider(); } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index b9367e6f..ea66e0c3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -47,14 +47,14 @@ class rmw_context_impl_s final // Get a copy of the enclave. std::string enclave() const; - // Loan the Zenoh session. - z_session_t session() const; + // Share the Zenoh session. + std::shared_ptr session() const; - // Get a reference to the shm_manager. + // Get a reference to the shm_provider. // Note: This is not thread-safe. // TODO(Yadunund): Remove this API and instead include a publish() API - // that handles the shm_manager once the context manages publishers. - std::optional & shm_manager(); + // that handles the shm_provider once the context manages publishers. + std::optional & shm_provider(); // Get the graph guard condition. rmw_guard_condition_t * graph_guard_condition(); diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 56e88143..79351a50 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -30,7 +30,7 @@ namespace rmw_zenoh_cpp std::shared_ptr NodeData::make( const rmw_node_t * const node, std::size_t id, - z_session_t session, + const z_loaned_session_t * session, std::size_t domain_id, const std::string & namespace_, const std::string & node_name, @@ -57,30 +57,26 @@ std::shared_ptr NodeData::make( } // Create the liveliness token. - zc_owned_liveliness_token_t token = zc_liveliness_declare_token( - session, - z_keyexpr(entity->liveliness_keyexpr().c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( - [&token]() { - z_drop(z_move(token)); - }); - if (!z_check(token)) { + std::string liveliness_keyexpr = entity->liveliness_keyexpr(); + z_view_keyexpr_t liveliness_ke; + z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); + z_owned_liveliness_token_t token; + if (z_liveliness_declare_token(session, &token, z_loan(liveliness_ke), NULL) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the node."); return nullptr; } - free_token.cancel(); - return std::shared_ptr( + auto node_data = std::shared_ptr( new NodeData{ node, id, std::move(entity), std::move(token) }); + node_data->initialized_ = true; + return node_data; } ///============================================================================= @@ -88,12 +84,13 @@ NodeData::NodeData( const rmw_node_t * const node, std::size_t id, std::shared_ptr entity, - zc_owned_liveliness_token_t token) + z_owned_liveliness_token_t token) : node_(node), id_(std::move(id)), entity_(std::move(entity)), token_(std::move(token)), is_shutdown_(false), + initialized_(false), pubs_({}) { // Do nothing. @@ -122,7 +119,7 @@ std::size_t NodeData::id() const ///============================================================================= bool NodeData::create_pub_data( const rmw_publisher_t * const publisher, - z_session_t session, + std::shared_ptr session, std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, @@ -188,7 +185,7 @@ void NodeData::delete_pub_data(const rmw_publisher_t * const publisher) ///============================================================================= bool NodeData::create_sub_data( const rmw_subscription_t * const subscription, - z_session_t session, + std::shared_ptr session, std::shared_ptr graph_cache, std::size_t id, const std::string & topic_name, @@ -256,7 +253,7 @@ void NodeData::delete_sub_data(const rmw_subscription_t * const subscription) ///============================================================================= bool NodeData::create_service_data( const rmw_service_t * const service, - z_session_t session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_supports, @@ -323,7 +320,7 @@ void NodeData::delete_service_data(const rmw_service_t * const service) ///============================================================================= bool NodeData::create_client_data( const rmw_client_t * const client, - z_session_t session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_supports, @@ -398,14 +395,15 @@ rmw_ret_t NodeData::shutdown() { std::lock_guard lock(mutex_); rmw_ret_t ret = RMW_RET_OK; - if (is_shutdown_) { + if (is_shutdown_ || !initialized_) { return ret; } // Unregister this node from the ROS graph. - zc_liveliness_undeclare_token(z_move(token_)); + z_liveliness_undeclare_token(z_move(token_)); is_shutdown_ = true; + initialized_ = false; return ret; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index 5489e4b7..5e5f2d4c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -43,7 +43,7 @@ class NodeData final static std::shared_ptr make( const rmw_node_t * const node, std::size_t id, - z_session_t session, + const z_loaned_session_t * session, std::size_t domain_id, const std::string & namespace_, const std::string & node_name, @@ -55,7 +55,7 @@ class NodeData final // Create a new PublisherData for a given rmw_publisher_t. bool create_pub_data( const rmw_publisher_t * const publisher, - z_session_t session, + std::shared_ptr sess, std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, @@ -70,7 +70,7 @@ class NodeData final // Create a new SubscriptionData for a given rmw_subscription_t. bool create_sub_data( const rmw_subscription_t * const subscription, - z_session_t session, + std::shared_ptr sess, std::shared_ptr graph_cache, std::size_t id, const std::string & topic_name, @@ -86,7 +86,7 @@ class NodeData final // Create a new ServiceData for a given rmw_service_t. bool create_service_data( const rmw_service_t * const service, - z_session_t session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_support, @@ -101,7 +101,7 @@ class NodeData final // Create a new ClientData for a given rmw_client_t. bool create_client_data( const rmw_client_t * const client, - z_session_t session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_support, @@ -128,7 +128,7 @@ class NodeData final const rmw_node_t * const node, std::size_t id, std::shared_ptr entity, - zc_owned_liveliness_token_t token); + z_owned_liveliness_token_t token); // Internal mutex. mutable std::recursive_mutex mutex_; // The rmw_node_t associated with this NodeData. @@ -139,9 +139,11 @@ class NodeData final // The Entity generated for the node. std::shared_ptr entity_; // Liveliness token for the node. - zc_owned_liveliness_token_t token_; + z_owned_liveliness_token_t token_; // Shutdown flag. bool is_shutdown_; + // Whether the object has ever successfully been initialized. + bool initialized_; // Map of publishers. std::unordered_map pubs_; // Map of subscriptions. diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index b2ea8235..df8f7495 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -15,6 +15,7 @@ #include "rmw_publisher_data.hpp" #include +#include #include #include @@ -37,9 +38,12 @@ namespace rmw_zenoh_cpp { +// TODO(yuyuan): SHM, make this configurable +#define SHM_BUF_OK_SIZE 2621440 + ///============================================================================= std::shared_ptr PublisherData::make( - z_session_t session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -79,7 +83,7 @@ std::shared_ptr PublisherData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + z_info_zid(session->loan()), std::to_string(node_id), std::to_string(publisher_id), liveliness::EntityType::Publisher, @@ -98,13 +102,10 @@ std::shared_ptr PublisherData::make( topic_name.c_str()); return nullptr; } - z_owned_keyexpr_t keyexpr = z_keyexpr_new( - entity->topic_info()->topic_keyexpr_.c_str()); - auto always_free_ros_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_keyexpr_drop(z_move(keyexpr)); - }); - if (!z_keyexpr_check(&keyexpr)) { + + std::string topic_keyexpr = entity->topic_info()->topic_keyexpr_; + z_view_keyexpr_t pub_ke; + if (z_view_keyexpr_from_str(&pub_ke, topic_keyexpr.c_str()) != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } @@ -112,7 +113,8 @@ std::shared_ptr PublisherData::make( // Create a Publication Cache if durability is transient_local. std::optional pub_cache = std::nullopt; if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); + ze_publication_cache_options_t pub_cache_opts; + ze_publication_cache_options_default(&pub_cache_opts); pub_cache_opts.history = adapted_qos_profile.depth; pub_cache_opts.queryable_complete = true; // Set the queryable_prefix to the session id so that querying subscribers can specify this @@ -121,21 +123,19 @@ std::shared_ptr PublisherData::make( // When such a prefix is added to the PublicationCache, it listens to queries with this extra // prefix (allowing to be queried in a unique way), but still replies with the original // publications' key expressions. - z_owned_keyexpr_t queryable_prefix = z_keyexpr_new(entity->zid().c_str()); - auto always_free_queryable_prefix = rcpputils::make_scope_exit( - [&queryable_prefix]() { - z_keyexpr_drop(z_move(queryable_prefix)); - }); - pub_cache_opts.queryable_prefix = z_loan(queryable_prefix); - pub_cache = ze_declare_publication_cache( - session, - z_loan(keyexpr), - &pub_cache_opts - ); - if (!pub_cache.has_value() || !z_check(pub_cache.value())) { + std::string queryable_prefix = entity->zid(); + z_view_keyexpr_t prefix_ke; + z_view_keyexpr_from_str(&prefix_ke, queryable_prefix.c_str()); + pub_cache_opts.queryable_prefix = z_loan(prefix_ke); + + ze_owned_publication_cache_t pub_cache_; + if (ze_declare_publication_cache( + session->loan(), &pub_cache_, z_loan(pub_ke), &pub_cache_opts)) + { RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); return nullptr; } + pub_cache = pub_cache_; } auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( [&pub_cache]() { @@ -145,20 +145,22 @@ std::shared_ptr PublisherData::make( }); // Set congestion_control to BLOCK if appropriate. - z_publisher_options_t opts = z_publisher_options_default(); + z_publisher_options_t opts; + z_publisher_options_default(&opts); opts.congestion_control = Z_CONGESTION_CONTROL_DROP; - if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL && - adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) - { - opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + + if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + opts.reliability = Z_RELIABILITY_RELIABLE; + + if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) { + opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + } } + z_owned_publisher_t pub; // TODO(clalancette): What happens if the key name is a valid but empty string? - z_owned_publisher_t pub = z_declare_publisher( - session, - z_loan(keyexpr), - &opts - ); - if (!z_check(pub)) { + if (z_declare_publisher( + session->loan(), &pub, z_loan(pub_ke), &opts) != Z_OK) + { RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); return nullptr; } @@ -167,56 +169,59 @@ std::shared_ptr PublisherData::make( z_undeclare_publisher(z_move(pub)); }); - zc_owned_liveliness_token_t token = zc_liveliness_declare_token( - session, - z_keyexpr(entity->liveliness_keyexpr().c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( - [&token]() { - z_drop(z_move(token)); - }); - if (!z_check(token)) { + std::string liveliness_keyexpr = entity->liveliness_keyexpr(); + z_view_keyexpr_t liveliness_ke; + z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); + z_owned_liveliness_token_t token; + if (z_liveliness_declare_token( + session->loan(), &token, z_loan(liveliness_ke), + NULL) != Z_OK) + { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the publisher."); return nullptr; } - free_token.cancel(); undeclare_z_publisher_cache.cancel(); undeclare_z_publisher.cancel(); - return std::shared_ptr( + auto pub_data = std::shared_ptr( new PublisherData{ node, std::move(entity), + std::move(session), std::move(pub), std::move(pub_cache), std::move(token), type_support->data, std::move(message_type_support) }); + pub_data->initialized_ = true; + return pub_data; } ///============================================================================= PublisherData::PublisherData( const rmw_node_t * rmw_node, std::shared_ptr entity, + std::shared_ptr sess, z_owned_publisher_t pub, std::optional pub_cache, - zc_owned_liveliness_token_t token, + z_owned_liveliness_token_t token, const void * type_support_impl, std::unique_ptr type_support) : rmw_node_(rmw_node), entity_(std::move(entity)), + sess_(std::move(sess)), pub_(std::move(pub)), pub_cache_(std::move(pub_cache)), token_(std::move(token)), type_support_impl_(type_support_impl), type_support_(std::move(type_support)), sequence_number_(1), - is_shutdown_(false) + is_shutdown_(false), + initialized_(false) { events_mgr_ = std::make_shared(); } @@ -224,7 +229,7 @@ PublisherData::PublisherData( ///============================================================================= rmw_ret_t PublisherData::publish( const void * ros_message, - std::optional & shm_manager) + std::optional & shm_provider) { std::lock_guard lock(mutex_); if (is_shutdown_) { @@ -239,11 +244,11 @@ rmw_ret_t PublisherData::publish( // To store serialized message byte array. char * msg_bytes = nullptr; - std::optional shmbuf = std::nullopt; + std::optional shmbuf = std::nullopt; auto always_free_shmbuf = rcpputils::make_scope_exit( [&shmbuf]() { if (shmbuf.has_value()) { - zc_shmbuf_drop(&shmbuf.value()); + z_drop(z_move(shmbuf.value())); } }); @@ -257,22 +262,23 @@ rmw_ret_t PublisherData::publish( }); // Get memory from SHM buffer if available. - if (shm_manager.has_value() && - zc_shm_manager_check(&shm_manager.value())) - { - shmbuf = zc_shm_alloc( - &shm_manager.value(), - max_data_length); - if (!z_check(shmbuf.value())) { - zc_shm_gc(&shm_manager.value()); - shmbuf = zc_shm_alloc(&shm_manager.value(), max_data_length); - if (!z_check(shmbuf.value())) { - // TODO(Yadunund): Should we revert to regular allocation and not return an error? - RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing."); - return RMW_RET_ERROR; - } + if (shm_provider.has_value()) { + RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled."); + + auto provider = shm_provider.value(); + z_buf_layout_alloc_result_t alloc; + // TODO(yuyuan): SHM, configure this + z_alloc_alignment_t alignment = {5}; + z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment); + + if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { + shmbuf = std::make_optional(alloc.buf); + msg_bytes = reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf))); + } else { + // TODO(Yadunund): Should we revert to regular allocation and not return an error? + RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing."); + return RMW_RET_ERROR; } - msg_bytes = reinterpret_cast(zc_shmbuf_ptr(&shmbuf.value())); } else { // Get memory from the allocator. msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); @@ -296,50 +302,34 @@ rmw_ret_t PublisherData::publish( const size_t data_length = ser.get_serialized_data_length(); - z_owned_bytes_map_t map = - create_map_and_set_sequence_num( - sequence_number_++, - [this](z_owned_bytes_map_t * map, const char * key) - { - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - entity_->copy_gid(local_gid); - // Mutex already locked. - z_bytes_t gid_bytes; - gid_bytes.len = RMW_GID_STORAGE_SIZE; - gid_bytes.start = local_gid; - z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); - }); - if (!z_check(map)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - auto always_free_attachment_map = rcpputils::make_scope_exit( - [&map]() { - z_bytes_map_drop(z_move(map)); - }); - - int ret; // The encoding is simply forwarded and is useful when key expressions in the // session use different encoding formats. In our case, all key expressions // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options = z_publisher_put_options_default(); - options.attachment = z_bytes_map_as_attachment(&map); - + z_publisher_put_options_t options; + z_publisher_put_options_default(&options); + z_owned_bytes_t attachment; + uint8_t local_gid[RMW_GID_STORAGE_SIZE]; + entity_->copy_gid(local_gid); + create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid); + options.attachment = z_move(attachment); + + z_owned_bytes_t payload; if (shmbuf.has_value()) { - zc_shmbuf_set_length(&shmbuf.value(), data_length); - zc_owned_payload_t payload = zc_shmbuf_into_payload(z_move(shmbuf.value())); - ret = zc_publisher_put_owned(z_loan(pub_), z_move(payload), &options); + z_bytes_from_shm_mut(&payload, z_move(shmbuf.value())); } else { - // Returns 0 if success. - ret = z_publisher_put( - z_loan(pub_), - reinterpret_cast(msg_bytes), - data_length, - &options); + z_bytes_copy_from_buf(&payload, reinterpret_cast(msg_bytes), data_length); } - if (ret) { - RMW_SET_ERROR_MSG("unable to publish message"); - return RMW_RET_ERROR; + + z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options); + if (res != Z_OK) { + if (res == Z_ESESSION_CLOSED) { + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "unable to publish message since the zenoh session is closed"); + } else { + RMW_SET_ERROR_MSG("unable to publish message"); + return RMW_RET_ERROR; + } } return RMW_RET_OK; @@ -348,7 +338,7 @@ rmw_ret_t PublisherData::publish( ///============================================================================= rmw_ret_t PublisherData::publish_serialized_message( const rmw_serialized_message_t * serialized_message, - std::optional & /*shm_manager*/) + std::optional & /*shm_provider*/) { eprosima::fastcdr::FastBuffer buffer( reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length); @@ -360,47 +350,34 @@ rmw_ret_t PublisherData::publish_serialized_message( std::lock_guard lock(mutex_); - z_owned_bytes_map_t map = rmw_zenoh_cpp::create_map_and_set_sequence_num( - sequence_number_++, - [this](z_owned_bytes_map_t * map, const char * key) - { - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - entity_->copy_gid(local_gid); - - // Mutex already locked. - z_bytes_t gid_bytes; - gid_bytes.len = RMW_GID_STORAGE_SIZE; - gid_bytes.start = local_gid; - z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); - }); - - if (!z_check(map)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - auto free_attachment_map = rcpputils::make_scope_exit( - [&map]() { - z_bytes_map_drop(z_move(map)); - }); const size_t data_length = ser.get_serialized_data_length(); // The encoding is simply forwarded and is useful when key expressions in the // session use different encoding formats. In our case, all key expressions // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options = z_publisher_put_options_default(); - options.attachment = z_bytes_map_as_attachment(&map); - - // Returns 0 if success. - int8_t ret = z_publisher_put( - z_loan(pub_), - serialized_message->buffer, - data_length, - &options); - - if (ret) { - RMW_SET_ERROR_MSG("unable to publish message"); - return RMW_RET_ERROR; + z_publisher_put_options_t options; + z_publisher_put_options_default(&options); + uint8_t local_gid[RMW_GID_STORAGE_SIZE]; + entity_->copy_gid(local_gid); + z_owned_bytes_t attachment; + create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid); + + options.attachment = z_move(attachment); + + z_owned_bytes_t payload; + z_bytes_copy_from_buf(&payload, serialized_message->buffer, data_length); + + z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options); + if (res != Z_OK) { + if (res == Z_ESESSION_CLOSED) { + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "unable to publish message since the zenoh session is closed"); + } else { + RMW_SET_ERROR_MSG("unable to publish message"); + return RMW_RET_ERROR; + } } return RMW_RET_OK; @@ -427,13 +404,6 @@ void PublisherData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const entity_->copy_gid(out_gid); } -///============================================================================= -bool PublisherData::liveliness_is_valid() const -{ - std::lock_guard lock(mutex_); - return zc_liveliness_token_check(&token_); -} - ///============================================================================= std::shared_ptr PublisherData::events_mgr() const { @@ -458,18 +428,24 @@ PublisherData::~PublisherData() rmw_ret_t PublisherData::shutdown() { std::lock_guard lock(mutex_); - if (is_shutdown_) { + if (is_shutdown_ || !initialized_) { return RMW_RET_OK; } // Unregister this publisher from the ROS graph. - zc_liveliness_undeclare_token(z_move(token_)); + z_liveliness_undeclare_token(z_move(token_)); + // if (pub_cache_.has_value() && !z_session_is_closed(z_loan(sess_))) { if (pub_cache_.has_value()) { - z_drop(z_move(pub_cache_.value())); + ze_undeclare_publication_cache(z_move(pub_cache_.value())); + // if (z_session_is_closed(z_loan(sess_))) { + // z_drop(z_move(pub_cache_.value())); + // } } z_undeclare_publisher(z_move(pub_)); + sess_.reset(); is_shutdown_ = true; + initialized_ = false; return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 7b124cc3..664d4c92 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -28,6 +28,7 @@ #include "liveliness_utils.hpp" #include "message_type_support.hpp" #include "type_support_common.hpp" +#include "zenoh_utils.hpp" #include "rcutils/allocator.h" @@ -42,7 +43,7 @@ class PublisherData final public: // Make a shared_ptr of PublisherData. static std::shared_ptr make( - z_session_t session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -54,12 +55,12 @@ class PublisherData final // Publish a ROS message. rmw_ret_t publish( const void * ros_message, - std::optional & shm_manager); + std::optional & shm_provider); // Publish a serialized ROS message. rmw_ret_t publish_serialized_message( const rmw_serialized_message_t * serialized_message, - std::optional & shm_manager); + std::optional & shm_provider); // Get a copy of the keyexpr_hash of this PublisherData's liveliness::Entity. std::size_t keyexpr_hash() const; @@ -70,9 +71,6 @@ class PublisherData final // Copy the GID of this PublisherData into an rmw_gid_t. void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; - // Returns true if liveliness token is still valid. - bool liveliness_is_valid() const; - // Get the events manager of this PublisherData. std::shared_ptr events_mgr() const; @@ -90,9 +88,10 @@ class PublisherData final PublisherData( const rmw_node_t * rmw_node, std::shared_ptr entity, + std::shared_ptr sess, z_owned_publisher_t pub, std::optional pub_cache, - zc_owned_liveliness_token_t token, + z_owned_liveliness_token_t token, const void * type_support_impl, std::unique_ptr type_support); @@ -102,12 +101,14 @@ class PublisherData final const rmw_node_t * rmw_node_; // The Entity generated for the publisher. std::shared_ptr entity_; + // A shared session. + std::shared_ptr sess_; // An owned publisher. z_owned_publisher_t pub_; // Optional publication cache when durability is transient_local. std::optional pub_cache_; // Liveliness token for the publisher. - zc_owned_liveliness_token_t token_; + z_owned_liveliness_token_t token_; // Type support fields const void * type_support_impl_; std::unique_ptr type_support_; @@ -115,6 +116,8 @@ class PublisherData final size_t sequence_number_; // Shutdown flag. bool is_shutdown_; + // Whether the object has ever successfully been initialized. + bool initialized_; }; using PublisherDataPtr = std::shared_ptr; using PublisherDataConstPtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index 42366ef0..842b9e7c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -39,13 +39,10 @@ namespace rmw_zenoh_cpp { ///============================================================================== -void service_data_handler(const z_query_t * query, void * data) +void service_data_handler(z_loaned_query_t * query, void * data) { - z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query)); - auto drop_keystr = rcpputils::make_scope_exit( - [&keystr]() { - z_drop(z_move(keystr)); - }); + z_view_string_t keystr; + z_keyexpr_as_view_string(z_query_keyexpr(query), &keystr); ServiceData * service_data = static_cast(data); @@ -53,8 +50,9 @@ void service_data_handler(const z_query_t * query, void * data) RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to obtain ServiceData from data for " - "service for %s", - z_loan(keystr) + "service for %.*s", + static_cast(z_string_len(z_loan(keystr))), + z_string_data(z_loan(keystr)) ); return; } @@ -67,7 +65,7 @@ void service_data_handler(const z_query_t * query, void * data) ///============================================================================= std::shared_ptr ServiceData::make( - z_session_t session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -128,7 +126,7 @@ std::shared_ptr ServiceData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + z_info_zid(session->loan()), std::to_string(node_id), std::to_string(service_id), liveliness::EntityType::Service, @@ -152,6 +150,7 @@ std::shared_ptr ServiceData::make( new ServiceData{ node, std::move(entity), + session, request_members, response_members, std::move(request_type_support), @@ -160,54 +159,57 @@ std::shared_ptr ServiceData::make( // TODO(Yadunund): Instead of passing a rawptr, rely on capturing weak_ptr // in the closure callback once we switch to zenoh-cpp. - z_owned_closure_query_t callback = z_closure(service_data_handler, nullptr, service_data.get()); - service_data->keyexpr_ = - z_keyexpr_new(service_data->entity_->topic_info().value().topic_keyexpr_.c_str()); - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [service_data]() { - z_drop(z_move(service_data->keyexpr_)); - }); - if (!z_check(z_loan(service_data->keyexpr_))) { + z_owned_closure_query_t callback; + z_closure(&callback, service_data_handler, nullptr, service_data.get()); + z_view_keyexpr_t service_ke; + service_data->keyexpr_ = service_data->entity_->topic_info()->topic_keyexpr_; + if (z_view_keyexpr_from_str(&service_ke, service_data->keyexpr_.c_str()) != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } + // Configure the queryable to process complete queries. - z_queryable_options_t qable_options = z_queryable_options_default(); + z_queryable_options_t qable_options; + z_queryable_options_default(&qable_options); qable_options.complete = true; - service_data->qable_ = z_declare_queryable( - session, - z_loan(service_data->keyexpr_), - z_move(callback), - &qable_options); + if (z_declare_queryable( + session->loan(), &service_data->qable_, z_loan(service_ke), + z_move(callback), &qable_options) != Z_OK) + { + RMW_SET_ERROR_MSG("unable to create zenoh queryable"); + return nullptr; + } auto undeclare_z_queryable = rcpputils::make_scope_exit( [service_data]() { z_undeclare_queryable(z_move(service_data->qable_)); }); - if (!z_check(service_data->qable_)) { - RMW_SET_ERROR_MSG("unable to create zenoh queryable"); + + std::string liveliness_keyexpr = service_data->entity_->liveliness_keyexpr(); + z_view_keyexpr_t liveliness_ke; + if (z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()) != Z_OK) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } - - service_data->token_ = zc_liveliness_declare_token( - session, - z_keyexpr(service_data->entity_->liveliness_keyexpr().c_str()), - NULL - ); auto free_token = rcpputils::make_scope_exit( [service_data]() { - z_drop(z_move(service_data->token_)); + if (service_data != nullptr) { + z_drop(z_move(service_data->token_)); + } }); - if (!z_check(service_data->token_)) { + if (z_liveliness_declare_token( + session->loan(), &service_data->token_, z_loan(liveliness_ke), + NULL) != Z_OK) + { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the service."); return nullptr; } - free_ros_keyexpr.cancel(); undeclare_z_queryable.cancel(); free_token.cancel(); + service_data->initialized_ = true; return service_data; } @@ -215,18 +217,21 @@ std::shared_ptr ServiceData::make( ServiceData::ServiceData( const rmw_node_t * rmw_node, std::shared_ptr entity, + std::shared_ptr sess, const void * request_type_support_impl, const void * response_type_support_impl, std::unique_ptr request_type_support, std::unique_ptr response_type_support) : rmw_node_(rmw_node), entity_(std::move(entity)), + sess_(std::move(sess)), request_type_support_impl_(request_type_support_impl), response_type_support_impl_(response_type_support_impl), request_type_support_(std::move(request_type_support)), response_type_support_(std::move(response_type_support)), wait_set_data_(nullptr), - is_shutdown_(false) + is_shutdown_(false), + initialized_(false) { // Do nothing. } @@ -238,13 +243,6 @@ liveliness::TopicInfo ServiceData::topic_info() const return entity_->topic_info().value(); } -///============================================================================= -bool ServiceData::liveliness_is_valid() const -{ - std::lock_guard lock(mutex_); - return zc_liveliness_token_check(&token_); -} - ///============================================================================= void ServiceData::add_new_query(std::unique_ptr query) { @@ -262,14 +260,12 @@ void ServiceData::add_new_query(std::unique_ptr query) query_queue_.size() >= adapted_qos_profile.depth) { // Log warning if message is discarded due to hitting the queue depth - z_owned_str_t keystr = z_keyexpr_to_string(z_loan(keyexpr_)); RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Query queue depth of %ld reached, discarding oldest Query " "for service for %s", adapted_qos_profile.depth, - z_loan(keystr)); - z_drop(z_move(keystr)); + keyexpr_.c_str()); query_queue_.pop_front(); } query_queue_.emplace_back(std::move(query)); @@ -298,15 +294,16 @@ rmw_ret_t ServiceData::take_request( } std::unique_ptr query = std::move(query_queue_.front()); query_queue_.pop_front(); - const z_query_t loaned_query = query->get_query(); + const z_loaned_query_t * loaned_query = query->get_query(); // DESERIALIZE MESSAGE ======================================================== - z_value_t payload_value = z_query_value(&loaned_query); + z_owned_slice_t payload; + z_bytes_to_slice(z_query_payload(loaned_query), &payload); // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(payload_value.payload.start)), - payload_value.payload.len); + reinterpret_cast(const_cast(z_slice_data(z_loan(payload)))), + z_slice_len(z_loan(payload))); // Object that serializes the data Cdr deser(fastbuffer); @@ -321,28 +318,21 @@ rmw_ret_t ServiceData::take_request( // Fill in the request header. // Get the sequence_number out of the attachment - z_attachment_t attachment = z_query_attachment(&loaned_query); - request_header->request_id.sequence_number = - get_int64_from_attachment(&attachment, "sequence_number"); + rmw_zenoh_cpp::attachment_data_t attachment(z_query_attachment(loaned_query)); + + request_header->request_id.sequence_number = attachment.sequence_number; if (request_header->request_id.sequence_number < 0) { RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); return RMW_RET_ERROR; } - request_header->source_timestamp = get_int64_from_attachment( - &attachment, - "source_timestamp"); + + memcpy(request_header->request_id.writer_guid, attachment.source_gid, RMW_GID_STORAGE_SIZE); + + request_header->source_timestamp = attachment.source_timestamp; if (request_header->source_timestamp < 0) { RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); return RMW_RET_ERROR; } - if (!get_gid_from_attachment( - &attachment, - request_header->request_id.writer_guid)) - { - RMW_SET_ERROR_MSG("Could not get client GID from attachment"); - return RMW_RET_ERROR; - } - request_header->received_timestamp = query->get_received_timestamp(); // Add this query to the map, so that rmw_send_response can quickly look it up later. @@ -363,6 +353,8 @@ rmw_ret_t ServiceData::take_request( it->second.insert(std::make_pair(request_header->request_id.sequence_number, std::move(query))); *taken = true; + z_drop(z_move(payload)); + return RMW_RET_OK; } @@ -432,30 +424,24 @@ rmw_ret_t ServiceData::send_response( } size_t data_length = ser.get_serialized_data_length(); - const z_query_t loaned_query = query->get_query(); - z_query_reply_options_t options = z_query_reply_options_default(); - z_owned_bytes_map_t map = rmw_zenoh_cpp::create_map_and_set_sequence_num( - request_id->sequence_number, - [request_id](z_owned_bytes_map_t * map, const char * key) - { - z_bytes_t gid_bytes; - gid_bytes.len = RMW_GID_STORAGE_SIZE; - gid_bytes.start = request_id->writer_guid; - z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); - }); - if (!z_check(map)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - auto free_attachment_map = rcpputils::make_scope_exit( - [&map]() { - z_bytes_map_drop(z_move(map)); - }); - options.attachment = z_bytes_map_as_attachment(&map); + const z_loaned_query_t * loaned_query = query->get_query(); + z_query_reply_options_t options; + z_query_reply_options_default(&options); + + z_owned_bytes_t attachment; + rmw_zenoh_cpp::create_map_and_set_sequence_num( + &attachment, request_id->sequence_number, + request_id->writer_guid); + options.attachment = z_move(attachment); + + z_owned_bytes_t payload; + z_bytes_copy_from_buf( + &payload, reinterpret_cast(response_bytes), data_length); + z_view_keyexpr_t service_ke; + z_view_keyexpr_from_str(&service_ke, keyexpr_.c_str()); z_query_reply( - &loaned_query, z_loan(keyexpr_), reinterpret_cast( - response_bytes), data_length, &options); + loaned_query, z_loan(service_ke), z_move(payload), &options); return RMW_RET_OK; } @@ -509,22 +495,17 @@ rmw_ret_t ServiceData::shutdown() { rmw_ret_t ret = RMW_RET_OK; std::lock_guard lock(mutex_); - if (is_shutdown_) { + if (is_shutdown_ || !initialized_) { return ret; } // Unregister this node from the ROS graph. - if (zc_liveliness_token_check(&token_)) { - zc_liveliness_undeclare_token(z_move(token_)); - } - if (z_check(z_loan(keyexpr_))) { - z_drop(z_move(keyexpr_)); - } - if (z_check(qable_)) { - z_undeclare_queryable(z_move(qable_)); - } + z_liveliness_undeclare_token(z_move(token_)); + z_undeclare_queryable(z_move(qable_)); + sess_.reset(); is_shutdown_ = true; + initialized_ = false; return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp index 257cc24c..e5ddc12e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp @@ -47,7 +47,7 @@ class ServiceData final public: // Make a shared_ptr of ServiceData. static std::shared_ptr make( - z_session_t session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -59,9 +59,6 @@ class ServiceData final // Get a copy of the TopicInfo of this ServiceData. liveliness::TopicInfo topic_info() const; - // Returns true if liveliness token is still valid. - bool liveliness_is_valid() const; - // Add a new ZenohQuery to the queue. void add_new_query(std::unique_ptr query); @@ -99,6 +96,7 @@ class ServiceData final ServiceData( const rmw_node_t * rmw_node, std::shared_ptr entity, + std::shared_ptr sess, const void * request_type_support_impl, const void * response_type_support_impl, std::unique_ptr request_type_support, @@ -110,12 +108,14 @@ class ServiceData final const rmw_node_t * rmw_node_; // The Entity generated for the service. std::shared_ptr entity_; - // An owned keyexpression. - z_owned_keyexpr_t keyexpr_; + // A shared session. + std::shared_ptr sess_; + // The keyexpr string. + std::string keyexpr_; // An owned queryable. z_owned_queryable_t qable_; // Liveliness token for the service. - zc_owned_liveliness_token_t token_; + z_owned_liveliness_token_t token_; // Type support fields. const void * request_type_support_impl_; const void * response_type_support_impl_; @@ -132,6 +132,8 @@ class ServiceData final DataCallbackManager data_callback_mgr_; // Shutdown flag. bool is_shutdown_; + // Whether the object has ever successfully been initialized. + bool initialized_; }; using ServiceDataPtr = std::shared_ptr; using ServiceDataConstPtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 54bf5487..42e636d4 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -45,13 +45,10 @@ namespace //============================================================================== // TODO(Yadunund): Make this a class member and lambda capture weak_from_this() // instead of passing a rawptr to SubscriptionData when we switch to zenoh-cpp. -void sub_data_handler(const z_sample_t * sample, void * data) +void sub_data_handler(z_loaned_sample_t * sample, void * data) { - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - auto drop_keystr = rcpputils::make_scope_exit( - [&keystr]() { - z_drop(z_move(keystr)); - }); + z_view_string_t keystr; + z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); auto sub_data = static_cast(data); if (sub_data == nullptr) { @@ -63,51 +60,30 @@ void sub_data_handler(const z_sample_t * sample, void * data) return; } - uint8_t pub_gid[RMW_GID_STORAGE_SIZE]; - if (!get_gid_from_attachment(&sample->attachment, pub_gid)) { - // We failed to get the GID from the attachment. While this isn't fatal, - // it is unusual and so we should report it. - memset(pub_gid, 0, RMW_GID_STORAGE_SIZE); - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain publisher GID from the attachment."); - } + attachment_data_t attachment(z_sample_attachment(sample)); + const z_loaned_bytes_t * payload = z_sample_payload(sample); - int64_t sequence_number = get_int64_from_attachment(&sample->attachment, "sequence_number"); - if (sequence_number < 0) { - // We failed to get the sequence number from the attachment. While this - // isn't fatal, it is unusual and so we should report it. - sequence_number = 0; - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Unable to obtain sequence number from the attachment."); - } + z_owned_slice_t slice; + z_bytes_to_slice(payload, &slice); - int64_t source_timestamp = get_int64_from_attachment(&sample->attachment, "source_timestamp"); - if (source_timestamp < 0) { - // We failed to get the source timestamp from the attachment. While this - // isn't fatal, it is unusual and so we should report it. - source_timestamp = 0; - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Unable to obtain source timestamp from the attachment."); - } + std::string topic_name(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); sub_data->add_new_message( std::make_unique( - zc_sample_payload_rcinc(sample), - sample->timestamp.time, pub_gid, sequence_number, source_timestamp), z_loan(keystr)); + slice, + z_timestamp_ntp64_time(z_sample_timestamp(sample)), + std::move(attachment)), + topic_name); } } // namespace ///============================================================================= SubscriptionData::Message::Message( - zc_owned_payload_t p, + z_owned_slice_t p, uint64_t recv_ts, - const uint8_t pub_gid[RMW_GID_STORAGE_SIZE], - int64_t seqnum, - int64_t source_ts) -: payload(p), recv_timestamp(recv_ts), sequence_number(seqnum), source_timestamp(source_ts) + attachment_data_t && attachment_) +: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_)) { - memcpy(publisher_gid, pub_gid, RMW_GID_STORAGE_SIZE); } ///============================================================================= @@ -118,7 +94,7 @@ SubscriptionData::Message::~Message() ///============================================================================= std::shared_ptr SubscriptionData::make( - z_session_t session, + std::shared_ptr session, std::shared_ptr graph_cache, const rmw_node_t * const node, liveliness::NodeInfo node_info, @@ -160,7 +136,7 @@ std::shared_ptr SubscriptionData::make( // with Zenoh; after this, callbacks may come in at any time. std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + z_info_zid(session->loan()), std::to_string(node_id), std::to_string(subscription_id), liveliness::EntityType::Subscription, @@ -185,6 +161,7 @@ std::shared_ptr SubscriptionData::make( node, graph_cache, std::move(entity), + std::move(session), type_support->data, std::move(message_type_support) }); @@ -202,11 +179,13 @@ SubscriptionData::SubscriptionData( const rmw_node_t * rmw_node, std::shared_ptr graph_cache, std::shared_ptr entity, + std::shared_ptr sess, const void * type_support_impl, std::unique_ptr type_support) : rmw_node_(rmw_node), graph_cache_(std::move(graph_cache)), entity_(std::move(entity)), + sess_(std::move(sess)), type_support_impl_(type_support_impl), type_support_(std::move(type_support)), last_known_published_msg_({}), @@ -224,59 +203,53 @@ bool SubscriptionData::init() { // TODO(Yadunund): Instead of passing a rawptr, rely on capturing weak_ptr // in the closure callback once we switch to zenoh-cpp. - z_owned_closure_sample_t callback = z_closure(sub_data_handler, nullptr, this); - z_owned_keyexpr_t keyexpr = - z_keyexpr_new(entity_->topic_info()->topic_keyexpr_.c_str()); - auto always_free_ros_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_keyexpr_drop(z_move(keyexpr)); - }); - if (!z_keyexpr_check(&keyexpr)) { + z_owned_closure_sample_t callback; + z_closure(&callback, sub_data_handler, nullptr, this); + + std::string topic_keyexpr = entity_->topic_info()->topic_keyexpr_; + z_view_keyexpr_t sub_ke; + if (z_view_keyexpr_from_str(&sub_ke, topic_keyexpr.c_str()) != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return false; } + + rmw_context_impl_t * context_impl = static_cast(rmw_node_->context->impl); + + sess_ = context_impl->session(); + // Instantiate the subscription with suitable options depending on the // adapted_qos_profile. // TODO(Yadunund): Rely on a separate function to return the sub // as we start supporting more qos settings. - z_owned_str_t owned_key_str = z_keyexpr_to_string(z_loan(keyexpr)); - auto always_drop_keystr = rcpputils::make_scope_exit( - [&owned_key_str]() { - z_drop(z_move(owned_key_str)); - }); - - rmw_context_impl_t * context_impl = static_cast(rmw_node_->context->impl); - if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default(); + ze_querying_subscriber_options_t sub_options; + ze_querying_subscriber_options_default(&sub_options); // Make the initial query to hit all the PublicationCaches, using a query_selector with // '*' in place of the queryable_prefix of each PublicationCache const std::string selector = "*/" + entity_->topic_info()->topic_keyexpr_; - sub_options.query_selector = z_keyexpr(selector.c_str()); + z_view_keyexpr_t selector_ke; + z_view_keyexpr_from_str(&selector_ke, selector.c_str()); + sub_options.query_selector = z_loan(selector_ke); // Tell the PublicationCache's Queryable that the query accepts any key expression as a reply. // By default a query accepts only replies that matches its query selector. // This allows us to selectively query certain PublicationCaches when defining the // set_querying_subscriber_callback below. - sub_options.query_accept_replies = ZCU_REPLY_KEYEXPR_ANY; + sub_options.query_accept_replies = ZC_REPLY_KEYEXPR_ANY; // As this initial query is now using a "*", the query target is not COMPLETE. sub_options.query_target = Z_QUERY_TARGET_ALL; // We set consolidation to none as we need to receive transient local messages // from a number of publishers. Eg: To receive TF data published over /tf_static // by various publishers. sub_options.query_consolidation = z_query_consolidation_none(); - if (entity_->topic_info()->qos_.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - sub_options.reliability = Z_RELIABILITY_RELIABLE; - } - sub_ = ze_declare_querying_subscriber( - context_impl->session(), - z_loan(keyexpr), - z_move(callback), - &sub_options - ); - if (!z_check(std::get(sub_))) { + ze_owned_querying_subscriber_t sub; + if (ze_declare_querying_subscriber( + sess_->loan(), &sub, z_loan(sub_ke), z_move(callback), &sub_options)) + { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; } + sub_ = sub; + // Register the querying subscriber with the graph cache to get latest // messages from publishers that were discovered after their first publication. std::weak_ptr data_wp = shared_from_this(); @@ -303,60 +276,62 @@ bool SubscriptionData::init() "QueryingSubscriberCallback triggered over %s.", selector.c_str() ); - z_get_options_t opts = z_get_options_default(); + z_get_options_t opts; + z_get_options_default(&opts); opts.timeout_ms = std::numeric_limits::max(); opts.consolidation = z_query_consolidation_latest(); - opts.accept_replies = ZCU_REPLY_KEYEXPR_ANY; + opts.accept_replies = ZC_REPLY_KEYEXPR_ANY; + + z_view_keyexpr_t ke; + z_view_keyexpr_from_str(&ke, selector.c_str()); ze_querying_subscriber_get( z_loan(std::get(sub_data->sub_)), - z_keyexpr(selector.c_str()), - &opts - ); + z_loan(ke), + &opts); } ); } else { // Create a regular subscriber for all other durability settings. - z_subscriber_options_t sub_options = z_subscriber_options_default(); - if (entity_->topic_info()->qos_.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - sub_options.reliability = Z_RELIABILITY_RELIABLE; - } - sub_ = z_declare_subscriber( - context_impl->session(), - z_loan(keyexpr), - z_move(callback), - &sub_options - ); - if (!z_check(std::get(sub_))) { + z_subscriber_options_t sub_options; + z_subscriber_options_default(&sub_options); + + z_owned_subscriber_t sub; + if (z_declare_subscriber( + sess_->loan(), &sub, z_loan(sub_ke), z_move(callback), + &sub_options) != Z_OK) + { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; } + sub_ = sub; } auto undeclare_z_sub = rcpputils::make_scope_exit( [this]() { z_owned_subscriber_t * sub = std::get_if(&sub_); - if (sub == nullptr || z_undeclare_subscriber(sub)) { + if (sub == nullptr || z_undeclare_subscriber(z_move(*sub))) { RMW_SET_ERROR_MSG("failed to undeclare sub"); } else { ze_owned_querying_subscriber_t * querying_sub = std::get_if(&sub_); - if (querying_sub == nullptr || ze_undeclare_querying_subscriber(querying_sub)) { + if (querying_sub == nullptr || ze_undeclare_querying_subscriber(z_move(*querying_sub))) { RMW_SET_ERROR_MSG("failed to undeclare sub"); } } }); // Publish to the graph that a new subscription is in town. - token_ = zc_liveliness_declare_token( - context_impl->session(), - z_keyexpr(entity_->liveliness_keyexpr().c_str()), - NULL - ); + std::string liveliness_keyexpr = entity_->liveliness_keyexpr(); + z_view_keyexpr_t liveliness_ke; + z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); + auto free_token = rcpputils::make_scope_exit( [this]() { z_drop(z_move(token_)); }); - if (!z_check(token_)) { + if (z_liveliness_declare_token( + sess_->loan(), &token_, z_loan(liveliness_ke), NULL) != Z_OK) + { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the subscription."); @@ -385,13 +360,6 @@ liveliness::TopicInfo SubscriptionData::topic_info() const return entity_->topic_info().value(); } -///============================================================================= -bool SubscriptionData::liveliness_is_valid() const -{ - std::lock_guard lock(mutex_); - return zc_liveliness_token_check(&token_); -} - ///============================================================================= std::shared_ptr SubscriptionData::events_mgr() const { @@ -430,27 +398,26 @@ rmw_ret_t SubscriptionData::shutdown() graph_cache_->remove_qos_event_callbacks(entity_->keyexpr_hash()); // Unregister this subscription from the ROS graph. - if (zc_liveliness_token_check(&token_)) { - zc_liveliness_undeclare_token(z_move(token_)); - } + z_liveliness_undeclare_token(z_move(token_)); z_owned_subscriber_t * sub = std::get_if(&sub_); - if (sub != nullptr && z_subscriber_check(sub)) { - if (z_undeclare_subscriber(sub)) { + if (sub != nullptr) { + if (z_undeclare_subscriber(z_move(*sub)) != Z_OK) { RMW_SET_ERROR_MSG("failed to undeclare sub."); ret = RMW_RET_ERROR; } } else { ze_owned_querying_subscriber_t * querying_sub = std::get_if(&sub_); - if (querying_sub != nullptr && ze_querying_subscriber_check(querying_sub)) { - if (ze_undeclare_querying_subscriber(querying_sub)) { + if (querying_sub != nullptr) { + if (ze_undeclare_querying_subscriber(z_move(*querying_sub)) != Z_OK) { RMW_SET_ERROR_MSG("failed to undeclare querying sub."); ret = RMW_RET_ERROR; } } } + sess_.reset(); is_shutdown_ = true; initialized_ = false; return ret; @@ -502,10 +469,13 @@ rmw_ret_t SubscriptionData::take_one_message( std::unique_ptr msg_data = std::move(message_queue_.front()); message_queue_.pop_front(); + const uint8_t * payload = z_slice_data(z_loan(msg_data->payload)); + const size_t payload_len = z_slice_len(z_loan(msg_data->payload)); + // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(msg_data->payload.payload.start)), - msg_data->payload.payload.len); + reinterpret_cast(const_cast(payload)), + payload_len); // Object that serializes the data rmw_zenoh_cpp::Cdr deser(fastbuffer); @@ -519,13 +489,13 @@ rmw_ret_t SubscriptionData::take_one_message( } if (message_info != nullptr) { - message_info->source_timestamp = msg_data->source_timestamp; + message_info->source_timestamp = msg_data->attachment.source_timestamp; message_info->received_timestamp = msg_data->recv_timestamp; - message_info->publication_sequence_number = msg_data->sequence_number; + message_info->publication_sequence_number = msg_data->attachment.sequence_number; // TODO(clalancette): fill in reception_sequence_number message_info->reception_sequence_number = 0; message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE); + memcpy(message_info->publisher_gid.data, msg_data->attachment.source_gid, RMW_GID_STORAGE_SIZE); message_info->from_intra_process = false; } @@ -550,28 +520,29 @@ rmw_ret_t SubscriptionData::take_serialized_message( std::unique_ptr msg_data = std::move(message_queue_.front()); message_queue_.pop_front(); - if (serialized_message->buffer_capacity < msg_data->payload.payload.len) { + const uint8_t * payload = z_slice_data(z_loan(msg_data->payload)); + const size_t payload_len = z_slice_len(z_loan(msg_data->payload)); + + if (serialized_message->buffer_capacity < payload_len) { rmw_ret_t ret = - rmw_serialized_message_resize(serialized_message, msg_data->payload.payload.len); + rmw_serialized_message_resize(serialized_message, payload_len); if (ret != RMW_RET_OK) { return ret; // Error message already set } } - serialized_message->buffer_length = msg_data->payload.payload.len; - memcpy( - serialized_message->buffer, msg_data->payload.payload.start, - msg_data->payload.payload.len); + serialized_message->buffer_length = payload_len; + memcpy(serialized_message->buffer, payload, payload_len); *taken = true; if (message_info != nullptr) { - message_info->source_timestamp = msg_data->source_timestamp; + message_info->source_timestamp = msg_data->attachment.source_timestamp; message_info->received_timestamp = msg_data->recv_timestamp; - message_info->publication_sequence_number = msg_data->sequence_number; + message_info->publication_sequence_number = msg_data->attachment.sequence_number; // TODO(clalancette): fill in reception_sequence_number message_info->reception_sequence_number = 0; message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE); + memcpy(message_info->publisher_gid.data, msg_data->attachment.source_gid, RMW_GID_STORAGE_SIZE); message_info->from_intra_process = false; } @@ -609,10 +580,12 @@ void SubscriptionData::add_new_message( } // Check for messages lost if the new sequence number is not monotonically increasing. - const size_t gid_hash = hash_gid(msg->publisher_gid); + const size_t gid_hash = hash_gid(msg->attachment.source_gid); auto last_known_pub_it = last_known_published_msg_.find(gid_hash); if (last_known_pub_it != last_known_published_msg_.end()) { - const int64_t seq_increment = std::abs(msg->sequence_number - last_known_pub_it->second); + const int64_t seq_increment = std::abs( + msg->attachment.sequence_number - + last_known_pub_it->second); if (seq_increment > 1) { const size_t num_msg_lost = seq_increment - 1; total_messages_lost_ += num_msg_lost; @@ -625,7 +598,7 @@ void SubscriptionData::add_new_message( } } // Always update the last known sequence number for the publisher. - last_known_published_msg_[gid_hash] = msg->sequence_number; + last_known_published_msg_[gid_hash] = msg->attachment.sequence_number; message_queue_.emplace_back(std::move(msg)); diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index c4782e8d..d4e0cdff 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -32,7 +32,9 @@ #include "graph_cache.hpp" #include "liveliness_utils.hpp" #include "message_type_support.hpp" +#include "attachment_helpers.hpp" #include "type_support_common.hpp" +#include "zenoh_utils.hpp" #include "rcutils/allocator.h" @@ -48,24 +50,20 @@ class SubscriptionData final : public std::enable_shared_from_this make( - z_session_t session, + std::shared_ptr session, std::shared_ptr graph_cache, const rmw_node_t * const node, liveliness::NodeInfo node_info, @@ -78,7 +76,7 @@ class SubscriptionData final : public std::enable_shared_from_this & shm_manager); + std::optional & shm_provider); // Get a copy of the keyexpr_hash of this SubscriptionData's liveliness::Entity. std::size_t keyexpr_hash() const; @@ -86,9 +84,6 @@ class SubscriptionData final : public std::enable_shared_from_this events_mgr() const; @@ -129,6 +124,7 @@ class SubscriptionData final : public std::enable_shared_from_this graph_cache, std::shared_ptr entity, + std::shared_ptr sess, const void * type_support_impl, std::unique_ptr type_support); @@ -142,10 +138,12 @@ class SubscriptionData final : public std::enable_shared_from_this graph_cache_; // The Entity generated for the subscription. std::shared_ptr entity_; + // A shared session + std::shared_ptr sess_; // An owned subscriber or querying_subscriber depending on the QoS settings. std::variant sub_; // Liveliness token for the subscription. - zc_owned_liveliness_token_t token_; + z_owned_liveliness_token_t token_; // Type support fields const void * type_support_impl_; std::unique_ptr type_support_; diff --git a/rmw_zenoh_cpp/src/detail/type_support.hpp b/rmw_zenoh_cpp/src/detail/type_support.hpp index a4714863..c7b2be52 100644 --- a/rmw_zenoh_cpp/src/detail/type_support.hpp +++ b/rmw_zenoh_cpp/src/detail/type_support.hpp @@ -19,7 +19,6 @@ #ifndef DETAIL__TYPE_SUPPORT_HPP_ #define DETAIL__TYPE_SUPPORT_HPP_ -#include #include #include "fastcdr/Cdr.h" diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp index 6d296a62..2537466b 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp @@ -55,18 +55,12 @@ rmw_ret_t _get_z_config( "rmw_zenoh_cpp", "Envar %s cannot be read.", envar_name); return RMW_RET_ERROR; } - // If the environment variable contains a path to a file, try to read the configuration from it. - if (envar_uri[0] != '\0') { - // If the environment variable is set, try to read the configuration from the file. - *config = zc_config_from_file(envar_uri); - configured_uri = envar_uri; - } else { - // If the environment variable is not set use internal configuration - *config = zc_config_from_file(default_uri); - configured_uri = default_uri; - } - // Verify that the configuration is valid. - if (!z_config_check(config)) { + + // If the environment variable is set, try to read the configuration from the file, + // if the environment variable is not set use internal configuration + configured_uri = envar_uri[0] != '\0' ? envar_uri : default_uri; + // Try to read the configuration + if (zc_config_from_file(config, configured_uri) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Invalid configuration file %s", configured_uri); diff --git a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp b/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp index 7d7afaa9..b6c5d21e 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp @@ -26,7 +26,7 @@ namespace rmw_zenoh_cpp { ///============================================================================= -rmw_ret_t zenoh_router_check(z_session_t session) +rmw_ret_t zenoh_router_check(const z_loaned_session_t * session) { // Initialize context for callback int context = 0; @@ -42,8 +42,9 @@ rmw_ret_t zenoh_router_check(z_session_t session) (*(static_cast(ctx)))++; }; - z_owned_closure_zid_t router_callback = z_closure(callback, nullptr /* drop */, &context); - if (z_info_routers_zid(session, z_move(router_callback))) { + z_owned_closure_zid_t router_callback; + z_closure(&router_callback, callback, NULL, &context); + if (z_info_routers_zid(session, z_move(router_callback)) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Failed to evaluate if Zenoh routers are connected to the session."); diff --git a/rmw_zenoh_cpp/src/detail/zenoh_router_check.hpp b/rmw_zenoh_cpp/src/detail/zenoh_router_check.hpp index f53bc2f5..1db74a4c 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_router_check.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_router_check.hpp @@ -24,7 +24,7 @@ namespace rmw_zenoh_cpp /// Check if a Zenoh router is connected to the session. /// @param session Zenoh session to check. /// @return RMW_RET_OK if a Zenoh router is connected to the session. -rmw_ret_t zenoh_router_check(z_session_t session); +rmw_ret_t zenoh_router_check(const z_loaned_session_t * session); } // namespace rmw_zenoh_cpp #endif // DETAIL__ZENOH_ROUTER_CHECK_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index e30e80d0..4418e202 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -17,56 +17,46 @@ #include #include +#include "attachment_helpers.hpp" +#include "logging_macros.hpp" #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" namespace rmw_zenoh_cpp { +/// Close the zenoh session if destructed. ///============================================================================= -z_owned_bytes_map_t -create_map_and_set_sequence_num( - int64_t sequence_number, - GIDCopier gid_copier) +const z_loaned_session_t * ZenohSession::loan() { - z_owned_bytes_map_t map = z_bytes_map_new(); - if (!z_check(map)) { - RMW_SET_ERROR_MSG("failed to allocate map for sequence number"); - return z_bytes_map_null(); - } - auto free_attachment_map = rcpputils::make_scope_exit( - [&map]() { - z_bytes_map_drop(z_move(map)); - }); - - // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. - // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. - char seq_id_str[20]; - if (rcutils_snprintf(seq_id_str, sizeof(seq_id_str), "%" PRId64, sequence_number) < 0) { - RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); - return z_bytes_map_null(); - } - z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str)); + return z_loan(inner_); +} +/// Close the zenoh session if destructed. +///============================================================================= +ZenohSession::~ZenohSession() +{ + z_close(z_loan_mut(inner_), NULL); +} + +///============================================================================= +void create_map_and_set_sequence_num( + z_owned_bytes_t * out_bytes, int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE]) +{ auto now = std::chrono::system_clock::now().time_since_epoch(); auto now_ns = std::chrono::duration_cast(now); - char source_ts_str[20]; - if (rcutils_snprintf(source_ts_str, sizeof(source_ts_str), "%" PRId64, now_ns.count()) < 0) { - RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); - return z_bytes_map_null(); - } - z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str)); - gid_copier(&map, "source_gid"); - - free_attachment_map.cancel(); + int64_t source_timestamp = now_ns.count(); - return map; + rmw_zenoh_cpp::attachment_data_t data(sequence_number, source_timestamp, gid); + data.serialize_to_zbytes(out_bytes); } ///============================================================================= -ZenohQuery::ZenohQuery(const z_query_t * query, std::chrono::nanoseconds::rep received_timestamp) +ZenohQuery::ZenohQuery( + const z_loaned_query_t * query, + std::chrono::nanoseconds::rep received_timestamp) { - query_ = z_query_clone(query); + z_query_clone(&query_, query); received_timestamp_ = received_timestamp; } @@ -77,37 +67,28 @@ std::chrono::nanoseconds::rep ZenohQuery::get_received_timestamp() const } ///============================================================================= -ZenohQuery::~ZenohQuery() -{ - z_drop(z_move(query_)); -} +ZenohQuery::~ZenohQuery() {z_drop(z_move(query_));} ///============================================================================= -const z_query_t ZenohQuery::get_query() const -{ - return z_query_loan(&query_); -} +const z_loaned_query_t * ZenohQuery::get_query() const {return z_loan(query_);} ///============================================================================= ZenohReply::ZenohReply( - const z_owned_reply_t * reply, + const z_loaned_reply_t * reply, std::chrono::nanoseconds::rep received_timestamp) { - reply_ = *reply; + z_reply_clone(&reply_, reply); received_timestamp_ = received_timestamp; } ///============================================================================= -ZenohReply::~ZenohReply() -{ - z_reply_drop(z_move(reply_)); -} +ZenohReply::~ZenohReply() {z_drop(z_move(reply_));} ///============================================================================= -std::optional ZenohReply::get_sample() const +std::optional ZenohReply::get_sample() const { - if (z_reply_is_ok(&reply_)) { - return z_reply_ok(&reply_); + if (z_reply_is_ok(z_loan(reply_))) { + return z_reply_ok(z_loan(reply_)); } return std::nullopt; diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index a750cbf2..ae2aed3a 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -25,24 +25,37 @@ namespace rmw_zenoh_cpp { + +/// A wrapped zenoh session with customized destruction. ///============================================================================= -// A function to safely copy an entity's GID as a z_bytes_t into a -// z_owned_bytes_map_t for a given key. -using GIDCopier = std::function; +class ZenohSession final +{ +public: + ZenohSession(z_owned_session_t sess) + : inner_(sess) {} + const z_loaned_session_t * loan(); + ~ZenohSession(); + +private: + z_owned_session_t inner_; +}; + ///============================================================================= -z_owned_bytes_map_t -create_map_and_set_sequence_num(int64_t sequence_number, GIDCopier gid_copier); +void +create_map_and_set_sequence_num( + z_owned_bytes_t * out_bytes, int64_t sequence_number, + uint8_t gid[RMW_GID_STORAGE_SIZE]); ///============================================================================= // A class to store the replies to service requests. class ZenohReply final { public: - ZenohReply(const z_owned_reply_t * reply, std::chrono::nanoseconds::rep received_timestamp); + ZenohReply(const z_loaned_reply_t * reply, std::chrono::nanoseconds::rep received_timestamp); ~ZenohReply(); - std::optional get_sample() const; + std::optional get_sample() const; std::chrono::nanoseconds::rep get_received_timestamp() const; @@ -56,11 +69,11 @@ class ZenohReply final class ZenohQuery final { public: - ZenohQuery(const z_query_t * query, std::chrono::nanoseconds::rep received_timestamp); + ZenohQuery(const z_loaned_query_t * query, std::chrono::nanoseconds::rep received_timestamp); ~ZenohQuery(); - const z_query_t get_query() const; + const z_loaned_query_t * get_query() const; std::chrono::nanoseconds::rep get_received_timestamp() const; diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index c61a44fa..e76d6062 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -94,6 +94,9 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) return RMW_RET_ERROR; } + // Enable the zenoh built-in logger + zc_try_init_log_from_env(); + // Create the context impl. context->impl = static_cast( allocator->zero_allocate(1, sizeof(rmw_context_impl_t), allocator->state)); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index eebe1c4b..bab759a5 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -13,7 +13,6 @@ // limitations under the License. #include - #include #include @@ -605,7 +604,7 @@ rmw_publish( return pub_data->publish( ros_message, - context_impl->shm_manager()); + context_impl->shm_provider()); } //============================================================================== @@ -712,7 +711,7 @@ rmw_publish_serialized_message( return publisher_data->publish_serialized_message( serialized_message, - context_impl->shm_manager()); + context_impl->shm_provider()); } //============================================================================== @@ -751,7 +750,7 @@ rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher) auto pub_data = node_data->get_pub_data(publisher); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); - if (!pub_data->liveliness_is_valid()) { + if (pub_data->is_shutdown()) { return RMW_RET_ERROR; } diff --git a/rmw_zenoh_cpp/src/zenohd/main.cpp b/rmw_zenoh_cpp/src/zenohd/main.cpp index 8ae25809..ce73fc14 100644 --- a/rmw_zenoh_cpp/src/zenohd/main.cpp +++ b/rmw_zenoh_cpp/src/zenohd/main.cpp @@ -68,6 +68,9 @@ int main(int argc, char ** argv) return 1; } + // Enable the zenoh built-in logger. + zc_try_init_log_from_env(); + // Initialize the zenoh configuration for the router. z_owned_config_t config; if ((rmw_zenoh_cpp::get_z_config( @@ -78,14 +81,14 @@ int main(int argc, char ** argv) return 1; } - z_owned_session_t session = z_open(z_move(config)); - if (!z_check(session)) { + z_owned_session_t session; + if (z_open(&session, z_move(config), NULL) != Z_OK) { printf("Unable to open router session!\n"); return 1; } auto always_close_session = rcpputils::make_scope_exit( [&session]() { - z_close(z_move(session)); + z_close(z_loan_mut(session), NULL); }); printf( diff --git a/zenoh_c_vendor/CMakeLists.txt b/zenoh_c_vendor/CMakeLists.txt index 37817882..cddd2fd4 100644 --- a/zenoh_c_vendor/CMakeLists.txt +++ b/zenoh_c_vendor/CMakeLists.txt @@ -15,7 +15,7 @@ find_package(ament_cmake_vendor_package REQUIRED) # Note: We separate the two args needed for cargo with "$" and not ";" as the # latter is a list separater in cmake and hence the string will be split into two # when expanded. -set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=zenoh/shared-memory zenoh/transport_compression zenoh/transport_tcp zenoh/transport_tls") +set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=shared-memory zenoh/transport_compression zenoh/transport_tcp zenoh/transport_tls") # Set VCS_VERSION to include latest changes from zenoh-c to benefit from : # - https://github.com/eclipse-zenoh/zenoh-c/pull/340 (fix build issue) @@ -26,10 +26,13 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=zenoh/shared # - https://github.com/eclipse-zenoh/zenoh-c/pull/620 (fix ze_querying_subscriber_get API to query newly discovered publishers) ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git - VCS_VERSION 134dbfa06ca212def5fb51dd8e816734dfd8dff6 + VCS_VERSION 42e717ff7b633649f11ebb7800b71d4939cd21c7 CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" + "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE" "-DZENOHC_CUSTOM_TARGET=${ZENOHC_CUSTOM_TARGET}" ) +ament_export_dependencies(zenohc) + ament_package()