Skip to content

Commit

Permalink
Merge pull request #1458 from eclipse-zenoh/refactor/storage-manager/…
Browse files Browse the repository at this point in the history
…replication

feat: rewrite of Storage Replication
  • Loading branch information
Mallets authored Sep 20, 2024
2 parents ffbf431 + 02fc1d4 commit 3b6d773
Show file tree
Hide file tree
Showing 23 changed files with 3,579 additions and 107 deletions.
39 changes: 37 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,10 @@
// },
// demo2: {
// key_expr: "demo/memory2/**",
// /// This prefix will be stripped from the received keys when storing.
// /// ⚠️ If you replicate this Storage then THIS VALUE SHOULD BE THE SAME FOR ALL THE REPLICAS YOU WANT TO
// /// KEEP ALIGNED.
// strip_prefix: "demo/memory2",
// volume: "memory",
// /// Storage manager plugin handles metadata in order to ensure convergence of distributed storages configured in Zenoh.
// /// Metadata includes the set of wild card updates and deletions (tombstones).
Expand All @@ -574,6 +578,24 @@
// /// The duration is specified in seconds.
// lifespan: 86400,
// },
// /// If multiple storages subscribing to the same key_expr should be synchronized, declare them as replicas.
// /// In the absence of this configuration, a normal storage is initialized.
// /// Note: all the samples to be stored in replicas should be timestamped.
// ///
// /// ⚠️ THESE VALUES SHOULD BE THE SAME FOR ALL THE REPLICAS YOU WANT TO KEEP ALIGNED.
// replication: {
// /// Specifying the parameters is optional: any parameter left out takes the default value shown below.
// /// Time interval between different synchronization attempts in SECONDS.
// interval: 10.0,
// /// Number of sub-intervals, of equal duration, within an interval.
// sub_intervals: 5,
// /// Number of intervals that compose the "hot" era.
// hot: 6,
// /// Number of intervals that compose the "warm" era.
// warm: 30,
// /// The average time, expressed in MILLISECONDS, it takes a publication to reach the Storage.
// propagation_delay: 250,
// }
// },
// demo3: {
// key_expr: "demo/memory3/**",
Expand Down
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ allow = [
"CC0-1.0",
"MPL-2.0",
"OpenSSL",
"BSL-1.0"
]

# This was copied from https://github.com/EmbarkStudios/cargo-deny/blob/main/deny.toml#L64
Expand Down
154 changes: 154 additions & 0 deletions plugins/zenoh-backend-traits/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ pub struct StorageConfig {
pub volume_id: String,
pub volume_cfg: Value,
pub garbage_collection_config: GarbageCollectionConfig,
// Note: ReplicaConfig is optional. Alignment will be performed only if it is a replica
pub replication: Option<ReplicaConfig>,
}
// Parameters controlling the replication (alignment) of a Storage declared as a replica.
//
// Note: all parameters should be the same for all the replicas, otherwise it will result in a
// huge overhead.
#[derive(JsonSchema, Debug, Clone, PartialEq, Eq)]
pub struct ReplicaConfig {
// Frequency at which the replication Digests are computed and published. Expressed in SECONDS
// in the configuration.
pub interval: Duration,
// Number of sub-intervals, of equal duration, within an `interval`.
pub sub_intervals: usize,
// Number of intervals that compose the "hot" era.
pub hot: u64,
// Number of intervals that compose the "warm" era.
pub warm: u64,
// Average time it takes for a publication to reach a storage. Expressed in MILLISECONDS in the
// configuration.
pub propagation_delay: Duration,
}

