Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update miner mempool iterator query to consider both nonces and fee rates #5541

Open
wants to merge 22 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions stackslib/src/chainstate/stacks/tests/block_construction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5087,3 +5087,188 @@ fn paramaterized_mempool_walk_test(
},
);
}

#[test]
/// Test that the mempool walk query ignores old nonces and prefers next possible nonces before higher global fees.
fn mempool_walk_test_nonce_filtered_and_ranked() {
let key_address_pairs: Vec<(Secp256k1PrivateKey, StacksAddress)> = (0..3)
.map(|_user_index| {
let privk = StacksPrivateKey::new();
let addr = StacksAddress::from_public_keys(
C32_ADDRESS_VERSION_TESTNET_SINGLESIG,
&AddressHashMode::SerializeP2PKH,
1,
&vec![StacksPublicKey::from_private(&privk)],
)
.unwrap();
(privk, addr)
})
.collect();
let origin_addresses: Vec<String> = key_address_pairs
.iter()
.map(|(_, b)| b.to_string())
.collect();
let address_0 = origin_addresses[0].to_string();
let address_1 = origin_addresses[1].to_string();
let address_2 = origin_addresses[2].to_string();

let test_name = "mempool_walk_test_nonce_filtered_and_ranked";
rafaelcr marked this conversation as resolved.
Show resolved Hide resolved
let mut peer_config = TestPeerConfig::new(test_name, 2002, 2003);
rafaelcr marked this conversation as resolved.
Show resolved Hide resolved

peer_config.initial_balances = vec![];
for (privk, addr) in &key_address_pairs {
peer_config
.initial_balances
.push((addr.to_account_principal(), 1000000000));
}

let recipient =
StacksAddress::from_string("ST1RFD5Q2QPK3E0F08HG9XDX7SSC7CNRS0QR0SGEV").unwrap();

let mut chainstate =
instantiate_chainstate_with_balances(false, 0x80000000, &test_name, vec![]);
let chainstate_path = chainstate_path(&test_name);
let mut mempool = MemPoolDB::open_test(false, 0x80000000, &chainstate_path).unwrap();
let b_1 = make_block(
&mut chainstate,
ConsensusHash([0x1; 20]),
&(
FIRST_BURNCHAIN_CONSENSUS_HASH.clone(),
FIRST_STACKS_BLOCK_HASH.clone(),
),
1,
1,
);
let b_2 = make_block(&mut chainstate, ConsensusHash([0x2; 20]), &b_1, 2, 2);

let mut tx_events = Vec::new();

// Submit nonces 0 through 9 for each of the 3 senders.
for nonce in 0..10 {
rafaelcr marked this conversation as resolved.
Show resolved Hide resolved
for user_index in 0..3 {
let mut tx = make_user_stacks_transfer(
&key_address_pairs[user_index].0,
nonce as u64,
200,
&recipient.to_account_principal(),
1,
);

let mut mempool_tx = mempool.tx_begin().unwrap();

let origin_address = tx.origin_address();
let sponsor_address = tx.sponsor_address().unwrap_or(origin_address);

tx.set_tx_fee(100);
let txid = tx.txid();
let tx_bytes = tx.serialize_to_vec();
let tx_fee = tx.get_tx_fee();
let height = 100;

MemPoolDB::try_add_tx(
&mut mempool_tx,
&mut chainstate,
&b_1.0,
&b_1.1,
true,
txid,
tx_bytes,
tx_fee,
height,
&origin_address,
nonce.try_into().unwrap(),
&sponsor_address,
nonce.try_into().unwrap(),
None,
)
.unwrap();

// Increase the `fee_rate` as nonce goes up, so we can test that next nonces get confirmed before higher fee txs.
// Also slightly increase the fee for some addresses so we can check those txs get selected first.
mempool_tx
.execute(
"UPDATE mempool SET fee_rate = ? WHERE txid = ?",
params![Some(100.0 * (nonce + 1 + user_index) as f64), &txid],
)
.unwrap();
mempool_tx.commit().unwrap();
}
}

// Simulate next possible nonces for the 3 addresses:
// Address 0 => 2
// Address 1 => 7
// Address 2 => 9
let mempool_tx = mempool.tx_begin().unwrap();
mempool_tx
.execute(
"INSERT INTO nonces (address, nonce) VALUES (?, ?), (?, ?), (?, ?)",
params![address_0, 2, address_1, 7, address_2, 9],
)
.unwrap();
mempool_tx.commit().unwrap();

// Visit transactions. Keep a record of the order of visited txs so we can compare at the end.
let mut considered_txs = vec![];
let deadline = get_epoch_time_ms() + 30000;
chainstate.with_read_only_clarity_tx(
&TEST_BURN_STATE_DB,
&StacksBlockHeader::make_index_block_hash(&b_2.0, &b_2.1),
|clarity_conn| {
// When the candidate cache fills, one pass cannot process all transactions
loop {
if mempool
.iterate_candidates::<_, ChainstateError, _>(
clarity_conn,
&mut tx_events,
MemPoolWalkSettings::default(),
|_, available_tx, _| {
considered_txs.push((
available_tx.tx.metadata.origin_address.to_string(),
available_tx.tx.metadata.origin_nonce,
));
Ok(Some(
// Generate any success result
TransactionResult::success(
&available_tx.tx.tx,
available_tx.tx.metadata.tx_fee,
StacksTransactionReceipt::from_stx_transfer(
available_tx.tx.tx.clone(),
vec![],
Value::okay(Value::Bool(true)).unwrap(),
ExecutionCost::zero(),
),
)
.convert_to_event(),
))
},
)
.unwrap()
.0
== 0
{
break;
}
assert!(get_epoch_time_ms() < deadline, "test timed out");
}
assert_eq!(
considered_txs,
vec![
(address_2.clone(), 9), // Highest fee for address 2, and 9 is the next nonce
(address_1.clone(), 7),
(address_0.clone(), 2),
(address_1.clone(), 8),
(address_0.clone(), 3),
(address_1.clone(), 9), // Highest fee for address 1, but have to confirm nonces 7 and 8 first
(address_0.clone(), 4),
(address_0.clone(), 5),
(address_0.clone(), 6),
(address_0.clone(), 7),
(address_0.clone(), 8),
(address_0.clone(), 9), // Highest fee for address 0, but have to confirm all other nonces first
],
"Mempool should visit transactions in the correct order while ignoring past nonces",
);
},
);
}
130 changes: 72 additions & 58 deletions stackslib/src/core/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,7 @@ pub struct MemPoolWalkSettings {
/// milliseconds. This is a soft deadline.
pub max_walk_time_ms: u64,
/// Probability percentage to consider a transaction which has not received a cost estimate.
/// That is, with x%, when picking the next transaction to include a block, select one that
/// either failed to get a cost estimate or has not been estimated yet.
/// This property is no longer used and will be ignored.
pub consider_no_estimate_tx_prob: u8,
/// Size of the nonce cache. This avoids MARF look-ups.
pub nonce_cache_size: u64,
Expand Down Expand Up @@ -820,6 +819,18 @@ const MEMPOOL_SCHEMA_7_TIME_ESTIMATES: &'static [&'static str] = &[
"#,
];

