-
Notifications
You must be signed in to change notification settings - Fork 172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expose Quality of Service settings (Priority, Congestion Control, Express) the Sample was published with #730
Changes from 10 commits
26c55a9
239b4ac
c09a6a4
9c26273
df05068
b90fc73
3028518
03831f4
2f21cfb
e48ea54
85d8769
eb79877
061008f
90d1b76
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,10 +18,12 @@ use crate::prelude::ZenohId; | |
use crate::prelude::{KeyExpr, SampleKind, Value}; | ||
use crate::query::Reply; | ||
use crate::time::{new_reception_timestamp, Timestamp}; | ||
use crate::Priority; | ||
#[zenoh_macros::unstable] | ||
use serde::Serialize; | ||
use std::convert::{TryFrom, TryInto}; | ||
use zenoh_protocol::core::Encoding; | ||
use zenoh_protocol::core::{CongestionControl, Encoding}; | ||
use zenoh_protocol::network::push::ext::QoSType; | ||
|
||
pub type SourceSn = u64; | ||
|
||
|
@@ -50,6 +52,7 @@ pub(crate) struct DataInfo { | |
pub timestamp: Option<Timestamp>, | ||
pub source_id: Option<ZenohId>, | ||
pub source_sn: Option<SourceSn>, | ||
pub qos: QoS, | ||
} | ||
|
||
/// Informations on the source of a zenoh [`Sample`]. | ||
|
@@ -326,6 +329,8 @@ pub struct Sample { | |
pub kind: SampleKind, | ||
/// The [`Timestamp`] of this Sample. | ||
pub timestamp: Option<Timestamp>, | ||
/// Quality of service settings this sample was sent with. | ||
pub qos: QoS, | ||
|
||
#[cfg(feature = "unstable")] | ||
/// <div class="stab unstable"> | ||
|
@@ -361,6 +366,7 @@ impl Sample { | |
value: value.into(), | ||
kind: SampleKind::default(), | ||
timestamp: None, | ||
qos: QoS::default(), | ||
#[cfg(feature = "unstable")] | ||
source_info: SourceInfo::empty(), | ||
#[cfg(feature = "unstable")] | ||
|
@@ -383,6 +389,7 @@ impl Sample { | |
value: value.into(), | ||
kind: SampleKind::default(), | ||
timestamp: None, | ||
qos: QoS::default(), | ||
#[cfg(feature = "unstable")] | ||
source_info: SourceInfo::empty(), | ||
#[cfg(feature = "unstable")] | ||
|
@@ -407,6 +414,7 @@ impl Sample { | |
value, | ||
kind: data_info.kind, | ||
timestamp: data_info.timestamp, | ||
qos: data_info.qos, | ||
#[cfg(feature = "unstable")] | ||
source_info: data_info.into(), | ||
#[cfg(feature = "unstable")] | ||
|
@@ -418,6 +426,7 @@ impl Sample { | |
value, | ||
kind: SampleKind::default(), | ||
timestamp: None, | ||
qos: QoS::default(), | ||
#[cfg(feature = "unstable")] | ||
source_info: SourceInfo::empty(), | ||
#[cfg(feature = "unstable")] | ||
|
@@ -509,3 +518,43 @@ impl TryFrom<Reply> for Sample { | |
value.sample | ||
} | ||
} | ||
|
||
/// Structure containing quality of service data | ||
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)] | ||
pub struct QoS { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using accessors, this could be a bitfield and occupy a single byte instead of 3 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the benefit of reducing 3 bytes to one on the system with word size >= 4 bytes ? As a separate struct it will still tend to occupy 4 bytes at least, no ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's all about breakpoints and, to be fair, I think this layout just barely fits in API-wise, it's tempting to have only public fields, but that turns any future change to the struct's fields an API break. Builder patterns are just as convenient and give us much more flexibility in the future, both for layout optimization and for field additions. |
||
pub priority: Priority, | ||
pub congestion_control: CongestionControl, | ||
pub express: bool, | ||
} | ||
|
||
impl QoS { | ||
/// Helper function to fallback to QoS with default priority value, in case we fail to extract it | ||
pub(crate) fn from_or_default(ext: QoSType) -> QoS { | ||
Mallets marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let priority = match Priority::try_from(ext.get_priority()) { | ||
Ok(p) => p, | ||
Err(e) => { | ||
log::trace!( | ||
"Failed to convert priority: {}; replacing with default value", | ||
e.to_string() | ||
); | ||
Priority::default() | ||
} | ||
}; | ||
QoS { | ||
priority, | ||
congestion_control: ext.get_congestion_control(), | ||
express: ext.is_express(), | ||
} | ||
} | ||
} | ||
|
||
impl TryFrom<QoSType> for QoS { | ||
type Error = zenoh_result::Error; | ||
fn try_from(ext: QoSType) -> Result<Self, Self::Error> { | ||
Ok(QoS { | ||
priority: ext.get_priority().try_into()?, | ||
congestion_control: ext.get_congestion_control(), | ||
express: ext.is_express(), | ||
}) | ||
} | ||
} |
Mallets marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
// | ||
// Copyright (c) 2024 ZettaScale Technology | ||
// | ||
// This program and the accompanying materials are made available under the | ||
// terms of the Eclipse Public License 2.0 which is available at | ||
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 | ||
// which is available at https://www.apache.org/licenses/LICENSE-2.0. | ||
// | ||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 | ||
// | ||
// Contributors: | ||
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
use async_std::prelude::FutureExt; | ||
use async_std::task; | ||
use std::time::Duration; | ||
use zenoh::prelude::r#async::*; | ||
use zenoh::{publication::Priority, SessionDeclarations}; | ||
use zenoh_core::zasync_executor_init; | ||
|
||
const TIMEOUT: Duration = Duration::from_secs(60); | ||
const SLEEP: Duration = Duration::from_secs(1); | ||
|
||
macro_rules! ztimeout { | ||
($f:expr) => { | ||
$f.timeout(TIMEOUT).await.unwrap() | ||
}; | ||
} | ||
|
||
#[test] | ||
fn pubsub() { | ||
task::block_on(async { | ||
zasync_executor_init!(); | ||
let session1 = ztimeout!(zenoh::open(zenoh_config::peer()).res_async()).unwrap(); | ||
let session2 = ztimeout!(zenoh::open(zenoh_config::peer()).res_async()).unwrap(); | ||
|
||
let publisher1 = ztimeout!(session1 | ||
.declare_publisher("test/qos") | ||
.priority(Priority::DataHigh) | ||
Mallets marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.congestion_control(CongestionControl::Drop) | ||
.res()) | ||
.unwrap(); | ||
|
||
let publisher2 = ztimeout!(session1 | ||
.declare_publisher("test/qos") | ||
.priority(Priority::DataLow) | ||
.congestion_control(CongestionControl::Block) | ||
.res()) | ||
.unwrap(); | ||
|
||
let subscriber = ztimeout!(session2.declare_subscriber("test/qos").res()).unwrap(); | ||
task::sleep(SLEEP).await; | ||
|
||
ztimeout!(publisher1.put("qos").res_async()).unwrap(); | ||
let qos = ztimeout!(subscriber.recv_async()).unwrap().qos; | ||
|
||
assert_eq!(qos.priority, Priority::DataHigh); | ||
assert_eq!(qos.congestion_control, CongestionControl::Drop); | ||
|
||
ztimeout!(publisher2.put("qos").res_async()).unwrap(); | ||
let qos = ztimeout!(subscriber.recv_async()).unwrap().qos; | ||
|
||
assert_eq!(qos.priority, Priority::DataLow); | ||
assert_eq!(qos.congestion_control, CongestionControl::Block); | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a huge fan of how
Sample
keeps on getting more public fields. Maybe we can use the API break to make it a bit more private?