diff --git a/sync/src/tasks/block_sync_task.rs b/sync/src/tasks/block_sync_task.rs index c99c397ea6..83d7f06534 100644 --- a/sync/src/tasks/block_sync_task.rs +++ b/sync/src/tasks/block_sync_task.rs @@ -558,6 +558,76 @@ where Ok(result) } + fn execute_if_parent_ready_norecursion(&mut self, parent_id: HashValue) -> Result<()> { + let mut parent_block_ids = vec![parent_id]; + + while !parent_block_ids.is_empty() { + let mut next_parent_blocks = vec![]; + for parent_block_id in parent_block_ids { + let parent_block = self + .local_store + .get_dag_sync_block(parent_block_id)? + .ok_or_else(|| { + anyhow!( + "the dag block should exist in local store, parent child block id: {:?}", + parent_id, + ) + })?; + let mut executed_children = vec![]; + for child in &parent_block.children { + let child_block = + self.local_store + .get_dag_sync_block(*child)? + .ok_or_else(|| { + anyhow!( + "the dag block should exist in local store, child block id: {:?}", + child + ) + })?; + if child_block + .block + .header() + .parents_hash() + .ok_or_else(|| anyhow!("the dag block's parents should exist"))? + .iter() + .all(|parent| match self.chain.has_dag_block(*parent) { + Ok(has) => has, + Err(e) => { + error!( + "failed to get the block from the chain, block id: {:?}, error: {:?}", + *parent, e + ); + false + } + }) + { + let executed_block = self + .chain + .apply_with_verifier::(child_block.block.clone())?; + info!( + "succeed to apply a dag block: {:?}, number: {:?}", + executed_block.block.id(), + executed_block.block.header().number() + ); + executed_children.push(*child); + self.notify_connected_block( + executed_block.block, + executed_block.block_info.clone(), + BlockConnectAction::ConnectNewBlock, + self.check_enough_by_info(executed_block.block_info)?, + )?; + next_parent_blocks.push(*child); + } + } + self.local_store.delete_dag_sync_block(parent_block_id)?; + } + + parent_block_ids = next_parent_blocks; + } + + Ok(()) + } + fn execute_if_parent_ready(&mut self, parent_id: HashValue) -> Result<()> { let mut parent_block = self.local_store @@ -612,7 +682,7 @@ where BlockConnectAction::ConnectNewBlock, self.check_enough_by_info(executed_block.block_info)?, )?; - self.execute_if_parent_ready(*child)?; + self.execute_if_parent_ready_norecursion(*child)?; self.local_store.delete_dag_sync_block(*child)?; } } @@ -653,15 +723,17 @@ where if block_info.block_accumulator_info.num_leaves == self.target.block_info.block_accumulator_info.num_leaves { - if block_info != self.target.block_info { + if self.chain.check_chain_type()? == ChainType::Dag { + Ok(CollectorState::Enough) + } else if block_info != self.target.block_info { Err(TaskError::BreakError( RpcVerifyError::new_with_peers( self.target.peers.clone(), format!( - "Verify target error, expect target: {:?}, collect target block_info:{:?}", - self.target.block_info, - block_info - ), + "Verify target error, expect target: {:?}, collect target block_info:{:?}", + self.target.block_info, + block_info + ), ) .into(), ) diff --git a/sync/src/tasks/tests.rs b/sync/src/tasks/tests.rs index a865ef08a5..e8440cfa6a 100644 --- a/sync/src/tasks/tests.rs +++ b/sync/src/tasks/tests.rs @@ -8,7 +8,6 @@ use crate::tasks::{ full_sync_task, AccumulatorCollector, AncestorCollector, BlockAccumulatorSyncTask, BlockCollector, BlockFetcher, BlockLocalStore, BlockSyncTask, FindAncestorTask, SyncFetcher, }; -use crate::verified_rpc_client::RpcVerifyError; use anyhow::{format_err, Result}; use anyhow::{Context, Ok}; use futures::channel::mpsc::unbounded; @@ -37,9 +36,7 @@ use starcoin_types::{ }; use std::collections::HashMap; use std::sync::{Arc, Mutex}; -use stream_task::{ - DefaultCustomErrorHandle, Generator, TaskError, TaskEventCounterHandle, TaskGenerator, -}; +use stream_task::{DefaultCustomErrorHandle, Generator, TaskEventCounterHandle, TaskGenerator}; use test_helper::DummyNetworkService; use super::test_tools::{full_sync_new_node, SyncTestSystem}; @@ -50,61 +47,6 @@ pub async fn test_full_sync_new_node() -> Result<()> { full_sync_new_node().await } -#[stest::test] -pub async fn test_sync_invalid_target() -> Result<()> { - let net1 = ChainNetwork::new_builtin(BuiltinNetworkID::DagTest); - let mut node1 = SyncNodeMocker::new(net1, 300, 0)?; - node1.produce_block(10)?; - - let arc_node1 = Arc::new(node1); - - let net2 = ChainNetwork::new_builtin(BuiltinNetworkID::DagTest); - - let node2 = SyncNodeMocker::new(net2.clone(), 300, 0)?; - let dag = node2.chain().dag(); - let mut target = arc_node1.sync_target(); - - target.block_info.total_difficulty = U256::max_value(); - - let current_block_header = node2.chain().current_header(); - let dag_fork_height = node2.dag_fork_number()?; - - let storage = node2.chain().get_storage(); - let (sender_1, receiver_1) = unbounded(); - let (sender_2, _receiver_2) = unbounded(); - let (sync_task, _task_handle, _task_event_counter) = full_sync_task( - current_block_header.id(), - target.clone(), - false, - net2.time_service(), - storage.clone(), - sender_1, - arc_node1.clone(), - sender_2, - DummyNetworkService::default(), - 15, - None, - None, - Some(dag_fork_height), - dag, - )?; - let _join_handle = node2.process_block_connect_event(receiver_1).await; - let sync_result = sync_task.await; - assert!(sync_result.is_err()); - let err = sync_result.err().unwrap(); - debug!("task_error: {:?}", err); - assert!(err.is_break_error()); - if let TaskError::BreakError(err) = err { - let verify_err = err.downcast::().unwrap(); - assert_eq!(verify_err.peers[0].clone(), arc_node1.peer_id); - debug!("{:?}", verify_err) - } else { - panic!("Expect BreakError, but got: {:?}", err) - } - - Ok(()) -} - #[ignore = "This test is for the scenario that a block failed to connect to the main will be stored in the \ failure storage and the sync will return failure instantly the next time the block shows up again, \ which is no longer suitable for the dag"] @@ -1076,7 +1018,13 @@ fn sync_block_in_async_connection( dag, )?; let branch = async_std::task::block_on(sync_task)?; - assert_eq!(branch.current_header().id(), target.target_id.id()); + assert_eq!(branch.current_header().number(), target.target_id.number()); + let (target_dag_genesis, target_dag_state) = target_node.chain().get_dag_state_by_block()?; + let (local_dag_genesis, local_dag_state) = target_node.chain().get_dag_state_by_block()?; + assert_eq!(target_dag_genesis, local_dag_genesis); + local_dag_state.tips.iter().for_each(|tip| { + assert!(target_dag_state.tips.contains(tip)); + }); handle.join().unwrap();