Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: make session an arc-like object #1347

Merged
merged 38 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
5519276
Add reliability to NetworkMessage
OlivierHecart Aug 9, 2024
ef76d11
Updates
OlivierHecart Aug 9, 2024
ff8699b
Change primitives
OlivierHecart Aug 9, 2024
ce6cdec
Api publisher reliability
OlivierHecart Aug 9, 2024
3ac6c3f
Fix serialization_batch test
OlivierHecart Aug 9, 2024
168a804
Change defrag errors to log instead of closing transport
oteffahi Aug 29, 2024
5114c33
Fix messages with old SN not being dropped
oteffahi Aug 30, 2024
7206fdc
Remove failure check in defrag tests
oteffahi Aug 30, 2024
c434f69
Wait for message to be sent in defrag test
oteffahi Aug 30, 2024
bafbe61
Remove if-statement nesting
oteffahi Sep 2, 2024
96cefa5
Remove deprecated defragmentation test
oteffahi Sep 2, 2024
e2f366c
Merge pull request #1344 from oteffahi/transport-sn-changes
Mallets Sep 3, 2024
a7d03b7
Parse `backend_search_dirs` as a list of paths
fuzzypixelz Sep 3, 2024
a8de5bc
Merge pull request #1349 from eclipse-zenoh/fix-backend-search-dirs
fuzzypixelz Sep 3, 2024
b21ce07
Change Reliability default to Reliable
OlivierHecart Sep 3, 2024
5f43fee
Merge branch 'main' into dev/publisher_reliability
OlivierHecart Sep 3, 2024
e1cd9cb
Fix merge
OlivierHecart Sep 3, 2024
0a911f3
Fix storage error when keyexpr equals the configured `strip_prefix` (…
oteffahi Sep 3, 2024
627a71d
chore(deps): bump quinn-proto from 0.11.3 to 0.11.8
dependabot[bot] Sep 3, 2024
118c441
Improve batch tests
OlivierHecart Sep 4, 2024
b6bf334
Improve batch tests
OlivierHecart Sep 4, 2024
38ef610
Improve publisher reliability doc.
OlivierHecart Sep 4, 2024
2128f14
Improve publisher reliability doc.
OlivierHecart Sep 4, 2024
43ec70e
Code format
OlivierHecart Sep 4, 2024
eb67b82
Merge pull request #1305 from eclipse-zenoh/dev/publisher_reliability
Mallets Sep 4, 2024
4bcf093
Merge pull request #1352 from eclipse-zenoh/dependabot/cargo/quinn-pr…
Mallets Sep 4, 2024
147ebc3
feat!: bind callback subscriber/queryable to session lifetime
wyfo Aug 28, 2024
82e7b16
fix: fix example
wyfo Aug 28, 2024
15fff73
fix: fix example
wyfo Aug 28, 2024
a1ff4a6
fix: add missing comment about ZST trick
wyfo Sep 2, 2024
0d4423b
Update zenoh/src/api/key_expr.rs
wyfo Sep 2, 2024
150c497
fix: formatting
wyfo Sep 2, 2024
baa2426
fix: don't use `Weak` when undeclared on drop
wyfo Sep 3, 2024
2c52a3c
feat!: make session an arc-like object
wyfo Sep 3, 2024
155c923
fix: use weak everywhere!
wyfo Sep 3, 2024
5dfd6f3
fix: fix doc
wyfo Sep 4, 2024
a938c78
feat: use pseudo-weak session with the same perf than arc
wyfo Sep 4, 2024
2ef2446
fix: fix resource cleanup
wyfo Sep 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use std::time::Duration;

use zenoh::{config::Config, key_expr::KeyExpr, prelude::*};
use zenoh::{config::Config, key_expr::KeyExpr};

#[tokio::main]
async fn main() {
Expand Down
1 change: 0 additions & 1 deletion ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::{convert::TryFrom, time::Duration};
use zenoh::{
config::Config,
key_expr::KeyExpr,
prelude::*,
query::{QueryTarget, Selector},
};

Expand Down
4 changes: 3 additions & 1 deletion commons/zenoh-codec/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ where
let header: u8 = self.codec.read(&mut *reader)?;

let codec = Zenoh080Header::new(header);
codec.read(&mut *reader)
let mut msg: NetworkMessage = codec.read(&mut *reader)?;
msg.reliability = self.reliability;
Ok(msg)
}
}

