Skip to content

Commit

Permalink
Merge pull request #1917 from blockstack/fix/remove-downloader-index-…
Browse files Browse the repository at this point in the history
…hash-cache

Fix/remove downloader index hash cache
  • Loading branch information
jcnelson authored Sep 28, 2020
2 parents 89e39d6 + 219e3ab commit 502d1d0
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 43 deletions.
48 changes: 16 additions & 32 deletions src/net/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,6 @@ pub struct BlockDownloader {

/// how often to download
download_interval: u64,

/// set of blocks and microblocks we have successfully downloaded (even if they haven't been
/// stored yet)
blocks_downloaded: HashSet<StacksBlockId>,
microblocks_downloaded: HashSet<StacksBlockId>
}

impl BlockDownloader {
Expand Down Expand Up @@ -234,9 +229,6 @@ impl BlockDownloader {
broken_neighbors: vec![],

download_interval: download_interval,

blocks_downloaded: HashSet::new(),
microblocks_downloaded: HashSet::new(),
}
}

Expand Down Expand Up @@ -725,7 +717,7 @@ impl PeerNetwork {

/// Create block request keys for a range of blocks that are available but that we don't have in a given range of
/// sortitions. The same keys can be used to fetch confirmed microblock streams.
fn make_requests(&mut self, sortdb: &SortitionDB, chainstate: &StacksChainState, downloader: &BlockDownloader, start_sortition_height: u64, microblocks: bool) -> Result<HashMap<u64, VecDeque<BlockRequestKey>>, net_error> {
fn make_requests(&mut self, sortdb: &SortitionDB, chainstate: &StacksChainState, start_sortition_height: u64, microblocks: bool) -> Result<HashMap<u64, VecDeque<BlockRequestKey>>, net_error> {
let scan_batch_size = self.burnchain.pox_constants.reward_cycle_length as u64;
let mut blocks_to_try : HashMap<u64, VecDeque<BlockRequestKey>> = HashMap::new();

Expand Down Expand Up @@ -844,14 +836,6 @@ impl PeerNetwork {
};

let target_index_block_hash = StacksBlockHeader::make_index_block_hash(&target_consensus_hash, &target_block_hash);
if !microblocks && downloader.blocks_downloaded.contains(&target_index_block_hash) {
// already downloaded this
continue;
}
if microblocks && downloader.microblocks_downloaded.contains(&target_index_block_hash) {
// already downloaded this stream
continue;
}

// don't request the same data from the same data url, in case multiple peers report the
// same data url (e.g. two peers sharing a Gaia hub).
Expand Down Expand Up @@ -889,13 +873,13 @@ impl PeerNetwork {
}

/// Make requests for missing anchored blocks
fn make_block_requests(&mut self, sortdb: &SortitionDB, chainstate: &mut StacksChainState, downloader: &BlockDownloader, start_sortition_height: u64) -> Result<HashMap<u64, VecDeque<BlockRequestKey>>, net_error> {
self.make_requests(sortdb, chainstate, downloader, start_sortition_height, false)
fn make_block_requests(&mut self, sortdb: &SortitionDB, chainstate: &mut StacksChainState, start_sortition_height: u64) -> Result<HashMap<u64, VecDeque<BlockRequestKey>>, net_error> {
self.make_requests(sortdb, chainstate, start_sortition_height, false)
}

/// Make requests for missing confirmed microblocks
fn make_confirmed_microblock_requests(&mut self, sortdb: &SortitionDB, chainstate: &mut StacksChainState, downloader: &BlockDownloader, start_sortition_height: u64) -> Result<HashMap<u64, VecDeque<BlockRequestKey>>, net_error> {
self.make_requests(sortdb, chainstate, downloader, start_sortition_height, true)
fn make_confirmed_microblock_requests(&mut self, sortdb: &SortitionDB, chainstate: &mut StacksChainState, start_sortition_height: u64) -> Result<HashMap<u64, VecDeque<BlockRequestKey>>, net_error> {
self.make_requests(sortdb, chainstate, start_sortition_height, true)
}

/// Prioritize block requests -- ask for the rarest blocks first
Expand Down Expand Up @@ -935,10 +919,10 @@ impl PeerNetwork {
while next_block_sortition_height <= network.chain_view.burn_block_height - sortdb.first_block_height || next_microblock_sortition_height <= network.chain_view.burn_block_height - sortdb.first_block_height {

debug!("{:?}: Make block requests from sortition height {}", &network.local_peer, next_block_sortition_height);
let mut next_blocks_to_try = network.make_block_requests(sortdb, chainstate, downloader, next_block_sortition_height)?;
let mut next_blocks_to_try = network.make_block_requests(sortdb, chainstate, next_block_sortition_height)?;

debug!("{:?}: Make microblock requests from sortition height {}", &network.local_peer, next_microblock_sortition_height);
let mut next_microblocks_to_try = network.make_confirmed_microblock_requests(sortdb, chainstate, downloader, next_microblock_sortition_height)?;
let mut next_microblocks_to_try = network.make_confirmed_microblock_requests(sortdb, chainstate, next_microblock_sortition_height)?;

let mut height = next_block_sortition_height;
let mut mblock_height = next_microblock_sortition_height;
Expand Down Expand Up @@ -973,6 +957,8 @@ impl PeerNetwork {
}
test_debug!("{:?}: End microblock requests", &network.local_peer);

debug!("{:?}: create block, microblock requests up to heights ({},{})", &network.local_peer, &max_height, &max_mblock_height);

// queue up block requests in order by sortition height
while height <= max_height && (downloader.blocks_to_try.len() as u64) < downloader.max_inflight_requests {
if !next_blocks_to_try.contains_key(&height) {
Expand All @@ -994,8 +980,8 @@ impl PeerNetwork {

assert_eq!(height, requests.front().as_ref().unwrap().sortition_height);

test_debug!("{:?}: request anchored block for sortition {}: {}/{} ({})",
&network.local_peer, height, &requests.front().as_ref().unwrap().consensus_hash, &requests.front().as_ref().unwrap().anchor_block_hash, &requests.front().as_ref().unwrap().index_block_hash);
debug!("{:?}: will request anchored block for sortition {}: {}/{} ({})",
&network.local_peer, height, &requests.front().as_ref().unwrap().consensus_hash, &requests.front().as_ref().unwrap().anchor_block_hash, &requests.front().as_ref().unwrap().index_block_hash);

downloader.blocks_to_try.insert(height, requests);

Expand Down Expand Up @@ -1026,8 +1012,8 @@ impl PeerNetwork {

assert_eq!(mblock_height, requests.front().as_ref().unwrap().sortition_height);

test_debug!("{:?}: request microblock stream produced by sortition {}: {}/{} ({})",
&network.local_peer, mblock_height, &requests.front().as_ref().unwrap().consensus_hash, &requests.front().as_ref().unwrap().anchor_block_hash, &requests.front().as_ref().unwrap().index_block_hash);
debug!("{:?}: will request microblock stream produced by sortition {}: {}/{} ({})",
&network.local_peer, mblock_height, &requests.front().as_ref().unwrap().consensus_hash, &requests.front().as_ref().unwrap().anchor_block_hash, &requests.front().as_ref().unwrap().index_block_hash);

downloader.microblocks_to_try.insert(mblock_height, requests);

Expand All @@ -1037,7 +1023,7 @@ impl PeerNetwork {
debug!("{:?}: block download scan now at ({},{}) (was ({},{}))", &network.local_peer, height, mblock_height, block_sortition_height, microblock_sortition_height);

if max_height <= next_block_sortition_height && max_mblock_height <= next_microblock_sortition_height {
test_debug!("{:?}: no more requests to make", &network.local_peer);
debug!("{:?}: no more download requests to make", &network.local_peer);
break;
}

Expand All @@ -1047,7 +1033,7 @@ impl PeerNetwork {

// at capacity?
if (downloader.blocks_to_try.len() as u64) >= downloader.max_inflight_requests || (downloader.microblocks_to_try.len() as u64) >= downloader.max_inflight_requests {
test_debug!("{:?}: queued up {} requests (blocks so far: {}, microblocks so far: {})", &network.local_peer, downloader.blocks_to_try.len(), downloader.blocks_to_try.len(), downloader.microblocks_to_try.len());
debug!("{:?}: queued up {} requests (blocks so far: {}, microblocks so far: {})", &network.local_peer, downloader.blocks_to_try.len(), downloader.blocks_to_try.len(), downloader.microblocks_to_try.len());
break;
}
}
Expand All @@ -1057,7 +1043,7 @@ impl PeerNetwork {
next_block_sortition_height = next_block_sortition_height + (network.burnchain.pox_constants.reward_cycle_length as u64);
next_microblock_sortition_height = next_microblock_sortition_height + (network.burnchain.pox_constants.reward_cycle_length as u64);

test_debug!("{:?}: Pessimistically increase block and microblock sortition heights to ({},{})", &network.local_peer, next_block_sortition_height, next_microblock_sortition_height);
debug!("{:?}: Pessimistically increase block and microblock sortition heights to ({},{})", &network.local_peer, next_block_sortition_height, next_microblock_sortition_height);
}

downloader.next_block_sortition_height = next_block_sortition_height;
Expand Down Expand Up @@ -1266,7 +1252,6 @@ impl PeerNetwork {

// don't try this again
downloader.blocks_to_try.remove(&request_key.sortition_height);
downloader.blocks_downloaded.insert(request_key.index_block_hash.clone());
}
for (request_key, microblock_stream) in downloader.microblocks.drain() {
let block_header = StacksChainState::load_block_header(&chainstate.blocks_path, &request_key.consensus_hash, &request_key.anchor_block_hash)?
Expand All @@ -1288,7 +1273,6 @@ impl PeerNetwork {

// don't try again
downloader.microblocks_to_try.remove(&request_key.sortition_height);
downloader.microblocks_downloaded.insert(request_key.index_block_hash.clone());
}

// clear empties
Expand Down
6 changes: 3 additions & 3 deletions src/net/inv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ impl InvState {
// No block is available here anyway, even though the peer agrees with us on the
// consensus hash.
// This is bad behavior on the peer's part.
test_debug!("No sortition for consensus hash {}", consensus_hash);
debug!("No sortition for consensus hash {}", consensus_hash);
return Err(net_error::InvalidMessage);
}

Expand Down Expand Up @@ -1049,7 +1049,7 @@ impl InvState {
}
},
None => {
test_debug!("No inv stats for neighbor {:?}", neighbor_key);
debug!("No inv stats for neighbor {:?}", neighbor_key);
Ok(None)
}
}
Expand Down Expand Up @@ -1620,7 +1620,7 @@ impl PeerNetwork {
self.tip_sort_id = new_tip_sort_id;
self.pox_id = new_pox_id;

test_debug!("{:?}: PoX bit vector is {:?}", &self.local_peer, &self.pox_id);
debug!("{:?}: PoX bit vector is {:?}", &self.local_peer, &self.pox_id);

Ok(())
}
Expand Down
12 changes: 7 additions & 5 deletions src/net/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ impl PeerNetwork {
let client_addr = match socket.peer_addr() {
Ok(addr) => addr,
Err(e) => {
debug!("Failed to get peer address of {:?}: {:?}", &socket, &e);
debug!("{:?}: Failed to get peer address of {:?}: {:?}", &self.local_peer, &socket, &e);
self.deregister_socket(event_id, socket);
return Err(net_error::SocketError);
}
Expand Down Expand Up @@ -1265,11 +1265,12 @@ impl PeerNetwork {
for event_id in poll_state.ready.iter() {
if self.connecting.contains_key(event_id) {
let (socket, outbound, _) = self.connecting.remove(event_id).unwrap();
debug!("{:?}: Connected event {}: {:?} (outbound={})", &self.local_peer, event_id, &socket, outbound);

let sock_str = format!("{:?}", &socket);
if let Err(_e) = self.register_peer(*event_id, socket, outbound) {
debug!("{:?}: Failed to register connected event {} ({}): {:?}", &self.local_peer, event_id, sock_str, &_e);
debug!("{:?}: Failed to register connecting socket on event {} ({}): {:?}", &self.local_peer, event_id, sock_str, &_e);
}
else {
debug!("{:?}: Registered peer on event {}: {:?} (outbound={})", &self.local_peer, event_id, sock_str, outbound);
}
}
}
Expand Down Expand Up @@ -2108,7 +2109,8 @@ impl PeerNetwork {
match res {
Ok(Some(block_height)) => block_height,
Ok(None) => {
debug!("Peer {:?} already known to have {} for {}", &outbound_neighbor_key, if microblocks { "streamed microblocks" } else { "blocks" }, consensus_hash);
debug!("Ignore {} from {} -- we either do not recognize consensus hash {}, or already know the inventory for it",
if microblocks { "streamed microblocks" } else { "blocks" }, outbound_neighbor_key, consensus_hash);
return None;
},
Err(net_error::InvalidMessage) => {
Expand Down
6 changes: 3 additions & 3 deletions testnet/stacks-node/src/neon_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,9 +736,9 @@ impl InitializedNeonNode {
}
};

debug!("Mining tenure's last consensus hash: {}, stacks tip consensus hash: {}",
&burn_block.consensus_hash,
&stacks_tip.consensus_hash);
debug!("Mining tenure's last consensus hash: {} (height {}), stacks tip consensus hash: {} (height {})",
&burn_block.consensus_hash, burn_block.block_height,
&stacks_tip.consensus_hash, parent_snapshot.block_height);

let coinbase_nonce = {
let principal = keychain.origin_address().unwrap().into();
Expand Down

0 comments on commit 502d1d0

Please sign in to comment.