Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TXM In-memory: step 3-04-UpdateTxForRebroadcast #12231

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@
}

// countTransactionsByState returns the number of transactions that are in the given state
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) countTransactionsByState(txState txmgrtypes.TxState) int {

Check failure on line 132 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).countTransactionsByState is unused (unused)
return 0
}

// findTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {

Check failure on line 137 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxWithIdempotencyKey is unused (unused)
return nil
}

Expand All @@ -143,7 +143,7 @@
// If no txIDs are provided, all transactions in the given states are considered.
// If no txStates are provided, all transactions are considered.
// Any transaction states that are unknown will cause a panic.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxsByState(

Check failure on line 146 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).applyToTxsByState is unused (unused)
txStates []txmgrtypes.TxState,
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
Expand Down Expand Up @@ -189,7 +189,7 @@
// If no txIDs are provided, all transactions are considered.
// If no txStates are provided, all transactions are considered.
// The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxAttempts(

Check failure on line 192 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxAttempts is unused (unused)
txStates []txmgrtypes.TxState,
txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
Expand Down Expand Up @@ -244,11 +244,11 @@
}

// pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) {

Check failure on line 247 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).pruneUnstartedTxQueue is unused (unused)
}

// deleteTxs removes the transactions with the given IDs from the address state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {

Check failure on line 251 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).deleteTxs is unused (unused)
as.Lock()
defer as.Unlock()

Expand All @@ -256,12 +256,12 @@
}

// peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 259 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekNextUnstartedTx is unused (unused)
return nil, nil
}

// peekInProgressTx returns the in-progress transaction without removing it from the in-progress state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 264 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekInProgressTx is unused (unused)
return nil, nil
}

Expand Down Expand Up @@ -381,24 +381,27 @@
defer as.Unlock()

if txAttempt.State != txmgrtypes.TxAttemptBroadcast {
return fmt.Errorf("move_confirmed_to_unconfirmed: attempt must be in broadcast state")
return fmt.Errorf("attempt must be in broadcast state")
}

tx, ok := as.confirmedTxs[txAttempt.TxID]
if !ok || tx == nil {
return fmt.Errorf("move_confirmed_to_unconfirmed: no confirmed transaction with ID %d", txAttempt.TxID)
return fmt.Errorf("no confirmed transaction with ID %d", txAttempt.TxID)
}
if len(tx.TxAttempts) == 0 {
return fmt.Errorf("move_confirmed_to_unconfirmed: no attempts for transaction with ID %d", txAttempt.TxID)
return fmt.Errorf("no attempts for transaction with ID %d", txAttempt.TxID)
}
tx.State = TxUnconfirmed

// Delete the receipt from the attempt
txAttempt.Receipts = nil
// Reset the broadcast information for the attempt
txAttempt.State = txmgrtypes.TxAttemptInProgress
txAttempt.BroadcastBeforeBlockNum = nil
tx.TxAttempts = append(tx.TxAttempts, txAttempt)
for i := 0; i < len(tx.TxAttempts); i++ {
if tx.TxAttempts[i].ID == txAttempt.ID {
tx.TxAttempts[i].Receipts = nil
tx.TxAttempts[i].State = txmgrtypes.TxAttemptInProgress
tx.TxAttempts[i].BroadcastBeforeBlockNum = nil
break
}
}

as.unconfirmedTxs[tx.ID] = tx
delete(as.confirmedTxs, tx.ID)
Expand All @@ -406,7 +409,7 @@
return nil
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _applyToTxs(

Check failure on line 412 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])._applyToTxs is unused (unused)
txIDsToTx map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
Expand Down Expand Up @@ -455,7 +458,7 @@
return txs
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {

Check failure on line 461 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])._deleteTxs is unused (unused)
for _, tx := range txs {
if tx.IdempotencyKey != nil {
delete(as.idempotencyKeyToTx, *tx.IdempotencyKey)
Expand Down
19 changes: 17 additions & 2 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/big"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -47,7 +48,8 @@ type inMemoryStore[
keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]

addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
addressStatesLock sync.RWMutex
addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
}

// NewInMemoryStore returns a new inMemoryStore
Expand Down Expand Up @@ -331,7 +333,20 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveS
return nil
}
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxForRebroadcast(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
return nil
ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
as, ok := ms.addressStates[etx.FromAddress]
if !ok {
return nil
}

// Persist to persistent storage
if err := ms.persistentTxStore.UpdateTxForRebroadcast(ctx, etx, etxAttempt); err != nil {
return err
}

// Update in memory store
return as.moveConfirmedToUnconfirmed(etxAttempt)
}
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (bool, error) {
return false, nil
Expand Down
117 changes: 116 additions & 1 deletion core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,129 @@
package txmgr_test

import (
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"

evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_UpdateTxForRebroadcast(t *testing.T) {
t.Parallel()

t.Run("delete all receipts for transaction", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertConfirmedEthTxWithReceipt(t, persistentStore, fromAddress, 777, 1)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

txAttempt := inTx.TxAttempts[0]
err = inMemoryStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt)
require.NoError(t, err)

expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID)
require.NoError(t, err)

fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]
assertTxEqual(t, expTx, actTx)
assert.Equal(t, commontxmgr.TxUnconfirmed, actTx.State)
assert.Equal(t, txmgrtypes.TxAttemptInProgress, actTx.TxAttempts[0].State)
assert.Nil(t, actTx.TxAttempts[0].BroadcastBeforeBlockNum)
assert.Equal(t, 0, len(actTx.TxAttempts[0].Receipts))
})

t.Run("error parity for in-memory vs persistent store", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertConfirmedEthTxWithReceipt(t, persistentStore, fromAddress, 777, 1)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

txAttempt := inTx.TxAttempts[0]

t.Run("error when attempt is not in Broadcast state", func(t *testing.T) {
txAttempt.State = txmgrtypes.TxAttemptInProgress
expErr := persistentStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt)
actErr := inMemoryStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt)
assert.Error(t, expErr)
assert.Error(t, actErr)
txAttempt.State = txmgrtypes.TxAttemptBroadcast
})
t.Run("error when transaction is not in confirmed state", func(t *testing.T) {
inTx.State = commontxmgr.TxUnconfirmed
expErr := persistentStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt)
actErr := inMemoryStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt)
assert.Error(t, expErr)
assert.Error(t, actErr)
inTx.State = commontxmgr.TxConfirmed
})
t.Run("wrong fromAddress has no error", func(t *testing.T) {
inTx.FromAddress = common.Address{}
expErr := persistentStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt)
actErr := inMemoryStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt)
assert.Equal(t, expErr, actErr)
assert.Nil(t, actErr)
inTx.FromAddress = fromAddress
})

})
}

// assertTxEqual asserts that two transactions are equal
func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) {
assert.Equal(t, exp.ID, act.ID)
Expand All @@ -33,7 +148,7 @@ func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) {
assert.Equal(t, exp.SignalCallback, act.SignalCallback)
assert.Equal(t, exp.CallbackCompleted, act.CallbackCompleted)

require.Len(t, exp.TxAttempts, len(act.TxAttempts))
require.Equal(t, len(exp.TxAttempts), len(act.TxAttempts))
for i := 0; i < len(exp.TxAttempts); i++ {
assertTxAttemptEqual(t, exp.TxAttempts[i], act.TxAttempts[i])
}
Expand Down
Loading