Skip to content

Commit

Permalink
slot-based-collator: Refactor some internals (#6935)
Browse files Browse the repository at this point in the history
Move `RelayChainDataFetcher` to its own module and rename it to
`RelayChainDataCache`. Also move the `core_selector` function to the
`slot_based` module.

Related issue: #6495
  • Loading branch information
bkchr authored Dec 18, 2024
1 parent c4d66cc commit fd0fb76
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,32 @@ use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_core::{GetCoreSelectorApi, PersistedValidationData};
use cumulus_relay_chain_interface::RelayChainInterface;

use polkadot_primitives::{
vstaging::{ClaimQueueOffset, CoreSelector, DEFAULT_CLAIM_QUEUE_OFFSET},
BlockId, CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId,
OccupiedCoreAssumption,
};
use polkadot_primitives::Id as ParaId;

use futures::prelude::*;
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider};
use sc_consensus::BlockImport;
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_api::ProvideRuntimeApi;
use sp_application_crypto::AppPublic;
use sp_blockchain::HeaderBackend;
use sp_consensus_aura::{AuraApi, Slot};
use sp_core::{crypto::Pair, U256};
use sp_core::crypto::Pair;
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, One};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
use sp_timestamp::Timestamp;
use std::{collections::BTreeSet, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};

use super::CollatorMessage;
use crate::{
collator::{self as collator_util},
collators::{check_validation_code_or_log, cores_scheduled_for_para},
collators::{
check_validation_code_or_log,
slot_based::{
core_selector,
relay_chain_data_cache::{RelayChainData, RelayChainDataCache},
},
},
LOG_TARGET,
};

Expand Down Expand Up @@ -218,7 +220,7 @@ where
collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
};

let mut relay_chain_fetcher = RelayChainCachingFetcher::new(relay_client.clone(), para_id);
let mut relay_chain_data_cache = RelayChainDataCache::new(relay_client.clone(), para_id);

loop {
// We wait here until the next slot arrives.
Expand All @@ -242,7 +244,7 @@ where

// Retrieve the core selector.
let (core_selector, claim_queue_offset) =
match core_selector(&*para_client, &parent).await {
match core_selector(&*para_client, parent.hash, *parent.header.number()) {
Ok(core_selector) => core_selector,
Err(err) => {
tracing::trace!(
Expand All @@ -259,7 +261,7 @@ where
max_pov_size,
scheduled_cores,
claimed_cores,
}) = relay_chain_fetcher
}) = relay_chain_data_cache
.get_mut_relay_chain_data(relay_parent, claim_queue_offset)
.await
else {
Expand Down Expand Up @@ -419,119 +421,3 @@ where
}
}
}

/// Contains relay chain data necessary for parachain block building.
#[derive(Clone)]
struct RelayChainData {
/// Current relay chain parent header.
pub relay_parent_header: RelayHeader,
/// The cores on which the para is scheduled at the configured claim queue offset.
pub scheduled_cores: Vec<CoreIndex>,
/// Maximum configured PoV size on the relay chain.
pub max_pov_size: u32,
/// The claimed cores at a relay parent.
pub claimed_cores: BTreeSet<CoreIndex>,
}

/// Simple helper to fetch relay chain data and cache it based on the current relay chain best block
/// hash.
struct RelayChainCachingFetcher<RI> {
relay_client: RI,
para_id: ParaId,
last_data: Option<(RelayHash, RelayChainData)>,
}

