Skip to content

Commit

Permalink
Merge pull request #108 from quickwit-oss/trinity--forget-previous-ge…
Browse files Browse the repository at this point in the history
…neration2

forget previous generation
  • Loading branch information
trinity-1686a authored Jan 25, 2024
2 parents c3ac187 + b54089c commit 5d7ff34
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 328 deletions.
13 changes: 6 additions & 7 deletions chitchat-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@ impl Api {
cluster_id: chitchat_guard.cluster_id().to_string(),
cluster_state: chitchat_guard.state_snapshot(),
live_nodes: chitchat_guard.live_nodes().cloned().collect::<Vec<_>>(),
dead_nodes: chitchat_guard
.dead_nodes()
.cloned()
.map(|node| node.0)
.collect::<Vec<_>>(),
dead_nodes: chitchat_guard.dead_nodes().cloned().collect::<Vec<_>>(),
};
Json(serde_json::to_value(&response).unwrap())
}
Expand Down Expand Up @@ -99,8 +95,11 @@ async fn main() -> anyhow::Result<()> {
gossip_interval: Duration::from_millis(opt.interval),
listen_addr: opt.listen_addr,
seed_nodes: opt.seeds.clone(),
failure_detector_config: FailureDetectorConfig::default(),
marked_for_deletion_grace_period: 10_000,
failure_detector_config: FailureDetectorConfig {
dead_node_grace_period: Duration::from_secs(10),
..FailureDetectorConfig::default()
},
marked_for_deletion_grace_period: 60,
};
let chitchat_handler = spawn_chitchat(config, Vec::new(), &UdpTransport).await?;
let chitchat = chitchat_handler.chitchat();
Expand Down
49 changes: 18 additions & 31 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,40 @@ use std::collections::{BTreeMap, HashSet};
use std::mem;

use crate::serialize::*;
use crate::{ChitchatId, ChitchatIdGenerationEq, Heartbeat, MaxVersion, VersionedValue};
use crate::{ChitchatId, Heartbeat, MaxVersion, VersionedValue};

#[derive(Debug, Default, Eq, PartialEq)]
pub struct Delta {
pub(crate) node_deltas: BTreeMap<ChitchatIdGenerationEq, NodeDelta>,
pub(crate) nodes_to_reset: HashSet<ChitchatIdGenerationEq>,
pub(crate) node_deltas: BTreeMap<ChitchatId, NodeDelta>,
pub(crate) nodes_to_reset: HashSet<ChitchatId>,
}

