Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TXM In-memory: step 3-04-MarkOldTxesMissingReceiptAsErrored #12233

12 changes: 9 additions & 3 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@
}

// countTransactionsByState returns the number of transactions that are in the given state
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) countTransactionsByState(txState txmgrtypes.TxState) int {

Check failure on line 132 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).countTransactionsByState is unused (unused)
return 0
}

// findTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {

Check failure on line 137 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxWithIdempotencyKey is unused (unused)
return nil
}

Expand All @@ -144,7 +144,7 @@
// If no txStates are provided, all transactions are considered.
// This method does not handle transactions in the UnstartedTx state.
// Any transaction states that are unknown will cause a panic including UnstartedTx.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxsByState(

Check failure on line 147 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).applyToTxsByState is unused (unused)
txStates []txmgrtypes.TxState,
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
Expand Down Expand Up @@ -183,7 +183,7 @@
// If no txIDs are provided, all transactions are considered.
// If no txStates are provided, all transactions are considered.
// The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxAttempts(

Check failure on line 186 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxAttempts is unused (unused)
txStates []txmgrtypes.TxState,
txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
Expand Down Expand Up @@ -233,11 +233,11 @@
}

// pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) {

Check failure on line 236 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).pruneUnstartedTxQueue is unused (unused)
}

// deleteTxs removes the transactions with the given IDs from the address state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {

Check failure on line 240 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).deleteTxs is unused (unused)
as.Lock()
defer as.Unlock()

Expand All @@ -245,12 +245,12 @@
}

// peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 248 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekNextUnstartedTx is unused (unused)
return nil, nil
}

// peekInProgressTx returns the in-progress transaction without removing it from the in-progress state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 253 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekInProgressTx is unused (unused)
return nil, nil
}

Expand Down Expand Up @@ -303,18 +303,24 @@
if tx == nil {
return fmt.Errorf("move_tx_to_fatal_error: no transaction with ID %d", txID)
}
originalState := tx.State

// Move the transaction to the fatal error state
as._moveTxToFatalError(tx, txError)

switch tx.State {
// Remove the transaction from its original state
switch originalState {
case TxUnstarted:
_ = as.unstartedTxs.RemoveTxByID(txID)
case TxInProgress:
as.inprogressTx = nil
case TxConfirmedMissingReceipt:
delete(as.confirmedMissingReceiptTxs, tx.ID)
case TxFatalError:
// Already in fatal error state
return nil
default:
panic("move_tx_to_fatal_error: unknown transaction state")
panic(fmt.Sprintf("move_tx_to_fatal_error: unknown transaction state: %q", tx.State))
}

return nil
Expand Down Expand Up @@ -403,7 +409,7 @@
return nil
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _applyToTxs(

Check failure on line 412 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])._applyToTxs is unused (unused)
txIDsToTx map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
Expand Down Expand Up @@ -452,7 +458,7 @@
return txs
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {

Check failure on line 461 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])._deleteTxs is unused (unused)
for _, tx := range txs {
if tx.IdempotencyKey != nil {
delete(as.idempotencyKeyToTx, *tx.IdempotencyKey)
Expand All @@ -476,7 +482,7 @@
) {
tx.State = TxFatalError
tx.Sequence = nil
tx.TxAttempts = nil
tx.BroadcastAt = nil
tx.InitialBroadcastAt = nil
tx.Error = txError
as.fatalErroredTxs[tx.ID] = tx
Expand Down
87 changes: 85 additions & 2 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/big"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -46,7 +47,8 @@ type inMemoryStore[
keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]

addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
addressStatesLock sync.RWMutex
addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
}

// NewInMemoryStore returns a new inMemoryStore
Expand Down Expand Up @@ -337,7 +339,88 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkA
return nil
}
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error {
return nil
if ms.chainID.String() != chainID.String() {
return nil
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
}

// Persist to persistent storage
if err := ms.persistentTxStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, finalityDepth, chainID); err != nil {
return err
}

// Update in memory store
type result struct {
ID int64
Sequence SEQ
FromAddress ADDR
MaxBroadcastBeforeBlockNum int64
TxHashes []TX_HASH
}
var results []result
cutoff := blockNum - int64(finalityDepth)
if cutoff <= 0 {
return nil
}
filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool {
if len(tx.TxAttempts) == 0 {
return false
}
var maxBroadcastBeforeBlockNum int64
for i := 0; i < len(tx.TxAttempts); i++ {
txAttempt := tx.TxAttempts[i]
if txAttempt.BroadcastBeforeBlockNum == nil {
continue
}
if *txAttempt.BroadcastBeforeBlockNum > maxBroadcastBeforeBlockNum {
maxBroadcastBeforeBlockNum = *txAttempt.BroadcastBeforeBlockNum
}
}
return maxBroadcastBeforeBlockNum < cutoff
}
var errs error
ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
for _, as := range ms.addressStates {
states := []txmgrtypes.TxState{TxConfirmedMissingReceipt}
txs := as.findTxs(states, filter)
for _, tx := range txs {
if err := as.moveTxToFatalError(tx.ID, null.StringFrom(ErrCouldNotGetReceipt.Error())); err != nil {
err = fmt.Errorf("mark_old_txes_missing_receipt_as_errored: address: %s: %w", as.fromAddress, err)
amit-momin marked this conversation as resolved.
Show resolved Hide resolved
errs = errors.Join(errs, err)
continue
}
hashes := make([]TX_HASH, len(tx.TxAttempts))
maxBroadcastBeforeBlockNum := int64(0)
for i, attempt := range tx.TxAttempts {
hashes[i] = attempt.Hash
if attempt.BroadcastBeforeBlockNum != nil {
if *attempt.BroadcastBeforeBlockNum > maxBroadcastBeforeBlockNum {
maxBroadcastBeforeBlockNum = *attempt.BroadcastBeforeBlockNum
}
}
}
rr := result{
ID: tx.ID,
Sequence: *tx.Sequence,
FromAddress: tx.FromAddress,
MaxBroadcastBeforeBlockNum: maxBroadcastBeforeBlockNum,
TxHashes: hashes,
}
results = append(results, rr)
}
}

