Skip to content

Commit

Permalink
Refactored transport
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Dec 8, 2023
1 parent 3e881d6 commit 1ca94b0
Show file tree
Hide file tree
Showing 12 changed files with 502 additions and 514 deletions.
8 changes: 4 additions & 4 deletions io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ impl PartialEq<LinkUnicast> for Link {
fn eq(&self, other: &LinkUnicast) -> bool {
self.src == *other.get_src() && self.dst == *other.get_dst()
}
}
impl PartialEq<LinkMulticast> for Link {
}

impl PartialEq<LinkMulticast> for Link {
fn eq(&self, other: &LinkMulticast) -> bool {
self.src == *other.get_src() && self.dst == *other.get_dst()
}
}
}
41 changes: 14 additions & 27 deletions io/zenoh-transport/src/unicast/establishment/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
use crate::unicast::shared_memory_unicast::Challenge;
use crate::{
unicast::{
establishment::{
compute_sn, ext, finalize_transport, AcceptFsm, Cookie, InputFinalize, Zenoh080Cookie,
establishment::{compute_sn, ext, AcceptFsm, Cookie, Zenoh080Cookie},
link::{
EstablishedTransportLinkUnicast, TransportLinkUnicast, TransportLinkUnicastConfig,
TransportLinkUnicastDirection,
},
link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection},
TransportConfigUnicast,
},
TransportManager,
Expand Down Expand Up @@ -710,31 +711,17 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -
#[cfg(feature = "transport_compression")]
is_compression: state.link.ext_compression.is_compression(),
};
let a_link = TransportLinkUnicast::new(link.link, a_config);
let a_link = link.reconfigure(a_config);
let s_link = format!("{:?}", a_link);
let transport = step!(manager.init_transport_unicast(config, a_link).await);

// Sync the RX sequence number
let _ = step!(transport
.get_inner()
.map_err(|e| (e, Some(close::reason::INVALID))))
.sync(osyn_out.other_initial_sn)
.await;

// Finalize the transport
let input = InputFinalize {
transport: transport.clone(),
other_lease: osyn_out.other_lease,
};
step!(finalize_transport(&link, manager, input)
.await
.map_err(|e| (e, Some(close::reason::INVALID))));

// Send the open_ack on the link
step!(link
.send(&oack_out.open_ack.into())
.await
.map_err(|e| (e, Some(close::reason::GENERIC))));
let a_link = EstablishedTransportLinkUnicast::new(a_link, Some(oack_out.open_ack.into()));
let _transport = manager
.init_transport_unicast(
config,
a_link,
osyn_out.other_initial_sn,
osyn_out.other_lease,
)
.await?;

log::debug!(
"New transport link accepted from {} to {}: {}.",
Expand Down
58 changes: 1 addition & 57 deletions io/zenoh-transport/src/unicast/establishment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,17 @@ pub(super) mod cookie;
pub mod ext;
pub(crate) mod open;

use super::{TransportPeer, TransportUnicast};
use crate::{common::seq_num, unicast::link::TransportLinkUnicast, TransportManager};
use crate::common::seq_num;
use async_trait::async_trait;
use cookie::*;
use sha3::{
digest::{ExtendableOutput, Update, XofReader},
Shake128,
};
use std::time::Duration;
use zenoh_link::Link;
use zenoh_protocol::{
core::{Field, Resolution, ZenohId},
transport::TransportSn,
};
use zenoh_result::ZResult;

/*************************************/
/* TRAITS */
Expand Down Expand Up @@ -115,55 +111,3 @@ pub(super) fn compute_sn(zid1: ZenohId, zid2: ZenohId, resolution: Resolution) -
hasher.finalize_xof().read(&mut array);
TransportSn::from_le_bytes(array) & seq_num::get_mask(resolution.get(Field::FrameSN))
}

