From bdcd8c62b08df6d5c419fa4e38dd30e7abeb6e87 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 27 Feb 2024 11:39:40 +0900 Subject: [PATCH] Removing hidden contract We avoid computing tombstone's Instant upon deserialization. It was hiding a very hidden contract forcing us to deserialize mutation in the order of their version. With this change, we defer the computation of the instant to the call of the apply_delta method. All of the tombstone from a delta get the exact same `Instant`. --- chitchat/src/delta.rs | 116 +++++++++++-------------------- chitchat/src/lib.rs | 8 ++- chitchat/src/state.rs | 154 +++++++++++++++++++++--------------------- chitchat/src/types.rs | 61 +++++++++++++++++ 4 files changed, 183 insertions(+), 156 deletions(-) diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index 8ee4974..f8e8021 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -1,8 +1,7 @@ use std::collections::HashSet; -use tokio::time::Instant; - use crate::serialize::*; +use crate::types::{KeyValueMutation, KeyValueMutationRef}; use crate::{ChitchatId, Version, VersionedValue}; /// A delta is the message we send to another node to update it. @@ -32,12 +31,12 @@ impl Delta { last_gc_version: node_delta.last_gc_version, from_version_excluded: node_delta.from_version_excluded, }) - .chain(node_delta.key_values.iter().map(|(key, versioned_value)| { - DeltaOpRef::KeyValue { - key, - versioned_value, - } - })) + .chain( + node_delta + .key_values + .iter() + .map(|key_value_mutation| DeltaOpRef::KeyValue(key_value_mutation.into())), + ) .chain({ node_delta .max_version @@ -53,10 +52,7 @@ enum DeltaOp { last_gc_version: Version, from_version_excluded: u64, }, - KeyValue { - key: String, - versioned_value: VersionedValue, - }, + KeyValue(KeyValueMutation), SetMaxVersion { max_version: Version, }, @@ -68,10 +64,7 @@ enum DeltaOpRef<'a> { last_gc_version: Version, from_version_excluded: u64, }, - KeyValue { - key: &'a str, - versioned_value: &'a VersionedValue, - }, + KeyValue(KeyValueMutationRef<'a>), SetMaxVersion { max_version: Version, }, @@ -112,11 +105,11 @@ impl Deserializable for DeltaOp { DeltaOpTag::Node => { let chitchat_id = ChitchatId::deserialize(buf)?; let last_gc_version = Version::deserialize(buf)?; - let from_version = u64::deserialize(buf)?; + let from_version_excluded = u64::deserialize(buf)?; Ok(DeltaOp::Node { chitchat_id, last_gc_version, - from_version_excluded: from_version, + from_version_excluded, }) } DeltaOpTag::KeyValue => { @@ -124,16 +117,12 @@ impl Deserializable for DeltaOp { let value = String::deserialize(buf)?; let version = u64::deserialize(buf)?; let deleted = bool::deserialize(buf)?; - let tombstone = if deleted { Some(Instant::now()) } else { None }; - let versioned_value: VersionedValue = VersionedValue { + Ok(DeltaOp::KeyValue(KeyValueMutation { + key, value, version, - tombstone, - }; - Ok(DeltaOp::KeyValue { - key, - versioned_value, - }) + tombstone: deleted, + })) } DeltaOpTag::SetMaxVersion => { let max_version = Version::deserialize(buf)?; @@ -149,19 +138,15 @@ impl DeltaOp { DeltaOp::Node { chitchat_id, last_gc_version, - from_version_excluded: from_version, + from_version_excluded, } => DeltaOpRef::Node { chitchat_id, last_gc_version: *last_gc_version, - from_version_excluded: *from_version, - }, - DeltaOp::KeyValue { - key, - versioned_value, - } => DeltaOpRef::KeyValue { - key, - versioned_value, + from_version_excluded: *from_version_excluded, }, + DeltaOp::KeyValue(key_value_mutation) => { + DeltaOpRef::KeyValue(key_value_mutation.into()) + } DeltaOp::SetMaxVersion { max_version } => DeltaOpRef::SetMaxVersion { max_version: *max_version, }, @@ -192,15 +177,9 @@ impl<'a> Serializable for DeltaOpRef<'a> { last_gc_version.serialize(buf); from_version.serialize(buf); } - Self::KeyValue { - key, - versioned_value, - } => { + Self::KeyValue(key_value_mutation_ref) => { buf.push(DeltaOpTag::KeyValue.into()); - key.serialize(buf); - versioned_value.value.serialize(buf); - versioned_value.version.serialize(buf); - versioned_value.is_tombstone().serialize(buf); + key_value_mutation_ref.serialize(buf); } Self::SetMaxVersion { max_version } => { buf.push(DeltaOpTag::SetMaxVersion.into()); @@ -220,15 +199,7 @@ impl<'a> Serializable for DeltaOpRef<'a> { + last_gc_version.serialized_len() + from_version.serialized_len() } - Self::KeyValue { - key, - versioned_value, - } => { - key.serialized_len() - + versioned_value.value.serialized_len() - + versioned_value.version.serialized_len() - + 1 - } + Self::KeyValue(key_value_mutation_ref) => key_value_mutation_ref.serialized_len(), Self::SetMaxVersion { max_version } => max_version.serialized_len(), } } @@ -304,15 +275,12 @@ impl Delta { .iter_mut() .find(|node_delta| &node_delta.chitchat_id == chitchat_id) .unwrap(); - let tombstone = if deleted { Some(Instant::now()) } else { None }; - node_delta.key_values.push(( - key.to_string(), - VersionedValue { - value: value.to_string(), - version, - tombstone, - }, - )); + node_delta.key_values.push(KeyValueMutation { + key: key.to_string(), + value: value.to_string(), + version, + tombstone: deleted, + }); } pub(crate) fn set_serialized_len(&mut self, serialized_len: usize) { @@ -346,7 +314,7 @@ pub(crate) struct NodeDelta { // and `last_gc_version` are used. pub from_version_excluded: Version, pub last_gc_version: Version, - pub key_values: Vec<(String, VersionedValue)>, + pub key_values: Vec, pub max_version: Option, } @@ -389,24 +357,17 @@ impl DeltaBuilder { max_version: None, }); } - DeltaOp::KeyValue { - key, - versioned_value, - } => { + DeltaOp::KeyValue(key_value_mutation) => { let Some(current_node_delta) = self.current_node_delta.as_mut() else { anyhow::bail!("received a key-value op without a node op before."); }; - if let Some((_last_key, last_versioned_value)) = - current_node_delta.key_values.last() - { + if let Some(previous_key_value_mutation) = current_node_delta.key_values.last() { anyhow::ensure!( - last_versioned_value.version < versioned_value.version, + previous_key_value_mutation.version < key_value_mutation.version, "kv version should be increasing" ); } - current_node_delta - .key_values - .push((key.to_string(), versioned_value)); + current_node_delta.key_values.push(key_value_mutation); } DeltaOp::SetMaxVersion { max_version } => { let Some(current_node_delta) = self.current_node_delta.as_mut() else { @@ -473,10 +434,13 @@ impl DeltaSerializer { /// Returns false if the KV could not be added because the payload would exceed the mtu. pub fn try_add_kv(&mut self, key: &str, versioned_value: VersionedValue) -> bool { - let key_value_op = DeltaOp::KeyValue { + let key_value_mutation = KeyValueMutation { key: key.to_string(), - versioned_value, + value: versioned_value.value, + version: versioned_value.version, + tombstone: versioned_value.tombstone.is_some(), }; + let key_value_op = DeltaOp::KeyValue(key_value_mutation); self.try_add_op(key_value_op) } @@ -503,6 +467,8 @@ impl DeltaSerializer { #[cfg(test)] mod tests { + use tokio::time::Instant; + use super::*; #[test] diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index 2e42bee..7f93a5c 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -109,8 +109,9 @@ impl Chitchat { /// Executes the catch-up callback if necessary. fn maybe_trigger_catchup_callback(&self, delta: &Delta) { - let has_reset = delta.node_deltas.iter() - .any(|node_delta| node_delta.from_version_excluded == 0 && node_delta.last_gc_version > 0); + let has_reset = delta.node_deltas.iter().any(|node_delta| { + node_delta.from_version_excluded == 0 && node_delta.last_gc_version > 0 + }); if has_reset { if let Some(catchup_callback) = &self.config.catchup_callback { info!("executing catch-up callback"); @@ -1048,7 +1049,8 @@ mod tests { node.process_delta(delta); let mut delta = Delta::default(); - delta.add_node_to_reset(ChitchatId::for_local_test(10_002)); + let chitchat_id = ChitchatId::for_local_test(10_002); + delta.add_node(chitchat_id, 1000u64, 0u64); node.process_delta(delta); assert_eq!(catchup_callback_counter.load(Ordering::Acquire), 1); diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index a9458fb..81c177a 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -170,7 +170,7 @@ impl NodeState { let Some(delta_max_version) = node_delta .key_values .iter() - .map(|(_, v)| v.version) + .map(|key_value_mutation| key_value_mutation.version) .max() .or(node_delta.max_version) else { @@ -206,23 +206,33 @@ impl NodeState { true } - fn apply_delta(&mut self, node_delta: NodeDelta) { + fn apply_delta(&mut self, node_delta: NodeDelta, now: Instant) { info!(delta=?node_delta); if !self.prepare_apply_delta(&node_delta) { return; } let current_max_version = self.max_version(); - for (key, versioned_value) in node_delta.key_values { - if versioned_value.tombstone.is_some() { + for key_value_mutation in node_delta.key_values { + if key_value_mutation.version <= current_max_version { + // We already know about this KV. + continue; + } + if key_value_mutation.tombstone { // We don't want to keep any tombstone before `last_gc_version`. - if versioned_value.version <= self.last_gc_version { + if key_value_mutation.version <= self.last_gc_version { continue; } } - if versioned_value.version <= current_max_version { - continue; - } - self.set_versioned_value(key, versioned_value); + let versioned_value = VersionedValue { + value: key_value_mutation.value, + version: key_value_mutation.version, + tombstone: if key_value_mutation.tombstone { + Some(now) + } else { + None + }, + }; + self.set_versioned_value(key_value_mutation.key, versioned_value); } } @@ -466,10 +476,11 @@ impl ClusterState { } pub(crate) fn apply_delta(&mut self, delta: Delta) { + let now = Instant::now(); // Apply delta. for node_delta in delta.node_deltas { let node_state = self.node_state_mut(&node_delta.chitchat_id); - node_state.apply_delta(node_delta); + node_state.apply_delta(node_delta, now); } } @@ -731,6 +742,7 @@ fn random_generator() -> impl Rng { mod tests { use super::*; use crate::serialize::Serializable; + use crate::types::KeyValueMutation; use crate::MAX_UDP_DATAGRAM_PAYLOAD_SIZE; #[test] @@ -1392,30 +1404,26 @@ mod tests { node_state.set_with_version("key_a", "val_a", 1); node_state.set_with_version("key_b", "val_a", 2); let node_delta = NodeDelta { - chitchat_id: node_state.node_id.clone(), + chitchat_id: node_state.chitchat_id.clone(), from_version_excluded: 2, last_gc_version: 0u64, max_version: None, key_values: vec![ - ( - "key_c".to_string(), - VersionedValue { - value: "val_c".to_string(), - version: 4, - tombstone: None, - }, - ), - ( - "key_b".to_string(), - VersionedValue { - value: "val_b2".to_string(), - version: 3, - tombstone: None, - }, - ), + KeyValueMutation { + key: "key_c".to_string(), + value: "val_c".to_string(), + version: 4, + tombstone: false, + }, + KeyValueMutation { + key: "key_b".to_string(), + value: "val_b2".to_string(), + version: 3, + tombstone: false, + }, ], }; - node_state.apply_delta(node_delta); + node_state.apply_delta(node_delta, Instant::now()); assert_eq!(node_state.num_key_values(), 3); assert_eq!(node_state.max_version(), 4); assert_eq!(node_state.last_gc_version, 0); @@ -1433,20 +1441,18 @@ mod tests { let mut node_state = NodeState::for_test(); node_state.set_with_version("key_a", "val_a", 1); let node_delta = NodeDelta { - chitchat_id: node_state.node_id.clone(), + chitchat_id: node_state.chitchat_id.clone(), from_version_excluded: 1, last_gc_version: 0, max_version: None, - key_values: vec![( - "key_a".to_string(), - VersionedValue { - value: "val_a".to_string(), - version: 3, - tombstone: None, - }, - )], + key_values: vec![KeyValueMutation { + key: "key_a".to_string(), + value: "val_a".to_string(), + version: 3, + tombstone: false, + }], }; - node_state.apply_delta(node_delta); + node_state.apply_delta(node_delta, Instant::now()); let versioned_a = node_state.get_versioned("key_a").unwrap(); assert_eq!(versioned_a.version, 3); assert!(versioned_a.tombstone.is_none()); @@ -1459,20 +1465,18 @@ mod tests { node_state.set_with_version("key_a", "val_a", 5); assert_eq!(node_state.max_version(), 5); let node_delta = NodeDelta { - chitchat_id: node_state.node_id.clone(), + chitchat_id: node_state.chitchat_id.clone(), from_version_excluded: 6, // we skipped version 6 here. last_gc_version: 0, max_version: None, - key_values: vec![( - "key_a".to_string(), - VersionedValue { - value: "new_val".to_string(), - version: 7, - tombstone: None, - }, - )], + key_values: vec![KeyValueMutation { + key: "key_a".to_string(), + value: "new_val".to_string(), + version: 7, + tombstone: false, + }], }; - node_state.apply_delta(node_delta); + node_state.apply_delta(node_delta, Instant::now()); let versioned_a = node_state.get_versioned("key_a").unwrap(); assert_eq!(versioned_a.version, 5); assert!(versioned_a.tombstone.is_none()); @@ -1492,20 +1496,18 @@ mod tests { assert_eq!(node_state.max_version(), 18); node_state.set_with_version("key_a", "val_a", 31); let node_delta = NodeDelta { - chitchat_id: node_state.node_id.clone(), + chitchat_id: node_state.chitchat_id.clone(), from_version_excluded: 31, // we skipped version 6 here. last_gc_version: 30, max_version: None, - key_values: vec![( - "key_a".to_string(), - VersionedValue { - value: "new_val".to_string(), - version: 32, - tombstone: None, - }, - )], + key_values: vec![KeyValueMutation { + key: "key_a".to_string(), + value: "new_val".to_string(), + version: 32, + tombstone: false, + }], }; - node_state.apply_delta(node_delta); + node_state.apply_delta(node_delta, Instant::now()); let versioned_a = node_state.get_versioned("key_a").unwrap(); assert_eq!(versioned_a.version, 32); assert_eq!(node_state.max_version(), 32); @@ -1520,20 +1522,18 @@ mod tests { node_state.set_with_version("key_a", "val_a", 17); assert_eq!(node_state.max_version(), 17); let node_delta = NodeDelta { - chitchat_id: node_state.node_id.clone(), + chitchat_id: node_state.chitchat_id.clone(), from_version_excluded: 0, // we skipped version 6 here. last_gc_version: 30, max_version: None, - key_values: vec![( - "key_b".to_string(), - VersionedValue { - value: "val_b".to_string(), - version: 32, - tombstone: None, - }, - )], + key_values: vec![KeyValueMutation { + key: "key_b".to_string(), + value: "val_b".to_string(), + version: 32, + tombstone: false, + }], }; - node_state.apply_delta(node_delta); + node_state.apply_delta(node_delta, Instant::now()); assert!(node_state.get_versioned("key_a").is_none()); let versioned_b = node_state.get_versioned("key_b").unwrap(); assert_eq!(versioned_b.version, 32); @@ -1549,20 +1549,18 @@ mod tests { // This does look like a valid reset, but we are already at version 32. // Let's ignore this. let node_delta = NodeDelta { - chitchat_id: node_state.node_id.clone(), + chitchat_id: node_state.chitchat_id.clone(), from_version_excluded: 0, // we skipped version 6 here. last_gc_version: 17, max_version: None, - key_values: vec![( - "key_b".to_string(), - VersionedValue { - value: "val_b".to_string(), - version: 30, - tombstone: None, - }, - )], + key_values: vec![KeyValueMutation { + key: "key_b".to_string(), + value: "val_b".to_string(), + version: 30, + tombstone: false, + }], }; - node_state.apply_delta(node_delta); + node_state.apply_delta(node_delta, Instant::now()); assert_eq!(node_state.max_version, 32); let versioned_b = node_state.get_versioned("key_b").unwrap(); assert_eq!(versioned_b.version, 32); diff --git a/chitchat/src/types.rs b/chitchat/src/types.rs index 483a61f..27f8664 100644 --- a/chitchat/src/types.rs +++ b/chitchat/src/types.rs @@ -4,6 +4,9 @@ use std::net::SocketAddr; use serde::{Deserialize, Serialize}; use tokio::time::Instant; +use crate::serialize::Deserializable; +use crate::Serializable; + /// For the lifetime of a cluster, nodes can go down and come back up multiple times. They may also /// die permanently. A [`ChitchatId`] is composed of three components: /// - `node_id`: an identifier unique across the cluster. @@ -109,6 +112,64 @@ impl PartialEq for VersionedValue { } } +#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] +pub(crate) struct KeyValueMutation { + pub(crate) key: String, + pub(crate) value: String, + pub(crate) version: Version, + pub(crate) tombstone: bool, +} + +impl<'a> From<&'a KeyValueMutation> for KeyValueMutationRef<'a> { + fn from(mutation: &'a KeyValueMutation) -> KeyValueMutationRef<'a> { + KeyValueMutationRef { + key: mutation.key.as_str(), + value: mutation.value.as_str(), + version: mutation.version, + tombstone: mutation.tombstone, + } + } +} + +#[derive(Debug, Eq, PartialEq, Clone, Serialize)] +pub(crate) struct KeyValueMutationRef<'a> { + pub(crate) key: &'a str, + pub(crate) value: &'a str, + pub(crate) version: Version, + pub(crate) tombstone: bool, +} + +impl<'a> Serializable for KeyValueMutationRef<'a> { + fn serialize(&self, buf: &mut Vec) { + Serializable::serialize(self.key, buf); + Serializable::serialize(self.value, buf); + Serializable::serialize(&self.version, buf); + Serializable::serialize(&self.tombstone, buf); + } + + fn serialized_len(&self) -> usize { + Serializable::serialized_len(self.key) + + Serializable::serialized_len(self.value) + + Serializable::serialized_len(&self.version) + + Serializable::serialized_len(&self.tombstone) + } +} + +impl Deserializable for KeyValueMutation { + fn deserialize(buf: &mut &[u8]) -> anyhow::Result { + let key: String = Deserializable::deserialize(buf)?; + let value: String = Deserializable::deserialize(buf)?; + let version: u64 = Deserializable::deserialize(buf)?; + let tombstone: bool = Deserializable::deserialize(buf)?; + Ok(KeyValueMutation { + key, + value, + version, + tombstone, + }) + } +} + #[derive(Serialize, Deserialize)] struct VersionedValueForSerialization { pub value: String,