Skip to content

Commit

Permalink
Refactor transport trait (#610)
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter authored Dec 14, 2023
1 parent 7340153 commit 68aadaf
Show file tree
Hide file tree
Showing 21 changed files with 800 additions and 673 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions commons/zenoh-shm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,10 @@ impl SharedMemoryManager {
{
Ok(m) => m,
Err(ShmemError::LinkExists) => {
log::trace!("SharedMemory already exists, opening it");
ShmemConf::new()
.flink(path.clone())
.open()
.map_err(|e| ShmError(zerror!("Unable to open SharedMemoryManager: {}", e)))?
return Err(ShmError(zerror!(
"Unable to open SharedMemoryManager: SharedMemory already exists"
))
.into())
}
Err(e) => {
return Err(ShmError(zerror!("Unable to open SharedMemoryManager: {}", e)).into())
Expand Down
12 changes: 12 additions & 0 deletions io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,15 @@ impl From<LinkMulticast> for Link {
Link::from(&link)
}
}

impl PartialEq<LinkUnicast> for Link {
fn eq(&self, other: &LinkUnicast) -> bool {
self.src == *other.get_src() && self.dst == *other.get_dst()
}
}

impl PartialEq<LinkMulticast> for Link {
fn eq(&self, other: &LinkMulticast) -> bool {
self.src == *other.get_src() && self.dst == *other.get_dst()
}
}
13 changes: 10 additions & 3 deletions io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@ use async_std::fs::remove_file;
use async_std::task::JoinHandle;
use async_trait::async_trait;
use filepath::FilePath;
use nix::libc;
use nix::unistd::unlink;
use rand::Rng;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::fmt;
use std::fs::File;
use std::fs::{File, OpenOptions};
use std::io::{Read, Write};
use std::os::unix::fs::OpenOptionsExt;
use std::sync::Arc;
use zenoh_core::{zasyncread, zasyncwrite};
use zenoh_protocol::core::{EndPoint, Locator};

use unix_named_pipe::{create, open_read, open_write};
use unix_named_pipe::{create, open_write};

use zenoh_link_commons::{
ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait,
Expand Down Expand Up @@ -161,7 +163,12 @@ impl PipeR {
}

fn open_unique_pipe_for_read(path: &str) -> ZResult<File> {
let read = open_read(path)?;
let read = OpenOptions::new()
.read(true)
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(path)?;

#[cfg(not(target_os = "macos"))]
read.try_lock(FileLockMode::Exclusive)?;
Ok(read)
Expand Down
31 changes: 31 additions & 0 deletions io/zenoh-transport/src/common/seq_num.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,37 @@ impl SeqNum {
Ok((gap != 0) && ((gap & !(self.mask >> 1)) == 0))
}

/// Checks to see if two sequence number are in a precedence relationship,
/// while taking into account roll backs AND do update the sn value if check succeed.
///
/// Two case are considered:
///
/// ## Case 1: sna < snb
///
/// In this case *sna* precedes *snb* iff (snb - sna) <= semi_int where
/// semi_int is defined as half the sequence number resolution.
/// In other terms, sna precedes snb iff there are less than half
/// the length for the interval that separates them.
///
/// ## Case 2: sna > snb
///
/// In this case *sna* precedes *snb* iff (sna - snb) > semi_int.
///
/// # Arguments
///
/// * `value` - The sequence number which should be checked for precedence relation.
pub(crate) fn roll(&mut self, value: TransportSn) -> ZResult<bool> {
if (value & !self.mask) != 0 {
bail!("The sequence number value must be smaller than the resolution");
}
let gap = value.wrapping_sub(self.value) & self.mask;
if (gap != 0) && ((gap & !(self.mask >> 1)) == 0) {
self.value = value;
return Ok(true);
}
Ok(false)
}

/// Computes the modulo gap between two sequence numbers.
#[cfg(test)] // @TODO: remove #[cfg(test)] once reliability is implemented
pub(crate) fn gap(&self, value: TransportSn) -> ZResult<TransportSn> {
Expand Down
45 changes: 16 additions & 29 deletions io/zenoh-transport/src/unicast/establishment/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ use crate::unicast::shared_memory_unicast::Challenge;
use crate::{
common::batch::BatchConfig,
unicast::{
establishment::{
compute_sn, ext, finalize_transport, AcceptFsm, Cookie, InputFinalize, Zenoh080Cookie,
establishment::{compute_sn, ext, AcceptFsm, Cookie, Zenoh080Cookie},
link::{
LinkUnicastWithOpenAck, TransportLinkUnicast, TransportLinkUnicastConfig,
TransportLinkUnicastDirection,
},
link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection},
TransportConfigUnicast,
},
TransportManager,
Expand Down Expand Up @@ -585,7 +586,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {
}
}

pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) -> ZResult<()> {
pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -> ZResult<()> {
let mtu = link.get_mtu();
let is_streamed = link.is_streamed();
let config = TransportLinkUnicastConfig {
Expand All @@ -597,7 +598,7 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager)
is_compression: false,
},
};
let mut link = TransportLinkUnicast::new(link.clone(), config);
let mut link = TransportLinkUnicast::new(link, config);
let mut fsm = AcceptLink {
link: &mut link,
prng: &manager.prng,
Expand Down Expand Up @@ -718,31 +719,17 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager)
is_compression: state.link.ext_compression.is_compression(),
},
};
let a_link = TransportLinkUnicast::new(link.link.clone(), a_config);
let a_link = link.reconfigure(a_config);
let s_link = format!("{:?}", a_link);
let transport = step!(manager.init_transport_unicast(config, a_link).await);

// Send the open_ack on the link
step!(link
.send(&oack_out.open_ack.into())
.await
.map_err(|e| (e, Some(close::reason::GENERIC))));

// Sync the RX sequence number
let _ = step!(transport
.get_inner()
.map_err(|e| (e, Some(close::reason::INVALID))))
.sync(osyn_out.other_initial_sn)
.await;

// Finalize the transport
let input = InputFinalize {
transport: transport.clone(),
other_lease: osyn_out.other_lease,
};
step!(finalize_transport(&link, manager, input)
.await
.map_err(|e| (e, Some(close::reason::INVALID))));
let a_link = LinkUnicastWithOpenAck::new(a_link, Some(oack_out.open_ack));
let _transport = manager
.init_transport_unicast(
config,
a_link,
osyn_out.other_initial_sn,
osyn_out.other_lease,
)
.await?;

log::debug!(
"New transport link accepted from {} to {}: {}.",
Expand Down
58 changes: 1 addition & 57 deletions io/zenoh-transport/src/unicast/establishment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,17 @@ pub(super) mod cookie;
pub mod ext;
pub(crate) mod open;

use super::{TransportPeer, TransportUnicast};
use crate::{common::seq_num, unicast::link::TransportLinkUnicast, TransportManager};
use crate::common::seq_num;
use async_trait::async_trait;
use cookie::*;
use sha3::{
digest::{ExtendableOutput, Update, XofReader},
Shake128,
};
use std::time::Duration;
use zenoh_link::Link;
use zenoh_protocol::{
core::{Field, Resolution, ZenohId},
transport::TransportSn,
};
use zenoh_result::ZResult;

/*************************************/
/* TRAITS */
Expand Down Expand Up @@ -115,55 +111,3 @@ pub(super) fn compute_sn(zid1: ZenohId, zid2: ZenohId, resolution: Resolution) -
hasher.finalize_xof().read(&mut array);
TransportSn::from_le_bytes(array) & seq_num::get_mask(resolution.get(Field::FrameSN))
}

pub(super) struct InputFinalize {
pub(super) transport: TransportUnicast,
pub(super) other_lease: Duration,
}
// Finalize the transport, notify the callback and start the link tasks
pub(super) async fn finalize_transport(
link: &TransportLinkUnicast,
manager: &TransportManager,
input: self::InputFinalize,
) -> ZResult<()> {
// Retrive the transport's transport
let transport = input.transport.get_inner()?;

// Start the TX loop
let keep_alive = manager.config.unicast.lease / manager.config.unicast.keep_alive as u32;
transport.start_tx(link, &manager.tx_executor, keep_alive)?;

// Assign a callback if the transport is new
// Keep the lock to avoid concurrent new_transport and closing/closed notifications
let a_guard = transport.get_alive().await;
if transport.get_callback().is_none() {
let peer = TransportPeer {
zid: transport.get_zid(),
whatami: transport.get_whatami(),
links: vec![Link::from(link)],
is_qos: transport.is_qos(),
#[cfg(feature = "shared-memory")]
is_shm: transport.is_shm(),
};
// Notify the transport handler that there is a new transport and get back a callback
// NOTE: the read loop of the link the open message was sent on remains blocked
// until new_unicast() returns. The read_loop in the various links
// waits for any eventual transport to associate to.
let callback = manager
.config
.handler
.new_unicast(peer, input.transport.clone())?;
// Set the callback on the transport
transport.set_callback(callback);
}
if let Some(callback) = transport.get_callback() {
// Notify the transport handler there is a new link on this transport
callback.new_link(Link::from(link));
}
drop(a_guard);

// Start the RX loop
transport.start_rx(link, input.other_lease)?;

Ok(())
}
37 changes: 15 additions & 22 deletions io/zenoh-transport/src/unicast/establishment/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ use crate::unicast::shared_memory_unicast::Challenge;
use crate::{
common::batch::BatchConfig,
unicast::{
establishment::{compute_sn, ext, finalize_transport, InputFinalize, OpenFsm},
link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection},
establishment::{compute_sn, ext, OpenFsm},
link::{
LinkUnicastWithOpenAck, TransportLinkUnicast, TransportLinkUnicastConfig,
TransportLinkUnicastDirection,
},
TransportConfigUnicast, TransportUnicast,
},
TransportManager,
Expand Down Expand Up @@ -629,27 +632,17 @@ pub(crate) async fn open_link(
is_compression: state.link.ext_compression.is_compression(),
},
};
let o_link = TransportLinkUnicast::new(link.link.clone(), o_config);
let o_link = link.reconfigure(o_config);
let s_link = format!("{:?}", o_link);
let transport = step!(manager.init_transport_unicast(config, o_link).await);

// Sync the RX sequence number
let _ = step!(transport
.get_inner()
.map_err(|e| (e, Some(close::reason::INVALID))))
.sync(oack_out.other_initial_sn)
.await;

let output = InputFinalize {
transport,
other_lease: oack_out.other_lease,
};
let transport = output.transport.clone();
let res = finalize_transport(&link, manager, output).await;
if let Err(e) = res {
let _ = transport.close().await;
return Err(e);
}
let o_link = LinkUnicastWithOpenAck::new(o_link, None);
let transport = manager
.init_transport_unicast(
config,
o_link,
oack_out.other_initial_sn,
oack_out.other_lease,
)
.await?;

log::debug!(
"New transport link opened from {} to {}: {}.",
Expand Down
Loading

0 comments on commit 68aadaf

Please sign in to comment.