Skip to content

Commit

Permalink
add PutTrie support to data_provider
Browse files Browse the repository at this point in the history
  • Loading branch information
EdHastingsCasperAssociation committed Feb 12, 2024
1 parent b9ad6d8 commit 9c65f79
Show file tree
Hide file tree
Showing 14 changed files with 329 additions and 151 deletions.
6 changes: 4 additions & 2 deletions execution_engine/src/engine_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use casper_storage::{
balance::BalanceResult,
get_bids::{BidsRequest, BidsResult},
query::{QueryRequest, QueryResult},
DataAccessLayer, EraValidatorsRequest, EraValidatorsResult, TrieRequest,
DataAccessLayer, EraValidatorsRequest, EraValidatorsResult, PutTrieRequest, TrieRequest,
},
global_state::{
self,
Expand Down Expand Up @@ -2228,8 +2228,10 @@ where
Ok(ret) => ret,
Err(err) => return Err(err.into()),
};
let raw = TrieRaw::new(trie_bytes.into());
let req = PutTrieRequest::new(raw);
if missing_children.is_empty() {
Ok(self.state.put_trie(trie_bytes)?)
Ok(self.state.put_trie(req).as_legacy()?)
} else {
Err(Error::MissingTrieNodeChildren(missing_children))
}
Expand Down
92 changes: 47 additions & 45 deletions node/src/components/block_synchronizer/global_state_synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ use serde::Serialize;
use thiserror::Error;
use tracing::{debug, error, warn};

use casper_execution_engine::engine_state;
use casper_storage::global_state::trie::TrieRaw;
use casper_storage::{
data_access_layer::{PutTrieRequest, PutTrieResult},
global_state::{error::Error as GlobalStateError, trie::TrieRaw},
};
use casper_types::{BlockHash, Digest, DisplayIter, Timestamp};

