Skip to content

Commit

Permalink
Basic adaptive batching for parallel query iteration (bevyengine#4777)
Browse files Browse the repository at this point in the history
# Objective
Fixes bevyengine#3184. Fixes bevyengine#6640. Fixes bevyengine#4798. Using `Query::par_for_each(_mut)` currently requires a `batch_size` parameter, which affects how it chunks up large archetypes and tables into smaller chunks to run in parallel. Tuning this value is difficult, as the performance characteristics entirely depends on the state of the `World` it's being run on. Typically, users will just use a flat constant and just tune it by hand until it performs well in some benchmarks. However, this is both error prone and risks overfitting the tuning on that benchmark.

This PR proposes a naive automatic batch-size computation based on the current state of the `World`.

## Background
`Query::par_for_each(_mut)` schedules a new Task for every archetype or table that it matches. Archetypes/tables larger than the batch size are chunked into smaller tasks. Assuming every entity matched by the query has an identical workload, this makes the worst case scenario involve using a batch size equal to the size of the largest matched archetype or table. Conversely, a batch size of `max {archetype, table} size / thread count * COUNT_PER_THREAD` is likely the sweetspot where the overhead of scheduling tasks is minimized, at least not without grouping small archetypes/tables together.

There is also likely a strict minimum batch size below which the overhead of scheduling these tasks is heavier than running the entire thing single-threaded.

## Solution

- [x] Remove the `batch_size` from `Query(State)::par_for_each`  and friends.
- [x] Add a check to compute `batch_size = max {archeytpe/table} size / thread count  * COUNT_PER_THREAD`
- [x] ~~Panic if thread count is 0.~~ Defer to `for_each` if the thread count is 1 or less.
- [x] Early return if there is no matched table/archetype. 
- [x] Add override option for users have queries that strongly violate the initial assumption that all iterated entities have an equal workload.

---

## Changelog
Changed: `Query::par_for_each(_mut)` has been changed to `Query::par_iter(_mut)` and will now automatically try to produce a batch size for callers based on the current `World` state.

## Migration Guide
The `batch_size` parameter for `Query(State)::par_for_each(_mut)` has been removed. These calls will automatically compute a batch size for you. Remove these parameters from all calls to these functions.

Before:
```rust
fn parallel_system(query: Query<&MyComponent>) {
   query.par_for_each(32, |comp| {
        ...
   });
}
```

After:

```rust
fn parallel_system(query: Query<&MyComponent>) {
   query.par_iter().for_each(|comp| {
        ...
   });
}
```

Co-authored-by: Arnav Choubey <[email protected]>
Co-authored-by: Robert Swain <[email protected]>
Co-authored-by: François <[email protected]>
Co-authored-by: Corey Farwell <[email protected]>
Co-authored-by: Aevyrie <[email protected]>
  • Loading branch information
6 people authored and ItsDoot committed Feb 1, 2023
1 parent 68e7f13 commit 9fa40d0
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 188 deletions.
2 changes: 1 addition & 1 deletion benches/benches/bevy_ecs/iteration/heavy_compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub fn heavy_compute(c: &mut Criterion) {
}));

