Skip to content

Commit

Permalink
Merge branch 'main' into chore/disable-rest-plugin-if-no-cli-argument
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Dec 5, 2024
2 parents 116028b + 64e8caa commit 8a2a534
Show file tree
Hide file tree
Showing 124 changed files with 6,658 additions and 2,434 deletions.
411 changes: 10 additions & 401 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ crossbeam-utils = "0.8.20"
derive_more = { version = "1.0.0", features = ["as_ref"] }
derive-new = "0.7.0"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
tracing-loki = "0.2"
event-listener = "5.3.1"
flume = "0.11"
form_urlencoded = "1.2.1"
Expand Down Expand Up @@ -167,6 +166,7 @@ socket2 = { version = "0.5.7", features = ["all"] }
stop-token = "0.7.0"
syn = "2.0"
tide = "0.16.0"
time = "0.3.36"
token-cell = { version = "1.5.0", default-features = false }
tokio = { version = "1.40.0", default-features = false } # Default features are disabled due to some crates' requirements
tokio-util = "0.7.12"
Expand Down
28 changes: 28 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,27 @@
},
},

// /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config for overwritten values)
// qos: {
// /// Overwrite QoS options for PUT and DELETE messages
// publication: [
// {
// /// PUT and DELETE messages on key expressions that are included by these key expressions
// /// will have their QoS options overwritten by the given config.
// key_exprs: ["demo/**", "example/key"],
// /// Configurations that will be applied on the publisher.
// /// Options that are supplied here will overwrite the configuration given in Zenoh API
// config: {
// congestion_control: "block",
// priority: "data_high",
// express: true,
// reliability: "best_effort",
// allowed_destination: "remote",
// },
// },
// ],
// },

// /// The declarations aggregation strategy.
// aggregation: {
// /// A list of key-expressions for which all included subscribers will be aggregated into.
Expand Down Expand Up @@ -225,6 +246,7 @@
// "messages": [
// "put", "delete", "declare_subscriber",
// "query", "reply", "declare_queryable",
// "liveliness_token", "liveliness_query", "declare_liveliness_subscriber",
// ],
// "flows":["egress","ingress"],
// "permission": "allow",
Expand Down Expand Up @@ -411,6 +433,8 @@
drop: {
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available.
wait_before_drop: 1000,
/// The maximum deadline limit for multi-fragment messages.
max_wait_before_drop_fragments: 50000,
},
/// Behavior pushing CongestionControl::Block messages to the queue.
block: {
Expand Down Expand Up @@ -466,6 +490,10 @@
// This could be dangerous because your CA can have signed a server cert for foo.com, that's later being used to host a server at baz.com. If you wan't your
// ca to verify that the server at baz.com is actually baz.com, let this be true (default).
verify_name_on_connect: true,
// Whether or not to close links when remote certificates expires.
// If set to true, links that require certificates (tls/quic) will automatically disconnect when the time of expiration of the remote certificate chain is reached
// note that mTLS (client authentication) is required for a listener to disconnect a client on expiration
close_link_on_expiration: false,
},
},
/// Shared memory configuration.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<img src="https://raw.githubusercontent.com/eclipse-zenoh/zenoh/master/zenoh-dragon.png" height="150">

