From c7526ff24ce9e4467163fc475af05f99627d89a4 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Thu, 28 Sep 2023 13:40:20 +0800 Subject: [PATCH] refactor: refactor propose id into `-` Signed-off-by: iGxnon --- Cargo.lock | 2 +- curp-external-api/Cargo.toml | 1 - curp-external-api/src/cmd.rs | 17 ++++++-- curp-test-utils/src/test_cmd.rs | 14 ++++--- curp/Cargo.toml | 1 + curp/src/client.rs | 29 ++++++++++++- curp/tests/common/curp_group.rs | 2 +- xline-client/src/clients/auth.rs | 8 +--- xline-client/src/clients/kv.rs | 26 ++++-------- xline-client/src/clients/lease.rs | 10 ++--- xline-client/src/clients/lock.rs | 19 ++------- xline-client/src/lib.rs | 14 ++----- xline/src/client/mod.rs | 8 ++-- xline/src/server/auth_server.rs | 14 ++++--- xline/src/server/kv_server.rs | 28 ++++++++----- xline/src/server/lease_server.rs | 14 ++++--- xline/src/server/lock_server.rs | 13 +++--- xline/src/server/xline_server.rs | 4 +- xline/src/storage/compact/mod.rs | 29 ++++--------- .../src/storage/compact/periodic_compactor.rs | 42 ++++++++++--------- .../src/storage/compact/revision_compactor.rs | 41 ++++++++++-------- 21 files changed, 172 insertions(+), 164 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 77edf34fe..f28ea2043 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -644,6 +644,7 @@ dependencies = [ "parking_lot", "prost", "prost-build", + "rand", "serde", "tempfile", "test-macros", @@ -667,7 +668,6 @@ dependencies = [ "prost", "serde", "thiserror", - "uuid", ] [[package]] diff --git a/curp-external-api/Cargo.toml b/curp-external-api/Cargo.toml index ac22e04e5..5735cfdf7 100644 --- a/curp-external-api/Cargo.toml +++ b/curp-external-api/Cargo.toml @@ -17,4 +17,3 @@ mockall = "0.11.3" prost = "0.11" serde = { version = "1.0.130", features = ["derive", "rc"] } thiserror = "1.0.31" -uuid = { version = "1.1.2", features = ["v4"] } diff --git a/curp-external-api/src/cmd.rs b/curp-external-api/src/cmd.rs index 6002d7cd7..8812e2a29 100644 --- a/curp-external-api/src/cmd.rs +++ b/curp-external-api/src/cmd.rs @@ -4,7 +4,6 @@ use async_trait::async_trait; use engine::Snapshot; use prost::DecodeError; use serde::{de::DeserializeOwned, Serialize}; -use uuid::Uuid; use crate::LogIndex; @@ -79,11 +78,21 @@ pub trait Command: /// Command Id wrapper, abstracting underlying implementation pub type ProposeId = String; -/// Generate propose id with the given prefix +/// Generate propose id with client id and seq num #[inline] #[must_use] -pub fn generate_propose_id(prefix: &str) -> ProposeId { - format!("{}-{}", prefix, Uuid::new_v4()) +pub fn generate_propose_id(client_id: u64, seq_num: u64) -> ProposeId { + format!("{client_id}#{seq_num}") +} + +/// Parse propose id to (`client_id`, `seq_num`) +#[inline] +#[must_use] +pub fn parse_propose_id(id: &ProposeId) -> Option<(u64, u64)> { + let mut iter = id.split('#'); + let client_id = iter.next()?.parse().ok()?; + let seq_num: u64 = iter.next()?.parse().ok()?; + Some((client_id, seq_num)) } /// Check conflict of two keys diff --git a/curp-test-utils/src/test_cmd.rs b/curp-test-utils/src/test_cmd.rs index a66d8688d..221870a4a 100644 --- a/curp-test-utils/src/test_cmd.rs +++ b/curp-test-utils/src/test_cmd.rs @@ -26,10 +26,14 @@ use crate::{META_TABLE, REVISION_TABLE, TEST_TABLE}; pub(crate) const APPLIED_INDEX_KEY: &str = "applied_index"; pub(crate) const LAST_REVISION_KEY: &str = "last_revision"; +/// Test client id +pub const TEST_CLIENT_ID: &str = "test_client_id"; + static NEXT_ID: Lazy = Lazy::new(|| AtomicU64::new(1)); -pub fn next_id() -> u64 { - NEXT_ID.fetch_add(1, Ordering::SeqCst) +pub fn next_id() -> String { + let seq_num = NEXT_ID.fetch_add(1, Ordering::Relaxed); + format!("{TEST_CLIENT_ID}#{seq_num}") } #[derive(Error, Debug, Clone, Serialize, Deserialize)] @@ -66,7 +70,7 @@ pub struct TestCommand { impl Default for TestCommand { fn default() -> Self { Self { - id: next_id().to_string(), + id: next_id(), keys: vec![1], exe_dur: Duration::ZERO, as_dur: Duration::ZERO, @@ -114,7 +118,7 @@ impl PbCodec for TestCommandResult { impl TestCommand { pub fn new_get(keys: Vec) -> Self { Self { - id: next_id().to_string(), + id: next_id(), keys, exe_dur: Duration::ZERO, as_dur: Duration::ZERO, @@ -126,7 +130,7 @@ impl TestCommand { pub fn new_put(keys: Vec, value: u32) -> Self { Self { - id: next_id().to_string(), + id: next_id(), keys, exe_dur: Duration::ZERO, as_dur: Duration::ZERO, diff --git a/curp/Cargo.toml b/curp/Cargo.toml index a955a429d..a9b092976 100644 --- a/curp/Cargo.toml +++ b/curp/Cargo.toml @@ -34,6 +34,7 @@ madsim = { version = "0.2.22", features = ["rpc", "macros"] } opentelemetry = "0.18.0" parking_lot = "0.12.1" prost = "0.11" +rand = "0.8.5" serde = { version = "1.0.130", features = ["derive", "rc"] } thiserror = "1.0.31" tokio = { version = "0.2.23", package = "madsim-tokio", features = [ diff --git a/curp/src/client.rs b/curp/src/client.rs index 36d04681f..a2ac9849b 100644 --- a/curp/src/client.rs +++ b/curp/src/client.rs @@ -3,7 +3,7 @@ use std::{ time::Duration, }; -use curp_external_api::cmd::PbSerializeError; +use curp_external_api::cmd::{generate_propose_id, PbSerializeError}; use dashmap::DashMap; use event_listener::Event; use futures::{pin_mut, stream::FuturesUnordered, StreamExt}; @@ -772,6 +772,33 @@ where fn all_connects(&self) -> Vec> { self.connects.iter().map(|c| Arc::clone(&c)).collect() } + + /// Get the client id + /// + /// # Errors + /// + /// `ProposeError::Timeout` if timeout + #[allow(clippy::unused_async)] // TODO: grant a client id from server + async fn get_client_id(&self) -> Result { + Ok(rand::random()) + } + + /// New a seq num and record it + #[allow(clippy::unused_self)] // TODO: implement request tracker + fn new_seq_num(&self) -> u64 { + 0 + } + + /// Generate a propose id + /// + /// # Errors + /// `ProposeError::Timeout` if timeout + #[inline] + pub async fn gen_propose_id(&self) -> Result> { + let client_id = self.get_client_id().await?; + let seq_num = self.new_seq_num(); + Ok(generate_propose_id(client_id, seq_num)) + } } /// Get the superquorum for curp protocol diff --git a/curp/tests/common/curp_group.rs b/curp/tests/common/curp_group.rs index 1bcea037d..a817e84f8 100644 --- a/curp/tests/common/curp_group.rs +++ b/curp/tests/common/curp_group.rs @@ -60,7 +60,7 @@ pub struct CurpNode { pub as_rx: mpsc::UnboundedReceiver<(TestCommand, LogIndex)>, pub role_change_arc: Arc, pub handle: JoinHandle>, - pub trigger: shutdown::Trigger, + pub trigger: Trigger, } pub struct CurpGroup { diff --git a/xline-client/src/clients/auth.rs b/xline-client/src/clients/auth.rs index 2a579c34b..067803e4c 100644 --- a/xline-client/src/clients/auth.rs +++ b/xline-client/src/clients/auth.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use curp::{client::Client as CurpClient, cmd::generate_propose_id}; +use curp::client::Client as CurpClient; use pbkdf2::{ password_hash::{rand_core::OsRng, PasswordHasher, SaltString}, Pbkdf2, @@ -30,8 +30,6 @@ use crate::{ /// Client for Auth operations. #[derive(Clone, Debug)] pub struct AuthClient { - /// Name of the AuthClient, which will be used in CURP propose id generation - name: String, /// The client running the CURP protocol, communicate with all servers. curp_client: Arc>, /// The auth RPC client, only communicate with one server at a time @@ -44,13 +42,11 @@ impl AuthClient { /// Creates a new `AuthClient` #[inline] pub fn new( - name: String, curp_client: Arc>, channel: Channel, token: Option, ) -> Self { Self { - name, curp_client, auth_client: xlineapi::AuthClient::new(AuthService::new( channel, @@ -693,7 +689,7 @@ impl AuthClient { request: Req, use_fast_path: bool, ) -> Result { - let propose_id = generate_propose_id(&self.name); + let propose_id = self.curp_client.gen_propose_id().await?; let request = RequestWithToken::new_with_token(request.into(), self.token.clone()); let cmd = Command::new(vec![], request, propose_id); diff --git a/xline-client/src/clients/kv.rs b/xline-client/src/clients/kv.rs index 2ca73a70c..4bc161e7e 100644 --- a/xline-client/src/clients/kv.rs +++ b/xline-client/src/clients/kv.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use curp::{client::Client as CurpClient, cmd::generate_propose_id}; +use curp::client::Client as CurpClient; use xline::server::{Command, KeyRange}; use xlineapi::{ CompactionResponse, DeleteRangeResponse, PutResponse, RangeResponse, RequestWithToken, @@ -15,8 +15,6 @@ use crate::{ /// Client for KV operations. #[derive(Clone, Debug)] pub struct KvClient { - /// Name of the kv client, which will be used in CURP propose id generation - name: String, /// The client running the CURP protocol, communicate with all servers. curp_client: Arc>, /// The auth token @@ -26,16 +24,8 @@ pub struct KvClient { impl KvClient { /// New `KvClient` #[inline] - pub(crate) fn new( - name: String, - curp_client: Arc>, - token: Option, - ) -> Self { - Self { - name, - curp_client, - token, - } + pub(crate) fn new(curp_client: Arc>, token: Option) -> Self { + Self { curp_client, token } } /// Put a key-value into the store @@ -65,7 +55,7 @@ impl KvClient { #[inline] pub async fn put(&self, request: PutRequest) -> Result { let key_ranges = vec![KeyRange::new_one_key(request.key())]; - let propose_id = generate_propose_id(&self.name); + let propose_id = self.curp_client.gen_propose_id().await?; let request = RequestWithToken::new_with_token( xlineapi::PutRequest::from(request).into(), self.token.clone(), @@ -110,7 +100,7 @@ impl KvClient { #[inline] pub async fn range(&self, request: RangeRequest) -> Result { let key_ranges = vec![KeyRange::new(request.key(), request.range_end())]; - let propose_id = generate_propose_id(&self.name); + let propose_id = self.curp_client.gen_propose_id().await?; let request = RequestWithToken::new_with_token( xlineapi::RangeRequest::from(request).into(), self.token.clone(), @@ -148,7 +138,7 @@ impl KvClient { #[inline] pub async fn delete(&self, request: DeleteRangeRequest) -> Result { let key_ranges = vec![KeyRange::new(request.key(), request.range_end())]; - let propose_id = generate_propose_id(&self.name); + let propose_id = self.curp_client.gen_propose_id().await?; let request = RequestWithToken::new_with_token( xlineapi::DeleteRangeRequest::from(request).into(), self.token.clone(), @@ -203,7 +193,7 @@ impl KvClient { .iter() .map(|cmp| KeyRange::new(cmp.key.as_slice(), cmp.range_end.as_slice())) .collect(); - let propose_id = generate_propose_id(&self.name); + let propose_id = self.curp_client.gen_propose_id().await?; let request = RequestWithToken::new_with_token( xlineapi::TxnRequest::from(request).into(), self.token.clone(), @@ -256,7 +246,7 @@ impl KvClient { #[inline] pub async fn compact(&self, request: CompactionRequest) -> Result { let use_fast_path = request.physical(); - let propose_id = generate_propose_id(&self.name); + let propose_id = self.curp_client.gen_propose_id().await?; let request = RequestWithToken::new_with_token( xlineapi::CompactionRequest::from(request).into(), self.token.clone(), diff --git a/xline-client/src/clients/lease.rs b/xline-client/src/clients/lease.rs index 57fdf036e..56c4526a5 100644 --- a/xline-client/src/clients/lease.rs +++ b/xline-client/src/clients/lease.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use curp::{client::Client as CurpClient, cmd::generate_propose_id}; +use curp::client::Client as CurpClient; use futures::channel::mpsc::channel; use tonic::{transport::Channel, Streaming}; use xline::server::Command; @@ -22,8 +22,6 @@ use crate::{ /// Client for Lease operations. #[derive(Clone, Debug)] pub struct LeaseClient { - /// Name of the LeaseClient, which will be used in CURP propose id generation - name: String, /// The client running the CURP protocol, communicate with all servers. curp_client: Arc>, /// The lease RPC client, only communicate with one server at a time @@ -38,14 +36,12 @@ impl LeaseClient { /// Creates a new `LeaseClient` #[inline] pub fn new( - name: String, curp_client: Arc>, channel: Channel, token: Option, id_gen: Arc, ) -> Self { Self { - name, curp_client, lease_client: xlineapi::LeaseClient::new(AuthService::new( channel, @@ -85,7 +81,7 @@ impl LeaseClient { /// ``` #[inline] pub async fn grant(&self, mut request: LeaseGrantRequest) -> Result { - let propose_id = generate_propose_id(&self.name); + let propose_id = self.curp_client.gen_propose_id().await?; if request.inner.id == 0 { request.inner.id = self.id_gen.next(); } @@ -260,7 +256,7 @@ impl LeaseClient { /// ``` #[inline] pub async fn leases(&self) -> Result { - let propose_id = generate_propose_id(&self.name); + let propose_id = self.curp_client.gen_propose_id().await?; let request = RequestWithToken::new_with_token( xlineapi::LeaseLeasesRequest {}.into(), self.token.clone(), diff --git a/xline-client/src/clients/lock.rs b/xline-client/src/clients/lock.rs index 8e8407501..e78084942 100644 --- a/xline-client/src/clients/lock.rs +++ b/xline-client/src/clients/lock.rs @@ -7,10 +7,7 @@ use std::{ }; use clippy_utilities::OverflowArithmetic; -use curp::{ - client::Client as CurpClient, - cmd::{generate_propose_id, ProposeId}, -}; +use curp::{client::Client as CurpClient, cmd::ProposeId}; use futures::{Future, FutureExt}; use tonic::transport::Channel; use xline::server::{Command, CommandResponse, KeyRange, SyncResponse}; @@ -35,8 +32,6 @@ use crate::{ /// Client for Lock operations. #[derive(Clone, Debug)] pub struct LockClient { - /// Name of the LockClient - name: String, /// The client running the CURP protocol, communicate with all servers. curp_client: Arc>, /// The lease client @@ -53,22 +48,14 @@ impl LockClient { /// Creates a new `LockClient` #[inline] pub fn new( - name: String, curp_client: Arc>, channel: Channel, token: Option, id_gen: Arc, ) -> Self { Self { - name: name.clone(), curp_client: Arc::clone(&curp_client), - lease_client: LeaseClient::new( - name, - curp_client, - channel.clone(), - token.clone(), - id_gen, - ), + lease_client: LeaseClient::new(curp_client, channel.clone(), token.clone(), id_gen), watch_client: WatchClient::new(channel, token.clone()), token, } @@ -277,7 +264,7 @@ impl LockClient { { let request_with_token = RequestWithToken::new_with_token(request.into(), self.token.clone()); - let propose_id = generate_propose_id(&self.name); + let propose_id = self.curp_client.gen_propose_id().await?; let cmd = Self::command_from_request_wrapper(propose_id, request_with_token); self.curp_client diff --git a/xline-client/src/lib.rs b/xline-client/src/lib.rs index 6971516c1..2602acf69 100644 --- a/xline-client/src/lib.rs +++ b/xline-client/src/lib.rs @@ -213,7 +213,6 @@ impl Client { .into_iter() .map(|addr| addr.as_ref().to_owned()) .collect(); - let name = String::from("client"); let channel = Self::build_channel(addrs.clone()).await?; let curp_client = Arc::new( CurpClient::builder() @@ -225,12 +224,7 @@ impl Client { let token = match options.user { Some((username, password)) => { - let mut tmp_auth = AuthClient::new( - name.clone(), - Arc::clone(&curp_client), - channel.clone(), - None, - ); + let mut tmp_auth = AuthClient::new(Arc::clone(&curp_client), channel.clone(), None); let resp = tmp_auth .authenticate(types::auth::AuthenticateRequest::new(username, password)) .await?; @@ -240,22 +234,20 @@ impl Client { None => None, }; - let kv = KvClient::new(name.clone(), Arc::clone(&curp_client), token.clone()); + let kv = KvClient::new(Arc::clone(&curp_client), token.clone()); let lease = LeaseClient::new( - name.clone(), Arc::clone(&curp_client), channel.clone(), token.clone(), Arc::clone(&id_gen), ); let lock = LockClient::new( - name.clone(), Arc::clone(&curp_client), channel.clone(), token.clone(), id_gen, ); - let auth = AuthClient::new(name.clone(), curp_client, channel.clone(), token.clone()); + let auth = AuthClient::new(curp_client, channel.clone(), token.clone()); let maintenance = MaintenanceClient::new(channel.clone(), token.clone()); let watch = WatchClient::new(channel, token); let cluster = ClusterClient::new(); diff --git a/xline/src/client/mod.rs b/xline/src/client/mod.rs index 9e4aa8081..437986498 100644 --- a/xline/src/client/mod.rs +++ b/xline/src/client/mod.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; -use curp::{client::Client as CurpClient, cmd::generate_propose_id}; +use curp::client::Client as CurpClient; use etcd_client::{ AuthClient, Client as EtcdClient, KvClient, LeaseClient, LeaseKeepAliveStream, LeaseKeeper, LockClient, MaintenanceClient, WatchClient, @@ -95,7 +95,7 @@ impl Client { pub async fn put(&mut self, request: PutRequest) -> Result { if self.use_curp_client { let key_ranges = vec![KeyRange::new_one_key(request.key())]; - let propose_id = generate_propose_id(&self.name); + let propose_id = self.curp_client.gen_propose_id().await?; let request = RequestWithToken::new(rpc::PutRequest::from(request).into()); let cmd = Command::new(key_ranges, request, propose_id); let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?; @@ -119,7 +119,7 @@ impl Client { pub async fn range(&mut self, request: RangeRequest) -> Result { if self.use_curp_client { let key_ranges = vec![KeyRange::new(request.key(), request.range_end())]; - let propose_id = generate_propose_id(&self.name); + let propose_id = self.curp_client.gen_propose_id().await?; let request = RequestWithToken::new(rpc::RangeRequest::from(request).into()); let cmd = Command::new(key_ranges, request, propose_id); let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?; @@ -143,7 +143,7 @@ impl Client { ) -> Result { if self.use_curp_client { let key_ranges = vec![KeyRange::new(request.key(), request.range_end())]; - let propose_id = generate_propose_id(&self.name); + let propose_id = self.curp_client.gen_propose_id().await?; let request = RequestWithToken::new(rpc::DeleteRangeRequest::from(request).into()); let cmd = Command::new(key_ranges, request, propose_id); let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?; diff --git a/xline/src/server/auth_server.rs b/xline/src/server/auth_server.rs index 5bbca1f58..d1ee27b6e 100644 --- a/xline/src/server/auth_server.rs +++ b/xline/src/server/auth_server.rs @@ -1,6 +1,6 @@ use std::{marker::PhantomData, sync::Arc}; -use curp::{client::Client, cmd::generate_propose_id}; +use curp::client::Client; use pbkdf2::{ password_hash::{rand_core::OsRng, PasswordHasher, SaltString}, Pbkdf2, @@ -38,8 +38,6 @@ where { /// Consensus client client: Arc>, - /// Server name - name: String, /// Phantom phantom: PhantomData, } @@ -57,10 +55,9 @@ where S: StorageApi, { /// New `AuthServer` - pub(crate) fn new(client: Arc>, name: String) -> Self { + pub(crate) fn new(client: Arc>) -> Self { Self { client, - name, phantom: PhantomData, } } @@ -76,7 +73,12 @@ where { let token = get_token(request.metadata()); let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token); - let cmd = command_from_request_wrapper::(generate_propose_id(&self.name), wrapper, None); + let propose_id = self + .client + .gen_propose_id() + .await + .map_err(propose_err_to_status)?; + let cmd = command_from_request_wrapper::(propose_id, wrapper, None); self.client .propose(cmd, use_fast_path) diff --git a/xline/src/server/kv_server.rs b/xline/src/server/kv_server.rs index 2fb20ed77..a4e3699aa 100644 --- a/xline/src/server/kv_server.rs +++ b/xline/src/server/kv_server.rs @@ -1,9 +1,6 @@ use std::{fmt::Debug, sync::Arc, time::Duration}; -use curp::{ - client::{Client, ReadState}, - cmd::generate_propose_id, -}; +use curp::client::{Client, ReadState}; use futures::future::join_all; use tokio::time::timeout; use tracing::{debug, instrument}; @@ -44,8 +41,6 @@ where range_retry_timeout: Duration, /// Consensus client client: Arc>, - /// Server name - name: String, } impl KvServer @@ -60,7 +55,6 @@ where id_barrier: Arc, range_retry_timeout: Duration, client: Arc>, - name: String, ) -> Self { Self { kv_storage, @@ -69,7 +63,6 @@ where id_barrier, range_retry_timeout, client, - name, } } @@ -101,7 +94,12 @@ where { let token = get_token(request.metadata()); let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token); - let cmd = command_from_request_wrapper::(generate_propose_id(&self.name), wrapper, None); + let propose_id = self + .client + .gen_propose_id() + .await + .map_err(propose_err_to_status)?; + let cmd = command_from_request_wrapper::(propose_id, wrapper, None); self.client .propose(cmd, use_fast_path) @@ -205,7 +203,11 @@ where let is_serializable = range_req.serializable; let token = get_token(request.metadata()); let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token); - let propose_id = generate_propose_id(&self.name); + let propose_id = self + .client + .gen_propose_id() + .await + .map_err(propose_err_to_status)?; let cmd = command_from_request_wrapper::(propose_id, wrapper, None); if !is_serializable { self.wait_read_state(&cmd).await?; @@ -302,7 +304,11 @@ where let is_serializable = txn_req.is_serializable(); let token = get_token(request.metadata()); let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token); - let propose_id = generate_propose_id(&self.name); + let propose_id = self + .client + .gen_propose_id() + .await + .map_err(propose_err_to_status)?; let cmd = command_from_request_wrapper::(propose_id, wrapper, None); if !is_serializable { self.wait_read_state(&cmd).await?; diff --git a/xline/src/server/lease_server.rs b/xline/src/server/lease_server.rs index 89248f529..8ca3b44b0 100644 --- a/xline/src/server/lease_server.rs +++ b/xline/src/server/lease_server.rs @@ -2,7 +2,7 @@ use std::{pin::Pin, sync::Arc, time::Duration}; use async_stream::{stream, try_stream}; use clippy_utilities::Cast; -use curp::{client::Client, cmd::generate_propose_id, members::ClusterInfo}; +use curp::{client::Client, members::ClusterInfo}; use futures::stream::Stream; use tokio::time; use tonic::transport::Endpoint; @@ -123,11 +123,13 @@ where { let token = get_token(request.metadata()); let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token); - let cmd = command_from_request_wrapper( - generate_propose_id(self.cluster_info.self_name().as_ref()), - wrapper, - Some(self.lease_storage.as_ref()), - ); + let propose_id = self + .client + .gen_propose_id() + .await + .map_err(propose_err_to_status)?; + let cmd = + command_from_request_wrapper(propose_id, wrapper, Some(self.lease_storage.as_ref())); self.client .propose(cmd, use_fast_path) diff --git a/xline/src/server/lock_server.rs b/xline/src/server/lock_server.rs index 16a6e99af..b82eb743f 100644 --- a/xline/src/server/lock_server.rs +++ b/xline/src/server/lock_server.rs @@ -2,7 +2,7 @@ use std::{marker::PhantomData, sync::Arc}; use async_stream::stream; use clippy_utilities::OverflowArithmetic; -use curp::{client::Client, cmd::generate_propose_id}; +use curp::client::Client; use etcd_client::EventType; use tonic::transport::{Channel, Endpoint}; use tracing::debug; @@ -37,8 +37,6 @@ pub(super) struct LockServer { client: Arc>, /// Id Generator id_gen: Arc, - /// Cluster information - name: String, /// Server addresses addrs: Vec, /// Phantom @@ -53,7 +51,6 @@ where pub(super) fn new( client: Arc>, id_gen: Arc, - name: String, addrs: Vec, ) -> Self { let addrs = addrs @@ -69,7 +66,6 @@ where Self { client, id_gen, - name, addrs, phantom: PhantomData, } @@ -86,7 +82,12 @@ where T: Into, { let wrapper = RequestWithToken::new_with_token(request.into(), token); - let cmd = command_from_request_wrapper::(generate_propose_id(&self.name), wrapper, None); + let propose_id = self + .client + .gen_propose_id() + .await + .map_err(propose_err_to_status)?; + let cmd = command_from_request_wrapper::(propose_id, wrapper, None); self.client .propose(cmd, use_fast_path) diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs index 4a2f6b329..13197dd89 100644 --- a/xline/src/server/xline_server.rs +++ b/xline/src/server/xline_server.rs @@ -379,12 +379,10 @@ impl XlineServer { id_barrier, *self.server_timeout.range_retry_timeout(), Arc::clone(&client), - self.cluster_info.self_name(), ), LockServer::new( Arc::clone(&client), Arc::clone(&id_gen), - self.cluster_info.self_name(), self.cluster_info.self_addrs(), ), LeaseServer::new( @@ -395,7 +393,7 @@ impl XlineServer { Arc::clone(&self.cluster_info), self.shutdown_trigger.subscribe(), ), - AuthServer::new(client, self.cluster_info.self_name()), + AuthServer::new(client), WatchServer::new( watcher, Arc::clone(&header_gen), diff --git a/xline/src/storage/compact/mod.rs b/xline/src/storage/compact/mod.rs index 361ba8148..a6e24cff9 100644 --- a/xline/src/storage/compact/mod.rs +++ b/xline/src/storage/compact/mod.rs @@ -1,21 +1,19 @@ use std::{sync::Arc, time::Duration}; use async_trait::async_trait; -use curp::{ - client::Client, - cmd::generate_propose_id, - error::CommandProposeError::{AfterSync, Execute}, -}; +use curp::client::Client; +use curp::error::CommandProposeError; use event_listener::Event; use periodic_compactor::PeriodicCompactor; use revision_compactor::RevisionCompactor; use tokio::{sync::mpsc::Receiver, time::sleep}; + use utils::{config::AutoCompactConfig, shutdown}; use super::{ index::{Index, IndexOperate}, storage_api::StorageApi, - ExecuteError, KvStore, + KvStore, }; use crate::{ revision_number::RevisionNumberGenerator, @@ -48,30 +46,21 @@ pub(crate) trait Compactor: std::fmt::Debug + Send + Sync { #[async_trait] pub(crate) trait Compactable: std::fmt::Debug + Send + Sync { /// do compact - async fn compact(&self, revision: i64) -> Result<(), ExecuteError>; + async fn compact(&self, revision: i64) -> Result<(), CommandProposeError>; } #[async_trait] impl Compactable for Client { - async fn compact(&self, revision: i64) -> Result<(), ExecuteError> { + async fn compact(&self, revision: i64) -> Result<(), CommandProposeError> { let request = CompactionRequest { revision, physical: false, }; let request_wrapper = RequestWithToken::new_with_token(request.into(), None); - let propose_id = generate_propose_id("auto-compactor"); + let propose_id = self.gen_propose_id().await?; let cmd = Command::new(vec![], request_wrapper, propose_id); - if let Err(e) = self.propose(cmd, true).await { - #[allow(clippy::wildcard_enum_match_arm)] - match e { - Execute(e) | AfterSync(e) => Err(e), - _ => { - unreachable!("Compaction should not receive any errors other than ExecuteError, but it receives {e:?}"); - } - } - } else { - Ok(()) - } + let _ig = self.propose(cmd, true).await?; + Ok(()) } } diff --git a/xline/src/storage/compact/periodic_compactor.rs b/xline/src/storage/compact/periodic_compactor.rs index a2915b408..396ca477f 100644 --- a/xline/src/storage/compact/periodic_compactor.rs +++ b/xline/src/storage/compact/periodic_compactor.rs @@ -8,6 +8,7 @@ use std::{ }; use clippy_utilities::OverflowArithmetic; +use curp::error::CommandProposeError; use tracing::{info, warn}; use utils::shutdown; @@ -107,32 +108,35 @@ impl PeriodicCompactor { revision, self.period ); - if let Err(e) = self.client.compact(revision).await { - if let ExecuteError::RevisionCompacted(_rev, compacted_rev) = e { - info!( - "required revision {} has been compacted, the current compacted revision is {}, period = {:?}, took {:?}", - revision, - compacted_rev, - self.period, - now.elapsed().as_secs() - ); - Some(compacted_rev) - } else { - warn!( - "failed auto revision compaction, revision = {}, period = {:?}, error: {:?}", - revision, self.period, e - ); - None - } - } else { + let res = self.client.compact(revision).await; + if res.is_ok() { info!( "completed auto revision compaction, revision = {}, period = {:?}, took {:?}", revision, self.period, now.elapsed().as_secs() ); - target_revision + return target_revision; } + if let Err( + CommandProposeError::Execute(ExecuteError::RevisionCompacted(_, compacted_rev)) + | CommandProposeError::AfterSync(ExecuteError::RevisionCompacted(_, compacted_rev)), + ) = res + { + info!( + "required revision {} has been compacted, the current compacted revision is {}, period = {:?}, took {:?}", + revision, + compacted_rev, + self.period, + now.elapsed().as_secs() + ); + return Some(compacted_rev); + } + warn!( + "failed auto revision compaction, revision = {}, period = {:?}, result: {:?}", + revision, self.period, res + ); + None } } diff --git a/xline/src/storage/compact/revision_compactor.rs b/xline/src/storage/compact/revision_compactor.rs index c72e5f459..5d2ae7a12 100644 --- a/xline/src/storage/compact/revision_compactor.rs +++ b/xline/src/storage/compact/revision_compactor.rs @@ -7,6 +7,7 @@ use std::{ }; use clippy_utilities::OverflowArithmetic; +use curp::error::CommandProposeError; use tracing::{info, warn}; use utils::shutdown; @@ -65,31 +66,35 @@ impl RevisionCompactor { "starting auto revision compaction, revision = {}, retention = {}", target_revision, self.retention ); - if let Err(e) = self.client.compact(target_revision).await { - if let ExecuteError::RevisionCompacted(_rev, compacted_rev) = e { - info!( - "required revision {} has been compacted, the current compacted revision is {}, retention = {:?}", - target_revision, - compacted_rev, - self.retention, - ); - Some(compacted_rev) - } else { - warn!( - "failed auto revision compaction, revision = {}, retention = {}, error: {:?}", - target_revision, self.retention, e - ); - None - } - } else { + + let res = self.client.compact(target_revision).await; + if res.is_ok() { info!( "completed auto revision compaction, revision = {}, retention = {}, took {:?}", target_revision, self.retention, now.elapsed().as_secs() ); - Some(target_revision) + return Some(target_revision); + } + if let Err( + CommandProposeError::Execute(ExecuteError::RevisionCompacted(_, compacted_rev)) + | CommandProposeError::AfterSync(ExecuteError::RevisionCompacted(_, compacted_rev)), + ) = res + { + info!( + "required revision {} has been compacted, the current compacted revision is {}, retention = {:?}", + target_revision, + compacted_rev, + self.retention, + ); + return Some(compacted_rev); } + warn!( + "failed auto revision compaction, revision = {}, retention = {}, result: {:?}", + target_revision, self.retention, res + ); + None } }