use super::{TrieAccumulator, TrieAccumulatorError, TrieAccumulatorEvent, TrieAccumulatorResponse};
Expand Down Expand Up @@ -66,8 +68,8 @@ impl fmt::Display for TrieHash {
pub(crate) enum Error {
#[error("trie accumulator encountered an error while fetching a trie; unreliable peers {}", DisplayIter::new(.0))]
TrieAccumulator(Vec<NodeId>),
#[error("ContractRuntime failed to put a trie into global state: {0}; unreliable peers {}", DisplayIter::new(.1))]
PutTrie(engine_state::Error, Vec<NodeId>),
#[error("Failed to persist trie element in global state: {0}; unreliable peers {}", DisplayIter::new(.1))]
PutTrie(GlobalStateError, Vec<NodeId>),
#[error("no peers available to ask for a trie")]
NoPeersAvailable,
#[error("received request for {hash_requested} while syncing another root hash: {hash_being_synced}")]
Expand Down Expand Up @@ -110,10 +112,10 @@ pub(crate) enum Event {
trie_accumulator_result: Result<TrieAccumulatorResponse, TrieAccumulatorError>,
},
PutTrieResult {
trie_hash: TrieHash,
trie_raw: TrieRaw,
#[serde(skip)]
put_trie_result: Result<TrieHash, engine_state::Error>,
raw: TrieRaw,
#[serde(skip)]
result: PutTrieResult,
},
#[from]
TrieAccumulatorEvent(TrieAccumulatorEvent),
Expand Down Expand Up @@ -420,12 +422,12 @@ impl GlobalStateSynchronizer {

self.touch();

let request = PutTrieRequest::new((*trie_raw).clone());
effect_builder
.put_trie_if_all_children_present((*trie_raw).clone())
.put_trie_if_all_children_present(request)
.event(move |put_trie_result| Event::PutTrieResult {
trie_hash,
trie_raw: *trie_raw,
put_trie_result: put_trie_result.map(TrieHash),
raw: *trie_raw,
result: put_trie_result,
})
}

Expand Down Expand Up @@ -463,9 +465,8 @@ impl GlobalStateSynchronizer {

fn handle_put_trie_result<REv>(
&mut self,
trie_hash: TrieHash,
trie_raw: TrieRaw,
put_trie_result: Result<TrieHash, engine_state::Error>,
requested_hash: Digest,
put_trie_result: PutTrieResult,
effect_builder: EffectBuilder<REv>,
) -> Effects<Event>
where
Expand All @@ -474,30 +475,32 @@ impl GlobalStateSynchronizer {
let mut effects = Effects::new();

match put_trie_result {
Ok(digest) if digest == trie_hash => {
effects.extend(self.handle_trie_written(effect_builder, digest))
PutTrieResult::Success { hash } if hash == requested_hash => {
effects.extend(self.handle_trie_written(effect_builder, TrieHash(hash)))
}
Ok(digest) => {
PutTrieResult::Success { hash } => {
error!(
%digest,
%trie_hash,
%hash,
%requested_hash,
"trie was stored under a different hash than was used to request it - \
it's a bug"
);
}
Err(engine_state::Error::MissingTrieNodeChildren(missing_children)) => {
effects.extend(self.handle_trie_missing_children(
effect_builder,
trie_hash,
trie_raw,
missing_children.into_iter().map(TrieHash).collect(),
))
}
Err(error) => {
warn!(%trie_hash, %error, "couldn't put trie into global state");
PutTrieResult::Failure(GlobalStateError::MissingTrieNodeChildren(
trie_hash,
trie_raw,
missing_children,
)) => effects.extend(self.handle_trie_missing_children(
effect_builder,
TrieHash(trie_hash),
trie_raw,
missing_children.into_iter().map(TrieHash).collect(),
)),
PutTrieResult::Failure(gse) => {
warn!(%requested_hash, %gse, "couldn't put trie into global state");
if let Some(request_state) = &mut self.request_state {
let unreliable_peers = request_state.unreliable_peers.iter().copied().collect();
effects.extend(self.cancel_request(Error::PutTrie(error, unreliable_peers)));
effects.extend(self.cancel_request(Error::PutTrie(gse, unreliable_peers)));
}
}
}
Expand Down Expand Up @@ -539,14 +542,14 @@ impl GlobalStateSynchronizer {

let mut effects: Effects<Event> = ready_tries
.into_iter()
.flat_map(|(trie_hash, trie_awaiting)| {
.flat_map(|(_, trie_awaiting)| {
let trie_raw = trie_awaiting.into_trie_raw();
let request = PutTrieRequest::new(trie_raw.clone());
effect_builder
.put_trie_if_all_children_present(trie_raw.clone())
.event(move |put_trie_result| Event::PutTrieResult {
trie_hash,
trie_raw,
put_trie_result: put_trie_result.map(TrieHash),
.put_trie_if_all_children_present(request)
.event(move |result| Event::PutTrieResult {
raw: trie_raw,
result,
})
})
.collect();
Expand Down Expand Up @@ -575,12 +578,12 @@ impl GlobalStateSynchronizer {
// simulate fetching having been completed in order to start fetching any children that
// might still be missing
let trie_raw = trie_awaiting.trie_raw.clone();
let request = PutTrieRequest::new(trie_raw.clone());
effect_builder
.put_trie_if_all_children_present(trie_raw.clone())
.event(move |put_trie_result| Event::PutTrieResult {
trie_hash,
trie_raw,
put_trie_result: put_trie_result.map(TrieHash),
.put_trie_if_all_children_present(request)
.event(move |result| Event::PutTrieResult {
raw: trie_raw,
result,
})
} else {
// otherwise, add to the queue
Expand Down Expand Up @@ -645,10 +648,9 @@ where
trie_accumulator_result,
} => self.handle_fetched_trie(trie_hash, trie_accumulator_result, effect_builder),
Event::PutTrieResult {
trie_hash,
trie_raw,
put_trie_result,
} => self.handle_put_trie_result(trie_hash, trie_raw, put_trie_result, effect_builder),
raw: trie_raw,
result: put_trie_result,
} => self.handle_put_trie_result(trie_raw.hash(), put_trie_result, effect_builder),
Event::TrieAccumulatorEvent(event) => reactor::wrap_effects(
Event::TrieAccumulatorEvent,
self.trie_accumulator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::time::Duration;
use futures::channel::oneshot;
use rand::Rng;

use casper_storage::global_state::error::Error as GlobalStateError;
use casper_types::{bytesrepr::Bytes, testing::TestRng, TestBlockBuilder};

use super::*;
Expand Down Expand Up @@ -66,10 +67,10 @@ impl MockReactor {
let ((_ancestor, reactor_event), _) = self.scheduler.pop().await;
match reactor_event {
ReactorEvent::ContractRuntimeRequest(ContractRuntimeRequest::PutTrie {
trie_bytes,
request,
responder: _,
}) => {
assert_eq!(trie_bytes, *trie);
assert_eq!(request.raw(), trie);
}
_ => {
unreachable!();
Expand Down Expand Up @@ -189,7 +190,7 @@ async fn sync_global_state_request_starts_maximum_trie_fetches() {
tokio::time::sleep(Duration::from_millis(2)).await;
// simulate the fetch returning a trie
let effects = global_state_synchronizer.handle_fetched_trie(
TrieHash(trie_hash),
trie_hash.into(),
Ok(TrieAccumulatorResponse::new(trie_raw.clone(), vec![])),
reactor.effect_builder(),
);
Expand All @@ -204,17 +205,23 @@ async fn sync_global_state_request_starts_maximum_trie_fetches() {

// sleep a bit so that the next progress timestamp is different
tokio::time::sleep(Duration::from_millis(2)).await;

// root node would have some children that we haven't yet downloaded
let missing_children = (0u8..255)
.into_iter()
// TODO: generate random hashes when `rng.gen` works
.map(|i| Digest::hash([i; 32]))
.collect();

let trie_hash = trie_raw.hash();

// simulate synchronizer processing the fetched trie
let effects = global_state_synchronizer.handle_put_trie_result(
TrieHash(trie_hash),
trie_raw,
// root node would have some children that we haven't yet downloaded
Err(engine_state::Error::MissingTrieNodeChildren(
(0u8..255)
.into_iter()
// TODO: generate random hashes when `rng.gen` works
.map(|i| Digest::hash([i; 32]))
.collect(),
trie_hash,
PutTrieResult::Failure(GlobalStateError::MissingTrieNodeChildren(
trie_hash,
trie_raw,
missing_children,
)),
reactor.effect_builder(),
);
Expand Down Expand Up @@ -424,9 +431,8 @@ async fn trie_store_error_cancels_request() {
// Assuming we received the trie from the accumulator, check the behavior when an error
// is returned when trying to put the trie to the store.
let mut effects = global_state_synchronizer.handle_put_trie_result(
Digest::hash(trie.inner()).into(),
trie,
Err(engine_state::Error::RootNotFound(state_root_hash)),
trie.hash(),
PutTrieResult::Failure(GlobalStateError::RootNotFound),
reactor.effect_builder(),
);
assert_eq!(effects.len(), 1);
Expand All @@ -449,6 +455,7 @@ async fn missing_trie_node_children_triggers_fetch() {
&mut rng,
Responder::without_shutdown(oneshot::channel().0),
);
let trie_hash = Digest::hash(request_trie.clone().inner());
let state_root_hash = request.state_root_hash;

let mut effects = global_state_synchronizer.handle_request(request, reactor.effect_builder());
Expand Down Expand Up @@ -507,9 +514,10 @@ async fn missing_trie_node_children_triggers_fetch() {
.collect();

let effects = global_state_synchronizer.handle_put_trie_result(
Digest::hash(request_trie.inner()).into(),
request_trie.clone(),
Err(engine_state::Error::MissingTrieNodeChildren(
trie_hash,
PutTrieResult::Failure(GlobalStateError::MissingTrieNodeChildren(
trie_hash,
request_trie.clone(),
missing_trie_nodes_hashes.clone(),
)),
reactor.effect_builder(),
Expand Down Expand Up @@ -579,11 +587,13 @@ async fn missing_trie_node_children_triggers_fetch() {
.expect_put_trie_request(&missing_tries[num_missing_trie_nodes - 1])
.await;

let trie_hash =
Digest::hash_into_chunks_if_necessary(missing_tries[num_missing_trie_nodes - 1].inner());

// Handle put trie to store for the missing child
let mut effects = global_state_synchronizer.handle_put_trie_result(
trie_hash.into(),
missing_tries[num_missing_trie_nodes - 1].clone(),
Ok(trie_hash.into()),
trie_hash,
PutTrieResult::Success { hash: trie_hash },
reactor.effect_builder(),
);

Expand Down Expand Up @@ -689,13 +699,11 @@ async fn stored_trie_finalizes_request() {

// Generate a successful trie store
let mut effects = global_state_synchronizer.handle_put_trie_result(
trie_hash.into(),
trie,
Ok(trie_hash.into()),
trie_hash,
PutTrieResult::Success { hash: trie_hash },
reactor.effect_builder(),
);
// Check that request was successfully serviced and the global synchronizer is finished with
// it.
// Assert request was successful and global synchronizer is finished.
assert_eq!(effects.len(), 1);
assert_eq!(global_state_synchronizer.tries_awaiting_children.len(), 0);
assert!(global_state_synchronizer.request_state.is_none());
Expand Down
29 changes: 12 additions & 17 deletions node/src/components/contract_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use casper_storage::data_access_layer::{BidsRequest, BidsResult};
use casper_storage::{
data_access_layer::{
AddressableEntityRequest, BlockStore, DataAccessLayer, ExecutionResultsChecksumRequest,
TrieRequest,
FlushRequest, FlushResult, TrieRequest,
},
global_state::{
error::Error as GlobalStateError,
Expand Down Expand Up @@ -962,28 +962,23 @@ impl ContractRuntime {
.ignore()
}
ContractRuntimeRequest::PutTrie {
trie_bytes,
request: put_trie_request,
responder,
} => {
trace!(?trie_bytes, "put_trie request");
let engine_state = Arc::clone(&self.engine_state);
trace!(?put_trie_request, "put trie request");
let metrics = Arc::clone(&self.metrics);
let data_access_layer = Arc::clone(&self.data_access_layer);
async move {
let start = Instant::now();
let result = engine_state.put_trie_if_all_children_present(trie_bytes.inner());
// PERF: this *could* be called only periodically.
if let Err(lmdb_error) = engine_state.flush_environment() {
fatal!(
effect_builder,
"error flushing lmdb environment {:?}",
lmdb_error
)
.await;
} else {
metrics.put_trie.observe(start.elapsed().as_secs_f64());
trace!(?result, "put_trie response");
responder.respond(result).await
let result = data_access_layer.put_trie(put_trie_request);
let flush_req = FlushRequest::new();
// PERF: consider flushing periodically.
if let FlushResult::Failure(gse) = data_access_layer.flush(flush_req) {
fatal!(effect_builder, "error flushing data environment {:?}", gse).await;
}
metrics.put_trie.observe(start.elapsed().as_secs_f64());
trace!(?result, "put trie response");
responder.respond(result).await
}
.ignore()
}
Expand Down
Loading

0 comments on commit 9c65f79

Please sign in to comment.