Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Key deletion logic bugfix #122

Merged
merged 4 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,25 @@ associated with a version, and replicated using the same mechanism as a KV updat
The library will then interpret this versioned tombstone before exposing kv to
the user.

To avoid keeping deleted KV indefinitely, the library includes a GC mechanism. Any nodes containing a tombstone older than a given `grace period threshold`
(age is measured in ticks of heartbeat), it is safe to be deleted.
To avoid keeping deleted KV indefinitely, the library includes a GC mechanism.
Every tombstone is associated with a monotonic timestamp.
It is local in the sense that it is computed locally to the given node, and never shared with other servers.

All KV with a timestamp older than a given `marked_for_deletion_grace_period` will be deleted upon delete operations. (Note for a given KV, GC can happen at different
times on different nodes.)

This yields the following problem. If a node was disconnected for more than
`marked_for_deletion_grace_period`, they could have missed the deletion of a KV and never be aware of it.

To address this problem, nodes that are too outdated have to reset their state.
To address this problem, nodes keep a record of the version of the last KV they
have GCed. Here is how it works:

Let's assume a Node A sends a Syn message to a Node B. The digest expresses that A want for updates about Node N with a version stricly greater than `V`.
Node B will compare the version `V` of the digest with its `max_gc_version` for the node N.

More accurately, let's assume a Node A sends a Syn message to Node B with a digest with an outdated version V for a node N.
Node B will compare the version of the digest with its own version.
If `V > max_gc_version`, Node B knows that no GC has impacted Key values with a version above V. It can safely emit a normal delta to A.

If V is fresher than `own version - marked_for_deletion_grace_period`,
Node B knows that no GC has impacted Key values with a version above V. It can
safely emit a normal delta to A.
If however V is older than `own version - marked_for_deletion_grace_period`,
a GC could have been executed. Instead of sending a delta to Node A, Node B will
instruct A to reset its state.
If however V is older, a GC could have been executed. Instead of sending a delta to Node A, Node B will instruct A to reset its state.

Node A will then wipe-off whatever information it has about N, and will start syncing from a blank state.

Expand Down
2 changes: 1 addition & 1 deletion chitchat-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async fn main() -> anyhow::Result<()> {
dead_node_grace_period: Duration::from_secs(10),
..FailureDetectorConfig::default()
},
marked_for_deletion_grace_period: 60,
marked_for_deletion_grace_period: Duration::from_secs(60),
};
let chitchat_handler = spawn_chitchat(config, Vec::new(), &UdpTransport).await?;
let chitchat = chitchat_handler.chitchat();
Expand Down
12 changes: 5 additions & 7 deletions chitchat/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ pub struct ChitchatConfig {
pub failure_detector_config: FailureDetectorConfig,
// Marked for deletion grace period expressed as a number of hearbeats.
// Chitchat ensures a key marked for deletion is eventually deleted by three mechanisms:
// - Garbage collection: each heartbeat, marked for deletion keys with `tombstone +
// marked_for_deletion_grace_period > node.heartbeat` are deleted.
// - Garbage collection: each heartbeat, marked for deletion keys with `deletion now > instant
// + marked_for_deletion_grace_period` are deleted.
// - Compute delta: for a given node digest, if `node_digest.heartbeat +
// marked_for_deletion_grace_period < node_state.heartbeat` the node is flagged "to be reset"
// and the delta is populated with all keys and values.
// - Apply delta: for a node flagged "to be reset", Chitchat will remove the node state and
// populate a fresh new node state with the keys and values present in the delta.
pub marked_for_deletion_grace_period: usize,
pub marked_for_deletion_grace_period: Duration,
}

impl ChitchatConfig {
Expand All @@ -37,7 +37,7 @@ impl ChitchatConfig {
listen_addr,
seed_nodes: Vec::new(),
failure_detector_config: Default::default(),
marked_for_deletion_grace_period: 10_000,
marked_for_deletion_grace_period: Duration::from_secs(10_000),
}
}
}
Expand All @@ -54,9 +54,7 @@ impl Default for ChitchatConfig {
listen_addr,
seed_nodes: Vec::new(),
failure_detector_config: Default::default(),
// Each heartbeat increments the version, with one heartbeat each second
// 86400 ~ 24h.
marked_for_deletion_grace_period: 86400,
marked_for_deletion_grace_period: Duration::from_secs(3_600 * 2), // 2h
}
}
}
16 changes: 10 additions & 6 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::HashSet;

use tokio::time::Instant;

use crate::serialize::*;
use crate::{ChitchatId, Heartbeat, VersionedValue};

