Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Select links based on message Priority & Reliability #1398

Merged
merged 45 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
cbe6d2a
Add wip `QoS`-based priority-to-link dispatch impl
fuzzypixelz Aug 26, 2024
5cfcf55
Improve `QoS` state machine
fuzzypixelz Sep 4, 2024
1c9ee29
Add `PriorityRange` negotiation tests
fuzzypixelz Sep 4, 2024
162e356
Refactor link selection function
fuzzypixelz Sep 4, 2024
2f3b821
Minor edits
fuzzypixelz Sep 4, 2024
8b3eff1
Add Link selectioh tests
fuzzypixelz Sep 4, 2024
22e7807
Minor edits
fuzzypixelz Sep 4, 2024
3c9af4c
More minor edits
fuzzypixelz Sep 4, 2024
20a6169
Never disobey Clippy
fuzzypixelz Sep 4, 2024
8ac3ea8
Implement Reliability negotiation
fuzzypixelz Sep 4, 2024
af43d88
Apply negotiated Reliability to Link config
fuzzypixelz Sep 5, 2024
7e26110
Document Endpoint `reliability` metadata
fuzzypixelz Sep 5, 2024
b559fa7
I'm sorry Clippy
fuzzypixelz Sep 5, 2024
e98fb4f
Make `PriorityRange` inclusive
fuzzypixelz Sep 5, 2024
02a6cba
Clippy lints are inevitable
fuzzypixelz Sep 5, 2024
1758b20
Make Reliability negotiation stricter
fuzzypixelz Sep 5, 2024
c5078bd
Refactor negotiation tests
fuzzypixelz Sep 5, 2024
d374992
We are still not `core::error::Error`
fuzzypixelz Sep 5, 2024
ab64d0f
Use `RangeInclusive`
fuzzypixelz Sep 5, 2024
2f9edc1
Clippy at it again
fuzzypixelz Sep 5, 2024
a941021
Split `State` into `StateOpen` and `StateAccept`
fuzzypixelz Sep 6, 2024
fd7b050
Remove `NewLinkUnicast`
fuzzypixelz Sep 6, 2024
2e3285d
Fix test typos
fuzzypixelz Sep 6, 2024
3c2d578
Patch `Link::src` and `Link::dst` with negotiated metadata
fuzzypixelz Sep 6, 2024
877f61d
Optimize `QoS` extension overhead
fuzzypixelz Sep 6, 2024
50a4333
Implement `Display` instead of `ToString` for `PriorityRange`
fuzzypixelz Sep 6, 2024
d07275a
Fix typo (metdata -> metadata)
fuzzypixelz Sep 6, 2024
5083b94
Fix `n_exts` in `INIT` codec
fuzzypixelz Sep 6, 2024
ed4b555
Add missing `'static` lifetime in const
fuzzypixelz Sep 6, 2024
4ec1396
Don't compare `Link` to `TransportLinkUnicast`
fuzzypixelz Sep 9, 2024
3e32185
Don't set Link Reliability if not configured
fuzzypixelz Sep 9, 2024
8889d1a
Update DEFAULT_CONFIG
fuzzypixelz Sep 16, 2024
c91a052
Move metadata docs to `Endpoint`
fuzzypixelz Sep 17, 2024
00ae24b
Add Endpoint examples
fuzzypixelz Sep 17, 2024
e14c023
Fix doc list items without indentation
fuzzypixelz Sep 17, 2024
a80e035
Update Endpoint links in DEFAULT_CONFIG
fuzzypixelz Sep 20, 2024
b18e8d5
Change `x..=y` syntax to `x-y`
fuzzypixelz Sep 23, 2024
fad25ab
Connect to multiple links with distinct priorities when scouting (wip)
fuzzypixelz Sep 11, 2024
4812b11
Add `LocatorInspector::is_reliable`
fuzzypixelz Sep 13, 2024
5955a06
Compare locator Reliability even if not set
fuzzypixelz Sep 13, 2024
28a9bcc
Apply `cargo +stable clippy --fix`
fuzzypixelz Sep 13, 2024
c053fe8
Get transport at each locator iteration
OlivierHecart Sep 23, 2024
d774479
Don't try to connect to peer whaile already connecting to it
OlivierHecart Sep 23, 2024
cd5c665
Fix return value of `Runtime::connect`
fuzzypixelz Sep 23, 2024
d196de9
Fix `PriorityRange` formatting
fuzzypixelz Sep 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
/// The list of endpoints to connect to.
/// Accepts a single list (e.g. endpoints: ["tcp/10.10.10.10:7447", "tcp/11.11.11.11:7447"])
/// or different lists for router, peer and client (e.g. endpoints: { router: ["tcp/10.10.10.10:7447"], peer: ["tcp/11.11.11.11:7447"] }).
///
/// See https://docs.rs/zenoh/latest/zenoh/config/struct.EndPoint.html
endpoints: [
// "<proto>/<address>"
],
Expand Down Expand Up @@ -67,6 +69,8 @@
/// The list of endpoints to listen on.
/// Accepts a single list (e.g. endpoints: ["tcp/[::]:7447", "udp/[::]:7447"])
/// or different lists for router, peer and client (e.g. endpoints: { router: ["tcp/[::]:7447"], peer: ["tcp/[::]:0"] }).
///
/// See https://docs.rs/zenoh/latest/zenoh/config/struct.EndPoint.html
endpoints: { router: ["tcp/[::]:7447"], peer: ["tcp/[::]:0"] },

