diff --git a/polkadot/primitives/src/v8/mod.rs b/polkadot/primitives/src/v8/mod.rs
index 7fc4c5b5c3f1..f952e93d1839 100644
--- a/polkadot/primitives/src/v8/mod.rs
+++ b/polkadot/primitives/src/v8/mod.rs
@@ -435,10 +435,8 @@ pub const ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE: u32 = 10_000;
 
 /// Maximum for maximum queue size.
 ///
-/// Setting `on_demand_queue_max_size` to a value higher than this is unsound. This is more a
-/// theoretical limit, just below enough what the target type supports, so comparisons are possible
-/// even with indices that are overflowing the underyling type.
-pub const ON_DEMAND_MAX_QUEUE_MAX_SIZE: u32 = 1_000_000_000;
+/// We use this value for benchmarking.
+pub const ON_DEMAND_MAX_QUEUE_MAX_SIZE: u32 = 10_000;
 
 /// Backing votes threshold used from the host prior to runtime API version 6 and from the runtime
 /// prior to v9 configuration migration.
diff --git a/polkadot/runtime/parachains/src/assigner_coretime/mod.rs b/polkadot/runtime/parachains/src/assigner_coretime/mod.rs
index 866d52dc9848..976bdfa2ba42 100644
--- a/polkadot/runtime/parachains/src/assigner_coretime/mod.rs
+++ b/polkadot/runtime/parachains/src/assigner_coretime/mod.rs
@@ -27,19 +27,24 @@ mod mock_helpers;
 #[cfg(test)]
 mod tests;
 
-use crate::{
-	configuration, on_demand,
-	paras::AssignCoretime,
-	scheduler::common::{Assignment, AssignmentProvider},
-	ParaId,
-};
+use crate::{configuration, on_demand, paras::AssignCoretime, ParaId};
 
-use alloc::{vec, vec::Vec};
+use alloc::{
+	collections::{BTreeMap, VecDeque},
+	vec,
+	vec::Vec,
+};
 use frame_support::{defensive, pallet_prelude::*};
 use frame_system::pallet_prelude::*;
-use pallet_broker::CoreAssignment;
 use polkadot_primitives::CoreIndex;
-use sp_runtime::traits::{One, Saturating};
+use scale_info::TypeInfo;
+use sp_runtime::{
+	codec::{Decode, Encode},
+	traits::{One, Saturating},
+	RuntimeDebug,
+};
+
+pub use pallet_broker::CoreAssignment;
 
 pub use pallet::*;
 
@@ -146,7 +151,7 @@ struct WorkState<N> {
	/// Position in the assignments we are currently in.
	///
	/// Aka which core assignment will be popped next on
-	/// `AssignmentProvider::pop_assignment_for_core`.
+	/// `advance_assignments`.
	pos: u16,
	/// Step width
	///
@@ -172,6 +177,67 @@ struct AssignmentState {
 	remaining: PartsOf57600,
 }
 
+/// How storage is accessed.
+enum AccessMode<'a, T: Config> {
+	/// We only want to peek (no side effects).
+	Peek { on_demand_orders: &'a mut on_demand::OrderQueue<BlockNumberFor<T>> },
+	/// We need to update state.
+	Pop,
+}
+
+impl<'a, T: Config> AccessMode<'a, T> {
+	/// Construct a peeking access mode.
+	fn peek(on_demand_orders: &'a mut on_demand::OrderQueue<BlockNumberFor<T>>) -> Self {
+		Self::Peek { on_demand_orders }
+	}
+
+	/// Construct popping/modifying access mode.
+	fn pop() -> Self {
+		Self::Pop
+	}
+
+	/// Pop pool assignments according to access mode.
+	fn pop_assignment_for_ondemand_cores(
+		&mut self,
+		now: BlockNumberFor<T>,
+		num_cores: u32,
+	) -> impl Iterator<Item = ParaId> {
+		match self {
+			Self::Peek { on_demand_orders } => on_demand_orders
+				.pop_assignment_for_cores::<T>(now, num_cores)
+				.collect::<Vec<_>>(),
+			Self::Pop =>
+				on_demand::Pallet::<T>::pop_assignment_for_cores(now, num_cores).collect::<Vec<_>>(),
+		}
+		.into_iter()
+	}
+
+	/// Get core schedule according to access mode (either take or get).
+	fn get_core_schedule(
+		&self,
+		next_scheduled: BlockNumberFor<T>,
+		core_idx: CoreIndex,
+	) -> Option<Schedule<BlockNumberFor<T>>> {
+		match self {
+			Self::Peek { .. } => CoreSchedules::<T>::get((next_scheduled, core_idx)),
+			Self::Pop => CoreSchedules::<T>::take((next_scheduled, core_idx)),
+		}
+	}
+}
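+
+// Illustrative pairing of the two modes with the callers further down in this
+// module (a sketch only, assuming some `T: Config`; not part of the API):
+//
+//   let mut orders = on_demand::Pallet::<T>::peek_order_queue();
+//   let peek = AccessMode::<T>::peek(&mut orders); // read-only prediction
+//   let pop = AccessMode::<T>::pop();              // consumes schedules and orders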
+
+/// Assignments that got advanced.
+struct AdvancedAssignments {
+	bulk_assignments: Vec<(CoreIndex, ParaId)>,
+	pool_assignments: Vec<(CoreIndex, ParaId)>,
+}
+
+impl AdvancedAssignments {
+	fn into_iter(self) -> impl Iterator<Item = (CoreIndex, ParaId)> {
+		let Self { bulk_assignments, pool_assignments } = self;
+		bulk_assignments.into_iter().chain(pool_assignments.into_iter())
+	}
+}
+
 impl<N> From<Schedule<N>> for WorkState<N> {
 	fn from(schedule: Schedule<N>) -> Self {
 		let Schedule { assignments, end_hint, next_schedule: _ } = schedule;
@@ -207,10 +273,11 @@ pub mod pallet {
 	///
 	/// Assignments as of the given block number. They will go into state once the block number is
 	/// reached (and replace whatever was in there before).
+	/// TODO: Write migration (Twox256 -> Twox64Concat)
 	#[pallet::storage]
 	pub(super) type CoreSchedules<T: Config> = StorageMap<
 		_,
-		Twox256,
+		Twox64Concat,
 		(BlockNumberFor<T>, CoreIndex),
 		Schedule<BlockNumberFor<T>>,
 		OptionQuery,
@@ -220,12 +287,11 @@ pub mod pallet {
 	///
 	/// They will be picked from `PendingAssignments` once we reach the scheduled block number in
 	/// `PendingAssignments`.
+	/// TODO: Migration
 	#[pallet::storage]
-	pub(super) type CoreDescriptors<T: Config> = StorageMap<
+	pub(super) type CoreDescriptors<T: Config> = StorageValue<
 		_,
-		Twox256,
-		CoreIndex,
-		CoreDescriptor<BlockNumberFor<T>>,
+		BTreeMap<CoreIndex, CoreDescriptor<BlockNumberFor<T>>>,
 		ValueQuery,
 		GetDefault,
 	>;
@@ -242,139 +308,81 @@ pub mod pallet {
 	}
 }
 
-impl<T: Config> AssignmentProvider<BlockNumberFor<T>> for Pallet<T> {
-	fn pop_assignment_for_core(core_idx: CoreIndex) -> Option<Assignment> {
-		let now = frame_system::Pallet::<T>::block_number();
-
-		CoreDescriptors::<T>::mutate(core_idx, |core_state| {
-			Self::ensure_workload(now, core_idx, core_state);
-
-			let work_state = core_state.current_work.as_mut()?;
-
-			// Wrap around:
-			work_state.pos = work_state.pos % work_state.assignments.len() as u16;
-			let (a_type, a_state) = &mut work_state
-				.assignments
-				.get_mut(work_state.pos as usize)
-				.expect("We limited pos to the size of the vec one line above. qed");
-
-			// advance for next pop:
-			a_state.remaining = a_state.remaining.saturating_sub(work_state.step);
-			if a_state.remaining < work_state.step {
-				// Assignment exhausted, need to move to the next and credit remaining for
-				// next round.
-				work_state.pos += 1;
-				// Reset to ratio + still remaining "credits":
-				a_state.remaining = a_state.remaining.saturating_add(a_state.ratio);
-			}
-
-			match a_type {
-				CoreAssignment::Idle => None,
-				CoreAssignment::Pool => on_demand::Pallet::<T>::pop_assignment_for_core(core_idx),
-				CoreAssignment::Task(para_id) => Some(Assignment::Bulk((*para_id).into())),
-			}
-		})
-	}
-
-	fn report_processed(assignment: Assignment) {
-		match assignment {
-			Assignment::Pool { para_id, core_index } =>
-				on_demand::Pallet::<T>::report_processed(para_id, core_index),
-			Assignment::Bulk(_) => {},
-		}
+impl<T: Config> Pallet<T> {
+	/// Peek `num_entries` into the future.
+	///
+	/// The first element for each `CoreIndex` tells what would be retrieved when
+	/// `advance_assignments` is called at the next block, the second what one would get in the
+	/// block after that, and so forth.
+	///
+	/// The predictions are accurate in the sense that if an assignment `B` was predicted, it will
+	/// never happen that `advance_assignments` at that block will retrieve an assignment `A`.
+	/// What can happen though is that the prediction is empty (the returned map does not contain
+	/// that entry), but `advance_assignments` at that block will then return something
+	/// regardless.
+	///
+	/// Invariants:
+	///
+	/// - `advance_assignments` must be called for each core each block
+	/// exactly once for the prediction offered by `peek_next_block` to stay accurate.
+	/// - This function is meant to be called from a runtime API and thus uses the state of the
+	/// block after the current one to show an accurate prediction of upcoming schedules.
+	pub fn peek_next_block(num_entries: u32) -> BTreeMap<CoreIndex, VecDeque<ParaId>> {
+		let now = frame_system::Pallet::<T>::block_number().saturating_plus_one();
+		Self::peek_impl(now, num_entries)
 	}
 
-	/// Push an assignment back to the front of the queue.
+	/// Advance assignments.
+	///
+	/// We move forward one step with the assignments on each core.
 	///
-	/// The assignment has not been processed yet. Typically used on session boundaries.
 	/// Parameters:
-	/// - `assignment`: The on demand assignment.
-	fn push_back_assignment(assignment: Assignment) {
-		match assignment {
-			Assignment::Pool { para_id, core_index } =>
-				on_demand::Pallet::<T>::push_back_assignment(para_id, core_index),
-			Assignment::Bulk(_) => {
-				// Session changes are rough. We just drop assignments that did not make it on a
-				// session boundary. This seems sensible as bulk is region based. Meaning, even if
-				// we made the effort catching up on those dropped assignments, this would very
-				// likely lead to other assignments not getting served at the "end" (when our
-				// assignment set gets replaced).
-			},
-		}
-	}
+	///
+	/// - `is_blocked`: A closure; for each core where it returns `true`, the assignment could
+	///   not actually be served.
+	///
+	/// Returns: Advanced assignments. Blocked cores will still be advanced, but will not be
+	/// contained in the output.
+	pub fn advance_assignments<F: Fn(CoreIndex) -> bool>(
+		is_blocked: F,
+	) -> BTreeMap<CoreIndex, ParaId> {
+		let now = frame_system::Pallet::<T>::block_number();
 
-	#[cfg(any(feature = "runtime-benchmarks", test))]
-	fn get_mock_assignment(_: CoreIndex, para_id: polkadot_primitives::Id) -> Assignment {
-		// Given that we are not tracking anything in `Bulk` assignments, it is safe to always
-		// return a bulk assignment.
-		Assignment::Bulk(para_id)
-	}
+		let assignments = CoreDescriptors::<T>::mutate(|core_states| {
+			Self::advance_assignments_single_impl(now, core_states, AccessMode::pop())
+		});
 
-	fn assignment_duplicated(assignment: &Assignment) {
-		match assignment {
-			Assignment::Pool { para_id, core_index } =>
-				on_demand::Pallet::<T>::assignment_duplicated(*para_id, *core_index),
-			Assignment::Bulk(_) => {},
+		// Give blocked on-demand orders another chance:
+		for blocked in assignments.pool_assignments.iter().filter_map(|(core_idx, para_id)| {
+			if is_blocked(*core_idx) {
+				Some(*para_id)
+			} else {
+				None
+			}
+		}) {
+			on_demand::Pallet::<T>::push_back_order(blocked);
 		}
-	}
-}
 
-impl<T: Config> Pallet<T> {
-	/// Ensure given workload for core is up to date.
-	fn ensure_workload(
-		now: BlockNumberFor<T>,
-		core_idx: CoreIndex,
-		descriptor: &mut CoreDescriptor<BlockNumberFor<T>>,
-	) {
-		// Workload expired?
-		if descriptor
-			.current_work
-			.as_ref()
-			.and_then(|w| w.end_hint)
-			.map_or(false, |e| e <= now)
+		let mut assignments: BTreeMap<CoreIndex, ParaId> =
+			assignments.into_iter().filter(|(core_idx, _)| !is_blocked(*core_idx)).collect();
+
+		// Try to fill missing assignments from the next position (duplication to allow asynchronous
+		// backing even for first assignment coming in on a previously empty core):
+		let next = now.saturating_plus_one();
+		let mut core_states = CoreDescriptors::<T>::get();
+		let mut on_demand_orders = on_demand::Pallet::<T>::peek_order_queue();
+		let next_assignments = Self::advance_assignments_single_impl(
+			next,
+			&mut core_states,
+			AccessMode::peek(&mut on_demand_orders),
+		)
+		.into_iter();
+
+		for (core_idx, next_assignment) in
+			next_assignments.filter(|(core_idx, _)| !is_blocked(*core_idx))
 		{
-			descriptor.current_work = None;
+			assignments.entry(core_idx).or_insert_with(|| next_assignment);
 		}
-
-		let Some(queue) = descriptor.queue else {
-			// No queue.
-			return
-		};
-
-		let mut next_scheduled = queue.first;
-
-		if next_scheduled > now {
-			// Not yet ready.
-			return
-		}
-
-		// Update is needed:
-		let update = loop {
-			let Some(update) = CoreSchedules::<T>::take((next_scheduled, core_idx)) else {
-				break None
-			};
-			// Still good?
-			if update.end_hint.map_or(true, |e| e > now) {
-				break Some(update)
-			}
-			// Move on if possible:
-			if let Some(n) = update.next_schedule {
-				next_scheduled = n;
-			} else {
-				break None
-			}
-		};
-
-		let new_first = update.as_ref().and_then(|u| u.next_schedule);
-		descriptor.current_work = update.map(Into::into);
-
-		descriptor.queue = new_first.map(|new_first| {
-			QueueDescriptor {
-				first: new_first,
-				// `last` stays unaffected, if not empty:
-				last: queue.last,
-			}
-		});
+		assignments
 	}
 
 	/// Append another assignment for a core.
@@ -403,7 +411,8 @@ impl<T: Config> Pallet<T> {
 		// There should be at least one assignment.
 		ensure!(!assignments.is_empty(), Error::<T>::AssignmentsEmpty);
 
-		CoreDescriptors::<T>::mutate(core_idx, |core_descriptor| {
+		CoreDescriptors::<T>::mutate(|core_descriptors| {
+			let core_descriptor = core_descriptors.entry(core_idx).or_default();
 			let new_queue = match core_descriptor.queue {
 				Some(queue) => {
 					ensure!(begin >= queue.last, Error::<T>::DisallowedInsert);
@@ -445,6 +454,143 @@ impl<T: Config> Pallet<T> {
 			Ok(())
 		})
 	}
+
+	fn num_coretime_cores() -> u32 {
+		configuration::ActiveConfig::<T>::get().scheduler_params.num_cores
+	}
+
+	fn peek_impl(
+		mut now: BlockNumberFor<T>,
+		num_entries: u32,
+	) -> BTreeMap<CoreIndex, VecDeque<ParaId>> {
+		let mut core_states = CoreDescriptors::<T>::get();
+		let mut result = BTreeMap::new();
+		let mut on_demand_orders = on_demand::Pallet::<T>::peek_order_queue();
+		for i in 0..num_entries {
+			let assignments = Self::advance_assignments_single_impl(
+				now,
+				&mut core_states,
+				AccessMode::peek(&mut on_demand_orders),
+			)
+			.into_iter();
+			for (core_idx, para_id) in assignments {
+				let claim_queue: &mut VecDeque<ParaId> = result.entry(core_idx).or_default();
+				// Stop filling on holes, otherwise we get claims at the wrong positions.
+				if claim_queue.len() == i as usize {
+					claim_queue.push_back(para_id)
+				} else if claim_queue.len() == 0 && i == 1 {
+					// Except for position 1: Claim queue was empty before. We now have an incoming
+					// assignment on position 1: Duplicate it to position 0 so the chain will
+					// get a full asynchronous backing opportunity (and a bonus synchronous
+					// backing opportunity).
+					claim_queue.push_back(para_id);
+					// And fill position 1:
+					claim_queue.push_back(para_id);
+				}
+			}
+			now.saturating_inc();
+		}
+		result
+	}
+
+	/// Pop assignments for `now`.
+	fn advance_assignments_single_impl(
+		now: BlockNumberFor<T>,
+		core_states: &mut BTreeMap<CoreIndex, CoreDescriptor<BlockNumberFor<T>>>,
+		mut mode: AccessMode<'_, T>,
+	) -> AdvancedAssignments {
+		let mut bulk_assignments = Vec::with_capacity(Self::num_coretime_cores() as _);
+		let mut pool_cores = Vec::with_capacity(Self::num_coretime_cores() as _);
+		for (core_idx, core_state) in core_states.iter_mut() {
+			Self::ensure_workload(now, *core_idx, core_state, &mode);
+
+			let Some(work_state) = core_state.current_work.as_mut() else { continue };
+
+			// Wrap around:
+			work_state.pos = work_state.pos % work_state.assignments.len() as u16;
+			let (a_type, a_state) = &mut work_state
+				.assignments
+				.get_mut(work_state.pos as usize)
+				.expect("We limited pos to the size of the vec one line above. qed");
+
+			// advance for next pop:
+			a_state.remaining = a_state.remaining.saturating_sub(work_state.step);
+			if a_state.remaining < work_state.step {
+				// Assignment exhausted, need to move to the next and credit remaining for
+				// next round.
+				work_state.pos += 1;
+				// Reset to ratio + still remaining "credits":
+				a_state.remaining = a_state.remaining.saturating_add(a_state.ratio);
+			}
+			match *a_type {
+				CoreAssignment::Pool => pool_cores.push(*core_idx),
+				CoreAssignment::Task(para_id) => bulk_assignments.push((*core_idx, para_id.into())),
+				CoreAssignment::Idle => {},
+			}
+		}
+
+		let pool_assignments = mode.pop_assignment_for_ondemand_cores(now, pool_cores.len() as _);
+		let pool_assignments = pool_cores.into_iter().zip(pool_assignments).collect();
+
+		AdvancedAssignments { bulk_assignments, pool_assignments }
+	}
+
+	/// Ensure given workload for core is up to date.
+	fn ensure_workload(
+		now: BlockNumberFor<T>,
+		core_idx: CoreIndex,
+		descriptor: &mut CoreDescriptor<BlockNumberFor<T>>,
+		mode: &AccessMode<'_, T>,
+	) {
+		// Workload expired?
+		if descriptor
+			.current_work
+			.as_ref()
+			.and_then(|w| w.end_hint)
+			.map_or(false, |e| e <= now)
+		{
+			descriptor.current_work = None;
+		}
+
+		let Some(queue) = descriptor.queue else {
+			// No queue.
+			return
+		};
+
+		let mut next_scheduled = queue.first;
+
+		if next_scheduled > now {
+			// Not yet ready.
+			return
+		}
+
+		// Update is needed:
+		let update = loop {
+			let Some(update) = mode.get_core_schedule(next_scheduled, core_idx) else { break None };
+
+			// Still good?
+			if update.end_hint.map_or(true, |e| e > now) {
+				break Some(update)
+			}
+			// Move on if possible:
+			if let Some(n) = update.next_schedule {
+				next_scheduled = n;
+			} else {
+				break None
+			}
+		};
+
+		let new_first = update.as_ref().and_then(|u| u.next_schedule);
+		descriptor.current_work = update.map(Into::into);
+
+		descriptor.queue = new_first.map(|new_first| {
+			QueueDescriptor {
+				first: new_first,
+				// `last` stays unaffected, if not empty:
+				last: queue.last,
+			}
+		});
+	}
 }
 
 impl<T: Config> AssignCoretime for Pallet<T> {
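The new entry points replace the per-core `pop_assignment_for_core` flow of the removed `AssignmentProvider` implementation. A rough sketch of how a consumer might drive them each block — the wrapper function and the `blocked` set are assumptions for illustration, not part of this diff:

```rust
use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
use polkadot_primitives::{CoreIndex, Id as ParaId};

fn advance_for_block<T: crate::assigner_coretime::Config>(blocked: BTreeSet<CoreIndex>) {
	// One step forward on every core. Blocked cores are advanced as well, but
	// their assignments are omitted and blocked pool orders are pushed back.
	let assignments: BTreeMap<CoreIndex, ParaId> =
		crate::assigner_coretime::Pallet::<T>::advance_assignments(|core| blocked.contains(&core));

	// Runtime-API side: predict the next two claim queue positions per core.
	let claim_queue: BTreeMap<CoreIndex, VecDeque<ParaId>> =
		crate::assigner_coretime::Pallet::<T>::peek_next_block(2);

	let _ = (assignments, claim_queue);
}
```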
diff --git a/polkadot/runtime/parachains/src/assigner_coretime/tests.rs b/polkadot/runtime/parachains/src/assigner_coretime/tests.rs
index ab011bfc4ae1..153fd7fea221 100644
--- a/polkadot/runtime/parachains/src/assigner_coretime/tests.rs
+++ b/polkadot/runtime/parachains/src/assigner_coretime/tests.rs
@@ -24,7 +24,6 @@ use crate::{
 		Scheduler, System, Test,
 	},
 	paras::{ParaGenesisArgs, ParaKind},
-	scheduler::common::Assignment,
 };
 use frame_support::{assert_noop, assert_ok, pallet_prelude::*, traits::Currency};
 use pallet_broker::TaskId;
@@ -77,7 +76,7 @@ fn run_to_block(
 		OnDemand::on_initialize(b + 1);
 
 		// In the real runtime this is expected to be called by the `InclusionInherent` pallet.
-		Scheduler::advance_claim_queue(&Default::default());
+		Scheduler::advance_claim_queue(|_| false);
 	}
 }
 
@@ -113,14 +112,14 @@ fn assign_core_works_with_no_prior_schedule() {
 
 		// Check QueueDescriptor
 		assert_eq!(
-			CoreDescriptors::<Test>::get(core_idx)
+			CoreDescriptors::<Test>::get().entry(core_idx).or_default()
 				.queue
 				.as_ref()
 				.and_then(|q| Some(q.first)),
 			Some(BlockNumberFor::<Test>::from(11u32))
 		);
 		assert_eq!(
-			CoreDescriptors::<Test>::get(core_idx).queue.as_ref().and_then(|q| Some(q.last)),
+			CoreDescriptors::<Test>::get().entry(core_idx).or_default().queue.as_ref().and_then(|q| Some(q.last)),
 			Some(BlockNumberFor::<Test>::from(11u32))
 		);
 	});
@@ -141,28 +140,28 @@ fn end_hint_is_properly_honored() {
 		));
 
 		assert!(
-			CoretimeAssigner::pop_assignment_for_core(core_idx).is_none(),
+			CoretimeAssigner::advance_assignments(|_| false).get(&core_idx).is_none(),
 			"No assignment yet in effect"
 		);
 
 		run_to_block(11, |_| None);
 
 		assert_eq!(
-			CoretimeAssigner::pop_assignment_for_core(core_idx),
-			Some(Assignment::Bulk(1.into())),
+			CoretimeAssigner::advance_assignments(|_| false).get(&core_idx),
+			Some(&1.into()),
 			"Assignment should now be present"
 		);
 
 		assert_eq!(
-			CoretimeAssigner::pop_assignment_for_core(core_idx),
-			Some(Assignment::Bulk(1.into())),
+			CoretimeAssigner::advance_assignments(|_| false).get(&core_idx),
+			Some(&1.into()),
 			"Nothing changed, assignment should still be present"
 		);
 
 		run_to_block(15, |_| None);
 
 		assert_eq!(
-			CoretimeAssigner::pop_assignment_for_core(core_idx),
+			CoretimeAssigner::advance_assignments(|_| false).get(&core_idx),
 			None,
 			"Assignment should now be gone"
 		);
@@ -177,7 +176,7 @@ fn end_hint_is_properly_honored() {
 
 		// Core should still be empty:
 		assert_eq!(
-			CoretimeAssigner::pop_assignment_for_core(core_idx),
+			CoretimeAssigner::advance_assignments(|_| false).get(&core_idx),
 			None,
 			"Assignment should now be gone"
 		);
@@ -221,14 +220,14 @@ fn assign_core_works_with_prior_schedule() {
 
 		// Check QueueDescriptor
 		assert_eq!(
-			CoreDescriptors::<Test>::get(core_idx)
+			CoreDescriptors::<Test>::get().entry(core_idx).or_default()
 				.queue
 				.as_ref()
 				.and_then(|q| Some(q.first)),
 			Some(BlockNumberFor::<Test>::from(11u32))
 		);
 		assert_eq!(
-			CoreDescriptors::<Test>::get(core_idx).queue.as_ref().and_then(|q| Some(q.last)),
+			CoreDescriptors::<Test>::get().entry(core_idx).or_default().queue.as_ref().and_then(|q| Some(q.last)),
 			Some(BlockNumberFor::<Test>::from(15u32))
 		);
 	});
@@ -405,14 +404,14 @@ fn next_schedule_always_points_to_next_work_plan_item() {
 
 		// Check QueueDescriptor
 		assert_eq!(
-			CoreDescriptors::<Test>::get(core_idx)
+			CoreDescriptors::<Test>::get().entry(core_idx).or_default()
 				.queue
 				.as_ref()
 				.and_then(|q| Some(q.first)),
 			Some(start_3)
 		);
 		assert_eq!(
-			CoreDescriptors::<Test>::get(core_idx).queue.as_ref().and_then(|q| Some(q.last)),
+			CoreDescriptors::<Test>::get().entry(core_idx).or_default().queue.as_ref().and_then(|q| Some(q.last)),
 			Some(start_5)
 		);
 	});
@@ -448,7 +447,7 @@ fn ensure_workload_works() {
 		run_to_block(1, |n| if n == 1 { Some(Default::default()) } else { None });
 
 		// Case 1: No new schedule in CoreSchedules for core
-		CoretimeAssigner::ensure_workload(10u32, core_idx, &mut core_descriptor);
+		CoretimeAssigner::ensure_workload(10u32, core_idx, &mut core_descriptor, &AccessMode::Pop);
 		assert_eq!(core_descriptor, empty_descriptor);
 
 		// Case 2: New schedule exists in CoreSchedules for core, but new
@@ -462,20 +461,20 @@ fn ensure_workload_works() {
 
 		// Propagate changes from storage to Core_Descriptor handle. Normally
 		// pop_assignment_for_core would handle this.
-		core_descriptor = CoreDescriptors::<Test>::get(core_idx);
+		let mut core_descriptor = CoreDescriptors::<Test>::get().remove(&core_idx).unwrap_or_default();
 
-		CoretimeAssigner::ensure_workload(10u32, core_idx, &mut core_descriptor);
+		CoretimeAssigner::ensure_workload(10u32, core_idx, &mut core_descriptor, &AccessMode::Pop);
 		assert_eq!(core_descriptor, assignments_queued_descriptor);
 
 		// Case 3: Next schedule exists in CoreSchedules for core. Next starting
 		// block has been reached. Swaps new WorkState into CoreDescriptors from
 		// CoreSchedules.
-		CoretimeAssigner::ensure_workload(11u32, core_idx, &mut core_descriptor);
+		CoretimeAssigner::ensure_workload(11u32, core_idx, &mut core_descriptor, &AccessMode::Pop);
 		assert_eq!(core_descriptor, assignments_active_descriptor);
 
 		// Case 4: end_hint reached but new schedule start not yet reached. WorkState in
 		// CoreDescriptor is cleared
-		CoretimeAssigner::ensure_workload(15u32, core_idx, &mut core_descriptor);
+		CoretimeAssigner::ensure_workload(15u32, core_idx, &mut core_descriptor, &AccessMode::Pop);
		assert_eq!(core_descriptor, empty_descriptor);
	});
}
@@ -569,12 +568,12 @@ fn assignment_proportions_in_core_state_work() {
 		// Case 1: Current assignment remaining >= step after pop
 		{
 			assert_eq!(
-				CoretimeAssigner::pop_assignment_for_core(core_idx),
-				Some(Assignment::Bulk(task_1.into()))
+				CoretimeAssigner::advance_assignments(|_| false).get(&CoreIndex(0)),
+				Some(&task_1.into())
 			);
 
 			assert_eq!(
-				CoreDescriptors::<Test>::get(core_idx)
+				CoreDescriptors::<Test>::get().entry(core_idx).or_default()
 					.current_work
 					.as_ref()
 					.and_then(|w| Some(w.pos)),
 				Some(0)
 			);
 			// Consumed step should be 1/3 of core parts, leaving 1/3 remaining
 			assert_eq!(
-				CoreDescriptors::<Test>::get(core_idx)
+				CoreDescriptors::<Test>::get().entry(core_idx).or_default()
 					.current_work
 					.as_ref()
 					.and_then(|w| Some(w.assignments[0].1.remaining)),
 				Some(PartsOf57600::new_saturating(19200))
 			);
 		}
 
 		// Case 2: Current assignment remaining < step after pop
 		{
 			assert_eq!(
-				CoretimeAssigner::pop_assignment_for_core(core_idx),
-				Some(Assignment::Bulk(task_1.into()))
+				CoretimeAssigner::advance_assignments(|_| false).get(&CoreIndex(0)),
+				Some(&task_1.into())
 			);
 			// Pos should have incremented, as assignment had remaining < step
 			assert_eq!(
-				CoreDescriptors::<Test>::get(core_idx)
+				CoreDescriptors::<Test>::get().entry(core_idx).or_default()
 					.current_work
 					.as_ref()
 					.and_then(|w| Some(w.pos)),
 				Some(1)
 			);
 			// Remaining should have started at 1/3 of core work parts. We then subtract
 			// step (1/3) and add back ratio (2/3), leaving us with 2/3 of core work parts.
 			assert_eq!(
-				CoreDescriptors::<Test>::get(core_idx)
+				CoreDescriptors::<Test>::get().entry(core_idx).or_default()
 					.current_work
 					.as_ref()
 					.and_then(|w| Some(w.assignments[0].1.remaining)),
 				Some(PartsOf57600::new_saturating(38400))
 			);
 		}
 
 		// Final check, task 2's turn to be served
 		assert_eq!(
-			CoretimeAssigner::pop_assignment_for_core(core_idx),
-			Some(Assignment::Bulk(task_2.into()))
+			CoretimeAssigner::advance_assignments(|_| false).get(&core_idx),
+			Some(&task_2.into())
 		);
 	});
}
@@ -711,40 +710,40 @@ fn assignment_proportions_indivisible_by_step_work() {
 		// 1, 2, 1, 1, 2. The remaining parts for each assignment should be same
 		// at the end as in the beginning.
 		assert_eq!(
-			CoretimeAssigner::pop_assignment_for_core(core_idx),
-			Some(Assignment::Bulk(task_1.into()))
+			CoretimeAssigner::advance_assignments(|_| false).get(&core_idx),
+			Some(&task_1.into())
 		);
 		assert_eq!(
-			CoretimeAssigner::pop_assignment_for_core(core_idx),
-			Some(Assignment::Bulk(task_2.into()))
+			CoretimeAssigner::advance_assignments(|_| false).get(&core_idx),
+			Some(&task_2.into())
 		);
 		assert_eq!(
-			CoretimeAssigner::pop_assignment_for_core(core_idx),
-			Some(Assignment::Bulk(task_1.into()))
+			CoretimeAssigner::advance_assignments(|_| false).get(&core_idx),
+			Some(&task_1.into())
 		);
 		assert_eq!(
-			CoretimeAssigner::pop_assignment_for_core(core_idx),
-			Some(Assignment::Bulk(task_1.into()))
+			CoretimeAssigner::advance_assignments(|_| false).get(&core_idx),
+			Some(&task_1.into())
 		);
 		assert_eq!(
-			CoretimeAssigner::pop_assignment_for_core(core_idx),
-			Some(Assignment::Bulk(task_2.into()))
+			CoretimeAssigner::advance_assignments(|_| false).get(&core_idx),
+			Some(&task_2.into())
 		);
 
 		// Remaining should equal ratio for both assignments.
 		assert_eq!(
-			CoreDescriptors::<Test>::get(core_idx)
+			CoreDescriptors::<Test>::get().entry(core_idx).or_default()
 				.current_work
 				.as_ref()
 				.and_then(|w| Some(w.assignments[0].1.remaining)),
 			Some(ratio_1)
 		);
 		assert_eq!(
-			CoreDescriptors::<Test>::get(core_idx)
+			CoreDescriptors::<Test>::get().entry(core_idx).or_default()
 				.current_work
 				.as_ref()
 				.and_then(|w| Some(w.assignments[1].1.remaining)),
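The `1, 2, 1, 1, 2` pattern asserted above falls out of the step/ratio bookkeeping in `advance_assignments_single_impl`. A self-contained sketch of that arithmetic, with plain integers standing in for `PartsOf57600` (57,600 parts equal one full core; the 3/5 vs. 2/5 split is an assumption chosen to reproduce the documented sequence, the actual ratios live in the truncated test setup):

```rust
fn main() {
	// Two assignments sharing one core: task 1 owns 3/5 of the parts, task 2 owns 2/5.
	let ratio = [57600u32 * 3 / 5, 57600 * 2 / 5];
	let mut remaining = ratio;
	let step = *ratio.iter().min().unwrap(); // smallest ratio, as in `WorkState::step`
	let (mut pos, mut served) = (0usize, Vec::new());
	for _ in 0..5 {
		pos %= remaining.len(); // wrap around
		served.push(pos + 1); // record which task gets this block
		// Advance for the next pop:
		remaining[pos] -= step;
		if remaining[pos] < step {
			// Exhausted: credit the ratio back and move to the next assignment.
			remaining[pos] += ratio[pos];
			pos += 1;
		}
	}
	assert_eq!(served, vec![1, 2, 1, 1, 2]);
	assert_eq!(remaining, ratio); // remaining parts end where they started
}
```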
diff --git a/polkadot/runtime/parachains/src/builder.rs b/polkadot/runtime/parachains/src/builder.rs
index fa9497f8ccd5..3f6e46672dca 100644
--- a/polkadot/runtime/parachains/src/builder.rs
+++ b/polkadot/runtime/parachains/src/builder.rs
@@ -14,15 +14,16 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
 
+use core::iter::repeat;
+
 use crate::{
 	configuration, inclusion, initializer, paras,
 	paras::ParaKind,
 	paras_inherent,
-	scheduler::{
-		self,
-		common::{Assignment, AssignmentProvider},
-	},
+	scheduler,
 	session_info, shared,
+	assigner_coretime::{self, CoreAssignment, PartsOf57600},
+	on_demand,
 };
 use alloc::{
 	collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque},
@@ -921,14 +922,15 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
 			.map(|idx| (idx, 0))
 			.collect::<BTreeMap<_, _>>();
 
-		let mut all_cores = builder.backed_and_concluding_paras.clone();
-		all_cores.append(&mut disputed_cores);
+		let mut all_paras = builder.backed_and_concluding_paras.clone();
+		all_paras.append(&mut disputed_cores);
 
 		assert_eq!(inclusion::PendingAvailability::<T>::iter().count(), used_cores - extra_cores);
 
 		// Sanity check that the occupied cores reported by the inclusion module are what we expect
 		// to be.
 		let mut core_idx = 0u32;
+		let now = frame_system::Pallet::<T>::block_number();
 		let elastic_paras = &builder.elastic_paras;
 
 		let mut occupied_cores = inclusion::Pallet::<T>::get_occupied_cores()
 			.collect::<Vec<_>>();
 		occupied_cores.sort_by(|(core_a, _), (core_b, _)| core_a.0.cmp(&core_b.0));
 
-		let mut expected_cores = all_cores
+		let mut expected_cores = all_paras
 			.iter()
 			.flat_map(|(para_id, _)| {
 				(0..elastic_paras.get(&para_id).cloned().unwrap_or(1))
 					.map(|_para_local_core_idx| {
 						core_idx += 1;
 						(CoreIndex(core_idx - 1), ParaId::from(*para_id))
 					})
 					.collect::<Vec<_>>()
 			})
 			.collect::<Vec<_>>();
 
 		assert_eq!(expected_cores, occupied_cores);
 
 		// We need entries in the claim queue for those:
-		all_cores.append(&mut builder.backed_in_inherent_paras.clone());
+		all_paras.append(&mut builder.backed_in_inherent_paras.clone());
 
-		let mut core_idx = 0u32;
-		let cores = all_cores
+		let core_paras = all_paras
 			.keys()
-			.flat_map(|para_id| {
-				(0..elastic_paras.get(&para_id).cloned().unwrap_or(1))
-					.map(|_para_local_core_idx| {
-						// Load an assignment into provider so that one is present to pop
-						let assignment =
-							<T as scheduler::Config>::AssignmentProvider::get_mock_assignment(
-								CoreIndex(core_idx),
-								ParaId::from(*para_id),
-							);
 
-						core_idx += 1;
-						(CoreIndex(core_idx - 1), [assignment].into())
-					})
-					.collect::<Vec<(CoreIndex, VecDeque<Assignment>)>>()
-			})
-			.collect::<BTreeMap<CoreIndex, VecDeque<Assignment>>>();
-
-		scheduler::ClaimQueue::<T>::set(cores);
+			.flat_map(|para_id| {
+				repeat(ParaId::from(*para_id))
+					.take(elastic_paras.get(&para_id).cloned().unwrap_or(1).into())
+			});
+		let cores = (0..).map(CoreIndex).zip(core_paras).collect::<Vec<_>>();
 
+		for (core_idx, para) in cores {
+			assigner_coretime::Pallet::<T>::assign_core(
+				core_idx,
+				now,
+				vec![(CoreAssignment::Task(para.into()), PartsOf57600::FULL)],
+				None,
+			)
+			.unwrap();
+		}
 
 		Bench::<T> {
 			data: ParachainsInherentData {
diff --git a/polkadot/runtime/parachains/src/coretime/benchmarking.rs b/polkadot/runtime/parachains/src/coretime/benchmarking.rs
index 49e3d8a88c01..9be3783299d0 100644
--- a/polkadot/runtime/parachains/src/coretime/benchmarking.rs
+++ b/polkadot/runtime/parachains/src/coretime/benchmarking.rs
@@ -41,7 +41,7 @@ mod benchmarks {
 			T::MaxHistoricalRevenue,
 		> = BoundedVec::try_from((1..=mhr).map(|v| minimum_balance * v.into()).collect::<Vec<_>>())
 		.unwrap();
-		on_demand::Revenue::<T>::put(rev);
+		on_demand::Pallet::<T>::set_revenue(rev);
 
 		crate::paras::Heads::<T>::insert(ParaId::from(T::BrokerId::get()), vec![1, 2, 3]);
 
diff --git a/polkadot/runtime/parachains/src/mock.rs b/polkadot/runtime/parachains/src/mock.rs
index ee1990a7b618..49de5a1670dd 100644
--- a/polkadot/runtime/parachains/src/mock.rs
+++ b/polkadot/runtime/parachains/src/mock.rs
@@ -22,11 +22,9 @@ use crate::{
 	initializer, on_demand, origin, paras,
 	paras::ParaKind,
 	paras_inherent, scheduler,
-	scheduler::common::AssignmentProvider,
 	session_info, shared,
 	ParaId,
 };
 use frame_support::pallet_prelude::*;
-use polkadot_primitives::CoreIndex;
 
 use codec::Decode;
 use frame_support::{
@@ -54,7 +52,7 @@ use sp_runtime::{
 };
 use std::{
 	cell::RefCell,
-	collections::{btree_map::BTreeMap, vec_deque::VecDeque, HashMap},
+	collections::{btree_map::BTreeMap, HashMap},
 };
 use xcm::{
 	prelude::XcmVersion,
@@ -77,7 +75,6 @@ frame_support::construct_runtime!(
 		ParaInclusion: inclusion,
 		ParaInherent: paras_inherent,
 		Scheduler: scheduler,
-		MockAssigner: mock_assigner,
 		OnDemand: on_demand,
 		CoretimeAssigner: assigner_coretime,
 		Coretime: coretime,
@@ -344,9 +341,7 @@ impl crate::disputes::SlashingHandler<BlockNumberFor<Test>> for Test {
 	fn initializer_on_new_session(_: SessionIndex) {}
 }
 
-impl crate::scheduler::Config for Test {
-	type AssignmentProvider = MockAssigner;
-}
+impl crate::scheduler::Config for Test {}
 
 pub struct TestMessageQueueWeight;
 impl pallet_message_queue::WeightInfo for TestMessageQueueWeight {
@@ -495,72 +490,6 @@ impl ValidatorSetWithIdentification<AccountId> for MockValidatorSet {
 	type IdentificationOf = FoolIdentificationOf;
 }
 
-/// A mock assigner which acts as the scheduler's `AssignmentProvider` for tests. The mock
-/// assigner provides bare minimum functionality to test scheduler internals. Since they
-/// have no direct effect on scheduler state, AssignmentProvider functions such as
-/// `push_back_assignment` can be left empty.
-pub mod mock_assigner {
-	use crate::scheduler::common::Assignment;
-
-	use super::*;
-	pub use pallet::*;
-
-	#[frame_support::pallet]
-	pub mod pallet {
-		use super::*;
-
-		#[pallet::pallet]
-		#[pallet::without_storage_info]
-		pub struct Pallet<T>(_);
-
-		#[pallet::config]
-		pub trait Config: frame_system::Config + configuration::Config + paras::Config {}
-
-		#[pallet::storage]
-		pub(super) type MockAssignmentQueue<T: Config> =
-			StorageValue<_, VecDeque<Assignment>, ValueQuery>;
-	}
-
-	impl<T: Config> Pallet<T> {
-		/// Adds a claim to the `MockAssignmentQueue` this claim can later be popped by the
-		/// scheduler when filling the claim queue for tests.
-		pub fn add_test_assignment(assignment: Assignment) {
-			MockAssignmentQueue::<T>::mutate(|queue| queue.push_back(assignment));
-		}
-	}
-
-	impl<T: Config> AssignmentProvider<BlockNumberFor<T>> for Pallet<T> {
-		// With regards to popping_assignments, the scheduler just needs to be tested under
-		// the following two conditions:
-		// 1. An assignment is provided
-		// 2. No assignment is provided
-		// A simple assignment queue populated to fit each test fulfills these needs.
-		fn pop_assignment_for_core(_core_idx: CoreIndex) -> Option<Assignment> {
-			let mut queue: VecDeque<Assignment> = MockAssignmentQueue::<T>::get();
-			let front = queue.pop_front();
-			// Write changes to storage.
-			MockAssignmentQueue::<T>::set(queue);
-			front
-		}
-
-		// We don't care about core affinity in the test assigner
-		fn report_processed(_: Assignment) {}
-
-		fn push_back_assignment(assignment: Assignment) {
-			Self::add_test_assignment(assignment);
-		}
-
-		#[cfg(any(feature = "runtime-benchmarks", test))]
-		fn get_mock_assignment(_: CoreIndex, para_id: ParaId) -> Assignment {
-			Assignment::Bulk(para_id)
-		}
-
-		fn assignment_duplicated(_: &Assignment) {}
-	}
-}
-
-impl mock_assigner::pallet::Config for Test {}
-
 pub struct FoolIdentificationOf;
 impl sp_runtime::traits::Convert<AccountId, Option<()>> for FoolIdentificationOf {
 	fn convert(_: AccountId) -> Option<()> {
diff --git a/polkadot/runtime/parachains/src/on_demand/mod.rs b/polkadot/runtime/parachains/src/on_demand/mod.rs
index 66400eb00fd9..4d411aec70df 100644
--- a/polkadot/runtime/parachains/src/on_demand/mod.rs
+++ b/polkadot/runtime/parachains/src/on_demand/mod.rs
@@ -31,20 +31,21 @@
 //! occupying multiple cores in on-demand, we will likely add a separate order type, where the
 //! intent can be made explicit.
 
+use core::mem;
+
 use sp_runtime::traits::Zero;
 
 mod benchmarking;
-pub mod migration;
+// TODO: Add back & fix.
+// pub mod migration;
 mod mock_helpers;
-mod types;
 
 extern crate alloc;
 
 #[cfg(test)]
 mod tests;
 
-use crate::{configuration, paras, scheduler::common::Assignment};
-use alloc::collections::BinaryHeap;
-use core::mem::take;
+use crate::{configuration, paras};
+use alloc::collections::BTreeSet;
 use frame_support::{
 	pallet_prelude::*,
 	traits::{
@@ -56,42 +57,130 @@ use frame_support::{
 	PalletId,
 };
 use frame_system::{pallet_prelude::*, Pallet as System};
-use polkadot_primitives::{CoreIndex, Id as ParaId};
+use polkadot_primitives::{Id as ParaId, ON_DEMAND_MAX_QUEUE_MAX_SIZE};
 use sp_runtime::{
 	traits::{AccountIdConversion, One, SaturatedConversion},
 	FixedPointNumber, FixedPointOperand, FixedU128, Perbill, Saturating,
 };
-use types::{
-	BalanceOf, CoreAffinityCount, EnqueuedOrder, QueuePushDirection, QueueStatusType,
-	SpotTrafficCalculationErr,
-};
 
 const LOG_TARGET: &str = "runtime::parachains::on-demand";
 
 pub use pallet::*;
 
 pub trait WeightInfo {
-	fn place_order_allow_death(s: u32) -> Weight;
-	fn place_order_keep_alive(s: u32) -> Weight;
+	fn place_order_allow_death() -> Weight;
+	fn place_order_keep_alive() -> Weight;
 }
 
 /// A weight info that is only suitable for testing.
 pub struct TestWeightInfo;
 impl WeightInfo for TestWeightInfo {
-	fn place_order_allow_death(_: u32) -> Weight {
+	fn place_order_allow_death() -> Weight {
 		Weight::MAX
 	}
 
-	fn place_order_keep_alive(_: u32) -> Weight {
+	fn place_order_keep_alive() -> Weight {
 		Weight::MAX
 	}
 }
 
+/// Shorthand for the Balance type the runtime is using.
+pub type BalanceOf<T> =
+	<<T as Config>::Currency as Currency<<T as frame_system::Config>::AccountId>>::Balance;
+
+/// All queued on-demand orders.
+#[derive(Encode, Decode, TypeInfo)]
+pub struct OrderQueue<N> {
+	queue: BoundedVec<EnqueuedOrder<N>, ConstU32<ON_DEMAND_MAX_QUEUE_MAX_SIZE>>,
+}
+
+impl<N> OrderQueue<N> {
+	/// Pop `num_cores` from the queue, assuming `now` as the current block number.
+	pub fn pop_assignment_for_cores<T: Config>(
+		&mut self,
+		now: N,
+		mut num_cores: u32,
+	) -> impl Iterator<Item = ParaId>
+	where
+		N: Saturating + Ord + One + Copy,
+	{
+		let mut popped = BTreeSet::new();
+		let mut remaining_orders = Vec::with_capacity(self.queue.len());
+		for order in mem::take(&mut self.queue).into_iter() {
+			// Order is ready 2 blocks later (asynchronous backing):
+			let ready_at = order.ordered_at.saturating_plus_one().saturating_plus_one();
+			let is_ready = ready_at <= now;
+
+			if num_cores > 0 && is_ready && popped.insert(order.para_id) {
+				num_cores -= 1;
+			} else {
+				remaining_orders.push(order);
+			}
+		}
+		self.queue = BoundedVec::truncate_from(remaining_orders);
+		popped.into_iter()
+	}
+
+	fn new() -> Self {
+		OrderQueue { queue: BoundedVec::new() }
+	}
+
+	/// Try to push an additional order.
+	///
+	/// Fails if queue is already at capacity.
+	fn try_push(&mut self, now: N, para_id: ParaId) -> Result<(), ParaId> {
+		self.queue
+			.try_push(EnqueuedOrder { para_id, ordered_at: now })
+			.map_err(|o| o.para_id)
+	}
+
+	fn len(&self) -> usize {
+		self.queue.len()
+	}
+}
+
+/// Data about a placed on-demand order.
+#[derive(Encode, Decode, TypeInfo)]
+struct EnqueuedOrder<N> {
+	/// The parachain the order was placed for.
+	para_id: ParaId,
+	/// The block number the order came in.
+	ordered_at: N,
+}
+
+/// Queue data for on-demand.
+#[derive(Encode, Decode, TypeInfo)]
+struct OrderStatus<N> {
+	/// Last calculated traffic value.
+	traffic: FixedU128,
+
+	/// Enqueued orders.
+	queue: OrderQueue<N>,
+}
+
+impl<N> Default for OrderStatus<N> {
+	fn default() -> OrderStatus<N> {
+		OrderStatus { traffic: FixedU128::default(), queue: OrderQueue::new() }
+	}
+}
+
+/// Errors that can happen during spot traffic calculation.
+#[derive(PartialEq, RuntimeDebug)]
+pub enum SpotTrafficCalculationErr {
+	/// The order queue capacity is at 0.
+	QueueCapacityIsZero,
+	/// The queue size is larger than the queue capacity.
+	QueueSizeLargerThanCapacity,
+	/// Arithmetic error during division, either division by 0 or over/underflow.
+	Division,
+}
+
 #[frame_support::pallet]
 pub mod pallet {
 
 	use super::*;
+	use polkadot_primitives::Id as ParaId;
 
 	const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
 
@@ -125,48 +214,14 @@ pub mod pallet {
 		type PalletId: Get<PalletId>;
 	}
 
-	/// Creates an empty queue status for an empty queue with initial traffic value.
-	#[pallet::type_value]
-	pub(super) fn QueueStatusOnEmpty<T: Config>() -> QueueStatusType {
-		QueueStatusType { traffic: T::TrafficDefaultValue::get(), ..Default::default() }
-	}
-
-	#[pallet::type_value]
-	pub(super) fn EntriesOnEmpty<T: Config>() -> BinaryHeap<EnqueuedOrder> {
-		BinaryHeap::new()
-	}
-
-	/// Maps a `ParaId` to `CoreIndex` and keeps track of how many assignments the scheduler has in
-	/// it's lookahead. Keeping track of this affinity prevents parallel execution of the same
-	/// `ParaId` on two or more `CoreIndex`es.
-	#[pallet::storage]
-	pub(super) type ParaIdAffinity<T: Config> =
-		StorageMap<_, Twox64Concat, ParaId, CoreAffinityCount, OptionQuery>;
-
-	/// Overall status of queue (both free + affinity entries)
-	#[pallet::storage]
-	pub(super) type QueueStatus<T: Config> =
-		StorageValue<_, QueueStatusType, ValueQuery, QueueStatusOnEmpty<T>>;
-
-	/// Priority queue for all orders which don't yet (or not any more) have any core affinity.
+	/// All on-demand orders (single queue) together with the current traffic value.
 	#[pallet::storage]
-	pub(super) type FreeEntries<T: Config> =
-		StorageValue<_, BinaryHeap<EnqueuedOrder>, ValueQuery, EntriesOnEmpty<T>>;
-
-	/// Queue entries that are currently bound to a particular core due to core affinity.
-	#[pallet::storage]
-	pub(super) type AffinityEntries<T: Config> = StorageMap<
-		_,
-		Twox64Concat,
-		CoreIndex,
-		BinaryHeap<EnqueuedOrder>,
-		ValueQuery,
-		EntriesOnEmpty<T>,
-	>;
+	pub(super) type OrderStatus<T: Config> =
+		StorageValue<_, super::OrderStatus<BlockNumberFor<T>>, ValueQuery>;
 
 	/// Keeps track of accumulated revenue from on demand order sales.
 	#[pallet::storage]
-	pub type Revenue<T: Config> =
+	pub(super) type Revenue<T: Config> =
 		StorageValue<_, BoundedVec<BalanceOf<T>, T::MaxHistoricalRevenue>, ValueQuery>;
 
 	#[pallet::event]
@@ -206,8 +261,8 @@ pub mod pallet {
 			let config = configuration::ActiveConfig::<T>::get();
 			// We need to update the spot traffic on block initialize in order to account for idle
 			// blocks.
-			QueueStatus::<T>::mutate(|queue_status| {
-				Self::update_spot_traffic(&config, queue_status);
+			OrderStatus::<T>::mutate(|order_status| {
+				Self::update_spot_traffic(&config, order_status);
 			});
 
 			// Reads: `Revenue`, `ActiveConfig`, `QueueStatus`
@@ -234,7 +289,7 @@ pub mod pallet {
 		/// Events:
 		/// - `OnDemandOrderPlaced`
 		#[pallet::call_index(0)]
-		#[pallet::weight(<T as Config>::WeightInfo::place_order_allow_death(QueueStatus::<T>::get().size()))]
+		#[pallet::weight(<T as Config>::WeightInfo::place_order_allow_death())]
 		pub fn place_order_allow_death(
 			origin: OriginFor<T>,
 			max_amount: BalanceOf<T>,
@@ -260,7 +315,7 @@ pub mod pallet {
 		/// Events:
 		/// - `OnDemandOrderPlaced`
 		#[pallet::call_index(1)]
-		#[pallet::weight(<T as Config>::WeightInfo::place_order_keep_alive(QueueStatus::<T>::get().size()))]
+		#[pallet::weight(<T as Config>::WeightInfo::place_order_keep_alive())]
 		pub fn place_order_keep_alive(
 			origin: OriginFor<T>,
 			max_amount: BalanceOf<T>,
@@ -277,75 +332,36 @@ impl<T: Config> Pallet<T>
 where
 	BalanceOf<T>: FixedPointOperand,
 {
-	/// Take the next queued entry that is available for a given core index.
-	///
-	/// Parameters:
-	/// - `core_index`: The core index
-	pub fn pop_assignment_for_core(core_index: CoreIndex) -> Option<Assignment> {
-		let entry: Result<EnqueuedOrder, ()> = QueueStatus::<T>::try_mutate(|queue_status| {
-			AffinityEntries::<T>::try_mutate(core_index, |affinity_entries| {
-				let free_entry = FreeEntries::<T>::try_mutate(|free_entries| {
-					let affinity_next = affinity_entries.peek();
-					let free_next = free_entries.peek();
-					let pick_free = match (affinity_next, free_next) {
-						(None, _) => true,
-						(Some(_), None) => false,
-						(Some(a), Some(f)) => f < a,
-					};
-					if pick_free {
-						let entry = free_entries.pop().ok_or(())?;
-						let (mut affinities, free): (BinaryHeap<_>, BinaryHeap<_>) =
-							take(free_entries)
-								.into_iter()
-								.partition(|e| e.para_id == entry.para_id);
-						affinity_entries.append(&mut affinities);
-						*free_entries = free;
-						Ok(entry)
-					} else {
-						Err(())
-					}
-				});
-				let entry = free_entry.or_else(|()| affinity_entries.pop().ok_or(()))?;
-				queue_status.consume_index(entry.idx);
-				Ok(entry)
-			})
-		});
-
-		let assignment = entry.map(|e| Assignment::Pool { para_id: e.para_id, core_index }).ok()?;
-
-		Pallet::<T>::increase_affinity(assignment.para_id(), core_index);
-		Some(assignment)
-	}
-
-	/// Report that an assignment was duplicated by the scheduler.
-	pub fn assignment_duplicated(para_id: ParaId, core_index: CoreIndex) {
-		Pallet::<T>::increase_affinity(para_id, core_index);
+	/// Pop assignments for the given number of on-demand cores in a block.
+	pub fn pop_assignment_for_cores(
+		now: BlockNumberFor<T>,
+		num_cores: u32,
+	) -> impl Iterator<Item = ParaId> {
+		pallet::OrderStatus::<T>::mutate(|order_status| {
+			order_status.queue.pop_assignment_for_cores::<T>(now, num_cores)
+		})
 	}
 
-	/// Report that the `para_id` & `core_index` combination was processed.
-	///
-	/// This should be called once it is clear that the assignment won't get pushed back anymore.
+	/// Look into upcoming orders.
 	///
-	/// In other words for each `pop_assignment_for_core` a call to this function or
-	/// `push_back_assignment` must follow, but only one.
-	pub fn report_processed(para_id: ParaId, core_index: CoreIndex) {
-		Pallet::<T>::decrease_affinity_update_queue(para_id, core_index);
+	/// The returned `OrderQueue` allows for simulating upcoming
+	/// `pop_assignment_for_cores` calls.
+	pub fn peek_order_queue() -> OrderQueue<BlockNumberFor<T>> {
+		pallet::OrderStatus::<T>::get().queue
 	}
 
-	/// Push an assignment back to the front of the queue.
-	///
-	/// The assignment has not been processed yet. Typically used on session boundaries.
+	/// Push an order back to the back of the queue.
 	///
-	/// NOTE: We are not checking queue size here. So due to push backs it is possible that we
-	/// exceed the maximum queue size slightly.
+	/// The order could not be served for some reason, give it another chance.
 	///
 	/// Parameters:
 	/// - `para_id`: The para that did not make it.
-	/// - `core_index`: The core the para was scheduled on.
-	pub fn push_back_assignment(para_id: ParaId, core_index: CoreIndex) {
-		Pallet::<T>::decrease_affinity_update_queue(para_id, core_index);
-		QueueStatus::<T>::mutate(|queue_status| {
-			Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Front);
+	pub fn push_back_order(para_id: ParaId) {
+		pallet::OrderStatus::<T>::mutate(|order_status| {
+			let now = <System<T>>::block_number();
+			if let Err(e) = order_status.queue.try_push(now, para_id) {
+				log::debug!(target: LOG_TARGET, "Pushing back order failed (queue too long): {:?}", e);
+			};
 		});
 	}
 
@@ -374,9 +390,9 @@ where
 	) -> DispatchResult {
 		let config = configuration::ActiveConfig::<T>::get();
 
-		QueueStatus::<T>::mutate(|queue_status| {
-			Self::update_spot_traffic(&config, queue_status);
-			let traffic = queue_status.traffic;
+		pallet::OrderStatus::<T>::mutate(|order_status| {
+			Self::update_spot_traffic(&config, order_status);
+			let traffic = order_status.traffic;
 
 			// Calculate spot price
 			let spot_price: BalanceOf<T> = traffic.saturating_mul_int(
 				config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(),
 			);
 
 			// Is the current price higher than `max_amount`
 			ensure!(spot_price.le(&max_amount), Error::<T>::SpotPriceHigherThanMaxAmount);
 
 			ensure!(
-				queue_status.size() < config.scheduler_params.on_demand_queue_max_size,
+				order_status.queue.len() <
+					config.scheduler_params.on_demand_queue_max_size as usize,
 				Error::<T>::QueueFull
 			);
 
@@ -420,7 +437,11 @@ where
 				}
 			});
 
-			Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back);
+			let now = <System<T>>::block_number();
+			if let Err(p) = order_status.queue.try_push(now, para_id) {
+				log::error!(target: LOG_TARGET, "Placing order failed (queue too long): {:?}, but size has been checked above!", p);
+			};
+
 			Pallet::<T>::deposit_event(Event::<T>::OnDemandOrderPlaced {
 				para_id,
 				spot_price,
@@ -434,20 +455,20 @@ where
 	/// Calculate and update spot traffic.
 	fn update_spot_traffic(
 		config: &configuration::HostConfiguration<BlockNumberFor<T>>,
-		queue_status: &mut QueueStatusType,
+		order_status: &mut OrderStatus<BlockNumberFor<T>>,
 	) {
-		let old_traffic = queue_status.traffic;
+		let old_traffic = order_status.traffic;
 		match Self::calculate_spot_traffic(
 			old_traffic,
 			config.scheduler_params.on_demand_queue_max_size,
-			queue_status.size(),
+			order_status.queue.len() as u32,
 			config.scheduler_params.on_demand_target_queue_utilization,
 			config.scheduler_params.on_demand_fee_variability,
 		) {
 			Ok(new_traffic) => {
 				// Only update storage on change
 				if new_traffic != old_traffic {
-					queue_status.traffic = new_traffic;
+					order_status.traffic = new_traffic;
 
 					// calculate the new spot price
 					let spot_price: BalanceOf<T> = new_traffic.saturating_mul_int(
 						config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(),
 					);
@@ -538,104 +559,6 @@ where
 		}
 	}
 
-	/// Adds an order to the on demand queue.
-	///
-	/// Parameters:
-	/// - `location`: Whether to push this entry to the back or the front of the queue. Pushing an
-	///   entry to the front of the queue is only used when the scheduler wants to push back an
-	///   entry it has already popped.
-	fn add_on_demand_order(
-		queue_status: &mut QueueStatusType,
-		para_id: ParaId,
-		location: QueuePushDirection,
-	) {
-		let idx = match location {
-			QueuePushDirection::Back => queue_status.push_back(),
-			QueuePushDirection::Front => queue_status.push_front(),
-		};
-
-		let affinity = ParaIdAffinity::<T>::get(para_id);
-		let order = EnqueuedOrder::new(idx, para_id);
-		#[cfg(test)]
-		log::debug!(target: LOG_TARGET, "add_on_demand_order, order: {:?}, affinity: {:?}, direction: {:?}", order, affinity, location);
-
-		match affinity {
-			None => FreeEntries::<T>::mutate(|entries| entries.push(order)),
-			Some(affinity) =>
-				AffinityEntries::<T>::mutate(affinity.core_index, |entries| entries.push(order)),
-		}
-	}
-
-	/// Decrease core affinity for para and update queue
-	///
-	/// if affinity dropped to 0, moving entries back to `FreeEntries`.
-	fn decrease_affinity_update_queue(para_id: ParaId, core_index: CoreIndex) {
-		let affinity = Pallet::<T>::decrease_affinity(para_id, core_index);
-		#[cfg(not(test))]
-		debug_assert_ne!(
-			affinity, None,
-			"Decreased affinity for a para that has not been served on a core?"
-		);
-		if affinity != Some(0) {
-			return;
-		}
-		// No affinity more for entries on this core, free any entries:
-		//
-		// This is necessary to ensure them being served as the core might no longer exist at all.
-		AffinityEntries::<T>::mutate(core_index, |affinity_entries| {
-			FreeEntries::<T>::mutate(|free_entries| {
-				let (mut freed, affinities): (BinaryHeap<_>, BinaryHeap<_>) =
-					take(affinity_entries).into_iter().partition(|e| e.para_id == para_id);
-				free_entries.append(&mut freed);
-				*affinity_entries = affinities;
-			})
-		});
-	}
-
-	/// Decreases the affinity of a `ParaId` to a specified `CoreIndex`.
-	///
-	/// Subtracts from the count of the `CoreAffinityCount` if an entry is found and the core_index
-	/// matches. When the count reaches 0, the entry is removed.
-	/// A non-existent entry is a no-op.
-	///
-	/// Returns: The new affinity of the para on that core. `None` if there is no affinity on this
-	/// core.
-	fn decrease_affinity(para_id: ParaId, core_index: CoreIndex) -> Option<u32> {
-		ParaIdAffinity::<T>::mutate(para_id, |maybe_affinity| {
-			let affinity = maybe_affinity.as_mut()?;
-			if affinity.core_index == core_index {
-				let new_count = affinity.count.saturating_sub(1);
-				if new_count > 0 {
-					*maybe_affinity = Some(CoreAffinityCount { core_index, count: new_count });
-				} else {
-					*maybe_affinity = None;
-				}
-				return Some(new_count);
-			} else {
-				None
-			}
-		})
-	}
-
-	/// Increases the affinity of a `ParaId` to a specified `CoreIndex`.
-	/// Adds to the count of the `CoreAffinityCount` if an entry is found and the core_index
-	/// matches. A non-existent entry will be initialized with a count of 1 and uses the supplied
-	/// `CoreIndex`.
-	fn increase_affinity(para_id: ParaId, core_index: CoreIndex) {
-		ParaIdAffinity::<T>::mutate(para_id, |maybe_affinity| match maybe_affinity {
-			Some(affinity) =>
-				if affinity.core_index == core_index {
-					*maybe_affinity = Some(CoreAffinityCount {
-						core_index,
-						count: affinity.count.saturating_add(1),
-					});
-				},
-			None => {
-				*maybe_affinity = Some(CoreAffinityCount { core_index, count: 1 });
-			},
-		})
-	}
-
 	/// Collect the revenue from the `when` blockheight
 	pub fn claim_revenue_until(when: BlockNumberFor<T>) -> BalanceOf<T> {
 		let now = <System<T>>::block_number();
@@ -659,41 +582,29 @@ where
 		T::PalletId::get().into_account_truncating()
 	}
 
-	/// Getter for the affinity tracker.
-	#[cfg(test)]
-	fn get_affinity_map(para_id: ParaId) -> Option<CoreAffinityCount> {
-		ParaIdAffinity::<T>::get(para_id)
-	}
-
-	/// Getter for the affinity entries.
-	#[cfg(test)]
-	fn get_affinity_entries(core_index: CoreIndex) -> BinaryHeap<EnqueuedOrder> {
-		AffinityEntries::<T>::get(core_index)
-	}
-
-	/// Getter for the free entries.
-	#[cfg(test)]
-	fn get_free_entries() -> BinaryHeap<EnqueuedOrder> {
-		FreeEntries::<T>::get()
-	}
-
 	#[cfg(feature = "runtime-benchmarks")]
 	pub fn populate_queue(para_id: ParaId, num: u32) {
-		QueueStatus::<T>::mutate(|queue_status| {
+		let now = <System<T>>::block_number();
+		pallet::OrderStatus::<T>::mutate(|order_status| {
 			for _ in 0..num {
-				Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back);
+				order_status.queue.try_push(now, para_id).unwrap();
 			}
 		});
 	}
 
+	#[cfg(feature = "runtime-benchmarks")]
+	pub(crate) fn set_revenue(rev: BoundedVec<BalanceOf<T>, T::MaxHistoricalRevenue>) {
+		Revenue::<T>::put(rev);
+	}
+
 	#[cfg(test)]
-	fn set_queue_status(new_status: QueueStatusType) {
-		QueueStatus::<T>::set(new_status);
+	fn set_order_status(new_status: OrderStatus<BlockNumberFor<T>>) {
+		pallet::OrderStatus::<T>::set(new_status);
 	}
 
 	#[cfg(test)]
-	fn get_queue_status() -> QueueStatusType {
-		QueueStatus::<T>::get()
+	fn get_queue_status() -> OrderStatus<BlockNumberFor<T>> {
+		pallet::OrderStatus::<T>::get()
 	}
 
 	#[cfg(test)]
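The ordering rules that replace the old affinity bookkeeping all live in `OrderQueue::pop_assignment_for_cores`. A hypothetical in-crate test sketching them (`Test` is this crate's mock runtime; `OrderQueue::new`/`try_push` are private, so this would have to sit inside the module — names and placement are assumptions, not part of this diff):

```rust
#[test]
fn orders_become_ready_two_blocks_later() {
	let mut queue = OrderQueue::<u32>::new();
	queue.try_push(10, ParaId::from(111)).unwrap();
	queue.try_push(10, ParaId::from(222)).unwrap();
	queue.try_push(10, ParaId::from(111)).unwrap(); // duplicate order for 111

	// Ordered at block 10 => servable from block 12 on (asynchronous backing):
	assert_eq!(queue.pop_assignment_for_cores::<Test>(11, 3).count(), 0);

	// Each para is handed out at most once per call, so the duplicate
	// order for 111 stays queued for a later block:
	let mut ready = queue.pop_assignment_for_cores::<Test>(12, 3);
	assert_eq!(ready.next(), Some(ParaId::from(111)));
	assert_eq!(ready.next(), Some(ParaId::from(222)));
	assert_eq!(ready.next(), None);
}
```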
diff --git a/polkadot/runtime/parachains/src/on_demand/tests.rs b/polkadot/runtime/parachains/src/on_demand/tests.rs
index 7da16942c7ad..51a0dcaa6983 100644
--- a/polkadot/runtime/parachains/src/on_demand/tests.rs
+++ b/polkadot/runtime/parachains/src/on_demand/tests.rs
@@ -25,16 +25,14 @@ use crate::{
 	on_demand::{
 		self,
 		mock_helpers::GenesisConfigBuilder,
-		types::{QueueIndex, ReverseQueueIndex},
 		Error,
 	},
 	paras::{ParaGenesisArgs, ParaKind},
 };
-use core::cmp::{Ord, Ordering};
 use frame_support::{assert_noop, assert_ok};
 use pallet_balances::Error as BalancesError;
 use polkadot_primitives::{
-	BlockNumber, SessionIndex, ValidationCode, ON_DEMAND_MAX_QUEUE_MAX_SIZE,
+	BlockNumber, SessionIndex, ValidationCode,
 };
 use sp_runtime::traits::BadOrigin;
 
@@ -85,7 +83,7 @@ fn run_to_block(
 		OnDemand::on_initialize(b + 1);
 
 		// In the real runtime this is expected to be called by the `InclusionInherent` pallet.
-		Scheduler::advance_claim_queue(&Default::default());
+		Scheduler::advance_claim_queue(|_| false);
 	}
 }
 
@@ -248,7 +246,7 @@ fn spot_traffic_decreases_between_idle_blocks() {
 		assert!(Paras::is_parathread(para_id));
 
 		// Set the spot traffic to a large number
-		OnDemand::set_queue_status(QueueStatusType {
+		OnDemand::set_order_status(OrderStatus {
 			traffic: FixedU128::from_u32(10),
 			..Default::default()
 		});
@@ -331,17 +329,18 @@ fn place_order_keep_alive_keeps_alive() {
 }
 
 #[test]
-fn pop_assignment_for_core_works() {
+fn pop_assignment_for_cores_works() {
 	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
 		let para_a = ParaId::from(111);
 		let para_b = ParaId::from(110);
 		schedule_blank_para(para_a, ParaKind::Parathread);
 		schedule_blank_para(para_b, ParaKind::Parathread);
 
-		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
+		let block_num = 11;
+		run_to_block(block_num, |n| if n == 11 { Some(Default::default()) } else { None });
 
 		// Pop should return none with empty queue
-		assert_eq!(OnDemand::pop_assignment_for_core(CoreIndex(0)), None);
+		assert_eq!(OnDemand::pop_assignment_for_cores(block_num, 1).next(), None);
 
 		// Add enough assignments to the order queue.
 		for _ in 0..2 {
 			place_order_run_to_101(para_a);
 			place_order_run_to_101(para_b);
 		}
 
 		// Popped assignments should be for the correct paras and cores
-		assert_eq!(
-			OnDemand::pop_assignment_for_core(CoreIndex(0)).map(|a| a.para_id()),
-			Some(para_a)
-		);
-		assert_eq!(
-			OnDemand::pop_assignment_for_core(CoreIndex(1)).map(|a| a.para_id()),
-			Some(para_b)
-		);
-		assert_eq!(
-			OnDemand::pop_assignment_for_core(CoreIndex(0)).map(|a| a.para_id()),
-			Some(para_a)
-		);
-		assert_eq!(
-			OnDemand::pop_assignment_for_core(CoreIndex(1)).map(|a| a.para_id()),
-			Some(para_b)
-		);
-	});
-}
-
-#[test]
-fn push_back_assignment_works() {
-	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
-		let para_a = ParaId::from(111);
-		let para_b = ParaId::from(110);
-		schedule_blank_para(para_a, ParaKind::Parathread);
-		schedule_blank_para(para_b, ParaKind::Parathread);
+		let mut assignments = OnDemand::pop_assignment_for_cores(block_num, 2);
+		assert_eq!(assignments.next(), Some(para_a));
+		assert_eq!(assignments.next(), Some(para_b));
 
-		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
+		let mut assignments = OnDemand::pop_assignment_for_cores(block_num, 2);
+		// Should be empty for same block again:
+		assert_eq!(assignments.next(), None);
 
-		// Add enough assignments to the order queue.
-		place_order_run_to_101(para_a);
-		place_order_run_to_101(para_b);
-
-		// Pop order a
-		assert_eq!(OnDemand::pop_assignment_for_core(CoreIndex(0)).unwrap().para_id(), para_a);
-
-		// Para a should have affinity for core 0
-		assert_eq!(OnDemand::get_affinity_map(para_a).unwrap().count, 1);
-		assert_eq!(OnDemand::get_affinity_map(para_a).unwrap().core_index, CoreIndex(0));
-
-		// Push back order a
-		OnDemand::push_back_assignment(para_a, CoreIndex(0));
-
-		// Para a should have no affinity
-		assert_eq!(OnDemand::get_affinity_map(para_a).is_none(), true);
-
-		// Queue should contain orders a, b. A in front of b.
-		assert_eq!(OnDemand::pop_assignment_for_core(CoreIndex(0)).unwrap().para_id(), para_a);
-		assert_eq!(OnDemand::pop_assignment_for_core(CoreIndex(0)).unwrap().para_id(), para_b);
+		let mut assignments = OnDemand::pop_assignment_for_cores(block_num + 1, 2);
+		assert_eq!(assignments.next(), Some(para_a));
+		assert_eq!(assignments.next(), Some(para_b));
 	});
 }
 
@@ -411,35 +372,18 @@ fn affinity_prohibits_parallel_scheduling() {
 		schedule_blank_para(para_a, ParaKind::Parathread);
 		schedule_blank_para(para_b, ParaKind::Parathread);
 
-		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
-
-		// There should be no affinity before starting.
-		assert!(OnDemand::get_affinity_map(para_a).is_none());
-		assert!(OnDemand::get_affinity_map(para_b).is_none());
+		let block_num = 11;
+		run_to_block(block_num, |n| if n == 11 { Some(Default::default()) } else { None });
 
 		// Add 2 assignments for para_a for every para_b.
 		place_order_run_to_101(para_a);
 		place_order_run_to_101(para_a);
 		place_order_run_to_101(para_b);
 
-		// Approximate having 1 core.
-		for _ in 0..3 {
-			assert!(OnDemand::pop_assignment_for_core(CoreIndex(0)).is_some());
+		// Behaviour with just one core:
+		for (assignment, expected) in (0..4)
+			.map(|_| OnDemand::pop_assignment_for_cores(block_num, 1).next())
+			.zip([Some(para_a), Some(para_a), Some(para_b), None].into_iter())
+		{
+			assert_eq!(assignment, expected);
 		}
-		assert!(OnDemand::pop_assignment_for_core(CoreIndex(0)).is_none());
-
-		// Affinity on one core is meaningless.
- assert_eq!(OnDemand::get_affinity_map(para_a).unwrap().count, 2); - assert_eq!(OnDemand::get_affinity_map(para_b).unwrap().count, 1); - assert_eq!( - OnDemand::get_affinity_map(para_a).unwrap().core_index, - OnDemand::get_affinity_map(para_b).unwrap().core_index, - ); - - // Clear affinity - OnDemand::report_processed(para_a, 0.into()); - OnDemand::report_processed(para_a, 0.into()); - OnDemand::report_processed(para_b, 0.into()); // Add 2 assignments for para_a for every para_b. place_order_run_to_101(para_a); @@ -447,255 +391,16 @@ fn affinity_prohibits_parallel_scheduling() { place_order_run_to_101(para_b); // Approximate having 3 cores. CoreIndex 2 should be unable to obtain an assignment - for _ in 0..3 { - OnDemand::pop_assignment_for_core(CoreIndex(0)); - OnDemand::pop_assignment_for_core(CoreIndex(1)); - assert!(OnDemand::pop_assignment_for_core(CoreIndex(2)).is_none()); - } - - // Affinity should be the same as before, but on different cores. - assert_eq!(OnDemand::get_affinity_map(para_a).unwrap().count, 2); - assert_eq!(OnDemand::get_affinity_map(para_b).unwrap().count, 1); - assert_eq!(OnDemand::get_affinity_map(para_a).unwrap().core_index, CoreIndex(0)); - assert_eq!(OnDemand::get_affinity_map(para_b).unwrap().core_index, CoreIndex(1)); - - // Clear affinity - OnDemand::report_processed(para_a, CoreIndex(0)); - OnDemand::report_processed(para_a, CoreIndex(0)); - OnDemand::report_processed(para_b, CoreIndex(1)); - - // There should be no affinity after clearing. - assert!(OnDemand::get_affinity_map(para_a).is_none()); - assert!(OnDemand::get_affinity_map(para_b).is_none()); - }); -} - -#[test] -fn affinity_changes_work() { - new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| { - let para_a = ParaId::from(111); - let core_index = CoreIndex(0); - schedule_blank_para(para_a, ParaKind::Parathread); - - run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None }); - - // There should be no affinity before starting. - assert!(OnDemand::get_affinity_map(para_a).is_none()); - - // Add enough assignments to the order queue. - for _ in 0..10 { - place_order_run_to_101(para_a); - } - - // There should be no affinity before the scheduler pops. - assert!(OnDemand::get_affinity_map(para_a).is_none()); - - OnDemand::pop_assignment_for_core(core_index); - - // Affinity count is 1 after popping. - assert_eq!(OnDemand::get_affinity_map(para_a).unwrap().count, 1); - - OnDemand::report_processed(para_a, 0.into()); - OnDemand::pop_assignment_for_core(core_index); - - // Affinity count is 1 after popping with a previous para. - assert_eq!(OnDemand::get_affinity_map(para_a).unwrap().count, 1); - - for _ in 0..3 { - OnDemand::pop_assignment_for_core(core_index); - } - - // Affinity count is 4 after popping 3 times without a previous para. - assert_eq!(OnDemand::get_affinity_map(para_a).unwrap().count, 4); - - for _ in 0..5 { - OnDemand::report_processed(para_a, 0.into()); - assert!(OnDemand::pop_assignment_for_core(core_index).is_some()); - } - - // Affinity count should still be 4 but queue should be empty. - assert!(OnDemand::pop_assignment_for_core(core_index).is_none()); - assert_eq!(OnDemand::get_affinity_map(para_a).unwrap().count, 4); - - // Pop 4 times and get to exactly 0 (None) affinity. - for _ in 0..4 { - OnDemand::report_processed(para_a, 0.into()); - assert!(OnDemand::pop_assignment_for_core(core_index).is_none()); - } - assert!(OnDemand::get_affinity_map(para_a).is_none()); - - // Decreasing affinity beyond 0 should still be None. 
- OnDemand::report_processed(para_a, 0.into()); - assert!(OnDemand::pop_assignment_for_core(core_index).is_none()); - assert!(OnDemand::get_affinity_map(para_a).is_none()); - }); -} - -#[test] -fn new_affinity_for_a_core_must_come_from_free_entries() { - // If affinity count for a core was zero before, and is 1 now, then the entry - // must have come from free_entries. - let parachains = - vec![ParaId::from(111), ParaId::from(222), ParaId::from(333), ParaId::from(444)]; - let core_indices = vec![CoreIndex(0), CoreIndex(1), CoreIndex(2), CoreIndex(3)]; - - new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| { - parachains.iter().for_each(|chain| { - schedule_blank_para(*chain, ParaKind::Parathread); - }); - - run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None }); - - // Place orders for all chains. - parachains.iter().for_each(|chain| { - place_order_run_to_101(*chain); - }); - - // There are 4 entries in free_entries. - let start_free_entries = OnDemand::get_free_entries().len(); - assert_eq!(start_free_entries, 4); - - // Pop assignments on all cores. - core_indices.iter().enumerate().for_each(|(n, core_index)| { - // There is no affinity on the core prior to popping. - assert!(OnDemand::get_affinity_entries(*core_index).is_empty()); - - // There's always an order to be popped for each core. - let free_entries = OnDemand::get_free_entries(); - let next_order = free_entries.peek(); - - // There is no affinity on the paraid prior to popping. - assert!(OnDemand::get_affinity_map(next_order.unwrap().para_id).is_none()); - - match OnDemand::pop_assignment_for_core(*core_index) { - Some(assignment) => { - // The popped assignment came from free entries. - assert_eq!(start_free_entries - 1 - n, OnDemand::get_free_entries().len()); - // The popped assignment has the same para id as the next order. - assert_eq!(assignment.para_id(), next_order.unwrap().para_id); - }, - None => panic!("Should not happen"), - } - }); - - // All entries have been removed from free_entries. - assert!(OnDemand::get_free_entries().is_empty()); - - // All chains have an affinity count of 1. - parachains.iter().for_each(|chain| { - assert_eq!(OnDemand::get_affinity_map(*chain).unwrap().count, 1); - }); - }); -} - -#[test] -#[should_panic] -fn queue_index_ordering_is_unsound_over_max_size() { - // NOTE: Unsoundness proof. If the number goes sufficiently over the max_queue_max_size - // the overflow will cause an opposite comparison to what would be expected. - let max_num = u32::MAX - ON_DEMAND_MAX_QUEUE_MAX_SIZE; - // 0 < some large number. - assert_eq!(QueueIndex(0).cmp(&QueueIndex(max_num + 1)), Ordering::Less); -} - -#[test] -fn queue_index_ordering_works() { - // The largest accepted queue size. - let max_num = ON_DEMAND_MAX_QUEUE_MAX_SIZE; - - // 0 == 0 - assert_eq!(QueueIndex(0).cmp(&QueueIndex(0)), Ordering::Equal); - // 0 < 1 - assert_eq!(QueueIndex(0).cmp(&QueueIndex(1)), Ordering::Less); - // 1 > 0 - assert_eq!(QueueIndex(1).cmp(&QueueIndex(0)), Ordering::Greater); - // 0 < max_num - assert_eq!(QueueIndex(0).cmp(&QueueIndex(max_num)), Ordering::Less); - // 0 > max_num + 1 - assert_eq!(QueueIndex(0).cmp(&QueueIndex(max_num + 1)), Ordering::Less); - - // Ordering within the bounds of ON_DEMAND_MAX_QUEUE_MAX_SIZE works. 
- let mut v = vec![3, 6, 2, 1, 5, 4]; - v.sort_by_key(|&num| QueueIndex(num)); - assert_eq!(v, vec![1, 2, 3, 4, 5, 6]); - - v = vec![max_num, 4, 5, 1, 6]; - v.sort_by_key(|&num| QueueIndex(num)); - assert_eq!(v, vec![1, 4, 5, 6, max_num]); - - // Ordering with an element outside of the bounds of the max size also works. - v = vec![max_num + 2, 0, 6, 2, 1, 5, 4]; - v.sort_by_key(|&num| QueueIndex(num)); - assert_eq!(v, vec![0, 1, 2, 4, 5, 6, max_num + 2]); - - // Numbers way above the max size will overflow - v = vec![u32::MAX - 1, u32::MAX, 6, 2, 1, 5, 4]; - v.sort_by_key(|&num| QueueIndex(num)); - assert_eq!(v, vec![u32::MAX - 1, u32::MAX, 1, 2, 4, 5, 6]); -} - -#[test] -fn reverse_queue_index_does_reverse() { - let mut v = vec![1, 2, 3, 4, 5, 6]; - - // Basic reversal of a vector. - v.sort_by_key(|&num| ReverseQueueIndex(num)); - assert_eq!(v, vec![6, 5, 4, 3, 2, 1]); - - // Example from rust docs on `Reverse`. Should work identically. - v.sort_by_key(|&num| (num > 3, ReverseQueueIndex(num))); - assert_eq!(v, vec![3, 2, 1, 6, 5, 4]); - - let mut v2 = vec![1, 2, u32::MAX]; - v2.sort_by_key(|&num| ReverseQueueIndex(num)); - assert_eq!(v2, vec![2, 1, u32::MAX]); -} - -#[test] -fn queue_status_size_fn_works() { - // Add orders to the on demand queue, and make sure that they are properly represented - // by the QueueStatusType::size fn. - let parachains = vec![ParaId::from(111), ParaId::from(222), ParaId::from(333)]; - let core_indices = vec![CoreIndex(0), CoreIndex(1)]; - - new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| { - parachains.iter().for_each(|chain| { - schedule_blank_para(*chain, ParaKind::Parathread); - }); - - assert_eq!(OnDemand::get_queue_status().size(), 0); - - run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None }); - - // Place orders for all chains. - parachains.iter().for_each(|chain| { - // 2 per chain for a total of 6 - place_order_run_to_101(*chain); - place_order_run_to_101(*chain); - }); - - // 6 orders in free entries - assert_eq!(OnDemand::get_free_entries().len(), 6); - // 6 orders via queue status size - assert_eq!( - OnDemand::get_free_entries().len(), - OnDemand::get_queue_status().size() as usize - ); - - core_indices.iter().for_each(|core_index| { - OnDemand::pop_assignment_for_core(*core_index); - }); - - // There should be 2 orders in the scheduler's claimqueue, - // 2 in assorted AffinityMaps and 2 in free. - // ParaId 111 - assert_eq!(OnDemand::get_affinity_entries(core_indices[0]).len(), 1); - // ParaId 222 - assert_eq!(OnDemand::get_affinity_entries(core_indices[1]).len(), 1); - // Free entries are from ParaId 333 - assert_eq!(OnDemand::get_free_entries().len(), 2); - // For a total size of 4. - assert_eq!(OnDemand::get_queue_status().size(), 4) + let mut assignments = OnDemand::pop_assignment_for_cores(block_num, 3); + assert_eq!(assignments.next(), Some(para_a)); + // No duplicates: + assert_eq!(assignments.next(), Some(para_b)); + assert_eq!(assignments.next(), None); + + // Should come with next block: + let mut assignments = OnDemand::pop_assignment_for_cores(block_num + 1, 3); + assert_eq!(assignments.next(), Some(para_a)); + assert_eq!(assignments.next(), None); }); } diff --git a/polkadot/runtime/parachains/src/on_demand/types.rs b/polkadot/runtime/parachains/src/on_demand/types.rs deleted file mode 100644 index c87e7abaf860..000000000000 --- a/polkadot/runtime/parachains/src/on_demand/types.rs +++ /dev/null @@ -1,238 +0,0 @@ -// Copyright (C) Parity Technologies (UK) Ltd. 
-// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! On demand module types. - -use super::{alloc, pallet::Config}; -use alloc::collections::BinaryHeap; -use core::cmp::{Ord, Ordering, PartialOrd}; -use frame_support::{ - pallet_prelude::{Decode, Encode, RuntimeDebug, TypeInfo}, - traits::Currency, -}; -use polkadot_primitives::{CoreIndex, Id as ParaId, ON_DEMAND_MAX_QUEUE_MAX_SIZE}; -use sp_runtime::FixedU128; - -/// Shorthand for the Balance type the runtime is using. -pub type BalanceOf = - <::Currency as Currency<::AccountId>>::Balance; - -/// Meta data for full queue. -/// -/// This includes elements with affinity and free entries. -/// -/// The actual queue is implemented via multiple priority queues. One for each core, for entries -/// which currently have a core affinity and one free queue, with entries without any affinity yet. -/// -/// The design aims to have most queue accessess be O(1) or O(log(N)). Absolute worst case is O(N). -/// Importantly this includes all accessess that happen in a single block. Even with 50 cores, the -/// total complexity of all operations in the block should maintain above complexities. In -/// particular O(N) stays O(N), it should never be O(N*cores). -/// -/// More concrete rundown on complexity: -/// -/// - insert: O(1) for placing an order, O(log(N)) for push backs. -/// - pop_assignment_for_core: O(log(N)), O(N) worst case: Can only happen for one core, next core -/// is already less work. -/// - report_processed & push back: If affinity dropped to 0, then O(N) in the worst case. Again -/// this divides per core. -/// -/// Reads still exist, also improved slightly, but worst case we fetch all entries. -#[derive(Encode, Decode, TypeInfo)] -pub struct QueueStatusType { - /// Last calculated traffic value. - pub traffic: FixedU128, - /// The next index to use. - pub next_index: QueueIndex, - /// Smallest index still in use. - /// - /// In case of a completely empty queue (free + affinity queues), `next_index - smallest_index - /// == 0`. - pub smallest_index: QueueIndex, - /// Indices that have been freed already. - /// - /// But have a hole to `smallest_index`, so we can not yet bump `smallest_index`. This binary - /// heap is roughly bounded in the number of on demand cores: - /// - /// For a single core, elements will always be processed in order. With each core added, a - /// level of out of order execution is added. - pub freed_indices: BinaryHeap, -} - -impl Default for QueueStatusType { - fn default() -> QueueStatusType { - QueueStatusType { - traffic: FixedU128::default(), - next_index: QueueIndex(0), - smallest_index: QueueIndex(0), - freed_indices: BinaryHeap::new(), - } - } -} - -impl QueueStatusType { - /// How many orders are queued in total? - /// - /// This includes entries which have core affinity. 
- pub fn size(&self) -> u32 { - self.next_index - .0 - .overflowing_sub(self.smallest_index.0) - .0 - .saturating_sub(self.freed_indices.len() as u32) - } - - /// Get current next index - /// - /// to use for an element newly pushed to the back of the queue. - pub fn push_back(&mut self) -> QueueIndex { - let QueueIndex(next_index) = self.next_index; - self.next_index = QueueIndex(next_index.overflowing_add(1).0); - QueueIndex(next_index) - } - - /// Push something to the front of the queue - pub fn push_front(&mut self) -> QueueIndex { - self.smallest_index = QueueIndex(self.smallest_index.0.overflowing_sub(1).0); - self.smallest_index - } - - /// The given index is no longer part of the queue. - /// - /// This updates `smallest_index` if need be. - pub fn consume_index(&mut self, removed_index: QueueIndex) { - if removed_index != self.smallest_index { - self.freed_indices.push(removed_index.reverse()); - return - } - let mut index = self.smallest_index.0.overflowing_add(1).0; - // Even more to advance? - while self.freed_indices.peek() == Some(&ReverseQueueIndex(index)) { - index = index.overflowing_add(1).0; - self.freed_indices.pop(); - } - self.smallest_index = QueueIndex(index); - } -} - -/// Type used for priority indices. -// NOTE: The `Ord` implementation for this type is unsound in the general case. -// Do not use it for anything but it's intended purpose. -#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq, Copy)] -pub struct QueueIndex(pub u32); - -/// QueueIndex with reverse ordering. -/// -/// Same as `Reverse(QueueIndex)`, but with all the needed traits implemented. -#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq, Copy)] -pub struct ReverseQueueIndex(pub u32); - -impl QueueIndex { - fn reverse(self) -> ReverseQueueIndex { - ReverseQueueIndex(self.0) - } -} - -impl Ord for QueueIndex { - fn cmp(&self, other: &Self) -> Ordering { - let diff = self.0.overflowing_sub(other.0).0; - if diff == 0 { - Ordering::Equal - } else if diff <= ON_DEMAND_MAX_QUEUE_MAX_SIZE { - Ordering::Greater - } else { - Ordering::Less - } - } -} - -impl PartialOrd for QueueIndex { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for ReverseQueueIndex { - fn cmp(&self, other: &Self) -> Ordering { - QueueIndex(other.0).cmp(&QueueIndex(self.0)) - } -} -impl PartialOrd for ReverseQueueIndex { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(&other)) - } -} - -/// Internal representation of an order after it has been enqueued already. -/// -/// This data structure is provided for a min BinaryHeap (Ord compares in reverse order with regards -/// to its elements) -#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq)] -pub struct EnqueuedOrder { - pub para_id: ParaId, - pub idx: QueueIndex, -} - -impl EnqueuedOrder { - pub fn new(idx: QueueIndex, para_id: ParaId) -> Self { - Self { idx, para_id } - } -} - -impl PartialOrd for EnqueuedOrder { - fn partial_cmp(&self, other: &Self) -> Option { - match other.idx.partial_cmp(&self.idx) { - Some(Ordering::Equal) => other.para_id.partial_cmp(&self.para_id), - o => o, - } - } -} - -impl Ord for EnqueuedOrder { - fn cmp(&self, other: &Self) -> Ordering { - match other.idx.cmp(&self.idx) { - Ordering::Equal => other.para_id.cmp(&self.para_id), - o => o, - } - } -} - -/// Keeps track of how many assignments a scheduler currently has at a specific `CoreIndex` for a -/// specific `ParaId`. 
-#[derive(Encode, Decode, Default, Clone, Copy, TypeInfo)]
-#[cfg_attr(test, derive(PartialEq, RuntimeDebug))]
-pub struct CoreAffinityCount {
-	pub core_index: CoreIndex,
-	pub count: u32,
-}
-
-/// An indicator as to which end of the `OnDemandQueue` an assignment will be placed.
-#[cfg_attr(test, derive(RuntimeDebug))]
-pub enum QueuePushDirection {
-	Back,
-	Front,
-}
-
-/// Errors that can happen during spot traffic calculation.
-#[derive(PartialEq, RuntimeDebug)]
-pub enum SpotTrafficCalculationErr {
-	/// The order queue capacity is at 0.
-	QueueCapacityIsZero,
-	/// The queue size is larger than the queue capacity.
-	QueueSizeLargerThanCapacity,
-	/// Arithmetic error during division, either division by 0 or over/underflow.
-	Division,
-}
diff --git a/polkadot/runtime/parachains/src/paras_inherent/mod.rs b/polkadot/runtime/parachains/src/paras_inherent/mod.rs
index 4c1394fd1347..5a0d21841f2a 100644
--- a/polkadot/runtime/parachains/src/paras_inherent/mod.rs
+++ b/polkadot/runtime/parachains/src/paras_inherent/mod.rs
@@ -63,7 +63,7 @@ use polkadot_primitives::{
 };
 use rand::{seq::SliceRandom, SeedableRng};
 use scale_info::TypeInfo;
-use sp_runtime::traits::{Header as HeaderT, One};
+use sp_runtime::traits::{Header as HeaderT, One, Saturating};

 mod misc;
 mod weights;
@@ -315,19 +315,14 @@ impl<T: Config> Pallet<T> {

 		// Before anything else, update the allowed relay-parents.
 		{
-			let parent_number = now - One::one();
+			let parent_number = now.saturating_sub(One::one());
 			let parent_storage_root = *parent_header.state_root();

 			shared::AllowedRelayParents::<T>::mutate(|tracker| {
 				tracker.update(
 					parent_hash,
 					parent_storage_root,
-					scheduler::ClaimQueue::<T>::get()
-						.into_iter()
-						.map(|(core_index, paras)| {
-							(core_index, paras.into_iter().map(|e| e.para_id()).collect())
-						})
-						.collect(),
+					scheduler::Pallet::<T>::claim_queue(),
 					parent_number,
 					config.async_backing_params.allowed_ancestry_len,
 				);
@@ -598,87 +593,54 @@ impl<T: Config> Pallet<T> {

 		METRICS.on_candidates_processed_total(backed_candidates.len() as u64);

-		if !upcoming_new_session {
-			let occupied_cores =
-				inclusion::Pallet::<T>::get_occupied_cores().map(|(core, _)| core).collect();
+		let occupied_cores: BTreeSet<_> =
+			inclusion::Pallet::<T>::get_occupied_cores().map(|(core, _)| core).collect();

-			let mut eligible: BTreeMap<ParaId, BTreeSet<CoreIndex>> = BTreeMap::new();
-			let mut total_eligible_cores = 0;
+		let mut eligible: BTreeMap<ParaId, BTreeSet<CoreIndex>> = BTreeMap::new();

-			for (core_idx, para_id) in Self::eligible_paras(&occupied_cores) {
-				total_eligible_cores += 1;
-				log::trace!(target: LOG_TARGET, "Found eligible para {:?} on core {:?}", para_id, core_idx);
-				eligible.entry(para_id).or_default().insert(core_idx);
-			}
-
-			let node_features = configuration::ActiveConfig::<T>::get().node_features;
-			let core_index_enabled = node_features
-				.get(FeatureIndex::ElasticScalingMVP as usize)
-				.map(|b| *b)
-				.unwrap_or(false);
-
-			let allow_v2_receipts = node_features
-				.get(FeatureIndex::CandidateReceiptV2 as usize)
-				.map(|b| *b)
-				.unwrap_or(false);
+		let is_blocked = |core_idx| occupied_cores.contains(&core_idx) || upcoming_new_session;
+		let scheduled = scheduler::Pallet::<T>::advance_claim_queue(is_blocked);
+		let total_eligible_cores = scheduled.len();

-			let backed_candidates_with_core = sanitize_backed_candidates::<T>(
-				backed_candidates,
-				&allowed_relay_parents,
-				concluded_invalid_hashes,
-				eligible,
-				core_index_enabled,
-				allow_v2_receipts,
-			);
-			let count = count_backed_candidates(&backed_candidates_with_core);
+		for (core_idx, para_id) in scheduled {
+			eligible.entry(para_id).or_default().insert(core_idx);
+		}
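The `is_blocked` predicate introduced above is just the composition of core occupancy with the session boundary. A minimal self-contained sketch of the same logic, with a local `CoreIndex` newtype standing in for the real primitive (names are stand-ins, not part of the patch):

use std::collections::BTreeSet;

#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
struct CoreIndex(u32);

fn main() {
	// Hypothetical stand-ins for `occupied_cores` and `upcoming_new_session`.
	let occupied_cores: BTreeSet<CoreIndex> = [CoreIndex(1)].into_iter().collect();
	let upcoming_new_session = false;

	// A core is blocked if it is still occupied, or if a session change is imminent
	// (in which case every core is blocked and nothing new gets backed).
	let is_blocked =
		|core_idx: CoreIndex| occupied_cores.contains(&core_idx) || upcoming_new_session;

	assert!(!is_blocked(CoreIndex(0))); // free core: its next claim can be served
	assert!(is_blocked(CoreIndex(1))); // occupied core: the claim stays in the queue
}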
-			ensure!(count <= total_eligible_cores, Error::<T>::UnscheduledCandidate);
+		let node_features = configuration::ActiveConfig::<T>::get().node_features;
+		let core_index_enabled = node_features
+			.get(FeatureIndex::ElasticScalingMVP as usize)
+			.map(|b| *b)
+			.unwrap_or(false);

-			METRICS.on_candidates_sanitized(count as u64);
+		let allow_v2_receipts = node_features
+			.get(FeatureIndex::CandidateReceiptV2 as usize)
+			.map(|b| *b)
+			.unwrap_or(false);

-			// Process backed candidates according to scheduled cores.
-			let candidate_receipt_with_backing_validator_indices =
-				inclusion::Pallet::<T>::process_candidates(
-					&allowed_relay_parents,
-					&backed_candidates_with_core,
-					scheduler::Pallet::<T>::group_validators,
-					core_index_enabled,
-				)?;
+		let backed_candidates_with_core = sanitize_backed_candidates::<T>(
+			backed_candidates,
+			&allowed_relay_parents,
+			concluded_invalid_hashes,
+			eligible,
+			core_index_enabled,
+			allow_v2_receipts,
+		);
+		let count = count_backed_candidates(&backed_candidates_with_core);

-			// We need to advance the claim queue on all cores, except for the ones that did not
-			// get freed in this block. The ones that did not get freed also cannot be newly
-			// occupied.
-			scheduler::Pallet::<T>::advance_claim_queue(&occupied_cores);
+		ensure!(count <= total_eligible_cores, Error::<T>::UnscheduledCandidate);

-			Ok((candidate_receipt_with_backing_validator_indices, backed_candidates_with_core))
-		} else {
-			log::debug!(
-				target: LOG_TARGET,
-				"Upcoming session change, not backing any new candidates."
-			);
-			// If we'll initialize a new session at the end of the block, we don't want to
-			// advance the claim queue.
+		METRICS.on_candidates_sanitized(count as u64);

-			Ok((vec![], BTreeMap::new()))
-		}
-	}
+		// Process backed candidates according to scheduled cores.
+		let candidate_receipt_with_backing_validator_indices =
+			inclusion::Pallet::<T>::process_candidates(
+				&allowed_relay_parents,
+				&backed_candidates_with_core,
+				scheduler::Pallet::<T>::group_validators,
+				core_index_enabled,
+			)?;

-	/// Paras that may get backed on cores.
-	///
-	/// 1. The para must be scheduled on core.
-	/// 2. Core needs to be free, otherwise backing is not possible.
-	///
-	/// We get a set of the occupied cores as input.
- pub(crate) fn eligible_paras<'a>( - occupied_cores: &'a BTreeSet, - ) -> impl Iterator + 'a { - scheduler::ClaimQueue::::get().into_iter().filter_map(|(core_idx, queue)| { - if occupied_cores.contains(&core_idx) { - return None - } - let next_scheduled = queue.front()?; - Some((core_idx, next_scheduled.para_id())) - }) + Ok((candidate_receipt_with_backing_validator_indices, backed_candidates_with_core)) } } diff --git a/polkadot/runtime/parachains/src/paras_inherent/tests.rs b/polkadot/runtime/parachains/src/paras_inherent/tests.rs index 146be0ee0aad..3fd526864bd6 100644 --- a/polkadot/runtime/parachains/src/paras_inherent/tests.rs +++ b/polkadot/runtime/parachains/src/paras_inherent/tests.rs @@ -2454,12 +2454,11 @@ mod sanitizers { mod candidates { use crate::{ - mock::{set_disabled_validators, RuntimeOrigin}, - scheduler::common::Assignment, - util::{make_persisted_validation_data, make_persisted_validation_data_with_parent}, + assigner_coretime::{self, PartsOf57600}, mock::{set_disabled_validators, RuntimeOrigin}, on_demand, util::{make_persisted_validation_data, make_persisted_validation_data_with_parent} }; use alloc::collections::vec_deque::VecDeque; - use polkadot_primitives::ValidationCode; + use pallet_broker::CoreAssignment; +use polkadot_primitives::ValidationCode; use super::*; @@ -2533,22 +2532,10 @@ mod sanitizers { ]); // Update scheduler's claimqueue with the parachains - scheduler::Pallet::::set_claim_queue(BTreeMap::from([ - ( - CoreIndex::from(0), - VecDeque::from([Assignment::Pool { - para_id: 1.into(), - core_index: CoreIndex(0), - }]), - ), - ( - CoreIndex::from(1), - VecDeque::from([Assignment::Pool { - para_id: 2.into(), - core_index: CoreIndex(1), - }]), - ), - ])); + assigner_coretime::Pallet::::assign_core(CoreIndex(0), 0, vec![(CoreAssignment::Pool, PartsOf57600::FULL)], None).unwrap(); + on_demand::Pallet::::push_back_order(1.into()); + assigner_coretime::Pallet::::assign_core(CoreIndex(1), 0, vec![(CoreAssignment::Pool, PartsOf57600::FULL)], None).unwrap(); + on_demand::Pallet::::push_back_order(2.into()); // Set the on-chain included head data for paras. 
paras::Pallet::::set_current_head(ParaId::from(1), HeadData(vec![1])); @@ -2627,10 +2614,6 @@ mod sanitizers { .collect::>(); // State sanity checks - assert_eq!( - Pallet::::eligible_paras(&Default::default()).collect::>(), - vec![(CoreIndex(0), ParaId::from(1)), (CoreIndex(1), ParaId::from(2))] - ); assert_eq!( shared::ActiveValidatorIndices::::get(), vec![ @@ -2799,7 +2782,7 @@ mod sanitizers { shared::Pallet::::add_allowed_relay_parent( relay_parent, Default::default(), - scheduler::ClaimQueue::::get() + scheduler::Pallet::::claim_queue() .into_iter() .map(|(core_index, paras)| { (core_index, paras.into_iter().map(|e| e.para_id()).collect()) @@ -3335,7 +3318,7 @@ mod sanitizers { shared::Pallet::::add_allowed_relay_parent( relay_parent, Default::default(), - scheduler::ClaimQueue::::get() + scheduler::Pallet::::claim_queue() .into_iter() .map(|(core_index, paras)| { (core_index, paras.into_iter().map(|e| e.para_id()).collect()) diff --git a/polkadot/runtime/parachains/src/runtime_api_impl/v11.rs b/polkadot/runtime/parachains/src/runtime_api_impl/v11.rs index e9327bc7641a..7e66d0282b07 100644 --- a/polkadot/runtime/parachains/src/runtime_api_impl/v11.rs +++ b/polkadot/runtime/parachains/src/runtime_api_impl/v11.rs @@ -41,8 +41,8 @@ use polkadot_primitives::{ ApprovalVotingParams, AuthorityDiscoveryId, CandidateHash, CoreIndex, DisputeState, ExecutorParams, GroupIndex, GroupRotationInfo, Hash, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, NodeFeatures, OccupiedCoreAssumption, PersistedValidationData, - PvfCheckStatement, SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash, ValidatorId, - ValidatorIndex, ValidatorSignature, + PvfCheckStatement, ScheduledCore, SessionIndex, SessionInfo, ValidationCode, + ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, }; use sp_runtime::traits::One; @@ -85,7 +85,7 @@ pub fn availability_cores() -> Vec::get_claim_queue(); + let claim_queue = scheduler::Pallet::::claim_queue(); let occupied_cores: BTreeMap> = inclusion::Pallet::::get_occupied_cores().collect(); let n_cores = scheduler::Pallet::::num_availability_cores(); @@ -99,10 +99,16 @@ pub fn availability_cores() -> Vec::next_up_on_available(core_idx), + next_up_on_available: claim_queue + .get(&core_idx) + .and_then(|q| q.front()) + .map(|¶_id| ScheduledCore { para_id, collator: None }), occupied_since: pending_availability.backed_in_number(), time_out_at: time_out_for(pending_availability.backed_in_number()).live_until, - next_up_on_time_out: scheduler::Pallet::::next_up_on_available(core_idx), + next_up_on_time_out: claim_queue + .get(&core_idx) + .and_then(|q| q.front()) + .map(|¶_id| ScheduledCore { para_id, collator: None }), availability: pending_availability.availability_votes().clone(), group_responsible: group_responsible_for( backing_group_allocation_time, @@ -112,11 +118,8 @@ pub fn availability_cores() -> Vec() -> ApprovalVotingParams /// Returns the claimqueue from the scheduler pub fn claim_queue() -> BTreeMap> { - let config = configuration::ActiveConfig::::get(); - // Extra sanity, config should already never be smaller than 1: - let n_lookahead = config.scheduler_params.lookahead.max(1); - scheduler::Pallet::::get_claim_queue() - .into_iter() - .map(|(core_index, entries)| { - ( - core_index, - entries.into_iter().map(|e| e.para_id()).take(n_lookahead as usize).collect(), - ) - }) - .collect() + scheduler::Pallet::::claim_queue() } /// Returns all the candidates that are pending availability for a given `ParaId`. 
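With `claim_queue()` now surfacing the scheduler's `BTreeMap<CoreIndex, VecDeque<ParaId>>` directly, a consumer that only needs the next para per core can reduce it as in the following sketch (plain `u32` stand-ins for `CoreIndex` and `ParaId`; `next_up` is a hypothetical helper, not part of the patch):

use std::collections::{BTreeMap, VecDeque};

// Simplified stand-ins for the real primitives.
type CoreIndex = u32;
type ParaId = u32;

/// The para scheduled next on each core, i.e. the front of every per-core queue.
fn next_up(claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>) -> BTreeMap<CoreIndex, ParaId> {
	claim_queue
		.iter()
		.filter_map(|(core, queue)| queue.front().map(|para| (*core, *para)))
		.collect()
}

fn main() {
	let claim_queue = BTreeMap::from([
		(0, VecDeque::from([1000, 1000, 2000])), // first entry doubled on a fresh queue
		(1, VecDeque::from([3000])),
	]);
	assert_eq!(next_up(&claim_queue), BTreeMap::from([(0, 1000), (1, 3000)]));
}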
diff --git a/polkadot/runtime/parachains/src/scheduler.rs b/polkadot/runtime/parachains/src/scheduler.rs
index 9c111c2d28e7..30ee6450ca40 100644
--- a/polkadot/runtime/parachains/src/scheduler.rs
+++ b/polkadot/runtime/parachains/src/scheduler.rs
@@ -36,32 +36,24 @@
 //! number of groups as availability cores. Validator groups will be assigned to different
 //! availability cores over time.

-use crate::{configuration, initializer::SessionChangeNotification, paras};
+use crate::{assigner_coretime, configuration, initializer::SessionChangeNotification};
 use alloc::{
-	collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque},
+	collections::{btree_map::BTreeMap, vec_deque::VecDeque},
 	vec::Vec,
 };
 use frame_support::{pallet_prelude::*, traits::Defensive};
 use frame_system::pallet_prelude::BlockNumberFor;
 pub use polkadot_core_primitives::v2::BlockNumber;
-use polkadot_primitives::{
-	CoreIndex, GroupIndex, GroupRotationInfo, Id as ParaId, ScheduledCore, SchedulerParams,
-	ValidatorIndex,
-};
+use polkadot_primitives::{CoreIndex, GroupIndex, GroupRotationInfo, Id as ParaId, ValidatorIndex};
 use sp_runtime::traits::One;

-pub mod common;
-
-use common::{Assignment, AssignmentProvider};
-
 pub use pallet::*;

 #[cfg(test)]
 mod tests;

-const LOG_TARGET: &str = "runtime::parachains::scheduler";
-
-pub mod migration;
+// TODO: Add back + fix.
+// pub mod migration;

 #[frame_support::pallet]
 pub mod pallet {
@@ -75,8 +67,9 @@ pub mod pallet {
 	pub struct Pallet<T>(_);

 	#[pallet::config]
-	pub trait Config: frame_system::Config + configuration::Config + paras::Config {
-		type AssignmentProvider: AssignmentProvider<BlockNumberFor<Self>>;
+	pub trait Config:
+		frame_system::Config + configuration::Config + assigner_coretime::Config
+	{
 	}

 	/// All the validator groups. One for each core. Indices are into `ActiveValidators` - not the
@@ -99,11 +92,6 @@ pub mod pallet {
 	#[pallet::storage]
 	pub type SessionStartBlock<T: Config> = StorageValue<_, BlockNumberFor<T>, ValueQuery>;

-	/// One entry for each availability core. The `VecDeque` represents the assignments to be
-	/// scheduled on that core.
-	#[pallet::storage]
-	pub type ClaimQueue<T: Config> =
-		StorageValue<_, BTreeMap<CoreIndex, VecDeque<Assignment>>, ValueQuery>;

 	/// Availability timeout status of a core.
 	pub(crate) struct AvailabilityTimeoutStatus<BlockNumber> {
 		/// Is the core already timed out?
@@ -120,6 +108,32 @@ pub mod pallet {
 }

 impl<T: Config> Pallet<T> {
+	/// Advance the claim queue.
+	///
+	/// Parameters:
+	/// - `is_blocked`: Tells whether a given core is currently blocked (its schedule cannot be
+	///   served).
+	///
+	/// Returns: the `ParaId` scheduled next on each core; blocked cores are filtered out.
+	pub(crate) fn advance_claim_queue<F: Fn(CoreIndex) -> bool>(
+		is_blocked: F,
+	) -> BTreeMap<CoreIndex, ParaId> {
+		let mut assignments = assigner_coretime::Pallet::<T>::advance_assignments(is_blocked);
+		assignments.split_off(&CoreIndex(Self::num_availability_cores() as _));
+		assignments
+	}
+
+	/// Retrieve upcoming claims for each core.
+	///
+	/// To be called from runtime APIs.
+	pub(crate) fn claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>> {
+		let config = configuration::ActiveConfig::<T>::get();
+		let lookahead = config.scheduler_params.lookahead;
+		let mut queue = assigner_coretime::Pallet::<T>::peek_next_block(lookahead);
+		queue.split_off(&CoreIndex(Self::num_availability_cores() as _));
+		queue
+	}
+
 	/// Called by the initializer to initialize the scheduler pallet.
 	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
 		Weight::zero()
@@ -185,11 +199,6 @@ impl<T: Config> Pallet<T> {
 			ValidatorGroups::<T>::set(groups);
 		}
-
-		// Resize and populate claim queue.
- Self::maybe_resize_claim_queue(); - Self::populate_claim_queue_after_session_change(); - let now = frame_system::Pallet::::block_number() + One::one(); SessionStartBlock::::set(now); } @@ -204,12 +213,6 @@ impl Pallet { ValidatorGroups::::decode_len().unwrap_or(0) } - /// Expected claim queue len. Can be different than the real length if for example we don't have - /// assignments for a core. - fn expected_claim_queue_len(config: &SchedulerParams>) -> u32 { - core::cmp::min(config.num_cores, Self::num_availability_cores() as u32) - } - /// Get the group assigned to a specific core by index at the current block number. Result /// undefined if the core index is unknown or the block number is less than the session start /// index. @@ -298,148 +301,9 @@ impl Pallet { GroupRotationInfo { session_start_block, now, group_rotation_frequency } } - /// Return the next thing that will be scheduled on this core assuming it is currently - /// occupied and the candidate occupying it became available. - pub(crate) fn next_up_on_available(core: CoreIndex) -> Option { - // Since this is being called from a runtime API, we need to workaround for #64. - if Self::on_chain_storage_version() == StorageVersion::new(2) { - migration::v2::ClaimQueue::::get() - .get(&core) - .and_then(|a| a.front().map(|entry| entry.assignment.para_id())) - } else { - ClaimQueue::::get() - .get(&core) - .and_then(|a| a.front().map(|assignment| assignment.para_id())) - } - .map(|para_id| ScheduledCore { para_id, collator: None }) - } - - // Since this is being called from a runtime API, we need to workaround for #64. - pub(crate) fn get_claim_queue() -> BTreeMap> { - if Self::on_chain_storage_version() == StorageVersion::new(2) { - migration::v2::ClaimQueue::::get() - .into_iter() - .map(|(core_index, entries)| { - (core_index, entries.into_iter().map(|e| e.assignment).collect()) - }) - .collect() - } else { - ClaimQueue::::get() - } - } - - /// For each core that isn't part of the `except_for` set, pop the first item of the claim queue - /// and fill the queue from the assignment provider. - pub(crate) fn advance_claim_queue(except_for: &BTreeSet) { - let config = configuration::ActiveConfig::::get(); - let expected_claim_queue_len = Self::expected_claim_queue_len(&config.scheduler_params); - // Extra sanity, config should already never be smaller than 1: - let n_lookahead = config.scheduler_params.lookahead.max(1); - - for core_idx in 0..expected_claim_queue_len { - let core_idx = CoreIndex::from(core_idx); - - if !except_for.contains(&core_idx) { - let core_idx = CoreIndex::from(core_idx); - - if let Some(dropped_para) = Self::pop_front_of_claim_queue(&core_idx) { - T::AssignmentProvider::report_processed(dropped_para); - } - - Self::fill_claim_queue(core_idx, n_lookahead); - } - } - } - - // on new session - fn maybe_resize_claim_queue() { - let cq = ClaimQueue::::get(); - let Some((old_max_core, _)) = cq.last_key_value() else { return }; - let config = configuration::ActiveConfig::::get(); - let new_core_count = Self::expected_claim_queue_len(&config.scheduler_params); - - if new_core_count < (old_max_core.0 + 1) { - ClaimQueue::::mutate(|cq| { - let to_remove: Vec<_> = - cq.range(CoreIndex(new_core_count)..=*old_max_core).map(|(k, _)| *k).collect(); - for key in to_remove { - if let Some(dropped_assignments) = cq.remove(&key) { - Self::push_back_to_assignment_provider(dropped_assignments.into_iter()); - } - } - }); - } - } - - // Populate the claim queue. 
To be called on new session, after all the other modules were - // initialized. - fn populate_claim_queue_after_session_change() { - let config = configuration::ActiveConfig::::get(); - // Extra sanity, config should already never be smaller than 1: - let n_lookahead = config.scheduler_params.lookahead.max(1); - let expected_claim_queue_len = Self::expected_claim_queue_len(&config.scheduler_params); - - for core_idx in 0..expected_claim_queue_len { - let core_idx = CoreIndex::from(core_idx); - Self::fill_claim_queue(core_idx, n_lookahead); - } - } - - /// Push some assignments back to the provider. - fn push_back_to_assignment_provider( - assignments: impl core::iter::DoubleEndedIterator, - ) { - // Push back in reverse order so that when we pop from the provider again, - // the entries in the claim queue are in the same order as they are right - // now. - for assignment in assignments.rev() { - T::AssignmentProvider::push_back_assignment(assignment); - } - } - - fn fill_claim_queue(core_idx: CoreIndex, n_lookahead: u32) { - ClaimQueue::::mutate(|la| { - let cq = la.entry(core_idx).or_default(); - - let mut n_lookahead_used = cq.len() as u32; - - // If the claim queue used to be empty, we need to double the first assignment. - // Otherwise, the para will only be able to get the collation in right at the next block - // (synchronous backing). - // Only do this if the configured lookahead is greater than 1. Otherwise, it doesn't - // make sense. - if n_lookahead_used == 0 && n_lookahead > 1 { - if let Some(assignment) = T::AssignmentProvider::pop_assignment_for_core(core_idx) { - T::AssignmentProvider::assignment_duplicated(&assignment); - cq.push_back(assignment.clone()); - cq.push_back(assignment); - n_lookahead_used += 2; - } - } - - for _ in n_lookahead_used..n_lookahead { - if let Some(assignment) = T::AssignmentProvider::pop_assignment_for_core(core_idx) { - cq.push_back(assignment); - } else { - break - } - } - - // If we didn't end up pushing anything, remove the entry. We don't want to waste the - // space if we've no assignments. - if cq.is_empty() { - la.remove(&core_idx); - } - }); - } - - fn pop_front_of_claim_queue(core_idx: &CoreIndex) -> Option { - ClaimQueue::::mutate(|cq| cq.get_mut(core_idx)?.pop_front()) - } - #[cfg(any(feature = "try-runtime", test))] fn claim_queue_len() -> usize { - ClaimQueue::::get().iter().map(|la_vec| la_vec.1.len()).sum() + Self::claim_queue().iter().map(|la_vec| la_vec.1.len()).sum() } #[cfg(all(not(feature = "runtime-benchmarks"), test))] @@ -451,9 +315,4 @@ impl Pallet { pub(crate) fn set_validator_groups(validator_groups: Vec>) { ValidatorGroups::::set(validator_groups); } - - #[cfg(test)] - pub(crate) fn set_claim_queue(claim_queue: BTreeMap>) { - ClaimQueue::::set(claim_queue); - } } diff --git a/polkadot/runtime/parachains/src/scheduler/common.rs b/polkadot/runtime/parachains/src/scheduler/common.rs deleted file mode 100644 index bf8a2bee74e3..000000000000 --- a/polkadot/runtime/parachains/src/scheduler/common.rs +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright (C) Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. 
- -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Common traits and types used by the scheduler and assignment providers. - -use scale_info::TypeInfo; -use sp_runtime::{ - codec::{Decode, Encode}, - RuntimeDebug, -}; - -use polkadot_primitives::{CoreIndex, Id as ParaId}; - -/// Assignment (ParaId -> CoreIndex). -#[derive(Encode, Decode, TypeInfo, RuntimeDebug, Clone, PartialEq)] -pub enum Assignment { - /// A pool assignment. - Pool { - /// The assigned para id. - para_id: ParaId, - /// The core index the para got assigned to. - core_index: CoreIndex, - }, - /// A bulk assignment. - Bulk(ParaId), -} - -impl Assignment { - /// Returns the [`ParaId`] this assignment is associated to. - pub fn para_id(&self) -> ParaId { - match self { - Self::Pool { para_id, .. } => *para_id, - Self::Bulk(para_id) => *para_id, - } - } -} - -pub trait AssignmentProvider { - /// Pops an [`Assignment`] from the provider for a specified [`CoreIndex`]. - /// - /// This is where assignments come into existence. - fn pop_assignment_for_core(core_idx: CoreIndex) -> Option; - - /// A previously popped `Assignment` has been fully processed. - /// - /// Report back to the assignment provider that an assignment is done and no longer present in - /// the scheduler. - /// - /// This is one way of the life of an assignment coming to an end. - fn report_processed(assignment: Assignment); - - /// Push back a previously popped assignment. - /// - /// If the assignment could not be processed within the current session, it can be pushed back - /// to the assignment provider in order to be popped again later. - /// - /// This is the second way the life of an assignment can come to an end. - fn push_back_assignment(assignment: Assignment); - - /// Push some assignment for mocking/benchmarks purposes. - /// - /// Useful for benchmarks and testing. The returned assignment is "valid" and can if need be - /// passed into `report_processed` for example. - #[cfg(any(feature = "runtime-benchmarks", test))] - fn get_mock_assignment(core_idx: CoreIndex, para_id: ParaId) -> Assignment; - - /// Report that an assignment was duplicated by the scheduler. 
-	fn assignment_duplicated(assignment: &Assignment);
-}
diff --git a/polkadot/runtime/parachains/src/scheduler/tests.rs b/polkadot/runtime/parachains/src/scheduler/tests.rs
index 431562c6e6fb..7c85fff201eb 100644
--- a/polkadot/runtime/parachains/src/scheduler/tests.rs
+++ b/polkadot/runtime/parachains/src/scheduler/tests.rs
@@ -17,21 +17,18 @@
 use super::*;

 use alloc::collections::btree_map::BTreeMap;
+use assigner_coretime::PartsOf57600;
 use frame_support::assert_ok;
+use pallet_broker::CoreAssignment;
 use polkadot_primitives::{
 	BlockNumber, SchedulerParams, SessionIndex, ValidationCode, ValidatorId,
 };
 use sp_keyring::Sr25519Keyring;

 use crate::{
 	configuration::HostConfiguration,
 	initializer::SessionChangeNotification,
 	mock::{
-		new_test_ext, Configuration, MockAssigner, MockGenesisConfig, Paras, ParasShared,
+		new_test_ext, Configuration, CoretimeAssigner, MockGenesisConfig, Paras, ParasShared,
 		RuntimeOrigin, Scheduler, System, Test,
 	},
+	on_demand,
 	paras::{ParaGenesisArgs, ParaKind},
-	scheduler::{self, common::Assignment, ClaimQueue},
+	scheduler::{self},
 };

 fn register_para(id: ParaId) {
@@ -80,7 +77,7 @@ fn run_to_block(
 		Paras::initializer_initialize(b + 1);
 		Scheduler::initializer_initialize(b + 1);

-		Scheduler::advance_claim_queue(&Default::default());
+		Scheduler::advance_claim_queue(|_| false);
 	}
 }

@@ -112,8 +109,8 @@ fn genesis_config(config: &HostConfiguration<BlockNumber>) -> MockGenesisConfig
 }

 /// Internal access to assignments at the top of the claim queue.
-fn next_assignments() -> impl Iterator<Item = (CoreIndex, Assignment)> {
-	let claim_queue = ClaimQueue::<Test>::get();
+fn next_assignments() -> impl Iterator<Item = (CoreIndex, ParaId)> {
+	let claim_queue = Scheduler::claim_queue();
 	claim_queue
 		.into_iter()
 		.filter_map(|(core_idx, v)| v.front().map(|a| (core_idx, a.clone())))
@@ -215,10 +212,6 @@ fn advance_claim_queue_doubles_assignment_only_if_empty() {
 	let para_b = ParaId::from(4_u32);
 	let para_c = ParaId::from(5_u32);

-	let assignment_a = Assignment::Bulk(para_a);
-	let assignment_b = Assignment::Bulk(para_b);
-	let assignment_c = Assignment::Bulk(para_c);
-
 	new_test_ext(genesis_config).execute_with(|| {
 		// Add 3 paras
 		register_para(para_a);
 		register_para(para_b);
 		register_para(para_c);
@@ -239,28 +232,30 @@ fn advance_claim_queue_doubles_assignment_only_if_empty() {
 		});

 		// add some para assignments.
-		MockAssigner::add_test_assignment(assignment_a.clone());
-		MockAssigner::add_test_assignment(assignment_b.clone());
-		MockAssigner::add_test_assignment(assignment_c.clone());
+		CoretimeAssigner::assign_core(
+			CoreIndex(0),
+			0,
+			vec![(CoreAssignment::Pool, PartsOf57600::FULL)],
+			None,
+		)
+		.unwrap();
+		CoretimeAssigner::assign_core(
+			CoreIndex(1),
+			0,
+			vec![(CoreAssignment::Pool, PartsOf57600::FULL)],
+			None,
+		)
+		.unwrap();

 		// This will call advance_claim_queue
 		run_to_block(2, |_| None);

 		{
-			assert_eq!(Scheduler::claim_queue_len(), 5);
-			let mut claim_queue = scheduler::ClaimQueue::<Test>::get();
+			on_demand::Pallet::<Test>::push_back_order(para_a);
+			on_demand::Pallet::<Test>::push_back_order(para_c);
+			on_demand::Pallet::<Test>::push_back_order(para_b);
+
+			let mut claim_queue = scheduler::Pallet::<Test>::claim_queue();

 			// Because the claim queue used to be empty, the first assignment is doubled for every
 			// core so that the first para gets a fair shot at backing something.
assert_eq!( claim_queue.remove(&CoreIndex(0)).unwrap(), - [assignment_a.clone(), assignment_a, assignment_b] + [para_a, para_a, para_b] .into_iter() .collect::>() ); assert_eq!( claim_queue.remove(&CoreIndex(1)).unwrap(), - [assignment_c.clone(), assignment_c].into_iter().collect::>() + [para_c, para_c].into_iter().collect::>() ); } }); @@ -275,7 +270,6 @@ fn advance_claim_queue_no_entry_if_empty() { let genesis_config = genesis_config(&config); let para_a = ParaId::from(3_u32); - let assignment_a = Assignment::Bulk(para_a); new_test_ext(genesis_config).execute_with(|| { // Add 1 para @@ -294,17 +288,19 @@ fn advance_claim_queue_no_entry_if_empty() { _ => None, }); - MockAssigner::add_test_assignment(assignment_a.clone()); + CoretimeAssigner::assign_core(CoreIndex(0), 0, vec![(CoreAssignment::Pool, PartsOf57600::FULL)], None).unwrap(); + CoretimeAssigner::assign_core(CoreIndex(1), 0, vec![(CoreAssignment::Pool, PartsOf57600::FULL)], None).unwrap(); + on_demand::Pallet::::push_back_order(para_a); // This will call advance_claim_queue run_to_block(3, |_| None); { - let mut claim_queue = scheduler::ClaimQueue::::get(); + let mut claim_queue = scheduler::Pallet::::claim_queue(); assert_eq!( claim_queue.remove(&CoreIndex(0)).unwrap(), - [assignment_a].into_iter().collect::>() + [para_a].into_iter().collect::>() ); // Even though core 1 exists, there's no assignment for it so it's not present in the @@ -332,11 +328,7 @@ fn advance_claim_queue_except_for() { let para_d = ParaId::from(4_u32); let para_e = ParaId::from(5_u32); - let assignment_a = Assignment::Bulk(para_a); - let assignment_b = Assignment::Bulk(para_b); - let assignment_c = Assignment::Bulk(para_c); - let assignment_d = Assignment::Bulk(para_d); - let assignment_e = Assignment::Bulk(para_e); + CoretimeAssigner::assign_core(CoreIndex(0), 0, vec![(CoreAssignment::Pool, PartsOf57600::FULL)], None).unwrap(); new_test_ext(genesis_config).execute_with(|| { // add 5 paras @@ -361,21 +353,21 @@ fn advance_claim_queue_except_for() { }); // add a couple of para claims now that paras are live - MockAssigner::add_test_assignment(assignment_a.clone()); - MockAssigner::add_test_assignment(assignment_c.clone()); + on_demand::Pallet::::push_back_order(para_a); + on_demand::Pallet::::push_back_order(para_c); run_to_block(2, |_| None); - Scheduler::advance_claim_queue(&Default::default()); + Scheduler::advance_claim_queue(|_| false); // Queues of all cores should be empty assert_eq!(Scheduler::claim_queue_len(), 0); - MockAssigner::add_test_assignment(assignment_a.clone()); - MockAssigner::add_test_assignment(assignment_c.clone()); - MockAssigner::add_test_assignment(assignment_b.clone()); - MockAssigner::add_test_assignment(assignment_d.clone()); - MockAssigner::add_test_assignment(assignment_e.clone()); + on_demand::Pallet::::push_back_order(para_a); + on_demand::Pallet::::push_back_order(para_c); + on_demand::Pallet::::push_back_order(para_b); + on_demand::Pallet::::push_back_order(para_d); + on_demand::Pallet::::push_back_order(para_e); run_to_block(3, |_| None); @@ -383,22 +375,22 @@ fn advance_claim_queue_except_for() { let scheduled: BTreeMap<_, _> = next_assignments().collect(); assert_eq!(scheduled.len(), 3); - assert_eq!(scheduled.get(&CoreIndex(0)).unwrap(), &Assignment::Bulk(para_a)); - assert_eq!(scheduled.get(&CoreIndex(1)).unwrap(), &Assignment::Bulk(para_c)); - assert_eq!(scheduled.get(&CoreIndex(2)).unwrap(), &Assignment::Bulk(para_b)); + assert_eq!(scheduled.get(&CoreIndex(0)).unwrap(), para_a); + 
assert_eq!(scheduled.get(&CoreIndex(1)).unwrap(), para_c); + assert_eq!(scheduled.get(&CoreIndex(2)).unwrap(), para_b); } // now note that cores 0 and 1 were freed. - Scheduler::advance_claim_queue(&std::iter::once(CoreIndex(2)).collect()); + Scheduler::advance_claim_queue(|CoreIndex(ix)| ix == 2); { let scheduled: BTreeMap<_, _> = next_assignments().collect(); // 1 thing scheduled before, + 2 cores freed. assert_eq!(scheduled.len(), 3); - assert_eq!(scheduled.get(&CoreIndex(0)).unwrap(), &Assignment::Bulk(para_d)); - assert_eq!(scheduled.get(&CoreIndex(1)).unwrap(), &Assignment::Bulk(para_e)); - assert_eq!(scheduled.get(&CoreIndex(2)).unwrap(), &Assignment::Bulk(para_b)); + assert_eq!(scheduled.get(&CoreIndex(0)).unwrap(), para_d); + assert_eq!(scheduled.get(&CoreIndex(1)).unwrap(), para_e); + assert_eq!(scheduled.get(&CoreIndex(2)).unwrap(), para_b); } }); } @@ -512,58 +504,6 @@ fn availability_predicate_works() { }); } -#[test] -fn next_up_on_available_uses_next_scheduled_or_none() { - let mut config = default_config(); - config.scheduler_params.num_cores = 1; - let genesis_config = genesis_config(&config); - - let para_a = ParaId::from(1_u32); - let para_b = ParaId::from(2_u32); - - new_test_ext(genesis_config).execute_with(|| { - register_para(para_a); - register_para(para_b); - - // start a new session to activate, 2 validators for 2 cores. - run_to_block(1, |number| match number { - 1 => Some(SessionChangeNotification { - new_config: config.clone(), - validators: vec![ - ValidatorId::from(Sr25519Keyring::Alice.public()), - ValidatorId::from(Sr25519Keyring::Eve.public()), - ], - ..Default::default() - }), - _ => None, - }); - - MockAssigner::add_test_assignment(Assignment::Bulk(para_a)); - - run_to_block(2, |_| None); - - { - // Two assignments for A on core 0, because the claim queue used to be empty. 
- assert_eq!(Scheduler::claim_queue_len(), 2); - - assert!(Scheduler::next_up_on_available(CoreIndex(1)).is_none()); - - assert_eq!( - Scheduler::next_up_on_available(CoreIndex(0)).unwrap(), - ScheduledCore { para_id: para_a, collator: None } - ); - - Scheduler::advance_claim_queue(&Default::default()); - assert_eq!( - Scheduler::next_up_on_available(CoreIndex(0)).unwrap(), - ScheduledCore { para_id: para_a, collator: None } - ); - - Scheduler::advance_claim_queue(&Default::default()); - assert!(Scheduler::next_up_on_available(CoreIndex(0)).is_none()); - } - }); -} #[test] fn session_change_increasing_number_of_cores() { @@ -574,9 +514,6 @@ fn session_change_increasing_number_of_cores() { let para_a = ParaId::from(3_u32); let para_b = ParaId::from(4_u32); - let assignment_a = Assignment::Bulk(para_a); - let assignment_b = Assignment::Bulk(para_b); - new_test_ext(genesis_config).execute_with(|| { // Add 2 paras register_para(para_a); @@ -595,25 +532,27 @@ fn session_change_increasing_number_of_cores() { _ => None, }); - MockAssigner::add_test_assignment(assignment_a.clone()); - MockAssigner::add_test_assignment(assignment_b.clone()); + CoretimeAssigner::assign_core(CoreIndex(0), 0, vec![(CoreAssignment::Pool, PartsOf57600::FULL)], None).unwrap(); + CoretimeAssigner::assign_core(CoreIndex(1), 0, vec![(CoreAssignment::Pool, PartsOf57600::FULL)], None).unwrap(); // This will call advance_claim_queue run_to_block(2, |_| None); { - let mut claim_queue = scheduler::ClaimQueue::::get(); + on_demand::Pallet::::push_back_order(para_a); + on_demand::Pallet::::push_back_order(para_b); + let mut claim_queue = scheduler::Pallet::::claim_queue(); assert_eq!(Scheduler::claim_queue_len(), 4); assert_eq!( claim_queue.remove(&CoreIndex(0)).unwrap(), - [assignment_a.clone(), assignment_a.clone()] + [para_a.clone(), para_a.clone()] .into_iter() .collect::>() ); assert_eq!( claim_queue.remove(&CoreIndex(1)).unwrap(), - [assignment_b.clone(), assignment_b.clone()] + [para_b.clone(), para_b.clone()] .into_iter() .collect::>() ); @@ -625,7 +564,10 @@ fn session_change_increasing_number_of_cores() { new_config.scheduler_params.num_cores = 4; // add another assignment for para b. 
- MockAssigner::add_test_assignment(assignment_b.clone()); + on_demand::Pallet::::push_back_order(para_b); + + CoretimeAssigner::assign_core(CoreIndex(2), 0, vec![(CoreAssignment::Pool, PartsOf57600::FULL)], None).unwrap(); + CoretimeAssigner::assign_core(CoreIndex(3), 0, vec![(CoreAssignment::Pool, PartsOf57600::FULL)], None).unwrap(); run_to_block(3, |number| match number { 3 => Some(SessionChangeNotification { @@ -643,20 +585,20 @@ fn session_change_increasing_number_of_cores() { }); { - let mut claim_queue = scheduler::ClaimQueue::::get(); + let mut claim_queue = scheduler::Pallet::::claim_queue(); assert_eq!(Scheduler::claim_queue_len(), 3); assert_eq!( claim_queue.remove(&CoreIndex(0)).unwrap(), - [assignment_a].into_iter().collect::>() + [para_a].into_iter().collect::>() ); assert_eq!( claim_queue.remove(&CoreIndex(1)).unwrap(), - [assignment_b.clone()].into_iter().collect::>() + [para_b].into_iter().collect::>() ); assert_eq!( claim_queue.remove(&CoreIndex(2)).unwrap(), - [assignment_b.clone()].into_iter().collect::>() + [para_b].into_iter().collect::>() ); } }); @@ -716,17 +658,17 @@ fn session_change_decreasing_number_of_cores() { _ => None, }); - let mut claim_queue = scheduler::ClaimQueue::::get(); + let mut claim_queue = scheduler::Pallet::::claim_queue(); assert_eq!(Scheduler::claim_queue_len(), 1); // There's only one assignment for B because run_to_block also calls advance_claim_queue at // the end. assert_eq!( claim_queue.remove(&CoreIndex(0)).unwrap(), - [assignment_b.clone()].into_iter().collect::>() + [para_b.clone()].into_iter().collect::>() ); - Scheduler::advance_claim_queue(&Default::default()); + Scheduler::advance_claim_queue(|_| false); // No more assignments now. assert_eq!(Scheduler::claim_queue_len(), 0); @@ -734,7 +676,7 @@ fn session_change_decreasing_number_of_cores() { // should be the minimum of these two. // Add an assignment. - MockAssigner::add_test_assignment(assignment_b.clone()); + on_demand::Pallet::::push_back_order(para_b); run_to_block(4, |number| match number { 4 => Some(SessionChangeNotification { @@ -781,30 +723,31 @@ fn session_change_increasing_lookahead() { _ => None, }); - MockAssigner::add_test_assignment(assignment_a.clone()); - MockAssigner::add_test_assignment(assignment_a.clone()); - MockAssigner::add_test_assignment(assignment_a.clone()); - MockAssigner::add_test_assignment(assignment_b.clone()); - MockAssigner::add_test_assignment(assignment_b.clone()); - MockAssigner::add_test_assignment(assignment_b.clone()); + CoretimeAssigner::assign_core(CoreIndex(0), 0, vec![(CoreAssignment::Pool, PartsOf57600::FULL)], None).unwrap(); + on_demand::Pallet::::push_back_order(para_a); + on_demand::Pallet::::push_back_order(para_a); + on_demand::Pallet::::push_back_order(para_a); + on_demand::Pallet::::push_back_order(para_b); + on_demand::Pallet::::push_back_order(para_b); + on_demand::Pallet::::push_back_order(para_b); // Lookahead is currently 2. 
run_to_block(2, |_| None); { - let mut claim_queue = scheduler::ClaimQueue::::get(); + let mut claim_queue = scheduler::Pallet::::claim_queue(); assert_eq!(Scheduler::claim_queue_len(), 4); assert_eq!( claim_queue.remove(&CoreIndex(0)).unwrap(), - [assignment_a.clone(), assignment_a.clone()] + [para_a, para_a] .into_iter() .collect::>() ); assert_eq!( claim_queue.remove(&CoreIndex(1)).unwrap(), - [assignment_a.clone(), assignment_a.clone()] + [para_a, para_a] .into_iter() .collect::>() ); @@ -829,18 +772,18 @@ fn session_change_increasing_lookahead() { }); { - let mut claim_queue = scheduler::ClaimQueue::::get(); + let mut claim_queue = scheduler::Pallet::::claim_queue(); assert_eq!(Scheduler::claim_queue_len(), 6); assert_eq!( claim_queue.remove(&CoreIndex(0)).unwrap(), - [assignment_a.clone(), assignment_a.clone(), assignment_b.clone()] + [para_a, para_a, para_b] .into_iter() .collect::>() ); assert_eq!( claim_queue.remove(&CoreIndex(1)).unwrap(), - [assignment_a.clone(), assignment_b.clone(), assignment_b.clone()] + [para_a, para_b, para_b] .into_iter() .collect::>() );