Skip to content

Commit

Permalink
Bugfix: validation of delta before applying them.
Browse files Browse the repository at this point in the history
Due to the nature UDP, the existence of resets and the fact that we are
gossipping to several nodes at the same time, it is possible for our
obsolete deltas to arrive.

This PR adds some validation to detect if the delta is valid, and
whether it will bring us to a better state or not.

It also removes the nodes to reset information, which was actually
taking a large amount of the MTU on large clusters.
(For 20 nodes, around 1KB)

Reset is now just expressed by sending the delta with `from_version = 0`.

Closes #129
  • Loading branch information
fulmicoton committed Feb 26, 2024
1 parent 69eb755 commit 0a91c4e
Show file tree
Hide file tree
Showing 3 changed files with 297 additions and 102 deletions.
44 changes: 22 additions & 22 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl Delta {
std::iter::once(DeltaOpRef::Node {
chitchat_id: &node_delta.chitchat_id,
last_gc_version: node_delta.last_gc_version,
from_version: node_delta.from_version,
from_version_excluded: node_delta.from_version_excluded,
})
.chain(node_delta.key_values.iter().map(|(key, versioned_value)| {
DeltaOpRef::KeyValue {
Expand All @@ -46,7 +46,7 @@ enum DeltaOp {
Node {
chitchat_id: ChitchatId,
last_gc_version: Version,
from_version: u64,
from_version_excluded: u64,
},
KeyValue {
key: String,
Expand All @@ -58,7 +58,7 @@ enum DeltaOpRef<'a> {
Node {
chitchat_id: &'a ChitchatId,
last_gc_version: Version,
from_version: u64,
from_version_excluded: u64,
},
KeyValue {
key: &'a str,
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Deserializable for DeltaOp {
Ok(DeltaOp::Node {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded: from_version,
})
}
DeltaOpTag::KeyValue => {
Expand Down Expand Up @@ -133,11 +133,11 @@ impl DeltaOp {
DeltaOp::Node {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded: from_version,
} => DeltaOpRef::Node {
chitchat_id,
last_gc_version: *last_gc_version,
from_version: *from_version,
from_version_excluded: *from_version,
},
DeltaOp::KeyValue {
key,
Expand Down Expand Up @@ -166,7 +166,7 @@ impl<'a> Serializable for DeltaOpRef<'a> {
Self::Node {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded: from_version,
} => {
buf.push(DeltaOpTag::Node.into());
chitchat_id.serialize(buf);
Expand All @@ -191,7 +191,7 @@ impl<'a> Serializable for DeltaOpRef<'a> {
Self::Node {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded: from_version,
} => {
chitchat_id.serialized_len()
+ last_gc_version.serialized_len()
Expand Down Expand Up @@ -261,7 +261,7 @@ impl Delta {
self.node_deltas.push(NodeDelta {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded: from_version,
key_values: Vec::new(),
});
}
Expand Down Expand Up @@ -319,7 +319,7 @@ pub(crate) struct NodeDelta {
//
// Inspect the code in `prepare_apply_delta(..)` to see the rules on how `from_version`
// and `last_gc_version` are used.
pub from_version: Version,
pub from_version_excluded: Version,
pub last_gc_version: Version,
pub key_values: Vec<(String, VersionedValue)>,
}
Expand Down Expand Up @@ -350,15 +350,15 @@ impl DeltaBuilder {
DeltaOp::Node {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded,
} => {
self.flush();
anyhow::ensure!(!self.existing_nodes.contains(&chitchat_id));
self.existing_nodes.insert(chitchat_id.clone());
self.current_node_delta = Some(NodeDelta {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded,
key_values: Vec::new(),
});
}
Expand Down Expand Up @@ -452,7 +452,7 @@ impl DeltaSerializer {
let new_node_op = DeltaOp::Node {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded: from_version,
};
self.try_add_op(new_node_op)
}
Expand Down Expand Up @@ -531,7 +531,7 @@ mod tests {
#[test]
fn test_delta_serialization_simple_node() {
// 1 bytes (End tag)
let mut delta_writer = DeltaSerializer::with_mtu(128);
let mut delta_writer = DeltaSerializer::with_mtu(140);

// ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len().
let node1 = ChitchatId::for_local_test(10_001);
Expand Down Expand Up @@ -608,9 +608,9 @@ mod tests {
let node2 = ChitchatId::for_local_test(10_002);
// +8 bytes (last gc version) + 27 bytes (ChitchatId) + 1 byte (op tag)
// = 155
assert!(delta_writer.try_add_node(node2, 0u64, false));
assert!(delta_writer.try_add_node(node2, 0u64, 0));
// The block got compressed.
test_aux_delta_writer(delta_writer, 85);
test_aux_delta_writer(delta_writer, 80);
}

#[test]
Expand All @@ -620,7 +620,7 @@ mod tests {

let node1 = ChitchatId::for_local_test(10_001);
// +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId).
assert!(delta_writer.try_add_node(node1, 0, false));
assert!(delta_writer.try_add_node(node1, 0, 0));

// +23 bytes.
assert!(delta_writer.try_add_kv(
Expand All @@ -643,7 +643,7 @@ mod tests {

let node2 = ChitchatId::for_local_test(10_002);
// +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId).
assert!(!delta_writer.try_add_node(node2, 0u64, false));
assert!(!delta_writer.try_add_node(node2, 0u64, 1u64));

// The block got compressed.
test_aux_delta_writer(delta_writer, 72);
Expand All @@ -658,7 +658,7 @@ mod tests {

// + 3 bytes (block tag) + 35 bytes (node) + 1 byte (op tag)
// = 40
assert!(delta_writer.try_add_node(node1, 0u64, false));
assert!(delta_writer.try_add_node(node1, 0u64, 1u64));

// +23 bytes (kv) + 1 (op tag) + 3 bytes (pessimistic block tag)
// = 67
Expand All @@ -681,7 +681,7 @@ mod tests {
tombstone: None,
}
));
test_aux_delta_writer(delta_writer, 64);
test_aux_delta_writer(delta_writer, 72);
}

#[test]
Expand All @@ -690,7 +690,7 @@ mod tests {
let mut delta_writer = DeltaSerializer::with_mtu(62);

let node1 = ChitchatId::for_local_test(10_001);
assert!(delta_writer.try_add_node(node1, 0u64, false));
assert!(delta_writer.try_add_node(node1, 0u64, 1u64));

assert!(delta_writer.try_add_kv(
"key11",
Expand Down Expand Up @@ -728,6 +728,6 @@ mod tests {
num_valid_tags += 1;
}
}
assert_eq!(num_valid_tags, 3);
assert_eq!(num_valid_tags, 2);
}
}
15 changes: 8 additions & 7 deletions chitchat/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,16 @@ mod tests {
// 4 bytes
let mut delta = Delta::default();
let node = ChitchatId::for_local_test(10_001);
// +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat).
delta.add_node(node.clone(), 0u64, false);
// +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes
// (last_gc_version) + 8 bytes (from_version).
delta.add_node(node.clone(), 0u64, 0u64);
// +29 bytes.
delta.add_kv(&node, "key", "value", 0, true);
delta.set_serialized_len(62);
delta.set_serialized_len(60);

let syn_ack = ChitchatMessage::SynAck { digest, delta };
// 1 bytes (syn ack message) + 45 bytes (digest) + 69 bytes (delta).
test_serdeser_aux(&syn_ack, 108);
test_serdeser_aux(&syn_ack, 106);
}
}

Expand All @@ -186,12 +187,12 @@ mod tests {
let mut delta = Delta::default();
let node = ChitchatId::for_local_test(10_001);
// +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat).
delta.add_node(node.clone(), 0u64, false);
delta.add_node(node.clone(), 0u64, 0u64);
// +29 bytes.
delta.add_kv(&node, "key", "value", 0, true);
delta.set_serialized_len(62);
delta.set_serialized_len(60);
let ack = ChitchatMessage::Ack { delta };
test_serdeser_aux(&ack, 63);
test_serdeser_aux(&ack, 61);
}
}

Expand Down
Loading

0 comments on commit 0a91c4e

Please sign in to comment.