Skip to content

Commit

Permalink
Unwind panics occurred in spawned tasks (#950)
Browse files Browse the repository at this point in the history
* Unwind panics occurred in spawned tasks

We panic in these cases anyway, but it would be better to see the real message

* Shim for Shuttle's `JoinError`
  • Loading branch information
akoshelev authored Feb 21, 2024
1 parent 860e046 commit fbff7f1
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
19 changes: 19 additions & 0 deletions ipa-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,25 @@ pub(crate) mod task {
pub use shuttle::future::{JoinError, JoinHandle};
}

#[cfg(feature = "shuttle")]
pub(crate) mod shim {
use std::any::Any;

use shuttle_crate::future::JoinError;

/// There is currently an API mismatch between Tokio and Shuttle `JoinError` implementations.
/// This trait brings them closer together, until it is addressed
pub trait Tokio: Sized {
fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, Self>;
}

impl Tokio for JoinError {
fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, Self> {
Err(self) // Shuttle `JoinError` does not wrap panics
}
}
}

#[cfg(not(all(feature = "shuttle", test)))]
pub(crate) mod task {
pub use tokio::task::{JoinError, JoinHandle};
Expand Down
34 changes: 31 additions & 3 deletions ipa-core/src/seq_join/multi_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ where
type Item = F::Output;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
#[cfg(feature = "shuttle")]
use crate::shim::Tokio;

let mut this = self.project();

// Draw more values from the input, up to the capacity.
Expand All @@ -114,7 +117,13 @@ where
if this.spawner.remaining() > 0 {
this.spawner.as_mut().poll_next(cx).map(|v| match v {
Some(Ok(v)) => Some(v),
Some(Err(_)) => panic!("SequentialFutures: spawned task aborted"),
Some(Err(e)) => {
if let Ok(reason) = e.try_into_panic() {
std::panic::resume_unwind(reason);
} else {
panic!("SequentialFutures: spawned task is cancelled")
}
}
None => None,
})
} else if this.source.is_done() {
Expand Down Expand Up @@ -168,9 +177,11 @@ where

#[cfg(all(test, unit_test))]
mod tests {
use std::{future::Future, pin::Pin};
use std::{future::Future, num::NonZeroUsize, pin::Pin};

use futures_util::future::lazy;

use crate::test_executor::run;
use crate::{seq_join::seq_try_join_all, test_executor::run};

/// This test demonstrates that forgetting the future returned by `parallel_join` is not safe and will cause
/// use-after-free safety error. It spawns a few tasks that constantly try to access the `borrow_from_me` weak
Expand Down Expand Up @@ -248,4 +259,21 @@ mod tests {
drop(f);
});
}

#[test]
#[should_panic(expected = "panic in task 1")]
fn panic_from_task_unwinds_to_main() {
fn f(i: u32) -> impl Future<Output = Result<u32, &'static str>> {
lazy(move |_| match i {
1 => panic!("panic in task 1"),
i => Ok(i),
})
}

run(|| async {
let active = NonZeroUsize::new(10).unwrap();
let _ = seq_try_join_all(active, (1..=3).map(f)).await;
assert!(false, "Should have aborted earlier");
});
}
}

0 comments on commit fbff7f1

Please sign in to comment.