From b9eba4ac631c0046e8035613a164168e5f178ca8 Mon Sep 17 00:00:00 2001 From: Bas Zalmstra Date: Thu, 16 Jan 2025 16:20:04 +0100 Subject: [PATCH] fix: shell hang on progress bar --- Cargo.lock | 2 +- crates/pixi_progress/Cargo.toml | 2 +- crates/pixi_progress/src/lib.rs | 144 +++++++++++++++----------------- src/install_pypi/mod.rs | 2 - src/uv_reporter.rs | 32 +++---- 5 files changed, 80 insertions(+), 102 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7ccfe29bd..59586e6db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3949,7 +3949,7 @@ name = "pixi_progress" version = "0.1.0" dependencies = [ "indicatif", - "tokio", + "parking_lot 0.12.3", ] [[package]] diff --git a/crates/pixi_progress/Cargo.toml b/crates/pixi_progress/Cargo.toml index 71a62a8d2..3a9a54501 100644 --- a/crates/pixi_progress/Cargo.toml +++ b/crates/pixi_progress/Cargo.toml @@ -11,4 +11,4 @@ version = "0.1.0" [dependencies] indicatif = { workspace = true } -tokio = { workspace = true, features = ["sync", "rt"] } +parking_lot = { workspace = true } diff --git a/crates/pixi_progress/src/lib.rs b/crates/pixi_progress/src/lib.rs index 98e2e1148..b6ee34b1d 100644 --- a/crates/pixi_progress/src/lib.rs +++ b/crates/pixi_progress/src/lib.rs @@ -1,11 +1,10 @@ use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressState}; +use parking_lot::Mutex; use std::borrow::Cow; -use std::collections::VecDeque; use std::fmt::Write; use std::future::Future; -use std::sync::LazyLock; +use std::sync::{Arc, LazyLock}; use std::time::Duration; -use tokio::sync::mpsc::{channel, Sender}; /// Returns a global instance of [`indicatif::MultiProgress`]. /// @@ -86,116 +85,107 @@ pub async fn await_in_progress Fut, Fut: Future, + state: Arc>, +} + +/// Internal state kept by the [`ProgressBarMessageFormatter`] and derived state. +/// +/// This contains the state of the formatter and allows updating the progress bar. +#[derive(Debug)] +struct State { pb: ProgressBar, + pending: Vec, } +impl State { + /// Notify the state that a certain operation happened. + fn notify(&mut self, msg: Operation) { + match msg { + Operation::Started(op) => self.pending.push(op), + Operation::Finished(op) => { + let Some(idx) = self.pending.iter().position(|p| p == &op) else { + panic!("operation {op} was never started"); + }; + self.pending.remove(idx); + } + } + + if self.pending.is_empty() { + self.pb.set_message(""); + } else if self.pending.len() == 1 { + self.pb.set_message(self.pending[0].clone()); + } else { + self.pb.set_message(format!( + "{} (+{})", + self.pending.last().unwrap(), + self.pending.len() - 1 + )); + } + } +} + +#[derive(Debug)] enum Operation { Started(String), Finished(String), } pub struct ScopedTask { + state: Option>>, name: String, - sender: Option>, - pb: ProgressBar, } impl ScopedTask { - /// Finishes the execution of the task. - pub async fn finish(mut self) -> ProgressBar { - // Send the finished operation. If this fails the receiving end was most likely already - // closed and we can just ignore the error. - if let Some(sender) = self.sender.take() { - let _ = sender - .send(Operation::Finished(std::mem::take(&mut self.name))) - .await; + fn start(name: String, state: Arc>) -> Self { + state.lock().notify(Operation::Started(name.clone())); + Self { + state: Some(state), + name, } - self.pb.clone() } /// Finishes the execution of the task. - pub fn finish_sync(mut self) -> ProgressBar { - // Send the finished operation. If this fails the receiving end was most likely already - // closed and we can just ignore the error. - if let Some(sender) = self.sender.take() { - let _ = sender.try_send(Operation::Finished(std::mem::take(&mut self.name))); + pub fn finish(self) { + drop(self) + } +} + +impl Drop for ScopedTask { + fn drop(&mut self) { + if let Some(state) = self.state.take() { + state + .lock() + .notify(Operation::Finished(std::mem::take(&mut self.name))); } - self.pb.clone() } } impl ProgressBarMessageFormatter { /// Allows the user to specify a custom capacity for the internal channel. - pub fn new_with_capacity(progress_bar: ProgressBar, capacity: usize) -> Self { - let pb = progress_bar.clone(); - let (tx, mut rx) = channel::(capacity); - tokio::spawn(async move { - let mut pending = VecDeque::with_capacity(capacity); - while let Some(msg) = rx.recv().await { - match msg { - Operation::Started(op) => pending.push_back(op), - Operation::Finished(op) => { - let Some(idx) = pending.iter().position(|p| p == &op) else { - panic!("operation {op} was never started"); - }; - pending.remove(idx); - } - } - - if pending.is_empty() { - progress_bar.set_message(""); - } else if pending.len() == 1 { - progress_bar.set_message(pending[0].clone()); - } else { - progress_bar.set_message(format!( - "{} (+{})", - pending.back().unwrap(), - pending.len() - 1 - )); - } - } - }); - Self { sender: tx, pb } - } - - /// Adds the start of another task to the progress bar and returns an object that is used to - /// mark the lifetime of the task. If the object is dropped the task is considered finished. - #[must_use] - pub async fn start(&self, op: String) -> ScopedTask { - self.sender - .send(Operation::Started(op.clone())) - .await - .unwrap(); - ScopedTask { - name: op, - sender: Some(self.sender.clone()), - pb: self.pb.clone(), + pub fn new(pb: ProgressBar) -> Self { + Self { + state: Arc::new(Mutex::new(State { + pb, + pending: Vec::new(), + })), } } /// Adds the start of another task to the progress bar and returns an object that is used to /// mark the lifetime of the task. If the object is dropped the task is considered finished. #[must_use] - pub fn start_sync(&self, op: String) -> ScopedTask { - self.sender - .try_send(Operation::Started(op.clone())) - .expect("could not send operation, channel full or closed"); - ScopedTask { - name: op, - sender: Some(self.sender.clone()), - pb: self.pb.clone(), - } + pub fn start(&self, op: String) -> ScopedTask { + ScopedTask::start(op, self.state.clone()) } /// Wraps an future into a task which starts when the task starts and ends when the future /// returns. pub async fn wrap>(&self, name: impl Into, fut: F) -> T { - let task = self.start(name.into()).await; + let task = self.start(name.into()); let result = fut.await; - task.finish().await; + task.finish(); result } } diff --git a/src/install_pypi/mod.rs b/src/install_pypi/mod.rs index a556e6280..d17d773d7 100644 --- a/src/install_pypi/mod.rs +++ b/src/install_pypi/mod.rs @@ -304,7 +304,6 @@ pub async fn update_python_distributions( let options = UvReporterOptions::new() .with_length(remote.len() as u64) - .with_capacity(remote.len() + 30) .with_starting_tasks(remote.iter().map(|(d, _)| format!("{}", d.name()))) .with_top_level_message("Preparing distributions"); @@ -459,7 +458,6 @@ pub async fn update_python_distributions( let options = UvReporterOptions::new() .with_length(all_dists.len() as u64) - .with_capacity(all_dists.len() + 30) .with_starting_tasks(all_dists.iter().map(|d| format!("{}", d.name()))) .with_top_level_message("Installing distributions"); diff --git a/src/uv_reporter.rs b/src/uv_reporter.rs index 06d03d5a6..ef955f107 100644 --- a/src/uv_reporter.rs +++ b/src/uv_reporter.rs @@ -21,7 +21,6 @@ pub struct UvReporterOptions { top_level_message: &'static str, progress_bar: Option, starting_tasks: Vec, - capacity: Option, } impl UvReporterOptions { @@ -31,7 +30,6 @@ impl UvReporterOptions { top_level_message: "", progress_bar: None, starting_tasks: Vec::new(), - capacity: None, } } @@ -50,11 +48,6 @@ impl UvReporterOptions { self } - pub(crate) fn with_capacity(mut self, capacity: usize) -> Self { - self.capacity = Some(capacity); - self - } - pub(crate) fn with_starting_tasks(mut self, tasks: impl Iterator) -> Self { self.starting_tasks = tasks.collect_vec(); self @@ -84,16 +77,13 @@ impl UvReporter { }; // Create the formatter - let fmt = ProgressBarMessageFormatter::new_with_capacity( - pb.clone(), - options.capacity.unwrap_or(20), - ); + let fmt = ProgressBarMessageFormatter::new(pb.clone()); let mut name_to_id = std::collections::HashMap::new(); let mut starting_tasks = vec![]; // Add the starting tasks for task in options.starting_tasks { - let scoped_task = fmt.start_sync(task.clone()); + let scoped_task = fmt.start(task.clone()); starting_tasks.push(Some(scoped_task)); name_to_id.insert(task, starting_tasks.len() - 1); } @@ -110,8 +100,8 @@ impl UvReporter { self.scoped_tasks.lock().expect("progress lock poison") } - pub(crate) fn start_sync(&self, message: String) -> usize { - let task = self.fmt.start_sync(message); + pub(crate) fn start(&self, message: String) -> usize { + let task = self.fmt.start(message); let mut lock = self.lock(); lock.push(Some(task)); lock.len() - 1 @@ -125,7 +115,7 @@ impl UvReporter { .unwrap_or_else(|| panic!("progress bar error idx ({id}) > {len}")) .take(); if let Some(task) = task { - task.finish_sync(); + task.finish(); } } @@ -151,7 +141,7 @@ impl uv_installer::PrepareReporter for UvReporter { } fn on_build_start(&self, dist: &BuildableSource) -> usize { - self.start_sync(format!("building {}", dist)) + self.start(format!("building {}", dist)) } fn on_build_complete(&self, _dist: &BuildableSource, id: usize) { @@ -159,7 +149,7 @@ impl uv_installer::PrepareReporter for UvReporter { } fn on_checkout_start(&self, url: &url::Url, _rev: &str) -> usize { - self.start_sync(format!("cloning {}", url)) + self.start(format!("cloning {}", url)) } fn on_checkout_complete(&self, _url: &url::Url, _rev: &str, index: usize) { @@ -196,7 +186,7 @@ impl uv_resolver::ResolverReporter for UvReporter { } fn on_build_start(&self, dist: &BuildableSource) -> usize { - self.start_sync(format!("building {}", dist,)) + self.start(format!("building {}", dist,)) } fn on_build_complete(&self, _dist: &BuildableSource, id: usize) { @@ -204,7 +194,7 @@ impl uv_resolver::ResolverReporter for UvReporter { } fn on_checkout_start(&self, url: &url::Url, _rev: &str) -> usize { - self.start_sync(format!("cloning {}", url)) + self.start(format!("cloning {}", url)) } fn on_checkout_complete(&self, _url: &url::Url, _rev: &str, index: usize) { @@ -227,7 +217,7 @@ impl uv_resolver::ResolverReporter for UvReporter { impl uv_distribution::Reporter for UvReporter { fn on_build_start(&self, dist: &BuildableSource) -> usize { - self.start_sync(format!("building {}", dist,)) + self.start(format!("building {}", dist,)) } fn on_build_complete(&self, _dist: &BuildableSource, id: usize) { @@ -235,7 +225,7 @@ impl uv_distribution::Reporter for UvReporter { } fn on_checkout_start(&self, url: &url::Url, _rev: &str) -> usize { - self.start_sync(format!("cloning {}", url)) + self.start(format!("cloning {}", url)) } fn on_checkout_complete(&self, _url: &url::Url, _rev: &str, index: usize) {