Merge pull request #6 from yellowhatter/duplicate_messages

Duplicate messages

yellowhatter authored Mar 12, 2024
2 parents 4537dc2 + 9825e90 commit cc6c39f

Showing 9 changed files with 182 additions and 245 deletions.
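Taken as a whole, the diff below does three things: it raises the default unicast max_links from 1 to 100, it removes the per-link transmission pipeline from TransportLinkUnicastUniversal, and it rewrites tx_task so that each pulled batch (and each keep-alive) is written to every active link, with the write discipline selected at runtime via the ZENOH_TX_DISC environment variable. Presumably this duplication of traffic across links is what the PR title refers to. Short sketches of the new patterns follow the relevant file diffs below.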
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion DEFAULT_CONFIG.json5
@@ -142,7 +142,7 @@
/// Maximum number of sessions that can be simultaneously alive
max_sessions: 1000,
/// Maximum number of incoming links that are admitted per session
max_links: 1,
max_links: 100,
/// Enables the LowLatency transport
/// This option does not make LowLatency transport mandatory, the actual implementation of transport
/// used will depend on Establish procedure and other party's settings
2 changes: 1 addition & 1 deletion commons/zenoh-config/src/defaults.rs
@@ -105,7 +105,7 @@ impl Default for TransportUnicastConf {
accept_timeout: 10_000,
accept_pending: 100,
max_sessions: 1_000,
max_links: 1,
max_links: 100,
lowlatency: false,
qos: QoSUnicastConf::default(),
compression: CompressionUnicastConf::default(),
1 change: 1 addition & 0 deletions io/zenoh-transport/Cargo.toml
@@ -54,6 +54,7 @@ async-global-executor = { workspace = true }
async-std = { workspace = true }
async-trait = { workspace = true }
flume = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
lz4_flex = { workspace = true }
paste = { workspace = true }
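The new futures dependency appears to exist for the concurrent fan-out added to tx_task in io/zenoh-transport/src/unicast/universal/link.rs further down. A minimal sketch of that pattern, with MyLink as a hypothetical stand-in for the zenoh link type:

use futures::stream::StreamExt;

struct MyLink; // hypothetical stand-in for a zenoh unicast link

impl MyLink {
    async fn write_all(&self, _payload: &[u8]) -> std::io::Result<()> {
        Ok(()) // a real link would write the bytes to its transport here
    }
}

// One write per link, awaited concurrently; individual write errors are
// ignored, matching the Parallel branch in the diff below.
async fn fan_out(links: &[MyLink], payload: &[u8]) {
    let _ = futures::stream::iter(links)
        .map(|l| l.write_all(payload))
        .buffer_unordered(links.len().max(1))
        .collect::<Vec<_>>()
        .await;
}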
6 changes: 3 additions & 3 deletions io/zenoh-transport/src/unicast/link.rs
@@ -162,7 +162,7 @@ impl TransportLinkUnicastTx {
.as_slice(),
};

// log::trace!("WBytes: {:02x?}", bytes);
log::trace!("Link: {:?} - WBytes: {}", self, bytes.len());

// Send the message on the link
self.inner.link.write_all(bytes).await?;
@@ -190,7 +190,7 @@ impl fmt::Display for TransportLinkUnicastTx {

impl fmt::Debug for TransportLinkUnicastTx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TransportLinkUnicastRx")
f.debug_struct("TransportLinkUnicastTx")
.field("link", &self.inner.link)
.field("config", &self.inner.config)
.field("buffer", &self.buffer.as_ref().map(|b| b.capacity()))
@@ -230,7 +230,7 @@ impl TransportLinkUnicastRx {
self.link.read(into.as_mut_slice()).await?
};

// log::trace!("RBytes: {:02x?}", &into.as_slice()[0..end]);
log::trace!("Link: {:?} - RBytes: {}", self, end);

let buffer = ZSlice::make(Arc::new(into), 0, end)
.map_err(|_| zerror!("{ERR}{self}. ZSlice index(es) out of bounds"))?;
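Two small fixes in this file: the Debug impl for TransportLinkUnicastTx previously printed the Rx type's name ("TransportLinkUnicastRx"), and the commented-out hex-dump traces give way to cheaper length-only traces. The Debug fix matters for the new traces, which print the link via {:?}: without it, TX log lines would be mislabeled as RX.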
192 changes: 95 additions & 97 deletions io/zenoh-transport/src/unicast/universal/link.rs
@@ -16,25 +16,23 @@ use super::transport::TransportUnicastUniversal;
use crate::common::stats::TransportStats;
use crate::{
common::{
batch::{BatchConfig, RBatch},
pipeline::{
TransmissionPipeline, TransmissionPipelineConf, TransmissionPipelineConsumer,
TransmissionPipelineProducer,
},
batch::{Finalize, RBatch},
pipeline::TransmissionPipelineConsumer,
priority::TransportPriorityTx,
},
unicast::link::{TransportLinkUnicast, TransportLinkUnicastRx, TransportLinkUnicastTx},
TransportExecutor,
};
use async_std::prelude::FutureExt;
use async_std::task;
use async_std::task::JoinHandle;
use async_std::{
sync::RwLock as AsyncRwLock,
task::{self, JoinHandle},
};
use std::{
sync::{Arc, RwLock},
time::Duration,
};
use zenoh_buffers::ZSliceBuffer;
use zenoh_core::zwrite;
use zenoh_core::{zasyncread, zasyncwrite, zwrite};
use zenoh_protocol::transport::{KeepAlive, TransportMessage};
use zenoh_result::{zerror, ZResult};
use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal};
@@ -50,86 +50,25 @@ pub(super) struct Tasks {
pub(super) struct TransportLinkUnicastUniversal {
// The underlying link
pub(super) link: TransportLinkUnicast,
// The transmission pipeline
pub(super) pipeline: TransmissionPipelineProducer,
// The task handling substruct
tasks: Arc<Tasks>,
}

impl TransportLinkUnicastUniversal {
pub(super) fn new(
transport: &TransportUnicastUniversal,
link: TransportLinkUnicast,
priority_tx: &[TransportPriorityTx],
) -> (Self, TransmissionPipelineConsumer) {
pub(super) fn new(link: TransportLinkUnicast, priority_tx: &[TransportPriorityTx]) -> Self {
assert!(!priority_tx.is_empty());

let config = TransmissionPipelineConf {
batch: BatchConfig {
mtu: link.config.batch.mtu,
is_streamed: link.link.is_streamed(),
#[cfg(feature = "transport_compression")]
is_compression: link.config.batch.is_compression,
},
queue_size: transport.manager.config.queue_size,
backoff: transport.manager.config.queue_backoff,
};

// The pipeline
let (producer, consumer) = TransmissionPipeline::make(config, priority_tx);

let tasks = Arc::new(Tasks {
handle_tx: RwLock::new(None),
signal_rx: Signal::new(),
handle_rx: RwLock::new(None),
});

let result = Self {
link,
pipeline: producer,
tasks,
};

(result, consumer)
Self { link, tasks }
}
}

impl TransportLinkUnicastUniversal {
pub(super) fn start_tx(
&mut self,
transport: TransportUnicastUniversal,
consumer: TransmissionPipelineConsumer,
executor: &TransportExecutor,
keep_alive: Duration,
) {
let mut guard = zwrite!(self.tasks.handle_tx);
if guard.is_none() {
// Spawn the TX task
let mut tx = self.link.tx();
let handle = executor.spawn(async move {
let res = tx_task(
consumer,
&mut tx,
keep_alive,
#[cfg(feature = "stats")]
transport.stats.clone(),
)
.await;
if let Err(e) = res {
log::debug!("{}", e);
// Spawn a task to avoid a deadlock waiting for this same task
// to finish in the close() joining its handle
task::spawn(async move { transport.del_link(tx.inner.link()).await });
}
});
*guard = Some(handle);
}
}

pub(super) fn stop_tx(&mut self) {
self.pipeline.disable();
}

pub(super) fn start_rx(&mut self, transport: TransportUnicastUniversal, lease: Duration) {
let mut guard = zwrite!(self.tasks.handle_rx);
if guard.is_none() {
@@ -165,7 +102,6 @@ impl TransportLinkUnicastUniversal {

pub(super) async fn close(mut self) -> ZResult<()> {
log::trace!("{}: closing", self.link);
self.stop_tx();
self.stop_rx();

let handle_tx = zwrite!(self.tasks.handle_tx).take();
@@ -185,22 +121,88 @@
/*************************************/
/* TASKS */
/*************************************/
async fn tx_task(

pub(super) async fn tx_task(
mut pipeline: TransmissionPipelineConsumer,
link: &mut TransportLinkUnicastTx,
links: Arc<AsyncRwLock<Vec<TransportLinkUnicastTx>>>,
keep_alive: Duration,
#[cfg(feature = "stats")] stats: Arc<TransportStats>,
) -> ZResult<()> {
#[derive(Default, Debug)]
enum Disc {
#[default]
Sequential,
Parallel,
Spawn,
}

let env = std::env::var("ZENOH_TX_DISC");
log::debug!("ZENOH_TX_DISC: {env:?}");
let disc = match env {
Ok(d) => match d.as_str() {
"sequential" => Disc::Sequential,
"parallel" => Disc::Parallel,
"spawn" => Disc::Spawn,
_ => Disc::default(),
},
Err(_) => Disc::default(),
};
log::debug!("Tx disc: {disc:?}");

loop {
match pipeline.pull().timeout(keep_alive).await {
let res = pipeline.pull().timeout(keep_alive).await;
let mut ls = zasyncread!(links);
match res {
Ok(res) => match res {
Some((mut batch, priority)) => {
link.send_batch(&mut batch).await?;
while ls.is_empty() {
log::trace!("No links available for TX");
drop(ls);
task::sleep(Duration::from_millis(100)).await;
ls = zasyncread!(links);
}

#[cfg(feature = "stats")]
#[cfg(feature = "transport_compression")]
{
stats.inc_tx_t_msgs(batch.stats.t_msgs);
stats.inc_tx_bytes(batch.len() as usize);
batch.config.is_compression = false;
}

let res = batch
.finalize(None)
.map_err(|_| zerror!("Batch finalization error"))?;

if let Finalize::Buffer = res {
panic!("Compression is not supported");
};

// Send the message on the links
match disc {
Disc::Sequential => {
// Sequential
for l in ls.as_slice() {
let _ = l.inner.link.write_all(batch.as_slice()).await;
}
}
Disc::Parallel => {
// Parallel but blocking
use futures::stream::StreamExt;
let _ = futures::stream::iter(0..ls.len())
.map(|i| ls[i].inner.link.write_all(batch.as_slice()))
.buffer_unordered(ls.len())
.collect::<Vec<_>>()
.await; // Ignore errors
}
Disc::Spawn => {
// Parallel spawn
for l in ls.iter() {
let c_b = batch.clone();
let c_l = l.inner.link.clone();
task::spawn(async move {
if let Err(e) = c_l.write_all(c_b.as_slice()).await {
log::debug!("Write failed on {:?}: {}", c_l, e);
}
});
}
}
}

// Reinsert the batch into the queue
@@ -210,13 +212,10 @@ async fn tx_task(
},
Err(_) => {
let message: TransportMessage = KeepAlive.into();

#[allow(unused_variables)] // Used when stats feature is enabled
let n = link.send(&message).await?;
#[cfg(feature = "stats")]
{
stats.inc_tx_t_msgs(1);
stats.inc_tx_bytes(n);
drop(ls);
let mut ls = zasyncwrite!(links);
for l in ls.as_mut_slice() {
let _ = l.send(&message).await?;
}
}
}
Expand All @@ -225,15 +224,14 @@ async fn tx_task(
// Drain the transmission pipeline and write remaining bytes on the wire
let mut batches = pipeline.drain();
for (mut b, _) in batches.drain(..) {
link.send_batch(&mut b)
.timeout(keep_alive)
.await
.map_err(|_| zerror!("{}: flush failed after {} ms", link, keep_alive.as_millis()))??;

#[cfg(feature = "stats")]
{
stats.inc_tx_t_msgs(b.stats.t_msgs);
stats.inc_tx_bytes(b.len() as usize);
let mut ls = zasyncwrite!(links);
for l in ls.as_mut_slice() {
l.send_batch(&mut b)
.timeout(keep_alive)
.await
.map_err(|_| {
zerror!("{}: flush failed after {} ms", l, keep_alive.as_millis())
})??;
}
}

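Since the discipline switch is the heart of this change, here is a condensed, self-contained sketch of the selection logic introduced above; ZENOH_TX_DISC and the variant names come from the diff, while the helper name is hypothetical:

// Defaults to Sequential when ZENOH_TX_DISC is unset or unrecognized.
#[derive(Default, Debug)]
enum Disc {
    #[default]
    Sequential, // write to the links one after another
    Parallel,   // concurrent writes via buffer_unordered, awaited together
    Spawn,      // fire-and-forget: one detached write task per link
}

fn tx_disc_from_env() -> Disc {
    match std::env::var("ZENOH_TX_DISC").as_deref() {
        Ok("sequential") => Disc::Sequential,
        Ok("parallel") => Disc::Parallel,
        Ok("spawn") => Disc::Spawn,
        _ => Disc::default(),
    }
}

Running a peer with ZENOH_TX_DISC=spawn, for example, would clone each batch and detach one write task per link, trading completion-order guarantees for lower latency on the pull loop.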