impl<RI> RelayChainCachingFetcher<RI>
where
RI: RelayChainInterface + Clone + 'static,
{
pub fn new(relay_client: RI, para_id: ParaId) -> Self {
Self { relay_client, para_id, last_data: None }
}

/// Fetch required [`RelayChainData`] from the relay chain.
/// If this data has been fetched in the past for the incoming hash, it will reuse
/// cached data.
pub async fn get_mut_relay_chain_data(
&mut self,
relay_parent: RelayHash,
claim_queue_offset: ClaimQueueOffset,
) -> Result<&mut RelayChainData, ()> {
match &self.last_data {
Some((last_seen_hash, _)) if *last_seen_hash == relay_parent => {
tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Using cached data for relay parent.");
Ok(&mut self.last_data.as_mut().expect("last_data is Some").1)
},
_ => {
tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Relay chain best block changed, fetching new data from relay chain.");
let data = self.update_for_relay_parent(relay_parent, claim_queue_offset).await?;
self.last_data = Some((relay_parent, data));
Ok(&mut self.last_data.as_mut().expect("last_data was just set above").1)
},
}
}

/// Fetch fresh data from the relay chain for the given relay parent hash.
async fn update_for_relay_parent(
&self,
relay_parent: RelayHash,
claim_queue_offset: ClaimQueueOffset,
) -> Result<RelayChainData, ()> {
let scheduled_cores = cores_scheduled_for_para(
relay_parent,
self.para_id,
&self.relay_client,
claim_queue_offset,
)
.await;

let Ok(Some(relay_parent_header)) =
self.relay_client.header(BlockId::Hash(relay_parent)).await
else {
tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block header.");
return Err(())
};

let max_pov_size = match self
.relay_client
.persisted_validation_data(relay_parent, self.para_id, OccupiedCoreAssumption::Included)
.await
{
Ok(None) => return Err(()),
Ok(Some(pvd)) => pvd.max_pov_size,
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client");
return Err(())
},
};

Ok(RelayChainData {
relay_parent_header,
scheduled_cores,
max_pov_size,
claimed_cores: BTreeSet::new(),
})
}
}

async fn core_selector<Block: BlockT, Client>(
para_client: &Client,
parent: &consensus_common::PotentialParent<Block>,
) -> Result<(CoreSelector, ClaimQueueOffset), sp_api::ApiError>
where
Client: ProvideRuntimeApi<Block> + Send + Sync,
Client::Api: GetCoreSelectorApi<Block>,
{
let block_hash = parent.hash;
let runtime_api = para_client.runtime_api();

if runtime_api.has_api::<dyn GetCoreSelectorApi<Block>>(block_hash)? {
Ok(runtime_api.core_selector(block_hash)?)
} else {
let next_block_number: U256 = (*parent.header.number() + One::one()).into();

// If the runtime API does not support the core selector API, fallback to some default
// values.
Ok((CoreSelector(next_block_number.byte(0)), ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET)))
}
}
35 changes: 30 additions & 5 deletions cumulus/client/consensus/aura/src/collators/slot_based/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,32 @@ use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterfa
use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_core::GetCoreSelectorApi;
use cumulus_primitives_core::{ClaimQueueOffset, CoreSelector, GetCoreSelectorApi};
use cumulus_relay_chain_interface::RelayChainInterface;
use futures::FutureExt;
use polkadot_primitives::{
CollatorPair, CoreIndex, Hash as RelayHash, Id as ParaId, ValidationCodeHash,
vstaging::DEFAULT_CLAIM_QUEUE_OFFSET, CollatorPair, CoreIndex, Hash as RelayHash, Id as ParaId,
ValidationCodeHash,
};
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider};
use sc_consensus::BlockImport;
use sc_utils::mpsc::tracing_unbounded;
use sp_api::ProvideRuntimeApi;
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_application_crypto::AppPublic;
use sp_blockchain::HeaderBackend;
use sp_consensus_aura::AuraApi;
use sp_core::{crypto::Pair, traits::SpawnNamed};
use sp_core::{crypto::Pair, traits::SpawnNamed, U256};
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Member};
use sp_runtime::traits::{Block as BlockT, Member, NumberFor, One};
use std::{sync::Arc, time::Duration};

pub use block_import::{SlotBasedBlockImport, SlotBasedBlockImportHandle};

mod block_builder_task;
mod block_import;
mod collation_task;
mod relay_chain_data_cache;

