Skip to content

Commit

Permalink
Mutable recv for unicast link
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Nov 7, 2023
1 parent d0dbd94 commit 3d5ab84
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 53 deletions.
10 changes: 5 additions & 5 deletions io/zenoh-transport/src/unicast/establishment/ext/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,13 @@ macro_rules! ztake {
/* OPEN */
/*************************************/
#[async_trait]
impl<'a> OpenFsm for AuthFsm<'a> {
impl<'a> OpenFsm for &'a AuthFsm<'a> {
type Error = ZError;

type SendInitSynIn = &'a StateOpen;
type SendInitSynOut = Option<init::ext::Auth>;
async fn send_init_syn(
&self,
self,
state: Self::SendInitSynIn,
) -> Result<Self::SendInitSynOut, Self::Error> {
const S: &str = "Auth extension - Send InitSyn.";
Expand Down Expand Up @@ -341,7 +341,7 @@ impl<'a> OpenFsm for AuthFsm<'a> {
type RecvInitAckIn = (&'a mut StateOpen, Option<init::ext::Auth>);
type RecvInitAckOut = ();
async fn recv_init_ack(
&self,
self,
input: Self::RecvInitAckIn,
) -> Result<Self::RecvInitAckOut, Self::Error> {
const S: &str = "Auth extension - Recv InitAck.";
Expand Down Expand Up @@ -385,7 +385,7 @@ impl<'a> OpenFsm for AuthFsm<'a> {
type SendOpenSynIn = &'a StateOpen;
type SendOpenSynOut = Option<open::ext::Auth>;
async fn send_open_syn(
&self,
self,
state: Self::SendOpenSynIn,
) -> Result<Self::SendOpenSynOut, Self::Error> {
const S: &str = "Auth extension - Send OpenSyn.";
Expand Down Expand Up @@ -432,7 +432,7 @@ impl<'a> OpenFsm for AuthFsm<'a> {
type RecvOpenAckIn = (&'a mut StateOpen, Option<open::ext::Auth>);
type RecvOpenAckOut = ();
async fn recv_open_ack(
&self,
self,
input: Self::RecvOpenAckIn,
) -> Result<Self::RecvOpenAckOut, Self::Error> {
const S: &str = "Auth extension - Recv OpenAck.";
Expand Down
10 changes: 5 additions & 5 deletions io/zenoh-transport/src/unicast/establishment/ext/auth/pubkey.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,13 @@ impl StateOpen {
}

#[async_trait]
impl<'a> OpenFsm for AuthPubKeyFsm<'a> {
impl<'a> OpenFsm for &'a AuthPubKeyFsm<'a> {
type Error = ZError;

type SendInitSynIn = &'a StateOpen;
type SendInitSynOut = Option<ext::InitSyn>;
async fn send_init_syn(
&self,
self,
_input: Self::SendInitSynIn,
) -> Result<Self::SendInitSynOut, Self::Error> {
const S: &str = "PubKey extension - Send InitSyn.";
Expand All @@ -392,7 +392,7 @@ impl<'a> OpenFsm for AuthPubKeyFsm<'a> {
type RecvInitAckIn = (&'a mut StateOpen, Option<ext::InitAck>);
type RecvInitAckOut = ();
async fn recv_init_ack(
&self,
self,
input: Self::RecvInitAckIn,
) -> Result<Self::RecvInitAckOut, Self::Error> {
const S: &str = "PubKey extension - Recv InitAck.";
Expand Down Expand Up @@ -438,7 +438,7 @@ impl<'a> OpenFsm for AuthPubKeyFsm<'a> {
type SendOpenSynIn = &'a StateOpen;
type SendOpenSynOut = Option<ext::OpenSyn>;
async fn send_open_syn(
&self,
self,
state: Self::SendOpenSynIn,
) -> Result<Self::SendOpenSynOut, Self::Error> {
const S: &str = "PubKey extension - Send OpenSyn.";
Expand All @@ -461,7 +461,7 @@ impl<'a> OpenFsm for AuthPubKeyFsm<'a> {
type RecvOpenAckIn = (&'a mut StateOpen, Option<ext::OpenAck>);
type RecvOpenAckOut = ();
async fn recv_open_ack(
&self,
self,
input: Self::RecvOpenAckIn,
) -> Result<Self::RecvOpenAckOut, Self::Error> {
const S: &str = "PubKey extension - Recv OpenAck.";
Expand Down
10 changes: 5 additions & 5 deletions io/zenoh-transport/src/unicast/establishment/ext/auth/usrpwd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,13 @@ where
/// ZExtUnit
#[async_trait]
impl<'a> OpenFsm for AuthUsrPwdFsm<'a> {
impl<'a> OpenFsm for &'a AuthUsrPwdFsm<'a> {
type Error = ZError;

type SendInitSynIn = &'a StateOpen;
type SendInitSynOut = Option<ext::InitSyn>;
async fn send_init_syn(
&self,
self,
_input: Self::SendInitSynIn,
) -> Result<Self::SendInitSynOut, Self::Error> {
let output = zasyncread!(self.inner)
Expand All @@ -295,7 +295,7 @@ impl<'a> OpenFsm for AuthUsrPwdFsm<'a> {
type RecvInitAckIn = (&'a mut StateOpen, Option<ext::InitAck>);
type RecvInitAckOut = ();
async fn recv_init_ack(
&self,
self,
input: Self::RecvInitAckIn,
) -> Result<Self::RecvInitAckOut, Self::Error> {
const S: &str = "UsrPwd extension - Recv InitSyn.";
Expand All @@ -316,7 +316,7 @@ impl<'a> OpenFsm for AuthUsrPwdFsm<'a> {
type SendOpenSynIn = &'a StateOpen;
type SendOpenSynOut = Option<ext::OpenSyn>;
async fn send_open_syn(
&self,
self,
state: Self::SendOpenSynIn,
) -> Result<Self::SendOpenSynOut, Self::Error> {
const S: &str = "UsrPwd extension - Send OpenSyn.";
Expand Down Expand Up @@ -352,7 +352,7 @@ impl<'a> OpenFsm for AuthUsrPwdFsm<'a> {
type RecvOpenAckIn = (&'a mut StateOpen, Option<ext::OpenAck>);
type RecvOpenAckOut = ();
async fn recv_open_ack(
&self,
self,
input: Self::RecvOpenAckIn,
) -> Result<Self::RecvOpenAckOut, Self::Error> {
const S: &str = "UsrPwd extension - Recv OpenAck.";
Expand Down
10 changes: 5 additions & 5 deletions io/zenoh-transport/src/unicast/establishment/ext/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ impl StateOpen {
}

#[async_trait]
impl<'a> OpenFsm for CompressionFsm<'a> {
impl<'a> OpenFsm for &'a CompressionFsm<'a> {
type Error = ZError;

type SendInitSynIn = &'a StateOpen;
type SendInitSynOut = Option<init::ext::Compression>;
async fn send_init_syn(
&self,
self,
state: Self::SendInitSynIn,
) -> Result<Self::SendInitSynOut, Self::Error> {
let output = state
Expand All @@ -70,7 +70,7 @@ impl<'a> OpenFsm for CompressionFsm<'a> {
type RecvInitAckIn = (&'a mut StateOpen, Option<init::ext::Compression>);
type RecvInitAckOut = ();
async fn recv_init_ack(
&self,
self,
input: Self::RecvInitAckIn,
) -> Result<Self::RecvInitAckOut, Self::Error> {
let (state, other_ext) = input;
Expand All @@ -81,7 +81,7 @@ impl<'a> OpenFsm for CompressionFsm<'a> {
type SendOpenSynIn = &'a StateOpen;
type SendOpenSynOut = Option<open::ext::Compression>;
async fn send_open_syn(
&self,
self,
_state: Self::SendOpenSynIn,
) -> Result<Self::SendOpenSynOut, Self::Error> {
Ok(None)
Expand All @@ -90,7 +90,7 @@ impl<'a> OpenFsm for CompressionFsm<'a> {
type RecvOpenAckIn = (&'a mut StateOpen, Option<open::ext::Compression>);
type RecvOpenAckOut = ();
async fn recv_open_ack(
&self,
self,
_state: Self::RecvOpenAckIn,
) -> Result<Self::RecvOpenAckOut, Self::Error> {
Ok(())
Expand Down
10 changes: 5 additions & 5 deletions io/zenoh-transport/src/unicast/establishment/ext/lowlatency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ impl StateOpen {
}

#[async_trait]
impl<'a> OpenFsm for LowLatencyFsm<'a> {
impl<'a> OpenFsm for &'a LowLatencyFsm<'a> {
type Error = ZError;

type SendInitSynIn = &'a StateOpen;
type SendInitSynOut = Option<init::ext::LowLatency>;
async fn send_init_syn(
&self,
self,
state: Self::SendInitSynIn,
) -> Result<Self::SendInitSynOut, Self::Error> {
let output = state.is_lowlatency.then_some(init::ext::LowLatency::new());
Expand All @@ -68,7 +68,7 @@ impl<'a> OpenFsm for LowLatencyFsm<'a> {
type RecvInitAckIn = (&'a mut StateOpen, Option<init::ext::LowLatency>);
type RecvInitAckOut = ();
async fn recv_init_ack(
&self,
self,
input: Self::RecvInitAckIn,
) -> Result<Self::RecvInitAckOut, Self::Error> {
let (state, other_ext) = input;
Expand All @@ -79,7 +79,7 @@ impl<'a> OpenFsm for LowLatencyFsm<'a> {
type SendOpenSynIn = &'a StateOpen;
type SendOpenSynOut = Option<open::ext::LowLatency>;
async fn send_open_syn(
&self,
self,
_state: Self::SendOpenSynIn,
) -> Result<Self::SendOpenSynOut, Self::Error> {
Ok(None)
Expand All @@ -88,7 +88,7 @@ impl<'a> OpenFsm for LowLatencyFsm<'a> {
type RecvOpenAckIn = (&'a mut StateOpen, Option<open::ext::LowLatency>);
type RecvOpenAckOut = ();
async fn recv_open_ack(
&self,
self,
_state: Self::RecvOpenAckIn,
) -> Result<Self::RecvOpenAckOut, Self::Error> {
Ok(())
Expand Down
10 changes: 5 additions & 5 deletions io/zenoh-transport/src/unicast/establishment/ext/multilink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ impl StateOpen {
}

#[async_trait]
impl<'a> OpenFsm for MultiLinkFsm<'a> {
impl<'a> OpenFsm for &'a MultiLinkFsm<'a> {
type Error = ZError;

type SendInitSynIn = &'a StateOpen;
type SendInitSynOut = Option<init::ext::MultiLink>;
async fn send_init_syn(
&self,
self,
input: Self::SendInitSynIn,
) -> Result<Self::SendInitSynOut, Self::Error> {
let pubkey = match input.pubkey.as_ref() {
Expand All @@ -117,7 +117,7 @@ impl<'a> OpenFsm for MultiLinkFsm<'a> {
type RecvInitAckIn = (&'a mut StateOpen, Option<init::ext::MultiLink>);
type RecvInitAckOut = ();
async fn recv_init_ack(
&self,
self,
input: Self::RecvInitAckIn,
) -> Result<Self::RecvInitAckOut, Self::Error> {
const S: &str = "MultiLink extension - Recv InitAck.";
Expand Down Expand Up @@ -152,7 +152,7 @@ impl<'a> OpenFsm for MultiLinkFsm<'a> {
type SendOpenSynIn = &'a StateOpen;
type SendOpenSynOut = Option<open::ext::MultiLinkSyn>;
async fn send_open_syn(
&self,
self,
input: Self::SendOpenSynIn,
) -> Result<Self::SendOpenSynOut, Self::Error> {
let pubkey = match input.pubkey.as_ref() {
Expand All @@ -171,7 +171,7 @@ impl<'a> OpenFsm for MultiLinkFsm<'a> {
type RecvOpenAckIn = (&'a mut StateOpen, Option<open::ext::MultiLinkAck>);
type RecvOpenAckOut = ();
async fn recv_open_ack(
&self,
self,
input: Self::RecvOpenAckIn,
) -> Result<Self::RecvOpenAckOut, Self::Error> {
let (state, mut ext) = input;
Expand Down
10 changes: 5 additions & 5 deletions io/zenoh-transport/src/unicast/establishment/ext/qos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ impl StateOpen {
}

#[async_trait]
impl<'a> OpenFsm for QoSFsm<'a> {
impl<'a> OpenFsm for &'a QoSFsm<'a> {
type Error = ZError;

type SendInitSynIn = &'a StateOpen;
type SendInitSynOut = Option<init::ext::QoS>;
async fn send_init_syn(
&self,
self,
state: Self::SendInitSynIn,
) -> Result<Self::SendInitSynOut, Self::Error> {
let output = state.is_qos.then_some(init::ext::QoS::new());
Expand All @@ -68,7 +68,7 @@ impl<'a> OpenFsm for QoSFsm<'a> {
type RecvInitAckIn = (&'a mut StateOpen, Option<init::ext::QoS>);
type RecvInitAckOut = ();
async fn recv_init_ack(
&self,
self,
input: Self::RecvInitAckIn,
) -> Result<Self::RecvInitAckOut, Self::Error> {
let (state, other_ext) = input;
Expand All @@ -79,7 +79,7 @@ impl<'a> OpenFsm for QoSFsm<'a> {
type SendOpenSynIn = &'a StateOpen;
type SendOpenSynOut = Option<open::ext::QoS>;
async fn send_open_syn(
&self,
self,
_state: Self::SendOpenSynIn,
) -> Result<Self::SendOpenSynOut, Self::Error> {
Ok(None)
Expand All @@ -88,7 +88,7 @@ impl<'a> OpenFsm for QoSFsm<'a> {
type RecvOpenAckIn = (&'a mut StateOpen, Option<open::ext::QoS>);
type RecvOpenAckOut = ();
async fn recv_open_ack(
&self,
self,
_state: Self::RecvOpenAckIn,
) -> Result<Self::RecvOpenAckOut, Self::Error> {
Ok(())
Expand Down
8 changes: 4 additions & 4 deletions io/zenoh-transport/src/unicast/establishment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,28 @@ pub trait OpenFsm {
type SendInitSynIn;
type SendInitSynOut;
async fn send_init_syn(
&self,
self,
input: Self::SendInitSynIn,
) -> Result<Self::SendInitSynOut, Self::Error>;

type RecvInitAckIn;
type RecvInitAckOut;
async fn recv_init_ack(
&self,
self,
input: Self::RecvInitAckIn,
) -> Result<Self::RecvInitAckOut, Self::Error>;

type SendOpenSynIn;
type SendOpenSynOut;
async fn send_open_syn(
&self,
self,
input: Self::SendOpenSynIn,
) -> Result<Self::SendOpenSynOut, Self::Error>;

type RecvOpenAckIn;
type RecvOpenAckOut;
async fn recv_open_ack(
&self,
self,
input: Self::RecvOpenAckIn,
) -> Result<Self::RecvOpenAckOut, Self::Error>;
}
Expand Down
12 changes: 6 additions & 6 deletions io/zenoh-transport/src/unicast/establishment/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ struct OpenLink<'a> {
}

#[async_trait]
impl<'a> OpenFsm for OpenLink<'a> {
impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> {
type Error = OpenError;

type SendInitSynIn = (&'a mut TransportLinkUnicast, &'a mut State, SendInitSynIn);
type SendInitSynOut = ();
async fn send_init_syn(
&self,
self,
input: Self::SendInitSynIn,
) -> Result<Self::SendInitSynOut, Self::Error> {
let (link, state, input) = input;
Expand Down Expand Up @@ -204,7 +204,7 @@ impl<'a> OpenFsm for OpenLink<'a> {
type RecvInitAckIn = (&'a mut TransportLinkUnicast, &'a mut State);
type RecvInitAckOut = RecvInitAckOut;
async fn recv_init_ack(
&self,
self,
input: Self::RecvInitAckIn,
) -> Result<Self::RecvInitAckOut, Self::Error> {
let (link, state) = input;
Expand Down Expand Up @@ -335,7 +335,7 @@ impl<'a> OpenFsm for OpenLink<'a> {
type SendOpenSynIn = (&'a mut TransportLinkUnicast, &'a mut State, SendOpenSynIn);
type SendOpenSynOut = SendOpenSynOut;
async fn send_open_syn(
&self,
self,
input: Self::SendOpenSynIn,
) -> Result<Self::SendOpenSynOut, Self::Error> {
let (link, state, input) = input;
Expand Down Expand Up @@ -422,7 +422,7 @@ impl<'a> OpenFsm for OpenLink<'a> {
type RecvOpenAckIn = (&'a mut TransportLinkUnicast, &'a mut State);
type RecvOpenAckOut = RecvOpenAckOut;
async fn recv_open_ack(
&self,
self,
input: Self::RecvOpenAckIn,
) -> Result<Self::RecvOpenAckOut, Self::Error> {
let (link, state) = input;
Expand Down Expand Up @@ -516,7 +516,7 @@ pub(crate) async fn open_link(
is_compression: false, // Perform the exchange Init/Open exchange with no compression
};
let mut link = TransportLinkUnicast::new(link, config);
let fsm = OpenLink {
let mut fsm = OpenLink {
ext_qos: ext::qos::QoSFsm::new(),
#[cfg(feature = "transport_multilink")]
ext_mlink: manager.state.unicast.multilink.fsm(&manager.prng),
Expand Down
Loading

0 comments on commit 3d5ab84

Please sign in to comment.