Skip to content

Commit

Permalink
refactor(backing): refactor caching functions and update usages
Browse files Browse the repository at this point in the history
  • Loading branch information
sw10pa committed Nov 6, 2024
1 parent 4d8dd50 commit 22f274f
Showing 1 changed file with 57 additions and 55 deletions.
112 changes: 57 additions & 55 deletions polkadot/node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ use polkadot_node_subsystem::{
RuntimeApiMessage, RuntimeApiRequest, StatementDistributionMessage,
StoreAvailableDataError,
},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
SubsystemError,
};
use polkadot_node_subsystem_util::{
self as util,
Expand Down Expand Up @@ -292,30 +293,13 @@ impl From<&ActiveLeafState> for ProspectiveParachainsMode {
}
}

macro_rules! try_runtime_api {
($x: expr) => {
match $x {
Ok(x) => x,
Err(err) => {
// Only bubble up fatal errors.
error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;

// We can't do candidate validation work if we don't have the
// requisite runtime API data. But these errors should not take
// down the node.
return Ok(None)
},
}
};
}

/// A cache for storing data per-session to reduce repeated
/// runtime API calls and avoid redundant computations.
struct PerSessionCache {
/// Cache for storing validators list, retrieved from the runtime.
validators_cache: LruMap<SessionIndex, Arc<Vec<ValidatorId>>>,
/// Cache for storing node features, retrieved from the runtime.
node_features_cache: LruMap<SessionIndex, Arc<NodeFeatures>>,
node_features_cache: LruMap<SessionIndex, Option<NodeFeatures>>,
/// Cache for storing the minimum backing votes threshold, retrieved from the runtime.
minimum_backing_votes_cache: LruMap<SessionIndex, u32>,
/// Cache for storing validator-to-group mappings, computed from validator groups.
Expand Down Expand Up @@ -347,24 +331,25 @@ impl PerSessionCache {
session_index: SessionIndex,
parent: Hash,
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
) -> Result<Option<Arc<Vec<ValidatorId>>>, Error> {
) -> Result<Arc<Vec<ValidatorId>>, RuntimeApiError> {
// Try to get the validators list from the cache.
if let Some(validators) = self.validators_cache.get(&session_index) {
return Ok(Some(Arc::clone(validators)));
return Ok(Arc::clone(validators));
}

// Fetch the validators list from the runtime since it was not in the cache.
let validators = request_validators(parent, sender)
.await
.await
.map_err(Error::RuntimeApiUnavailable)?;
let validators = try_runtime_api!(validators);
let validators_arc = Arc::new(validators);
let validators: Vec<ValidatorId> =
request_validators(parent, sender).await.await.map_err(|err| {
RuntimeApiError::Execution { runtime_api_name: "Validators", source: Arc::new(err) }
})??;

// Wrap the validators list in an Arc to avoid a deep copy when storing it in the cache.
let validators = Arc::new(validators);

// Cache the fetched validators list for future use.
self.validators_cache.insert(session_index, Arc::clone(&validators_arc));
self.validators_cache.insert(session_index, Arc::clone(&validators));

Ok(Some(validators_arc))
Ok(validators)
}

/// Gets the node features from the cache or fetches it from the runtime if not present.
Expand All @@ -373,22 +358,20 @@ impl PerSessionCache {
session_index: SessionIndex,
parent: Hash,
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
) -> Result<Option<Arc<NodeFeatures>>, Error> {
) -> Result<Option<NodeFeatures>, Error> {
// Try to get the node features from the cache.
if let Some(node_features) = self.node_features_cache.get(&session_index) {
return Ok(Some(Arc::clone(node_features)));
return Ok(node_features.clone());
}

// Fetch the node features from the runtime since it was not in the cache.
let node_features = request_node_features(parent, session_index, sender)
.await?
.unwrap_or(NodeFeatures::EMPTY);
let node_features_arc = Arc::new(node_features);
let node_features: Option<NodeFeatures> =
request_node_features(parent, session_index, sender).await?;

// Cache the node features for future use.
self.node_features_cache.insert(session_index, Arc::clone(&node_features_arc));
// Cache the fetched node features for future use.
self.node_features_cache.insert(session_index, node_features.clone());

Ok(Some(node_features_arc))
Ok(node_features)
}

/// Gets the minimum backing votes threshold from the
Expand All @@ -398,20 +381,24 @@ impl PerSessionCache {
session_index: SessionIndex,
parent: Hash,
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
) -> Result<Option<u32>, Error> {
) -> Result<u32, RuntimeApiError> {
// Try to get the value from the cache.
if let Some(minimum_backing_votes) = self.minimum_backing_votes_cache.get(&session_index) {
return Ok(Some(*minimum_backing_votes));
return Ok(*minimum_backing_votes);
}

// Fetch the value from the runtime since it was not in the cache.
let minimum_backing_votes = request_min_backing_votes(parent, session_index, sender).await;
let minimum_backing_votes = try_runtime_api!(minimum_backing_votes);
let minimum_backing_votes = request_min_backing_votes(parent, session_index, sender)
.await
.map_err(|err| RuntimeApiError::Execution {
runtime_api_name: "MinimumBackingVotes",
source: Arc::new(err),
})?;

// Cache the fetched value for future use.
self.minimum_backing_votes_cache.insert(session_index, minimum_backing_votes);

Ok(Some(minimum_backing_votes))
Ok(minimum_backing_votes)
}

/// Gets or computes the validator-to-group mapping for a session.
Expand Down Expand Up @@ -1145,6 +1132,23 @@ async fn handle_active_leaves_update<Context>(
Ok(())
}

macro_rules! try_runtime_api {
($x: expr) => {
match $x {
Ok(x) => x,
Err(err) => {
// Only bubble up fatal errors.
error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;

// We can't do candidate validation work if we don't have the
// requisite runtime API data. But these errors should not take
// down the node.
return Ok(None)
},
}
};
}

fn core_index_from_statement(
validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
group_rotation_info: &GroupRotationInfo,
Expand Down Expand Up @@ -1232,19 +1236,15 @@ async fn construct_per_relay_parent_state<Context>(

let session_index = try_runtime_api!(session_index);

let validators_arc = per_session_cache
let validators = per_session_cache
.get_or_fetch_validators(session_index, parent, ctx.sender())
.await?
.unwrap();
let validators = validators_arc.as_ref().clone();
.await;
let validators = try_runtime_api!(validators);

let node_features_arc = per_session_cache
let inject_core_index = per_session_cache
.get_or_fetch_node_features(session_index, parent, ctx.sender())
.await?
.unwrap();

let inject_core_index = node_features_arc
.as_ref()
.unwrap_or(NodeFeatures::EMPTY)
.get(FeatureIndex::ElasticScalingMVP as usize)
.map(|b| *b)
.unwrap_or(false);
Expand All @@ -1253,10 +1253,11 @@ async fn construct_per_relay_parent_state<Context>(

let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
let cores = try_runtime_api!(cores);

let minimum_backing_votes = per_session_cache
.get_or_fetch_minimum_backing_votes(session_index, parent, ctx.sender())
.await?
.unwrap();
.await;
let minimum_backing_votes = try_runtime_api!(minimum_backing_votes);

// TODO: https://github.com/paritytech/polkadot-sdk/issues/1940
// Once runtime ver `DISABLED_VALIDATORS_RUNTIME_REQUIREMENT` is released remove this call to
Expand Down Expand Up @@ -1335,7 +1336,8 @@ async fn construct_per_relay_parent_state<Context>(
&validator_groups,
);

let table_context = TableContext { validator, groups, validators, disabled_validators };
let table_context =
TableContext { validator, groups, validators: validators.to_vec(), disabled_validators };
let table_config = TableConfig {
allow_multiple_seconded: match mode {
ProspectiveParachainsMode::Enabled { .. } => true,
Expand Down

0 comments on commit 22f274f

Please sign in to comment.