Skip to content

Commit

Permalink
improve test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko committed Jan 26, 2024
1 parent 61fdc21 commit ffc8eb7
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 66 deletions.
67 changes: 39 additions & 28 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,27 +961,29 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Ens
ec.nConsecutiveBlocksChainTooShort = 0
}

missingTxs, err := ec.markFinalizedUsingLocalChain(ctx, head, ec.chainConfig.RPCDefaultBatchSize())
// fetch all confirmed transactions.
confirmedTxs, err := ec.txStore.FindConfirmedTransactions(ctx, ec.chainID)
if err != nil {
return fmt.Errorf("FindConfirmedTransactions failed: %w", err)
}

// find transactions that are not present in the local chain
missingTxs, err := ec.markFinalizedWithLocalChain(ctx, head, confirmedTxs, ec.chainConfig.RPCDefaultBatchSize())
if err != nil {
return fmt.Errorf("failed to find tx missing from the local chain: %w", err)
}

// Tx might be missing from the chain because it was included into block that is deeper than HistoryDepth.
// Mark finalized txes and returns IDs of transactions whose ID's we failed to find
missingTxIDs, err := ec.fetchAndSaveFinalizedReceipts(ctx, missingTxs)
// find transactions that were re-orged out of the chain and we no longer can find their receipts
missingTxs, err = ec.markFinalizedWithRPCReceipts(ctx, missingTxs)
if err != nil {
return err
}

// mark all missing tx for rebroadcast
for _, tx := range missingTxs {
if _, ok := missingTxIDs[tx.ID]; !ok {
continue
}

if err := ec.markForRebroadcast(*tx, head); err != nil {
return fmt.Errorf("markForRebroadcast failed for tx %v: %w", tx.ID, err)
}

}

