elastic scaling: add core selector to cumulus #5372

Merged (33 commits, Sep 23, 2024)
Changes shown from 4 commits. Commits (33):
c6c166d - expose claim queue on cumulus (alindima, Aug 14, 2024)
d7c595e - add hardcoded claim queue offset and switch to using the claim queue … (alindima, Aug 14, 2024)
bcae16d - continue (alindima, Aug 15, 2024)
7b3594a - logs (alindima, Aug 15, 2024)
94bf1df - add runtime API to westend/rococo system parachains (alindima, Aug 16, 2024)
12aea2a - use ClaimQueueSnapshot (alindima, Aug 16, 2024)
f1e2721 - rollback changes to rococo-parachain (alindima, Aug 16, 2024)
f9268dd - add prdoc (alindima, Aug 16, 2024)
b825c22 - fix prdoc (alindima, Aug 16, 2024)
7e2dc9a - fix compilation (alindima, Aug 16, 2024)
48d19b9 - fix prdoc again (alindima, Aug 16, 2024)
6d0fbc1 - Merge remote-tracking branch 'origin/master' into alindima/cumulus-op… (alindima, Aug 16, 2024)
e829934 - prdoc (alindima, Aug 19, 2024)
e4478d1 - bootstrap weights (alindima, Aug 19, 2024)
920982f - Merge branch 'master' of https://github.com/paritytech/polkadot-sdk i… (Aug 20, 2024)
ebf5e93 - ".git/.scripts/commands/bench/bench.sh" --subcommand=pallet --runtime… (Aug 20, 2024)
e5c366c - Merge remote-tracking branch 'origin/master' into alindima/cumulus-op… (alindima, Sep 13, 2024)
8305048 - fix compilation (alindima, Sep 13, 2024)
982ce7f - address some comments (alindima, Sep 13, 2024)
dec01b7 - unused imports (alindima, Sep 13, 2024)
4c6fb64 - make the core selection logic generic (alindima, Sep 16, 2024)
6ab29d9 - fix compilation (alindima, Sep 16, 2024)
2c0c003 - Merge remote-tracking branch 'origin/master' into alindima/cumulus-op… (alindima, Sep 16, 2024)
e8c7e96 - fix compilation again (alindima, Sep 16, 2024)
a2c4d9b - fix compilation take 3 (alindima, Sep 17, 2024)
67b17fa - rollback weight generation (alindima, Sep 17, 2024)
b568cf4 - fix compilation take 4 (alindima, Sep 17, 2024)
010f7b9 - remove unused (alindima, Sep 17, 2024)
08cd77d - Merge remote-tracking branch 'origin/master' into alindima/cumulus-op… (alindima, Sep 19, 2024)
b6d1fb1 - feedback and prdoc (alindima, Sep 20, 2024)
99ff6dd - Merge remote-tracking branch 'origin/master' into alindima/cumulus-op… (alindima, Sep 20, 2024)
46fcf45 - update prdoc (alindima, Sep 20, 2024)
c268696 - Merge branch 'master' into alindima/cumulus-open-collator-set (alindima, Sep 23, 2024)
2 changes: 1 addition & 1 deletion cumulus/client/consensus/aura/src/collators/lookahead.rs
@@ -256,7 +256,7 @@ where
while let Some(relay_parent_header) = import_notifications.next().await {
let relay_parent = relay_parent_header.hash();

let core_index = if let Some(core_index) = super::cores_scheduled_for_para(
let core_index = if let Some(core_index) = super::cores_scheduled_for_para_legacy(
relay_parent,
params.para_id,
&mut params.relay_client,
43 changes: 41 additions & 2 deletions cumulus/client/consensus/aura/src/collators/mod.rs
@@ -126,8 +126,9 @@ async fn async_backing_params(
}
}

// Return all the cores assigned to the para at the provided relay parent.
async fn cores_scheduled_for_para(
// Return all the cores assigned to the para at the provided relay parent, using the legacy
// availability-cores runtime API. Using the claim queue is the preferred alternative.
async fn cores_scheduled_for_para_legacy(
relay_parent: RelayHash,
para_id: ParaId,
relay_client: &impl RelayChainInterface,
@@ -173,6 +174,44 @@ async fn cores_scheduled_for_para(
.collect()
}

// Return all the cores assigned to the para at the provided relay parent, using the claim queue
// offset. This assumes the relay chain runtime supports the claim queue runtime API.
// Will return an empty vec if the provided offset is equal to or higher than the claim queue length
// (which corresponds to the scheduling_lookahead on the relay chain).
async fn cores_scheduled_for_para(
relay_parent: RelayHash,
para_id: ParaId,
relay_client: &impl RelayChainInterface,
claim_queue_offset: u8,
) -> Vec<CoreIndex> {
// Get `ClaimQueue` from runtime
let claim_queue = match relay_client.claim_queue(relay_parent).await {
Ok(claim_queue) => claim_queue,
Err(error) => {
tracing::error!(
target: crate::LOG_TARGET,
?error,
?relay_parent,
"Failed to query claim queue runtime API",
);
return Vec::new()
},
};

claim_queue
.into_iter()
.filter_map(|(core_index, queue)| {
let core_para_id = queue.get(claim_queue_offset as usize);

if core_para_id == Some(&para_id) {
Some(core_index)
} else {
None
}
})
.collect()
}
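
To make the offset semantics concrete, here is a minimal, self-contained sketch of the same filtering logic; the claim queue contents and the type stand-ins below are made up for illustration:

```rust
use std::collections::{BTreeMap, VecDeque};

// Stand-ins for the polkadot-primitives types, for illustration only.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct CoreIndex(u32);
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ParaId(u32);

fn cores_for_para_at_offset(
    claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>,
    para_id: ParaId,
    offset: u8,
) -> Vec<CoreIndex> {
    claim_queue
        .iter()
        // Keep a core only if our para sits `offset` positions into its queue.
        .filter(|(_, queue)| queue.get(offset as usize) == Some(&para_id))
        .map(|(core_index, _)| *core_index)
        .collect()
}

fn main() {
    let mut claim_queue = BTreeMap::new();
    // Core 0 is claimed by para 2000 now and by para 2001 one block later.
    claim_queue.insert(CoreIndex(0), VecDeque::from([ParaId(2000), ParaId(2001)]));
    // Core 1 is claimed by para 2000 at both depths.
    claim_queue.insert(CoreIndex(1), VecDeque::from([ParaId(2000), ParaId(2000)]));

    // Offset 0: cores on which para 2000 is scheduled at the relay parent itself.
    assert_eq!(
        cores_for_para_at_offset(&claim_queue, ParaId(2000), 0),
        vec![CoreIndex(0), CoreIndex(1)]
    );
    // Offset 1: only core 1 still has para 2000 one relay block further ahead.
    assert_eq!(cores_for_para_at_offset(&claim_queue, ParaId(2000), 1), vec![CoreIndex(1)]);
    // An offset at or past the queue length yields no cores.
    assert!(cores_for_para_at_offset(&claim_queue, ParaId(2000), 2).is_empty());
}
```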

// Checks if we own the slot at the given block and whether there
// is space in the unincluded segment.
async fn can_build_upon<Block: BlockT, Client, P>(
Expand Down
@@ -20,7 +20,9 @@ use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface
use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_core::{CollectCollationInfo, PersistedValidationData};
use cumulus_primitives_core::{
FetchClaimQueueOffset, PersistedValidationData, DEFAULT_CLAIM_QUEUE_OFFSET,
};
use cumulus_relay_chain_interface::RelayChainInterface;

use polkadot_primitives::{
@@ -31,7 +33,7 @@ use polkadot_primitives::{
use futures::prelude::*;
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider};
use sc_consensus::BlockImport;
use sp_api::ProvideRuntimeApi;
use sp_api::{ApiExt, ProvideRuntimeApi, RuntimeApiInfo};
use sp_application_crypto::AppPublic;
use sp_blockchain::HeaderBackend;
use sp_consensus_aura::{AuraApi, Slot, SlotDuration};
@@ -45,7 +47,9 @@ use std::{sync::Arc, time::Duration};
use super::CollatorMessage;
use crate::{
collator::{self as collator_util},
collators::{check_validation_code_or_log, cores_scheduled_for_para},
collators::{
check_validation_code_or_log, cores_scheduled_for_para, cores_scheduled_for_para_legacy,
},
LOG_TARGET,
};

@@ -177,7 +181,7 @@ where
+ Sync
+ 'static,
Client::Api:
AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
AuraApi<Block, P::Public> + FetchClaimQueueOffset<Block> + AuraUnincludedSegmentApi<Block>,
Backend: sc_client_api::Backend<Block> + 'static,
RelayClient: RelayChainInterface + Clone + 'static,
CIDP: CreateInherentDataProviders<Block, ()> + 'static,
@@ -239,37 +243,56 @@ where
return
};

let Ok(RelayChainData {
relay_parent_header,
max_pov_size,
relay_parent_hash: relay_parent,
scheduled_cores,
}) = relay_chain_fetcher.get_relay_chain_data().await
let Ok(relay_parent) = relay_client.best_block_hash().await else {
tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block hash.");
continue
};

let Some((included_block, parent)) =
crate::collators::find_parent(relay_parent, para_id, &*para_backend, &relay_client)
.await
else {
continue
};

let parent_header = parent.header;
let parent_hash = parent.hash;

// Retrieve claim queue offset, if present.
let Ok(maybe_cq_offset) = fetch_claim_queue_offset(&*para_client, parent_hash).await
else {
continue
};

let Ok(RelayChainData { relay_parent_header, max_pov_size, scheduled_cores }) =
relay_chain_fetcher.get_relay_chain_data(relay_parent, maybe_cq_offset).await
else {
continue;
};

if scheduled_cores.is_empty() {
tracing::debug!(target: LOG_TARGET, "Parachain not scheduled, skipping slot.");
continue;
} else {
tracing::debug!(
target: LOG_TARGET,
?relay_parent,
"Parachain is scheduled on cores: {:?}",
scheduled_cores
);
}

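// Round-robin: the para's Aura slot number selects which of its scheduled cores
// this collation targets.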
let core_index_in_scheduled: u64 = *para_slot.slot % expected_cores;
let Some(core_index) = scheduled_cores.get(core_index_in_scheduled as usize) else {
tracing::debug!(target: LOG_TARGET, core_index_in_scheduled, core_len = scheduled_cores.len(), "Para is scheduled, but not enough cores available.");
tracing::debug!(
target: LOG_TARGET,
core_index_in_scheduled,
core_len = scheduled_cores.len(),
"Para is scheduled, but not enough cores available."
);
continue;
};

let Some((included_block, parent)) =
crate::collators::find_parent(relay_parent, para_id, &*para_backend, &relay_client)
.await
else {
continue
};

let parent_header = parent.header;
let parent_hash = parent.hash;

// We mainly call this to inform users at genesis if there is a mismatch with the
// on-chain data.
collator.collator_service().check_block_status(parent_hash, &parent_header);
@@ -414,8 +437,6 @@ struct RelayChainData {
pub scheduled_cores: Vec<CoreIndex>,
/// Maximum configured PoV size on the relay chain.
pub max_pov_size: u32,
/// Current relay chain parent header.
pub relay_parent_hash: RelayHash,
}

/// Simple helper to fetch relay chain data and cache it based on the current relay chain best block
@@ -437,30 +458,65 @@
/// Fetch required [`RelayChainData`] from the relay chain.
/// If this data has been fetched in the past for the incoming hash, it will reuse
/// cached data.
pub async fn get_relay_chain_data(&mut self) -> Result<RelayChainData, ()> {
let Ok(relay_parent) = self.relay_client.best_block_hash().await else {
tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block hash.");
return Err(())
};

pub async fn get_relay_chain_data(
&mut self,
relay_parent: RelayHash,
maybe_claim_queue_offset: Option<u8>,
) -> Result<RelayChainData, ()> {
match &self.last_data {
Some((last_seen_hash, data)) if *last_seen_hash == relay_parent => {
tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Using cached data for relay parent.");
Ok(data.clone())
},
_ => {
tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Relay chain best block changed, fetching new data from relay chain.");
let data = self.update_for_relay_parent(relay_parent).await?;
let data =
self.update_for_relay_parent(relay_parent, maybe_claim_queue_offset).await?;
self.last_data = Some((relay_parent, data.clone()));
Ok(data)
},
}
}

/// Fetch fresh data from the relay chain for the given relay parent hash.
async fn update_for_relay_parent(&self, relay_parent: RelayHash) -> Result<RelayChainData, ()> {
let scheduled_cores =
cores_scheduled_for_para(relay_parent, self.para_id, &self.relay_client).await;
async fn update_for_relay_parent(
&self,
relay_parent: RelayHash,
maybe_claim_queue_offset: Option<u8>,
) -> Result<RelayChainData, ()> {
let runtime_api_version = self.relay_client.version(relay_parent).await.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to fetch relay chain runtime version.",
)
})?;
let parachain_host_runtime_api_version =
runtime_api_version
.api_version(
&<dyn polkadot_primitives::runtime_api::ParachainHost<
polkadot_primitives::Block,
>>::ID,
)
.unwrap_or_default();

// If the relay chain runtime does not support the new claim queue runtime API, fall back to
// using the availability-cores API. In this case the claim queue offset will not be used.
let supports_claim_queue = parachain_host_runtime_api_version >=
polkadot_node_subsystem::messages::RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT;

let scheduled_cores = if supports_claim_queue {
cores_scheduled_for_para(
relay_parent,
self.para_id,
&self.relay_client,
maybe_claim_queue_offset.unwrap_or(DEFAULT_CLAIM_QUEUE_OFFSET),
[Review thread]
Contributor: This assumes that we already should be building on cores in the future if the claim queue is supported, even when not using the new candidate receipts. I would keep the old behaviour and call cores_scheduled_for_para_legacy if v2 receipts are not enabled.
Contributor: If we pass an offset of 0 here there should be no difference to the legacy version, correct?
Contributor: Actually we shouldn't be using the legacy version anymore.
Contributor (Author): That's why I was proposing to use an offset of 0 if the parachain runtime doesn't have it configured.
)
.await
} else {
cores_scheduled_for_para_legacy(relay_parent, self.para_id, &self.relay_client).await
};

let Ok(Some(relay_parent_header)) =
self.relay_client.header(BlockId::Hash(relay_parent)).await
else {
Expand All @@ -481,11 +537,23 @@ where
},
};

Ok(RelayChainData {
relay_parent_hash: relay_parent,
relay_parent_header,
scheduled_cores,
max_pov_size,
})
Ok(RelayChainData { relay_parent_header, scheduled_cores, max_pov_size })
}
}

async fn fetch_claim_queue_offset<Block: BlockT, Client>(
para_client: &Client,
block_hash: Block::Hash,
) -> Result<Option<u8>, sp_api::ApiError>
where
Client: ProvideRuntimeApi<Block> + Send + Sync,
Client::Api: FetchClaimQueueOffset<Block>,
{
let runtime_api = para_client.runtime_api();

if runtime_api.has_api::<dyn FetchClaimQueueOffset<Block>>(block_hash)? {
Ok(Some(runtime_api.fetch_claim_queue_offset(block_hash)?))
} else {
Ok(None)
}
}
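
A sketch of the intended call pattern for this helper (hedged; in this PR the fallback to DEFAULT_CLAIM_QUEUE_OFFSET actually happens inside update_for_relay_parent):

```rust
// If the parachain runtime predates the `FetchClaimQueueOffset` runtime API,
// fall back to the default offset instead of failing.
let offset = fetch_claim_queue_offset(&*para_client, parent_hash)
    .await?
    .unwrap_or(DEFAULT_CLAIM_QUEUE_OFFSET);
```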
4 changes: 2 additions & 2 deletions cumulus/client/consensus/aura/src/collators/slot_based/mod.rs
@@ -34,7 +34,7 @@ use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface
use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_core::CollectCollationInfo;
use cumulus_primitives_core::FetchClaimQueueOffset;
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_primitives::{
CollatorPair, CoreIndex, Hash as RelayHash, Id as ParaId, ValidationCodeHash,
@@ -113,7 +113,7 @@ where
+ Sync
+ 'static,
Client::Api:
AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
AuraApi<Block, P::Public> + FetchClaimQueueOffset<Block> + AuraUnincludedSegmentApi<Block>,
Backend: sc_client_api::Backend<Block> + 'static,
RClient: RelayChainInterface + Clone + 'static,
CIDP: CreateInherentDataProviders<Block, ()> + 'static,
18 changes: 15 additions & 3 deletions cumulus/client/relay-chain-inprocess-interface/src/lib.rs
@@ -14,14 +14,19 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use std::{collections::btree_map::BTreeMap, pin::Pin, sync::Arc, time::Duration};
use std::{
collections::{BTreeMap, VecDeque},
pin::Pin,
sync::Arc,
time::Duration,
};

use async_trait::async_trait;
use cumulus_primitives_core::{
relay_chain::{
runtime_api::ParachainHost, Block as PBlock, BlockId, BlockNumber,
CommittedCandidateReceipt, CoreState, Hash as PHash, Header as PHeader, InboundHrmpMessage,
OccupiedCoreAssumption, SessionIndex, ValidationCodeHash, ValidatorId,
CommittedCandidateReceipt, CoreIndex, CoreState, Hash as PHash, Header as PHeader,
InboundHrmpMessage, OccupiedCoreAssumption, SessionIndex, ValidationCodeHash, ValidatorId,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
@@ -270,6 +275,13 @@ impl RelayChainInterface for RelayChainInProcessInterface {
) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
Ok(self.full_client.runtime_api().candidates_pending_availability(hash, para_id)?)
}

async fn claim_queue(
&self,
hash: PHash,
) -> RelayChainResult<BTreeMap<CoreIndex, VecDeque<ParaId>>> {
Ok(self.full_client.runtime_api().claim_queue(hash)?)
}
}

pub enum BlockCheckStatus {
24 changes: 21 additions & 3 deletions cumulus/client/relay-chain-interface/src/lib.rs
@@ -14,7 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use std::{collections::BTreeMap, pin::Pin, sync::Arc};
use std::{
collections::{BTreeMap, VecDeque},
pin::Pin,
sync::Arc,
};

use futures::Stream;
use polkadot_overseer::prometheus::PrometheusError;
@@ -29,8 +33,9 @@ use sp_api::ApiError;
use cumulus_primitives_core::relay_chain::BlockId;
pub use cumulus_primitives_core::{
relay_chain::{
BlockNumber, CommittedCandidateReceipt, CoreState, Hash as PHash, Header as PHeader,
InboundHrmpMessage, OccupiedCoreAssumption, SessionIndex, ValidationCodeHash, ValidatorId,
BlockNumber, CommittedCandidateReceipt, CoreIndex, CoreState, Hash as PHash,
Header as PHeader, InboundHrmpMessage, OccupiedCoreAssumption, SessionIndex,
ValidationCodeHash, ValidatorId,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
@@ -225,6 +230,12 @@ pub trait RelayChainInterface: Send + Sync {
&self,
relay_parent: PHash,
) -> RelayChainResult<Vec<CoreState<PHash, BlockNumber>>>;

/// Fetch the claim queue.
async fn claim_queue(
&self,
relay_parent: PHash,
) -> RelayChainResult<BTreeMap<CoreIndex, VecDeque<ParaId>>>;
}
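
For consumers of the interface, a minimal hedged sketch of querying the new method; the helper name and the counting logic are illustrative, not part of this PR:

```rust
// Illustrative helper: count the cores that have `para_id` anywhere in their
// claim queue at `relay_parent`.
async fn cores_claimed_by_para(
    relay_client: &impl RelayChainInterface,
    relay_parent: PHash,
    para_id: ParaId,
) -> RelayChainResult<usize> {
    let claim_queue = relay_client.claim_queue(relay_parent).await?;
    Ok(claim_queue.values().filter(|queue| queue.contains(&para_id)).count())
}
```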

#[async_trait]
@@ -363,4 +374,11 @@
async fn version(&self, relay_parent: PHash) -> RelayChainResult<RuntimeVersion> {
(**self).version(relay_parent).await
}

async fn claim_queue(
[Review thread]
Contributor: With the introduction of this, can we remove availability_cores from the interface? Did not check, but it should not be used anymore now, right?
Contributor (Author): Yes, we could. But is there a good reason for that? There's still information that the availability-cores API contains that you cannot get elsewhere, like whether a core is occupied or not. Maybe this will come in handy at some point in the future.
&self,
relay_parent: PHash,
) -> RelayChainResult<BTreeMap<CoreIndex, VecDeque<ParaId>>> {
(**self).claim_queue(relay_parent).await
}
}