Skip to content

Commit

Permalink
Key deletion logic bugfix
Browse files Browse the repository at this point in the history
See README and issue #121.

Closes #121
  • Loading branch information
fulmicoton committed Feb 19, 2024
1 parent 173f391 commit d729b26
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 178 deletions.
24 changes: 13 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,25 @@ associated with a version, and replicated using the same mechanism as a KV updat
The library will then interpret this versioned tombstone before exposing kv to
the user.

To avoid keeping deleted KV indefinitely, the library includes a GC mechanism. Any nodes containing a tombstone older than a given `grace period threshold`
(age is measured in ticks of heartbeat), it is safe to be deleted.
To avoid keeping deleted KV indefinitely, the library includes a GC mechanism.
Every tombstone is associated with a monotonic timestamp.
It is local in the sense that it is computed locally to the given node, and never shared with other servers.

All KV with a timestamp older than a given `grace_period_threshold` will be deleted upon delete operations. (Note for a given KV, GC can happen at different
times on different nodes.)

This yields the following problem. If a node was disconnected for more than
`marked_for_deletion_grace_period`, they could have missed the deletion of a KV and never be aware of it.

To address this problem, nodes that are too outdated have to reset their state.
To address this problem, nodes keep a record of the version of the last KV they
have GCed. Here is how it works:

Let's assume a Node A sends a Syn message to a Node B. The digest expresses that A want for updates about Node N with a version stricly greater than `V`.
Node B will compare the version `V` of the digest with its `max_gc_version` for the node N.

More accurately, let's assume a Node A sends a Syn message to Node B with a digest with an outdated version V for a node N.
Node B will compare the version of the digest with its own version.
If `V > max_gc_version`, Node B knows that no GC has impacted Key values with a version above V. It can safely emit a normal delta to A.

If V is fresher than `own version - marked_for_deletion_grace_period`,
Node B knows that no GC has impacted Key values with a version above V. It can
safely emit a normal delta to A.
If however V is older than `own version - marked_for_deletion_grace_period`,
a GC could have been executed. Instead of sending a delta to Node A, Node B will
instruct A to reset its state.
If however V is older, a GC could have been executed. Instead of sending a delta to Node A, Node B will instruct A to reset its state.

Node A will then wipe-off whatever information it has about N, and will start syncing from a blank state.

Expand Down
2 changes: 1 addition & 1 deletion chitchat-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async fn main() -> anyhow::Result<()> {
dead_node_grace_period: Duration::from_secs(10),
..FailureDetectorConfig::default()
},
marked_for_deletion_grace_period: 60,
marked_for_deletion_grace_period: Duration::from_secs(60),
};
let chitchat_handler = spawn_chitchat(config, Vec::new(), &UdpTransport).await?;
let chitchat = chitchat_handler.chitchat();
Expand Down
11 changes: 6 additions & 5 deletions chitchat/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ pub struct ChitchatConfig {
pub failure_detector_config: FailureDetectorConfig,
// Marked for deletion grace period expressed as a number of hearbeats.
// Chitchat ensures a key marked for deletion is eventually deleted by three mechanisms:
// - Garbage collection: each heartbeat, marked for deletion keys with `tombstone +
// marked_for_deletion_grace_period > node.heartbeat` are deleted.
// - Garbage collection: each heartbeat, marked for deletion keys with `deletion now > instant
// + marked_for_deletion_grace_period` are deleted.
// - Compute delta: for a given node digest, if `node_digest.heartbeat +
// marked_for_deletion_grace_period < node_state.heartbeat` the node is flagged "to be reset"
// and the delta is populated with all keys and values.
// - Apply delta: for a node flagged "to be reset", Chitchat will remove the node state and
// populate a fresh new node state with the keys and values present in the delta.
pub marked_for_deletion_grace_period: usize,
pub marked_for_deletion_grace_period: Duration,
}

impl ChitchatConfig {
Expand All @@ -37,7 +37,7 @@ impl ChitchatConfig {
listen_addr,
seed_nodes: Vec::new(),
failure_detector_config: Default::default(),
marked_for_deletion_grace_period: 10_000,
marked_for_deletion_grace_period: Duration::from_secs(10_000),
}
}
}
Expand All @@ -56,7 +56,8 @@ impl Default for ChitchatConfig {
failure_detector_config: Default::default(),
// Each heartbeat increments the version, with one heartbeat each second
// 86400 ~ 24h.
marked_for_deletion_grace_period: 86400,
// TODO set that to something much lower.
marked_for_deletion_grace_period: Duration::from_secs(86_400),
}
}
}
16 changes: 10 additions & 6 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::HashSet;

use tokio::time::Instant;

use crate::serialize::*;
use crate::{ChitchatId, Heartbeat, VersionedValue};

