Skip to content

Commit

Permalink
Added a way to set a TTL to key values.
Browse files Browse the repository at this point in the history
After the GC grace period, the key value will be subject to the GC
just like regularly deleted KVs.
  • Loading branch information
fulmicoton committed Mar 11, 2024
1 parent 78f8aff commit 2b72c1d
Show file tree
Hide file tree
Showing 6 changed files with 402 additions and 122 deletions.
3 changes: 1 addition & 2 deletions chitchat-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ impl Api {
let mut chitchat_guard = self.chitchat.lock().await;

let cc_state = chitchat_guard.self_node_state();
cc_state.mark_for_deletion(key.as_str());

cc_state.delete(key.as_str());
Json(serde_json::to_value(&SetKeyValueResponse { status: true }).unwrap())
}
}
Expand Down
45 changes: 25 additions & 20 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashSet;

use crate::serialize::*;
use crate::types::{KeyValueMutation, KeyValueMutationRef};
use crate::types::{DeletionStatusMutation, KeyValueMutation, KeyValueMutationRef};
use crate::{ChitchatId, Version, VersionedValue};

/// A delta is the message we send to another node to update it.
Expand Down Expand Up @@ -117,12 +117,12 @@ impl Deserializable for DeltaOp {
let key = String::deserialize(buf)?;
let value = String::deserialize(buf)?;
let version = u64::deserialize(buf)?;
let deleted = bool::deserialize(buf)?;
let deleted = DeletionStatusMutation::deserialize(buf)?;
Ok(DeltaOp::KeyValue(KeyValueMutation {
key,
value,
version,
tombstone: deleted,
status: deleted,
}))
}
DeltaOpTag::SetMaxVersion => {
Expand Down Expand Up @@ -280,7 +280,11 @@ impl Delta {
key: key.to_string(),
value: value.to_string(),
version,
tombstone: deleted,
status: if deleted {
DeletionStatusMutation::Delete
} else {
DeletionStatusMutation::Set
},
});
}

Expand Down Expand Up @@ -440,7 +444,7 @@ impl DeltaSerializer {
key: key.to_string(),
value: versioned_value.value,
version: versioned_value.version,
tombstone: versioned_value.tombstone.is_some(),
status: versioned_value.status.into(),
};
let key_value_op = DeltaOp::KeyValue(key_value_mutation);
self.try_add_op(key_value_op)
Expand Down Expand Up @@ -472,6 +476,7 @@ mod tests {
use tokio::time::Instant;

use super::*;
use crate::types::DeletionStatus;

#[test]
fn test_delta_serialization_default() {
Expand Down Expand Up @@ -512,7 +517,7 @@ mod tests {
VersionedValue {
value: "val11".to_string(),
version: 1,
tombstone: None,
status: DeletionStatus::Set,
},
));
// +26 bytes: 2 bytes (key length) + 5 bytes (key) + 8 bytes (version) +
Expand All @@ -522,7 +527,7 @@ mod tests {
VersionedValue {
value: "".to_string(),
version: 2,
tombstone: Some(Instant::now()),
status: DeletionStatus::Deleted(Instant::now()),
},
));

Expand All @@ -536,7 +541,7 @@ mod tests {
VersionedValue {
value: "val21".to_string(),
version: 2,
tombstone: None,
status: DeletionStatus::Set,
},
));
// +23 bytes.
Expand All @@ -545,7 +550,7 @@ mod tests {
VersionedValue {
value: "val22".to_string(),
version: 3,
tombstone: None,
status: DeletionStatus::Set,
},
));
test_aux_delta_writer(delta_writer, 98);
Expand All @@ -567,7 +572,7 @@ mod tests {
VersionedValue {
value: "val11".to_string(),
version: 1,
tombstone: None,
status: DeletionStatus::Set,
}
));

Expand All @@ -577,7 +582,7 @@ mod tests {
VersionedValue {
value: "val12".to_string(),
version: 2,
tombstone: None,
status: DeletionStatus::Set,
}
));

Expand Down Expand Up @@ -614,7 +619,7 @@ mod tests {
VersionedValue {
value: "val11".to_string(),
version: 1,
tombstone: None,
status: DeletionStatus::Set,
}
));
// +23 bytes (kv) + 1 (op tag)
Expand All @@ -624,7 +629,7 @@ mod tests {
VersionedValue {
value: "val12".to_string(),
version: 2,
tombstone: None,
status: DeletionStatus::Set,
}
));

Expand All @@ -651,7 +656,7 @@ mod tests {
VersionedValue {
value: "val11".to_string(),
version: 1,
tombstone: None,
status: DeletionStatus::Set,
}
));
// +23 bytes.
Expand All @@ -660,7 +665,7 @@ mod tests {
VersionedValue {
value: "val12".to_string(),
version: 2,
tombstone: None,
status: DeletionStatus::Set,
}
));

Expand Down Expand Up @@ -690,7 +695,7 @@ mod tests {
VersionedValue {
value: "val11".to_string(),
version: 1,
tombstone: None,
status: DeletionStatus::Set,
}
));

Expand All @@ -701,7 +706,7 @@ mod tests {
VersionedValue {
value: "val12aaaaaaaaaabcc".to_string(),
version: 2,
tombstone: None,
status: DeletionStatus::Set,
}
));
test_aux_delta_writer(delta_writer, 72);
Expand All @@ -720,23 +725,23 @@ mod tests {
VersionedValue {
value: "val11".to_string(),
version: 1,
tombstone: None,
status: DeletionStatus::Set,
}
));
assert!(!delta_writer.try_add_kv(
"key12",
VersionedValue {
value: "val12".to_string(),
version: 2,
tombstone: None,
status: DeletionStatus::Set,
}
));
delta_writer.try_add_kv(
"key13",
VersionedValue {
value: "val12".to_string(),
version: 2,
tombstone: None,
status: DeletionStatus::Set,
},
);
}
Expand Down
10 changes: 5 additions & 5 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::digest::Digest;
pub use crate::message::ChitchatMessage;
pub use crate::server::{spawn_chitchat, ChitchatHandle};
use crate::state::ClusterState;
pub use crate::types::{ChitchatId, Heartbeat, Version, VersionedValue};
pub use crate::types::{ChitchatId, DeletionStatus, Heartbeat, Version, VersionedValue};

/// Maximum UDP datagram payload size (in bytes).
///
Expand Down Expand Up @@ -608,7 +608,7 @@ mod tests {

assert_nodes_sync(&[&node1, &node2]);

node1.self_node_state().mark_for_deletion("k1");
node1.self_node_state().delete("k1");

// Advance time before triggering the GC of that deleted key
tokio::time::advance(Duration::from_secs(3_600 * 3)).await;
Expand Down Expand Up @@ -718,7 +718,7 @@ mod tests {
.lock()
.await
.self_node_state()
.mark_for_deletion("READY");
.delete("READY");

let live_members = loop {
let live_nodes = live_nodes_stream.next().await.unwrap();
Expand Down Expand Up @@ -1085,8 +1085,8 @@ mod tests {
node1.self_node_state().set("self1:suffix1", "updated");
assert_eq!(counter_self_key.load(Ordering::SeqCst), 2);

node1.self_node_state().mark_for_deletion("self1:suffix1");
node2.self_node_state().mark_for_deletion("other:suffix");
node1.self_node_state().delete("self1:suffix1");
node2.self_node_state().delete("other:suffix");

run_chitchat_handshake(&mut node1, &mut node2);

Expand Down
Loading

0 comments on commit 2b72c1d

Please sign in to comment.