Skip to content

Commit

Permalink
Remove max_in_flight from linera benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Feb 5, 2025
1 parent 3eaf1c2 commit 34e94bf
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 120 deletions.
86 changes: 39 additions & 47 deletions linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,18 @@ use linera_rpc::node_provider::{NodeOptions, NodeProvider};
use linera_storage::Storage;
use thiserror_context::Context;
use tracing::{debug, info};
#[cfg(feature = "fs")]
use {
linera_base::{
crypto::CryptoHash,
data_types::{BlobContent, Bytecode},
identifiers::BytecodeId,
},
linera_core::client::create_bytecode_blobs,
std::{fs, path::PathBuf},
};
#[cfg(feature = "benchmark")]
use {
futures::{stream, StreamExt as _, TryStreamExt as _},
linera_base::{
crypto::PublicKey,
data_types::Amount,
Expand All @@ -49,18 +58,9 @@ use {
},
linera_sdk::abis::fungible,
std::{collections::HashMap, iter},
tokio::task,
tracing::{error, trace},
};
#[cfg(feature = "fs")]
use {
linera_base::{
crypto::CryptoHash,
data_types::{BlobContent, Bytecode},
identifiers::BytecodeId,
},
linera_core::client::create_bytecode_blobs,
std::{fs, path::PathBuf},
};

#[cfg(web)]
use crate::persistent::{LocalPersist as Persist, LocalPersistExt as _};
Expand Down Expand Up @@ -664,7 +664,6 @@ where
&mut self,
key_pairs: &HashMap<ChainId, KeyPair>,
application_id: ApplicationId,
max_in_flight: usize,
) -> Result<(), Error> {
let default_chain_id = self
.wallet
Expand Down Expand Up @@ -701,38 +700,35 @@ where
}
self.update_wallet_from_client(&chain_client).await?;
// Make sure all chains have registered the application now.
let futures = key_pairs
.keys()
.map(|&chain_id| {
let chain_client = self
.make_chain_client(chain_id)
.expect("chain should have been created");
async move {
for i in 0..5 {
linera_base::time::timer::sleep(Duration::from_secs(i)).await;
chain_client.process_inbox().await?;
let chain_state = chain_client.chain_state_view().await?;
if chain_state
.execution_state
.system
.registry
.known_applications
.contains_key(&application_id)
.await?
{
return Ok::<_, Error>(chain_client);
}
let mut join_set = task::JoinSet::new();
for &chain_id in key_pairs.keys() {
let chain_client = self
.make_chain_client(chain_id)
.expect("chain should have been created");
join_set.spawn(async move {
for i in 0..5 {
linera_base::time::timer::sleep(Duration::from_secs(i)).await;
chain_client.process_inbox().await?;
let chain_state = chain_client.chain_state_view().await?;
if chain_state
.execution_state
.system
.registry
.known_applications
.contains_key(&application_id)
.await?
{
return Ok::<_, Error>(chain_client);
}
panic!("Could not instantiate application on chain {chain_id:?}");
}
})
.collect::<Vec<_>>();
// We have to collect the futures to avoid a higher-ranked lifetime error:
// https://github.com/rust-lang/rust/issues/102211#issuecomment-1673201352
let clients = stream::iter(futures)
.buffer_unordered(max_in_flight)
.try_collect::<Vec<_>>()
.await?;
panic!("Could not instantiate application on chain {chain_id:?}");
});
}
let clients = join_set
.join_all()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
for client in clients {
self.update_wallet_from_client(&client).await?;
}
Expand Down Expand Up @@ -842,7 +838,6 @@ where
pub async fn mass_broadcast(
&self,
phase: &'static str,
max_in_flight: usize,
proposals: Vec<RpcMessage>,
) -> Vec<RpcMessage> {
let time_start = Instant::now();
Expand All @@ -853,10 +848,7 @@ where
let proposals = proposals.clone();
let handle = join_set.spawn_task(async move {
debug!("Sending {} requests", proposals.len());
let responses = client
.send(proposals, max_in_flight)
.await
.unwrap_or_default();
let responses = client.send(proposals).await.unwrap_or_default();
debug!("Done sending requests");
responses
});
Expand Down
4 changes: 0 additions & 4 deletions linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,10 +575,6 @@ pub enum ClientCommand {
/// Send one transfer per chain in bulk mode
#[cfg(feature = "benchmark")]
Benchmark {
/// Maximum number of blocks in flight
#[arg(long, default_value = "200")]
max_in_flight: usize,

/// How many chains to use for the benchmark
#[arg(long, default_value = "10")]
num_chains: usize,
Expand Down
96 changes: 48 additions & 48 deletions linera-rpc/src/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,60 +463,60 @@ impl mass_client::MassClient for GrpcClient {
async fn send(
&self,
requests: Vec<RpcMessage>,
max_in_flight: usize,
) -> Result<Vec<RpcMessage>, mass_client::MassClientError> {
let client = self.client.clone();
let responses = stream::iter(requests)
.map(|request| {
let mut client = client.clone();
async move {
let response = match request {
RpcMessage::BlockProposal(proposal) => {
let request = Request::new((*proposal).try_into()?);
client.handle_block_proposal(request).await?
}
RpcMessage::TimeoutCertificate(request) => {
let request = Request::new((*request).try_into()?);
client.handle_timeout_certificate(request).await?
}
RpcMessage::ValidatedCertificate(request) => {
let request = Request::new((*request).try_into()?);
client.handle_validated_certificate(request).await?
}
RpcMessage::ConfirmedCertificate(request) => {
let request = Request::new((*request).try_into()?);
client.handle_confirmed_certificate(request).await?
}
msg => panic!("attempted to send msg: {:?}", msg),
};
match response
.into_inner()
.inner
.ok_or(GrpcProtoConversionError::MissingField)?
{
api::chain_info_result::Inner::ChainInfoResponse(chain_info_response) => {
Ok(Some(RpcMessage::ChainInfoResponse(Box::new(
chain_info_response.try_into()?,
))))
}
api::chain_info_result::Inner::Error(error) => {
let error = bincode::deserialize::<NodeError>(&error)
.map_err(GrpcProtoConversionError::BincodeError)?;
tracing::error!(?error, "received error response");
Ok(None)
}
let mut join_set: tokio::task::JoinSet<
Result<Option<RpcMessage>, mass_client::MassClientError>,
> = tokio::task::JoinSet::new();

// Spawn tasks for each request
for request in requests {
let mut client = client.clone();
join_set.spawn(async move {
let response = match request {
RpcMessage::BlockProposal(proposal) => {
let request = Request::new((*proposal).try_into()?);
client.handle_block_proposal(request).await?
}
RpcMessage::TimeoutCertificate(request) => {
let request = Request::new((*request).try_into()?);
client.handle_timeout_certificate(request).await?
}
RpcMessage::ValidatedCertificate(request) => {
let request = Request::new((*request).try_into()?);
client.handle_validated_certificate(request).await?
}
RpcMessage::ConfirmedCertificate(request) => {
let request = Request::new((*request).try_into()?);
client.handle_confirmed_certificate(request).await?
}
msg => panic!("attempted to send msg: {:?}", msg),
};
match response
.into_inner()
.inner
.ok_or(GrpcProtoConversionError::MissingField)?
{
api::chain_info_result::Inner::ChainInfoResponse(chain_info_response) => {
Ok(Some(RpcMessage::ChainInfoResponse(Box::new(
chain_info_response.try_into()?,
))))
}
api::chain_info_result::Inner::Error(error) => {
let error = bincode::deserialize::<NodeError>(&error)
.map_err(GrpcProtoConversionError::BincodeError)?;
tracing::error!(?error, "received error response");
Ok(None)
}
}
})
.buffer_unordered(max_in_flight)
.filter_map(
|result: Result<Option<_>, mass_client::MassClientError>| async move {
result.transpose()
},
)
.collect::<Vec<_>>()
});
}

let responses = join_set
.join_all()
.await
.into_iter()
.filter_map(|result| result.transpose())
.collect::<Result<Vec<_>, _>>()?;
Ok(responses)
}
Expand Down
6 changes: 1 addition & 5 deletions linera-rpc/src/mass_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,5 @@ pub enum MassClientError {

#[async_trait]
pub trait MassClient {
async fn send(
&self,
requests: Vec<RpcMessage>,
max_in_flight: usize,
) -> Result<Vec<RpcMessage>, MassClientError>;
async fn send(&self, requests: Vec<RpcMessage>) -> Result<Vec<RpcMessage>, MassClientError>;
}
3 changes: 1 addition & 2 deletions linera-rpc/src/simple/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ impl mass_client::MassClient for SimpleMassClient {
async fn send(
&self,
requests: Vec<RpcMessage>,
max_in_flight: usize,
) -> Result<Vec<RpcMessage>, mass_client::MassClientError> {
let address = format!("{}:{}", self.network.host, self.network.port);
let mut stream = self.network.protocol.connect(address).await?;
Expand All @@ -268,7 +267,7 @@ impl mass_client::MassClient for SimpleMassClient {
let mut responses = Vec::new();

loop {
while in_flight < max_in_flight {
loop {
let request = match requests.next() {
None => {
if in_flight == 0 {
Expand Down
2 changes: 0 additions & 2 deletions linera-service/src/cli_wrappers/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,6 @@ impl ClientWrapper {
#[cfg(feature = "benchmark")]
pub async fn benchmark(
&self,
max_in_flight: usize,
num_chains: usize,
transactions_per_block: usize,
fungible_application_id: Option<
Expand All @@ -671,7 +670,6 @@ impl ClientWrapper {
let mut command = self.command().await?;
command
.arg("benchmark")
.args(["--max-in-flight", &max_in_flight.to_string()])
.args(["--num-chains", &num_chains.to_string()])
.args([
"--transactions-per-block",
Expand Down
13 changes: 3 additions & 10 deletions linera-service/src/linera/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,6 @@ impl Runnable for Job {

#[cfg(feature = "benchmark")]
Benchmark {
max_in_flight,
num_chains,
tokens_per_chain,
transactions_per_block,
Expand All @@ -758,9 +757,7 @@ impl Runnable for Job {
.await?;

if let Some(id) = fungible_application_id {
context
.supply_fungible_tokens(&key_pairs, id, max_in_flight)
.await?;
context.supply_fungible_tokens(&key_pairs, id).await?;
}

// For this command, we create proposals and gather certificates without using
Expand All @@ -784,9 +781,7 @@ impl Runnable for Job {
}
}

let responses = context
.mass_broadcast("block proposals", max_in_flight, proposals)
.await;
let responses = context.mass_broadcast("block proposals", proposals).await;
let votes = responses
.into_iter()
.filter_map(|message| {
Expand Down Expand Up @@ -816,9 +811,7 @@ impl Runnable for Job {
))
})
.collect();
let responses = context
.mass_broadcast("certificates", max_in_flight, messages)
.await;
let responses = context.mass_broadcast("certificates", messages).await;
let mut confirmed = HashSet::new();
let num_valid = responses.into_iter().fold(0, |acc, message| {
match deserialize_response(message) {
Expand Down
4 changes: 2 additions & 2 deletions linera-service/tests/local_net_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ async fn test_end_to_end_benchmark(mut config: LocalNetConfig) -> Result<()> {

assert_eq!(client.load_wallet()?.num_chains(), 3);
// Launch local benchmark using some additional chains.
client.benchmark(2, 4, 10, None).await?;
client.benchmark(4, 10, None).await?;
assert_eq!(client.load_wallet()?.num_chains(), 7);

// Now we run the benchmark again, with the fungible token application instead of the
Expand All @@ -838,7 +838,7 @@ async fn test_end_to_end_benchmark(mut config: LocalNetConfig) -> Result<()> {
None,
)
.await?;
client.benchmark(2, 5, 10, Some(application_id)).await?;
client.benchmark(5, 10, Some(application_id)).await?;

net.ensure_is_running().await?;
net.terminate().await?;
Expand Down

0 comments on commit 34e94bf

Please sign in to comment.