Skip to content

Commit

Permalink
Merge pull request #2590 from blockstack/fix/atlas-replication-retry
Browse files Browse the repository at this point in the history
Fix/atlas replication retry
  • Loading branch information
jcnelson authored Apr 20, 2021
2 parents da3c0dd + 9bd50a3 commit 7ba919a
Show file tree
Hide file tree
Showing 16 changed files with 926 additions and 191 deletions.
2 changes: 2 additions & 0 deletions .github/actions/bitcoin-int-tests/Dockerfile.bitcoin-tests
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ RUN cargo test -- --test-threads 1 --ignored tests::neon_integrations::pox_integ
RUN cargo test -- --test-threads 1 --ignored tests::bitcoin_regtest::bitcoind_integration_test
RUN cargo test -- --test-threads 1 --ignored tests::should_succeed_handling_malformed_and_valid_txs
RUN cargo test -- --test-threads 1 --ignored tests::neon_integrations::size_overflow_unconfirmed_microblocks_integration_test
RUN cargo test -- --test-threads 1 --ignored tests::neon_integrations::size_overflow_unconfirmed_stream_microblocks_integration_test
RUN cargo test -- --test-threads 1 --ignored tests::neon_integrations::size_overflow_unconfirmed_invalid_stream_microblocks_integration_test
RUN cargo test -- --test-threads 1 --ignored tests::neon_integrations::runtime_overflow_unconfirmed_microblocks_integration_test
RUN cargo test -- --test-threads 1 --ignored tests::neon_integrations::antientropy_integration_test
4 changes: 3 additions & 1 deletion src/chainstate/stacks/db/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4540,7 +4540,9 @@ impl StacksChainState {
return Err(Error::InvalidStacksBlock(msg));
}

debug!("Reached state root {}", root_hash);
debug!("Reached state root {}", root_hash;
"microblock cost" => %microblock_cost,
"block cost" => %block_cost);

// good to go!
clarity_tx.commit_to_block(chain_tip_consensus_hash, &block.block_hash());
Expand Down
30 changes: 29 additions & 1 deletion src/chainstate/stacks/db/unconfirmed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ pub struct UnconfirmedState {
readonly: bool,
dirty: bool,
num_mblocks_added: u64,

// fault injection for testing
pub disable_cost_check: bool,
pub disable_bytes_check: bool,
}

impl UnconfirmedState {
Expand Down Expand Up @@ -85,6 +89,9 @@ impl UnconfirmedState {
readonly: false,
dirty: false,
num_mblocks_added: 0,

disable_cost_check: check_fault_injection(FAULT_DISABLE_MICROBLOCKS_COST_CHECK),
disable_bytes_check: check_fault_injection(FAULT_DISABLE_MICROBLOCKS_BYTES_CHECK),
})
}

Expand Down Expand Up @@ -115,6 +122,9 @@ impl UnconfirmedState {
readonly: true,
dirty: false,
num_mblocks_added: 0,

disable_cost_check: check_fault_injection(FAULT_DISABLE_MICROBLOCKS_COST_CHECK),
disable_bytes_check: check_fault_injection(FAULT_DISABLE_MICROBLOCKS_BYTES_CHECK),
})
}

