Skip to content

Commit

Permalink
Merge pull request #12233 from smartcontractkit/jtw/step-3-04-mark-ol…
Browse files Browse the repository at this point in the history
…d-txs-missing-receipt-as-errored

TXM In-memory: step 3-04-MarkOldTxesMissingReceiptAsErrored
  • Loading branch information
poopoothegorilla authored Mar 22, 2024
2 parents 9d59e02 + ae2d267 commit 991a5f1
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 4 deletions.
10 changes: 8 additions & 2 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,16 +314,22 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTx
if tx == nil {
return fmt.Errorf("move_tx_to_fatal_error: no transaction with ID %d", txID)
}
originalState := tx.State

// Move the transaction to the fatal error state
as._moveTxToFatalError(tx, txError)

switch tx.State {
// Remove the transaction from its original state
switch originalState {
case TxUnstarted:
_ = as.unstartedTxs.RemoveTxByID(txID)
case TxInProgress:
as.inprogressTx = nil
case TxConfirmedMissingReceipt:
delete(as.confirmedMissingReceiptTxs, tx.ID)
case TxFatalError:
// Already in fatal error state
return nil
default:
panic(fmt.Sprintf("unknown transaction state: %q", tx.State))
}
Expand Down Expand Up @@ -482,7 +488,7 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _moveT
) {
tx.State = TxFatalError
tx.Sequence = nil
tx.TxAttempts = nil
tx.BroadcastAt = nil
tx.InitialBroadcastAt = nil
tx.Error = txError
as.fatalErroredTxs[tx.ID] = tx
Expand Down
83 changes: 82 additions & 1 deletion common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,88 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkA
return errs
}
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error {
return nil
if ms.chainID.String() != chainID.String() {
panic(fmt.Sprintf(ErrInvalidChainID.Error()+": %s", chainID.String()))
}

// Persist to persistent storage
if err := ms.persistentTxStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, finalityDepth, chainID); err != nil {
return err
}

// Update in memory store
type result struct {
ID int64
Sequence SEQ
FromAddress ADDR
MaxBroadcastBeforeBlockNum int64
TxHashes []TX_HASH
}
var results []result
cutoff := blockNum - int64(finalityDepth)
if cutoff <= 0 {
return nil
}
filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool {
if len(tx.TxAttempts) == 0 {
return false
}
var maxBroadcastBeforeBlockNum int64
for i := 0; i < len(tx.TxAttempts); i++ {
txAttempt := tx.TxAttempts[i]
if txAttempt.BroadcastBeforeBlockNum == nil {
continue
}
if *txAttempt.BroadcastBeforeBlockNum > maxBroadcastBeforeBlockNum {
maxBroadcastBeforeBlockNum = *txAttempt.BroadcastBeforeBlockNum
}
}
return maxBroadcastBeforeBlockNum < cutoff
}
var errs error
ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
for _, as := range ms.addressStates {
states := []txmgrtypes.TxState{TxConfirmedMissingReceipt}
txs := as.findTxs(states, filter)
for _, tx := range txs {
if err := as.moveTxToFatalError(tx.ID, null.StringFrom(ErrCouldNotGetReceipt.Error())); err != nil {
err = fmt.Errorf("mark_old_txes_missing_receipt_as_errored: address: %s: %w", as.fromAddress, err)
errs = errors.Join(errs, err)
continue
}
hashes := make([]TX_HASH, len(tx.TxAttempts))
maxBroadcastBeforeBlockNum := int64(0)
for i, attempt := range tx.TxAttempts {
hashes[i] = attempt.Hash
if attempt.BroadcastBeforeBlockNum != nil {
if *attempt.BroadcastBeforeBlockNum > maxBroadcastBeforeBlockNum {
maxBroadcastBeforeBlockNum = *attempt.BroadcastBeforeBlockNum
}
}
}
rr := result{
ID: tx.ID,
Sequence: *tx.Sequence,
FromAddress: tx.FromAddress,
MaxBroadcastBeforeBlockNum: maxBroadcastBeforeBlockNum,
TxHashes: hashes,
}
results = append(results, rr)
}
}

for _, r := range results {
ms.lggr.Criticalw(fmt.Sprintf("eth_tx with ID %v expired without ever getting a receipt for any of our attempts. "+
"Current block height is %v, transaction was broadcast before block height %v. This transaction may not have not been sent and will be marked as fatally errored. "+
"This can happen if there is another instance of chainlink running that is using the same private key, or if "+
"an external wallet has been used to send a transaction from account %s with nonce %v."+
" Please note that Chainlink requires exclusive ownership of it's private keys and sharing keys across multiple"+
" chainlink instances, or using the chainlink keys with an external wallet is NOT SUPPORTED and WILL lead to missed transactions",
r.ID, blockNum, r.MaxBroadcastBeforeBlockNum, r.FromAddress, r.Sequence), "ethTxID", r.ID, "sequence", r.Sequence, "fromAddress", r.FromAddress, "txHashes", r.TxHashes)
}

return errs
}

func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTx(
Expand Down
58 changes: 57 additions & 1 deletion core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package txmgr_test
import (
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
Expand All @@ -21,6 +22,54 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_MarkOldTxesMissingReceiptAsErrored(t *testing.T) {
t.Parallel()
blockNum := int64(10)
finalityDepth := uint32(2)

t.Run("successfully mark errored transaction", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, persistentStore, 1, 7, time.Now(), fromAddress)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

err = inMemoryStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, finalityDepth, chainID)
require.NoError(t, err)

expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID)
require.NoError(t, err)

fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]

assertTxEqual(t, expTx, actTx)
assert.Equal(t, txmgrtypes.TxAttemptBroadcast, actTx.TxAttempts[0].State)
assert.Equal(t, commontxmgr.TxFatalError, actTx.State)
})
}

func TestInMemoryStore_UpdateTxForRebroadcast(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -56,11 +105,13 @@ func TestInMemoryStore_UpdateTxForRebroadcast(t *testing.T) {

expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID)
require.NoError(t, err)
require.Len(t, expTx.TxAttempts, 1)

fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]

assertTxEqual(t, expTx, actTx)
assert.Equal(t, commontxmgr.TxUnconfirmed, actTx.State)
assert.Equal(t, txmgrtypes.TxAttemptInProgress, actTx.TxAttempts[0].State)
Expand Down Expand Up @@ -193,7 +244,12 @@ func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) {
assert.Equal(t, exp.Value, act.Value)
assert.Equal(t, exp.FeeLimit, act.FeeLimit)
assert.Equal(t, exp.Error, act.Error)
assert.Equal(t, exp.BroadcastAt, act.BroadcastAt)
if exp.BroadcastAt != nil {
require.NotNil(t, act.BroadcastAt)
assert.Equal(t, exp.BroadcastAt.Unix(), act.BroadcastAt.Unix())
} else {
assert.Equal(t, exp.BroadcastAt, act.BroadcastAt)
}
assert.Equal(t, exp.InitialBroadcastAt, act.InitialBroadcastAt)
assert.Equal(t, exp.CreatedAt, act.CreatedAt)
assert.Equal(t, exp.State, act.State)
Expand Down

0 comments on commit 991a5f1

Please sign in to comment.