Skip to content

Commit

Permalink
Merge pull request #1578 from ZettaScaleLabs/fix/drop_fragments
Browse files Browse the repository at this point in the history
fix: break fragment chain when dropping fragmented message
  • Loading branch information
Mallets authored Nov 4, 2024
2 parents bb24e95 + 1d496fb commit e73a89d
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 11 deletions.
2 changes: 1 addition & 1 deletion commons/zenoh-shm/src/api/provider/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl MemoryLayout {
return Err(ZLayoutError::IncorrectLayoutArgs);
};

// size of an allocation must be a miltiple of it's alignment!
// size of an allocation must be a multiple of its alignment!
match size.get() % alignment.get_alignment_value() {
0 => Ok(Self { size, alignment }),
_ => Err(ZLayoutError::IncorrectLayoutArgs),
Expand Down
6 changes: 2 additions & 4 deletions commons/zenoh-shm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,14 @@ impl ShmBufInner {

impl Drop for ShmBufInner {
fn drop(&mut self) {
// # Safety
// obviouly, we need to decrement refcount when dropping ShmBufInner instance
// SAFETY: obviously, we need to decrement refcount when dropping ShmBufInner instance
unsafe { self.dec_ref_count() };
}
}

impl Clone for ShmBufInner {
fn clone(&self) -> Self {
// # Safety
// obviouly, we need to increment refcount when cloning ShmBufInner instance
// SAFETY: obviously, we need to increment refcount when cloning ShmBufInner instance
unsafe { self.inc_ref_count() };
let bp = self.buf.load(Ordering::SeqCst);
ShmBufInner {
Expand Down
2 changes: 1 addition & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@
### z_liveliness

Declares a liveliness token on a given key expression (`group1/zenoh-rs` by default).
This token will be seen alive byt the `z_get_liveliness` and `z_sub_liveliness` until
This token will be seen alive by the `z_get_liveliness` and `z_sub_liveliness` until
user explicitly drops the token by pressing `'d'` or implicitly dropped by terminating
or killing the `z_liveliness` example.

Expand Down
10 changes: 6 additions & 4 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl StageIn {
let mut c_guard = self.mutex.current();

macro_rules! zgetbatch_rets {
($restore_sn:expr) => {
($($restore_sn:stmt)?) => {
loop {
match c_guard.take() {
Some(batch) => break batch,
Expand All @@ -234,7 +234,7 @@ impl StageIn {
if !deadline.wait(&self.s_ref) {
// Still no available batch.
// Restore the sequence number and drop the message
$restore_sn;
$($restore_sn)?
return false;
}
c_guard = self.mutex.current();
Expand Down Expand Up @@ -262,7 +262,7 @@ impl StageIn {
}

// Get the current serialization batch.
let mut batch = zgetbatch_rets!({});
let mut batch = zgetbatch_rets!();
// Attempt the serialization on the current batch
let e = match batch.encode(&*msg) {
Ok(_) => zretok!(batch, msg),
Expand Down Expand Up @@ -322,7 +322,9 @@ impl StageIn {
let mut reader = self.fragbuf.reader();
while reader.can_read() {
// Get the current serialization batch
batch = zgetbatch_rets!(tch.sn.set(sn).unwrap());
// If deadline is reached, sequence number is incremented with `SeqNumGenerator::get`
// in order to break the fragment chain already sent.
batch = zgetbatch_rets!(let _ = tch.sn.get());

// Serialize the message fragment
match batch.encode((&mut reader, &mut fragment)) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-trait/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
//!
//! Static plugin is just a type which implements [`Plugin`] trait. It can be added to [`PluginsManager`] by [`PluginsManager::declare_static_plugin`](crate::manager::PluginsManager::declare_static_plugin) method.
//!
//! Dynamic plugin is a shared library which exports set of C-repr (unmangled) functions which allows to check plugin compatibility and create plugin instance. These functiuons are defined automatically by [`declare_plugin`] macro.
//! Dynamic plugin is a shared library which exports set of C-repr (unmangled) functions which allows to check plugin compatibility and create plugin instance. These functions are defined automatically by [`declare_plugin`] macro.
//!
mod compatibility;
mod manager;
Expand Down

0 comments on commit e73a89d

Please sign in to comment.