Skip to content

Commit

Permalink
chore(consensus): cleanup throughput_aware_consensus_submission (#3576)
Browse files Browse the repository at this point in the history
* Remove throughput_aware_consensus_submission protocol param

* Update openrpc.json

* Fix clippy

* Remove `consensus_throughput_calculator` because it's unused

* Fix cargo check reported errors
  • Loading branch information
piotrm50 authored Oct 23, 2024
1 parent 32016aa commit f101ef2
Show file tree
Hide file tree
Showing 12 changed files with 10 additions and 893 deletions.
1 change: 0 additions & 1 deletion crates/iota-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ impl AuthorityServer {
None,
None,
ConsensusAdapterMetrics::new_test(),
state.epoch_store_for_testing().protocol_config().clone(),
));
Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
}
Expand Down
54 changes: 1 addition & 53 deletions crates/iota-core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{
time::Instant,
};

use arc_swap::{ArcSwap, ArcSwapOption};
use arc_swap::ArcSwap;
use bytes::Bytes;
use dashmap::{DashMap, try_result::TryResult};
use futures::{
Expand All @@ -22,7 +22,6 @@ use futures::{
pin_mut,
};
use iota_metrics::{GaugeGuard, GaugeGuardFutureExt, spawn_monitored_task};
use iota_protocol_config::ProtocolConfig;
use iota_simulator::{anemo::PeerId, narwhal_network::connectivity::ConnectionStatus};
use iota_types::{
base_types::{AuthorityName, TransactionDigest},
Expand Down Expand Up @@ -51,7 +50,6 @@ use tracing::{debug, info, warn};
use crate::{
authority::authority_per_epoch_store::AuthorityPerEpochStore,
consensus_handler::{SequencedConsensusTransactionKey, classify},
consensus_throughput_calculator::{ConsensusThroughputProfiler, Level},
epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator},
metrics::LatencyObserver,
};
Expand Down Expand Up @@ -241,15 +239,11 @@ pub struct ConsensusAdapter {
connection_monitor_status: Arc<dyn CheckConnection>,
/// A structure to check the reputation scores populated by Consensus
low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
/// The throughput profiler to be used when making decisions to submit to
/// consensus
consensus_throughput_profiler: ArcSwapOption<ConsensusThroughputProfiler>,
/// A structure to register metrics
metrics: ConsensusAdapterMetrics,
/// Semaphore limiting parallel submissions to narwhal
submit_semaphore: Semaphore,
latency_observer: LatencyObserver,
protocol_config: ProtocolConfig,
}

pub trait CheckConnection: Send + Sync {
Expand Down Expand Up @@ -281,7 +275,6 @@ impl ConsensusAdapter {
max_submit_position: Option<usize>,
submit_delay_step_override: Option<Duration>,
metrics: ConsensusAdapterMetrics,
protocol_config: ProtocolConfig,
) -> Self {
let num_inflight_transactions = Default::default();
let low_scoring_authorities =
Expand All @@ -298,8 +291,6 @@ impl ConsensusAdapter {
metrics,
submit_semaphore: Semaphore::new(max_pending_local_submissions),
latency_observer: LatencyObserver::new(),
consensus_throughput_profiler: ArcSwapOption::empty(),
protocol_config,
}
}

Expand All @@ -310,10 +301,6 @@ impl ConsensusAdapter {
self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
}

pub fn swap_throughput_profiler(&self, profiler: Arc<ConsensusThroughputProfiler>) {
self.consensus_throughput_profiler.store(Some(profiler))
}

// todo - this probably need to hold some kind of lock to make sure epoch does
// not change while we are recovering
pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
Expand Down Expand Up @@ -404,7 +391,6 @@ impl ConsensusAdapter {
let latency = std::cmp::max(latency, MIN_LATENCY);
let latency = std::cmp::min(latency, MAX_LATENCY);
let latency = latency * 2;
let latency = self.override_by_throughput_profiler(position, latency);
let (delay_step, position) =
self.override_by_max_submit_position_settings(latency, position);

Expand All @@ -420,42 +406,6 @@ impl ConsensusAdapter {
)
}

According to the throughput profile we want to either allow some transaction
duplication (or not). When the throughput profile is Low and the validator is
in position = 1, then it will submit to consensus with much lower latency.
When the throughput profile is High then we go back to default operation and
no-one co-submits.
fn override_by_throughput_profiler(&self, position: usize, latency: Duration) -> Duration {
const LOW_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 0;
const MEDIUM_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 2_500;
const HIGH_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 3_500;

let p = self.consensus_throughput_profiler.load();

if let Some(profiler) = p.as_ref() {
let (level, _) = profiler.throughput_level();

// we only run this for the position = 1 validator to co-submit with the
// validator of position = 0. We also enable this only when the
// feature is enabled on the protocol config.
if self.protocol_config.throughput_aware_consensus_submission() && position == 1 {
return match level {
Level::Low => Duration::from_millis(LOW_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS),
Level::Medium => {
Duration::from_millis(MEDIUM_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS)
}
Level::High => {
let l = Duration::from_millis(HIGH_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS);

// back off according to recorded latency if it's significantly higher
if latency >= 2 * l { latency } else { l }
}
};
}
}
latency
}

/// Overrides the latency and the position if there are defined settings for
/// `max_submit_position` and `submit_delay_step_override`. If the
/// `max_submit_position` has defined, then that will always be used
Expand Down Expand Up @@ -1179,7 +1129,6 @@ mod adapter_tests {
Some(1),
Some(Duration::from_secs(2)),
ConsensusAdapterMetrics::new_test(),
iota_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE(),
);

// transaction to submit
Expand Down Expand Up @@ -1209,7 +1158,6 @@ mod adapter_tests {
None,
None,
ConsensusAdapterMetrics::new_test(),
iota_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE(),
);

let (delay_step, position, positions_moved, _) =
Expand Down
35 changes: 0 additions & 35 deletions crates/iota-core/src/consensus_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::{
epoch_start_configuration::EpochStartConfigTrait,
},
checkpoints::{CheckpointService, CheckpointServiceNotify},
consensus_throughput_calculator::ConsensusThroughputCalculator,
consensus_types::{AuthorityIndex, consensus_output_api::ConsensusOutputAPI},
execution_cache::ObjectCacheRead,
scoring_decision::update_low_scoring_authorities,
Expand All @@ -49,7 +48,6 @@ pub struct ConsensusHandlerInitializer {
checkpoint_service: Arc<CheckpointService>,
epoch_store: Arc<AuthorityPerEpochStore>,
low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
throughput_calculator: Arc<ConsensusThroughputCalculator>,
}

impl ConsensusHandlerInitializer {
Expand All @@ -58,14 +56,12 @@ impl ConsensusHandlerInitializer {
checkpoint_service: Arc<CheckpointService>,
epoch_store: Arc<AuthorityPerEpochStore>,
low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
throughput_calculator: Arc<ConsensusThroughputCalculator>,
) -> Self {
Self {
state,
checkpoint_service,
epoch_store,
low_scoring_authorities,
throughput_calculator,
}
}

Expand All @@ -78,10 +74,6 @@ impl ConsensusHandlerInitializer {
checkpoint_service,
epoch_store: state.epoch_store_for_testing().clone(),
low_scoring_authorities: Arc::new(Default::default()),
throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
None,
state.metrics.clone(),
)),
}
}

