Skip to content

Commit

Permalink
Added node id to the change event
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Nov 30, 2023
1 parent 72a994f commit edbfb71
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 61 deletions.
38 changes: 28 additions & 10 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,32 @@ impl Chitchat {
pub fn subscribe_event(
&self,
key_prefix: impl ToString,
callback: impl Fn(&str, &str) + 'static + Send + Sync,
callback: impl Fn(KeyChangeEvent) + 'static + Send + Sync,
) -> ListenerHandle {
self.cluster_state()
.listeners
.subscribe_event(key_prefix, callback)
}
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct KeyChangeEvent<'a> {
key: &'a str,
value: &'a str,
node: &'a ChitchatId,
}

impl<'a> KeyChangeEvent<'a> {
pub fn strip_key_prefix(&self, prefix: &str) -> Option<KeyChangeEvent> {
let key_without_prefix = self.key.strip_prefix(prefix)?;
Some(KeyChangeEvent {
key: key_without_prefix,
value: self.value,
node: self.node,
})
}
}

#[cfg(test)]
mod tests {
use std::ops::{Add, RangeInclusive};
Expand Down Expand Up @@ -715,26 +733,26 @@ mod tests {

let counter_self_key_clone = counter_self_key.clone();
node1
.subscribe_event("self1:", move |key, value| {
assert_eq!(key, "suffix1");
assert_eq!(value, "updated");
.subscribe_event("self1:", move |evt| {
assert_eq!(evt.key, "suffix1");
assert_eq!(evt.value, "updated");
counter_self_key_clone.fetch_add(1, Ordering::SeqCst);
})
.forever();
let counter_other_key_clone = counter_other_key.clone();
node1
.subscribe_event("other:", move |key, value| {
assert_eq!(key, "suffix");
assert_eq!(value, "hello");
.subscribe_event("other:", move |evt| {
assert_eq!(evt.key, "suffix");
assert_eq!(evt.value, "hello");
counter_other_key_clone.fetch_add(1, Ordering::SeqCst);
})
.forever();

let counter_self_key_clone = counter_self_key.clone();
node1
.subscribe_event("self2:", move |key, value| {
assert_eq!(key, "suffix2");
assert_eq!(value, "hello2");
.subscribe_event("self2:", move |evt| {
assert_eq!(evt.key, "suffix2");
assert_eq!(evt.value, "hello2");
counter_self_key_clone.fetch_add(1, Ordering::SeqCst);
})
.forever();
Expand Down
126 changes: 93 additions & 33 deletions chitchat/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::ops::Bound;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock, Weak};

use crate::KeyChangeEvent;

pub struct ListenerHandle {
prefix: String,
listener_id: usize,
Expand All @@ -28,7 +30,7 @@ impl Drop for ListenerHandle {
}
}

type BoxedListener = Box<dyn Fn(&str, &str) + 'static + Send + Sync>;
type BoxedListener = Box<dyn Fn(KeyChangeEvent) + 'static + Send + Sync>;

#[derive(Default, Clone)]
pub(crate) struct Listeners {
Expand All @@ -40,7 +42,7 @@ impl Listeners {
pub(crate) fn subscribe_event(
&self,
key_prefix: impl ToString,
callback: impl Fn(&str, &str) + 'static + Send + Sync,
callback: impl Fn(KeyChangeEvent) + 'static + Send + Sync,
) -> ListenerHandle {
let key_prefix = key_prefix.to_string();
let boxed_listener = Box::new(callback);
Expand All @@ -66,8 +68,8 @@ impl Listeners {
}
}

pub(crate) fn trigger_event(&mut self, key: &str, value: &str) {
self.inner.read().unwrap().trigger_event(key, value);
pub(crate) fn trigger_event(&mut self, key_change_event: KeyChangeEvent<'_>) {
self.inner.read().unwrap().trigger_event(key_change_event);
}
}

Expand All @@ -91,27 +93,30 @@ impl InnerListeners {
}
}

fn trigger_event(&self, key: &str, value: &str) {
fn trigger_event(&self, key_change_event: KeyChangeEvent<'_>) {
// We treat the empty prefix a tiny bit separately to get able to at least
// use the first character as a range bound, as if we were going to the first level of
// a trie.
if let Some(listeners) = self.listeners.get("") {
for listener in listeners.values() {
(*listener)(key, value);
(*listener)(key_change_event);
}
}
if key.is_empty() {
if key_change_event.key.is_empty() {
return;
}

let range = (Bound::Included(&key[0..1]), Bound::Included(key));
let range = (
Bound::Included(&key_change_event.key[0..1]),
Bound::Included(key_change_event.key),
);
for (prefix_key, listeners) in self.listeners.range::<str, _>(range) {
if prefix_key.as_str() > key {
if prefix_key.as_str() > key_change_event.key {
break;
}
if let Some(stripped_key) = key.strip_prefix(prefix_key) {
if let Some(stripped_key_change_event) = key_change_event.strip_key_prefix(prefix_key) {
for listener in listeners.values() {
(*listener)(stripped_key, value);
(*listener)(stripped_key_change_event);
}
}
}
Expand All @@ -127,22 +132,37 @@ impl InnerListeners {
#[cfg(test)]
mod tests {
use super::*;
use crate::ChitchatId;

fn chitchat_id(port: u16) -> ChitchatId {
ChitchatId::new(format!("node{port}"), 0, ([127, 0, 0, 1], port).into())
}

#[test]
fn test_listeners_simple() {
let mut listeners = Listeners::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let handle = listeners.subscribe_event("prefix:", move |key, value| {
assert_eq!(key, "strippedprefix");
assert_eq!(value, "value");
let handle = listeners.subscribe_event("prefix:", move |key_change_event| {
assert_eq!(key_change_event.key, "strippedprefix");
assert_eq!(key_change_event.value, "value");
counter_clone.fetch_add(1, Ordering::Relaxed);
});
let node_id = chitchat_id(7280u16);
assert_eq!(counter.load(Ordering::Relaxed), 0);
listeners.trigger_event("prefix:strippedprefix", "value");
listeners.trigger_event(KeyChangeEvent {
key: "prefix:strippedprefix",
value: "value",
node: &node_id,
});
assert_eq!(counter.load(Ordering::Relaxed), 1);
std::mem::drop(handle);
listeners.trigger_event("prefix:strippedprefix", "value");
let node_id = chitchat_id(7280u16);
listeners.trigger_event(KeyChangeEvent {
key: "prefix:strippedprefix",
value: "value",
node: &node_id,
});
assert_eq!(counter.load(Ordering::Relaxed), 1);
}

Expand All @@ -152,31 +172,46 @@ mod tests {
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
listeners
.subscribe_event("", move |key, value| {
assert_eq!(key, "prefix:strippedprefix");
assert_eq!(value, "value");
.subscribe_event("", move |key_change_event| {
assert_eq!(key_change_event.key, "prefix:strippedprefix");
assert_eq!(key_change_event.value, "value");
counter_clone.fetch_add(1, Ordering::Relaxed);
})
.forever();
assert_eq!(counter.load(Ordering::Relaxed), 0);
listeners.trigger_event("prefix:strippedprefix", "value");
let node_id = chitchat_id(7280u16);
let key_change_event = KeyChangeEvent {
key: "prefix:strippedprefix",
value: "value",
node: &node_id,
};
listeners.trigger_event(key_change_event);
assert_eq!(counter.load(Ordering::Relaxed), 1);
}
#[test]
fn test_listeners_forever() {
let mut listeners = Listeners::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let handle = listeners.subscribe_event("prefix:", move |key, value| {
assert_eq!(key, "strippedprefix");
assert_eq!(value, "value");
let handle = listeners.subscribe_event("prefix:", move |evt| {
assert_eq!(evt.key, "strippedprefix");
assert_eq!(evt.value, "value");
counter_clone.fetch_add(1, Ordering::Relaxed);
});
assert_eq!(counter.load(Ordering::Relaxed), 0);
listeners.trigger_event("prefix:strippedprefix", "value");
let node_id = chitchat_id(7280u16);
listeners.trigger_event(KeyChangeEvent {
key: "prefix:strippedprefix",
value: "value",
node: &node_id,
});
assert_eq!(counter.load(Ordering::Relaxed), 1);
handle.forever();
listeners.trigger_event("prefix:strippedprefix", "value");
listeners.trigger_event(KeyChangeEvent {
key: "prefix:strippedprefix",
value: "value",
node: &node_id,
});
assert_eq!(counter.load(Ordering::Relaxed), 2);
}

Expand All @@ -188,7 +223,7 @@ mod tests {
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
listeners
.subscribe_event(prefix, move |_key, _value| {
.subscribe_event(prefix, move |_evt| {
counter_clone.fetch_add(1, Ordering::Relaxed);
})
.forever();
Expand All @@ -201,42 +236,67 @@ mod tests {
let counter_bb2 = subscribe_event("bb");
let counter_bc = subscribe_event("bc");

listeners.trigger_event("hello", "value");
let node_id = chitchat_id(7280u16);
listeners.trigger_event(KeyChangeEvent {
key: "hello",
value: "value",
node: &node_id,
});
assert_eq!(counter_empty.load(Ordering::Relaxed), 1);
assert_eq!(counter_b.load(Ordering::Relaxed), 0);
assert_eq!(counter_bb.load(Ordering::Relaxed), 0);
assert_eq!(counter_bb2.load(Ordering::Relaxed), 0);
assert_eq!(counter_bc.load(Ordering::Relaxed), 0);

listeners.trigger_event("", "value");
listeners.trigger_event(KeyChangeEvent {
key: "",
value: "value",
node: &node_id,
});
assert_eq!(counter_empty.load(Ordering::Relaxed), 2);
assert_eq!(counter_b.load(Ordering::Relaxed), 0);
assert_eq!(counter_bb.load(Ordering::Relaxed), 0);
assert_eq!(counter_bb2.load(Ordering::Relaxed), 0);
assert_eq!(counter_bc.load(Ordering::Relaxed), 0);

listeners.trigger_event("a", "value");
listeners.trigger_event(KeyChangeEvent {
key: "a",
value: "value",
node: &node_id,
});
assert_eq!(counter_empty.load(Ordering::Relaxed), 3);
assert_eq!(counter_b.load(Ordering::Relaxed), 0);
assert_eq!(counter_bb.load(Ordering::Relaxed), 0);
assert_eq!(counter_bb2.load(Ordering::Relaxed), 0);
assert_eq!(counter_bc.load(Ordering::Relaxed), 0);

listeners.trigger_event("b", "value");
listeners.trigger_event(KeyChangeEvent {
key: "b",
value: "value",
node: &node_id,
});

assert_eq!(counter_empty.load(Ordering::Relaxed), 4);
assert_eq!(counter_b.load(Ordering::Relaxed), 1);
assert_eq!(counter_bb.load(Ordering::Relaxed), 0);
assert_eq!(counter_bb2.load(Ordering::Relaxed), 0);
assert_eq!(counter_bc.load(Ordering::Relaxed), 0);

listeners.trigger_event("ba", "value");
listeners.trigger_event(KeyChangeEvent {
key: "ba",
value: "value",
node: &node_id,
});
assert_eq!(counter_empty.load(Ordering::Relaxed), 5);
assert_eq!(counter_b.load(Ordering::Relaxed), 2);
assert_eq!(counter_bb.load(Ordering::Relaxed), 0);
assert_eq!(counter_bb2.load(Ordering::Relaxed), 0);
assert_eq!(counter_bc.load(Ordering::Relaxed), 0);

listeners.trigger_event("bb", "value");
listeners.trigger_event(KeyChangeEvent {
key: "bb",
value: "value",
node: &node_id,
});
assert_eq!(counter_empty.load(Ordering::Relaxed), 6);
assert_eq!(counter_b.load(Ordering::Relaxed), 3);
assert_eq!(counter_bb.load(Ordering::Relaxed), 1);
Expand Down
6 changes: 6 additions & 0 deletions chitchat/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ pub struct ChitchatHandle {
join_handle: JoinHandle<Result<(), anyhow::Error>>,
}

impl ChitchatHandle {
pub fn abort(&self) {
self.join_handle.abort();
}
}

const DNS_POLLING_DURATION: Duration = Duration::from_secs(60);

async fn dns_refresh_loop(
Expand Down
Loading

0 comments on commit edbfb71

Please sign in to comment.