Skip to content

Commit

Permalink
feat(mempool): get txs return requested txs after replenshing queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ayeletstarkware committed Jul 24, 2024
1 parent a70b025 commit 1991f1e
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 22 deletions.
22 changes: 16 additions & 6 deletions crates/mempool/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,22 @@ impl Mempool {
// back. TODO: Consider renaming to `pop_txs` to be more consistent with the standard
// library.
pub fn get_txs(&mut self, n_txs: usize) -> MempoolResult<Vec<ThinTransaction>> {
let mut eligible_txs: Vec<ThinTransaction> = Vec::with_capacity(n_txs);
for tx_hash in self.tx_queue.pop_chunk(n_txs) {
let tx = self.tx_pool.remove(tx_hash)?;
eligible_txs.push(tx);
let mut eligible_tx_references: Vec<TransactionReference> = Vec::with_capacity(n_txs);
let mut n_remaining_txs = n_txs;

while n_remaining_txs > 0 && !self.tx_queue.is_empty() {
let chunk = self.tx_queue.pop_chunk(n_remaining_txs);

self.enqueue_next_eligible_txs(&chunk)?;

n_remaining_txs -= chunk.len();
eligible_tx_references.extend(chunk);
}
self.enqueue_next_eligible_txs(&eligible_txs)?;

let eligible_txs = eligible_tx_references
.into_iter()
.map(|tx_ref| self.tx_pool.remove(tx_ref.tx_hash))
.collect::<MempoolResult<Vec<ThinTransaction>>>()?;

Ok(eligible_txs)
}
Expand Down Expand Up @@ -112,7 +122,7 @@ impl Mempool {
Ok(())
}

fn enqueue_next_eligible_txs(&mut self, txs: &[ThinTransaction]) -> MempoolResult<()> {
fn enqueue_next_eligible_txs(&mut self, txs: &Vec<TransactionReference>) -> MempoolResult<()> {
for tx in txs {
let current_account_state = Account {
sender_address: tx.sender_address,
Expand Down
25 changes: 12 additions & 13 deletions crates/mempool/src/mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,24 +177,24 @@ fn test_get_txs(#[case] requested_txs: usize) {
#[rstest]
fn test_get_txs_multi_nonce() {
// Setup.
let tx_address_0_nonce_0 =
let tx_nonce_0 =
add_tx_input!(tx_hash: 1, sender_address: "0x0", tx_nonce: 0_u8, account_nonce: 0_u8).tx;
let tx_address_0_nonce_1 =
let tx_nonce_1 =
add_tx_input!(tx_hash: 2, sender_address: "0x0", tx_nonce: 1_u8, account_nonce: 0_u8).tx;
let tx_nonce_2 =
add_tx_input!(tx_hash: 3, sender_address: "0x0", tx_nonce: 2_u8, account_nonce: 0_u8).tx;

let queue_txs = [TransactionReference::new(&tx_address_0_nonce_0)];
let pool_txs = [tx_address_0_nonce_0.clone(), tx_address_0_nonce_1.clone()];
let queue_txs = [TransactionReference::new(&tx_nonce_0)];
let pool_txs =
[tx_nonce_0.clone(), tx_nonce_1.clone(), tx_nonce_2.clone()];
let mut mempool: Mempool = MempoolState::new(pool_txs, queue_txs).into();

// Test.
let txs = mempool.get_txs(2).unwrap();
let txs = mempool.get_txs(3).unwrap();

// Assert that the account's next tx was added the queue.
// TODO(Ayelet): all transactions should be returned after replenishing.
assert_eq!(txs, &[tx_address_0_nonce_0]);
let expected_queue_txs = [TransactionReference::new(&tx_address_0_nonce_1)];
let expected_pool_txs = [tx_address_0_nonce_1];
let expected_mempool_state = MempoolState::new(expected_pool_txs, expected_queue_txs);
// Assert: all transactions are returned.
assert_eq!(txs, &[tx_nonce_0, tx_nonce_1, tx_nonce_2]);
let expected_mempool_state = MempoolState::new([], []);
expected_mempool_state.assert_eq_mempool_state(&mempool);
}

Expand Down Expand Up @@ -418,9 +418,8 @@ fn test_flow_filling_holes(mut mempool: Mempool) {
add_tx(&mut mempool, &input_address_0_nonce_0);
let txs = mempool.get_txs(2).unwrap();

// TODO(Ayelet): all transactions should be returned after replenishing.
// Assert: all remaining transactions are returned.
assert_eq!(txs, &[input_address_0_nonce_0.tx]);
assert_eq!(txs, &[input_address_0_nonce_0.tx, input_address_0_nonce_1.tx]);
}

#[rstest]
Expand Down
9 changes: 6 additions & 3 deletions crates/mempool/src/transaction_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::cmp::Ordering;
use std::collections::{BTreeSet, HashMap};

use starknet_api::core::{ContractAddress, Nonce};
use starknet_api::transaction::TransactionHash;

use crate::mempool::TransactionReference;

Expand Down Expand Up @@ -35,14 +34,14 @@ impl TransactionQueue {
}

// TODO(gilad): remove collect
pub fn pop_chunk(&mut self, n_txs: usize) -> Vec<TransactionHash> {
pub fn pop_chunk(&mut self, n_txs: usize) -> Vec<TransactionReference> {
let txs: Vec<TransactionReference> =
(0..n_txs).filter_map(|_| self.queue.pop_last().map(|tx| tx.0)).collect();
for tx in &txs {
self.address_to_tx.remove(&tx.sender_address);
}

txs.into_iter().map(|tx| tx.tx_hash).collect()
txs
}

/// Returns an iterator of the current eligible transactions for sequencing, ordered by their
Expand All @@ -63,6 +62,10 @@ impl TransactionQueue {
}
false
}

pub fn is_empty(&self) -> bool {
self.queue.is_empty()
}
}

/// Encapsulates a transaction reference to assess its order (i.e., priority).
Expand Down

0 comments on commit 1991f1e

Please sign in to comment.