Skip to content

Commit

Permalink
Merge branch 'jtw/step-3-04' into jtw/step-3-04-update-tx-fatal-error
Browse files Browse the repository at this point in the history
  • Loading branch information
poopoothegorilla committed Apr 1, 2024
2 parents f9bd7e6 + d4684a2 commit b03e6fb
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 17 deletions.
84 changes: 67 additions & 17 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,50 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTx
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) {

Check failure on line 247 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).pruneUnstartedTxQueue is unused (unused)
}

// reapConfirmedTxs removes confirmed transactions that are older than the given time threshold and have receipts older than the given block number threshold.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) reapConfirmedTxs(minBlockNumberToKeep int64, timeThreshold time.Time) {
as.Lock()
defer as.Unlock()

for _, tx := range as.confirmedTxs {
if len(tx.TxAttempts) == 0 {
continue
}
if tx.CreatedAt.After(timeThreshold) {
continue
}

for i := 0; i < len(tx.TxAttempts); i++ {
if len(tx.TxAttempts[i].Receipts) == 0 {
continue
}
if tx.TxAttempts[i].Receipts[0].GetBlockNumber() == nil || tx.TxAttempts[i].Receipts[0].GetBlockNumber().Int64() >= minBlockNumberToKeep {
continue
}
as._deleteTx(tx.ID)
}
}
}

// reapFatalErroredTxs removes fatal errored transactions that are older than the given time threshold.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) reapFatalErroredTxs(timeThreshold time.Time) {
as.Lock()
defer as.Unlock()

for _, tx := range as.fatalErroredTxs {
if tx.CreatedAt.After(timeThreshold) {
continue
}
as._deleteTx(tx.ID)
}
}

// deleteTxs removes the transactions with the given IDs from the address state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txIDs ...int64) {

Check failure on line 289 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).deleteTxs is unused (unused)
as.Lock()
defer as.Unlock()

as._deleteTxs(txs...)
as._deleteTxs(txIDs...)
}

// deleteTxAttempts removes the attempts for the given transaction from the address state.
Expand Down Expand Up @@ -487,22 +525,34 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _delet
tx.TxAttempts = nil
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
for _, tx := range txs {
if tx.IdempotencyKey != nil {
delete(as.idempotencyKeyToTx, *tx.IdempotencyKey)
}
txID := tx.ID
if as.inprogressTx != nil && as.inprogressTx.ID == txID {
as.inprogressTx = nil
}
delete(as.allTxs, txID)
delete(as.unconfirmedTxs, txID)
delete(as.confirmedMissingReceiptTxs, txID)
delete(as.confirmedTxs, txID)
delete(as.fatalErroredTxs, txID)
as.unstartedTxs.RemoveTxByID(txID)
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txIDs ...int64) {

Check failure on line 528 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])._deleteTxs is unused (unused)
for _, txID := range txIDs {
as._deleteTx(txID)
}
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTx(txID int64) {
tx, ok := as.allTxs[txID]
if !ok {
return
}

for i := 0; i < len(tx.TxAttempts); i++ {
txAttemptHash := tx.TxAttempts[i].Hash
delete(as.attemptHashToTxAttempt, txAttemptHash)
}
if tx.IdempotencyKey != nil {
delete(as.idempotencyKeyToTx, *tx.IdempotencyKey)
}
if as.inprogressTx != nil && as.inprogressTx.ID == txID {
as.inprogressTx = nil
}
as.unstartedTxs.RemoveTxByID(txID)
delete(as.unconfirmedTxs, txID)
delete(as.confirmedMissingReceiptTxs, txID)
delete(as.confirmedTxs, txID)
delete(as.fatalErroredTxs, txID)
delete(as.allTxs, txID)
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _moveTxToFatalError(
Expand Down
17 changes: 17 additions & 0 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,23 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Prune
}

func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error {
if ms.chainID.String() != chainID.String() {
panic("invalid chain ID")
}

// Persist to persistent storage
if err := ms.persistentTxStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID); err != nil {
return err
}

// Update in memory store
ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
for _, as := range ms.addressStates {
as.reapConfirmedTxs(minBlockNumberToKeep, timeThreshold)
as.reapFatalErroredTxs(timeThreshold)
}

return nil
}
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(_ context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) {
Expand Down
142 changes: 142 additions & 0 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
Expand Down Expand Up @@ -69,6 +70,147 @@ func TestInMemoryStore_UpdateTxFatalError(t *testing.T) {
})
}

