Skip to content

Commit

Permalink
WIP on transport refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Dec 5, 2023
1 parent 598653b commit 3e881d6
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 150 deletions.
1 change: 0 additions & 1 deletion io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub trait ConstructibleLinkManagerUnicast<T>: Sized {
fn new(new_link_sender: NewLinkChannelSender, config: T) -> ZResult<Self>;
}

#[derive(Clone)]
pub struct LinkUnicast(pub Arc<dyn LinkUnicastTrait>);

#[async_trait]
Expand Down
18 changes: 9 additions & 9 deletions io/zenoh-transport/src/unicast/establishment/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,15 +584,15 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {
}
}

pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) -> ZResult<()> {
pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -> ZResult<()> {
let mtu = link.get_mtu();
let config = TransportLinkUnicastConfig {
mtu,
direction: TransportLinkUnicastDirection::Inbound,
#[cfg(feature = "transport_compression")]
is_compression: false,
};
let mut link = TransportLinkUnicast::new(link.clone(), config);
let mut link = TransportLinkUnicast::new(link, config);
let mut fsm = AcceptLink {
link: &mut link,
prng: &manager.prng,
Expand Down Expand Up @@ -710,16 +710,10 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager)
#[cfg(feature = "transport_compression")]
is_compression: state.link.ext_compression.is_compression(),
};
let a_link = TransportLinkUnicast::new(link.link.clone(), a_config);
let a_link = TransportLinkUnicast::new(link.link, a_config);
let s_link = format!("{:?}", a_link);
let transport = step!(manager.init_transport_unicast(config, a_link).await);

Check failure on line 715 in io/zenoh-transport/src/unicast/establishment/accept.rs

View workflow job for this annotation

GitHub Actions / Run tests on ubuntu-latest

this method takes 4 arguments but 2 arguments were supplied

Check failure on line 715 in io/zenoh-transport/src/unicast/establishment/accept.rs

View workflow job for this annotation

GitHub Actions / Run tests on macOS-latest

this method takes 4 arguments but 2 arguments were supplied

Check failure on line 715 in io/zenoh-transport/src/unicast/establishment/accept.rs

View workflow job for this annotation

GitHub Actions / Run tests on windows-latest

this method takes 4 arguments but 2 arguments were supplied

// Send the open_ack on the link
step!(link
.send(&oack_out.open_ack.into())
.await
.map_err(|e| (e, Some(close::reason::GENERIC))));

// Sync the RX sequence number
let _ = step!(transport
.get_inner()
Expand All @@ -736,6 +730,12 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager)
.await
.map_err(|e| (e, Some(close::reason::INVALID))));

// Send the open_ack on the link
step!(link
.send(&oack_out.open_ack.into())
.await
.map_err(|e| (e, Some(close::reason::GENERIC))));