/// Parameters for [`run`].
pub struct Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner> {
Expand Down Expand Up @@ -204,3 +206,26 @@ struct CollatorMessage<Block: BlockT> {
/// Core index that this block should be submitted on
pub core_index: CoreIndex,
}

/// Fetch the `CoreSelector` and `ClaimQueueOffset` for `parent_hash`.
fn core_selector<Block: BlockT, Client>(
para_client: &Client,
parent_hash: Block::Hash,
parent_number: NumberFor<Block>,
) -> Result<(CoreSelector, ClaimQueueOffset), sp_api::ApiError>
where
Client: ProvideRuntimeApi<Block> + Send + Sync,
Client::Api: GetCoreSelectorApi<Block>,
{
let runtime_api = para_client.runtime_api();

if runtime_api.has_api::<dyn GetCoreSelectorApi<Block>>(parent_hash)? {
Ok(runtime_api.core_selector(parent_hash)?)
} else {
let next_block_number: U256 = (parent_number + One::one()).into();

// If the runtime API does not support the core selector API, fallback to some default
// values.
Ok((CoreSelector(next_block_number.byte(0)), ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET)))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.

// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

//! Utility for caching [`RelayChainData`] for different relay blocks.
use crate::collators::cores_scheduled_for_para;
use cumulus_primitives_core::ClaimQueueOffset;
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_primitives::{
CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId, OccupiedCoreAssumption,
};
use sp_runtime::generic::BlockId;
use std::collections::BTreeSet;

/// Contains relay chain data necessary for parachain block building.
#[derive(Clone)]
pub struct RelayChainData {
/// Current relay chain parent header.
pub relay_parent_header: RelayHeader,
/// The cores on which the para is scheduled at the configured claim queue offset.
pub scheduled_cores: Vec<CoreIndex>,
/// Maximum configured PoV size on the relay chain.
pub max_pov_size: u32,
/// The claimed cores at a relay parent.
pub claimed_cores: BTreeSet<CoreIndex>,
}

/// Simple helper to fetch relay chain data and cache it based on the current relay chain best block
/// hash.
pub struct RelayChainDataCache<RI> {
relay_client: RI,
para_id: ParaId,
cached_data: schnellru::LruMap<RelayHash, RelayChainData>,
}

impl<RI> RelayChainDataCache<RI>
where
RI: RelayChainInterface + Clone + 'static,
{
pub fn new(relay_client: RI, para_id: ParaId) -> Self {
Self {
relay_client,
para_id,
// 50 cached relay chain blocks should be more than enough.
cached_data: schnellru::LruMap::new(schnellru::ByLength::new(50)),
}
}

/// Fetch required [`RelayChainData`] from the relay chain.
/// If this data has been fetched in the past for the incoming hash, it will reuse
/// cached data.
pub async fn get_mut_relay_chain_data(
&mut self,
relay_parent: RelayHash,
claim_queue_offset: ClaimQueueOffset,
) -> Result<&mut RelayChainData, ()> {
let insert_data = if self.cached_data.peek(&relay_parent).is_some() {
tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Using cached data for relay parent.");
None
} else {
tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Relay chain best block changed, fetching new data from relay chain.");
Some(self.update_for_relay_parent(relay_parent, claim_queue_offset).await?)
};

Ok(self
.cached_data
.get_or_insert(relay_parent, || {
insert_data.expect("`insert_data` exists if not cached yet; qed")
})
.expect("There is space for at least one element; qed"))
}

/// Fetch fresh data from the relay chain for the given relay parent hash.
async fn update_for_relay_parent(
&self,
relay_parent: RelayHash,
claim_queue_offset: ClaimQueueOffset,
) -> Result<RelayChainData, ()> {
let scheduled_cores = cores_scheduled_for_para(
relay_parent,
self.para_id,
&self.relay_client,
claim_queue_offset,
)
.await;

let Ok(Some(relay_parent_header)) =
self.relay_client.header(BlockId::Hash(relay_parent)).await
else {
tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block header.");
return Err(())
};

let max_pov_size = match self
.relay_client
.persisted_validation_data(relay_parent, self.para_id, OccupiedCoreAssumption::Included)
.await
{
Ok(None) => return Err(()),
Ok(Some(pvd)) => pvd.max_pov_size,
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client");
return Err(())
},
};

Ok(RelayChainData {
relay_parent_header,
scheduled_cores,
max_pov_size,
claimed_cores: BTreeSet::new(),
})
}
}

0 comments on commit fd0fb76

Please sign in to comment.