This repository has been archived by the owner on Dec 26, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 89
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Dvir/papyrus consensus context (#2111)
* feat(consensus): add papyrus context * CR fix
- Loading branch information
1 parent
25c2370
commit b9c9593
Showing
7 changed files
with
327 additions
and
3 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
199 changes: 199 additions & 0 deletions
199
crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
#[cfg(test)] | ||
#[path = "papyrus_consensus_context_test.rs"] | ||
mod papyrus_consensus_context_test; | ||
|
||
use core::panic; | ||
use std::sync::Arc; | ||
use std::time::Duration; | ||
|
||
use async_trait::async_trait; | ||
use futures::channel::{mpsc, oneshot}; | ||
use futures::StreamExt; | ||
use papyrus_protobuf::consensus::Proposal; | ||
use papyrus_storage::body::BodyStorageReader; | ||
use papyrus_storage::header::HeaderStorageReader; | ||
use papyrus_storage::{StorageError, StorageReader}; | ||
use starknet_api::block::{BlockHash, BlockNumber}; | ||
use starknet_api::block_hash; | ||
use starknet_api::transaction::Transaction; | ||
use tokio::sync::Mutex; | ||
use tracing::debug; | ||
|
||
use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId}; | ||
|
||
// TODO: add debug messages and span to the tasks. | ||
|
||
#[derive(Debug, PartialEq, Eq, Clone)] | ||
struct PapyrusConsensusBlock { | ||
content: Vec<Transaction>, | ||
id: BlockHash, | ||
} | ||
|
||
impl ConsensusBlock for PapyrusConsensusBlock { | ||
type ProposalChunk = Transaction; | ||
type ProposalIter = std::vec::IntoIter<Transaction>; | ||
|
||
fn id(&self) -> BlockHash { | ||
self.id | ||
} | ||
|
||
fn proposal_iter(&self) -> Self::ProposalIter { | ||
self.content.clone().into_iter() | ||
} | ||
} | ||
|
||
struct PapyrusConsensusContext { | ||
storage_reader: StorageReader, | ||
broadcast_sender: Arc<Mutex<mpsc::Sender<Proposal>>>, | ||
} | ||
|
||
impl PapyrusConsensusContext { | ||
// TODO(dvir): remove the dead code attribute after we will use this function. | ||
#[allow(dead_code)] | ||
pub fn new(storage_reader: StorageReader, broadcast_sender: mpsc::Sender<Proposal>) -> Self { | ||
Self { storage_reader, broadcast_sender: Arc::new(Mutex::new(broadcast_sender)) } | ||
} | ||
} | ||
|
||
const CHANNEL_SIZE: usize = 5000; | ||
|
||
#[async_trait] | ||
impl ConsensusContext for PapyrusConsensusContext { | ||
type Block = PapyrusConsensusBlock; | ||
|
||
async fn build_proposal( | ||
&self, | ||
height: BlockNumber, | ||
) -> (mpsc::Receiver<Transaction>, oneshot::Receiver<PapyrusConsensusBlock>) { | ||
let (mut sender, receiver) = mpsc::channel(CHANNEL_SIZE); | ||
let (fin_sender, fin_receiver) = oneshot::channel(); | ||
|
||
let storage_reader = self.storage_reader.clone(); | ||
tokio::spawn(async move { | ||
// TODO(dvir): consider fix this for the case of reverts. If between the check that the | ||
// block in storage and to getting the transaction was a revert this flow will fail. | ||
wait_for_block(&storage_reader, height).await.expect("Failed to wait to block"); | ||
|
||
let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn"); | ||
let transactions = txn | ||
.get_block_transactions(height) | ||
.expect("Get transactions from storage failed") | ||
.expect(&format!( | ||
"Block in {height} was not found in storage despite waiting for it" | ||
)); | ||
|
||
for tx in transactions.clone() { | ||
sender.try_send(tx).expect("Send should succeed"); | ||
} | ||
sender.close_channel(); | ||
|
||
let block_hash = txn | ||
.get_block_header(height) | ||
.expect("Get header from storage failed") | ||
.expect(&format!( | ||
"Block in {height} was not found in storage despite waiting for it" | ||
)) | ||
.block_hash; | ||
fin_sender | ||
.send(PapyrusConsensusBlock { content: transactions, id: block_hash }) | ||
.expect("Send should succeed"); | ||
}); | ||
|
||
(receiver, fin_receiver) | ||
} | ||
|
||
async fn validate_proposal( | ||
&self, | ||
height: BlockNumber, | ||
mut content: mpsc::Receiver<Transaction>, | ||
) -> oneshot::Receiver<PapyrusConsensusBlock> { | ||
let (fin_sender, fin_receiver) = oneshot::channel(); | ||
|
||
let storage_reader = self.storage_reader.clone(); | ||
tokio::spawn(async move { | ||
// TODO(dvir): consider fix this for the case of reverts. If between the check that the | ||
// block in storage and to getting the transaction was a revert this flow will fail. | ||
wait_for_block(&storage_reader, height).await.expect("Failed to wait to block"); | ||
|
||
let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn"); | ||
let transactions = txn | ||
.get_block_transactions(height) | ||
.expect("Get transactions from storage failed") | ||
.expect(&format!( | ||
"Block in {height} was not found in storage despite waiting for it" | ||
)); | ||
|
||
for tx in transactions.iter() { | ||
let received_tx = content | ||
.next() | ||
.await | ||
.expect(&format!("Not received transaction equals to {tx:?}")); | ||
if tx != &received_tx { | ||
panic!("Transactions are not equal. In storage: {tx:?}, : {received_tx:?}"); | ||
} | ||
} | ||
|
||
let block_hash = txn | ||
.get_block_header(height) | ||
.expect("Get header from storage failed") | ||
.expect(&format!( | ||
"Block in {height} was not found in storage despite waiting for it" | ||
)) | ||
.block_hash; | ||
fin_sender | ||
.send(PapyrusConsensusBlock { content: transactions, id: block_hash }) | ||
.expect("Send should succeed"); | ||
}); | ||
|
||
fin_receiver | ||
} | ||
|
||
async fn validators(&self, _height: BlockNumber) -> Vec<ValidatorId> { | ||
vec![0u8.into(), 1u8.into(), 2u8.into()] | ||
} | ||
|
||
fn proposer(&self, _validators: &Vec<ValidatorId>, _height: BlockNumber) -> ValidatorId { | ||
0u8.into() | ||
} | ||
|
||
async fn propose( | ||
&self, | ||
init: ProposalInit, | ||
mut content_receiver: mpsc::Receiver<Transaction>, | ||
fin_receiver: oneshot::Receiver<BlockHash>, | ||
) -> Result<(), ConsensusError> { | ||
let broadcast_sender = self.broadcast_sender.clone(); | ||
|
||
tokio::spawn(async move { | ||
let mut transactions = Vec::new(); | ||
while let Some(tx) = content_receiver.next().await { | ||
transactions.push(tx); | ||
} | ||
|
||
let block_hash = | ||
fin_receiver.await.expect("Failed to get block hash from fin receiver"); | ||
let proposal = Proposal { | ||
height: init.height.0, | ||
proposer: init.proposer, | ||
transactions, | ||
block_hash, | ||
}; | ||
|
||
broadcast_sender.lock().await.try_send(proposal).expect("Failed to send proposal"); | ||
}); | ||
Ok(()) | ||
} | ||
} | ||
|
||
const SLEEP_BETWEEN_CHECK_FOR_BLOCK: Duration = Duration::from_secs(10); | ||
|
||
async fn wait_for_block( | ||
storage_reader: &StorageReader, | ||
height: BlockNumber, | ||
) -> Result<(), StorageError> { | ||
while storage_reader.begin_ro_txn()?.get_body_marker()? <= height { | ||
debug!("Waiting for block {height:?} to continue consensus"); | ||
tokio::time::sleep(SLEEP_BETWEEN_CHECK_FOR_BLOCK).await; | ||
} | ||
Ok(()) | ||
} |
116 changes: 116 additions & 0 deletions
116
crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
use futures::channel::{mpsc, oneshot}; | ||
use futures::StreamExt; | ||
use papyrus_protobuf::consensus::Proposal; | ||
use papyrus_storage::body::BodyStorageWriter; | ||
use papyrus_storage::header::HeaderStorageWriter; | ||
use papyrus_storage::test_utils::get_test_storage; | ||
use starknet_api::block::Block; | ||
use starknet_api::core::ContractAddress; | ||
use starknet_api::transaction::Transaction; | ||
use test_utils::get_test_block; | ||
|
||
use crate::papyrus_consensus_context::PapyrusConsensusContext; | ||
use crate::types::{ConsensusBlock, ConsensusContext, ProposalInit}; | ||
|
||
// TODO(dvir): consider adding tests for times, i.e, the calls are returned immediately and nothing | ||
// happen until it should (for example, not creating a block before we have it in storage). | ||
|
||
const TEST_CHANNEL_SIZE: usize = 10; | ||
|
||
#[tokio::test] | ||
async fn build_proposal() { | ||
let (block, papyrus_context, _network_receiver) = test_setup(); | ||
let block_number = block.header.block_number; | ||
|
||
let (mut proposal_receiver, fin_receiver) = papyrus_context.build_proposal(block_number).await; | ||
|
||
let mut transactions = Vec::new(); | ||
while let Some(tx) = proposal_receiver.next().await { | ||
transactions.push(tx); | ||
} | ||
assert_eq!(transactions, block.body.transactions); | ||
|
||
let fin = fin_receiver.await.unwrap(); | ||
assert_eq!(fin.id(), block.header.block_hash); | ||
assert_eq!(fin.proposal_iter().collect::<Vec::<Transaction>>(), block.body.transactions); | ||
} | ||
|
||
#[tokio::test] | ||
async fn validate_proposal_success() { | ||
let (block, papyrus_context, _network_receiver) = test_setup(); | ||
let block_number = block.header.block_number; | ||
|
||
let (mut validate_sender, validate_receiver) = mpsc::channel(TEST_CHANNEL_SIZE); | ||
for tx in block.body.transactions.clone() { | ||
validate_sender.try_send(tx).unwrap(); | ||
} | ||
validate_sender.close_channel(); | ||
|
||
let fin = | ||
papyrus_context.validate_proposal(block_number, validate_receiver).await.await.unwrap(); | ||
|
||
assert_eq!(fin.id(), block.header.block_hash); | ||
assert_eq!(fin.proposal_iter().collect::<Vec::<Transaction>>(), block.body.transactions); | ||
} | ||
|
||
#[tokio::test] | ||
async fn validate_proposal_fail() { | ||
let (block, papyrus_context, _network_receiver) = test_setup(); | ||
let block_number = block.header.block_number; | ||
|
||
let different_block = get_test_block(4, None, None, None); | ||
let (mut validate_sender, validate_receiver) = mpsc::channel(5000); | ||
for tx in different_block.body.transactions.clone() { | ||
validate_sender.try_send(tx).unwrap(); | ||
} | ||
validate_sender.close_channel(); | ||
|
||
let fin = papyrus_context.validate_proposal(block_number, validate_receiver).await.await; | ||
assert_eq!(fin, Err(oneshot::Canceled)); | ||
} | ||
|
||
#[tokio::test] | ||
async fn propose() { | ||
let (block, papyrus_context, mut network_receiver) = test_setup(); | ||
let block_number = block.header.block_number; | ||
|
||
let (mut content_sender, content_receiver) = mpsc::channel(TEST_CHANNEL_SIZE); | ||
for tx in block.body.transactions.clone() { | ||
content_sender.try_send(tx).unwrap(); | ||
} | ||
content_sender.close_channel(); | ||
|
||
let (fin_sender, fin_receiver) = oneshot::channel(); | ||
fin_sender.send(block.header.block_hash).unwrap(); | ||
|
||
let proposal_init = ProposalInit { height: block_number, proposer: ContractAddress::default() }; | ||
papyrus_context.propose(proposal_init.clone(), content_receiver, fin_receiver).await.unwrap(); | ||
|
||
let expected_proposal = Proposal { | ||
height: proposal_init.height.0, | ||
proposer: proposal_init.proposer, | ||
transactions: block.body.transactions, | ||
block_hash: block.header.block_hash, | ||
}; | ||
|
||
assert_eq!(network_receiver.next().await.unwrap(), expected_proposal); | ||
} | ||
|
||
fn test_setup() -> (Block, PapyrusConsensusContext, mpsc::Receiver<Proposal>) { | ||
let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage(); | ||
let block = get_test_block(5, None, None, None); | ||
let block_number = block.header.block_number; | ||
storage_writer | ||
.begin_rw_txn() | ||
.unwrap() | ||
.append_header(block_number, &block.header) | ||
.unwrap() | ||
.append_body(block_number, block.body.clone()) | ||
.unwrap() | ||
.commit() | ||
.unwrap(); | ||
|
||
let (network_sender, network_reciver) = mpsc::channel(TEST_CHANNEL_SIZE); | ||
let papyrus_context = PapyrusConsensusContext::new(storage_reader.clone(), network_sender); | ||
(block, papyrus_context, network_reciver) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters