diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 58e7a2856..a93304fd6 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -398,6 +398,13 @@ typedef struct z_timestamp_t { uint64_t time; struct z_id_t id; } z_timestamp_t; +/** + * QoS settings of zenoh message. + * + */ +typedef struct z_qos_t { + uint8_t _0; +} z_qos_t; /** * A data sample. * @@ -418,6 +425,7 @@ typedef struct z_sample_t { const void *_zc_buf; enum z_sample_kind_t kind; struct z_timestamp_t timestamp; + struct z_qos_t qos; struct z_attachment_t attachment; } z_sample_t; /** @@ -1711,6 +1719,22 @@ int8_t z_put(struct z_session_t session, * Constructs the default value for :c:type:`z_put_options_t`. */ ZENOHC_API struct z_put_options_t z_put_options_default(void); +/** + * Returns default qos settings. + */ +ZENOHC_API struct z_qos_t z_qos_default(void); +/** + * Returns message congestion control. + */ +ZENOHC_API enum z_congestion_control_t z_qos_get_congestion_control(struct z_qos_t qos); +/** + * Returns message express flag. If set to true, the message is not batched to reduce the latency. + */ +ZENOHC_API bool z_qos_get_express(struct z_qos_t qos); +/** + * Returns message priority. + */ +ZENOHC_API enum z_priority_t z_qos_get_priority(struct z_qos_t qos); /** * Returns the attachment to the query by aliasing. * diff --git a/src/commons.rs b/src/commons.rs index 5d58663eb..8ac21cb11 100644 --- a/src/commons.rs +++ b/src/commons.rs @@ -14,7 +14,10 @@ use crate::collections::*; use crate::keyexpr::*; +use crate::z_congestion_control_t; use crate::z_id_t; +use crate::z_priority_t; +use crate::{impl_guarded_transmute, GuardedTransmute}; use libc::c_void; use libc::{c_char, c_ulong}; use zenoh::buffers::ZBuf; @@ -22,6 +25,7 @@ use zenoh::prelude::SampleKind; use zenoh::prelude::SplitBuffer; use zenoh::query::ReplyKeyExpr; use zenoh::sample::Locality; +use zenoh::sample::QoS; use zenoh::sample::Sample; use zenoh_protocol::core::Timestamp; @@ -188,6 +192,47 @@ pub extern "C" fn zc_payload_null() -> zc_owned_payload_t { } } +/// QoS settings of zenoh message. +/// +#[repr(C)] +pub struct z_qos_t(u8); + +impl_guarded_transmute!(QoS, z_qos_t); +impl_guarded_transmute!(z_qos_t, QoS); + +impl From for z_qos_t { + fn from(qos: QoS) -> Self { + qos.transmute() + } +} + +impl From for QoS { + fn from(qos: z_qos_t) -> QoS { + qos.transmute() + } +} + +/// Returns message priority. +#[no_mangle] +pub extern "C" fn z_qos_get_priority(qos: z_qos_t) -> z_priority_t { + qos.transmute().priority().into() +} +/// Returns message congestion control. +#[no_mangle] +pub extern "C" fn z_qos_get_congestion_control(qos: z_qos_t) -> z_congestion_control_t { + qos.transmute().congestion_control().into() +} +/// Returns message express flag. If set to true, the message is not batched to reduce the latency. +#[no_mangle] +pub extern "C" fn z_qos_get_express(qos: z_qos_t) -> bool { + qos.transmute().express() +} +/// Returns default qos settings. +#[no_mangle] +pub extern "C" fn z_qos_default() -> z_qos_t { + QoS::default().transmute() +} + /// A data sample. /// /// A sample is the value associated to a given resource at a given point in time. @@ -207,6 +252,7 @@ pub struct z_sample_t<'a> { pub _zc_buf: &'a c_void, pub kind: z_sample_kind_t, pub timestamp: z_timestamp_t, + pub qos: z_qos_t, pub attachment: z_attachment_t, } @@ -222,6 +268,7 @@ impl<'a> z_sample_t<'a> { _zc_buf: unsafe { std::mem::transmute(owner) }, kind: sample.kind.into(), timestamp: sample.timestamp.as_ref().into(), + qos: sample.qos.into(), attachment: match &sample.attachment { Some(attachment) => z_attachment_t { data: attachment as *const _ as *mut c_void, diff --git a/tests/z_int_pub_sub_test.c b/tests/z_int_pub_sub_test.c index 8a2825618..cf005a80f 100644 --- a/tests/z_int_pub_sub_test.c +++ b/tests/z_int_pub_sub_test.c @@ -37,7 +37,10 @@ int run_publisher() { return -1; } - z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL); + z_publisher_options_t publisher_options = z_publisher_options_default(); + publisher_options.priority = Z_PRIORITY_DATA; + publisher_options.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), &publisher_options); if (!z_check(pub)) { perror("Unable to declare Publisher for key expression!"); return -1; @@ -68,6 +71,13 @@ void data_handler(const z_sample_t *sample, void *arg) { exit(-1); } + if (z_qos_get_congestion_control(sample->qos) != Z_CONGESTION_CONTROL_BLOCK + || z_qos_get_priority(sample->qos) != Z_PRIORITY_DATA + ) { + perror("Unexpected QoS values"); + exit(-1); + } + if (++val_num == values_count) { exit(0); };