diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index d2147b137c..297188dc60 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -71,6 +71,7 @@ pub struct StorageService { capability: Capability, tombstones: Arc>>, wildcard_updates: Arc>>, + latest_updates: Arc, Timestamp>>>, replication: Option, } @@ -94,6 +95,7 @@ impl StorageService { capability: store_intercept.capability, tombstones: Arc::new(RwLock::new(KeBoxTree::default())), wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())), + latest_updates: Arc::new(Mutex::new(HashMap::new())), replication, }; if storage_service @@ -143,6 +145,7 @@ impl StorageService { config: gc_config, tombstones: self.tombstones.clone(), wildcard_updates: self.wildcard_updates.clone(), + latest_updates: self.latest_updates.clone(), }, ); t.add_async(gc).await; @@ -335,7 +338,11 @@ impl StorageService { let sample_to_store_timestamp = match sample_to_store.timestamp() { Some(timestamp) => *timestamp, None => { - tracing::error!("Discarding `Sample` generated through `SampleBuilder` that has no Timestamp: {:?}", sample_to_store); + tracing::error!( + "Discarding `Sample` generated through `SampleBuilder` that has no \ + Timestamp: {:?}", + sample_to_store + ); continue; } }; @@ -350,12 +357,32 @@ impl StorageService { return; } }; + + // If the Storage was declared as only keeping the Latest value, we ensure that, for + // each received Sample, it is indeed the Latest value that is processed. + if self.capability.history == History::Latest { + if let Some(stored_timestamp) = + self.latest_updates.lock().await.get(&stripped_key) + { + if sample_to_store_timestamp < *stored_timestamp { + tracing::debug!( + "Skipping Sample for < {:?} >, a Value with a more recent \ + Timestamp is stored: (received) {} vs (stored) {}", + stripped_key, + sample_to_store_timestamp, + stored_timestamp + ); + continue; + } + } + } + let mut storage = self.storage.lock().await; let result = match sample.kind() { SampleKind::Put => { storage .put( - stripped_key, + stripped_key.clone(), Value::new( sample_to_store.payload().clone(), sample_to_store.encoding().clone(), @@ -368,24 +395,30 @@ impl StorageService { // register a tombstone self.mark_tombstone(&k, sample_to_store_timestamp).await; storage - .delete(stripped_key, sample_to_store_timestamp) + .delete(stripped_key.clone(), sample_to_store_timestamp) .await } }; drop(storage); - if self.replication.is_some() - && result.is_ok() - && !matches!(result.unwrap(), StorageInsertionResult::Outdated) - { - let sending = self - .replication - .as_ref() - .unwrap() - .log_propagation - .send((k.clone(), sample_to_store_timestamp)); - match sending { - Ok(_) => (), - Err(e) => { + + if result.is_ok_and(|insertion_result| { + !matches!(insertion_result, StorageInsertionResult::Outdated) + }) { + // Insertion was successful (i.e. Ok + not Outdated), the Storage only keeps + // track of the Latest value, the timestamp is indeed more recent (it was + // checked before being processed): we update our internal structure. + if self.capability.history == History::Latest { + self.latest_updates + .lock() + .await + .insert(stripped_key, sample_to_store_timestamp); + } + + if let Some(replication) = &self.replication { + if let Err(e) = replication + .log_propagation + .send((k.clone(), sample_to_store_timestamp)) + { tracing::error!("Error in sending the sample to the log: {}", e); } } @@ -696,6 +729,7 @@ struct GarbageCollectionEvent { config: GarbageCollectionConfig, tombstones: Arc>>, wildcard_updates: Arc>>, + latest_updates: Arc, Timestamp>>>, } #[async_trait] @@ -732,6 +766,11 @@ impl Timed for GarbageCollectionEvent { wildcard_updates.remove(&k); } + self.latest_updates + .lock() + .await + .retain(|_, timestamp| timestamp.get_time() < &time_limit); + tracing::trace!("End garbage collection of obsolete data-infos"); } }