Skip to content

Commit

Permalink
Removing hidden contract
Browse files Browse the repository at this point in the history
We avoid computing tombstone's Instant upon deserialization.
It was hiding a very hidden contract forcing us to deserialize mutation
in the order of their version.

With this change, we defer the computation of the instant to the
call of the apply_delta method. All of the tombstone from a delta
get the exact same `Instant`.
  • Loading branch information
fulmicoton committed Feb 27, 2024
1 parent b5ac6ed commit bdcd8c6
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 156 deletions.
116 changes: 41 additions & 75 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::collections::HashSet;

use tokio::time::Instant;

use crate::serialize::*;
use crate::types::{KeyValueMutation, KeyValueMutationRef};
use crate::{ChitchatId, Version, VersionedValue};

/// A delta is the message we send to another node to update it.
Expand Down Expand Up @@ -32,12 +31,12 @@ impl Delta {
last_gc_version: node_delta.last_gc_version,
from_version_excluded: node_delta.from_version_excluded,
})
.chain(node_delta.key_values.iter().map(|(key, versioned_value)| {
DeltaOpRef::KeyValue {
key,
versioned_value,
}
}))
.chain(
node_delta
.key_values
.iter()
.map(|key_value_mutation| DeltaOpRef::KeyValue(key_value_mutation.into())),
)
.chain({
node_delta
.max_version
Expand All @@ -53,10 +52,7 @@ enum DeltaOp {
last_gc_version: Version,
from_version_excluded: u64,
},
KeyValue {
key: String,
versioned_value: VersionedValue,
},
KeyValue(KeyValueMutation),
SetMaxVersion {
max_version: Version,
},
Expand All @@ -68,10 +64,7 @@ enum DeltaOpRef<'a> {
last_gc_version: Version,
from_version_excluded: u64,
},
KeyValue {
key: &'a str,
versioned_value: &'a VersionedValue,
},
KeyValue(KeyValueMutationRef<'a>),
SetMaxVersion {
max_version: Version,
},
Expand Down Expand Up @@ -112,28 +105,24 @@ impl Deserializable for DeltaOp {
DeltaOpTag::Node => {
let chitchat_id = ChitchatId::deserialize(buf)?;
let last_gc_version = Version::deserialize(buf)?;
let from_version = u64::deserialize(buf)?;
let from_version_excluded = u64::deserialize(buf)?;
Ok(DeltaOp::Node {
chitchat_id,
last_gc_version,
from_version_excluded: from_version,
from_version_excluded,
})
}
DeltaOpTag::KeyValue => {
let key = String::deserialize(buf)?;
let value = String::deserialize(buf)?;
let version = u64::deserialize(buf)?;
let deleted = bool::deserialize(buf)?;
let tombstone = if deleted { Some(Instant::now()) } else { None };
let versioned_value: VersionedValue = VersionedValue {
Ok(DeltaOp::KeyValue(KeyValueMutation {
key,
value,
version,
tombstone,
};
Ok(DeltaOp::KeyValue {
key,
versioned_value,
})
tombstone: deleted,
}))
}
DeltaOpTag::SetMaxVersion => {
let max_version = Version::deserialize(buf)?;
Expand All @@ -149,19 +138,15 @@ impl DeltaOp {
DeltaOp::Node {
chitchat_id,
last_gc_version,
from_version_excluded: from_version,
from_version_excluded,
} => DeltaOpRef::Node {
chitchat_id,
last_gc_version: *last_gc_version,
from_version_excluded: *from_version,
},
DeltaOp::KeyValue {
key,
versioned_value,
} => DeltaOpRef::KeyValue {
key,
versioned_value,
from_version_excluded: *from_version_excluded,
},
DeltaOp::KeyValue(key_value_mutation) => {
DeltaOpRef::KeyValue(key_value_mutation.into())
}
DeltaOp::SetMaxVersion { max_version } => DeltaOpRef::SetMaxVersion {
max_version: *max_version,
},
Expand Down Expand Up @@ -192,15 +177,9 @@ impl<'a> Serializable for DeltaOpRef<'a> {
last_gc_version.serialize(buf);
from_version.serialize(buf);
}
Self::KeyValue {
key,
versioned_value,
} => {
Self::KeyValue(key_value_mutation_ref) => {
buf.push(DeltaOpTag::KeyValue.into());
key.serialize(buf);
versioned_value.value.serialize(buf);
versioned_value.version.serialize(buf);
versioned_value.is_tombstone().serialize(buf);
key_value_mutation_ref.serialize(buf);
}
Self::SetMaxVersion { max_version } => {
buf.push(DeltaOpTag::SetMaxVersion.into());
Expand All @@ -220,15 +199,7 @@ impl<'a> Serializable for DeltaOpRef<'a> {
+ last_gc_version.serialized_len()
+ from_version.serialized_len()
}
Self::KeyValue {
key,
versioned_value,
} => {
key.serialized_len()
+ versioned_value.value.serialized_len()
+ versioned_value.version.serialized_len()
+ 1
}
Self::KeyValue(key_value_mutation_ref) => key_value_mutation_ref.serialized_len(),
Self::SetMaxVersion { max_version } => max_version.serialized_len(),
}
}
Expand Down Expand Up @@ -304,15 +275,12 @@ impl Delta {
.iter_mut()
.find(|node_delta| &node_delta.chitchat_id == chitchat_id)
.unwrap();
let tombstone = if deleted { Some(Instant::now()) } else { None };
node_delta.key_values.push((
key.to_string(),
VersionedValue {
value: value.to_string(),
version,
tombstone,
},
));
node_delta.key_values.push(KeyValueMutation {
key: key.to_string(),
value: value.to_string(),
version,
tombstone: deleted,
});
}

pub(crate) fn set_serialized_len(&mut self, serialized_len: usize) {
Expand Down Expand Up @@ -346,7 +314,7 @@ pub(crate) struct NodeDelta {
// and `last_gc_version` are used.
pub from_version_excluded: Version,
pub last_gc_version: Version,
pub key_values: Vec<(String, VersionedValue)>,
pub key_values: Vec<KeyValueMutation>,
pub max_version: Option<Version>,
}

Expand Down Expand Up @@ -389,24 +357,17 @@ impl DeltaBuilder {
max_version: None,
});
}
DeltaOp::KeyValue {
key,
versioned_value,
} => {
DeltaOp::KeyValue(key_value_mutation) => {
let Some(current_node_delta) = self.current_node_delta.as_mut() else {
anyhow::bail!("received a key-value op without a node op before.");
};
if let Some((_last_key, last_versioned_value)) =
current_node_delta.key_values.last()
{
if let Some(previous_key_value_mutation) = current_node_delta.key_values.last() {
anyhow::ensure!(
last_versioned_value.version < versioned_value.version,
previous_key_value_mutation.version < key_value_mutation.version,
"kv version should be increasing"
);
}
current_node_delta
.key_values
.push((key.to_string(), versioned_value));
current_node_delta.key_values.push(key_value_mutation);
}
DeltaOp::SetMaxVersion { max_version } => {
let Some(current_node_delta) = self.current_node_delta.as_mut() else {
Expand Down Expand Up @@ -473,10 +434,13 @@ impl DeltaSerializer {

/// Returns false if the KV could not be added because the payload would exceed the mtu.
pub fn try_add_kv(&mut self, key: &str, versioned_value: VersionedValue) -> bool {
let key_value_op = DeltaOp::KeyValue {
let key_value_mutation = KeyValueMutation {
key: key.to_string(),
versioned_value,
value: versioned_value.value,
version: versioned_value.version,
tombstone: versioned_value.tombstone.is_some(),
};
let key_value_op = DeltaOp::KeyValue(key_value_mutation);
self.try_add_op(key_value_op)
}

Expand All @@ -503,6 +467,8 @@ impl DeltaSerializer {

#[cfg(test)]
mod tests {
use tokio::time::Instant;

use super::*;

#[test]
Expand Down
8 changes: 5 additions & 3 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ impl Chitchat {

/// Executes the catch-up callback if necessary.
fn maybe_trigger_catchup_callback(&self, delta: &Delta) {
let has_reset = delta.node_deltas.iter()
.any(|node_delta| node_delta.from_version_excluded == 0 && node_delta.last_gc_version > 0);
let has_reset = delta.node_deltas.iter().any(|node_delta| {
node_delta.from_version_excluded == 0 && node_delta.last_gc_version > 0
});
if has_reset {
if let Some(catchup_callback) = &self.config.catchup_callback {
info!("executing catch-up callback");
Expand Down Expand Up @@ -1048,7 +1049,8 @@ mod tests {
node.process_delta(delta);

let mut delta = Delta::default();
delta.add_node_to_reset(ChitchatId::for_local_test(10_002));
let chitchat_id = ChitchatId::for_local_test(10_002);
delta.add_node(chitchat_id, 1000u64, 0u64);
node.process_delta(delta);

assert_eq!(catchup_callback_counter.load(Ordering::Acquire), 1);
Expand Down
Loading

0 comments on commit bdcd8c6

Please sign in to comment.