Skip to content

Commit

Permalink
fix(hermes): add lock for the entire message state
Browse files Browse the repository at this point in the history
Before this change, there was a lock for each message and it could
cause the updateData for multiple ids have 2 updates (because of the
race with the thread updating the states). This change adds a RwLock
which makes sure that when the entire message state is updating,
no one can read from it while allowing concurrent reads in other
occasions.
  • Loading branch information
ali-bahjati committed Oct 11, 2023
1 parent a224199 commit bb922c3
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 30 deletions.
2 changes: 1 addition & 1 deletion hermes/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion hermes/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.3.1"
version = "0.3.2"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down
60 changes: 32 additions & 28 deletions hermes/src/state/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ use {
anyhow,
Result,
},
dashmap::DashMap,
futures::future::join_all,
pythnet_sdk::messages::{
FeedId,
Message,
MessageType,
},
std::{
collections::BTreeMap,
collections::{
BTreeMap,
HashMap,
},
ops::Bound,
sync::Arc,
},
Expand Down Expand Up @@ -103,16 +106,16 @@ pub struct Cache {
/// We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
wormhole_merkle_state_cache: Arc<RwLock<BTreeMap<Slot, WormholeMerkleState>>>,

message_cache: Arc<DashMap<MessageStateKey, BTreeMap<MessageStateTime, MessageState>>>,
message_cache: Arc<RwLock<HashMap<MessageStateKey, BTreeMap<MessageStateTime, MessageState>>>>,
cache_size: u64,
}

fn retrieve_message_state(
async fn retrieve_message_state(
cache: &Cache,
key: MessageStateKey,
request_time: RequestTime,
) -> Option<MessageState> {
match cache.message_cache.get(&key) {
match cache.message_cache.read().await.get(&key) {
Some(key_cache) => {
match request_time {
RequestTime::Latest => key_cache.last_key_value().map(|(_, v)| v).cloned(),
Expand Down Expand Up @@ -154,7 +157,7 @@ fn retrieve_message_state(
impl Cache {
pub fn new(cache_size: u64) -> Self {
Self {
message_cache: Arc::new(DashMap::new()),
message_cache: Arc::new(RwLock::new(HashMap::new())),
accumulator_messages_cache: Arc::new(RwLock::new(BTreeMap::new())),
wormhole_merkle_state_cache: Arc::new(RwLock::new(BTreeMap::new())),
cache_size,
Expand Down Expand Up @@ -189,20 +192,20 @@ impl AggregateCache for crate::state::State {
async fn message_state_keys(&self) -> Vec<MessageStateKey> {
self.cache
.message_cache
.read()
.await
.iter()
.map(|entry| entry.key().clone())
.map(|entry| entry.0.clone())
.collect::<Vec<_>>()
}

async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()> {
let mut message_cache = self.cache.message_cache.write().await;

for message_state in message_states {
let key = message_state.key();
let time = message_state.time();
let mut cache = self
.cache
.message_cache
.entry(key)
.or_insert_with(BTreeMap::new);
let cache = message_cache.entry(key).or_insert_with(BTreeMap::new);

cache.insert(time, message_state);

Expand All @@ -220,24 +223,25 @@ impl AggregateCache for crate::state::State {
request_time: RequestTime,
filter: MessageStateFilter,
) -> Result<Vec<MessageState>> {
ids.into_iter()
.flat_map(|id| {
let request_time = request_time.clone();
let message_types: Vec<MessageType> = match filter {
MessageStateFilter::All => MessageType::iter().collect(),
MessageStateFilter::Only(t) => vec![t],
join_all(ids.into_iter().flat_map(|id| {
let request_time = request_time.clone();
let message_types: Vec<MessageType> = match filter {
MessageStateFilter::All => MessageType::iter().collect(),
MessageStateFilter::Only(t) => vec![t],
};

message_types.into_iter().map(move |message_type| {
let key = MessageStateKey {
feed_id: id,
type_: message_type,
};

message_types.into_iter().map(move |message_type| {
let key = MessageStateKey {
feed_id: id,
type_: message_type,
};
retrieve_message_state(&self.cache, key, request_time.clone())
.ok_or(anyhow!("Message not found"))
})
retrieve_message_state(&self.cache, key, request_time.clone())
})
.collect()
}))
.await
.into_iter()
.collect::<Option<Vec<_>>>()
.ok_or(anyhow!("Message not found"))
}

async fn store_accumulator_messages(
Expand Down

0 comments on commit bb922c3

Please sign in to comment.