Skip to content

Commit

Permalink
Key deletion logic bugfix
Browse files Browse the repository at this point in the history
Closes #121
  • Loading branch information
fulmicoton committed Feb 19, 2024
1 parent 173f391 commit 716cf1d
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 111 deletions.
2 changes: 1 addition & 1 deletion chitchat-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async fn main() -> anyhow::Result<()> {
dead_node_grace_period: Duration::from_secs(10),
..FailureDetectorConfig::default()
},
marked_for_deletion_grace_period: 60,
marked_for_deletion_grace_period: Duration::from_secs(60),
};
let chitchat_handler = spawn_chitchat(config, Vec::new(), &UdpTransport).await?;
let chitchat = chitchat_handler.chitchat();
Expand Down
11 changes: 6 additions & 5 deletions chitchat/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ pub struct ChitchatConfig {
pub failure_detector_config: FailureDetectorConfig,
// Marked for deletion grace period expressed as a number of hearbeats.
// Chitchat ensures a key marked for deletion is eventually deleted by three mechanisms:
// - Garbage collection: each heartbeat, marked for deletion keys with `tombstone +
// marked_for_deletion_grace_period > node.heartbeat` are deleted.
// - Garbage collection: each heartbeat, marked for deletion keys with `deletion now > instant +
// marked_for_deletion_grace_period` are deleted.
// - Compute delta: for a given node digest, if `node_digest.heartbeat +
// marked_for_deletion_grace_period < node_state.heartbeat` the node is flagged "to be reset"
// and the delta is populated with all keys and values.
// - Apply delta: for a node flagged "to be reset", Chitchat will remove the node state and
// populate a fresh new node state with the keys and values present in the delta.
pub marked_for_deletion_grace_period: usize,
pub marked_for_deletion_grace_period: Duration,
}

impl ChitchatConfig {
Expand All @@ -37,7 +37,7 @@ impl ChitchatConfig {
listen_addr,
seed_nodes: Vec::new(),
failure_detector_config: Default::default(),
marked_for_deletion_grace_period: 10_000,
marked_for_deletion_grace_period: Duration::from_secs(10_000),
}
}
}
Expand All @@ -56,7 +56,8 @@ impl Default for ChitchatConfig {
failure_detector_config: Default::default(),
// Each heartbeat increments the version, with one heartbeat each second
// 86400 ~ 24h.
marked_for_deletion_grace_period: 86400,
// TODO set that to something much lower.
marked_for_deletion_grace_period: Duration::from_secs(86_400),
}
}
}
24 changes: 18 additions & 6 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::HashSet;

use tokio::time::Instant;

use crate::serialize::*;
use crate::{ChitchatId, Heartbeat, VersionedValue};

Expand Down Expand Up @@ -116,7 +118,12 @@ impl Deserializable for DeltaOp {
let key = String::deserialize(buf)?;
let value = String::deserialize(buf)?;
let version = u64::deserialize(buf)?;
let tombstone = Option::<u64>::deserialize(buf)?;
let deleted = bool::deserialize(buf)?;
let tombstone = if deleted {
Some(Instant::now())
} else {
None
};
let versioned_value: VersionedValue = VersionedValue {
value,
version,
Expand Down Expand Up @@ -182,7 +189,7 @@ impl<'a> Serializable for DeltaOpRef<'a> {
key.serialize(buf);
versioned_value.value.serialize(buf);
versioned_value.version.serialize(buf);
versioned_value.tombstone.serialize(buf);
versioned_value.tombstone.is_some().serialize(buf);
}
Self::NodeToReset(chitchat_id) => {
buf.push(DeltaOpTag::NodeToReset.into());
Expand All @@ -204,7 +211,7 @@ impl<'a> Serializable for DeltaOpRef<'a> {
key.serialized_len()
+ versioned_value.value.serialized_len()
+ versioned_value.version.serialized_len()
+ versioned_value.tombstone.serialized_len()
+ 1
}
Self::NodeToReset(chitchat_id) => chitchat_id.serialized_len(),
}
Expand Down Expand Up @@ -267,13 +274,18 @@ impl Delta {
key: &str,
value: &str,
version: crate::Version,
tombstone: Option<u64>,
deleted: bool,
) {
let node_delta = self
.node_deltas
.iter_mut()
.find(|node_delta| &node_delta.chitchat_id == chitchat_id)
.unwrap();
let tombstone = if deleted {
Some(Instant::now())
} else {
None
};
node_delta.key_values.push((
key.to_string(),
VersionedValue {
Expand Down Expand Up @@ -488,7 +500,7 @@ mod tests {
VersionedValue {
value: "".to_string(),
version: 2,
tombstone: Some(0),
tombstone: Some(Instant::now()),
},
));

Expand All @@ -515,7 +527,7 @@ mod tests {
tombstone: None,
},
));
test_aux_delta_writer(delta_writer, 99);
test_aux_delta_writer(delta_writer, 98);
}

#[test]
Expand Down
6 changes: 2 additions & 4 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ impl Chitchat {
&digest,
delta_mtu,
&scheduled_for_deletion,
self.config.marked_for_deletion_grace_period as u64,
);
Some(ChitchatMessage::SynAck {
digest: self_digest,
Expand All @@ -137,7 +136,6 @@ impl Chitchat {
&digest,
MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1,
&scheduled_for_deletion,
self.config.marked_for_deletion_grace_period as u64,
);
Some(ChitchatMessage::Ack { delta })
}
Expand All @@ -155,7 +153,7 @@ impl Chitchat {
fn gc_keys_marked_for_deletion(&mut self) {
let dead_nodes = self.dead_nodes().cloned().collect::<HashSet<_>>();
self.cluster_state.gc_keys_marked_for_deletion(
self.config.marked_for_deletion_grace_period as u64,
self.config.marked_for_deletion_grace_period,
&dead_nodes,
);
}
Expand Down Expand Up @@ -402,7 +400,7 @@ mod tests {
initial_interval: Duration::from_millis(100),
..Default::default()
},
marked_for_deletion_grace_period: 10_000,
marked_for_deletion_grace_period: Duration::from_secs(3_600),
};
let initial_kvs: Vec<(String, String)> = Vec::new();
spawn_chitchat(config, initial_kvs, transport)
Expand Down
12 changes: 6 additions & 6 deletions chitchat/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ mod tests {
// +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat).
delta.add_node(node.clone(), Heartbeat(0));
// +29 bytes.
delta.add_kv(&node, "key", "value", 0, Some(5));
delta.set_serialized_len(70);
delta.add_kv(&node, "key", "value", 0, true);
delta.set_serialized_len(62);

let syn_ack = ChitchatMessage::SynAck { digest, delta };
// 1 bytes (syn ack message) + 45 bytes (digest) + 69 bytes (delta).
test_serdeser_aux(&syn_ack, 116);
test_serdeser_aux(&syn_ack, 108);
}
}

Expand All @@ -188,10 +188,10 @@ mod tests {
// +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat).
delta.add_node(node.clone(), Heartbeat(0));
// +29 bytes.
delta.add_kv(&node, "key", "value", 0, Some(5));
delta.set_serialized_len(70);
delta.add_kv(&node, "key", "value", 0, true);
delta.set_serialized_len(62);
let ack = ChitchatMessage::Ack { delta };
test_serdeser_aux(&ack, 71);
test_serdeser_aux(&ack, 63);
}
}

Expand Down
Loading

0 comments on commit 716cf1d

Please sign in to comment.