Skip to content

Commit

Permalink
Fix link negotiation
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Nov 7, 2023
1 parent 05117fe commit 985f86b
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 25 deletions.
7 changes: 4 additions & 3 deletions io/zenoh-transport/src/common/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,17 @@ impl WBatch {
#[cfg(feature = "transport_compression")]
if self.header.is_compression() {
let buffer = buffer.ok_or_else(|| zerror!("Support buffer not provided"))?;
buffer.clear();
return self.compress(buffer);
}

Ok(Finalize::Batch)
}

#[cfg(feature = "transport_compression")]
fn compress(&mut self, buffer: &mut BBuf) -> ZResult<Finalize> {
fn compress(&mut self, support: &mut BBuf) -> ZResult<Finalize> {
// Write the initial bytes for the batch
let mut writer = buffer.writer();
let mut writer = support.writer();
if let Some(h) = self.header.get() {
let _ = writer.write_u8(h.get());
}
Expand All @@ -229,7 +230,7 @@ impl WBatch {
.map_err(|_| zerror!("Compression error"))?;

// Verify wether the resulting compressed data is smaller than the initial input
if buffer.len() < self.buffer.len() {
if support.len() < self.buffer.len() {
Ok(Finalize::Buffer)
} else {
// Keep the original uncompressed buffer and unset the compression flag from the header
Expand Down
19 changes: 8 additions & 11 deletions io/zenoh-transport/src/unicast/establishment/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,24 +701,21 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager)
};
link.config.mtu = state.transport.batch_size;

let mut c_link = link.clone();
#[cfg(feature = "transport_compression")]
{
c_link.config.is_compression = state.link.ext_compression.is_compression();
}
let transport = step!(manager.init_transport_unicast(config, c_link).await);
let a_config = TransportLinkUnicastConfig {
mtu: state.transport.batch_size,
direction: TransportLinkUnicastDirection::Inbound,
#[cfg(feature = "transport_compression")]
is_compression: state.link.ext_compression.is_compression(),
};
let a_link = TransportLinkUnicast::new(link.link.clone(), a_config);
let transport = step!(manager.init_transport_unicast(config, a_link).await);

// Send the open_ack on the link
step!(link
.send(&oack_out.open_ack.into())
.await
.map_err(|e| (e, Some(close::reason::GENERIC))));

#[cfg(feature = "transport_compression")]
{
link.config.is_compression = state.link.ext_compression.is_compression();
}

// Sync the RX sequence number
let _ = step!(transport
.get_inner()
Expand Down
16 changes: 9 additions & 7 deletions io/zenoh-transport/src/unicast/establishment/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,13 +607,15 @@ pub(crate) async fn open_link(
is_shm: state.ext_shm.is_shm(),
is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(),
};
link.config.mtu = state.transport.batch_size;
#[cfg(feature = "transport_compression")]
{
link.config.is_compression = state.link.ext_compression.is_compression();
}
let c_link = link.clone();
let transport = step!(manager.init_transport_unicast(config, c_link).await);

let o_config = TransportLinkUnicastConfig {
mtu: state.transport.batch_size,
direction: TransportLinkUnicastDirection::Inbound,
#[cfg(feature = "transport_compression")]
is_compression: state.link.ext_compression.is_compression(),
};
let o_link = TransportLinkUnicast::new(link.link.clone(), o_config);
let transport = step!(manager.init_transport_unicast(config, o_link).await);

// Sync the RX sequence number
let _ = step!(transport
Expand Down
17 changes: 13 additions & 4 deletions io/zenoh-transport/src/unicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ impl TransportLinkUnicast {
pub async fn send_batch(&mut self, batch: &mut WBatch) -> ZResult<()> {
const ERR: &str = "Write error on link: ";

log::trace!("WBatch: {:?}", batch);

let res = batch
.finalize(
#[cfg(feature = "transport_compression")]
Expand All @@ -84,9 +86,15 @@ impl TransportLinkUnicast {
.as_slice(),
};

log::trace!("WBytes: {:02x?}", bytes);

// Send the message on the link
if self.link.is_streamed() {
let len = bytes.len().to_le_bytes();
let len: BatchSize = bytes
.len()
.try_into()
.map_err(|_| zerror!("Invalid batch length"))?;
let len = len.to_le_bytes();
self.link.write_all(&len).await?;
}
self.link.write_all(bytes).await?;
Expand Down Expand Up @@ -131,16 +139,17 @@ impl TransportLinkUnicast {
self.link.read(into.as_mut_slice()).await?
};

log::trace!("RBytes: {:02x?}", &into.as_slice()[0..end]);

let buffer = ZSlice::make(Arc::new(into), 0, end)
.map_err(|_| zerror!("{ERR}{self}. ZSlice index(es) out of bounds"))?;

log::trace!("RBatch: {:?}", buffer);

let mut batch = RBatch::new(self.batch_config(), buffer);
batch
.initialize(buff)
.map_err(|e| zerror!("{ERR}{self}. {e}."))?;

log::trace!("RBatch: {:?}", batch);

Ok(batch)
}

Expand Down

0 comments on commit 985f86b

Please sign in to comment.