Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: tests in v0.7 branch #954

Merged
merged 19 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ignored = ["prost", "workspace-hack"]

[patch.crates-io]
# This branch update the tonic version for madsim. We should switch to the original etcd-client crate when new version release.
madsim = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic" }
madsim-tonic = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic" }
madsim-tonic-build = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic" }
madsim-tokio = { git = "https://github.com/Phoenix500526/madsim.git", branch = "update-tonic" }
madsim = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream" }
madsim-tonic = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream" }
madsim-tonic-build = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream" }
madsim-tokio = { git = "https://github.com/bsbds/madsim.git", branch = "fix-client-stream" }
1 change: 1 addition & 0 deletions crates/benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ anyhow = "1.0.83"
clap = { version = "4", features = ["derive"] }
clippy-utilities = "0.2.0"
etcd-client = { version = "0.13.0", features = ["tls"] }
futures = "0.3.30"
indicatif = "0.17.8"
rand = "0.8.5"
thiserror = "1.0.61"
Expand Down
16 changes: 10 additions & 6 deletions crates/benchmark/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{

use anyhow::Result;
use clippy_utilities::{NumericCast, OverflowArithmetic};
use futures::future::join_all;
use indicatif::ProgressBar;
use rand::RngCore;
use tokio::{
Expand Down Expand Up @@ -158,7 +159,6 @@ impl CommandRunner {

/// Create clients
async fn create_clients(&self) -> Result<Vec<BenchClient>> {
let mut clients = Vec::with_capacity(self.args.clients);
let client_options = ClientOptions::default().with_client_config(ClientConfig::new(
Duration::from_secs(10),
Duration::from_secs(5),
Expand All @@ -180,11 +180,15 @@ impl CommandRunner {
}
})
.collect::<Vec<_>>();
for _ in 0..self.args.clients {
let client =
BenchClient::new(addrs.clone(), self.args.use_curp, client_options.clone()).await?;
clients.push(client);
}
let clients_futs = std::iter::repeat_with(|| {
BenchClient::new(addrs.clone(), self.args.use_curp, client_options.clone())
})
.take(self.args.clients);
let clients = join_all(clients_futs)
.await
.into_iter()
.collect::<Result<_, _>>()?;

Ok(clients)
}

Expand Down
8 changes: 8 additions & 0 deletions crates/curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ where
/// command.
fn execute(&self, cmd: &C) -> Result<C::ER, C::Error>;

/// Execute the read-only command
///
/// # Errors
///
/// This function may return an error if there is a problem executing the
/// command.
fn execute_ro(&self, cmd: &C) -> Result<(C::ER, C::ASR), C::Error>;

/// Batch execute the after_sync callback
///
/// This `highest_index` means the last log index of the `cmds`
Expand Down
10 changes: 10 additions & 0 deletions crates/curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,16 @@ impl CommandExecutor<TestCommand> for TestCE {
Ok(result)
}

fn execute_ro(
&self,
cmd: &TestCommand,
) -> Result<
(<TestCommand as Command>::ER, <TestCommand as Command>::ASR),
<TestCommand as Command>::Error,
> {
self.execute(cmd).map(|er| (er, LogIndexResult(0)))
}

fn after_sync(
&self,
cmds: Vec<AfterSyncCmd<'_, TestCommand>>,
Expand Down
12 changes: 10 additions & 2 deletions crates/curp/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,15 +451,23 @@ impl ClientBuilder {
impl ClientApi<Error = tonic::Status, Cmd = C> + Send + Sync + 'static,
Arc<AtomicU64>,
),
tonic::transport::Error,
tonic::Status,
> {
let state = Arc::new(self.init_state_builder().build().await?);
let state = Arc::new(
self.init_state_builder()
.build()
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?,
);

let client = Retry::new(
Unary::new(Arc::clone(&state), self.init_unary_config()),
self.init_retry_config(),
Some(self.spawn_bg_tasks(Arc::clone(&state))),
);
let client_id = state.clone_client_id();
self.wait_for_client_id(state).await?;

Ok((client, client_id))
}
}
Expand Down
39 changes: 23 additions & 16 deletions crates/curp/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ where
token: Option<&String>,
use_fast_path: bool,
) -> Result<ProposeResponse<Self::Cmd>, tonic::Status> {
let propose_id = self.inner.gen_propose_id()?;
self.retry::<_, _>(|client| {
RepeatableClientApi::propose(client, *propose_id, cmd, token, use_fast_path)
self.retry::<_, _>(|client| async move {
let propose_id = self.inner.gen_propose_id()?;
RepeatableClientApi::propose(client, *propose_id, cmd, token, use_fast_path).await
})
.await
}
Expand All @@ -236,19 +236,23 @@ where
&self,
changes: Vec<ConfChange>,
) -> Result<Vec<Member>, tonic::Status> {
let propose_id = self.inner.gen_propose_id()?;
self.retry::<_, _>(|client| {
let changes_c = changes.clone();
RepeatableClientApi::propose_conf_change(client, *propose_id, changes_c)
async move {
let propose_id = self.inner.gen_propose_id()?;
RepeatableClientApi::propose_conf_change(client, *propose_id, changes_c).await
}
})
.await
}

/// Send propose to shutdown cluster
async fn propose_shutdown(&self) -> Result<(), tonic::Status> {
let propose_id = self.inner.gen_propose_id()?;
self.retry::<_, _>(|client| RepeatableClientApi::propose_shutdown(client, *propose_id))
.await
self.retry::<_, _>(|client| async move {
let propose_id = self.inner.gen_propose_id()?;
RepeatableClientApi::propose_shutdown(client, *propose_id).await
})
.await
}

/// Send propose to publish a node id and name
Expand All @@ -258,17 +262,20 @@ where
node_name: String,
node_client_urls: Vec<String>,
) -> Result<(), Self::Error> {
let propose_id = self.inner.gen_propose_id()?;
self.retry::<_, _>(|client| {
let name_c = node_name.clone();
let node_client_urls_c = node_client_urls.clone();
RepeatableClientApi::propose_publish(
client,
*propose_id,
node_id,
name_c,
node_client_urls_c,
)
async move {
let propose_id = self.inner.gen_propose_id()?;
RepeatableClientApi::propose_publish(
client,
*propose_id,
node_id,
name_c,
node_client_urls_c,
)
.await
}
})
.await
}
Expand Down
Loading
Loading