Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: shell hang on progress bar #2929

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/pixi_progress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ version = "0.1.0"

[dependencies]
indicatif = { workspace = true }
tokio = { workspace = true, features = ["sync", "rt"] }
parking_lot = { workspace = true }
144 changes: 67 additions & 77 deletions crates/pixi_progress/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressState};
use parking_lot::Mutex;
use std::borrow::Cow;
use std::collections::VecDeque;
use std::fmt::Write;
use std::future::Future;
use std::sync::LazyLock;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use tokio::sync::mpsc::{channel, Sender};

/// Returns a global instance of [`indicatif::MultiProgress`].
///
Expand Down Expand Up @@ -86,116 +85,107 @@ pub async fn await_in_progress<T, F: FnOnce(ProgressBar) -> Fut, Fut: Future<Out
/// It's primary usecase is when you have a single progress bar but multiple tasks that are running
/// and which you want to communicate to the user. This struct will set the message part of the
/// passed progress bar to the oldest unfinished task and include a the number of pending tasks.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct ProgressBarMessageFormatter {
sender: Sender<Operation>,
state: Arc<Mutex<State>>,
}

/// Internal state kept by the [`ProgressBarMessageFormatter`] and derived state.
///
/// This contains the state of the formatter and allows updating the progress bar.
#[derive(Debug)]
struct State {
pb: ProgressBar,
pending: Vec<String>,
}

impl State {
/// Notify the state that a certain operation happened.
fn notify(&mut self, msg: Operation) {
match msg {
Operation::Started(op) => self.pending.push(op),
Operation::Finished(op) => {
let Some(idx) = self.pending.iter().position(|p| p == &op) else {
panic!("operation {op} was never started");
};
self.pending.remove(idx);
}
}

if self.pending.is_empty() {
self.pb.set_message("");
} else if self.pending.len() == 1 {
self.pb.set_message(self.pending[0].clone());
} else {
self.pb.set_message(format!(
"{} (+{})",
self.pending.last().unwrap(),
self.pending.len() - 1
));
}
}
}

#[derive(Debug)]
enum Operation {
Started(String),
Finished(String),
}

pub struct ScopedTask {
state: Option<Arc<Mutex<State>>>,
name: String,
sender: Option<Sender<Operation>>,
pb: ProgressBar,
}