Expand All @@ -97,7 +89,6 @@ impl ConsensusHandlerInitializer {
self.low_scoring_authorities.clone(),
consensus_committee,
self.state.metrics.clone(),
self.throughput_calculator.clone(),
)
}
}
Expand Down Expand Up @@ -128,9 +119,6 @@ pub struct ConsensusHandler<C> {
/// Lru cache to quickly discard transactions processed by consensus
processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
transaction_scheduler: AsyncTransactionScheduler,
/// Using the throughput calculator to record the current consensus
/// throughput
throughput_calculator: Arc<ConsensusThroughputCalculator>,
}

const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
Expand All @@ -144,7 +132,6 @@ impl<C> ConsensusHandler<C> {
low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
committee: ConsensusCommittee,
metrics: Arc<AuthorityMetrics>,
throughput_calculator: Arc<ConsensusThroughputCalculator>,
) -> Self {
// Recover last_consensus_stats so it is consistent across validators.
let mut last_consensus_stats = epoch_store
Expand All @@ -166,7 +153,6 @@ impl<C> ConsensusHandler<C> {
metrics,
processed_cache: LruCache::new(NonZeroUsize::new(PROCESSED_CACHE_CAP).unwrap()),
transaction_scheduler,
throughput_calculator,
}
}

Expand Down Expand Up @@ -233,23 +219,9 @@ impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {

// (serialized, transaction, output_cert)
let mut transactions = vec![];
let timestamp = consensus_output.commit_timestamp_ms();
let leader_author = consensus_output.leader_author_index();
let commit_sub_dag_index = consensus_output.commit_sub_dag_index();

let epoch_start = self
.epoch_store
.epoch_start_config()
.epoch_start_timestamp_ms();
let timestamp = if timestamp < epoch_start {
error!(
"Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}, round {round}",
);
epoch_start
} else {
timestamp
};

debug!(
%consensus_output,
epoch = ?self.epoch_store.epoch(),
Expand Down Expand Up @@ -438,10 +410,6 @@ impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
.await
.expect("Unrecoverable error in consensus handler");

// update the calculated throughput
self.throughput_calculator
.add_transactions(timestamp, transactions_to_schedule.len() as u64);

fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
let key = [commit_sub_dag_index, self.epoch_store.epoch()];
if iota_simulator::random::deterministic_probability(&key, 0.01) {
Expand Down Expand Up @@ -888,8 +856,6 @@ mod tests {

let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));

let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());

let mut consensus_handler = ConsensusHandler::new(
epoch_store,
Arc::new(CheckpointServiceNoop {}),
Expand All @@ -898,7 +864,6 @@ mod tests {
Arc::new(ArcSwap::default()),
consensus_committee.clone(),
metrics,
Arc::new(throughput_calculator),
);

// AND
Expand Down
Loading

0 comments on commit f101ef2

Please sign in to comment.