refactor(network): remove Data from db executor
ShahakShama committed Jun 19, 2024
1 parent 0b4ac41 commit ecb746d
Showing 4 changed files with 113 additions and 216 deletions.
168 changes: 48 additions & 120 deletions crates/papyrus_network/src/db_executor/mod.rs
@@ -1,12 +1,8 @@
use std::vec;

use async_trait::async_trait;
use bytes::BufMut;
use futures::channel::mpsc::Sender;
use futures::future::{pending, poll_fn};
#[cfg(test)]
use mockall::automock;
use papyrus_protobuf::protobuf;
use papyrus_protobuf::sync::{
BlockHashOrNumber,
ContractDiff,
@@ -20,56 +16,15 @@ use papyrus_protobuf::sync::{
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::state::StateStorageReader;
use papyrus_storage::{db, StorageReader, StorageTxn};
use prost::Message;
use starknet_api::block::BlockNumber;
use starknet_api::state::ThinStateDiff;
use tracing::error;

use crate::DataType;

#[cfg(test)]
mod test;

mod utils;

#[derive(thiserror::Error, Debug)]
#[error("Failed to encode data")]
pub struct DataEncodingError;

#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
#[derive(Clone)]
pub enum Data {
BlockHeaderAndSignature(DataOrFin<SignedBlockHeader>),
StateDiffChunk(DataOrFin<StateDiffChunk>),
}

impl Default for Data {
fn default() -> Self {
// TODO: consider this default data type.
Self::BlockHeaderAndSignature(DataOrFin(None))
}
}

impl Data {
pub fn encode<B>(self, buf: &mut B) -> Result<(), DataEncodingError>
where
B: BufMut,
{
match self {
Data::BlockHeaderAndSignature(maybe_signed_block_header) => {
let block_headers_response =
protobuf::BlockHeadersResponse::from(maybe_signed_block_header);
block_headers_response.encode(buf).map_err(|_| DataEncodingError)
}
Data::StateDiffChunk(maybe_state_diff_chunk) => {
let state_diffs_response =
protobuf::StateDiffsResponse::from(maybe_state_diff_chunk);
state_diffs_response.encode(buf).map_err(|_| DataEncodingError)
}
}
}
}

#[derive(thiserror::Error, Debug)]
pub enum DBExecutorError {
#[error(transparent)]
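
With the Data enum and its encode method removed, this module no longer touches protobuf; the per-type conversions it relied on are still available, so serialization can happen wherever responses are actually written out (where exactly is not stated in this commit). A sketch that reuses the conversion the deleted BlockHeaderAndSignature arm called:

use bytes::BytesMut;
use papyrus_protobuf::protobuf;
use papyrus_protobuf::sync::{DataOrFin, SignedBlockHeader};
use prost::Message;

// Encode a header response the same way the removed Data::encode arm did:
// convert to the protobuf message, then prost-encode it into a buffer.
fn encode_header_response(
    item: DataOrFin<SignedBlockHeader>,
) -> Result<Vec<u8>, prost::EncodeError> {
    let response = protobuf::BlockHeadersResponse::from(item);
    let mut buf = BytesMut::new();
    response.encode(&mut buf)?;
    Ok(buf.to_vec())
}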
@@ -106,11 +61,10 @@ impl DBExecutorError {
pub trait DBExecutorTrait {
/// Send a query to be executed in the DBExecutor. The query will be run concurrently with the
/// calling code and the result will be over the given channel.
fn register_query(
&mut self,
fn register_query<Data: FetchBlockDataFromDb + Send + 'static>(
&self,
query: Query,
data_type: impl FetchBlockDataFromDb + Send + 'static,
sender: Sender<Data>,
sender: Sender<DataOrFin<Data>>,
);

/// Polls incoming queries.
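
The data_type argument is gone: the payload type is now chosen statically through the generic parameter on register_query, and results arrive as DataOrFin<Data> instead of the removed Data enum. A minimal caller sketch under that assumption (the helper name, buffer size, and import paths are illustrative, not part of this commit):

use futures::channel::mpsc;
use papyrus_protobuf::sync::{DataOrFin, Query, SignedBlockHeader};
// Path assumed; adjust to however the trait is exported by the crate.
use papyrus_network::db_executor::DBExecutorTrait;

// Hypothetical helper: register a header query and hand back the receiving end
// of the channel. The concrete data type is picked at compile time by the
// generic parameter; no runtime DataType value is passed anymore.
fn register_header_query(
    db_executor: &impl DBExecutorTrait,
    query: Query,
) -> mpsc::Receiver<DataOrFin<SignedBlockHeader>> {
    let (sender, receiver) = mpsc::channel(100);
    db_executor.register_query::<SignedBlockHeader>(query, sender);
    receiver
}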
@@ -130,16 +84,14 @@ impl DBExecutor {

#[async_trait]
impl DBExecutorTrait for DBExecutor {
fn register_query(
&mut self,
fn register_query<Data: FetchBlockDataFromDb + Send + 'static>(
&self,
query: Query,
data_type: impl FetchBlockDataFromDb + Send + 'static,
sender: Sender<Data>,
sender: Sender<DataOrFin<Data>>,
) {
let storage_reader_clone = self.storage_reader.clone();
tokio::task::spawn(async move {
let result =
send_data_for_query(storage_reader_clone, query.clone(), data_type, sender).await;
let result = send_data_for_query(storage_reader_clone, query.clone(), sender).await;
if let Err(error) = result {
if error.should_log_in_error_level() {
error!("Running inbound query {query:?} failed on {error:?}");
@@ -158,69 +110,49 @@ impl DBExecutorTrait for DBExecutor {
}
}

#[cfg_attr(test, automock)]
// we need to tell clippy to ignore the "needless" lifetime warning because it's not true.
// we do need the lifetime for the automock, following clippy's suggestion will break the code.
#[allow(clippy::needless_lifetimes)]
pub trait FetchBlockDataFromDb {
fn fetch_block_data_from_db<'a>(
&self,
pub trait FetchBlockDataFromDb: Sized {
fn fetch_block_data_from_db(
block_number: BlockNumber,
txn: &StorageTxn<'a, db::RO>,
) -> Result<Vec<Data>, DBExecutorError>;

fn fin(&self) -> Data;
txn: &StorageTxn<'_, db::RO>,
) -> Result<Vec<Self>, DBExecutorError>;
}

impl FetchBlockDataFromDb for DataType {
impl FetchBlockDataFromDb for SignedBlockHeader {
fn fetch_block_data_from_db(
&self,
block_number: BlockNumber,
txn: &StorageTxn<'_, db::RO>,
) -> Result<Vec<Data>, DBExecutorError> {
match self {
DataType::SignedBlockHeader => {
let mut header =
txn.get_block_header(block_number)?.ok_or(DBExecutorError::BlockNotFound {
) -> Result<Vec<Self>, DBExecutorError> {
let mut header =
txn.get_block_header(block_number)?.ok_or(DBExecutorError::BlockNotFound {
block_hash_or_number: BlockHashOrNumber::Number(block_number),
})?;
// TODO(shahak) Remove this once central sync fills the state_diff_length field.
if header.state_diff_length.is_none() {
header.state_diff_length = Some(
txn.get_state_diff(block_number)?
.ok_or(DBExecutorError::BlockNotFound {
block_hash_or_number: BlockHashOrNumber::Number(block_number),
})?;
// TODO(shahak) Remove this once central sync fills the state_diff_length field.
if header.state_diff_length.is_none() {
header.state_diff_length = Some(
txn.get_state_diff(block_number)?
.ok_or(DBExecutorError::BlockNotFound {
block_hash_or_number: BlockHashOrNumber::Number(block_number),
})?
.len(),
);
}
let signature = txn
.get_block_signature(block_number)?
.ok_or(DBExecutorError::SignatureNotFound { block_number })?;
Ok(vec![Data::BlockHeaderAndSignature(DataOrFin(Some(SignedBlockHeader {
block_header: header,
signatures: vec![signature],
})))])
}
DataType::StateDiff => {
let thin_state_diff =
txn.get_state_diff(block_number)?.ok_or(DBExecutorError::BlockNotFound {
block_hash_or_number: BlockHashOrNumber::Number(block_number),
})?;
let vec_data = split_thin_state_diff(thin_state_diff)
.into_iter()
.map(|state_diff_chunk| Data::StateDiffChunk(DataOrFin(Some(state_diff_chunk))))
.collect();
Ok(vec_data)
}
})?
.len(),
);
}
let signature = txn
.get_block_signature(block_number)?
.ok_or(DBExecutorError::SignatureNotFound { block_number })?;
Ok(vec![SignedBlockHeader { block_header: header, signatures: vec![signature] }])
}
}

fn fin(&self) -> Data {
match self {
DataType::SignedBlockHeader => Data::BlockHeaderAndSignature(DataOrFin(None)),
DataType::StateDiff => Data::StateDiffChunk(DataOrFin(None)),
}
impl FetchBlockDataFromDb for StateDiffChunk {
fn fetch_block_data_from_db(
block_number: BlockNumber,
txn: &StorageTxn<'_, db::RO>,
) -> Result<Vec<Self>, DBExecutorError> {
let thin_state_diff =
txn.get_state_diff(block_number)?.ok_or(DBExecutorError::BlockNotFound {
block_hash_or_number: BlockHashOrNumber::Number(block_number),
})?;
Ok(split_thin_state_diff(thin_state_diff))
}
}

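FetchBlockDataFromDb is now implemented directly on the payload types (SignedBlockHeader, StateDiffChunk) as an associated function, so callers dispatch on the type parameter alone. A minimal sketch of that dispatch, assuming the trait and storage types above are in scope (the helper name is illustrative):

use papyrus_storage::{db, StorageTxn};
use starknet_api::block::BlockNumber;

// Hypothetical helper: fetch one block's worth of data for any supported type,
// with no runtime DataType enum involved.
fn fetch_single_block<Data: FetchBlockDataFromDb>(
    txn: &StorageTxn<'_, db::RO>,
    block_number: BlockNumber,
) -> Result<Vec<Data>, DBExecutorError> {
    Data::fetch_block_data_from_db(block_number, txn)
}
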
@@ -265,26 +197,22 @@ pub fn split_thin_state_diff(thin_state_diff: ThinStateDiff) -> Vec<StateDiffChunk> {
state_diff_chunks
}

async fn send_data_for_query(
async fn send_data_for_query<Data: FetchBlockDataFromDb + Send + 'static>(
storage_reader: StorageReader,
query: Query,
data_type: impl FetchBlockDataFromDb + Send + 'static,
mut sender: Sender<Data>,
mut sender: Sender<DataOrFin<Data>>,
) -> Result<(), DBExecutorError> {
let fin = data_type.fin();
// If this function fails, we still want to send fin before failing.
let result =
send_data_without_fin_for_query(&storage_reader, query, data_type, &mut sender).await;
let result = send_data_without_fin_for_query(&storage_reader, query, &mut sender).await;
poll_fn(|cx| sender.poll_ready(cx)).await?;
sender.start_send(fin)?;
sender.start_send(DataOrFin(None))?;
result
}

async fn send_data_without_fin_for_query(
async fn send_data_without_fin_for_query<Data: FetchBlockDataFromDb + Send + 'static>(
storage_reader: &StorageReader,
query: Query,
data_type: impl FetchBlockDataFromDb + Send + 'static,
sender: &mut Sender<Data>,
sender: &mut Sender<DataOrFin<Data>>,
) -> Result<(), DBExecutorError> {
let txn = storage_reader.begin_ro_txn()?;
let start_block_number = match query.start_block {
@@ -300,12 +228,12 @@ async fn send_data_without_fin_for_query(
for block_counter in 0..query.limit {
let block_number =
BlockNumber(utils::calculate_block_number(&query, start_block_number, block_counter)?);
let data_vec = data_type.fetch_block_data_from_db(block_number, &txn)?;
let data_vec = Data::fetch_block_data_from_db(block_number, &txn)?;
// Using poll_fn because Sender::poll_ready is not a future
poll_fn(|cx| sender.poll_ready(cx)).await?;
for data in data_vec {
// TODO: consider implement retry mechanism.
sender.start_send(data)?;
sender.start_send(DataOrFin(Some(data)))?;
}
}
Ok(())
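
On the receiving side, each registered query now yields DataOrFin(Some(data)) items and is terminated by the DataOrFin(None) fin sent above. A consumer-loop sketch under that assumption (the function name is illustrative, and it assumes DataOrFin's inner Option is publicly matchable):

use futures::channel::mpsc::Receiver;
use futures::StreamExt;
use papyrus_protobuf::sync::{DataOrFin, SignedBlockHeader};

// Hypothetical consumer: drain a header query until the fin message arrives.
async fn collect_headers(
    mut receiver: Receiver<DataOrFin<SignedBlockHeader>>,
) -> Vec<SignedBlockHeader> {
    let mut headers = Vec::new();
    while let Some(DataOrFin(item)) = receiver.next().await {
        match item {
            Some(header) => headers.push(header),
            // DataOrFin(None) is the fin that closes the query.
            None => break,
        }
    }
    headers
}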