From 527fab102e46355a1ec2c65c399bc63b75958944 Mon Sep 17 00:00:00 2001 From: Samuel Rufinatscha Date: Thu, 31 Oct 2024 09:15:36 +0100 Subject: [PATCH] refactor: Remove global `exchange_rate` and `validators_apys_` caches in order to work with `cargo test` (#3774) --- crates/iota-indexer/README.md | 2 +- .../iota-indexer/src/apis/governance_api.rs | 77 +++++++++--- crates/iota-indexer/tests/common/mod.rs | 15 ++- .../tests/rpc-tests/governance_api.rs | 115 +++++++++--------- 4 files changed, 125 insertions(+), 84 deletions(-) diff --git a/crates/iota-indexer/README.md b/crates/iota-indexer/README.md index ce19b89f65d..1973532ee65 100644 --- a/crates/iota-indexer/README.md +++ b/crates/iota-indexer/README.md @@ -104,7 +104,7 @@ The crate provides following tests currently: # run tests requiring only postgres integration cargo test --features pg_integration -- --test-threads 1 # run rpc tests with shared runtime -cargo test --features shared_test_runtime -- --test-threads 1 +cargo test --features shared_test_runtime ``` For a better testing experience is possible to use [nextest](https://nexte.st/) diff --git a/crates/iota-indexer/src/apis/governance_api.rs b/crates/iota-indexer/src/apis/governance_api.rs index d697288cfdb..57e42103b26 100644 --- a/crates/iota-indexer/src/apis/governance_api.rs +++ b/crates/iota-indexer/src/apis/governance_api.rs @@ -2,10 +2,10 @@ // Modifications Copyright (c) 2024 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use std::collections::BTreeMap; +use std::{collections::BTreeMap, sync::Arc}; use async_trait::async_trait; -use cached::{SizedCache, proc_macro::cached}; +use cached::{Cached, SizedCache}; use diesel::r2d2::R2D2Connection; use iota_json_rpc::{IotaRpcModule, governance_api::ValidatorExchangeRates}; use iota_json_rpc_api::GovernanceReadApiServer; @@ -23,6 +23,7 @@ use iota_types::{ timelock::timelocked_staked_iota::TimelockedStakedIota, }; use jsonrpsee::{RpcModule, core::RpcResult}; +use tokio::sync::Mutex; use crate::{errors::IndexerError, indexer_reader::IndexerReader}; @@ -32,11 +33,17 @@ const MAX_QUERY_STAKED_OBJECTS: usize = 1000; #[derive(Clone)] pub struct GovernanceReadApi { inner: IndexerReader, + exchange_rates_cache: Arc>>>, + validators_apys_cache: Arc>>>, } impl GovernanceReadApi { pub fn new(inner: IndexerReader) -> Self { - Self { inner } + Self { + inner, + exchange_rates_cache: Arc::new(Mutex::new(SizedCache::with_size(1))), + validators_apys_cache: Arc::new(Mutex::new(SizedCache::with_size(1))), + } } /// Get a validator's APY by its address @@ -44,7 +51,9 @@ impl GovernanceReadApi { &self, address: &IotaAddress, ) -> Result, IndexerError> { - let apys = validators_apys_map(self.get_validators_apy().await?); + let apys = self + .validators_apys_map(self.get_validators_apy().await?) + .await; Ok(apys.get(address).copied()) } @@ -261,6 +270,28 @@ impl GovernanceReadApi { } Ok(delegated_stakes) } + + /// Cache a map representing the validators' APYs for this epoch + async fn validators_apys_map(&self, apys: ValidatorApys) -> BTreeMap { + // check if the apys are already in the cache + if let Some(cached_apys) = self + .validators_apys_cache + .lock() + .await + .cache_get(&apys.epoch) + { + return cached_apys.clone(); + } + + let ret = BTreeMap::from_iter(apys.apys.iter().map(|x| (x.address, x.apy))); + // insert the apys into the cache + self.validators_apys_cache + .lock() + .await + .cache_set(apys.epoch, ret.clone()); + + ret + } } fn stake_status( @@ -292,15 +323,31 @@ fn stake_status( /// Cached exchange rates for validators for the given epoch, the cache size is /// 1, it will be cleared when the epoch changes. rates are in descending order /// by epoch. -#[cached( - type = "SizedCache>", - create = "{ SizedCache::with_size(1) }", - convert = "{ system_state_summary.epoch }", - result = true -)] pub async fn exchange_rates( state: &GovernanceReadApi, system_state_summary: &IotaSystemStateSummary, +) -> Result, IndexerError> { + let epoch = system_state_summary.epoch; + + let mut cache = state.exchange_rates_cache.lock().await; + + // Check if the exchange rates for the current epoch are cached + if let Some(cached_rates) = cache.cache_get(&epoch) { + return Ok(cached_rates.clone()); + } + + // Cache miss: compute exchange rates + let exchange_rates = compute_exchange_rates(state, system_state_summary).await?; + + // Store in cache + cache.cache_set(epoch, exchange_rates.clone()); + + Ok(exchange_rates) +} + +pub async fn compute_exchange_rates( + state: &GovernanceReadApi, + system_state_summary: &IotaSystemStateSummary, ) -> Result, IndexerError> { // Get validator rate tables let mut tables = vec![]; @@ -384,16 +431,6 @@ pub async fn exchange_rates( Ok(exchange_rates) } -/// Cache a map representing the validators' APYs for this epoch -#[cached( - type = "SizedCache>", - create = "{ SizedCache::with_size(1) }", - convert = " {apys.epoch} " -)] -fn validators_apys_map(apys: ValidatorApys) -> BTreeMap { - BTreeMap::from_iter(apys.apys.iter().map(|x| (x.address, x.apy))) -} - #[async_trait] impl GovernanceReadApiServer for GovernanceReadApi { async fn get_stakes_by_ids( diff --git a/crates/iota-indexer/tests/common/mod.rs b/crates/iota-indexer/tests/common/mod.rs index 58c5bb84513..8f97d1654aa 100644 --- a/crates/iota-indexer/tests/common/mod.rs +++ b/crates/iota-indexer/tests/common/mod.rs @@ -26,6 +26,7 @@ use iota_metrics::init_metrics; use iota_types::{ base_types::{ObjectID, SequenceNumber}, digests::TransactionDigest, + object::Object, }; use jsonrpsee::{ http_client::{HttpClient, HttpClientBuilder}, @@ -57,9 +58,12 @@ impl ApiTestSetup { GLOBAL_API_TEST_SETUP.get_or_init(|| { let runtime = tokio::runtime::Runtime::new().unwrap(); - let (cluster, store, client) = runtime.block_on( - start_test_cluster_with_read_write_indexer(None, Some("shared_test_indexer_db")), - ); + let (cluster, store, client) = + runtime.block_on(start_test_cluster_with_read_write_indexer( + None, + Some("shared_test_indexer_db"), + None, + )); Self { runtime, @@ -115,6 +119,7 @@ impl SimulacrumTestSetup { pub async fn start_test_cluster_with_read_write_indexer( stop_cluster_after_checkpoint_seq: Option, database_name: Option<&str>, + objects: Option>, ) -> (TestCluster, PgIndexerStore, HttpClient) { let temp = tempdir().unwrap().into_path(); let mut builder = TestClusterBuilder::new().with_data_ingestion_dir(temp.clone()); @@ -126,6 +131,10 @@ pub async fn start_test_cluster_with_read_write_indexer( ))); }; + if let Some(objects) = objects { + builder = builder.with_objects(objects); + } + let cluster = builder.build().await; // start indexer in write mode diff --git a/crates/iota-indexer/tests/rpc-tests/governance_api.rs b/crates/iota-indexer/tests/rpc-tests/governance_api.rs index c59c6637e49..d2ff07927e3 100644 --- a/crates/iota-indexer/tests/rpc-tests/governance_api.rs +++ b/crates/iota-indexer/tests/rpc-tests/governance_api.rs @@ -2,7 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use iota_json_rpc_api::{GovernanceReadApiClient, TransactionBuilderClient}; -use iota_json_rpc_types::{DelegatedStake, StakeStatus, TransactionBlockBytes}; +use iota_json_rpc_types::{ + DelegatedStake, DelegatedTimelockedStake, StakeStatus, TransactionBlockBytes, +}; use iota_test_transaction_builder::TestTransactionBuilder; use iota_types::{ IOTA_FRAMEWORK_ADDRESS, IOTA_SYSTEM_ADDRESS, @@ -19,18 +21,17 @@ use move_core_types::{identifier::Identifier, language_storage::TypeTag}; use crate::common::{ ApiTestSetup, indexer_wait_for_checkpoint, indexer_wait_for_latest_checkpoint, indexer_wait_for_object, indexer_wait_for_transaction, + start_test_cluster_with_read_write_indexer, }; #[test] fn test_staking() { - let ApiTestSetup { - runtime, - store, - client, - cluster, - } = ApiTestSetup::get_or_init(); + let ApiTestSetup { runtime, .. } = ApiTestSetup::get_or_init(); runtime.block_on(async move { + let (cluster, store, client) = + &start_test_cluster_with_read_write_indexer(None, Some("test_staking"), None).await; + indexer_wait_for_checkpoint(store, 1).await; let (sender, keypair): (_, AccountKeyPair) = get_key_pair(); @@ -102,16 +103,12 @@ fn test_staking() { #[test] fn test_unstaking() { - let ApiTestSetup { - runtime, - store, - client, - cluster, - } = ApiTestSetup::get_or_init(); - - let indexer_client = client; + let ApiTestSetup { runtime, .. } = ApiTestSetup::get_or_init(); runtime.block_on(async move { + let (cluster, store, client) = + &start_test_cluster_with_read_write_indexer(None, Some("test_unstaking"), None).await; + indexer_wait_for_checkpoint(store, 1).await; let (sender, keypair): (_, AccountKeyPair) = get_key_pair(); @@ -137,10 +134,10 @@ fn test_unstaking() { indexer_wait_for_object(client, iota_coin_ref.0, iota_coin_ref.1).await; // Check StakedIota object before test - let staked_iota: Vec = indexer_client.get_stakes(sender).await.unwrap(); + let staked_iota: Vec = client.get_stakes(sender).await.unwrap(); assert!(staked_iota.is_empty()); - let validator = indexer_client + let validator = client .get_latest_iota_system_state() .await .unwrap() @@ -148,7 +145,7 @@ fn test_unstaking() { .iota_address; // Delegate some IOTA - let transaction_bytes: TransactionBlockBytes = indexer_client + let transaction_bytes: TransactionBlockBytes = client .request_add_stake( sender, vec![iota_coin_ref.0], @@ -169,7 +166,7 @@ fn test_unstaking() { indexer_wait_for_latest_checkpoint(store, cluster).await; // Check DelegatedStake object - let staked_iota: Vec = indexer_client.get_stakes(sender).await.unwrap(); + let staked_iota: Vec = client.get_stakes(sender).await.unwrap(); assert_eq!(1, staked_iota.len()); assert_eq!(1000000000, staked_iota[0].stakes[0].principal); assert!(matches!( @@ -179,7 +176,7 @@ fn test_unstaking() { } )); - let transaction_bytes: TransactionBlockBytes = indexer_client + let transaction_bytes: TransactionBlockBytes = client .request_withdraw_stake( sender, staked_iota[0].stakes[0].staked_iota_id, @@ -197,38 +194,29 @@ fn test_unstaking() { cluster.force_new_epoch().await; indexer_wait_for_latest_checkpoint(store, cluster).await; - let node_response = cluster - .rpc_client() - .get_stakes_by_ids(vec![staked_iota[0].stakes[0].staked_iota_id]) - .await - .unwrap(); - assert_eq!(1, node_response.len()); - assert!(matches!( - node_response[0].stakes[0].status, - StakeStatus::Unstaked - )); - - let indexer_response = indexer_client + let indexer_response = client .get_stakes_by_ids(vec![staked_iota[0].stakes[0].staked_iota_id]) .await .unwrap(); assert_eq!(0, indexer_response.len()); - let staked_iota: Vec = indexer_client.get_stakes(sender).await.unwrap(); + let staked_iota: Vec = client.get_stakes(sender).await.unwrap(); assert!(staked_iota.is_empty()); }); } #[test] fn test_timelocked_staking() { - let ApiTestSetup { - runtime, - store, - client, - cluster, - } = ApiTestSetup::get_or_init(); + let ApiTestSetup { runtime, .. } = ApiTestSetup::get_or_init(); runtime.block_on(async move { + let (cluster, store, client) = &start_test_cluster_with_read_write_indexer( + None, + Some("test_timelocked_staking"), + None, + ) + .await; + indexer_wait_for_checkpoint(store, 1).await; let (sender, keypair): (_, AccountKeyPair) = get_key_pair(); @@ -315,22 +303,32 @@ fn test_timelocked_staking() { cluster.force_new_epoch().await; indexer_wait_for_latest_checkpoint(store, cluster).await; - let response = client.get_timelocked_stakes(sender).await.unwrap(); + let staked_iota: Vec = + client.get_timelocked_stakes(sender).await.unwrap(); - assert_eq!(response.len(), 1); + assert_eq!(staked_iota.len(), 1); + assert_eq!(10000000000, staked_iota[0].stakes[0].principal); + assert!(matches!( + staked_iota[0].stakes[0].status, + StakeStatus::Active { + estimated_reward: _ + } + )); }); } #[test] fn test_timelocked_unstaking() { - let ApiTestSetup { - runtime, - store, - client, - cluster, - } = ApiTestSetup::get_or_init(); + let ApiTestSetup { runtime, .. } = ApiTestSetup::get_or_init(); runtime.block_on(async move { + let (cluster, store, client) = &start_test_cluster_with_read_write_indexer( + None, + Some("test_timelocked_unstaking"), + None, + ) + .await; + indexer_wait_for_checkpoint(store, 1).await; let (sender, keypair): (_, AccountKeyPair) = get_key_pair(); @@ -417,11 +415,18 @@ fn test_timelocked_unstaking() { cluster.force_new_epoch().await; indexer_wait_for_latest_checkpoint(store, cluster).await; - let response = client.get_timelocked_stakes(sender).await.unwrap(); + let staked_iota = client.get_timelocked_stakes(sender).await.unwrap(); - assert_eq!(response.len(), 1); + assert_eq!(staked_iota.len(), 1); + assert_eq!(10000000000, staked_iota[0].stakes[0].principal); + assert!(matches!( + staked_iota[0].stakes[0].status, + StakeStatus::Active { + estimated_reward: _ + } + )); - let timelocked_stake_id = response[0].stakes[0].timelocked_staked_iota_id; + let timelocked_stake_id = staked_iota[0].stakes[0].timelocked_staked_iota_id; let timelocked_stake_id_ref = cluster .wallet .get_object_ref(timelocked_stake_id) @@ -463,16 +468,6 @@ fn test_timelocked_unstaking() { let res = client.get_timelocked_stakes(sender).await.unwrap(); assert_eq!(res.len(), 0); - let res = cluster - .rpc_client() - .get_timelocked_stakes_by_ids(vec![timelocked_stake_id]) - .await - .unwrap(); - - assert_eq!(res.len(), 1); - - assert!(matches!(res[0].stakes[0].status, StakeStatus::Unstaked)); - let res = client .get_timelocked_stakes_by_ids(vec![timelocked_stake_id]) .await