diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index 69ecf9477c..06c5882408 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -19,15 +19,19 @@ use async_trait::async_trait; use flume::{Receiver, Sender}; use futures::select; use std::collections::{HashMap, HashSet}; -use std::str::FromStr; +use std::str::{self, FromStr}; use std::time::{SystemTime, UNIX_EPOCH}; +use zenoh::buffers::buffer::SplitBuffer; use zenoh::buffers::ZBuf; use zenoh::prelude::r#async::*; -use zenoh::query::ConsolidationMode; +use zenoh::query::{ConsolidationMode, QueryTarget}; +use zenoh::sample::builder::SampleBuilder; +use zenoh::sample::{Sample, SampleKind}; use zenoh::time::{new_reception_timestamp, Timestamp, NTP64}; -use zenoh::{Result as ZResult, session::Session}; +use zenoh::value::Value; +use zenoh::{Result as ZResult, Session}; use zenoh_backend_traits::config::{GarbageCollectionConfig, StorageConfig}; -use zenoh_backend_traits::{Capability, History, Persistence, Storage, StorageInsertionResult, StoredData}; +use zenoh_backend_traits::{Capability, History, Persistence, StorageInsertionResult, StoredData}; use zenoh_keyexpr::key_expr::OwnedKeyExpr; use zenoh_keyexpr::keyexpr_tree::impls::KeyedSetProvider; use zenoh_keyexpr::keyexpr_tree::{support::NonWild, support::UnknownWildness, KeBoxTree}; @@ -38,148 +42,15 @@ use zenoh_util::{zenoh_home, Timed, TimedEvent, Timer}; pub const WILDCARD_UPDATES_FILENAME: &str = "wildcard_updates"; pub const TOMBSTONE_FILENAME: &str = "tombstones"; -#[derive(Clone, Debug)] -pub enum StorageSampleKind { - Put(Value), - Delete, -} - -#[derive(Clone, Debug)] -pub struct StorageSample { - pub key_expr: KeyExpr<'static>, - pub timestamp: Timestamp, - pub kind: StorageSampleKind, -} - -impl From for StorageSample { - fn from(sample: Sample) -> Self { - let timestamp = *sample.timestamp().unwrap_or(&new_reception_timestamp()); - // TODO: add API for disassembly of Sample - let key_expr = sample.key_expr().clone(); - let payload = sample.payload().clone(); - let encoding = sample.encoding().clone(); - let kind = match sample.kind() { - SampleKind::Put => StorageSampleKind::Put(Value::new(payload).with_encoding(encoding)), - SampleKind::Delete => StorageSampleKind::Delete, - }; - StorageSample { - key_expr, - timestamp, - kind, - } - } -} - #[derive(Clone)] -enum Update { - Put(StoredData), - Delete(Timestamp), -} - -impl From for Update { - fn from(value: StorageSample) -> Self { - match value.kind { - StorageSampleKind::Put(data) => Update::Put(StoredData { - value: data, - timestamp: value.timestamp, - }), - StorageSampleKind::Delete => Update::Delete(value.timestamp), - } - } -} - -impl Update { - fn timestamp(&self) -> &Timestamp { - match self { - Update::Put(data) => &data.timestamp, - Update::Delete(ts) => ts, - } - } -} - -// implement from String for Update -impl TryFrom for Update { - type Error = zenoh::Error; - - fn try_from(value: String) -> Result { - let result: (String, String, String, Vec<&[u8]>) = serde_json::from_str(&value)?; - let mut payload = ZBuf::default(); - for slice in result.3 { - payload.push_zslice(slice.to_vec().into()); - } - let value = Value::new(payload).with_encoding(result.2); - let timestamp = Timestamp::from_str(&result.1).map_err(|_| "Error parsing timestamp")?; - if result.0.eq(&(SampleKind::Put).to_string()) { - Ok(Update::Put(StoredData { value, timestamp })) - } else { - Ok(Update::Delete(timestamp)) - } - } -} - -// implement to_string for Update -impl ToString for Update { - fn to_string(&self) -> String { - let result = match self { - Update::Put(data) => ( - SampleKind::Put.to_string(), - data.timestamp.to_string(), - data.value.encoding.to_string(), - data.value.payload.slices().collect::>(), - ), - Update::Delete(ts) => ( - SampleKind::Delete.to_string(), - ts.to_string(), - "".to_string(), - vec![], - ), - }; - serde_json::to_string_pretty(&result).unwrap() - } -} - -trait IntoStorageSample { - fn into_sample(self, key_expr: IntoKeyExpr) -> StorageSample - where - IntoKeyExpr: Into>; -} - -impl IntoStorageSample for StoredData { - fn into_sample(self, key_expr: IntoKeyExpr) -> StorageSample - where - IntoKeyExpr: Into>, - { - StorageSample { - key_expr: key_expr.into(), - timestamp: self.timestamp, - kind: StorageSampleKind::Put(self.value), - } - } -} - -impl IntoStorageSample for Update { - fn into_sample(self, key_expr: IntoKeyExpr) -> StorageSample - where - IntoKeyExpr: Into>, - { - match self { - Update::Put(data) => StorageSample { - key_expr: key_expr.into(), - timestamp: data.timestamp, - kind: StorageSampleKind::Put(data.value), - }, - Update::Delete(ts) => StorageSample { - key_expr: key_expr.into(), - timestamp: ts, - kind: StorageSampleKind::Delete, - }, - } - } +struct Update { + kind: SampleKind, + data: StoredData, } pub struct ReplicationService { pub empty_start: bool, - pub aligner_updates: Receiver, + pub aligner_updates: Receiver, pub log_propagation: Sender<(OwnedKeyExpr, Timestamp)>, } @@ -238,11 +109,10 @@ impl StorageService { let saved_wc = std::fs::read_to_string(zenoh_home().join(WILDCARD_UPDATES_FILENAME)).unwrap(); let saved_wc: HashMap = - serde_json::from_str(&saved_wc).unwrap(); // TODO: Remove unwrap + serde_json::from_str(&saved_wc).unwrap(); let mut wildcard_updates = storage_service.wildcard_updates.write().await; for (k, data) in saved_wc { - wildcard_updates.insert(&k, Update::try_from(data).unwrap()); - // TODO: Remove unwrap + wildcard_updates.insert(&k, construct_update(data)); } } } @@ -313,7 +183,7 @@ impl StorageService { log::error!("Sample {:?} is not timestamped. Please timestamp samples meant for replicated storage.", sample); } else { - self.process_sample(sample.into()).await; + self.process_sample(sample).await; } }, // on query on key_expr @@ -353,15 +223,16 @@ impl StorageService { select!( // on sample for key_expr sample = storage_sub.recv_async() => { - let mut sample = match sample { + let sample = match sample { Ok(sample) => sample, Err(e) => { log::error!("Error in sample: {}", e); continue; } }; - sample.ensure_timestamp(); - self.process_sample(sample.into()).await; + let timestamp = sample.timestamp().cloned().unwrap_or(new_reception_timestamp()); + let sample = SampleBuilder::from(sample).timestamp(timestamp).into(); + self.process_sample(sample).await; }, // on query on key_expr query = storage_queryable.recv_async() => { @@ -391,48 +262,61 @@ impl StorageService { // The storage should only simply save the key, sample pair while put and retrieve the same during get // the trimming during PUT and GET should be handled by the plugin - async fn process_sample(&self, sample: StorageSample) { + async fn process_sample(&self, sample: Sample) { log::trace!("[STORAGE] Processing sample: {:?}", sample); - // if wildcard, update wildcard_updates - if sample.key_expr.is_wild() { + if sample.key_expr().is_wild() { self.register_wildcard_update(sample.clone()).await; } - let matching_keys = if sample.key_expr.is_wild() { - self.get_matching_keys(&sample.key_expr).await + let matching_keys = if sample.key_expr().is_wild() { + self.get_matching_keys(sample.key_expr()).await } else { - vec![sample.key_expr.clone().into()] + vec![sample.key_expr().clone().into()] }; log::trace!( "The list of keys matching `{}` is : {:?}", - sample.key_expr, + sample.key_expr(), matching_keys ); for k in matching_keys { if !self - .is_deleted(&k.clone(), &sample.timestamp) + .is_deleted(&k.clone(), sample.timestamp().unwrap()) .await && (self.capability.history.eq(&History::All) || (self.capability.history.eq(&History::Latest) - && self.is_latest(&k, &sample.timestamp).await)) + && self.is_latest(&k, sample.timestamp().unwrap()).await)) { log::trace!( "Sample `{:?}` identified as neded processing for key {}", sample, - &k + k ); // there might be the case that the actual update was outdated due to a wild card update, but not stored yet in the storage. // get the relevant wild card entry and use that value and timestamp to update the storage - let sample_to_store = - match self.ovderriding_wild_update(&k, &sample.timestamp).await { - Some(overriding_update) => overriding_update.into_sample(k.clone()), - - None => sample.clone().into(), - }; + let sample_to_store: Sample = if let Some(update) = self + .ovderriding_wild_update(&k, sample.timestamp().unwrap()) + .await + { + match update.kind { + SampleKind::Put => { + SampleBuilder::put(KeyExpr::from(k.clone()), update.data.value.payload) + .encoding(update.data.value.encoding) + .timestamp(update.data.timestamp) + .into() + } + SampleKind::Delete => SampleBuilder::delete(KeyExpr::from(k.clone())) + .timestamp(update.data.timestamp) + .into(), + } + } else { + SampleBuilder::from(sample.clone()) + .keyexpr(k.clone()) + .into() + }; - let stripped_key = match self.strip_prefix(&sample_to_store.key_expr) { + let stripped_key = match self.strip_prefix(sample_to_store.key_expr()) { Ok(stripped) => stripped, Err(e) => { log::error!("{}", e); @@ -440,21 +324,25 @@ impl StorageService { } }; let mut storage = self.storage.lock().await; - let result = match sample_to_store.kind { - StorageSampleKind::Put(data) => { + let result = match sample.kind() { + SampleKind::Put => { storage .put( stripped_key, - data, - sample_to_store.timestamp, + Value::new(sample_to_store.payload().clone()) + .encoding(sample_to_store.encoding().clone()), + *sample_to_store.timestamp().unwrap(), ) .await - }, - StorageSampleKind::Delete => { + } + SampleKind::Delete => { // register a tombstone - self.mark_tombstone(&k, sample_to_store.timestamp).await; - storage.delete(stripped_key, sample_to_store.timestamp).await - }, + self.mark_tombstone(&k, *sample_to_store.timestamp().unwrap()) + .await; + storage + .delete(stripped_key, *sample_to_store.timestamp().unwrap()) + .await + } }; drop(storage); if self.replication.is_some() @@ -466,7 +354,7 @@ impl StorageService { .as_ref() .unwrap() .log_propagation - .send((k.clone(), sample_to_store.timestamp)); + .send((k.clone(), *sample_to_store.timestamp().unwrap())); match sending { Ok(_) => (), Err(e) => { @@ -497,16 +385,26 @@ impl StorageService { } } - async fn register_wildcard_update(&self, sample: StorageSample) { + async fn register_wildcard_update(&self, sample: Sample) { // @TODO: change into a better store that does incremental writes - let key = sample.key_expr.clone(); + let key = sample.key_expr().clone(); let mut wildcards = self.wildcard_updates.write().await; - wildcards.insert(&key, sample.into()); + let timestamp = *sample.timestamp().unwrap(); + wildcards.insert( + &key, + Update { + kind: sample.kind(), + data: StoredData { + value: Value::from(sample), + timestamp, + }, + }, + ); if self.capability.persistence.eq(&Persistence::Durable) { // flush to disk to makeit durable let mut serialized_data = HashMap::new(); for (k, update) in wildcards.key_value_pairs() { - serialized_data.insert(k, update.to_string()); + serialized_data.insert(k, serialize_update(update)); } if let Err(e) = std::fs::write( zenoh_home().join(WILDCARD_UPDATES_FILENAME), @@ -535,36 +433,34 @@ impl StorageService { let mut update = None; for node in wildcards.intersecting_keys(key_expr) { let weight = wildcards.weight_at(&node); - if let Some(weight) = weight { - if weight.timestamp() > ts { - // if the key matches a wild card update, check whether it was saved in storage - // remember that wild card updates change only existing keys - let stripped_key = match self.strip_prefix(&key_expr.into()) { - Ok(stripped) => stripped, - Err(e) => { - log::error!("{}", e); - break; - } - }; - let mut storage = self.storage.lock().await; - match storage.get(stripped_key, "").await { - Ok(stored_data) => { - for entry in stored_data { - if entry.timestamp > *ts { - return None; - } + if weight.is_some() && weight.unwrap().data.timestamp > *ts { + // if the key matches a wild card update, check whether it was saved in storage + // remember that wild card updates change only existing keys + let stripped_key = match self.strip_prefix(&key_expr.into()) { + Ok(stripped) => stripped, + Err(e) => { + log::error!("{}", e); + break; + } + }; + let mut storage = self.storage.lock().await; + match storage.get(stripped_key, "").await { + Ok(stored_data) => { + for entry in stored_data { + if entry.timestamp > *ts { + return None; } } - Err(e) => { - log::warn!( - "Storage '{}' raised an error fetching a query on key {} : {}", - self.name, - key_expr, - e - ); - ts = weight.timestamp(); - update = Some(weight.clone()); - } + } + Err(e) => { + log::warn!( + "Storage '{}' raised an error fetching a query on key {} : {}", + self.name, + key_expr, + e + ); + ts = &weight.unwrap().data.timestamp; + update = Some(weight.unwrap().clone()); } } } @@ -617,8 +513,13 @@ impl StorageService { match storage.get(stripped_key, q.parameters()).await { Ok(stored_data) => { for entry in stored_data { - let sample = entry.into_sample(key.clone()); - if let Err(e) = q.reply_sample(sample).res().await { + if let Err(e) = q + .reply(key.clone(), entry.value.payload) + .encoding(entry.value.encoding) + .timestamp(entry.timestamp) + .res() + .await + { log::warn!( "Storage '{}' raised an error replying a query: {}", self.name, @@ -644,13 +545,13 @@ impl StorageService { match storage.get(stripped_key, q.parameters()).await { Ok(stored_data) => { for entry in stored_data { - let Value { - payload, encoding, .. - } = entry.value; - let sample = Sample::put(q.key_expr().clone(), payload) - .with_encoding(encoding) - .with_timestamp(entry.timestamp); - if let Err(e) = q.reply_sample(sample).res().await { + if let Err(e) = q + .reply(q.key_expr().clone(), entry.value.payload) + .encoding(entry.value.encoding) + .timestamp(entry.timestamp) + .res() + .await + { log::warn!( "Storage '{}' raised an error replying a query: {}", self.name, @@ -757,7 +658,7 @@ impl StorageService { while let Ok(reply) = replies.recv_async().await { match reply.sample { Ok(sample) => { - self.process_sample(sample.into()).await; + self.process_sample(sample).await; } Err(e) => log::warn!( "Storage '{}' received an error to align query: {:?}", @@ -770,6 +671,47 @@ impl StorageService { } } +fn serialize_update(update: &Update) -> String { + let Update { + kind, + data: + StoredData { + value: Value { + payload, encoding, .. + }, + timestamp, + }, + } = update; + let zbuf: ZBuf = payload.into(); + + let result = ( + kind.to_string(), + timestamp.to_string(), + encoding.to_string(), + zbuf.slices().collect::>(), + ); + serde_json::to_string_pretty(&result).unwrap() +} + +fn construct_update(data: String) -> Update { + let result: (String, String, String, Vec<&[u8]>) = serde_json::from_str(&data).unwrap(); // @TODO: remove the unwrap() + let mut payload = ZBuf::default(); + for slice in result.3 { + payload.push_zslice(slice.to_vec().into()); + } + let value = Value::new(payload).encoding(result.2); + let data = StoredData { + value, + timestamp: Timestamp::from_str(&result.1).unwrap(), // @TODO: remove the unwrap() + }; + let kind = if result.0.eq(&(SampleKind::Put).to_string()) { + SampleKind::Put + } else { + SampleKind::Delete + }; + Update { kind, data } +} + // Periodic event cleaning-up data info for old metadata struct GarbageCollectionEvent { config: GarbageCollectionConfig, @@ -801,7 +743,7 @@ impl Timed for GarbageCollectionEvent { let mut to_be_removed = HashSet::new(); for (k, update) in wildcard_updates.key_value_pairs() { - let ts = update.timestamp(); + let ts = update.data.timestamp; if ts.get_time() < &time_limit { // mark key to be removed to_be_removed.insert(k);