pub(super) struct InputFinalize {
pub(super) transport: TransportUnicast,
pub(super) other_lease: Duration,
}
// Finalize the transport, notify the callback and start the link tasks
pub(super) async fn finalize_transport(
link: &TransportLinkUnicast,
manager: &TransportManager,
input: self::InputFinalize,
) -> ZResult<()> {
// Retrive the transport's transport
let transport = input.transport.get_inner()?;

// Start the TX loop
let keep_alive = manager.config.unicast.lease / manager.config.unicast.keep_alive as u32;
transport.start_tx(link, &manager.tx_executor, keep_alive)?;

// Assign a callback if the transport is new
// Keep the lock to avoid concurrent new_transport and closing/closed notifications
let a_guard = transport.get_alive().await;
if transport.get_callback().is_none() {
let peer = TransportPeer {
zid: transport.get_zid(),
whatami: transport.get_whatami(),
links: vec![Link::from(link)],
is_qos: transport.is_qos(),
#[cfg(feature = "shared-memory")]
is_shm: transport.is_shm(),
};
// Notify the transport handler that there is a new transport and get back a callback
// NOTE: the read loop of the link the open message was sent on remains blocked
// until new_unicast() returns. The read_loop in the various links
// waits for any eventual transport to associate to.
let callback = manager
.config
.handler
.new_unicast(peer, input.transport.clone())?;
// Set the callback on the transport
transport.set_callback(callback);
}
if let Some(callback) = transport.get_callback() {
// Notify the transport handler there is a new link on this transport
callback.new_link(Link::from(link));
}
drop(a_guard);

// Start the RX loop
transport.start_rx(link, input.other_lease)?;

Ok(())
}
39 changes: 15 additions & 24 deletions io/zenoh-transport/src/unicast/establishment/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
use crate::unicast::shared_memory_unicast::Challenge;
use crate::{
unicast::{
establishment::{compute_sn, ext, finalize_transport, InputFinalize, OpenFsm},
link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection},
establishment::{compute_sn, ext, OpenFsm},
link::{
EstablishedTransportLinkUnicast, TransportLinkUnicast, TransportLinkUnicastConfig,
TransportLinkUnicastDirection,
},
TransportConfigUnicast, TransportUnicast,
},
TransportManager,
Expand Down Expand Up @@ -621,29 +624,17 @@ pub(crate) async fn open_link(
#[cfg(feature = "transport_compression")]
is_compression: state.link.ext_compression.is_compression(),
};
let o_link = TransportLinkUnicast::new(link.link, o_config);
let o_link = link.reconfigure(o_config);
let s_link = format!("{:?}", o_link);
let transport = step!(
manager
.init_transport_unicast(
config,
o_link,
oack_out.other_initial_sn,
oack_out.other_lease
)
.await
);

let output = InputFinalize {
transport,
other_lease: oack_out.other_lease,
};
let transport = output.transport.clone();
let res = finalize_transport(&link, manager, output).await;
if let Err(e) = res {
let _ = transport.close().await;
return Err(e);
}
let o_link = EstablishedTransportLinkUnicast::new(o_link, None);
let transport = manager
.init_transport_unicast(
config,
o_link,
oack_out.other_initial_sn,
oack_out.other_lease,
)
.await?;

