Skip to content

Commit

Permalink
Use queue to replace the recursion call in sync progress (#4126)
Browse files Browse the repository at this point in the history
* add delete sync blocks

* fix some test case

* remove jacktest

* add norecursion when execute dag blocks

* add delete sync blocks

* add norecusion function to execute the children if the parents are executed

* fix fmt and clippy

* fix fmt and clippy

* add delete sync blocks

* fix some test case

* remove jacktest

* add norecursion when execute dag blocks

* add delete sync blocks

* add norecusion function to execute the children if the parents are executed

* fix fmt and clippy

* fix fmt and clippy

* add delete sync blocks

* fix some test case

* remove jacktest

* fix fmt and clippy

* no returning target error in check_enough_by_info

* fix the test_sync_block_in_async_connection: check the dag state not the block info

* fix fmt
  • Loading branch information
jackzhhuang authored Jun 20, 2024
1 parent 8e91a0b commit 57357e8
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 66 deletions.
84 changes: 78 additions & 6 deletions sync/src/tasks/block_sync_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,76 @@ where
Ok(result)
}

fn execute_if_parent_ready_norecursion(&mut self, parent_id: HashValue) -> Result<()> {
let mut parent_block_ids = vec![parent_id];

while !parent_block_ids.is_empty() {
let mut next_parent_blocks = vec![];
for parent_block_id in parent_block_ids {
let parent_block = self
.local_store
.get_dag_sync_block(parent_block_id)?
.ok_or_else(|| {
anyhow!(
"the dag block should exist in local store, parent child block id: {:?}",
parent_id,
)
})?;
let mut executed_children = vec![];
for child in &parent_block.children {
let child_block =
self.local_store
.get_dag_sync_block(*child)?
.ok_or_else(|| {
anyhow!(
"the dag block should exist in local store, child block id: {:?}",
child
)
})?;
if child_block
.block
.header()
.parents_hash()
.ok_or_else(|| anyhow!("the dag block's parents should exist"))?
.iter()
.all(|parent| match self.chain.has_dag_block(*parent) {
Ok(has) => has,
Err(e) => {
error!(
"failed to get the block from the chain, block id: {:?}, error: {:?}",
*parent, e
);
false
}
})
{
let executed_block = self
.chain
.apply_with_verifier::<DagBasicVerifier>(child_block.block.clone())?;
info!(
"succeed to apply a dag block: {:?}, number: {:?}",
executed_block.block.id(),
executed_block.block.header().number()
);
executed_children.push(*child);
self.notify_connected_block(
executed_block.block,
executed_block.block_info.clone(),
BlockConnectAction::ConnectNewBlock,
self.check_enough_by_info(executed_block.block_info)?,
)?;
next_parent_blocks.push(*child);
}
}
self.local_store.delete_dag_sync_block(parent_block_id)?;
}

parent_block_ids = next_parent_blocks;
}

Ok(())
}

