Skip to content

Commit

Permalink
Bugfix: validation of delta before applying them.
Browse files Browse the repository at this point in the history
Due to the nature UDP, the existence of resets and the fact that we are
gossipping to several nodes at the same time, it is possible for our
obsolete deltas to arrive.

This PR adds some validation to detect if the delta is valid, and
whether it will bring us to a better state or not.

It also removes the nodes to reset information, which was actually
taking a large amount of the MTU on large clusters.
(For 20 nodes, around 1KB)

Reset is now just expressed by sending the delta with `from_version = 0`.

Closes #129
  • Loading branch information
fulmicoton committed Feb 27, 2024
1 parent cebb7ad commit b5ac6ed
Show file tree
Hide file tree
Showing 6 changed files with 402 additions and 164 deletions.
82 changes: 60 additions & 22 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,19 @@ impl Delta {
std::iter::once(DeltaOpRef::Node {
chitchat_id: &node_delta.chitchat_id,
last_gc_version: node_delta.last_gc_version,
from_version: node_delta.from_version,
from_version_excluded: node_delta.from_version_excluded,
})
.chain(node_delta.key_values.iter().map(|(key, versioned_value)| {
DeltaOpRef::KeyValue {
key,
versioned_value,
}
}))
.chain({
node_delta
.max_version
.map(|max_version| DeltaOpRef::SetMaxVersion { max_version })
})
})
}
}
Expand All @@ -46,30 +51,37 @@ enum DeltaOp {
Node {
chitchat_id: ChitchatId,
last_gc_version: Version,
from_version: u64,
from_version_excluded: u64,
},
KeyValue {
key: String,
versioned_value: VersionedValue,
},
SetMaxVersion {
max_version: Version,
},
}

enum DeltaOpRef<'a> {
Node {
chitchat_id: &'a ChitchatId,
last_gc_version: Version,
from_version: u64,
from_version_excluded: u64,
},
KeyValue {
key: &'a str,
versioned_value: &'a VersionedValue,
},
SetMaxVersion {
max_version: Version,
},
}

#[repr(u8)]
enum DeltaOpTag {
Node = 0u8,
KeyValue = 1u8,
SetMaxVersion = 2u8,
}

impl TryFrom<u8> for DeltaOpTag {
Expand Down Expand Up @@ -104,7 +116,7 @@ impl Deserializable for DeltaOp {
Ok(DeltaOp::Node {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded: from_version,
})
}
DeltaOpTag::KeyValue => {
Expand All @@ -123,6 +135,10 @@ impl Deserializable for DeltaOp {
versioned_value,
})
}
DeltaOpTag::SetMaxVersion => {
let max_version = Version::deserialize(buf)?;
Ok(DeltaOp::SetMaxVersion { max_version })
}
}
}
}
Expand All @@ -133,11 +149,11 @@ impl DeltaOp {
DeltaOp::Node {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded: from_version,
} => DeltaOpRef::Node {
chitchat_id,
last_gc_version: *last_gc_version,
from_version: *from_version,
from_version_excluded: *from_version,
},
DeltaOp::KeyValue {
key,
Expand All @@ -146,6 +162,9 @@ impl DeltaOp {
key,
versioned_value,
},
DeltaOp::SetMaxVersion { max_version } => DeltaOpRef::SetMaxVersion {
max_version: *max_version,
},
}
}
}
Expand All @@ -166,7 +185,7 @@ impl<'a> Serializable for DeltaOpRef<'a> {
Self::Node {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded: from_version,
} => {
buf.push(DeltaOpTag::Node.into());
chitchat_id.serialize(buf);
Expand All @@ -183,6 +202,10 @@ impl<'a> Serializable for DeltaOpRef<'a> {
versioned_value.version.serialize(buf);
versioned_value.is_tombstone().serialize(buf);
}
Self::SetMaxVersion { max_version } => {
buf.push(DeltaOpTag::SetMaxVersion.into());
max_version.serialize(buf);
}
}
}

