Skip to content

Commit

Permalink
Introduce long running mode to linera benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Feb 13, 2025
1 parent db789e5 commit 3599e8c
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 381 deletions.
237 changes: 131 additions & 106 deletions linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,23 @@ use {
linera_base::{
crypto::PublicKey,
data_types::Amount,
hashed::Hashed,
identifiers::{AccountOwner, ApplicationId, Owner},
},
linera_chain::data_types::{
BlockProposal, ExecutedBlock, ProposedBlock, SignatureAggregator, Vote,
linera_chain::{
data_types::{BlockProposal, ExecutedBlock, ProposedBlock, SignatureAggregator, Vote},
types::{CertificateValue, ConfirmedBlock, GenericCertificate},
},
linera_chain::types::{CertificateValue, GenericCertificate},
linera_core::data_types::ChainInfoQuery,
linera_core::{data_types::ChainInfoQuery, node::ValidatorNode},
linera_execution::{
committee::Epoch,
system::{OpenChainConfig, Recipient, SystemOperation, OPEN_CHAIN_MESSAGE_INDEX},
Operation,
},
linera_rpc::{
config::NetworkProtocol, grpc::GrpcClient, mass_client::MassClient,
simple::SimpleMassClient, RpcMessage,
},
linera_sdk::abis::fungible,
std::{collections::HashMap, iter},
tokio::task,
tracing::{error, trace},
tracing::{error, trace, warn},
};

#[cfg(web)]
Expand Down Expand Up @@ -566,6 +563,118 @@ where
S: Storage + Clone + Send + Sync + 'static,
W: Persist<Target = Wallet>,
{
pub async fn run_benchmark(
&mut self,
bps: Option<usize>,
blocks_infos_iter: impl Iterator<Item = &(ChainId, Vec<Operation>, KeyPair)>,
clients: Vec<linera_rpc::Client>,
transactions_per_block: usize,
) -> Result<(), Error> {
let mut num_sent_proposals = 0;
let mut start = Instant::now();
for (chain_id, operations, key_pair) in blocks_infos_iter {
let chain = self.wallet.get(*chain_id).expect("should have chain");
let block = ProposedBlock {
epoch: Epoch::ZERO,
chain_id: *chain_id,
incoming_bundles: Vec::new(),
operations: operations.clone(),
previous_block_hash: chain.block_hash,
height: chain.next_block_height,
authenticated_signer: Some(Owner::from(key_pair.public())),
timestamp: chain.timestamp.max(Timestamp::now()),
};
let executed_block = self.stage_block_execution(block.clone(), None).await?;
let value = Hashed::new(ConfirmedBlock::new(executed_block));
let proposal =
BlockProposal::new_initial(linera_base::data_types::Round::Fast, block, key_pair);

let mut join_set = task::JoinSet::new();
for client in &clients {
let client = client.clone();
let proposal = proposal.clone();
join_set.spawn(async move { client.handle_block_proposal(proposal).await });
}
let votes = join_set
.join_all()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.filter_map(|response| {
let vote = response.info.manager.pending?;
vote.clone().with_value(value.clone())
})
.collect::<Vec<_>>();

let certificates = self.make_benchmark_certificates_from_votes(votes);
assert_eq!(
certificates.len(),
1,
"Unable to build all the expected certificates from received votes"
);

let certificate = &certificates[0];
let mut join_set = task::JoinSet::new();
for client in &clients {
let client = client.clone();
let certificate = certificate.clone();
join_set.spawn(async move {
client
.handle_confirmed_certificate(
certificate,
CrossChainMessageDelivery::NonBlocking,
)
.await
});
}

join_set
.join_all()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
self.update_wallet_from_certificate(certificate.clone())
.await;
num_sent_proposals += 1;
if let Some(bps) = bps {
if num_sent_proposals == bps {
let elapsed = start.elapsed();
if elapsed > Duration::from_secs(1) {
warn!(
"Failed to achieve {} BPS/{} TPS, took {} ms",
bps,
bps * transactions_per_block,
elapsed.as_millis()
);
} else {
tokio::time::sleep(Duration::from_secs(1) - elapsed).await;
info!(
"Achieved {} BPS/{} TPS in {} ms",
bps,
bps * transactions_per_block,
elapsed.as_millis()
);
}
start = Instant::now();
num_sent_proposals = 0;
}
}
}

if bps.is_none() {
let elapsed = start.elapsed();
let bps = num_sent_proposals as f64 / elapsed.as_secs_f64();
info!(
"Achieved {} BPS/{} TPS",
bps,
bps * transactions_per_block as f64
);
}

Ok(())
}

pub async fn process_inboxes_and_force_validator_updates(&mut self) {
let chain_clients = self
.wallet
Expand Down Expand Up @@ -837,21 +946,21 @@ where
Ok(())
}

/// Makes one block proposal per chain, up to `num_chains` blocks.
pub fn make_benchmark_block_proposals(
/// Generates information related to one block per chain, up to `num_chains` blocks.
pub fn make_benchmark_block_info(
&mut self,
key_pairs: &HashMap<ChainId, KeyPair>,
key_pairs: HashMap<ChainId, KeyPair>,
transactions_per_block: usize,
fungible_application_id: Option<ApplicationId>,
) -> Vec<BlockProposal> {
let mut proposals = Vec::new();
) -> Vec<(ChainId, Vec<Operation>, KeyPair)> {
let mut blocks_infos = Vec::new();
let mut previous_chain_id = *key_pairs
.iter()
.last()
.expect("There should be a last element")
.0;
let amount = Amount::from(1);
for (&chain_id, key_pair) in key_pairs {
for (chain_id, key_pair) in key_pairs {
let public_key = key_pair.public();
let operation = match fungible_application_id {
Some(application_id) => Self::fungible_transfer(
Expand All @@ -870,27 +979,10 @@ where
let operations = iter::repeat(operation)
.take(transactions_per_block)
.collect();
let chain = self.wallet.get(chain_id).expect("should have chain");
let block = ProposedBlock {
epoch: Epoch::ZERO,
chain_id,
incoming_bundles: Vec::new(),
operations,
previous_block_hash: chain.block_hash,
height: chain.next_block_height,
authenticated_signer: Some(Owner::from(public_key)),
timestamp: chain.timestamp.max(Timestamp::now()),
};
trace!("Preparing block proposal: {:?}", block);
let proposal = BlockProposal::new_initial(
linera_base::data_types::Round::Fast,
block.clone(),
key_pair,
);
proposals.push(proposal);
previous_chain_id = chain.chain_id;
blocks_infos.push((chain_id, operations, key_pair));
previous_chain_id = chain_id;
}
proposals
blocks_infos
}

/// Tries to aggregate votes into certificates.
Expand Down Expand Up @@ -940,78 +1032,11 @@ where
certificates
}

/// Broadcasts a bulk of blocks to each validator.
pub async fn mass_broadcast(
&self,
phase: &'static str,
proposals: Vec<RpcMessage>,
) -> Vec<RpcMessage> {
let time_start = Instant::now();
info!("Broadcasting {} {}", proposals.len(), phase);
let mut join_set = task::JoinSet::new();
for client in self.make_validator_mass_clients() {
let proposals = proposals.clone();
join_set.spawn(async move {
debug!("Sending {} requests", proposals.len());
let responses = client.send(proposals).await.unwrap_or_default();
debug!("Done sending requests");
responses
});
}
let responses = join_set
.join_all()
.await
.into_iter()
.flatten()
.collect::<Vec<_>>();
let time_elapsed = time_start.elapsed();
info!(
"Received {} responses in {} ms.",
responses.len(),
time_elapsed.as_millis()
);
info!(
"Estimated server throughput: {} {} per sec",
(proposals.len() as u128) * 1_000_000 / time_elapsed.as_micros(),
phase
);
responses
}

fn make_validator_mass_clients(&self) -> Vec<Box<dyn MassClient + Send>> {
let mut validator_clients = Vec::new();
for config in &self.wallet.genesis_config().committee.validators {
let client: Box<dyn MassClient + Send> = match config.network.protocol {
NetworkProtocol::Simple(protocol) => {
let network = config.network.clone_with_protocol(protocol);
Box::new(SimpleMassClient::new(
network,
self.send_timeout,
self.recv_timeout,
))
}
NetworkProtocol::Grpc { .. } => {
let node_options = self.make_node_options();
let address = config.network.http_address();
Box::new(GrpcClient::create(address, node_options))
}
};

validator_clients.push(client);
}
validator_clients
}

pub async fn update_wallet_from_certificates(
&mut self,
certificates: Vec<ConfirmedBlockCertificate>,
) {
pub async fn update_wallet_from_certificate(&mut self, certificate: ConfirmedBlockCertificate) {
let node = self.client.local_node().clone();
// Replay the certificates locally.
for certificate in certificates {
// No required certificates from other chains: This is only used with benchmark.
node.handle_certificate(certificate, &()).await.unwrap();
}
// Replay the certificate locally.
// No required certificates from other chains: This is only used with benchmark.
node.handle_certificate(certificate, &()).await.unwrap();
// Last update the wallet.
for chain in self.wallet.as_mut().chains_mut() {
let query = ChainInfoQuery::new(chain.chain_id);
Expand Down
5 changes: 5 additions & 0 deletions linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,11 @@ pub enum ClientCommand {
/// If none is specified, the benchmark uses the native token.
#[arg(long)]
fungible_application_id: Option<linera_base::identifiers::ApplicationId>,

/// If provided, will be long running, and block proposals will be sent at the
/// provided fixed BPS rate.
#[arg(long)]
bps: Option<usize>,
},

/// Create genesis configuration for a Linera deployment.
Expand Down
2 changes: 2 additions & 0 deletions linera-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub(crate) enum Inner {
NonexistentKeypair(linera_base::identifiers::ChainId),
#[error("error on the local node: {0}")]
LocalNode(#[from] linera_core::local_node::LocalNodeError),
#[error("remote node operation failed: {0}")]
RemoteNode(#[from] linera_core::node::NodeError),
}

thiserror_context::impl_context!(Error(Inner));
Expand Down
71 changes: 0 additions & 71 deletions linera-rpc/src/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ use linera_core::{
use linera_version::VersionInfo;
use tonic::{Code, IntoRequest, Request, Status};
use tracing::{debug, error, info, instrument, warn};
#[cfg(not(web))]
use {
super::GrpcProtoConversionError,
crate::{mass_client, RpcMessage},
};

use super::{
api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
Expand Down Expand Up @@ -458,69 +453,3 @@ impl ValidatorNode for GrpcClient {
Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
}
}

#[cfg(not(web))]
#[async_trait::async_trait]
impl mass_client::MassClient for GrpcClient {
#[instrument(skip_all, err)]
async fn send(
&self,
requests: Vec<RpcMessage>,
) -> Result<Vec<RpcMessage>, mass_client::MassClientError> {
let client = self.client.clone();
let mut join_set: tokio::task::JoinSet<
Result<Option<RpcMessage>, mass_client::MassClientError>,
> = tokio::task::JoinSet::new();

// Spawn tasks for each request
for request in requests {
let mut client = client.clone();
join_set.spawn(async move {
let response = match request {
RpcMessage::BlockProposal(proposal) => {
let request = Request::new((*proposal).try_into()?);
client.handle_block_proposal(request).await?
}
RpcMessage::TimeoutCertificate(request) => {
let request = Request::new((*request).try_into()?);
client.handle_timeout_certificate(request).await?
}
RpcMessage::ValidatedCertificate(request) => {
let request = Request::new((*request).try_into()?);
client.handle_validated_certificate(request).await?
}
RpcMessage::ConfirmedCertificate(request) => {
let request = Request::new((*request).try_into()?);
client.handle_confirmed_certificate(request).await?
}
msg => panic!("attempted to send msg: {:?}", msg),
};
match response
.into_inner()
.inner
.ok_or(GrpcProtoConversionError::MissingField)?
{
api::chain_info_result::Inner::ChainInfoResponse(chain_info_response) => {
Ok(Some(RpcMessage::ChainInfoResponse(Box::new(
chain_info_response.try_into()?,
))))
}
api::chain_info_result::Inner::Error(error) => {
let error = bincode::deserialize::<NodeError>(&error)
.map_err(GrpcProtoConversionError::BincodeError)?;
tracing::error!(?error, "received error response");
Ok(None)
}
}
});
}

let responses = join_set
.join_all()
.await
.into_iter()
.filter_map(|result| result.transpose())
.collect::<Result<Vec<_>, _>>()?;
Ok(responses)
}
}
Loading

0 comments on commit 3599e8c

Please sign in to comment.