Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Indexer caching for exchange_rates and validators_apys to use instance-level caches, fixing cargo test failures #3774

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/iota-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ The crate provides following tests currently:
# run tests requiring only postgres integration
cargo test --features pg_integration -- --test-threads 1
# run rpc tests with shared runtime
cargo test --features shared_test_runtime -- --test-threads 1
cargo test --features shared_test_runtime
```

For a better testing experience is possible to use [nextest](https://nexte.st/)
Expand Down
77 changes: 57 additions & 20 deletions crates/iota-indexer/src/apis/governance_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::collections::BTreeMap;
use std::{collections::BTreeMap, sync::Arc};

use async_trait::async_trait;
use cached::{SizedCache, proc_macro::cached};
use cached::{Cached, SizedCache};
use diesel::r2d2::R2D2Connection;
use iota_json_rpc::{IotaRpcModule, governance_api::ValidatorExchangeRates};
use iota_json_rpc_api::GovernanceReadApiServer;
Expand All @@ -23,6 +23,7 @@ use iota_types::{
timelock::timelocked_staked_iota::TimelockedStakedIota,
};
use jsonrpsee::{RpcModule, core::RpcResult};
use tokio::sync::Mutex;

use crate::{errors::IndexerError, indexer_reader::IndexerReader};

Expand All @@ -32,19 +33,27 @@ const MAX_QUERY_STAKED_OBJECTS: usize = 1000;
#[derive(Clone)]
pub struct GovernanceReadApi<T: R2D2Connection + 'static> {
inner: IndexerReader<T>,
exchange_rates_cache: Arc<Mutex<SizedCache<EpochId, Vec<ValidatorExchangeRates>>>>,
validators_apys_cache: Arc<Mutex<SizedCache<EpochId, BTreeMap<IotaAddress, f64>>>>,
}

impl<T: R2D2Connection + 'static> GovernanceReadApi<T> {
pub fn new(inner: IndexerReader<T>) -> Self {
Self { inner }
Self {
inner,
exchange_rates_cache: Arc::new(Mutex::new(SizedCache::with_size(1))),
validators_apys_cache: Arc::new(Mutex::new(SizedCache::with_size(1))),
}
}

/// Get a validator's APY by its address
pub async fn get_validator_apy(
&self,
address: &IotaAddress,
) -> Result<Option<f64>, IndexerError> {
let apys = validators_apys_map(self.get_validators_apy().await?);
let apys = self
.validators_apys_map(self.get_validators_apy().await?)
.await;
Ok(apys.get(address).copied())
}

Expand Down Expand Up @@ -261,6 +270,28 @@ impl<T: R2D2Connection + 'static> GovernanceReadApi<T> {
}
Ok(delegated_stakes)
}

/// Cache a map representing the validators' APYs for this epoch
async fn validators_apys_map(&self, apys: ValidatorApys) -> BTreeMap<IotaAddress, f64> {
// check if the apys are already in the cache
if let Some(cached_apys) = self
.validators_apys_cache
.lock()
.await
.cache_get(&apys.epoch)
{
return cached_apys.clone();
}

let ret = BTreeMap::from_iter(apys.apys.iter().map(|x| (x.address, x.apy)));
// insert the apys into the cache
self.validators_apys_cache
.lock()
.await
.cache_set(apys.epoch, ret.clone());

ret
}
}

fn stake_status(
Expand Down Expand Up @@ -292,15 +323,31 @@ fn stake_status(
/// Cached exchange rates for validators for the given epoch, the cache size is
/// 1, it will be cleared when the epoch changes. rates are in descending order
tomxey marked this conversation as resolved.
Show resolved Hide resolved
/// by epoch.
#[cached(
type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
create = "{ SizedCache::with_size(1) }",
convert = "{ system_state_summary.epoch }",
result = true
)]
pub async fn exchange_rates(
state: &GovernanceReadApi<impl R2D2Connection>,
system_state_summary: &IotaSystemStateSummary,
) -> Result<Vec<ValidatorExchangeRates>, IndexerError> {
let epoch = system_state_summary.epoch;

let mut cache = state.exchange_rates_cache.lock().await;

// Check if the exchange rates for the current epoch are cached
if let Some(cached_rates) = cache.cache_get(&epoch) {
return Ok(cached_rates.clone());
}

// Cache miss: compute exchange rates
let exchange_rates = compute_exchange_rates(state, system_state_summary).await?;

// Store in cache
cache.cache_set(epoch, exchange_rates.clone());

Ok(exchange_rates)
}

pub async fn compute_exchange_rates(
state: &GovernanceReadApi<impl R2D2Connection>,
system_state_summary: &IotaSystemStateSummary,
) -> Result<Vec<ValidatorExchangeRates>, IndexerError> {
// Get validator rate tables
let mut tables = vec![];
Expand Down Expand Up @@ -384,16 +431,6 @@ pub async fn exchange_rates(
Ok(exchange_rates)
}

/// Cache a map representing the validators' APYs for this epoch
#[cached(
type = "SizedCache<EpochId, BTreeMap<IotaAddress, f64>>",
create = "{ SizedCache::with_size(1) }",
convert = " {apys.epoch} "
)]
fn validators_apys_map(apys: ValidatorApys) -> BTreeMap<IotaAddress, f64> {
BTreeMap::from_iter(apys.apys.iter().map(|x| (x.address, x.apy)))
}

#[async_trait]
impl<T: R2D2Connection + 'static> GovernanceReadApiServer for GovernanceReadApi<T> {
async fn get_stakes_by_ids(
Expand Down
15 changes: 12 additions & 3 deletions crates/iota-indexer/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use iota_metrics::init_metrics;
use iota_types::{
base_types::{ObjectID, SequenceNumber},
digests::TransactionDigest,
object::Object,
};
use jsonrpsee::{
http_client::{HttpClient, HttpClientBuilder},
Expand Down Expand Up @@ -57,9 +58,12 @@ impl ApiTestSetup {
GLOBAL_API_TEST_SETUP.get_or_init(|| {
let runtime = tokio::runtime::Runtime::new().unwrap();

let (cluster, store, client) = runtime.block_on(
start_test_cluster_with_read_write_indexer(None, Some("shared_test_indexer_db")),
);
let (cluster, store, client) =
runtime.block_on(start_test_cluster_with_read_write_indexer(
None,
Some("shared_test_indexer_db"),
None,
));

Self {
runtime,
Expand Down Expand Up @@ -115,6 +119,7 @@ impl SimulacrumTestSetup {
pub async fn start_test_cluster_with_read_write_indexer(
stop_cluster_after_checkpoint_seq: Option<u64>,
database_name: Option<&str>,
objects: Option<Vec<Object>>,
tomxey marked this conversation as resolved.
Show resolved Hide resolved
) -> (TestCluster, PgIndexerStore<PgConnection>, HttpClient) {
let temp = tempdir().unwrap().into_path();
let mut builder = TestClusterBuilder::new().with_data_ingestion_dir(temp.clone());
Expand All @@ -126,6 +131,10 @@ pub async fn start_test_cluster_with_read_write_indexer(
)));
};

if let Some(objects) = objects {
builder = builder.with_objects(objects);
}

let cluster = builder.build().await;

// start indexer in write mode
Expand Down
1 change: 0 additions & 1 deletion crates/iota-indexer/tests/rpc-tests/extended_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use iota_types::{
storage::ReadStore,
};
use simulacrum::Simulacrum;
use tempfile::tempdir;
use test_cluster::TestCluster;

use crate::common::{ApiTestSetup, SimulacrumTestSetup, indexer_wait_for_checkpoint};
Expand Down
Loading
Loading