diff --git a/chitchat-test/src/main.rs b/chitchat-test/src/main.rs index 56076e0..458bbbf 100644 --- a/chitchat-test/src/main.rs +++ b/chitchat-test/src/main.rs @@ -50,8 +50,7 @@ impl Api { let mut chitchat_guard = self.chitchat.lock().await; let cc_state = chitchat_guard.self_node_state(); - cc_state.mark_for_deletion(key.as_str()); - + cc_state.delete(key.as_str()); Json(serde_json::to_value(&SetKeyValueResponse { status: true }).unwrap()) } } diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index e95bc17..fa20272 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; use crate::serialize::*; -use crate::types::{KeyValueMutation, KeyValueMutationRef}; +use crate::types::{DeletionStatusMutation, KeyValueMutation, KeyValueMutationRef}; use crate::{ChitchatId, Version, VersionedValue}; /// A delta is the message we send to another node to update it. @@ -117,12 +117,12 @@ impl Deserializable for DeltaOp { let key = String::deserialize(buf)?; let value = String::deserialize(buf)?; let version = u64::deserialize(buf)?; - let deleted = bool::deserialize(buf)?; + let deleted = DeletionStatusMutation::deserialize(buf)?; Ok(DeltaOp::KeyValue(KeyValueMutation { key, value, version, - tombstone: deleted, + status: deleted, })) } DeltaOpTag::SetMaxVersion => { @@ -280,7 +280,11 @@ impl Delta { key: key.to_string(), value: value.to_string(), version, - tombstone: deleted, + status: if deleted { + DeletionStatusMutation::Delete + } else { + DeletionStatusMutation::Set + }, }); } @@ -440,7 +444,7 @@ impl DeltaSerializer { key: key.to_string(), value: versioned_value.value, version: versioned_value.version, - tombstone: versioned_value.tombstone.is_some(), + status: versioned_value.status.into(), }; let key_value_op = DeltaOp::KeyValue(key_value_mutation); self.try_add_op(key_value_op) @@ -472,6 +476,7 @@ mod tests { use tokio::time::Instant; use super::*; + use crate::types::DeletionStatus; #[test] fn test_delta_serialization_default() { @@ -512,7 +517,7 @@ mod tests { VersionedValue { value: "val11".to_string(), version: 1, - tombstone: None, + status: DeletionStatus::Set, }, )); // +26 bytes: 2 bytes (key length) + 5 bytes (key) + 8 bytes (version) + @@ -522,7 +527,7 @@ mod tests { VersionedValue { value: "".to_string(), version: 2, - tombstone: Some(Instant::now()), + status: DeletionStatus::Deleted(Instant::now()), }, )); @@ -536,7 +541,7 @@ mod tests { VersionedValue { value: "val21".to_string(), version: 2, - tombstone: None, + status: DeletionStatus::Set, }, )); // +23 bytes. @@ -545,7 +550,7 @@ mod tests { VersionedValue { value: "val22".to_string(), version: 3, - tombstone: None, + status: DeletionStatus::Set, }, )); test_aux_delta_writer(delta_writer, 98); @@ -567,7 +572,7 @@ mod tests { VersionedValue { value: "val11".to_string(), version: 1, - tombstone: None, + status: DeletionStatus::Set, } )); @@ -577,7 +582,7 @@ mod tests { VersionedValue { value: "val12".to_string(), version: 2, - tombstone: None, + status: DeletionStatus::Set, } )); @@ -614,7 +619,7 @@ mod tests { VersionedValue { value: "val11".to_string(), version: 1, - tombstone: None, + status: DeletionStatus::Set, } )); // +23 bytes (kv) + 1 (op tag) @@ -624,7 +629,7 @@ mod tests { VersionedValue { value: "val12".to_string(), version: 2, - tombstone: None, + status: DeletionStatus::Set, } )); @@ -651,7 +656,7 @@ mod tests { VersionedValue { value: "val11".to_string(), version: 1, - tombstone: None, + status: DeletionStatus::Set, } )); // +23 bytes. @@ -660,7 +665,7 @@ mod tests { VersionedValue { value: "val12".to_string(), version: 2, - tombstone: None, + status: DeletionStatus::Set, } )); @@ -690,7 +695,7 @@ mod tests { VersionedValue { value: "val11".to_string(), version: 1, - tombstone: None, + status: DeletionStatus::Set, } )); @@ -701,7 +706,7 @@ mod tests { VersionedValue { value: "val12aaaaaaaaaabcc".to_string(), version: 2, - tombstone: None, + status: DeletionStatus::Set, } )); test_aux_delta_writer(delta_writer, 72); @@ -720,7 +725,7 @@ mod tests { VersionedValue { value: "val11".to_string(), version: 1, - tombstone: None, + status: DeletionStatus::Set, } )); assert!(!delta_writer.try_add_kv( @@ -728,7 +733,7 @@ mod tests { VersionedValue { value: "val12".to_string(), version: 2, - tombstone: None, + status: DeletionStatus::Set, } )); delta_writer.try_add_kv( @@ -736,7 +741,7 @@ mod tests { VersionedValue { value: "val12".to_string(), version: 2, - tombstone: None, + status: DeletionStatus::Set, }, ); } diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index 09a3b9a..95dfb84 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -32,7 +32,7 @@ use crate::digest::Digest; pub use crate::message::ChitchatMessage; pub use crate::server::{spawn_chitchat, ChitchatHandle}; use crate::state::ClusterState; -pub use crate::types::{ChitchatId, Heartbeat, Version, VersionedValue}; +pub use crate::types::{ChitchatId, DeletionStatus, Heartbeat, Version, VersionedValue}; /// Maximum UDP datagram payload size (in bytes). /// @@ -608,7 +608,7 @@ mod tests { assert_nodes_sync(&[&node1, &node2]); - node1.self_node_state().mark_for_deletion("k1"); + node1.self_node_state().delete("k1"); // Advance time before triggering the GC of that deleted key tokio::time::advance(Duration::from_secs(3_600 * 3)).await; @@ -718,7 +718,7 @@ mod tests { .lock() .await .self_node_state() - .mark_for_deletion("READY"); + .delete("READY"); let live_members = loop { let live_nodes = live_nodes_stream.next().await.unwrap(); @@ -1085,8 +1085,8 @@ mod tests { node1.self_node_state().set("self1:suffix1", "updated"); assert_eq!(counter_self_key.load(Ordering::SeqCst), 2); - node1.self_node_state().mark_for_deletion("self1:suffix1"); - node2.self_node_state().mark_for_deletion("other:suffix"); + node1.self_node_state().delete("self1:suffix1"); + node2.self_node_state().delete("other:suffix"); run_chitchat_handshake(&mut node1, &mut node2); diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index af2ef20..083d333 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -17,6 +17,7 @@ use tracing::{info, warn}; use crate::delta::{Delta, DeltaSerializer, NodeDelta}; use crate::digest::{Digest, NodeDigest}; use crate::listener::Listeners; +use crate::types::{DeletionStatus, DeletionStatusMutation}; use crate::{ChitchatId, Heartbeat, KeyChangeEvent, Version, VersionedValue}; #[derive(Clone, Serialize, Deserialize)] @@ -124,7 +125,7 @@ impl NodeState { /// Returns an iterator over all of the (non-deleted) key-values. pub fn key_values(&self) -> impl Iterator { self.key_values_including_deleted() - .filter(|(_, versioned_value)| !versioned_value.is_tombstone()) + .filter(|(_, versioned_value)| !versioned_value.is_deleted()) .map(|(key, versioned_value)| (key, versioned_value.value.as_str())) } @@ -235,22 +236,18 @@ impl NodeState { // We already know about this KV. continue; } - if key_value_mutation.tombstone { - // We don't want to keep any tombstone before `last_gc_version`. + if key_value_mutation.status.scheduled_for_deletion() { + // This KV has already been GCed. if key_value_mutation.version <= self.last_gc_version { continue; } } - let versioned_value = VersionedValue { + let new_versioned_value = VersionedValue { value: key_value_mutation.value, version: key_value_mutation.version, - tombstone: if key_value_mutation.tombstone { - Some(now) - } else { - None - }, + status: key_value_mutation.status.into_status(now), }; - self.set_versioned_value(key_value_mutation.key, versioned_value); + self.set_versioned_value(key_value_mutation.key, new_versioned_value); } } @@ -263,7 +260,7 @@ impl NodeState { self.key_values .range::(range) .take_while(move |(key, _)| key.starts_with(prefix)) - .filter(|&(_, versioned_value)| !versioned_value.is_tombstone()) + .filter(|&(_, versioned_value)| !versioned_value.is_deleted()) .map(|(key, versioned_value)| (key.as_str(), versioned_value)) } @@ -279,7 +276,7 @@ impl NodeState { pub fn get(&self, key: &str) -> Option<&str> { let versioned_value = self.get_versioned(key)?; - if versioned_value.is_tombstone() { + if versioned_value.is_deleted() { return None; } Some(versioned_value.value.as_str()) @@ -304,19 +301,44 @@ impl NodeState { } } - /// Marks key for deletion and sets the value to an empty string. - pub fn mark_for_deletion(&mut self, key: &str) { + /// Deletes the entry associated to the given key. + /// + /// From the reader's perspective, the entry is deleted right away. + /// + /// In reality, the entry is not removed from memory right away, but rather + /// marked with a tombstone. + /// That tombstone is annotated with the time of removal, so that after a configurable + /// grace period, it will be remove by the garbage collection. + pub fn delete(&mut self, key: &str) { + let Some(versioned_value) = self.key_values.get_mut(key) else { + warn!("Key `{key}` does not exist in the node's state and could not be deleted.",); + return; + }; + self.max_version += 1; + versioned_value.version = self.max_version; + versioned_value.value = "".to_string(); + versioned_value.status = DeletionStatusMutation::Delete.into_status(Instant::now()); + } + + /// Contrary to `delete`, this does not delete an entry right away, + /// but rather schedules its deletion for after the grace period. + /// + /// At the grace period, the entry will be really deleted just like a regular + /// tombstoned entry. + /// + /// Implementation wise, the only difference with `delete` is that it is + /// treated as if it was present during the grace period.`` + pub fn delete_after_ttl(&mut self, key: &str) { let Some(versioned_value) = self.key_values.get_mut(key) else { warn!( - "Key `{key}` does not exist in the node's state and could not be marked for \ - deletion.", + "Key `{key}` does not exist in the node's state and could not scheduled for an \ + eventual deletion.", ); return; }; self.max_version += 1; versioned_value.version = self.max_version; - versioned_value.value = "".to_string(); - versioned_value.tombstone = Some(Instant::now()); + versioned_value.status = DeletionStatusMutation::DeleteAfterTtl.into_status(Instant::now()); } pub(crate) fn inc_heartbeat(&mut self) { @@ -357,11 +379,14 @@ impl NodeState { let mut max_deleted_version = self.last_gc_version; self.key_values .retain(|_, versioned_value: &mut VersionedValue| { - let Some(deleted_instant) = versioned_value.tombstone else { + let Some(deleted_start_instant) = versioned_value + .status + .time_of_start_scheduled_for_deletion() + else { // The KV is not deleted. We keep it! return true; }; - if now < deleted_instant + grace_period { + if now < deleted_start_instant + grace_period { // We haved not passed the grace period yet. We keep it! return true; } @@ -423,7 +448,7 @@ impl NodeState { vacant.insert(versioned_value_update.clone()); } }; - if !versioned_value_update.is_tombstone() { + if !versioned_value_update.is_deleted() { self.listeners.trigger_event(key_change_event); } } @@ -435,7 +460,7 @@ impl NodeState { VersionedValue { value: value.to_string(), version, - tombstone: None, + status: DeletionStatus::Set, }, ); } @@ -782,7 +807,7 @@ fn random_generator() -> impl Rng { mod tests { use super::*; use crate::serialize::Serializable; - use crate::types::KeyValueMutation; + use crate::types::{DeletionStatusMutation, KeyValueMutation}; use crate::MAX_UDP_DATAGRAM_PAYLOAD_SIZE; #[test] @@ -975,7 +1000,7 @@ mod tests { &VersionedValue { value: "".to_string(), version: 1, - tombstone: None, + status: DeletionStatus::Set, } ); } @@ -990,7 +1015,7 @@ mod tests { &VersionedValue { value: "1".to_string(), version: 1, - tombstone: None, + status: DeletionStatus::Set, } ); node_state.set("key_b", "2"); @@ -999,7 +1024,7 @@ mod tests { &VersionedValue { value: "1".to_string(), version: 1, - tombstone: None, + status: DeletionStatus::Set, } ); assert_eq!( @@ -1007,7 +1032,7 @@ mod tests { &VersionedValue { value: "2".to_string(), version: 2, - tombstone: None, + status: DeletionStatus::Set, } ); node_state.set("key_a", "3"); @@ -1016,7 +1041,7 @@ mod tests { &VersionedValue { value: "3".to_string(), version: 3, - tombstone: None, + status: DeletionStatus::Set } ); } @@ -1031,7 +1056,7 @@ mod tests { &VersionedValue { value: "1".to_string(), version: 1, - tombstone: None, + status: DeletionStatus::Set } ); node_state.set("key", "1"); @@ -1040,7 +1065,7 @@ mod tests { &VersionedValue { value: "1".to_string(), version: 1, - tombstone: None, + status: DeletionStatus::Set, } ); } @@ -1051,12 +1076,17 @@ mod tests { let node_state = cluster_state.node_state_mut(&ChitchatId::for_local_test(10_001)); node_state.heartbeat = Heartbeat(10); node_state.set("key", "1"); - node_state.mark_for_deletion("key"); + node_state.delete("key"); + assert!(node_state.get("key").is_none()); { let versioned_value = node_state.get_versioned("key").unwrap(); assert_eq!(&versioned_value.value, ""); assert_eq!(versioned_value.version, 2u64); - assert!(&versioned_value.is_tombstone()); + assert!(versioned_value.is_deleted()); + assert!(versioned_value + .status + .time_of_start_scheduled_for_deletion() + .is_some()); } // Overriding the same key @@ -1065,7 +1095,50 @@ mod tests { let versioned_value = node_state.get_versioned("key").unwrap(); assert_eq!(&versioned_value.value, "2"); assert_eq!(versioned_value.version, 3u64); - assert!(!versioned_value.is_tombstone()); + assert!(!versioned_value.is_deleted()); + assert!(versioned_value + .status + .time_of_start_scheduled_for_deletion() + .is_none()); + } + } + + #[test] + fn test_cluster_state_delete_after_ttl() { + let mut cluster_state = ClusterState::default(); + let node_state = cluster_state.node_state_mut(&ChitchatId::for_local_test(10_001)); + node_state.heartbeat = Heartbeat(10); + node_state.set("key", "1"); + node_state.delete_after_ttl("key"); + { + let value = node_state.get("key").unwrap(); + assert_eq!(value, "1"); + let versioned_value = node_state.get_versioned("key").unwrap(); + assert_eq!(&versioned_value.value, "1"); + assert_eq!(versioned_value.version, 2u64); + assert!(versioned_value + .status + .time_of_start_scheduled_for_deletion() + .is_some()); + assert!(!versioned_value.is_deleted()); + assert!(matches!( + versioned_value.status, + DeletionStatus::DeleteAfterTtl(_) + )); + } + + // Overriding the same key + node_state.set("key", "2"); + { + let versioned_value = node_state.get_versioned("key").unwrap(); + assert_eq!(&versioned_value.value, "2"); + assert_eq!(versioned_value.version, 3u64); + assert!(!versioned_value.is_deleted()); + assert!(versioned_value + .status + .time_of_start_scheduled_for_deletion() + .is_none()); + assert!(matches!(versioned_value.status, DeletionStatus::Set)); } } @@ -1098,7 +1171,7 @@ mod tests { let node1 = ChitchatId::for_local_test(10_001); let node1_state = cluster_state.node_state_mut(&node1); node1_state.set("key_a", "1"); - node1_state.mark_for_deletion("key_a"); // Version 2. Tombstone set to heartbeat 100. + node1_state.delete("key_a"); // Version 2. Tombstone set to heartbeat 100. tokio::time::advance(Duration::from_secs(5)).await; node1_state.set_with_version("key_b".to_string(), "3".to_string(), 13); // 3 node1_state.heartbeat = Heartbeat(110); @@ -1164,7 +1237,7 @@ mod tests { &VersionedValue { value: "4".to_string(), version: 4, - tombstone: None, + status: DeletionStatus::Set, } ); // We ignore stale values. @@ -1173,7 +1246,7 @@ mod tests { &VersionedValue { value: "3".to_string(), version: 3, - tombstone: None, + status: DeletionStatus::Set, } ); // Check node 2 is reset and is only populated with the new `key_d`. @@ -1184,7 +1257,7 @@ mod tests { &VersionedValue { value: "4".to_string(), version: 4, - tombstone: None, + status: DeletionStatus::Set } ); } @@ -1245,7 +1318,7 @@ mod tests { node2_state.set_with_version("key_b".to_string(), "2".to_string(), 2); // 2 node2_state.set_with_version("key_c".to_string(), "3".to_string(), 3); // 3 node2_state.set_with_version("key_d".to_string(), "4".to_string(), 4); // 4 - node2_state.mark_for_deletion("key_d"); // 5 + node2_state.delete("key_d"); // 5 cluster_state } @@ -1370,9 +1443,7 @@ mod tests { assert_eq!(delta, expected_delta); } - cluster_state - .node_state_mut(&node1) - .mark_for_deletion("key_a"); + cluster_state.node_state_mut(&node1).delete("key_a"); tokio::time::advance(Duration::from_secs(5)).await; cluster_state.gc_keys_marked_for_deletion(Duration::from_secs(10)); @@ -1431,7 +1502,7 @@ mod tests { node_state.set("Europe:Italy", ""); node_state.set("Africa:Uganda", ""); node_state.set("Oceania", ""); - node_state.mark_for_deletion("Europe:UK"); + node_state.delete("Europe:UK"); let node_states: Vec<&str> = node_state .iter_prefix("Europe:") .map(|(key, _v)| key) @@ -1454,13 +1525,13 @@ mod tests { key: "key_c".to_string(), value: "val_c".to_string(), version: 4, - tombstone: false, + status: DeletionStatusMutation::Set, }, KeyValueMutation { key: "key_b".to_string(), value: "val_b2".to_string(), version: 3, - tombstone: false, + status: DeletionStatusMutation::Set, }, ], }; @@ -1490,13 +1561,13 @@ mod tests { key: "key_a".to_string(), value: "val_a".to_string(), version: 3, - tombstone: false, + status: DeletionStatusMutation::Set, }], }; node_state.apply_delta(node_delta, Instant::now()); let versioned_a = node_state.get_versioned("key_a").unwrap(); assert_eq!(versioned_a.version, 3); - assert!(versioned_a.tombstone.is_none()); + assert_eq!(versioned_a.status, DeletionStatus::Set); assert_eq!(&versioned_a.value, "val_a"); } @@ -1514,13 +1585,13 @@ mod tests { key: "key_a".to_string(), value: "new_val".to_string(), version: 7, - tombstone: false, + status: DeletionStatusMutation::Set, }], }; node_state.apply_delta(node_delta, Instant::now()); let versioned_a = node_state.get_versioned("key_a").unwrap(); assert_eq!(versioned_a.version, 5); - assert!(versioned_a.tombstone.is_none()); + assert_eq!(versioned_a.status, DeletionStatus::Set); assert_eq!(&versioned_a.value, "val_a"); } @@ -1530,7 +1601,7 @@ mod tests { const GC_PERIOD: Duration = Duration::from_secs(10); let mut node_state = NodeState::for_test(); node_state.set_with_version("key_a", "val_a", 17); - node_state.mark_for_deletion("key_a"); + node_state.delete("key_a"); tokio::time::advance(GC_PERIOD).await; node_state.gc_keys_marked_for_deletion(GC_PERIOD); assert_eq!(node_state.last_gc_version, 18); @@ -1545,14 +1616,14 @@ mod tests { key: "key_a".to_string(), value: "new_val".to_string(), version: 32, - tombstone: false, + status: DeletionStatusMutation::Set, }], }; node_state.apply_delta(node_delta, Instant::now()); let versioned_a = node_state.get_versioned("key_a").unwrap(); assert_eq!(versioned_a.version, 32); assert_eq!(node_state.max_version(), 32); - assert!(versioned_a.tombstone.is_none()); + assert_eq!(versioned_a.status, DeletionStatus::Set); assert_eq!(&versioned_a.value, "new_val"); } @@ -1571,7 +1642,7 @@ mod tests { key: "key_b".to_string(), value: "val_b".to_string(), version: 32, - tombstone: false, + status: DeletionStatusMutation::Set, }], }; node_state.apply_delta(node_delta, Instant::now()); @@ -1598,7 +1669,7 @@ mod tests { key: "key_b".to_string(), value: "val_b".to_string(), version: 30, - tombstone: false, + status: DeletionStatusMutation::Set, }], }; node_state.apply_delta(node_delta, Instant::now()); diff --git a/chitchat/src/types.rs b/chitchat/src/types.rs index 27f8664..6f47128 100644 --- a/chitchat/src/types.rs +++ b/chitchat/src/types.rs @@ -63,8 +63,36 @@ impl ChitchatId { } } +#[derive(Clone, Copy, Debug)] +pub enum DeletionStatus { + Set, + // In both `Deleted` and `DeleteAfterWithTtl`, the `Instant` is NOT the scheduled time of + // deletion, but the reference start time. + // + // To get the actual time of deletion, one needs to add the grace period. + Deleted(Instant), + DeleteAfterTtl(Instant), +} + +#[cfg(test)] +impl PartialEq for DeletionStatus { + fn eq(&self, other: &Self) -> bool { + std::mem::discriminant(self) == std::mem::discriminant(other) + } +} + +impl DeletionStatus { + pub fn time_of_start_scheduled_for_deletion(&self) -> Option { + match self { + DeletionStatus::Set => None, + DeletionStatus::Deleted(time_of_deletion) + | DeletionStatus::DeleteAfterTtl(time_of_deletion) => Some(*time_of_deletion), + } + } +} + /// A versioned key-value pair. -#[derive(Debug, Clone, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde( into = "VersionedValueForSerialization", from = "VersionedValueForSerialization" @@ -74,7 +102,7 @@ pub struct VersionedValue { pub version: Version, // The tombstone instant is transient: // Only the presence of a tombstone or not is serialized, and used in partial eq eq. - pub(crate) tombstone: Option, + pub status: DeletionStatus, } impl VersionedValue { @@ -82,16 +110,20 @@ impl VersionedValue { VersionedValue { value, version, - tombstone: if is_tombstone { - Some(Instant::now()) + status: if is_tombstone { + DeletionStatus::Deleted(Instant::now()) } else { - None + DeletionStatus::Set }, } } - pub fn is_tombstone(&self) -> bool { - self.tombstone.is_some() + pub fn is_deleted(&self) -> bool { + match self.status { + DeletionStatus::Set => false, + DeletionStatus::Deleted(_) => true, + DeletionStatus::DeleteAfterTtl(_) => false, + } } #[cfg(test)] @@ -99,16 +131,18 @@ impl VersionedValue { Self { value: value.to_string(), version, - tombstone: None, + status: DeletionStatus::Set, } } } +/// This `PartialEq` implementation is for test only. +/// It checks for the equality of value, version and delete_status. +/// However, it does NOT check the timestamp associated with the delete_status. +#[cfg(test)] impl PartialEq for VersionedValue { fn eq(&self, other: &Self) -> bool { - self.value.eq(&other.value) - && self.version.eq(&other.version) - && self.is_tombstone().eq(&other.is_tombstone()) + self.value == other.value && self.version == other.version && self.status == other.status } } @@ -117,7 +151,79 @@ pub(crate) struct KeyValueMutation { pub(crate) key: String, pub(crate) value: String, pub(crate) version: Version, - pub(crate) tombstone: bool, + pub(crate) status: DeletionStatusMutation, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] +#[repr(u8)] +pub enum DeletionStatusMutation { + Set = 0u8, + Delete = 1u8, + DeleteAfterTtl = 2u8, +} + +impl DeletionStatusMutation { + pub fn into_status(self, now: Instant) -> DeletionStatus { + match self { + DeletionStatusMutation::Set => DeletionStatus::Set, + DeletionStatusMutation::DeleteAfterTtl => DeletionStatus::DeleteAfterTtl(now), + DeletionStatusMutation::Delete => DeletionStatus::Deleted(now), + } + } + + pub fn scheduled_for_deletion(&self) -> bool { + match self { + DeletionStatusMutation::Set => false, + DeletionStatusMutation::Delete | DeletionStatusMutation::DeleteAfterTtl => true, + } + } +} + +impl From for DeletionStatusMutation { + fn from(deletion_status: DeletionStatus) -> Self { + match deletion_status { + DeletionStatus::Set => DeletionStatusMutation::Set, + DeletionStatus::DeleteAfterTtl(_) => DeletionStatusMutation::DeleteAfterTtl, + DeletionStatus::Deleted(_) => DeletionStatusMutation::Delete, + } + } +} + +impl TryFrom for DeletionStatusMutation { + type Error = (); + + fn try_from(state_code: u8) -> Result { + match state_code { + 0u8 => Ok(DeletionStatusMutation::Set), + 1u8 => Ok(DeletionStatusMutation::Delete), + 2u8 => Ok(DeletionStatusMutation::DeleteAfterTtl), + _ => Err(()), + } + } +} + +impl From for u8 { + fn from(state_mutation: DeletionStatusMutation) -> u8 { + state_mutation as u8 + } +} + +impl Serializable for DeletionStatusMutation { + fn serialize(&self, buf: &mut Vec) { + buf.push(u8::from(*self)); + } + + fn serialized_len(&self) -> usize { + 1 + } +} + +impl Deserializable for DeletionStatusMutation { + fn deserialize(buf: &mut &[u8]) -> anyhow::Result { + let deletion_status_code = ::deserialize(buf)?; + DeletionStatusMutation::try_from(deletion_status_code) + .map_err(|_| anyhow::anyhow!("Invalid deletion status code {deletion_status_code}")) + } } impl<'a> From<&'a KeyValueMutation> for KeyValueMutationRef<'a> { @@ -126,7 +232,7 @@ impl<'a> From<&'a KeyValueMutation> for KeyValueMutationRef<'a> { key: mutation.key.as_str(), value: mutation.value.as_str(), version: mutation.version, - tombstone: mutation.tombstone, + state: mutation.status, } } } @@ -136,7 +242,7 @@ pub(crate) struct KeyValueMutationRef<'a> { pub(crate) key: &'a str, pub(crate) value: &'a str, pub(crate) version: Version, - pub(crate) tombstone: bool, + pub(crate) state: DeletionStatusMutation, } impl<'a> Serializable for KeyValueMutationRef<'a> { @@ -144,14 +250,14 @@ impl<'a> Serializable for KeyValueMutationRef<'a> { Serializable::serialize(self.key, buf); Serializable::serialize(self.value, buf); Serializable::serialize(&self.version, buf); - Serializable::serialize(&self.tombstone, buf); + Serializable::serialize(&self.state, buf); } fn serialized_len(&self) -> usize { Serializable::serialized_len(self.key) + Serializable::serialized_len(self.value) + Serializable::serialized_len(&self.version) - + Serializable::serialized_len(&self.tombstone) + + Serializable::serialized_len(&self.state) } } @@ -160,12 +266,12 @@ impl Deserializable for KeyValueMutation { let key: String = Deserializable::deserialize(buf)?; let value: String = Deserializable::deserialize(buf)?; let version: u64 = Deserializable::deserialize(buf)?; - let tombstone: bool = Deserializable::deserialize(buf)?; + let state: DeletionStatusMutation = Deserializable::deserialize(buf)?; Ok(KeyValueMutation { key, value, version, - tombstone, + status: state, }) } } @@ -174,16 +280,17 @@ impl Deserializable for KeyValueMutation { struct VersionedValueForSerialization { pub value: String, pub version: Version, - pub is_tombstone: bool, + pub status: DeletionStatusMutation, /* TODO fixme. Deserialization could result in incorrect + * ttls. */ } impl From for VersionedValue { fn from(versioned_value: VersionedValueForSerialization) -> Self { - VersionedValue::new( - versioned_value.value, - versioned_value.version, - versioned_value.is_tombstone, - ) + VersionedValue { + value: versioned_value.value, + version: versioned_value.version, + status: versioned_value.status.into_status(Instant::now()), + } } } @@ -192,7 +299,7 @@ impl From for VersionedValueForSerialization { VersionedValueForSerialization { value: versioned_value.value, version: versioned_value.version, - is_tombstone: versioned_value.tombstone.is_some(), + status: DeletionStatusMutation::from(versioned_value.status), } } } @@ -217,3 +324,22 @@ impl From for u64 { heartbeat.0 } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_deletion_status_to_u8() { + let mut count_values = 0; + for deletion_status_code in 0..=u8::MAX { + let Ok(deletion_status) = DeletionStatusMutation::try_from(deletion_status_code) else { + continue; + }; + let deletion_status_code_deser_ser: u8 = deletion_status.into(); + assert_eq!(deletion_status_code, deletion_status_code_deser_ser); + count_values += 1; + } + assert_eq!(count_values, 3); + } +} diff --git a/chitchat/tests/cluster_test.rs b/chitchat/tests/cluster_test.rs index 919dd00..931cc3e 100644 --- a/chitchat/tests/cluster_test.rs +++ b/chitchat/tests/cluster_test.rs @@ -17,7 +17,11 @@ enum Operation { chitchat_id: ChitchatId, keys_values: Vec<(String, String)>, }, - MarkKeyForDeletion { + DeleteKey { + chitchat_id: ChitchatId, + key: String, + }, + DeleteKeyAfterTtl { chitchat_id: ChitchatId, key: String, }, @@ -38,9 +42,9 @@ enum Operation { #[derive(Debug)] enum NodeStatePredicate { - EqualKeyValue(String, String), // key, value - KeyPresent(String, bool), // key, present - MarkedForDeletion(String, bool), // key, marked + EqualKeyValue(String, String), // key, value + KeyPresent(String, bool), // key, present + Deleted(String, bool), // key, marked } impl NodeStatePredicate { @@ -56,9 +60,9 @@ impl NodeStatePredicate { debug!(key=%key, present=present, "assert-key-present"); &node_state.get_versioned(key).is_some() == present } - NodeStatePredicate::MarkedForDeletion(key, marked) => { + NodeStatePredicate::Deleted(key, marked) => { debug!(key=%key, marked=marked, "assert-key-marked-for-deletion"); - node_state.get_versioned(key).unwrap().is_tombstone() == *marked + node_state.get_versioned(key).unwrap().is_deleted() == *marked } } } @@ -97,8 +101,11 @@ impl Simulator { } => { self.insert_keys_values(chitchat_id, keys_values).await; } - Operation::MarkKeyForDeletion { chitchat_id, key } => { - self.mark_for_deletion(chitchat_id, key).await; + Operation::DeleteKey { chitchat_id, key } => { + self.delete(chitchat_id, &key).await; + } + Operation::DeleteKeyAfterTtl { chitchat_id, key } => { + self.delete_after_ttl(chitchat_id, &key).await; } Operation::Wait(duration) => { tokio::time::sleep(duration).await; @@ -180,12 +187,20 @@ impl Simulator { } } - pub async fn mark_for_deletion(&mut self, chitchat_id: ChitchatId, key: String) { + pub async fn delete(&mut self, chitchat_id: ChitchatId, key: &str) { + let chitchat = self.node_handles.get(&chitchat_id).unwrap().chitchat(); + let mut chitchat_guard = chitchat.lock().await; + chitchat_guard.self_node_state().delete(key); + let hearbeat = chitchat_guard.self_node_state().heartbeat(); + debug!(node_id=%chitchat_id.node_id, key=%key, hearbeat=?hearbeat, "Deleted key."); + } + + pub async fn delete_after_ttl(&mut self, chitchat_id: ChitchatId, key: &str) { let chitchat = self.node_handles.get(&chitchat_id).unwrap().chitchat(); let mut chitchat_guard = chitchat.lock().await; - chitchat_guard.self_node_state().mark_for_deletion(&key); + chitchat_guard.self_node_state().delete_after_ttl(key); let hearbeat = chitchat_guard.self_node_state().heartbeat(); - debug!(node_id=%chitchat_id.node_id, key=%key, hearbeat=?hearbeat, "Marked key for deletion."); + debug!(node_id=%chitchat_id.node_id, key=%key, hearbeat=?hearbeat, "Scheduled key for deletion after grace period"); } pub async fn spawn_node( @@ -292,6 +307,70 @@ async fn test_simple_simulation_insert() { simulator.execute(operations).await; } +#[tokio::test] +async fn test_simple_simulation_delete_after_ttl() { + let mut simulator = Simulator::new(Duration::from_millis(100), Duration::from_secs(1)); + let chitchat_id_1 = create_chitchat_id("node-1"); + let chitchat_id_2 = create_chitchat_id("node-2"); + let operations = vec![ + Operation::AddNode { + chitchat_id: chitchat_id_1.clone(), + peer_seeds: None, + }, + Operation::AddNode { + chitchat_id: chitchat_id_2.clone(), + peer_seeds: None, + }, + Operation::InsertKeysValues { + chitchat_id: chitchat_id_1.clone(), + keys_values: vec![("key_a".to_string(), "0".to_string())], + }, + Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_2.clone(), + chitchat_id: chitchat_id_1.clone(), + predicate: NodeStatePredicate::EqualKeyValue("key_a".to_string(), "0".to_string()), + timeout_opt: Some(Duration::from_millis(200)), + }, + Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_1.clone(), + chitchat_id: chitchat_id_1.clone(), + predicate: NodeStatePredicate::EqualKeyValue("key_a".to_string(), "0".to_string()), + timeout_opt: None, + }, + Operation::DeleteKeyAfterTtl { + chitchat_id: chitchat_id_1.clone(), + key: "key_a".to_string(), + }, + Operation::Wait(Duration::from_millis(300)), + Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_2.clone(), + chitchat_id: chitchat_id_1.clone(), + predicate: NodeStatePredicate::EqualKeyValue("key_a".to_string(), "0".to_string()), + timeout_opt: None, + }, + Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_1.clone(), + chitchat_id: chitchat_id_1.clone(), + predicate: NodeStatePredicate::EqualKeyValue("key_a".to_string(), "0".to_string()), + timeout_opt: None, + }, + Operation::Wait(Duration::from_secs(3)), + Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_1.clone(), + chitchat_id: chitchat_id_1.clone(), + predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), + timeout_opt: None, + }, + Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_2.clone(), + chitchat_id: chitchat_id_1.clone(), + predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), + timeout_opt: None, + }, + ]; + simulator.execute(operations).await; +} + #[tokio::test] async fn test_simple_simulation_with_network_partition() { // let _ = tracing_subscriber::fmt::try_init(); @@ -365,7 +444,7 @@ async fn test_marked_for_deletion_gc_with_network_partition_2_nodes() { Operation::RemoveNetworkLink(chitchat_id_1.clone(), chitchat_id_2.clone()), Operation::Wait(Duration::from_secs(5)), // Mark for deletion key. - Operation::MarkKeyForDeletion { + Operation::DeleteKey { chitchat_id: chitchat_id_1.clone(), key: "key_a".to_string(), }, @@ -373,7 +452,7 @@ async fn test_marked_for_deletion_gc_with_network_partition_2_nodes() { Operation::NodeStateAssert { server_chitchat_id: chitchat_id_2.clone(), chitchat_id: chitchat_id_1.clone(), - predicate: NodeStatePredicate::MarkedForDeletion("key_a".to_string(), false), + predicate: NodeStatePredicate::Deleted("key_a".to_string(), false), timeout_opt: None, }, // Wait for garbage collection: grace period * heartbeat ~ 1 second + margin of 1 second. @@ -450,7 +529,7 @@ async fn test_marked_for_deletion_gc_with_network_partition_4_nodes() { Operation::RemoveNetworkLink(chitchat_id_1.clone(), chitchat_id_3.clone()), Operation::RemoveNetworkLink(chitchat_id_2.clone(), chitchat_id_3.clone()), // Mark for deletion key. - Operation::MarkKeyForDeletion { + Operation::DeleteKey { chitchat_id: chitchat_id_1.clone(), key: "key_a".to_string(), }, @@ -458,14 +537,14 @@ async fn test_marked_for_deletion_gc_with_network_partition_4_nodes() { Operation::NodeStateAssert { server_chitchat_id: chitchat_id_2.clone(), chitchat_id: chitchat_id_1.clone(), - predicate: NodeStatePredicate::MarkedForDeletion("key_a".to_string(), true), + predicate: NodeStatePredicate::Deleted("key_a".to_string(), true), timeout_opt: Some(TIMEOUT), }, // Check marked for deletion is not propagated to node 3. Operation::NodeStateAssert { server_chitchat_id: chitchat_id_3.clone(), chitchat_id: chitchat_id_1.clone(), - predicate: NodeStatePredicate::MarkedForDeletion("key_a".to_string(), false), + predicate: NodeStatePredicate::Deleted("key_a".to_string(), false), timeout_opt: None, }, // Wait for garbage collection: grace period + margin of 1 second. @@ -591,7 +670,7 @@ async fn test_simple_simulation_heavy_insert_delete() { // Marked all keys for deletion. for (chitchat_id, keys) in keys_values_inserted_per_chitchat_id.clone().into_iter() { for key in keys { - let check_operation = Operation::MarkKeyForDeletion { + let check_operation = Operation::DeleteKey { chitchat_id: chitchat_id.clone(), key, };