From 7186d735b2bbb042c472ca77447deb2061ce2553 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Mon, 7 Nov 2022 17:48:40 -0800 Subject: [PATCH] change scope to take a thread executor --- crates/bevy_app/src/app.rs | 15 +++-- crates/bevy_core/src/lib.rs | 2 + crates/bevy_core/src/task_pool_options.rs | 8 +-- crates/bevy_ecs/src/query/state.rs | 2 +- .../src/schedule/executor_parallel.rs | 32 +++++++-- crates/bevy_gltf/src/loader.rs | 2 +- crates/bevy_tasks/examples/busy_behavior.rs | 2 +- crates/bevy_tasks/examples/idle_behavior.rs | 2 +- crates/bevy_tasks/src/iter/mod.rs | 32 ++++----- crates/bevy_tasks/src/lib.rs | 2 +- crates/bevy_tasks/src/main_thread_executor.rs | 67 ++++++++----------- crates/bevy_tasks/src/slice.rs | 4 +- crates/bevy_tasks/src/task_pool.rs | 58 +++++++--------- 13 files changed, 118 insertions(+), 110 deletions(-) diff --git a/crates/bevy_app/src/app.rs b/crates/bevy_app/src/app.rs index d852a3b063b63c..93245e8ec1eb15 100644 --- a/crates/bevy_app/src/app.rs +++ b/crates/bevy_app/src/app.rs @@ -4,8 +4,8 @@ use bevy_ecs::{ event::{Event, Events}, prelude::FromWorld, schedule::{ - IntoSystemDescriptor, Schedule, ShouldRun, Stage, StageLabel, State, StateData, SystemSet, - SystemStage, + IntoSystemDescriptor, MainThreadExecutor, Schedule, ShouldRun, Stage, StageLabel, State, + StateData, SystemSet, SystemStage, }, system::Resource, world::World, @@ -153,7 +153,11 @@ impl App { pub fn update(&mut self) { #[cfg(feature = "trace")] let _bevy_frame_update_span = info_span!("frame").entered(); - ComputeTaskPool::init(TaskPool::default).scope(|scope| { + let thread_executor = self + .world + .get_resource::() + .map(|e| e.0.clone()); + ComputeTaskPool::init(TaskPool::default).scope(thread_executor, |scope| { if self.run_once { for sub_app in self.sub_apps.values_mut() { (sub_app.extract)(&mut self.world, &mut sub_app.app); @@ -1001,10 +1005,13 @@ impl App { pub fn add_sub_app( &mut self, label: impl AppLabel, - app: App, + mut app: App, sub_app_extract: impl Fn(&mut World, &mut App) + 'static + Send + Sync, sub_app_runner: impl Fn(&mut App) + 'static + Send + Sync, ) -> &mut Self { + if let Some(executor) = self.world.get_resource::() { + app.world.insert_resource(executor.clone()); + } self.sub_apps.insert( label.as_label(), SubApp { diff --git a/crates/bevy_core/src/lib.rs b/crates/bevy_core/src/lib.rs index fbc175c1e3f897..f67de169741e1c 100644 --- a/crates/bevy_core/src/lib.rs +++ b/crates/bevy_core/src/lib.rs @@ -6,6 +6,7 @@ mod name; mod serde; mod task_pool_options; +use bevy_ecs::schedule::MainThreadExecutor; use bevy_ecs::system::Resource; pub use bytemuck::{bytes_of, cast_slice, Pod, Zeroable}; pub use name::*; @@ -40,6 +41,7 @@ impl Plugin for CorePlugin { fn build(&self, app: &mut App) { // Setup the default bevy task pools self.task_pool_options.create_default_pools(); + app.insert_resource(MainThreadExecutor::new()); #[cfg(not(target_arch = "wasm32"))] app.add_system_to_stage( diff --git a/crates/bevy_core/src/task_pool_options.rs b/crates/bevy_core/src/task_pool_options.rs index 942324e5cc72fb..4537354a69c05f 100644 --- a/crates/bevy_core/src/task_pool_options.rs +++ b/crates/bevy_core/src/task_pool_options.rs @@ -1,7 +1,5 @@ use bevy_ecs::prelude::Resource; -use bevy_tasks::{ - AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, MainThreadExecutor, TaskPoolBuilder, -}; +use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder}; use bevy_utils::tracing::trace; /// Defines a simple way to determine how many threads to use given the number of remaining cores @@ -151,9 +149,5 @@ impl TaskPoolOptions { .build() }); } - - { - MainThreadExecutor::init(); - } } } diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index 18874fd107fae5..3f1888669678dd 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -1007,7 +1007,7 @@ impl QueryState { ) { // NOTE: If you are changing query iteration code, remember to update the following places, where relevant: // QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual - ComputeTaskPool::get().scope(|scope| { + ComputeTaskPool::get().scope(None, |scope| { if Q::IS_DENSE && F::IS_DENSE { let tables = &world.storages().tables; for table_id in &self.matched_table_ids { diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 8883b2922a386b..5a80aea31307ab 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -1,11 +1,15 @@ +use std::sync::Arc; + +use crate as bevy_ecs; use crate::{ archetype::ArchetypeComponentId, query::Access, schedule::{ParallelSystemExecutor, SystemContainer}, + system::Resource, world::World, }; use async_channel::{Receiver, Sender}; -use bevy_tasks::{ComputeTaskPool, Scope, TaskPool}; +use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; #[cfg(feature = "trace")] use bevy_utils::tracing::Instrument; use event_listener::Event; @@ -14,6 +18,22 @@ use fixedbitset::FixedBitSet; #[cfg(test)] use scheduling_event::*; +/// +#[derive(Resource, Default)] +pub struct MainThreadExecutor(pub Arc); + +impl MainThreadExecutor { + pub fn new() -> Self { + MainThreadExecutor(Arc::new(ThreadExecutor::new())) + } +} + +impl Clone for MainThreadExecutor { + fn clone(&self) -> Self { + MainThreadExecutor(self.0.clone()) + } +} + struct SystemSchedulingMetadata { /// Used to signal the system's task to start the system. start: Event, @@ -124,7 +144,11 @@ impl ParallelSystemExecutor for ParallelExecutor { } } - ComputeTaskPool::init(TaskPool::default).scope(|scope| { + let thread_executor = world + .get_resource::() + .map(|e| e.0.clone()); + + ComputeTaskPool::init(TaskPool::default).scope(thread_executor, |scope| { self.prepare_systems(scope, systems, world); if self.should_run.count_ones(..) == 0 { return; @@ -236,7 +260,7 @@ impl ParallelExecutor { if system_data.is_send { scope.spawn(task); } else { - scope.spawn_on_main(task); + scope.spawn_on_scope(task); } #[cfg(test)] @@ -271,7 +295,7 @@ impl ParallelExecutor { if system_data.is_send { scope.spawn(task); } else { - scope.spawn_on_main(task); + scope.spawn_on_scope(task); } } } diff --git a/crates/bevy_gltf/src/loader.rs b/crates/bevy_gltf/src/loader.rs index 5459c37d5dd9a0..024b6dc6410ae7 100644 --- a/crates/bevy_gltf/src/loader.rs +++ b/crates/bevy_gltf/src/loader.rs @@ -409,7 +409,7 @@ async fn load_gltf<'a, 'b>( } else { #[cfg(not(target_arch = "wasm32"))] IoTaskPool::get() - .scope(|scope| { + .scope(None, |scope| { gltf.textures().for_each(|gltf_texture| { let linear_textures = &linear_textures; let load_context: &LoadContext = load_context; diff --git a/crates/bevy_tasks/examples/busy_behavior.rs b/crates/bevy_tasks/examples/busy_behavior.rs index 8a74034e0ca904..df78b2316251ab 100644 --- a/crates/bevy_tasks/examples/busy_behavior.rs +++ b/crates/bevy_tasks/examples/busy_behavior.rs @@ -11,7 +11,7 @@ fn main() { .build(); let t0 = instant::Instant::now(); - pool.scope(|s| { + pool.scope(None, |s| { for i in 0..40 { s.spawn(async move { let now = instant::Instant::now(); diff --git a/crates/bevy_tasks/examples/idle_behavior.rs b/crates/bevy_tasks/examples/idle_behavior.rs index daa2eaf2e2a894..b1f5f2adb54b62 100644 --- a/crates/bevy_tasks/examples/idle_behavior.rs +++ b/crates/bevy_tasks/examples/idle_behavior.rs @@ -9,7 +9,7 @@ fn main() { .thread_name("Idle Behavior ThreadPool".to_string()) .build(); - pool.scope(|s| { + pool.scope(None, |s| { for i in 0..1 { s.spawn(async move { println!("Blocking for 10 seconds"); diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index 952bc530755511..2c3e2fa6731381 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -34,7 +34,7 @@ where /// /// See [`Iterator::count()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.count) fn count(mut self, pool: &TaskPool) -> usize { - pool.scope(|s| { + pool.scope(None, |s| { while let Some(batch) = self.next_batch() { s.spawn(async move { batch.count() }); } @@ -105,7 +105,7 @@ where where F: FnMut(BatchIter::Item) + Send + Clone + Sync, { - pool.scope(|s| { + pool.scope(None, |s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { @@ -195,7 +195,7 @@ where C: std::iter::FromIterator, BatchIter::Item: Send + 'static, { - pool.scope(|s| { + pool.scope(None, |s| { while let Some(batch) = self.next_batch() { s.spawn(async move { batch.collect::>() }); } @@ -216,7 +216,7 @@ where BatchIter::Item: Send + 'static, { let (mut a, mut b) = <(C, C)>::default(); - pool.scope(|s| { + pool.scope(None, |s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { batch.partition::, F>(newf) }); @@ -242,7 +242,7 @@ where F: FnMut(C, BatchIter::Item) -> C + Send + Sync + Clone, C: Clone + Send + Sync + 'static, { - pool.scope(|s| { + pool.scope(None, |s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); let newi = init.clone(); @@ -260,7 +260,7 @@ where where F: FnMut(BatchIter::Item) -> bool + Send + Sync + Clone, { - pool.scope(|s| { + pool.scope(None, |s| { while let Some(mut batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { batch.all(newf) }); @@ -279,7 +279,7 @@ where where F: FnMut(BatchIter::Item) -> bool + Send + Sync + Clone, { - pool.scope(|s| { + pool.scope(None, |s| { while let Some(mut batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { batch.any(newf) }); @@ -299,7 +299,7 @@ where where F: FnMut(BatchIter::Item) -> bool + Send + Sync + Clone, { - let poses = pool.scope(|s| { + let poses = pool.scope(None, |s| { while let Some(batch) = self.next_batch() { let mut newf = f.clone(); s.spawn(async move { @@ -332,7 +332,7 @@ where where BatchIter::Item: Ord + Send + 'static, { - pool.scope(|s| { + pool.scope(None, |s| { while let Some(batch) = self.next_batch() { s.spawn(async move { batch.max() }); } @@ -349,7 +349,7 @@ where where BatchIter::Item: Ord + Send + 'static, { - pool.scope(|s| { + pool.scope(None, |s| { while let Some(batch) = self.next_batch() { s.spawn(async move { batch.min() }); } @@ -368,7 +368,7 @@ where F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone, BatchIter::Item: Send + 'static, { - pool.scope(|s| { + pool.scope(None, |s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { batch.max_by_key(newf) }); @@ -388,7 +388,7 @@ where F: FnMut(&BatchIter::Item, &BatchIter::Item) -> std::cmp::Ordering + Send + Sync + Clone, BatchIter::Item: Send + 'static, { - pool.scope(|s| { + pool.scope(None, |s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { batch.max_by(newf) }); @@ -408,7 +408,7 @@ where F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone, BatchIter::Item: Send + 'static, { - pool.scope(|s| { + pool.scope(None, |s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { batch.min_by_key(newf) }); @@ -428,7 +428,7 @@ where F: FnMut(&BatchIter::Item, &BatchIter::Item) -> std::cmp::Ordering + Send + Sync + Clone, BatchIter::Item: Send + 'static, { - pool.scope(|s| { + pool.scope(None, |s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { batch.min_by(newf) }); @@ -482,7 +482,7 @@ where S: std::iter::Sum + Send + 'static, R: std::iter::Sum, { - pool.scope(|s| { + pool.scope(None, |s| { while let Some(batch) = self.next_batch() { s.spawn(async move { batch.sum() }); } @@ -499,7 +499,7 @@ where S: std::iter::Product + Send + 'static, R: std::iter::Product, { - pool.scope(|s| { + pool.scope(None, |s| { while let Some(batch) = self.next_batch() { s.spawn(async move { batch.product() }); } diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 3be4008fb86e91..ce35e8ee4b54f4 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -23,7 +23,7 @@ pub use usages::tick_global_task_pools_on_main_thread; pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; mod main_thread_executor; -pub use main_thread_executor::MainThreadExecutor; +pub use main_thread_executor::ThreadExecutor; mod iter; pub use iter::ParallelIterator; diff --git a/crates/bevy_tasks/src/main_thread_executor.rs b/crates/bevy_tasks/src/main_thread_executor.rs index 8fc2acc7f9f875..2e39cc230b7783 100644 --- a/crates/bevy_tasks/src/main_thread_executor.rs +++ b/crates/bevy_tasks/src/main_thread_executor.rs @@ -1,66 +1,53 @@ -use std::{marker::PhantomData, sync::Arc}; +use std::{ + marker::PhantomData, + sync::Arc, + thread::{self, ThreadId}, +}; use async_executor::{Executor, Task}; use futures_lite::Future; -use is_main_thread::is_main_thread; -use once_cell::sync::OnceCell; - -static MAIN_THREAD_EXECUTOR: OnceCell = OnceCell::new(); /// Use to access the global main thread executor. Be aware that the main thread executor /// only makes progress when it is ticked. This normally happens in `[TaskPool::scope]`. #[derive(Debug)] -pub struct MainThreadExecutor( +pub struct ThreadExecutor { // this is only pub crate for testing purposes, do not contruct otherwise - pub(crate) Arc>, -); + executor: Arc>, + thread_id: ThreadId, +} -impl MainThreadExecutor { - /// Initializes the global `[MainThreadExecutor]` instance. - pub fn init() -> &'static Self { - MAIN_THREAD_EXECUTOR.get_or_init(|| Self(Arc::new(Executor::new()))) +impl Default for ThreadExecutor { + fn default() -> Self { + Self { + executor: Arc::new(Executor::new()), + thread_id: thread::current().id(), + } } +} - /// Gets the global [`MainThreadExecutor`] instance. - /// - /// # Panics - /// Panics if no executor has been initialized yet. - pub fn get() -> &'static Self { - MAIN_THREAD_EXECUTOR.get().expect( - "A MainThreadExecutor has not been initialize yet. Please call \ - MainThreadExecutor::init beforehand", - ) +impl ThreadExecutor { + /// Initializes the global `[MainThreadExecutor]` instance. + pub fn new() -> Self { + Self::default() } /// Gets the `[MainThreadSpawner]` for the global main thread executor. /// Use this to spawn tasks on the main thread. pub fn spawner(&self) -> MainThreadSpawner<'static> { - MainThreadSpawner(self.0.clone()) + MainThreadSpawner(self.executor.clone()) } - /// Gets the `[MainThreadTicker]` for the global main thread executor. - /// Use this to tick the main thread executor. - /// Returns None if called on not the main thread. + /// Gets the `[MainThreadTicker]` for this executor. + /// Use this to tick the executor. + /// It only returns the ticker if it's on the thread the executor was created on + /// and returns `None` otherwise. pub fn ticker(&self) -> Option { - // always return ticker when testing to allow tests to run off main thread - dbg!("hjj"); - #[cfg(test)] - if true { - dbg!("blah"); + if thread::current().id() == self.thread_id { return Some(MainThreadTicker { - executor: self.0.clone(), + executor: self.executor.clone(), _marker: PhantomData::default(), }); } - - if let Some(is_main) = is_main_thread() { - if is_main { - return Some(MainThreadTicker { - executor: self.0.clone(), - _marker: PhantomData::default(), - }); - } - } None } } diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs index 4b5d875ea989bd..44372304e71d57 100644 --- a/crates/bevy_tasks/src/slice.rs +++ b/crates/bevy_tasks/src/slice.rs @@ -37,7 +37,7 @@ pub trait ParallelSlice: AsRef<[T]> { { let slice = self.as_ref(); let f = &f; - task_pool.scope(|scope| { + task_pool.scope(None, |scope| { for chunk in slice.chunks(chunk_size) { scope.spawn(async move { f(chunk) }); } @@ -134,7 +134,7 @@ pub trait ParallelSliceMut: AsMut<[T]> { { let slice = self.as_mut(); let f = &f; - task_pool.scope(|scope| { + task_pool.scope(None, |scope| { for chunk in slice.chunks_mut(chunk_size) { scope.spawn(async move { f(chunk) }); } diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 9102b34280b421..8bfba228c79ad0 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -10,7 +10,7 @@ use concurrent_queue::ConcurrentQueue; use futures_lite::{future, FutureExt}; use crate::Task; -use crate::{main_thread_executor::MainThreadSpawner, MainThreadExecutor}; +use crate::{main_thread_executor::MainThreadSpawner, ThreadExecutor}; /// Used to create a [`TaskPool`] #[derive(Debug, Default, Clone)] @@ -164,7 +164,7 @@ impl TaskPool { /// /// let pool = TaskPool::new(); /// let mut x = 0; - /// let results = pool.scope(|s| { + /// let results = pool.scope(None, |s| { /// s.spawn(async { /// // you can borrow the spawner inside a task and spawn tasks from within the task /// s.spawn(async { @@ -184,7 +184,7 @@ impl TaskPool { /// assert!(results.contains(&1)); /// /// // The ordering is deterministic if you only spawn directly from the closure function. - /// let results = pool.scope(|s| { + /// let results = pool.scope(None, |s| { /// s.spawn(async { 0 }); /// s.spawn(async { 1 }); /// }); @@ -209,7 +209,7 @@ impl TaskPool { /// fn scope_escapes_closure() { /// let pool = TaskPool::new(); /// let foo = Box::new(42); - /// pool.scope(|scope| { + /// pool.scope(None, |scope| { /// std::thread::spawn(move || { /// // UB. This could spawn on the scope after `.scope` returns and the internal Scope is dropped. /// scope.spawn(async move { @@ -224,7 +224,7 @@ impl TaskPool { /// use bevy_tasks::TaskPool; /// fn cannot_borrow_from_closure() { /// let pool = TaskPool::new(); - /// pool.scope(|scope| { + /// pool.scope(None, |scope| { /// let x = 1; /// let y = &x; /// scope.spawn(async move { @@ -233,7 +233,7 @@ impl TaskPool { /// }); /// } /// - pub fn scope<'env, F, T>(&self, f: F) -> Vec + pub fn scope<'env, F, T>(&self, thread_executor: Option>, f: F) -> Vec where F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>), T: Send + 'static, @@ -246,23 +246,20 @@ impl TaskPool { let executor: &async_executor::Executor = &self.executor; let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) }; - #[cfg(not(test))] - let main_thread_executor = MainThreadExecutor::init(); - // for testing configure a new instance of main thread executor for every scope - // this helps us pretend that the thread that an app or stage is constructed on is the main thread - #[cfg(test)] - let main_thread_executor = MainThreadExecutor(Arc::new(async_executor::Executor::new())); - - let main_thread_spawner = main_thread_executor.spawner(); - let main_thread_spawner: MainThreadSpawner<'env> = - unsafe { mem::transmute(main_thread_spawner) }; + let thread_executor = if let Some(thread_executor) = thread_executor { + thread_executor + } else { + Arc::new(ThreadExecutor::new()) + }; + let thread_spawner = thread_executor.spawner(); + let thread_spawner: MainThreadSpawner<'env> = unsafe { mem::transmute(thread_spawner) }; let spawned: ConcurrentQueue> = ConcurrentQueue::unbounded(); let spawned_ref: &'env ConcurrentQueue> = unsafe { mem::transmute(&spawned) }; let scope = Scope { executor, - main_thread_spawner, + thread_spawner, spawned: spawned_ref, scope: PhantomData, env: PhantomData, @@ -285,10 +282,9 @@ impl TaskPool { results }; - if let Some(main_thread_ticker) = main_thread_executor.ticker() { + if let Some(main_thread_ticker) = thread_executor.ticker() { let tick_forever = async move { loop { - dbg!("tivk"); main_thread_ticker.tick().await; } }; @@ -370,7 +366,7 @@ impl Drop for TaskPool { #[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { executor: &'scope async_executor::Executor<'scope>, - main_thread_spawner: MainThreadSpawner<'scope>, + thread_spawner: MainThreadSpawner<'scope>, spawned: &'scope ConcurrentQueue>, // make `Scope` invariant over 'scope and 'env scope: PhantomData<&'scope mut &'scope ()>, @@ -399,10 +395,8 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// [`Scope::spawn`] instead, unless the provided future needs to run on the scope's thread. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_on_main + 'scope + Send>(&self, f: Fut) { - let main_thread_spawner: &MainThreadSpawner<'scope> = - unsafe { mem::transmute(&self.main_thread_spawner) }; - let task = main_thread_spawner.spawn(f); + pub fn spawn_on_scope + 'scope + Send>(&self, f: Fut) { + let task = self.thread_spawner.spawn(f); // ConcurrentQueue only errors when closed or full, but we never // close and use an unbouded queue, so it is safe to unwrap self.spawned.push(task).unwrap(); @@ -427,7 +421,7 @@ mod tests { let count = Arc::new(AtomicI32::new(0)); - let outputs = pool.scope(|scope| { + let outputs = pool.scope(None, |scope| { for _ in 0..100 { let count_clone = count.clone(); scope.spawn(async move { @@ -459,7 +453,7 @@ mod tests { let local_count = Arc::new(AtomicI32::new(0)); let non_local_count = Arc::new(AtomicI32::new(0)); - let outputs = pool.scope(|scope| { + let outputs = pool.scope(None, |scope| { for i in 0..100 { if i % 2 == 0 { let count_clone = non_local_count.clone(); @@ -473,7 +467,7 @@ mod tests { }); } else { let count_clone = local_count.clone(); - scope.spawn_on_main(async move { + scope.spawn_on_scope(async move { if *foo != 42 { panic!("not 42!?!?") } else { @@ -507,14 +501,14 @@ mod tests { let inner_pool = pool.clone(); let inner_thread_check_failed = thread_check_failed.clone(); std::thread::spawn(move || { - inner_pool.scope(|scope| { + inner_pool.scope(None, |scope| { let inner_count_clone = count_clone.clone(); scope.spawn(async move { inner_count_clone.fetch_add(1, Ordering::Release); }); let spawner = std::thread::current().id(); let inner_count_clone = count_clone.clone(); - scope.spawn_on_main(async move { + scope.spawn_on_scope(async move { inner_count_clone.fetch_add(1, Ordering::Release); if std::thread::current().id() != spawner { // NOTE: This check is using an atomic rather than simply panicing the @@ -540,7 +534,7 @@ mod tests { let count = Arc::new(AtomicI32::new(0)); - let outputs: Vec = pool.scope(|scope| { + let outputs: Vec = pool.scope(None, |scope| { for _ in 0..10 { let count_clone = count.clone(); scope.spawn(async move { @@ -582,14 +576,14 @@ mod tests { let inner_pool = pool.clone(); let inner_thread_check_failed = thread_check_failed.clone(); std::thread::spawn(move || { - inner_pool.scope(|scope| { + inner_pool.scope(None, |scope| { let spawner = std::thread::current().id(); let inner_count_clone = count_clone.clone(); scope.spawn(async move { inner_count_clone.fetch_add(1, Ordering::Release); // spawning on the scope from another thread runs the futures on the scope's thread - scope.spawn_on_main(async move { + scope.spawn_on_scope(async move { inner_count_clone.fetch_add(1, Ordering::Release); if std::thread::current().id() != spawner { // NOTE: This check is using an atomic rather than simply panicing the