const MEMPOOL_SCHEMA_8_NONCE_SORTING: &'static [&'static str] = &[
r#"
-- Drop redundant mempool indexes, covered by unique constraints
DROP INDEX IF EXISTS "by_txid";
DROP INDEX IF EXISTS "by_sponsor";
DROP INDEX IF EXISTS "by_origin";
"#,
r#"
INSERT INTO schema_version (version) VALUES (8)
"#,
];

const MEMPOOL_INDEXES: &'static [&'static str] = &[
"CREATE INDEX IF NOT EXISTS by_txid ON mempool(txid);",
"CREATE INDEX IF NOT EXISTS by_height ON mempool(height);",
Expand Down Expand Up @@ -1393,6 +1404,16 @@ impl MemPoolDB {
Ok(())
}

/// Optimize indexes for mempool visits
#[cfg_attr(test, mutants::skip)]
fn instantiate_schema_8(tx: &DBTx) -> Result<(), db_error> {
for sql_exec in MEMPOOL_SCHEMA_8_NONCE_SORTING {
tx.execute_batch(sql_exec)?;
}

Ok(())
}

#[cfg_attr(test, mutants::skip)]
pub fn db_path(chainstate_root_path: &str) -> Result<String, db_error> {
let mut path = PathBuf::from(chainstate_root_path);
Expand Down Expand Up @@ -1645,39 +1666,57 @@ impl MemPoolDB {

debug!("Mempool walk for {}ms", settings.max_walk_time_ms,);

let tx_consideration_sampler = Uniform::new(0, 100);
let mut rng = rand::thread_rng();
let mut candidate_cache = CandidateCache::new(settings.candidate_retry_cache_size);
let mut nonce_cache = NonceCache::new(settings.nonce_cache_size);

// set of (address, nonce) to store after the inner loop completes. This will be done in a
// single transaction. This cannot grow to more than `settings.nonce_cache_size` entries.
let mut retry_store = HashMap::new();

// Iterate pending mempool transactions using a heuristic that maximizes miner fee profitability and minimizes CPU time
// wasted on already-mined or not-yet-mineable transactions. This heuristic takes the following steps:
//
// 1. Filters out transactions that have nonces smaller than the origin address' next expected nonce as stated in the
rafaelcr marked this conversation as resolved.
Show resolved Hide resolved
// `nonces` table, when possible
// 2. Adds a "simulated" fee rate to transactions that don't have it by multiplying the mempool's maximum current fee rate
// by a random number. This helps us mix these transactions with others to guarantee they get processed in a reasonable
// order
// 3. Ranks transactions by prioritizing those with next nonces and higher fees (per origin address)
// 4. Sorts all ranked transactions by fee and returns them for evaluation
//
// This logic prevents miners from repeatedly visiting (and then skipping) high fee transactions that would get evaluated
// first based on their `fee_rate` but are otherwise non-mineable because they have very high or invalid nonces. A large
// volume of these transactions would cause considerable slowness when selecting valid transactions to mine. This query
// also makes sure transactions that have NULL `fee_rate`s are visited, because they will also get ranked according to
// their origin address nonce.
let sql = "
SELECT txid, origin_nonce, origin_address, sponsor_nonce, sponsor_address, fee_rate
FROM mempool
WHERE fee_rate IS NULL
";
let mut query_stmt_null = self
.db
.prepare(&sql)
.map_err(|err| Error::SqliteError(err))?;
let mut null_iterator = query_stmt_null
.query(NO_PARAMS)
.map_err(|err| Error::SqliteError(err))?;

let sql = "
WITH nonce_filtered AS (
SELECT txid, origin_nonce, origin_address, sponsor_nonce, sponsor_address, fee_rate,
CASE
WHEN fee_rate IS NULL THEN (ABS(RANDOM()) % 10000 / 10000.0) * (SELECT MAX(fee_rate) FROM mempool)
ELSE fee_rate
END AS sort_fee_rate
rafaelcr marked this conversation as resolved.
Show resolved Hide resolved
FROM mempool
LEFT JOIN nonces ON mempool.origin_address = nonces.address
WHERE nonces.address IS NULL OR mempool.origin_nonce >= nonces.nonce
rafaelcr marked this conversation as resolved.
Show resolved Hide resolved
),
address_nonce_ranked AS (
rafaelcr marked this conversation as resolved.
Show resolved Hide resolved
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY origin_address
ORDER BY origin_nonce ASC, sort_fee_rate DESC
) AS rank
FROM nonce_filtered
)
SELECT txid, origin_nonce, origin_address, sponsor_nonce, sponsor_address, fee_rate
FROM mempool
WHERE fee_rate IS NOT NULL
ORDER BY fee_rate DESC
FROM address_nonce_ranked
ORDER BY rank ASC, sort_fee_rate DESC
rafaelcr marked this conversation as resolved.
Show resolved Hide resolved
";
let mut query_stmt_fee = self
let mut query_stmt = self
.db
.prepare(&sql)
.map_err(|err| Error::SqliteError(err))?;
let mut fee_iterator = query_stmt_fee
let mut tx_iterator = query_stmt
.query(NO_PARAMS)
.map_err(|err| Error::SqliteError(err))?;

Expand All @@ -1688,46 +1727,23 @@ impl MemPoolDB {
break MempoolIterationStopReason::DeadlineReached;
}

let start_with_no_estimate =
tx_consideration_sampler.sample(&mut rng) < settings.consider_no_estimate_tx_prob;

// First, try to read from the retry list
let (candidate, update_estimate) = match candidate_cache.next() {
Some(tx) => {
let update_estimate = tx.fee_rate.is_none();
(tx, update_estimate)
}
None => {
// When the retry list is empty, read from the mempool db,
// randomly selecting from either the null fee-rate transactions
// or those with fee-rate estimates.
let opt_tx = if start_with_no_estimate {
null_iterator
.next()
.map_err(|err| Error::SqliteError(err))?
} else {
fee_iterator.next().map_err(|err| Error::SqliteError(err))?
};
match opt_tx {
Some(row) => (MemPoolTxInfoPartial::from_row(row)?, start_with_no_estimate),
// When the retry list is empty, read from the mempool db
match tx_iterator.next().map_err(|err| Error::SqliteError(err))? {
Some(row) => {
let tx = MemPoolTxInfoPartial::from_row(row)?;
let update_estimate = tx.fee_rate.is_none();
(tx, update_estimate)
}
None => {
// If the selected iterator is empty, check the other
match if start_with_no_estimate {
fee_iterator.next().map_err(|err| Error::SqliteError(err))?
} else {
null_iterator
.next()
.map_err(|err| Error::SqliteError(err))?
} {
Some(row) => (
MemPoolTxInfoPartial::from_row(row)?,
!start_with_no_estimate,
),
None => {
debug!("No more transactions to consider in mempool");
break MempoolIterationStopReason::NoMoreCandidates;
}
}
debug!("No more transactions to consider in mempool");
break MempoolIterationStopReason::NoMoreCandidates;
}
}
}
Expand Down Expand Up @@ -1928,10 +1944,8 @@ impl MemPoolDB {
// drop these rusqlite statements and queries, since their existence as immutable borrows on the
// connection prevents us from beginning a transaction below (which requires a mutable
// borrow).
drop(null_iterator);
drop(fee_iterator);
drop(query_stmt_null);
drop(query_stmt_fee);
drop(tx_iterator);
drop(query_stmt);

if retry_store.len() > 0 {
let tx = self.tx_begin()?;
Expand Down