log::debug!(
"New transport link accepted from {} to {}: {}.",
osyn_out.other_zid,
Expand Down
20 changes: 11 additions & 9 deletions io/zenoh-transport/src/unicast/establishment/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,16 +621,18 @@ pub(crate) async fn open_link(
#[cfg(feature = "transport_compression")]
is_compression: state.link.ext_compression.is_compression(),
};
let o_link = TransportLinkUnicast::new(link.link.clone(), o_config);
let o_link = TransportLinkUnicast::new(link.link, o_config);
let s_link = format!("{:?}", o_link);
let transport = step!(manager.init_transport_unicast(config, o_link).await);

// Sync the RX sequence number
let _ = step!(transport
.get_inner()
.map_err(|e| (e, Some(close::reason::INVALID))))
.sync(oack_out.other_initial_sn)
.await;
let transport = step!(
manager
.init_transport_unicast(
config,
o_link,
oack_out.other_initial_sn,
oack_out.other_lease
)
.await
);

let output = InputFinalize {
transport,
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-transport/src/unicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub(crate) struct TransportLinkUnicastConfig {
pub(crate) is_compression: bool,
}

#[derive(Clone, PartialEq, Eq)]
#[derive(PartialEq, Eq)]
pub(crate) struct TransportLinkUnicast {
pub(crate) link: LinkUnicast,
pub(crate) config: TransportLinkUnicastConfig,
Expand Down
27 changes: 14 additions & 13 deletions io/zenoh-transport/src/unicast/lowlatency/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use super::transport::TransportUnicastLowlatency;
#[cfg(feature = "stats")]
use crate::stats::TransportStats;
use crate::unicast::link::TransportLinkUnicastRx;
use crate::{unicast::link::TransportLinkUnicast, TransportExecutor};
use async_std::task;
use async_std::{prelude::FutureExt, sync::RwLock};
Expand Down Expand Up @@ -132,7 +133,7 @@ impl TransportUnicastLowlatency {
let c_transport = self.clone();
let handle = task::spawn(async move {
let guard = zasyncread!(c_transport.link);
let link = guard.clone();
let link = guard.rx();
drop(guard);
let rx_buffer_size = c_transport.manager.config.link_rx_buffer_size;

Expand Down Expand Up @@ -197,26 +198,26 @@ async fn keepalive_task(
}

async fn rx_task_stream(
link: TransportLinkUnicast,
link: TransportLinkUnicastRx,
transport: TransportUnicastLowlatency,
lease: Duration,
rx_buffer_size: usize,
) -> ZResult<()> {
async fn read(link: &TransportLinkUnicast, buffer: &mut [u8]) -> ZResult<usize> {
async fn read(link: &TransportLinkUnicastRx, buffer: &mut [u8]) -> ZResult<usize> {
// 16 bits for reading the batch length
let mut length = [0_u8, 0_u8, 0_u8, 0_u8];
link.link.read_exact(&mut length).await?;
link.inner.link.read_exact(&mut length).await?;
let n = u32::from_le_bytes(length) as usize;
let len = buffer.len();
let b = buffer.get_mut(0..n).ok_or_else(|| {
zerror!("Batch len is invalid. Received {n} but negotiated max len is {len}.")
})?;
link.link.read_exact(b).await?;
link.inner.link.read_exact(b).await?;
Ok(n)
}

// The pool of buffers
let mtu = link.config.mtu as usize;
let mtu = link.inner.config.mtu as usize;
let mut n = rx_buffer_size / mtu;
if rx_buffer_size % mtu != 0 {
n += 1;
Expand All @@ -237,18 +238,18 @@ async fn rx_task_stream(

// Deserialize all the messages from the current ZBuf
let zslice = ZSlice::make(Arc::new(buffer), 0, bytes).unwrap();
transport.read_messages(zslice, &link.link).await?;
transport.read_messages(zslice, &link.inner.link).await?;
}
}

async fn rx_task_dgram(
link: TransportLinkUnicast,
link: TransportLinkUnicastRx,
transport: TransportUnicastLowlatency,
lease: Duration,
rx_buffer_size: usize,
) -> ZResult<()> {
// The pool of buffers
let mtu = link.config.mtu as usize;
let mtu = link.inner.config.mtu as usize;
let mut n = rx_buffer_size / mtu;
if rx_buffer_size % mtu != 0 {
n += 1;
Expand All @@ -260,7 +261,7 @@ async fn rx_task_dgram(
let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc());

// Async read from the underlying link
let bytes = link
let bytes = link.inner
.link
.read(&mut buffer)
.timeout(lease)
Expand All @@ -272,17 +273,17 @@ async fn rx_task_dgram(

// Deserialize all the messages from the current ZBuf
let zslice = ZSlice::make(Arc::new(buffer), 0, bytes).unwrap();
transport.read_messages(zslice, &link.link).await?;
transport.read_messages(zslice, &link.inner.link).await?;
}
}

async fn rx_task(
link: TransportLinkUnicast,
link: TransportLinkUnicastRx,
transport: TransportUnicastLowlatency,
lease: Duration,
rx_buffer_size: usize,
) -> ZResult<()> {
if link.link.is_streamed() {
if link.inner.link.is_streamed() {
rx_task_stream(link, transport, lease, rx_buffer_size).await
} else {
rx_task_dgram(link, transport, lease, rx_buffer_size).await
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-transport/src/unicast/lowlatency/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,15 +214,15 @@ impl TransportUnicastTrait for TransportUnicastLowlatency {

fn start_tx(
&self,
_link: &TransportLinkUnicast,
_link: &Link,
executor: &TransportExecutor,
keep_alive: Duration,
) -> ZResult<()> {
self.start_keepalive(executor, keep_alive);
Ok(())
}

fn start_rx(&self, _link: &TransportLinkUnicast, lease: Duration) -> ZResult<()> {
fn start_rx(&self, _link: &Link, lease: Duration) -> ZResult<()> {
self.internal_start_rx(lease);
Ok(())
}
Expand Down
Loading

0 comments on commit 3e881d6

Please sign in to comment.