Expand Down
13 changes: 6 additions & 7 deletions commons/zenoh-codec/src/transport/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,12 @@ where
fn write(self, writer: &mut W, x: (&NetworkMessage, &FrameHeader)) -> Self::Output {
let (m, f) = x;

// @TODO: m.is_reliable() always return true for the time being
// if let (Reliability::Reliable, false) | (Reliability::BestEffort, true) =
// (f.reliability, m.is_reliable())
// {
// // We are not serializing on the right frame.
// return Err(BatchError::NewFrame);
// }
if let (Reliability::Reliable, false) | (Reliability::BestEffort, true) =
(f.reliability, m.is_reliable())
{
// We are not serializing on the right frame.
return Err(BatchError::NewFrame);
}

// Mark the write operation
let mark = writer.mark();
Expand Down
4 changes: 2 additions & 2 deletions commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,12 @@ impl TryFrom<u8> for Priority {
#[repr(u8)]
pub enum Reliability {
#[default]
BestEffort,
Reliable,
BestEffort,
}

impl Reliability {
pub const DEFAULT: Self = Self::BestEffort;
pub const DEFAULT: Self = Self::Reliable;

#[cfg(feature = "test")]
pub fn rand() -> Self {
Expand Down
7 changes: 4 additions & 3 deletions commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub use push::Push;
pub use request::{AtomicRequestId, Request, RequestId};
pub use response::{Response, ResponseFinal};

use crate::core::{CongestionControl, Priority};
use crate::core::{CongestionControl, Priority, Reliability};

pub mod id {
// WARNING: it's crucial that these IDs do NOT collide with the IDs
Expand Down Expand Up @@ -83,6 +83,7 @@ pub enum NetworkBody {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NetworkMessage {
pub body: NetworkBody,
pub reliability: Reliability,
#[cfg(feature = "stats")]
pub size: Option<core::num::NonZeroUsize>,
}
Expand All @@ -109,8 +110,7 @@ impl NetworkMessage {

#[inline]
pub fn is_reliable(&self) -> bool {
// TODO
true
self.reliability == Reliability::Reliable
}

#[inline]
Expand Down Expand Up @@ -179,6 +179,7 @@ impl From<NetworkBody> for NetworkMessage {
fn from(body: NetworkBody) -> Self {
Self {
body,
reliability: Reliability::DEFAULT,
#[cfg(feature = "stats")]
size: None,
}
Expand Down
3 changes: 2 additions & 1 deletion commons/zenoh-protocol/src/transport/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ impl Frame {
let ext_qos = ext::QoSType::rand();
let mut payload = vec![];
for _ in 0..rng.gen_range(1..4) {
let m = NetworkMessage::rand();
let mut m = NetworkMessage::rand();
m.reliability = reliability;
payload.push(m);
}

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;
use zenoh_ext::SubscriberForward;

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_get_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::Duration;

use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{prelude::*, session::ZenohId};
use zenoh::session::ZenohId;
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn main() {

let (config, express) = parse_args();

let session = zenoh::open(config).wait().unwrap().into_arc();
let session = zenoh::open(config).wait().unwrap();

// The key expression to read the data from
let key_expr_ping = keyexpr::new("test/ping").unwrap();
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::Duration;

use clap::Parser;
use zenoh::{handlers::RingChannel, key_expr::KeyExpr, prelude::*, Config};
use zenoh::{handlers::RingChannel, key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
1 change: 0 additions & 1 deletion examples/examples/z_queryable_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use clap::Parser;
use zenoh::{
bytes::ZBytes,
key_expr::KeyExpr,
prelude::*,
shm::{
zshm, BlockOn, GarbageCollect, PosixShmProviderBackend, ShmProviderBuilder,
POSIX_PROTOCOL_ID,
Expand Down
1 change: 0 additions & 1 deletion examples/examples/z_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use clap::Parser;
use futures::select;
use zenoh::{
key_expr::{keyexpr, KeyExpr},
prelude::*,
sample::{Sample, SampleKind},
Config,
};
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, sample::SampleKind, Config};
use zenoh::{key_expr::KeyExpr, sample::SampleKind, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use clap::Parser;
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
use zenoh::shm::zshm;
use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr, prelude::*};
use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
4 changes: 1 addition & 3 deletions examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ fn main() {
}
})
.wait()
.unwrap()
// Make the subscriber run in background, until the session is closed.
.background();
.unwrap();

println!("Press CTRL-C to quit...");
std::thread::park();
Expand Down
4 changes: 3 additions & 1 deletion io/zenoh-transport/src/common/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ mod tests {
let mut batch = WBatch::new(config);

let tmsg: TransportMessage = KeepAlive.into();
let nmsg: NetworkMessage = Push {
let mut nmsg: NetworkMessage = Push {
wire_expr: WireExpr::empty(),
ext_qos: ext::QoSType::new(Priority::DEFAULT, CongestionControl::Block, false),
ext_tstamp: None,
Expand Down Expand Up @@ -601,13 +601,15 @@ mod tests {
sn: 0,
ext_qos: frame::ext::QoSType::DEFAULT,
};
nmsg.reliability = frame.reliability;

// Serialize with a frame
batch.encode((&nmsg, &frame)).unwrap();
assert_ne!(batch.len(), 0);
nmsgs_in.push(nmsg.clone());

frame.reliability = Reliability::BestEffort;
nmsg.reliability = frame.reliability;
batch.encode((&nmsg, &frame)).unwrap();
assert_ne!(batch.len(), 0);
nmsgs_in.push(nmsg.clone());
Expand Down
6 changes: 5 additions & 1 deletion io/zenoh-transport/src/common/defragmentation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ impl DefragBuffer {
pub(crate) fn push(&mut self, sn: TransportSn, zslice: ZSlice) -> ZResult<()> {
if sn != self.sn.get() {
self.clear();
bail!("Expected SN {}, received {}", self.sn.get(), sn)
bail!(
"Defragmentation SN error: expected SN {}, received {}",
self.sn.get(),
sn
)
}

let new_len = self.len + zslice.len();
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use zenoh_codec::{transport::batch::BatchError, WCodec, Zenoh080};
use zenoh_config::QueueSizeConf;
use zenoh_core::zlock;
use zenoh_protocol::{
core::{Priority, Reliability},
core::Priority,
network::NetworkMessage,
transport::{
fragment::FragmentHeader,
Expand Down Expand Up @@ -220,7 +220,7 @@ impl StageIn {

// The Frame
let frame = FrameHeader {
reliability: Reliability::Reliable, // TODO
reliability: msg.reliability,
sn,
ext_qos: frame::ext::QoSType::new(priority),
};
Expand Down
Loading
Loading