[![CI](https://github.com/eclipse-zenoh/zenoh/workflows/CI/badge.svg)](https://github.com/eclipse-zenoh/zenoh/actions?query=workflow%3A%22CI%22)
[![CI](https://github.com/eclipse-zenoh/zenoh/actions/workflows/ci.yml/badge.svg?branch=main)](https://github.com/eclipse-zenoh/zenoh/actions?query=workflow%3ACI+branch%3Amain++)
[![Documentation Status](https://readthedocs.org/projects/zenoh-rust/badge/?version=latest)](https://zenoh-rust.readthedocs.io/en/latest/?badge=latest)
[![Discussion](https://img.shields.io/badge/discussion-on%20github-blue)](https://github.com/eclipse-zenoh/roadmap/discussions)
[![Discord](https://img.shields.io/badge/chat-on%20discord-blue)](https://discord.gg/2GJ958VuHs)
Expand Down
4 changes: 2 additions & 2 deletions commons/zenoh-buffers/src/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl<'s> BacktrackableWriter for &'s mut [u8] {
}

// Reader
impl<'a> HasReader for &'a [u8] {
impl HasReader for &[u8] {
type Reader = Self;

fn reader(self) -> Self::Reader {
Expand Down Expand Up @@ -196,7 +196,7 @@ impl<'a> BacktrackableReader for &'a [u8] {
}
}

impl<'a> SiphonableReader for &'a [u8] {
impl SiphonableReader for &[u8] {
fn siphon<W>(&mut self, writer: &mut W) -> Result<NonZeroUsize, DidntSiphon>
where
W: Writer,
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-buffers/src/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl SplitBuffer for Vec<u8> {
}

// Writer
impl<'a> HasWriter for &'a mut Vec<u8> {
impl HasWriter for &mut Vec<u8> {
type Writer = Self;

fn writer(self) -> Self::Writer {
Expand Down
14 changes: 7 additions & 7 deletions commons/zenoh-buffers/src/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl<'a> HasReader for &'a ZBuf {
}
}

impl<'a> Reader for ZBufReader<'a> {
impl Reader for ZBufReader<'_> {
fn read(&mut self, mut into: &mut [u8]) -> Result<NonZeroUsize, DidntRead> {
let mut read = 0;
while let Some(slice) = self.inner.slices.get(self.cursor.slice) {
Expand Down Expand Up @@ -299,7 +299,7 @@ impl<'a> Reader for ZBufReader<'a> {
}
}

impl<'a> BacktrackableReader for ZBufReader<'a> {
impl BacktrackableReader for ZBufReader<'_> {
type Mark = ZBufPos;

fn mark(&mut self) -> Self::Mark {
Expand All @@ -312,7 +312,7 @@ impl<'a> BacktrackableReader for ZBufReader<'a> {
}
}

impl<'a> SiphonableReader for ZBufReader<'a> {
impl SiphonableReader for ZBufReader<'_> {
fn siphon<W>(&mut self, writer: &mut W) -> Result<NonZeroUsize, DidntSiphon>
where
W: Writer,
Expand Down Expand Up @@ -345,7 +345,7 @@ impl<'a> SiphonableReader for ZBufReader<'a> {
}

#[cfg(feature = "std")]
impl<'a> io::Read for ZBufReader<'a> {
impl io::Read for ZBufReader<'_> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match <Self as Reader>::read(self, buf) {
Ok(n) => Ok(n.get()),
Expand All @@ -354,7 +354,7 @@ impl<'a> io::Read for ZBufReader<'a> {
}
}

impl<'a> AdvanceableReader for ZBufReader<'a> {
impl AdvanceableReader for ZBufReader<'_> {
fn skip(&mut self, offset: usize) -> Result<(), DidntRead> {
let mut remaining_offset = offset;
while remaining_offset > 0 {
Expand Down Expand Up @@ -399,7 +399,7 @@ impl<'a> AdvanceableReader for ZBufReader<'a> {
}

#[cfg(feature = "std")]
impl<'a> io::Seek for ZBufReader<'a> {
impl io::Seek for ZBufReader<'_> {
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
let current_pos = self
.inner
Expand Down Expand Up @@ -570,7 +570,7 @@ impl BacktrackableWriter for ZBufWriter<'_> {
}

#[cfg(feature = "std")]
impl<'a> io::Write for ZBufWriter<'a> {
impl io::Write for ZBufWriter<'_> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if buf.is_empty() {
return Ok(0);
Expand Down
38 changes: 36 additions & 2 deletions commons/zenoh-codec/src/transport/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ where
more,
sn,
ext_qos,
ext_first,
ext_drop,
} = x;

// Header
Expand All @@ -49,7 +51,10 @@ where
if *more {
header |= flag::M;
}
if ext_qos != &ext::QoSType::DEFAULT {
let mut n_exts = (ext_qos != &ext::QoSType::DEFAULT) as u8
+ ext_first.is_some() as u8
+ ext_drop.is_some() as u8;
if n_exts != 0 {
header |= flag::Z;
}
self.write(&mut *writer, header)?;
Expand All @@ -59,7 +64,16 @@ where

// Extensions
if ext_qos != &ext::QoSType::DEFAULT {
self.write(&mut *writer, (*ext_qos, false))?;
n_exts -= 1;
self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
}
if let Some(first) = ext_first {
n_exts -= 1;
self.write(&mut *writer, (first, n_exts != 0))?
}
if let Some(drop) = ext_drop {
n_exts -= 1;
self.write(&mut *writer, (drop, n_exts != 0))?
}

Ok(())
Expand Down Expand Up @@ -99,6 +113,8 @@ where

// Extensions
let mut ext_qos = ext::QoSType::DEFAULT;
let mut ext_first = None;
let mut ext_drop = None;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand All @@ -110,6 +126,16 @@ where
ext_qos = q;
has_ext = ext;
}
ext::First::ID => {
let (first, ext): (ext::First, bool) = eodec.read(&mut *reader)?;
ext_first = Some(first);
has_ext = ext;
}
ext::Drop::ID => {
let (drop, ext): (ext::Drop, bool) = eodec.read(&mut *reader)?;
ext_drop = Some(drop);
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "Fragment", ext)?;
}
Expand All @@ -121,6 +147,8 @@ where
more,
sn,
ext_qos,
ext_first,
ext_drop,
})
}
}
Expand All @@ -139,6 +167,8 @@ where
sn,
payload,
ext_qos,
ext_first,
ext_drop,
} = x;

// Header
Expand All @@ -147,6 +177,8 @@ where
more: *more,
sn: *sn,
ext_qos: *ext_qos,
ext_first: *ext_first,
ext_drop: *ext_drop,
};
self.write(&mut *writer, &header)?;

Expand Down Expand Up @@ -185,6 +217,8 @@ where
more: header.more,
sn: header.sn,
ext_qos: header.ext_qos,
ext_first: header.ext_first,
ext_drop: header.ext_drop,
payload,
})
}
Expand Down
30 changes: 28 additions & 2 deletions commons/zenoh-codec/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
} = x;

// Header
Expand All @@ -64,7 +65,8 @@ where
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
+ (ext_compression.is_some() as u8);
+ (ext_compression.is_some() as u8)
+ (*ext_patch != ext::PatchType::NONE) as u8;

#[cfg(feature = "shared-memory")]
{
Expand Down Expand Up @@ -125,6 +127,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (compression, n_exts != 0))?;
}
if *ext_patch != ext::PatchType::NONE {
n_exts -= 1;
self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -186,6 +192,7 @@ where
let mut ext_mlink = None;
let mut ext_lowlatency = None;
let mut ext_compression = None;
let mut ext_patch = ext::PatchType::NONE;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand Down Expand Up @@ -228,6 +235,11 @@ where
ext_compression = Some(q);
has_ext = ext;
}
ext::Patch::ID => {
let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
ext_patch = p;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "InitSyn", ext)?;
}
Expand All @@ -248,6 +260,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
})
}
}
Expand Down Expand Up @@ -275,6 +288,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
} = x;

// Header
Expand All @@ -287,7 +301,8 @@ where
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
+ (ext_compression.is_some() as u8);
+ (ext_compression.is_some() as u8)
+ (*ext_patch != ext::PatchType::NONE) as u8;

#[cfg(feature = "shared-memory")]
{
Expand Down Expand Up @@ -351,6 +366,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (compression, n_exts != 0))?;
}
if *ext_patch != ext::PatchType::NONE {
n_exts -= 1;
self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -415,6 +434,7 @@ where
let mut ext_mlink = None;
let mut ext_lowlatency = None;
let mut ext_compression = None;
let mut ext_patch = ext::PatchType::NONE;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand Down Expand Up @@ -457,6 +477,11 @@ where
ext_compression = Some(q);
has_ext = ext;
}
ext::Patch::ID => {
let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
ext_patch = p;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "InitAck", ext)?;
}
Expand All @@ -478,6 +503,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
})
}
}
Loading

0 comments on commit 8a2a534

Please sign in to comment.