From 95b263757325a9fe80d90c526950e3e38d117949 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Tue, 23 Jan 2024 14:24:19 -0800 Subject: [PATCH] Split the `seq_join` file into separate modules One for local spawn and one for multi-threading --- .pre-commit.stashsIsbN1 | 0 ipa-core/src/seq_join.rs | 785 -------------------------- ipa-core/src/seq_join/local.rs | 268 +++++++++ ipa-core/src/seq_join/mod.rs | 274 +++++++++ ipa-core/src/seq_join/multi_thread.rs | 252 +++++++++ 5 files changed, 794 insertions(+), 785 deletions(-) create mode 100644 .pre-commit.stashsIsbN1 delete mode 100644 ipa-core/src/seq_join.rs create mode 100644 ipa-core/src/seq_join/local.rs create mode 100644 ipa-core/src/seq_join/mod.rs create mode 100644 ipa-core/src/seq_join/multi_thread.rs diff --git a/.pre-commit.stashsIsbN1 b/.pre-commit.stashsIsbN1 new file mode 100644 index 000000000..e69de29bb diff --git a/ipa-core/src/seq_join.rs b/ipa-core/src/seq_join.rs deleted file mode 100644 index f1987eb8b..000000000 --- a/ipa-core/src/seq_join.rs +++ /dev/null @@ -1,785 +0,0 @@ -use std::{ - future::IntoFuture, - num::NonZeroUsize, - pin::Pin, - task::{Context, Poll}, -}; - -use futures::{ - stream::{iter, Iter as StreamIter, TryCollect}, - Future, Stream, StreamExt, TryStreamExt, -}; -use pin_project::pin_project; - -use crate::exact::ExactSizeStream; - -/// This helper function might be necessary to convince the compiler that -/// the return value from [`seq_try_join_all`] implements `Send`. -/// Use this if you get higher-ranked lifetime errors that mention `std::marker::Send`. -/// -/// -pub fn assert_send<'a, O>( - fut: impl Future + Send + 'a, -) -> impl Future + Send + 'a { - fut -} - -/// Sequentially join futures from a stream. -/// -/// This function polls futures in strict sequence. -/// If any future blocks, up to `active - 1` futures after it will be polled so -/// that they make progress. -/// -/// # Deadlocks -/// -/// This will fail to resolve if the progress of any future depends on a future more -/// than `active` items behind it in the input sequence. -/// -/// # Safety -/// If multi-threading is enabled, forgetting the resulting future will cause use-after-free error. Do not leak it or -/// prevent the future destructor from running. -/// -/// [`try_join_all`]: futures::future::try_join_all -/// [`Stream`]: futures::stream::Stream -/// [`StreamExt::buffered`]: futures::stream::StreamExt::buffered -pub fn seq_join<'st, S, F, O>(active: NonZeroUsize, source: S) -> SequentialFutures<'st, S, F> -where - S: Stream + Send + 'st, - F: Future + Send, - O: Send + 'static, -{ - #[cfg(feature = "multi-threading")] - unsafe { - SequentialFutures::new(active, source) - } - #[cfg(not(feature = "multi-threading"))] - SequentialFutures::new(active, source) -} - -/// The `SeqJoin` trait wraps `seq_try_join_all`, providing the `active` parameter -/// from the provided context so that the value can be made consistent. -pub trait SeqJoin { - /// Perform a sequential join of the futures from the provided iterable. - /// This uses [`seq_join`], with the current state of the associated object - /// being used to determine the number of active items to track (see [`active_work`]). - /// - /// A rough rule of thumb for how to decide between this and [`parallel_join`] is - /// that this should be used whenever you are iterating over different records. - /// [`parallel_join`] is better suited to smaller batches, such as iterating over - /// the bits of a value for a single record. 
- /// - /// Note that the join functions from the [`futures`] crate, such as [`join3`], - /// are also parallel and can be used where you have a small, fixed number of tasks. - /// - /// Be especially careful if you use the random bits generator with this. - /// The random bits generator can produce values out of sequence. - /// You might need to use [`parallel_join`] for that. - /// - /// [`active_work`]: Self::active_work - /// [`parallel_join`]: Self::parallel_join - /// [`join3`]: futures::future::join3 - fn try_join<'fut, I, F, O, E>( - &self, - iterable: I, - ) -> TryCollect, Vec> - where - I: IntoIterator + Send, - I::IntoIter: Send + 'fut, - F: Future> + Send + 'fut, - O: Send + 'static, - E: Send + 'static, - { - seq_try_join_all(self.active_work(), iterable) - } - - /// Join multiple tasks in parallel. Only do this if you can't use a sequential join. - /// - /// # Safety - /// Forgetting the future returned from this function will cause use-after-free. This is a tradeoff between - /// performance and safety that allows us to use regular references instead of Arc pointers. - /// - /// Dropping the future is always safe. - #[cfg(feature = "multi-threading")] - fn parallel_join<'a, I, F, O, E>( - &self, - iterable: I, - ) -> Pin, E>> + Send + 'a>> - where - I: IntoIterator + Send, - F: Future> + Send + 'a, - O: Send + 'static, - E: Send + 'static, - { - unsafe { Box::pin(multi_thread::parallel_join(iterable)) } - } - - /// Join multiple tasks in parallel. Only do this if you can't use a sequential join. - #[cfg(not(feature = "multi-threading"))] - fn parallel_join(&self, iterable: I) -> futures::future::TryJoinAll - where - I: IntoIterator, - I::Item: futures::future::TryFuture, - { - #[allow(clippy::disallowed_methods)] // Just in this one place. - futures::future::try_join_all(iterable) - } - - /// The amount of active work that is concurrently permitted. - fn active_work(&self) -> NonZeroUsize; -} - -type SeqTryJoinAll<'st, I, F> = - SequentialFutures<'st, StreamIter<::IntoIter>, F>; - -/// A substitute for [`futures::future::try_join_all`] that uses [`seq_join`]. -/// This awaits all the provided futures in order, -/// aborting early if any future returns `Result::Err`. -pub fn seq_try_join_all<'iter, I, F, O, E>( - active: NonZeroUsize, - source: I, -) -> TryCollect, Vec> -where - I: IntoIterator + Send, - I::IntoIter: Send + 'iter, - F: Future> + Send + 'iter, - O: Send + 'static, - E: Send + 'static, -{ - seq_join(active, iter(source)).try_collect() -} - -impl<'fut, S, F> ExactSizeStream for SequentialFutures<'fut, S, F> -where - S: Stream + Send + ExactSizeStream, - F: IntoFuture, - ::IntoFuture: Send + 'fut, - <::IntoFuture as Future>::Output: Send + 'static, -{ -} - -#[cfg(not(feature = "multi-threading"))] -pub use local::SequentialFutures; -#[cfg(feature = "multi-threading")] -pub use multi_thread::SequentialFutures; - -/// Parallel and sequential join that use at most one thread. Good for unit testing and debugging, -/// to get results in predictable order with fewer things happening at the same time. -#[cfg(not(feature = "multi-threading"))] -mod local { - use std::{collections::VecDeque, marker::PhantomData}; - use futures::stream::Fuse; - - use super::*; - - enum ActiveItem { - Pending(Pin>), - Resolved(F::Output), - } - - impl ActiveItem { - /// Drives this item to resolved state when value is ready to be taken out. Has no effect - /// if the value is ready. 
- /// - /// ## Panics - /// Panics if this item is completed - fn check_ready(&mut self, cx: &mut Context<'_>) -> bool { - let ActiveItem::Pending(f) = self else { - return true; - }; - if let Poll::Ready(v) = Future::poll(Pin::as_mut(f), cx) { - *self = ActiveItem::Resolved(v); - true - } else { - false - } - } - - /// Takes the resolved value out - /// - /// ## Panics - /// If the value is not ready yet. - #[must_use] - fn take(self) -> F::Output { - let ActiveItem::Resolved(v) = self else { - unreachable!("take should be only called once."); - }; - - v - } - } - - #[pin_project] - pub struct SequentialFutures<'unused, S, F> - where - S: Stream + Send, - F: IntoFuture, - { - #[pin] - source: Fuse, - active: VecDeque>, - _marker: PhantomData &'unused ()>, - } - - impl SequentialFutures<'_, S, F> - where - S: Stream + Send, - F: IntoFuture, - { - pub fn new(active: NonZeroUsize, source: S) -> Self { - Self { - source: source.fuse(), - active: VecDeque::with_capacity(active.get()), - _marker: PhantomData, - } - } - } - - impl Stream for SequentialFutures<'_, S, F> - where - S: Stream + Send, - F: IntoFuture, - { - type Item = F::Output; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - // Draw more values from the input, up to the capacity. - while this.active.len() < this.active.capacity() { - if let Poll::Ready(Some(f)) = this.source.as_mut().poll_next(cx) { - this.active - .push_back(ActiveItem::Pending(Box::pin(f.into_future()))); - } else { - break; - } - } - - if let Some(item) = this.active.front_mut() { - if item.check_ready(cx) { - let v = this.active.pop_front().map(ActiveItem::take); - Poll::Ready(v) - } else { - for f in this.active.iter_mut().skip(1) { - f.check_ready(cx); - } - Poll::Pending - } - } else if this.source.is_done() { - Poll::Ready(None) - } else { - Poll::Pending - } - } - - fn size_hint(&self) -> (usize, Option) { - let in_progress = self.active.len(); - let (lower, upper) = self.source.size_hint(); - ( - lower.saturating_add(in_progress), - upper.and_then(|u| u.checked_add(in_progress)), - ) - } - } -} - -/// Both joins use executor tasks to drive futures to completion. Much faster than single-threaded -/// version, so this is what we want to use in release/prod mode. 
-#[cfg(feature = "multi-threading")] -mod multi_thread { - use futures::stream::Fuse; - use tracing::{Instrument, Span}; - - use super::*; - - #[cfg(feature = "shuttle")] - mod shuttle_spawner { - use shuttle_crate::future::{self, JoinError, JoinHandle}; - - use super::*; - - /// Spawner implementation for Shuttle framework to run tests in parallel - pub(super) struct ShuttleSpawner; - - unsafe impl async_scoped::spawner::Spawner for ShuttleSpawner - where - T: Send + 'static, - { - type FutureOutput = Result; - type SpawnHandle = JoinHandle; - - fn spawn + Send + 'static>(&self, f: F) -> Self::SpawnHandle { - future::spawn(f) - } - } - - unsafe impl async_scoped::spawner::Blocker for ShuttleSpawner { - fn block_on>(&self, f: F) -> T { - future::block_on(f) - } - } - } - - #[cfg(feature = "shuttle")] - type Spawner<'fut, T> = async_scoped::Scope<'fut, T, shuttle_spawner::ShuttleSpawner>; - #[cfg(not(feature = "shuttle"))] - type Spawner<'fut, T> = async_scoped::TokioScope<'fut, T>; - - unsafe fn create_spawner<'fut, T: Send + 'static>() -> Spawner<'fut, T> { - #[cfg(feature = "shuttle")] - return async_scoped::Scope::create(shuttle_spawner::ShuttleSpawner); - #[cfg(not(feature = "shuttle"))] - return async_scoped::TokioScope::create(async_scoped::spawner::use_tokio::Tokio); - } - - #[pin_project] - #[must_use = "Futures do nothing unless polled"] - pub struct SequentialFutures<'fut, S, F> - where - S: Stream + Send + 'fut, - F: IntoFuture, - <::IntoFuture as Future>::Output: Send + 'static, - { - #[pin] - spawner: Spawner<'fut, F::Output>, - #[pin] - source: Fuse, - capacity: usize, - } - - impl SequentialFutures<'_, S, F> - where - S: Stream + Send, - F: IntoFuture, - <::IntoFuture as Future>::Output: Send + 'static, - { - pub unsafe fn new(active: NonZeroUsize, source: S) -> Self { - SequentialFutures { - spawner: unsafe { create_spawner() }, - source: source.fuse(), - capacity: active.get(), - } - } - } - - impl<'fut, S, F> Stream for SequentialFutures<'fut, S, F> - where - S: Stream + Send, - F: IntoFuture, - ::IntoFuture: Send + 'fut, - <::IntoFuture as Future>::Output: Send + 'static, - { - type Item = F::Output; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - // Draw more values from the input, up to the capacity. - while this.spawner.remaining() < *this.capacity { - if let Poll::Ready(Some(f)) = this.source.as_mut().poll_next(cx) { - // Making futures cancellable is critical to avoid hangs. - // if one of them panics, unwinding causes spawner to drop and, in turn, - // it blocks the thread to await all pending futures completion. If there is - // a dependency between futures, pending one will never complete. - // Cancellable futures will be cancelled when spawner is dropped which is - // the behavior we want. - this.spawner - .spawn_cancellable(f.into_future().instrument(Span::current()), || { - panic!("SequentialFutures: spawned task cancelled") - }); - } else { - break; - } - } - - // Poll spawner if it has work to do. If both source and spawner are empty, we're done. 
- if this.spawner.remaining() > 0 { - this.spawner.as_mut().poll_next(cx).map(|v| match v { - Some(Ok(v)) => Some(v), - Some(Err(_)) => panic!("SequentialFutures: spawned task aborted"), - None => None, - }) - } else if this.source.is_done() { - Poll::Ready(None) - } else { - Poll::Pending - } - } - - fn size_hint(&self) -> (usize, Option) { - let in_progress = self.spawner.remaining(); - let (lower, upper) = self.source.size_hint(); - ( - lower.saturating_add(in_progress), - upper.and_then(|u| u.checked_add(in_progress)), - ) - } - } - - pub(super) unsafe fn parallel_join<'fut, I, F, O, E>( - iterable: I, - ) -> impl Future, E>> + Send + 'fut - where - I: IntoIterator + Send, - F: Future> + Send + 'fut, - O: Send + 'static, - E: Send + 'static, - { - let mut scope = { - let mut scope = unsafe { create_spawner() }; - for element in iterable { - // it is important to make those cancellable to avoid deadlocks if one of the spawned future panics. - // If there is a dependency between futures, pending one will never complete. - // Cancellable futures will be cancelled when spawner is dropped which is the behavior we want. - scope.spawn_cancellable(element.instrument(Span::current()), || { - panic!("parallel_join: task cancelled") - }); - } - scope - }; - - async move { - let mut result = Vec::with_capacity(scope.len()); - while let Some(item) = scope.next().await { - // join error is nothing we can do about - result.push(item.expect("parallel_join: received JoinError")?) - } - Ok(result) - } - } -} - -#[cfg(all(test, unit_test, not(feature = "multi-threading")))] -mod local_test { - use std::{ - num::NonZeroUsize, - ptr::null, - sync::{Arc, Mutex}, - task::{Context, Poll, Waker}, - }; - - use futures::{ - future::lazy, - stream::{poll_fn, repeat_with}, - StreamExt, - }; - - use super::*; - - fn fake_waker() -> Waker { - use std::task::{RawWaker, RawWakerVTable}; - const fn fake_raw_waker() -> RawWaker { - const TABLE: RawWakerVTable = - RawWakerVTable::new(|_| fake_raw_waker(), |_| {}, |_| {}, |_| {}); - RawWaker::new(null(), &TABLE) - } - unsafe { Waker::from_raw(fake_raw_waker()) } - } - - /// Check the value of a counter, then reset it. - fn assert_count(counter_r: &Arc>, expected: usize) { - let mut counter = counter_r.lock().unwrap(); - assert_eq!(*counter, expected); - *counter = 0; - } - - /// A fully synchronous test. - #[test] - fn synchronous() { - let capacity = NonZeroUsize::new(3).unwrap(); - let v_r: Arc>> = Arc::new(Mutex::new(None)); - let v_w = Arc::clone(&v_r); - // Track when the stream was polled, - let polled_w: Arc> = Arc::new(Mutex::new(0)); - let polled_r = Arc::clone(&polled_w); - // when the stream produced something, and - let produced_w: Arc> = Arc::new(Mutex::new(0)); - let produced_r = Arc::clone(&produced_w); - // when the future was read. - let read_w: Arc> = Arc::new(Mutex::new(0)); - let read_r = Arc::clone(&read_w); - - let stream = poll_fn(|_cx| { - *polled_w.lock().unwrap() += 1; - if let Some(v) = v_r.lock().unwrap().take() { - *produced_w.lock().unwrap() += 1; - let read_w = Arc::clone(&read_w); - Poll::Ready(Some(lazy(move |_| { - *read_w.lock().unwrap() += 1; - v - }))) - } else { - // Note: we can ignore `cx` because we are driving this directly. 
- Poll::Pending - } - }); - let mut joined = seq_join(capacity, stream); - let waker = fake_waker(); - let mut cx = Context::from_waker(&waker); - - let res = joined.poll_next_unpin(&mut cx); - assert_count(&polled_r, 1); - assert_count(&produced_r, 0); - assert_count(&read_r, 0); - assert!(res.is_pending()); - - *v_w.lock().unwrap() = Some(7); - let res = joined.poll_next_unpin(&mut cx); - assert_count(&polled_r, 2); - assert_count(&produced_r, 1); - assert_count(&read_r, 1); - assert!(matches!(res, Poll::Ready(Some(7)))); - } - - /// A fully synchronous test with a synthetic stream, all the way to the end. - #[test] - fn complete_stream() { - const VALUE: u32 = 20; - const COUNT: usize = 7; - let capacity = NonZeroUsize::new(3).unwrap(); - // Track the number of values produced. - let produced_w: Arc> = Arc::new(Mutex::new(0)); - let produced_r = Arc::clone(&produced_w); - - let stream = repeat_with(|| { - *produced_w.lock().unwrap() += 1; - lazy(|_| VALUE) - }) - .take(COUNT); - let mut joined = seq_join(capacity, stream); - let waker = fake_waker(); - let mut cx = Context::from_waker(&waker); - - // The first poll causes the active buffer to be filled if that is possible. - let res = joined.poll_next_unpin(&mut cx); - assert_count(&produced_r, capacity.get()); - assert!(matches!(res, Poll::Ready(Some(VALUE)))); - - // A few more iterations, where each top up the buffer. - for _ in 0..(COUNT - capacity.get()) { - let res = joined.poll_next_unpin(&mut cx); - assert_count(&produced_r, 1); - assert!(matches!(res, Poll::Ready(Some(VALUE)))); - } - - // Then we drain the buffer. - for _ in 0..(capacity.get() - 1) { - let res = joined.poll_next_unpin(&mut cx); - assert_count(&produced_r, 0); - assert!(matches!(res, Poll::Ready(Some(VALUE)))); - } - - // Then the stream ends. 
- let res = joined.poll_next_unpin(&mut cx); - assert_count(&produced_r, 0); - assert!(matches!(res, Poll::Ready(None))); - } -} - -#[cfg(all(test, unit_test))] -mod test { - use std::{convert::Infallible, iter::once}; - - use futures::{ - future::{lazy, BoxFuture}, - stream::{iter, poll_immediate}, - Future, StreamExt, - }; - - use super::*; - use crate::test_executor::run; - - async fn immediate(count: u32) { - let capacity = NonZeroUsize::new(3).unwrap(); - let values = seq_join(capacity, iter((0..count).map(|i| async move { i }))) - .collect::>() - .await; - assert_eq!((0..count).collect::>(), values); - } - - #[test] - fn within_capacity() { - run(|| async { - immediate(2).await; - immediate(1).await; - }); - } - - #[test] - fn over_capacity() { - run(|| async { - immediate(10).await; - }); - } - - #[test] - fn size() { - run(|| async { - let mut count = 10_usize; - let capacity = NonZeroUsize::new(3).unwrap(); - let mut values = seq_join(capacity, iter((0..count).map(|i| async move { i }))); - assert_eq!((count, Some(count)), values.size_hint()); - - while values.next().await.is_some() { - count -= 1; - assert_eq!((count, Some(count)), values.size_hint()); - } - }); - } - - #[test] - fn out_of_order() { - run(|| async { - let capacity = NonZeroUsize::new(3).unwrap(); - let barrier = tokio::sync::Barrier::new(2); - let unresolved: BoxFuture<'_, u32> = Box::pin(async { - barrier.wait().await; - 0 - }); - let it = once(unresolved) - .chain((1..4_u32).map(|i| -> BoxFuture<'_, u32> { Box::pin(async move { i }) })); - let mut seq_futures = seq_join(capacity, iter(it)); - - assert_eq!( - Some(Poll::Pending), - poll_immediate(&mut seq_futures).next().await - ); - barrier.wait().await; - assert_eq!(vec![0, 1, 2, 3], seq_futures.collect::>().await); - }); - } - - #[test] - fn join_success() { - fn f(v: T) -> impl Future> { - lazy(move |_| Ok(v)) - } - - run(|| async { - let active = NonZeroUsize::new(10).unwrap(); - let res = seq_try_join_all(active, (1..5).map(f)).await.unwrap(); - assert_eq!((1..5).collect::>(), res); - }); - } - - /// This test has to use multi-threaded runtime because early return causes `TryCollect` to be - /// dropped and the remaining futures to be cancelled which can only happen if there is more - /// than one thread available. - /// - /// This behavior is only applicable when `seq_try_join_all` uses more than one thread, for - /// maintenance reasons, we use it even when parallelism is turned off. - #[test] - fn try_join_early_abort() { - const ERROR: &str = "error message"; - fn f(i: u32) -> impl Future> { - lazy(move |_| match i { - 1 => Ok(1), - 2 => Err(ERROR), - _ => panic!("should have aborted earlier"), - }) - } - - run(|| async { - let active = NonZeroUsize::new(10).unwrap(); - let err = seq_try_join_all(active, (1..=3).map(f)).await.unwrap_err(); - assert_eq!(err, ERROR); - }); - } - - #[test] - fn does_not_block_on_error() { - const ERROR: &str = "returning early is safe"; - use std::pin::Pin; - - fn f(i: u32) -> Pin> + Send>> { - match i { - 1 => Box::pin(lazy(move |_| Ok(1))), - 2 => Box::pin(lazy(move |_| Err(ERROR))), - _ => Box::pin(futures::future::pending()), - } - } - - run(|| async { - let active = NonZeroUsize::new(10).unwrap(); - let err = seq_try_join_all(active, (1..=3).map(f)).await.unwrap_err(); - assert_eq!(err, ERROR); - }); - } - - /// This test demonstrates that forgetting the future returned by `parallel_join` is not safe and will cause - /// use-after-free safety error. 
It spawns a few tasks that constantly try to access the `borrow_from_me` weak - /// reference while the main thread drops the owning reference. By proving that futures are able to see the weak - /// pointer unset, this test shows that same can happen for regular references and cause use-after-free. - #[test] - #[cfg(feature = "multi-threading")] - fn parallel_join_forget_is_not_safe() { - use futures::future::poll_immediate; - - use crate::{seq_join::multi_thread::parallel_join, sync::Arc}; - - run(|| async { - const N: usize = 24; - let borrowed_vec = Box::new([1, 2, 3]); - let borrow_from_me = Arc::new(vec![1, 2, 3]); - let start = Arc::new(tokio::sync::Barrier::new(N + 1)); - // counts how many tasks have accessed `borrow_from_me` after it was destroyed. - // this test expects all tasks to access `borrow_from_me` at least once. - let bad_accesses = Arc::new(tokio::sync::Barrier::new(N + 1)); - - let futures = (0..N) - .map(|_| { - let borrowed = Arc::downgrade(&borrow_from_me); - let regular_ref = &borrowed_vec; - let start = start.clone(); - let bad_access = bad_accesses.clone(); - async move { - start.wait().await; - for _ in 0..100 { - if borrowed.upgrade().is_none() { - bad_access.wait().await; - // switch to `true` if you want to see the real corruption. - #[allow(unreachable_code)] - if false { - // this is a place where we can see the use-after-free. - // we avoid executing this block to appease sanitizers, but compiler happily - // allows us to follow this reference. - println!("{:?}", regular_ref); - } - break; - } - tokio::task::yield_now().await; - } - Ok::<_, ()>(()) - } - }) - .collect::>(); - - let mut f = Box::pin(unsafe { parallel_join(futures) }); - poll_immediate(&mut f).await; - start.wait().await; - - // the type of `f` above captures the lifetime for borrowed_vec. Leaking `f` allows `borrowed_vec` to be - // dropped, but that drop prohibits any subsequent manipulations with `f` pointer, irrespective of whether - // `f` is `&mut _` or `*mut _` (value already borrowed error). - // I am not sure I fully understand what is going on here (why borrowck allows me to leak the value, but - // then I can't drop it even if it is a raw pointer), but removing the lifetime from `f` type allows - // the test to pass. - // - // This is only required to do the proper cleanup and avoid memory leaks. Replacing this line with - // `mem::forget(f)` will lead to the same test outcome, but Miri will complain about memory leaks. - let f: _ = unsafe { - std::mem::transmute::<_, Pin, ()>> + Send>>>( - Box::pin(f) as Pin, ()>>>>, - ) - }; - - // Async executor will still be polling futures and they will try to follow this pointer. - drop(borrow_from_me); - drop(borrowed_vec); - - // this test should terminate because all tasks should access `borrow_from_me` at least once. - bad_accesses.wait().await; - - drop(f); - }); - } -} diff --git a/ipa-core/src/seq_join/local.rs b/ipa-core/src/seq_join/local.rs new file mode 100644 index 000000000..33fe3d757 --- /dev/null +++ b/ipa-core/src/seq_join/local.rs @@ -0,0 +1,268 @@ +use std::{ + collections::VecDeque, + future::IntoFuture, + marker::PhantomData, + num::NonZeroUsize, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::stream::Fuse; + +use super::*; + +enum ActiveItem { + Pending(Pin>), + Resolved(F::Output), +} + +impl ActiveItem { + /// Drives this item to resolved state when value is ready to be taken out. Has no effect + /// if the value is ready. 
+ /// + /// ## Panics + /// Panics if this item is completed + fn check_ready(&mut self, cx: &mut Context<'_>) -> bool { + let ActiveItem::Pending(f) = self else { + return true; + }; + if let Poll::Ready(v) = Future::poll(Pin::as_mut(f), cx) { + *self = ActiveItem::Resolved(v); + true + } else { + false + } + } + + /// Takes the resolved value out + /// + /// ## Panics + /// If the value is not ready yet. + #[must_use] + fn take(self) -> F::Output { + let ActiveItem::Resolved(v) = self else { + unreachable!("take should be only called once."); + }; + + v + } +} + +#[pin_project] +pub struct SequentialFutures<'unused, S, F> +where + S: Stream + Send, + F: IntoFuture, +{ + #[pin] + source: Fuse, + active: VecDeque>, + _marker: PhantomData &'unused ()>, +} + +impl SequentialFutures<'_, S, F> +where + S: Stream + Send, + F: IntoFuture, +{ + pub fn new(active: NonZeroUsize, source: S) -> Self { + Self { + source: source.fuse(), + active: VecDeque::with_capacity(active.get()), + _marker: PhantomData, + } + } +} + +impl Stream for SequentialFutures<'_, S, F> +where + S: Stream + Send, + F: IntoFuture, +{ + type Item = F::Output; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + // Draw more values from the input, up to the capacity. + while this.active.len() < this.active.capacity() { + if let Poll::Ready(Some(f)) = this.source.as_mut().poll_next(cx) { + this.active + .push_back(ActiveItem::Pending(Box::pin(f.into_future()))); + } else { + break; + } + } + + if let Some(item) = this.active.front_mut() { + if item.check_ready(cx) { + let v = this.active.pop_front().map(ActiveItem::take); + Poll::Ready(v) + } else { + for f in this.active.iter_mut().skip(1) { + f.check_ready(cx); + } + Poll::Pending + } + } else if this.source.is_done() { + Poll::Ready(None) + } else { + Poll::Pending + } + } + + fn size_hint(&self) -> (usize, Option) { + let in_progress = self.active.len(); + let (lower, upper) = self.source.size_hint(); + ( + lower.saturating_add(in_progress), + upper.and_then(|u| u.checked_add(in_progress)), + ) + } +} + +#[cfg(all(test, unit_test))] +mod local_test { + use std::{ + num::NonZeroUsize, + ptr::null, + sync::{Arc, Mutex}, + task::{Context, Poll, Waker}, + }; + + use futures::{ + future::lazy, + stream::{poll_fn, repeat_with}, + StreamExt, + }; + + use super::*; + use crate::test_executor::run; + + fn fake_waker() -> Waker { + use std::task::{RawWaker, RawWakerVTable}; + const fn fake_raw_waker() -> RawWaker { + const TABLE: RawWakerVTable = + RawWakerVTable::new(|_| fake_raw_waker(), |_| {}, |_| {}, |_| {}); + RawWaker::new(null(), &TABLE) + } + unsafe { Waker::from_raw(fake_raw_waker()) } + } + + /// Check the value of a counter, then reset it. + fn assert_count(counter_r: &Arc>, expected: usize) { + let mut counter = counter_r.lock().unwrap(); + assert_eq!(*counter, expected); + *counter = 0; + } + + /// A fully synchronous test. + #[test] + fn synchronous() { + let capacity = NonZeroUsize::new(3).unwrap(); + let v_r: Arc>> = Arc::new(Mutex::new(None)); + let v_w = Arc::clone(&v_r); + // Track when the stream was polled, + let polled_w: Arc> = Arc::new(Mutex::new(0)); + let polled_r = Arc::clone(&polled_w); + // when the stream produced something, and + let produced_w: Arc> = Arc::new(Mutex::new(0)); + let produced_r = Arc::clone(&produced_w); + // when the future was read. 
+ let read_w: Arc> = Arc::new(Mutex::new(0)); + let read_r = Arc::clone(&read_w); + + let stream = poll_fn(|_cx| { + *polled_w.lock().unwrap() += 1; + if let Some(v) = v_r.lock().unwrap().take() { + *produced_w.lock().unwrap() += 1; + let read_w = Arc::clone(&read_w); + Poll::Ready(Some(lazy(move |_| { + *read_w.lock().unwrap() += 1; + v + }))) + } else { + // Note: we can ignore `cx` because we are driving this directly. + Poll::Pending + } + }); + let mut joined = seq_join(capacity, stream); + let waker = fake_waker(); + let mut cx = Context::from_waker(&waker); + + let res = joined.poll_next_unpin(&mut cx); + assert_count(&polled_r, 1); + assert_count(&produced_r, 0); + assert_count(&read_r, 0); + assert!(res.is_pending()); + + *v_w.lock().unwrap() = Some(7); + let res = joined.poll_next_unpin(&mut cx); + assert_count(&polled_r, 2); + assert_count(&produced_r, 1); + assert_count(&read_r, 1); + assert!(matches!(res, Poll::Ready(Some(7)))); + } + + /// A fully synchronous test with a synthetic stream, all the way to the end. + #[test] + fn complete_stream() { + const VALUE: u32 = 20; + const COUNT: usize = 7; + let capacity = NonZeroUsize::new(3).unwrap(); + // Track the number of values produced. + let produced_w: Arc> = Arc::new(Mutex::new(0)); + let produced_r = Arc::clone(&produced_w); + + let stream = repeat_with(|| { + *produced_w.lock().unwrap() += 1; + lazy(|_| VALUE) + }) + .take(COUNT); + let mut joined = seq_join(capacity, stream); + let waker = fake_waker(); + let mut cx = Context::from_waker(&waker); + + // The first poll causes the active buffer to be filled if that is possible. + let res = joined.poll_next_unpin(&mut cx); + assert_count(&produced_r, capacity.get()); + assert!(matches!(res, Poll::Ready(Some(VALUE)))); + + // A few more iterations, where each top up the buffer. + for _ in 0..(COUNT - capacity.get()) { + let res = joined.poll_next_unpin(&mut cx); + assert_count(&produced_r, 1); + assert!(matches!(res, Poll::Ready(Some(VALUE)))); + } + + // Then we drain the buffer. + for _ in 0..(capacity.get() - 1) { + let res = joined.poll_next_unpin(&mut cx); + assert_count(&produced_r, 0); + assert!(matches!(res, Poll::Ready(Some(VALUE)))); + } + + // Then the stream ends. + let res = joined.poll_next_unpin(&mut cx); + assert_count(&produced_r, 0); + assert!(matches!(res, Poll::Ready(None))); + } + + #[test] + fn try_join_early_abort() { + const ERROR: &str = "error message"; + fn f(i: u32) -> impl Future> { + lazy(move |_| match i { + 1 => Ok(1), + 2 => Err(ERROR), + _ => panic!("should have aborted earlier"), + }) + } + + run(|| async { + let active = NonZeroUsize::new(10).unwrap(); + let err = seq_try_join_all(active, (1..=3).map(f)).await.unwrap_err(); + assert_eq!(err, ERROR); + }); + } +} diff --git a/ipa-core/src/seq_join/mod.rs b/ipa-core/src/seq_join/mod.rs new file mode 100644 index 000000000..dfe6b1073 --- /dev/null +++ b/ipa-core/src/seq_join/mod.rs @@ -0,0 +1,274 @@ +use std::{future::IntoFuture, num::NonZeroUsize}; + +use futures::{ + stream::{iter, Iter as StreamIter, TryCollect}, + Future, Stream, StreamExt, TryStreamExt, +}; +use pin_project::pin_project; + +use crate::exact::ExactSizeStream; + +#[cfg(not(feature = "multi-threading"))] +mod local; +#[cfg(feature = "multi-threading")] +mod multi_thread; + +/// This helper function might be necessary to convince the compiler that +/// the return value from [`seq_try_join_all`] implements `Send`. +/// Use this if you get higher-ranked lifetime errors that mention `std::marker::Send`. 
+/// +/// +pub fn assert_send<'a, O>( + fut: impl Future + Send + 'a, +) -> impl Future + Send + 'a { + fut +} + +/// Sequentially join futures from a stream. +/// +/// This function polls futures in strict sequence. +/// If any future blocks, up to `active - 1` futures after it will be polled so +/// that they make progress. +/// +/// # Deadlocks +/// +/// This will fail to resolve if the progress of any future depends on a future more +/// than `active` items behind it in the input sequence. +/// +/// # Safety +/// If multi-threading is enabled, forgetting the resulting future will cause use-after-free error. Do not leak it or +/// prevent the future destructor from running. +/// +/// [`try_join_all`]: futures::future::try_join_all +/// [`Stream`]: futures::stream::Stream +/// [`StreamExt::buffered`]: futures::stream::StreamExt::buffered +pub fn seq_join<'st, S, F, O>(active: NonZeroUsize, source: S) -> SequentialFutures<'st, S, F> +where + S: Stream + Send + 'st, + F: Future + Send, + O: Send + 'static, +{ + #[cfg(feature = "multi-threading")] + unsafe { + SequentialFutures::new(active, source) + } + #[cfg(not(feature = "multi-threading"))] + SequentialFutures::new(active, source) +} + +/// The `SeqJoin` trait wraps `seq_try_join_all`, providing the `active` parameter +/// from the provided context so that the value can be made consistent. +pub trait SeqJoin { + /// Perform a sequential join of the futures from the provided iterable. + /// This uses [`seq_join`], with the current state of the associated object + /// being used to determine the number of active items to track (see [`active_work`]). + /// + /// A rough rule of thumb for how to decide between this and [`parallel_join`] is + /// that this should be used whenever you are iterating over different records. + /// [`parallel_join`] is better suited to smaller batches, such as iterating over + /// the bits of a value for a single record. + /// + /// Note that the join functions from the [`futures`] crate, such as [`join3`], + /// are also parallel and can be used where you have a small, fixed number of tasks. + /// + /// Be especially careful if you use the random bits generator with this. + /// The random bits generator can produce values out of sequence. + /// You might need to use [`parallel_join`] for that. + /// + /// [`active_work`]: Self::active_work + /// [`parallel_join`]: Self::parallel_join + /// [`join3`]: futures::future::join3 + fn try_join<'fut, I, F, O, E>( + &self, + iterable: I, + ) -> TryCollect, Vec> + where + I: IntoIterator + Send, + I::IntoIter: Send + 'fut, + F: Future> + Send + 'fut, + O: Send + 'static, + E: Send + 'static, + { + seq_try_join_all(self.active_work(), iterable) + } + + /// Join multiple tasks in parallel. Only do this if you can't use a sequential join. + /// + /// # Safety + /// Forgetting the future returned from this function will cause use-after-free. This is a tradeoff between + /// performance and safety that allows us to use regular references instead of Arc pointers. + /// + /// Dropping the future is always safe. + #[cfg(feature = "multi-threading")] + fn parallel_join<'a, I, F, O, E>( + &self, + iterable: I, + ) -> std::pin::Pin, E>> + Send + 'a>> + where + I: IntoIterator + Send, + F: Future> + Send + 'a, + O: Send + 'static, + E: Send + 'static, + { + unsafe { Box::pin(multi_thread::parallel_join(iterable)) } + } + + /// Join multiple tasks in parallel. Only do this if you can't use a sequential join. 
+ #[cfg(not(feature = "multi-threading"))] + fn parallel_join(&self, iterable: I) -> futures::future::TryJoinAll + where + I: IntoIterator, + I::Item: futures::future::TryFuture, + { + #[allow(clippy::disallowed_methods)] // Just in this one place. + futures::future::try_join_all(iterable) + } + + /// The amount of active work that is concurrently permitted. + fn active_work(&self) -> NonZeroUsize; +} + +type SeqTryJoinAll<'st, I, F> = + SequentialFutures<'st, StreamIter<::IntoIter>, F>; + +/// A substitute for [`futures::future::try_join_all`] that uses [`seq_join`]. +/// This awaits all the provided futures in order, +/// aborting early if any future returns `Result::Err`. +pub fn seq_try_join_all<'iter, I, F, O, E>( + active: NonZeroUsize, + source: I, +) -> TryCollect, Vec> +where + I: IntoIterator + Send, + I::IntoIter: Send + 'iter, + F: Future> + Send + 'iter, + O: Send + 'static, + E: Send + 'static, +{ + seq_join(active, iter(source)).try_collect() +} + +impl<'fut, S, F> ExactSizeStream for SequentialFutures<'fut, S, F> +where + S: Stream + Send + ExactSizeStream, + F: IntoFuture, + ::IntoFuture: Send + 'fut, + <::IntoFuture as Future>::Output: Send + 'static, +{ +} + +#[cfg(not(feature = "multi-threading"))] +pub use local::SequentialFutures; +#[cfg(feature = "multi-threading")] +pub use multi_thread::SequentialFutures; + +#[cfg(all(test, any(unit_test, feature = "shuttle")))] +mod test { + use std::{convert::Infallible, iter::once, task::Poll}; + + use futures::{ + future::{lazy, BoxFuture}, + stream::{iter, poll_immediate}, + Future, StreamExt, + }; + + use super::*; + use crate::test_executor::run; + + async fn immediate(count: u32) { + let capacity = NonZeroUsize::new(3).unwrap(); + let values = seq_join(capacity, iter((0..count).map(|i| async move { i }))) + .collect::>() + .await; + assert_eq!((0..count).collect::>(), values); + } + + #[test] + fn within_capacity() { + run(|| async { + immediate(2).await; + immediate(1).await; + }); + } + + #[test] + fn over_capacity() { + run(|| async { + immediate(10).await; + }); + } + + #[test] + fn size() { + run(|| async { + let mut count = 10_usize; + let capacity = NonZeroUsize::new(3).unwrap(); + let mut values = seq_join(capacity, iter((0..count).map(|i| async move { i }))); + assert_eq!((count, Some(count)), values.size_hint()); + + while values.next().await.is_some() { + count -= 1; + assert_eq!((count, Some(count)), values.size_hint()); + } + }); + } + + #[test] + fn out_of_order() { + run(|| async { + let capacity = NonZeroUsize::new(3).unwrap(); + let barrier = tokio::sync::Barrier::new(2); + let unresolved: BoxFuture<'_, u32> = Box::pin(async { + barrier.wait().await; + 0 + }); + let it = once(unresolved) + .chain((1..4_u32).map(|i| -> BoxFuture<'_, u32> { Box::pin(async move { i }) })); + let mut seq_futures = seq_join(capacity, iter(it)); + + assert_eq!( + Some(Poll::Pending), + poll_immediate(&mut seq_futures).next().await + ); + barrier.wait().await; + assert_eq!(vec![0, 1, 2, 3], seq_futures.collect::>().await); + }); + } + + #[test] + fn join_success() { + fn f(v: T) -> impl Future> { + lazy(move |_| Ok(v)) + } + + run(|| async { + let active = NonZeroUsize::new(10).unwrap(); + let res = seq_try_join_all(active, (1..5).map(f)).await.unwrap(); + assert_eq!((1..5).collect::>(), res); + }); + } + + #[test] + #[cfg_attr( + all(feature = "shuttle", feature = "multi-threading"), + should_panic(expected = "cancelled") + )] + fn does_not_block_on_error() { + const ERROR: &str = "returning early is safe"; + use std::pin::Pin; + 
+ fn f(i: u32) -> Pin> + Send>> { + match i { + 1 => Box::pin(lazy(move |_| Ok(1))), + 2 => Box::pin(lazy(move |_| Err(ERROR))), + _ => Box::pin(futures::future::pending()), + } + } + + run(|| async { + let active = NonZeroUsize::new(10).unwrap(); + let err = seq_try_join_all(active, (1..=3).map(f)).await.unwrap_err(); + assert_eq!(err, ERROR); + }); + } +} diff --git a/ipa-core/src/seq_join/multi_thread.rs b/ipa-core/src/seq_join/multi_thread.rs new file mode 100644 index 000000000..79ee89d6e --- /dev/null +++ b/ipa-core/src/seq_join/multi_thread.rs @@ -0,0 +1,252 @@ +use std::{ + future::IntoFuture, + num::NonZeroUsize, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::stream::Fuse; +use tracing::{Instrument, Span}; + +use super::*; + +#[cfg(feature = "shuttle")] +mod shuttle_spawner { + use shuttle_crate::future::{self, JoinError, JoinHandle}; + + use super::*; + + /// Spawner implementation for Shuttle framework to run tests in parallel + pub(super) struct ShuttleSpawner; + + unsafe impl async_scoped::spawner::Spawner for ShuttleSpawner + where + T: Send + 'static, + { + type FutureOutput = Result; + type SpawnHandle = JoinHandle; + + fn spawn + Send + 'static>(&self, f: F) -> Self::SpawnHandle { + future::spawn(f) + } + } + + unsafe impl async_scoped::spawner::Blocker for ShuttleSpawner { + fn block_on>(&self, f: F) -> T { + future::block_on(f) + } + } +} + +#[cfg(feature = "shuttle")] +type Spawner<'fut, T> = async_scoped::Scope<'fut, T, shuttle_spawner::ShuttleSpawner>; +#[cfg(not(feature = "shuttle"))] +type Spawner<'fut, T> = async_scoped::TokioScope<'fut, T>; + +unsafe fn create_spawner<'fut, T: Send + 'static>() -> Spawner<'fut, T> { + #[cfg(feature = "shuttle")] + return async_scoped::Scope::create(shuttle_spawner::ShuttleSpawner); + #[cfg(not(feature = "shuttle"))] + return async_scoped::TokioScope::create(async_scoped::spawner::use_tokio::Tokio); +} + +#[pin_project] +#[must_use = "Futures do nothing unless polled"] +pub struct SequentialFutures<'fut, S, F> +where + S: Stream + Send + 'fut, + F: IntoFuture, + <::IntoFuture as Future>::Output: Send + 'static, +{ + #[pin] + spawner: Spawner<'fut, F::Output>, + #[pin] + source: Fuse, + capacity: usize, +} + +impl SequentialFutures<'_, S, F> +where + S: Stream + Send, + F: IntoFuture, + <::IntoFuture as Future>::Output: Send + 'static, +{ + pub unsafe fn new(active: NonZeroUsize, source: S) -> Self { + SequentialFutures { + spawner: unsafe { create_spawner() }, + source: source.fuse(), + capacity: active.get(), + } + } +} + +impl<'fut, S, F> Stream for SequentialFutures<'fut, S, F> +where + S: Stream + Send, + F: IntoFuture, + ::IntoFuture: Send + 'fut, + <::IntoFuture as Future>::Output: Send + 'static, +{ + type Item = F::Output; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + // Draw more values from the input, up to the capacity. + while this.spawner.remaining() < *this.capacity { + if let Poll::Ready(Some(f)) = this.source.as_mut().poll_next(cx) { + // Making futures cancellable is critical to avoid hangs. + // if one of them panics, unwinding causes spawner to drop and, in turn, + // it blocks the thread to await all pending futures completion. If there is + // a dependency between futures, pending one will never complete. + // Cancellable futures will be cancelled when spawner is dropped which is + // the behavior we want. 
+ let task_index = this.spawner.len(); + this.spawner + .spawn_cancellable(f.into_future().instrument(Span::current()), move || { + panic!("SequentialFutures: spawned task {task_index} cancelled") + }); + } else { + break; + } + } + + // Poll spawner if it has work to do. If both source and spawner are empty, we're done. + if this.spawner.remaining() > 0 { + this.spawner.as_mut().poll_next(cx).map(|v| match v { + Some(Ok(v)) => Some(v), + Some(Err(_)) => panic!("SequentialFutures: spawned task aborted"), + None => None, + }) + } else if this.source.is_done() { + Poll::Ready(None) + } else { + Poll::Pending + } + } + + fn size_hint(&self) -> (usize, Option) { + let in_progress = self.spawner.remaining(); + let (lower, upper) = self.source.size_hint(); + ( + lower.saturating_add(in_progress), + upper.and_then(|u| u.checked_add(in_progress)), + ) + } +} + +pub(super) unsafe fn parallel_join<'fut, I, F, O, E>( + iterable: I, +) -> impl Future, E>> + Send + 'fut +where + I: IntoIterator + Send, + F: Future> + Send + 'fut, + O: Send + 'static, + E: Send + 'static, +{ + let mut scope = { + let mut scope = unsafe { create_spawner() }; + for element in iterable { + // it is important to make those cancellable to avoid deadlocks if one of the spawned future panics. + // If there is a dependency between futures, pending one will never complete. + // Cancellable futures will be cancelled when spawner is dropped which is the behavior we want. + scope.spawn_cancellable(element.instrument(Span::current()), || { + panic!("parallel_join: task cancelled") + }); + } + scope + }; + + async move { + let mut result = Vec::with_capacity(scope.len()); + while let Some(item) = scope.next().await { + // join error is nothing we can do about + result.push(item.expect("parallel_join: received JoinError")?) + } + Ok(result) + } +} + +#[cfg(all(test, unit_test))] +mod tests { + use std::{future::Future, pin::Pin}; + + use crate::test_executor::run; + + /// This test demonstrates that forgetting the future returned by `parallel_join` is not safe and will cause + /// use-after-free safety error. It spawns a few tasks that constantly try to access the `borrow_from_me` weak + /// reference while the main thread drops the owning reference. By proving that futures are able to see the weak + /// pointer unset, this test shows that same can happen for regular references and cause use-after-free. + #[test] + fn parallel_join_forget_is_not_safe() { + use futures::future::poll_immediate; + + use crate::{seq_join::multi_thread::parallel_join, sync::Arc}; + + run(|| async { + const N: usize = 24; + let borrowed_vec = Box::new([1, 2, 3]); + let borrow_from_me = Arc::new(vec![1, 2, 3]); + let start = Arc::new(tokio::sync::Barrier::new(N + 1)); + // counts how many tasks have accessed `borrow_from_me` after it was destroyed. + // this test expects all tasks to access `borrow_from_me` at least once. + let bad_accesses = Arc::new(tokio::sync::Barrier::new(N + 1)); + + let futures = (0..N) + .map(|_| { + let borrowed = Arc::downgrade(&borrow_from_me); + let regular_ref = &borrowed_vec; + let start = start.clone(); + let bad_access = bad_accesses.clone(); + async move { + start.wait().await; + for _ in 0..100 { + if borrowed.upgrade().is_none() { + bad_access.wait().await; + // switch to `true` if you want to see the real corruption. + #[allow(unreachable_code)] + if false { + // this is a place where we can see the use-after-free. 
+ // we avoid executing this block to appease sanitizers, but compiler happily + // allows us to follow this reference. + println!("{:?}", regular_ref); + } + break; + } + tokio::task::yield_now().await; + } + Ok::<_, ()>(()) + } + }) + .collect::>(); + + let mut f = Box::pin(unsafe { parallel_join(futures) }); + poll_immediate(&mut f).await; + start.wait().await; + + // the type of `f` above captures the lifetime for borrowed_vec. Leaking `f` allows `borrowed_vec` to be + // dropped, but that drop prohibits any subsequent manipulations with `f` pointer, irrespective of whether + // `f` is `&mut _` or `*mut _` (value already borrowed error). + // I am not sure I fully understand what is going on here (why borrowck allows me to leak the value, but + // then I can't drop it even if it is a raw pointer), but removing the lifetime from `f` type allows + // the test to pass. + // + // This is only required to do the proper cleanup and avoid memory leaks. Replacing this line with + // `mem::forget(f)` will lead to the same test outcome, but Miri will complain about memory leaks. + let f: _ = unsafe { + std::mem::transmute::<_, Pin, ()>> + Send>>>( + Box::pin(f) as Pin, ()>>>>, + ) + }; + + // Async executor will still be polling futures and they will try to follow this pointer. + drop(borrow_from_me); + drop(borrowed_vec); + + // this test should terminate because all tasks should access `borrow_from_me` at least once. + bad_accesses.wait().await; + + drop(f); + }); + } +}