Skip to content

Commit

Permalink
added resend after request timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Dec 16, 2024
1 parent 827d9d5 commit 2939826
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 51 deletions.
22 changes: 12 additions & 10 deletions src/service/replicate_kv_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,37 +89,37 @@ impl Deref for Version {
}
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub enum BroadcastEvent<K, V> {
Changed(Changed<K, V>),
Version(Version),
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub enum RpcReq<K> {
FetchChanged { from: Version, count: u64 },
FetchSnapshot { from: Option<K>, to: Option<K>, max_version: Option<Version> },
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub enum FetchChangedError {
MissingData,
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub struct SnapsnotData<K, V> {
slots: Vec<(K, Slot<V>)>,
next_key: Option<K>,
bigest_key: K,
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub enum RpcRes<K, V> {
FetchChanged(Result<Vec<Changed<K, V>>, FetchChangedError>),
FetchSnapshot(Option<SnapsnotData<K, V>>, Version),
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub enum RpcEvent<K, V> {
RpcReq(RpcReq<K>),
RpcRes(RpcRes<K, V>),
Expand All @@ -131,7 +131,7 @@ pub enum KvEvent<N, K, V> {
Del(Option<N>, K),
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub enum NetEvent<N, K, V> {
Broadcast(BroadcastEvent<K, V>),
Unicast(N, RpcEvent<K, V>),
Expand Down Expand Up @@ -173,9 +173,11 @@ where
if !keep {
log::info!("[ReplicatedKvService] remove remote {node:?} after timeout");
remote.destroy();
while let Some(event) = remote.pop_out() {
self.outs.push_back(event);
}
} else {
remote.on_tick();
}
while let Some(event) = remote.pop_out() {
self.outs.push_back(event);
}
keep
});
Expand Down
Loading

0 comments on commit 2939826

Please sign in to comment.