impl Serializable for Delta {
fn serialize(&self, buf: &mut Vec<u8>) {
(self.node_deltas.len() as u16).serialize(buf);
for (chitchat_id, node_delta) in &self.node_deltas {
chitchat_id.0.serialize(buf);
chitchat_id.serialize(buf);
node_delta.serialize(buf);
}
(self.nodes_to_reset.len() as u16).serialize(buf);
for chitchat_id in &self.nodes_to_reset {
chitchat_id.0.serialize(buf);
chitchat_id.serialize(buf);
}
}

fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
let mut node_deltas: BTreeMap<ChitchatIdGenerationEq, NodeDelta> = Default::default();
let mut node_deltas: BTreeMap<ChitchatId, NodeDelta> = Default::default();
let num_nodes = u16::deserialize(buf)?;
for _ in 0..num_nodes {
let chitchat_id = ChitchatId::deserialize(buf)?;
let node_delta = NodeDelta::deserialize(buf)?;
node_deltas.insert(ChitchatIdGenerationEq(chitchat_id), node_delta);
node_deltas.insert(chitchat_id, node_delta);
}
let num_nodes_to_reset = u16::deserialize(buf)?;
let mut nodes_to_reset = HashSet::with_capacity(num_nodes_to_reset as usize);
for _ in 0..num_nodes_to_reset {
let chitchat_id = ChitchatId::deserialize(buf)?;
nodes_to_reset.insert(ChitchatIdGenerationEq(chitchat_id));
nodes_to_reset.insert(chitchat_id);
}
Ok(Delta {
node_deltas,
Expand All @@ -46,12 +46,12 @@ impl Serializable for Delta {
fn serialized_len(&self) -> usize {
let mut len = 2;
for (chitchat_id, node_delta) in &self.node_deltas {
len += chitchat_id.0.serialized_len();
len += chitchat_id.serialized_len();
len += node_delta.serialized_len();
}
len += 2;
for chitchat_id in &self.nodes_to_reset {
len += chitchat_id.0.serialized_len();
len += chitchat_id.serialized_len();
}
len
}
Expand All @@ -68,7 +68,7 @@ impl Delta {

pub fn add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) {
self.node_deltas
.entry(ChitchatIdGenerationEq(chitchat_id))
.entry(chitchat_id)
.or_insert_with(|| NodeDelta {
heartbeat,
..Default::default()
Expand All @@ -83,10 +83,7 @@ impl Delta {
version: crate::Version,
tombstone: Option<u64>,
) {
let node_delta = self
.node_deltas
.get_mut(&ChitchatIdGenerationEq(chitchat_id.clone()))
.unwrap();
let node_delta = self.node_deltas.get_mut(chitchat_id).unwrap();

node_delta.max_version = node_delta.max_version.max(version);
node_delta.key_values.insert(
Expand All @@ -100,8 +97,7 @@ impl Delta {
}

pub fn add_node_to_reset(&mut self, chitchat_id: ChitchatId) {
self.nodes_to_reset
.insert(ChitchatIdGenerationEq(chitchat_id));
self.nodes_to_reset.insert(chitchat_id);
}
}

Expand Down Expand Up @@ -145,38 +141,29 @@ impl DeltaWriter {
let chitchat_id_opt = mem::take(&mut self.current_chitchat_id);
let node_delta = mem::take(&mut self.current_node_delta);
if let Some(chitchat_id) = chitchat_id_opt {
self.delta
.node_deltas
.insert(ChitchatIdGenerationEq(chitchat_id), node_delta);
self.delta.node_deltas.insert(chitchat_id, node_delta);
}
}

pub fn add_node_to_reset(&mut self, chitchat_id: ChitchatId) -> bool {
let chitchat_id = ChitchatIdGenerationEq(chitchat_id);
assert!(!self.delta.nodes_to_reset.contains(&chitchat_id));
if !self.attempt_add_bytes(chitchat_id.0.serialized_len()) {
if !self.attempt_add_bytes(chitchat_id.serialized_len()) {
return false;
}
self.delta.nodes_to_reset.insert(chitchat_id);
true
}

pub fn add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) -> bool {
assert!(self
.current_chitchat_id
.as_ref()
.map(|current_node| !current_node.eq_generation(&chitchat_id))
.unwrap_or(true));
let chitchat_id = ChitchatIdGenerationEq(chitchat_id);
assert!(self.current_chitchat_id.as_ref() != Some(&chitchat_id));
assert!(!self.delta.node_deltas.contains_key(&chitchat_id));
self.flush();
// Reserve bytes for [`ChitchatId`], [`Hearbeat`], and for an empty [`NodeDelta`] which has
// a size of 2 bytes.
if !self.attempt_add_bytes(chitchat_id.0.serialized_len() + heartbeat.serialized_len() + 2)
{
if !self.attempt_add_bytes(chitchat_id.serialized_len() + heartbeat.serialized_len() + 2) {
return false;
}
self.current_chitchat_id = Some(chitchat_id.0);
self.current_chitchat_id = Some(chitchat_id);
self.current_node_delta.heartbeat = heartbeat;
true
}
Expand Down
15 changes: 7 additions & 8 deletions chitchat/src/digest.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;

use crate::serialize::*;
use crate::{ChitchatId, ChitchatIdGenerationEq, Heartbeat, MaxVersion};
use crate::{ChitchatId, Heartbeat, MaxVersion};

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub(crate) struct NodeDigest {
Expand All @@ -25,46 +25,45 @@ impl NodeDigest {
/// peer -> (heartbeat, max version).
#[derive(Debug, Default, Eq, PartialEq)]
pub struct Digest {
pub(crate) node_digests: BTreeMap<ChitchatIdGenerationEq, NodeDigest>,
pub(crate) node_digests: BTreeMap<ChitchatId, NodeDigest>,
}

#[cfg(test)]
impl Digest {
pub fn add_node(&mut self, node: ChitchatId, heartbeat: Heartbeat, max_version: MaxVersion) {
let node_digest = NodeDigest::new(heartbeat, max_version);
self.node_digests
.insert(ChitchatIdGenerationEq(node), node_digest);
self.node_digests.insert(node, node_digest);
}
}

impl Serializable for Digest {
fn serialize(&self, buf: &mut Vec<u8>) {
(self.node_digests.len() as u16).serialize(buf);
for (chitchat_id, node_digest) in &self.node_digests {
chitchat_id.0.serialize(buf);
chitchat_id.serialize(buf);
node_digest.heartbeat.serialize(buf);
node_digest.max_version.serialize(buf);
}
}

fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
let num_nodes = u16::deserialize(buf)?;
let mut node_digests: BTreeMap<ChitchatIdGenerationEq, NodeDigest> = Default::default();
let mut node_digests: BTreeMap<ChitchatId, NodeDigest> = Default::default();

for _ in 0..num_nodes {
let chitchat_id = ChitchatId::deserialize(buf)?;
let heartbeat = Heartbeat::deserialize(buf)?;
let max_version = u64::deserialize(buf)?;
let node_digest = NodeDigest::new(heartbeat, max_version);
node_digests.insert(ChitchatIdGenerationEq(chitchat_id), node_digest);
node_digests.insert(chitchat_id, node_digest);
}
Ok(Digest { node_digests })
}

fn serialized_len(&self) -> usize {
let mut len = (self.node_digests.len() as u16).serialized_len();
for (chitchat_id, node_digest) in &self.node_digests {
len += chitchat_id.0.serialized_len();
len += chitchat_id.serialized_len();
len += node_digest.heartbeat.serialized_len();
len += node_digest.max_version.serialized_len();
}
Expand Down
Loading

0 comments on commit 5d7ff34

Please sign in to comment.