Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/curp client tests suits #582

Merged
merged 6 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 89 additions & 23 deletions curp/src/client_new/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,27 @@
/// Retry layer
mod retry;

/// Tests for client
#[cfg(test)]
mod tests;

use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use curp_external_api::cmd::Command;
use futures::{stream::FuturesUnordered, StreamExt};
use utils::config::ClientConfig;

use self::{
retry::Retry,
unary::{UnaryBuilder, UnaryConfig},
};
use crate::{
client_new::retry::RetryConfig,
members::ServerId,
rpc::{
connect::ConnectApi, protocol_client::ProtocolClient, ConfChange, FetchClusterRequest,
FetchClusterResponse, Member, ReadState,
FetchClusterResponse, Member, Protocol, ReadState,
},
};

Expand Down Expand Up @@ -76,17 +85,17 @@
#[inline]
async fn fetch_leader_id(&self, linearizable: bool) -> Result<ServerId, Self::Error> {
if linearizable {
let resp = self.fetch_cluster(true).await?;
return Ok(resp.leader_id.unwrap_or_else(|| {
unreachable!("linearizable fetch cluster should return a leader id")
}));

Check warning on line 91 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L88-L91

Added lines #L88 - L91 were not covered by tests
}
let resp = self.fetch_cluster(false).await?;
if let Some(id) = resp.leader_id {
return Ok(id);
}
// fallback to linearizable fetch
self.fetch_leader_id(true).await

Check warning on line 98 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L96-L98

Added lines #L96 - L98 were not covered by tests
}
}

Expand All @@ -98,94 +107,151 @@
}

/// Client builder to build a client
#[derive(Debug, Clone)]

Check warning on line 110 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L110

