+ ProtocolMetrics, T: BenchmarkType> Orchestrator {
- config.set_max_accumulated_txn_cost_per_object_in_narwhal_commit_for_testing(
- txn_count_limit,
- );
config.set_max_accumulated_txn_cost_per_object_in_mysticeti_commit_for_testing(
txn_count_limit,
);
@@ -839,9 +829,6 @@ mod test {
"SIM_STRESS_TEST_NUM_VALIDATORS",
default_num_validators,
));
- if std::env::var("CHECKPOINTS_PER_EPOCH").is_ok() {
- eprintln!("CHECKPOINTS_PER_EPOCH env var is deprecated, use EPOCH_DURATION_MS");
- }
let epoch_duration_ms = get_var("EPOCH_DURATION_MS", default_epoch_duration_ms);
if epoch_duration_ms > 0 {
builder = builder.with_epoch_duration_ms(epoch_duration_ms);
diff --git a/crates/iota-core/Cargo.toml b/crates/iota-core/Cargo.toml
index 92103862792..8138aea0c46 100644
--- a/crates/iota-core/Cargo.toml
+++ b/crates/iota-core/Cargo.toml
@@ -41,6 +41,7 @@ object_store.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
prometheus.workspace = true
+quinn-proto.workspace = true
rand.workspace = true
rayon.workspace = true
reqwest.workspace = true
@@ -87,7 +88,6 @@ move-bytecode-utils.workspace = true
move-core-types.workspace = true
move-package.workspace = true
move-symbol-pool.workspace = true
-narwhal-types.workspace = true
shared-crypto.workspace = true
telemetry-subscribers.workspace = true
typed-store.workspace = true
@@ -105,6 +105,7 @@ pretty_assertions.workspace = true
rstest.workspace = true
serde-reflection.workspace = true
serde_yaml.workspace = true
+tower.workspace = true
# internal dependencies
iota-move.workspace = true
diff --git a/crates/iota-core/src/authority/authority_per_epoch_store.rs b/crates/iota-core/src/authority/authority_per_epoch_store.rs
index c0d016458f8..5fb03ed47e1 100644
--- a/crates/iota-core/src/authority/authority_per_epoch_store.rs
+++ b/crates/iota-core/src/authority/authority_per_epoch_store.rs
@@ -32,7 +32,7 @@ use iota_types::{
accumulator::Accumulator,
authenticator_state::{ActiveJwk, get_authenticator_state},
base_types::{
- AuthorityName, ConciseableName, EpochId, ObjectID, ObjectRef, SequenceNumber,
+ AuthorityName, CommitRound, ConciseableName, EpochId, ObjectID, ObjectRef, SequenceNumber,
TransactionDigest,
},
committee::{Committee, CommitteeTrait},
@@ -62,7 +62,6 @@ use iota_types::{
};
use itertools::{Itertools, izip};
use move_bytecode_utils::module_cache::SyncModuleCache;
-use narwhal_types::{Round, TimestampMs};
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use prometheus::IntCounter;
use serde::{Deserialize, Serialize};
@@ -104,7 +103,7 @@ use crate::{
epoch::{
epoch_metrics::EpochMetrics,
randomness::{
- DkgStatus, RandomnessManager, RandomnessReporter, SINGLETON_KEY,
+ CommitTimestampMs, DkgStatus, RandomnessManager, RandomnessReporter, SINGLETON_KEY,
VersionedProcessedMessage, VersionedUsedProcessedMessages,
},
reconfiguration::ReconfigState,
@@ -599,7 +598,7 @@ pub struct AuthorityEpochTables {
pub(crate) randomness_highest_completed_round: DBMap,
/// Holds the timestamp of the most recently generated round of randomness.
- pub(crate) randomness_last_round_timestamp: DBMap,
+ pub(crate) randomness_last_round_timestamp: DBMap,
}
fn signed_transactions_table_default_config() -> DBOptions {
@@ -1696,7 +1695,7 @@ impl AuthorityPerEpochStore {
fn should_defer(
&self,
cert: &VerifiedExecutableTransaction,
- commit_round: Round,
+ commit_round: CommitRound,
dkg_failed: bool,
generating_randomness: bool,
previously_deferred_tx_digests: &HashMap,
@@ -2383,10 +2382,6 @@ impl AuthorityPerEpochStore {
return None;
}
}
- SequencedConsensusTransactionKind::External(ConsensusTransaction {
- kind: ConsensusTransactionKind::RandomnessStateUpdate(_round, _bytes),
- ..
- }) => {}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::RandomnessDkgMessage(authority, _bytes),
..
@@ -3224,7 +3219,7 @@ impl AuthorityPerEpochStore {
output: &mut ConsensusCommitOutput,
transaction: &VerifiedSequencedConsensusTransaction,
checkpoint_service: &Arc,
- commit_round: Round,
+ commit_round: CommitRound,
previously_deferred_tx_digests: &HashMap,
mut randomness_manager: Option<&mut RandomnessManager>,
dkg_failed: bool,
@@ -3427,13 +3422,6 @@ impl AuthorityPerEpochStore {
}
Ok(ConsensusCertificateResult::ConsensusMessage)
}
- SequencedConsensusTransactionKind::External(ConsensusTransaction {
- kind: ConsensusTransactionKind::RandomnessStateUpdate(_, _),
- ..
- }) => {
- // These are always generated as System transactions (handled below).
- panic!("process_consensus_transaction called with external RandomnessStateUpdate");
- }
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::RandomnessDkgMessage(authority, bytes),
..
@@ -3840,7 +3828,7 @@ pub(crate) struct ConsensusCommitOutput {
pending_checkpoints: Vec,
// random beacon state
- next_randomness_round: Option<(RandomnessRound, TimestampMs)>,
+ next_randomness_round: Option<(RandomnessRound, CommitTimestampMs)>,
dkg_confirmations: BTreeMap,
dkg_processed_messages: BTreeMap,
@@ -3918,7 +3906,7 @@ impl ConsensusCommitOutput {
pub fn reserve_next_randomness_round(
&mut self,
next_randomness_round: RandomnessRound,
- commit_timestamp: TimestampMs,
+ commit_timestamp: CommitTimestampMs,
) {
assert!(self.next_randomness_round.is_none());
self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
diff --git a/crates/iota-core/src/authority/shared_object_congestion_tracker.rs b/crates/iota-core/src/authority/shared_object_congestion_tracker.rs
index 3cff245233e..75b9a4c7053 100644
--- a/crates/iota-core/src/authority/shared_object_congestion_tracker.rs
+++ b/crates/iota-core/src/authority/shared_object_congestion_tracker.rs
@@ -6,11 +6,10 @@ use std::collections::HashMap;
use iota_protocol_config::PerObjectCongestionControlMode;
use iota_types::{
- base_types::{ObjectID, TransactionDigest},
+ base_types::{CommitRound, ObjectID, TransactionDigest},
executable_transaction::VerifiedExecutableTransaction,
transaction::SharedInputObject,
};
-use narwhal_types::Round;
use crate::authority::transaction_deferral::DeferralKey;
@@ -83,7 +82,7 @@ impl SharedObjectCongestionTracker {
cert: &VerifiedExecutableTransaction,
max_accumulated_txn_cost_per_object_in_commit: u64,
previously_deferred_tx_digests: &HashMap,
- commit_round: Round,
+ commit_round: CommitRound,
) -> Option<(DeferralKey, Vec)> {
let tx_cost = self.get_tx_cost(cert)?;
diff --git a/crates/iota-core/src/authority/transaction_deferral.rs b/crates/iota-core/src/authority/transaction_deferral.rs
index 8843592dd62..5af556cb3a9 100644
--- a/crates/iota-core/src/authority/transaction_deferral.rs
+++ b/crates/iota-core/src/authority/transaction_deferral.rs
@@ -2,8 +2,7 @@
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
-use iota_types::base_types::ObjectID;
-use narwhal_types::Round;
+use iota_types::base_types::{CommitRound, ObjectID};
use serde::{Deserialize, Serialize};
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize)]
@@ -11,26 +10,29 @@ pub enum DeferralKey {
// For transactions deferred until new randomness is available (whether delayd due to
// DKG, or skipped commits).
Randomness {
- deferred_from_round: Round, // commit round, not randomness round
+ deferred_from_round: CommitRound,
},
// ConsensusRound deferral key requires both the round to which the tx should be deferred (so
// that we can efficiently load all txns that are now ready), and the round from which it
// has been deferred (so that multiple rounds can efficiently defer to the same future
// round).
ConsensusRound {
- future_round: Round,
- deferred_from_round: Round,
+ future_round: CommitRound,
+ deferred_from_round: CommitRound,
},
}
impl DeferralKey {
- pub fn new_for_randomness(deferred_from_round: Round) -> Self {
+ pub fn new_for_randomness(deferred_from_round: CommitRound) -> Self {
Self::Randomness {
deferred_from_round,
}
}
- pub fn new_for_consensus_round(future_round: Round, deferred_from_round: Round) -> Self {
+ pub fn new_for_consensus_round(
+ future_round: CommitRound,
+ deferred_from_round: CommitRound,
+ ) -> Self {
Self::ConsensusRound {
future_round,
deferred_from_round,
@@ -50,7 +52,7 @@ impl DeferralKey {
// Returns a range of deferral keys that are deferred up to the given consensus
// round.
- pub fn range_for_up_to_consensus_round(consensus_round: Round) -> (Self, Self) {
+ pub fn range_for_up_to_consensus_round(consensus_round: CommitRound) -> (Self, Self) {
(
Self::ConsensusRound {
future_round: 0,
@@ -63,7 +65,7 @@ impl DeferralKey {
)
}
- pub fn deferred_from_round(&self) -> Round {
+ pub fn deferred_from_round(&self) -> CommitRound {
match self {
Self::Randomness {
deferred_from_round,
diff --git a/crates/iota-core/src/checkpoints/mod.rs b/crates/iota-core/src/checkpoints/mod.rs
index 5fcec61ad33..b466e405866 100644
--- a/crates/iota-core/src/checkpoints/mod.rs
+++ b/crates/iota-core/src/checkpoints/mod.rs
@@ -1962,9 +1962,9 @@ impl CheckpointSignatureAggregator {
Err(())
}
InsertResult::QuorumReached(cert) => {
- // It is not guaranteed that signature.authority == narwhal_cert.author, but we
- // do verify the signature so we know that the author signed the
- // message at some point.
+ // It is not guaranteed that signature.authority == consensus_cert.author, but
+ // we do verify the signature so we know that the author signed
+ // the message at some point.
if their_digest != self.digest {
self.metrics.remote_checkpoint_forks.inc();
warn!(
diff --git a/narwhal/network/src/connectivity.rs b/crates/iota-core/src/connection_monitor.rs
similarity index 73%
rename from narwhal/network/src/connectivity.rs
rename to crates/iota-core/src/connection_monitor.rs
index 2d32cc11a7c..a15afd715c4 100644
--- a/narwhal/network/src/connectivity.rs
+++ b/crates/iota-core/src/connection_monitor.rs
@@ -7,15 +7,30 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use anemo::{PeerId, types::PeerEvent};
use dashmap::DashMap;
use futures::future;
-use iota_metrics::spawn_logged_monitored_task;
+use iota_metrics::{metrics_network::NetworkConnectionMetrics, spawn_logged_monitored_task};
use quinn_proto::ConnectionStats;
-use tokio::{task::JoinHandle, time};
-use types::ConditionalBroadcastReceiver;
-
-use crate::metrics::NetworkConnectionMetrics;
+use tokio::{sync::broadcast, task::JoinHandle, time};
const CONNECTION_STAT_COLLECTION_INTERVAL: Duration = Duration::from_secs(60);
+#[derive(Debug)]
+pub struct ConditionalBroadcastReceiver {
+ pub receiver: broadcast::Receiver<()>,
+}
+
+/// ConditionalBroadcastReceiver has an additional method for convenience to be
+/// able to use to conditionally check for shutdown in all branches of a select
+/// statement. Using this method will allow for the shutdown signal to propagate
+/// faster, since we will no longer be waiting until the branch that checks the
+/// receiver is randomly selected by the select macro.
+impl ConditionalBroadcastReceiver {
+ pub async fn received_signal(&mut self) -> bool {
+ futures::future::poll_immediate(&mut Box::pin(self.receiver.recv()))
+ .await
+ .is_some()
+ }
+}
+
#[derive(Eq, PartialEq, Clone, Debug)]
pub enum ConnectionStatus {
Connected,
@@ -256,15 +271,128 @@ mod tests {
use anemo::{Network, Request, Response};
use bytes::Bytes;
+ use iota_metrics::metrics_network::NetworkConnectionMetrics;
use prometheus::Registry;
- use tokio::time::{sleep, timeout};
+ use tokio::{
+ sync::{broadcast, broadcast::error::SendError},
+ time::{sleep, timeout},
+ };
use tower::util::BoxCloneService;
- use crate::{
- connectivity::{ConnectionMonitor, ConnectionStatus},
- metrics::NetworkConnectionMetrics,
+ use crate::connection_monitor::{
+ ConditionalBroadcastReceiver, ConnectionMonitor, ConnectionStatus,
};
+ /// PreSubscribedBroadcastSender is a wrapped Broadcast channel that limits
+ /// subscription to initialization time. This is designed to be used for
+ /// cancellation signal to all the components, and the limitation is
+ /// intended to prevent a component missing the shutdown signal due to a
+ /// subscription that happens after the shutdown signal was sent. The
+ /// receivers have a special peek method which can be used to
+ /// conditionally check for shutdown signal on the channel.
+ struct PreSubscribedBroadcastSender {
+ sender: broadcast::Sender<()>,
+ receivers: Vec,
+ }
+
+ impl PreSubscribedBroadcastSender {
+ fn new(num_subscribers: u64) -> Self {
+ let (tx_init, _) = broadcast::channel(1);
+ let mut receivers = Vec::new();
+ for _i in 0..num_subscribers {
+ receivers.push(ConditionalBroadcastReceiver {
+ receiver: tx_init.subscribe(),
+ });
+ }
+
+ PreSubscribedBroadcastSender {
+ sender: tx_init,
+ receivers,
+ }
+ }
+
+ fn try_subscribe(&mut self) -> Option {
+ self.receivers.pop()
+ }
+
+ fn send(&self) -> Result> {
+ self.sender.send(())
+ }
+ }
+
+ #[tokio::test]
+ async fn test_pre_subscribed_broadcast() {
+ let mut tx_shutdown = PreSubscribedBroadcastSender::new(2);
+ let mut rx_shutdown_a = tx_shutdown.try_subscribe().unwrap();
+
+ let a = tokio::spawn(async move {
+ loop {
+ tokio::select! {
+ _ = rx_shutdown_a.receiver.recv() => {
+ return 1
+ }
+
+ _ = async{}, if true => {
+ if rx_shutdown_a.received_signal().await {
+ return 1
+ }
+ }
+ }
+ }
+ });
+
+ let mut rx_shutdown_b = tx_shutdown.try_subscribe().unwrap();
+ let rx_shutdown_c = tx_shutdown.try_subscribe();
+
+ assert!(rx_shutdown_c.is_none());
+
+ // send the shutdown signal before we start component b and started listening
+ // for shutdown there
+ assert!(tx_shutdown.send().is_ok());
+
+ let b = tokio::spawn(async move {
+ loop {
+ tokio::select! {
+ _ = rx_shutdown_b.receiver.recv() => {
+ return 2
+ }
+
+ _ = async{}, if true => {
+ if rx_shutdown_b.received_signal().await {
+ return 2
+ }
+ }
+ }
+ }
+ });
+
+ // assert that both component a and b loops have exited, effectively shutting
+ // down
+ assert_eq!(a.await.unwrap() + b.await.unwrap(), 3);
+ }
+
+ #[tokio::test]
+ async fn test_conditional_broadcast_receiver() {
+ let mut tx_shutdown: PreSubscribedBroadcastSender = PreSubscribedBroadcastSender::new(2);
+ let mut rx_shutdown = tx_shutdown.try_subscribe().unwrap();
+
+ let a = tokio::spawn(async move {
+ loop {
+ tokio::select! {
+ _ = async{}, if true => {
+ if rx_shutdown.received_signal().await {
+ return 1
+ }
+ }
+ }
+ }
+ });
+
+ assert!(tx_shutdown.send().is_ok());
+
+ assert_eq!(a.await.unwrap(), 1);
+ }
+
#[tokio::test]
async fn test_connectivity() {
// GIVEN
diff --git a/crates/iota-core/src/consensus_adapter.rs b/crates/iota-core/src/consensus_adapter.rs
index c9076f677a4..37aaa26ccd9 100644
--- a/crates/iota-core/src/consensus_adapter.rs
+++ b/crates/iota-core/src/consensus_adapter.rs
@@ -14,7 +14,6 @@ use std::{
};
use arc_swap::ArcSwap;
-use bytes::Bytes;
use dashmap::{DashMap, try_result::TryResult};
use futures::{
FutureExt,
@@ -22,7 +21,7 @@ use futures::{
pin_mut,
};
use iota_metrics::{GaugeGuard, GaugeGuardFutureExt, spawn_monitored_task};
-use iota_simulator::{anemo::PeerId, narwhal_network::connectivity::ConnectionStatus};
+use iota_simulator::anemo::PeerId;
use iota_types::{
base_types::{AuthorityName, TransactionDigest},
committee::Committee,
@@ -31,7 +30,6 @@ use iota_types::{
messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
};
use itertools::Itertools;
-use narwhal_types::{TransactionProto, TransactionsClient};
use parking_lot::RwLockReadGuard;
use prometheus::{
Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
@@ -39,7 +37,6 @@ use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
register_int_gauge_with_registry,
};
-use tap::prelude::*;
use tokio::{
sync::{Semaphore, SemaphorePermit},
task::JoinHandle,
@@ -49,6 +46,7 @@ use tracing::{debug, info, warn};
use crate::{
authority::authority_per_epoch_store::AuthorityPerEpochStore,
+ connection_monitor::ConnectionStatus,
consensus_handler::{SequencedConsensusTransactionKey, classify},
epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator},
metrics::LatencyObserver,
@@ -188,35 +186,6 @@ pub trait SubmitToConsensus: Sync + Send + 'static {
) -> IotaResult;
}
-#[async_trait::async_trait]
-impl SubmitToConsensus for TransactionsClient {
- async fn submit_to_consensus(
- &self,
- transactions: &[ConsensusTransaction],
- _epoch_store: &Arc,
- ) -> IotaResult {
- let transactions_bytes = transactions
- .iter()
- .map(|t| {
- let serialized =
- bcs::to_bytes(t).expect("Serializing consensus transaction cannot fail");
- Bytes::from(serialized)
- })
- .collect::>();
- self.clone()
- .submit_transaction(TransactionProto {
- transactions: transactions_bytes,
- })
- .await
- .map_err(|e| IotaError::ConsensusConnectionBroken(format!("{:?}", e)))
- .tap_err(|r| {
- // Will be logged by caller as well.
- warn!("Submit transaction failed with: {:?}", r);
- })?;
- Ok(())
- }
-}
-
/// Submit Iota certificates to the consensus.
pub struct ConsensusAdapter {
/// The network client connecting to the consensus node of this authority.
@@ -241,7 +210,7 @@ pub struct ConsensusAdapter {
low_scoring_authorities: ArcSwap>>>,
/// A structure to register metrics
metrics: ConsensusAdapterMetrics,
- /// Semaphore limiting parallel submissions to narwhal
+ /// Semaphore limiting parallel submissions to consensus
submit_semaphore: Semaphore,
latency_observer: LatencyObserver,
}
@@ -301,11 +270,12 @@ impl ConsensusAdapter {
self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
}
- // todo - this probably need to hold some kind of lock to make sure epoch does
+ // TODO - this probably need to hold some kind of lock to make sure epoch does
// not change while we are recovering
pub fn submit_recovered(self: &Arc, epoch_store: &Arc) {
- // Currently narwhal worker might lose transactions on restart, so we need to
- // resend them todo - get_all_pending_consensus_transactions is called
+ // Currently consensus worker might lose transactions on restart, so we need to
+ // resend them.
+ // TODO: get_all_pending_consensus_transactions is called
// twice when initializing AuthorityPerEpochStore and here, should not
// be a big deal but can be optimized
let mut recovered = epoch_store.get_all_pending_consensus_transactions();
@@ -612,9 +582,9 @@ impl ConsensusAdapter {
// In addition to that, within_alive_epoch ensures that all pending consensus
// adapter tasks are stopped before reconfiguration can proceed.
//
- // This is essential because narwhal workers reuse same ports when narwhal
+ // This is essential because consensus workers reuse same ports when consensus
// restarts, this means we might be sending transactions from previous
- // epochs to narwhal of new epoch if we have not had this barrier.
+ // epochs to consensus of new epoch if we have not had this barrier.
epoch_store
.within_alive_epoch(self.submit_and_wait_inner(transactions, &epoch_store))
.await
diff --git a/crates/iota-core/src/consensus_handler.rs b/crates/iota-core/src/consensus_handler.rs
index cc2119cb189..eb785977989 100644
--- a/crates/iota-core/src/consensus_handler.rs
+++ b/crates/iota-core/src/consensus_handler.rs
@@ -25,7 +25,7 @@ use iota_types::{
};
use lru::LruCache;
use serde::{Deserialize, Serialize};
-use tracing::{debug, error, info, instrument, trace_span, warn};
+use tracing::{debug, info, instrument, trace_span, warn};
use crate::{
authority::{
@@ -201,7 +201,7 @@ impl ConsensusHandler {
let round = consensus_output.leader_round();
- // TODO: Remove this once narwhal is deprecated. For now mysticeti will not
+ // TODO: Is this check necessary? For now mysticeti will not
// return more than one leader per round so we are not in danger of
// ignoring any commits.
assert!(round >= last_committed_round);
@@ -314,18 +314,9 @@ impl ConsensusHandler {
.stats
.inc_num_user_transactions(authority_index as usize);
}
- if let ConsensusTransactionKind::RandomnessStateUpdate(randomness_round, _) =
- &transaction.kind
- {
- // These are deprecated and we should never see them. Log an error and eat
- // the tx if one appears.
- error!(
- "BUG: saw deprecated RandomnessStateUpdate tx for commit round {round:?}, randomness round {randomness_round:?}"
- )
- } else {
- let transaction = SequencedConsensusTransactionKind::External(transaction);
- transactions.push((serialized_transaction, transaction, authority_index));
- }
+
+ let transaction = SequencedConsensusTransactionKind::External(transaction);
+ transactions.push((serialized_transaction, transaction, authority_index));
}
}
}
@@ -541,7 +532,6 @@ pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
ConsensusTransactionKind::CapabilityNotificationV1(_) => "capability_notification_v1",
ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
- ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
}
diff --git a/crates/iota-core/src/consensus_types/consensus_output_api.rs b/crates/iota-core/src/consensus_types/consensus_output_api.rs
index d2ae2ae5d73..f1d547ba95a 100644
--- a/crates/iota-core/src/consensus_types/consensus_output_api.rs
+++ b/crates/iota-core/src/consensus_types/consensus_output_api.rs
@@ -5,9 +5,7 @@
use std::fmt::Display;
use consensus_core::{BlockAPI, CommitDigest};
-use fastcrypto::hash::Hash;
use iota_types::{digests::ConsensusCommitDigest, messages_consensus::ConsensusTransaction};
-use narwhal_types::{BatchAPI, CertificateAPI, ConsensusOutputDigest, HeaderAPI};
use crate::consensus_types::AuthorityIndex;
@@ -35,78 +33,6 @@ pub(crate) trait ConsensusOutputAPI: Display {
fn consensus_digest(&self) -> ConsensusCommitDigest;
}
-impl ConsensusOutputAPI for narwhal_types::ConsensusOutput {
- fn reputation_score_sorted_desc(&self) -> Option> {
- if !self.sub_dag.reputation_score.final_of_schedule {
- return None;
- }
- Some(
- self.sub_dag
- .reputation_score
- .authorities_by_score_desc()
- .into_iter()
- .map(|(id, score)| (id.0 as AuthorityIndex, score))
- .collect(),
- )
- }
-
- fn leader_round(&self) -> u64 {
- self.sub_dag.leader_round()
- }
-
- fn leader_author_index(&self) -> AuthorityIndex {
- self.sub_dag.leader.origin().0 as AuthorityIndex
- }
-
- fn commit_timestamp_ms(&self) -> u64 {
- self.sub_dag.commit_timestamp()
- }
-
- fn commit_sub_dag_index(&self) -> u64 {
- self.sub_dag.sub_dag_index
- }
-
- fn transactions(&self) -> ConsensusOutputTransactions {
- assert!(self.sub_dag.certificates.len() == self.batches.len());
- self.sub_dag
- .certificates
- .iter()
- .zip(&self.batches)
- .map(|(cert, batches)| {
- assert_eq!(cert.header().payload().len(), batches.len());
- let transactions: Vec<(&[u8], ConsensusTransaction)> =
- batches.iter().flat_map(|batch| {
- let digest = batch.digest();
- assert!(cert.header().payload().contains_key(&digest));
- batch.transactions().iter().map(move |serialized_transaction| {
- let transaction = match bcs::from_bytes::(
- serialized_transaction,
- ) {
- Ok(transaction) => transaction,
- Err(err) => {
- // This should have been prevented by transaction verifications in consensus.
- panic!(
- "Unexpected malformed transaction (failed to deserialize): {}\nCertificate={:?} BatchDigest={:?} Transaction={:?}",
- err, cert, digest, serialized_transaction
- );
- }
- };
- (serialized_transaction.as_ref(), transaction)
- })
- }).collect();
- (cert.origin().0 as AuthorityIndex, transactions)
- }).collect()
- }
-
- fn consensus_digest(&self) -> ConsensusCommitDigest {
- // We port ConsensusOutputDigest, a narwhal space object, into
- // ConsensusCommitDigest, a iota-core space object. We assume they
- // always have the same format.
- static_assertions::assert_eq_size!(ConsensusCommitDigest, ConsensusOutputDigest);
- ConsensusCommitDigest::new(self.digest().into_inner())
- }
-}
-
impl ConsensusOutputAPI for consensus_core::CommittedSubDag {
fn reputation_score_sorted_desc(&self) -> Option> {
if !self.reputation_scores_desc.is_empty() {
diff --git a/crates/iota-core/src/consensus_validator.rs b/crates/iota-core/src/consensus_validator.rs
index 4962bb0b2f5..907b69ad85a 100644
--- a/crates/iota-core/src/consensus_validator.rs
+++ b/crates/iota-core/src/consensus_validator.rs
@@ -88,8 +88,7 @@ impl IotaTxValidator {
ConsensusTransactionKind::EndOfPublish(_)
| ConsensusTransactionKind::NewJWKFetched(_, _, _)
- | ConsensusTransactionKind::CapabilityNotificationV1(_)
- | ConsensusTransactionKind::RandomnessStateUpdate(_, _) => {}
+ | ConsensusTransactionKind::CapabilityNotificationV1(_) => {}
}
}
diff --git a/crates/iota-core/src/epoch/randomness.rs b/crates/iota-core/src/epoch/randomness.rs
index acbb37baad7..e63f4f2a4e8 100644
--- a/crates/iota-core/src/epoch/randomness.rs
+++ b/crates/iota-core/src/epoch/randomness.rs
@@ -21,14 +21,13 @@ use futures::{StreamExt, stream::FuturesUnordered};
use iota_macros::fail_point_if;
use iota_network::randomness;
use iota_types::{
- base_types::AuthorityName,
+ base_types::{AuthorityName, CommitRound},
committee::{Committee, EpochId, StakeUnit},
crypto::{AuthorityKeyPair, RandomnessRound},
error::{IotaError, IotaResult},
iota_system_state::epoch_start_iota_system_state::EpochStartSystemStateTrait,
messages_consensus::{ConsensusTransaction, VersionedDkgConfirmation, VersionedDkgMessage},
};
-use narwhal_types::{Round, TimestampMs};
use parking_lot::Mutex;
use rand::{
SeedableRng,
@@ -47,6 +46,9 @@ use crate::{
consensus_adapter::SubmitToConsensus,
};
+/// The epoch UNIX timestamp in milliseconds
+pub type CommitTimestampMs = u64;
+
type PkG = bls12381::G2Element;
type EncG = bls12381::G2Element;
@@ -456,7 +458,7 @@ impl RandomnessManager {
pub(crate) async fn advance_dkg(
&mut self,
consensus_output: &mut ConsensusCommitOutput,
- round: Round,
+ round: CommitRound,
) -> IotaResult {
let epoch_store = self.epoch_store()?;
@@ -690,7 +692,7 @@ impl RandomnessManager {
/// be resumed.
pub(crate) fn reserve_next_randomness(
&mut self,
- commit_timestamp: TimestampMs,
+ commit_timestamp: CommitTimestampMs,
output: &mut ConsensusCommitOutput,
) -> IotaResult