Expand Down Expand Up @@ -116,7 +118,8 @@ impl Deserializable for DeltaOp {
let key = String::deserialize(buf)?;
let value = String::deserialize(buf)?;
let version = u64::deserialize(buf)?;
let tombstone = Option::<u64>::deserialize(buf)?;
let deleted = bool::deserialize(buf)?;
let tombstone = if deleted { Some(Instant::now()) } else { None };
let versioned_value: VersionedValue = VersionedValue {
value,
version,
Expand Down Expand Up @@ -182,7 +185,7 @@ impl<'a> Serializable for DeltaOpRef<'a> {
key.serialize(buf);
versioned_value.value.serialize(buf);
versioned_value.version.serialize(buf);
versioned_value.tombstone.serialize(buf);
versioned_value.tombstone.is_some().serialize(buf);
}
Self::NodeToReset(chitchat_id) => {
buf.push(DeltaOpTag::NodeToReset.into());
Expand All @@ -204,7 +207,7 @@ impl<'a> Serializable for DeltaOpRef<'a> {
key.serialized_len()
+ versioned_value.value.serialized_len()
+ versioned_value.version.serialized_len()
+ versioned_value.tombstone.serialized_len()
+ 1
}
Self::NodeToReset(chitchat_id) => chitchat_id.serialized_len(),
}
Expand Down Expand Up @@ -267,13 +270,14 @@ impl Delta {
key: &str,
value: &str,
version: crate::Version,
tombstone: Option<u64>,
deleted: bool,
) {
let node_delta = self
.node_deltas
.iter_mut()
.find(|node_delta| &node_delta.chitchat_id == chitchat_id)
.unwrap();
let tombstone = if deleted { Some(Instant::now()) } else { None };
node_delta.key_values.push((
key.to_string(),
VersionedValue {
Expand Down Expand Up @@ -488,7 +492,7 @@ mod tests {
VersionedValue {
value: "".to_string(),
version: 2,
tombstone: Some(0),
tombstone: Some(Instant::now()),
},
));

Expand All @@ -515,7 +519,7 @@ mod tests {
tombstone: None,
},
));
test_aux_delta_writer(delta_writer, 99);
test_aux_delta_writer(delta_writer, 98);
}

#[test]
Expand Down
11 changes: 3 additions & 8 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ impl Chitchat {
&digest,
delta_mtu,
&scheduled_for_deletion,
self.config.marked_for_deletion_grace_period as u64,
);
Some(ChitchatMessage::SynAck {
digest: self_digest,
Expand All @@ -137,7 +136,6 @@ impl Chitchat {
&digest,
MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1,
&scheduled_for_deletion,
self.config.marked_for_deletion_grace_period as u64,
);
Some(ChitchatMessage::Ack { delta })
}
Expand All @@ -153,11 +151,8 @@ impl Chitchat {
}

fn gc_keys_marked_for_deletion(&mut self) {
let dead_nodes = self.dead_nodes().cloned().collect::<HashSet<_>>();
self.cluster_state.gc_keys_marked_for_deletion(
self.config.marked_for_deletion_grace_period as u64,
&dead_nodes,
);
self.cluster_state
.gc_keys_marked_for_deletion(self.config.marked_for_deletion_grace_period);
}

/// Reports heartbeats to the failure detector for nodes in the delta for which we received an
Expand Down Expand Up @@ -402,7 +397,7 @@ mod tests {
initial_interval: Duration::from_millis(100),
..Default::default()
},
marked_for_deletion_grace_period: 10_000,
marked_for_deletion_grace_period: Duration::from_secs(3_600),
};
let initial_kvs: Vec<(String, String)> = Vec::new();
spawn_chitchat(config, initial_kvs, transport)
Expand Down
12 changes: 6 additions & 6 deletions chitchat/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ mod tests {
// +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat).
delta.add_node(node.clone(), Heartbeat(0));
// +29 bytes.
delta.add_kv(&node, "key", "value", 0, Some(5));
delta.set_serialized_len(70);
delta.add_kv(&node, "key", "value", 0, true);
delta.set_serialized_len(62);

let syn_ack = ChitchatMessage::SynAck { digest, delta };
// 1 bytes (syn ack message) + 45 bytes (digest) + 69 bytes (delta).
test_serdeser_aux(&syn_ack, 116);
test_serdeser_aux(&syn_ack, 108);
}
}

Expand All @@ -188,10 +188,10 @@ mod tests {
// +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat).
delta.add_node(node.clone(), Heartbeat(0));
// +29 bytes.
delta.add_kv(&node, "key", "value", 0, Some(5));
delta.set_serialized_len(70);
delta.add_kv(&node, "key", "value", 0, true);
delta.set_serialized_len(62);
let ack = ChitchatMessage::Ack { delta };
test_serdeser_aux(&ack, 71);
test_serdeser_aux(&ack, 63);
}
}

Expand Down
33 changes: 0 additions & 33 deletions chitchat/src/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,33 +93,6 @@ impl Deserializable for u64 {
}
}

impl Serializable for Option<u64> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.is_some().serialize(buf);
if let Some(tombstone) = &self {
tombstone.serialize(buf);
}
}
fn serialized_len(&self) -> usize {
if self.is_some() {
9
} else {
1
}
}
}

impl Deserializable for Option<u64> {
fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
let is_some: bool = Deserializable::deserialize(buf)?;
if is_some {
let u64_value = Deserializable::deserialize(buf)?;
return Ok(Some(u64_value));
}
Ok(None)
}
}

impl Serializable for bool {
fn serialize(&self, buf: &mut Vec<u8>) {
buf.push(*self as u8);
Expand Down Expand Up @@ -525,12 +498,6 @@ mod tests {
test_serdeser_aux(&ipv6, 17);
}

#[test]
fn test_serialize_option_u64() {
test_serdeser_aux(&Some(1), 9);
test_serdeser_aux(&None, 1);
}

#[test]
fn test_serialize_block_type() {
let mut valid_vals_count = 0;
Expand Down
Loading

0 comments on commit d729b26

Please sign in to comment.