diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs b/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs index 39480a3912..3a68aa3bda 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs @@ -219,21 +219,36 @@ impl AlignQueryable { match reply.sample { Ok(sample) => { tracing::trace!( - "[ALIGN QUERYABLE] Received ('{}': '{}')", + "[ALIGN QUERYABLE] Received ('{}': '{}' @ {:?})", sample.key_expr.as_str(), - sample.value + sample.value, + sample.timestamp ); if let Some(timestamp) = sample.timestamp { match timestamp.cmp(&logentry.timestamp) { - Ordering::Greater => return None, + Ordering::Greater => { + tracing::error!( + "[ALIGN QUERYABLE] Data in the storage is newer than requested." + ); + return None; + } Ordering::Less => { tracing::error!( "[ALIGN QUERYABLE] Data in the storage is older than requested." ); return None; } - Ordering::Equal => return Some(sample), + Ordering::Equal => { + tracing::debug!( + "[ALIGN QUERYABLE] Data in the storage has a good timestamp." + ); + return Some(sample); + } } + } else { + tracing::error!( + "[ALIGN QUERYABLE] No timestamp on log entry sample from storage." + ); } } Err(err) => { diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/digest.rs b/plugins/zenoh-plugin-storage-manager/src/replica/digest.rs index e64e81f3d6..55ae097892 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/digest.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/digest.rs @@ -154,8 +154,14 @@ impl Digest { HashMap::new(), ); for entry in raw_log { - let sub = Digest::get_subinterval(config.delta, entry.timestamp, config.sub_intervals); - let interval = Digest::get_interval(sub, config.sub_intervals); + let Some(sub) = + Digest::get_subinterval(config.delta, entry.timestamp, config.sub_intervals) + else { + continue; + }; + let Some(interval) = Digest::get_interval(sub, config.sub_intervals) else { + continue; + }; let era = Digest::get_era(&config, latest_interval, interval); if sub == curr_sub { @@ -263,37 +269,44 @@ impl Digest { } // Updates an existing digest with new entries, also replaces removed entries - pub async fn update_digest( - mut current: Digest, + pub fn update_digest( + current: Digest, latest_interval: u64, last_snapshot_time: Timestamp, new_content: HashSet, deleted_content: HashSet, ) -> Digest { - // push content in correct places - let (mut current, mut subintervals_to_update, mut intervals_to_update, mut eras_to_update) = - Digest::update_new_content(&mut current, new_content, latest_interval); - // remove deleted content from proper places - let (mut current, further_subintervals, further_intervals, further_eras) = - Digest::remove_deleted_content(&mut current, deleted_content, latest_interval); + let (current, further_subintervals, further_intervals, further_eras) = + Digest::remove_deleted_content(current, deleted_content, latest_interval); + tracing::trace!("Removed deleted content: {current:?}"); + // push content in correct places + let (current, mut subintervals_to_update, mut intervals_to_update, mut eras_to_update) = + Digest::update_new_content(current, new_content, latest_interval); - // move intervals into eras if changed -- iterate through hot and move them to warm/cold if needed, iterate through warm and move them to cold if needed - let (current, realigned_eras) = - Digest::recalculate_era_content(&mut current, latest_interval); + // move intervals into eras if changed -- iterate through hot + // and move them to warm/cold if needed, iterate through warm + // and move them to cold if needed + let (current, realigned_eras) = Digest::recalculate_era_content(current, latest_interval); subintervals_to_update.extend(further_subintervals); intervals_to_update.extend(further_intervals); eras_to_update.extend(further_eras); eras_to_update.extend(realigned_eras); - let mut subintervals = current.subintervals.clone(); - let mut intervals = current.intervals.clone(); - let mut eras = current.eras.clone(); + let mut subintervals = current.subintervals; + let mut intervals = current.intervals; + let mut eras = current.eras; // reconstruct updated parts of the digest for sub in subintervals_to_update { - let subinterval = subintervals.get_mut(&sub).unwrap(); + let subinterval = match subintervals.get_mut(&sub) { + Some(subinterval) => subinterval, + None => { + tracing::error!("failed to get subinterval {sub:?}"); + continue; + } + }; let content = &subinterval.content; if !content.is_empty() { // order the content, hash them @@ -308,7 +321,13 @@ impl Digest { } for int in intervals_to_update { - let interval = intervals.get_mut(&int).unwrap(); + let interval = match intervals.get_mut(&int) { + Some(interval) => interval, + None => { + tracing::error!("failed to get interval: {int:?}"); + continue; + } + }; interval.content.retain(|x| subintervals.contains_key(x)); let content = &interval.content; if !content.is_empty() { @@ -325,8 +344,15 @@ impl Digest { } for era_type in eras_to_update { - let era = eras.get_mut(&era_type).unwrap(); + let era = match eras.get_mut(&era_type) { + Some(era) => era, + None => { + tracing::error!("failed to get era: {era_type:?}"); + continue; + } + }; era.content.retain(|x| intervals.contains_key(x)); + let content = &era.content; if !content.is_empty() { // order the content, hash them @@ -372,8 +398,9 @@ impl Digest { fn get_interval_checksum(content: &[u64], info: &HashMap) -> u64 { let mut hashable_content = Vec::new(); for i in content { - let i_cont = info.get(i).unwrap().checksum; - hashable_content.push(i_cont); + if let Some(i_cont) = info.get(i).map(|i| i.checksum) { + hashable_content.push(i_cont); + } } Digest::get_content_hash(&hashable_content) } @@ -382,8 +409,9 @@ impl Digest { fn get_era_checksum(content: &[u64], info: &HashMap) -> u64 { let mut hashable_content = Vec::new(); for i in content { - let i_cont = info.get(i).unwrap().checksum; - hashable_content.push(i_cont); + if let Some(i_cont) = info.get(i).map(|i| i.checksum) { + hashable_content.push(i_cont); + } } Digest::get_content_hash(&hashable_content) } @@ -405,7 +433,7 @@ impl Digest { // update the digest with new content fn update_new_content( - current: &mut Digest, + mut current: Digest, content: HashSet, latest_interval: u64, ) -> (Digest, HashSet, HashSet, HashSet) { @@ -414,13 +442,18 @@ impl Digest { let mut subintervals_to_update = HashSet::new(); for log_entry in content { - let subinterval = Digest::get_subinterval( + let Some(subinterval) = Digest::get_subinterval( current.config.delta, log_entry.timestamp, current.config.sub_intervals, - ); + ) else { + continue; + }; subintervals_to_update.insert(subinterval); - let interval = Digest::get_interval(subinterval, current.config.sub_intervals); + let Some(interval) = Digest::get_interval(subinterval, current.config.sub_intervals) + else { + continue; + }; intervals_to_update.insert(interval); let era = Digest::get_era(¤t.config, latest_interval, interval); eras_to_update.insert(era.clone()); @@ -458,7 +491,7 @@ impl Digest { } ( - current.clone(), + current, subintervals_to_update, intervals_to_update, eras_to_update, @@ -467,7 +500,7 @@ impl Digest { // remove deleted content from the digest fn remove_deleted_content( - current: &mut Digest, + mut current: Digest, deleted_content: HashSet, latest_interval: u64, ) -> (Digest, HashSet, HashSet, HashSet) { @@ -476,47 +509,47 @@ impl Digest { let mut subintervals_to_update = HashSet::new(); for entry in deleted_content { - let subinterval = Digest::get_subinterval( + let Some(subinterval) = Digest::get_subinterval( current.config.delta, entry.timestamp, current.config.sub_intervals, - ); + ) else { + continue; + }; subintervals_to_update.insert(subinterval); - let interval = Digest::get_interval(subinterval, current.config.sub_intervals); + let Some(interval) = Digest::get_interval(subinterval, current.config.sub_intervals) + else { + continue; + }; intervals_to_update.insert(interval); let era = Digest::get_era(¤t.config, latest_interval, interval); eras_to_update.insert(era.clone()); - if current.subintervals.contains_key(&subinterval) { - current - .subintervals - .get_mut(&subinterval) - .unwrap() - .content + if let Some(sub) = current.subintervals.get_mut(&subinterval) { + sub.content .retain(|x| x.timestamp != entry.timestamp || x.key != entry.key); subintervals_to_update.insert(subinterval); - } - if current.intervals.contains_key(&interval) { - current - .intervals - .get_mut(&interval) - .unwrap() - .content - .retain(|&x| x != subinterval); - intervals_to_update.insert(interval); - } - if current.eras.contains_key(&era) { - current - .eras - .get_mut(&era) - .unwrap() - .content - .retain(|&x| x != interval); - eras_to_update.insert(era.clone()); + + // Remove this subinterval from the parent interval if it's all empty + if sub.content.is_empty() { + if let Some(int) = current.intervals.get_mut(&interval) { + int.content.retain(|&x| x != subinterval); + intervals_to_update.insert(interval); + + // We need to update the containing era if we've + // emptied out this interval. + if int.content.is_empty() { + if let Some(e) = current.eras.get_mut(&era) { + e.content.retain(|&x| x != interval); + } + eras_to_update.insert(era.clone()); + } + } + } } } ( - current.clone(), + current, subintervals_to_update, intervals_to_update, eras_to_update, @@ -525,7 +558,7 @@ impl Digest { // re-assign intervals into eras as time moves on fn recalculate_era_content( - current: &mut Digest, + mut current: Digest, latest_interval: u64, ) -> (Digest, HashSet) { let mut eras_to_update = HashSet::new(); @@ -556,31 +589,32 @@ impl Digest { }) .or_insert(Interval { checksum: 0, - content: [interval].iter().cloned().collect(), + content: [interval].into(), }); } - (current.clone(), eras_to_update) + (current, eras_to_update) } // compute the subinterval for a given timestamp - fn get_subinterval(delta: Duration, ts: Timestamp, subintervals: usize) -> u64 { - let ts = u64::try_from( - ts.get_time() - .to_system_time() - .duration_since(super::EPOCH_START) - .unwrap() - .as_millis(), - ) - .unwrap(); - let delta = u64::try_from(delta.as_millis()).unwrap(); - ts / (delta / u64::try_from(subintervals).unwrap()) + fn get_subinterval(delta: Duration, ts: Timestamp, subintervals: usize) -> Option { + let ts = ts + .get_time() + .to_system_time() + .duration_since(super::EPOCH_START) + .ok() + .map(|d| d.as_millis()) + .and_then(|m| u64::try_from(m).ok()); + let delta = u64::try_from(delta.as_millis()).ok(); + + ts.zip(delta) + .zip(u64::try_from(subintervals).ok()) + .map(|((ts, delta), sub)| ts / (delta / sub)) } // compute the interval for a given subinterval - fn get_interval(subinterval: u64, subintervals: usize) -> u64 { - let subintervals = u64::try_from(subintervals).unwrap(); - subinterval / subintervals + fn get_interval(subinterval: u64, subintervals: usize) -> Option { + u64::try_from(subintervals).map(|s| subinterval / s).ok() } // compute era for a given interval @@ -607,11 +641,17 @@ impl Digest { pub fn compress(&self) -> Digest { let mut compressed_intervals = HashMap::new(); let mut compressed_subintervals = HashMap::new(); - if self.eras.contains_key(&EraType::Hot) { - for int in &self.eras.get(&EraType::Hot).unwrap().content { - compressed_intervals.insert(*int, self.intervals.get(int).unwrap().clone()); - for sub in &self.intervals.get(int).unwrap().content { - let subinterval = self.subintervals.get(sub).unwrap().clone(); + if let Some(eras) = self.eras.get(&EraType::Hot) { + for int in &eras.content { + let Some(inter) = self.intervals.get(int) else { + continue; + }; + + compressed_intervals.insert(*int, inter.clone()); + for sub in &inter.content { + let Some(subinterval) = self.subintervals.get(sub) else { + continue; + }; let comp_sub = SubInterval { checksum: subinterval.checksum, content: BTreeSet::new(), @@ -620,9 +660,11 @@ impl Digest { } } }; - if self.eras.contains_key(&EraType::Warm) { - for int in &self.eras.get(&EraType::Warm).unwrap().content { - let interval = self.intervals.get(int).unwrap().clone(); + if let Some(era) = self.eras.get(&EraType::Warm) { + for int in &era.content { + let Some(interval) = self.intervals.get(int) else { + continue; + }; let comp_int = Interval { checksum: interval.checksum, content: BTreeSet::new(), @@ -631,17 +673,17 @@ impl Digest { } }; let mut compressed_eras = HashMap::new(); - for era in self.eras.keys() { - if era.clone() == EraType::Cold { + for (key, era) in &self.eras { + if *key == EraType::Cold { compressed_eras.insert( EraType::Cold, Interval { - checksum: self.eras.get(era).unwrap().checksum, + checksum: era.checksum, content: BTreeSet::new(), }, ); } else { - compressed_eras.insert(era.clone(), self.eras.get(era).unwrap().clone()); + compressed_eras.insert(key.clone(), era.clone()); } } Digest { @@ -657,8 +699,22 @@ impl Digest { // get the intervals of a given era pub fn get_era_content(&self, era: &EraType) -> HashMap { let mut result = HashMap::new(); - for int in self.eras.get(era).unwrap().content.clone() { - result.insert(int, self.intervals.get(&int).unwrap().checksum); + let content = match self.eras.get(era) { + Some(era) => era.content.clone(), + None => { + tracing::error!("failed to get era content: {era:?}"); + return result; + } + }; + for int in content { + let checksum = match self.intervals.get(&int) { + Some(interval) => interval.checksum, + None => { + tracing::error!("failed to get interval checksum: {int:?}"); + continue; + } + }; + result.insert(int, checksum); } result } @@ -668,8 +724,22 @@ impl Digest { //return (subintervalid, checksum) for the set of intervals let mut result = HashMap::new(); for each in intervals { - for sub in self.intervals.get(&each).unwrap().content.clone() { - result.insert(sub, self.subintervals.get(&sub).unwrap().checksum); + let content = match self.intervals.get(&each) { + Some(interval) => interval.content.clone(), + None => { + tracing::error!("failed to get interval content: {each:?}"); + continue; + } + }; + for sub in content { + let checksum = match self.subintervals.get(&sub) { + Some(subinterval) => subinterval.checksum, + None => { + tracing::error!("failed to get subinterval checksum: {sub:?}"); + continue; + } + }; + result.insert(sub, checksum); } } result @@ -682,7 +752,14 @@ impl Digest { ) -> HashMap> { let mut result = HashMap::new(); for each in subintervals { - result.insert(each, self.subintervals.get(&each).unwrap().content.clone()); + let content = match self.subintervals.get(&each) { + Some(subinterval) => subinterval.content.clone(), + None => { + tracing::error!("failed to get subinterval content: {each:?}"); + continue; + } + }; + result.insert(each, content); } result } @@ -702,13 +779,13 @@ impl Digest { // return mismatching intervals in an era pub fn get_interval_diff(&self, other_intervals: HashMap) -> HashSet { let mut mis_int = HashSet::new(); - for int in other_intervals.keys() { - if self.intervals.contains_key(int) { - if *other_intervals.get(int).unwrap() != self.intervals.get(int).unwrap().checksum { - mis_int.insert(*int); + for (key, other_int) in &other_intervals { + if let Some(int) = self.intervals.get(key) { + if *other_int != int.checksum { + mis_int.insert(*key); } } else { - mis_int.insert(*int); + mis_int.insert(*key); } } mis_int @@ -717,15 +794,13 @@ impl Digest { // return mismatching subintervals in an interval pub fn get_subinterval_diff(&self, other_subintervals: HashMap) -> HashSet { let mut mis_sub = HashSet::new(); - for sub in other_subintervals.keys() { - if self.subintervals.contains_key(sub) { - if *other_subintervals.get(sub).unwrap() - != self.subintervals.get(sub).unwrap().checksum - { - mis_sub.insert(*sub); + for (key, other_sub) in &other_subintervals { + if let Some(sub) = self.subintervals.get(key) { + if *other_sub != sub.checksum { + mis_sub.insert(*key); } } else { - mis_sub.insert(*sub); + mis_sub.insert(*key); } } mis_sub @@ -746,22 +821,14 @@ impl Digest { //return missing content in a subinterval pub fn get_content_diff(&self, subinterval: u64, content: Vec) -> Vec { - if !self.subintervals.contains_key(&subinterval) { - return content; - } - let mut mis_content = Vec::new(); - for c in content { - if !self - .subintervals - .get(&subinterval) - .unwrap() - .content - .contains(&c) - { - mis_content.push(c); - } + if let Some(sub) = self.subintervals.get(&subinterval) { + content + .into_iter() + .filter(|c| !sub.content.contains(c)) + .collect() + } else { + content } - mis_content } } @@ -970,7 +1037,7 @@ fn test_update_digest_add_content() { async_std::task::block_on(async { zenoh_core::zasync_executor_init!(); }); - let created = async_std::task::block_on(Digest::update_digest( + let created = Digest::update_digest( Digest { timestamp: Timestamp::from_str("2022-12-21T13:00:00.000000000Z/1").unwrap(), config: DigestConfig { @@ -991,7 +1058,7 @@ fn test_update_digest_add_content() { key: OwnedKeyExpr::from_str("demo/example/a").unwrap(), }]), HashSet::new(), - )); + ); let expected = Digest { timestamp: Timestamp::from_str("2022-12-21T15:00:00.000000000Z/1").unwrap(), config: DigestConfig { @@ -1034,7 +1101,7 @@ fn test_update_digest_remove_content() { async_std::task::block_on(async { zenoh_core::zasync_executor_init!(); }); - let created = async_std::task::block_on(Digest::update_digest( + let created = Digest::update_digest( Digest { timestamp: Timestamp::from_str("2022-12-21T13:00:00.000000000Z/1").unwrap(), config: DigestConfig { @@ -1076,7 +1143,7 @@ fn test_update_digest_remove_content() { timestamp: Timestamp::from_str("2022-12-21T15:00:00.000000000Z/1").unwrap(), key: OwnedKeyExpr::from_str("demo/example/a").unwrap(), }]), - )); + ); let expected = Digest { timestamp: Timestamp::from_str("2022-12-21T15:00:00.000000000Z/1").unwrap(), config: DigestConfig { @@ -1109,7 +1176,7 @@ fn test_update_remove_digest() { Vec::new(), 1671612730, ); - let added = async_std::task::block_on(Digest::update_digest( + let added = Digest::update_digest( created.clone(), 1671612730, Timestamp::from_str("2022-12-21T15:00:00.000000000Z/1").unwrap(), @@ -1118,10 +1185,10 @@ fn test_update_remove_digest() { key: OwnedKeyExpr::from_str("a/b/c").unwrap(), }]), HashSet::new(), - )); + ); assert_ne!(created, added); - let removed = async_std::task::block_on(Digest::update_digest( + let removed = Digest::update_digest( added.clone(), 1671612730, Timestamp::from_str("2022-12-21T15:00:00.000000000Z/1").unwrap(), @@ -1130,10 +1197,10 @@ fn test_update_remove_digest() { timestamp: Timestamp::from_str("2022-12-21T12:00:00.000000000Z/1").unwrap(), key: OwnedKeyExpr::from_str("a/b/c").unwrap(), }]), - )); + ); assert_eq!(created, removed); - let added_again = async_std::task::block_on(Digest::update_digest( + let added_again = Digest::update_digest( removed, 1671612730, Timestamp::from_str("2022-12-21T15:00:00.000000000Z/1").unwrap(), @@ -1142,6 +1209,6 @@ fn test_update_remove_digest() { key: OwnedKeyExpr::from_str("a/b/c").unwrap(), }]), HashSet::new(), - )); + ); assert_eq!(added, added_again); } diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs index 3382320610..51bd613e22 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs @@ -16,9 +16,9 @@ use crate::backends_mgt::StoreIntercept; use crate::storages_mgt::StorageMessage; +use async_std::stream::{interval, StreamExt}; use async_std::sync::Arc; use async_std::sync::RwLock; -use async_std::task::sleep; use flume::{Receiver, Sender}; use futures::{pin_mut, select, FutureExt}; use std::collections::{HashMap, HashSet}; @@ -273,8 +273,11 @@ impl Replica { .await .unwrap(); + // Ensure digest gets published every interval, accounting for + // time it takes to publish. + let mut interval = interval(self.replica_config.publication_interval); loop { - sleep(self.replica_config.publication_interval).await; + let _ = interval.next().await; let digest = snapshotter.get_digest().await; let digest = digest.compress(); diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/snapshotter.rs b/plugins/zenoh-plugin-storage-manager/src/replica/snapshotter.rs index ae9090c544..e66a6e88ca 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/snapshotter.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/snapshotter.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // use super::{Digest, DigestConfig, LogEntry}; +use async_std::stream::{interval, StreamExt}; use async_std::sync::Arc; use async_std::sync::RwLock; use async_std::task::sleep; @@ -113,8 +114,11 @@ impl Snapshotter { // Periodically update parameters for snapshot async fn task_update_snapshot_params(&self) { sleep(Duration::from_secs(2)).await; + + let mut interval = interval(self.replica_config.delta); loop { - sleep(self.replica_config.delta).await; + let _ = interval.next().await; + let mut last_snapshot_time = self.content.last_snapshot_time.write().await; let mut last_interval = self.content.last_interval.write().await; let (time, interval) = Snapshotter::compute_snapshot_params( @@ -258,8 +262,7 @@ impl Snapshotter { *last_snapshot_time, new_stable_content, deleted_content, - ) - .await; + ); *digest = updated_digest; } @@ -304,8 +307,7 @@ impl Snapshotter { *last_snapshot_time, new_stable, deleted_stable, - ) - .await; + ); *digest = updated_digest; drop(digest);