fn sys(mut query: Query<(&mut Position, &mut Transform)>) {
query.par_for_each_mut(128, |(mut pos, mut mat)| {
query.par_iter_mut().for_each_mut(|(mut pos, mut mat)| {
for _ in 0..100 {
mat.0 = mat.0.inverse();
}
Expand Down
30 changes: 16 additions & 14 deletions crates/bevy_animation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,20 +352,22 @@ pub fn animation_player(
parents: Query<(Option<With<AnimationPlayer>>, Option<&Parent>)>,
mut animation_players: Query<(Entity, Option<&Parent>, &mut AnimationPlayer)>,
) {
animation_players.par_for_each_mut(10, |(root, maybe_parent, mut player)| {
update_transitions(&mut player, &time);
run_animation_player(
root,
player,
&time,
&animations,
&names,
&transforms,
maybe_parent,
&parents,
&children,
);
});
animation_players
.par_iter_mut()
.for_each_mut(|(root, maybe_parent, mut player)| {
update_transitions(&mut player, &time);
run_animation_player(
root,
player,
&time,
&animations,
&names,
&transforms,
maybe_parent,
&parents,
&children,
);
});
}

#[allow(clippy::too_many_arguments)]
Expand Down
12 changes: 6 additions & 6 deletions crates/bevy_ecs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ mod tests {
let results = Arc::new(Mutex::new(Vec::new()));
world
.query::<(Entity, &A)>()
.par_for_each(&world, 2, |(e, &A(i))| {
.par_iter(&world)
.for_each(|(e, &A(i))| {
results.lock().unwrap().push((e, i));
});
results.lock().unwrap().sort();
Expand All @@ -420,11 +421,10 @@ mod tests {
let e4 = world.spawn((SparseStored(4), A(1))).id();
let e5 = world.spawn((SparseStored(5), A(1))).id();
let results = Arc::new(Mutex::new(Vec::new()));
world.query::<(Entity, &SparseStored)>().par_for_each(
&world,
2,
|(e, &SparseStored(i))| results.lock().unwrap().push((e, i)),
);
world
.query::<(Entity, &SparseStored)>()
.par_iter(&world)
.for_each(|(e, &SparseStored(i))| results.lock().unwrap().push((e, i)));
results.lock().unwrap().sort();
assert_eq!(
&*results.lock().unwrap(),
Expand Down
2 changes: 2 additions & 0 deletions crates/bevy_ecs/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ mod access;
mod fetch;
mod filter;
mod iter;
mod par_iter;
mod state;

pub use access::*;
pub use fetch::*;
pub use filter::*;
pub use iter::*;
pub use par_iter::*;
pub use state::*;

/// A debug checked version of [`Option::unwrap_unchecked`]. Will panic in
Expand Down
202 changes: 202 additions & 0 deletions crates/bevy_ecs/src/query/par_iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
use crate::world::World;
use bevy_tasks::ComputeTaskPool;
use std::ops::Range;

use super::{QueryItem, QueryState, ROQueryItem, ReadOnlyWorldQuery, WorldQuery};

/// Dictates how a parallel query chunks up large tables/archetypes
/// during iteration.
///
/// A parallel query will chunk up large tables and archetypes into
/// chunks of at most a certain batch size.
///
/// By default, this batch size is automatically determined by dividing
/// the size of the largest matched archetype by the number
/// of threads. This attempts to minimize the overhead of scheduling
/// tasks onto multiple threads, but assumes each entity has roughly the
/// same amount of work to be done, which may not hold true in every
/// workload.
///
/// See [`Query::par_iter`] for more information.
///
/// [`Query::par_iter`]: crate::system::Query::par_iter
#[derive(Clone)]
pub struct BatchingStrategy {
/// The upper and lower limits for how large a batch of entities.
///
/// Setting the bounds to the same value will result in a fixed
/// batch size.
///
/// Defaults to `[1, usize::MAX]`.
pub batch_size_limits: Range<usize>,
/// The number of batches per thread in the [`ComputeTaskPool`].
/// Increasing this value will decrease the batch size, which may
/// increase the scheduling overhead for the iteration.
///
/// Defaults to 1.
pub batches_per_thread: usize,
}

impl BatchingStrategy {
/// Creates a new unconstrained default batching strategy.
pub const fn new() -> Self {
Self {
batch_size_limits: 1..usize::MAX,
batches_per_thread: 1,
}
}

/// Declares a batching strategy with a fixed batch size.
pub const fn fixed(batch_size: usize) -> Self {
Self {
batch_size_limits: batch_size..batch_size,
batches_per_thread: 1,
}
}

pub const fn min_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size_limits.start = batch_size;
self
}

pub const fn max_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size_limits.end = batch_size;
self
}

pub fn batches_per_thread(mut self, batches_per_thread: usize) -> Self {
assert!(
batches_per_thread > 0,
"The number of batches per thread must be non-zero."
);
self.batches_per_thread = batches_per_thread;
self
}
}

/// A parallel iterator over query results of a [`Query`](crate::system::Query).
///
/// This struct is created by the [`Query::par_iter`](crate::system::Query::iter) and
/// [`Query::par_iter_mut`](crate::system::Query::iter_mut) methods.
pub struct QueryParIter<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> {
pub(crate) world: &'w World,
pub(crate) state: &'s QueryState<Q, F>,
pub(crate) batching_strategy: BatchingStrategy,
}

impl<'w, 's, Q: ReadOnlyWorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
/// Runs `func` on each query result in parallel.
///
/// This can only be called for read-only queries, see [`Self::for_each_mut`] for
/// write-queries.
///
/// # Panics
/// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being
/// initialized and run from the ECS scheduler, this should never panic.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
#[inline]
pub fn for_each<FN: Fn(ROQueryItem<'w, Q>) + Send + Sync + Clone>(&self, func: FN) {
// SAFETY: query is read only
unsafe {
self.for_each_unchecked(func);
}
}
}

impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
/// Changes the batching strategy used when iterating.
///
/// For more information on how this affects the resultant iteration, see
/// [`BatchingStrategy`].
pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
self.batching_strategy = strategy;
self
}

/// Runs `func` on each query result in parallel.
///
/// # Panics
/// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being
/// initialized and run from the ECS scheduler, this should never panic.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
#[inline]
pub fn for_each_mut<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(&mut self, func: FN) {
// SAFETY: query has unique world access
unsafe {
self.for_each_unchecked(func);
}
}

/// Runs `func` on each query result in parallel.
///
/// # Panics
/// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being
/// initialized and run from the ECS scheduler, this should never panic.
///
/// # Safety
///
/// This does not check for mutable query correctness. To be safe, make sure mutable queries
/// have unique access to the components they query.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
#[inline]
pub unsafe fn for_each_unchecked<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(
&self,
func: FN,
) {
let thread_count = ComputeTaskPool::get().thread_num();
if thread_count <= 1 {
self.state.for_each_unchecked_manual(
self.world,
func,
self.world.last_change_tick(),
self.world.read_change_tick(),
);
} else {
// Need a batch size of at least 1.
let batch_size = self.get_batch_size(thread_count).max(1);
self.state.par_for_each_unchecked_manual(
self.world,
batch_size,
func,
self.world.last_change_tick(),
self.world.read_change_tick(),
);
}
}

fn get_batch_size(&self, thread_count: usize) -> usize {
if self.batching_strategy.batch_size_limits.is_empty() {
return self.batching_strategy.batch_size_limits.start;
}

assert!(
thread_count > 0,
"Attempted to run parallel iteration over a query with an empty TaskPool"
);
let max_size = if Q::IS_DENSE && F::IS_DENSE {
let tables = &self.world.storages().tables;
self.state
.matched_table_ids
.iter()
.map(|id| tables[*id].entity_count())
.max()
.unwrap_or(0)
} else {
let archetypes = &self.world.archetypes();
self.state
.matched_archetype_ids
.iter()
.map(|id| archetypes[*id].len())
.max()
.unwrap_or(0)
};
let batch_size = max_size / (thread_count * self.batching_strategy.batches_per_thread);
batch_size.clamp(
self.batching_strategy.batch_size_limits.start,
self.batching_strategy.batch_size_limits.end,
)
}
}
Loading

0 comments on commit 9fa40d0

Please sign in to comment.