feat: rewrite of Storage Replication #1458

Merged
merged 6 commits into main from refactor/storage-manager/replication on Sep 20, 2024

Conversation

@J-Loudet (Contributor) commented Sep 19, 2024

These commits (not meant to be squashed) re-introduce the Storage Replication feature.

See individual commits for details, each being meant to be reviewed on its own.

@J-Loudet added the new feature label on Sep 19, 2024
@J-Loudet force-pushed the refactor/storage-manager/replication branch from d53050f to 813df4a on September 20, 2024 07:06
@Charles-Schleich (Member) left a comment:

Good, minor changes and Doc fixes

Resolved review threads:
- plugins/zenoh-backend-traits/src/config.rs (outdated)
- plugins/zenoh-backend-traits/src/config.rs
- DEFAULT_CONFIG.json5
Comment on lines +511 to +512
// FIXME: An actual error from the underlying Storage cannot be distinguished from a
// missing entry.
@Charles-Schleich (Member) commented:

Which storages produce these errors? I can open issues in the storages to fix them.

@J-Loudet (Contributor, Author) replied Sep 20, 2024:

In my opinion the problem lies in this method:

async fn get(
    &mut self,
    key: Option<OwnedKeyExpr>,
    parameters: &str,
) -> ZResult<Vec<StoredData>>;

It should return a `ZResult<Option<Vec<StoredData>>>` where `None` indicates that there is no entry for that key expression.
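For illustration, the signature with the proposed return type (everything else unchanged):

    async fn get(
        &mut self,
        key: Option<OwnedKeyExpr>,
        parameters: &str,
    ) -> ZResult<Option<Vec<StoredData>>>;

A backend would then return `Ok(None)` for a missing entry and reserve `Err` for actual failures of the underlying Storage.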

Right now, for instance, the Memory backend returns an error: https://github.com/eclipse-zenoh/zenoh/blob/main/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs#L131

I think this deserves a dedicated issue and discussion. (#1464)

Comment on lines +138 to +140
// TODO Do we really want to perform such an initial alignment? Because this query will
// target any Storage that matches the same key expression, regardless of if they have
// been configured to be replicated.
@Charles-Schleich (Member) commented:

If a storage is not configured as a replica, then it shouldn't receive digests, correct?
Will this not simply be ignored by the storage?

@J-Loudet (Contributor, Author) replied Sep 20, 2024:

My comment stands: the code that follows makes a regular GET to any Storage that was declared on the same key expression. It does not leverage anything related to the Replication to filter out Storages that are not configured to be replicated.

See issue #1463

This commit re-introduces the configuration that will be used to
replicate a Storage.

It is composed of the following fields:
- `interval`: it is used both as a way to group publications together and
  as a timer to publish the Replication information.
- `sub_intervals`: it represents the number of sub-groups present inside
  an interval.
- `hot`: it dictates the number of intervals that are considered recent
  and where differences are more likely -- eventually leading to sending
  more information to re-align them quickly.
- `warm`: it dictates the number of intervals that are considered more
  stable than the "hot" intervals -- less information is sent.
- `propagation_delay`: an estimated upper bound for the time it takes
  for a publication to reach a Zenoh node.
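A rough sketch of the resulting structure (the field types and the default values below are illustrative assumptions, not the actual defaults):

    use std::time::Duration;

    pub struct ReplicaConfig {
        pub interval: Duration,
        pub sub_intervals: usize,
        pub hot: u64,
        pub warm: u64,
        pub propagation_delay: Duration,
    }

    impl Default for ReplicaConfig {
        fn default() -> Self {
            Self {
                interval: Duration::from_secs(10),
                sub_intervals: 5,
                hot: 6,
                warm: 30,
                propagation_delay: Duration::from_millis(250),
            }
        }
    }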

* DEFAULT_CONFIG.json5: added, as an example, the default Replication
  configuration for a Storage.

* plugins/zenoh-backend-traits/src/config.rs:
  - Introduced `interval` field to control the publication and the
    grouping of publications.
  - Introduced `sub_intervals` field to control the number of
    subdivisions within an `interval`.
  - Introduced `hot` field to control the size (i.e. number of
    intervals) of the hot era.
  - Introduced `warm` field to control the size (i.e. number of
    intervals) of the warm era.
  - Updated the `Default` trait implementation for the `ReplicaConfig`.
  - Parsed the `ReplicaConfig` from the Zenoh configuration.

* plugins/zenoh-backend-traits/src/config.test.rs: unit tests to ensure
  that the parsing of the `ReplicaConfig` structure is valid.

Signed-off-by: Julien Loudet <[email protected]>

The first step in the replication is to fetch existing content at
start-up.

In order to enable this functionality, changes were made in the storage
manager. The main one is that `start_storage_queryable_subscriber` now
returns the instance of the `StorageService` it uses. This, in turn,
required spawning a dedicated task for its subscriber and queryable.

The `ReplicationService` is then started and queries the closest
complete Storage for its content. A "wait-strategy" is leveraged for
this purpose: the query cannot be issued instantly as Zenoh first needs
to propagate the routing information to the node.

Additionally, if the replication is enabled but the Storage is not
declared with the capability `History::Latest`, the storage manager will
fail to start the Storage.

* plugins/zenoh-backend-traits/src/lib.rs: make the structure
  `Capability` derive the traits `Debug` and `Clone`. This is needed to
  make the StorageService structure also implement the same traits.

* plugins/zenoh-plugin-storage-manager/src/lib.rs: add `replication`
  module.

* plugins/zenoh-plugin-storage-manager/src/replication/mod.rs: declared
  the module `service`.

* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Created the empty structure `ReplicationService`.
  - Implemented the `start` method that fetches the content of another
    complete Storage on the Zenoh network. The retrieved `Sample`s are
    then processed by the Storage.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs:
  - Modified the visibility of the `storages_mgt::service` module to
    `pub(crate)` such that the `ReplicationService` can use it.
  - Imported the `StorageService` structure.
  - Removed the call to `tokio::task::spawn` when starting
    `StorageService::start` as that function now spawns a task.
  - Called the `ReplicationService` to perform the initial replication
    phase.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  - Derive the `Clone` trait for the `StorageService` structure.
  - Changed the signature of the `start` function to return the created
    instance of `StorageService`, for it to be used by the
    `ReplicationService`.
  - Changed the signature of the `start_storage_queryable_subscriber` to
    own `self`.
  - Changed the signature of the `process_sample` method to return an
    error if one happened.

Signed-off-by: Julien Loudet <[email protected]>

This commit introduces the first component of the rewrite of the
replication feature: the generation, update and publication of the
replication Digest.

A Digest is a structure that gives a concise view of the state of the
Storage that is replicated. It exposes a set of `Fingerprint`s, where a
`Fingerprint` is a 64-bit integer that uniquely identifies an event or
group(s) of events.

The groups are based on time: the timestamp of an event dictates to
which `Interval` and, within that `Interval`, to which `SubInterval` it
belongs. The sizes of an `Interval` and `SubInterval` are controlled by
configuration.
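A minimal sketch of this classification, assuming timestamps and interval sizes expressed in milliseconds (names and units are illustrative):

    fn interval_idx(timestamp_ms: u64, interval_ms: u64) -> u64 {
        // An event belongs to the interval its timestamp falls into.
        timestamp_ms / interval_ms
    }

    fn sub_interval_idx(timestamp_ms: u64, interval_ms: u64, sub_intervals: u64) -> u64 {
        // Position of the timestamp within its interval, mapped onto the
        // configured number of sub-intervals.
        (timestamp_ms % interval_ms) * sub_intervals / interval_ms
    }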

The `Fingerprint` of an event is computed thanks to the `xxhash-rust`
library.  The `Fingerprint` of a `SubInterval` is equal to the XOR of
the `Fingerprints` of the Events it contains. Similarly, the
`Fingerprint` of an `Interval` is equal to the XOR of the `Fingerprint`
of the `SubInterval`s it contains, and the `Fingerprint` of an era is
equal to the XOR of the `Fingerprint`s of its `Interval`s.
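XOR is what makes this hierarchy cheap to maintain: it is commutative and associative, so `Fingerprint`s can be folded in any order, and applying an Event's `Fingerprint` a second time removes it from the aggregate. A minimal sketch, assuming an Event is hashed over its key expression and timestamp (the exact field set is an assumption):

    use xxhash_rust::xxh3::xxh3_64;

    #[derive(Clone, Copy, Default, PartialEq, Eq, Debug)]
    struct Fingerprint(u64);

    fn event_fingerprint(key_expr: &str, timestamp: u64) -> Fingerprint {
        // Hash the bytes of the fields identifying the event.
        let mut bytes = key_expr.as_bytes().to_vec();
        bytes.extend_from_slice(&timestamp.to_le_bytes());
        Fingerprint(xxh3_64(&bytes))
    }

    fn xor_fold(fingerprints: &[Fingerprint]) -> Fingerprint {
        // SubInterval = XOR of its Events, Interval = XOR of its
        // SubIntervals, era = XOR of its Intervals: the same fold applies
        // at every level.
        Fingerprint(fingerprints.iter().fold(0, |acc, f| acc ^ f.0))
    }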

`Interval`s are finally grouped into 3 eras: Cold, Warm, and Hot. The
size of the Hot and Warm eras are controlled by configuration. Depending
on the era, more or less information is carried by the Digest: the
hotter the era, the more information. Specifically, the Hot era will
send over the `Fingerprint` of all its `SubInterval`s, the Warm era the
`Fingerprint`s of all its `Interval`s and the Cold era the `Fingerprint`
of the era.

The `Replication` structure handles the generation, update and
publication of the Digest. It spawns a dedicated task that will wake up
at every multiple of `Interval` to update a `LogLatest` from which the
`Digest` is generated.

To speed up the interactions with the `LogLatest` structure, a bloom
filter is leveraged. The bloom filter allows skipping checks when an
Event pertains to a key expression that does not yet have another Event
associated with it in the replication log.
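A minimal sketch of this fast path, using the `bloomfilter` crate (constructor per its 1.x API; the sizing, false-positive rate and key type are arbitrary):

    use bloomfilter::Bloom;

    fn main() {
        let mut seen: Bloom<String> = Bloom::new_for_fp_rate(10_000, 0.01);
        let key_expr = String::from("demo/example/a");

        if !seen.check(&key_expr) {
            // A Bloom filter has no false negatives: no Event for this key
            // expression can be in the log yet, so skip the lookup entirely.
        } else {
            // Possible false positive: fall back to searching the log for
            // an older Event on the same key expression.
        }
        seen.set(&key_expr);
    }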

To stop both the `ReplicationService` and the `StorageService`, the
flume channel that was used to propagate the `StorageMessage::Stop` was
replaced with a `tokio` broadcast channel.
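A minimal sketch of this fan-out, with a simplified `StorageMessage` (only the broadcast mechanics are illustrated):

    use tokio::sync::broadcast;

    #[derive(Clone, Debug)]
    enum StorageMessage {
        Stop,
    }

    #[tokio::main]
    async fn main() {
        // Each subscriber receives its own copy of every message, so a
        // single `Stop` reaches both the Storage and Replication services.
        let (tx, mut storage_rx) = broadcast::channel::<StorageMessage>(4);
        let mut replication_rx = tx.subscribe();

        let storage = tokio::spawn(async move {
            if let Ok(StorageMessage::Stop) = storage_rx.recv().await {
                // ... tear the StorageService down ...
            }
        });
        let replication = tokio::spawn(async move {
            if let Ok(StorageMessage::Stop) = replication_rx.recv().await {
                // ... tear the ReplicationService down ...
            }
        });

        tx.send(StorageMessage::Stop).expect("a receiver is still alive");
        let _ = tokio::join!(storage, replication);
    }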

Lastly, the Configuration ensures that when we want to compare Digests
we can actually compare them: if any of the fields differ, we cannot
guarantee the comparison is meaningful.

* Cargo.lock: (see Cargo.toml)
* deny.toml: added the "BSL-1.0" licence, used by the newly added
  `xxhash-rust` crate, that is approved by the Eclipse Foundation.
* plugins/zenoh-plugin-storage-manager/Cargo.toml: added the following
  dependencies:
  - `bincode` to serialise the `Digest` when publishing it.
  - `bloomfilter` to use this data structure in order to reduce the
    amount of checks performed.
  - `serde` to be able to serialise and send the `Digest`.
  - `xxhash-rust` to compute the hash of an Event.
  - `zenoh-result` to use the `bail!` macro.
  - (dev-dependency) `uhlc` to generate timestamps in tests.

* plugins/zenoh-plugin-storage-manager/src/lib.rs:
  - Replaced the `flume::Sender` with a `broadcast::Sender` to support
    sending the same message to both the Replication and Storage
    services.

* plugins/zenoh-plugin-storage-manager/src/replication/classification.rs:
  - Define the `IntervalIdx` structure: a thin wrapper on top of a `u64`
    to index the `Interval`s.
  - Define the `Interval` structure: an interval groups `SubInterval`s
    and exposes a `Fingerprint`.
  - Define the `EventRemoval` enumeration: this enumeration carries the
    result of the `if_newer_remove_older` method. This is required as
    there are 3 possible outcomes: (i) the provided Event is indeed newer
    than another Event on the same key expression, (ii) the provided
    Event is older than another Event and (iii) there is no Event
    associated with that key expression.
  - Define the `SubIntervalIdx` structure: a thin wrapper on top of a
    `u64` to index the `SubInterval`s.
  - Define the `SubInterval` structure: a sub-interval groups `Event`s
    and exposes a `Fingerprint`.

* plugins/zenoh-plugin-storage-manager/src/replication/configuration.rs:
  this file defines a wrapper around the `ReplicaConfig` structure. Its
  purpose is to expose methods to compute the `IntervalIdx` and
  `SubIntervalIdx` of a timestamp, as both are dependent on the
  configuration.
  Adding these methods to the `ReplicaConfig` was not possible as it
  would create a cyclic dependency.
  The fields `prefix` and `storage_key_expr` are included in the
  computation of the fingerprint, ensuring that when we need to compare
  Digests, we are comparing "compatible" Digests.

* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  - Added the function `spawn_digest_publisher` that spawns a task that
    publishes the `Digest` at regular intervals.

* plugins/zenoh-plugin-storage-manager/src/replication/digest.rs: this
  file defines:
  - `Fingerprint`: a thin wrapper on top of a `u64`.
  - `Digest`: a set of `Fingerprint`s providing a concise view of a
    replication log.

* plugins/zenoh-plugin-storage-manager/src/replication/log.rs: this file
  defines:
  - `Event`: it groups together a key expression, a timestamp, and an
    action, and computes the `Fingerprint` of these fields.
  - `Insertion`: an enumeration of the different possible outcomes when
    trying to insert an `Event` in the replication log.
  - `LogLatest`: the replication log for storages that have the
    `Capability::Latest`.

* plugins/zenoh-plugin-storage-manager/src/replication/mod.rs:
  added the new modules.

* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Enriched the `ReplicationService` to keep track of the task spawned
    to publish the `Digest`.
  - The `start` function was renamed to `spawn_start` to clearly
    indicate that it also spawns tasks.
  - Added the method `stop` on the `ReplicationService` structure that
    cancels the tasks it spawned when a `StorageMessage::Stop` is received.

* plugins/zenoh-plugin-storage-manager/src/replication/tests/classification.test.rs:
  unit tests to ensure a correct behaviour of the `Interval` and
  `SubInterval`.
* plugins/zenoh-plugin-storage-manager/src/replication/tests/configuration.test.rs:
  unit tests to ensure the correct behaviour.
* plugins/zenoh-plugin-storage-manager/src/replication/tests/log.test.rs:
  unit tests to ensure correct behaviour.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs:
  - Made the enumeration `StorageMessage` derive the trait `Clone`. This
    is so that the broadcast channel can clone it to deliver it to all
    subscribers.
  - Added the type alias `LatestUpdates`.
  - Created a structure `CacheLatest` that keeps track of both the
    `LatestUpdates` and the Replication Log (if there is one). Depending
    on whether Replication is enabled, the interactions with the Cache differ.
  - Replaced the `flume::channel` with a `broadcast::channel` to be able
    to propagate the `Stop` message to both services.
  - Changed the way the `latest_updates` is populated depending on
    whether the Replication is enabled: if it is, then `latest_updates`
    becomes a simple buffer that the Replication will swap when it
    updates its Log.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  - Replaced the `latest_updates` structure with `CacheLatest` that
    encapsulates the former as well as the Replication Log.
  - Changed the `latest_updates` structure passed to the
    `GarbageCollection`: if the Replication is enabled, `None` is
    provided to the garbage collection as this structure will be
    processed when updating the Replication.
  - Renamed the function `is_latest` to `guard_cache_if_latest` to
    better reflect its use.
  - Updated the `GarbageCollectionEvent` to take an `Option` over the
    `LatestUpdates` instead of an instance. This avoids trying to
    garbage collect it when the Replication is enabled.
  - Replaced the `futures::select!` macro with the one from Tokio as the
    one from `futures` requires the futures it selects over to implement
    `FusedFuture`, which is not the case for Tokio's broadcast channel.
    This change should have no impact.

Signed-off-by: Julien Loudet <[email protected]>

to squash with creation of Digest …of Interval

This commit changes the behaviour of the task publishing the Digest:
instead of waking up every `interval` starting from the moment the task
was spawned, the task now first waits until the current time is a
multiple of `interval` and then updates / publishes the Digest.

A random delay was also added to avoid a "coordinated update storm"
where, after this change, all replicas could attempt to publish their
Digest at the same time. This delay ranges from 0 (i.e. no delay) to 1/3
of the duration of an `interval`.
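A minimal sketch of this wake-up computation, assuming the `rand` 0.8 API (clock handling and error cases are simplified):

    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    use rand::Rng;

    async fn wait_for_aligned_wake_up(interval: Duration) {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock set before the Unix epoch");
        // Time left until the current time is a multiple of `interval`.
        let elapsed = Duration::from_nanos((now.as_nanos() % interval.as_nanos()) as u64);
        let until_next_multiple = interval - elapsed;
        // Random delay in [0, interval / 3) to avoid all replicas
        // publishing their Digest at the same time.
        let jitter = rand::thread_rng().gen_range(Duration::ZERO..interval / 3);
        tokio::time::sleep(until_next_multiple + jitter).await;
    }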

* Cargo.lock:
* plugins/zenoh-plugin-storage-manager/Cargo.toml: added a dependency
  to `rand` in order to be able to generate a random duration.

* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Align the time at which the code that updates and publishes the
    Digest wakes up to a multiple of `interval`.
  - Add a random delay before publishing the Digest to avoid a
    "coordinated update storm".
  - In case the update / publication took longer than the duration of an
    `interval`, log a warning.

Signed-off-by: Julien Loudet <[email protected]>

With this commit, a storage configured to replicate its data will
spawn a task that declares a subscriber that will receive the Digest of
all the replicas storing the same data.

Once a Digest is received, the subscriber task will compare it to the
Digest of its replication log. This comparison yields either `None`,
indicating that the received Digest is either identical or contains no
information that is missing on the local replication log, or it yields a
`DigestDiff` that details where the differences are found.

As a temporary measure, the `DigestDiff` is published: this behaviour
could evolve once the Aligner is implemented.
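A minimal sketch of these comparison semantics (the fields carried by `Digest` and `DigestDiff` are assumptions, reduced to a single era for brevity):

    #[derive(Clone, Copy, PartialEq, Eq)]
    struct Fingerprint(u64);

    struct Digest {
        configuration_fingerprint: Fingerprint,
        cold_era_fingerprint: Fingerprint,
        // ... Warm / Hot era fingerprints omitted ...
    }

    struct DigestDiff {
        cold_eras_differ: bool,
        // ... misaligned Intervals / SubIntervals omitted ...
    }

    impl Digest {
        fn diff(&self, other: &Digest) -> Option<DigestDiff> {
            // Digests produced under different configurations cannot be
            // compared.
            if self.configuration_fingerprint != other.configuration_fingerprint {
                return None;
            }
            // Equal fingerprints: nothing to align. (The full implementation
            // also returns None when `other` merely lacks information that
            // the local log already has.)
            if self.cold_era_fingerprint == other.cold_era_fingerprint {
                return None;
            }
            Some(DigestDiff { cold_eras_differ: true })
        }
    }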

* Cargo.lock:
* plugins/zenoh-plugin-storage-manager/Cargo.toml: added dependency to
  `uuid` in order to associate a unique id to each received Digest.

* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  - Added a function `spawn_digest_subscriber` that, as the name
    indicates, subscribes to the Digests published by other storages and
    computes their difference.

* plugins/zenoh-plugin-storage-manager/src/replication/digest.rs:
  - Introduce new structure `DigestDiff`.
  - Introduce new method `diff` that, given two `Digest`s, computes their
    difference, returning a `DigestDiff` if (i) they can be
    compared (i.e. the `Fingerprint` of their configuration is
    identical) and (ii) they do differ. This method returns `None`
    if the two `Digest`s are either identical or if the "other" `Digest`
    contains no information that the local `Digest` does not have.

* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Updated the `ReplicationService` structure to keep track of the
    "digest subscriber" task.
  - Updated the `stop` method to also cancel the "digest subscriber"
    task when stopping.

* plugins/zenoh-plugin-storage-manager/src/replication/tests/digest.test.rs:
  unit tests to ensure the correct behaviour of the `diff` method.

Signed-off-by: Julien Loudet <[email protected]>

This commit introduces the Aligner: a queryable spawned by a Replicated
Storage that takes care of processing `AlignmentQuery` and replying with
`AlignmentReply`.

The entry point of the Aligner is the Digest Subscriber: whenever it
receives a Digest for which it detects a potential misalignment, it will
spawn a task that will query the Aligner of the Replica that sent the
Digest.

The Aligner of the Replica will then process this query and reply with
the information contained in its Replication Log, eventually leading to
sending the missing data (detected with the misaligned Digest).
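As a rough illustration of the shape of this exchange (every type and variant below is an assumption; the actual definitions live in `aligner.rs`):

    use serde::{Deserialize, Serialize};

    // Placeholders standing in for the structures described above.
    #[derive(Serialize, Deserialize)]
    struct DigestDiff;
    #[derive(Serialize, Deserialize)]
    struct EventMetadata;

    #[derive(Serialize, Deserialize)]
    enum AlignmentQuery {
        // Opening query: forward the detected difference so the queried
        // Aligner knows which parts of its Replication Log to reply with.
        Diff(DigestDiff),
    }

    #[derive(Serialize, Deserialize)]
    enum AlignmentReply {
        // Replication Log information first, letting the querier narrow
        // down what it is actually missing...
        Metadata(EventMetadata),
        // ...then, eventually, the missing data itself.
        Data(EventMetadata, Vec<u8>),
    }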

* plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs:
  - Defined the new structure `AlignmentQuery` to query an Aligner.
  - Defined the new structure `AlignmentReply` to answer a query made
    to the Aligner.
  - Defined the method `aligner` that takes care of processing an
    `AlignmentQuery` and answering it.
  - Defined the method `spawn_query_replica_aligner` that spawns a task
    to query the Aligner of a Replica for which a potential misalignment
    has been detected.

* plugins/zenoh-plugin-storage-manager/src/replication/classification.rs:
  - Changed the visibility of the fields `fingerprint` and
    `sub_intervals` of the `Interval` structure to ease their
    manipulation in the Aligner.
  - Same for the fields `fingerprint` and `events` of the `SubInterval`
    structure.

* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  - Changed the key expression formatter of the Aligner to include the
    key expression of the Storage.
  - Added the field `storage` to the `Replication` structure to access
    the underlying Storage in order to get / put / delete data when
    aligning.
  - Changed the lock on the `latest_updates` structure to a `RwLock` as
    more reads are performed than writes and this would allow for more
    parallel operations when aligning.
  - Updated the code of the Digest subscriber to query the Aligner
    instead of publishing the DigestDiff.
  - Added the method `spawn_aligner_queryable` that spawns a task for
    the Aligner queryable.

* plugins/zenoh-plugin-storage-manager/src/replication/log.rs:
  - Added the `EventMetadata` structure that takes the same elements as
    the `Event` structure but removes the `Fingerprint`.
  - Changed the visibility of the fields of the `Event` structure to
    ease their manipulation.
  - Added conversion method between the `Event` and the `EventMetadata`.

* plugins/zenoh-plugin-storage-manager/src/replication/mod.rs: added the
  module `aligner`.

* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Add the field `aligner_queryable_handle` to the `ReplicationService`
    structure to keep track of the handle of the Aligner task.
  - Changed the lock on the `latest_updates` structure to a `RwLock`.
  - Updated the creation of the `Replication` structure, passing
    the Storage.
  - Updated the `stop` method to also abort the task of the Aligner.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs:
  - Changed the lock on the `latest_updates` structure to a `RwLock` as
    more reads are performed than writes and this would allow for more
    parallel operations when aligning.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  - Changed the `MutexGuard` to a `RwLockWriteGuard` in the method
    `guard_cache_if_latest`.
  - Changed the calls to `lock` to `write`.

* zenoh/src/api/sample.rs:
  - Removed the feature guard above the import to `serde::Serialize` as
    it is used in all cases for the `SampleKind`.
  - Made the `SampleKind` derive `Serialize` and `Deserialize` such that
    it can be sent to / by the Aligner.

Signed-off-by: Julien Loudet <[email protected]>

@Charles-Schleich (Member) commented:
@J-Loudet this is good to merge, thanks for the changes!

@Mallets merged commit 3b6d773 into main on Sep 20, 2024 (24 checks passed)
@Mallets deleted the refactor/storage-manager/replication branch on September 20, 2024 12:13