diff --git a/Cargo.lock b/Cargo.lock index c5c7eb150a..a4ce5987a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4857,18 +4857,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "test_sync_state_notifications" -version = "0.1.6" -dependencies = [ - "kaspa-core", - "kaspa-grpc-client", - "kaspa-notify", - "kaspa-rpc-core", - "log", - "tokio", -] - [[package]] name = "textwrap" version = "0.11.0" diff --git a/consensus/src/consensus/services.rs b/consensus/src/consensus/services.rs index 54e596be97..bce22255d6 100644 --- a/consensus/src/consensus/services.rs +++ b/consensus/src/consensus/services.rs @@ -184,6 +184,7 @@ impl ConsensusServices { params.anticone_finalization_depth(), params.ghostdag_k, notification_root, + params.daa_window_params, )); let sync_manager = SyncManager::new( diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 8bb70c9c89..f6d8ec9895 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -11,5 +11,3 @@ pub mod params; pub mod pipeline; pub mod processes; pub mod test_helpers; - -pub mod sync_state; diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index cc0efb4b13..fc4cacb2cd 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -76,7 +76,6 @@ use kaspa_muhash::MuHash; use kaspa_notify::notifier::Notify; use crate::model::stores::headers::CompactHeaderData; -use crate::sync_state::SYNC_STATE; use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use itertools::Itertools; use kaspa_consensus_core::config::params::DAAWindowParams; @@ -319,8 +318,9 @@ impl VirtualStateProcessor { ))) .expect("expecting an open unbounded channel"); - SYNC_STATE.is_synced_or(|| { - let CompactHeaderData { timestamp, daa_score, .. } = self.headers_store.get_compact_header_data(new_sink).unwrap(); + { + let sink = self.virtual_stores.read().state.get().unwrap().ghostdag_data.selected_parent; + let CompactHeaderData { timestamp, daa_score, .. } = self.headers_store.get_compact_header_data(sink).unwrap(); let diff = -(unix_now() as i64) + timestamp as i64 + self.daa_window_params.expected_daa_window_duration_in_milliseconds(daa_score) as i64; @@ -331,8 +331,7 @@ impl VirtualStateProcessor { )) .expect("expecting an open unbounded channel"); } - diff - }); + } } pub(super) fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash { diff --git a/consensus/src/processes/pruning_proof/mod.rs b/consensus/src/processes/pruning_proof/mod.rs index 59895d2520..d1a7ab132c 100644 --- a/consensus/src/processes/pruning_proof/mod.rs +++ b/consensus/src/processes/pruning_proof/mod.rs @@ -10,6 +10,7 @@ use itertools::Itertools; use parking_lot::RwLock; use rocksdb::WriteBatch; +use kaspa_consensus_core::config::params::DAAWindowParams; use kaspa_consensus_core::{ blockhash::{BlockHashExtensions, BlockHashes, ORIGIN}, errors::{ @@ -25,6 +26,7 @@ use kaspa_consensus_notify::{ notification::{Notification, SyncStateChangedNotification}, root::ConsensusNotificationRoot, }; +use kaspa_core::time::unix_now; use kaspa_core::{debug, info, trace}; use kaspa_database::prelude::{ConnBuilder, StoreResultEmptyTuple, StoreResultExtensions}; use kaspa_hashes::Hash; @@ -32,7 +34,7 @@ use kaspa_notify::notifier::Notify; use kaspa_pow::calc_block_level; use kaspa_utils::{binary_heap::BinaryHeapExtensions, vec::VecExtensions}; -use crate::sync_state::SYNC_STATE; +use crate::model::stores::headers::CompactHeaderData; use crate::{ consensus::{ services::{DbDagTraversalManager, DbGhostdagManager, DbParentsManager, DbWindowManager}, @@ -108,6 +110,7 @@ pub struct PruningProofManager { anticone_finalization_depth: u64, ghostdag_k: KType, notification_root: Arc, + daa_window_params: DAAWindowParams, } impl PruningProofManager { @@ -126,6 +129,7 @@ impl PruningProofManager { anticone_finalization_depth: u64, ghostdag_k: KType, notification_root: Arc, + daa_window_params: DAAWindowParams, ) -> Self { Self { db, @@ -157,6 +161,7 @@ impl PruningProofManager { anticone_finalization_depth, ghostdag_k, notification_root, + daa_window_params, } } @@ -414,11 +419,19 @@ impl PruningProofManager { let mut selected_tip_by_level = vec![None; self.max_block_level as usize + 1]; for level in (0..=self.max_block_level).rev() { - if !SYNC_STATE.is_synced() { - self.notification_root - .notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_proof(level, self.max_block_level))) - .expect("expecting an open unbounded channel"); + { + let sink = self.virtual_stores.read().state.get().unwrap().ghostdag_data.selected_parent; + let CompactHeaderData { timestamp, daa_score, .. } = self.headers_store.get_compact_header_data(sink).unwrap(); + let diff = -(unix_now() as i64) + + timestamp as i64 + + self.daa_window_params.expected_daa_window_duration_in_milliseconds(daa_score) as i64; + if diff > 0 { + self.notification_root + .notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_proof(level, self.max_block_level))) + .expect("expecting an open unbounded channel"); + } } + info!("Validating level {level} from the pruning point proof"); let level_idx = level as usize; let mut selected_tip = None; diff --git a/consensus/src/sync_state.rs b/consensus/src/sync_state.rs deleted file mode 100644 index bad17d2532..0000000000 --- a/consensus/src/sync_state.rs +++ /dev/null @@ -1,59 +0,0 @@ -use kaspa_consensus_core::config::params::DAAWindowParams; -use kaspa_consensusmanager::ConsensusManager; -use kaspa_core::time::unix_now; -use once_cell::sync::{Lazy, OnceCell}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::time::Duration; - -pub static SYNC_STATE: Lazy = Lazy::new(SyncState::default); - -#[derive(Default)] -pub struct SyncState { - pub consensus_manager: OnceCell>, - pub daa_window_params: OnceCell, - pub has_peers: OnceCell bool + 'static + Sync + Send>>, - - is_nearly_synced: Arc, -} - -impl SyncState { - pub fn is_synced(&self) -> bool { - self.is_nearly_synced.load(Ordering::Acquire) && self.has_peers.get().is_some_and(|has_peers| has_peers()) - } - - // diff is sink timestamp + expected_daa_window_duration_in_milliseconds(daa_score) - unix_now() - pub(super) fn is_synced_or(&self, check_diff: impl FnOnce() -> i64) -> bool { - let (is_nearly_synced, has_peers) = - (self.is_nearly_synced.load(Ordering::Acquire), self.has_peers.get().is_some_and(|has_peers| has_peers())); - if !is_nearly_synced && has_peers { - let diff = check_diff(); - if diff > 0 && self.is_nearly_synced.compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed).is_ok() { - self.watch(diff); - } - } - is_nearly_synced && has_peers - } - - fn watch(&self, mut diff: i64) { - let is_nearly_synced = Arc::clone(&self.is_nearly_synced); - let daa_window_params = *self.daa_window_params.get().unwrap(); - let consensus_manager = Arc::clone(self.consensus_manager.get().unwrap()); - - tokio::spawn(async move { - while diff > 0 { - tokio::time::sleep(Duration::from_millis(diff as u64)).await; - - let session = consensus_manager.consensus().session().await; - let sink = session.async_get_sink().await; - let h = session.async_get_header(sink).await.unwrap(); - drop(session); - - diff = -(unix_now() as i64) - + daa_window_params.expected_daa_window_duration_in_milliseconds(h.daa_score) as i64 - + h.timestamp as i64; - } - is_nearly_synced.store(false, Ordering::Release); - }); - } -} diff --git a/indexes/utxoindex/Cargo.toml b/indexes/utxoindex/Cargo.toml index 5d06fa6aee..8be9701c18 100644 --- a/indexes/utxoindex/Cargo.toml +++ b/indexes/utxoindex/Cargo.toml @@ -9,7 +9,6 @@ license.workspace = true [dependencies] kaspa-hashes.workspace = true -kaspa-consensus.workspace = true kaspa-consensus-core.workspace = true kaspa-consensusmanager.workspace = true kaspa-consensus-notify.workspace = true diff --git a/indexes/utxoindex/src/index.rs b/indexes/utxoindex/src/index.rs index 4eb4335aa4..945f01b1b4 100644 --- a/indexes/utxoindex/src/index.rs +++ b/indexes/utxoindex/src/index.rs @@ -148,11 +148,14 @@ impl UtxoIndexApi for UtxoIndex { info!("Resyncing the utxoindex..."); #[cfg(not(test))] - if !kaspa_consensus::sync_state::SYNC_STATE.is_synced() { - self.notification_root - .notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_utxo_resync())) - .expect("expecting an open unbounded channel"); + { + if !futures::executor::block_on(self.consensus_manager.consensus().session_blocking()).is_nearly_synced() { + self.notification_root + .notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_utxo_resync())) + .expect("expecting an open unbounded channel"); + } } + self.store.delete_all()?; let consensus = self.consensus_manager.consensus(); let session = futures::executor::block_on(consensus.session_blocking()); diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs index 6846f5ebbc..fdcde64f14 100644 --- a/kaspad/src/daemon.rs +++ b/kaspad/src/daemon.rs @@ -1,7 +1,6 @@ use std::{fs, path::PathBuf, process::exit, sync::Arc, time::Duration}; use async_channel::unbounded; -use kaspa_consensus::sync_state::SYNC_STATE; use kaspa_consensus_core::{ config::{Config, ConfigBuilder}, errors::config::{ConfigError, ConfigResult}, @@ -234,8 +233,6 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm processing_counters.clone(), )); let consensus_manager = Arc::new(ConsensusManager::new(consensus_factory)); - _ = SYNC_STATE.consensus_manager.set(consensus_manager.clone()); - SYNC_STATE.daa_window_params.set(config.params.daa_window_params).unwrap(); let consensus_monitor = Arc::new(ConsensusMonitor::new(processing_counters.clone(), tick_service.clone())); diff --git a/protocol/flows/Cargo.toml b/protocol/flows/Cargo.toml index 92211d2412..98f15b09f3 100644 --- a/protocol/flows/Cargo.toml +++ b/protocol/flows/Cargo.toml @@ -11,7 +11,6 @@ license.workspace = true [dependencies] kaspa-core.workspace = true -kaspa-consensus.workspace = true kaspa-consensus-core.workspace = true kaspa-consensus-notify.workspace = true kaspa-p2p-lib.workspace = true diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs index 13551e3921..bdf434aeae 100644 --- a/protocol/flows/src/flow_context.rs +++ b/protocol/flows/src/flow_context.rs @@ -3,7 +3,6 @@ use crate::v5; use async_trait::async_trait; use kaspa_addressmanager::AddressManager; use kaspa_connectionmanager::ConnectionManager; -use kaspa_consensus::sync_state::SYNC_STATE; use kaspa_consensus_core::block::Block; use kaspa_consensus_core::config::Config; use kaspa_consensus_core::errors::block::RuleError; @@ -183,8 +182,6 @@ impl FlowContext { notification_root: Arc, ) -> Self { let hub = Hub::new(); - let moved_hub = hub.clone(); - _ = SYNC_STATE.has_peers.set(Box::new(move || moved_hub.has_peers() as _)); let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE + (config.bps() as f64).log2().min(3.0) as u32; diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index 7fcca6fefe..7c9fa52fce 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -14,7 +14,6 @@ use kaspa_consensus_core::{ BlockHashSet, }; use kaspa_consensus_notify::notification::{Notification, SyncStateChangedNotification}; -use kaspa_consensus_notify::root::ConsensusNotificationRoot; use kaspa_consensusmanager::{spawn_blocking, ConsensusProxy, StagingConsensus}; use kaspa_core::{debug, info, warn}; use kaspa_hashes::Hash; @@ -46,8 +45,6 @@ pub struct IbdFlow { // Receives relay blocks from relay flow which are out of orphan resolution range and hence trigger IBD relay_receiver: Receiver, - - notification_root: Option>, } #[async_trait::async_trait] @@ -70,14 +67,8 @@ pub enum IbdType { // TODO: define a peer banning strategy impl IbdFlow { - pub fn new( - ctx: FlowContext, - router: Arc, - incoming_route: IncomingRoute, - relay_receiver: Receiver, - notification_root: Arc, - ) -> Self { - Self { ctx, router, incoming_route, relay_receiver, notification_root: Some(notification_root) } + pub fn new(ctx: FlowContext, router: Arc, incoming_route: IncomingRoute, relay_receiver: Receiver) -> Self { + Self { ctx, router, incoming_route, relay_receiver } } async fn start_impl(&mut self) -> Result<(), ProtocolError> { @@ -291,10 +282,9 @@ impl IbdFlow { if passed > Duration::from_secs(1) { info!("Processed {} trusted blocks in the last {:.2}s (total {})", i - last_index, passed.as_secs_f64(), i); - if !kaspa_consensus::sync_state::SYNC_STATE.is_synced() { - self.notification_root - .as_ref() - .unwrap() + if !self.ctx.consensus().session().await.async_is_nearly_synced().await { + self.ctx + .notification_root .notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_trust_sync( (i - last_index) as u64, i as u64, @@ -328,10 +318,9 @@ impl IbdFlow { relay_block.header.daa_score, "block headers", Some(|headers: usize, progress: i32| { - if !kaspa_consensus::sync_state::SYNC_STATE.is_synced() { - self.notification_root - .as_ref() - .unwrap() + if !futures::executor::block_on(self.ctx.consensus().session_blocking()).is_nearly_synced() { + self.ctx + .notification_root .notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_headers( headers as u64, progress as i64, @@ -446,8 +435,7 @@ staging selected tip ({}) is too small or negative. Aborting IBD...", RequestPruningPointUtxoSetMessage { pruning_point_hash: Some(pruning_point.into()) } )) .await?; - let mut chunk_stream = - PruningPointUtxosetChunkStream::new(&self.router, &mut self.incoming_route, self.notification_root.as_ref().unwrap()); + let mut chunk_stream = PruningPointUtxosetChunkStream::new(&self.router, &mut self.incoming_route, &self.ctx); let mut multiset = MuHash::new(); while let Some(chunk) = chunk_stream.next().await? { multiset = consensus @@ -471,13 +459,14 @@ staging selected tip ({}) is too small or negative. Aborting IBD...", let low_header = consensus.async_get_header(*hashes.first().expect("hashes was non empty")).await?; let high_header = consensus.async_get_header(*hashes.last().expect("hashes was non empty")).await?; - let notification_root = self.notification_root.take().unwrap(); + let notification_root = self.ctx.notification_root.clone(); + let consensus_move = self.ctx.consensus().clone(); let mut progress_reporter = ProgressReporter::new( low_header.daa_score, high_header.daa_score, "blocks", - Some(|blocks, progress| { - if !kaspa_consensus::sync_state::SYNC_STATE.is_synced() { + Some(move |blocks, progress| { + if !futures::executor::block_on(consensus_move.session_blocking()).is_nearly_synced() { notification_root .notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_blocks( blocks as u64, @@ -506,7 +495,6 @@ staging selected tip ({}) is too small or negative. Aborting IBD...", let prev_chunk_len = prev_jobs.len(); try_join_all(prev_jobs).await?; progress_reporter.report_completion(prev_chunk_len); - self.notification_root = Some(notification_root); self.ctx.on_new_block_template().await?; Ok(()) diff --git a/protocol/flows/src/v5/ibd/streams.rs b/protocol/flows/src/v5/ibd/streams.rs index e257425fcc..d024efd127 100644 --- a/protocol/flows/src/v5/ibd/streams.rs +++ b/protocol/flows/src/v5/ibd/streams.rs @@ -2,13 +2,13 @@ //! Logical stream abstractions used throughout the IBD negotiation protocols //! +use crate::flow_context::FlowContext; use kaspa_consensus_core::{ errors::consensus::ConsensusError, header::Header, tx::{TransactionOutpoint, UtxoEntry}, }; use kaspa_consensus_notify::notification::{Notification, SyncStateChangedNotification}; -use kaspa_consensus_notify::root::ConsensusNotificationRoot; use kaspa_core::{debug, info}; use kaspa_notify::notifier::Notify; use kaspa_p2p_lib::{ @@ -138,12 +138,12 @@ pub struct PruningPointUtxosetChunkStream<'a, 'b> { incoming_route: &'b mut IncomingRoute, i: usize, // Chunk index utxo_count: usize, - notification_root: &'a ConsensusNotificationRoot, + ctx: &'a FlowContext, } impl<'a, 'b> PruningPointUtxosetChunkStream<'a, 'b> { - pub fn new(router: &'a Router, incoming_route: &'b mut IncomingRoute, notification_root: &'a ConsensusNotificationRoot) -> Self { - Self { router, incoming_route, i: 0, utxo_count: 0, notification_root } + pub fn new(router: &'a Router, incoming_route: &'b mut IncomingRoute, ctx: &'a FlowContext) -> Self { + Self { router, incoming_route, i: 0, utxo_count: 0, ctx } } pub async fn next(&mut self) -> Result, ProtocolError> { @@ -183,8 +183,9 @@ impl<'a, 'b> PruningPointUtxosetChunkStream<'a, 'b> { self.utxo_count += chunk.len(); if self.i % IBD_BATCH_SIZE == 0 { info!("Received {} UTXO set chunks so far, totaling in {} UTXOs", self.i, self.utxo_count); - if !kaspa_consensus::sync_state::SYNC_STATE.is_synced() { - self.notification_root + if !self.ctx.consensus_manager.consensus().session().await.async_is_nearly_synced().await { + self.ctx + .notification_root .notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_utxo_sync( self.i as u64, self.utxo_count as u64, diff --git a/protocol/flows/src/v5/mod.rs b/protocol/flows/src/v5/mod.rs index be22c3d3bf..91afb6fe3f 100644 --- a/protocol/flows/src/v5/mod.rs +++ b/protocol/flows/src/v5/mod.rs @@ -56,7 +56,6 @@ pub fn register(ctx: FlowContext, router: Arc) -> Vec> { KaspadMessagePayloadType::DonePruningPointUtxoSetChunks, ]), relay_receiver, - ctx.notification_root.clone(), )), Box::new(HandleRelayInvsFlow::new( ctx.clone(),