/// Global listen configuration,
Expand Down Expand Up @@ -333,11 +337,11 @@
},
},
link: {
/// An optional whitelist of protocols to be used for accepting and opening sessions.
/// If not configured, all the supported protocols are automatically whitelisted.
/// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"]
/// For example, to only enable "tls" and "quic":
// protocols: ["tls", "quic"],
/// An optional whitelist of protocols to be used for accepting and opening sessions. If not
/// configured, all the supported protocols are automatically whitelisted. The supported
/// protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"] For
/// example, to only enable "tls" and "quic": protocols: ["tls", "quic"],
///
/// Configure the zenoh TX parameters of a link
tx: {
/// The resolution in bits to be used for the message sequence numbers.
Expand Down Expand Up @@ -394,7 +398,7 @@
enabled: true,
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: 1,
}
},
},
},
/// Configure the zenoh RX parameters of a link
Expand Down
26 changes: 26 additions & 0 deletions commons/zenoh-codec/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ where
resolution,
batch_size,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand All @@ -59,6 +60,7 @@ where
header |= flag::S;
}
let mut n_exts = (ext_qos.is_some() as u8)
+ (ext_qos_optimized.is_some() as u8)
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
Expand Down Expand Up @@ -98,6 +100,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (qos, n_exts != 0))?;
}
if let Some(qos_optimized) = ext_qos_optimized.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (qos_optimized, n_exts != 0))?;
}
#[cfg(feature = "shared-memory")]
if let Some(shm) = ext_shm.as_ref() {
n_exts -= 1;
Expand Down Expand Up @@ -173,6 +179,7 @@ where

// Extensions
let mut ext_qos = None;
let mut ext_qos_optimized = None;
#[cfg(feature = "shared-memory")]
let mut ext_shm = None;
let mut ext_auth = None;
Expand All @@ -190,6 +197,11 @@ where
ext_qos = Some(q);
has_ext = ext;
}
ext::QoSOptimized::ID => {
let (q, ext): (ext::QoSOptimized, bool) = eodec.read(&mut *reader)?;
ext_qos_optimized = Some(q);
has_ext = ext;
}
#[cfg(feature = "shared-memory")]
ext::Shm::ID => {
let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
Expand Down Expand Up @@ -229,6 +241,7 @@ where
resolution,
batch_size,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand All @@ -255,6 +268,7 @@ where
batch_size,
cookie,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand All @@ -269,6 +283,7 @@ where
header |= flag::S;
}
let mut n_exts = (ext_qos.is_some() as u8)
+ (ext_qos_optimized.is_some() as u8)
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
Expand Down Expand Up @@ -311,6 +326,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (qos, n_exts != 0))?;
}
if let Some(qos_optimized) = ext_qos_optimized.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (qos_optimized, n_exts != 0))?;
}
#[cfg(feature = "shared-memory")]
if let Some(shm) = ext_shm.as_ref() {
n_exts -= 1;
Expand Down Expand Up @@ -389,6 +408,7 @@ where

// Extensions
let mut ext_qos = None;
let mut ext_qos_optimized = None;
#[cfg(feature = "shared-memory")]
let mut ext_shm = None;
let mut ext_auth = None;
Expand All @@ -406,6 +426,11 @@ where
ext_qos = Some(q);
has_ext = ext;
}
ext::QoSOptimized::ID => {
let (q, ext): (ext::QoSOptimized, bool) = eodec.read(&mut *reader)?;
ext_qos_optimized = Some(q);
has_ext = ext;
}
#[cfg(feature = "shared-memory")]
ext::Shm::ID => {
let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
Expand Down Expand Up @@ -446,6 +471,7 @@ where
batch_size,
cookie,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand Down
26 changes: 24 additions & 2 deletions commons/zenoh-protocol/src/core/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ impl fmt::Debug for AddressMut<'_> {
pub struct Metadata<'a>(pub(super) &'a str);

