Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
mwtian committed Jan 8, 2025
1 parent 3e51fa6 commit ae930f1
Showing 1 changed file with 26 additions and 14 deletions.
40 changes: 26 additions & 14 deletions consensus/core/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,6 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
let results = Self::fetch_blocks_from_authorities(context.clone(), blocks_to_fetch.clone(), network_client, missing_blocks, dag_state).await;
context.metrics.node_metrics.fetch_blocks_scheduler_inflight.dec();
if results.is_empty() {
warn!("No results returned while requesting missing blocks");
return;
}

Expand Down Expand Up @@ -991,19 +990,19 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
.expect("Possible misconfiguration as a peer should be found");
let peer_hostname = &context.committee.authority(peer).hostname;
let block_refs = blocks.iter().cloned().collect::<BTreeSet<_>>();
info!(
"Fetching {} missing blocks from peer {}: {}",
block_refs.len(),
peer_hostname,
block_refs
.iter()
.map(|b| b.to_string())
.collect::<Vec<_>>()
.join(", ")
);

// lock the blocks to be fetched. If no lock can be acquired for any of the blocks then don't bother
if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
info!(
"Fetching {} missing blocks from peer {}: {}",
block_refs.len(),
peer_hostname,
block_refs
.iter()
.map(|b| b.to_string())
.collect::<Vec<_>>()
.join(", ")
);
request_futures.push(Self::fetch_blocks_request(
network_client.clone(),
peer,
Expand All @@ -1022,9 +1021,11 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C

loop {
tokio::select! {
Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() =>
Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
let peer_hostname = &context.committee.authority(peer_index).hostname;
match response {
Ok(fetched_blocks) => {
info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
results.push((blocks_guard, fetched_blocks, peer_index));

// no more pending requests are left, just break the loop
Expand All @@ -1037,6 +1038,16 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
if let Some(next_peer) = peers.next() {
// do best effort to lock guards. If we can't lock then don't bother at this run.
if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
info!(
"Retrying fetching {} missing blocks from peer {}: {}",
blocks_guard.block_refs.len(),
peer_hostname,
blocks_guard.block_refs
.iter()
.map(|b| b.to_string())
.collect::<Vec<_>>()
.join(", ")
);
request_futures.push(Self::fetch_blocks_request(
network_client.clone(),
next_peer,
Expand All @@ -1052,9 +1063,10 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
debug!("No more peers left to fetch blocks");
}
}
},
}
},
_ = &mut fetcher_timeout => {
debug!("Timed out while fetching all the blocks");
debug!("Timed out while fetching missing blocks");
break;
}
}
Expand Down

0 comments on commit ae930f1

Please sign in to comment.