Skip to content

Commit

Permalink
chore(sync): receive statediffchunk from network
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama authored and eitanm-starkware committed Jun 19, 2024
1 parent e7ae1b3 commit 560f761
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 191 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 9 additions & 4 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ use papyrus_network::{network_manager, NetworkConfig, Protocol};
use papyrus_node::config::NodeConfig;
use papyrus_node::version::VERSION_FULL;
use papyrus_p2p_sync::{P2PSync, P2PSyncConfig, P2PSyncError};
use papyrus_protobuf::sync::{DataOrFin, HeaderQuery, SignedBlockHeader, StateDiffQuery};
use papyrus_protobuf::sync::{
DataOrFin,
HeaderQuery,
SignedBlockHeader,
StateDiffChunk,
StateDiffQuery,
};
#[cfg(feature = "rpc")]
use papyrus_rpc::run_server;
use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter};
Expand All @@ -32,7 +38,6 @@ use papyrus_sync::sources::pending::PendingSource;
use papyrus_sync::{StateSync, StateSyncError, SyncConfig};
use starknet_api::block::BlockHash;
use starknet_api::felt;
use starknet_api::state::ThinStateDiff;
use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated};
use starknet_client::reader::PendingData;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -229,7 +234,7 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
storage_reader: StorageReader,
storage_writer: StorageWriter,
header_channels: SqmrSubscriberChannels<HeaderQuery, DataOrFin<SignedBlockHeader>>,
state_diff_channels: SqmrSubscriberChannels<StateDiffQuery, DataOrFin<ThinStateDiff>>,
state_diff_channels: SqmrSubscriberChannels<StateDiffQuery, DataOrFin<StateDiffChunk>>,
) -> Result<(), P2PSyncError> {
let sync = P2PSync::new(
p2p_sync_config,
Expand All @@ -248,7 +253,7 @@ type NetworkRunReturn = (
BoxFuture<'static, Result<(), NetworkError>>,
Option<(
SqmrSubscriberChannels<HeaderQuery, DataOrFin<SignedBlockHeader>>,
SqmrSubscriberChannels<StateDiffQuery, DataOrFin<ThinStateDiff>>,
SqmrSubscriberChannels<StateDiffQuery, DataOrFin<StateDiffChunk>>,
)>,
String,
);
Expand Down
2 changes: 2 additions & 0 deletions crates/papyrus_p2p_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ lazy_static.workspace = true
papyrus_storage = { path = "../papyrus_storage", features = ["testing"] }
static_assertions.workspace = true
rand.workspace = true
rand_chacha.workspace = true
test_utils = { path = "../test_utils" }
papyrus_protobuf = { path = "../papyrus_protobuf", features = ["testing"]}
11 changes: 8 additions & 3 deletions crates/papyrus_p2p_sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ use papyrus_config::dumping::{ser_optional_param, ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use papyrus_network::network_manager::ReportCallback;
use papyrus_protobuf::converters::ProtobufConversionError;
use papyrus_protobuf::sync::{DataOrFin, HeaderQuery, SignedBlockHeader, StateDiffQuery};
use papyrus_protobuf::sync::{
DataOrFin,
HeaderQuery,
SignedBlockHeader,
StateDiffChunk,
StateDiffQuery,
};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use serde::{Deserialize, Serialize};
use starknet_api::block::{BlockNumber, BlockSignature};
use starknet_api::state::ThinStateDiff;
use tokio_stream::StreamExt;
use tracing::instrument;

Expand Down Expand Up @@ -171,7 +176,7 @@ where
HeaderResponseReceiver: Stream<Item = Response<SignedBlockHeader>> + Unpin + Send + 'static,
StateDiffQuerySender: Sink<StateDiffQuery, Error = SendError> + Unpin + Send + 'static,
// TODO(shahak): Change to StateDiffChunk.
StateDiffResponseReceiver: Stream<Item = Response<ThinStateDiff>> + Unpin + Send + 'static,
StateDiffResponseReceiver: Stream<Item = Response<StateDiffChunk>> + Unpin + Send + 'static,
{
pub fn new(
config: P2PSyncConfig,
Expand Down
69 changes: 42 additions & 27 deletions crates/papyrus_p2p_sync/src/state_diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::future::BoxFuture;
use futures::{FutureExt, Sink, Stream, StreamExt};
use indexmap::IndexMap;
use papyrus_proc_macros::latency_histogram;
use papyrus_protobuf::sync::Query;
use papyrus_protobuf::sync::{Query, StateDiffChunk};
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::state::{StateStorageReader, StateStorageWriter};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
Expand All @@ -32,11 +32,11 @@ pub(crate) struct StateDiffStreamFactory<QuerySender, DataReceiver>(
);

// TODO(shahak): Change to StateDiffChunk.
impl<QuerySender, DataReceiver> DataStreamFactory<QuerySender, DataReceiver, ThinStateDiff>
impl<QuerySender, DataReceiver> DataStreamFactory<QuerySender, DataReceiver, StateDiffChunk>
for StateDiffStreamFactory<QuerySender, DataReceiver>
where
QuerySender: Sink<Query, Error = SendError> + Unpin + Send + 'static,
DataReceiver: Stream<Item = Response<ThinStateDiff>> + Unpin + Send + 'static,
DataReceiver: Stream<Item = Response<StateDiffChunk>> + Unpin + Send + 'static,
{
type Output = (ThinStateDiff, BlockNumber);

Expand All @@ -45,7 +45,7 @@ where

#[latency_histogram("p2p_sync_state_diff_parse_data_for_block_latency_seconds", true)]
fn parse_data_for_block<'a>(
state_diffs_receiver: &'a mut DataReceiver,
state_diff_chunks_receiver: &'a mut DataReceiver,
block_number: BlockNumber,
storage_reader: &'a StorageReader,
) -> BoxFuture<'a, Result<Option<Self::Output>, P2PSyncError>> {
Expand All @@ -64,13 +64,13 @@ where
})?;

while current_state_diff_len < target_state_diff_len {
let (maybe_state_diff_part, _report_callback) =
tokio::time::timeout(NETWORK_DATA_TIMEOUT, state_diffs_receiver.next())
let (maybe_state_diff_chunk, _report_callback) =
tokio::time::timeout(NETWORK_DATA_TIMEOUT, state_diff_chunks_receiver.next())
.await?
.ok_or(P2PSyncError::ReceiverChannelTerminated {
type_description: Self::TYPE_DESCRIPTION,
})?;
let Some(state_diff_part) = maybe_state_diff_part?.0 else {
let Some(state_diff_chunk) = maybe_state_diff_chunk?.0 else {
if current_state_diff_len == 0 {
return Ok(None);
} else {
Expand All @@ -81,13 +81,13 @@ where
}
};
prev_result_len = current_state_diff_len;
if state_diff_part.is_empty() {
if state_diff_chunk.is_empty() {
return Err(P2PSyncError::EmptyStateDiffPart);
}
// It's cheaper to calculate the length of `state_diff_part` than the length of
// It's cheaper to calculate the length of `state_diff_chunk` than the length of
// `result`.
current_state_diff_len += state_diff_part.len();
unite_state_diffs(&mut result, state_diff_part)?;
current_state_diff_len += state_diff_chunk.len();
unite_state_diffs(&mut result, state_diff_chunk)?;
}

if current_state_diff_len != target_state_diff_len {
Expand All @@ -113,26 +113,41 @@ where
#[latency_histogram("p2p_sync_state_diff_unite_state_diffs_latency_seconds", true)]
fn unite_state_diffs(
state_diff: &mut ThinStateDiff,
other_state_diff: ThinStateDiff,
state_diff_chunk: StateDiffChunk,
) -> Result<(), P2PSyncError> {
unite_state_diffs_field(
&mut state_diff.deployed_contracts,
other_state_diff.deployed_contracts,
)?;
unite_state_diffs_field(&mut state_diff.declared_classes, other_state_diff.declared_classes)?;
unite_state_diffs_field(&mut state_diff.nonces, other_state_diff.nonces)?;
unite_state_diffs_field(&mut state_diff.replaced_classes, other_state_diff.replaced_classes)?;

for (other_contract_address, other_storage_diffs) in other_state_diff.storage_diffs {
match state_diff.storage_diffs.get_mut(&other_contract_address) {
Some(storage_diffs) => unite_state_diffs_field(storage_diffs, other_storage_diffs)?,
None => {
state_diff.storage_diffs.insert(other_contract_address, other_storage_diffs);
match state_diff_chunk {
StateDiffChunk::ContractDiff(contract_diff) => {
let mut chunk_deployed_contract = IndexMap::new();
if let Some(class_hash) = contract_diff.class_hash {
chunk_deployed_contract.insert(contract_diff.contract_address, class_hash);
}
unite_state_diffs_field(&mut state_diff.deployed_contracts, chunk_deployed_contract)?;
let mut chunk_nonce = IndexMap::new();
if let Some(nonce) = contract_diff.nonce {
chunk_nonce.insert(contract_diff.contract_address, nonce);
}
unite_state_diffs_field(&mut state_diff.nonces, chunk_nonce)?;
match state_diff.storage_diffs.get_mut(&contract_diff.contract_address) {
Some(storage_diffs) => {
unite_state_diffs_field(storage_diffs, contract_diff.storage_diffs)?
}
None => {
state_diff
.storage_diffs
.insert(contract_diff.contract_address, contract_diff.storage_diffs);
}
}
}
StateDiffChunk::DeclaredClass(declared_class) => {
let mut chunk_declared_class = IndexMap::new();
chunk_declared_class
.insert(declared_class.class_hash, declared_class.compiled_class_hash);
unite_state_diffs_field(&mut state_diff.declared_classes, chunk_declared_class)?;
}
StateDiffChunk::DeprecatedDeclaredClass(deprecated_declared_class) => {
state_diff.deprecated_declared_classes.push(deprecated_declared_class.class_hash);
}
}

state_diff.deprecated_declared_classes.extend(other_state_diff.deprecated_declared_classes);
Ok(())
}

Expand Down
Loading

0 comments on commit 560f761

Please sign in to comment.