From 780ec606ba4000a17b2da0e2b0b668e11d37c65e Mon Sep 17 00:00:00 2001 From: Dmitrii Bannov <104833606+yellowhatter@users.noreply.github.com> Date: Fri, 15 Dec 2023 12:01:37 +0300 Subject: [PATCH] Use TX executor for multicast TX (#626) --- io/zenoh-transport/src/multicast/link.rs | 7 +++++-- io/zenoh-transport/src/multicast/transport.rs | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index 8e1d17fefe..21ed0b3fdf 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -23,7 +23,9 @@ use crate::{ priority::TransportPriorityTx, }, multicast::transport::TransportMulticastInner, + TransportExecutor, }; +use async_executor::Task; use async_std::{ prelude::FutureExt, task::{self, JoinHandle}, @@ -269,7 +271,7 @@ pub(super) struct TransportLinkMulticastUniversal { // The transport this link is associated to transport: TransportMulticastInner, // The signals to stop TX/RX tasks - handle_tx: Option>>, + handle_tx: Option>>, signal_rx: Signal, handle_rx: Option>>, } @@ -295,6 +297,7 @@ impl TransportLinkMulticastUniversal { &mut self, config: TransportLinkMulticastConfigUniversal, priority_tx: Arc<[TransportPriorityTx]>, + executor: &TransportExecutor, ) { let initial_sns: Vec = priority_tx .iter() @@ -331,7 +334,7 @@ impl TransportLinkMulticastUniversal { // Spawn the TX task let c_link = self.link.clone(); let ctransport = self.transport.clone(); - let handle = task::spawn(async move { + let handle = executor.spawn(async move { let res = tx_task( consumer, c_link.tx(), diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index ca6cddaf2b..d5a1da14d4 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -257,7 +257,7 @@ impl TransportMulticastInner { sn_resolution: self.manager.config.resolution.get(Field::FrameSN), batch_size, }; - l.start_tx(config, self.priority_tx.clone()); + l.start_tx(config, self.priority_tx.clone(), &self.manager.tx_executor); Ok(()) } None => {