fn execute_if_parent_ready(&mut self, parent_id: HashValue) -> Result<()> {
let mut parent_block =
self.local_store
Expand Down Expand Up @@ -612,7 +682,7 @@ where
BlockConnectAction::ConnectNewBlock,
self.check_enough_by_info(executed_block.block_info)?,
)?;
self.execute_if_parent_ready(*child)?;
self.execute_if_parent_ready_norecursion(*child)?;
self.local_store.delete_dag_sync_block(*child)?;
}
}
Expand Down Expand Up @@ -653,15 +723,17 @@ where
if block_info.block_accumulator_info.num_leaves
== self.target.block_info.block_accumulator_info.num_leaves
{
if block_info != self.target.block_info {
if self.chain.check_chain_type()? == ChainType::Dag {
Ok(CollectorState::Enough)
} else if block_info != self.target.block_info {
Err(TaskError::BreakError(
RpcVerifyError::new_with_peers(
self.target.peers.clone(),
format!(
"Verify target error, expect target: {:?}, collect target block_info:{:?}",
self.target.block_info,
block_info
),
"Verify target error, expect target: {:?}, collect target block_info:{:?}",
self.target.block_info,
block_info
),
)
.into(),
)
Expand Down
68 changes: 8 additions & 60 deletions sync/src/tasks/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::tasks::{
full_sync_task, AccumulatorCollector, AncestorCollector, BlockAccumulatorSyncTask,
BlockCollector, BlockFetcher, BlockLocalStore, BlockSyncTask, FindAncestorTask, SyncFetcher,
};
use crate::verified_rpc_client::RpcVerifyError;
use anyhow::{format_err, Result};
use anyhow::{Context, Ok};
use futures::channel::mpsc::unbounded;
Expand Down Expand Up @@ -37,9 +36,7 @@ use starcoin_types::{
};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use stream_task::{
DefaultCustomErrorHandle, Generator, TaskError, TaskEventCounterHandle, TaskGenerator,
};
use stream_task::{DefaultCustomErrorHandle, Generator, TaskEventCounterHandle, TaskGenerator};
use test_helper::DummyNetworkService;

use super::test_tools::{full_sync_new_node, SyncTestSystem};
Expand All @@ -50,61 +47,6 @@ pub async fn test_full_sync_new_node() -> Result<()> {
full_sync_new_node().await
}

#[stest::test]
pub async fn test_sync_invalid_target() -> Result<()> {
let net1 = ChainNetwork::new_builtin(BuiltinNetworkID::DagTest);
let mut node1 = SyncNodeMocker::new(net1, 300, 0)?;
node1.produce_block(10)?;

let arc_node1 = Arc::new(node1);

let net2 = ChainNetwork::new_builtin(BuiltinNetworkID::DagTest);

let node2 = SyncNodeMocker::new(net2.clone(), 300, 0)?;
let dag = node2.chain().dag();
let mut target = arc_node1.sync_target();

target.block_info.total_difficulty = U256::max_value();

let current_block_header = node2.chain().current_header();
let dag_fork_height = node2.dag_fork_number()?;

let storage = node2.chain().get_storage();
let (sender_1, receiver_1) = unbounded();
let (sender_2, _receiver_2) = unbounded();
let (sync_task, _task_handle, _task_event_counter) = full_sync_task(
current_block_header.id(),
target.clone(),
false,
net2.time_service(),
storage.clone(),
sender_1,
arc_node1.clone(),
sender_2,
DummyNetworkService::default(),
15,
None,
None,
Some(dag_fork_height),
dag,
)?;
let _join_handle = node2.process_block_connect_event(receiver_1).await;
let sync_result = sync_task.await;
assert!(sync_result.is_err());
let err = sync_result.err().unwrap();
debug!("task_error: {:?}", err);
assert!(err.is_break_error());
if let TaskError::BreakError(err) = err {
let verify_err = err.downcast::<RpcVerifyError>().unwrap();
assert_eq!(verify_err.peers[0].clone(), arc_node1.peer_id);
debug!("{:?}", verify_err)
} else {
panic!("Expect BreakError, but got: {:?}", err)
}

Ok(())
}

#[ignore = "This test is for the scenario that a block failed to connect to the main will be stored in the \
failure storage and the sync will return failure instantly the next time the block shows up again, \
which is no longer suitable for the dag"]
Expand Down Expand Up @@ -1076,7 +1018,13 @@ fn sync_block_in_async_connection(
dag,
)?;
let branch = async_std::task::block_on(sync_task)?;
assert_eq!(branch.current_header().id(), target.target_id.id());
assert_eq!(branch.current_header().number(), target.target_id.number());
let (target_dag_genesis, target_dag_state) = target_node.chain().get_dag_state_by_block()?;
let (local_dag_genesis, local_dag_state) = target_node.chain().get_dag_state_by_block()?;
assert_eq!(target_dag_genesis, local_dag_genesis);
local_dag_state.tips.iter().for_each(|tip| {
assert!(target_dag_state.tips.contains(tip));
});

handle.join().unwrap();

Expand Down

0 comments on commit 57357e8

Please sign in to comment.