Skip to content

Commit

Permalink
feat(storage-manager): enforce History::Latest based on Timestamp
Browse files Browse the repository at this point in the history
This commit forces all Storage that have the capability
`History::Latest` to only keep the Latest value based on their
Timestamp. This is achieved by (i) keeping track of the Timestamp
associated to all key expressions (for which a publication was received)
and (ii) discarding received publications that have an older
Timestamp (compared to what is tracked for the same key expression).

The main motivation is the Replication (or Storage Alignment) feature:
the algorithm we use rely on the assumption that a Storage with the
capability `History::Latest` only keeps the latest publication based on
the Timestamp. If that property is not upheld, no guarantee can be made
on the Replication algorithm.

Hence, to avoid leaving this responsibility on Storage developers, it
has been decided to enforce that behaviour at the Storage Manager
level. This is what this commit does.

Additionally, to avoid having the `latest_updates` structure grow
infinitely, it was included in the `GarbageCollectionEvent` structure to
be garbage collected at regular interval. This garbage collection is
controlled by the `GarbageCollectionConfig` which defaults to removing
entries that are more than 24h old.

* plugins/zenoh-plugin-storage-manager/src/replica/storage.rs:
  - Add a new field `latest_updates` to the `StorageService` structure,
    which is used to keep track of the latest publication timestamp for
    all stripped key expression for which the Storage received a
    publication.
  - In the `process_sample` function, add a check to ensure that the
    Sample being processed is more recent than what is currently stored in
    the Storage. If the Sample is less recent, it is discarded.
  - If the `process_sample` function, after the Sample was successfully
    inserted, update the `latest_updates` structure with the more recent
    Timestamp. Note that the `StorageInsertionResult::Outdated` could
    still be used as an escape hatch for a Storage to filter Samples on
    something else than the Timestamp.
  - In the `GarbageCollectionEvent`, add the `last_updates` structure such
    that it is garbage collected.
  - Updated the `run` method of the `GarbageCollectionEvent` to retain
    only entries that are less than `GarbageCollectionConfig::lifespan`
    old.

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Sep 6, 2024
1 parent d9ce46f commit aa79fcb
Showing 1 changed file with 55 additions and 16 deletions.
71 changes: 55 additions & 16 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct StorageService {
capability: Capability,
tombstones: Arc<RwLock<KeBoxTree<Timestamp, NonWild, KeyedSetProvider>>>,
wildcard_updates: Arc<RwLock<KeBoxTree<Update, UnknownWildness, KeyedSetProvider>>>,
latest_updates: Arc<Mutex<HashMap<Option<OwnedKeyExpr>, Timestamp>>>,
replication: Option<ReplicationService>,
}

Expand All @@ -94,6 +95,7 @@ impl StorageService {
capability: store_intercept.capability,
tombstones: Arc::new(RwLock::new(KeBoxTree::default())),
wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())),
latest_updates: Arc::new(Mutex::new(HashMap::new())),
replication,
};
if storage_service
Expand Down Expand Up @@ -143,6 +145,7 @@ impl StorageService {
config: gc_config,
tombstones: self.tombstones.clone(),
wildcard_updates: self.wildcard_updates.clone(),
latest_updates: self.latest_updates.clone(),
},
);
t.add_async(gc).await;
Expand Down Expand Up @@ -335,7 +338,11 @@ impl StorageService {
let sample_to_store_timestamp = match sample_to_store.timestamp() {
Some(timestamp) => *timestamp,
None => {
tracing::error!("Discarding `Sample` generated through `SampleBuilder` that has no Timestamp: {:?}", sample_to_store);
tracing::error!(
"Discarding `Sample` generated through `SampleBuilder` that has no \
Timestamp: {:?}",
sample_to_store
);
continue;
}
};
Expand All @@ -350,12 +357,32 @@ impl StorageService {
return;
}
};

// If the Storage was declared as only keeping the Latest value, we ensure that, for
// each received Sample, it is indeed the Latest value that is processed.
if self.capability.history == History::Latest {
if let Some(stored_timestamp) =
self.latest_updates.lock().await.get(&stripped_key)
{
if sample_to_store_timestamp < *stored_timestamp {
tracing::debug!(
"Skipping Sample for < {:?} >, a Value with a more recent \
Timestamp is stored: (received) {} vs (stored) {}",
stripped_key,
sample_to_store_timestamp,
stored_timestamp
);
continue;
}
}
}

let mut storage = self.storage.lock().await;
let result = match sample.kind() {
SampleKind::Put => {
storage
.put(
stripped_key,
stripped_key.clone(),
Value::new(
sample_to_store.payload().clone(),
sample_to_store.encoding().clone(),
Expand All @@ -368,24 +395,30 @@ impl StorageService {
// register a tombstone
self.mark_tombstone(&k, sample_to_store_timestamp).await;
storage
.delete(stripped_key, sample_to_store_timestamp)
.delete(stripped_key.clone(), sample_to_store_timestamp)
.await
}
};
drop(storage);
if self.replication.is_some()
&& result.is_ok()
&& !matches!(result.unwrap(), StorageInsertionResult::Outdated)
{
let sending = self
.replication
.as_ref()
.unwrap()
.log_propagation
.send((k.clone(), sample_to_store_timestamp));
match sending {
Ok(_) => (),
Err(e) => {

if result.is_ok_and(|insertion_result| {
!matches!(insertion_result, StorageInsertionResult::Outdated)
}) {
// Insertion was successful (i.e. Ok + not Outdated), the Storage only keeps
// track of the Latest value, the timestamp is indeed more recent (it was
// checked before being processed): we update our internal structure.
if self.capability.history == History::Latest {
self.latest_updates
.lock()
.await
.insert(stripped_key, sample_to_store_timestamp);
}

if let Some(replication) = &self.replication {
if let Err(e) = replication
.log_propagation
.send((k.clone(), sample_to_store_timestamp))
{
tracing::error!("Error in sending the sample to the log: {}", e);
}
}
Expand Down Expand Up @@ -696,6 +729,7 @@ struct GarbageCollectionEvent {
config: GarbageCollectionConfig,
tombstones: Arc<RwLock<KeBoxTree<Timestamp, NonWild, KeyedSetProvider>>>,
wildcard_updates: Arc<RwLock<KeBoxTree<Update, UnknownWildness, KeyedSetProvider>>>,
latest_updates: Arc<Mutex<HashMap<Option<OwnedKeyExpr>, Timestamp>>>,
}

#[async_trait]
Expand Down Expand Up @@ -732,6 +766,11 @@ impl Timed for GarbageCollectionEvent {
wildcard_updates.remove(&k);
}

self.latest_updates
.lock()
.await
.retain(|_, timestamp| timestamp.get_time() < &time_limit);

tracing::trace!("End garbage collection of obsolete data-infos");
}
}

0 comments on commit aa79fcb

Please sign in to comment.