impl<'a> Metadata<'a> {
pub const RELIABILITY: &'static str = "reliability";
pub const PRIORITIES: &'static str = "priorities";

pub fn as_str(&self) -> &'a str {
self.0
}
Expand Down Expand Up @@ -443,10 +446,29 @@ impl fmt::Debug for ConfigMut<'_> {

/// A string that respects the [`EndPoint`] canon form: `<locator>[#<config>]`.
///
/// `<locator>` is a valid [`Locator`] and `<config>` is of the form `<key1>=<value1>;...;<keyN>=<valueN>` where keys are alphabetically sorted.
/// `<config>` is optional and can be provided to configure some aspectes for an [`EndPoint`], e.g. the interface to listen on or connect to.
/// `<locator>` is a valid [`Locator`] and `<config>` is of the form
/// `<key1>=<value1>;...;<keyN>=<valueN>` where keys are alphabetically sorted. `<config>` is
/// optional and can be provided to configure some aspects for an [`EndPoint`], e.g. the interface
/// to listen on or connect to.
///
/// A full [`EndPoint`] string is hence in the form of `<proto>/<address>[?<metadata>][#config]`.
///
/// ## Metadata
///
/// - **`priorities`**: a range bounded inclusively below and above (e.g. `2-4` signifies
/// priorities 2, 3 and 4). This value is used to select the link used for transmission based on the
/// Priority of the message in question.
///
/// For example, `tcp/localhost:7447?priorities=1-3` assigns priorities
/// [`crate::core::Priority::RealTime`], [`crate::core::Priority::InteractiveHigh`] and
/// [`crate::core::Priority::InteractiveLow`] to the established link.
///
/// - **`reliability`**: either "best_effort" or "reliable". This value is used to select the link
/// used for transmission based on the Reliability of the message in question.
///
/// For example, `tcp/localhost:7447?priorities=6-7;reliability=best_effort` assigns priorities
/// [`crate::core::Priority::DataLow`] and [`crate::core::Priority::Background`], and
/// [`crate::core::Reliability::BestEffort`] to the established link.
#[derive(Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(into = "String")]
#[serde(try_from = "String")]
Expand Down
4 changes: 4 additions & 0 deletions commons/zenoh-protocol/src/core/locator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ impl Locator {
pub fn as_str(&self) -> &str {
self.0.as_str()
}

pub fn to_endpoint(&self) -> EndPoint {
self.0.clone()
}
}

impl From<EndPoint> for Locator {
Expand Down
Loading