Expand Down Expand Up @@ -207,7 +217,7 @@ impl UnconfirmedState {

last_mblock = Some(mblock_header);
last_mblock_seq = seq;
new_bytes = {
new_bytes += {
let mut total = 0;
for tx in mblock.txs.iter() {
let mut bytes = vec![];
Expand Down Expand Up @@ -237,6 +247,16 @@ impl UnconfirmedState {
self.bytes_so_far += new_bytes;
self.num_mblocks_added += num_new_mblocks;

// apply injected faults
if self.disable_cost_check {
warn!("Fault injection: disabling microblock miner's cost tracking");
self.cost_so_far = ExecutionCost::zero();
}
if self.disable_bytes_check {
warn!("Fault injection: disabling microblock miner's size tracking");
self.bytes_so_far = 0;
}

Ok((total_fees, total_burns, all_receipts))
}

Expand Down Expand Up @@ -311,6 +331,14 @@ impl UnconfirmedState {
) -> Option<(StacksTransaction, BlockHeaderHash, u16)> {
self.mined_txs.get(txid).map(|x| x.clone())
}

pub fn num_microblocks(&self) -> u64 {
if self.last_mblock.is_some() {
(self.last_mblock_seq as u64) + 1
} else {
0
}
}
}

impl StacksChainState {
Expand Down
37 changes: 33 additions & 4 deletions src/chainstate/stacks/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ struct MicroblockMinerRuntime {
considered: Option<HashSet<Txid>>,
num_mined: u64,
tip: StacksBlockId,

// fault injection, inherited from unconfirmed
disable_bytes_check: bool,
disable_cost_check: bool,
}

#[derive(PartialEq)]
Expand All @@ -87,6 +91,9 @@ impl From<&UnconfirmedState> for MicroblockMinerRuntime {
considered: Some(considered),
num_mined: 0,
tip: unconfirmed.confirmed_chain_tip.clone(),

disable_bytes_check: unconfirmed.disable_bytes_check,
disable_cost_check: unconfirmed.disable_cost_check,
}
}
}
Expand Down Expand Up @@ -341,6 +348,7 @@ impl<'a> StacksMicroblockBuilder<'a> {
return Ok(false);
}

/// NOTE: this is only used in integration tests.
pub fn mine_next_microblock_from_txs(
&mut self,
txs_and_lens: Vec<(StacksTransaction, u64)>,
Expand Down Expand Up @@ -386,6 +394,16 @@ impl<'a> StacksMicroblockBuilder<'a> {
}
}

// do fault injection
if self.runtime.disable_bytes_check {
warn!("Fault injection: disabling miner limit on microblock stream size");
bytes_so_far = 0;
}
if self.runtime.disable_cost_check {
warn!("Fault injection: disabling miner limit on microblock runtime cost");
clarity_tx.reset_cost(ExecutionCost::zero());
}

self.runtime.bytes_so_far = bytes_so_far;
self.clarity_tx.replace(clarity_tx);
self.runtime.considered.replace(considered);
Expand Down Expand Up @@ -465,6 +483,16 @@ impl<'a> StacksMicroblockBuilder<'a> {
},
);

// do fault injection
if self.runtime.disable_bytes_check {
warn!("Fault injection: disabling miner limit on microblock stream size");
bytes_so_far = 0;
}
if self.runtime.disable_cost_check {
warn!("Fault injection: disabling miner limit on microblock runtime cost");
clarity_tx.reset_cost(ExecutionCost::zero());
}

self.runtime.bytes_so_far = bytes_so_far;
self.clarity_tx.replace(clarity_tx);
self.runtime.considered.replace(considered);
Expand Down Expand Up @@ -498,8 +526,9 @@ impl<'a> Drop for StacksMicroblockBuilder<'a> {
debug!(
"Drop StacksMicroblockBuilder";
"chain tip" => %&self.runtime.tip,
"txs considered" => &self.runtime.considered.as_ref().map(|x| x.len()).unwrap_or(0),
"txs mined" => self.runtime.num_mined,
"txs mined off tip" => &self.runtime.considered.as_ref().map(|x| x.len()).unwrap_or(0),
"txs added" => self.runtime.num_mined,
"bytes so far" => self.runtime.bytes_so_far,
"cost so far" => &format!("{:?}", &self.get_cost_so_far())
);
self.clarity_tx
Expand Down Expand Up @@ -1363,8 +1392,8 @@ impl StacksBlockBuilder {
);

debug!(
"Build anchored block off of {}/{} height {}",
&tip_consensus_hash, &tip_block_hash, tip_height
"Build anchored block off of {}/{} height {} budget {:?}",
&tip_consensus_hash, &tip_block_hash, tip_height, execution_budget
);

let (mut header_reader_chainstate, _) = chainstate_handle.reopen()?; // used for reading block headers during an epoch
Expand Down
14 changes: 14 additions & 0 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,20 @@ pub const BLOCK_LIMIT_MAINNET: ExecutionCost = ExecutionCost {
runtime: 5_000_000_000,
};

pub const FAULT_DISABLE_MICROBLOCKS_COST_CHECK: &str = "MICROBLOCKS_DISABLE_COST_CHECK";
pub const FAULT_DISABLE_MICROBLOCKS_BYTES_CHECK: &str = "MICROBLOCKS_DISABLE_BYTES_CHECK";

pub fn check_fault_injection(fault_name: &str) -> bool {
use std::env;

// only activates if we're testing
if env::var("BITCOIND_TEST") != Ok("1".to_string()) {
return false;
}

env::var(fault_name) == Ok("1".to_string())
}

/// Synchronize burn transactions from the Bitcoin blockchain
pub fn sync_burnchain_bitcoin(
working_dir: &String,
Expand Down
16 changes: 16 additions & 0 deletions src/net/atlas/db.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020-2021 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use rusqlite::types::ToSql;
use rusqlite::Row;
use rusqlite::Transaction;
Expand Down
85 changes: 76 additions & 9 deletions src/net/atlas/download.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,22 @@
use super::{AtlasDB, Attachment, AttachmentInstance, MAX_ATTACHMENT_INV_PAGES_PER_REQUEST};
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020-2021 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use super::{
AtlasDB, Attachment, AttachmentInstance, MAX_ATTACHMENT_INV_PAGES_PER_REQUEST, MAX_RETRY_DELAY,
};
use chainstate::burn::{BlockHeaderHash, ConsensusHash};
use chainstate::stacks::db::StacksChainState;
use chainstate::stacks::{StacksBlockHeader, StacksBlockId};
Expand All @@ -23,6 +41,10 @@ use std::fmt;
use std::hash::{Hash, Hasher};
use std::net::{IpAddr, SocketAddr};

use rand::thread_rng;
use rand::Rng;
use std::cmp;

#[derive(Debug)]
pub struct AttachmentsDownloader {
priority_queue: BinaryHeap<AttachmentsBatch>,
Expand All @@ -43,6 +65,36 @@ impl AttachmentsDownloader {
}
}

/// Identify whether or not any AttachmentBatches in the priority queue are ready for
/// (re-)consideration by the downloader, based on whether or not its re-try deadline
/// has passed.
pub fn has_ready_batches(&self) -> bool {
for batch in self.priority_queue.iter() {
if batch.retry_deadline < get_epoch_time_secs() {
return true;
}
}
return false;
}

/// Returns the next attachments batch that is ready for processing -- i.e. after its deadline
/// has passed.
/// Because AttachmentBatches are ordered first by their retry deadlines, it follows that if
/// there are any ready AttachmentBatches, they'll be at the head of the queue.
pub fn pop_next_ready_batch(&mut self) -> Option<AttachmentsBatch> {
let next_is_ready = if let Some(ref next) = self.priority_queue.peek() {
next.retry_deadline < get_epoch_time_secs()
} else {
false
};

if next_is_ready {
self.priority_queue.pop()
} else {
None
}
}

pub fn run(
&mut self,
dns_client: &mut DNSClient,
Expand All @@ -66,7 +118,7 @@ impl AttachmentsDownloader {
let ongoing_fsm = match self.ongoing_batch.take() {
Some(batch) => batch,
None => {
if self.priority_queue.is_empty() {
if self.priority_queue.is_empty() || !self.has_ready_batches() {
// Nothing to do!
return Ok((vec![], vec![]));
}
Expand All @@ -87,10 +139,14 @@ impl AttachmentsDownloader {
return Ok((vec![], vec![]));
}

let attachments_batch = self
.priority_queue
.pop()
.expect("Unable to pop attachments bactch from queue");
let attachments_batch = match self.pop_next_ready_batch() {
Some(ready_batch) => ready_batch,
None => {
// unreachable
return Ok((vec![], vec![]));
}
};

let ctx = AttachmentsBatchStateContext::new(
attachments_batch,
peers,
Expand Down Expand Up @@ -1003,6 +1059,7 @@ pub struct AttachmentsBatch {
pub index_block_hash: StacksBlockId,
pub attachments_instances: HashMap<QualifiedContractIdentifier, HashMap<u32, Hash160>>,
pub retry_count: u64,
pub retry_deadline: u64,
}

impl AttachmentsBatch {
Expand All @@ -1012,6 +1069,7 @@ impl AttachmentsBatch {
index_block_hash: StacksBlockId([0u8; 32]),
attachments_instances: HashMap::new(),
retry_count: 0,
retry_deadline: 0,
}
}

Expand All @@ -1023,7 +1081,7 @@ impl AttachmentsBatch {
if self.block_height != attachment.block_height
|| self.index_block_hash != attachment.index_block_hash
{
warn!("Atlas: attempt to add unrelated AttachmentInstance ({}, {}) to AttachmentBatch", attachment.attachment_index, attachment.index_block_hash);
warn!("Atlas: attempt to add unrelated AttachmentInstance ({}, {}) to AttachmentsBatch", attachment.attachment_index, attachment.index_block_hash);
return;
}
}
Expand All @@ -1048,6 +1106,15 @@ impl AttachmentsBatch {

pub fn bump_retry_count(&mut self) {
self.retry_count += 1;
let delay = cmp::min(
MAX_RETRY_DELAY,
2u64.saturating_pow(self.retry_count as u32).saturating_add(
thread_rng().gen::<u64>() % 2u64.saturating_pow((self.retry_count - 1) as u32),
),
);

debug!("Atlas: Re-attempt download in {} seconds", delay);
self.retry_deadline = get_epoch_time_secs() + delay;
}

pub fn has_fully_succeed(&self) -> bool {
Expand Down Expand Up @@ -1105,8 +1172,8 @@ impl AttachmentsBatch {
impl Ord for AttachmentsBatch {
fn cmp(&self, other: &AttachmentsBatch) -> Ordering {
other
.retry_count
.cmp(&self.retry_count)
.retry_deadline
.cmp(&self.retry_deadline)
.then_with(|| {
self.attachments_instances_count()
.cmp(&other.attachments_instances_count())
Expand Down
17 changes: 17 additions & 0 deletions src/net/atlas/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020-2021 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

pub mod db;
pub mod download;

Expand All @@ -20,6 +36,7 @@ use std::convert::TryFrom;
use std::hash::{Hash, Hasher};

pub const MAX_ATTACHMENT_INV_PAGES_PER_REQUEST: usize = 8;
pub const MAX_RETRY_DELAY: u64 = 600; // seconds

lazy_static! {
pub static ref BNS_CHARS_REGEX: Regex = Regex::new("^([a-z0-9]|[-_])*$").unwrap();
Expand Down
Loading

0 comments on commit 7ba919a

Please sign in to comment.