Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add networking to subsystem-benchmarks #7125

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion polkadot/node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ pub(crate) const EMPTY_VIEW_COST: Rep = Rep::CostMajor("Peer sent us an empty vi
/// Messages from and to the network.
///
/// As transmitted to and received from subsystems.
/// TODO: shouldn't we expose this only for benchmarks?
#[derive(Debug, Encode, Decode, Clone)]
pub(crate) enum WireMessage<M> {
pub enum WireMessage<M> {
/// A message from a peer on a specific protocol.
#[codec(index = 1)]
ProtocolMessage(M),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
//!
//! Statement distribution benchmark based on Kusama parameters and scale.

use polkadot_primitives::Block;
use polkadot_subsystem_bench::{
configuration::TestConfiguration,
statement::{benchmark_statement_distribution, prepare_test, TestState},
usage::BenchmarkUsage,
utils::save_to_file,
};
use sc_network::NetworkWorker;
use std::io::Write;

const BENCH_COUNT: usize = 50;
Expand All @@ -43,7 +45,7 @@ fn main() -> Result<(), String> {
.map(|n| {
print!("\r[{}{}]", "#".repeat(n), "_".repeat(BENCH_COUNT - n));
std::io::stdout().flush().unwrap();
let (mut env, _cfgs) = prepare_test(&state, false);
let (mut env, _cfgs) = prepare_test::<Block, NetworkWorker<_, _>>(&state, false);
env.runtime().block_on(benchmark_statement_distribution(&mut env, &state))
})
.collect();
Expand Down
5 changes: 5 additions & 0 deletions polkadot/node/subsystem-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ rand_chacha = { workspace = true, default-features = true }
rand_core = { workspace = true }
strum = { features = ["derive"], workspace = true, default-features = true }

parking_lot = { workspace = true, default-features = true }
polkadot-network-bridge = { workspace = true, default-features = true }
sc-network-common = { workspace = true, default-features = true }
array-bytes = { workspace = true, default-features = true }

[features]
default = []
memprofile = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ TestConfiguration:
num_blocks: 10
n_cores: 100
n_validators: 500
connectivity: 100
5 changes: 4 additions & 1 deletion polkadot/node/subsystem-bench/src/cli/subsystem-bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
use clap::Parser;
use color_eyre::eyre;
use colored::Colorize;
use polkadot_primitives::Block;
use polkadot_subsystem_bench::{approval, availability, configuration, statement};
use pyroscope::PyroscopeAgent;
use pyroscope_pprofrs::{pprof_backend, PprofConfig};
use sc_network::NetworkWorker;
use serde::{Deserialize, Serialize};
use std::path::Path;

Expand Down Expand Up @@ -165,7 +167,8 @@ impl BenchCli {
},
TestObjective::StatementDistribution => {
let state = statement::TestState::new(&test_config);
let (mut env, _protocol_config) = statement::prepare_test(&state, true);
let (mut env, _protocol_config) =
statement::prepare_test::<Block, NetworkWorker<_, _>>(&state, true);
env.runtime()
.block_on(statement::benchmark_statement_distribution(&mut env, &state))
},
Expand Down
13 changes: 12 additions & 1 deletion polkadot/node/subsystem-bench/src/lib/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use itertools::Itertools;
use polkadot_primitives::{AssignmentId, AuthorityDiscoveryId, ValidatorId, ValidatorPair};
use rand::thread_rng;
use rand_distr::{Distribution, Normal, Uniform};
use sc_network::config::{ed25519::SecretKey, NodeKeyConfig, Secret};
use sc_network_types::PeerId;
use serde::{Deserialize, Serialize};
use sp_consensus_babe::AuthorityId;
Expand Down Expand Up @@ -199,6 +200,15 @@ impl TestConfiguration {

/// Generates the authority keys we need for the network emulation.
pub fn generate_authorities(&self) -> TestAuthorities {
let node_key_configs = (0..self.n_validators)
.map(|_| NodeKeyConfig::Ed25519(Secret::Input(SecretKey::generate())))
.collect_vec();
let peer_ids = node_key_configs
.iter()
.cloned()
.map(|node_key_config| node_key_config.into_keypair().unwrap().public().to_peer_id())
.collect_vec();

let keyring = Keyring::default();

let key_seeds = (0..self.n_validators)
Expand All @@ -222,7 +232,6 @@ impl TestConfiguration {

let validator_assignment_id: Vec<AssignmentId> =
keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
let peer_ids: Vec<PeerId> = keys.iter().map(|_| PeerId::random()).collect::<Vec<_>>();

let peer_id_to_authority = peer_ids
.iter()
Expand All @@ -239,6 +248,7 @@ impl TestConfiguration {
keyring,
validator_public,
validator_authority_id,
node_key_configs,
peer_ids,
validator_babe_id,
validator_assignment_id,
Expand Down Expand Up @@ -275,6 +285,7 @@ pub struct TestAuthorities {
pub peer_ids: Vec<PeerId>,
pub peer_id_to_authority: HashMap<PeerId, AuthorityDiscoveryId>,
pub validator_pairs: Vec<ValidatorPair>,
pub node_key_configs: Vec<NodeKeyConfig>,
}

/// Sample latency (in milliseconds) from a normal distribution with parameters
Expand Down
50 changes: 41 additions & 9 deletions polkadot/node/subsystem-bench/src/lib/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ fn new_runtime() -> tokio::runtime::Runtime {
.thread_name("subsystem-bench")
.enable_all()
.thread_stack_size(3 * 1024 * 1024)
.worker_threads(4)
.worker_threads(8)
.build()
.unwrap()
}
Expand Down Expand Up @@ -363,22 +363,54 @@ impl TestEnvironment {
}

fn network_usage(&self) -> Vec<ResourceUsage> {
let stats = self.network().peer_stats(0);
let total_node_received = (stats.received() / 1024) as f64;
let total_node_sent = (stats.sent() / 1024) as f64;
let test_metrics = super::display::parse_metrics(self.registry());
let direction_in = test_metrics.subset_with_label_value("direction", "in");
let direction_out = test_metrics.subset_with_label_value("direction", "out");
let total_node_received =
direction_in.sum_by("substrate_sub_libp2p_network_bytes_total") / 1024.0;
let total_node_sent =
direction_out.sum_by("substrate_sub_libp2p_network_bytes_total") / 1024.0;
let notifications_received =
direction_in.sum_by("substrate_sub_libp2p_notifications_sizes_count");
let notifications_sent =
direction_out.sum_by("substrate_sub_libp2p_notifications_sizes_count");
let requests_received =
test_metrics.sum_by("substrate_sub_libp2p_requests_in_success_total_count");
let requests_sent =
test_metrics.sum_by("substrate_sub_libp2p_requests_out_success_total_count");
let num_blocks = self.config().num_blocks as f64;

vec![
ResourceUsage {
resource_name: "Received from peers".to_string(),
resource_name: "Traffic IN, KiB".to_string(),
total: total_node_received,
per_block: total_node_received / num_blocks,
},
ResourceUsage {
resource_name: "Sent to peers".to_string(),
resource_name: "Traffic OUT, KiB".to_string(),
total: total_node_sent,
per_block: total_node_sent / num_blocks,
},
ResourceUsage {
resource_name: "Notifications IN".to_string(),
total: notifications_received,
per_block: notifications_received / num_blocks,
},
ResourceUsage {
resource_name: "Notifications OUT".to_string(),
total: notifications_sent,
per_block: notifications_sent / num_blocks,
},
ResourceUsage {
resource_name: "Requests IN".to_string(),
total: requests_received,
per_block: requests_received / num_blocks,
},
ResourceUsage {
resource_name: "Requests OUT".to_string(),
total: requests_sent,
per_block: requests_sent / num_blocks,
},
]
}

Expand All @@ -396,7 +428,7 @@ impl TestEnvironment {
test_metrics.subset_with_label_value("task_group", subsystem);
let total_cpu = subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
usage.push(ResourceUsage {
resource_name: subsystem.to_string(),
resource_name: format!("{}, s", subsystem),
total: total_cpu,
per_block: total_cpu / num_blocks,
});
Expand All @@ -409,7 +441,7 @@ impl TestEnvironment {

if let Some(task_name) = metric.label_value("task_name") {
usage.push(ResourceUsage {
resource_name: format!("{}/{}", subsystem, task_name),
resource_name: format!("{}/{}, s", subsystem, task_name),
total: metric.value(),
per_block: metric.value() / num_blocks,
});
Expand All @@ -423,7 +455,7 @@ impl TestEnvironment {
let total_cpu = test_env_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");

usage.push(ResourceUsage {
resource_name: "test-environment".to_string(),
resource_name: "test-environment, s".to_string(),
total: total_cpu,
per_block: total_cpu / num_blocks,
});
Expand Down
12 changes: 9 additions & 3 deletions polkadot/node/subsystem-bench/src/lib/mock/candidate_backing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ impl MockCandidateBacking {
})
.or_insert(1);

let statements_received_count = *statements_tracker.get(&candidate_hash).unwrap();
if statements_received_count == (self.config.minimum_backing_votes - 1) &&
if *statements_tracker.get(&candidate_hash).unwrap() ==
(self.config.minimum_backing_votes - 1) &&
is_own_backing_group
{
let statement = Statement::Valid(candidate_hash);
Expand All @@ -92,9 +92,15 @@ impl MockCandidateBacking {
.unwrap(),
);
messages.push(message);
// Add own statement to tracker
statements_tracker.entry(candidate_hash).and_modify(|v| {
*v += 1;
});
}

if statements_received_count == self.config.minimum_backing_votes {
if *statements_tracker.get(&candidate_hash).unwrap() ==
self.config.minimum_backing_votes
{
let message =
polkadot_node_subsystem::messages::StatementDistributionMessage::Backed(
candidate_hash,
Expand Down
Loading
Loading