Expand All @@ -191,7 +214,7 @@ impl<'a> Serializable for DeltaOpRef<'a> {
Self::Node {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded: from_version,
} => {
chitchat_id.serialized_len()
+ last_gc_version.serialized_len()
Expand All @@ -206,6 +229,7 @@ impl<'a> Serializable for DeltaOpRef<'a> {
+ versioned_value.version.serialized_len()
+ 1
}
Self::SetMaxVersion { max_version } => max_version.serialized_len(),
}
}
}
Expand Down Expand Up @@ -261,8 +285,9 @@ impl Delta {
self.node_deltas.push(NodeDelta {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded: from_version,
key_values: Vec::new(),
max_version: None,
});
}

Expand Down Expand Up @@ -319,9 +344,10 @@ pub(crate) struct NodeDelta {
//
// Inspect the code in `prepare_apply_delta(..)` to see the rules on how `from_version`
// and `last_gc_version` are used.
pub from_version: Version,
pub from_version_excluded: Version,
pub last_gc_version: Version,
pub key_values: Vec<(String, VersionedValue)>,
pub max_version: Option<Version>,
}

#[cfg(test)]
Expand Down Expand Up @@ -350,16 +376,17 @@ impl DeltaBuilder {
DeltaOp::Node {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded,
} => {
self.flush();
anyhow::ensure!(!self.existing_nodes.contains(&chitchat_id));
self.existing_nodes.insert(chitchat_id.clone());
self.current_node_delta = Some(NodeDelta {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded,
key_values: Vec::new(),
max_version: None,
});
}
DeltaOp::KeyValue {
Expand All @@ -381,6 +408,12 @@ impl DeltaBuilder {
.key_values
.push((key.to_string(), versioned_value));
}
DeltaOp::SetMaxVersion { max_version } => {
let Some(current_node_delta) = self.current_node_delta.as_mut() else {
anyhow::bail!("received a key-value op without a node op before.");
};
current_node_delta.max_version = Some(max_version);
}
}
Ok(())
}
Expand Down Expand Up @@ -420,6 +453,11 @@ impl DeltaSerializer {
}
}

pub fn try_set_max_version(&mut self, max_version: Version) -> bool {
let key_value_op = DeltaOp::SetMaxVersion { max_version };
self.try_add_op(key_value_op)
}

