Skip to content

Commit

Permalink
slot-based-collator: Implement dedicated block import (#6481)
Browse files Browse the repository at this point in the history
The `SlotBasedBlockImport` job is to collect the storage proofs of all
blocks getting imported. These storage proofs alongside the block are
being forwarded to the collation task. Right now they are just being
thrown away. More logic will follow later. Basically this will be
required to include multiple blocks into one `PoV` which will then be
done by the collation task.

---------

Co-authored-by: Michal Kucharczyk <[email protected]>
Co-authored-by: GitHub Action <[email protected]>
  • Loading branch information
3 people authored Dec 13, 2024
1 parent 4b054c6 commit b8da8fa
Show file tree
Hide file tree
Showing 12 changed files with 395 additions and 60 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cumulus/client/consensus/aura/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ sp-blockchain = { workspace = true, default-features = true }
sp-consensus = { workspace = true, default-features = true }
sp-consensus-aura = { workspace = true, default-features = true }
sp-core = { workspace = true, default-features = true }
sp-trie = { workspace = true, default-features = true }
sp-inherents = { workspace = true, default-features = true }
sp-keystore = { workspace = true, default-features = true }
sp-runtime = { workspace = true, default-features = true }
Expand Down
144 changes: 144 additions & 0 deletions cumulus/client/consensus/aura/src/collators/slot_based/block_import.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.

// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use futures::{stream::FusedStream, StreamExt};
use sc_consensus::{BlockImport, StateAction};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_api::{ApiExt, CallApiAt, CallContext, Core, ProvideRuntimeApi, StorageProof};
use sp_runtime::traits::{Block as BlockT, Header as _};
use sp_trie::proof_size_extension::ProofSizeExt;
use std::sync::Arc;

/// Handle for receiving the block and the storage proof from the [`SlotBasedBlockImport`].
///
/// This handle should be passed to [`Params`](super::Params) or can also be dropped if the node is
/// not running as collator.
pub struct SlotBasedBlockImportHandle<Block> {
receiver: TracingUnboundedReceiver<(Block, StorageProof)>,
}

impl<Block> SlotBasedBlockImportHandle<Block> {
/// Returns the next item.
///
/// The future will never return when the internal channel is closed.
pub async fn next(&mut self) -> (Block, StorageProof) {
loop {
if self.receiver.is_terminated() {
futures::pending!()
} else if let Some(res) = self.receiver.next().await {
return res
}
}
}
}

/// Special block import for the slot based collator.
pub struct SlotBasedBlockImport<Block, BI, Client> {
inner: BI,
client: Arc<Client>,
sender: TracingUnboundedSender<(Block, StorageProof)>,
}

impl<Block, BI, Client> SlotBasedBlockImport<Block, BI, Client> {
/// Create a new instance.
///
/// The returned [`SlotBasedBlockImportHandle`] needs to be passed to the
/// [`Params`](super::Params), so that this block import instance can communicate with the
/// collation task. If the node is not running as a collator, just dropping the handle is fine.
pub fn new(inner: BI, client: Arc<Client>) -> (Self, SlotBasedBlockImportHandle<Block>) {
let (sender, receiver) = tracing_unbounded("SlotBasedBlockImportChannel", 1000);

(Self { sender, client, inner }, SlotBasedBlockImportHandle { receiver })
}
}

impl<Block, BI: Clone, Client> Clone for SlotBasedBlockImport<Block, BI, Client> {
fn clone(&self) -> Self {
Self { inner: self.inner.clone(), client: self.client.clone(), sender: self.sender.clone() }
}
}

#[async_trait::async_trait]
impl<Block, BI, Client> BlockImport<Block> for SlotBasedBlockImport<Block, BI, Client>
where
Block: BlockT,
BI: BlockImport<Block> + Send + Sync,
BI::Error: Into<sp_consensus::Error>,
Client: ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync,
Client::StateBackend: Send,
Client::Api: Core<Block>,
{
type Error = sp_consensus::Error;

async fn check_block(
&self,
block: sc_consensus::BlockCheckParams<Block>,
) -> Result<sc_consensus::ImportResult, Self::Error> {
self.inner.check_block(block).await.map_err(Into::into)
}

async fn import_block(
&self,
mut params: sc_consensus::BlockImportParams<Block>,
) -> Result<sc_consensus::ImportResult, Self::Error> {
// If the channel exists and it is required to execute the block, we will execute the block
// here. This is done to collect the storage proof and to prevent re-execution, we push
// downwards the state changes. `StateAction::ApplyChanges` is ignored, because it either
// means that the node produced the block itself or the block was imported via state sync.
if !self.sender.is_closed() && !matches!(params.state_action, StateAction::ApplyChanges(_))
{
let mut runtime_api = self.client.runtime_api();

runtime_api.set_call_context(CallContext::Onchain);

runtime_api.record_proof();
let recorder = runtime_api
.proof_recorder()
.expect("Proof recording is enabled in the line above; qed.");
runtime_api.register_extension(ProofSizeExt::new(recorder));

let parent_hash = *params.header.parent_hash();

let block = Block::new(params.header.clone(), params.body.clone().unwrap_or_default());

runtime_api
.execute_block(parent_hash, block.clone())
.map_err(|e| Box::new(e) as Box<_>)?;

let storage_proof =
runtime_api.extract_proof().expect("Proof recording was enabled above; qed");

let state = self.client.state_at(parent_hash).map_err(|e| Box::new(e) as Box<_>)?;
let gen_storage_changes = runtime_api
.into_storage_changes(&state, parent_hash)
.map_err(sp_consensus::Error::ChainLookup)?;

if params.header.state_root() != &gen_storage_changes.transaction_storage_root {
return Err(sp_consensus::Error::Other(Box::new(
sp_blockchain::Error::InvalidStateRoot,
)))
}

params.state_action = StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(
gen_storage_changes,
));

let _ = self.sender.unbounded_send((block, storage_proof));
}

self.inner.import_block(params).await.map_err(Into::into)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub struct Params<Block: BlockT, RClient, CS> {
pub collator_service: CS,
/// Receiver channel for communication with the block builder task.
pub collator_receiver: TracingUnboundedReceiver<CollatorMessage<Block>>,
/// The handle from the special slot based block import.
pub block_import_handle: super::SlotBasedBlockImportHandle<Block>,
}

/// Asynchronously executes the collation task for a parachain.
Expand All @@ -55,28 +57,49 @@ pub struct Params<Block: BlockT, RClient, CS> {
/// collations to the relay chain. It listens for new best relay chain block notifications and
/// handles collator messages. If our parachain is scheduled on a core and we have a candidate,
/// the task will build a collation and send it to the relay chain.
pub async fn run_collation_task<Block, RClient, CS>(mut params: Params<Block, RClient, CS>)
where
pub async fn run_collation_task<Block, RClient, CS>(
Params {
relay_client,
collator_key,
para_id,
reinitialize,
collator_service,
mut collator_receiver,
mut block_import_handle,
}: Params<Block, RClient, CS>,
) where
Block: BlockT,
CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
RClient: RelayChainInterface + Clone + 'static,
{
let Ok(mut overseer_handle) = params.relay_client.overseer_handle() else {
let Ok(mut overseer_handle) = relay_client.overseer_handle() else {
tracing::error!(target: LOG_TARGET, "Failed to get overseer handle.");
return
};

cumulus_client_collator::initialize_collator_subsystems(
&mut overseer_handle,
params.collator_key,
params.para_id,
params.reinitialize,
collator_key,
para_id,
reinitialize,
)
.await;

let collator_service = params.collator_service;
while let Some(collator_message) = params.collator_receiver.next().await {
handle_collation_message(collator_message, &collator_service, &mut overseer_handle).await;
loop {
futures::select! {
collator_message = collator_receiver.next() => {
let Some(message) = collator_message else {
return;
};

handle_collation_message(message, &collator_service, &mut overseer_handle).await;
},
block_import_msg = block_import_handle.next().fuse() => {
// TODO: Implement me.
// Issue: https://github.com/paritytech/polkadot-sdk/issues/6495
let _ = block_import_msg;
}
}
}
}

Expand Down
11 changes: 9 additions & 2 deletions cumulus/client/consensus/aura/src/collators/slot_based/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Member};
use std::{sync::Arc, time::Duration};

pub use block_import::{SlotBasedBlockImport, SlotBasedBlockImportHandle};

mod block_builder_task;
mod block_import;
mod collation_task;

/// Parameters for [`run`].
pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner> {
pub struct Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner> {
/// Inherent data providers. Only non-consensus inherent data should be provided, i.e.
/// the timestamp, slot, and paras inherents should be omitted, as they are set by this
/// collator.
Expand Down Expand Up @@ -90,6 +93,8 @@ pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner
/// Drift slots by a fixed duration. This can be used to create more preferrable authoring
/// timings.
pub slot_drift: Duration,
/// The handle returned by [`SlotBasedBlockImport`].
pub block_import_handle: SlotBasedBlockImportHandle<Block>,
/// Spawner for spawning futures.
pub spawner: Spawner,
}
Expand All @@ -111,8 +116,9 @@ pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spaw
authoring_duration,
reinitialize,
slot_drift,
block_import_handle,
spawner,
}: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>,
}: Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>,
) where
Block: BlockT,
Client: ProvideRuntimeApi<Block>
Expand Down Expand Up @@ -147,6 +153,7 @@ pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spaw
reinitialize,
collator_service: collator_service.clone(),
collator_receiver: rx,
block_import_handle,
};

let collation_task_fut = run_collation_task::<Block, _, _>(collator_task_params);
Expand Down
1 change: 1 addition & 0 deletions cumulus/polkadot-omni-node/lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pallet-transaction-payment = { workspace = true, default-features = true }
pallet-transaction-payment-rpc-runtime-api = { workspace = true, default-features = true }
sp-inherents = { workspace = true, default-features = true }
sp-api = { workspace = true, default-features = true }
sp-consensus = { workspace = true, default-features = true }
sp-consensus-aura = { workspace = true, default-features = true }
sp-io = { workspace = true, default-features = true }
sp-wasm-interface = { workspace = true, default-features = true }
Expand Down
Loading

0 comments on commit b8da8fa

Please sign in to comment.