From a1170b64c300e35cd16d9b8068e711f38b050fc9 Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Fri, 13 Dec 2024 17:10:03 +0700 Subject: [PATCH] add sample --- Cargo.toml | 1 + examples/kv.rs | 171 ++++++++++++++++++ src/service/replicate_kv_service.rs | 19 +- .../replicate_kv_service/local_storage.rs | 6 +- .../replicate_kv_service/remote_storage.rs | 94 +++++++--- src/tests.rs | 1 + src/tests/replicate_kv.rs | 74 ++++++++ 7 files changed, 341 insertions(+), 25 deletions(-) create mode 100644 examples/kv.rs create mode 100644 src/tests/replicate_kv.rs diff --git a/Cargo.toml b/Cargo.toml index 09cda46..e666146 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,3 +27,4 @@ tokio = { version = "1", features = ["full"] } test-log = { version = "0.2" } clap = { version = "4.4", features = ["derive", "env", "color"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] } +poem = { version = "3" } \ No newline at end of file diff --git a/examples/kv.rs b/examples/kv.rs new file mode 100644 index 0000000..10ddd05 --- /dev/null +++ b/examples/kv.rs @@ -0,0 +1,171 @@ +use std::{collections::BTreeMap, net::SocketAddr, str::FromStr}; + +use atm0s_small_p2p::{ + replicate_kv_service::{KvEvent, ReplicatedKvService}, + P2pNetwork, P2pNetworkConfig, PeerAddress, PeerId, SharedKeyHandshake, +}; +use clap::Parser; +use poem::{ + get, handler, + listener::TcpListener, + middleware::Tracing, + put, + web::{Data, Json, Path}, + EndpointExt, Route, Server, +}; +use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; +use serde::{Deserialize, Serialize}; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; +use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + +pub const DEFAULT_CLUSTER_CERT: &[u8] = include_bytes!("../certs/dev.cluster.cert"); +pub const DEFAULT_CLUSTER_KEY: &[u8] = include_bytes!("../certs/dev.cluster.key"); + +/// A Replicated KV store +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// HTTP listen for api + #[arg(env, long, default_value = "0.0.0.0:3000")] + http_listen: SocketAddr, + + /// UDP/TCP port for serving QUIC/TCP connection for SDN network + #[arg(env, long, default_value_t = 0)] + sdn_peer_id: u64, + + /// UDP/TCP port for serving QUIC/TCP connection for SDN network + #[arg(env, long, default_value = "0.0.0.0:11111")] + sdn_listener: SocketAddr, + + /// Seeds + #[arg(env, long, value_delimiter = ',')] + sdn_seeds: Vec, + + /// Allow it broadcast address to other peers + /// This allows other peer can active connect to this node + /// This option is useful with high performance relay node + #[arg(env, long)] + sdn_advertise_address: Option, + + /// Sdn secure code + #[arg(env, long, default_value = "insecure")] + sdn_secure_code: String, +} + +enum Control { + Get(u64, oneshot::Sender>), + Put(u64, u64, oneshot::Sender<()>), + Del(u64, oneshot::Sender<()>), +} + +#[derive(Debug, Clone, Serialize)] +struct ValueInfo { + value: u64, + node: Option, +} + +#[handler] +async fn get_key(Path(key): Path, Data(control): Data<&mpsc::Sender>) -> Json> { + let (tx, rx) = oneshot::channel(); + control.send(Control::Get(key, tx)).await.expect("should send to main loop"); + Json(rx.await.expect("should get value")) +} + +#[derive(Debug, Deserialize)] +struct PutParams { + key: u64, + value: u64, +} + +#[handler] +async fn put_key(Path(params): Path, Data(control): Data<&mpsc::Sender>) -> String { + let (tx, rx) = oneshot::channel(); + control.send(Control::Put(params.key, params.value, tx)).await.expect("should send to main loop"); + rx.await.expect("should get value"); + "Ok".to_string() +} + +#[handler] +async fn del_key(Path(key): Path, Data(control): Data<&mpsc::Sender>) -> String { + let (tx, rx) = oneshot::channel(); + control.send(Control::Del(key, tx)).await.expect("should send to main loop"); + rx.await.expect("should get value"); + "Ok".to_string() +} + +#[tokio::main] +async fn main() { + rustls::crypto::ring::default_provider().install_default().expect("should install ring as default"); + + if std::env::var_os("RUST_LOG").is_none() { + std::env::set_var("RUST_LOG", "info"); + } + if std::env::var_os("RUST_BACKTRACE").is_none() { + std::env::set_var("RUST_BACKTRACE", "1"); + } + let args: Args = Args::parse(); + tracing_subscriber::registry().with(fmt::layer()).with(EnvFilter::from_default_env()).init(); + + let priv_key = PrivatePkcs8KeyDer::from(DEFAULT_CLUSTER_KEY.to_vec()); + let cert = CertificateDer::from(DEFAULT_CLUSTER_CERT.to_vec()); + + let mut p2p = P2pNetwork::new(P2pNetworkConfig { + peer_id: args.sdn_peer_id.into(), + listen_addr: args.sdn_listener, + advertise: args.sdn_advertise_address.map(|a| a.into()), + priv_key, + cert, + tick_ms: 100, + seeds: args.sdn_seeds.into_iter().map(|s| PeerAddress::from_str(s.as_str()).expect("should parse address")).collect::>(), + secure: SharedKeyHandshake::from(args.sdn_secure_code.as_str()), + }) + .await + .expect("should create network"); + + let mut kv_service: ReplicatedKvService = ReplicatedKvService::new(p2p.create_service(0.into())); + + let (control_tx, mut control_rx) = mpsc::channel::(10); + tokio::spawn(async move { + let app = Route::new() + .at("/kv/:key/:value", put(put_key)) + .at("/kv/:key", get(get_key).delete(del_key)) + .data(control_tx) + .with(Tracing); + Server::new(TcpListener::bind(args.http_listen)).run(app).await.expect("bind http error"); + }); + + let mut kv_local = BTreeMap::new(); + + loop { + select! { + ctr = control_rx.recv() => { + match ctr.expect("should recv control") { + Control::Get(key, tx) => { + let value = kv_local.get(&key); + let _ = tx.send(value.cloned()); + } + Control::Put(key, value, tx) => { + kv_service.set(key, value); + let _ = tx.send(()); + } + Control::Del(key, tx) => { + kv_service.del(key); + let _ = tx.send(()); + } + } + }, + event = kv_service.recv() => match event.expect("should recv event") { + KvEvent::Set(node, key, value) => { + kv_local.insert(key, ValueInfo { value, node } ); + }, + KvEvent::Del(_, key) => { + kv_local.remove(&key); + }, + }, + _ = p2p.recv() => {} + } + } +} diff --git a/src/service/replicate_kv_service.rs b/src/service/replicate_kv_service.rs index d88e8c1..bccdf46 100644 --- a/src/service/replicate_kv_service.rs +++ b/src/service/replicate_kv_service.rs @@ -161,7 +161,16 @@ where while let Some(event) = self.local.pop_out() { self.outs.push_back(event); } - self.remotes.retain(|_, remote| remote.last_active().elapsed().as_millis() < REMOTE_TIMEOUT_MS); + self.remotes.retain(|_, remote| { + let keep = remote.last_active().elapsed().as_millis() < REMOTE_TIMEOUT_MS; + if !keep { + remote.destroy(); + while let Some(event) = remote.pop_out() { + self.outs.push_back(event); + } + } + keep + }); } pub fn set(&mut self, key: K, value: V) { @@ -239,6 +248,14 @@ where } } + pub fn set(&mut self, key: K, value: V) { + self.store.set(key, value); + } + + pub fn del(&mut self, key: K) { + self.store.del(key); + } + pub async fn recv(&mut self) -> Option> { loop { if let Some(event) = self.store.pop() { diff --git a/src/service/replicate_kv_service/local_storage.rs b/src/service/replicate_kv_service/local_storage.rs index 6d13025..9c628ef 100644 --- a/src/service/replicate_kv_service/local_storage.rs +++ b/src/service/replicate_kv_service/local_storage.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashMap, VecDeque}, + collections::{BTreeMap, VecDeque}, hash::Hash, }; @@ -8,7 +8,7 @@ use super::{Action, BroadcastEvent, Changed, Event, FetchChangedError, KvEvent, const MAX_CHANGE_SINGLE_PKT: u64 = 1024; pub struct LocalStore { - slots: HashMap>, + slots: BTreeMap>, changeds: BTreeMap>, max_changeds: usize, version: Version, @@ -22,7 +22,7 @@ where { pub fn new(max_changeds: usize) -> Self { LocalStore { - slots: HashMap::new(), + slots: BTreeMap::new(), changeds: BTreeMap::new(), max_changeds, version: Version(0), diff --git a/src/service/replicate_kv_service/remote_storage.rs b/src/service/replicate_kv_service/remote_storage.rs index d50785c..3cad779 100644 --- a/src/service/replicate_kv_service/remote_storage.rs +++ b/src/service/replicate_kv_service/remote_storage.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashMap, VecDeque}, + collections::{BTreeMap, VecDeque}, fmt::Debug, hash::Hash, marker::PhantomData, @@ -12,6 +12,7 @@ use super::{Action, BroadcastEvent, Changed, Event, KvEvent, NetEvent, RpcEvent, enum RemoteStoreState { SyncFull(SyncFullState), Working(WorkingState), + Destroy(DestroyState), } impl State for RemoteStoreState @@ -24,6 +25,7 @@ where match self { RemoteStoreState::SyncFull(state) => state.init(ctx), RemoteStoreState::Working(state) => state.init(ctx), + RemoteStoreState::Destroy(state) => state.init(ctx), } } @@ -31,6 +33,7 @@ where match self { RemoteStoreState::SyncFull(state) => state.on_broadcast(ctx, event), RemoteStoreState::Working(state) => state.on_broadcast(ctx, event), + RemoteStoreState::Destroy(state) => state.on_broadcast(ctx, event), } } @@ -38,13 +41,14 @@ where match self { RemoteStoreState::SyncFull(state) => state.on_rpc_res(ctx, event), RemoteStoreState::Working(state) => state.on_rpc_res(ctx, event), + RemoteStoreState::Destroy(state) => state.on_rpc_res(ctx, event), } } } struct StateCtx { remote: N, - slots: HashMap>, + slots: BTreeMap>, outs: VecDeque>, next_state: Option>, } @@ -70,7 +74,7 @@ where pub fn new(remote: N) -> Self { let mut ctx = StateCtx { remote, - slots: HashMap::new(), + slots: BTreeMap::new(), outs: VecDeque::new(), next_state: None, }; @@ -85,6 +89,12 @@ where } } + pub fn destroy(&mut self) { + let mut state = DestroyState { _tmp: PhantomData }; + state.init(&mut self.ctx); + self.state = RemoteStoreState::Destroy(state); + } + pub fn last_active(&self) -> Instant { self.last_active } @@ -130,7 +140,9 @@ where N: Clone, { fn init(&mut self, ctx: &mut StateCtx) { - ctx.slots.clear(); + while let Some((k, _v)) = ctx.slots.pop_first() { + ctx.outs.push_back(Event::KvEvent(KvEvent::Del(Some(ctx.remote.clone()), k))); + } ctx.outs.push_back(Event::NetEvent(NetEvent::Unicast(ctx.remote.clone(), RpcEvent::RpcReq(RpcReq::FetchSnapshot)))); } @@ -218,11 +230,7 @@ where fn on_broadcast(&mut self, ctx: &mut StateCtx, event: BroadcastEvent) { match event { BroadcastEvent::Changed(changed) => { - if self.version > changed.version { - // wrong data => resyncFull - ctx.slots.clear(); - ctx.next_state = Some(RemoteStoreState::SyncFull(SyncFullState::default())); - } else if self.version < changed.version { + if self.version < changed.version { self.pendings.insert(changed.version, changed); self.apply_pendings(ctx); } @@ -263,6 +271,32 @@ where } } +#[derive(Debug, PartialEq, Eq)] +pub struct DestroyState { + _tmp: PhantomData<(N, K, V)>, +} + +impl State for DestroyState +where + K: Debug + Hash + Ord + Eq + Clone, + V: Clone, + N: Clone, +{ + fn init(&mut self, ctx: &mut StateCtx) { + while let Some((k, _v)) = ctx.slots.pop_first() { + ctx.outs.push_back(Event::KvEvent(KvEvent::Del(Some(ctx.remote.clone()), k))); + } + } + + fn on_broadcast(&mut self, _ctx: &mut StateCtx, _event: BroadcastEvent) { + // dont process here + } + + fn on_rpc_res(&mut self, _ctx: &mut StateCtx, _event: RpcRes) { + // dont process here + } +} + #[cfg(test)] mod tests { use super::*; @@ -272,7 +306,7 @@ mod tests { fn test_restore_full() { let mut ctx: StateCtx = StateCtx { remote: 1, - slots: HashMap::new(), + slots: BTreeMap::new(), outs: VecDeque::new(), next_state: None, }; @@ -287,7 +321,7 @@ mod tests { }, ); - assert_eq!(ctx.slots, HashMap::from([(1, Slot::new(1, Version(1)))])); + assert_eq!(ctx.slots, BTreeMap::from([(1, Slot::new(1, Version(1)))])); assert_eq!(ctx.next_state, Some(RemoteStoreState::Working(WorkingState::new(Version(1))))); assert_eq!(ctx.outs.pop_front(), Some(Event::KvEvent(KvEvent::Set(Some(1), 1, 1)))); assert_eq!(ctx.outs.pop_front(), None); @@ -298,7 +332,7 @@ mod tests { fn test_working_state_zero() { let mut ctx: StateCtx = StateCtx { remote: 1, - slots: HashMap::new(), + slots: BTreeMap::new(), outs: VecDeque::new(), next_state: None, }; @@ -314,7 +348,7 @@ mod tests { }), ); - assert_eq!(ctx.slots, HashMap::from([(1, Slot::new(1, Version(1)))])); + assert_eq!(ctx.slots, BTreeMap::from([(1, Slot::new(1, Version(1)))])); assert_eq!(ctx.next_state, None); assert_eq!(ctx.outs.pop_front(), Some(Event::KvEvent(KvEvent::Set(Some(1), 1, 1)))); assert_eq!(ctx.outs.pop_front(), None); @@ -325,7 +359,7 @@ mod tests { fn test_working_state_zero_out_of_sync() { let mut ctx: StateCtx = StateCtx { remote: 1, - slots: HashMap::new(), + slots: BTreeMap::new(), outs: VecDeque::new(), next_state: None, }; @@ -334,7 +368,7 @@ mod tests { state.on_broadcast(&mut ctx, BroadcastEvent::Version(Version(1))); - assert_eq!(ctx.slots, HashMap::new()); + assert_eq!(ctx.slots, BTreeMap::new()); assert_eq!(ctx.next_state, None); assert_eq!( ctx.outs.pop_front(), @@ -348,7 +382,7 @@ mod tests { fn test_working_state_missing_changed() { let mut ctx: StateCtx = StateCtx { remote: 1, - slots: HashMap::new(), + slots: BTreeMap::new(), outs: VecDeque::new(), next_state: None, }; @@ -365,7 +399,7 @@ mod tests { ); assert_eq!(state.pendings.len(), 1); - assert_eq!(ctx.slots, HashMap::new()); + assert_eq!(ctx.slots, BTreeMap::new()); assert_eq!(ctx.next_state, None); assert_eq!( ctx.outs.pop_front(), @@ -383,7 +417,7 @@ mod tests { ); assert_eq!(state.pendings.len(), 0); - assert_eq!(ctx.slots, HashMap::from([(1, Slot::new(1, Version(2)))])); + assert_eq!(ctx.slots, BTreeMap::from([(1, Slot::new(1, Version(2)))])); assert_eq!(ctx.next_state, None); assert_eq!(ctx.outs.pop_front(), Some(Event::KvEvent(KvEvent::Set(Some(1), 1, 2)))); assert_eq!(ctx.outs.pop_front(), Some(Event::KvEvent(KvEvent::Set(Some(1), 1, 1)))); @@ -395,7 +429,7 @@ mod tests { fn test_working_state_missing_changed2() { let mut ctx: StateCtx = StateCtx { remote: 1, - slots: HashMap::new(), + slots: BTreeMap::new(), outs: VecDeque::new(), next_state: None, }; @@ -412,7 +446,7 @@ mod tests { ); assert_eq!(state.pendings.len(), 1); - assert_eq!(ctx.slots, HashMap::new()); + assert_eq!(ctx.slots, BTreeMap::new()); assert_eq!(ctx.next_state, None); assert_eq!( ctx.outs.pop_front(), @@ -430,10 +464,28 @@ mod tests { ); assert_eq!(state.pendings.len(), 0); - assert_eq!(ctx.slots, HashMap::from([(1, Slot::new(1, Version(2)))])); + assert_eq!(ctx.slots, BTreeMap::from([(1, Slot::new(1, Version(2)))])); assert_eq!(ctx.next_state, None); assert_eq!(ctx.outs.pop_front(), Some(Event::KvEvent(KvEvent::Set(Some(1), 1, 2)))); assert_eq!(ctx.outs.pop_front(), Some(Event::KvEvent(KvEvent::Set(Some(1), 1, 1)))); assert_eq!(ctx.outs.pop_front(), None); } + + #[test] + fn destroy_remote_should_clear_slots() { + let mut ctx: StateCtx = StateCtx { + remote: 1, + slots: BTreeMap::from([(1, Slot::new(1, Version(1)))]), + outs: VecDeque::new(), + next_state: None, + }; + + let mut state = DestroyState { _tmp: PhantomData }; + state.init(&mut ctx); + + assert_eq!(ctx.slots, BTreeMap::new()); + assert_eq!(ctx.next_state, None); + assert_eq!(ctx.outs.pop_front(), Some(Event::KvEvent(KvEvent::Del(Some(1), 1)))); + assert_eq!(ctx.outs.pop_front(), None); + } } diff --git a/src/tests.rs b/src/tests.rs index f4b4dbd..049fd23 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -9,6 +9,7 @@ mod cross_nodes; mod discovery; mod metrics; mod pubsub; +mod replicate_kv; mod visualization; pub const DEFAULT_CLUSTER_CERT: &[u8] = include_bytes!("../certs/dev.cluster.cert"); diff --git a/src/tests/replicate_kv.rs b/src/tests/replicate_kv.rs new file mode 100644 index 0000000..68e09a2 --- /dev/null +++ b/src/tests/replicate_kv.rs @@ -0,0 +1,74 @@ +use std::time::Duration; + +use test_log::test; +use tokio::time::timeout; + +use crate::replicate_kv_service::{KvEvent, ReplicatedKvService}; + +use super::create_node; + +const WAIT: Duration = Duration::from_secs(3); + +#[test(tokio::test)] +async fn single_node() { + let (mut node1, _addr1) = create_node(true, 1, vec![]).await; + let mut kv1: ReplicatedKvService = ReplicatedKvService::new(node1.create_service(0.into())); + tokio::spawn(async move { while node1.recv().await.is_ok() {} }); + + kv1.set(1, 1); + + assert_eq!(timeout(WAIT, kv1.recv()).await, Ok(Some(KvEvent::Set(None, 1, 1)))); +} + +#[test(tokio::test)] +async fn full_sync() { + let (mut node1, addr1) = create_node(true, 1, vec![]).await; + let (mut node2, addr2) = create_node(true, 2, vec![]).await; + + let node1_requester = node1.requester(); + + let mut kv1: ReplicatedKvService = ReplicatedKvService::new(node1.create_service(0.into())); + let mut kv2: ReplicatedKvService = ReplicatedKvService::new(node2.create_service(0.into())); + + tokio::spawn(async move { + kv1.set(1, 1); + kv1.set(2, 2); + kv1.set(3, 3); + while let Some(_event) = kv1.recv().await {} + }); + + tokio::spawn(async move { while node1.recv().await.is_ok() {} }); + tokio::spawn(async move { while node2.recv().await.is_ok() {} }); + + tokio::time::sleep(Duration::from_millis(1000)).await; + node1_requester.connect(addr2).await.expect("should connect success"); + + assert_eq!(timeout(WAIT, kv2.recv()).await, Ok(Some(KvEvent::Set(Some(addr1.peer_id()), 1, 1)))); + assert_eq!(timeout(WAIT, kv2.recv()).await, Ok(Some(KvEvent::Set(Some(addr1.peer_id()), 2, 2)))); + assert_eq!(timeout(WAIT, kv2.recv()).await, Ok(Some(KvEvent::Set(Some(addr1.peer_id()), 3, 3)))); +} + +#[test(tokio::test)] +async fn continuos_sync() { + let (mut node1, addr1) = create_node(true, 1, vec![]).await; + let (mut node2, _addr2) = create_node(true, 2, vec![addr1.clone()]).await; + + let mut kv1: ReplicatedKvService = ReplicatedKvService::new(node1.create_service(0.into())); + let mut kv2: ReplicatedKvService = ReplicatedKvService::new(node2.create_service(0.into())); + + tokio::spawn(async move { while node1.recv().await.is_ok() {} }); + tokio::spawn(async move { while node2.recv().await.is_ok() {} }); + + tokio::time::sleep(Duration::from_millis(1000)).await; + + tokio::spawn(async move { + kv1.set(1, 1); + kv1.set(2, 2); + kv1.set(3, 3); + while let Some(_event) = kv1.recv().await {} + }); + + assert_eq!(timeout(WAIT, kv2.recv()).await, Ok(Some(KvEvent::Set(Some(addr1.peer_id()), 1, 1)))); + assert_eq!(timeout(WAIT, kv2.recv()).await, Ok(Some(KvEvent::Set(Some(addr1.peer_id()), 2, 2)))); + assert_eq!(timeout(WAIT, kv2.recv()).await, Ok(Some(KvEvent::Set(Some(addr1.peer_id()), 3, 3)))); +}