impl StructVersion for VolumeConfig {
Expand All @@ -78,6 +89,67 @@ impl StructVersion for VolumeConfig {

impl PluginStartArgs for VolumeConfig {}

impl Default for ReplicaConfig {
fn default() -> Self {
Self {
// This variable, expressed in SECONDS (f64), controls the frequency at which the
// Digests are computed and published.
//
// This also determines the time up to which replicas might diverge.
//
// Its default value is 10.0 seconds.
//
// ⚠️ THIS VALUE SHOULD BE THE SAME FOR ALL REPLICAS.
interval: Duration::from_secs_f64(10.0),
// This variable dictates the number of sub-intervals, of equal duration, within an
// interval.
//
// This is used to separate the publications in smaller batches when computing their
// fingerprints. A value of `1` will effectively disable the sub-intervals.
// Higher values will slightly increase the size of the `Digest` sent on the
// network but will reduce the amount of information sent when aligning.
//
// Hence, the trade-off is the following: with higher values more information are sent
// at every interval, with lower values more information are sent when
// aligning.
//
// Its default value is 5.
//
// ⚠️ THIS VALUE SHOULD BE THE SAME FOR ALL REPLICAS.
sub_intervals: 5,
// The number of intervals that compose the "hot" era.
//
// Its default value is 6 which, with the default `interval` value, corresponds to the
// last minute.
//
// ⚠️ THIS VALUE SHOULD BE THE SAME FOR ALL REPLICAS.
hot: 6,
// The number of intervals that compose the "warm" era.
//
// Its default value is 30 which, with the default `interval` value, corresponds to 5
// minutes.
//
// ⚠️ THIS VALUE SHOULD BE THE SAME FOR ALL REPLICAS.
warm: 30,
// The average time, expressed in MILLISECONDS, it takes for a publication to reach a
// storage.
//
// This value controls when the replication Digest is generated and, hence, published.
// Assuming that the `interval` is set to 10.0 seconds, with a
// `propagation_delay` of 250 milliseconds, the replication Digest
// will be computed at ( n × 10.0 ) + 0.25 seconds.
//
// ⚠️ For the reason above, this value cannot be greater or equal than half of the
// duration of an `interval`.
//
// Its default value is 250 milliseconds.
//
// ⚠️ THIS VALUE SHOULD BE THE SAME FOR ALL REPLICAS.
propagation_delay: Duration::from_millis(250),
}
}
}

// The configuration for periodic garbage collection of metadata in storage manager
#[derive(JsonSchema, Debug, Clone, PartialEq, Eq)]
pub struct GarbageCollectionConfig {
Expand Down Expand Up @@ -475,6 +547,83 @@ impl StorageConfig {
}
None => GarbageCollectionConfig::default(),
};
// Parse the optional `replication` section of the Storage configuration.
//
// When the section is absent the Storage is not a replica and `replication` is `None`. When it
// is present, every parameter that is not specified keeps its `ReplicaConfig::default()` value.
// Any parameter with an invalid type or value makes the whole parsing fail.
let replication = match config.get("replication") {
    Some(s) => {
        let mut replication = ReplicaConfig::default();

        if let Some(p) = s.get("interval") {
            // `Duration::from_secs_f64` panics on negative or overflowing values: use the
            // fallible conversion so that a bad configuration is reported as an error.
            let interval = p
                .to_string()
                .parse::<f64>()
                .ok()
                .and_then(|secs| Duration::try_from_secs_f64(secs).ok());
            if let Some(interval) = interval {
                replication.interval = interval;
            } else {
                bail!(
                    "Invalid value for field `interval` in `replication` of storage \
                     `{}`. Expecting a non-negative integer or floating point number.",
                    plugin_name
                )
            }
        }

        if let Some(p) = s.get("sub_intervals") {
            if let Ok(p) = p.to_string().parse::<usize>() {
                replication.sub_intervals = p;
            } else {
                bail!(
                    "Invalid type for field `sub_intervals` in `replication` of \
                     storage `{}`. Only integer values are accepted.",
                    plugin_name
                )
            }
        }

        if let Some(d) = s.get("hot") {
            if let Ok(d) = d.to_string().parse::<u64>() {
                replication.hot = d;
            } else {
                bail!(
                    "Invalid type for field `hot` in `replication` of storage `{}`. \
                     Only integer values are accepted.",
                    plugin_name
                )
            }
        }

        if let Some(d) = s.get("warm") {
            if let Ok(d) = d.to_string().parse::<u64>() {
                replication.warm = d;
            } else {
                bail!(
                    "Invalid type for field `warm` in `replication` of storage `{}`. \
                     Only integer values are accepted.",
                    plugin_name
                )
            }
        }

        if let Some(p) = s.get("propagation_delay") {
            if let Ok(p) = p.to_string().parse::<u64>() {
                replication.propagation_delay = Duration::from_millis(p);
            } else {
                bail!(
                    "Invalid type for field `propagation_delay` in `replication` of \
                     storage `{}`. Only integer values are accepted.",
                    plugin_name
                )
            }
        }

        // The `propagation_delay` must fit at least twice in an `interval`. This is checked once
        // both values are final, so it also covers the case where only `interval` was provided
        // and the default `propagation_delay` applies. `checked_sub` is used because subtracting
        // a larger `Duration` from a smaller one panics.
        let interval = replication.interval;
        let propagation_delay = replication.propagation_delay;
        let delay_too_high = interval
            .checked_sub(propagation_delay)
            .map_or(true, |remainder| remainder < propagation_delay);
        if delay_too_high {
            bail!(
                "Invalid value for field `propagation_delay`: its value is too \
                 high compared to the `interval`, consider increasing the \
                 `interval` to at least twice its value (i.e. {}).",
                propagation_delay.as_millis() as f64 * 2.0 / 1000.0
            );
        }

        Some(replication)
    }
    None => None,
};
Ok(StorageConfig {
name: storage_name.into(),
key_expr,
Expand All @@ -483,6 +632,7 @@ impl StorageConfig {
volume_id,
volume_cfg,
garbage_collection_config,
replication,
})
}
}
Expand Down Expand Up @@ -518,3 +668,7 @@ impl PrivacyTransparentGet<serde_json::Value> for serde_json::Map<String, serde_
}
}
}

#[cfg(test)]
#[path = "config.test.rs"]
mod tests;
Loading

0 comments on commit 3b6d773

Please sign in to comment.