fn try_add_op(&mut self, delta_op: DeltaOp) -> bool {
if self
.compressed_stream_writer
Expand Down Expand Up @@ -452,7 +490,7 @@ impl DeltaSerializer {
let new_node_op = DeltaOp::Node {
chitchat_id,
last_gc_version,
from_version,
from_version_excluded: from_version,
};
self.try_add_op(new_node_op)
}
Expand Down Expand Up @@ -531,7 +569,7 @@ mod tests {
#[test]
fn test_delta_serialization_simple_node() {
// 1 bytes (End tag)
let mut delta_writer = DeltaSerializer::with_mtu(128);
let mut delta_writer = DeltaSerializer::with_mtu(140);

// ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len().
let node1 = ChitchatId::for_local_test(10_001);
Expand Down Expand Up @@ -608,9 +646,9 @@ mod tests {
let node2 = ChitchatId::for_local_test(10_002);
// +8 bytes (last gc version) + 27 bytes (ChitchatId) + 1 byte (op tag)
// = 155
assert!(delta_writer.try_add_node(node2, 0u64, false));
assert!(delta_writer.try_add_node(node2, 0u64, 0));
// The block got compressed.
test_aux_delta_writer(delta_writer, 85);
test_aux_delta_writer(delta_writer, 80);
}

#[test]
Expand All @@ -620,7 +658,7 @@ mod tests {

let node1 = ChitchatId::for_local_test(10_001);
// +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId).
assert!(delta_writer.try_add_node(node1, 0, false));
assert!(delta_writer.try_add_node(node1, 0, 0));

// +23 bytes.
assert!(delta_writer.try_add_kv(
Expand All @@ -643,7 +681,7 @@ mod tests {

let node2 = ChitchatId::for_local_test(10_002);
// +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId).
assert!(!delta_writer.try_add_node(node2, 0u64, false));
assert!(!delta_writer.try_add_node(node2, 0u64, 1u64));

// The block got compressed.
test_aux_delta_writer(delta_writer, 72);
Expand All @@ -658,7 +696,7 @@ mod tests {

// + 3 bytes (block tag) + 35 bytes (node) + 1 byte (op tag)
// = 40
assert!(delta_writer.try_add_node(node1, 0u64, false));
assert!(delta_writer.try_add_node(node1, 0u64, 1u64));

// +23 bytes (kv) + 1 (op tag) + 3 bytes (pessimistic block tag)
// = 67
Expand All @@ -681,7 +719,7 @@ mod tests {
tombstone: None,
}
));
test_aux_delta_writer(delta_writer, 64);
test_aux_delta_writer(delta_writer, 72);
}

#[test]
Expand All @@ -690,7 +728,7 @@ mod tests {
let mut delta_writer = DeltaSerializer::with_mtu(62);

let node1 = ChitchatId::for_local_test(10_001);
assert!(delta_writer.try_add_node(node1, 0u64, false));
assert!(delta_writer.try_add_node(node1, 0u64, 1u64));

assert!(delta_writer.try_add_kv(
"key11",
Expand Down Expand Up @@ -728,6 +766,6 @@ mod tests {
num_valid_tags += 1;
}
}
assert_eq!(num_valid_tags, 3);
assert_eq!(num_valid_tags, 2);
}
}
2 changes: 1 addition & 1 deletion chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl Chitchat {
/// Executes the catch-up callback if necessary.
fn maybe_trigger_catchup_callback(&self, delta: &Delta) {
let has_reset = delta.node_deltas.iter()
.any(|node_delta| node_delta.from_version == 0 && node_delta.last_gc_version > 0);
.any(|node_delta| node_delta.from_version_excluded == 0 && node_delta.last_gc_version > 0);
if has_reset {
if let Some(catchup_callback) = &self.config.catchup_callback {
info!("executing catch-up callback");
Expand Down
15 changes: 8 additions & 7 deletions chitchat/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,16 @@ mod tests {
// 4 bytes
let mut delta = Delta::default();
let node = ChitchatId::for_local_test(10_001);
// +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat).
delta.add_node(node.clone(), 0u64, false);
// +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes
// (last_gc_version) + 8 bytes (from_version).
delta.add_node(node.clone(), 0u64, 0u64);
// +29 bytes.
delta.add_kv(&node, "key", "value", 0, true);
delta.set_serialized_len(62);
delta.set_serialized_len(60);

let syn_ack = ChitchatMessage::SynAck { digest, delta };
// 1 bytes (syn ack message) + 45 bytes (digest) + 69 bytes (delta).
test_serdeser_aux(&syn_ack, 108);
test_serdeser_aux(&syn_ack, 106);
}
}

Expand All @@ -186,12 +187,12 @@ mod tests {
let mut delta = Delta::default();
let node = ChitchatId::for_local_test(10_001);
// +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat).
delta.add_node(node.clone(), 0u64, false);
delta.add_node(node.clone(), 0u64, 0u64);
// +29 bytes.
delta.add_kv(&node, "key", "value", 0, true);
delta.set_serialized_len(62);
delta.set_serialized_len(60);
let ack = ChitchatMessage::Ack { delta };
test_serdeser_aux(&ack, 63);
test_serdeser_aux(&ack, 61);
}
}

Expand Down
6 changes: 6 additions & 0 deletions chitchat/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,12 @@ mod tests {
let server_handle = spawn_chitchat(server_config, Vec::new(), &transport)
.await
.unwrap();
server_handle
.chitchat()
.lock()
.await
.self_node_state()
.set("key", "value");

// Add our test socket to the server's nodes.
server_handle
Expand Down
Loading

0 comments on commit b5ac6ed

Please sign in to comment.