From a332bfd24d65862cb794d0fea0c683a51d5db6ea Mon Sep 17 00:00:00 2001 From: Matilde Morrone Date: Fri, 28 Feb 2025 19:38:11 +0100 Subject: [PATCH] Spawn a single task with branch selection --- src/helper.rs | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/src/helper.rs b/src/helper.rs index 3d9e82a..6011a4e 100644 --- a/src/helper.rs +++ b/src/helper.rs @@ -33,33 +33,32 @@ impl Handle { C: Fn(RawFd) -> CFut + Send + 'static, CFut: Future + Send + 'static, { - let (ask_sx, respond_rx) = { + let (ask_sx, respond_rx, close_sx) = { let (ask_sx, ask_rx) = tokio_mpsc::unbounded_channel::(); - let (respond_sx, respond_rx) = mpsc::channel::(); - - let mut ask_rx = UnboundedReceiverStream::new(ask_rx); - - tokio::spawn(async move { - while let Some(path) = ask_rx.next().await { - let _ = respond_sx.send(open(path).await); - } - }); - - (ask_sx, respond_rx) - }; - - let close_sx = { let (close_sx, close_rx) = tokio_mpsc::unbounded_channel::(); + let (respond_sx, respond_rx) = mpsc::channel::(); + let mut close_rx = UnboundedReceiverStream::new(close_rx); + let mut ask_rx = UnboundedReceiverStream::new(ask_rx); tokio::spawn(async move { - while let Some(fd) = close_rx.next().await { - close(fd).await; + loop { + tokio::select! { + Some(path) = ask_rx.next() => { + if respond_sx.send(open(path).await).is_err() { + break; + } + } + Some(fd) = close_rx.next() => { + close(fd).await; + } + else => break + } } }); - close_sx + (ask_sx, respond_rx, close_sx) }; let (rx, signal_sender) = spawn_libinput_task(seat_name, ask_sx, close_sx, respond_rx)?;