// It is safe to process separate keys concurrently
Expand All @@ -1007,25 +1009,28 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Ens
return multierr.Combine(errs...)
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markFinalizedUsingLocalChain(ctx context.Context,
head types.Head[BLOCK_HASH], batchSize uint32) (
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markFinalizedWithLocalChain(ctx context.Context,
head types.Head[BLOCK_HASH], confirmedTxs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], batchSize uint32) (
[]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
// fetch all confirmed transactions.
confirmedTxs, err := ec.txStore.FindConfirmedTransactions(ctx, ec.chainID)
if err != nil {
return nil, fmt.Errorf("FindConfirmedTransactions failed: %w", err)
if len(confirmedTxs) == 0 {
return nil, nil
}

finalizedBHead, err := ec.findFinalizedHeadInChain(ctx, head)
finalizedHead, err := ec.findFinalizedHeadInChain(ctx, head)
if err != nil {
return nil, fmt.Errorf("failed to find finalized block in chain: %w", err)
}

if finalizedHead != nil {
ec.lggr.With("block_num", finalizedHead.BlockNumber(), "hash", finalizedHead.BlockHash()).
Debug("found finalized block in headTracker's chain - using it to find finalized txs")
}

txsMissingInChain := make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], 0, len(confirmedTxs))
finalizedAttempts := make([]int64, 0, batchSize)
for _, tx := range confirmedTxs {
// transaction is finalized
if finalizedAttempt := findAttemptWithReceiptInChain(*tx, finalizedBHead); finalizedAttempt != nil {
if finalizedAttempt := findAttemptWithReceiptInChain(*tx, finalizedHead); finalizedAttempt != nil {
finalizedAttempts = append(finalizedAttempts, finalizedAttempt.ID)

if uint32(len(finalizedAttempts)) >= batchSize {
Expand Down Expand Up @@ -1061,22 +1066,22 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) mar
return txsMissingInChain, nil
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fetchAndSaveFinalizedReceipts(
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markFinalizedWithRPCReceipts(
ctx context.Context,
txs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) (map[int64]struct{}, error) {
) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

missingTxs := make(map[int64]struct{}, len(txs))
missingTxIDs := make(map[int64]struct{}, len(txs))

attemptsBatch := make([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
0, ec.chainConfig.RPCDefaultBatchSize())
for _, tx := range txs {
missingTxs[tx.ID] = struct{}{}
missingTxIDs[tx.ID] = struct{}{}
for _, attempt := range tx.TxAttempts {
attemptsBatch = append(attemptsBatch, attempt)
// save one for block fetching
if uint32(len(attemptsBatch)+1) >= ec.chainConfig.RPCDefaultBatchSize() {
err := ec.fetchAndSaveFinalizedReceiptsBatch(ctx, attemptsBatch, missingTxs)
err := ec.markFinalizedWithRPCReceiptsBatch(ctx, attemptsBatch, missingTxIDs)
if err != nil {
return nil, fmt.Errorf("failed to mark txs finalized: %w", err)
}
Expand All @@ -1086,22 +1091,29 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fet
}

if len(attemptsBatch) > 0 {
err := ec.fetchAndSaveFinalizedReceiptsBatch(ctx, attemptsBatch, missingTxs)
err := ec.markFinalizedWithRPCReceiptsBatch(ctx, attemptsBatch, missingTxIDs)
if err != nil {
return nil, fmt.Errorf("failed to mark txs finalized: %w", err)
}
}

missingTxs := make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], 0, len(missingTxIDs))
for _, tx := range txs {
if _, ok := missingTxIDs[tx.ID]; ok {
missingTxs = append(missingTxs, tx)
}
}

return missingTxs, nil
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fetchAndSaveFinalizedReceiptsBatch(
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markFinalizedWithRPCReceiptsBatch(
ctx context.Context,
attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
missingTxs map[int64]struct{},
) error {

rpcFinalizedBlock, receipts, txErrs, err := ec.client.BatchGetReceiptsWithFinalizedBlock(ctx, attempts,
rpcFinalizedHeight, receipts, txErrs, err := ec.client.BatchGetReceiptsWithFinalizedHeight(ctx, attempts,
ec.chainConfig.FinalityTagEnabled(), ec.chainConfig.FinalityDepth())
if err != nil {
return err
Expand All @@ -1127,13 +1139,12 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fet
}

delete(missingTxs, attempt.TxID)
if receipt.GetBlockNumber().Cmp(rpcFinalizedBlock) >= 0 {
l.With("rpc_finalized_block", rpcFinalizedBlock).Debug("attempt is not finalized")
if receipt.GetBlockNumber().Cmp(rpcFinalizedHeight) > 0 {
l.With("rpc_finalized_height", rpcFinalizedHeight).Debug("attempt is not finalized")
continue
}

finalizedReceipts = append(finalizedReceipts, receipts[i])
missingTxs[attempts[i].TxID] = struct{}{}
}

err = ec.txStore.SaveFinalizedReceipts(ctx, finalizedReceipts, ec.chainID)
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/types/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type TxmClient[
ctx context.Context,
attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) (txReceipt []R, txErr []error, err error)
BatchGetReceiptsWithFinalizedBlock(ctx context.Context,
BatchGetReceiptsWithFinalizedHeight(ctx context.Context,
attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
useFinalityTag bool, finalityDepth uint32) (
finalizedBlock *big.Int, txReceipt []R, txErr []error, funcErr error)
Expand Down
6 changes: 3 additions & 3 deletions common/txmgr/types/mocks/txm_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 12 additions & 8 deletions core/chains/evm/txmgr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func newGetBatchReceiptsReq(attempts []TxAttempt) (reqs []rpc.BatchElem, txRecei
return reqs, txReceipts, txErrs
}

// FinalizedBlockHash - returns hash and block number of latest finalized block.
// Must not be called on chains that do not support finality tag.
func (c *evmTxmClient) FinalizedBlockHash(ctx context.Context) (common.Hash, *big.Int, error) {
head, err := c.client.FinalizedBlock(ctx)
if err != nil || head == nil {
Expand All @@ -159,12 +161,14 @@ func (c *evmTxmClient) FinalizedBlockHash(ctx context.Context) (common.Hash, *bi
return head.Hash, big.NewInt(head.BlockNumber()), nil
}

// TODO: test me
func (c *evmTxmClient) BatchGetReceiptsWithFinalizedBlock(ctx context.Context, attempts []TxAttempt, useFinalityTag bool, finalityDepth uint32) (
finalizedBlock *big.Int, txReceipt []*evmtypes.Receipt, txErr []error, funcErr error) {
// BatchGetReceiptsWithFinalizedHeight - returns most recently finalized block and receipts for the attempts.
// If finality tag is enabled - uses corresponding RPC request to get finalizaed block.
// Otherwise calculates it based on finalityDepth and latest block number.
func (c *evmTxmClient) BatchGetReceiptsWithFinalizedHeight(ctx context.Context, attempts []TxAttempt, useFinalityTag bool, finalityDepth uint32) (
finalizedBlock *big.Int, receipts []*evmtypes.Receipt, receiptErrs []error, funcErr error) {

var reqs []rpc.BatchElem
reqs, txReceipt, txErr = newGetBatchReceiptsReq(attempts)
reqs, receipts, receiptErrs = newGetBatchReceiptsReq(attempts)

blockNumber := rpc.LatestBlockNumber
if useFinalityTag {
Expand All @@ -180,22 +184,22 @@ func (c *evmTxmClient) BatchGetReceiptsWithFinalizedBlock(ctx context.Context, a
reqs = append(reqs, blockRequest)

if err := c.client.BatchCallContext(ctx, reqs); err != nil {
return nil, nil, nil, fmt.Errorf("BatchGetReceiptsWithFinalizedBlock error fetching receipts with BatchCallContext: %w", err)
return nil, nil, nil, fmt.Errorf("BatchGetReceiptsWithFinalizedHeight error fetching receipts with BatchCallContext: %w", err)
}

if blockRequest.Error != nil {
return nil, nil, nil, fmt.Errorf("failed to fetch finalized block with BatchCallContext: %w", blockRequest.Error)
}

for i := range txErr {
txErr[i] = reqs[i].Error
for i := range receiptErrs {
receiptErrs[i] = reqs[i].Error
}

finalizedBlock = big.NewInt(head.BlockNumber() - int64(finalityDepth))
if useFinalityTag {
finalizedBlock = big.NewInt(head.BlockNumber())
}
return finalizedBlock, txReceipt, txErr, nil
return finalizedBlock, receipts, receiptErrs, nil
}

// sendEmptyTransaction sends a transaction with 0 Eth and an empty payload to the burn address
Expand Down
89 changes: 89 additions & 0 deletions core/chains/evm/txmgr/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package txmgr_test

import (
"math/big"
"testing"

"github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
)

func TestClient_BatchGetReceiptsWithFinalizedHeight(t *testing.T) {
client := evmtest.NewEthClientMockWithDefaultChain(t)
txmClient := txmgr.NewEvmTxmClient(client)
testCases := []struct {
Name string
// inputs
UseFinalityTag bool
FinalityDepth uint32

// RPC response
RPCErr error
BlockError error
RPCHead evmtypes.Head

// Call Results
ExpectedBlock *big.Int
ExpectedErr error
}{
{
Name: "returns error if call fails",
UseFinalityTag: true,
FinalityDepth: 10,

RPCErr: errors.New("failed to call RPC"),

ExpectedErr: errors.New("failed to call RPC"),
},
{
Name: "returns error if fail to fetch block",

BlockError: errors.New("failed to get bock"),

ExpectedErr: errors.New("failed to get block"),
},
{
Name: "Returns block as is, if we are using finality tag",
UseFinalityTag: true,
FinalityDepth: 10,

ExpectedBlock: big.NewInt(100),

RPCHead: evmtypes.Head{Number: 100},
},
{
Name: "Subtracts finality depth if finality tag is disabled",
UseFinalityTag: false,
FinalityDepth: 10,

ExpectedBlock: big.NewInt(90),

RPCHead: evmtypes.Head{Number: 100},
},
}

for _, testCase := range testCases {
client.On("BatchCallContext", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
reqs := args.Get(1).([]rpc.BatchElem)
blockReq := reqs[len(reqs)-1]
blockReq.Error = testCase.BlockError
reqHead := blockReq.Result.(*evmtypes.Head)
*reqHead = testCase.RPCHead
}).Return(testCase.ExpectedErr).Once()
block, _, _, err := txmClient.BatchGetReceiptsWithFinalizedHeight(testutils.Context(t), nil, testCase.UseFinalityTag, testCase.FinalityDepth)
if testCase.ExpectedErr != nil {
assert.Error(t, testCase.ExpectedErr, err)
} else {
assert.NoError(t, err)
}

assert.Equal(t, testCase.ExpectedBlock, block)
}
}
Loading

0 comments on commit ffc8eb7

Please sign in to comment.