diff --git a/Cargo.lock b/Cargo.lock index b529fdfad1..9b96c88ea2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5945,8 +5945,10 @@ dependencies = [ "mockall", "papyrus_network", "papyrus_protobuf", + "papyrus_storage", "starknet-types-core", "starknet_api", + "test_utils", "thiserror", "tokio", "tracing", diff --git a/crates/papyrus_protobuf/src/consensus.rs b/crates/papyrus_protobuf/src/consensus.rs index 6ceac66c6b..a5df223d40 100644 --- a/crates/papyrus_protobuf/src/consensus.rs +++ b/crates/papyrus_protobuf/src/consensus.rs @@ -2,6 +2,7 @@ use starknet_api::block::BlockHash; use starknet_api::core::ContractAddress; use starknet_api::transaction::Transaction; +#[derive(Debug, Clone, Eq, PartialEq)] pub struct Proposal { pub height: u64, pub proposer: ContractAddress, @@ -9,6 +10,7 @@ pub struct Proposal { pub block_hash: BlockHash, } +#[derive(Debug, Clone, Eq, PartialEq)] pub enum ConsensusMessage { Proposal(Proposal), } diff --git a/crates/sequencing/papyrus_consensus/Cargo.toml b/crates/sequencing/papyrus_consensus/Cargo.toml index 6f3609417f..42b32c54ff 100644 --- a/crates/sequencing/papyrus_consensus/Cargo.toml +++ b/crates/sequencing/papyrus_consensus/Cargo.toml @@ -11,6 +11,7 @@ async-trait.workspace = true futures.workspace = true papyrus_network = { path = "../../papyrus_network", version = "0.4.0-dev.2" } papyrus_protobuf = { path = "../../papyrus_protobuf", version = "0.4.0-dev.2" } +papyrus_storage = { path = "../../papyrus_storage", version = "0.4.0-dev.2" } starknet_api.workspace = true starknet-types-core.workspace = true thiserror.workspace = true @@ -19,3 +20,5 @@ tracing.workspace = true [dev-dependencies] mockall.workspace = true +papyrus_storage = { path = "../../papyrus_storage", features = ["testing"] } +test_utils = { path = "../../test_utils" } diff --git a/crates/sequencing/papyrus_consensus/src/lib.rs b/crates/sequencing/papyrus_consensus/src/lib.rs index 79244f9e71..8cd4995ba6 100644 --- a/crates/sequencing/papyrus_consensus/src/lib.rs +++ b/crates/sequencing/papyrus_consensus/src/lib.rs @@ -8,6 +8,7 @@ use tracing::info; use types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId}; // TODO(matan): Remove dead code allowance at the end of milestone 1. +pub mod papyrus_consensus_context; #[allow(dead_code)] pub mod single_height_consensus; #[cfg(test)] @@ -28,8 +29,9 @@ where { let mut current_height = start_height; loop { - info!("Starting consensus for height {start_height}"); - let mut shc = SingleHeightConsensus::new(start_height, context.clone(), validator_id).await; + info!("Starting consensus for height {current_height}"); + let mut shc = + SingleHeightConsensus::new(current_height, context.clone(), validator_id).await; let block = if let Some(block) = shc.start().await? { info!("Proposer flow height {current_height}"); diff --git a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs new file mode 100644 index 0000000000..c14bd3b05f --- /dev/null +++ b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs @@ -0,0 +1,199 @@ +#[cfg(test)] +#[path = "papyrus_consensus_context_test.rs"] +mod papyrus_consensus_context_test; + +use core::panic; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use futures::channel::{mpsc, oneshot}; +use futures::StreamExt; +use papyrus_protobuf::consensus::Proposal; +use papyrus_storage::body::BodyStorageReader; +use papyrus_storage::header::HeaderStorageReader; +use papyrus_storage::{StorageError, StorageReader}; +use starknet_api::block::{BlockHash, BlockNumber}; +use starknet_api::block_hash; +use starknet_api::transaction::Transaction; +use tokio::sync::Mutex; +use tracing::debug; + +use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId}; + +// TODO: add debug messages and span to the tasks. + +#[derive(Debug, PartialEq, Eq, Clone)] +struct PapyrusConsensusBlock { + content: Vec, + id: BlockHash, +} + +impl ConsensusBlock for PapyrusConsensusBlock { + type ProposalChunk = Transaction; + type ProposalIter = std::vec::IntoIter; + + fn id(&self) -> BlockHash { + self.id + } + + fn proposal_iter(&self) -> Self::ProposalIter { + self.content.clone().into_iter() + } +} + +struct PapyrusConsensusContext { + storage_reader: StorageReader, + broadcast_sender: Arc>>, +} + +impl PapyrusConsensusContext { + // TODO(dvir): remove the dead code attribute after we will use this function. + #[allow(dead_code)] + pub fn new(storage_reader: StorageReader, broadcast_sender: mpsc::Sender) -> Self { + Self { storage_reader, broadcast_sender: Arc::new(Mutex::new(broadcast_sender)) } + } +} + +const CHANNEL_SIZE: usize = 5000; + +#[async_trait] +impl ConsensusContext for PapyrusConsensusContext { + type Block = PapyrusConsensusBlock; + + async fn build_proposal( + &self, + height: BlockNumber, + ) -> (mpsc::Receiver, oneshot::Receiver) { + let (mut sender, receiver) = mpsc::channel(CHANNEL_SIZE); + let (fin_sender, fin_receiver) = oneshot::channel(); + + let storage_reader = self.storage_reader.clone(); + tokio::spawn(async move { + // TODO(dvir): consider fix this for the case of reverts. If between the check that the + // block in storage and to getting the transaction was a revert this flow will fail. + wait_for_block(&storage_reader, height).await.expect("Failed to wait to block"); + + let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn"); + let transactions = txn + .get_block_transactions(height) + .expect("Get transactions from storage failed") + .expect(&format!( + "Block in {height} was not found in storage despite waiting for it" + )); + + for tx in transactions.clone() { + sender.try_send(tx).expect("Send should succeed"); + } + sender.close_channel(); + + let block_hash = txn + .get_block_header(height) + .expect("Get header from storage failed") + .expect(&format!( + "Block in {height} was not found in storage despite waiting for it" + )) + .block_hash; + fin_sender + .send(PapyrusConsensusBlock { content: transactions, id: block_hash }) + .expect("Send should succeed"); + }); + + (receiver, fin_receiver) + } + + async fn validate_proposal( + &self, + height: BlockNumber, + mut content: mpsc::Receiver, + ) -> oneshot::Receiver { + let (fin_sender, fin_receiver) = oneshot::channel(); + + let storage_reader = self.storage_reader.clone(); + tokio::spawn(async move { + // TODO(dvir): consider fix this for the case of reverts. If between the check that the + // block in storage and to getting the transaction was a revert this flow will fail. + wait_for_block(&storage_reader, height).await.expect("Failed to wait to block"); + + let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn"); + let transactions = txn + .get_block_transactions(height) + .expect("Get transactions from storage failed") + .expect(&format!( + "Block in {height} was not found in storage despite waiting for it" + )); + + for tx in transactions.iter() { + let received_tx = content + .next() + .await + .expect(&format!("Not received transaction equals to {tx:?}")); + if tx != &received_tx { + panic!("Transactions are not equal. In storage: {tx:?}, : {received_tx:?}"); + } + } + + let block_hash = txn + .get_block_header(height) + .expect("Get header from storage failed") + .expect(&format!( + "Block in {height} was not found in storage despite waiting for it" + )) + .block_hash; + fin_sender + .send(PapyrusConsensusBlock { content: transactions, id: block_hash }) + .expect("Send should succeed"); + }); + + fin_receiver + } + + async fn validators(&self, _height: BlockNumber) -> Vec { + vec![0u8.into(), 1u8.into(), 2u8.into()] + } + + fn proposer(&self, _validators: &Vec, _height: BlockNumber) -> ValidatorId { + 0u8.into() + } + + async fn propose( + &self, + init: ProposalInit, + mut content_receiver: mpsc::Receiver, + fin_receiver: oneshot::Receiver, + ) -> Result<(), ConsensusError> { + let broadcast_sender = self.broadcast_sender.clone(); + + tokio::spawn(async move { + let mut transactions = Vec::new(); + while let Some(tx) = content_receiver.next().await { + transactions.push(tx); + } + + let block_hash = + fin_receiver.await.expect("Failed to get block hash from fin receiver"); + let proposal = Proposal { + height: init.height.0, + proposer: init.proposer, + transactions, + block_hash, + }; + + broadcast_sender.lock().await.try_send(proposal).expect("Failed to send proposal"); + }); + Ok(()) + } +} + +const SLEEP_BETWEEN_CHECK_FOR_BLOCK: Duration = Duration::from_secs(10); + +async fn wait_for_block( + storage_reader: &StorageReader, + height: BlockNumber, +) -> Result<(), StorageError> { + while storage_reader.begin_ro_txn()?.get_body_marker()? <= height { + debug!("Waiting for block {height:?} to continue consensus"); + tokio::time::sleep(SLEEP_BETWEEN_CHECK_FOR_BLOCK).await; + } + Ok(()) +} diff --git a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs new file mode 100644 index 0000000000..4298e12125 --- /dev/null +++ b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs @@ -0,0 +1,116 @@ +use futures::channel::{mpsc, oneshot}; +use futures::StreamExt; +use papyrus_protobuf::consensus::Proposal; +use papyrus_storage::body::BodyStorageWriter; +use papyrus_storage::header::HeaderStorageWriter; +use papyrus_storage::test_utils::get_test_storage; +use starknet_api::block::Block; +use starknet_api::core::ContractAddress; +use starknet_api::transaction::Transaction; +use test_utils::get_test_block; + +use crate::papyrus_consensus_context::PapyrusConsensusContext; +use crate::types::{ConsensusBlock, ConsensusContext, ProposalInit}; + +// TODO(dvir): consider adding tests for times, i.e, the calls are returned immediately and nothing +// happen until it should (for example, not creating a block before we have it in storage). + +const TEST_CHANNEL_SIZE: usize = 10; + +#[tokio::test] +async fn build_proposal() { + let (block, papyrus_context, _network_receiver) = test_setup(); + let block_number = block.header.block_number; + + let (mut proposal_receiver, fin_receiver) = papyrus_context.build_proposal(block_number).await; + + let mut transactions = Vec::new(); + while let Some(tx) = proposal_receiver.next().await { + transactions.push(tx); + } + assert_eq!(transactions, block.body.transactions); + + let fin = fin_receiver.await.unwrap(); + assert_eq!(fin.id(), block.header.block_hash); + assert_eq!(fin.proposal_iter().collect::>(), block.body.transactions); +} + +#[tokio::test] +async fn validate_proposal_success() { + let (block, papyrus_context, _network_receiver) = test_setup(); + let block_number = block.header.block_number; + + let (mut validate_sender, validate_receiver) = mpsc::channel(TEST_CHANNEL_SIZE); + for tx in block.body.transactions.clone() { + validate_sender.try_send(tx).unwrap(); + } + validate_sender.close_channel(); + + let fin = + papyrus_context.validate_proposal(block_number, validate_receiver).await.await.unwrap(); + + assert_eq!(fin.id(), block.header.block_hash); + assert_eq!(fin.proposal_iter().collect::>(), block.body.transactions); +} + +#[tokio::test] +async fn validate_proposal_fail() { + let (block, papyrus_context, _network_receiver) = test_setup(); + let block_number = block.header.block_number; + + let different_block = get_test_block(4, None, None, None); + let (mut validate_sender, validate_receiver) = mpsc::channel(5000); + for tx in different_block.body.transactions.clone() { + validate_sender.try_send(tx).unwrap(); + } + validate_sender.close_channel(); + + let fin = papyrus_context.validate_proposal(block_number, validate_receiver).await.await; + assert_eq!(fin, Err(oneshot::Canceled)); +} + +#[tokio::test] +async fn propose() { + let (block, papyrus_context, mut network_receiver) = test_setup(); + let block_number = block.header.block_number; + + let (mut content_sender, content_receiver) = mpsc::channel(TEST_CHANNEL_SIZE); + for tx in block.body.transactions.clone() { + content_sender.try_send(tx).unwrap(); + } + content_sender.close_channel(); + + let (fin_sender, fin_receiver) = oneshot::channel(); + fin_sender.send(block.header.block_hash).unwrap(); + + let proposal_init = ProposalInit { height: block_number, proposer: ContractAddress::default() }; + papyrus_context.propose(proposal_init.clone(), content_receiver, fin_receiver).await.unwrap(); + + let expected_proposal = Proposal { + height: proposal_init.height.0, + proposer: proposal_init.proposer, + transactions: block.body.transactions, + block_hash: block.header.block_hash, + }; + + assert_eq!(network_receiver.next().await.unwrap(), expected_proposal); +} + +fn test_setup() -> (Block, PapyrusConsensusContext, mpsc::Receiver) { + let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage(); + let block = get_test_block(5, None, None, None); + let block_number = block.header.block_number; + storage_writer + .begin_rw_txn() + .unwrap() + .append_header(block_number, &block.header) + .unwrap() + .append_body(block_number, block.body.clone()) + .unwrap() + .commit() + .unwrap(); + + let (network_sender, network_reciver) = mpsc::channel(TEST_CHANNEL_SIZE); + let papyrus_context = PapyrusConsensusContext::new(storage_reader.clone(), network_sender); + (block, papyrus_context, network_reciver) +} diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index e4fad52d84..0570e28c5c 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -140,7 +140,7 @@ pub trait ConsensusContext: Send + Sync { ) -> Result<(), ConsensusError>; } -#[derive(PartialEq, Debug)] +#[derive(PartialEq, Debug, Clone)] pub struct ProposalInit { pub height: BlockNumber, pub proposer: ValidatorId,