diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 169faecebf..c124a3bf9b 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -1743,6 +1743,7 @@ impl ChainReader for BlockChain { ) } + //todo: return status as reference fn status(&self) -> ChainStatus { self.status.status.clone() } diff --git a/config/src/genesis_config.rs b/config/src/genesis_config.rs index 9aada1eae1..cd10d910e9 100644 --- a/config/src/genesis_config.rs +++ b/config/src/genesis_config.rs @@ -891,7 +891,7 @@ pub static G_DEV_CONFIG: Lazy = Lazy::new(|| { min_action_delay: 60 * 60 * 1000, // 1h }, transaction_timeout: ONE_DAY, - dag_effective_height: u64::MAX, + dag_effective_height: 0, } }); diff --git a/miner/Cargo.toml b/miner/Cargo.toml index c5731ab84a..55f6e7152f 100644 --- a/miner/Cargo.toml +++ b/miner/Cargo.toml @@ -27,8 +27,9 @@ starcoin-txpool = { workspace = true } starcoin-txpool-api = { workspace = true } starcoin-vm-types = { workspace = true } tokio = { features = ["full"], workspace = true } -starcoin-dag = { workspace =true } +starcoin-dag = { workspace = true } starcoin-types = { workspace = true } +starcoin-sync = { workspace = true } [dev-dependencies] starcoin-network-rpc = { workspace = true } diff --git a/miner/src/create_block_template/mod.rs b/miner/src/create_block_template/mod.rs index 3734f75778..62e9c1fb86 100644 --- a/miner/src/create_block_template/mod.rs +++ b/miner/src/create_block_template/mod.rs @@ -2,38 +2,33 @@ // SPDX-License-Identifier: Apache-2.0 use crate::create_block_template::metrics::BlockBuilderMetrics; -use anyhow::{anyhow, bail, format_err, Result}; +use anyhow::{format_err, Result}; use futures::executor::block_on; use starcoin_account_api::{AccountAsyncService, AccountInfo, DefaultAccountChangeEvent}; use starcoin_account_service::AccountService; -use starcoin_chain::BlockChain; -use starcoin_chain::{ChainReader, ChainWriter}; -use starcoin_chain_api::ChainType; -use starcoin_config::ChainNetwork; + use starcoin_config::NodeConfig; -use starcoin_consensus::Consensus; use starcoin_crypto::hash::HashValue; -use starcoin_dag::blockdag::BlockDAG; + use starcoin_executor::VMMetrics; use starcoin_logger::prelude::*; use starcoin_open_block::OpenedBlock; use starcoin_service_registry::{ - ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, ServiceRequest, + ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, ServiceRef, + ServiceRequest, }; -use starcoin_storage::{BlockStore, Storage, Store}; +use starcoin_storage::{Storage, Store}; +use starcoin_sync::block_connector::{BlockConnectorService, MinerRequest, MinerResponse}; use starcoin_txpool::TxPoolService; use starcoin_txpool_api::TxPoolSyncService; -use starcoin_types::{ - block::{BlockHeader, BlockTemplate, ExecutedBlock}, - system_events::{NewBranch, NewHeadBlock}, -}; +use starcoin_types::block::{BlockHeader, BlockTemplate}; use starcoin_vm_types::transaction::SignedUserTransaction; use std::cmp::min; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; mod metrics; -#[cfg(test)] -mod test_create_block_template; +//#[cfg(test)] +//mod test_create_block_template; #[derive(Debug)] pub struct GetHeadRequest; @@ -56,7 +51,7 @@ pub struct BlockTemplateResponse { } pub struct BlockBuilderService { - inner: Inner, + inner: Inner, } impl BlockBuilderService {} @@ -65,9 +60,9 @@ impl ServiceFactory for BlockBuilderService { fn create(ctx: &mut ServiceContext) -> Result { let config = ctx.get_shared::>()?; let storage = ctx.get_shared::>()?; - let startup_info = storage - .get_startup_info()? - .expect("Startup info should exist when service start."); + let connector_service = ctx + .service_ref::>()? + .clone(); //TODO support get service ref by AsyncAPI; let account_service = ctx.service_ref::()?; let miner_account = block_on(async { account_service.get_default_account().await })? @@ -81,18 +76,15 @@ impl ServiceFactory for BlockBuilderService { .and_then(|registry| BlockBuilderMetrics::register(registry).ok()); let vm_metrics = ctx.get_shared_opt::()?; - let dag = ctx.get_shared::()?; let inner = Inner::new( - config.net(), + connector_service, storage, - startup_info.main, txpool, config.miner.block_gas_limit, miner_account, metrics, vm_metrics, - dag, )?; Ok(Self { inner }) } @@ -100,34 +92,16 @@ impl ServiceFactory for BlockBuilderService { impl ActorService for BlockBuilderService { fn started(&mut self, ctx: &mut ServiceContext) -> Result<()> { - ctx.subscribe::(); - ctx.subscribe::(); ctx.subscribe::(); Ok(()) } fn stopped(&mut self, ctx: &mut ServiceContext) -> Result<()> { - ctx.unsubscribe::(); - ctx.unsubscribe::(); ctx.unsubscribe::(); Ok(()) } } -impl EventHandler for BlockBuilderService { - fn handle_event(&mut self, msg: NewHeadBlock, _ctx: &mut ServiceContext) { - if let Err(e) = self.inner.update_chain(msg.executed_block.as_ref().clone()) { - error!("err : {:?}", e) - } - } -} - -impl EventHandler for BlockBuilderService { - fn handle_event(&mut self, msg: NewBranch, _ctx: &mut ServiceContext) { - self.inner.insert_uncle(msg.0.block.header().clone()); - } -} - impl EventHandler for BlockBuilderService { fn handle_event( &mut self, @@ -145,19 +119,7 @@ impl ServiceHandler for BlockBuilderService { _msg: BlockTemplateRequest, _ctx: &mut ServiceContext, ) -> Result { - let template = self.inner.create_block_template(); - self.inner.uncles_prune(); - template - } -} - -impl ServiceHandler for BlockBuilderService { - fn handle( - &mut self, - _msg: GetHeadRequest, - _ctx: &mut ServiceContext, - ) -> HashValue { - self.inner.chain.current_header().id() + self.inner.create_block_template() } } @@ -186,154 +148,53 @@ impl TemplateTxProvider for TxPoolService { } } -pub struct Inner

