Skip to content

Commit

Permalink
chore(sync): receive statediffchunk from network
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama authored and eitanm-starkware committed Jul 1, 2024
1 parent 19f9af0 commit f88976e
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 184 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/papyrus_p2p_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ lazy_static.workspace = true
papyrus_storage = { path = "../papyrus_storage", features = ["testing"] }
static_assertions.workspace = true
rand.workspace = true
rand_chacha.workspace = true
test_utils = { path = "../test_utils" }
papyrus_protobuf = { path = "../papyrus_protobuf", features = ["testing"]}
4 changes: 2 additions & 2 deletions crates/papyrus_p2p_sync/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ use papyrus_protobuf::sync::{
HeaderQuery,
Query,
SignedBlockHeader,
StateDiffChunk,
StateDiffQuery,
TransactionQuery,
};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use serde::{Deserialize, Serialize};
use starknet_api::block::{BlockNumber, BlockSignature};
use starknet_api::state::ThinStateDiff;
use starknet_api::transaction::{Transaction, TransactionOutput};
use state_diff::StateDiffStreamFactory;
use stream_factory::{DataStreamFactory, DataStreamResult};
Expand Down Expand Up @@ -164,7 +164,7 @@ type ResponseReceiver<T> = Box<dyn Stream<Item = Response<T>> + Unpin + Send + '
type HeaderQuerySender = QuerySender<HeaderQuery>;
type HeaderResponseReceiver = ResponseReceiver<SignedBlockHeader>;
type StateDiffQuerySender = QuerySender<StateDiffQuery>;
type StateDiffResponseReceiver = ResponseReceiver<ThinStateDiff>;
type StateDiffResponseReceiver = ResponseReceiver<StateDiffChunk>;
type TransactionQuerySender = QuerySender<TransactionQuery>;
type TransactionResponseReceiver = ResponseReceiver<(Transaction, TransactionOutput)>;

Expand Down
70 changes: 43 additions & 27 deletions crates/papyrus_p2p_sync/src/client/state_diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use indexmap::IndexMap;
use papyrus_proc_macros::latency_histogram;
use papyrus_protobuf::sync::StateDiffChunk;
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::state::{StateStorageReader, StateStorageWriter};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::BlockNumber;
use starknet_api::state::ThinStateDiff;

use super::stream_factory::{BlockData, BlockNumberLimit, DataStreamFactory};
use super::{P2PSyncError, ResponseReceiver, NETWORK_DATA_TIMEOUT};
use super::ResponseReceiver;
use crate::client::stream_factory::{BlockData, BlockNumberLimit, DataStreamFactory};
use crate::client::{P2PSyncError, NETWORK_DATA_TIMEOUT};