impl ScopedTask {
/// Finishes the execution of the task.
pub async fn finish(mut self) -> ProgressBar {
// Send the finished operation. If this fails the receiving end was most likely already
// closed and we can just ignore the error.
if let Some(sender) = self.sender.take() {
let _ = sender
.send(Operation::Finished(std::mem::take(&mut self.name)))
.await;
fn start(name: String, state: Arc<Mutex<State>>) -> Self {
state.lock().notify(Operation::Started(name.clone()));
Self {
state: Some(state),
name,
}
self.pb.clone()
}

/// Finishes the execution of the task.
pub fn finish_sync(mut self) -> ProgressBar {
// Send the finished operation. If this fails the receiving end was most likely already
// closed and we can just ignore the error.
if let Some(sender) = self.sender.take() {
let _ = sender.try_send(Operation::Finished(std::mem::take(&mut self.name)));
pub fn finish(self) {
drop(self)
}
}

impl Drop for ScopedTask {
fn drop(&mut self) {
if let Some(state) = self.state.take() {
state
.lock()
.notify(Operation::Finished(std::mem::take(&mut self.name)));
}
self.pb.clone()
}
}

impl ProgressBarMessageFormatter {
/// Allows the user to specify a custom capacity for the internal channel.
pub fn new_with_capacity(progress_bar: ProgressBar, capacity: usize) -> Self {
let pb = progress_bar.clone();
let (tx, mut rx) = channel::<Operation>(capacity);
tokio::spawn(async move {
let mut pending = VecDeque::with_capacity(capacity);
while let Some(msg) = rx.recv().await {
match msg {
Operation::Started(op) => pending.push_back(op),
Operation::Finished(op) => {
let Some(idx) = pending.iter().position(|p| p == &op) else {
panic!("operation {op} was never started");
};
pending.remove(idx);
}
}

if pending.is_empty() {
progress_bar.set_message("");
} else if pending.len() == 1 {
progress_bar.set_message(pending[0].clone());
} else {
progress_bar.set_message(format!(
"{} (+{})",
pending.back().unwrap(),
pending.len() - 1
));
}
}
});
Self { sender: tx, pb }
}

/// Adds the start of another task to the progress bar and returns an object that is used to
/// mark the lifetime of the task. If the object is dropped the task is considered finished.
#[must_use]
pub async fn start(&self, op: String) -> ScopedTask {
self.sender
.send(Operation::Started(op.clone()))
.await
.unwrap();
ScopedTask {
name: op,
sender: Some(self.sender.clone()),
pb: self.pb.clone(),
pub fn new(pb: ProgressBar) -> Self {
Self {
state: Arc::new(Mutex::new(State {
pb,
pending: Vec::new(),
})),
}
}

/// Adds the start of another task to the progress bar and returns an object that is used to
/// mark the lifetime of the task. If the object is dropped the task is considered finished.
#[must_use]
pub fn start_sync(&self, op: String) -> ScopedTask {
self.sender
.try_send(Operation::Started(op.clone()))
.expect("could not send operation, channel full or closed");
ScopedTask {
name: op,
sender: Some(self.sender.clone()),
pb: self.pb.clone(),
}
pub fn start(&self, op: String) -> ScopedTask {
ScopedTask::start(op, self.state.clone())
}

/// Wraps an future into a task which starts when the task starts and ends when the future
/// returns.
pub async fn wrap<T, F: Future<Output = T>>(&self, name: impl Into<String>, fut: F) -> T {
let task = self.start(name.into()).await;
let task = self.start(name.into());
let result = fut.await;
task.finish().await;
task.finish();
result
}
}
2 changes: 0 additions & 2 deletions src/install_pypi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ pub async fn update_python_distributions(

let options = UvReporterOptions::new()
.with_length(remote.len() as u64)
.with_capacity(remote.len() + 30)
.with_starting_tasks(remote.iter().map(|(d, _)| format!("{}", d.name())))
.with_top_level_message("Preparing distributions");

Expand Down Expand Up @@ -459,7 +458,6 @@ pub async fn update_python_distributions(

let options = UvReporterOptions::new()
.with_length(all_dists.len() as u64)
.with_capacity(all_dists.len() + 30)
.with_starting_tasks(all_dists.iter().map(|d| format!("{}", d.name())))
.with_top_level_message("Installing distributions");

Expand Down
32 changes: 11 additions & 21 deletions src/uv_reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub struct UvReporterOptions {
top_level_message: &'static str,
progress_bar: Option<ProgressBar>,
starting_tasks: Vec<String>,
capacity: Option<usize>,
}

impl UvReporterOptions {
Expand All @@ -31,7 +30,6 @@ impl UvReporterOptions {
top_level_message: "",
progress_bar: None,
starting_tasks: Vec::new(),
capacity: None,
}
}

Expand All @@ -50,11 +48,6 @@ impl UvReporterOptions {
self
}

pub(crate) fn with_capacity(mut self, capacity: usize) -> Self {
self.capacity = Some(capacity);
self
}

pub(crate) fn with_starting_tasks(mut self, tasks: impl Iterator<Item = String>) -> Self {
self.starting_tasks = tasks.collect_vec();
self
Expand Down Expand Up @@ -84,16 +77,13 @@ impl UvReporter {
};

// Create the formatter
let fmt = ProgressBarMessageFormatter::new_with_capacity(
pb.clone(),
options.capacity.unwrap_or(20),
);
let fmt = ProgressBarMessageFormatter::new(pb.clone());

let mut name_to_id = std::collections::HashMap::new();
let mut starting_tasks = vec![];
// Add the starting tasks
for task in options.starting_tasks {
let scoped_task = fmt.start_sync(task.clone());
let scoped_task = fmt.start(task.clone());
starting_tasks.push(Some(scoped_task));
name_to_id.insert(task, starting_tasks.len() - 1);
}
Expand All @@ -110,8 +100,8 @@ impl UvReporter {
self.scoped_tasks.lock().expect("progress lock poison")
}

pub(crate) fn start_sync(&self, message: String) -> usize {
let task = self.fmt.start_sync(message);
pub(crate) fn start(&self, message: String) -> usize {
let task = self.fmt.start(message);
let mut lock = self.lock();
lock.push(Some(task));
lock.len() - 1
Expand All @@ -125,7 +115,7 @@ impl UvReporter {
.unwrap_or_else(|| panic!("progress bar error idx ({id}) > {len}"))
.take();
if let Some(task) = task {
task.finish_sync();
task.finish();
}
}

Expand All @@ -151,15 +141,15 @@ impl uv_installer::PrepareReporter for UvReporter {
}

fn on_build_start(&self, dist: &BuildableSource) -> usize {
self.start_sync(format!("building {}", dist))
self.start(format!("building {}", dist))
}

fn on_build_complete(&self, _dist: &BuildableSource, id: usize) {
self.finish(id);
}

fn on_checkout_start(&self, url: &url::Url, _rev: &str) -> usize {
self.start_sync(format!("cloning {}", url))
self.start(format!("cloning {}", url))
}

fn on_checkout_complete(&self, _url: &url::Url, _rev: &str, index: usize) {
Expand Down Expand Up @@ -196,15 +186,15 @@ impl uv_resolver::ResolverReporter for UvReporter {
}

fn on_build_start(&self, dist: &BuildableSource) -> usize {
self.start_sync(format!("building {}", dist,))
self.start(format!("building {}", dist,))
}

fn on_build_complete(&self, _dist: &BuildableSource, id: usize) {
self.finish(id);
}

fn on_checkout_start(&self, url: &url::Url, _rev: &str) -> usize {
self.start_sync(format!("cloning {}", url))
self.start(format!("cloning {}", url))
}

fn on_checkout_complete(&self, _url: &url::Url, _rev: &str, index: usize) {
Expand All @@ -227,15 +217,15 @@ impl uv_resolver::ResolverReporter for UvReporter {

impl uv_distribution::Reporter for UvReporter {
fn on_build_start(&self, dist: &BuildableSource) -> usize {
self.start_sync(format!("building {}", dist,))
self.start(format!("building {}", dist,))
}

fn on_build_complete(&self, _dist: &BuildableSource, id: usize) {
self.finish(id);
}

fn on_checkout_start(&self, url: &url::Url, _rev: &str) -> usize {
self.start_sync(format!("cloning {}", url))
self.start(format!("cloning {}", url))
}

fn on_checkout_complete(&self, _url: &url::Url, _rev: &str, index: usize) {
Expand Down
Loading