Skip to content

Commit

Permalink
add Trie support to data_provider
Browse files Browse the repository at this point in the history
  • Loading branch information
EdHastingsCasperAssociation committed Feb 11, 2024
1 parent ef2ca4a commit b9ad6d8
Show file tree
Hide file tree
Showing 13 changed files with 277 additions and 184 deletions.
8 changes: 3 additions & 5 deletions execution_engine/src/engine_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use casper_storage::{
balance::BalanceResult,
get_bids::{BidsRequest, BidsResult},
query::{QueryRequest, QueryResult},
DataAccessLayer, EraValidatorsRequest, EraValidatorsResult,
DataAccessLayer, EraValidatorsRequest, EraValidatorsResult, TrieRequest,
},
global_state::{
self,
Expand Down Expand Up @@ -2217,10 +2217,8 @@ where

/// Gets a trie object for given state root hash.
pub fn get_trie_full(&self, trie_key: Digest) -> Result<Option<TrieRaw>, Error> {
match self.state.get_trie_full(&trie_key) {
Ok(ret) => Ok(ret),
Err(err) => Err(err.into()),
}
let req = TrieRequest::new(trie_key, None);
self.state.trie(req).into_legacy().map_err(Error::Storage)
}

/// Puts a trie if no children are missing from the global state; otherwise reports the missing
Expand Down
102 changes: 39 additions & 63 deletions node/src/components/contract_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,18 @@ use casper_storage::data_access_layer::{BidsRequest, BidsResult};
use casper_storage::{
data_access_layer::{
AddressableEntityRequest, BlockStore, DataAccessLayer, ExecutionResultsChecksumRequest,
TrieRequest,
},
global_state::{
error::Error as GlobalStateError,
state::{lmdb::LmdbGlobalState, StateProvider},
transaction_source::lmdb::LmdbEnvironment,
trie_store::lmdb::LmdbTrieStore,
},
};
use casper_types::{
bytesrepr::Bytes, BlockHash, BlockHeaderV2, Chainspec, ChainspecRawBytes, ChainspecRegistry,
Digest, EraId, ProtocolVersion, Timestamp, Transaction, UpgradeConfig,
BlockHash, BlockHeaderV2, Chainspec, ChainspecRawBytes, ChainspecRegistry, Digest, EraId,
ProtocolVersion, Timestamp, Transaction, UpgradeConfig,
};

use crate::{
Expand All @@ -58,7 +60,7 @@ use crate::{
ContractRuntimeAnnouncement, FatalAnnouncement, MetaBlockAnnouncement,
UnexecutedBlockAnnouncement,
},
incoming::{TrieDemand, TrieRequest, TrieRequestIncoming},
incoming::{TrieDemand, TrieRequest as TrieRequestMessage, TrieRequestIncoming},
requests::{ContractRuntimeRequest, NetworkRequest, StorageRequest},
EffectBuilder, EffectExt, Effects,
},
Expand Down Expand Up @@ -90,7 +92,7 @@ pub(crate) enum ContractRuntimeError {
InvalidSerializedId(#[source] bincode::Error),
// It was not possible to get trie with the specified id
#[error("error retrieving trie by id: {0}")]
FailedToRetrieveTrieById(#[source] engine_state::Error),
FailedToRetrieveTrieById(#[source] GlobalStateError),
/// Chunking error.
#[error("failed to chunk the data {0}")]
ChunkingError(#[source] ChunkingError),
Expand Down Expand Up @@ -530,7 +532,7 @@ impl ContractRuntime {
}
};

tracing::info!("rewards successfully computed");
info!("rewards successfully computed");

rewards
} else {
Expand Down Expand Up @@ -685,43 +687,29 @@ impl ContractRuntime {
}

/// Reads the trie (or chunk of a trie) under the given key and index.
pub(crate) fn get_trie(
pub(crate) fn fetch_trie_local(
&self,
serialized_id: &[u8],
) -> Result<FetchResponse<TrieOrChunk, TrieOrChunkId>, ContractRuntimeError> {
trace!(?serialized_id, "get_trie");

let id: TrieOrChunkId = bincode::deserialize(serialized_id)?;
let maybe_trie = Self::do_get_trie(&self.engine_state, &self.metrics, id)?;
Ok(FetchResponse::from_opt(id, maybe_trie))
}

fn do_get_trie(
engine_state: &EngineState<DataAccessLayer<LmdbGlobalState>>,
metrics: &Metrics,
trie_or_chunk_id: TrieOrChunkId,
) -> Result<Option<TrieOrChunk>, ContractRuntimeError> {
let start = Instant::now();
let TrieOrChunkId(chunk_index, trie_key) = trie_or_chunk_id;
let ret = match engine_state.get_trie_full(trie_key)? {
None => Ok(None),
Some(trie_raw) => Ok(Some(TrieOrChunk::new(trie_raw.into(), chunk_index)?)),
let trie_or_chunk_id: TrieOrChunkId = bincode::deserialize(serialized_id)?;
let data_access_layer = Arc::clone(&self.data_access_layer);
let maybe_trie = {
let start = Instant::now();
let TrieOrChunkId(chunk_index, trie_key) = trie_or_chunk_id;
let req = TrieRequest::new(trie_key, Some(chunk_index));
let maybe_raw = data_access_layer
.trie(req)
.into_legacy()
.map_err(ContractRuntimeError::FailedToRetrieveTrieById)?;
let ret = match maybe_raw {
Some(raw) => Some(TrieOrChunk::new(raw.into(), chunk_index)?),
None => None,
};
self.metrics.get_trie.observe(start.elapsed().as_secs_f64());
ret
};
metrics.get_trie.observe(start.elapsed().as_secs_f64());
ret
}

fn get_trie_full(
engine_state: &EngineState<DataAccessLayer<LmdbGlobalState>>,
metrics: &Metrics,
trie_key: Digest,
) -> Result<Option<Bytes>, engine_state::Error> {
let start = Instant::now();
let result = engine_state.get_trie_full(trie_key);
metrics.get_trie.observe(start.elapsed().as_secs_f64());
// Extract the inner Bytes, we don't want this change to ripple through the system right
// now.
result.map(|option| option.map(|trie_raw| trie_raw.into_inner()))
Ok(FetchResponse::from_opt(trie_or_chunk_id, maybe_trie))
}

/// Returns the engine state, for testing only.
Expand All @@ -747,8 +735,8 @@ impl ContractRuntime {
where
REv: From<NetworkRequest<Message>> + Send,
{
let TrieRequest(ref serialized_id) = *message;
let fetch_response = match self.get_trie(serialized_id) {
let TrieRequestMessage(ref serialized_id) = *message;
let fetch_response = match self.fetch_trie_local(serialized_id) {
Ok(fetch_response) => fetch_response,
Err(error) => {
debug!("failed to get trie: {}", error);
Expand All @@ -774,8 +762,8 @@ impl ContractRuntime {
..
}: TrieDemand,
) -> Effects<Event> {
let TrieRequest(ref serialized_id) = *request_msg;
let fetch_response = match self.get_trie(serialized_id) {
let TrieRequestMessage(ref serialized_id) = *request_msg;
let fetch_response = match self.fetch_trie_local(serialized_id) {
Ok(fetch_response) => fetch_response,
Err(error) => {
// Something is wrong in our trie store, but be courteous and still send a reply.
Expand Down Expand Up @@ -881,7 +869,7 @@ impl ContractRuntime {
state_root_hash,
responder,
} => {
trace!(?state_root_hash, "get exection results checksum request");
trace!(?state_root_hash, "get execution results checksum request");
let metrics = Arc::clone(&self.metrics);
let data_access_layer = Arc::clone(&self.data_access_layer);
async move {
Expand Down Expand Up @@ -958,29 +946,17 @@ impl ContractRuntime {
}
// trie related events
ContractRuntimeRequest::GetTrie {
trie_or_chunk_id,
responder,
} => {
trace!(?trie_or_chunk_id, "get_trie request");
let engine_state = Arc::clone(&self.engine_state);
let metrics = Arc::clone(&self.metrics);
async move {
let result = Self::do_get_trie(&engine_state, &metrics, trie_or_chunk_id);
trace!(?result, "get_trie response");
responder.respond(result).await
}
.ignore()
}
ContractRuntimeRequest::GetTrieFull {
trie_key,
request: trie_request,
responder,
} => {
trace!(?trie_key, "get_trie_full request");
let engine_state = Arc::clone(&self.engine_state);
trace!(?trie_request, "trie request");
let metrics = Arc::clone(&self.metrics);
let data_access_layer = Arc::clone(&self.data_access_layer);
async move {
let result = Self::get_trie_full(&engine_state, &metrics, trie_key);
trace!(?result, "get_trie_full response");
let start = Instant::now();
let result = data_access_layer.trie(trie_request);
metrics.get_trie.observe(start.elapsed().as_secs_f64());
trace!(?result, "trie response");
responder.respond(result).await
}
.ignore()
Expand Down Expand Up @@ -1237,7 +1213,7 @@ mod trie_chunking_tests {
fn read_trie(contract_runtime: &ContractRuntime, id: TrieOrChunkId) -> TrieOrChunk {
let serialized_id = bincode::serialize(&id).unwrap();
match contract_runtime
.get_trie(&serialized_id)
.fetch_trie_local(&serialized_id)
.expect("expected a successful read")
{
FetchResponse::Fetched(found) => found,
Expand Down Expand Up @@ -1288,7 +1264,7 @@ mod trie_chunking_tests {
// there should be no chunk with index `count`
let serialized_id = bincode::serialize(&TrieOrChunkId(count, hash)).unwrap();
assert!(matches!(
contract_runtime.get_trie(&serialized_id),
contract_runtime.fetch_trie_local(&serialized_id),
Err(ContractRuntimeError::ChunkingError(
ChunkingError::MerkleConstruction(_)
))
Expand Down
4 changes: 2 additions & 2 deletions node/src/components/contract_runtime/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use serde::Serialize;
use tempfile::TempDir;

use casper_types::{
runtime_args, Chainspec, ChainspecRawBytes, Deploy, EraId, ExecutableDeployItem, PublicKey,
SecretKey, TimeDiff, U512,
bytesrepr::Bytes, runtime_args, Chainspec, ChainspecRawBytes, Deploy, EraId,
ExecutableDeployItem, PublicKey, SecretKey, TimeDiff, U512,
};

use super::*;
Expand Down
33 changes: 29 additions & 4 deletions node/src/components/fetcher/fetcher_impls/trie_or_chunk_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::{collections::HashMap, time::Duration};
use async_trait::async_trait;
use tracing::error;

use casper_storage::data_access_layer::{TrieElement, TrieRequest, TrieResult};

use crate::{
components::fetcher::{metrics::Metrics, Fetcher, ItemFetcher, ItemHandle, StoringState},
effect::{requests::ContractRuntimeRequest, EffectBuilder},
Expand Down Expand Up @@ -31,10 +33,33 @@ impl ItemFetcher<TrieOrChunk> for Fetcher<TrieOrChunk> {
effect_builder: EffectBuilder<REv>,
id: TrieOrChunkId,
) -> Option<TrieOrChunk> {
effect_builder.get_trie(id).await.unwrap_or_else(|error| {
error!(?error, "get_trie_request");
None
})
let TrieOrChunkId(chunk_index, trie_key) = id;
let request = TrieRequest::new(trie_key, Some(chunk_index));
let result = effect_builder.get_trie(request).await;
match result {
TrieResult::ValueNotFound(_) => None,
TrieResult::Failure(err) => {
error!(%err, "failed to get trie element locally");
None
}
TrieResult::Success { element } => match element {
TrieElement::Raw(raw) => match TrieOrChunk::new(raw.into(), 0) {
Ok(voc) => Some(voc),
Err(err) => {
error!(%err, "raw chunking error");
None
}
},
TrieElement::Chunked(raw, chunk_id) => match TrieOrChunk::new(raw.into(), chunk_id)
{
Ok(voc) => Some(voc),
Err(err) => {
error!(%err, "chunking error");
None
}
},
},
}
}

fn put_to_storage<'a, REv>(
Expand Down
30 changes: 20 additions & 10 deletions node/src/components/rpc_server/rpcs/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use tracing::{debug, error, info, warn};

use casper_json_rpc::ReservedErrorCode;
use casper_storage::{
data_access_layer::{get_bids::BidsResult, BalanceResult, EraValidatorsResult, QueryResult},
data_access_layer::{
get_bids::BidsResult, BalanceResult, EraValidatorsResult, QueryResult, TrieElement,
TrieRequest, TrieResult,
},
global_state::trie::merkle_proof::TrieMerkleProof,
};
use casper_types::{
Expand Down Expand Up @@ -1034,22 +1037,29 @@ impl RpcWithParams for GetTrie {
params: Self::RequestParams,
) -> Result<Self::ResponseResult, Error> {
let trie_key = params.trie_key;

match effect_builder.get_trie_full(trie_key).await {
Ok(maybe_trie_bytes) => {
let result = Self::ResponseResult {
api_version,
maybe_trie_bytes,
};
Ok(result)
let request = TrieRequest::new(trie_key, None);
let response = effect_builder.get_trie(request).await;
match response {
TrieResult::ValueNotFound(msg) => {
warn!(?msg, "failed to get trie");
Err(Error::new(ErrorCode::FailedToGetTrie, msg))
}
Err(error) => {
TrieResult::Failure(error) => {
warn!(?error, "failed to get trie");
Err(Error::new(
ErrorCode::FailedToGetTrie,
format!("{:?}", error),
))
}
TrieResult::Success { element } => match element {
TrieElement::Raw(raw) | TrieElement::Chunked(raw, _) => {
let result = Self::ResponseResult {
api_version,
maybe_trie_bytes: Some(raw.into_inner()),
};
Ok(result)
}
},
}
}
}
Expand Down
Loading

0 comments on commit b9ad6d8

Please sign in to comment.