diff --git a/Cargo.lock b/Cargo.lock index 8363b46684..a7198e5862 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5173,6 +5173,7 @@ dependencies = [ "uhlc", "validated_struct", "zenoh-core", + "zenoh-keyexpr", "zenoh-macros", "zenoh-protocol", "zenoh-result", @@ -5358,11 +5359,11 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "zenoh-config", "zenoh-core", "zenoh-link-commons", "zenoh-protocol", "zenoh-result", - "zenoh-util", ] [[package]] diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 8d140ee9ba..552107328f 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -25,6 +25,9 @@ /// /// It is also possible to specify a priority range and/or a reliability setting to be used on the link. /// For example `tcp/localhost?prio=6-7;rel=0` assigns priorities "data_low" and "background" to the established link. + /// + /// For TCP and TLS links, it is possible to specify the TCP buffer sizes: + /// E.g. tcp/192.168.0.1:7447#so_sndbuf=65000;so_rcvbuf=65000 connect: { /// timeout waiting for all endpoints connected (0: no retry, -1: infinite timeout) /// Accepts a single value (e.g. timeout_ms: 0) @@ -68,6 +71,9 @@ /// /// It is also possible to specify a priority range and/or a reliability setting to be used on the link. /// For example `tcp/localhost?prio=6-7;rel=0` assigns priorities "data_low" and "background" to the established link. + /// + /// For TCP and TLS links, it is possible to specify the TCP buffer sizes: + /// E.g. tcp/192.168.0.1:7447#so_sndbuf=65000;so_rcvbuf=65000 listen: { /// timeout waiting for all listen endpoints (0: no retry, -1: infinite timeout) /// Accepts a single value (e.g. timeout_ms: 0) @@ -128,10 +134,10 @@ /// The time-to-live on multicast scouting packets ttl: 1, /// Which type of Zenoh instances to automatically establish sessions with upon discovery on UDP multicast. - /// Accepts a single value (e.g. autoconnect: ["router", "peer"]) - /// or different values for router, peer and client (e.g. autoconnect: { router: [], peer: ["router", "peer"] }). + /// Accepts a single value (e.g. autoconnect: ["router", "peer"]) which applies whatever the configured "mode" is, + /// or different values for router, peer or client mode (e.g. autoconnect: { router: [], peer: ["router", "peer"] }). /// Each value is a list of: "peer", "router" and/or "client". - autoconnect: { router: [], peer: ["router", "peer"] }, + autoconnect: { router: [], peer: ["router", "peer"], client: ["router", "peer"] }, /// Whether or not to listen for scout messages on UDP multicast and reply to them. listen: true, }, @@ -146,10 +152,10 @@ /// direct connectivity with each other. multihop: false, /// Which type of Zenoh instances to automatically establish sessions with upon discovery on gossip. - /// Accepts a single value (e.g. autoconnect: ["router", "peer"]) - /// or different values for router, peer and client (e.g. autoconnect: { router: [], peer: ["router", "peer"] }). + /// Accepts a single value (e.g. autoconnect: ["router", "peer"]) which applies whatever the configured "mode" is, + /// or different values for router, peer or client mode (e.g. autoconnect: { router: [], peer: ["router", "peer"] }). /// Each value is a list of: "peer", "router" and/or "client". - autoconnect: { router: [], peer: ["router", "peer"] }, + autoconnect: { router: [], peer: ["router", "peer"], client: ["router", "peer"] }, }, }, @@ -184,6 +190,27 @@ }, }, + // /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config for overwritten values) + // qos: { + // /// Overwrite QoS options for PUT and DELETE messages + // publication: [ + // { + // /// PUT and DELETE messages on key expressions that are included by these key expressions + // /// will have their QoS options overwritten by the given config. + // key_exprs: ["demo/**", "example/key"], + // /// Configurations that will be applied on the publisher. + // /// Options that are supplied here will overwrite the configuration given in Zenoh API + // config: { + // congestion_control: "block", + // priority: "data_high", + // express: true, + // reliability: "best_effort", + // allowed_destination: "remote", + // }, + // }, + // ], + // }, + // /// The declarations aggregation strategy. // aggregation: { // /// A list of key-expressions for which all included subscribers will be aggregated into. @@ -473,7 +500,22 @@ // If set to true, links that require certificates (tls/quic) will automatically disconnect when the time of expiration of the remote certificate chain is reached // note that mTLS (client authentication) is required for a listener to disconnect a client on expiration close_link_on_expiration: false, + /// Optional configuration for TCP system buffers sizes for TLS links + /// + /// Configure TCP read buffer size (bytes) + // so_rcvbuf: 123456, + /// Configure TCP write buffer size (bytes) + // so_sndbuf: 123456, }, + // // Configure optional TCP link specific parameters + // tcp: { + // /// Optional configuration for TCP system buffers sizes for TCP links + // /// + // /// Configure TCP read buffer size (bytes) + // // so_rcvbuf: 123456, + // /// Configure TCP write buffer size (bytes) + // // so_sndbuf: 123456, + // } }, /// Shared memory configuration. /// NOTE: shared memory can be used only if zenoh is compiled with "shared-memory" feature, otherwise diff --git a/README.md b/README.md index 8020c2d0b2..58eb84f803 100644 --- a/README.md +++ b/README.md @@ -151,7 +151,7 @@ Zenoh's router is built as `target/release/zenohd`. All the examples are built i * run the Zenoh router with permission to perform config changes via the admin space, and with a memory storage: ```sh - ./target/release/zenohd --adminspace-permissions=rw --cfg='plugins/storage_manager/storages/demo:{key_expr:"demo/example/**",volume:"memory"}' + ./target/release/zenohd --rest-http-port=8000 --adminspace-permissions=rw --cfg='plugins/storage_manager/storages/demo:{key_expr:"demo/example/**",volume:"memory"}' ``` * in another shell, get info of the zenoh router via the zenoh admin space (you may use `jq` for pretty json formatting): @@ -246,11 +246,11 @@ By default the Zenoh router is delivered or built with 2 plugins. These may be c > [!WARNING] > Since `v0.6`, `zenohd` no longer loads every available plugin at startup. Instead, only configured plugins are loaded (after processing `--cfg` and `--plugin` options). Once `zenohd` is running, plugins can be hot-loaded and, if they support it, reconfigured at runtime by editing their configuration through the adminspace. -Note that the REST plugin is added to the configuration by the default value of the `--rest-http-port` CLI argument. - **[REST plugin](https://zenoh.io/docs/manual/plugin-http/)** (exposing a REST API): This plugin converts GET and PUT REST requests into Zenoh gets and puts respectively. +Note that to activate the REST plugin on `zenohd` the CLI argument should be passed: `--rest-http-port=8000` (or any other port of your choice). + **[Storages plugin](https://zenoh.io/docs/manual/plugin-storage-manager/)** (managing [backends and storages](https://zenoh.io/docs/manual/plugin-storage-manager/#backends-and-volumes)) This plugin allows you to easily define storages. These will store key-value pairs they subscribed to, and send the most recent ones when queried. Check out [DEFAULT_CONFIG.json5](DEFAULT_CONFIG.json5) for info on how to configure them. diff --git a/commons/zenoh-codec/src/transport/fragment.rs b/commons/zenoh-codec/src/transport/fragment.rs index fc30abce9d..73b2b4869b 100644 --- a/commons/zenoh-codec/src/transport/fragment.rs +++ b/commons/zenoh-codec/src/transport/fragment.rs @@ -39,6 +39,8 @@ where more, sn, ext_qos, + ext_first, + ext_drop, } = x; // Header @@ -49,7 +51,10 @@ where if *more { header |= flag::M; } - if ext_qos != &ext::QoSType::DEFAULT { + let mut n_exts = (ext_qos != &ext::QoSType::DEFAULT) as u8 + + ext_first.is_some() as u8 + + ext_drop.is_some() as u8; + if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; @@ -59,7 +64,16 @@ where // Extensions if ext_qos != &ext::QoSType::DEFAULT { - self.write(&mut *writer, (*ext_qos, false))?; + n_exts -= 1; + self.write(&mut *writer, (*ext_qos, n_exts != 0))?; + } + if let Some(first) = ext_first { + n_exts -= 1; + self.write(&mut *writer, (first, n_exts != 0))? + } + if let Some(drop) = ext_drop { + n_exts -= 1; + self.write(&mut *writer, (drop, n_exts != 0))? } Ok(()) @@ -99,6 +113,8 @@ where // Extensions let mut ext_qos = ext::QoSType::DEFAULT; + let mut ext_first = None; + let mut ext_drop = None; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -110,6 +126,16 @@ where ext_qos = q; has_ext = ext; } + ext::First::ID => { + let (first, ext): (ext::First, bool) = eodec.read(&mut *reader)?; + ext_first = Some(first); + has_ext = ext; + } + ext::Drop::ID => { + let (drop, ext): (ext::Drop, bool) = eodec.read(&mut *reader)?; + ext_drop = Some(drop); + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "Fragment", ext)?; } @@ -121,6 +147,8 @@ where more, sn, ext_qos, + ext_first, + ext_drop, }) } } @@ -139,6 +167,8 @@ where sn, payload, ext_qos, + ext_first, + ext_drop, } = x; // Header @@ -147,6 +177,8 @@ where more: *more, sn: *sn, ext_qos: *ext_qos, + ext_first: *ext_first, + ext_drop: *ext_drop, }; self.write(&mut *writer, &header)?; @@ -185,6 +217,8 @@ where more: header.more, sn: header.sn, ext_qos: header.ext_qos, + ext_first: header.ext_first, + ext_drop: header.ext_drop, payload, }) } diff --git a/commons/zenoh-codec/src/transport/init.rs b/commons/zenoh-codec/src/transport/init.rs index dd47dc2c27..25b2b29f04 100644 --- a/commons/zenoh-codec/src/transport/init.rs +++ b/commons/zenoh-codec/src/transport/init.rs @@ -52,6 +52,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } = x; // Header @@ -64,7 +65,8 @@ where + (ext_auth.is_some() as u8) + (ext_mlink.is_some() as u8) + (ext_lowlatency.is_some() as u8) - + (ext_compression.is_some() as u8); + + (ext_compression.is_some() as u8) + + (*ext_patch != ext::PatchType::NONE) as u8; #[cfg(feature = "shared-memory")] { @@ -125,6 +127,10 @@ where n_exts -= 1; self.write(&mut *writer, (compression, n_exts != 0))?; } + if *ext_patch != ext::PatchType::NONE { + n_exts -= 1; + self.write(&mut *writer, (*ext_patch, n_exts != 0))?; + } Ok(()) } @@ -186,6 +192,7 @@ where let mut ext_mlink = None; let mut ext_lowlatency = None; let mut ext_compression = None; + let mut ext_patch = ext::PatchType::NONE; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -228,6 +235,11 @@ where ext_compression = Some(q); has_ext = ext; } + ext::Patch::ID => { + let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?; + ext_patch = p; + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "InitSyn", ext)?; } @@ -248,6 +260,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, }) } } @@ -275,6 +288,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } = x; // Header @@ -287,7 +301,8 @@ where + (ext_auth.is_some() as u8) + (ext_mlink.is_some() as u8) + (ext_lowlatency.is_some() as u8) - + (ext_compression.is_some() as u8); + + (ext_compression.is_some() as u8) + + (*ext_patch != ext::PatchType::NONE) as u8; #[cfg(feature = "shared-memory")] { @@ -351,6 +366,10 @@ where n_exts -= 1; self.write(&mut *writer, (compression, n_exts != 0))?; } + if *ext_patch != ext::PatchType::NONE { + n_exts -= 1; + self.write(&mut *writer, (*ext_patch, n_exts != 0))?; + } Ok(()) } @@ -415,6 +434,7 @@ where let mut ext_mlink = None; let mut ext_lowlatency = None; let mut ext_compression = None; + let mut ext_patch = ext::PatchType::NONE; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -457,6 +477,11 @@ where ext_compression = Some(q); has_ext = ext; } + ext::Patch::ID => { + let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?; + ext_patch = p; + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "InitAck", ext)?; } @@ -478,6 +503,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, }) } } diff --git a/commons/zenoh-codec/src/transport/join.rs b/commons/zenoh-codec/src/transport/join.rs index 3f70d2ec8b..5504e5d03e 100644 --- a/commons/zenoh-codec/src/transport/join.rs +++ b/commons/zenoh-codec/src/transport/join.rs @@ -150,6 +150,7 @@ where next_sn, ext_qos, ext_shm, + ext_patch, } = x; // Header @@ -160,7 +161,9 @@ where if resolution != &Resolution::default() || batch_size != &batch_size::MULTICAST { header |= flag::S; } - let mut n_exts = (ext_qos.is_some() as u8) + (ext_shm.is_some() as u8); + let mut n_exts = (ext_qos.is_some() as u8) + + (ext_shm.is_some() as u8) + + (*ext_patch != ext::PatchType::NONE) as u8; if n_exts != 0 { header |= flag::Z; } @@ -201,6 +204,10 @@ where n_exts -= 1; self.write(&mut *writer, (shm, n_exts != 0))?; } + if *ext_patch != ext::PatchType::NONE { + n_exts -= 1; + self.write(&mut *writer, (*ext_patch, n_exts != 0))?; + } Ok(()) } @@ -264,6 +271,7 @@ where // Extensions let mut ext_qos = None; let mut ext_shm = None; + let mut ext_patch = ext::PatchType::NONE; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -280,6 +288,11 @@ where ext_shm = Some(s); has_ext = ext; } + ext::Patch::ID => { + let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?; + ext_patch = p; + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "Join", ext)?; } @@ -296,6 +309,7 @@ where next_sn, ext_qos, ext_shm, + ext_patch, }) } } diff --git a/commons/zenoh-codec/src/transport/mod.rs b/commons/zenoh-codec/src/transport/mod.rs index 3adae0fb72..973eac7e1a 100644 --- a/commons/zenoh-codec/src/transport/mod.rs +++ b/commons/zenoh-codec/src/transport/mod.rs @@ -176,3 +176,43 @@ where Ok((ext.into(), more)) } } + +// Extensions: Patch +impl WCodec<(ext::PatchType<{ ID }>, bool), &mut W> for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: (ext::PatchType<{ ID }>, bool)) -> Self::Output { + let (x, more) = x; + let ext: ZExtZ64<{ ID }> = x.into(); + + self.write(&mut *writer, (&ext, more)) + } +} + +impl RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080 +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> { + let header: u8 = self.read(&mut *reader)?; + let codec = Zenoh080Header::new(header); + codec.read(reader) + } +} + +impl RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080Header +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> { + let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?; + Ok((ext.into(), more)) + } +} diff --git a/commons/zenoh-config/Cargo.toml b/commons/zenoh-config/Cargo.toml index 0a650d5d46..d7e24beae5 100644 --- a/commons/zenoh-config/Cargo.toml +++ b/commons/zenoh-config/Cargo.toml @@ -26,6 +26,7 @@ description = "Internal crate for zenoh." [features] internal = [] transport_tcp = [] +unstable = [] [dependencies] tracing = { workspace = true } @@ -36,6 +37,7 @@ serde_json = { workspace = true } serde_yaml = { workspace = true } validated_struct = { workspace = true, features = ["json5", "json_get"] } zenoh-core = { workspace = true } +zenoh-keyexpr = { workspace = true } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } zenoh-util = { workspace = true } diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 51c851b3ab..8ab4d5d3a6 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -21,6 +21,7 @@ //! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants. pub mod defaults; mod include; +pub mod qos; pub mod wrappers; #[allow(unused_imports)] @@ -30,6 +31,7 @@ use std::{ }; use include::recursive_include; +use qos::PublisherQoSConfList; use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; @@ -360,6 +362,14 @@ validated_struct::validator! { /// A list of key-expressions for which all included publishers will be aggregated into. publishers: Vec, }, + + /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config) + pub qos: #[derive(Default)] + QoSConfig { + /// A list of QoS configurations for PUT and DELETE messages by key expressions + publication: PublisherQoSConfList, + }, + pub transport: #[derive(Default)] TransportConf { pub unicast: TransportUnicastConf { @@ -489,6 +499,10 @@ validated_struct::validator! { connect_certificate: Option, verify_name_on_connect: Option, close_link_on_expiration: Option, + /// Configure TCP write buffer size + pub so_sndbuf: Option, + /// Configure TCP read buffer size + pub so_rcvbuf: Option, // Skip serializing field because they contain secrets #[serde(skip_serializing)] root_ca_certificate_base64: Option, @@ -501,6 +515,13 @@ validated_struct::validator! { #[serde(skip_serializing)] connect_certificate_base64 : Option, }, + pub tcp: #[derive(Default)] + TcpConf { + /// Configure TCP write buffer size + pub so_sndbuf: Option, + /// Configure TCP read buffer size + pub so_rcvbuf: Option, + }, pub unixpipe: #[derive(Default)] UnixPipeConf { file_access_mask: Option diff --git a/commons/zenoh-config/src/qos.rs b/commons/zenoh-config/src/qos.rs new file mode 100644 index 0000000000..526c02d175 --- /dev/null +++ b/commons/zenoh-config/src/qos.rs @@ -0,0 +1,119 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use serde::{Deserialize, Serialize}; +use zenoh_keyexpr::keyexpr_tree::{IKeyExprTreeMut, KeBoxTree}; +use zenoh_protocol::core::{key_expr::OwnedKeyExpr, CongestionControl, Reliability}; + +#[derive(Debug, Deserialize, Default, Serialize, Clone)] +pub struct PublisherQoSConfList(pub(crate) Vec); + +impl From for KeBoxTree { + fn from(value: PublisherQoSConfList) -> KeBoxTree { + let mut tree = KeBoxTree::new(); + for conf in value.0 { + for key_expr in conf.key_exprs { + // NOTE: we don't check key_expr unicity + tree.insert(&key_expr, conf.config.clone()); + } + } + tree + } +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub(crate) struct PublisherQoSConf { + pub key_exprs: Vec, + pub config: PublisherQoSConfig, +} + +#[derive(Debug, Default, Deserialize, Serialize, Clone)] +pub struct PublisherQoSConfig { + pub congestion_control: Option, + pub priority: Option, + pub express: Option, + #[cfg(feature = "unstable")] + pub reliability: Option, + #[cfg(feature = "unstable")] + pub allowed_destination: Option, +} + +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +#[serde(rename_all = "lowercase")] +pub enum PublisherCongestionControlConf { + Drop, + Block, +} + +impl From for CongestionControl { + fn from(value: PublisherCongestionControlConf) -> Self { + match value { + PublisherCongestionControlConf::Drop => Self::Drop, + PublisherCongestionControlConf::Block => Self::Block, + } + } +} + +impl From for PublisherCongestionControlConf { + fn from(value: CongestionControl) -> Self { + match value { + CongestionControl::Drop => Self::Drop, + CongestionControl::Block => Self::Block, + } + } +} + +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +#[serde(rename_all = "snake_case")] +pub enum PublisherPriorityConf { + RealTime = 1, + InteractiveHigh = 2, + InteractiveLow = 3, + DataHigh = 4, + Data = 5, + DataLow = 6, + Background = 7, +} + +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +#[serde(rename_all = "snake_case")] +pub enum PublisherReliabilityConf { + BestEffort, + Reliable, +} + +impl From for Reliability { + fn from(value: PublisherReliabilityConf) -> Self { + match value { + PublisherReliabilityConf::BestEffort => Self::BestEffort, + PublisherReliabilityConf::Reliable => Self::Reliable, + } + } +} + +impl From for PublisherReliabilityConf { + fn from(value: Reliability) -> Self { + match value { + Reliability::BestEffort => Self::BestEffort, + Reliability::Reliable => Self::Reliable, + } + } +} + +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +#[serde(rename_all = "snake_case")] +pub enum PublisherLocalityConf { + SessionLocal, + Remote, + Any, +} diff --git a/commons/zenoh-protocol/src/transport/fragment.rs b/commons/zenoh-protocol/src/transport/fragment.rs index c82aefbd8a..ab63393ab7 100644 --- a/commons/zenoh-protocol/src/transport/fragment.rs +++ b/commons/zenoh-protocol/src/transport/fragment.rs @@ -75,14 +75,26 @@ pub struct Fragment { pub sn: TransportSn, pub payload: ZSlice, pub ext_qos: ext::QoSType, + pub ext_first: Option, + pub ext_drop: Option, } // Extensions pub mod ext { - use crate::{common::ZExtZ64, zextz64}; + use crate::{ + common::{ZExtUnit, ZExtZ64}, + zextunit, zextz64, + }; pub type QoS = zextz64!(0x1, true); pub type QoSType = crate::transport::ext::QoSType<{ QoS::ID }>; + + /// # Start extension + /// Mark the first fragment of a fragmented message + pub type First = zextunit!(0x2, false); + /// # Stop extension + /// Indicate that the remaining fragments has been dropped + pub type Drop = zextunit!(0x3, false); } impl Fragment { @@ -97,6 +109,8 @@ impl Fragment { let sn: TransportSn = rng.gen(); let payload = ZSlice::rand(rng.gen_range(8..128)); let ext_qos = ext::QoSType::rand(); + let ext_first = rng.gen_bool(0.5).then(ext::First::rand); + let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand); Fragment { reliability, @@ -104,6 +118,8 @@ impl Fragment { more, payload, ext_qos, + ext_first, + ext_drop, } } } @@ -115,6 +131,8 @@ pub struct FragmentHeader { pub more: bool, pub sn: TransportSn, pub ext_qos: ext::QoSType, + pub ext_first: Option, + pub ext_drop: Option, } impl FragmentHeader { @@ -128,12 +146,16 @@ impl FragmentHeader { let more = rng.gen_bool(0.5); let sn: TransportSn = rng.gen(); let ext_qos = ext::QoSType::rand(); + let ext_first = rng.gen_bool(0.5).then(ext::First::rand); + let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand); FragmentHeader { reliability, more, sn, ext_qos, + ext_first, + ext_drop, } } } diff --git a/commons/zenoh-protocol/src/transport/init.rs b/commons/zenoh-protocol/src/transport/init.rs index b514e6b9b0..f86b162bfc 100644 --- a/commons/zenoh-protocol/src/transport/init.rs +++ b/commons/zenoh-protocol/src/transport/init.rs @@ -131,6 +131,7 @@ pub struct InitSyn { pub ext_mlink: Option, pub ext_lowlatency: Option, pub ext_compression: Option, + pub ext_patch: ext::PatchType, } // Extensions @@ -165,6 +166,13 @@ pub mod ext { /// # Compression extension /// Used to negotiate the use of compression on the link pub type Compression = zextunit!(0x6, false); + + /// # Patch extension + /// Used to negotiate the patch version of the protocol + /// if not present (or 0), then protocol as released with 1.0.0 + /// if >= 1, then fragmentation first/drop markers + pub type Patch = zextz64!(0x7, false); + pub type PatchType = crate::transport::ext::PatchType<{ Patch::ID }>; } impl InitSyn { @@ -189,6 +197,7 @@ impl InitSyn { let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_patch = ext::PatchType::rand(); Self { version, @@ -204,6 +213,7 @@ impl InitSyn { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } } } @@ -234,6 +244,7 @@ pub struct InitAck { pub ext_mlink: Option, pub ext_lowlatency: Option, pub ext_compression: Option, + pub ext_patch: ext::PatchType, } impl InitAck { @@ -263,6 +274,7 @@ impl InitAck { let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_patch = ext::PatchType::rand(); Self { version, @@ -279,6 +291,7 @@ impl InitAck { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } } } diff --git a/commons/zenoh-protocol/src/transport/join.rs b/commons/zenoh-protocol/src/transport/join.rs index 26ed290c04..afbe55f3ce 100644 --- a/commons/zenoh-protocol/src/transport/join.rs +++ b/commons/zenoh-protocol/src/transport/join.rs @@ -102,6 +102,7 @@ pub struct Join { pub next_sn: PrioritySn, pub ext_qos: Option, pub ext_shm: Option, + pub ext_patch: ext::PatchType, } pub mod flag { @@ -115,7 +116,10 @@ pub mod ext { use alloc::boxed::Box; use super::{Priority, PrioritySn}; - use crate::{common::ZExtZBuf, zextzbuf}; + use crate::{ + common::{ZExtZ64, ZExtZBuf}, + zextz64, zextzbuf, + }; /// # QoS extension /// Used to announce next sn when QoS is enabled @@ -125,6 +129,13 @@ pub mod ext { /// # Shm extension /// Used to advertise shared memory capabilities pub type Shm = zextzbuf!(0x2, true); + + /// # Patch extension + /// Used to negotiate the patch version of the protocol + /// if not present (or 0), then protocol as released with 1.0.0 + /// if >= 1, then fragmentation first/drop markers + pub type Patch = zextz64!(0x7, false); // use the same id as Init + pub type PatchType = crate::transport::ext::PatchType<{ Patch::ID }>; } impl Join { @@ -151,6 +162,7 @@ impl Join { .gen_bool(0.5) .then_some(Box::new([PrioritySn::rand(); Priority::NUM])); let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); + let ext_patch = ext::PatchType::rand(); Self { version, @@ -162,6 +174,7 @@ impl Join { next_sn, ext_qos, ext_shm, + ext_patch, } } } diff --git a/commons/zenoh-protocol/src/transport/mod.rs b/commons/zenoh-protocol/src/transport/mod.rs index ba2ac32c4a..d534fef04a 100644 --- a/commons/zenoh-protocol/src/transport/mod.rs +++ b/commons/zenoh-protocol/src/transport/mod.rs @@ -311,4 +311,42 @@ pub mod ext { ZExtZ64::new(ext.inner as u64) } } + + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] + pub struct PatchType(u8); + + impl PatchType { + pub const NONE: Self = Self(0); + pub const CURRENT: Self = Self(1); + + pub fn new(int: u8) -> Self { + Self(int) + } + + pub fn raw(self) -> u8 { + self.0 + } + + pub fn has_fragmentation_markers(&self) -> bool { + self.0 >= 1 + } + + #[cfg(feature = "test")] + pub fn rand() -> Self { + use rand::Rng; + Self(rand::thread_rng().gen()) + } + } + + impl From> for PatchType { + fn from(ext: ZExtZ64) -> Self { + Self(ext.value as u8) + } + } + + impl From> for ZExtZ64 { + fn from(ext: PatchType) -> Self { + ZExtZ64::new(ext.0 as u64) + } + } } diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index fae26cd02d..6165a36a31 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -21,6 +21,7 @@ extern crate alloc; mod listener; mod multicast; +pub mod tcp; #[cfg(feature = "tls")] pub mod tls; mod unicast; @@ -44,6 +45,8 @@ use zenoh_result::ZResult; /*************************************/ pub const BIND_INTERFACE: &str = "iface"; +pub const TCP_SO_SND_BUF: &str = "so_sndbuf"; +pub const TCP_SO_RCV_BUF: &str = "so_rcvbuf"; #[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)] pub struct Link { diff --git a/io/zenoh-link-commons/src/tcp.rs b/io/zenoh-link-commons/src/tcp.rs new file mode 100644 index 0000000000..db7da4d562 --- /dev/null +++ b/io/zenoh-link-commons/src/tcp.rs @@ -0,0 +1,100 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::net::SocketAddr; + +use tokio::net::{TcpListener, TcpSocket, TcpStream}; +use zenoh_result::{zerror, ZResult}; + +pub struct TcpSocketConfig<'a> { + tx_buffer_size: Option, + rx_buffer_size: Option, + iface: Option<&'a str>, +} + +impl<'a> TcpSocketConfig<'a> { + pub fn new( + tx_buffer_size: Option, + rx_buffer_size: Option, + iface: Option<&'a str>, + ) -> Self { + Self { + tx_buffer_size, + rx_buffer_size, + iface, + } + } + + /// Build a new TCPListener bound to `addr` with the given configuration parameters + pub fn new_listener(&self, addr: &SocketAddr) -> ZResult<(TcpListener, SocketAddr)> { + let socket = self.socket_with_config(addr)?; + // Build a TcpListener from TcpSocket + // https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html + socket.set_reuseaddr(true)?; + socket.bind(*addr).map_err(|e| zerror!("{}: {}", addr, e))?; + // backlog (the maximum number of pending connections are queued): 1024 + let listener = socket + .listen(1024) + .map_err(|e| zerror!("{}: {}", addr, e))?; + + let local_addr = listener + .local_addr() + .map_err(|e| zerror!("{}: {}", addr, e))?; + + Ok((listener, local_addr)) + } + + /// Connect to a TCP socket address at `dst_addr` with the given configuration parameters + pub async fn new_link( + &self, + dst_addr: &SocketAddr, + ) -> ZResult<(TcpStream, SocketAddr, SocketAddr)> { + let socket = self.socket_with_config(dst_addr)?; + // Build a TcpStream from TcpSocket + // https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html + let stream = socket + .connect(*dst_addr) + .await + .map_err(|e| zerror!("{}: {}", dst_addr, e))?; + + let src_addr = stream + .local_addr() + .map_err(|e| zerror!("{}: {}", dst_addr, e))?; + + let dst_addr = stream + .peer_addr() + .map_err(|e| zerror!("{}: {}", dst_addr, e))?; + + Ok((stream, src_addr, dst_addr)) + } + + /// Creates a TcpSocket with the provided config + fn socket_with_config(&self, addr: &SocketAddr) -> ZResult { + let socket = match addr { + SocketAddr::V4(_) => TcpSocket::new_v4(), + SocketAddr::V6(_) => TcpSocket::new_v6(), + }?; + + if let Some(iface) = self.iface { + zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?; + } + if let Some(size) = self.tx_buffer_size { + socket.set_send_buffer_size(size)?; + } + if let Some(size) = self.rx_buffer_size { + socket.set_recv_buffer_size(size)?; + } + + Ok(socket) + } +} diff --git a/io/zenoh-link/src/lib.rs b/io/zenoh-link/src/lib.rs index b092aaf9d6..d621182b5b 100644 --- a/io/zenoh-link/src/lib.rs +++ b/io/zenoh-link/src/lib.rs @@ -34,7 +34,9 @@ use zenoh_link_serial::{LinkManagerUnicastSerial, SerialLocatorInspector, SERIAL #[cfg(feature = "transport_tcp")] pub use zenoh_link_tcp as tcp; #[cfg(feature = "transport_tcp")] -use zenoh_link_tcp::{LinkManagerUnicastTcp, TcpLocatorInspector, TCP_LOCATOR_PREFIX}; +use zenoh_link_tcp::{ + LinkManagerUnicastTcp, TcpConfigurator, TcpLocatorInspector, TCP_LOCATOR_PREFIX, +}; #[cfg(feature = "transport_tls")] pub use zenoh_link_tls as tls; #[cfg(feature = "transport_tls")] @@ -172,6 +174,8 @@ impl LocatorInspector { } #[derive(Default)] pub struct LinkConfigurator { + #[cfg(feature = "transport_tcp")] + tcp_inspector: TcpConfigurator, #[cfg(feature = "transport_quic")] quic_inspector: QuicConfigurator, #[cfg(feature = "transport_tls")] @@ -199,6 +203,13 @@ impl LinkConfigurator { errors.insert(proto, e); } }; + #[cfg(feature = "transport_tcp")] + { + insert_config( + TCP_LOCATOR_PREFIX.into(), + self.tcp_inspector.inspect_config(config), + ); + } #[cfg(feature = "transport_quic")] { insert_config( diff --git a/io/zenoh-links/zenoh-link-tcp/Cargo.toml b/io/zenoh-links/zenoh-link-tcp/Cargo.toml index 8a631bdfbc..c6bce930ea 100644 --- a/io/zenoh-links/zenoh-link-tcp/Cargo.toml +++ b/io/zenoh-links/zenoh-link-tcp/Cargo.toml @@ -30,8 +30,8 @@ socket2 = { workspace = true } tokio = { workspace = true, features = ["net", "io-util", "rt", "time"] } tokio-util = { workspace = true, features = ["rt"] } tracing = {workspace = true} +zenoh-config = { workspace = true } zenoh-core = { workspace = true } zenoh-link-commons = { workspace = true } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } -zenoh-util = { workspace = true } diff --git a/io/zenoh-links/zenoh-link-tcp/src/lib.rs b/io/zenoh-links/zenoh-link-tcp/src/lib.rs index 0654943f4f..bd642ece83 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/lib.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/lib.rs @@ -29,7 +29,9 @@ use zenoh_protocol::{ use zenoh_result::{zerror, ZResult}; mod unicast; +mod utils; pub use unicast::*; +pub use utils::TcpConfigurator; // Default MTU (TCP PDU) in bytes. // NOTE: Since TCP is a byte-stream oriented transport, theoretically it has diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index e3eb9d9796..d61bf515dd 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -16,12 +16,12 @@ use std::{cell::UnsafeCell, convert::TryInto, fmt, net::SocketAddr, sync::Arc, t use async_trait::async_trait; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, - net::{TcpListener, TcpSocket, TcpStream}, + net::{TcpListener, TcpStream}, }; use tokio_util::sync::CancellationToken; use zenoh_link_commons::{ - get_ip_interface_names, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, + get_ip_interface_names, tcp::TcpSocketConfig, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, + LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -29,9 +29,9 @@ use zenoh_protocol::{ }; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; -use super::{ - get_tcp_addrs, TCP_ACCEPT_THROTTLE_TIME, TCP_DEFAULT_MTU, TCP_LINGER_TIMEOUT, - TCP_LOCATOR_PREFIX, +use crate::{ + get_tcp_addrs, utils::TcpLinkConfig, TCP_ACCEPT_THROTTLE_TIME, TCP_DEFAULT_MTU, + TCP_LINGER_TIMEOUT, TCP_LOCATOR_PREFIX, }; pub struct LinkUnicastTcp { @@ -241,80 +241,22 @@ impl LinkManagerUnicastTcp { } } -impl LinkManagerUnicastTcp { - async fn new_link_inner( - &self, - dst_addr: &SocketAddr, - iface: Option<&str>, - ) -> ZResult<(TcpStream, SocketAddr, SocketAddr)> { - let socket = match dst_addr { - SocketAddr::V4(_) => TcpSocket::new_v4(), - SocketAddr::V6(_) => TcpSocket::new_v6(), - }?; - - if let Some(iface) = iface { - zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?; - } - - // Build a TcpStream from TcpSocket - // https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html - let stream = socket - .connect(*dst_addr) - .await - .map_err(|e| zerror!("{}: {}", dst_addr, e))?; - - let src_addr = stream - .local_addr() - .map_err(|e| zerror!("{}: {}", dst_addr, e))?; - - let dst_addr = stream - .peer_addr() - .map_err(|e| zerror!("{}: {}", dst_addr, e))?; - - Ok((stream, src_addr, dst_addr)) - } - - async fn new_listener_inner( - &self, - addr: &SocketAddr, - iface: Option<&str>, - ) -> ZResult<(TcpListener, SocketAddr)> { - let socket = match addr { - SocketAddr::V4(_) => TcpSocket::new_v4(), - SocketAddr::V6(_) => TcpSocket::new_v6(), - }?; - - if let Some(iface) = iface { - zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?; - } - - // Build a TcpListener from TcpSocket - // https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html - socket.set_reuseaddr(true)?; - socket.bind(*addr).map_err(|e| zerror!("{}: {}", addr, e))?; - // backlog (the maximum number of pending connections are queued): 1024 - let listener = socket - .listen(1024) - .map_err(|e| zerror!("{}: {}", addr, e))?; - - let local_addr = listener - .local_addr() - .map_err(|e| zerror!("{}: {}", addr, e))?; - - Ok((listener, local_addr)) - } -} - #[async_trait] impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_link(&self, endpoint: EndPoint) -> ZResult { let dst_addrs = get_tcp_addrs(endpoint.address()).await?; let config = endpoint.config(); - let iface = config.get(BIND_INTERFACE); + + let link_config = TcpLinkConfig::new(&config)?; + let socket_config = TcpSocketConfig::new( + link_config.tx_buffer_size, + link_config.rx_buffer_size, + link_config.bind_iface, + ); let mut errs: Vec = vec![]; for da in dst_addrs { - match self.new_link_inner(&da, iface).await { + match socket_config.new_link(&da).await { Ok((stream, src_addr, dst_addr)) => { let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr)); return Ok(LinkUnicast(link)); @@ -339,11 +281,13 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult { let addrs = get_tcp_addrs(endpoint.address()).await?; let config = endpoint.config(); - let iface = config.get(BIND_INTERFACE); + + let link_config = TcpLinkConfig::new(&config)?; + let socket_config: TcpSocketConfig<'_> = link_config.into(); let mut errs: Vec = vec![]; for da in addrs { - match self.new_listener_inner(&da, iface).await { + match socket_config.new_listener(&da) { Ok((socket, local_addr)) => { // Update the endpoint locator address endpoint = EndPoint::new( diff --git a/io/zenoh-links/zenoh-link-tcp/src/utils.rs b/io/zenoh-links/zenoh-link-tcp/src/utils.rs new file mode 100644 index 0000000000..6772bc8f9a --- /dev/null +++ b/io/zenoh-links/zenoh-link-tcp/src/utils.rs @@ -0,0 +1,79 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use zenoh_config::Config as ZenohConfig; +use zenoh_link_commons::{ + tcp::TcpSocketConfig, ConfigurationInspector, BIND_INTERFACE, TCP_SO_RCV_BUF, TCP_SO_SND_BUF, +}; +use zenoh_protocol::core::{parameters, Config}; +use zenoh_result::{zerror, ZResult}; + +#[derive(Default, Clone, Copy, Debug)] +pub struct TcpConfigurator; + +impl ConfigurationInspector for TcpConfigurator { + fn inspect_config(&self, config: &ZenohConfig) -> ZResult { + let mut ps: Vec<(&str, &str)> = vec![]; + let c = config.transport().link().tcp(); + + let rx_buffer_size; + if let Some(size) = c.so_rcvbuf() { + rx_buffer_size = size.to_string(); + ps.push((TCP_SO_RCV_BUF, &rx_buffer_size)); + } + let tx_buffer_size; + if let Some(size) = c.so_sndbuf() { + tx_buffer_size = size.to_string(); + ps.push((TCP_SO_SND_BUF, &tx_buffer_size)); + } + + Ok(parameters::from_iter(ps.drain(..))) + } +} + +pub(crate) struct TcpLinkConfig<'a> { + pub(crate) rx_buffer_size: Option, + pub(crate) tx_buffer_size: Option, + pub(crate) bind_iface: Option<&'a str>, +} + +impl<'a> TcpLinkConfig<'a> { + pub(crate) fn new(config: &'a Config) -> ZResult { + let mut tcp_config = Self { + rx_buffer_size: None, + tx_buffer_size: None, + bind_iface: config.get(BIND_INTERFACE), + }; + + if let Some(size) = config.get(TCP_SO_RCV_BUF) { + tcp_config.rx_buffer_size = Some( + size.parse() + .map_err(|_| zerror!("Unknown TCP read buffer size argument: {}", size))?, + ); + }; + if let Some(size) = config.get(TCP_SO_SND_BUF) { + tcp_config.tx_buffer_size = Some( + size.parse() + .map_err(|_| zerror!("Unknown TCP write buffer size argument: {}", size))?, + ); + }; + + Ok(tcp_config) + } +} + +impl<'a> From> for TcpSocketConfig<'a> { + fn from(value: TcpLinkConfig<'a>) -> Self { + Self::new(value.tx_buffer_size, value.rx_buffer_size, value.bind_iface) + } +} diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 046288800e..62250d354a 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -324,29 +324,17 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { let connector = TlsConnector::from(config); // Initialize the TcpStream - let tcp_stream = TcpStream::connect(addr).await.map_err(|e| { - zerror!( - "Can not create a new TLS link bound to {:?}: {}", - server_name, - e - ) - })?; - - let src_addr = tcp_stream.local_addr().map_err(|e| { - zerror!( - "Can not create a new TLS link bound to {:?}: {}", - server_name, - e - ) - })?; - - let dst_addr = tcp_stream.peer_addr().map_err(|e| { - zerror!( - "Can not create a new TLS link bound to {:?}: {}", - server_name, - e - ) - })?; + let (tcp_stream, src_addr, dst_addr) = client_config + .tcp_socket_config + .new_link(&addr) + .await + .map_err(|e| { + zerror!( + "Can not create a new TLS link bound to {:?}: {}", + server_name, + e + ) + })?; // Initialize the TlsStream let tls_stream = connector @@ -404,13 +392,11 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { .map_err(|e| zerror!("Cannot create a new TLS listener on {addr}. {e}"))?; // Initialize the TcpListener - let socket = TcpListener::bind(addr) - .await + let (socket, local_addr) = tls_server_config + .tcp_socket_config + .new_listener(&addr) .map_err(|e| zerror!("Can not create a new TLS listener on {}: {}", addr, e))?; - let local_addr = socket - .local_addr() - .map_err(|e| zerror!("Can not create a new TLS listener on {}: {}", addr, e))?; let local_port = local_addr.port(); // Initialize the TlsAcceptor diff --git a/io/zenoh-links/zenoh-link-tls/src/utils.rs b/io/zenoh-links/zenoh-link-tls/src/utils.rs index 74e7cc9e51..32a5f929a7 100644 --- a/io/zenoh-links/zenoh-link-tls/src/utils.rs +++ b/io/zenoh-links/zenoh-link-tls/src/utils.rs @@ -31,7 +31,10 @@ use rustls_pki_types::ServerName; use secrecy::ExposeSecret; use webpki::anchor_from_trusted_cert; use zenoh_config::Config as ZenohConfig; -use zenoh_link_commons::{tls::WebPkiVerifierAnyServerName, ConfigurationInspector}; +use zenoh_link_commons::{ + tcp::TcpSocketConfig, tls::WebPkiVerifierAnyServerName, ConfigurationInspector, BIND_INTERFACE, + TCP_SO_RCV_BUF, TCP_SO_SND_BUF, +}; use zenoh_protocol::core::{ endpoint::{Address, Config}, parameters, @@ -150,18 +153,31 @@ impl ConfigurationInspector for TlsConfigurator { false => ps.push((TLS_CLOSE_LINK_ON_EXPIRATION, "false")), } + let rx_buffer_size; + if let Some(size) = c.so_rcvbuf() { + rx_buffer_size = size.to_string(); + ps.push((TCP_SO_RCV_BUF, &rx_buffer_size)); + } + + let tx_buffer_size; + if let Some(size) = c.so_sndbuf() { + tx_buffer_size = size.to_string(); + ps.push((TCP_SO_SND_BUF, &tx_buffer_size)); + } + Ok(parameters::from_iter(ps.drain(..))) } } -pub(crate) struct TlsServerConfig { +pub(crate) struct TlsServerConfig<'a> { pub(crate) server_config: ServerConfig, pub(crate) tls_handshake_timeout: Duration, pub(crate) tls_close_link_on_expiration: bool, + pub(crate) tcp_socket_config: TcpSocketConfig<'a>, } -impl TlsServerConfig { - pub async fn new(config: &Config<'_>) -> ZResult { +impl<'a> TlsServerConfig<'a> { + pub async fn new(config: &'a Config<'_>) -> ZResult { let tls_server_client_auth: bool = match config.get(TLS_ENABLE_MTLS) { Some(s) => s .parse() @@ -241,10 +257,30 @@ impl TlsServerConfig { .unwrap_or(config::TLS_HANDSHAKE_TIMEOUT_MS_DEFAULT), ); + let mut tcp_rx_buffer_size = None; + if let Some(size) = config.get(TCP_SO_RCV_BUF) { + tcp_rx_buffer_size = Some( + size.parse() + .map_err(|_| zerror!("Unknown TCP read buffer size argument: {}", size))?, + ); + }; + let mut tcp_tx_buffer_size = None; + if let Some(size) = config.get(TCP_SO_SND_BUF) { + tcp_tx_buffer_size = Some( + size.parse() + .map_err(|_| zerror!("Unknown TCP write buffer size argument: {}", size))?, + ); + }; + Ok(TlsServerConfig { server_config: sc, tls_handshake_timeout, tls_close_link_on_expiration, + tcp_socket_config: TcpSocketConfig::new( + tcp_tx_buffer_size, + tcp_rx_buffer_size, + config.get(BIND_INTERFACE), + ), }) } @@ -269,13 +305,14 @@ impl TlsServerConfig { } } -pub(crate) struct TlsClientConfig { +pub(crate) struct TlsClientConfig<'a> { pub(crate) client_config: ClientConfig, pub(crate) tls_close_link_on_expiration: bool, + pub(crate) tcp_socket_config: TcpSocketConfig<'a>, } -impl TlsClientConfig { - pub async fn new(config: &Config<'_>) -> ZResult { +impl<'a> TlsClientConfig<'a> { + pub async fn new(config: &'a Config<'_>) -> ZResult { let tls_client_server_auth: bool = match config.get(TLS_ENABLE_MTLS) { Some(s) => s .parse() @@ -386,9 +423,30 @@ impl TlsClientConfig { .with_no_client_auth() } }; + + let mut tcp_rx_buffer_size = None; + if let Some(size) = config.get(TCP_SO_RCV_BUF) { + tcp_rx_buffer_size = Some( + size.parse() + .map_err(|_| zerror!("Unknown TCP read buffer size argument: {}", size))?, + ); + }; + let mut tcp_tx_buffer_size = None; + if let Some(size) = config.get(TCP_SO_SND_BUF) { + tcp_tx_buffer_size = Some( + size.parse() + .map_err(|_| zerror!("Unknown TCP write buffer size argument: {}", size))?, + ); + }; + Ok(TlsClientConfig { client_config: cc, tls_close_link_on_expiration, + tcp_socket_config: TcpSocketConfig::new( + tcp_tx_buffer_size, + tcp_rx_buffer_size, + config.get(BIND_INTERFACE), + ), }) } diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 2c7316c7cb..65150f728a 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -201,6 +201,9 @@ pub struct WBatch { // Statistics related to this batch #[cfg(feature = "stats")] pub stats: WBatchStats, + // an ephemeral batch will not be recycled in the pipeline + // it can be used to push a stop fragment when no batch are available + pub ephemeral: bool, } impl WBatch { @@ -209,6 +212,7 @@ impl WBatch { buffer: BBuf::with_capacity(config.mtu as usize), codec: Zenoh080Batch::new(), config, + ephemeral: false, #[cfg(feature = "stats")] stats: WBatchStats::default(), }; @@ -219,6 +223,17 @@ impl WBatch { batch } + pub fn new_ephemeral(config: BatchConfig) -> Self { + Self { + ephemeral: true, + ..Self::new(config) + } + } + + pub fn is_ephemeral(&self) -> bool { + self.ephemeral + } + /// Verify that the [`WBatch`] has no serialized bytes. #[inline(always)] pub fn is_empty(&self) -> bool { diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 0dbded9209..256b4e760a 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -14,7 +14,7 @@ use std::{ ops::Add, sync::{ - atomic::{AtomicBool, AtomicU32, Ordering}, + atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering}, Arc, Mutex, MutexGuard, }, time::{Duration, Instant}, @@ -34,6 +34,7 @@ use zenoh_protocol::{ core::Priority, network::NetworkMessage, transport::{ + fragment, fragment::FragmentHeader, frame::{self, FrameHeader}, AtomicBatchSize, BatchSize, TransportMessage, @@ -232,6 +233,8 @@ struct StageIn { mutex: StageInMutex, fragbuf: ZBuf, batching: bool, + // used for stop fragment + batch_config: BatchConfig, } impl StageIn { @@ -352,19 +355,32 @@ impl StageIn { more: true, sn, ext_qos: frame.ext_qos, + ext_first: Some(fragment::ext::First::new()), + ext_drop: None, }; let mut reader = self.fragbuf.reader(); while reader.can_read() { // Get the current serialization batch - // If deadline is reached, sequence number is incremented with `SeqNumGenerator::get` - // in order to break the fragment chain already sent. - batch = zgetbatch_rets!(let _ = tch.sn.get()); + batch = zgetbatch_rets!({ + // If no fragment has been sent, the sequence number is just reset + if fragment.ext_first.is_some() { + tch.sn.set(sn).unwrap() + // Otherwise, an ephemeral batch is created to send the stop fragment + } else { + let mut batch = WBatch::new_ephemeral(self.batch_config); + self.fragbuf.clear(); + fragment.ext_drop = Some(fragment::ext::Drop::new()); + let _ = batch.encode((&mut self.fragbuf.reader(), &mut fragment)); + self.s_out.move_batch(batch); + } + }); // Serialize the message fragment match batch.encode((&mut reader, &mut fragment)) { Ok(_) => { // Update the SN fragment.sn = tch.sn.get(); + fragment.ext_first = None; // Move the serialization batch into the OUT pipeline self.s_out.move_batch(batch); } @@ -675,6 +691,7 @@ impl TransmissionPipeline { }, fragbuf: ZBuf::empty(), batching: config.batching_enabled, + batch_config: config.batch, })); // The stage out for this priority @@ -688,28 +705,62 @@ impl TransmissionPipeline { }); } - let active = Arc::new(AtomicBool::new(true)); + let active = Arc::new(TransmissionPipelineStatus { + disabled: AtomicBool::new(false), + congested: AtomicU8::new(0), + }); let producer = TransmissionPipelineProducer { stage_in: stage_in.into_boxed_slice().into(), - active: active.clone(), + status: active.clone(), wait_before_drop: config.wait_before_drop, wait_before_close: config.wait_before_close, }; let consumer = TransmissionPipelineConsumer { stage_out: stage_out.into_boxed_slice(), n_out_r, - active, + status: active, }; (producer, consumer) } } +struct TransmissionPipelineStatus { + // The whole pipeline is enabled or disabled + disabled: AtomicBool, + // Bitflags to indicate the given priority queue is congested + congested: AtomicU8, +} + +impl TransmissionPipelineStatus { + fn set_disabled(&self, status: bool) { + self.disabled.store(status, Ordering::Relaxed); + } + + fn is_disabled(&self) -> bool { + self.disabled.load(Ordering::Relaxed) + } + + fn set_congested(&self, priority: Priority, status: bool) { + let prioflag = 1 << priority as u8; + if status { + self.congested.fetch_or(prioflag, Ordering::Relaxed); + } else { + self.congested.fetch_and(!prioflag, Ordering::Relaxed); + } + } + + fn is_congested(&self, priority: Priority) -> bool { + let prioflag = 1 << priority as u8; + self.congested.load(Ordering::Relaxed) & prioflag != 0 + } +} + #[derive(Clone)] pub(crate) struct TransmissionPipelineProducer { // Each priority queue has its own Mutex stage_in: Arc<[Mutex]>, - active: Arc, + status: Arc, wait_before_drop: (Duration, Duration), wait_before_close: Duration, } @@ -724,8 +775,13 @@ impl TransmissionPipelineProducer { } else { (0, Priority::DEFAULT) }; + // If message is droppable, compute a deadline after which the sample could be dropped let (wait_time, max_wait_time) = if msg.is_droppable() { + // Checked if we are blocked on the priority queue and we drop directly the message + if self.status.is_congested(priority) { + return false; + } (self.wait_before_drop.0, Some(self.wait_before_drop.1)) } else { (self.wait_before_close, None) @@ -733,7 +789,11 @@ impl TransmissionPipelineProducer { let mut deadline = Deadline::new(wait_time, max_wait_time); // Lock the channel. We are the only one that will be writing on it. let mut queue = zlock!(self.stage_in[idx]); - queue.push_network_message(&mut msg, priority, &mut deadline) + let sent = queue.push_network_message(&mut msg, priority, &mut deadline); + if !sent { + self.status.set_congested(priority, true); + } + sent } #[inline] @@ -750,7 +810,7 @@ impl TransmissionPipelineProducer { } pub(crate) fn disable(&self) { - self.active.store(false, Ordering::Relaxed); + self.status.set_disabled(true); // Acquire all the locks, in_guard first, out_guard later // Use the same locking order as in drain to avoid deadlocks @@ -768,17 +828,18 @@ pub(crate) struct TransmissionPipelineConsumer { // A single Mutex for all the priority queues stage_out: Box<[StageOut]>, n_out_r: Waiter, - active: Arc, + status: Arc, } impl TransmissionPipelineConsumer { - pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> { - while self.active.load(Ordering::Relaxed) { + pub(crate) async fn pull(&mut self) -> Option<(WBatch, Priority)> { + while !self.status.is_disabled() { let mut backoff = MicroSeconds::MAX; // Calculate the backoff maximum for (prio, queue) in self.stage_out.iter_mut().enumerate() { match queue.try_pull() { Pull::Some(batch) => { + let prio = Priority::try_from(prio as u8).unwrap(); return Some((batch, prio)); } Pull::Backoff(deadline) => { @@ -818,8 +879,11 @@ impl TransmissionPipelineConsumer { None } - pub(crate) fn refill(&mut self, batch: WBatch, priority: usize) { - self.stage_out[priority].refill(batch); + pub(crate) fn refill(&mut self, batch: WBatch, priority: Priority) { + if !batch.is_ephemeral() { + self.stage_out[priority as usize].refill(batch); + self.status.set_congested(priority, false); + } } pub(crate) fn drain(&mut self) -> Vec<(WBatch, usize)> { diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index c4c23290ee..a4badd2e41 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -24,7 +24,9 @@ use zenoh_core::{zcondfeat, zlock}; use zenoh_link::{LinkMulticast, Locator}; use zenoh_protocol::{ core::{Bits, Priority, Resolution, WhatAmI, ZenohIdProto}, - transport::{BatchSize, Close, Join, PrioritySn, TransportMessage, TransportSn}, + transport::{ + join::ext::PatchType, BatchSize, Close, Join, PrioritySn, TransportMessage, TransportSn, + }, }; use zenoh_result::{zerror, ZResult}; use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal}; @@ -431,10 +433,10 @@ async fn tx_task( link.send_batch(&mut batch).await?; // Keep track of next SNs if let Some(sn) = batch.codec.latest_sn.reliable { - last_sns[priority].reliable = sn; + last_sns[priority as usize].reliable = sn; } if let Some(sn) = batch.codec.latest_sn.best_effort { - last_sns[priority].best_effort = sn; + last_sns[priority as usize].best_effort = sn; } #[cfg(feature = "stats")] { @@ -495,6 +497,7 @@ async fn tx_task( next_sn, ext_qos, ext_shm: None, + ext_patch: PatchType::CURRENT } .into(); diff --git a/io/zenoh-transport/src/multicast/manager.rs b/io/zenoh-transport/src/multicast/manager.rs index e2899b1d1a..8046d5bd58 100644 --- a/io/zenoh-transport/src/multicast/manager.rs +++ b/io/zenoh-transport/src/multicast/manager.rs @@ -256,9 +256,15 @@ impl TransportManager { .await?; // Fill and merge the endpoint configuration if let Some(config) = self.config.endpoints.get(endpoint.protocol().as_str()) { - endpoint - .config_mut() - .extend_from_iter(parameters::iter(config))?; + let mut config = parameters::Parameters::from(config.as_str()); + // Overwrite config with current endpoint parameters + config.extend_from_iter(endpoint.config().iter()); + endpoint = EndPoint::new( + endpoint.protocol(), + endpoint.address(), + endpoint.metadata(), + config.as_str(), + )?; } // Open the link diff --git a/io/zenoh-transport/src/multicast/rx.rs b/io/zenoh-transport/src/multicast/rx.rs index 8562d5b3eb..dc501742ec 100644 --- a/io/zenoh-transport/src/multicast/rx.rs +++ b/io/zenoh-transport/src/multicast/rx.rs @@ -166,7 +166,7 @@ impl TransportMulticastInner { Reliability::BestEffort => zlock!(c.best_effort), }; - if !self.verify_sn(sn, &mut guard)? { + if !self.verify_sn("Frame", sn, &mut guard)? { // Drop invalid message and continue return Ok(()); } @@ -183,6 +183,8 @@ impl TransportMulticastInner { more, sn, ext_qos, + ext_first, + ext_drop, payload, } = fragment; @@ -205,10 +207,25 @@ impl TransportMulticastInner { Reliability::BestEffort => zlock!(c.best_effort), }; - if !self.verify_sn(sn, &mut guard)? { + if !self.verify_sn("Fragment", sn, &mut guard)? { // Drop invalid message and continue return Ok(()); } + if peer.patch.has_fragmentation_markers() { + if ext_first.is_some() { + guard.defrag.clear(); + } else if guard.defrag.is_empty() { + tracing::trace!( + "Transport: {}. First fragment received without start marker.", + self.manager.config.zid, + ); + return Ok(()); + } + if ext_drop.is_some() { + guard.defrag.clear(); + return Ok(()); + } + } if guard.defrag.is_empty() { let _ = guard.defrag.sync(sn); } @@ -236,14 +253,16 @@ impl TransportMulticastInner { fn verify_sn( &self, + message_type: &str, sn: TransportSn, guard: &mut MutexGuard<'_, TransportChannelRx>, ) -> ZResult { let precedes = guard.sn.precedes(sn)?; if !precedes { tracing::debug!( - "Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.", + "Transport: {}. {} with invalid SN dropped: {}. Expected: {}.", self.manager.config.zid, + message_type, sn, guard.sn.next() ); diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index 3777978a3c..b14bed85ee 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // use std::{ + cmp::min, collections::HashMap, sync::{ atomic::{AtomicBool, Ordering}, @@ -25,7 +26,7 @@ use zenoh_core::{zcondfeat, zread, zwrite}; use zenoh_link::{Link, Locator}; use zenoh_protocol::{ core::{Bits, Field, Priority, Resolution, WhatAmI, ZenohIdProto}, - transport::{batch_size, close, Close, Join, TransportMessage}, + transport::{batch_size, close, join::ext::PatchType, Close, Join, TransportMessage}, }; use zenoh_result::{bail, ZResult}; use zenoh_task::TaskController; @@ -61,6 +62,7 @@ pub(super) struct TransportMulticastPeer { token: CancellationToken, pub(super) priority_rx: Box<[TransportPriorityRx]>, pub(super) handler: Arc, + pub(super) patch: PatchType, } impl TransportMulticastPeer { @@ -415,6 +417,7 @@ impl TransportMulticastInner { token, priority_rx, handler, + patch: min(PatchType::CURRENT, join.ext_patch), }; zwrite!(self.peers).insert(locator.clone(), peer); diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index db2310c32f..386b9f832c 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -61,6 +61,7 @@ struct StateTransport { #[cfg(feature = "shared-memory")] ext_shm: ext::shm::StateAccept, ext_lowlatency: ext::lowlatency::StateAccept, + ext_patch: ext::patch::StateAccept, } #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] @@ -143,6 +144,7 @@ struct AcceptLink<'a> { ext_lowlatency: ext::lowlatency::LowLatencyFsm<'a>, #[cfg(feature = "transport_compression")] ext_compression: ext::compression::CompressionFsm<'a>, + ext_patch: ext::patch::PatchFsm<'a>, } #[async_trait] @@ -268,6 +270,12 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Patch + self.ext_patch + .recv_init_syn((&mut state.transport.ext_patch, init_syn.ext_patch)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + let output = RecvInitSynOut { other_zid: init_syn.zid, other_whatami: init_syn.whatami, @@ -330,7 +338,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; - // Extension MultiLink + // Extension Compression let ext_compression = zcondfeat!( "transport_compression", self.ext_compression @@ -340,6 +348,13 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { None ); + // Extension Patch + let ext_patch = self + .ext_patch + .send_init_ack(&state.transport.ext_patch) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Create the cookie let cookie_nonce: u64 = zasynclock!(self.prng).gen(); let cookie = Cookie { @@ -358,6 +373,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { ext_lowlatency: state.transport.ext_lowlatency, #[cfg(feature = "transport_compression")] ext_compression: state.link.ext_compression, + ext_patch: state.transport.ext_patch, }; let mut encrypted = vec![]; @@ -391,6 +407,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } .into(); @@ -491,6 +508,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { #[cfg(feature = "shared-memory")] ext_shm: cookie.ext_shm, ext_lowlatency: cookie.ext_lowlatency, + ext_patch: cookie.ext_patch, }, #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] link: StateLink { @@ -681,6 +699,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - ext_lowlatency: ext::lowlatency::LowLatencyFsm::new(), #[cfg(feature = "transport_compression")] ext_compression: ext::compression::CompressionFsm::new(), + ext_patch: ext::patch::PatchFsm::new(), }; // Init handshake @@ -719,6 +738,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - ext_lowlatency: ext::lowlatency::StateAccept::new( manager.config.unicast.is_lowlatency, ), + ext_patch: ext::patch::StateAccept::new(), }, #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] link: StateLink { @@ -786,6 +806,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(), #[cfg(feature = "auth_usrpwd")] auth_id: osyn_out.other_auth_id, + patch: state.transport.ext_patch.get(), }; let a_config = TransportLinkUnicastConfig { diff --git a/io/zenoh-transport/src/unicast/establishment/cookie.rs b/io/zenoh-transport/src/unicast/establishment/cookie.rs index 4220f8e08b..58f56fa92a 100644 --- a/io/zenoh-transport/src/unicast/establishment/cookie.rs +++ b/io/zenoh-transport/src/unicast/establishment/cookie.rs @@ -44,6 +44,7 @@ pub(crate) struct Cookie { pub(crate) ext_lowlatency: ext::lowlatency::StateAccept, #[cfg(feature = "transport_compression")] pub(crate) ext_compression: ext::compression::StateAccept, + pub(crate) ext_patch: ext::patch::StateAccept, } impl WCodec<&Cookie, &mut W> for Zenoh080 @@ -70,6 +71,7 @@ where self.write(&mut *writer, &x.ext_lowlatency)?; #[cfg(feature = "transport_compression")] self.write(&mut *writer, &x.ext_compression)?; + self.write(&mut *writer, &x.ext_patch)?; Ok(()) } @@ -100,6 +102,7 @@ where let ext_lowlatency: ext::lowlatency::StateAccept = self.read(&mut *reader)?; #[cfg(feature = "transport_compression")] let ext_compression: ext::compression::StateAccept = self.read(&mut *reader)?; + let ext_patch: ext::patch::StateAccept = self.read(&mut *reader)?; let cookie = Cookie { zid, @@ -117,6 +120,7 @@ where ext_lowlatency, #[cfg(feature = "transport_compression")] ext_compression, + ext_patch, }; Ok(cookie) @@ -188,6 +192,7 @@ impl Cookie { ext_lowlatency: ext::lowlatency::StateAccept::rand(), #[cfg(feature = "transport_compression")] ext_compression: ext::compression::StateAccept::rand(), + ext_patch: ext::patch::StateAccept::rand(), } } } diff --git a/io/zenoh-transport/src/unicast/establishment/ext/mod.rs b/io/zenoh-transport/src/unicast/establishment/ext/mod.rs index f4aafa832c..269979b84e 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/mod.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/mod.rs @@ -18,6 +18,7 @@ pub(crate) mod compression; pub(crate) mod lowlatency; #[cfg(feature = "transport_multilink")] pub(crate) mod multilink; +pub(crate) mod patch; pub(crate) mod qos; #[cfg(feature = "shared-memory")] pub(crate) mod shm; diff --git a/io/zenoh-transport/src/unicast/establishment/ext/patch.rs b/io/zenoh-transport/src/unicast/establishment/ext/patch.rs new file mode 100644 index 0000000000..16509df209 --- /dev/null +++ b/io/zenoh-transport/src/unicast/establishment/ext/patch.rs @@ -0,0 +1,203 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{cmp::min, marker::PhantomData}; + +use async_trait::async_trait; +use zenoh_buffers::{ + reader::{DidntRead, Reader}, + writer::{DidntWrite, Writer}, +}; +use zenoh_codec::{RCodec, WCodec, Zenoh080}; +use zenoh_protocol::transport::init::ext::PatchType; +use zenoh_result::{bail, Error as ZError}; + +use crate::unicast::establishment::{AcceptFsm, OpenFsm}; + +// Extension Fsm +pub(crate) struct PatchFsm<'a> { + _a: PhantomData<&'a ()>, +} + +impl PatchFsm<'_> { + pub(crate) const fn new() -> Self { + Self { _a: PhantomData } + } +} + +/*************************************/ +/* OPEN */ +/*************************************/ +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) struct StateOpen { + patch: PatchType, +} + +impl StateOpen { + pub(crate) const fn new() -> Self { + Self { + patch: PatchType::NONE, + } + } + + pub(crate) const fn get(&self) -> PatchType { + self.patch + } +} + +#[async_trait] +impl<'a> OpenFsm for &'a PatchFsm<'a> { + type Error = ZError; + + type SendInitSynIn = &'a StateOpen; + type SendInitSynOut = PatchType; + async fn send_init_syn( + self, + _state: Self::SendInitSynIn, + ) -> Result { + Ok(PatchType::CURRENT) + } + + type RecvInitAckIn = (&'a mut StateOpen, PatchType); + type RecvInitAckOut = (); + async fn recv_init_ack( + self, + input: Self::RecvInitAckIn, + ) -> Result { + let (state, other_ext) = input; + if other_ext > PatchType::CURRENT { + bail!( + "Acceptor patch should be lesser or equal to {current:?}, found {other:?}", + current = PatchType::CURRENT.raw(), + other = other_ext.raw(), + ); + } + state.patch = other_ext; + Ok(()) + } + + type SendOpenSynIn = &'a StateOpen; + type SendOpenSynOut = (); + async fn send_open_syn( + self, + _state: Self::SendOpenSynIn, + ) -> Result { + unimplemented!("There is no patch extension in OPEN") + } + + type RecvOpenAckIn = (&'a mut StateOpen, ()); + type RecvOpenAckOut = (); + async fn recv_open_ack( + self, + _state: Self::RecvOpenAckIn, + ) -> Result { + unimplemented!("There is no patch extension in OPEN") + } +} + +/*************************************/ +/* ACCEPT */ +/*************************************/ +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) struct StateAccept { + patch: PatchType, +} + +impl StateAccept { + pub(crate) const fn new() -> Self { + Self { + patch: PatchType::NONE, + } + } + + pub(crate) const fn get(&self) -> PatchType { + self.patch + } + + #[cfg(test)] + pub(crate) fn rand() -> Self { + Self { + patch: PatchType::rand(), + } + } +} + +// Codec +impl WCodec<&StateAccept, &mut W> for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: &StateAccept) -> Self::Output { + let raw = x.patch.raw(); + self.write(&mut *writer, raw)?; + Ok(()) + } +} + +impl RCodec for Zenoh080 +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result { + let raw: u8 = self.read(&mut *reader)?; + let patch = PatchType::new(raw); + Ok(StateAccept { patch }) + } +} + +#[async_trait] +impl<'a> AcceptFsm for &'a PatchFsm<'a> { + type Error = ZError; + + type RecvInitSynIn = (&'a mut StateAccept, PatchType); + type RecvInitSynOut = (); + async fn recv_init_syn( + self, + input: Self::RecvInitSynIn, + ) -> Result { + let (state, other_ext) = input; + state.patch = other_ext; + Ok(()) + } + + type SendInitAckIn = &'a StateAccept; + type SendInitAckOut = PatchType; + async fn send_init_ack( + self, + state: Self::SendInitAckIn, + ) -> Result { + Ok(min(PatchType::CURRENT, state.patch)) + } + + type RecvOpenSynIn = (&'a mut StateAccept, ()); + type RecvOpenSynOut = (); + async fn recv_open_syn( + self, + _state: Self::RecvOpenSynIn, + ) -> Result { + unimplemented!("There is no patch extension in OPEN") + } + + type SendOpenAckIn = &'a StateAccept; + type SendOpenAckOut = (); + async fn send_open_ack( + self, + _state: Self::SendOpenAckIn, + ) -> Result { + unimplemented!("There is no patch extension in OPEN") + } +} diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index f3ce60b354..a4283e3643 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -58,6 +58,7 @@ struct StateTransport { #[cfg(feature = "shared-memory")] ext_shm: ext::shm::StateOpen, ext_lowlatency: ext::lowlatency::StateOpen, + ext_patch: ext::patch::StateOpen, } #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] @@ -124,6 +125,7 @@ struct OpenLink<'a> { ext_lowlatency: ext::lowlatency::LowLatencyFsm<'a>, #[cfg(feature = "transport_compression")] ext_compression: ext::compression::CompressionFsm<'a>, + ext_patch: ext::patch::PatchFsm<'a>, } #[async_trait] @@ -192,6 +194,13 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { None ); + // Extension Patch + let ext_patch = self + .ext_patch + .send_init_syn(&state.transport.ext_patch) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + let msg: TransportMessage = InitSyn { version: input.mine_version, whatami: input.mine_whatami, @@ -206,6 +215,7 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } .into(); @@ -347,6 +357,12 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Patch + self.ext_patch + .recv_init_ack((&mut state.transport.ext_patch, init_ack.ext_patch)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + let output = RecvInitAckOut { other_zid: init_ack.zid, other_whatami: init_ack.whatami, @@ -575,6 +591,7 @@ pub(crate) async fn open_link( ext_lowlatency: ext::lowlatency::LowLatencyFsm::new(), #[cfg(feature = "transport_compression")] ext_compression: ext::compression::CompressionFsm::new(), + ext_patch: ext::patch::PatchFsm::new(), }; // Clippy raises a warning because `batch_size::UNICAST` is currently equal to `BatchSize::MAX`. @@ -599,8 +616,8 @@ pub(crate) async fn open_link( .open(manager.config.unicast.max_links > 1), #[cfg(feature = "shared-memory")] ext_shm: ext::shm::StateOpen::new(), - ext_lowlatency: ext::lowlatency::StateOpen::new(manager.config.unicast.is_lowlatency), + ext_patch: ext::patch::StateOpen::new(), }, #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] link: StateLink { @@ -669,6 +686,7 @@ pub(crate) async fn open_link( is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(), #[cfg(feature = "auth_usrpwd")] auth_id: UsrPwdId(None), + patch: state.transport.ext_patch.get(), }; let o_config = TransportLinkUnicastConfig { diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs index 5511451b3d..7497604757 100644 --- a/io/zenoh-transport/src/unicast/manager.rs +++ b/io/zenoh-transport/src/unicast/manager.rs @@ -385,9 +385,15 @@ impl TransportManager { .await?; // Fill and merge the endpoint configuration if let Some(config) = self.config.endpoints.get(endpoint.protocol().as_str()) { - endpoint - .config_mut() - .extend_from_iter(parameters::iter(config))?; + let mut config = parameters::Parameters::from(config.as_str()); + // Overwrite config with current endpoint parameters + config.extend_from_iter(endpoint.config().iter()); + endpoint = EndPoint::new( + endpoint.protocol(), + endpoint.address(), + endpoint.metadata(), + config.as_str(), + )?; }; manager.new_listener(endpoint).await } @@ -705,9 +711,15 @@ impl TransportManager { .await?; // Fill and merge the endpoint configuration if let Some(config) = self.config.endpoints.get(endpoint.protocol().as_str()) { - endpoint - .config_mut() - .extend_from_iter(parameters::iter(config))?; + let mut config = parameters::Parameters::from(config.as_str()); + // Overwrite config with current endpoint parameters + config.extend_from_iter(endpoint.config().iter()); + endpoint = EndPoint::new( + endpoint.protocol(), + endpoint.address(), + endpoint.metadata(), + config.as_str(), + )?; }; // Create a new link associated by calling the Link Manager diff --git a/io/zenoh-transport/src/unicast/mod.rs b/io/zenoh-transport/src/unicast/mod.rs index 4539135fe9..ba0fd06d1e 100644 --- a/io/zenoh-transport/src/unicast/mod.rs +++ b/io/zenoh-transport/src/unicast/mod.rs @@ -34,7 +34,7 @@ use zenoh_link::Link; use zenoh_protocol::{ core::{Bits, WhatAmI, ZenohIdProto}, network::NetworkMessage, - transport::{close, TransportSn}, + transport::{close, init::ext::PatchType, TransportSn}, }; use zenoh_result::{zerror, ZResult}; @@ -63,6 +63,7 @@ pub(crate) struct TransportConfigUnicast { pub(crate) is_lowlatency: bool, #[cfg(feature = "auth_usrpwd")] pub(crate) auth_id: UsrPwdId, + pub(crate) patch: PatchType, } /// [`TransportUnicast`] is the transport handler returned diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index e69a305876..3ac8a92ed4 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -97,7 +97,7 @@ impl TransportUnicastUniversal { Reliability::BestEffort => zlock!(c.best_effort), }; - if !self.verify_sn(sn, &mut guard)? { + if !self.verify_sn("Frame", sn, &mut guard)? { // Drop invalid message and continue return Ok(()); } @@ -123,6 +123,8 @@ impl TransportUnicastUniversal { more, sn, ext_qos: qos, + ext_first, + ext_drop, payload, } = fragment; @@ -143,10 +145,25 @@ impl TransportUnicastUniversal { Reliability::BestEffort => zlock!(c.best_effort), }; - if !self.verify_sn(sn, &mut guard)? { + if !self.verify_sn("Fragment", sn, &mut guard)? { // Drop invalid message and continue return Ok(()); } + if self.config.patch.has_fragmentation_markers() { + if ext_first.is_some() { + guard.defrag.clear(); + } else if guard.defrag.is_empty() { + tracing::trace!( + "Transport: {}. First fragment received without start marker.", + self.manager.config.zid, + ); + return Ok(()); + } + if ext_drop.is_some() { + guard.defrag.clear(); + return Ok(()); + } + } if guard.defrag.is_empty() { let _ = guard.defrag.sync(sn); } @@ -178,14 +195,16 @@ impl TransportUnicastUniversal { fn verify_sn( &self, + message_type: &str, sn: TransportSn, guard: &mut MutexGuard<'_, TransportChannelRx>, ) -> ZResult { let precedes = guard.sn.roll(sn)?; if !precedes { tracing::trace!( - "Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.", + "Transport: {}. {} with invalid SN dropped: {}. Expected: {}.", self.config.zid, + message_type, sn, guard.sn.next() ); diff --git a/io/zenoh-transport/tests/unicast_fragmentation.rs b/io/zenoh-transport/tests/unicast_fragmentation.rs new file mode 100644 index 0000000000..f45e65702a --- /dev/null +++ b/io/zenoh-transport/tests/unicast_fragmentation.rs @@ -0,0 +1,349 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + any::Any, + convert::TryFrom, + fmt::Write as _, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; + +use lazy_static::lazy_static; +use zenoh_core::ztimeout; +use zenoh_link::Link; +use zenoh_protocol::{ + core::{CongestionControl, Encoding, EndPoint, Priority, WhatAmI, ZenohIdProto}, + network::{ + push::ext::{NodeIdType, QoSType}, + NetworkMessage, Push, + }, + zenoh::Put, +}; +use zenoh_result::ZResult; +use zenoh_transport::{ + multicast::TransportMulticast, + unicast::{test_helpers::make_transport_manager_builder, TransportUnicast}, + TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer, + TransportPeerEventHandler, +}; + +const TIMEOUT: Duration = Duration::from_secs(60); +const SLEEP: Duration = Duration::from_secs(1); +const SLEEP_SEND: Duration = Duration::from_millis(1); + +const MSG_COUNT: usize = 100; +lazy_static! { + #[derive(Debug)] + static ref MSG: NetworkMessage = Push { + wire_expr: "test".into(), + // Set CongestionControl::Drop to test + ext_qos: QoSType::new(Priority::DEFAULT, CongestionControl::Drop, false), + ext_tstamp: None, + ext_nodeid: NodeIdType::DEFAULT, + payload: Put { + // 10 MB payload to stress fragmentation + payload: (0..10_000_000).map(|b| b as u8).collect::>().into(), + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: None, + ext_unknown: vec![], + } + .into(), + } + .into(); +} + +// Transport Handler for the router +struct SHRouter { + count: Arc, +} + +impl Default for SHRouter { + fn default() -> Self { + Self { + count: Arc::new(AtomicUsize::new(0)), + } + } +} + +impl SHRouter { + fn get_count(&self) -> usize { + self.count.load(Ordering::SeqCst) + } +} + +impl TransportEventHandler for SHRouter { + fn new_unicast( + &self, + _peer: TransportPeer, + _transport: TransportUnicast, + ) -> ZResult> { + let arc = Arc::new(SCRouter::new(self.count.clone())); + Ok(arc) + } + + fn new_multicast( + &self, + _transport: TransportMulticast, + ) -> ZResult> { + panic!(); + } +} + +// Transport Callback for the router +pub struct SCRouter { + count: Arc, +} + +impl SCRouter { + pub fn new(count: Arc) -> Self { + Self { count } + } +} + +impl TransportPeerEventHandler for SCRouter { + fn handle_message(&self, message: NetworkMessage) -> ZResult<()> { + assert_eq!(message, *MSG); + self.count.fetch_add(1, Ordering::SeqCst); + std::thread::sleep(2 * SLEEP_SEND); + Ok(()) + } + + fn new_link(&self, _link: Link) {} + fn del_link(&self, _link: Link) {} + fn closed(&self) {} + + fn as_any(&self) -> &dyn Any { + self + } +} + +// Transport Handler for the client +#[derive(Default)] +struct SHClient; + +impl TransportEventHandler for SHClient { + fn new_unicast( + &self, + _peer: TransportPeer, + _transport: TransportUnicast, + ) -> ZResult> { + Ok(Arc::new(SCClient)) + } + + fn new_multicast( + &self, + _transport: TransportMulticast, + ) -> ZResult> { + panic!(); + } +} + +// Transport Callback for the client +#[derive(Default)] +pub struct SCClient; + +impl TransportPeerEventHandler for SCClient { + fn handle_message(&self, _message: NetworkMessage) -> ZResult<()> { + Ok(()) + } + + fn new_link(&self, _link: Link) {} + fn del_link(&self, _link: Link) {} + fn closed(&self) {} + + fn as_any(&self) -> &dyn Any { + self + } +} + +async fn open_transport_unicast( + client_endpoints: &[EndPoint], + server_endpoints: &[EndPoint], +) -> ( + TransportManager, + Arc, + TransportManager, + TransportUnicast, +) { + // Define client and router IDs + let client_id = ZenohIdProto::try_from([1]).unwrap(); + let router_id = ZenohIdProto::try_from([2]).unwrap(); + + // Create the router transport manager + let router_handler = Arc::new(SHRouter::default()); + let unicast = make_transport_manager_builder( + #[cfg(feature = "transport_multilink")] + server_endpoints.len(), + #[cfg(feature = "shared-memory")] + false, + false, + ); + let router_manager = TransportManager::builder() + .zid(router_id) + .whatami(WhatAmI::Router) + .unicast(unicast) + .build(router_handler.clone()) + .unwrap(); + + // Create the listener on the router + for e in server_endpoints.iter() { + println!("Add endpoint: {}", e); + let _ = ztimeout!(router_manager.add_listener(e.clone())).unwrap(); + } + + // Create the client transport manager + let unicast = make_transport_manager_builder( + #[cfg(feature = "transport_multilink")] + client_endpoints.len(), + #[cfg(feature = "shared-memory")] + false, + false, + ); + let client_manager = TransportManager::builder() + .whatami(WhatAmI::Client) + .zid(client_id) + .unicast(unicast) + .build(Arc::new(SHClient)) + .unwrap(); + + // Create an empty transport with the client + // Open transport -> This should be accepted + for e in client_endpoints.iter() { + println!("Opening transport with {}", e); + let _ = ztimeout!(client_manager.open_transport_unicast(e.clone())).unwrap(); + } + + let client_transport = ztimeout!(client_manager.get_transport_unicast(&router_id)).unwrap(); + + // Return the handlers + ( + router_manager, + router_handler, + client_manager, + client_transport, + ) +} + +async fn close_transport( + router_manager: TransportManager, + client_manager: TransportManager, + client_transport: TransportUnicast, + endpoints: &[EndPoint], +) { + // Close the client transport + let mut ee = String::new(); + for e in endpoints.iter() { + let _ = write!(ee, "{e} "); + } + println!("Closing transport with {}", ee); + ztimeout!(client_transport.close()).unwrap(); + + ztimeout!(async { + while !router_manager.get_transports_unicast().await.is_empty() { + tokio::time::sleep(SLEEP).await; + } + }); + + // Stop the locators on the manager + for e in endpoints.iter() { + println!("Del locator: {}", e); + ztimeout!(router_manager.del_listener(e)).unwrap(); + } + + ztimeout!(async { + while !router_manager.get_listeners().await.is_empty() { + tokio::time::sleep(SLEEP).await; + } + }); + + // Wait a little bit + tokio::time::sleep(SLEEP).await; + + ztimeout!(router_manager.close()); + ztimeout!(client_manager.close()); + + // Wait a little bit + tokio::time::sleep(SLEEP).await; +} + +async fn test_transport(router_handler: Arc, client_transport: TransportUnicast) { + println!("Sending {} messages...", MSG_COUNT); + + ztimeout!(async { + let mut sent = 0; + while router_handler.get_count() < MSG_COUNT { + if client_transport.schedule(MSG.clone()).is_ok() { + sent += 1; + println!( + "Sent: {sent}. Received: {}/{MSG_COUNT}", + router_handler.get_count() + ); + } + } + }); + + // Wait a little bit + tokio::time::sleep(SLEEP).await; +} + +async fn run_single(client_endpoints: &[EndPoint], server_endpoints: &[EndPoint]) { + println!( + "\n>>> Running test for: {:?}, {:?}", + client_endpoints, server_endpoints, + ); + + #[allow(unused_variables)] // Used when stats feature is enabled + let (router_manager, router_handler, client_manager, client_transport) = + open_transport_unicast(client_endpoints, server_endpoints).await; + + test_transport(router_handler.clone(), client_transport.clone()).await; + + #[cfg(feature = "stats")] + { + let c_stats = client_transport.get_stats().unwrap().report(); + println!("\tClient: {:?}", c_stats); + let r_stats = ztimeout!(router_manager.get_transport_unicast(&client_manager.config.zid)) + .unwrap() + .get_stats() + .map(|s| s.report()) + .unwrap(); + println!("\tRouter: {:?}", r_stats); + } + + close_transport( + router_manager, + client_manager, + client_transport, + client_endpoints, + ) + .await; +} + +#[cfg(feature = "transport_tcp")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn fragmentation_unicast_tcp_only() { + zenoh_util::init_log_from_env_or("error"); + + // Define the locators + let endpoints: Vec = vec![format!("tcp/127.0.0.1:{}", 16800).parse().unwrap()]; + // Run + run_single(&endpoints, &endpoints).await; +} diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 4f50deefd2..39faaf148c 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -64,7 +64,7 @@ transport_udp = ["zenoh-transport/transport_udp"] transport_unixsock-stream = ["zenoh-transport/transport_unixsock-stream"] transport_ws = ["zenoh-transport/transport_ws"] transport_vsock = ["zenoh-transport/transport_vsock"] -unstable = ["internal_config", "zenoh-keyexpr/unstable"] +unstable = ["internal_config", "zenoh-keyexpr/unstable", "zenoh-config/unstable"] internal_config = [] [dependencies] diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 689c12a96d..f72548bfc2 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -13,7 +13,10 @@ // use std::future::{IntoFuture, Ready}; +use itertools::Itertools; +use zenoh_config::qos::PublisherQoSConfig; use zenoh_core::{Resolvable, Result as ZResult, Wait}; +use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeNode}; use zenoh_protocol::core::CongestionControl; #[cfg(feature = "unstable")] use zenoh_protocol::core::Reliability; @@ -204,7 +207,8 @@ impl Resolvable for PublicationBuilder { impl Wait for PublicationBuilder, PublicationBuilderPut> { #[inline] - fn wait(self) -> ::To { + fn wait(mut self) -> ::To { + self.publisher = self.publisher.apply_qos_overwrites(); self.publisher.session.0.resolve_put( &self.publisher.key_expr?, self.kind.payload, @@ -226,7 +230,8 @@ impl Wait for PublicationBuilder, PublicationBuilderPut impl Wait for PublicationBuilder, PublicationBuilderDelete> { #[inline] - fn wait(self) -> ::To { + fn wait(mut self) -> ::To { + self.publisher = self.publisher.apply_qos_overwrites(); self.publisher.session.0.resolve_put( &self.publisher.key_expr?, ZBytes::new(), @@ -368,6 +373,58 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> { } impl PublisherBuilder<'_, '_> { + /// Looks up if any configured QoS overwrites apply on the builder's key expression. + /// Returns a new builder with the overwritten QoS parameters. + pub(crate) fn apply_qos_overwrites(self) -> Self { + let mut qos_overwrites = PublisherQoSConfig::default(); + if let Ok(key_expr) = &self.key_expr { + // get overwritten builder + let state = zread!(self.session.0.state); + let nodes_including = state + .publisher_qos_tree + .nodes_including(key_expr) + .collect_vec(); + for node in &nodes_including { + // Take the first one yielded by the iterator that has overwrites + if let Some(overwrites) = node.weight() { + qos_overwrites = overwrites.clone(); + // log warning if multiple keyexprs include it + if nodes_including.len() > 1 { + tracing::warn!( + "Publisher declared on `{}` which is included by multiple key_exprs in qos config. Using qos config for `{}`", + key_expr, + node.keyexpr(), + ); + } + break; + } + } + } + + Self { + congestion_control: qos_overwrites + .congestion_control + .map(|cc| cc.into()) + .unwrap_or(self.congestion_control), + priority: qos_overwrites + .priority + .map(|p| p.into()) + .unwrap_or(self.priority), + is_express: qos_overwrites.express.unwrap_or(self.is_express), + #[cfg(feature = "unstable")] + reliability: qos_overwrites + .reliability + .map(|r| r.into()) + .unwrap_or(self.reliability), + #[cfg(feature = "unstable")] + destination: qos_overwrites + .allowed_destination + .map(|d| d.into()) + .unwrap_or(self.destination), + ..self + } + } + /// Changes the [`crate::sample::Locality`] applied when routing the data. /// /// This restricts the matching subscribers that will receive the published data to the ones @@ -399,7 +456,8 @@ impl<'b> Resolvable for PublisherBuilder<'_, 'b> { } impl Wait for PublisherBuilder<'_, '_> { - fn wait(self) -> ::To { + fn wait(mut self) -> ::To { + self = self.apply_qos_overwrites(); let mut key_expr = self.key_expr?; if !key_expr.is_fully_optimized(&self.session.0) { key_expr = self.session.declare_keyexpr(key_expr).wait()?; diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 2b2f21d8b0..eb0858fe21 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -22,6 +22,7 @@ use std::{ use futures::Sink; use tracing::error; +use zenoh_config::qos::PublisherPriorityConf; use zenoh_core::{Resolvable, Resolve, Wait}; use zenoh_protocol::core::CongestionControl; use zenoh_result::{Error, ZResult}; @@ -482,6 +483,34 @@ impl TryFrom for Priority { } } +impl From for Priority { + fn from(value: PublisherPriorityConf) -> Self { + match value { + PublisherPriorityConf::RealTime => Self::RealTime, + PublisherPriorityConf::InteractiveHigh => Self::InteractiveHigh, + PublisherPriorityConf::InteractiveLow => Self::InteractiveLow, + PublisherPriorityConf::DataHigh => Self::DataHigh, + PublisherPriorityConf::Data => Self::Data, + PublisherPriorityConf::DataLow => Self::DataLow, + PublisherPriorityConf::Background => Self::Background, + } + } +} + +impl From for PublisherPriorityConf { + fn from(value: Priority) -> Self { + match value { + Priority::RealTime => Self::RealTime, + Priority::InteractiveHigh => Self::InteractiveHigh, + Priority::InteractiveLow => Self::InteractiveLow, + Priority::DataHigh => Self::DataHigh, + Priority::Data => Self::Data, + Priority::DataLow => Self::DataLow, + Priority::Background => Self::Background, + } + } +} + type ProtocolPriority = zenoh_protocol::core::Priority; impl From for ProtocolPriority { fn from(prio: Priority) -> Self { diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 75d6c32b63..27b43b1b89 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -16,7 +16,7 @@ use std::{convert::TryFrom, fmt}; use serde::{Deserialize, Serialize}; -use zenoh_config::wrappers::EntityGlobalId; +use zenoh_config::{qos::PublisherLocalityConf, wrappers::EntityGlobalId}; #[cfg(feature = "unstable")] use zenoh_protocol::core::Reliability; use zenoh_protocol::{ @@ -50,6 +50,26 @@ pub(crate) enum Locality { Any, } +impl From for Locality { + fn from(value: PublisherLocalityConf) -> Self { + match value { + PublisherLocalityConf::SessionLocal => Self::SessionLocal, + PublisherLocalityConf::Remote => Self::Remote, + PublisherLocalityConf::Any => Self::Any, + } + } +} + +impl From for PublisherLocalityConf { + fn from(value: Locality) -> Self { + match value { + Locality::SessionLocal => Self::SessionLocal, + Locality::Remote => Self::Remote, + Locality::Any => Self::Any, + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Default)] pub(crate) struct DataInfo { pub kind: SampleKind, diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 294427426f..cdd1f464c9 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -35,8 +35,9 @@ use uhlc::Timestamp; use uhlc::HLC; use zenoh_buffers::ZBuf; use zenoh_collections::SingleOrVec; -use zenoh_config::{unwrap_or_default, wrappers::ZenohId}; +use zenoh_config::{qos::PublisherQoSConfig, unwrap_or_default, wrappers::ZenohId}; use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, Wait}; +use zenoh_keyexpr::keyexpr_tree::KeBoxTree; #[cfg(feature = "unstable")] use zenoh_protocol::network::{ declare::{DeclareToken, SubscriberId, TokenId, UndeclareToken}, @@ -153,12 +154,14 @@ pub(crate) struct SessionState { pub(crate) liveliness_queries: HashMap, pub(crate) aggregated_subscribers: Vec, pub(crate) aggregated_publishers: Vec, + pub(crate) publisher_qos_tree: KeBoxTree, } impl SessionState { pub(crate) fn new( aggregated_subscribers: Vec, aggregated_publishers: Vec, + publisher_qos_tree: KeBoxTree, ) -> SessionState { SessionState { primitives: None, @@ -188,6 +191,7 @@ impl SessionState { liveliness_queries: HashMap::new(), aggregated_subscribers, aggregated_publishers, + publisher_qos_tree, } } } @@ -678,9 +682,13 @@ impl Session { ) -> impl Resolve { ResolveClosure::new(move || { let router = runtime.router(); + let config = runtime.config().lock(); + let publisher_qos = config.0.qos().publication().clone(); + drop(config); let state = RwLock::new(SessionState::new( aggregated_subscribers, aggregated_publishers, + publisher_qos.into(), )); let session = Session(Arc::new(SessionInner { weak_counter: Mutex::new(0), @@ -1580,19 +1588,29 @@ impl SessionInner { }, }), }); + #[cfg(feature = "unstable")] + { + let state = zread!(self.state); + self.update_matching_status( + &state, + &sub_state.key_expr, + MatchingStatusType::Subscribers, + false, + ) + } } } else { drop(state); - } - #[cfg(feature = "unstable")] - { - let state = zread!(self.state); - self.update_matching_status( - &state, - &sub_state.key_expr, - MatchingStatusType::Subscribers, - false, - ) + #[cfg(feature = "unstable")] + { + let state = zread!(self.state); + self.update_matching_status( + &state, + &sub_state.key_expr, + MatchingStatusType::Subscribers, + false, + ) + } } } SubscriberKind::LivelinessSubscriber => { diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 4995d05e70..4a29858e31 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -62,3 +62,163 @@ async fn qos_pubsub() { assert_eq!(sample.congestion_control(), CongestionControl::Block); assert!(!sample.express()); } + +#[cfg(feature = "unstable")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn qos_pubsub_overwrite_config() { + use zenoh::{qos::Reliability, sample::Locality}; + + let qos_config_overwrite = zenoh::Config::from_json5( + r#" + { + qos: { + publication: [ + { + key_exprs: ["test/qos_overwrite/overwritten", "test/not_applicable/**"], + config: { + congestion_control: "drop", + express: false, + reliability: "best_effort", + allowed_destination: "any", + }, + }, + { + key_exprs: ["test/not_applicable"], + config: { + congestion_control: "drop", + express: false, + reliability: "best_effort", + allowed_destination: "any", + }, + }, + ] + } + } + "#, + ) + .unwrap(); + let session1 = ztimeout!(zenoh::open(qos_config_overwrite)).unwrap(); + let session2 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); + + let subscriber = ztimeout!(session2.declare_subscriber("test/qos_overwrite/**")).unwrap(); + tokio::time::sleep(SLEEP).await; + + // Session API tests + + // Session API - overwritten PUT + ztimeout!(session1 + .put("test/qos_overwrite/overwritten", "qos") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable) + .allowed_destination(Locality::SessionLocal)) + .unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Drop); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(!sample.express()); + assert_eq!(sample.reliability(), Reliability::BestEffort); + + // Session API - overwritten DELETE + ztimeout!(session1 + .delete("test/qos_overwrite/overwritten") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable) + .allowed_destination(Locality::SessionLocal)) + .unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Drop); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(!sample.express()); + assert_eq!(sample.reliability(), Reliability::BestEffort); + + // Session API - non-overwritten PUT + ztimeout!(session1 + .put("test/qos_overwrite/no_overwrite", "qos") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable)) + .unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Block); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(sample.express()); + assert_eq!(sample.reliability(), Reliability::Reliable); + + // Session API - non-overwritten DELETE + ztimeout!(session1 + .delete("test/qos_overwrite/no_overwrite") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable)) + .unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Block); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(sample.express()); + assert_eq!(sample.reliability(), Reliability::Reliable); + + // Publisher API tests + + let overwrite_config_publisher = ztimeout!(session1 + .declare_publisher("test/qos_overwrite/overwritten") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable) + .allowed_destination(Locality::SessionLocal)) + .unwrap(); + + let no_overwrite_config_publisher = ztimeout!(session1 + .declare_publisher("test/qos_overwrite/no_overwrite") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable)) + .unwrap(); + + // PublisherBuilder API - overwritten PUT + ztimeout!(overwrite_config_publisher.put("qos")).unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Drop); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(!sample.express()); + assert_eq!(sample.reliability(), Reliability::BestEffort); + + // PublisherBuilder API - overwritten DELETE + ztimeout!(overwrite_config_publisher.delete()).unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Drop); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(!sample.express()); + assert_eq!(sample.reliability(), Reliability::BestEffort); + + // PublisherBuilder API - non-overwritten PUT + ztimeout!(no_overwrite_config_publisher.put("qos")).unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Block); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(sample.express()); + assert_eq!(sample.reliability(), Reliability::Reliable); + + // PublisherBuilder API - non-overwritten DELETE + ztimeout!(no_overwrite_config_publisher.delete()).unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Block); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(sample.express()); + assert_eq!(sample.reliability(), Reliability::Reliable); +} diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index 3e84249b82..5a18378446 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -430,3 +430,15 @@ async fn zenoh_session_close_in_background() { }; ztimeout!(close_all); } + +#[cfg(feature = "unstable")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_undeclare_subscribers_same_keyexpr() { + let key_expr = "test/undeclare/subscribers"; + let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + let sub1 = session.declare_subscriber(key_expr).await.unwrap(); + let sub2 = session.declare_subscriber(key_expr).await.unwrap(); + tokio::time::sleep(SLEEP).await; + ztimeout!(sub1.undeclare()).unwrap(); + ztimeout!(sub2.undeclare()).unwrap(); +} diff --git a/zenoh/tests/tcp_buffers.rs b/zenoh/tests/tcp_buffers.rs new file mode 100644 index 0000000000..8d9bb4ac84 --- /dev/null +++ b/zenoh/tests/tcp_buffers.rs @@ -0,0 +1,99 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use zenoh::{Config, Wait}; + +#[test] +fn buffer_size_config() { + let mut config = Config::default(); + config + .insert_json5( + "transport/link/tcp", + r#" + { + so_sndbuf: 65000, + so_rcvbuf: 65000, + } + "#, + ) + .unwrap(); + + config + .insert_json5("listen/endpoints", r#"["tcp/[::]:0"]"#) + .unwrap(); + + zenoh::open(config).wait().unwrap(); +} + +#[test] +fn buffer_size_endpoint() { + let mut config = Config::default(); + config + .insert_json5( + "listen/endpoints", + r#"["tcp/[::]:0#so_sndbuf=65000;so_rcvbuf=65000"]"#, + ) + .unwrap(); + + zenoh::open(config).wait().unwrap(); +} + +#[test] +fn buffer_size_endpoint_overwrite() { + let mut config = Config::default(); + config + .insert_json5( + "transport/link/tcp", + r#" + { + so_sndbuf: 0, + so_rcvbuf: 0, + } + "#, + ) + .unwrap(); + + config + .insert_json5( + "listen/endpoints", + r#"["tcp/[::]:0#so_sndbuf=65000;so_rcvbuf=65000"]"#, + ) + .unwrap(); + + zenoh::open(config).wait().unwrap(); +} + +#[cfg(target_os = "macos")] +#[test] +#[should_panic(expected = "Can not create a new TCP listener")] +fn buffer_size_zero() { + listen_zero_buffers(); +} + +#[cfg(not(target_os = "macos"))] +#[test] +fn buffer_size_zero() { + listen_zero_buffers(); +} + +fn listen_zero_buffers() { + let mut config = Config::default(); + config + .insert_json5( + "listen/endpoints", + r#"["tcp/[::]:0#so_sndbuf=0;so_rcvbuf=0"]"#, + ) + .unwrap(); + zenoh::open(config).wait().unwrap(); +} diff --git a/zenohd/src/main.rs b/zenohd/src/main.rs index 0e5e9737ee..3432335136 100644 --- a/zenohd/src/main.rs +++ b/zenohd/src/main.rs @@ -116,9 +116,8 @@ fn config_from_args(args: &Args) -> Config { if let Some(id) = &args.id { config.set_id(id.parse().unwrap()).unwrap(); } - // apply '--rest-http-port' to config only if explicitly set (overwriting config), - // or if no config file is set (to apply its default value) - if args.rest_http_port.is_some() || args.config.is_none() { + // apply '--rest-http-port' to config only if explicitly set (overwriting config) + if args.rest_http_port.is_some() { let value = args.rest_http_port.as_deref().unwrap_or("8000"); if !value.eq_ignore_ascii_case("none") { config