Skip to content

Commit

Permalink
refactor: refactor propose id into <client_id>-<seq-num>
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Oct 9, 2023
1 parent 6408669 commit c7526ff
Show file tree
Hide file tree
Showing 21 changed files with 172 additions and 164 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion curp-external-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ mockall = "0.11.3"
prost = "0.11"
serde = { version = "1.0.130", features = ["derive", "rc"] }
thiserror = "1.0.31"
uuid = { version = "1.1.2", features = ["v4"] }
17 changes: 13 additions & 4 deletions curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use async_trait::async_trait;
use engine::Snapshot;
use prost::DecodeError;
use serde::{de::DeserializeOwned, Serialize};
use uuid::Uuid;

use crate::LogIndex;

Expand Down Expand Up @@ -79,11 +78,21 @@ pub trait Command:
/// Command Id wrapper, abstracting underlying implementation
pub type ProposeId = String;

/// Generate propose id with the given prefix
/// Generate propose id with client id and seq num
#[inline]
#[must_use]
pub fn generate_propose_id(prefix: &str) -> ProposeId {
format!("{}-{}", prefix, Uuid::new_v4())
pub fn generate_propose_id(client_id: u64, seq_num: u64) -> ProposeId {
format!("{client_id}#{seq_num}")
}

/// Parse propose id to (`client_id`, `seq_num`)
#[inline]
#[must_use]
pub fn parse_propose_id(id: &ProposeId) -> Option<(u64, u64)> {
let mut iter = id.split('#');
let client_id = iter.next()?.parse().ok()?;
let seq_num: u64 = iter.next()?.parse().ok()?;
Some((client_id, seq_num))
}

/// Check conflict of two keys
Expand Down
14 changes: 9 additions & 5 deletions curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ use crate::{META_TABLE, REVISION_TABLE, TEST_TABLE};
pub(crate) const APPLIED_INDEX_KEY: &str = "applied_index";
pub(crate) const LAST_REVISION_KEY: &str = "last_revision";

/// Test client id
pub const TEST_CLIENT_ID: &str = "test_client_id";

static NEXT_ID: Lazy<AtomicU64> = Lazy::new(|| AtomicU64::new(1));

pub fn next_id() -> u64 {
NEXT_ID.fetch_add(1, Ordering::SeqCst)
pub fn next_id() -> String {
let seq_num = NEXT_ID.fetch_add(1, Ordering::Relaxed);
format!("{TEST_CLIENT_ID}#{seq_num}")
}

#[derive(Error, Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -66,7 +70,7 @@ pub struct TestCommand {
impl Default for TestCommand {
fn default() -> Self {
Self {
id: next_id().to_string(),
id: next_id(),
keys: vec![1],
exe_dur: Duration::ZERO,
as_dur: Duration::ZERO,
Expand Down Expand Up @@ -114,7 +118,7 @@ impl PbCodec for TestCommandResult {
impl TestCommand {
pub fn new_get(keys: Vec<u32>) -> Self {
Self {
id: next_id().to_string(),
id: next_id(),
keys,
exe_dur: Duration::ZERO,
as_dur: Duration::ZERO,
Expand All @@ -126,7 +130,7 @@ impl TestCommand {

pub fn new_put(keys: Vec<u32>, value: u32) -> Self {
Self {
id: next_id().to_string(),
id: next_id(),
keys,
exe_dur: Duration::ZERO,
as_dur: Duration::ZERO,
Expand Down
1 change: 1 addition & 0 deletions curp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ madsim = { version = "0.2.22", features = ["rpc", "macros"] }
opentelemetry = "0.18.0"
parking_lot = "0.12.1"
prost = "0.11"
rand = "0.8.5"
serde = { version = "1.0.130", features = ["derive", "rc"] }
thiserror = "1.0.31"
tokio = { version = "0.2.23", package = "madsim-tokio", features = [
Expand Down
29 changes: 28 additions & 1 deletion curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
time::Duration,
};

use curp_external_api::cmd::PbSerializeError;
use curp_external_api::cmd::{generate_propose_id, PbSerializeError};
use dashmap::DashMap;
use event_listener::Event;
use futures::{pin_mut, stream::FuturesUnordered, StreamExt};
Expand Down Expand Up @@ -772,6 +772,33 @@ where
fn all_connects(&self) -> Vec<Arc<dyn ConnectApi>> {
self.connects.iter().map(|c| Arc::clone(&c)).collect()
}

/// Get the client id
///
/// # Errors
///
/// `ProposeError::Timeout` if timeout
#[allow(clippy::unused_async)] // TODO: grant a client id from server
async fn get_client_id(&self) -> Result<u64, ProposeError> {
Ok(rand::random())
}

/// New a seq num and record it
#[allow(clippy::unused_self)] // TODO: implement request tracker
fn new_seq_num(&self) -> u64 {
0
}

/// Generate a propose id
///
/// # Errors
/// `ProposeError::Timeout` if timeout
#[inline]
pub async fn gen_propose_id(&self) -> Result<ProposeId, CommandProposeError<C>> {
let client_id = self.get_client_id().await?;
let seq_num = self.new_seq_num();
Ok(generate_propose_id(client_id, seq_num))
}
}

/// Get the superquorum for curp protocol
Expand Down
2 changes: 1 addition & 1 deletion curp/tests/common/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct CurpNode {
pub as_rx: mpsc::UnboundedReceiver<(TestCommand, LogIndex)>,
pub role_change_arc: Arc<TestRoleChangeInner>,
pub handle: JoinHandle<Result<(), ServerError>>,
pub trigger: shutdown::Trigger,
pub trigger: Trigger,
}

pub struct CurpGroup {
Expand Down
8 changes: 2 additions & 6 deletions xline-client/src/clients/auth.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use curp::{client::Client as CurpClient, cmd::generate_propose_id};
use curp::client::Client as CurpClient;
use pbkdf2::{
password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
Pbkdf2,
Expand Down Expand Up @@ -30,8 +30,6 @@ use crate::{
/// Client for Auth operations.
#[derive(Clone, Debug)]
pub struct AuthClient {
/// Name of the AuthClient, which will be used in CURP propose id generation
name: String,
/// The client running the CURP protocol, communicate with all servers.
curp_client: Arc<CurpClient<Command>>,
/// The auth RPC client, only communicate with one server at a time
Expand All @@ -44,13 +42,11 @@ impl AuthClient {
/// Creates a new `AuthClient`
#[inline]
pub fn new(
name: String,
curp_client: Arc<CurpClient<Command>>,
channel: Channel,
token: Option<String>,
) -> Self {
Self {
name,
curp_client,
auth_client: xlineapi::AuthClient::new(AuthService::new(
channel,
Expand Down Expand Up @@ -693,7 +689,7 @@ impl AuthClient {
request: Req,
use_fast_path: bool,
) -> Result<Res> {
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(request.into(), self.token.clone());
let cmd = Command::new(vec![], request, propose_id);

Expand Down
26 changes: 8 additions & 18 deletions xline-client/src/clients/kv.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use curp::{client::Client as CurpClient, cmd::generate_propose_id};
use curp::client::Client as CurpClient;
use xline::server::{Command, KeyRange};
use xlineapi::{
CompactionResponse, DeleteRangeResponse, PutResponse, RangeResponse, RequestWithToken,
Expand All @@ -15,8 +15,6 @@ use crate::{
/// Client for KV operations.
#[derive(Clone, Debug)]
pub struct KvClient {
/// Name of the kv client, which will be used in CURP propose id generation
name: String,
/// The client running the CURP protocol, communicate with all servers.
curp_client: Arc<CurpClient<Command>>,
/// The auth token
Expand All @@ -26,16 +24,8 @@ pub struct KvClient {
impl KvClient {
/// New `KvClient`
#[inline]
pub(crate) fn new(
name: String,
curp_client: Arc<CurpClient<Command>>,
token: Option<String>,
) -> Self {
Self {
name,
curp_client,
token,
}
pub(crate) fn new(curp_client: Arc<CurpClient<Command>>, token: Option<String>) -> Self {
Self { curp_client, token }
}

/// Put a key-value into the store
Expand Down Expand Up @@ -65,7 +55,7 @@ impl KvClient {
#[inline]
pub async fn put(&self, request: PutRequest) -> Result<PutResponse> {
let key_ranges = vec![KeyRange::new_one_key(request.key())];
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::PutRequest::from(request).into(),
self.token.clone(),
Expand Down Expand Up @@ -110,7 +100,7 @@ impl KvClient {
#[inline]
pub async fn range(&self, request: RangeRequest) -> Result<RangeResponse> {
let key_ranges = vec![KeyRange::new(request.key(), request.range_end())];
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::RangeRequest::from(request).into(),
self.token.clone(),
Expand Down Expand Up @@ -148,7 +138,7 @@ impl KvClient {
#[inline]
pub async fn delete(&self, request: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let key_ranges = vec![KeyRange::new(request.key(), request.range_end())];
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::DeleteRangeRequest::from(request).into(),
self.token.clone(),
Expand Down Expand Up @@ -203,7 +193,7 @@ impl KvClient {
.iter()
.map(|cmp| KeyRange::new(cmp.key.as_slice(), cmp.range_end.as_slice()))
.collect();
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::TxnRequest::from(request).into(),
self.token.clone(),
Expand Down Expand Up @@ -256,7 +246,7 @@ impl KvClient {
#[inline]
pub async fn compact(&self, request: CompactionRequest) -> Result<CompactionResponse> {
let use_fast_path = request.physical();
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::CompactionRequest::from(request).into(),
self.token.clone(),
Expand Down
10 changes: 3 additions & 7 deletions xline-client/src/clients/lease.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use curp::{client::Client as CurpClient, cmd::generate_propose_id};
use curp::client::Client as CurpClient;
use futures::channel::mpsc::channel;
use tonic::{transport::Channel, Streaming};
use xline::server::Command;
Expand All @@ -22,8 +22,6 @@ use crate::{
/// Client for Lease operations.
#[derive(Clone, Debug)]
pub struct LeaseClient {
/// Name of the LeaseClient, which will be used in CURP propose id generation
name: String,
/// The client running the CURP protocol, communicate with all servers.
curp_client: Arc<CurpClient<Command>>,
/// The lease RPC client, only communicate with one server at a time
Expand All @@ -38,14 +36,12 @@ impl LeaseClient {
/// Creates a new `LeaseClient`
#[inline]
pub fn new(
name: String,
curp_client: Arc<CurpClient<Command>>,
channel: Channel,
token: Option<String>,
id_gen: Arc<LeaseIdGenerator>,
) -> Self {
Self {
name,
curp_client,
lease_client: xlineapi::LeaseClient::new(AuthService::new(
channel,
Expand Down Expand Up @@ -85,7 +81,7 @@ impl LeaseClient {
/// ```
#[inline]
pub async fn grant(&self, mut request: LeaseGrantRequest) -> Result<LeaseGrantResponse> {
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
if request.inner.id == 0 {
request.inner.id = self.id_gen.next();
}
Expand Down Expand Up @@ -260,7 +256,7 @@ impl LeaseClient {
/// ```
#[inline]
pub async fn leases(&self) -> Result<LeaseLeasesResponse> {
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::LeaseLeasesRequest {}.into(),
self.token.clone(),
Expand Down
19 changes: 3 additions & 16 deletions xline-client/src/clients/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use std::{
};

use clippy_utilities::OverflowArithmetic;
use curp::{
client::Client as CurpClient,
cmd::{generate_propose_id, ProposeId},
};
use curp::{client::Client as CurpClient, cmd::ProposeId};
use futures::{Future, FutureExt};
use tonic::transport::Channel;
use xline::server::{Command, CommandResponse, KeyRange, SyncResponse};
Expand All @@ -35,8 +32,6 @@ use crate::{
/// Client for Lock operations.
#[derive(Clone, Debug)]
pub struct LockClient {
/// Name of the LockClient
name: String,
/// The client running the CURP protocol, communicate with all servers.
curp_client: Arc<CurpClient<Command>>,
/// The lease client
Expand All @@ -53,22 +48,14 @@ impl LockClient {
/// Creates a new `LockClient`
#[inline]
pub fn new(
name: String,
curp_client: Arc<CurpClient<Command>>,
channel: Channel,
token: Option<String>,
id_gen: Arc<LeaseIdGenerator>,
) -> Self {
Self {
name: name.clone(),
curp_client: Arc::clone(&curp_client),
lease_client: LeaseClient::new(
name,
curp_client,
channel.clone(),
token.clone(),
id_gen,
),
lease_client: LeaseClient::new(curp_client, channel.clone(), token.clone(), id_gen),
watch_client: WatchClient::new(channel, token.clone()),
token,
}
Expand Down Expand Up @@ -277,7 +264,7 @@ impl LockClient {
{
let request_with_token =
RequestWithToken::new_with_token(request.into(), self.token.clone());
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;

let cmd = Self::command_from_request_wrapper(propose_id, request_with_token);
self.curp_client
Expand Down
Loading

0 comments on commit c7526ff

Please sign in to comment.