Skip to content

Commit

Permalink
refactor(storage-manager): implement cache for History::Latest
Browse files Browse the repository at this point in the history
The changes introduced in #1367 were actually redundant (and
incomplete): the method `is_latest` was already checking that a received
Sample was more recent than what is currently stored.

This commit removes the redundant checks and implements a correct
caching strategy by leveraging both approaches (the cache and the
Storage lookup). The changes consist mostly of:
1. Changing the outer test by continuing early if the Sample is detected
   as deleted. This allows lowering the overall indentation of the
   `process_sample` method by one level.
2. Modifying the behaviour of the `is_latest` method to check the cache
   first and only query the Storage if there is a cache miss.

To help make potential concurrency issues visible, the method
`is_latest` was renamed to `guard_cache_if_latest` as the lock on the
Cache should only be released when the Sample has been processed *and*
the Cache updated.

Additionally, the cache is filled at initialisation.

The `read_cost` configuration parameter was removed as it was not used
and is made obsolete by this change.

* plugins/zenoh-backend-example/src/lib.rs:
* plugins/zenoh-backend-traits/src/lib.rs:
* plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs:
  removed the obsolete `read_cost` parameter.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs:
  - Populate the Cache at initialisation.
  - Updated the call to `StorageService::start` to also pass the Cache.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  - Updated the call to `StorageService::start` to also pass the Cache.
  - "Reversed" the logic when checking if a Sample is deleted: if it is,
    continue to the next one. This allows reducing the indentation
    level.
  - Keep the guard over the cache if a Sample is more recent and only
    release it once it has been updated (after the Sample has been
    successfully processed).

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Sep 19, 2024
1 parent a7599ef commit 3295c82
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 133 deletions.
1 change: 0 additions & 1 deletion plugins/zenoh-backend-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ impl Volume for ExampleBackend {
Capability {
persistence: Persistence::Volatile,
history: History::Latest,
read_cost: 0,
}
}
async fn create_storage(&self, _props: StorageConfig) -> ZResult<Box<dyn Storage>> {
Expand Down
8 changes: 1 addition & 7 deletions plugins/zenoh-backend-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,11 @@
//! }
//!
//! fn get_capability(&self) -> Capability {
//! // This operation is used to confirm if the volume indeed supports
//! // This operation is used to confirm if the volume indeed supports
//! // the capabilities requested by the configuration
//! Capability{
//! persistence: Persistence::Volatile,
//! history: History::Latest,
//! read_cost: 0,
//! }
//! }
//!
Expand Down Expand Up @@ -145,10 +144,6 @@ const FEATURES: &str =
pub struct Capability {
pub persistence: Persistence,
pub history: History,
/// `read_cost` is a parameter that hels the storage manager take a decision on optimizing database roundtrips
/// If the `read_cost` is higher than a given threshold, the storage manger will maintain a cache with the keys present in the database
/// This is a placeholder, not actually utilised in the current implementation
pub read_cost: u32,
}

/// Persistence is the guarantee expected from a storage in case of failures
Expand Down Expand Up @@ -185,7 +180,6 @@ pub struct StoredData {
}

/// Trait to be implemented by a Backend.
///
#[async_trait]
pub trait Volume: Send + Sync {
/// Returns the status that will be sent as a reply to a query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ impl Volume for MemoryBackend {
Capability {
persistence: Persistence::Volatile,
history: History::Latest,
read_cost: 0,
}
}

Expand Down
20 changes: 18 additions & 2 deletions plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use flume::Sender;
use tokio::sync::Mutex;
use zenoh::{session::Session, Result as ZResult};
use zenoh::{internal::bail, session::Session, Result as ZResult};
use zenoh_backend_traits::{config::StorageConfig, VolumeInstance};

mod service;
Expand Down Expand Up @@ -48,9 +48,25 @@ pub(crate) async fn create_and_start_storage(

let (tx, rx) = flume::bounded(1);

let latest_updates = match storage.get_all_entries().await {
Ok(entries) => entries.into_iter().collect(),
Err(e) => {
bail!("Failed to retrieve entries from Storage < {storage_name} >: {e:?}");
}
};

let storage = Arc::new(Mutex::new(storage));
tokio::task::spawn(async move {
StorageService::start(zenoh_session, config, &name, storage, capability, rx).await;
StorageService::start(
zenoh_session,
config,
&name,
storage,
capability,
rx,
Arc::new(Mutex::new(latest_updates)),
)
.await;
});

Ok(tx)
Expand Down
254 changes: 132 additions & 122 deletions plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
use async_trait::async_trait;
use flume::Receiver;
use futures::select;
use tokio::sync::{Mutex, RwLock};
use tokio::sync::{Mutex, MutexGuard, RwLock};
use zenoh::{
internal::{
buffers::{SplitBuffer, ZBuf},
Expand Down Expand Up @@ -75,9 +75,8 @@ impl StorageService {
storage: Arc<Mutex<Box<dyn zenoh_backend_traits::Storage>>>,
capability: Capability,
rx: Receiver<StorageMessage>,
latest_updates: Arc<Mutex<HashMap<Option<OwnedKeyExpr>, Timestamp>>>,
) {
// @TODO: optimization: if read_cost is high for the storage, initialize a cache for the
// latest value
let mut storage_service = StorageService {
session,
key_expr: config.key_expr,
Expand All @@ -88,7 +87,7 @@ impl StorageService {
capability,
tombstones: Arc::new(RwLock::new(KeBoxTree::default())),
wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())),
latest_updates: Arc::new(Mutex::new(HashMap::new())),
latest_updates,
};
if storage_service
.capability
Expand Down Expand Up @@ -236,127 +235,117 @@ impl StorageService {
);

for k in matching_keys {
if !self.is_deleted(&k.clone(), sample_timestamp).await
&& (self.capability.history.eq(&History::All)
|| (self.capability.history.eq(&History::Latest)
&& self.is_latest(&k, sample_timestamp).await))
if self.is_deleted(&k, sample_timestamp).await {
tracing::trace!("Skipping Sample < {} > deleted later on", k);
continue;
}

tracing::trace!(
"Sample `{:?}` identified as needed processing for key {}",
sample,
k
);

// there might be the case that the actual update was outdated due to a wild card
// update, but not stored yet in the storage. get the relevant wild
// card entry and use that value and timestamp to update the storage
let sample_to_store: Sample = if let Some(update) =
self.overriding_wild_update(&k, sample_timestamp).await
{
tracing::trace!(
"Sample `{:?}` identified as needed processing for key {}",
sample,
k
);
// there might be the case that the actual update was outdated due to a wild card
// update, but not stored yet in the storage. get the relevant wild
// card entry and use that value and timestamp to update the storage
let sample_to_store: Sample =
if let Some(update) = self.overriding_wild_update(&k, sample_timestamp).await {
match update.kind {
SampleKind::Put => {
SampleBuilder::put(k.clone(), update.data.value.payload().clone())
.encoding(update.data.value.encoding().clone())
.timestamp(update.data.timestamp)
.into()
}
SampleKind::Delete => SampleBuilder::delete(k.clone())
.timestamp(update.data.timestamp)
.into(),
}
} else {
SampleBuilder::from(sample.clone())
.keyexpr(k.clone())
.into()
};
match update.kind {
SampleKind::Put => SampleBuilder::put(k.clone(), update.data.value.payload())
.encoding(update.data.value.encoding().clone())
.timestamp(update.data.timestamp)
.into(),
SampleKind::Delete => SampleBuilder::delete(k.clone())
.timestamp(update.data.timestamp)
.into(),
}
} else {
SampleBuilder::from(sample.clone())
.keyexpr(k.clone())
.into()
};

// A Sample that is to be stored **must** have a Timestamp. In theory, the Sample
// generated should have a Timestamp and, in theory, this check is
// unneeded.
let sample_to_store_timestamp = match sample_to_store.timestamp() {
Some(timestamp) => *timestamp,
None => {
tracing::error!(
"Discarding `Sample` generated through `SampleBuilder` that has no \
Timestamp: {:?}",
sample_to_store
);
continue;
}
};
// A Sample that is to be stored **must** have a Timestamp. In theory, the Sample
// generated should have a Timestamp and, in theory, this check is
// unneeded.
let sample_to_store_timestamp = match sample_to_store.timestamp() {
Some(timestamp) => *timestamp,
None => {
tracing::error!(
"Discarding `Sample` generated through `SampleBuilder` that has no \
Timestamp: {:?}",
sample_to_store
);
continue;
}
};

let stripped_key = match crate::strip_prefix(
self.strip_prefix.as_ref(),
sample_to_store.key_expr(),
) {
let stripped_key =
match crate::strip_prefix(self.strip_prefix.as_ref(), sample_to_store.key_expr()) {
Ok(stripped) => stripped,
Err(e) => {
tracing::error!("{}", e);
return;
}
};

// If the Storage was declared as only keeping the Latest value, we ensure that, for
// each received Sample, it is indeed the Latest value that is processed.
let mut latest_updates_guard = self.latest_updates.lock().await;
if self.capability.history == History::Latest {
if let Some(stored_timestamp) = latest_updates_guard.get(&stripped_key) {
if sample_to_store_timestamp < *stored_timestamp {
tracing::debug!(
"Skipping Sample for < {:?} >, a Value with a more recent \
Timestamp is stored: (received) {} vs (stored) {}",
stripped_key,
sample_to_store_timestamp,
stored_timestamp
);
continue;
}
// If the Storage was declared as only keeping the Latest value, we ensure that, for
// each received Sample, it is indeed the Latest value that is processed.
let mut cache_guard = None;
if self.capability.history == History::Latest {
match self
.guard_cache_if_latest(&stripped_key, &sample_to_store_timestamp)
.await
{
Some(guard) => {
cache_guard = Some(guard);
}
None => {
tracing::trace!("Skipping outdated Sample < {} >", k);
continue;
}
}
}

let mut storage = self.storage.lock().await;
let storage_result = match sample.kind() {
SampleKind::Put => {
storage
.put(
stripped_key.clone(),
Value::new(
sample_to_store.payload().clone(),
sample_to_store.encoding().clone(),
),
sample_to_store_timestamp,
)
.await
}
SampleKind::Delete => {
// register a tombstone
self.mark_tombstone(&k, sample_to_store_timestamp).await;
storage
.delete(stripped_key.clone(), sample_to_store_timestamp)
.await
}
};
let mut storage = self.storage.lock().await;
let storage_result = match sample.kind() {
SampleKind::Put => {
storage
.put(
stripped_key.clone(),
Value::new(
sample_to_store.payload(),
sample_to_store.encoding().clone(),
),
sample_to_store_timestamp,
)
.await
}
SampleKind::Delete => {
// register a tombstone
self.mark_tombstone(&k, sample_to_store_timestamp).await;
storage
.delete(stripped_key.clone(), sample_to_store_timestamp)
.await
}
};

drop(storage);
drop(storage);

match storage_result {
Ok(StorageInsertionResult::Outdated) => {
tracing::trace!(
"Ignoring `Outdated` sample < {} >",
sample_to_store.key_expr()
);
}
Ok(_) => {
if self.capability.history == History::Latest {
latest_updates_guard.insert(stripped_key, sample_to_store_timestamp);
}
}
Err(e) => {
tracing::error!(
"`{}` on < {} > failed with: {e:?}",
sample.kind(),
sample_to_store.key_expr()
);
match storage_result {
Ok(StorageInsertionResult::Outdated) => {
tracing::trace!("Ignoring `Outdated` sample < {} >", k);
}
Ok(_) => {
if let Some(mut cache_guard) = cache_guard {
cache_guard.insert(stripped_key, sample_to_store_timestamp);
}
}
Err(e) => {
tracing::error!("`{}` on < {} > failed with: {e:?}", sample.kind(), k);
}
}
}
}
Expand Down Expand Up @@ -464,24 +453,45 @@ impl StorageService {
update
}

async fn is_latest(&self, key_expr: &OwnedKeyExpr, timestamp: &Timestamp) -> bool {
// @TODO: if cache exists, read from there
let mut storage = self.storage.lock().await;
let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) {
Ok(stripped) => stripped,
Err(e) => {
tracing::error!("{}", e);
return false;
/// Returns a guard over the cache if the provided [Timestamp] is more recent than what is kept
/// in the Storage for the `stripped_key`. Otherwise returns `None`.
///
/// This method will first look up any cached value and if none is found, it will request the
/// Storage.
///
/// # ⚠️ Race-condition
///
/// Returning a guard over the cache is not an "innocent" choice: in order to avoid
/// race-condition, the guard over the cache must be kept until the Storage has processed the
/// Sample and the Cache has been updated accordingly.
///
/// If the lock is released before both operations are performed, the Cache and Storage could
/// end up in an inconsistent state (think two updates being processed at the same time).
async fn guard_cache_if_latest(
&self,
stripped_key: &Option<OwnedKeyExpr>,
timestamp: &Timestamp,
) -> Option<MutexGuard<'_, HashMap<Option<OwnedKeyExpr>, Timestamp>>> {
let cache_guard = self.latest_updates.lock().await;
if let Some(latest_timestamp) = cache_guard.get(stripped_key) {
if timestamp > latest_timestamp {
return Some(cache_guard);
} else {
return None;
}
};
if let Ok(stored_data) = storage.get(stripped_key, "").await {
for entry in stored_data {
if entry.timestamp > *timestamp {
return false;
}

let mut storage = self.storage.lock().await;

if let Ok(stored_data) = storage.get(stripped_key.clone(), "").await {
for data in stored_data {
if data.timestamp > *timestamp {
return None;
}
}
}
true

Some(cache_guard)
}

async fn reply_query(&self, query: Result<zenoh::query::Query, flume::RecvError>) {
Expand Down

0 comments on commit 3295c82

Please sign in to comment.