diff --git a/Cargo.lock b/Cargo.lock index 33b5367eb7..015a06ee67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -586,7 +586,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1" dependencies = [ - "bit-vec", + "bit-vec 0.6.3", ] [[package]] @@ -595,6 +595,12 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" +[[package]] +name = "bit-vec" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c54ff287cfc0a34f38a6b832ea1bd8e448a330b3e40a50859e6488bee07f22" + [[package]] name = "bitflags" version = "1.3.2" @@ -656,6 +662,17 @@ dependencies = [ "piper", ] +[[package]] +name = "bloomfilter" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc0bdbcf2078e0ba8a74e1fe0cf36f54054a04485759b61dfd60b174658e9607" +dependencies = [ + "bit-vec 0.7.0", + "getrandom 0.2.15", + "siphasher 1.0.1", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -2763,7 +2780,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" dependencies = [ - "siphasher", + "siphasher 0.3.11", ] [[package]] @@ -3882,6 +3899,12 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" + [[package]] name = "slab" version = "0.4.9" @@ -5362,6 +5385,12 @@ dependencies = [ "time 0.3.36", ] +[[package]] +name = "xxhash-rust" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984" + [[package]] name = "z-serial" version = "0.2.3" @@ -5839,17 +5868,23 @@ version = "1.0.0-dev" dependencies = [ "async-global-executor", "async-trait", + "bincode", + "bloomfilter", "flume", "futures", "git-version", "jsonschema", "lazy_static", + "rand 0.8.5", "rustc_version 0.4.1", "schemars", "serde", "serde_json", "tokio", "tracing", + "uhlc", + "uuid", + "xxhash-rust", "zenoh", "zenoh-plugin-trait", "zenoh_backend_traits", diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index ec8fba723f..2ee6c5ddb5 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -562,6 +562,10 @@ // }, // demo2: { // key_expr: "demo/memory2/**", + // /// This prefix will be stripped of the received keys when storing. + // /// ⚠️ If you replicate this Storage then THIS VALUE SHOULD BE THE SAME FOR ALL THE REPLICAS YOU WANT TO + // /// KEEP ALIGNED. + // strip_prefix: "demo/memory2", // volume: "memory", // /// Storage manager plugin handles metadata in order to ensure convergence of distributed storages configured in Zenoh. // /// Metadata includes the set of wild card updates and deletions (tombstones). @@ -574,6 +578,24 @@ // /// The duration is specified in seconds. // lifespan: 86400, // }, + // /// If multiple storages subscribing to the same key_expr should be synchronized, declare them as replicas. 
+ // /// In the absence of this configuration, a normal storage is initialized + // /// Note: all the samples to be stored in replicas should be timestamped + // /// + // /// ⚠️ THESE VALUE SHOULD BE THE SAME FOR ALL THE REPLICAS YOU WANT TO KEEP ALIGNED. + // replication: { + // /// Specifying the parameters is optional, by default the values provided will be used. + // /// Time interval between different synchronization attempts in SECONDS. + // interval: 10.0, + // /// Number of sub-intervals, of equal duration, within an interval. + // sub_intervals: 5, + // /// Number of intervals that compose the "hot" era. + // hot: 6, + // /// Number of intervals that compose the "warm" era. + // warm: 30, + // /// The average time, expressed in MILLISECONDS, it takes a publication to reach the Storage. + // propagation_delay: 250, + // } // }, // demo3: { // key_expr: "demo/memory3/**", diff --git a/deny.toml b/deny.toml index 02c6caeb32..4d0c54bd34 100644 --- a/deny.toml +++ b/deny.toml @@ -14,6 +14,7 @@ allow = [ "CC0-1.0", "MPL-2.0", "OpenSSL", + "BSL-1.0" ] # This was copied from https://github.com/EmbarkStudios/cargo-deny/blob/main/deny.toml#L64 diff --git a/plugins/zenoh-backend-traits/src/config.rs b/plugins/zenoh-backend-traits/src/config.rs index 1cd16d62a9..8673bbb2cd 100644 --- a/plugins/zenoh-backend-traits/src/config.rs +++ b/plugins/zenoh-backend-traits/src/config.rs @@ -65,6 +65,17 @@ pub struct StorageConfig { pub volume_id: String, pub volume_cfg: Value, pub garbage_collection_config: GarbageCollectionConfig, + // Note: ReplicaConfig is optional. Alignment will be performed only if it is a replica + pub replication: Option, +} +// Note: All parameters should be same for replicas, else will result on huge overhead +#[derive(JsonSchema, Debug, Clone, PartialEq, Eq)] +pub struct ReplicaConfig { + pub interval: Duration, + pub sub_intervals: usize, + pub hot: u64, + pub warm: u64, + pub propagation_delay: Duration, } impl StructVersion for VolumeConfig { @@ -78,6 +89,67 @@ impl StructVersion for VolumeConfig { impl PluginStartArgs for VolumeConfig {} +impl Default for ReplicaConfig { + fn default() -> Self { + Self { + // This variable, expressed in SECONDS (f64), controls the frequency at which the + // Digests are computed and published. + // + // This also determines the time up to which replicas might diverge. + // + // Its default value is 10.0 seconds. + // + // ⚠️ THIS VALUE SHOULD BE THE SAME FOR ALL REPLICAS. + interval: Duration::from_secs_f64(10.0), + // This variable dictates the number of sub-intervals, of equal duration, within an + // interval. + // + // This is used to separate the publications in smaller batches when computing their + // fingerprints. A value of `1` will effectively disable the sub-intervals. + // Higher values will slightly increase the size of the `Digest` sent on the + // network but will reduce the amount of information sent when aligning. + // + // Hence, the trade-off is the following: with higher values more information are sent + // at every interval, with lower values more information are sent when + // aligning. + // + // Its default value is 5. + // + // ⚠️ THIS VALUE SHOULD BE THE SAME FOR ALL REPLICAS. + sub_intervals: 5, + // The number of intervals that compose the "hot" era. + // + // Its default value is 6 which, with the default `interval` value, corresponds to the + // last minute. + // + // ⚠️ THIS VALUE SHOULD BE THE SAME FOR ALL REPLICAS. + hot: 6, + // The number of intervals that compose the "warm" era. 
+ // + // Its default value is 30 which, with the default `interval` value, corresponds to 5 + // minutes. + // + // ⚠️ THIS VALUE SHOULD BE THE SAME FOR ALL REPLICAS. + warm: 30, + // The average time, expressed in MILLISECONDS, it takes for a publication to reach a + // storage. + // + // This value controls when the replication Digest is generated and, hence, published. + // Assuming that the `interval` is set to 10.0 seconds, with a + // `propagation_delay` of 250 milliseconds, the replication Digest + // will be computed at ( n × 10.0 ) + 0.25 seconds. + // + // ⚠️ For the reason above, this value cannot be greater or equal than half of the + // duration of an `interval`. + // + // Its default value is 250 milliseconds. + // + // ⚠️ THIS VALUE SHOULD BE THE SAME FOR ALL REPLICAS. + propagation_delay: Duration::from_millis(250), + } + } +} + // The configuration for periodic garbage collection of metadata in storage manager #[derive(JsonSchema, Debug, Clone, PartialEq, Eq)] pub struct GarbageCollectionConfig { @@ -475,6 +547,83 @@ impl StorageConfig { } None => GarbageCollectionConfig::default(), }; + let replication = match config.get("replication") { + Some(s) => { + let mut replication = ReplicaConfig::default(); + if let Some(p) = s.get("interval") { + let p = p.to_string().parse::(); + if let Ok(p) = p { + replication.interval = Duration::from_secs_f64(p); + } else { + bail!( + "Invalid type for field `interval` in `replica_config` of storage \ + `{}`. Expecting integer or floating point number.", + plugin_name + ) + } + } + if let Some(p) = s.get("sub_intervals") { + let p = p.to_string().parse::(); + if let Ok(p) = p { + replication.sub_intervals = p; + } else { + bail!( + "Invalid type for field `sub_intervals` in `replica_config` of \ + storage `{}`. Only integer values are accepted.", + plugin_name + ) + } + } + if let Some(d) = s.get("hot") { + let d = d.to_string().parse::(); + if let Ok(d) = d { + replication.hot = d; + } else { + bail!( + "Invalid type for field `hot` in `replica_config` of storage `{}`. \ + Only integer values are accepted.", + plugin_name + ) + } + } + if let Some(d) = s.get("warm") { + let d = d.to_string().parse::(); + if let Ok(d) = d { + replication.warm = d; + } else { + bail!( + "Invalid type for field `warm` in `replica_config` of storage `{}`. \ + Only integer values are accepted.", + plugin_name + ) + } + } + if let Some(p) = s.get("propagation_delay") { + let p = p.to_string().parse::(); + if let Ok(p) = p { + let propagation_delay = Duration::from_millis(p); + if (replication.interval - propagation_delay) < propagation_delay { + bail!( + "Invalid value for field `propagation_delay`: its value is too \ + high compared to the `interval`, consider increasing the \ + `interval` to at least twice its value (i.e. {}).", + p as f64 * 2.0 / 1000.0 + ); + } + + replication.propagation_delay = propagation_delay; + } else { + bail!( + "Invalid type for field `propagation_delay` in `replica_config` of \ + storage `{}`. 
Only integer values are accepted.", + plugin_name + ) + } + } + Some(replication) + } + None => None, + }; Ok(StorageConfig { name: storage_name.into(), key_expr, @@ -483,6 +632,7 @@ impl StorageConfig { volume_id, volume_cfg, garbage_collection_config, + replication, }) } } @@ -518,3 +668,7 @@ impl PrivacyTransparentGet for serde_json::Map +// + +use std::time::Duration; + +use serde_json::json; + +use super::StorageConfig; +use crate::config::ReplicaConfig; + +#[test] +fn test_replica_config() { + let empty_config = json!({ + "key_expr": "test/**", + "volume": "memory", + "replication": {} + }); + let storage_config = + StorageConfig::try_from("test-plugin", "test-storage", &empty_config).unwrap(); + assert_eq!(storage_config.replication, Some(ReplicaConfig::default())); + + let incorrect_propagation_delay_config = json!({ + "key_expr": "test/**", + "volume": "memory", + "replication": { + "interval": 1, + "propagation_delay": 750, + } + }); + let result = StorageConfig::try_from( + "test-plugin", + "test-storage", + &incorrect_propagation_delay_config, + ); + let err = result.unwrap_err(); + let expected_error_msg = + "consider increasing the `interval` to at least twice its value (i.e. 1.5)"; + assert!( + err.to_string().contains(expected_error_msg), + "\nExpected to contain: {expected_error_msg} +Actual message: {err}", + ); + + let replica_config = json!({ + "key_expr": "test/**", + "volume": "memory", + "replication": { + "interval": 10, + "sub_intervals": 4, + "hot": 6, + "warm": 60, + "propagation_delay": 250, + } + }); + let storage_config = + StorageConfig::try_from("test-plugin", "test-storage", &replica_config).unwrap(); + assert_eq!( + storage_config.replication, + Some(ReplicaConfig { + interval: Duration::from_secs(10), + sub_intervals: 4, + hot: 6, + warm: 60, + propagation_delay: Duration::from_millis(250) + }) + ); +} diff --git a/plugins/zenoh-backend-traits/src/lib.rs b/plugins/zenoh-backend-traits/src/lib.rs index 3f2be96030..645aba7ccf 100644 --- a/plugins/zenoh-backend-traits/src/lib.rs +++ b/plugins/zenoh-backend-traits/src/lib.rs @@ -141,6 +141,7 @@ const FEATURES: &str = /// Capability of a storage indicates the guarantees of the storage /// It is used by the storage manager to take decisions on the trade-offs to ensure correct performance +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Capability { pub persistence: Persistence, pub history: History, diff --git a/plugins/zenoh-plugin-storage-manager/Cargo.toml b/plugins/zenoh-plugin-storage-manager/Cargo.toml index eb57fcd363..42d1cbd359 100644 --- a/plugins/zenoh-plugin-storage-manager/Cargo.toml +++ b/plugins/zenoh-plugin-storage-manager/Cargo.toml @@ -33,13 +33,19 @@ crate-type = ["cdylib", "rlib"] [dependencies] async-trait = { workspace = true } +bincode = { workspace = true } +bloomfilter = "1" flume = { workspace = true } futures = { workspace = true } git-version = { workspace = true } lazy_static = { workspace = true } +rand = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } -tracing = { workspace = true} +tracing = { workspace = true } +uuid = { workspace = true } +xxhash-rust = { version = "0.8", features = ["xxh3"] } zenoh = { workspace = true, features = [ "default", "plugins", @@ -59,6 +65,7 @@ jsonschema = { workspace = true } [dev-dependencies] async-global-executor = { workspace = true } +uhlc = { workspace = true } [package.metadata.deb] name = "zenoh-plugin-storage-manager" diff --git 
a/plugins/zenoh-plugin-storage-manager/src/lib.rs b/plugins/zenoh-plugin-storage-manager/src/lib.rs index d76e1dc08a..5b52e67274 100644 --- a/plugins/zenoh-plugin-storage-manager/src/lib.rs +++ b/plugins/zenoh-plugin-storage-manager/src/lib.rs @@ -26,9 +26,9 @@ use std::{ sync::{Arc, Mutex}, }; -use flume::Sender; use memory_backend::MemoryBackend; use storages_mgt::StorageMessage; +use tokio::sync::broadcast::Sender; use zenoh::{ internal::{ bail, @@ -49,6 +49,7 @@ use zenoh_plugin_trait::{ }; mod memory_backend; +mod replication; mod storages_mgt; use storages_mgt::*; @@ -138,7 +139,8 @@ impl StorageRuntimeInner { // Hence, in that scenario, we refuse to start the storage manager and any storage. if session.hlc().is_none() { tracing::error!( - "Cannot start storage manager (and thus any storage) without the 'timestamping' setting enabled in the Zenoh configuration" + "Cannot start storage manager (and thus any storage) without the 'timestamping' \ + setting enabled in the Zenoh configuration" ); bail!("Cannot start storage manager, 'timestamping' is disabled in the configuration"); } diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs new file mode 100644 index 0000000000..e805bda26c --- /dev/null +++ b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs @@ -0,0 +1,665 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{ + borrow::Cow, + collections::{HashMap, HashSet}, +}; + +use serde::{Deserialize, Serialize}; +use zenoh::{ + bytes::ZBytes, + internal::Value, + key_expr::OwnedKeyExpr, + query::{ConsolidationMode, Query, Selector}, + sample::{Sample, SampleKind}, +}; +use zenoh_backend_traits::StorageInsertionResult; + +use super::{ + classification::{IntervalIdx, SubIntervalIdx}, + core::Replication, + digest::{DigestDiff, Fingerprint}, + log::EventMetadata, +}; + +/// The `AlignmentQuery` enumeration represents the information requested by a Replica to align +/// its storage. +/// +/// Requests are made in the following order: +/// +/// DigestDiff -> Intervals -> SubIntervals -> Events +/// +/// Not all requests are made, it depends on the Era where a misalignment was detected. +/// +/// For instance, if a divergence is detected in the Cold era then the `AlignmentReply` will provide +/// the Replica with the [Fingerprint] of all the "cold" [Interval]s. In turn, the Replica will +/// requests more details on the [Interval]s that differ (the `Intervals` variant). +/// +/// A divergence in the Hot era, will directly let the Replica assess which [SubInterval]s it needs, +/// hence directly skipping to the `SubIntervals` variant. +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] +pub(crate) enum AlignmentQuery { + Diff(DigestDiff), + Intervals(HashSet), + SubIntervals(HashMap>), + Events(Vec), +} + +/// The `AlignmentReply` enumeration contains the possible information needed by a Replica to align +/// its storage. 
+/// +/// The are sent in the following order: +/// +/// Intervals -> SubIntervals -> Events -> Retrieval +/// +/// Not all replies are made, it depends on the Era when a misalignment was detected. +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] +pub(crate) enum AlignmentReply { + Intervals(HashMap), + SubIntervals(HashMap>), + Events(Vec), + Retrieval(EventMetadata), +} + +impl Replication { + /// Replies with the information requested by the Replica. + /// + /// This method will: + /// 1. Parse the attachment of the received [Query] into an [AlignmentQuery]. + /// 2. Depending on the variant of the [AlignmentQuery], reply with the requested information. + pub(crate) async fn aligner(&self, query: Query) { + let attachment = match query.attachment() { + Some(attachment) => attachment, + None => { + tracing::debug!("Skipping query with empty attachment"); + return; + } + }; + + let alignment_query = + match bincode::deserialize::(&attachment.into::>()) { + Ok(alignment) => alignment, + Err(e) => { + tracing::error!( + "Failed to deserialize `attachment` of received Query into \ + AlignmentQuery: {e:?}" + ); + return; + } + }; + + match alignment_query { + AlignmentQuery::Diff(digest_diff) => { + tracing::trace!("Processing `AlignmentQuery::Diff`"); + if digest_diff.cold_eras_differ { + self.reply_cold_era(&query).await; + } + + if !digest_diff.warm_eras_differences.is_empty() { + self.reply_sub_intervals(&query, digest_diff.warm_eras_differences) + .await; + } + + if !digest_diff.hot_eras_differences.is_empty() { + self.reply_events_metadata(&query, digest_diff.hot_eras_differences) + .await; + } + } + AlignmentQuery::Intervals(different_intervals) => { + tracing::trace!("Processing `AlignmentQuery::Intervals`"); + if !different_intervals.is_empty() { + self.reply_sub_intervals(&query, different_intervals).await; + } + } + AlignmentQuery::SubIntervals(different_sub_intervals) => { + tracing::trace!("Processing `AlignmentQuery::SubIntervals`"); + if !different_sub_intervals.is_empty() { + self.reply_events_metadata(&query, different_sub_intervals) + .await; + } + } + AlignmentQuery::Events(events_to_retrieve) => { + tracing::trace!("Processing `AlignmentQuery::Events`"); + if !events_to_retrieve.is_empty() { + self.reply_events(&query, events_to_retrieve).await; + } + } + } + } + + /// Replies to the provided [Query] with a hash map containing the index of the [Interval] in + /// the Cold era and their [Fingerprint]. + /// + /// The Replica will use this response to assess which [Interval]s differ. + /// + /// # Temporality + /// + /// There is no guarantee that the Replica indicating a difference in the Cold era is "aligned": + /// it is possible that its Cold era is either ahead or late (i.e. it has more or less + /// Interval(s) in its Replication Log in the Cold era). + /// + /// We believe this is not important: the Replication Log does not separate the Intervals based + /// on their era so performing this comparison will still be relevant — even if an Interval is + /// in the Cold era on one end and in the Warm era in the other. 
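+    ///
+    /// As a hedged, numbers-only illustration using the default configuration (`hot = 6`,
+    /// `warm = 30`): if the last elapsed interval has index 100, the lower bound of the Warm
+    /// era is `100 - 6 - 30 + 1 = 65` and this reply thus contains the Fingerprint of every
+    /// Interval whose index is strictly lower than 65.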
+ pub(crate) async fn reply_cold_era(&self, query: &Query) { + let log = self.replication_log.read().await; + let configuration = log.configuration(); + let last_elapsed_interval = match configuration.last_elapsed_interval() { + Ok(last_elapsed_idx) => last_elapsed_idx, + Err(e) => { + tracing::error!( + "Fatal error: failed to obtain the index of the last elapsed interval: {e:?}" + ); + return; + } + }; + let warm_era_lower_bound = configuration.warm_era_lower_bound(last_elapsed_interval); + + let reply = AlignmentReply::Intervals({ + log.intervals + .iter() + .filter(|(&idx, _)| idx < warm_era_lower_bound) + .map(|(idx, interval)| (*idx, interval.fingerprint())) + .collect::>() + }); + + reply_to_query(query, reply, None).await; + } + + /// Replies to the [Query] with a structure containing, for each interval index present in the + /// `different_intervals`, all the [SubInterval]s [Fingerprint]. + /// + /// The Replica will use this structure to assess which [SubInterval]s differ. + pub(crate) async fn reply_sub_intervals( + &self, + query: &Query, + different_intervals: HashSet, + ) { + let mut sub_intervals_fingerprints = HashMap::with_capacity(different_intervals.len()); + + { + let log = self.replication_log.read().await; + different_intervals.iter().for_each(|interval_idx| { + if let Some(interval) = log.intervals.get(interval_idx) { + sub_intervals_fingerprints + .insert(*interval_idx, interval.sub_intervals_fingerprints()); + } + }); + } + + let reply = AlignmentReply::SubIntervals(sub_intervals_fingerprints); + reply_to_query(query, reply, None).await; + } + + /// Replies to the [Query] with all the [EventMetadata] of the [Event]s present in the + /// [SubInterval]s listed in `different_sub_intervals`. + /// + /// The Replica will use this structure to assess which [Event] (and its associated payload) are + /// missing in its Replication Log and connected Storage. + /// + /// # TODO Performance improvement + /// + /// Although the Replica we are answering has to find if, for each provided [EventMetadata], + /// there is a more recent one, it does not need to go through all its Replication Log. It only + /// needs, for each [EventMetadata], to go through the Intervals that are greater than the one + /// it is contained in. + /// + /// The rationale is that the Intervals are already sorted in increasing order, so if no Event, + /// for the same key expression, can be found in any greater Interval, then by definition the + /// Replication Log does not contain a more recent Event. + /// + /// That would require the following changes: + /// - Change the `sub_intervals` field of the `Interval` structure to a BTreeMap. + /// - In the `reply_events_metadata` method (just below), send out a `HashMap>>` instead of a `Vec`. + /// - In the `process_alignment_reply` method, implement the searching algorithm described + /// above. 
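+    ///
+    /// A minimal sketch of the gathering currently performed below, assuming `log` is a read
+    /// guard over the Replication Log and that `different_sub_intervals` maps each interval
+    /// index to the set of sub-interval indices that differ (the reply being a flat list of
+    /// `EventMetadata`):
+    ///
+    /// ```no_compile
+    /// let mut events = Vec::new();
+    /// for (interval_idx, sub_interval_idxs) in &different_sub_intervals {
+    ///     if let Some(interval) = log.intervals.get(interval_idx) {
+    ///         for sub_interval_idx in sub_interval_idxs {
+    ///             if let Some(sub_interval) = interval.sub_intervals.get(sub_interval_idx) {
+    ///                 events.extend(sub_interval.events.values().map(EventMetadata::from));
+    ///             }
+    ///         }
+    ///     }
+    /// }
+    /// ```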
+ pub(crate) async fn reply_events_metadata( + &self, + query: &Query, + different_sub_intervals: HashMap>, + ) { + let mut events = Vec::default(); + { + let log = self.replication_log.read().await; + different_sub_intervals + .iter() + .for_each(|(interval_idx, sub_intervals)| { + if let Some(interval) = log.intervals.get(interval_idx) { + sub_intervals.iter().for_each(|sub_interval_idx| { + if let Some(sub_interval) = interval.sub_intervals.get(sub_interval_idx) + { + sub_interval + .events + .values() + .for_each(|event| events.push(event.into())) + } + }); + } + }); + } + + let reply = AlignmentReply::Events(events); + reply_to_query(query, reply, None).await; + } + + /// Replies to the [Query] with the [EventMetadata] and [Value] that were identified as missing. + /// + /// This method will fetch the [StoredData] from the Storage for each provided [EventMetadata], + /// making a distinct reply for each. The fact that multiple replies are sent to the same Query + /// is the reason why we need the consolidation to set to be `None` (⚠️). + pub(crate) async fn reply_events(&self, query: &Query, events_to_retrieve: Vec) { + for event_metadata in events_to_retrieve { + let stored_data = { + let mut storage = self.storage.lock().await; + match storage.get(event_metadata.stripped_key.clone(), "").await { + Ok(stored_data) => stored_data, + Err(e) => { + tracing::error!( + "Failed to retrieve data associated to key < {:?} >: {e:?}", + event_metadata.key_expr() + ); + continue; + } + } + }; + + let requested_data = stored_data + .into_iter() + .find(|data| data.timestamp == *event_metadata.timestamp()); + match requested_data { + Some(data) => { + tracing::trace!("Sending Sample: {:?}", event_metadata.stripped_key); + reply_to_query( + query, + AlignmentReply::Retrieval(event_metadata), + Some(data.value), + ) + .await; + } + None => { + // NOTE: This is not necessarily an error. There is a possibility that the data + // associated with this specific key was updated between the time the + // [AlignmentQuery] was sent and when it is processed. + // + // Hence, at the time it was "valid" but it no longer is. + tracing::debug!( + "Found no data in the Storage associated to key < {:?} > with a Timestamp \ + equal to: {}", + event_metadata.key_expr(), + event_metadata.timestamp() + ); + } + } + } + } + + /// Spawns a new task to query the Aligner of the Replica which potentially has data this + /// Storage is missing. + /// + /// This method will: + /// 1. Serialise the AlignmentQuery. + /// 2. Send a Query to the Aligner of the Replica, adding the serialised AlignmentQuery as an + /// attachment. + /// 3. Process all replies. + /// + /// Note that the processing of a reply can trigger a new query (requesting additional + /// information), spawning a new task. + /// + /// This process is stateless and all the required information are carried in the query / reply. 
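+    ///
+    /// A rough sketch of the query sent by the spawned task (names follow the local variables
+    /// used below; error handling and reply processing are omitted):
+    ///
+    /// ```no_compile
+    /// let attachment = bincode::serialize(&alignment_query)?;
+    /// let replies = zenoh_session
+    ///     .get(replica_aligner_ke)
+    ///     .attachment(attachment)
+    ///     // Several replies may answer a single query: consolidation must be disabled.
+    ///     .consolidation(ConsolidationMode::None)
+    ///     .await?;
+    /// while let Ok(reply) = replies.recv_async().await {
+    ///     // Deserialize the attachment into an AlignmentReply and process it.
+    /// }
+    /// ```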
+ pub(crate) fn spawn_query_replica_aligner( + &self, + replica_aligner_ke: OwnedKeyExpr, + alignment_query: AlignmentQuery, + ) { + let replication = self.clone(); + tokio::task::spawn(async move { + let attachment = match bincode::serialize(&alignment_query) { + Ok(attachment) => attachment, + Err(e) => { + tracing::error!("Failed to serialize AlignmentQuery: {e:?}"); + return; + } + }; + + match replication + .zenoh_session + .get(Into::::into(replica_aligner_ke.clone())) + .attachment(attachment) + // NOTE: We need to put the Consolidation to `None` as otherwise if multiple replies + // are sent, they will be "consolidated" and only one of them will make it + // through. + // + // When we retrieve Samples from a Replica, each Sample is sent in a separate + // reply. Hence the need to have no consolidation. + .consolidation(ConsolidationMode::None) + .await + { + Err(e) => { + tracing::error!("Failed to query Aligner < {replica_aligner_ke} >: {e:?}"); + } + Ok(reply_receiver) => { + while let Ok(reply) = reply_receiver.recv_async().await { + let sample = match reply.into_result() { + Ok(sample) => sample, + Err(e) => { + tracing::warn!( + "Skipping reply to query to < {replica_aligner_ke} >: {e:?}" + ); + continue; + } + }; + + let alignment_reply = match sample.attachment() { + None => { + tracing::debug!("Skipping reply without attachment"); + continue; + } + Some(attachment) => match bincode::deserialize::( + &attachment.into::>(), + ) { + Err(e) => { + tracing::error!( + "Failed to deserialize attachment as AlignmentReply: {e:?}" + ); + continue; + } + Ok(alignment_reply) => alignment_reply, + }, + }; + + replication + .process_alignment_reply( + replica_aligner_ke.clone(), + alignment_reply, + sample, + ) + .await; + } + } + } + }); + } + + /// Processes the [AlignmentReply] sent by the Replica that has potentially data this Storage is + /// missing. + /// + /// This method is a big "match" statement, processing each variant of the [AlignmentReply] in + /// the following manner: + /// + /// - Intervals: the Replica sent a list of [IntervalIdx] and their associated [Fingerprint]. + /// This Storage needs to compare these [Fingerprint] with its local state and, for each that + /// differ, request the [Fingerprint] of their [SubInterval]. + /// + /// This only happens as a response to a misalignment detected in the Cold Era. + /// + /// + /// - SubIntervals: the Replica sent a list of [IntervalIdx], their associated [SubIntervalIdx] + /// and the [Fingerprint] of these [SubInterval]. + /// This Storage again needs to compare these [Fingerprint] with its local state and, for each + /// that differ, request all the [EventMetadata] the [SubInterval] contains. + /// + /// This would happen as a response to a misalignment detected in the Warm Era or as a + /// follow-up step from a misalignment in the Cold Era. + /// + /// + /// - Events: the Replica sent a list of [EventMetadata]. + /// This Storage needs to check, for each of them, if it has a newer [Event] stored. If not, + /// it needs to ask to retrieve the associated data from the Replica. + /// If the [EventMetadata] is indeed more recent and its associated action is `Delete` then + /// the data will be directly deleted from the Storage without requiring an extra exchange. + /// + /// This would happen as a response to a misalignment detected in the Hot Era or as a + /// follow-up step from a misalignment in the Cold / Warm Eras. + /// + /// + /// - Retrieval: the Replica sent an [Event] and its associated payload. 
+ /// This Storage needs to check if it is still more recent and, if so, add it. + /// + /// Note that only one [Event] is sent per reply but multiple replies are sent to the same + /// Query (by setting `Consolidation::None`). + #[tracing::instrument(skip_all, fields(storage = self.storage_key_expr.as_str(), replica = replica_aligner_ke.as_str(), sample, t))] + pub(crate) async fn process_alignment_reply( + &self, + replica_aligner_ke: OwnedKeyExpr, + alignment_reply: AlignmentReply, + sample: Sample, + ) { + match alignment_reply { + AlignmentReply::Intervals(replica_intervals) => { + tracing::trace!("Processing `AlignmentReply::Intervals`"); + let intervals_diff = { + let replication_log_guard = self.replication_log.read().await; + replica_intervals + .into_iter() + .filter(|(idx, fp)| match replication_log_guard.intervals.get(idx) { + Some(interval) => interval.fingerprint() != *fp, + None => true, + }) + .map(|(idx, _)| idx) + .collect::>() + }; + + if !intervals_diff.is_empty() { + self.spawn_query_replica_aligner( + replica_aligner_ke, + AlignmentQuery::Intervals(intervals_diff), + ); + } + } + AlignmentReply::SubIntervals(replica_sub_intervals) => { + tracing::trace!("Processing `AlignmentReply::SubIntervals`"); + let sub_intervals_diff = { + let mut sub_ivl_diff = HashMap::default(); + let replication_log_guard = self.replication_log.read().await; + for (interval_idx, replica_sub_ivl) in replica_sub_intervals { + match replication_log_guard.intervals.get(&interval_idx) { + None => { + sub_ivl_diff.insert( + interval_idx, + replica_sub_ivl.into_keys().collect::>(), + ); + } + Some(interval) => { + let diff = replica_sub_ivl + .into_iter() + .filter(|(sub_idx, sub_fp)| { + match interval.sub_intervals.get(sub_idx) { + None => true, + Some(sub_interval) => { + sub_interval.fingerprint != *sub_fp + } + } + }) + .map(|(sub_idx, _)| sub_idx) + .collect(); + sub_ivl_diff.insert(interval_idx, diff); + } + } + } + + sub_ivl_diff + }; + + if !sub_intervals_diff.is_empty() { + self.spawn_query_replica_aligner( + replica_aligner_ke, + AlignmentQuery::SubIntervals(sub_intervals_diff), + ); + } + } + AlignmentReply::Events(replica_events) => { + tracing::trace!("Processing `AlignmentReply::Events`"); + let mut diff_events = Vec::default(); + + for replica_event in replica_events { + { + let span = tracing::Span::current(); + span.record( + "sample", + replica_event + .stripped_key + .as_ref() + .map_or("", |key| key.as_str()), + ); + span.record("t", replica_event.timestamp.to_string()); + } + + if self + .latest_updates + .read() + .await + .get(&replica_event.stripped_key) + .is_some_and(|latest_event| { + latest_event.timestamp >= replica_event.timestamp + }) + { + continue; + } + + match replica_event.action { + SampleKind::Put => { + let replication_log_guard = self.replication_log.read().await; + if let Some(latest_event) = + replication_log_guard.lookup(&replica_event.stripped_key) + { + if latest_event.timestamp >= replica_event.timestamp { + continue; + } + } + diff_events.push(replica_event); + } + SampleKind::Delete => { + let mut replication_log_guard = self.replication_log.write().await; + if let Some(latest_event) = + replication_log_guard.lookup(&replica_event.stripped_key) + { + if latest_event.timestamp >= replica_event.timestamp { + continue; + } + } + if matches!( + self.storage + .lock() + .await + .delete( + replica_event.stripped_key.clone(), + replica_event.timestamp + ) + .await, + // NOTE: In some of our backend implementation, a deletion on a + // non-existing key will 
return an error. Given that we cannot + // distinguish an error from a missing key, we will assume + // the latter and move forward. + // + // FIXME: Once the behaviour described above is fixed, check for + // errors. + Ok(StorageInsertionResult::Outdated) + ) { + continue; + } + + replication_log_guard.insert_event(replica_event.into()); + } + } + } + + if !diff_events.is_empty() { + self.spawn_query_replica_aligner( + replica_aligner_ke, + AlignmentQuery::Events(diff_events), + ); + } + } + AlignmentReply::Retrieval(replica_event) => { + tracing::trace!("Processing `AlignmentReply::Retrieval`"); + { + let span = tracing::Span::current(); + span.record( + "sample", + replica_event + .stripped_key + .as_ref() + .map_or("", |key| key.as_str()), + ); + span.record("t", replica_event.timestamp.to_string()); + } + + if self + .latest_updates + .read() + .await + .get(&replica_event.stripped_key) + .is_some_and(|latest_event| latest_event.timestamp >= replica_event.timestamp) + { + return; + } + + let mut replication_log_guard = self.replication_log.write().await; + if let Some(latest_event) = + replication_log_guard.lookup(&replica_event.stripped_key) + { + if latest_event.timestamp >= replica_event.timestamp { + return; + } + } + + if matches!( + self.storage + .lock() + .await + .put( + replica_event.stripped_key.clone(), + sample.into(), + replica_event.timestamp, + ) + .await, + Ok(StorageInsertionResult::Outdated) | Err(_) + ) { + return; + } + + replication_log_guard.insert_event(replica_event.into()); + } + } + } +} + +/// Replies to a Query, adding the [AlignmentReply] as an attachment and, if provided, the [Value] +/// as the payload (not forgetting to set the Encoding!). +async fn reply_to_query(query: &Query, reply: AlignmentReply, value: Option) { + let attachment = match bincode::serialize(&reply) { + Ok(attachment) => attachment, + Err(e) => { + tracing::error!("Failed to serialize AlignmentReply: {e:?}"); + return; + } + }; + + let reply_fut = if let Some(value) = value { + query + .reply(query.key_expr(), value.payload) + .encoding(value.encoding) + .attachment(attachment) + } else { + query + .reply(query.key_expr(), ZBytes::empty()) + .attachment(attachment) + }; + + if let Err(e) = reply_fut.await { + tracing::error!("Failed to reply to Query: {e:?}"); + } +} diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/classification.rs b/plugins/zenoh-plugin-storage-manager/src/replication/classification.rs new file mode 100644 index 0000000000..c786f459e8 --- /dev/null +++ b/plugins/zenoh-plugin-storage-manager/src/replication/classification.rs @@ -0,0 +1,282 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{ + collections::HashMap, + ops::{Deref, Sub}, +}; + +use serde::{Deserialize, Serialize}; +use zenoh::{key_expr::OwnedKeyExpr, time::Timestamp}; + +use super::{digest::Fingerprint, log::Event}; + +/// The `EventRemoval` enumeration lists the possible outcomes when searching for an older [Event] +/// and removing it if one was found. 
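+///
+/// A hedged sketch of how a caller can react to each variant:
+///
+/// ```no_compile
+/// match interval.if_newer_remove_older(&stripped_key, &timestamp) {
+///     // No Event is registered for this key expression: the incoming Event can be inserted.
+///     EventRemoval::NotFound => { /* insert the new Event */ }
+///     // The Log already holds a newer (or identical) Event: the incoming one is discarded.
+///     EventRemoval::KeptNewer => { /* nothing to do */ }
+///     // An older Event was removed: the incoming, more recent, Event takes its place.
+///     EventRemoval::RemovedOlder(_replaced) => { /* insert the new Event */ }
+/// }
+/// ```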
+#[derive(Debug, PartialEq, Eq)] +pub(crate) enum EventRemoval { + /// The Replication Log contains no [Event] with the provided key expression. + NotFound, + /// An [Event] with the same key expression and an earlier (or identical) timestamp is already + /// present in the Replication Log. + KeptNewer, + /// An [Event] with the same key expression and an older timestamp was removed from the + /// Replication Log. + RemovedOlder(Event), +} + +/// An `IntervalIdx` represents the index of an `Interval`. +/// +/// It is a thin wrapper around a `u64`. +#[derive(Deserialize, Serialize, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] +#[repr(transparent)] +pub struct IntervalIdx(pub(crate) u64); + +impl Deref for IntervalIdx { + type Target = u64; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From for IntervalIdx { + fn from(value: u64) -> Self { + Self(value) + } +} + +impl Sub for IntervalIdx { + type Output = IntervalIdx; + + fn sub(self, rhs: u64) -> Self::Output { + (self.0 - rhs).into() + } +} + +/// An `Interval` is a subdivision of a replication Log. +/// +/// It contains a set of [SubInterval]s, each of which, in turn, contains a set of [Event]s. +/// +/// A [Fingerprint] is associated to an `Interval` and is equal to the "exclusive or" (XOR) of the +/// [Fingerprint] of all the [SubInterval]s it contains. +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub(crate) struct Interval { + pub(crate) fingerprint: Fingerprint, + pub(crate) sub_intervals: HashMap, +} + +impl From<[(SubIntervalIdx, SubInterval); N]> for Interval { + fn from(sub_intervals: [(SubIntervalIdx, SubInterval); N]) -> Self { + Self { + fingerprint: sub_intervals + .iter() + .fold(Fingerprint::default(), |acc, (_, sub_interval)| { + acc ^ sub_interval.fingerprint + }), + sub_intervals: sub_intervals.into(), + } + } +} + +impl Interval { + /// Returns the [Fingerprint] of this [Interval]. + /// + /// The [Fingerprint] of an [Interval] is equal to the XOR (exclusive or) of the fingerprints + /// of the all the [SubInterval]s it contains. + pub(crate) fn fingerprint(&self) -> Fingerprint { + self.fingerprint + } + + /// Lookup the provided key expression and return, if found, its associated [Event]. + pub(crate) fn lookup(&self, stripped_key: &Option) -> Option<&Event> { + for sub_interval in self.sub_intervals.values() { + if let Some(event) = sub_interval.events.get(stripped_key) { + return Some(event); + } + } + + None + } + + /// Returns an [HashMap] of the index and [Fingerprint] of all the [SubInterval]s contained in + /// this [Interval]. + pub(crate) fn sub_intervals_fingerprints(&self) -> HashMap { + self.sub_intervals + .iter() + .filter(|(_, sub_interval)| sub_interval.fingerprint != Fingerprint::default()) + .map(|(sub_interval_idx, sub_interval)| (*sub_interval_idx, sub_interval.fingerprint)) + .collect() + } + + /// Inserts the [Event] in the [SubInterval] specified by the provided [SubIntervalIdx], + /// regardless of its [Timestamp]. + /// + /// The fingerprint of the [Interval] is also updated. + /// + /// # Caveat: "_unchecked" + /// + /// As its name indicates, this method DOES NOT CHECK if there is another [Event] associated to + /// the same key expression (regardless of its [Timestamp]). + /// + /// This uniqueness property (i.e. there should only be a single [Event] in the replication Log + /// for a given key expression) cannot be enforced at the [Interval] level. Hence, this method + /// assumes the check has already been performed and thus does not do redundant work. 
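+    ///
+    /// A hedged illustration of the invariant maintained here, assuming the uniqueness
+    /// property described above holds:
+    ///
+    /// ```no_compile
+    /// let before = interval.fingerprint();
+    /// let event_fingerprint = event.fingerprint();
+    /// interval.insert_unchecked(sub_interval_idx, event);
+    /// // The Fingerprint of the Interval is updated by XOR-ing in that of the Event.
+    /// assert_eq!(interval.fingerprint(), before ^ event_fingerprint);
+    /// ```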
+ pub(crate) fn insert_unchecked(&mut self, sub_interval_idx: SubIntervalIdx, event: Event) { + self.fingerprint ^= event.fingerprint(); + self.sub_intervals + .entry(sub_interval_idx) + .or_default() + .insert_unchecked(event); + } + + /// Removes, if one exists, the [Event] associated with the provided key expression if its + /// [Timestamp] is older than that of the provided one. + /// + /// This method will go through all of the [SubInterval]s included in this [Interval] and stop + /// at the first that indicates having an [Event] for the provided key expression. + /// + /// This method returns, through the [EventRemoval] enumeration, the action that was performed. + pub(crate) fn if_newer_remove_older( + &mut self, + key_expr: &Option, + timestamp: &Timestamp, + ) -> EventRemoval { + let mut sub_interval_idx_to_remove = None; + let mut result = EventRemoval::NotFound; + + for (sub_interval_idx, sub_interval) in self.sub_intervals.iter_mut() { + result = sub_interval.if_newer_remove_older(key_expr, timestamp); + if let EventRemoval::RemovedOlder(ref old_event) = result { + self.fingerprint ^= old_event.fingerprint(); + if sub_interval.events.is_empty() { + sub_interval_idx_to_remove = Some(*sub_interval_idx); + } + } + + // If the SubInterval returned anything other than `NotFound`, we can exit the search. + if !matches!(result, EventRemoval::NotFound) { + break; + } + } + + if let Some(sub_interval_idx) = sub_interval_idx_to_remove { + self.sub_intervals.remove(&sub_interval_idx); + } + + result + } +} + +/// A `SubIntervalIdx` represents the index of a [SubInterval]. +/// +/// It is a thin wrapper around a `u64`. +#[derive(Deserialize, Serialize, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] +#[repr(transparent)] +pub struct SubIntervalIdx(pub(crate) u64); + +impl Deref for SubIntervalIdx { + type Target = u64; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From for SubIntervalIdx { + fn from(value: u64) -> Self { + Self(value) + } +} + +/// A `SubInterval` is a subdivision of an [Interval] and groups together a set of [Event]s. +/// +/// A [Fingerprint] is associated to a `SubInterval` and is equal to the "exclusive or" of all the +/// [Event]s it contains. +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub(crate) struct SubInterval { + pub(crate) fingerprint: Fingerprint, + pub(crate) events: HashMap, Event>, +} + +impl From<[Event; N]> for SubInterval { + fn from(events: [Event; N]) -> Self { + let fingerprint = events.iter().fold(Fingerprint::default(), |acc, event| { + acc ^ event.fingerprint() + }); + + Self { + fingerprint, + events: events + .into_iter() + .map(|event| (event.key_expr().clone(), event)) + .collect(), + } + } +} + +impl SubInterval { + /// Inserts the [Event], regardless of its [Timestamp]. + /// + /// This method also updates the fingerprint of the [SubInterval]. + /// + /// # Caveat: "_unchecked" + /// + /// As its name indicates, this method DOES NOT CHECK if there is another [Event] associated to + /// the same key expression (regardless of its [Timestamp]). + /// + /// This uniqueness property (i.e. there should only be a single [Event] in the replication Log + /// for a given key expression) cannot be enforced at the [SubInterval] level. Hence, this + /// method assumes the check has already been performed and thus does not do redundant work. + /// + /// In the unlikely scenario that this has happened, the [Fingerprint] of the [SubInterval] will + /// be updated to keep it correct and a warning message will be emitted. 
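+    ///
+    /// Concretely (hedged illustration): if an Event `old` ends up being replaced by an Event
+    /// `new` associated with the same key expression, the resulting Fingerprint is
+    /// `fingerprint ^ new.fingerprint() ^ old.fingerprint()`, i.e. the contribution of `old`
+    /// is XOR-ed out while the contribution of `new` is XOR-ed in.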
+ fn insert_unchecked(&mut self, event: Event) { + self.fingerprint ^= event.fingerprint(); + if let Some(replaced_event) = self.events.insert(event.key_expr().clone(), event) { + tracing::warn!( + "Call to `insert_unchecked` replaced an Event in the replication Log, this should \ + NOT have happened: {replaced_event:?}" + ); + self.fingerprint ^= replaced_event.fingerprint(); + } + } + + /// Removes, if one exists, the [Event] associated with the provided key expression if its + /// [Timestamp] is older than that of the provided one. + /// + /// This method returns, through the [EventRemoval] enumeration, returns the action that was + /// performed. + fn if_newer_remove_older( + &mut self, + key_expr: &Option, + timestamp: &Timestamp, + ) -> EventRemoval { + if let Some((key_expr, event)) = self.events.remove_entry(key_expr) { + if event.timestamp() < timestamp { + self.fingerprint ^= event.fingerprint(); + return EventRemoval::RemovedOlder(event); + } else { + self.events.insert(key_expr, event); + return EventRemoval::KeptNewer; + } + } + + EventRemoval::NotFound + } +} + +#[cfg(test)] +#[path = "./tests/classification.test.rs"] +mod tests; diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/configuration.rs b/plugins/zenoh-plugin-storage-manager/src/replication/configuration.rs new file mode 100644 index 0000000000..64a792b6b1 --- /dev/null +++ b/plugins/zenoh-plugin-storage-manager/src/replication/configuration.rs @@ -0,0 +1,216 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{ + ops::Deref, + time::{SystemTime, UNIX_EPOCH}, +}; + +use zenoh::{internal::bail, key_expr::OwnedKeyExpr, time::Timestamp, Result}; +use zenoh_backend_traits::config::ReplicaConfig; + +use super::{ + classification::{IntervalIdx, SubIntervalIdx}, + digest::Fingerprint, +}; + +/// The [Configuration] is, mostly, a thin wrapper around the [ReplicaConfig]. +/// +/// It exposes its fingerprint: a 64 bits hash of its inner fields. The `storage_key_expr` (and +/// specifically the fact that it is used to compute the fingerprint) is here to prevent having +/// replicas that are active on different subset to exchange their Digest. We want to avoid having +/// a Replica active on "replication/**" to receive and process the Digests emitted by a Replica +/// active on "replication/a/*". +/// +/// Using the newtype pattern allows us to add methods to compute the time classification of +/// events. +#[derive(Debug, PartialEq, Eq, Clone)] +pub(crate) struct Configuration { + storage_key_expr: OwnedKeyExpr, + prefix: Option, + replica_config: ReplicaConfig, + fingerprint: Fingerprint, +} + +impl Deref for Configuration { + type Target = ReplicaConfig; + + fn deref(&self) -> &Self::Target { + &self.replica_config + } +} + +impl Configuration { + /// Creates a new [Configuration] based on the provided [ReplicaConfig]. + /// + /// This constructor also computes its [Fingerprint]. 
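+    ///
+    /// A hedged usage sketch (the key expressions are made up for the example):
+    ///
+    /// ```no_compile
+    /// let configuration = Configuration::new(
+    ///     OwnedKeyExpr::try_from("replication/**").unwrap(),
+    ///     Some(OwnedKeyExpr::try_from("replication").unwrap()),
+    ///     ReplicaConfig::default(),
+    /// );
+    /// // Replicas compare this Fingerprint to discard Digests computed with a different
+    /// // configuration or over a different key space.
+    /// let _fingerprint = configuration.fingerprint();
+    /// ```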
+ pub fn new( + storage_key_expr: OwnedKeyExpr, + prefix: Option, + replica_config: ReplicaConfig, + ) -> Self { + let mut hasher = xxhash_rust::xxh3::Xxh3::default(); + hasher.update(storage_key_expr.as_bytes()); + if let Some(prefix) = &prefix { + hasher.update(prefix.as_bytes()); + } + hasher.update(&replica_config.interval.as_millis().to_le_bytes()); + hasher.update(&replica_config.sub_intervals.to_le_bytes()); + hasher.update(&replica_config.hot.to_le_bytes()); + hasher.update(&replica_config.warm.to_le_bytes()); + hasher.update(&replica_config.propagation_delay.as_millis().to_le_bytes()); + + Self { + storage_key_expr, + prefix, + replica_config, + fingerprint: Fingerprint::from(hasher.digest()), + } + } + + /// Returns the [Fingerprint] of the `Configuration`. + /// + /// The fingerprint is the hash of all its fields, using the `xxhash_rust` crate. + pub fn fingerprint(&self) -> Fingerprint { + self.fingerprint + } + + /// Returns the last elapsed [Interval]. + /// + /// This method will call [SystemTime::now()] to get the current timestamp and, based on the + /// `interval` configuration, return the last [Interval] that elapsed. + /// + /// # Errors + /// + /// This method will return an error if: + /// - The computation to obtain the duration elapsed since `UNIX_EPOCH` failed. Unless the + /// internal clock of the host is set to a time earlier than `UNIX_EPOCH`, this should never + /// happen. + /// - The index of the current interval is higher than `u64::MAX`. Again, unless the internal + /// clock of the host machine is set to a time (very) far in the future, this should never + /// happen. + /// + /// ⚠️ **Both errors cannot be recovered from**. + /// + /// [Interval]: super::classification::Interval + pub fn last_elapsed_interval(&self) -> Result { + let duration_since_epoch = SystemTime::now().duration_since(UNIX_EPOCH)?; + + let last_elapsed_interval = duration_since_epoch.as_millis() / self.interval.as_millis(); + + if last_elapsed_interval > u64::MAX as u128 { + bail!("Overflow detected, last elapsed interval is higher than u64::MAX"); + } + + Ok(IntervalIdx(last_elapsed_interval as u64)) + } + + /// Returns the index of the lowest interval contained in the *hot* era, assuming that the + /// highest interval contained in the *hot* era is the one provided. + /// + /// # Example + /// + /// ```no_compile + /// use crate::replication::Configuration; + /// + /// // If we assume the following configuration: + /// let mut replica_config = ReplicaConfig::default(); + /// replica_config.hot = 2; + /// + /// let configuration = Configuration::new(replica_config); + /// configuration.hot_era_lower_bound(IntervalIdx(10)); // IntervalIdx(9) + /// ``` + pub fn hot_era_lower_bound(&self, hot_era_upper_bound: IntervalIdx) -> IntervalIdx { + (*hot_era_upper_bound - self.hot + 1).into() + } + + /// Returns the index of the lowest interval contained in the *warm* era, assuming that the + /// highest interval contained in the *hot* era is the one provided. + /// + /// ⚠️ Note that, even though this method computes the lower bound of the WARM era, the index + /// provided is the upper bound of the HOT era. 
+ /// + /// # Example + /// + /// ```no_compile + /// use crate::replication::Configuration; + /// + /// // If we assume the following configuration: + /// let mut replica_config = ReplicaConfig::default(); + /// replica_config.hot = 2; + /// replica_config.warm = 5; + /// + /// let configuration = Configuration::new(replica_config); + /// configuration.warm_era_lower_bound(IntervalIdx(10)); // IntervalIdx(4) + /// ``` + pub fn warm_era_lower_bound(&self, hot_era_upper_bound: IntervalIdx) -> IntervalIdx { + (*hot_era_upper_bound - self.hot - self.warm + 1).into() + } + + /// Returns the time classification — [Interval] and [SubInterval] — of the provided + /// [Timestamp]. + /// + /// # Errors + /// + /// This method will return an error in the following cases: + /// - The call to compute the duration since [UNIX_EPOCH] for the provided [Timestamp] returned + /// an inconsistent value. This can happen only if the provided [Timestamp] is earlier than + /// [UNIX_EPOCH], which, in normal circumstances, should not happen. + /// - The [Interval] associated with the provided [Timestamp] is higher than [u64::MAX]. This + /// can only happen if the [Timestamp] is (very) far in the future. + /// + /// The first error is not recoverable but would also be triggered by calls to the method + /// `get_last_elapsed_interval()` – which is called regularly and will effectively stop the + /// replication. + /// + /// The second error is recoverable: it is entirely possible to receive a publication with an + /// erroneous timestamp (willingly or unwillingly). + /// + /// Hence, errors resulting from this call can be safely logged and ignored. + /// + /// [Interval]: super::classification::Interval + /// [SubInterval]: super::classification::SubInterval + pub fn get_time_classification( + &self, + timestamp: &Timestamp, + ) -> Result<(IntervalIdx, SubIntervalIdx)> { + let timestamp_ms_since_epoch = timestamp + .get_time() + .to_system_time() + .duration_since(UNIX_EPOCH)? + .as_millis(); + + let interval = timestamp_ms_since_epoch / self.interval.as_millis(); + if interval > u64::MAX as u128 { + bail!( + "Overflow detected, interval associated with Timestamp < {} > is higher than \ + u64::MAX", + timestamp.to_string() + ); + } + + let sub_interval = (timestamp_ms_since_epoch - (self.interval.as_millis() * interval)) + / (self.interval.as_millis() / self.sub_intervals as u128); + let interval = interval as u64; + + Ok(( + IntervalIdx::from(interval), + SubIntervalIdx::from(sub_interval as u64), + )) + } +} + +#[cfg(test)] +#[path = "tests/configuration.test.rs"] +mod test; diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs new file mode 100644 index 0000000000..0ef0824c65 --- /dev/null +++ b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs @@ -0,0 +1,438 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{ + borrow::Cow, + collections::HashMap, + sync::Arc, + time::{Duration, Instant, SystemTime}, +}; + +use rand::Rng; +use tokio::{ + sync::{Mutex, RwLock}, + task::JoinHandle, +}; +use tracing::{debug_span, Instrument}; +use zenoh::{ + key_expr::{ + format::{kedefine, keformat}, + OwnedKeyExpr, + }, + sample::Locality, + Session, +}; +use zenoh_backend_traits::Storage; + +use super::{ + digest::Digest, + log::LogLatest, + service::{MAX_RETRY, WAIT_PERIOD_SECS}, +}; +use crate::{replication::aligner::AlignmentQuery, storages_mgt::LatestUpdates}; + +kedefine!( + pub digest_key_expr_formatter: "@-digest/${zid:*}/${storage_ke:**}", + pub aligner_key_expr_formatter: "@zid/${zid:*}/${storage_ke:**}/aligner", +); + +#[derive(Clone)] +pub(crate) struct Replication { + pub(crate) zenoh_session: Arc, + pub(crate) replication_log: Arc>, + pub(crate) storage_key_expr: OwnedKeyExpr, + pub(crate) latest_updates: Arc>, + pub(crate) storage: Arc>>, +} + +impl Replication { + /// Spawns a task that periodically publishes the [Digest] of the Replication [Log]. + /// + /// This task will perform the following steps: + /// 1. It will swap the `latest_updates` structure with an empty one -- with the sole purpose of + /// minimising the contention on the StorageService. + /// 2. With the content from the `latest_updates`, it will update the Replication [Log]. + /// 3. It will recompute the [Digest]. + /// 4. It will publish the [Digest]. The periodicity of this publication is dictated by the + /// `interval` configuration option. + /// + /// [Log]: crate::replication::log::LogLatest + pub(crate) fn spawn_digest_publisher(&self) -> JoinHandle<()> { + let zenoh_session = self.zenoh_session.clone(); + let storage_key_expr = self.storage_key_expr.clone(); + let replication_log = self.replication_log.clone(); + let latest_updates = self.latest_updates.clone(); + + tokio::task::spawn(async move { + let digest_key_put = match keformat!( + digest_key_expr_formatter::formatter(), + zid = zenoh_session.zid(), + storage_ke = storage_key_expr + ) { + Ok(key) => key, + Err(e) => { + tracing::error!( + "Failed to generate a key expression to publish the digest: {e:?}" + ); + return; + } + }; + + // Scope to not forget to release the lock. + let (publication_interval, propagation_delay, last_elapsed_interval) = { + let replication_log_guard = replication_log.read().await; + let configuration = replication_log_guard.configuration(); + let last_elapsed_interval = match configuration.last_elapsed_interval() { + Ok(idx) => idx, + Err(e) => { + tracing::error!( + "Fatal error, call to `last_elapsed_interval` failed with: {e:?}" + ); + return; + } + }; + + ( + configuration.interval, + configuration.propagation_delay, + last_elapsed_interval, + ) + }; + + // We have no control over when a replica is going to be started. The purpose is here + // is to try to align its publications and make it so that they happen more or less + // at every interval (+ δ). + let duration_until_next_interval = { + let millis_last_elapsed = + *last_elapsed_interval as u128 * publication_interval.as_millis(); + + if millis_last_elapsed > u64::MAX as u128 { + tracing::error!( + "Fatal error, the last elapsed interval converted to milliseconds is \ + higher than u64::MAX. The host is likely misconfigured (internal clock \ + far ahead in the future?)." 
+ ); + return; + } + + let millis_since_now = + match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) { + Ok(duration) => duration.as_millis(), + Err(e) => { + tracing::error!( + "Fatal error, failed to obtain the Duration until `now()`: {e:?}" + ); + return; + } + }; + + Duration::from_millis( + (publication_interval.as_millis() - (millis_since_now - millis_last_elapsed)) + as u64, + ) + }; + tokio::time::sleep(duration_until_next_interval).await; + + let mut serialization_buffer = Vec::default(); + let mut events = HashMap::default(); + + // Internal delay to avoid an "update storm". + let max_publication_delay = (publication_interval.as_millis() / 3) as u64; + + let mut digest_update_start: Instant; + let mut digest: Digest; + loop { + digest_update_start = Instant::now(); + + // The publisher will be awoken every multiple of `publication_interval`. + // + // Except that we want to take into account the time it takes for a publication to + // reach this Zenoh node. Hence, we sleep for `propagation_delay` to, hopefully, + // catch the publications that are in transit. + tokio::time::sleep(propagation_delay).await; + + { + let mut latest_updates_guard = latest_updates.write().await; + std::mem::swap(&mut events, &mut latest_updates_guard); + } + + { + let mut replication_guard = replication_log.write().await; + replication_guard.update(events.drain().map(|(_, event)| event)); + digest = match replication_guard.digest() { + Ok(digest) => digest, + Err(e) => { + tracing::error!("Fatal error, failed to compute the Digest: {e:?}"); + return; + } + }; + } + + if let Err(e) = bincode::serialize_into(&mut serialization_buffer, &digest) { + tracing::warn!("Failed to serialise the replication Digest: {e:?}"); + continue; + } + + // We do not want to create a "coordinated update storm" with all replicas + // publishing at the same time, hence we wait some variable additional time. + let publication_delay = rand::thread_rng().gen_range(0..max_publication_delay); + tokio::time::sleep(Duration::from_millis(publication_delay)).await; + + // To try to minimise the allocations performed, we extract the current capacity + // of the buffer (capacity >= len) to later call `std::mem::replace` with a + // buffer that, hopefully, has enough memory. + let buffer_capacity = serialization_buffer.capacity(); + + match zenoh_session + .put( + &digest_key_put, + std::mem::replace( + &mut serialization_buffer, + Vec::with_capacity(buffer_capacity), + ), + ) + .await + { + Ok(_) => tracing::trace!("Published Digest: {digest:?}"), + Err(e) => tracing::error!("Failed to publish the replication Digest: {e:?}"), + } + + let digest_update_duration = digest_update_start.elapsed(); + if digest_update_duration > publication_interval { + tracing::warn!( + "The duration it took to update and publish the Digest is superior to the \ + duration of an Interval ({} ms), we recommend increasing the duration of \ + the latter. Digest update: {} ms (incl. delay: {} ms)", + publication_interval.as_millis(), + digest_update_duration.as_millis(), + publication_delay + propagation_delay.as_millis() as u64 + ); + } else { + tokio::time::sleep(publication_interval - digest_update_duration).await; + } + } + }) + } + + /// Spawns a task that subscribes to the [Digest] published by other Replicas. + /// + /// Upon reception of a [Digest], it is compared with the local Replication Log. 
If this + /// comparison generates a [DigestDiff], the Aligner of the Replica that generated the [Digest] + /// that was processed is queried to start an alignment. + /// + /// [DigestDiff]: super::digest::DigestDiff + pub(crate) fn spawn_digest_subscriber(&self) -> JoinHandle<()> { + let zenoh_session = self.zenoh_session.clone(); + let storage_key_expr = self.storage_key_expr.clone(); + let replication_log = self.replication_log.clone(); + let replication = self.clone(); + + tokio::task::spawn(async move { + let digest_key_sub = match keformat!( + digest_key_expr_formatter::formatter(), + zid = "*", + storage_ke = &storage_key_expr + ) { + Ok(key) => key, + Err(e) => { + tracing::error!( + "Fatal error, failed to generate a key expression to subscribe to \ + Digests: {e:?}. The storage will not receive the Replication Digest of \ + other replicas." + ); + return; + } + }; + + let mut retry = 0; + let subscriber = loop { + match zenoh_session + .declare_subscriber(&digest_key_sub) + // NOTE: We need to explicitly set the locality to `Remote` as otherwise the + // Digest subscriber will also receive the Digest published by its own + // Digest publisher. + .allowed_origin(Locality::Remote) + .await + { + Ok(subscriber) => break subscriber, + Err(e) => { + if retry < MAX_RETRY { + retry += 1; + tracing::warn!( + "Failed to declare Digest subscriber: {e:?}. Attempt \ + {retry}/{MAX_RETRY}." + ); + tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await; + } else { + tracing::error!( + "Could not declare Digest subscriber. The storage will not \ + receive the Replication Digest of other replicas." + ); + return; + } + } + } + }; + + tracing::debug!("Subscribed to {digest_key_sub}"); + + loop { + if let Ok(sample) = subscriber.recv_async().await { + let parsed_ke = match digest_key_expr_formatter::parse(sample.key_expr()) { + Ok(parsed_ke) => parsed_ke, + Err(e) => { + tracing::error!( + "Failed to parse key expression associated with Digest \ + publication < {} >: {e:?}", + sample.key_expr() + ); + continue; + } + }; + let source_zid = parsed_ke.zid(); + + let span = debug_span!( + "Digest subscriber", + source_zid = source_zid.as_str(), + request_id = uuid::Uuid::new_v4().simple().to_string() + ); + + // Async block such that we can `instrument` it in an asynchronous compatible + // manner using the `span` we created just above. + async { + let other_digest = match bincode::deserialize::( + &sample.payload().into::>(), + ) { + Ok(other_digest) => other_digest, + Err(e) => { + tracing::warn!( + "Failed to deserialize Payload as Digest: {e:?}. Skipping." + ); + return; + } + }; + + tracing::debug!("Replication digest received"); + + let digest = match replication_log.read().await.digest() { + Ok(digest) => digest, + Err(e) => { + tracing::error!( + "Fatal error, failed to compute local Digest: {e:?}" + ); + return; + } + }; + + if let Some(digest_diff) = digest.diff(other_digest) { + tracing::debug!("Potential misalignment detected"); + + let replica_aligner_ke = match keformat!( + aligner_key_expr_formatter::formatter(), + storage_ke = &storage_key_expr, + zid = source_zid, + ) { + Ok(key) => key, + Err(e) => { + tracing::warn!( + "Failed to generate a key expression to contact aligner: \ + {e:?}" + ); + return; + } + }; + + replication.spawn_query_replica_aligner( + replica_aligner_ke, + AlignmentQuery::Diff(digest_diff), + ); + } + } + .instrument(span) + .await; + } + } + }) + } + + /// Spawns a task that handles alignment queries. 
+ /// + /// An alignment query will always come from a Replica. Hence, as multiple Replicas could query + /// at the same time, for each received query a new task is spawned. This newly spawned task is + /// responsible for fetching in the Replication Log or in the Storage the relevant information + /// to send to the Replica such that it can align its own Storage. + pub(crate) fn spawn_aligner_queryable(&self) -> JoinHandle<()> { + let zenoh_session = self.zenoh_session.clone(); + let storage_key_expr = self.storage_key_expr.clone(); + let replication = self.clone(); + + tokio::task::spawn(async move { + let aligner_ke = match keformat!( + aligner_key_expr_formatter::formatter(), + zid = zenoh_session.zid(), + storage_ke = storage_key_expr, + ) { + Ok(ke) => ke, + Err(e) => { + tracing::error!( + "Fatal error, failed to generate a key expression for the Aligner \ + queryable: {e:?}. The storage will NOT align with other replicas." + ); + return; + } + }; + + let mut retry = 0; + let queryable = loop { + match zenoh_session + .declare_queryable(&aligner_ke) + .allowed_origin(Locality::Remote) + .await + { + Ok(queryable) => break queryable, + Err(e) => { + if retry < MAX_RETRY { + retry += 1; + tracing::warn!( + "Failed to declare the Aligner queryable: {e:?}. Attempt \ + {retry}/{MAX_RETRY}." + ); + tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await; + } else { + tracing::error!( + "Could not declare the Aligner queryable. This storage will NOT \ + align with other replicas." + ); + return; + } + } + } + }; + + tracing::debug!("Declared Aligner queryable: {aligner_ke}"); + + while let Ok(query) = queryable.recv_async().await { + if query.attachment().is_none() { + tracing::debug!("Skipping query with empty Attachment"); + continue; + } + + tracing::trace!("Received Alignment Query"); + + let replication = replication.clone(); + tokio::task::spawn(async move { replication.aligner(query).await }); + } + }) + } +} diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/digest.rs b/plugins/zenoh-plugin-storage-manager/src/replication/digest.rs new file mode 100644 index 0000000000..82110ef54c --- /dev/null +++ b/plugins/zenoh-plugin-storage-manager/src/replication/digest.rs @@ -0,0 +1,170 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{ + collections::{HashMap, HashSet}, + ops::{BitXor, BitXorAssign, Deref}, +}; + +use serde::{Deserialize, Serialize}; + +use super::classification::{IntervalIdx, SubIntervalIdx}; + +/// A [Fingerprint] is a 64 bits hash of the content it "represents". 
+/// +/// The crate used to obtain this hash is +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Default, Deserialize, Serialize)] +#[repr(transparent)] +pub struct Fingerprint(u64); + +impl Deref for Fingerprint { + type Target = u64; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl BitXor for Fingerprint { + type Output = Fingerprint; + + fn bitxor(self, rhs: Self) -> Self::Output { + (self.0 ^ rhs.0).into() + } +} + +impl BitXorAssign for Fingerprint { + fn bitxor_assign(&mut self, rhs: Self) { + self.0 ^= rhs.0 + } +} + +impl From for Fingerprint { + fn from(value: u64) -> Self { + Self(value) + } +} + +/// A `Digest` is a concise view of the data present in a storage. +/// +/// The purpose of this structure is to be sent over the network to other storage subscribing to the +/// exact same key expression such that they can quickly assess if they are misaligned, i.e. if +/// their data differ. +/// +/// To make this assessment quick and lightweight, the `Digest` is composed of a set [Fingerprint]s +/// for each "era". An "era" is a way of grouping sets of [Event]s according to their +/// [Timestamp]. There are three eras: "hot", "warm" and "cold". The closer a timestamp is to the +/// current time, the "hotter" the era it belongs to will be. +/// +/// Eras are further divided into [Interval]s and [SubInterval]s — which duration and number can be +/// configured. +/// +/// [Event]: super::log::Event +/// [Timestamp]: zenoh::time::Timestamp +/// [Interval]: super::classification::Interval +/// [SubInterval]: super::classification::SubInterval +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub struct Digest { + pub(crate) configuration_fingerprint: Fingerprint, + pub(crate) cold_era_fingerprint: Fingerprint, + pub(crate) warm_era_fingerprints: HashMap, + pub(crate) hot_era_fingerprints: HashMap>, +} + +/// The `DigestDiff` summarises the differences between two [Digest]s. +/// +/// For the Cold Era, a pair `(IntervalIdx, bool)` is computed. The `bool` indicates if the two +/// [Fingerprint]s differ. The [IntervalIdx] is the lower bound of the Warm Era of the [Digest] of +/// the storage that computed the `DigestDiff`. This allows the replica that receives the +/// `DigestDiff` to know where to stop when sending the [Fingerprint]s of its intervals. +/// +/// For the Warm Era, the set of [IntervalIdx] that differ is kept. +/// +/// For the Hot Era, the set of [SubIntervalIdx], grouped by their [IntervalIdx], that differ is +/// kept. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub struct DigestDiff { + pub(crate) cold_eras_differ: bool, + pub(crate) warm_eras_differences: HashSet, + pub(crate) hot_eras_differences: HashMap>, +} + +impl Digest { + pub(crate) fn diff(&self, mut other: Digest) -> Option { + if self.configuration_fingerprint != other.configuration_fingerprint { + return None; + } + + // Hot era. For all the intervals that are contained in `self`, we remove the sub-intervals + // in `other` that have the same fingerprints as their counterpart in `self` for the same + // sub-interval. + // + // The ultimate purpose if these loops is to keep in `other` only the intervals / + // sub-intervals that differ or that are only present in `other`. 
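+        // For example (with made-up fingerprint values, purely illustrative): if `self` records,
+        // for interval 2, the sub-interval fingerprints { 1: 0xA, 3: 0xB } while `other` records
+        // { 1: 0xA, 3: 0xC, 4: 0xD }, then after the `retain` below `other` keeps only
+        // { 3: 0xC, 4: 0xD } for that interval — sub-interval 1 matched and is dropped,
+        // sub-interval 3 differs and sub-interval 4 is only known to `other`.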
+ for (interval_idx, sub_intervals_fingerprints) in &self.hot_era_fingerprints { + if let Some(other_sub_intervals_fingerprints) = + other.hot_era_fingerprints.get_mut(interval_idx) + { + other_sub_intervals_fingerprints.retain(|other_idx, other_fingerprint| { + match sub_intervals_fingerprints.get(other_idx) { + Some(fingerprint) => other_fingerprint != fingerprint, + None => true, + } + }); + } + } + other + .hot_era_fingerprints + .retain(|_, sub_intervals| !sub_intervals.is_empty()); + + // Warm era. Same process as for the hot era, we want to keep only the values that differ or + // that are present only in `other`. + other + .warm_era_fingerprints + .retain(|other_idx, other_fingerprint| { + match self.warm_era_fingerprints.get(other_idx) { + Some(fingerprint) => other_fingerprint != fingerprint, + None => true, + } + }); + + if !other.hot_era_fingerprints.is_empty() || !other.warm_era_fingerprints.is_empty() { + return Some(DigestDiff { + cold_eras_differ: self.cold_era_fingerprint != other.cold_era_fingerprint, + warm_eras_differences: other.warm_era_fingerprints.into_keys().collect(), + hot_eras_differences: other + .hot_era_fingerprints + .into_iter() + .map(|(interval_idx, sub_intervals)| { + (interval_idx, sub_intervals.into_keys().collect()) + }) + .collect(), + }); + } + + if self.cold_era_fingerprint != other.cold_era_fingerprint { + return Some(DigestDiff { + cold_eras_differ: true, + warm_eras_differences: HashSet::default(), + hot_eras_differences: HashMap::default(), + }); + } + + None + } +} + +#[cfg(test)] +#[path = "tests/digest.test.rs"] +mod tests; diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/log.rs b/plugins/zenoh-plugin-storage-manager/src/replication/log.rs new file mode 100644 index 0000000000..dab26f3020 --- /dev/null +++ b/plugins/zenoh-plugin-storage-manager/src/replication/log.rs @@ -0,0 +1,354 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::collections::{BTreeMap, HashMap}; + +use bloomfilter::Bloom; +use serde::{Deserialize, Serialize}; +use zenoh::{key_expr::OwnedKeyExpr, sample::SampleKind, time::Timestamp, Result}; +use zenoh_backend_traits::config::ReplicaConfig; + +use super::{ + classification::{EventRemoval, Interval, IntervalIdx}, + configuration::Configuration, + digest::{Digest, Fingerprint}, +}; + +/// The `EventMetadata` structure contains all the information needed by a replica to assess if it +/// is missing an [Event] in its log. +/// +/// Associating the `action` allows only sending the metadata when the associate action is +/// [SampleKind::Delete]. 
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub struct EventMetadata { + pub(crate) stripped_key: Option, + pub(crate) timestamp: Timestamp, + pub(crate) action: SampleKind, +} + +impl EventMetadata { + pub fn key_expr(&self) -> &Option { + &self.stripped_key + } + + pub fn timestamp(&self) -> &Timestamp { + &self.timestamp + } +} + +impl From<&Event> for EventMetadata { + fn from(event: &Event) -> Self { + Self { + stripped_key: event.maybe_stripped_key.clone(), + timestamp: event.timestamp, + action: event.action, + } + } +} + +/// An `Event` records the fact that a publication occurred on the associated key expression at the +/// associated timestamp. +/// +/// When an `Event` is created, its [Fingerprint] is computed, using the `xxhash-rust` crate. This +/// [Fingerprint] is used to construct the [Digest] associated with the replication log. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Event { + pub(crate) maybe_stripped_key: Option, + pub(crate) timestamp: Timestamp, + pub(crate) action: SampleKind, + pub(crate) fingerprint: Fingerprint, +} + +impl From for Event { + fn from(metadata: EventMetadata) -> Self { + Event::new(metadata.stripped_key, metadata.timestamp, metadata.action) + } +} + +impl Event { + /// Creates a new [Event] with the provided key expression and timestamp. + /// + /// This function computes the [Fingerprint] of both using the `xxhash_rust` crate. + pub fn new(key_expr: Option, timestamp: Timestamp, action: SampleKind) -> Self { + let mut hasher = xxhash_rust::xxh3::Xxh3::default(); + if let Some(key_expr) = &key_expr { + hasher.update(key_expr.as_bytes()); + } + hasher.update(×tamp.get_time().0.to_le_bytes()); + hasher.update(×tamp.get_id().to_le_bytes()); + + Self { + maybe_stripped_key: key_expr, + timestamp, + action, + fingerprint: hasher.digest().into(), + } + } + + /// Returns a reference over the key expression associated with this [Event]. + /// + /// Note that this method can return `None` as the underlying key expression could be the + /// *stripped* of a prefix. + /// This prefix is defined as part of the configuration of the associated [Storage]. + pub fn key_expr(&self) -> &Option { + &self.maybe_stripped_key + } + + /// Returns the [Timestamp] associated with this [Event]. + // + // NOTE: Even though `Timestamp` implements the `Copy` trait, it does not fit on two general + // purpose (64bits) registers so, in theory, a reference should be more efficient. + // + // https://rust-lang.github.io/rust-clippy/master/#/trivially_copy_pass_by_ref + pub fn timestamp(&self) -> &Timestamp { + &self.timestamp + } + + /// Returns the [Fingerprint] associated with this [Event]. + pub fn fingerprint(&self) -> Fingerprint { + self.fingerprint + } +} + +/// The `EventInsertion` enumeration lists the possible outcomes when attempting to insert an +/// [Event] in the replication [Log]. +/// +/// The outcomes are: +/// - `New(Event)`: there was no [Event] in the log with the same key expression. +/// +/// - `Replaced(Event)`: there was an [Event] in the log with the same key expression but an older +/// [Timestamp]. +/// +/// - `NotInsertedAsOlder`: there was an [Event] in the log with the same key expression but a more +/// recent [Timestamp]. +/// +/// - `NotInsertedAsOutOfBound`: the provided [Timestamp] is too far away in the future (compared to +/// the clock of this Zenoh node) and cannot be inserted in the log. 
+/// +/// [Log]: LogLatest +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum EventInsertion { + New(Event), + ReplacedOlder(Event), + NotInsertedAsOlder, + NotInsertedAsOutOfBound, +} + +/// The `LogLatest` keeps track of the last publication that happened on a key expression. +/// +/// By definition, the `LogLatest` is only compatible with storage that have the capability +/// `History::Latest` (the default value). For instance, that means that it will *not* work for +/// time-series storage that keep track of all the publications that happen for a given key +/// expression. +/// +/// Internally, the `LogLatest` groups publications (i.e. [Event]s) according to their [Timestamp] +/// in [Interval]s and [SubInterval]s. The purpose of this grouping is to facilitate the alignment +/// and diminish the amount of data sent over the network. See the [Digest] structure for further +/// explanations. +/// +/// As it only keeps track of the latest publication, whenever a new publication is received we need +/// to make sure that there is no [Event] with a newer [Timestamp] already present. The Bloom Filter +/// helps speed up this process by telling quickly if an event for that key expression is already +/// tracked or not. +/// +/// [Interval]: super::classification::Interval +/// [SubInterval]: super::classification::SubInterval +pub struct LogLatest { + pub(crate) configuration: Configuration, + pub(crate) intervals: BTreeMap, + pub(crate) bloom_filter_event: Bloom>, +} + +impl LogLatest { + /// Creates a new [LogLatest] configured with the provided [ReplicaConfig]. + pub fn new( + storage_key_expr: OwnedKeyExpr, + prefix: Option, + replica_config: ReplicaConfig, + ) -> Self { + Self { + configuration: Configuration::new(storage_key_expr, prefix, replica_config), + intervals: BTreeMap::default(), + // TODO Should these be configurable? + // + // With their current values, the bloom filter structure will consume ~5MB. Note + // that this applies for each Storage that has replication enabled (hence, if a + // node has two Storage that have replication enabled, ~10MB of memory will be + // consumed on that node). + // + // 2 << 22 = 4_194_304 items. + bloom_filter_event: Bloom::new_for_fp_rate(2 << 22, 0.01), + } + } + + /// Returns the [Configuration] associated with the [LogLatest]. + pub fn configuration(&self) -> &Configuration { + &self.configuration + } + + /// Lookup the provided key expression and, if found, return its associated [Event]. + pub fn lookup(&self, stripped_key: &Option) -> Option<&Event> { + if !self.bloom_filter_event.check(stripped_key) { + return None; + } + + for interval in self.intervals.values().rev() { + if let Some(event) = interval.lookup(stripped_key) { + return Some(event); + } + } + + None + } + + /// Attempts to insert the provided [Event] in the replication log and return the [Insertion] + /// outcome. + /// + /// # Caveat: out of bound + /// + /// This method will record an error in the Zenoh log if the timestamp associated with the + /// [Event] is so far in the future that the index of its interval is higher than + /// [u64::MAX]. This should not happen unless a specially crafted [Event] is sent to this node + /// or if the internal clock of the host that produced it is (very) far in the future. + pub(crate) fn insert_event(&mut self, event: Event) -> EventInsertion { + let mut result = None; + + // A Bloom filter never returns false negative. 
Hence if the call to `check_and_set` we + // can be sure (provided that we update correctly the Bloom filter) that there is no + // Event with that key expression. + if self.bloom_filter_event.check(event.key_expr()) { + // The Bloom filter indicates that there is an Event with the same key expression, + // we need to check if it is older or not than the one we are processing. + // + // By construction of the LogLatest, there can only be a single [Event] with the + // same key expression, hence the moment we find it we can skip the search. + // + // NOTE: `rev()` + // We are making here the following assumption: it is more likely that a recent + // key will be updated. Iterating over a `BTreeMap` will yield its elements in + // increasing order --- in our particular case that means from oldest to + // newest. Using `rev()` yields them from newest to oldest. + for interval in self.intervals.values_mut().rev() { + match interval.if_newer_remove_older(event.key_expr(), event.timestamp()) { + EventRemoval::RemovedOlder(old_event) => { + result = Some(old_event); + break; + } + EventRemoval::KeptNewer => return EventInsertion::NotInsertedAsOlder, + EventRemoval::NotFound => continue, + } + } + } + + let result = match result { + Some(old_event) => EventInsertion::ReplacedOlder(old_event), + None => EventInsertion::New(event.clone()), + }; + + let (interval_idx, sub_interval_idx) = match self + .configuration + .get_time_classification(event.timestamp()) + { + Ok((interval_idx, sub_interval_idx)) => (interval_idx, sub_interval_idx), + Err(e) => { + tracing::error!("{e:?}"); + return EventInsertion::NotInsertedAsOutOfBound; + } + }; + + self.bloom_filter_event.set(event.key_expr()); + + self.intervals + .entry(interval_idx) + .or_default() + .insert_unchecked(sub_interval_idx, event); + + result + } + + /// Updates the replication log with the provided set of [Event]s and return the updated + /// [Digest]. + /// + /// # Caveat: out of bounds [Event]s + /// + /// This method will log an error message for all [Event]s that have a [Timestamp] that is so + /// far in the future that the index of their interval is higher than [u64::MAX]. This should + /// not happen unless specifically crafted [Event]s are sent to this node or if the internal + /// clock of a host is (very) far in the future. + pub fn update(&mut self, events: impl Iterator) { + events.for_each(|event| { + self.insert_event(event); + }); + } + + /// Retrieves the latest [Digest], assuming that the hot era starts at the last elapsed + /// interval. + /// + /// # Errors + /// + /// This method will return an error if the index of the last elapsed interval is superior to + /// [u64::MAX]. In theory, this should not happen but if it does, **it is an error that cannot + /// be recovered from (⚠️)**. + pub fn digest(&self) -> Result { + let last_elapsed_interval = self.configuration.last_elapsed_interval()?; + + Ok(self.digest_from(last_elapsed_interval)) + } + + /// Considering the upper bound of the hot era, generates a [Digest] of the [LogLatest]. + /// + /// Passing the upper bound of the hot era allows generating a [Digest] that can be compared + /// with a possibly older [Digest]. + // + // NOTE: One of the advantages of having that method take an upper bound is to facilitate unit + // testing. 
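+    // As a hypothetical walk-through (the values are taken from the unit tests, not imposed by
+    // this method): with `hot = 1`, `warm = 5` and an upper bound of `IntervalIdx(10)`, the hot
+    // era covers interval 10 only, the warm era covers intervals 5 to 9, and every interval up to
+    // and including 4 is folded into the single cold era fingerprint.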
+ fn digest_from(&self, hot_era_upper_bound: IntervalIdx) -> Digest { + let hot_era_lower_bound = self.configuration.hot_era_lower_bound(hot_era_upper_bound); + let warm_era_lower_bound = self.configuration.warm_era_lower_bound(hot_era_upper_bound); + + let mut warm_era_fingerprints = HashMap::default(); + let mut hot_era_fingerprints = HashMap::default(); + let mut cold_era_fingerprint = Fingerprint::default(); + for (interval_idx, interval) in self + .intervals + .iter() + .filter(|(&idx, _)| idx <= hot_era_upper_bound) + { + // NOTE: As the intervals are traversed in increasing order (because of the use of a + // [BTreeMap]) and as the cold era contains the most and older intervals + // (i.e. with a lower interval index), the order of the comparisons should + // minimise their number to generate the Digest. + if *interval_idx < warm_era_lower_bound { + cold_era_fingerprint ^= interval.fingerprint(); + } else if *interval_idx < hot_era_lower_bound { + if interval.fingerprint() != Fingerprint::default() { + warm_era_fingerprints.insert(*interval_idx, interval.fingerprint()); + } + } else { + hot_era_fingerprints.insert(*interval_idx, interval.sub_intervals_fingerprints()); + } + } + + Digest { + configuration_fingerprint: self.configuration.fingerprint(), + cold_era_fingerprint, + warm_era_fingerprints, + hot_era_fingerprints, + } + } +} + +#[cfg(test)] +#[path = "tests/log.test.rs"] +mod tests; diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/mod.rs b/plugins/zenoh-plugin-storage-manager/src/replication/mod.rs new file mode 100644 index 0000000000..e065dfc157 --- /dev/null +++ b/plugins/zenoh-plugin-storage-manager/src/replication/mod.rs @@ -0,0 +1,38 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! This module exposes the [ReplicationService] structure needed by the storage manager to +//! replicate the content of storage across a Zenoh network. +//! +//! This structure, and thus the replication, only works for storage that have the [History::Latest] +//! capability. +//! +//! From a high-level, the replication works by generating a concise view of the state of the +//! storage at regular time intervals. To do so, the time is divided in 'intervals' (themselves +//! divided into 'sub-intervals') and each publication mapped to one such group. Then a fingerprint +//! of each group is computed and these fingerprints are sent over the network to other storage for +//! comparison. +//! +//! 
[History::Latest]: zenoh_backend_traits::History::Latest + +mod aligner; +mod classification; +mod configuration; +mod core; +mod digest; +mod log; +mod service; + +pub(crate) use log::{Event, LogLatest}; +pub(crate) use service::ReplicationService; diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs new file mode 100644 index 0000000000..06ec31d9a2 --- /dev/null +++ b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs @@ -0,0 +1,131 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{sync::Arc, time::Duration}; + +use tokio::{ + sync::{broadcast::Receiver, RwLock}, + task::JoinHandle, +}; +use zenoh::{key_expr::OwnedKeyExpr, query::QueryTarget, sample::Locality, session::Session}; + +use super::{core::Replication, LogLatest}; +use crate::storages_mgt::{LatestUpdates, StorageMessage, StorageService}; + +pub(crate) struct ReplicationService { + digest_publisher_handle: JoinHandle<()>, + digest_subscriber_handle: JoinHandle<()>, + aligner_queryable_handle: JoinHandle<()>, +} + +pub(crate) const MAX_RETRY: usize = 2; +pub(crate) const WAIT_PERIOD_SECS: u64 = 4; + +impl ReplicationService { + /// Starts the `ReplicationService`, spawning multiple tasks. + /// + /// # Tasks spawned + /// + /// This function will spawn two tasks: + /// 1. One to publish the [Digest]. + /// 2. One to wait on the provided [Receiver] in order to stop the Replication Service, + /// attempting to abort all the tasks that were spawned, once a Stop message has been + /// received. + pub async fn spawn_start( + zenoh_session: Arc, + storage_service: StorageService, + storage_key_expr: OwnedKeyExpr, + replication_log: Arc>, + latest_updates: Arc>, + mut rx: Receiver, + ) { + // We perform a "wait-try" policy because Zenoh needs some time to propagate the routing + // information and, here, we need to have the queryables propagated. + // + // 4 seconds is an arbitrary value. + let mut attempt = 0; + let mut received_reply = false; + + while attempt < MAX_RETRY { + attempt += 1; + tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await; + + match zenoh_session + .get(&storage_key_expr) + // `BestMatching`, the default option for `target`, will try to minimise the storage + // that are queried and their distance while trying to maximise the key space + // covered. + // + // In other words, if there is a close and complete storage, it will only query this + // one. + .target(QueryTarget::BestMatching) + // The value `Remote` is self-explanatory but why it is needed deserves an + // explanation: we do not want to query the local database as the purpose is to get + // the data from other replicas (if there is one). 
+ .allowed_destination(Locality::Remote) + .await + { + Ok(replies) => { + while let Ok(reply) = replies.recv_async().await { + received_reply = true; + if let Ok(sample) = reply.into_result() { + if let Err(e) = storage_service.process_sample(sample).await { + tracing::error!("{e:?}"); + } + } + } + } + Err(e) => tracing::error!("Initial alignment Query failed with: {e:?}"), + } + + if received_reply { + break; + } + + tracing::debug!( + "Found no Queryable matching '{storage_key_expr}'. Attempt {attempt}/{MAX_RETRY}." + ); + } + + tokio::task::spawn(async move { + let replication = Replication { + zenoh_session, + replication_log, + storage_key_expr, + latest_updates, + storage: storage_service.storage.clone(), + }; + + let replication_service = Self { + digest_publisher_handle: replication.spawn_digest_publisher(), + digest_subscriber_handle: replication.spawn_digest_subscriber(), + aligner_queryable_handle: replication.spawn_aligner_queryable(), + }; + + while let Ok(storage_message) = rx.recv().await { + if matches!(storage_message, StorageMessage::Stop) { + replication_service.stop(); + return; + } + } + }); + } + + /// Stops all the tasks spawned by the `ReplicationService`. + pub fn stop(self) { + self.digest_publisher_handle.abort(); + self.digest_subscriber_handle.abort(); + self.aligner_queryable_handle.abort(); + } +} diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/tests/classification.test.rs b/plugins/zenoh-plugin-storage-manager/src/replication/tests/classification.test.rs new file mode 100644 index 0000000000..8510de8c22 --- /dev/null +++ b/plugins/zenoh-plugin-storage-manager/src/replication/tests/classification.test.rs @@ -0,0 +1,133 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::str::FromStr; + +use zenoh::{key_expr::OwnedKeyExpr, sample::SampleKind}; + +use super::*; + +fn new_event(key_expr: Option, timestamp: Timestamp) -> Event { + Event::new(key_expr, timestamp, SampleKind::Put) +} + +#[test] +fn test_sub_interval() { + let hlc = uhlc::HLC::default(); + + let event_a = new_event( + Some(OwnedKeyExpr::from_str("test/a").unwrap()), + hlc.new_timestamp(), + ); + let mut sub_interval = SubInterval::default(); + sub_interval.insert_unchecked(event_a.clone()); + assert_eq!(event_a.fingerprint(), sub_interval.fingerprint); + + let event_b = new_event( + Some(OwnedKeyExpr::from_str("test/b").unwrap()), + hlc.new_timestamp(), + ); + sub_interval.insert_unchecked(event_b.clone()); + + let event_c = new_event( + Some(OwnedKeyExpr::from_str("test/c").unwrap()), + hlc.new_timestamp(), + ); + sub_interval.insert_unchecked(event_c.clone()); + let mut expected_fingerprint = + event_a.fingerprint() ^ event_b.fingerprint() ^ event_c.fingerprint(); + assert_eq!(expected_fingerprint, sub_interval.fingerprint); + + let event_d = new_event( + Some(OwnedKeyExpr::from_str("test/d").unwrap()), + hlc.new_timestamp(), + ); + assert_eq!( + EventRemoval::NotFound, + sub_interval.if_newer_remove_older(event_d.key_expr(), event_d.timestamp()) + ); + sub_interval.insert_unchecked(event_d.clone()); + + let event_d_new = new_event(event_d.key_expr().clone(), hlc.new_timestamp()); + assert_eq!( + EventRemoval::RemovedOlder(event_d), + sub_interval.if_newer_remove_older(event_d_new.key_expr(), event_d_new.timestamp()) + ); + // NOTE: We added and removed `event_d` the fingerprint should be identical. + assert_eq!(expected_fingerprint, sub_interval.fingerprint); + + sub_interval.insert_unchecked(event_d_new.clone()); + expected_fingerprint ^= event_d_new.fingerprint(); + assert_eq!(expected_fingerprint, sub_interval.fingerprint); +} + +#[test] +fn test_interval() { + let hlc = uhlc::HLC::default(); + + let event_0_0 = new_event( + Some(OwnedKeyExpr::from_str("test/0/0").unwrap()), + hlc.new_timestamp(), + ); + let event_0_1 = new_event( + Some(OwnedKeyExpr::from_str("test/0/1").unwrap()), + hlc.new_timestamp(), + ); + let event_1_0 = new_event( + Some(OwnedKeyExpr::from_str("test/1/0").unwrap()), + hlc.new_timestamp(), + ); + + let mut interval = Interval::default(); + interval.insert_unchecked(SubIntervalIdx(0), event_0_0.clone()); + interval.insert_unchecked(SubIntervalIdx(0), event_0_1.clone()); + interval.insert_unchecked(SubIntervalIdx(1), event_1_0.clone()); + + let expected_fingerprint = + event_0_0.fingerprint() ^ event_1_0.fingerprint() ^ event_0_1.fingerprint(); + assert_eq!(expected_fingerprint, interval.fingerprint); + + let event_1_1 = new_event( + Some(OwnedKeyExpr::from_str("test/1/1").unwrap()), + hlc.new_timestamp(), + ); + // No Event with the same key expression. + assert_eq!( + EventRemoval::NotFound, + interval.if_newer_remove_older(event_1_1.key_expr(), event_1_1.timestamp()) + ); + // Event already present in the Interval: the event is not the newest. 
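+    // Put differently: `event_1_1` is inserted below and `if_newer_remove_older` is then called
+    // with that very same timestamp; the candidate is not strictly newer, so the stored Event is
+    // kept and `EventRemoval::KeptNewer` is returned.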
+ interval.insert_unchecked(SubIntervalIdx(1), event_1_1.clone()); + assert_eq!( + EventRemoval::KeptNewer, + interval.if_newer_remove_older(event_1_1.key_expr(), event_1_1.timestamp()) + ); + + let event_1_1_new = new_event(event_1_1.key_expr().clone(), hlc.new_timestamp()); + assert_eq!( + EventRemoval::RemovedOlder(event_1_1), + interval.if_newer_remove_older(event_1_1_new.key_expr(), event_1_1_new.timestamp()) + ); + // We removed `event_1_1`, we should be back to having only `event_0_0`, `event_0_1` and + // `event_1_0`. + assert_eq!(expected_fingerprint, interval.fingerprint); + + // We remove `event_1_0`, there is no event left in SubInterval(1) so it should be removed from + // the Interval. + assert_eq!( + EventRemoval::RemovedOlder(event_1_0.clone()), + interval.if_newer_remove_older(event_1_0.key_expr(), &hlc.new_timestamp()) + ); + assert!(!interval.sub_intervals.contains_key(&SubIntervalIdx(1))); +} diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/tests/configuration.test.rs b/plugins/zenoh-plugin-storage-manager/src/replication/tests/configuration.test.rs new file mode 100644 index 0000000000..e91e3aeb86 --- /dev/null +++ b/plugins/zenoh-plugin-storage-manager/src/replication/tests/configuration.test.rs @@ -0,0 +1,150 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{str::FromStr, time::Duration}; + +use uhlc::HLC; + +use super::*; + +#[test] +fn test_lower_bounds() { + let configuration = Configuration::new( + OwnedKeyExpr::from_str("replication/test/*").unwrap(), + None, + ReplicaConfig { + interval: Duration::from_secs(10), + sub_intervals: 5, + hot: 1, + warm: 5, + propagation_delay: Duration::from_millis(250), + }, + ); + + assert_eq!( + IntervalIdx(10), + configuration.hot_era_lower_bound(IntervalIdx(10)) + ); + assert_eq!( + IntervalIdx(5), + configuration.warm_era_lower_bound(IntervalIdx(10)) + ); +} + +#[test] +fn test_difference() { + let identical_replica_config = ReplicaConfig { + interval: Duration::from_secs(10), + sub_intervals: 5, + hot: 1, + warm: 5, + propagation_delay: Duration::from_millis(250), + }; + + let configuration_a = Configuration::new( + OwnedKeyExpr::from_str("replication/test/**").unwrap(), + None, + identical_replica_config.clone(), + ); + + let configuration_b = Configuration::new( + OwnedKeyExpr::from_str("replication/test/a/*").unwrap(), + None, + identical_replica_config.clone(), + ); + + assert_ne!(configuration_a.fingerprint, configuration_b.fingerprint); + + let configuration_c = Configuration::new( + configuration_a.storage_key_expr, + Some(OwnedKeyExpr::from_str("replication/test").unwrap()), + identical_replica_config, + ); + + assert_ne!(configuration_a.fingerprint, configuration_c.fingerprint); +} + +#[test] +fn test_get_classification() { + let configuration = Configuration::new( + OwnedKeyExpr::from_str("replication/test/*").unwrap(), + None, + ReplicaConfig { + interval: Duration::from_secs(10), + sub_intervals: 5, + hot: 1, + warm: 5, + propagation_delay: Duration::from_millis(250), + }, + ); + + let hlc = HLC::default(); + + let new_timestamp = |duration: Duration| Timestamp::new(duration.into(), 
*hlc.get_id()); + + // 0 = 0 * 10_000 + 0 * 2_000 + // ^ interval = 0 + // ^ sub-interval = 0 + let timestamp_0_0_lower = new_timestamp(Duration::from_millis(0)); + assert_eq!( + (IntervalIdx(0), SubIntervalIdx(0)), + configuration + .get_time_classification(×tamp_0_0_lower) + .unwrap() + ); + + // 1_999 = 0 * 10_000 + 0 * 2_000 + 1999 + // ^ interval = 0 + // ^ sub-interval = 0 + let timestamp_0_0_upper = new_timestamp(Duration::from_millis(1999)); + assert_eq!( + (IntervalIdx(0), SubIntervalIdx(0)), + configuration + .get_time_classification(×tamp_0_0_upper) + .unwrap() + ); + + // 2_000 = 0 * 10_000 + 1 * 2_000 + 0 + // ^ interval = 0 + // ^ sub-interval = 1 + let timestamp_0_1_lower = new_timestamp(Duration::from_millis(2_000)); + assert_eq!( + (IntervalIdx(0), SubIntervalIdx(1)), + configuration + .get_time_classification(×tamp_0_1_lower) + .unwrap() + ); + + // 19_999 = 1 * 10_000 + 4 * 2_000 + 1_999 + // ^ interval = 1 + // ^ sub-interval = 4 + let timestamp_1_4_upper = new_timestamp(Duration::from_millis(19_999)); + assert_eq!( + (IntervalIdx(1), SubIntervalIdx(4)), + configuration + .get_time_classification(×tamp_1_4_upper) + .unwrap() + ); + + // 106_042 = 10 * 10_000 + 3 * 2_000 + 42 + // ^^ interval = 10 + // ^ sub-interval = 3 + let timestamp_10_3 = new_timestamp(Duration::from_millis(106_042)); + assert_eq!( + (IntervalIdx(10), SubIntervalIdx(3)), + configuration + .get_time_classification(×tamp_10_3) + .unwrap() + ); +} diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/tests/digest.test.rs b/plugins/zenoh-plugin-storage-manager/src/replication/tests/digest.test.rs new file mode 100644 index 0000000000..50b630bea0 --- /dev/null +++ b/plugins/zenoh-plugin-storage-manager/src/replication/tests/digest.test.rs @@ -0,0 +1,181 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::collections::{HashMap, HashSet}; + +use super::{Digest, Fingerprint}; +use crate::replication::{ + classification::{IntervalIdx, SubIntervalIdx}, + digest::DigestDiff, +}; + +#[test] +fn test_diff() { + // Base Digest. The actual values of the Fingerprints do not matter. + let digest = Digest { + configuration_fingerprint: Fingerprint(15), + cold_era_fingerprint: Fingerprint(10), + warm_era_fingerprints: HashMap::from([ + (IntervalIdx(1), Fingerprint(1)), + (IntervalIdx(2), Fingerprint(2)), + (IntervalIdx(4), Fingerprint(4)), + ]), + hot_era_fingerprints: HashMap::from([ + ( + IntervalIdx(1), + HashMap::from([(SubIntervalIdx(1), Fingerprint(1))]), + ), + ( + IntervalIdx(2), + HashMap::from([ + (SubIntervalIdx(1), Fingerprint(1)), + (SubIntervalIdx(3), Fingerprint(3)), + ]), + ), + ( + IntervalIdx(4), + HashMap::from([ + (SubIntervalIdx(1), Fingerprint(1)), + (SubIntervalIdx(2), Fingerprint(2)), + ]), + ), + ]), + }; + + // Test: everything matches except the configuration -> should return None. + let mut other_digest = digest.clone(); + other_digest.configuration_fingerprint = Fingerprint(5); // 5 vs 15 + + assert!(digest.diff(other_digest).is_none()); + + // Test: everything matches -> should return None. 
+ assert!(digest.diff(digest.clone()).is_none()); + + // Test: only cold era differs -> should return Some with everything else being empty. + let mut other_digest = digest.clone(); + other_digest.cold_era_fingerprint = Fingerprint(15); // 15 vs 10 + let expected_diff = Some(DigestDiff { + cold_eras_differ: true, + warm_eras_differences: HashSet::default(), + hot_eras_differences: HashMap::default(), + }); + assert_eq!(expected_diff, digest.diff(other_digest)); + + // Test: `digest` has an interval that other does not have -> should not appear in the diff. The + // diff is only concerned by intervals from `other` that either differ from `digest` or that are + // missing from `digest`. + let mut other_digest = digest.clone(); + other_digest.warm_era_fingerprints = HashMap::from([ + (IntervalIdx(1), Fingerprint(1)), + (IntervalIdx(2), Fingerprint(2)), + // (IntervalIdx(4), Fingerprint(4)), // Not missing in `digest`. + ]); + + assert!(digest.diff(other_digest).is_none()); + + // Test: `other` has both (i) an interval also present in `digest` that differs and (ii) an + // interval that `digest` does not have. + let mut other_digest = digest.clone(); + other_digest.warm_era_fingerprints = HashMap::from([ + (IntervalIdx(1), Fingerprint(1)), + (IntervalIdx(2), Fingerprint(20)), // 20 vs 2 + (IntervalIdx(3), Fingerprint(3)), + // (IntervalIdx(4), Fingerprint(4)), // Not missing in `digest`. + ]); + + let expected_diff = Some(DigestDiff { + cold_eras_differ: false, + warm_eras_differences: HashSet::from([IntervalIdx(2), IntervalIdx(3)]), + hot_eras_differences: HashMap::default(), + }); + assert_eq!(expected_diff, digest.diff(other_digest)); + + // Test: `digest` has an interval that `other` does not have + in an interval they both have, + // `digest` has a sub-interval that `other` does not have -> the diff should still be None. + let mut other_digest = digest.clone(); + other_digest.hot_era_fingerprints = HashMap::from([ + // Not missing in `digest` + // ( + // IntervalIdx(1), + // HashMap::from([(SubIntervalIdx(1), Fingerprint(1))]), + // ), + ( + IntervalIdx(2), + HashMap::from([ + (SubIntervalIdx(1), Fingerprint(1)), + (SubIntervalIdx(3), Fingerprint(3)), + ]), + ), + ( + IntervalIdx(4), + HashMap::from([ + (SubIntervalIdx(1), Fingerprint(1)), + // (SubIntervalIdx(2), Fingerprint(2)), // Not missing in `digest` + ]), + ), + ]); + + assert!(digest.diff(other_digest).is_none()); + + // Test: `other` has (i) an interval also present in `digest` that differs, (ii) an interval + // that `digest` does not have, (iii) a sub-interval in an interval shared by both that `digest` + // does not have, and (iv) a sub-interval in an interval shared by both that differs from + // `digest`. 
+ let mut other_digest = digest.clone(); + other_digest.hot_era_fingerprints = HashMap::from([ + // Not missing in `digest` + // ( + // IntervalIdx(1), + // HashMap::from([(SubIntervalIdx(1), Fingerprint(1))]), + // ), + ( + IntervalIdx(2), + HashMap::from([ + (SubIntervalIdx(1), Fingerprint(1)), + (SubIntervalIdx(2), Fingerprint(2)), // Not present in `digest` + (SubIntervalIdx(3), Fingerprint(30)), // 30 vs 3 + ]), + ), + ( + IntervalIdx(3), + HashMap::from([ + (SubIntervalIdx(1), Fingerprint(1)), + (SubIntervalIdx(2), Fingerprint(2)), + ]), + ), + ( + IntervalIdx(4), + HashMap::from([ + (SubIntervalIdx(1), Fingerprint(1)), + // (SubIntervalIdx(2), Fingerprint(2)), // Not missing in `digest` + ]), + ), + ]); + + let expected_diff = Some(DigestDiff { + cold_eras_differ: false, + hot_eras_differences: HashMap::from([ + ( + IntervalIdx(2), + HashSet::from([SubIntervalIdx(2), SubIntervalIdx(3)]), + ), + ( + IntervalIdx(3), + HashSet::from([SubIntervalIdx(1), SubIntervalIdx(2)]), + ), + ]), + warm_eras_differences: HashSet::default(), + }); + assert_eq!(expected_diff, digest.diff(other_digest)); +} diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/tests/log.test.rs b/plugins/zenoh-plugin-storage-manager/src/replication/tests/log.test.rs new file mode 100644 index 0000000000..186bd98903 --- /dev/null +++ b/plugins/zenoh-plugin-storage-manager/src/replication/tests/log.test.rs @@ -0,0 +1,298 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{collections::HashMap, str::FromStr, time::Duration}; + +use uhlc::{Timestamp, HLC, NTP64}; +use zenoh::{key_expr::OwnedKeyExpr, sample::SampleKind}; +use zenoh_backend_traits::config::ReplicaConfig; + +use super::{Event, LogLatest}; +use crate::replication::{ + classification::{Interval, IntervalIdx, SubInterval, SubIntervalIdx}, + digest::{Digest, Fingerprint}, + log::EventInsertion, +}; + +fn generate_timestamp_matching( + log: &LogLatest, + hlc: &HLC, + interval: u32, + sub_interval: u32, + delta: u64, +) -> Timestamp { + let duration_since_epoch = Duration::from_millis( + (log.configuration.interval.as_millis() * interval as u128 + + (log.configuration.interval.as_millis() / log.configuration.sub_intervals as u128) + * sub_interval as u128) as u64 + + delta, + ); + + Timestamp::new(NTP64::from(duration_since_epoch), *hlc.get_id()) +} + +#[test] +fn test_insert() { + let hlc = HLC::default(); + let mut log = LogLatest::new( + OwnedKeyExpr::from_str("replication/test/**").unwrap(), + None, + ReplicaConfig { + interval: Duration::from_secs(10), + sub_intervals: 2, + hot: 1, + warm: 5, + propagation_delay: Duration::from_millis(250), + }, + ); + + let event_10_0_0 = Event::new( + Some(OwnedKeyExpr::from_str("10/0/0").unwrap()), + generate_timestamp_matching(&log, &hlc, 10, 0, 0), + SampleKind::Put, + ); + assert!(!log.bloom_filter_event.check(event_10_0_0.key_expr())); + assert_eq!( + EventInsertion::New(event_10_0_0.clone()), + log.insert_event(Event::new( + event_10_0_0.key_expr().clone(), + *event_10_0_0.timestamp(), + SampleKind::Put + )) + ); + assert!(log.bloom_filter_event.check(event_10_0_0.key_expr())); + + let 
event_10_0_0_new = Event::new( + event_10_0_0.key_expr().clone(), + generate_timestamp_matching(&log, &hlc, 10, 0, 1), + SampleKind::Put, + ); + assert_eq!( + EventInsertion::ReplacedOlder(event_10_0_0.clone()), + log.insert_event(Event::new( + event_10_0_0_new.key_expr().clone(), + *event_10_0_0_new.timestamp(), + SampleKind::Put + )) + ); + + // Try to insert the same event a second time -> NotInsertedAsOlder. + assert_eq!( + EventInsertion::NotInsertedAsOlder, + log.insert_event(Event::new( + event_10_0_0_new.key_expr().clone(), + *event_10_0_0_new.timestamp(), + SampleKind::Put + )) + ); + + let expected_interval = Interval::from([( + SubIntervalIdx(0), + SubInterval::from([event_10_0_0_new.clone()]), + )]); + let interval_10 = log.intervals.get(&IntervalIdx(10)).unwrap(); + assert_eq!(&expected_interval, interval_10); + assert_eq!(event_10_0_0_new.fingerprint(), interval_10.fingerprint()); +} + +#[test] +fn test_digest() { + let hlc = HLC::default(); + let mut log = LogLatest::new( + OwnedKeyExpr::from_str("replication/test/**").unwrap(), + None, + ReplicaConfig { + interval: Duration::from_secs(10), + sub_intervals: 5, + hot: 1, + warm: 5, + propagation_delay: Duration::from_millis(250), + }, + ); + + let event_warm_5_1_0 = Event::new( + Some(OwnedKeyExpr::from_str("5/1/0").unwrap()), + generate_timestamp_matching(&log, &hlc, 5, 1, 0), + SampleKind::Put, + ); + assert_eq!( + EventInsertion::New(event_warm_5_1_0.clone()), + log.insert_event(Event::new( + event_warm_5_1_0.key_expr().clone(), + *event_warm_5_1_0.timestamp(), + SampleKind::Put + )) + ); + let event_warm_6_2_0 = Event::new( + Some(OwnedKeyExpr::from_str("6/2/0").unwrap()), + generate_timestamp_matching(&log, &hlc, 6, 2, 0), + SampleKind::Put, + ); + assert_eq!( + EventInsertion::New(event_warm_6_2_0.clone()), + log.insert_event(Event::new( + event_warm_6_2_0.key_expr().clone(), + *event_warm_6_2_0.timestamp(), + SampleKind::Put + )) + ); + let event_warm_6_2_1 = Event::new( + Some(OwnedKeyExpr::from_str("6/2/1").unwrap()), + generate_timestamp_matching(&log, &hlc, 6, 2, 1), + SampleKind::Put, + ); + assert_eq!( + EventInsertion::New(event_warm_6_2_1.clone()), + log.insert_event(Event::new( + event_warm_6_2_1.key_expr().clone(), + *event_warm_6_2_1.timestamp(), + SampleKind::Put + )) + ); + + let event_hot_10_4_1 = Event::new( + Some(OwnedKeyExpr::from_str("10/4/1").unwrap()), + generate_timestamp_matching(&log, &hlc, 10, 4, 1), + SampleKind::Put, + ); + assert_eq!( + EventInsertion::New(event_hot_10_4_1.clone()), + log.insert_event(Event::new( + event_hot_10_4_1.key_expr().clone(), + *event_hot_10_4_1.timestamp(), + SampleKind::Put + )) + ); + + // "Burning" interval that should not appear in the Digest. + let event_burning_11_0_0 = Event::new( + Some(OwnedKeyExpr::from_str("11/0/0").unwrap()), + generate_timestamp_matching(&log, &hlc, 11, 0, 42), + SampleKind::Put, + ); + assert_eq!( + EventInsertion::New(event_burning_11_0_0.clone()), + log.insert_event(Event::new( + event_burning_11_0_0.key_expr().clone(), + *event_burning_11_0_0.timestamp(), + SampleKind::Put + )) + ); + + // 🧊 There are no Event in the cold era. 
+ + // We set the upper bound when generating the Digest to 10, so: + // - 10 <= hot <= 10 + // - 5 <= warm <= 9 + // - 4 <= cold + let mut expected_digest = Digest { + configuration_fingerprint: log.configuration.fingerprint(), + cold_era_fingerprint: Fingerprint::default(), + warm_era_fingerprints: HashMap::from([ + (IntervalIdx(5), event_warm_5_1_0.fingerprint()), + ( + IntervalIdx(6), + event_warm_6_2_0.fingerprint() ^ event_warm_6_2_1.fingerprint(), + ), + ]), + hot_era_fingerprints: HashMap::from([( + IntervalIdx(10), + HashMap::from([(SubIntervalIdx(4), event_hot_10_4_1.fingerprint())]), + )]), + }; + assert_eq!(expected_digest, log.digest_from(IntervalIdx(10))); + + // 🧊 Let's add some Events in the cold era. + let event_cold_0_0_0 = Event::new( + Some(OwnedKeyExpr::from_str("0/0/0").unwrap()), + generate_timestamp_matching(&log, &hlc, 0, 0, 0), + SampleKind::Put, + ); + assert_eq!( + EventInsertion::New(event_cold_0_0_0.clone()), + log.insert_event(Event::new( + event_cold_0_0_0.key_expr().clone(), + *event_cold_0_0_0.timestamp(), + SampleKind::Put + )) + ); + let event_cold_1_0_0 = Event::new( + Some(OwnedKeyExpr::from_str("1/0/0").unwrap()), + generate_timestamp_matching(&log, &hlc, 1, 0, 0), + SampleKind::Put, + ); + assert_eq!( + EventInsertion::New(event_cold_1_0_0.clone()), + log.insert_event(Event::new( + event_cold_1_0_0.key_expr().clone(), + *event_cold_1_0_0.timestamp(), + SampleKind::Put + )) + ); + let event_cold_2_0_0 = Event::new( + Some(OwnedKeyExpr::from_str("2/0/0").unwrap()), + generate_timestamp_matching(&log, &hlc, 2, 0, 0), + SampleKind::Put, + ); + assert_eq!( + EventInsertion::New(event_cold_2_0_0.clone()), + log.insert_event(Event::new( + event_cold_2_0_0.key_expr().clone(), + *event_cold_2_0_0.timestamp(), + SampleKind::Put + )) + ); + let event_cold_4_0_0 = Event::new( + Some(OwnedKeyExpr::from_str("4/0/0").unwrap()), + generate_timestamp_matching(&log, &hlc, 4, 0, 0), + SampleKind::Put, + ); + assert_eq!( + EventInsertion::New(event_cold_4_0_0.clone()), + log.insert_event(Event::new( + event_cold_4_0_0.key_expr().clone(), + *event_cold_4_0_0.timestamp(), + SampleKind::Put + )) + ); + + let expected_cold_fingerprint = event_cold_0_0_0.fingerprint() + ^ event_cold_1_0_0.fingerprint() + ^ event_cold_2_0_0.fingerprint() + ^ event_cold_4_0_0.fingerprint(); + expected_digest.cold_era_fingerprint = expected_cold_fingerprint; + assert_eq!(expected_digest, log.digest_from(IntervalIdx(10))); + + // We now set the upper bound when generating the Digest to 12, so: + // - 12 <= hot <= 12 + // - 7 <= warm <= 11 + // - cold <= 6 + let expected_cold_fingerprint = event_cold_0_0_0.fingerprint() + ^ event_cold_1_0_0.fingerprint() + ^ event_cold_2_0_0.fingerprint() + ^ event_cold_4_0_0.fingerprint() + ^ event_warm_5_1_0.fingerprint() + ^ event_warm_6_2_0.fingerprint() + ^ event_warm_6_2_1.fingerprint(); + let expected_digest = Digest { + configuration_fingerprint: log.configuration.fingerprint(), + cold_era_fingerprint: expected_cold_fingerprint, + warm_era_fingerprints: HashMap::from([ + (IntervalIdx(10), event_hot_10_4_1.fingerprint()), + (IntervalIdx(11), event_burning_11_0_0.fingerprint()), + ]), + hot_era_fingerprints: HashMap::default(), + }; + assert_eq!(expected_digest, log.digest_from(IntervalIdx(12))); +} diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs index 8f2b184af9..7dcc60bf32 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs +++ 
b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs @@ -12,21 +12,45 @@ // ZettaScale Zenoh Team, // -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; -use flume::Sender; -use tokio::sync::Mutex; -use zenoh::{internal::bail, session::Session, Result as ZResult}; -use zenoh_backend_traits::{config::StorageConfig, VolumeInstance}; +use tokio::sync::{broadcast::Sender, Mutex, RwLock}; +use zenoh::{ + internal::bail, key_expr::OwnedKeyExpr, sample::SampleKind, session::Session, Result as ZResult, +}; +use zenoh_backend_traits::{config::StorageConfig, History, VolumeInstance}; -mod service; -use service::StorageService; +use crate::replication::{Event, LogLatest, ReplicationService}; +pub(crate) mod service; +pub(crate) use service::StorageService; + +#[derive(Clone)] pub enum StorageMessage { Stop, GetStatus(tokio::sync::mpsc::Sender), } +pub(crate) type LatestUpdates = HashMap, Event>; + +#[derive(Clone)] +pub(crate) struct CacheLatest { + pub(crate) latest_updates: Arc>, + pub(crate) replication_log: Option>>, +} + +impl CacheLatest { + pub fn new( + latest_updates: Arc>, + replication_log: Option>>, + ) -> Self { + Self { + latest_updates, + replication_log, + } + } +} + pub(crate) async fn create_and_start_storage( admin_key: String, config: StorageConfig, @@ -46,28 +70,91 @@ pub(crate) async fn create_and_start_storage( tracing::trace!("Start storage '{}' on keyexpr '{}'", name, config.key_expr); - let (tx, rx) = flume::bounded(1); + let (tx, rx_storage) = tokio::sync::broadcast::channel(1); - let latest_updates = match storage.get_all_entries().await { - Ok(entries) => entries.into_iter().collect(), - Err(e) => { - bail!("Failed to retrieve entries from Storage < {storage_name} >: {e:?}"); - } + let mut entries = match storage.get_all_entries().await { + Ok(entries) => entries + .into_iter() + .map(|(stripped_key, ts)| { + ( + stripped_key.clone(), + Event::new(stripped_key, ts, SampleKind::Put), + ) + }) + .collect::>(), + Err(e) => bail!("`get_all_entries` failed with: {e:?}"), }; + let mut replication_log = None; + let mut latest_updates = HashMap::default(); + if let Some(replica_config) = &config.replication { + if capability.history != History::Latest { + bail!( + "Replication was enabled for storage {name} but its history capability is not \ + supported: found < {:?} >, expected < {:?} >", + capability.history, + History::Latest + ); + } + let mut log_latest = LogLatest::new( + config.key_expr.clone(), + config.strip_prefix.clone(), + replica_config.clone(), + ); + log_latest.update(entries.drain().map(|(_, event)| event)); + + replication_log = Some(Arc::new(RwLock::new(log_latest))); + } else { + latest_updates = entries; + } + + let latest_updates = Arc::new(RwLock::new(latest_updates)); + let storage = Arc::new(Mutex::new(storage)); - tokio::task::spawn(async move { - StorageService::start( - zenoh_session, - config, - &name, - storage, - capability, - rx, - Arc::new(Mutex::new(latest_updates)), - ) - .await; - }); + let storage_service = StorageService::start( + zenoh_session.clone(), + config.clone(), + &name, + storage, + capability, + rx_storage, + CacheLatest::new(latest_updates.clone(), replication_log.clone()), + ) + .await; + + // Testing if the `replication_log` is set is equivalent to testing if the `replication` is + // set: the `replication_log` is only set when the latter is. 
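+    // Note on the shutdown path: `rx_replication` below is a second Receiver obtained from the
+    // same broadcast channel as `rx_storage`, so a single `StorageMessage::Stop` sent through the
+    // returned `tx` reaches both the StorageService and the ReplicationService.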
+    if let Some(replication_log) = replication_log {
+        let rx_replication = tx.subscribe();
+
+        // NOTE Although the function `ReplicationService::spawn_start` spawns its own tasks, we
+        //      still need to call it within a dedicated task because the Zenoh routing tables are
+        //      populated only after the plugins have been loaded.
+        //
+        //      If we don't wait for the routing tables to be populated, the initial alignment
+        //      (i.e. querying any Storage on the network handling the same key expression) will
+        //      never work.
+        //
+        // TODO Do we really want to perform such an initial alignment? Because this query will
+        //      target any Storage that matches the same key expression, regardless of whether
+        //      they have been configured to be replicated.
+        tokio::task::spawn(async move {
+            tracing::debug!(
+                "Starting replication of storage '{}' on keyexpr '{}'",
+                name,
+                config.key_expr,
+            );
+            ReplicationService::spawn_start(
+                zenoh_session,
+                storage_service,
+                config.key_expr,
+                replication_log,
+                latest_updates,
+                rx_replication,
+            )
+            .await;
+        });
+    }
 
     Ok(tx)
 }
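Note: the handle returned by `create_and_start_storage` is now a tokio broadcast sender so that the storage task and the optional replication task can both observe the same control messages. A minimal, self-contained sketch of that pattern (the message enum is simplified to a single variant and is not the plugin's exact type):

    use tokio::sync::broadcast;

    #[derive(Clone, Debug)]
    enum StorageMessage {
        Stop,
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx_storage) = broadcast::channel::<StorageMessage>(1);
        // Every additional consumer gets its own receiver, as the replication task does above.
        let mut rx_replication = tx.subscribe();

        let storage = tokio::spawn(async move {
            if let Ok(StorageMessage::Stop) = rx_storage.recv().await {
                println!("storage task stopping");
            }
        });
        let replication = tokio::spawn(async move {
            if let Ok(StorageMessage::Stop) = rx_replication.recv().await {
                println!("replication task stopping");
            }
        });

        // A single send is delivered to every subscriber, unlike an mpsc/flume channel.
        tx.send(StorageMessage::Stop).unwrap();
        let _ = tokio::join!(storage, replication);
    }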
diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
index a34805fe30..42e82eae94 100644
--- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
@@ -20,11 +20,10 @@ use std::{
 };
 
 use async_trait::async_trait;
-use flume::Receiver;
-use futures::select;
-use tokio::sync::{Mutex, MutexGuard, RwLock};
+use tokio::sync::{broadcast::Receiver, Mutex, RwLock, RwLockWriteGuard};
 use zenoh::{
     internal::{
+        bail,
         buffers::{SplitBuffer, ZBuf},
         zenoh_home, Timed, TimedEvent, Timer, Value,
     },
@@ -37,13 +36,18 @@ use zenoh::{
     sample::{Sample, SampleBuilder, SampleKind},
     session::Session,
     time::{Timestamp, NTP64},
+    Result as ZResult,
 };
 use zenoh_backend_traits::{
     config::{GarbageCollectionConfig, StorageConfig},
     Capability, History, Persistence, StorageInsertionResult, StoredData,
 };
 
-use crate::storages_mgt::StorageMessage;
+use super::LatestUpdates;
+use crate::{
+    replication::Event,
+    storages_mgt::{CacheLatest, StorageMessage},
+};
 
 pub const WILDCARD_UPDATES_FILENAME: &str = "wildcard_updates";
 pub const TOMBSTONE_FILENAME: &str = "tombstones";
@@ -54,17 +58,18 @@ struct Update {
     data: StoredData,
 }
 
+#[derive(Clone)]
 pub struct StorageService {
     session: Arc<Session>,
     key_expr: OwnedKeyExpr,
     complete: bool,
     name: String,
     strip_prefix: Option<OwnedKeyExpr>,
-    storage: Arc<Mutex<Box<dyn Storage>>>,
+    pub(crate) storage: Arc<Mutex<Box<dyn Storage>>>,
     capability: Capability,
     tombstones: Arc<RwLock<KeBoxTree<Timestamp, NonWild, KeyedSetProvider>>>,
     wildcard_updates: Arc<RwLock<KeBoxTree<Update, UnknownWildness, KeyedSetProvider>>>,
-    latest_updates: Arc<Mutex<HashMap<Option<OwnedKeyExpr>, Timestamp>>>,
+    cache_latest: CacheLatest,
 }
 
 impl StorageService {
@@ -75,9 +80,9 @@ impl StorageService {
         storage: Arc<Mutex<Box<dyn Storage>>>,
         capability: Capability,
         rx: Receiver<StorageMessage>,
-        latest_updates: Arc<Mutex<HashMap<Option<OwnedKeyExpr>, Timestamp>>>,
-    ) {
-        let mut storage_service = StorageService {
+        cache_latest: CacheLatest,
+    ) -> Self {
+        let storage_service = StorageService {
             session,
             key_expr: config.key_expr,
             complete: config.complete,
@@ -87,7 +92,7 @@ impl StorageService {
             capability,
             tombstones: Arc::new(RwLock::new(KeBoxTree::default())),
             wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())),
-            latest_updates,
+            cache_latest,
         };
         if storage_service
             .capability
@@ -117,24 +122,34 @@ impl StorageService {
             }
         }
         storage_service
+            .clone()
             .start_storage_queryable_subscriber(rx, config.garbage_collection_config)
-            .await
+            .await;
+
+        storage_service
     }
 
     async fn start_storage_queryable_subscriber(
-        &mut self,
-        rx: Receiver<StorageMessage>,
+        self,
+        mut rx: Receiver<StorageMessage>,
         gc_config: GarbageCollectionConfig,
     ) {
        // start periodic GC event
        let t = Timer::default();
+
+        let latest_updates = if self.cache_latest.replication_log.is_none() {
+            Some(self.cache_latest.latest_updates.clone())
+        } else {
+            None
+        };
+
        let gc = TimedEvent::periodic(
            gc_config.period,
            GarbageCollectionEvent {
                config: gc_config,
                tombstones: self.tombstones.clone(),
                wildcard_updates: self.wildcard_updates.clone(),
-                latest_updates: self.latest_updates.clone(),
+                latest_updates,
            },
        );
        t.add_async(gc).await;
@@ -162,49 +177,50 @@ impl StorageService {
             }
         };
 
-        loop {
-            select!(
-                // on sample for key_expr
-                sample = storage_sub.recv_async() => {
-                    let sample = match sample {
-                        Ok(sample) => sample,
-                        Err(e) => {
-                            tracing::error!("Error in sample: {}", e);
-                            continue;
-                        }
-                    };
-                    let timestamp = sample.timestamp().cloned().unwrap_or(self.session.new_timestamp());
-                    let sample = SampleBuilder::from(sample).timestamp(timestamp).into();
-                    self.process_sample(sample).await;
-                },
-                // on query on key_expr
-                query = storage_queryable.recv_async() => {
-                    self.reply_query(query).await;
-                },
-                // on storage handle drop
-                message = rx.recv_async() => {
-                    match message {
-                        Ok(StorageMessage::Stop) => {
-                            tracing::trace!("Dropping storage '{}'", self.name);
-                            return
-                        },
-                        Ok(StorageMessage::GetStatus(tx)) => {
-                            let storage = self.storage.lock().await;
-                            std::mem::drop(tx.send(storage.get_admin_status()).await);
-                            drop(storage);
+        tokio::task::spawn(async move {
+            loop {
+                tokio::select!(
+                    // on sample for key_expr
+                    sample = storage_sub.recv_async() => {
+                        let sample = match sample {
+                            Ok(sample) => sample,
+                            Err(e) => {
+                                tracing::error!("Error in sample: {}", e);
+                                continue;
+                            }
+                        };
+                        let timestamp = sample.timestamp().cloned().unwrap_or(self.session.new_timestamp());
+                        let sample = SampleBuilder::from(sample).timestamp(timestamp).into();
+                        if let Err(e) = self.process_sample(sample).await {
+                            tracing::error!("{e:?}");
                        }
-                        Err(e) => {
-                            tracing::error!("Storage Message Channel Error: {}", e);
-                        },
-                    };
-                },
-            );
-        }
+                    },
+                    // on query on key_expr
+                    query = storage_queryable.recv_async() => {
+                        self.reply_query(query).await;
+                    },
+                    // on storage handle drop
+                    Ok(message) = rx.recv() => {
+                        match message {
+                            StorageMessage::Stop => {
+                                tracing::trace!("Dropping storage '{}'", self.name);
+                                return
+                            },
+                            StorageMessage::GetStatus(tx) => {
+                                let storage = self.storage.lock().await;
+                                std::mem::drop(tx.send(storage.get_admin_status()).await);
+                                drop(storage);
+                            }
+                        };
+                    },
+                );
+            }
+        });
     }
 
     // The storage should only simply save the key, sample pair while put and retrieve the same
     // during get the trimming during PUT and GET should be handled by the plugin
-    pub(crate) async fn process_sample(&self, sample: Sample) {
+    pub(crate) async fn process_sample(&self, sample: Sample) -> ZResult<()> {
         tracing::trace!("[STORAGE] Processing sample: {:?}", sample);
 
         // A Sample, in theory, will not arrive to a Storage without a Timestamp. This check (which,
@@ -213,8 +229,7 @@ impl StorageService {
         let sample_timestamp = match sample.timestamp() {
             Some(timestamp) => timestamp,
             None => {
-                tracing::error!("Discarding Sample that has no Timestamp: {:?}", sample);
-                return;
+                bail!("Discarding Sample without a Timestamp: {:?}", sample);
             }
         };
 
@@ -286,8 +301,7 @@ impl StorageService {
             match crate::strip_prefix(self.strip_prefix.as_ref(), sample_to_store.key_expr()) {
                 Ok(stripped) => stripped,
                 Err(e) => {
-                    tracing::error!("{}", e);
-                    return;
+                    bail!("{e:?}");
                 }
             };
 
@@ -340,14 +354,21 @@ impl StorageService {
                 }
                 Ok(_) => {
                     if let Some(mut cache_guard) = cache_guard {
-                        cache_guard.insert(stripped_key, sample_to_store_timestamp);
+                        cache_guard.insert(
+                            stripped_key.clone(),
+                            Event::new(stripped_key, sample_to_store_timestamp, sample.kind()),
+                        );
                     }
                 }
                 Err(e) => {
+                    // TODO In case of a wildcard update, multiple keys can be updated. What should
+                    //      be the behaviour if one or more of these updates fail?
                    tracing::error!("`{}` on < {} > failed with: {e:?}", sample.kind(), k);
                }
            }
        }
+
+        Ok(())
     }
 
     async fn mark_tombstone(&self, key_expr: &OwnedKeyExpr, timestamp: Timestamp) {
@@ -470,25 +491,32 @@ impl StorageService {
     async fn guard_cache_if_latest(
         &self,
         stripped_key: &Option<OwnedKeyExpr>,
-        timestamp: &Timestamp,
-    ) -> Option<MutexGuard<HashMap<Option<OwnedKeyExpr>, Timestamp>>> {
-        let cache_guard = self.latest_updates.lock().await;
-        if let Some(latest_timestamp) = cache_guard.get(stripped_key) {
-            if timestamp > latest_timestamp {
+        received_ts: &Timestamp,
+    ) -> Option<RwLockWriteGuard<LatestUpdates>> {
+        let cache_guard = self.cache_latest.latest_updates.write().await;
+        if let Some(event) = cache_guard.get(stripped_key) {
+            if received_ts > event.timestamp() {
                 return Some(cache_guard);
-            } else {
-                return None;
             }
         }
 
-        let mut storage = self.storage.lock().await;
-
-        if let Ok(stored_data) = storage.get(stripped_key.clone(), "").await {
-            for data in stored_data {
-                if data.timestamp > *timestamp {
+        if let Some(replication_log) = &self.cache_latest.replication_log {
+            if let Some(event) = replication_log.read().await.lookup(stripped_key) {
+                if received_ts <= event.timestamp() {
                     return None;
                 }
             }
+        } else {
+            let mut storage = self.storage.lock().await;
+            // FIXME: An actual error from the underlying Storage cannot be distinguished from a
+            //        missing entry.
+            if let Ok(stored_data) = storage.get(stripped_key.clone(), "").await {
+                for data in stored_data {
+                    if data.timestamp > *received_ts {
+                        return None;
+                    }
+                }
+            }
         }
 
         Some(cache_guard)
@@ -641,7 +669,7 @@ struct GarbageCollectionEvent {
     config: GarbageCollectionConfig,
     tombstones: Arc<RwLock<KeBoxTree<Timestamp, NonWild, KeyedSetProvider>>>,
     wildcard_updates: Arc<RwLock<KeBoxTree<Update, UnknownWildness, KeyedSetProvider>>>,
-    latest_updates: Arc<Mutex<HashMap<Option<OwnedKeyExpr>, Timestamp>>>,
+    latest_updates: Option<Arc<RwLock<LatestUpdates>>>,
 }
 
 #[async_trait]
@@ -678,10 +706,12 @@ impl Timed for GarbageCollectionEvent {
             wildcard_updates.remove(&k);
         }
 
-        self.latest_updates
-            .lock()
-            .await
-            .retain(|_, timestamp| timestamp.get_time() < &time_limit);
+        if let Some(latest_updates) = &self.latest_updates {
+            latest_updates
+                .write()
+                .await
+                .retain(|_, event| event.timestamp().get_time() < &time_limit);
+        }
 
         tracing::trace!("End garbage collection of obsolete data-infos");
     }
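Note: `guard_cache_if_latest` hands back the cache's write guard only when the incoming timestamp is strictly newer than anything already known, so the caller can insert while still holding the lock and no newer entry can slip in between the check and the insert. A simplified, self-contained sketch of that check-then-keep-the-guard pattern (plain String keys and u64 timestamps stand in for the real key expressions, Timestamps and Events):

    use std::collections::HashMap;

    use tokio::sync::{RwLock, RwLockWriteGuard};

    async fn guard_if_latest<'a>(
        cache: &'a RwLock<HashMap<String, u64>>,
        key: &str,
        received_ts: u64,
    ) -> Option<RwLockWriteGuard<'a, HashMap<String, u64>>> {
        let guard = cache.write().await;
        // Keep the guard only if the incoming timestamp beats what we already hold.
        let is_stale = guard.get(key).is_some_and(|known| *known >= received_ts);
        if is_stale {
            None
        } else {
            Some(guard)
        }
    }

    #[tokio::main]
    async fn main() {
        let cache = RwLock::new(HashMap::from([("demo/a".to_string(), 10_u64)]));
        // An older publication is rejected without ever touching the map.
        assert!(guard_if_latest(&cache, "demo/a", 5).await.is_none());
        // A newer one gets the guard back and can update the entry atomically.
        if let Some(mut guard) = guard_if_latest(&cache, "demo/a", 42).await {
            guard.insert("demo/a".to_string(), 42);
        }
    }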
diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs
index 1bfddab18a..20ccdf6617 100644
--- a/zenoh/src/api/sample.rs
+++ b/zenoh/src/api/sample.rs
@@ -15,8 +15,7 @@
 //! Sample primitives
 
 use std::{convert::TryFrom, fmt};
-#[cfg(feature = "unstable")]
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
 use zenoh_config::wrappers::EntityGlobalId;
 #[cfg(feature = "unstable")]
 use zenoh_protocol::core::Reliability;
@@ -251,7 +250,7 @@ impl Default for SourceInfo {
 
 /// The kind of a `Sample`.
 #[repr(u8)]
-#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
+#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Deserialize, Serialize)]
 pub enum SampleKind {
     /// if the `Sample` was issued by a `put` operation.
     #[default]
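Note: deriving `Deserialize`/`Serialize` on `SampleKind` is what allows the replication `Event` (which records the kind of the sample it stands for) to be encoded with a serde codec. The stand-in below mirrors the shape of `SampleKind` but is not zenoh's type, and bincode is only assumed as the codec for the illustration:

    use serde::{Deserialize, Serialize};

    #[repr(u8)]
    #[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Deserialize, Serialize)]
    enum SampleKind {
        #[default]
        Put = 0,
        Delete = 1,
    }

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Round-trip the kind through a compact binary encoding.
        let bytes = bincode::serialize(&SampleKind::Delete)?;
        let decoded: SampleKind = bincode::deserialize(&bytes)?;
        assert_eq!(decoded, SampleKind::Delete);
        Ok(())
    }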