log::debug!(
"New transport link opened from {} to {}: {}.",
Expand Down
84 changes: 78 additions & 6 deletions io/zenoh-transport/src/unicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,20 @@ pub(crate) struct TransportLinkUnicastConfig {

#[derive(PartialEq, Eq)]
pub(crate) struct TransportLinkUnicast {
pub(crate) link: LinkUnicast,
pub(crate) link: Arc<LinkUnicast>,
pub(crate) config: TransportLinkUnicastConfig,
}

impl TransportLinkUnicast {
pub(crate) fn new(link: LinkUnicast, mut config: TransportLinkUnicastConfig) -> Self {
pub(crate) fn new(link: LinkUnicast, config: TransportLinkUnicastConfig) -> Self {
Self::init(Arc::new(link), config)
}

pub(crate) fn reconfigure(self, new_config: TransportLinkUnicastConfig) -> Self {
Self::init(self.link, new_config)
}

fn init(link: Arc<LinkUnicast>, mut config: TransportLinkUnicastConfig) -> Self {
config.mtu = link.get_mtu().min(config.mtu);
Self { link, config }
}
Expand All @@ -60,7 +68,10 @@ impl TransportLinkUnicast {

pub(crate) fn tx(&self) -> TransportLinkUnicastTx {
TransportLinkUnicastTx {
inner: self.clone(),
inner: Self {
link: self.link.clone(),
config: self.config,
},
#[cfg(feature = "transport_compression")]
buffer: self.config.is_compression.then_some(BBuf::with_capacity(
lz4_flex::block::get_maximum_output_size(self.config.mtu as usize),
Expand All @@ -70,7 +81,10 @@ impl TransportLinkUnicast {

pub(crate) fn rx(&self) -> TransportLinkUnicastRx {
TransportLinkUnicastRx {
inner: self.clone(),
inner: Self {
link: self.link.clone(),
config: self.config,
},
}
}

Expand Down Expand Up @@ -116,13 +130,13 @@ impl fmt::Debug for TransportLinkUnicast {

impl From<&TransportLinkUnicast> for Link {
fn from(link: &TransportLinkUnicast) -> Self {
Link::from(&link.link)
Link::from(link.link.as_ref())
}
}

impl From<TransportLinkUnicast> for Link {
fn from(link: TransportLinkUnicast) -> Self {
Link::from(link.link)
Link::from(link.link.as_ref())
}
}

Expand Down Expand Up @@ -273,3 +287,61 @@ impl fmt::Debug for TransportLinkUnicastRx {
.finish()
}
}

pub(crate) struct EstablishAck {
link: TransportLinkUnicastTx,
ack: Option<TransportMessage>,
}

impl EstablishAck {
pub(crate) fn new(link: &TransportLinkUnicast, ack: Option<TransportMessage>) -> Self {
Self {
link: link.tx(),
ack,
}
}

pub(crate) async fn ack(mut self) -> ZResult<()> {
if let Some(msg) = self.ack {
return self.link.send(&msg).await.map(|_| {});
}
Ok(())
}

pub(crate) fn link(&self) -> Link {
self.link.inner.link.as_ref().into()
}
}

#[derive(PartialEq, Eq)]
pub(crate) struct EstablishedTransportLinkUnicast {
link: TransportLinkUnicast,
ack: Option<TransportMessage>,
}

impl EstablishedTransportLinkUnicast {
pub(crate) fn new(link: TransportLinkUnicast, ack: Option<TransportMessage>) -> Self {
Self { link, ack }
}

pub(crate) fn inner_config(&self) -> &TransportLinkUnicastConfig {
&self.link.config
}

pub(crate) fn ack(self) -> (TransportLinkUnicast, EstablishAck) {
let ack = EstablishAck::new(&self.link, self.ack);
(self.link, ack)
}
pub(crate) fn fail(self) -> TransportLinkUnicast {
self.link
}
}

impl fmt::Display for EstablishedTransportLinkUnicast {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.ack.as_ref() {
Some(ack) => write!(f, "{}({})", self.link, ack),
None => write!(f, "{}", self.link),
}
}
}
13 changes: 8 additions & 5 deletions io/zenoh-transport/src/unicast/lowlatency/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ impl TransportUnicastLowlatency {

pub(super) async fn send_async(&self, msg: TransportMessageLowLatency) -> ZResult<()> {
let guard = zasyncwrite!(self.link);
let link = guard.as_ref().ok_or_else(|| zerror!("No link"))?;
send_with_link(
&guard,
link,
msg,
#[cfg(feature = "stats")]
&self.stats,
Expand Down Expand Up @@ -133,7 +134,7 @@ impl TransportUnicastLowlatency {
let c_transport = self.clone();
let handle = task::spawn(async move {
let guard = zasyncread!(c_transport.link);
let link = guard.rx();
let link = guard.as_ref().unwrap().rx();
drop(guard);
let rx_buffer_size = c_transport.manager.config.link_rx_buffer_size;

Expand Down Expand Up @@ -174,7 +175,7 @@ impl TransportUnicastLowlatency {
/* TASKS */
/*************************************/
async fn keepalive_task(
link: Arc<RwLock<TransportLinkUnicast>>,
link: Arc<RwLock<Option<TransportLinkUnicast>>>,
keep_alive: Duration,
#[cfg(feature = "stats")] stats: Arc<TransportStats>,
) -> ZResult<()> {
Expand All @@ -186,8 +187,9 @@ async fn keepalive_task(
};

let guard = zasyncwrite!(link);
let link = guard.as_ref().ok_or_else(|| zerror!("No link"))?;
let _ = send_with_link(
&guard,
link,
keepailve,
#[cfg(feature = "stats")]
&stats,
Expand Down Expand Up @@ -261,7 +263,8 @@ async fn rx_task_dgram(
let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc());

// Async read from the underlying link
let bytes = link.inner
let bytes = link
.inner
.link
.read(&mut buffer)
.timeout(lease)
Expand Down
Loading

0 comments on commit 1ca94b0

Please sign in to comment.