for _, r := range results {
ms.lggr.Criticalw(fmt.Sprintf("eth_tx with ID %v expired without ever getting a receipt for any of our attempts. "+
"Current block height is %v, transaction was broadcast before block height %v. This transaction may not have not been sent and will be marked as fatally errored. "+
"This can happen if there is another instance of chainlink running that is using the same private key, or if "+
"an external wallet has been used to send a transaction from account %s with nonce %v."+
" Please note that Chainlink requires exclusive ownership of it's private keys and sharing keys across multiple"+
" chainlink instances, or using the chainlink keys with an external wallet is NOT SUPPORTED and WILL lead to missed transactions",
r.ID, blockNum, r.MaxBroadcastBeforeBlockNum, r.FromAddress, r.Sequence), "ethTxID", r.ID, "sequence", r.Sequence, "fromAddress", r.FromAddress, "txHashes", r.TxHashes)
}

return errs
}

func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTx(
Expand Down
106 changes: 104 additions & 2 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,111 @@
package txmgr_test

import (
"context"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"

evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_MarkOldTxesMissingReceiptAsErrored(t *testing.T) {
t.Parallel()
blockNum := int64(10)
finalityDepth := uint32(2)

t.Run("successfully mark errored transaction", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db, dbcfg)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := context.Background()

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, persistentStore, 1, 7, time.Now(), fromAddress)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

err = inMemoryStore.MarkOldTxesMissingReceiptAsErrored(testutils.Context(t), blockNum, finalityDepth, chainID)
require.NoError(t, err)

expTx, err := persistentStore.FindTxWithAttempts(inTx.ID)
require.NoError(t, err)
require.Len(t, expTx.TxAttempts, 1)

fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]

assertTxEqual(t, expTx, actTx)
assert.Equal(t, txmgrtypes.TxAttemptBroadcast, actTx.TxAttempts[0].State)
assert.Equal(t, commontxmgr.TxFatalError, actTx.State)
})

t.Run("error parity for in-memory vs persistent store", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db, dbcfg)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := context.Background()

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, persistentStore, 1, 7, time.Now(), fromAddress)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

t.Run("error when old attempt is not in progress", func(t *testing.T) {
wrongChainID := big.NewInt(123)
expErr := persistentStore.MarkOldTxesMissingReceiptAsErrored(testutils.Context(t), blockNum, finalityDepth, wrongChainID)
actErr := inMemoryStore.MarkOldTxesMissingReceiptAsErrored(testutils.Context(t), blockNum, finalityDepth, wrongChainID)
assert.Equal(t, expErr, actErr)
})
})
}

// assertTxEqual asserts that two transactions are equal
func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) {
assert.Equal(t, exp.ID, act.ID)
Expand All @@ -20,7 +117,12 @@ func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) {
assert.Equal(t, exp.Value, act.Value)
assert.Equal(t, exp.FeeLimit, act.FeeLimit)
assert.Equal(t, exp.Error, act.Error)
assert.Equal(t, exp.BroadcastAt, act.BroadcastAt)
if exp.BroadcastAt != nil {
require.NotNil(t, act.BroadcastAt)
assert.Equal(t, exp.BroadcastAt.Unix(), act.BroadcastAt.Unix())
} else {
assert.Equal(t, exp.BroadcastAt, act.BroadcastAt)
}
assert.Equal(t, exp.InitialBroadcastAt, act.InitialBroadcastAt)
assert.Equal(t, exp.CreatedAt, act.CreatedAt)
assert.Equal(t, exp.State, act.State)
Expand All @@ -33,7 +135,7 @@ func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) {
assert.Equal(t, exp.SignalCallback, act.SignalCallback)
assert.Equal(t, exp.CallbackCompleted, act.CallbackCompleted)

require.Len(t, exp.TxAttempts, len(act.TxAttempts))
require.Equal(t, len(exp.TxAttempts), len(act.TxAttempts))
for i := 0; i < len(exp.TxAttempts); i++ {
assertTxAttemptEqual(t, exp.TxAttempts[i], act.TxAttempts[i])
}
Expand Down
Loading