feat: rewrite of Storage Replication #1458
Conversation
Force-pushed from d53050f to 813df4a
Good; minor changes and doc fixes.
Three resolved (outdated) review threads on plugins/zenoh-plugin-storage-manager/src/replication/classification.rs.
```rust
// FIXME: An actual error from the underlying Storage cannot be distinguished from a
// missing entry.
```
Which storages produce these errors? I can open issues in those storages to fix them.
In my opinion the problem lies in this method (zenoh/plugins/zenoh-backend-traits/src/lib.rs, lines 247 to 251 in ddcc8f1):

```rust
async fn get(
    &mut self,
    key: Option<OwnedKeyExpr>,
    parameters: &str,
) -> ZResult<Vec<StoredData>>;
```
It should return a `ZResult<Option<Vec<StoredData>>>`, where the `None` indicates that there is no entry for that key expression (see the sketch below).
Right now, for instance, the Memory backend returns an error: https://github.com/eclipse-zenoh/zenoh/blob/main/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs#L131
I think this deserves a dedicated issue and discussion. (#1464)
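A minimal, self-contained sketch of the change proposed above. The trait is reduced to a synchronous method and `ZResult`, `StoredData` and `OwnedKeyExpr` are placeholder stand-ins for the real zenoh-backend-traits types:

```rust
// Hypothetical sketch of the proposed `get` signature; the real trait is
// async and uses the actual zenoh-backend-traits types.
type ZResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
struct StoredData; // placeholder for the real payload + timestamp
struct OwnedKeyExpr(String); // placeholder for zenoh's owned key expression

trait Storage {
    // Ok(None)  -> no entry for this key expression (not an error).
    // Ok(Some)  -> matching entries were found.
    // Err(_)    -> an actual failure of the underlying storage.
    fn get(
        &mut self,
        key: Option<OwnedKeyExpr>,
        parameters: &str,
    ) -> ZResult<Option<Vec<StoredData>>>;
}
```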
```rust
// TODO Do we really want to perform such an initial alignment? Because this query will
// target any Storage that matches the same key expression, regardless of if they have
// been configured to be replicated.
```
If a storage is not configured as a replica, then it shouldn't receive digests, correct? Won't this simply be ignored by the storage?
My comment stands: the code that follows makes a regular GET to a Storage that was declared on the same key expression. It does not leverage anything related to the Replication to filter out Storages that are not configured to be replicated.
See issue #1463
This commit re-introduces the configuration that will be used to replicate a Storage. It is composed of the following fields (a sketch of the resulting structure follows this commit message):

- `interval`: used both as a way to group publications together and as a timer to publish the Replication information.
- `sub_intervals`: the number of sub-groups present inside an interval.
- `hot`: the number of intervals that are considered recent and where differences are more likely, eventually leading to sending more information to re-align them quickly.
- `warm`: the number of intervals that are considered more stable than the "hot" intervals; less information is sent for them.
- `propagation_delay`: an estimated upper bound for the time it takes for a publication to reach a Zenoh node.

* DEFAULT_CONFIG.json5: added, as an example, the default Replication configuration for a Storage.
* plugins/zenoh-backend-traits/src/config.rs:
  - Introduced the `interval` field to control the publication and the grouping of publications.
  - Introduced the `sub_intervals` field to control the number of subdivisions within an `interval`.
  - Introduced the `hot` field to control the size (i.e. number of intervals) of the hot era.
  - Introduced the `warm` field to control the size (i.e. number of intervals) of the warm era.
  - Updated the `Default` trait implementation for `ReplicaConfig`.
  - Parsed the `ReplicaConfig` from the Zenoh configuration.
* plugins/zenoh-backend-traits/src/config.test.rs: unit tests to ensure that the parsing of the `ReplicaConfig` structure is valid.

Signed-off-by: Julien Loudet <[email protected]>
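A minimal sketch of what `ReplicaConfig` might look like given the fields above; the types and default values are illustrative guesses, not the actual defaults shipped in DEFAULT_CONFIG.json5:

```rust
use std::time::Duration;

// Hypothetical sketch of the `ReplicaConfig` described above. Field names
// come from the commit message; types and default values are guesses.
#[derive(Debug, Clone, PartialEq)]
pub struct ReplicaConfig {
    pub interval: Duration,          // groups publications and paces Digest publication
    pub sub_intervals: usize,        // number of sub-groups inside an interval
    pub hot: u64,                    // number of intervals in the "hot" era
    pub warm: u64,                   // number of intervals in the "warm" era
    pub propagation_delay: Duration, // estimated upper bound to reach a Zenoh node
}

impl Default for ReplicaConfig {
    fn default() -> Self {
        Self {
            interval: Duration::from_secs(10),
            sub_intervals: 5,
            hot: 6,
            warm: 30,
            propagation_delay: Duration::from_millis(250),
        }
    }
}
```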
The first step in the replication is to fetch existing content at start-up. In order to enable this functionality, changes were made in the storage manager. The main one is that `start_storage_queryable_subscriber` now returns the instance of the `StorageService` it uses. This, in turn, required spawning a dedicated task for its subscriber and queryable.

The `ReplicationService` is then started and queries the closest complete Storage for its content. A "wait strategy" is leveraged for this purpose: the query cannot be issued instantly, as Zenoh first needs to propagate the routing information to the node (a sketch of such a wait strategy follows this commit message). Additionally, if the replication is enabled but the Storage is not declared with the capability `History::Latest`, the storage manager will fail to start the Storage.

* plugins/zenoh-backend-traits/src/lib.rs: make the structure `Capability` derive the traits `Debug` and `Clone`. This is needed to make the `StorageService` structure also implement the same traits.
* plugins/zenoh-plugin-storage-manager/src/lib.rs: add the `replication` module.
* plugins/zenoh-plugin-storage-manager/src/replication/mod.rs: declared the module `service`.
* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Created the empty structure `ReplicationService`.
  - Implemented the `start` method that fetches the content of another complete Storage on the Zenoh network. The retrieved `Sample`s are then processed by the Storage.
* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs:
  - Modified the visibility of the `storages_mgt::service` module to `pub(crate)` such that the `ReplicationService` can use it.
  - Imported the `StorageService` structure.
  - Removed the call to `tokio::task::spawn` when starting `StorageService::start`, as that function now spawns a task.
  - Called the `ReplicationService` to perform the initial replication phase.
* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  - Derived the `Clone` trait for the `StorageService` structure.
  - Changed the signature of the `start` function to return the created instance of `StorageService`, for it to be used by the `ReplicationService`.
  - Changed the signature of `start_storage_queryable_subscriber` to own `self`.
  - Changed the signature of the `process_sample` method to return an error if one happened.

Signed-off-by: Julien Loudet <[email protected]>
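A minimal sketch of such a wait strategy, assuming a tokio runtime; the retry count, delays, and the `fetch_from_replica` closure are hypothetical, not the actual implementation:

```rust
use std::time::Duration;

// Illustrative "wait strategy": the first query is delayed, then retried
// with a growing back-off, so that Zenoh has time to propagate routing
// information to this node before the initial alignment query is issued.
async fn initial_alignment<F, Fut>(mut fetch_from_replica: F)
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Option<Vec<u8>>>,
{
    let mut delay = Duration::from_millis(250);
    for _ in 0..5 {
        // Let the routing information propagate before querying.
        tokio::time::sleep(delay).await;
        if let Some(content) = fetch_from_replica().await {
            // Hand the retrieved Samples over to the Storage here.
            drop(content);
            return;
        }
        delay *= 2; // back off before trying again
    }
}
```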
This commit introduces the first component of the rewrite of the replication feature: the generation, update and publication of the replication Digest.

A Digest is a structure that gives a concise view of the state of the Storage that is replicated. It exposes a set of `Fingerprint`s, where a `Fingerprint` is a 64-bit integer that uniquely identifies an event or group(s) of events. The groups are based on time: the timestamp of an event dictates to which `Interval` and, within that `Interval`, to which `SubInterval` it belongs. The sizes of an `Interval` and a `SubInterval` are controlled by configuration.

The `Fingerprint` of an event is computed thanks to the `xxhash-rust` library. The `Fingerprint` of a `SubInterval` is equal to the XOR of the `Fingerprint`s of the Events it contains. Similarly, the `Fingerprint` of an `Interval` is equal to the XOR of the `Fingerprint`s of the `SubInterval`s it contains, and the `Fingerprint` of an era is equal to the XOR of the `Fingerprint`s of its `Interval`s (a sketch of this XOR-folding follows this commit message).

`Interval`s are finally grouped into 3 eras: Cold, Warm, and Hot. The sizes of the Hot and Warm eras are controlled by configuration. Depending on the era, more or less information is carried by the Digest: the hotter the era, the more information. Specifically, the Hot era will send over the `Fingerprint`s of all its `SubInterval`s, the Warm era the `Fingerprint`s of all its `Interval`s, and the Cold era the `Fingerprint` of the era.

The `Replication` structure handles the generation, update and publication of the Digest. It spawns a dedicated task that will wake up at every multiple of `Interval` to update a `LogLatest` from which the `Digest` is generated. To speed up the interactions with the `LogLatest` structure, a Bloom filter is leveraged: it allows skipping checks when an Event pertains to a key expression that does not yet have another Event associated with it in the replication log.

To stop both the `ReplicationService` and the `StorageService`, the flume channel that was used to propagate the `StorageMessage::Stop` was replaced with a `tokio` broadcast channel.

Lastly, the Configuration ensures that when we want to compare Digests we can actually compare them: if any of the fields differ, we cannot guarantee the comparison.

* Cargo.lock: (see Cargo.toml)
* deny.toml: added the "BSL-1.0" licence, used by the newly added `xxhash-rust` crate, which is approved by the Eclipse Foundation.
* plugins/zenoh-plugin-storage-manager/Cargo.toml: added the following dependencies:
  - `bincode` to serialise the `Digest` when publishing it.
  - `bloomfilter` to use this data structure in order to reduce the number of checks performed.
  - `serde` to be able to serialise and send the `Digest`.
  - `xxhash-rust` to compute the hash of an Event.
  - `zenoh-result` to use the `bail!` macro.
  - (dev-dependency) `uhlc` to generate timestamps in tests.
* plugins/zenoh-plugin-storage-manager/src/lib.rs:
  - Replaced the `flume::Sender` with a `broadcast::Sender` to support sending the same message to both the Replication and Storage services.
* plugins/zenoh-plugin-storage-manager/src/replication/classification.rs:
  - Defined the `IntervalIdx` structure: a thin wrapper on top of a `u64` to index the `Interval`s.
  - Defined the `Interval` structure: an interval groups `SubInterval`s and exposes a `Fingerprint`.
  - Defined the `EventRemoval` enumeration: it carries the result of the `if_newer_remove_older` method. This is required as there are 3 possible outcomes: (i) the provided Event is indeed newer than another Event on the same key expression, (ii) the provided Event is older than another Event, and (iii) there is no Event associated with that key expression.
  - Defined the `SubIntervalIdx` structure: a thin wrapper on top of a `u64` to index the `SubInterval`s.
  - Defined the `SubInterval` structure: a sub-interval groups `Event`s and exposes a `Fingerprint`.
* plugins/zenoh-plugin-storage-manager/src/replication/configuration.rs: this file defines a wrapper around the `ReplicaConfig` structure. Its purpose is to expose methods to compute the `IntervalIdx` and `SubIntervalIdx` of a timestamp, as both are dependent on the configuration. Adding these methods to `ReplicaConfig` was not possible as it would create a cyclic dependency. The fields `prefix` and `storage_key_expr` are included in the computation of the fingerprint, ensuring that when we need to compare Digests, we are comparing "compatible" Digests.
* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  - Added the function `spawn_digest_publisher` that spawns a task that publishes the `Digest` at regular intervals.
* plugins/zenoh-plugin-storage-manager/src/replication/digest.rs: this file defines:
  - `Fingerprint`: a thin wrapper on top of a `u64`.
  - `Digest`: a set of `Fingerprint`s providing a concise view of a replication log.
* plugins/zenoh-plugin-storage-manager/src/replication/log.rs: this file defines:
  - `Event`: it groups together a key expression, a timestamp and an action, and computes the `Fingerprint` of these fields.
  - `Insertion`: an enumeration of the different possible outcomes when trying to insert an `Event` in the replication log.
  - `LogLatest`: the replication log for storages that have the `Capability::Latest`.
* plugins/zenoh-plugin-storage-manager/src/replication/mod.rs: added the new modules.
* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Enriched the `ReplicationService` to keep track of the task spawned to publish the `Digest`.
  - Renamed the `start` function to `spawn_start` to clearly indicate that it also spawns tasks.
  - Added the method `stop` on the `ReplicationService` structure that cancels the tasks it spawned when a `StorageMessage::Stop` is received.
* plugins/zenoh-plugin-storage-manager/src/replication/tests/classification.test.rs: unit tests to ensure the correct behaviour of the `Interval` and `SubInterval`.
* plugins/zenoh-plugin-storage-manager/src/replication/tests/configuration.test.rs: unit tests to ensure the correct behaviour.
* plugins/zenoh-plugin-storage-manager/src/replication/tests/log.test.rs: unit tests to ensure the correct behaviour.
* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs:
  - Made the enumeration `StorageMessage` derive the trait `Clone` so that the broadcast channel can clone it to deliver it to all subscribers.
  - Added the type alias `LatestUpdates`.
  - Created a structure `CacheLatest` that keeps track of both the `LatestUpdates` and the Replication Log (if there is one). Depending on whether Replication is enabled, the interactions with the Cache differ.
  - Replaced the `flume::channel` with a `broadcast::channel` to be able to propagate the `Stop` message to both services.
  - Changed the way `latest_updates` is populated depending on whether the Replication is enabled: if it is, then `latest_updates` becomes a simple buffer that the Replication will swap when it updates its Log.
* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  - Replaced the `latest_updates` structure with `CacheLatest`, which encapsulates the former as well as the Replication Log.
  - Changed the `latest_updates` structure passed to the `GarbageCollection`: if the Replication is enabled, `None` is provided to the garbage collection, as this structure will be processed when updating the Replication.
  - Renamed the function `is_latest` to `guard_cache_if_latest` to better reflect its use.
  - Updated the `GarbageCollectionEvent` to take an `Option` over the `LatestUpdates` instead of an instance. This avoids trying to garbage-collect it when the Replication is enabled.
  - Replaced the `futures::select!` macro with the one from Tokio, as the one from `futures` requires the futures to select on to be `FusedFuture`, which is not the case for Tokio's broadcast channel. This change should have no impact.

Signed-off-by: Julien Loudet <[email protected]>

to squash with creation of Digest
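The XOR-folding of `Fingerprint`s described in the Digest commit above can be illustrated with a short sketch. The hashing call uses `xxhash-rust`'s `xxh3_64` (the commit names the library, not this exact function); the rest is deliberately simplified:

```rust
use xxhash_rust::xxh3::xxh3_64;

// Simplified sketch of the fingerprint hierarchy: an event's fingerprint is
// a 64-bit hash of its content, and every level above it is the XOR of the
// fingerprints one level below. XOR makes the aggregate order-independent
// and cheap to update incrementally (inserting or removing an event is a
// single XOR at each level).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Fingerprint(u64);

fn event_fingerprint(key_expr: &str, timestamp: u64) -> Fingerprint {
    // The real Event also hashes its action; this is a reduced illustration.
    let mut bytes = key_expr.as_bytes().to_vec();
    bytes.extend_from_slice(&timestamp.to_le_bytes());
    Fingerprint(xxh3_64(&bytes))
}

fn xor_fold(fingerprints: &[Fingerprint]) -> Fingerprint {
    Fingerprint(fingerprints.iter().fold(0, |acc, fp| acc ^ fp.0))
}

// sub-interval = XOR of its events; interval = XOR of its sub-intervals;
// era = XOR of its intervals.
```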
…of Interval

This commit changes the behaviour of the task publishing the Digest: instead of waking up after every `interval` starting from the moment the task was spawned, the task will now first wait until the current time is a multiple of `interval` and then update / publish the Digest.

A random delay was also added to avoid a "coordinated update storm" where, after this change, all replicas could attempt to publish their Digest at the same time. This delay ranges from 0 (i.e. no delay) to 1/3 of the duration of an `interval` (a sketch of this scheduling follows this commit message).

* Cargo.lock:
* plugins/zenoh-plugin-storage-manager/Cargo.toml: added a dependency on `rand` in order to be able to generate a random duration.
* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Align the time at which the code that updates and publishes the Digest wakes up to a multiple of `interval`.
  - Add a random delay before publishing the Digest to avoid a "coordinated update storm".
  - In case the update / publication took longer than the duration of an `interval`, log a warning.

Signed-off-by: Julien Loudet <[email protected]>
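A minimal sketch of the scheduling described above, assuming tokio and the `rand` crate; the function name and clock handling are hypothetical simplifications:

```rust
use rand::Rng;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Wake up at the next multiple of `interval`, then add a random delay in
// [0, interval / 3) so that replicas do not all publish their Digest at
// the exact same instant.
async fn wait_until_next_publication(interval: Duration) {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock before UNIX epoch");
    // Time already elapsed inside the current interval.
    let elapsed_in_interval =
        Duration::from_nanos((now.as_nanos() % interval.as_nanos()) as u64);
    let until_next_multiple = interval - elapsed_in_interval;
    // Random jitter between zero and a third of an interval.
    let jitter = rand::thread_rng().gen_range(Duration::ZERO..interval / 3);
    tokio::time::sleep(until_next_multiple + jitter).await;
}
```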
With this commit, a storage configured to replicate its data will spawn a task that declares a subscriber receiving the Digest of all the replicas storing the same data.

Once a Digest is received, the subscriber task will compare it to the Digest of its replication log. This comparison yields either `None`, indicating that the received Digest is identical or contains no information missing from the local replication log, or a `DigestDiff` that details where the differences are found. As a temporary measure, the `DigestDiff` is published; this behaviour could evolve once the Aligner is implemented (a sketch of the `diff` semantics follows this commit message).

* Cargo.lock:
* plugins/zenoh-plugin-storage-manager/Cargo.toml: added a dependency on `uuid` in order to associate a unique id to each received Digest.
* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  - Added a function `spawn_digest_subscriber` that, as the name indicates, subscribes to the Digest published by other storages and computes their difference.
* plugins/zenoh-plugin-storage-manager/src/replication/digest.rs:
  - Introduced the new structure `DigestDiff`.
  - Introduced the new method `diff` that, given two `Digest`s, computes their difference, returning a `DigestDiff` if (i) they can be compared (i.e. the `Fingerprint` of their configuration is identical) and (ii) they do differ. This method returns `None` if the two `Digest`s are either identical or if the "other" `Digest` contains no information that the local `Digest` does not have.
* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Updated the `ReplicationService` structure to keep track of the "digest subscriber" task.
  - Updated the `stop` method to also cancel the "digest subscriber" task when stopping.
* plugins/zenoh-plugin-storage-manager/src/replication/tests/digest.test.rs: unit tests to ensure the correct behaviour of the `diff` method.

Signed-off-by: Julien Loudet <[email protected]>
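A hypothetical sketch of the `diff` semantics described above. The real `Digest` carries per-era fingerprints; here it is reduced to its configuration fingerprint plus a single map of interval fingerprints:

```rust
use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Fingerprint(u64);

struct Digest {
    configuration: Fingerprint,           // fingerprint of the ReplicaConfig
    intervals: HashMap<u64, Fingerprint>, // IntervalIdx -> Fingerprint
}

struct DigestDiff {
    // Intervals for which the other replica holds information we may lack.
    differing_intervals: HashSet<u64>,
}

impl Digest {
    /// Returns `Some(DigestDiff)` only if both Digests were produced with the
    /// same configuration *and* `other` contains something `self` lacks.
    fn diff(&self, other: &Digest) -> Option<DigestDiff> {
        if self.configuration != other.configuration {
            return None; // incomparable Digests
        }
        let mut differing = HashSet::new();
        for (idx, fingerprint) in &other.intervals {
            if self.intervals.get(idx) != Some(fingerprint) {
                differing.insert(*idx);
            }
        }
        if differing.is_empty() {
            None // identical, or nothing new on the other side
        } else {
            Some(DigestDiff { differing_intervals: differing })
        }
    }
}
```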
This commit introduces the Aligner: a queryable spawned by a replicated Storage that takes care of processing `AlignmentQuery` and replies with `AlignmentReply`.

The entry point of the Aligner is the Digest subscriber: whenever it receives a Digest for which it detects a potential misalignment, it will spawn a task that queries the Aligner of the Replica that sent the Digest. The Aligner of that Replica will then process the query and reply with the information contained in its Replication Log, eventually leading to sending the missing data (detected with the misaligned Digest). A sketch of hypothetical query / reply shapes follows this commit message.

* plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs:
  - Defined the new structure `AlignmentQuery` to query an Aligner.
  - Defined the new structure `AlignmentReply` to answer a query made to the Aligner.
  - Defined the method `aligner` that takes care of processing an `AlignmentQuery` and answering it.
  - Defined the method `spawn_query_replica_aligner` that spawns a task to query the Aligner of a Replica for which a potential misalignment has been detected.
* plugins/zenoh-plugin-storage-manager/src/replication/classification.rs:
  - Changed the visibility of the fields `fingerprint` and `sub_intervals` of the `Interval` structure to ease their manipulation in the Aligner.
  - Same for the fields `fingerprint` and `events` of the `SubInterval` structure.
* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  - Changed the key expression formatter of the Aligner to include the key expression of the Storage.
  - Added the field `storage` to the `Replication` structure to access the underlying Storage in order to get / put / delete data when aligning.
  - Changed the lock on the `latest_updates` structure to a `RwLock`, as more reads are performed than writes; this allows for more parallel operations when aligning.
  - Updated the code of the Digest subscriber to query the Aligner instead of publishing the `DigestDiff`.
  - Added the method `spawn_aligner_queryable` that spawns a task for the Aligner queryable.
* plugins/zenoh-plugin-storage-manager/src/replication/log.rs:
  - Added the `EventMetadata` structure that takes the same elements as the `Event` structure but removes the `Fingerprint`.
  - Changed the visibility of the fields of the `Event` structure to ease their manipulation.
  - Added conversion methods between `Event` and `EventMetadata`.
* plugins/zenoh-plugin-storage-manager/src/replication/mod.rs: added the module `aligner`.
* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Added the field `aligner_queryable_handle` to the `ReplicationService` structure to keep track of the handle of the Aligner task.
  - Changed the lock on the `latest_updates` structure to a `RwLock`.
  - Updated the creation of the `Replication` structure, passing the Storage.
  - Updated the `stop` method to also abort the task of the Aligner.
* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs:
  - Changed the lock on the `latest_updates` structure to a `RwLock`, as more reads are performed than writes; this allows for more parallel operations when aligning.
* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  - Changed the `MutexGuard` to a `RwLockWriteGuard` in the method `guard_cache_if_latest`.
  - Changed the calls to `lock` to `write`.
* zenoh/src/api/sample.rs:
  - Removed the feature guard above the import of `serde::Serialize`, as it is used in all cases for `SampleKind`.
  - Made `SampleKind` derive `Serialize` and `Deserialize` such that it can be sent to / by the Aligner.

Signed-off-by: Julien Loudet <[email protected]>
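A sketch of what the alignment exchange could look like. These shapes are hypothetical: the real variants live in aligner.rs and will differ; only the overall flow (drill down from intervals to events to data) and the use of `serde` come from the commit message:

```rust
use serde::{Deserialize, Serialize};

// Hypothetical query sent to a replica's Aligner after a DigestDiff.
#[derive(Serialize, Deserialize)]
enum AlignmentQuery {
    // Ask for the fingerprints of the intervals flagged by the DigestDiff.
    Intervals(Vec<u64>),
    // Drill down into specific sub-intervals of one interval.
    SubIntervals { interval: u64, sub_intervals: Vec<u64> },
}

// Simplified stand-in for the commit's `EventMetadata` (Event minus its
// Fingerprint); the real structure uses zenoh / uhlc types.
#[derive(Serialize, Deserialize)]
struct EventMetadata {
    key_expr: String,
    timestamp_nanos: u64,
    action: String, // e.g. "put" / "delete"
}

// Hypothetical reply: metadata first, then the actual missing data.
#[derive(Serialize, Deserialize)]
enum AlignmentReply {
    Events(Vec<EventMetadata>),
    Data { key_expr: String, payload: Vec<u8> },
}
```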
Force-pushed from 813df4a to 02fc1d4
@J-Loudet this is good to merge, thanks for the changes!
These commits (not meant to be squashed) re-introduce the Storage Replication feature.
See individual commits for details, each being meant to be reviewed on its own.