impl BlockData for (ThinStateDiff, BlockNumber) {
#[latency_histogram("p2p_sync_state_diff_write_to_storage_latency_seconds", true)]
Expand All @@ -26,16 +28,15 @@ impl BlockData for (ThinStateDiff, BlockNumber) {

pub(crate) struct StateDiffStreamFactory;

// TODO(shahak): Change to StateDiffChunk.
impl DataStreamFactory<ThinStateDiff> for StateDiffStreamFactory {
impl DataStreamFactory<StateDiffChunk> for StateDiffStreamFactory {
type Output = (ThinStateDiff, BlockNumber);

const TYPE_DESCRIPTION: &'static str = "state diffs";
const BLOCK_NUMBER_LIMIT: BlockNumberLimit = BlockNumberLimit::HeaderMarker;

#[latency_histogram("p2p_sync_state_diff_parse_data_for_block_latency_seconds", true)]
fn parse_data_for_block<'a>(
state_diffs_receiver: &'a mut ResponseReceiver<ThinStateDiff>,
state_diff_chunks_receiver: &'a mut ResponseReceiver<StateDiffChunk>,
block_number: BlockNumber,
storage_reader: &'a StorageReader,
) -> BoxFuture<'a, Result<Option<Self::Output>, P2PSyncError>> {
Expand All @@ -54,13 +55,13 @@ impl DataStreamFactory<ThinStateDiff> for StateDiffStreamFactory {
})?;

while current_state_diff_len < target_state_diff_len {
let (maybe_state_diff_part, _report_callback) =
tokio::time::timeout(NETWORK_DATA_TIMEOUT, state_diffs_receiver.next())
let (maybe_state_diff_chunk, _report_callback) =
tokio::time::timeout(NETWORK_DATA_TIMEOUT, state_diff_chunks_receiver.next())
.await?
.ok_or(P2PSyncError::ReceiverChannelTerminated {
type_description: Self::TYPE_DESCRIPTION,
})?;
let Some(state_diff_part) = maybe_state_diff_part?.0 else {
let Some(state_diff_chunk) = maybe_state_diff_chunk?.0 else {
if current_state_diff_len == 0 {
return Ok(None);
} else {
Expand All @@ -71,13 +72,13 @@ impl DataStreamFactory<ThinStateDiff> for StateDiffStreamFactory {
}
};
prev_result_len = current_state_diff_len;
if state_diff_part.is_empty() {
if state_diff_chunk.is_empty() {
return Err(P2PSyncError::EmptyStateDiffPart);
}
// It's cheaper to calculate the length of `state_diff_part` than the length of
// `result`.
current_state_diff_len += state_diff_part.len();
unite_state_diffs(&mut result, state_diff_part)?;
current_state_diff_len += state_diff_chunk.len();
unite_state_diffs(&mut result, state_diff_chunk)?;
}

if current_state_diff_len != target_state_diff_len {
Expand All @@ -103,26 +104,41 @@ impl DataStreamFactory<ThinStateDiff> for StateDiffStreamFactory {
#[latency_histogram("p2p_sync_state_diff_unite_state_diffs_latency_seconds", true)]
fn unite_state_diffs(
state_diff: &mut ThinStateDiff,
other_state_diff: ThinStateDiff,
state_diff_chunk: StateDiffChunk,
) -> Result<(), P2PSyncError> {
unite_state_diffs_field(
&mut state_diff.deployed_contracts,
other_state_diff.deployed_contracts,
)?;
unite_state_diffs_field(&mut state_diff.declared_classes, other_state_diff.declared_classes)?;
unite_state_diffs_field(&mut state_diff.nonces, other_state_diff.nonces)?;
unite_state_diffs_field(&mut state_diff.replaced_classes, other_state_diff.replaced_classes)?;

for (other_contract_address, other_storage_diffs) in other_state_diff.storage_diffs {
match state_diff.storage_diffs.get_mut(&other_contract_address) {
Some(storage_diffs) => unite_state_diffs_field(storage_diffs, other_storage_diffs)?,
None => {
state_diff.storage_diffs.insert(other_contract_address, other_storage_diffs);
match state_diff_chunk {
StateDiffChunk::ContractDiff(contract_diff) => {
let mut chunk_deployed_contract = IndexMap::new();
if let Some(class_hash) = contract_diff.class_hash {
chunk_deployed_contract.insert(contract_diff.contract_address, class_hash);
}
unite_state_diffs_field(&mut state_diff.deployed_contracts, chunk_deployed_contract)?;
let mut chunk_nonce = IndexMap::new();
if let Some(nonce) = contract_diff.nonce {
chunk_nonce.insert(contract_diff.contract_address, nonce);
}
unite_state_diffs_field(&mut state_diff.nonces, chunk_nonce)?;
match state_diff.storage_diffs.get_mut(&contract_diff.contract_address) {
Some(storage_diffs) => {
unite_state_diffs_field(storage_diffs, contract_diff.storage_diffs)?
}
None => {
state_diff
.storage_diffs
.insert(contract_diff.contract_address, contract_diff.storage_diffs);
}
}
}
StateDiffChunk::DeclaredClass(declared_class) => {
let mut chunk_declared_class = IndexMap::new();
chunk_declared_class
.insert(declared_class.class_hash, declared_class.compiled_class_hash);
unite_state_diffs_field(&mut state_diff.declared_classes, chunk_declared_class)?;
}
StateDiffChunk::DeprecatedDeclaredClass(deprecated_declared_class) => {
state_diff.deprecated_declared_classes.push(deprecated_declared_class.class_hash);
}
}

state_diff.deprecated_declared_classes.extend(other_state_diff.deprecated_declared_classes);
Ok(())
}

Expand Down
Loading

0 comments on commit f88976e

Please sign in to comment.