Skip to content

Commit

Permalink
ResetWithHeaders optimization (#198)
Browse files Browse the repository at this point in the history
* fix

* reorg

* small optimization

* comments fix

* ut fix
  • Loading branch information
goran-ethernal authored Apr 18, 2024
1 parent 7de79dc commit bd84444
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 72 deletions.
2 changes: 1 addition & 1 deletion consensus/dev/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (d *Dev) writeNewBlock(parent *types.Header) error {

// after the block has been written we reset the txpool so that
// the old transactions are removed
d.txpool.ResetWithHeaders(block.Header)
d.txpool.ResetWithBlock(block)

return nil
}
Expand Down
16 changes: 11 additions & 5 deletions consensus/polybft/consensus_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type txPoolInterface interface {
Drop(*types.Transaction)
Demote(*types.Transaction)
SetSealing(bool)
ResetWithHeaders(...*types.Header)
ResetWithBlock(*types.Block)
}

// epochMetadata is the static info for epoch currently being processed
Expand Down Expand Up @@ -157,9 +157,15 @@ func newConsensusRuntime(log hcf.Logger, config *runtimeConfig) (*consensusRunti
eventProvider: NewEventProvider(config.blockchain),
}

bridgeManager, err := newBridgeManager(runtime, config, runtime.eventProvider, log)
if err != nil {
return nil, err
var bridgeManager BridgeManager

if runtime.IsBridgeEnabled() {
bridgeManager, err = newBridgeManager(runtime, config, runtime.eventProvider, log)
if err != nil {
return nil, err
}
} else {
bridgeManager = &dummyBridgeManager{}
}

runtime.bridgeManager = bridgeManager
Expand Down Expand Up @@ -274,7 +280,7 @@ func (c *consensusRuntime) OnBlockInserted(fullBlock *types.FullBlock) {
}

// after the block has been written we reset the txpool so that the old transactions are removed
c.config.txPool.ResetWithHeaders(fullBlock.Block.Header)
c.config.txPool.ResetWithBlock(fullBlock.Block)

var (
epoch = c.epoch
Expand Down
2 changes: 1 addition & 1 deletion consensus/polybft/consensus_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func TestConsensusRuntime_OnBlockInserted_EndOfEpoch(t *testing.T) {
polybftBackendMock.On("GetValidatorsWithTx", mock.Anything, mock.Anything, mock.Anything).Return(validatorSet).Times(3)

txPool := new(txPoolMock)
txPool.On("ResetWithHeaders", mock.Anything).Once()
txPool.On("ResetWithBlock", mock.Anything).Once()

snapshot := NewProposerSnapshot(epochSize-1, validatorSet)
polybftCfg := &PolyBFTConfig{EpochSize: epochSize}
Expand Down
10 changes: 8 additions & 2 deletions consensus/polybft/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ func (f *fsm) BuildProposal(currentRound uint64) ([]byte, error) {
"state root", stateBlock.Block.Header.StateRoot,
"proposal hash", checkpointHash.String(),
"txs count", len(stateBlock.Block.Transactions),
"txs", buf.String())
"txs", buf.String(),
"finsihedIn", time.Since(start),
)
}

f.target = stateBlock
Expand Down Expand Up @@ -321,6 +323,8 @@ func (f *fsm) ValidateCommit(signerAddr []byte, seal []byte, proposalHash []byte

// Validate validates a raw proposal (used if non-proposer)
func (f *fsm) Validate(proposal []byte) error {
start := time.Now().UTC()

var block types.Block
if err := block.UnmarshalRLP(proposal); err != nil {
return fmt.Errorf("failed to validate, cannot decode block data. Error: %w", err)
Expand Down Expand Up @@ -414,7 +418,9 @@ func (f *fsm) Validate(proposal []byte) error {
"block num", block.Number(),
"state root", block.Header.StateRoot,
"proposer", types.BytesToHash(block.Header.Miner),
"proposal hash", checkpointHash)
"proposal hash", checkpointHash,
"finishedIn", time.Since(start),
)
}

f.target = stateBlock
Expand Down
4 changes: 2 additions & 2 deletions consensus/polybft/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ func (tp *txPoolMock) SetSealing(v bool) {
tp.Called(v)
}

func (tp *txPoolMock) ResetWithHeaders(values ...*types.Header) {
tp.Called(values)
func (tp *txPoolMock) ResetWithBlock(fullBlock *types.Block) {
tp.Called(fullBlock)
}

var _ syncer.Syncer = (*syncerMock)(nil)
Expand Down
79 changes: 28 additions & 51 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"google.golang.org/grpc"

"github.com/0xPolygon/polygon-edge/blockchain"
"github.com/0xPolygon/polygon-edge/chain"
"github.com/0xPolygon/polygon-edge/network"
"github.com/0xPolygon/polygon-edge/state"
Expand Down Expand Up @@ -473,75 +472,54 @@ func (p *TxPool) Demote(tx *types.Transaction) {
p.eventManager.signalEvent(proto.EventType_DEMOTED, tx.Hash())
}

// ResetWithHeaders processes the transactions from the new
// headers to sync the pool with the new state.
func (p *TxPool) ResetWithHeaders(headers ...*types.Header) {
// ResetWithBlock processes the transactions from the newly
// finalized block to sync the pool with the new state
func (p *TxPool) ResetWithBlock(block *types.Block) {
// process the txs in the event
// to make sure the pool is up-to-date
p.processEvent(&blockchain.Event{
NewChain: headers,
})
}

// processEvent collects the latest nonces for each account contained
// in the received event. Resets all known accounts with the new nonce.
func (p *TxPool) processEvent(event *blockchain.Event) {
// Grab the latest state root now that the block has been inserted
stateRoot := p.store.Header().StateRoot
stateNonces := make(map[types.Address]uint64)

// discover latest (next) nonces for all accounts
for _, header := range event.NewChain {
block, ok := p.store.GetBlockByHash(header.Hash, true)
if !ok {
p.logger.Error("could not find block in store", "hash", header.Hash.String())

continue
}

// remove mined txs from the lookup map
p.index.remove(block.Transactions...)
// remove mined txs from the lookup map
p.index.remove(block.Transactions...)

// Extract latest nonces
for _, tx := range block.Transactions {
var err error
// Extract latest nonces
for _, tx := range block.Transactions {
var err error

addr := tx.From()
if addr == types.ZeroAddress {
// From field is not set, extract the signer
if addr, err = p.signer.Sender(tx); err != nil {
p.logger.Error(
fmt.Sprintf("unable to extract signer for transaction, %v", err),
)

continue
}
}
addr := tx.From()
if addr == types.ZeroAddress {
// From field is not set, extract the signer
if addr, err = p.signer.Sender(tx); err != nil {
p.logger.Error(
fmt.Sprintf("unable to extract signer for transaction, %v", err),
)

// skip already processed accounts
if _, processed := stateNonces[addr]; processed {
continue
}
}

// fetch latest nonce from the state
latestNonce := p.store.GetNonce(stateRoot, addr)

// update the result map
stateNonces[addr] = latestNonce
// skip already processed accounts
if _, processed := stateNonces[addr]; processed {
continue
}
}

// update base fee
if ln := len(event.NewChain); ln > 0 {
p.SetBaseFee(event.NewChain[ln-1])
// fetch latest nonce from the state
latestNonce := p.store.GetNonce(stateRoot, addr)

// update the result map
stateNonces[addr] = latestNonce
}

p.SetBaseFee(block.Header)

// reset accounts with the new state
p.resetAccounts(stateNonces)

if !p.sealing.Load() {
// only non-validator cleanup inactive accounts
p.updateAccountSkipsCounts(stateNonces)
p.updateAccountSkipsCounts(stateNonces, stateRoot)
}
}

Expand Down Expand Up @@ -1049,8 +1027,7 @@ func (p *TxPool) resetAccounts(stateNonces map[types.Address]uint64) {

// updateAccountSkipsCounts update the accounts' skips,
// the number of the consecutive blocks that doesn't have the account's transactions
func (p *TxPool) updateAccountSkipsCounts(latestActiveAccounts map[types.Address]uint64) {
stateRoot := p.store.Header().StateRoot
func (p *TxPool) updateAccountSkipsCounts(latestActiveAccounts map[types.Address]uint64, stateRoot types.Hash) {
p.accounts.Range(
func(key, value interface{}) bool {
address, _ := key.(types.Address)
Expand Down
21 changes: 11 additions & 10 deletions txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2035,7 +2035,7 @@ func Test_updateAccountSkipsCounts(t *testing.T) {

pool.updateAccountSkipsCounts(map[types.Address]uint64{
// empty
})
}, types.Hash{1})

// make sure the account queue is empty and skips is reset
assert.Zero(t, accountMap.enqueued.length())
Expand Down Expand Up @@ -2070,7 +2070,7 @@ func Test_updateAccountSkipsCounts(t *testing.T) {

pool.updateAccountSkipsCounts(map[types.Address]uint64{
// empty
})
}, types.Hash{1})

// make sure the account queue is empty and skips is reset
assert.Zero(t, accountMap.enqueued.length())
Expand Down Expand Up @@ -2105,7 +2105,7 @@ func Test_updateAccountSkipsCounts(t *testing.T) {

pool.updateAccountSkipsCounts(map[types.Address]uint64{
addr1: 1,
})
}, types.Hash{1})

// make sure the account queue is empty and skips is reset
assert.Zero(t, accountMap.enqueued.length())
Expand Down Expand Up @@ -2167,7 +2167,7 @@ func Test_updateAccountSkipsCounts(t *testing.T) {
accountMap.setNonce(storeNonce + 3)
accountMap.skips = maxAccountSkips - 1

pool.updateAccountSkipsCounts(map[types.Address]uint64{})
pool.updateAccountSkipsCounts(map[types.Address]uint64{}, types.Hash{1})

// make sure the account queue is empty and skips is reset
assert.Zero(t, accountMap.enqueued.length())
Expand Down Expand Up @@ -3646,7 +3646,7 @@ func TestAddTxsInOrder(t *testing.T) {
}
}

func TestResetWithHeadersSetsBaseFee(t *testing.T) {
func TestResetWithBlockSetsBaseFee(t *testing.T) {
t.Parallel()

blocks := []*types.Block{
Expand All @@ -3665,7 +3665,7 @@ func TestResetWithHeadersSetsBaseFee(t *testing.T) {
{
Header: &types.Header{
BaseFee: 2000,
Hash: types.Hash{2},
Hash: types.Hash{1},
},
},
}
Expand All @@ -3685,12 +3685,13 @@ func TestResetWithHeadersSetsBaseFee(t *testing.T) {
require.NoError(t, err)

pool.SetBaseFee(blocks[0].Header)
require.Equal(t, blocks[0].Header.BaseFee, pool.GetBaseFee())

pool.ResetWithHeaders()
assert.Equal(t, blocks[0].Header.BaseFee, pool.GetBaseFee())
pool.ResetWithBlock(blocks[len(blocks)-1])
require.Equal(t, blocks[len(blocks)-1].Header.BaseFee, pool.GetBaseFee())

pool.ResetWithHeaders(blocks[len(blocks)-2].Header, blocks[len(blocks)-1].Header)
assert.Equal(t, blocks[len(blocks)-1].Header.BaseFee, pool.GetBaseFee())
pool.ResetWithBlock(blocks[len(blocks)-2])
require.Equal(t, blocks[len(blocks)-2].Header.BaseFee, pool.GetBaseFee())
}

func TestAddTx_TxReplacement(t *testing.T) {
Expand Down

0 comments on commit bd84444

Please sign in to comment.