{ +pub struct Inner { storage: Arc, - chain: BlockChain, + block_connector_service: ServiceRef>, tx_provider: P, - parent_uncle: HashMap>, - uncles: HashMap, local_block_gas_limit: Option, miner_account: AccountInfo, + #[allow(unused)] metrics: Option, vm_metrics: Option, - dag: BlockDAG, } -impl

Inner

+impl Inner where P: TemplateTxProvider, + T: TxPoolSyncService, { pub fn new( - net: &ChainNetwork, + block_connector_service: ServiceRef>, storage: Arc, - block_id: HashValue, tx_provider: P, local_block_gas_limit: Option, miner_account: AccountInfo, metrics: Option, vm_metrics: Option, - dag: BlockDAG, ) -> Result { - let chain = BlockChain::new( - net.time_service(), - block_id, - storage.clone(), - vm_metrics.clone(), - dag.clone(), - )?; - Ok(Inner { storage, - chain, + block_connector_service, tx_provider, - parent_uncle: HashMap::new(), - uncles: HashMap::new(), local_block_gas_limit, miner_account, metrics, vm_metrics, - dag, }) } - pub fn insert_uncle(&mut self, uncle: BlockHeader) { - self.parent_uncle - .entry(uncle.parent_hash()) - .or_default() - .push(uncle.id()); - self.uncles.insert(uncle.id(), uncle); - if let Some(metrics) = self.metrics.as_ref() { - metrics - .current_epoch_maybe_uncles - .set(self.uncles.len() as u64); - } - } - - pub fn update_chain(&mut self, block: ExecutedBlock) -> Result<()> { - let current_header = self.chain.current_header(); - let current_id = current_header.id(); - if self.chain.can_connect(&block) { - self.chain.connect(block)?; - } else { - self.chain = BlockChain::new( - self.chain.time_service(), - block.header().id(), - self.storage.clone(), - self.vm_metrics.clone(), - self.dag.clone(), - )?; - //current block possible be uncle. - self.uncles.insert(current_id, current_header); - - if let Some(metrics) = self.metrics.as_ref() { - metrics - .current_epoch_maybe_uncles - .set(self.uncles.len() as u64); - } - } - Ok(()) - } - - pub fn find_uncles(&self) -> Vec { - let mut new_uncle = Vec::new(); - let epoch = self.chain.epoch(); - if epoch.end_block_number() != (self.chain.current_header().number() + 1) { - for maybe_uncle in self.uncles.values() { - if new_uncle.len() as u64 >= epoch.max_uncles_per_block() { - break; - } - if self.chain.can_be_uncle(maybe_uncle).unwrap_or_default() { - new_uncle.push(maybe_uncle.clone()) - } - } - } - - new_uncle - } - - fn uncles_prune(&mut self) { - if !self.uncles.is_empty() { - let epoch = self.chain.epoch(); - // epoch的end_number是开区间,当前块已经生成但还没有apply,所以应该在epoch(最终状态) - // 的倒数第二块处理时清理uncles - if epoch.end_block_number() == (self.chain.current_header().number() + 2) { - self.uncles.clear(); - } - } - if let Some(metrics) = self.metrics.as_ref() { - metrics - .current_epoch_maybe_uncles - .set(self.uncles.len() as u64); - } - } - - fn get_dag_previous_header( - &self, - previous_header: BlockHeader, - selected_parent: HashValue, - ) -> Result { - if previous_header.id() == selected_parent { - return Ok(previous_header); - } - - if self - .chain - .dag() - .check_ancestor_of(previous_header.id(), vec![selected_parent])? - { - return self - .storage - .get_block_header_by_hash(selected_parent)? - .ok_or_else(|| { - format_err!("BlockHeader should exist by hash: {}", selected_parent) - }); - } - - Ok(previous_header) - } - pub fn create_block_template(&self) -> Result { - let on_chain_block_gas_limit = self.chain.epoch().block_gas_limit(); + let MinerResponse { + previous_header, + tips_hash, + blues_hash: blues, + strategy, + on_chain_block_gas_limit, + next_difficulty: difficulty, + now_milliseconds: mut now_millis, + } = *block_on(self.block_connector_service.send(MinerRequest {}))??; + let block_gas_limit = self .local_block_gas_limit .map(|block_gas_limit| min(block_gas_limit, on_chain_block_gas_limit)) @@ -345,11 +206,8 @@ where let txns = self.tx_provider.get_txns(max_txns); let author = *self.miner_account.address(); - let mut previous_header = self.chain.current_header(); - let epoch = self.chain.epoch(); - let strategy = epoch.strategy(); + let current_number = previous_header.number().saturating_add(1); - let mut now_millis = self.chain.time_service().now_millis(); if now_millis <= previous_header.timestamp() { info!( "Adjust new block timestamp by parent timestamp, parent.timestamp: {}, now: {}, gap: {}", @@ -357,41 +215,18 @@ where ); now_millis = previous_header.timestamp() + 1; } - let difficulty = strategy.calculate_next_difficulty(&self.chain)?; - let tips_hash = match self.chain.check_chain_type()? { - ChainType::Single => None, - ChainType::Dag => { - let (_dag_genesis, tips_hash) = self.chain.current_tips_hash()?.ok_or_else(|| { - anyhow!( - "the number of the block is larger than the dag fork number but no dag state!" - ) - })?; - Some(tips_hash) - } - }; - info!( - "block:{} tips(dag state):{:?}", - self.chain.current_header().number(), - &tips_hash, - ); + info!("block:{} tips(dag state):{:?}", current_number, &tips_hash,); let (uncles, blue_blocks) = { match &tips_hash { - None => (self.find_uncles(), None), + // fixme: remove this branch when single chain logic is removed + None => (vec![], None), Some(tips) => { - let mut blues = self.dag.ghostdata(tips)?.mergeset_blues.to_vec(); - if blues.is_empty() { - bail!("The count of ghostdata returns mergeset blues is empty"); - } info!( "create block template with tips:{:?},ghostdata blues:{:?}", - &tips_hash, blues + tips, blues ); let mut blue_blocks = vec![]; - - let selected_parent = blues.remove(0); - previous_header = - self.get_dag_previous_header(previous_header, selected_parent)?; - for blue in &blues { + for blue in blues.iter().skip(1) { // todo: make sure blue block has been executed successfully let block = self .storage @@ -410,6 +245,7 @@ where } } }; + info!( "[CreateBlockTemplate] previous_header: {:?}, block_gas_limit: {}, max_txns: {}, txn len: {}, uncles len: {}, timestamp: {}", previous_header, diff --git a/miner/tests/miner_test.rs b/miner/tests/miner_test.rs index 8edd7a7fec..f8996def5f 100644 --- a/miner/tests/miner_test.rs +++ b/miner/tests/miner_test.rs @@ -10,6 +10,7 @@ use starcoin_miner::{ }; use starcoin_service_registry::{RegistryAsyncService, RegistryService}; use starcoin_storage::BlockStore; +use starcoin_sync::block_connector::BlockConnectorService; use starcoin_txpool::TxPoolService; use starcoin_types::{system_events::GenerateBlockEvent, U256}; use std::sync::Arc; @@ -18,7 +19,7 @@ use tokio::time::sleep; #[stest::test] async fn test_miner_service() { - let mut config = NodeConfig::random_for_test(); + let mut config = NodeConfig::random_for_dag_test(); config.miner.disable_mint_empty_block = Some(false); let registry = RegistryService::launch(); let node_config = Arc::new(config.clone()); @@ -41,6 +42,11 @@ async fn test_miner_service() { .await .unwrap(); + registry + .register::>() + .await + .unwrap(); + let template = registry.register::().await.unwrap(); let response = template .send(BlockTemplateRequest) diff --git a/sync/src/block_connector/block_connector_service.rs b/sync/src/block_connector/block_connector_service.rs index 3c86b75aa7..952a6c7a03 100644 --- a/sync/src/block_connector/block_connector_service.rs +++ b/sync/src/block_connector/block_connector_service.rs @@ -7,22 +7,23 @@ use super::CheckBlockConnectorHashValue; use super::CreateBlockRequest; #[cfg(test)] use super::CreateBlockResponse; -use crate::block_connector::{ExecuteRequest, ResetRequest, WriteBlockChainService}; +use crate::block_connector::{ + ExecuteRequest, MinerRequest, MinerResponse, ResetRequest, WriteBlockChainService, +}; use crate::sync::{CheckSyncEvent, SyncService}; use crate::tasks::{BlockConnectedEvent, BlockConnectedFinishEvent, BlockDiskCheckEvent}; -#[cfg(test)] -use anyhow::bail; -use anyhow::{format_err, Ok, Result}; +use anyhow::{anyhow, bail, format_err, Ok, Result}; use network_api::PeerProvider; -use starcoin_chain_api::{ChainReader, ConnectBlockError, WriteableChainService}; +use starcoin_chain_api::{ChainReader, ChainType, ConnectBlockError, WriteableChainService}; use starcoin_config::{NodeConfig, G_CRATE_VERSION}; +use starcoin_consensus::Consensus; use starcoin_crypto::HashValue; use starcoin_dag::blockdag::BlockDAG; use starcoin_executor::VMMetrics; use starcoin_logger::prelude::*; use starcoin_network::NetworkServiceRef; use starcoin_service_registry::{ - ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, + ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, ServiceRequest, }; use starcoin_storage::{BlockStore, Storage}; use starcoin_sync_api::PeerNewBlock; @@ -30,7 +31,7 @@ use starcoin_txpool::TxPoolService; use starcoin_txpool_api::TxPoolSyncService; #[cfg(test)] use starcoin_txpool_mock_service::MockTxPoolService; -use starcoin_types::block::ExecutedBlock; +use starcoin_types::block::{BlockHeader, ExecutedBlock}; use starcoin_types::sync_status::SyncStatus; use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown}; use std::sync::Arc; @@ -118,6 +119,33 @@ where None } + + fn get_dag_previous_header( + &self, + previous_header: BlockHeader, + selected_parent: HashValue, + ) -> Result { + if previous_header.id() == selected_parent { + return std::prelude::rust_2015::Ok(previous_header); + } + + if self + .chain_service + .get_dag() + .check_ancestor_of(previous_header.id(), vec![selected_parent])? + { + return self + .chain_service + .get_main() + .get_storage() + .get_block_header_by_hash(selected_parent)? + .ok_or_else(|| { + format_err!("BlockHeader should exist by hash: {}", selected_parent) + }); + } + + Ok(previous_header) + } } impl ServiceFactory @@ -385,6 +413,56 @@ where } } +impl ServiceHandler for BlockConnectorService +where + T: TxPoolSyncService + 'static, +{ + fn handle( + &mut self, + _msg: MinerRequest, + _ctx: &mut ServiceContext, + ) -> ::Response { + let main = self.chain_service.get_main(); + let dag = self.chain_service.get_dag(); + let epoch = main.epoch().clone(); + let strategy = epoch.strategy(); + let on_chain_block_gas_limit = epoch.block_gas_limit(); + let mut previous_header = main.status().head().clone(); + let (tips_hash, blues_hash) = match main.check_chain_type()? { + ChainType::Single => (None, vec![]), + ChainType::Dag => { + let (_dag_genesis, tips_hash) = main.current_tips_hash()?.ok_or_else(|| { + anyhow!( + "the number of the block is larger than the dag fork number but no dag state!" + ) + })?; + let blues_hash = dag.ghostdata(tips_hash.as_ref())?.mergeset_blues.to_vec(); + if blues_hash.is_empty() { + bail!("The count of ghostdata returns mergeset blues is empty"); + } + let selected_parent = blues_hash.first().unwrap(); + previous_header = + self.get_dag_previous_header(previous_header, *selected_parent)?; + + (Some(tips_hash), blues_hash) + } + }; + + let next_difficulty = epoch.strategy().calculate_next_difficulty(main)?; + let now_milliseconds = main.time_service().now_millis(); + + Ok(Box::new(MinerResponse { + previous_header, + on_chain_block_gas_limit, + tips_hash, + blues_hash, + strategy, + next_difficulty, + now_milliseconds, + })) + } +} + #[cfg(test)] impl ServiceHandler for BlockConnectorService diff --git a/sync/src/block_connector/mod.rs b/sync/src/block_connector/mod.rs index 6f726c3e85..c533bba9bd 100644 --- a/sync/src/block_connector/mod.rs +++ b/sync/src/block_connector/mod.rs @@ -16,20 +16,18 @@ mod test_write_dag_block_chain; mod write_block_chain; pub use block_connector_service::BlockConnectorService; -#[cfg(test)] use starcoin_types::block::BlockHeader; -#[cfg(test)] -use starcoin_types::transaction::SignedUserTransaction; -#[cfg(test)] -use starcoin_vm_types::account_address::AccountAddress; +use starcoin_types::U256; +use starcoin_vm_types::genesis_config::ConsensusStrategy; pub use write_block_chain::WriteBlockChainService; - #[cfg(test)] -pub use test_write_block_chain::create_writeable_block_chain; -#[cfg(test)] -pub use test_write_block_chain::gen_blocks; +use { + starcoin_types::transaction::SignedUserTransaction, + starcoin_vm_types::account_address::AccountAddress, +}; + #[cfg(test)] -pub use test_write_block_chain::new_block; +pub use test_write_block_chain::{create_writeable_block_chain, gen_blocks, new_block}; #[derive(Debug, Clone)] pub struct ResetRequest { @@ -49,6 +47,24 @@ impl ServiceRequest for ExecuteRequest { type Response = anyhow::Result; } +#[derive(Clone, Debug)] +pub struct MinerRequest {} + +#[derive(Clone, Debug)] +pub struct MinerResponse { + pub previous_header: BlockHeader, + pub tips_hash: Option>, + pub blues_hash: Vec, + pub strategy: ConsensusStrategy, + pub on_chain_block_gas_limit: u64, + pub next_difficulty: U256, + pub now_milliseconds: u64, +} + +impl ServiceRequest for MinerRequest { + type Response = anyhow::Result>; +} + #[cfg(test)] #[derive(Clone, Debug)] pub struct CreateBlockRequest { diff --git a/sync/src/block_connector/write_block_chain.rs b/sync/src/block_connector/write_block_chain.rs index 351e43dc08..bd7b27396b 100644 --- a/sync/src/block_connector/write_block_chain.rs +++ b/sync/src/block_connector/write_block_chain.rs @@ -207,6 +207,7 @@ where &self.main } + //todo: return a reference pub fn get_dag(&self) -> BlockDAG { self.dag.clone() } diff --git a/test-helper/src/txpool.rs b/test-helper/src/txpool.rs index b0a38c3dfe..528f3798b5 100644 --- a/test-helper/src/txpool.rs +++ b/test-helper/src/txpool.rs @@ -10,9 +10,11 @@ use starcoin_miner::{BlockBuilderService, MinerService}; use starcoin_service_registry::bus::BusService; use starcoin_service_registry::{RegistryAsyncService, RegistryService, ServiceRef}; use starcoin_storage::Storage; +use starcoin_sync::block_connector::BlockConnectorService; use starcoin_txpool::{TxPoolActorService, TxPoolService}; use std::sync::Arc; use std::time::Duration; + pub async fn start_txpool_with_size( pool_size: u64, ) -> ( @@ -62,6 +64,10 @@ pub async fn start_txpool_with_miner( registry.register::().await.unwrap(); if enable_miner { + registry + .register::>() + .await + .unwrap(); registry.register::().await.unwrap(); registry.register::().await.unwrap(); }