Skip to content

Commit

Permalink
Massive renamig:
Browse files Browse the repository at this point in the history
- transport_shm -> transport_unixpipe
- NET transport -> Universal transport
- SHM transport -> LowLatency transport

Dev:
- the shm functionality and lowlatency transport are switched completely independently
- unicast transport negotiation uses separate Extension instead of switching together with shm
- additional tests for lowlatency transport configuration
  • Loading branch information
yellowhatter committed Sep 20, 2023
1 parent 8959f4d commit d74f14b
Show file tree
Hide file tree
Showing 50 changed files with 872 additions and 494 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: clippy
args: --all-targets --features shared-memory --features transport_shm -- -D warnings
args: --all-targets --features shared-memory --features transport_unixpipe -- -D warnings

test:
name: Run tests on ${{ matrix.os }}
Expand Down Expand Up @@ -107,7 +107,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: nextest
args: run -F shared-memory -F transport_shm -p zenoh-transport
args: run -F shared-memory -F transport_unixpipe -p zenoh-transport
env:
CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse
ASYNC_STD_THREAD_COUNT: 4
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ members = [
"io/zenoh-links/zenoh-link-udp/",
"io/zenoh-links/zenoh-link-unixsock_stream/",
"io/zenoh-links/zenoh-link-ws/",
"io/zenoh-links/zenoh-link-shm/",
"io/zenoh-links/zenoh-link-unixpipe/",
"io/zenoh-transport",
"plugins/example-plugin",
"plugins/zenoh-backend-traits",
Expand Down Expand Up @@ -180,7 +180,7 @@ zenoh-link-unixsock_stream = { version = "0.10.0-dev", path = "io/zenoh-links/ze
zenoh-link-quic = { version = "0.10.0-dev", path = "io/zenoh-links/zenoh-link-quic" }
zenoh-link-udp = { version = "0.10.0-dev", path = "io/zenoh-links/zenoh-link-udp" }
zenoh-link-ws = { version = "0.10.0-dev", path = "io/zenoh-links/zenoh-link-ws" }
zenoh-link-shm = { version = "0.10.0-dev", path = "io/zenoh-links/zenoh-link-shm" }
zenoh-link-unixpipe = { version = "0.10.0-dev", path = "io/zenoh-links/zenoh-link-unixpipe" }
zenoh-link-serial = { version = "0.10.0-dev", path = "io/zenoh-links/zenoh-link-serial" }
zenoh-link = { version = "0.10.0-dev", path = "io/zenoh-link" }
zenoh-link-commons = { version = "0.10.0-dev", path = "io/zenoh-link-commons" }
Expand Down
28 changes: 26 additions & 2 deletions commons/zenoh-codec/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ where
let mut n_exts = (x.ext_qos.is_some() as u8)
+ (x.ext_shm.is_some() as u8)
+ (x.ext_auth.is_some() as u8)
+ (x.ext_mlink.is_some() as u8);
+ (x.ext_mlink.is_some() as u8)
+ (x.ext_lowlatency.is_some() as u8);
if n_exts != 0 {
header |= flag::Z;
}
Expand Down Expand Up @@ -87,6 +88,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (mlink, n_exts != 0))?;
}
if let Some(lowlatency) = x.ext_lowlatency.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (lowlatency, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -144,6 +149,7 @@ where
let mut ext_shm = None;
let mut ext_auth = None;
let mut ext_mlink = None;
let mut ext_lowlatency = None;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand All @@ -170,6 +176,11 @@ where
ext_mlink = Some(a);
has_ext = ext;
}
ext::LowLatency::ID => {
let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
ext_lowlatency = Some(q);
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "InitSyn", ext)?;
}
Expand All @@ -186,6 +197,7 @@ where
ext_shm,
ext_auth,
ext_mlink,
ext_lowlatency,
})
}
}
Expand All @@ -206,7 +218,8 @@ where
let mut n_exts = (x.ext_qos.is_some() as u8)
+ (x.ext_shm.is_some() as u8)
+ (x.ext_auth.is_some() as u8)
+ (x.ext_mlink.is_some() as u8);
+ (x.ext_mlink.is_some() as u8)
+ (x.ext_lowlatency.is_some() as u8);
if n_exts != 0 {
header |= flag::Z;
}
Expand Down Expand Up @@ -251,6 +264,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (mlink, n_exts != 0))?;
}
if let Some(lowlatency) = x.ext_lowlatency.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (lowlatency, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -311,6 +328,7 @@ where
let mut ext_shm = None;
let mut ext_auth = None;
let mut ext_mlink = None;
let mut ext_lowlatency = None;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand All @@ -337,6 +355,11 @@ where
ext_mlink = Some(a);
has_ext = ext;
}
ext::LowLatency::ID => {
let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
ext_lowlatency = Some(q);
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "InitAck", ext)?;
}
Expand All @@ -354,6 +377,7 @@ where
ext_shm,
ext_auth,
ext_mlink,
ext_lowlatency,
})
}
}
30 changes: 14 additions & 16 deletions commons/zenoh-codec/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,50 +25,48 @@ use zenoh_buffers::{
reader::{BacktrackableReader, DidntRead, Reader},
writer::{DidntWrite, Writer},
};
#[cfg(feature = "shared-memory")]
use zenoh_protocol::network::NetworkMessage;
use zenoh_protocol::{
common::{imsg, ZExtZ64},
network::NetworkMessage,
transport::*,
};

