From 10b73613b2e47da86573939f2d5ba27109b9fc30 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Tue, 17 Dec 2024 15:26:08 +0100 Subject: [PATCH] Drop sooner mutex guards during open/accept --- .../src/unicast/establishment/accept.rs | 80 ++++++++++--------- .../src/unicast/establishment/open.rs | 61 +++++++------- 2 files changed, 74 insertions(+), 67 deletions(-) diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index 386b9f832..bc29a884f 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -356,40 +356,44 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { .map_err(|e| (e, Some(close::reason::GENERIC)))?; // Create the cookie - let cookie_nonce: u64 = zasynclock!(self.prng).gen(); - let cookie = Cookie { - zid: input.other_zid, - whatami: input.other_whatami, - resolution: state.transport.resolution, - batch_size: state.transport.batch_size, - nonce: cookie_nonce, - ext_qos: state.transport.ext_qos, - #[cfg(feature = "transport_multilink")] - ext_mlink: state.transport.ext_mlink, - #[cfg(feature = "shared-memory")] - ext_shm: state.transport.ext_shm, - #[cfg(feature = "transport_auth")] - ext_auth: state.link.ext_auth, - ext_lowlatency: state.transport.ext_lowlatency, - #[cfg(feature = "transport_compression")] - ext_compression: state.link.ext_compression, - ext_patch: state.transport.ext_patch, - }; + let (cookie, cookie_nonce): (ZSlice, u64) = { + let mut prng = zasynclock!(self.prng); + + let nonce: u64 = prng.gen(); + let cookie = Cookie { + zid: input.other_zid, + whatami: input.other_whatami, + resolution: state.transport.resolution, + batch_size: state.transport.batch_size, + nonce, + ext_qos: state.transport.ext_qos, + #[cfg(feature = "transport_multilink")] + ext_mlink: state.transport.ext_mlink, + #[cfg(feature = "shared-memory")] + ext_shm: state.transport.ext_shm, + #[cfg(feature = "transport_auth")] + ext_auth: state.link.ext_auth, + ext_lowlatency: state.transport.ext_lowlatency, + #[cfg(feature = "transport_compression")] + ext_compression: state.link.ext_compression, + ext_patch: state.transport.ext_patch, + }; - let mut encrypted = vec![]; - let mut writer = encrypted.writer(); - let mut codec = Zenoh080Cookie { - prng: &mut *zasynclock!(self.prng), - cipher: self.cipher, - codec: Zenoh080::new(), + let mut encrypted = vec![]; + let mut writer = encrypted.writer(); + let mut codec = Zenoh080Cookie { + prng: &mut prng, + cipher: self.cipher, + codec: Zenoh080::new(), + }; + codec.write(&mut writer, &cookie).map_err(|_| { + ( + zerror!("Encoding cookie failed").into(), + Some(close::reason::INVALID), + ) + })?; + (encrypted.into(), nonce) }; - codec.write(&mut writer, &cookie).map_err(|_| { - ( - zerror!("Encoding cookie failed").into(), - Some(close::reason::INVALID), - ) - })?; - let cookie: ZSlice = encrypted.into(); // Send the message on the link let msg: TransportMessage = InitAck { @@ -477,8 +481,9 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { // Decrypt the cookie with the cipher let cookie: Cookie = { + let mut prng = zasynclock!(self.prng); let mut codec = Zenoh080Cookie { - prng: &mut *zasynclock!(self.prng), + prng: &mut prng, cipher: self.cipher, codec: Zenoh080::new(), }; @@ -722,6 +727,9 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - let batch_size = manager.config.batch_size.min(batch_size::UNICAST).min(mtu); let iack_out = { + #[cfg(feature = "transport_auth")] + let mut prng = zasynclock!(manager.prng); + let mut state = State { transport: StateTransport { batch_size, @@ -743,11 +751,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] link: StateLink { #[cfg(feature = "transport_auth")] - ext_auth: manager - .state - .unicast - .authenticator - .accept(&mut *zasynclock!(manager.prng)), + ext_auth: manager.state.unicast.authenticator.accept(&mut *prng), #[cfg(feature = "transport_compression")] ext_compression: ext::compression::StateAccept::new( manager.config.unicast.is_compression, diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index a4283e364..69d71ccfd 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -603,35 +603,38 @@ pub(crate) async fn open_link( .min(link.config.batch.mtu) .min(batch_size::UNICAST); - let mut state = State { - transport: StateTransport { - batch_size, - resolution: manager.config.resolution, - ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos, &endpoint)?, - #[cfg(feature = "transport_multilink")] - ext_mlink: manager - .state - .unicast - .multilink - .open(manager.config.unicast.max_links > 1), - #[cfg(feature = "shared-memory")] - ext_shm: ext::shm::StateOpen::new(), - ext_lowlatency: ext::lowlatency::StateOpen::new(manager.config.unicast.is_lowlatency), - ext_patch: ext::patch::StateOpen::new(), - }, - #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] - link: StateLink { - #[cfg(feature = "transport_auth")] - ext_auth: manager - .state - .unicast - .authenticator - .open(&mut *zasynclock!(manager.prng)), - #[cfg(feature = "transport_compression")] - ext_compression: ext::compression::StateOpen::new( - manager.config.unicast.is_compression, - ), - }, + let mut state = { + #[cfg(feature = "transport_auth")] + let mut prng = zasynclock!(manager.prng); + + State { + transport: StateTransport { + batch_size, + resolution: manager.config.resolution, + ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos, &endpoint)?, + #[cfg(feature = "transport_multilink")] + ext_mlink: manager + .state + .unicast + .multilink + .open(manager.config.unicast.max_links > 1), + #[cfg(feature = "shared-memory")] + ext_shm: ext::shm::StateOpen::new(), + ext_lowlatency: ext::lowlatency::StateOpen::new( + manager.config.unicast.is_lowlatency, + ), + ext_patch: ext::patch::StateOpen::new(), + }, + #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] + link: StateLink { + #[cfg(feature = "transport_auth")] + ext_auth: manager.state.unicast.authenticator.open(&mut *prng), + #[cfg(feature = "transport_compression")] + ext_compression: ext::compression::StateOpen::new( + manager.config.unicast.is_compression, + ), + }, + } }; // Init handshake