Expand Down Expand Up @@ -116,7 +118,8 @@ impl Deserializable for DeltaOp {
let key = String::deserialize(buf)?;
let value = String::deserialize(buf)?;
let version = u64::deserialize(buf)?;
let tombstone = Option::<u64>::deserialize(buf)?;
let deleted = bool::deserialize(buf)?;
let tombstone = if deleted { Some(Instant::now()) } else { None };
let versioned_value: VersionedValue = VersionedValue {
value,
version,
Expand Down Expand Up @@ -182,7 +185,7 @@ impl<'a> Serializable for DeltaOpRef<'a> {
key.serialize(buf);
versioned_value.value.serialize(buf);
versioned_value.version.serialize(buf);
versioned_value.tombstone.serialize(buf);
versioned_value.tombstone.is_some().serialize(buf);
}
Self::NodeToReset(chitchat_id) => {
buf.push(DeltaOpTag::NodeToReset.into());
Expand All @@ -204,7 +207,7 @@ impl<'a> Serializable for DeltaOpRef<'a> {
key.serialized_len()
+ versioned_value.value.serialized_len()
+ versioned_value.version.serialized_len()
+ versioned_value.tombstone.serialized_len()
+ 1
}
Self::NodeToReset(chitchat_id) => chitchat_id.serialized_len(),
}
Expand Down Expand Up @@ -267,13 +270,14 @@ impl Delta {
key: &str,
value: &str,
version: crate::Version,
tombstone: Option<u64>,
deleted: bool,
) {
let node_delta = self
.node_deltas
.iter_mut()
.find(|node_delta| &node_delta.chitchat_id == chitchat_id)
.unwrap();
let tombstone = if deleted { Some(Instant::now()) } else { None };
node_delta.key_values.push((
key.to_string(),
VersionedValue {
Expand Down Expand Up @@ -488,7 +492,7 @@ mod tests {
VersionedValue {
value: "".to_string(),
version: 2,
tombstone: Some(0),
tombstone: Some(Instant::now()),
},
));

Expand All @@ -515,7 +519,7 @@ mod tests {
tombstone: None,
},
));
test_aux_delta_writer(delta_writer, 99);
test_aux_delta_writer(delta_writer, 98);
}

#[test]
Expand Down
22 changes: 9 additions & 13 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ impl Chitchat {
&digest,
delta_mtu,
&scheduled_for_deletion,
self.config.marked_for_deletion_grace_period as u64,
);
Some(ChitchatMessage::SynAck {
digest: self_digest,
Expand All @@ -137,7 +136,6 @@ impl Chitchat {
&digest,
MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1,
&scheduled_for_deletion,
self.config.marked_for_deletion_grace_period as u64,
);
Some(ChitchatMessage::Ack { delta })
}
Expand All @@ -153,11 +151,8 @@ impl Chitchat {
}

fn gc_keys_marked_for_deletion(&mut self) {
let dead_nodes = self.dead_nodes().cloned().collect::<HashSet<_>>();
self.cluster_state.gc_keys_marked_for_deletion(
self.config.marked_for_deletion_grace_period as u64,
&dead_nodes,
);
self.cluster_state
.gc_keys_marked_for_deletion(self.config.marked_for_deletion_grace_period);
}

/// Reports heartbeats to the failure detector for nodes in the delta for which we received an
Expand Down Expand Up @@ -190,11 +185,12 @@ impl Chitchat {

let current_live_nodes = self
.live_nodes()
.map(|chitchat_id| {
let node_state = self
.node_state(chitchat_id)
.expect("Node state should exist.");
(chitchat_id.clone(), node_state.max_version())
.flat_map(|chitchat_id| {
if let Some(node_state) = self.node_state(chitchat_id) {
return Some((chitchat_id.clone(), node_state.max_version()));
}
warn!("node state for {chitchat_id:?} is absent");
None
})
.collect::<HashMap<_, _>>();

Expand Down Expand Up @@ -402,7 +398,7 @@ mod tests {
initial_interval: Duration::from_millis(100),
..Default::default()
},
marked_for_deletion_grace_period: 10_000,
marked_for_deletion_grace_period: Duration::from_secs(3_600),
};
let initial_kvs: Vec<(String, String)> = Vec::new();
spawn_chitchat(config, initial_kvs, transport)
Expand Down
12 changes: 6 additions & 6 deletions chitchat/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ mod tests {
// +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat).
delta.add_node(node.clone(), Heartbeat(0));
// +29 bytes.
delta.add_kv(&node, "key", "value", 0, Some(5));
delta.set_serialized_len(70);
delta.add_kv(&node, "key", "value", 0, true);
delta.set_serialized_len(62);

let syn_ack = ChitchatMessage::SynAck { digest, delta };
// 1 bytes (syn ack message) + 45 bytes (digest) + 69 bytes (delta).
test_serdeser_aux(&syn_ack, 116);
test_serdeser_aux(&syn_ack, 108);
}
}

Expand All @@ -188,10 +188,10 @@ mod tests {
// +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat).
delta.add_node(node.clone(), Heartbeat(0));
// +29 bytes.
delta.add_kv(&node, "key", "value", 0, Some(5));
delta.set_serialized_len(70);
delta.add_kv(&node, "key", "value", 0, true);
delta.set_serialized_len(62);
let ack = ChitchatMessage::Ack { delta };
test_serdeser_aux(&ack, 71);
test_serdeser_aux(&ack, 63);
}
}

Expand Down
33 changes: 0 additions & 33 deletions chitchat/src/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,33 +93,6 @@ impl Deserializable for u64 {
}
}

impl Serializable for Option<u64> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.is_some().serialize(buf);
if let Some(tombstone) = &self {
tombstone.serialize(buf);
}
}
fn serialized_len(&self) -> usize {
if self.is_some() {
9
} else {
1
}
}
}

impl Deserializable for Option<u64> {
fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
let is_some: bool = Deserializable::deserialize(buf)?;
if is_some {
let u64_value = Deserializable::deserialize(buf)?;
return Ok(Some(u64_value));
}
Ok(None)
}
}

impl Serializable for bool {
fn serialize(&self, buf: &mut Vec<u8>) {
buf.push(*self as u8);
Expand Down Expand Up @@ -525,12 +498,6 @@ mod tests {
test_serdeser_aux(&ipv6, 17);
}

#[test]
fn test_serialize_option_u64() {
test_serdeser_aux(&Some(1), 9);
test_serdeser_aux(&None, 1);
}

#[test]
fn test_serialize_block_type() {
let mut valid_vals_count = 0;
Expand Down
Loading
Loading