From bb922c3a1770ff0ddc1b6b92babb7e21e07ec245 Mon Sep 17 00:00:00 2001
From: Ali Behjati
Date: Wed, 11 Oct 2023 12:38:13 +0200
Subject: [PATCH] fix(hermes): add lock for the entire message state

Before this change, there was a separate lock for each message, so an
updateData response for multiple ids could contain data from two different
updates (because of a race with the thread updating the states). This
change adds a RwLock which makes sure that no one can read the message
state while it is being updated as a whole, while still allowing
concurrent reads at all other times.
---
 hermes/Cargo.lock         |  2 +-
 hermes/Cargo.toml         |  2 +-
 hermes/src/state/cache.rs | 60 +++++++++++++++++++++------------------
 3 files changed, 34 insertions(+), 30 deletions(-)

diff --git a/hermes/Cargo.lock b/hermes/Cargo.lock
index 01d18eb48..1b2d6517c 100644
--- a/hermes/Cargo.lock
+++ b/hermes/Cargo.lock
@@ -1898,7 +1898,7 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"

 [[package]]
 name = "hermes"
-version = "0.3.1"
+version = "0.3.2"
 dependencies = [
  "anyhow",
  "async-trait",

diff --git a/hermes/Cargo.toml b/hermes/Cargo.toml
index 3c3d85a49..ff30faf4e 100644
--- a/hermes/Cargo.toml
+++ b/hermes/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "hermes"
-version = "0.3.1"
+version = "0.3.2"
 description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
 edition = "2021"

diff --git a/hermes/src/state/cache.rs b/hermes/src/state/cache.rs
index a5f7a35dd..f263c528a 100644
--- a/hermes/src/state/cache.rs
+++ b/hermes/src/state/cache.rs
@@ -12,14 +12,17 @@ use {
         anyhow,
         Result,
     },
-    dashmap::DashMap,
+    futures::future::join_all,
     pythnet_sdk::messages::{
         FeedId,
         Message,
         MessageType,
     },
     std::{
-        collections::BTreeMap,
+        collections::{
+            BTreeMap,
+            HashMap,
+        },
         ops::Bound,
         sync::Arc,
     },
@@ -103,16 +106,16 @@ pub struct Cache {
     /// We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
     wormhole_merkle_state_cache: Arc<RwLock<BTreeMap<Slot, WormholeMerkleState>>>,

-    message_cache: Arc<DashMap<MessageStateKey, BTreeMap<MessageStateTime, MessageState>>>,
+    message_cache: Arc<RwLock<HashMap<MessageStateKey, BTreeMap<MessageStateTime, MessageState>>>>,
     cache_size: u64,
 }

-fn retrieve_message_state(
+async fn retrieve_message_state(
     cache: &Cache,
     key: MessageStateKey,
     request_time: RequestTime,
 ) -> Option<MessageState> {
-    match cache.message_cache.get(&key) {
+    match cache.message_cache.read().await.get(&key) {
         Some(key_cache) => {
             match request_time {
                 RequestTime::Latest => key_cache.last_key_value().map(|(_, v)| v).cloned(),
@@ -154,7 +157,7 @@
 impl Cache {
     pub fn new(cache_size: u64) -> Self {
         Self {
-            message_cache: Arc::new(DashMap::new()),
+            message_cache: Arc::new(RwLock::new(HashMap::new())),
             accumulator_messages_cache: Arc::new(RwLock::new(BTreeMap::new())),
             wormhole_merkle_state_cache: Arc::new(RwLock::new(BTreeMap::new())),
             cache_size,
@@ -189,20 +192,20 @@ impl AggregateCache for crate::state::State {
     async fn message_state_keys(&self) -> Vec<MessageStateKey> {
         self.cache
             .message_cache
+            .read()
+            .await
             .iter()
-            .map(|entry| entry.key().clone())
+            .map(|entry| entry.0.clone())
             .collect::<Vec<_>>()
     }

     async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()> {
+        let mut message_cache = self.cache.message_cache.write().await;
+
         for message_state in message_states {
             let key = message_state.key();
             let time = message_state.time();
-            let mut cache = self
-                .cache
-                .message_cache
-                .entry(key)
-                .or_insert_with(BTreeMap::new);
+            let cache = message_cache.entry(key).or_insert_with(BTreeMap::new);

             cache.insert(time, message_state);

@@ -220,24 +223,25 @@
         request_time: RequestTime,
         filter: MessageStateFilter,
     ) -> Result<Vec<MessageState>> {
-        ids.into_iter()
-            .flat_map(|id| {
-                let request_time = request_time.clone();
-                let message_types: Vec<MessageType> = match filter {
-                    MessageStateFilter::All => MessageType::iter().collect(),
-                    MessageStateFilter::Only(t) => vec![t],
+        join_all(ids.into_iter().flat_map(|id| {
+            let request_time = request_time.clone();
+            let message_types: Vec<MessageType> = match filter {
+                MessageStateFilter::All => MessageType::iter().collect(),
+                MessageStateFilter::Only(t) => vec![t],
+            };
+
+            message_types.into_iter().map(move |message_type| {
+                let key = MessageStateKey {
+                    feed_id: id,
+                    type_: message_type,
                 };
-
-                message_types.into_iter().map(move |message_type| {
-                    let key = MessageStateKey {
-                        feed_id: id,
-                        type_: message_type,
-                    };
-                    retrieve_message_state(&self.cache, key, request_time.clone())
-                        .ok_or(anyhow!("Message not found"))
-                })
+                retrieve_message_state(&self.cache, key, request_time.clone())
             })
-            .collect()
+        }))
+        .await
+        .into_iter()
+        .collect::<Option<Vec<_>>>()
+        .ok_or(anyhow!("Message not found"))
     }

     async fn store_accumulator_messages(
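
Note (illustrative sketch, not part of the commit): the core of the fix is replacing per-entry DashMap locking with one tokio::sync::RwLock over the whole message map, so the batch written by store_message_states becomes atomic with respect to readers. The stand-alone sketch below shows that pattern with stand-in types (u64 keys, String values) instead of the real MessageStateKey/MessageStateTime/MessageState, assuming tokio as a dependency.

// Sketch only: stand-in types instead of Hermes' MessageStateKey/MessageState.
use std::{
    collections::{BTreeMap, HashMap},
    sync::Arc,
};
use tokio::sync::RwLock;

type Key = u64;      // stand-in for MessageStateKey
type Time = i64;     // stand-in for MessageStateTime
type State = String; // stand-in for MessageState

#[derive(Clone, Default)]
struct Cache {
    // One lock over the whole map: a writer holds it exclusively, readers
    // share it, so a batch insert is either fully visible or not at all.
    message_cache: Arc<RwLock<HashMap<Key, BTreeMap<Time, State>>>>,
}

impl Cache {
    async fn store_batch(&self, batch: Vec<(Key, Time, State)>) {
        // Exclusive write guard held for the whole batch.
        let mut cache = self.message_cache.write().await;
        for (key, time, state) in batch {
            cache.entry(key).or_default().insert(time, state);
        }
        // Guard dropped here; readers now see the complete batch.
    }

    async fn latest(&self, key: Key) -> Option<State> {
        // Shared read guard; many readers can hold it concurrently.
        let cache = self.message_cache.read().await;
        cache
            .get(&key)
            .and_then(|by_time| by_time.last_key_value())
            .map(|(_, state)| state.clone())
    }
}

#[tokio::main]
async fn main() {
    let cache = Cache::default();
    cache
        .store_batch(vec![(1, 10, "price@10".into()), (2, 10, "ema@10".into())])
        .await;
    assert_eq!(cache.latest(1).await.as_deref(), Some("price@10"));
    assert_eq!(cache.latest(3).await, None);
}

With this arrangement a reader observes the map either before or after the batch, never in between, while plain reads still run concurrently whenever no writer holds the lock.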
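
Note (illustrative sketch, not part of the commit): since retrieve_message_state now awaits the read lock, fetch_message_states builds one future per lookup, awaits them with futures::future::join_all, and collects the resulting Options into Option<Vec<_>>, so one missing message fails the whole call with "Message not found". A minimal version of that pattern, with a hypothetical lookup function standing in for retrieve_message_state:

// Sketch only: `lookup` is a hypothetical stand-in for retrieve_message_state.
use anyhow::{anyhow, Result};
use futures::future::join_all;

async fn lookup(id: u32) -> Option<u32> {
    // Pretend only even ids are present in the cache.
    (id % 2 == 0).then_some(id * 100)
}

async fn fetch_all(ids: Vec<u32>) -> Result<Vec<u32>> {
    join_all(ids.into_iter().map(lookup))
        .await
        .into_iter()
        // Some(vec) only if every lookup returned Some; otherwise None.
        .collect::<Option<Vec<_>>>()
        .ok_or(anyhow!("Message not found"))
}

#[tokio::main]
async fn main() -> Result<()> {
    assert_eq!(fetch_all(vec![2, 4]).await?, vec![200, 400]);
    assert!(fetch_all(vec![1, 2]).await.is_err());
    Ok(())
}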