func TestInMemoryStore_ReapTxHistory(t *testing.T) {
t.Parallel()

t.Run("reap all confirmed txs", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx_0 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, persistentStore, 7, 1, fromAddress)
r_0 := mustInsertEthReceipt(t, persistentStore, 1, utils.NewHash(), inTx_0.TxAttempts[0].Hash)
inTx_0.TxAttempts[0].Receipts = append(inTx_0.TxAttempts[0].Receipts, evmtxmgr.DbReceiptToEvmReceipt(&r_0))
inTx_1 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, persistentStore, 8, 2, fromAddress)
r_1 := mustInsertEthReceipt(t, persistentStore, 2, utils.NewHash(), inTx_1.TxAttempts[0].Hash)
inTx_1.TxAttempts[0].Receipts = append(inTx_1.TxAttempts[0].Receipts, evmtxmgr.DbReceiptToEvmReceipt(&r_1))
inTx_2 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, persistentStore, 9, 3, fromAddress)
r_2 := mustInsertEthReceipt(t, persistentStore, 3, utils.NewHash(), inTx_2.TxAttempts[0].Hash)
inTx_2.TxAttempts[0].Receipts = append(inTx_2.TxAttempts[0].Receipts, evmtxmgr.DbReceiptToEvmReceipt(&r_2))
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_0))
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_1))
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_2))

minBlockNumberToKeep := int64(3)
timeThreshold := inTx_2.CreatedAt
expErr := persistentStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID)
actErr := inMemoryStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID)
require.NoError(t, expErr)
require.NoError(t, actErr)

fn := func(tx *evmtxmgr.Tx) bool { return true }
// Check that the transactions were reaped in persistent store
expTx_0, err := persistentStore.FindTxWithAttempts(ctx, inTx_0.ID)
require.Error(t, err)
require.Equal(t, int64(0), expTx_0.ID)
expTx_1, err := persistentStore.FindTxWithAttempts(ctx, inTx_1.ID)
require.Error(t, err)
require.Equal(t, int64(0), expTx_1.ID)
// Check that the transactions were reaped in in-memory store
actTxs_0 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_0.ID)
require.Equal(t, 0, len(actTxs_0))
actTxs_1 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_1.ID)
require.Equal(t, 0, len(actTxs_1))

// Check that the transaction was not reaped
expTx_2, err := persistentStore.FindTxWithAttempts(ctx, inTx_2.ID)
require.NoError(t, err)
require.Equal(t, inTx_2.ID, expTx_2.ID)
actTxs_2 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_2.ID)
require.Equal(t, 1, len(actTxs_2))
assertTxEqual(t, expTx_2, actTxs_2[0])
})
t.Run("reap all fatal error txs", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx_0 := cltest.NewEthTx(fromAddress)
inTx_0.Error = null.StringFrom("something exploded")
inTx_0.State = commontxmgr.TxFatalError
inTx_0.CreatedAt = time.Unix(1000, 0)
require.NoError(t, persistentStore.InsertTx(ctx, &inTx_0))
inTx_1 := cltest.NewEthTx(fromAddress)
inTx_1.Error = null.StringFrom("something exploded")
inTx_1.State = commontxmgr.TxFatalError
inTx_1.CreatedAt = time.Unix(2000, 0)
require.NoError(t, persistentStore.InsertTx(ctx, &inTx_1))
inTx_2 := cltest.NewEthTx(fromAddress)
inTx_2.Error = null.StringFrom("something exploded")
inTx_2.State = commontxmgr.TxFatalError
inTx_2.CreatedAt = time.Unix(3000, 0)
require.NoError(t, persistentStore.InsertTx(ctx, &inTx_2))
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_0))
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_1))
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_2))

minBlockNumberToKeep := int64(3)
timeThreshold := time.Unix(2500, 0) // Only reap txs created before this time
expErr := persistentStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID)
actErr := inMemoryStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID)
require.NoError(t, expErr)
require.NoError(t, actErr)

fn := func(tx *evmtxmgr.Tx) bool { return true }
// Check that the transactions were reaped in persistent store
expTx_0, err := persistentStore.FindTxWithAttempts(ctx, inTx_0.ID)
require.Error(t, err)
require.Equal(t, int64(0), expTx_0.ID)
expTx_1, err := persistentStore.FindTxWithAttempts(ctx, inTx_1.ID)
require.Error(t, err)
require.Equal(t, int64(0), expTx_1.ID)
// Check that the transactions were reaped in in-memory store
actTxs_0 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_0.ID)
require.Equal(t, 0, len(actTxs_0))
actTxs_1 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_1.ID)
require.Equal(t, 0, len(actTxs_1))

// Check that the transaction was not reaped
expTx_2, err := persistentStore.FindTxWithAttempts(ctx, inTx_2.ID)
require.NoError(t, err)
require.Equal(t, inTx_2.ID, expTx_2.ID)
actTxs_2 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_2.ID)
require.Equal(t, 1, len(actTxs_2))
assertTxEqual(t, expTx_2, actTxs_2[0])
})
}

func TestInMemoryStore_MarkOldTxesMissingReceiptAsErrored(t *testing.T) {
t.Parallel()
blockNum := int64(10)
Expand Down

0 comments on commit b03e6fb

Please sign in to comment.