// TransportMessageShm
#[cfg(feature = "shared-memory")]
impl<W> WCodec<&TransportMessageShm, &mut W> for Zenoh080
// TransportMessageLowLatency
impl<W> WCodec<&TransportMessageLowLatency, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &TransportMessageShm) -> Self::Output {
fn write(self, writer: &mut W, x: &TransportMessageLowLatency) -> Self::Output {
match &x.body {
TransportBodyShm::Network(b) => self.write(&mut *writer, b.as_ref()),
TransportBodyShm::KeepAlive(b) => self.write(&mut *writer, b),
TransportBodyShm::Close(b) => self.write(&mut *writer, b),
TransportBodyLowLatency::Network(b) => self.write(&mut *writer, b.as_ref()),
TransportBodyLowLatency::KeepAlive(b) => self.write(&mut *writer, b),
TransportBodyLowLatency::Close(b) => self.write(&mut *writer, b),
}
}
}
#[cfg(feature = "shared-memory")]
impl<R> RCodec<TransportMessageShm, &mut R> for Zenoh080

impl<R> RCodec<TransportMessageLowLatency, &mut R> for Zenoh080
where
R: Reader + BacktrackableReader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<TransportMessageShm, Self::Error> {
fn read(self, reader: &mut R) -> Result<TransportMessageLowLatency, Self::Error> {
let header: u8 = self.read(&mut *reader)?;

let codec = Zenoh080Header::new(header);
let body = match imsg::mid(codec.header) {
id::KEEP_ALIVE => TransportBodyShm::KeepAlive(codec.read(&mut *reader)?),
id::CLOSE => TransportBodyShm::Close(codec.read(&mut *reader)?),
id::KEEP_ALIVE => TransportBodyLowLatency::KeepAlive(codec.read(&mut *reader)?),
id::CLOSE => TransportBodyLowLatency::Close(codec.read(&mut *reader)?),
_ => {
let nw: NetworkMessage = codec.read(&mut *reader)?;
TransportBodyShm::Network(Box::new(nw))
TransportBodyLowLatency::Network(Box::new(nw))
}
};

Ok(TransportMessageShm { body })
Ok(TransportMessageLowLatency { body })
}
}

