Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cmwaters committed Jan 27, 2024
1 parent f0e972e commit 1cb6666
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 38 deletions.
8 changes: 8 additions & 0 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1368,6 +1368,14 @@ func (ps *PeerState) BlockPartsSent() int {
return ps.Stats.BlockParts
}

// HasBlock returns true if the peer has received a valid block for the current round
func (ps *PeerState) HasBlock() bool {
ps.mtx.Lock()
defer ps.mtx.Unlock()

return ps.PRS.Block
}

// SetHasVote sets the given vote as known by the peer
func (ps *PeerState) SetHasVote(vote *types.Vote) {
ps.mtx.Lock()
Expand Down
3 changes: 2 additions & 1 deletion consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
ps := peer.Get(types.PeerStateKey).(*PeerState)

assert.Equal(t, true, ps.VotesSent() > 0, "number of votes sent should have increased")
assert.Equal(t, true, ps.BlockPartsSent() > 0, "number of votes sent should have increased")
assert.Equal(t, true, ps.HasBlock(), "should have block")
}

//-------------------------------------------------------------
Expand Down Expand Up @@ -1056,6 +1056,7 @@ func TestMarshalJSONPeerState(t *testing.T) {
"step": 0,
"start_time": "0001-01-01T00:00:00Z",
"proposal": false,
"block": false,
"proposal_block_part_set_header":
{"total":0, "hash":""},
"proposal_block_parts": null,
Expand Down
8 changes: 4 additions & 4 deletions mempool/cat/block_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (memR *Reactor) FetchTxsFromKeys(ctx context.Context, blockID []byte, compa
memR.mempool.jsonMetrics.Unlock()

// setup a request for this block and begin to track and retrieve all missing transactions
request := memR.blockFetcher.NewRequest(
request := memR.blockFetcher.newRequest(
blockID,
memR.mempool.Height(),
missingKeys,
Expand Down Expand Up @@ -124,8 +124,8 @@ type blockFetcher struct {
requests map[string]*blockRequest
}

// NewBlockFetcher returns a new blockFetcher for managing block requests
func NewBlockFetcher() *blockFetcher {
// newBlockFetcher returns a new blockFetcher for managing block requests
func newBlockFetcher() *blockFetcher {
return &blockFetcher{
requests: make(map[string]*blockRequest),
}
Expand All @@ -140,7 +140,7 @@ func (bf *blockFetcher) GetRequest(blockID []byte) (*blockRequest, bool) {

// NewRequest creates a new block request and returns it.
// If a request already exists it returns that instead
func (bf *blockFetcher) NewRequest(
func (bf *blockFetcher) newRequest(
blockID []byte,
height int64,
missingKeys map[int]types.TxKey,
Expand Down
12 changes: 6 additions & 6 deletions mempool/cat/block_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,23 +91,23 @@ func TestBlockRequestConcurrently(t *testing.T) {
}

func TestBlockFetcherSimple(t *testing.T) {
bf := NewBlockFetcher()
bf := newBlockFetcher()
tx := types.Tx("hello world")
key := tx.Key()
missingKeys := map[int]types.TxKey{
0: key,
}
blockID := []byte("blockID")
req := bf.NewRequest(blockID, 1, missingKeys, make([][]byte, 1))
req := bf.newRequest(blockID, 1, missingKeys, make([][]byte, 1))
req2, ok := bf.GetRequest(blockID)
require.True(t, ok)
require.Equal(t, req, req2)
// a different request for the same blockID should
// return the same original request object.
req3 := bf.NewRequest(blockID, 2, missingKeys, make([][]byte, 2))
req3 := bf.newRequest(blockID, 2, missingKeys, make([][]byte, 2))
require.Equal(t, req, req3)

req4 := bf.NewRequest([]byte("differentBlockID"), 1, missingKeys, make([][]byte, 1))
req4 := bf.newRequest([]byte("differentBlockID"), 1, missingKeys, make([][]byte, 1))

bf.TryAddMissingTx(key, tx)
require.False(t, req4.TryAddMissingTx(key, tx))
Expand All @@ -117,7 +117,7 @@ func TestBlockFetcherSimple(t *testing.T) {

func TestBlockFetcherConcurrentRequests(t *testing.T) {
var (
bf = NewBlockFetcher()
bf = newBlockFetcher()
numBlocks = 5
numRequestsPerBlock = 5
numTxs = 5
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestBlockFetcherConcurrentRequests(t *testing.T) {
}
txsCopy := make([][]byte, len(txs))
copy(txsCopy, txs)
request := bf.NewRequest(blockID, 1, mk, txs)
request := bf.newRequest(blockID, 1, mk, txs)
if routine == 0 {
requestWG.Done()
}
Expand Down
38 changes: 23 additions & 15 deletions mempool/cat/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,16 @@ func (txmp *TxPool) Get(txKey types.TxKey) (types.Tx, bool) {
return types.Tx{}, false
}

// GetCommitted retrieves a committed transaction based on the key.
// It returns the transaction and a bool indicating if the transaction exists or not.
func (txmp *TxPool) GetCommitted(txKey types.TxKey) (types.Tx, bool) {
wtx := txmp.store.getCommitted(txKey)
if wtx != nil {
return wtx.tx, true
}
return types.Tx{}, false
}

// IsRejectedTx returns true if the transaction was recently rejected and is
// currently within the cache
func (txmp *TxPool) IsRejectedTx(txKey types.TxKey) bool {
Expand Down Expand Up @@ -374,7 +384,6 @@ func (txmp *TxPool) RemoveTxByKey(txKey types.TxKey) error {
func (txmp *TxPool) removeTxByKey(txKey types.TxKey) {
txmp.rejectedTxCache.Push(txKey)
_ = txmp.store.remove(txKey)
txmp.seenByPeersSet.RemoveKey(txKey)
}

// Flush purges the contents of the mempool and the cache, leaving both empty.
Expand Down Expand Up @@ -501,19 +510,25 @@ func (txmp *TxPool) Update(

txmp.metrics.SuccessfulTxs.Add(float64(len(blockTxs)))

for txKey := range txmp.committedCache {
// Remove the transaction from the mempool.
txmp.removeTxByKey(txKey)
}
txmp.store.clearCommitted()

// add the recently committed transactions to the cache
txmp.committedCache = make(map[types.TxKey]struct{})
for _, tx := range blockTxs {
txmp.committedCache[tx.Key()] = struct{}{}
keys := make([]types.TxKey, len(blockTxs))
for idx, tx := range blockTxs {
keys[idx] = tx.Key()
// this prevents the node from reprocessing recently committed transactions
txmp.rejectedTxCache.Push(keys[idx])
}
txmp.store.markAsCommitted(keys)

// purge transactions that are past the TTL
txmp.purgeExpiredTxs(blockHeight)

// prune record of peers seen transactions after an hour
// We assume by then that the transaction will no longer
// need to be requested
txmp.seenByPeersSet.Prune(time.Now().Add(time.Hour))

// If there any uncommitted transactions left in the mempool, we either
// initiate re-CheckTx per remaining transaction or notify that remaining
// transactions are left.
Expand Down Expand Up @@ -739,13 +754,6 @@ func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) {

numExpired := txmp.store.purgeExpiredTxs(expirationHeight, expirationAge)
txmp.metrics.EvictedTxs.Add(float64(numExpired))

// purge old evicted and seen transactions
if txmp.config.TTLDuration == 0 {
// ensure that seenByPeersSet are eventually pruned
expirationAge = now.Add(-time.Hour)
}
txmp.seenByPeersSet.Prune(expirationAge)
}

func (txmp *TxPool) notifyTxsAvailable() {
Expand Down
9 changes: 7 additions & 2 deletions mempool/cat/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func TestTxPool_Size(t *testing.T) {

txmp.Lock()
require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
// txs are removed with a one block delay so we need to call this twice
require.NoError(t, txmp.Update(1, nil, nil, nil, nil))
txmp.Unlock()

require.Equal(t, len(rawTxs)/2, txmp.Size())
Expand Down Expand Up @@ -288,6 +290,8 @@ func TestTxPool_Eviction(t *testing.T) {
// Free up some space so we can add back previously evicted txs
err = txmp.Update(1, types.Txs{types.Tx("key10=0123456789abcdef=11")}, []*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}}, nil, nil)
require.NoError(t, err)
// need to call Update twice to actually remove the transaction
_ = txmp.Update(2, nil, nil, nil, nil)
require.False(t, txExists("key10=0123456789abcdef=11"))
mustCheckTx(t, txmp, "key3=0002=10")
require.True(t, txExists("key3=0002=10"))
Expand Down Expand Up @@ -598,9 +602,8 @@ func TestTxPool_ExpiredTxs_NumBlocks(t *testing.T) {
responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
}

txmp.Lock()
require.NoError(t, txmp.Update(txmp.height+1, reapedTxs, responses, nil, nil))
txmp.Unlock()
require.NoError(t, txmp.Update(txmp.height+1, nil, nil, nil, nil))

require.Equal(t, 95, txmp.Size())

Expand Down Expand Up @@ -695,6 +698,8 @@ func TestTxPool_RemoveBlobTx(t *testing.T) {

err = txmp.Update(1, []types.Tx{indexWrapper}, abciResponses(1, abci.CodeTypeOK), nil, nil)
require.NoError(t, err)
err = txmp.Update(1, nil, nil, nil, nil)
require.NoError(t, err)
require.EqualValues(t, 0, txmp.Size())
require.EqualValues(t, 0, txmp.SizeBytes())
}
Expand Down
15 changes: 13 additions & 2 deletions mempool/cat/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func NewReactor(mempool *TxPool, opts *ReactorOptions) (*Reactor, error) {
mempool: mempool,
ids: newMempoolIDs(),
requests: newRequestScheduler(opts.MaxGossipDelay, defaultGlobalRequestTimeout),
blockFetcher: NewBlockFetcher(),
blockFetcher: newBlockFetcher(),
traceClient: &trace.Client{},
}
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
Expand Down Expand Up @@ -187,6 +187,14 @@ func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer {
return peer
}

// AddPeer broadcasts all the transactions that this node has seen
func (memR *Reactor) AddPeer(peer p2p.Peer) {
keys := memR.mempool.store.getAllKeys()
for _, key := range keys {
memR.broadcastSeenTx(key)
}
}

// RemovePeer implements Reactor. For all current outbound requests to this
// peer it will find a new peer to rerequest the same transactions.
func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
Expand All @@ -195,7 +203,6 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// we won't receive any responses from them.
outboundRequests := memR.requests.ClearAllRequestsFrom(peerID)
for key := range outboundRequests {
memR.mempool.metrics.RequestedTxs.Add(1)
memR.findNewPeerToRequestTx(key)
}
}
Expand Down Expand Up @@ -323,6 +330,10 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
return
}
tx, has := memR.mempool.Get(txKey)
if !has {
// see if the tx was recently committed
tx, has = memR.mempool.GetCommitted(txKey)
}
if has && !memR.opts.ListenOnly {
peerID := memR.ids.GetIDForPeer(e.Src.ID())
schema.WriteMempoolTx(
Expand Down
25 changes: 25 additions & 0 deletions mempool/cat/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type store struct {
mtx sync.RWMutex
bytes int64
txs map[types.TxKey]*wrappedTx
committedTxs map[types.TxKey]*wrappedTx
}

func newStore() *store {
Expand Down Expand Up @@ -41,13 +42,37 @@ func (s *store) get(txKey types.TxKey) *wrappedTx {
return s.txs[txKey]
}

func (s *store) getCommitted(txKey types.TxKey) *wrappedTx {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.committedTxs[txKey]
}

func (s *store) has(txKey types.TxKey) bool {
s.mtx.RLock()
defer s.mtx.RUnlock()
_, has := s.txs[txKey]
return has
}

func (s *store) markAsCommitted(txKeys []types.TxKey) {
s.mtx.Lock()
defer s.mtx.Unlock()
for _, key := range txKeys {
if tx, exists := s.txs[key]; exists {
s.bytes -= tx.size()
delete(s.txs, key)
s.committedTxs[key] = tx
}
}
}

func (s *store) clearCommitted() {
s.mtx.Lock()
defer s.mtx.Unlock()
s.committedTxs = make(map[types.TxKey]*wrappedTx)
}

func (s *store) remove(txKey types.TxKey) bool {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
// Update the state with the block and responses.
state, err = updateState(state, blockID, &block.Header, abciResponses, validatorUpdates)
if err != nil {
return state, 0, fmt.Errorf("commit failed for application: %v", err)
return state, 0, fmt.Errorf("failed to update state: %v", err)
}

// Lock mempool, commit app state, update mempoool.
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# We need to build in a Linux environment to support C libraries, e.g. RocksDB.
# We use Debian instead of Alpine, so that we can use binary database packages
# instead of spending time compiling them.
FROM golang:1.21.6-bullseye
FROM golang:1.20-bullseye

RUN apt-get -qq update -y && apt-get -qq upgrade -y >/dev/null
RUN apt-get -qq install -y libleveldb-dev librocksdb-dev >/dev/null
Expand Down
2 changes: 0 additions & 2 deletions test/e2e/networks/ci.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ initial_height = 1000
initial_state = { initial01 = "a", initial02 = "b", initial03 = "c" }
# The most common case (e.g. Cosmos SDK-based chains).
abci_protocol = "builtin"
load_tx_connections = 10
load_tx_batch_size = 10

[validators]
validator01 = 100
Expand Down
3 changes: 0 additions & 3 deletions test/e2e/networks/simple.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
load_tx_connections = 10
load_tx_batch_size = 10

[node.validator01]
[node.validator02]
[node.validator03]
Expand Down
1 change: 0 additions & 1 deletion test/e2e/runner/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ func MakeGenesis(testnet *e2e.Testnet) (types.GenesisDoc, error) {
// MakeConfig generates a CometBFT config for a node.
func MakeConfig(node *e2e.Node) (*config.Config, error) {
cfg := config.DefaultConfig()
cfg.Consensus.TimeoutPropose = 5 * time.Second
cfg.Moniker = node.Name
cfg.ProxyApp = AppAddressTCP
cfg.RPC.ListenAddress = "tcp://0.0.0.0:26657"
Expand Down

0 comments on commit 1cb6666

Please sign in to comment.