Added line #L110 was not covered by tests
pub struct ClientBuilder {
/// local server id
local_server_id: Option<ServerId>,
pub struct ClientBuilder<P: Protocol> {
/// local server
local_server: Option<(ServerId, P)>,
/// initial cluster version
cluster_version: Option<u64>,
/// initial cluster members
all_members: Option<HashMap<ServerId, Vec<String>>>,
/// initial leader state
leader_state: Option<(ServerId, u64)>,
/// client configuration
config: ClientConfig,
/// initial all members
all_members: Option<HashMap<ServerId, Vec<String>>>,
}

impl ClientBuilder {
impl<P: Protocol> ClientBuilder<P> {
/// Create a client builder
#[inline]
#[must_use]
pub fn new(config: ClientConfig) -> Self {
Self {
local_server_id: None,
config,
all_members: None,
local_server: None,
cluster_version: None,
all_members: None,
leader_state: None,
config,
}
}

Check warning on line 136 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L128-L136

Added lines #L128 - L136 were not covered by tests

/// Set the local server id
/// Set the local server to bypass `gRPC` request
#[inline]
#[must_use]
pub fn local_server_id(&mut self, id: ServerId) -> &mut Self {
self.local_server_id = Some(id);
pub fn bypass(&mut self, id: ServerId, server: P) -> &mut Self {
self.local_server = Some((id, server));
self
}

Check warning on line 144 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L141-L144

Added lines #L141 - L144 were not covered by tests

/// Set the initial cluster version
#[inline]
#[must_use]
pub fn cluster_version(&mut self, cluster_version: u64) -> &mut Self {
self.cluster_version = Some(cluster_version);
self
}

Check warning on line 152 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L149-L152

Added lines #L149 - L152 were not covered by tests

/// Fetch initial all members from some endpoints if you do not know the whole members
/// Set the initial all members
#[inline]
#[must_use]
pub fn all_members(&mut self, all_members: HashMap<ServerId, Vec<String>>) -> &mut Self {
self.all_members = Some(all_members);
self
}

Check warning on line 160 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L157-L160

Added lines #L157 - L160 were not covered by tests

/// Set the initial leader state
#[inline]
#[must_use]
pub fn leader_state(&mut self, leader_id: ServerId, term: u64) -> &mut Self {
self.leader_state = Some((leader_id, term));
self
}

Check warning on line 168 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L165-L168

Added lines #L165 - L168 were not covered by tests

/// Discover the initial states from some endpoints
///
/// # Errors
///
/// Return `tonic::Status` for connection failure or some server errors.
#[inline]
pub async fn fetch_all_members(
&mut self,
addrs: Vec<String>,
) -> Result<&mut Self, tonic::Status> {
pub async fn discover_from(&mut self, addrs: Vec<String>) -> Result<&mut Self, tonic::Status> {
let propose_timeout = *self.config.propose_timeout();
let mut futs: FuturesUnordered<_> = addrs
.into_iter()
.map(|mut addr| {
if !addr.starts_with("http://") {
addr.insert_str(0, "http://");
}
async move {
let mut protocol_client = ProtocolClient::connect(addr).await.map_err(|e| {
tonic::Status::cancelled(format!("cannot connect to addr, error: {e}"))
})?;
let mut req = tonic::Request::new(FetchClusterRequest::default());
req.set_timeout(propose_timeout);
let fetch_cluster_res = protocol_client.fetch_cluster(req).await?.into_inner();
Ok::<FetchClusterResponse, tonic::Status>(fetch_cluster_res)
}
})
.collect();
let mut err = tonic::Status::invalid_argument("addrs is empty");

Check warning on line 195 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L176-L195

Added lines #L176 - L195 were not covered by tests
// find the first one return `FetchClusterResponse`
while let Some(r) = futs.next().await {
match r {
Ok(r) => {
self.cluster_version = Some(r.cluster_version);
if let Some(id) = r.leader_id {
self.leader_state = Some((id, r.term));
}
self.all_members = Some(r.into_members_addrs());
return Ok(self);

Check warning on line 205 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L197-L205

Added lines #L197 - L205 were not covered by tests
}
Err(e) => err = e,

Check warning on line 207 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L207

Added line #L207 was not covered by tests
}
}
Err(err)
}

Check warning on line 211 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L210-L211

Added lines #L210 - L211 were not covered by tests

/// Set the initial all members
/// Build the client
///
/// # Errors
///
/// Return `tonic::transport::Error` for connection failure.
#[inline]
#[must_use]
pub fn set_all_members(&mut self, all_members: HashMap<ServerId, Vec<String>>) -> &mut Self {
self.all_members = Some(all_members);
self
pub async fn build<C: Command>(
self,
) -> Result<impl ClientApi<Error = tonic::Status, Cmd = C>, tonic::transport::Error> {
let mut builder = UnaryBuilder::<P>::new(
self.all_members.unwrap_or_else(|| {
unreachable!("must set the initial members or discover from some endpoints")
}),
UnaryConfig::new_full(
*self.config.propose_timeout(),
*self.config.wait_synced_timeout(),
),
);
if let Some(version) = self.cluster_version {
builder.set_cluster_version(version);
}
if let Some((id, server)) = self.local_server {
builder.set_local_server(id, server);
}
if let Some((id, term)) = self.leader_state {
builder.set_leader_state(id, term);
}
bsbds marked this conversation as resolved.
Show resolved Hide resolved
let unary = builder.build::<C>().await?;

Check warning on line 240 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L219-L240

Added lines #L219 - L240 were not covered by tests

let retry_config = if *self.config.use_backoff() {
RetryConfig::new_exponential(
*self.config.initial_retry_timeout(),
*self.config.max_retry_timeout(),
*self.config.retry_count(),
)

Check warning on line 247 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L242-L247

Added lines #L242 - L247 were not covered by tests
} else {
RetryConfig::new_fixed(
*self.config.initial_retry_timeout(),
*self.config.retry_count(),
)

Check warning on line 252 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L249-L252

Added lines #L249 - L252 were not covered by tests
};
let client = Retry::new(unary, retry_config);
Ok(client)
}

Check warning on line 256 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L254-L256

Added lines #L254 - L256 were not covered by tests
}
38 changes: 34 additions & 4 deletions curp/src/client_new/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

/// Backoff config
#[derive(Debug, Clone)]
pub(super) enum BackoffConfig {
enum BackoffConfig {
/// A fixed delay backoff
Fixed,
/// A exponential delay backoff
Expand All @@ -41,7 +41,7 @@
}

/// Backoff tool
#[derive(Debug)]

Check warning on line 44 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L44

Added line #L44 was not covered by tests
struct Backoff {
/// The retry config
config: RetryConfig,
Expand All @@ -53,7 +53,7 @@

impl RetryConfig {
/// Create a fixed retry config
fn new_fixed(delay: Duration, count: usize) -> Self {
pub(super) fn new_fixed(delay: Duration, count: usize) -> Self {
assert!(count > 0, "retry count should be larger than 0");
Self {
backoff: BackoffConfig::Fixed,
Expand All @@ -63,7 +63,7 @@
}

/// Create a exponential retry config
fn new_exponential(delay: Duration, max_delay: Duration, count: usize) -> Self {
pub(super) fn new_exponential(delay: Duration, max_delay: Duration, count: usize) -> Self {
assert!(count > 0, "retry count should be larger than 0");
Self {
backoff: BackoffConfig::Exponential { max_delay },
Expand Down Expand Up @@ -103,7 +103,7 @@

/// The retry client automatically retry the requests of the inner client api
/// which raises the [`tonic::Status`] error
#[derive(Debug)]

Check warning on line 106 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L106

Added line #L106 was not covered by tests
pub(super) struct Retry<Api> {
/// Inner client
inner: Api,
Expand All @@ -116,7 +116,7 @@
Api: ClientApi<Error = CurpError> + LeaderStateUpdate + Send + Sync + 'static,
{
/// Create a retry client
fn new(inner: Api, config: RetryConfig) -> Self {
pub(super) fn new(inner: Api, config: RetryConfig) -> Self {
Self { inner, config }
}

Expand All @@ -128,7 +128,7 @@
let mut backoff = self.config.init_backoff();
while let Some(delay) = backoff.next_delay() {
let err = match f(&self.inner).await {
Ok(res) => return Ok(res),

Check warning on line 131 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L131

Added line #L131 was not covered by tests
Err(err) => err,
};

Expand All @@ -152,15 +152,15 @@
// update the cluster state if got WrongClusterVersion
CurpError::WrongClusterVersion(_) => {
// the inner client should automatically update cluster state when fetch_cluster
if let Err(e) = self.inner.fetch_cluster(false).await {
warn!("fetch cluster failed, error {e:?}");
}

Check warning on line 157 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L155-L157

Added lines #L155 - L157 were not covered by tests
}

// update the leader state if got Redirect
CurpError::Redirect(Redirect { leader_id, term }) => {
self.inner.update_leader(leader_id, term);
}

Check warning on line 163 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L161-L163

Added lines #L161 - L163 were not covered by tests
}

warn!("retry on {} seconds later", delay.as_secs_f32());
Expand All @@ -183,9 +183,9 @@
type Cmd = Api::Cmd;

/// Get the local connection when the client is on the server node.
async fn local_connect(&self) -> Option<Arc<dyn ConnectApi>> {
self.inner.local_connect().await
}

Check warning on line 188 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L186-L188

Added lines #L186 - L188 were not covered by tests

/// Send propose to the whole cluster, `use_fast_path` set to `false` to fallback into ordered
/// requests (event the requests are commutative).
Expand All @@ -199,49 +199,79 @@
}

/// Send propose configuration changes to the cluster
async fn propose_conf_change(
&self,
changes: Vec<ConfChange>,
) -> Result<Vec<Member>, tonic::Status> {
self.retry::<_, _>(|client| {
let changes_c = changes.clone();
async move { client.propose_conf_change(changes_c).await }
})
.await
}

Check warning on line 211 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L202-L211

Added lines #L202 - L211 were not covered by tests

/// Send propose to shutdown cluster
async fn propose_shutdown(&self) -> Result<(), tonic::Status> {
self.retry::<_, _>(ClientApi::propose_shutdown).await
}

Check warning on line 216 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L214-L216

Added lines #L214 - L216 were not covered by tests

/// Send propose to publish a node id and name
async fn propose_publish(
&self,
node_id: ServerId,
node_name: String,
) -> Result<(), Self::Error> {
self.retry::<_, _>(|client| {
let name_c = node_name.clone();
async move { client.propose_publish(node_id, name_c).await }
})
.await
}

Check warning on line 229 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L219-L229

Added lines #L219 - L229 were not covered by tests

/// Send fetch read state from leader
async fn fetch_read_state(&self, cmd: &Self::Cmd) -> Result<ReadState, tonic::Status> {
self.retry::<_, _>(|client| client.fetch_read_state(cmd))
.await
}

Check warning on line 235 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L232-L235

Added lines #L232 - L235 were not covered by tests

/// Send fetch cluster requests to all servers (That's because initially, we didn't
/// know who the leader is.)
/// Note: The fetched cluster may still be outdated if `linearizable` is false
async fn fetch_cluster(
&self,
linearizable: bool,
) -> Result<FetchClusterResponse, tonic::Status> {
self.retry::<_, _>(|client| client.fetch_cluster(linearizable))
.await
}

Check warning on line 246 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L240-L246

Added lines #L240 - L246 were not covered by tests
}

/// Tests for backoff
#[cfg(test)]
mod tests {
use std::time::Duration;

use super::RetryConfig;

#[test]
fn test_fixed_backoff_works() {
let config = RetryConfig::new_fixed(Duration::from_secs(1), 3);
let mut backoff = config.init_backoff();
assert_eq!(backoff.next_delay(), Some(Duration::from_secs(1)));
assert_eq!(backoff.next_delay(), Some(Duration::from_secs(1)));
assert_eq!(backoff.next_delay(), Some(Duration::from_secs(1)));
assert_eq!(backoff.next_delay(), None);
}

#[test]
fn test_exponential_backoff_works() {
let config =
RetryConfig::new_exponential(Duration::from_secs(1), Duration::from_secs(5), 4);
let mut backoff = config.init_backoff();
assert_eq!(backoff.next_delay(), Some(Duration::from_secs(1)));
assert_eq!(backoff.next_delay(), Some(Duration::from_secs(2)));
assert_eq!(backoff.next_delay(), Some(Duration::from_secs(4)));
assert_eq!(backoff.next_delay(), Some(Duration::from_secs(5))); // 8 > 5
assert_eq!(backoff.next_delay(), None);
}
}
Loading
Loading