Expand Down
28 changes: 26 additions & 2 deletions commons/zenoh-codec/src/transport/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ where
let mut n_exts = (x.ext_qos.is_some() as u8)
+ (x.ext_shm.is_some() as u8)
+ (x.ext_auth.is_some() as u8)
+ (x.ext_mlink.is_some() as u8);
+ (x.ext_mlink.is_some() as u8)
+ (x.ext_lowlatency.is_some() as u8);
if n_exts != 0 {
header |= flag::Z;
}
Expand Down Expand Up @@ -75,6 +76,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (mlink, n_exts != 0))?;
}
if let Some(lowlatency) = x.ext_lowlatency.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (lowlatency, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -119,6 +124,7 @@ where
let mut ext_shm = None;
let mut ext_auth = None;
let mut ext_mlink = None;
let mut ext_lowlatency = None;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand All @@ -145,6 +151,11 @@ where
ext_mlink = Some(a);
has_ext = ext;
}
ext::LowLatency::ID => {
let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
ext_lowlatency = Some(q);
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "OpenSyn", ext)?;
}
Expand All @@ -159,6 +170,7 @@ where
ext_shm,
ext_auth,
ext_mlink,
ext_lowlatency,
})
}
}
Expand All @@ -181,7 +193,8 @@ where
let mut n_exts = (x.ext_qos.is_some() as u8)
+ (x.ext_shm.is_some() as u8)
+ (x.ext_auth.is_some() as u8)
+ (x.ext_mlink.is_some() as u8);
+ (x.ext_mlink.is_some() as u8)
+ (x.ext_lowlatency.is_some() as u8);
if n_exts != 0 {
header |= flag::Z;
}
Expand Down Expand Up @@ -212,6 +225,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (mlink, n_exts != 0))?;
}
if let Some(lowlatency) = x.ext_lowlatency.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (lowlatency, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -255,6 +272,7 @@ where
let mut ext_shm = None;
let mut ext_auth = None;
let mut ext_mlink = None;
let mut ext_lowlatency = None;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand All @@ -281,6 +299,11 @@ where
ext_mlink = Some(a);
has_ext = ext;
}
ext::LowLatency::ID => {
let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
ext_lowlatency = Some(q);
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "OpenAck", ext)?;
}
Expand All @@ -294,6 +317,7 @@ where
ext_shm,
ext_auth,
ext_mlink,
ext_lowlatency,
})
}
}
1 change: 1 addition & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl Default for TransportUnicastConf {
accept_pending: 100,
max_sessions: 1_000,
max_links: 1,
lowlatency: false,
}
}
}
Expand Down
13 changes: 9 additions & 4 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ validated_struct::validator! {
max_sessions: usize,
/// Maximum number of unicast incoming links per transport session (default: 1)
max_links: usize,
/// Enables the LowLatency transport (default `false`).
/// This option does not make LowLatency transport mandatory, the actual implementation of transport
/// used will depend on Establish procedure and other party's settings
lowlatency: bool,
},
pub multicast: TransportMulticastConf {
/// Link join interval duration in milliseconds (default: 2500)
Expand Down Expand Up @@ -291,9 +295,9 @@ validated_struct::validator! {
client_certificate: Option<String>,
server_name_verification: Option<bool>
},
pub shared_memory: #[derive(Default)]
SHMConf {
shm_access_mask: Option<u32>
pub unixpipe: #[derive(Default)]
UnixPipeConf {
file_access_mask: Option<u32>
},
pub compression: #[derive(Default)]
/// **Experimental** compression feature.
Expand All @@ -311,7 +315,8 @@ validated_struct::validator! {
pub shared_memory:
SharedMemoryConf {
/// Whether shared memory is enabled or not.
/// If set to `true`, the shared-memory transport will be enabled. (default `false`).
/// If set to `true`, the SHM buffer optimization support will be announced to other parties. (default `false`).
/// This option doesn't make SHM buffer optimization mandatory, the real support depends on other party setting
enabled: bool,
},
pub auth: #[derive(Default)]
Expand Down
10 changes: 10 additions & 0 deletions commons/zenoh-protocol/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ pub struct InitSyn {
pub ext_shm: Option<ext::Shm>,
pub ext_auth: Option<ext::Auth>,
pub ext_mlink: Option<ext::MultiLink>,
pub ext_lowlatency: Option<ext::LowLatency>,
}

// Extensions
Expand All @@ -141,6 +142,10 @@ pub mod ext {
/// # Multilink extension
/// Used as challenge for probing multilink capabilities
pub type MultiLink = zextzbuf!(0x4, false);

/// # LowLatency extension
/// Used to negotiate the use of lowlatency transport
pub type LowLatency = zextunit!(0x5, false);
}

impl InitSyn {
Expand All @@ -160,6 +165,7 @@ impl InitSyn {
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand());

Self {
version,
Expand All @@ -171,6 +177,7 @@ impl InitSyn {
ext_shm,
ext_auth,
ext_mlink,
ext_lowlatency,
}
}
}
Expand All @@ -187,6 +194,7 @@ pub struct InitAck {
pub ext_shm: Option<ext::Shm>,
pub ext_auth: Option<ext::Auth>,
pub ext_mlink: Option<ext::MultiLink>,
pub ext_lowlatency: Option<ext::LowLatency>,
}

impl InitAck {
Expand All @@ -211,6 +219,7 @@ impl InitAck {
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand());

Self {
version,
Expand All @@ -223,6 +232,7 @@ impl InitAck {
ext_shm,
ext_auth,
ext_mlink,
ext_lowlatency,
}
}
}
Loading

0 comments on commit d74f14b

Please sign in to comment.