diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index 956cd57f8d..6571845fad 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -300,21 +300,20 @@ impl StorageService { ); for k in matching_keys { - if !self.is_deleted(&k.clone(), sample_timestamp).await - && (self.capability.history.eq(&History::All) - || (self.capability.history.eq(&History::Latest) - && self.is_latest(&k, sample_timestamp).await)) - { - tracing::trace!( - "Sample `{:?}` identified as needed processing for key {}", - sample, - k - ); - // there might be the case that the actual update was outdated due to a wild card update, but not stored yet in the storage. - // get the relevant wild card entry and use that value and timestamp to update the storage - let sample_to_store: Sample = if let Some(update) = - self.ovderriding_wild_update(&k, sample_timestamp).await - { + if self.is_deleted(&k.clone(), sample_timestamp).await { + tracing::trace!("Ignoring publication on < {} > that is deleted later on", k); + continue; + } + + tracing::trace!( + "Sample `{:?}` identified as needed processing for key {}", + sample, + k + ); + // there might be the case that the actual update was outdated due to a wild card update, but not stored yet in the storage. + // get the relevant wild card entry and use that value and timestamp to update the storage + let sample_to_store: Sample = + if let Some(update) = self.ovderriding_wild_update(&k, sample_timestamp).await { match update.kind { SampleKind::Put => { SampleBuilder::put(k.clone(), update.data.value.payload().clone()) @@ -332,24 +331,22 @@ impl StorageService { .into() }; - // A Sample that is to be stored **must** have a Timestamp. In theory, the Sample generated should have - // a Timestamp and, in theory, this check is unneeded. - let sample_to_store_timestamp = match sample_to_store.timestamp() { - Some(timestamp) => *timestamp, - None => { - tracing::error!( - "Discarding `Sample` generated through `SampleBuilder` that has no \ - Timestamp: {:?}", - sample_to_store - ); - continue; - } - }; + // A Sample that is to be stored **must** have a Timestamp. In theory, the Sample generated should have + // a Timestamp and, in theory, this check is unneeded. + let sample_to_store_timestamp = match sample_to_store.timestamp() { + Some(timestamp) => *timestamp, + None => { + tracing::error!( + "Discarding `Sample` generated through `SampleBuilder` that has no \ + Timestamp: {:?}", + sample_to_store + ); + continue; + } + }; - let stripped_key = match crate::strip_prefix( - self.strip_prefix.as_ref(), - sample_to_store.key_expr(), - ) { + let stripped_key = + match crate::strip_prefix(self.strip_prefix.as_ref(), sample_to_store.key_expr()) { Ok(stripped) => stripped, Err(e) => { tracing::error!("{}", e); @@ -357,66 +354,65 @@ impl StorageService { } }; - // If the Storage was declared as only keeping the Latest value, we ensure that, for - // each received Sample, it is indeed the Latest value that is processed. - let mut latest_updates_guard = self.latest_updates.lock().await; - if self.capability.history == History::Latest { - if let Some(stored_timestamp) = latest_updates_guard.get(&stripped_key) { - if sample_to_store_timestamp < *stored_timestamp { - tracing::debug!( - "Skipping Sample for < {:?} >, a Value with a more recent \ - Timestamp is stored: (received) {} vs (stored) {}", - stripped_key, - sample_to_store_timestamp, - stored_timestamp - ); - continue; - } + // If the Storage was declared as only keeping the Latest value, we ensure that, for + // each received Sample, it is indeed the Latest value that is processed. + let mut latest_updates_guard = self.latest_updates.lock().await; + if self.capability.history == History::Latest { + if let Some(stored_timestamp) = latest_updates_guard.get(&stripped_key) { + if sample_to_store_timestamp < *stored_timestamp { + tracing::debug!( + "Skipping Sample for < {:?} >, a Value with a more recent Timestamp \ + is stored: (received) {} vs (stored) {}", + stripped_key, + sample_to_store_timestamp, + stored_timestamp + ); + continue; } } + } - let mut storage = self.storage.lock().await; - let result = match sample.kind() { - SampleKind::Put => { - storage - .put( - stripped_key.clone(), - Value::new( - sample_to_store.payload().clone(), - sample_to_store.encoding().clone(), - ), - sample_to_store_timestamp, - ) - .await - } - SampleKind::Delete => { - // register a tombstone - self.mark_tombstone(&k, sample_to_store_timestamp).await; - storage - .delete(stripped_key.clone(), sample_to_store_timestamp) - .await - } - }; - drop(storage); - - if result.is_ok_and(|insertion_result| { - !matches!(insertion_result, StorageInsertionResult::Outdated) - }) { - // Insertion was successful (i.e. Ok + not Outdated), the Storage only keeps - // track of the Latest value, the timestamp is indeed more recent (it was - // checked before being processed): we update our internal structure. - if self.capability.history == History::Latest { - latest_updates_guard.insert(stripped_key, sample_to_store_timestamp); - } - drop(latest_updates_guard); + let mut storage = self.storage.lock().await; + let result = match sample.kind() { + SampleKind::Put => { + storage + .put( + stripped_key.clone(), + Value::new( + sample_to_store.payload().clone(), + sample_to_store.encoding().clone(), + ), + sample_to_store_timestamp, + ) + .await + } + SampleKind::Delete => { + // register a tombstone + self.mark_tombstone(&k, sample_to_store_timestamp).await; + storage + .delete(stripped_key.clone(), sample_to_store_timestamp) + .await + } + }; + drop(storage); - if let Some(replication) = &self.replication { - if let Err(e) = replication - .log_propagation - .send((k.clone(), sample_to_store_timestamp)) - { - tracing::error!("Error in sending the sample to the log: {}", e); - } + if result.is_ok_and(|insertion_result| { + !matches!(insertion_result, StorageInsertionResult::Outdated) + }) { + // Insertion was successful (i.e. Ok + not Outdated), the Storage only keeps + // track of the Latest value, the timestamp is indeed more recent (it was + // checked before being processed): we update our internal structure. + if self.capability.history == History::Latest { + latest_updates_guard.insert(stripped_key, sample_to_store_timestamp); + } + drop(latest_updates_guard); + + if let Some(replication) = &self.replication { + if let Err(e) = replication + .log_propagation + .send((k.clone(), sample_to_store_timestamp)) + { + tracing::error!("Error in sending the sample to the log: {}", e); } } } @@ -526,26 +522,6 @@ impl StorageService { update } - async fn is_latest(&self, key_expr: &OwnedKeyExpr, timestamp: &Timestamp) -> bool { - // @TODO: if cache exists, read from there - let mut storage = self.storage.lock().await; - let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) { - Ok(stripped) => stripped, - Err(e) => { - tracing::error!("{}", e); - return false; - } - }; - if let Ok(stored_data) = storage.get(stripped_key, "").await { - for entry in stored_data { - if entry.timestamp > *timestamp { - return false; - } - } - } - true - } - async fn reply_query(&self, query: Result) { let q = match query { Ok(q) => q,