Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Dec 13, 2024
1 parent 899faf0 commit af01719
Show file tree
Hide file tree
Showing 3 changed files with 482 additions and 212 deletions.
122 changes: 69 additions & 53 deletions src/service/replicate_kv_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,25 @@ pub struct Slot<V> {
version: Version,
}

impl<V> Slot<V> {
pub fn new(value: V, version: Version) -> Self {
Self { value, version }
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Action<V> {
Set(V),
Del,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Changed<V> {
pub(crate) key: Key,
pub struct Changed<K, V> {
pub(crate) key: K,
pub(crate) version: Version,
pub(crate) action: Action<V>,
}

#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
pub struct Key(u64);

#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
pub struct Version(u64);

Expand Down Expand Up @@ -86,55 +89,63 @@ impl Deref for Version {
}
}

#[derive(Debug, Serialize, Deserialize)]
pub enum BroadcastEvent<V> {
Changed(Changed<V>),
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BroadcastEvent<K, V> {
Changed(Changed<K, V>),
Version(Version),
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RpcReq {
FetchChanged { from: Version, to: Option<Version> },
FetchChanged { from: Version, count: u64 },
FetchSnapshot,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum RpcRes<V> {
FetchChanged { changes: Vec<Changed<V>>, remain: bool },
FetchSnapshot { slots: Vec<(Key, Slot<V>)>, version: Version },
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum FetchChangedError {
MissingData,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum RpcEvent<V> {
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RpcRes<K, V> {
FetchChanged(Result<Vec<Changed<K, V>>, FetchChangedError>),
FetchSnapshot { slots: Vec<(K, Slot<V>)>, version: Version },
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RpcEvent<K, V> {
RpcReq(RpcReq),
RpcRes(RpcRes<V>),
RpcRes(RpcRes<K, V>),
}

pub enum KvEvent<N, V> {
Set(N, Key, V),
Del(N, Key),
#[derive(Debug, PartialEq, Eq)]
pub enum KvEvent<N, K, V> {
Set(Option<N>, K, V),
Del(Option<N>, K),
}

#[derive(Debug, Serialize, Deserialize)]
pub enum NetEvent<N, V> {
Broadcast(BroadcastEvent<V>),
Unicast(N, RpcEvent<V>),
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum NetEvent<N, K, V> {
Broadcast(BroadcastEvent<K, V>),
Unicast(N, RpcEvent<K, V>),
}

pub enum Event<N, V> {
NetEvent(NetEvent<N, V>),
KvEvent(KvEvent<N, V>),
#[derive(Debug, PartialEq, Eq)]
pub enum Event<N, K, V> {
NetEvent(NetEvent<N, K, V>),
KvEvent(KvEvent<N, K, V>),
}

pub struct ReplicatedKvStore<N, V> {
remotes: HashMap<N, RemoteStore<N, V>>,
local: LocalStore<N, V>,
outs: VecDeque<Event<N, V>>,
pub struct ReplicatedKvStore<N, K, V> {
remotes: HashMap<N, RemoteStore<N, K, V>>,
local: LocalStore<N, K, V>,
outs: VecDeque<Event<N, K, V>>,
}

impl<N, V> ReplicatedKvStore<N, V>
impl<N, K, V> ReplicatedKvStore<N, K, V>
where
N: Eq + Hash + Clone,
K: Debug + Hash + Ord + Eq + Clone,
V: Debug + Eq + Clone,
{
pub fn new() -> Self {
Expand All @@ -148,30 +159,30 @@ where
pub fn on_tick(&mut self) {
self.local.on_tick();
while let Some(event) = self.local.pop_out() {
self.outs.push_back(Event::NetEvent(event));
self.outs.push_back(event);
}
self.remotes.retain(|_, remote| remote.last_active().elapsed().as_millis() < REMOTE_TIMEOUT_MS);
}

pub fn set(&mut self, key: Key, value: V) {
pub fn set(&mut self, key: K, value: V) {
self.local.set(key.clone(), value.clone());
while let Some(event) = self.local.pop_out() {
self.outs.push_back(Event::NetEvent(event));
self.outs.push_back(event);
}
}

pub fn del(&mut self, key: Key) {
pub fn del(&mut self, key: K) {
self.local.del(key.clone());
while let Some(event) = self.local.pop_out() {
self.outs.push_back(Event::NetEvent(event));
self.outs.push_back(event);
}
}

pub fn on_remote_event(&mut self, from: N, event: NetEvent<N, V>) {
pub fn on_remote_event(&mut self, from: N, event: NetEvent<N, K, V>) {
if !self.remotes.contains_key(&from) {
let mut remote = RemoteStore::new(from.clone());
while let Some(event) = remote.pop_out() {
self.outs.push_back(Event::NetEvent(event));
self.outs.push_back(event);
}
self.remotes.insert(from.clone(), remote);
}
Expand All @@ -181,42 +192,43 @@ where
if let Some(remote) = self.remotes.get_mut(&from) {
remote.on_broadcast(event);
while let Some(event) = remote.pop_out() {
self.outs.push_back(Event::NetEvent(event));
self.outs.push_back(event);
}
}
}
NetEvent::Unicast(_from, event) => match event {
RpcEvent::RpcReq(rpc_req) => {
self.local.on_rpc_req(from, rpc_req);
while let Some(event) = self.local.pop_out() {
self.outs.push_back(Event::NetEvent(event));
self.outs.push_back(event);
}
}
RpcEvent::RpcRes(rpc_res) => {
if let Some(remote) = self.remotes.get_mut(&from) {
remote.on_rpc_res(rpc_res);
while let Some(event) = remote.pop_out() {
self.outs.push_back(Event::NetEvent(event));
self.outs.push_back(event);
}
}
}
},
}
}

pub fn pop(&mut self) -> Option<Event<N, V>> {
pub fn pop(&mut self) -> Option<Event<N, K, V>> {
self.outs.pop_front()
}
}

pub struct ReplicatedKvService<V> {
pub struct ReplicatedKvService<K, V> {
service: P2pService,
tick: Interval,
store: ReplicatedKvStore<PeerId, V>,
store: ReplicatedKvStore<PeerId, K, V>,
}

impl<V> ReplicatedKvService<V>
impl<K, V> ReplicatedKvService<K, V>
where
K: Debug + Hash + Ord + Eq + Clone + DeserializeOwned + Serialize,
V: Debug + Eq + Clone + DeserializeOwned + Serialize,
{
pub fn new(service: P2pService) -> Self {
Expand All @@ -227,17 +239,17 @@ where
}
}

pub async fn recv(&mut self) -> Option<KvEvent<PeerId, V>> {
pub async fn recv(&mut self) -> Option<KvEvent<PeerId, K, V>> {
loop {
if let Some(event) = self.store.pop() {
match event {
Event::NetEvent(net_event) => match net_event {
NetEvent::Broadcast(broadcast_event) => {
let _ = self.service.try_send_broadcast(bincode::serialize(&broadcast_event).unwrap()).await;
let _ = self.service.try_send_broadcast(bincode::serialize(&broadcast_event).expect("should serialize")).await;
continue;
}
NetEvent::Unicast(to_node, rpc_event) => {
let _ = self.service.try_send_unicast(to_node, bincode::serialize(&rpc_event).unwrap()).await;
let _ = self.service.try_send_unicast(to_node, bincode::serialize(&rpc_event).expect("should serialize")).await;
continue;
}
},
Expand All @@ -253,12 +265,16 @@ where
}
event = self.service.recv() => match event? {
super::P2pServiceEvent::Unicast(peer_id, vec) => {
let event = bincode::deserialize::<NetEvent<PeerId, V>>(&vec).unwrap();
self.store.on_remote_event(peer_id, event);
match bincode::deserialize::<RpcEvent<K, V>>(&vec) {
Ok(event) => self.store.on_remote_event(peer_id, NetEvent::Unicast(peer_id, event)),
Err(err) => log::error!("[ReplicatedKvService] deserialize error {err}"),
}
}
super::P2pServiceEvent::Broadcast(peer_id, vec) => {
let event = bincode::deserialize::<NetEvent<PeerId, V>>(&vec).unwrap();
self.store.on_remote_event(peer_id, event);
match bincode::deserialize::<BroadcastEvent<K, V>>(&vec) {
Ok(event) => self.store.on_remote_event(peer_id, NetEvent::Broadcast(event)),
Err(err) => log::error!("[ReplicatedKvService] deserialize error {err}"),
}
}
super::P2pServiceEvent::Stream(..) => {}
}
Expand Down
Loading

0 comments on commit af01719

Please sign in to comment.