Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TXM In-memory: step 3-03-UpdateTxUnstartedToInProgress #12217

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@
}

// countTransactionsByState returns the number of transactions that are in the given state
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) countTransactionsByState(txState txmgrtypes.TxState) int {

Check failure on line 132 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).countTransactionsByState is unused (unused)
return 0
}

// findTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {

Check failure on line 137 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxWithIdempotencyKey is unused (unused)
return nil
}

Expand All @@ -142,7 +142,7 @@
// If txIDs are provided, only the transactions with those IDs are considered.
// If no txIDs are provided, all transactions in the given states are considered.
// If no txStates are provided, all transactions are considered.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxsByState(

Check failure on line 145 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).applyToTxsByState is unused (unused)
txStates []txmgrtypes.TxState,
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
Expand All @@ -154,7 +154,7 @@
// If no txIDs are provided, all transactions are considered.
// If no txStates are provided, all transactions are considered.
// The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxAttempts(

Check failure on line 157 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxAttempts is unused (unused)
txStates []txmgrtypes.TxState,
txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
Expand Down Expand Up @@ -212,25 +212,25 @@
}

// pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) {

Check failure on line 215 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).pruneUnstartedTxQueue is unused (unused)
}

// deleteTxs removes the transactions with the given IDs from the address state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {

Check failure on line 219 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).deleteTxs is unused (unused)
}

// peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 223 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekNextUnstartedTx is unused (unused)
return nil, nil
}

// peekInProgressTx returns the in-progress transaction without removing it from the in-progress state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 228 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekInProgressTx is unused (unused)
return nil, nil
}

// addTxToUnstarted adds the given transaction to the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {

Check failure on line 233 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).addTxToUnstarted is unused (unused)
return nil
}

Expand All @@ -239,7 +239,7 @@
// It returns an error if there is already a transaction in progress.
// It returns an error if there is no unstarted transaction to move to in_progress.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnstartedToInProgress(
txID int64, seq SEQ, broadcastAt time.Time, initialBroadcastAt time.Time,
txID int64, seq *SEQ, broadcastAt *time.Time, initialBroadcastAt *time.Time,
txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
as.Lock()
Expand All @@ -255,9 +255,9 @@
}
tx.State = TxInProgress
tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{txAttempt}
tx.Sequence = &seq
tx.BroadcastAt = &broadcastAt
tx.InitialBroadcastAt = &initialBroadcastAt
tx.Sequence = seq
tx.BroadcastAt = broadcastAt
tx.InitialBroadcastAt = initialBroadcastAt

as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt
as.inprogressTx = tx
Expand All @@ -267,7 +267,7 @@

// moveConfirmedMissingReceiptToUnconfirmed moves the confirmed missing receipt transaction to the unconfirmed state.
// It returns an error if there is no confirmed missing receipt transaction with the given ID.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedMissingReceiptToUnconfirmed(

Check failure on line 270 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).moveConfirmedMissingReceiptToUnconfirmed is unused (unused)
txID int64,
) error {
as.Lock()
Expand Down
33 changes: 32 additions & 1 deletion common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/big"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -46,7 +47,8 @@ type inMemoryStore[
keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]

addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
addressStatesLock sync.RWMutex
addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
}

// NewInMemoryStore returns a new inMemoryStore
Expand Down Expand Up @@ -141,6 +143,35 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat
tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
if tx.Sequence == nil {
return fmt.Errorf("in_progress transaction must have nonce")
}
if tx.State != TxUnstarted {
return fmt.Errorf("update_tx_unstarted_to_in_progress: can only transition to in_progress from unstarted, transaction is currently %s", tx.State)
}
if attempt.State != txmgrtypes.TxAttemptInProgress {
return fmt.Errorf("attempt state must be in_progress")
}

ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
fmt.Println("ms.addressStates", tx.FromAddress)
amit-momin marked this conversation as resolved.
Show resolved Hide resolved
as, ok := ms.addressStates[tx.FromAddress]
if !ok {
return nil
}
fmt.Println("2ms.addressStates", tx.FromAddress)

// Persist to persistent storage
if err := ms.persistentTxStore.UpdateTxUnstartedToInProgress(ctx, tx, attempt); err != nil {
return fmt.Errorf("update_tx_unstarted_to_in_progress: %w", err)
}

// Update in address state in memory
if err := as.moveUnstartedToInProgress(tx.ID, tx.Sequence, tx.BroadcastAt, tx.InitialBroadcastAt, *attempt); err != nil {
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("update_tx_unstarted_to_in_progress: %w", err)
}

return nil
}

Expand Down
140 changes: 140 additions & 0 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,154 @@
package txmgr_test

import (
"context"
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"

evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_UpdateTxUnstartedToInProgress(t *testing.T) {
t.Parallel()

t.Run("successfully updates unstarted tx to inprogress", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db, dbcfg)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := context.Background()
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

nonce := evmtypes.Nonce(123)
// Insert a transaction into persistent store
inTx := mustCreateUnstartedGeneratedTx(t, persistentStore, fromAddress, chainID)
inTx.Sequence = &nonce
inTxAttempt := cltest.NewLegacyEthTxAttempt(t, inTx.ID)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

// Update the transaction to in-progress
require.NoError(t, inMemoryStore.UpdateTxUnstartedToInProgress(testutils.Context(t), &inTx, &inTxAttempt))

expTx, err := persistentStore.FindTxWithAttempts(inTx.ID)
require.NoError(t, err)
assert.Equal(t, commontxmgr.TxInProgress, expTx.State)
assert.Equal(t, 1, len(expTx.TxAttempts))

fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]
assertTxEqual(t, expTx, actTx)
assert.Equal(t, commontxmgr.TxInProgress, actTx.State)
})

t.Run("wrong input error scenarios", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db, dbcfg)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := context.Background()
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

nonce1 := evmtypes.Nonce(1)
nonce2 := evmtypes.Nonce(2)
// Insert a transaction into persistent store
inTx1 := mustCreateUnstartedGeneratedTx(t, persistentStore, fromAddress, chainID)
inTx2 := mustCreateUnstartedGeneratedTx(t, persistentStore, fromAddress, chainID)
inTx1.Sequence = &nonce1
inTx2.Sequence = &nonce2
inTxAttempt1 := cltest.NewLegacyEthTxAttempt(t, inTx1.ID)
inTxAttempt2 := cltest.NewLegacyEthTxAttempt(t, inTx2.ID)
// Insert the transaction into the in-memory store
//inTx2 := cltest.NewEthTx(fromAddress)
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx1))
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx2))

// sequence nil
inTx1.Sequence = nil
inTx2.Sequence = nil
expErr := persistentStore.UpdateTxUnstartedToInProgress(testutils.Context(t), &inTx1, &inTxAttempt1)
actErr := inMemoryStore.UpdateTxUnstartedToInProgress(testutils.Context(t), &inTx2, &inTxAttempt2)
assert.Equal(t, expErr, actErr)
assert.Error(t, actErr)
assert.Error(t, expErr)
inTx1.Sequence = &nonce1 // reset
inTx2.Sequence = &nonce2 // reset

// tx not in unstarted state
inTx1.State = commontxmgr.TxInProgress
inTx2.State = commontxmgr.TxInProgress
expErr = persistentStore.UpdateTxUnstartedToInProgress(testutils.Context(t), &inTx1, &inTxAttempt1)
actErr = inMemoryStore.UpdateTxUnstartedToInProgress(testutils.Context(t), &inTx2, &inTxAttempt2)
assert.Error(t, actErr)
assert.Error(t, expErr)
inTx1.State = commontxmgr.TxUnstarted // reset
inTx2.State = commontxmgr.TxUnstarted // reset

// tx attempt not in in-progress state
inTxAttempt1.State = txmgrtypes.TxAttemptBroadcast
inTxAttempt2.State = txmgrtypes.TxAttemptBroadcast
expErr = persistentStore.UpdateTxUnstartedToInProgress(testutils.Context(t), &inTx1, &inTxAttempt1)
actErr = inMemoryStore.UpdateTxUnstartedToInProgress(testutils.Context(t), &inTx2, &inTxAttempt2)
assert.Equal(t, expErr, actErr)
assert.Error(t, actErr)
assert.Error(t, expErr)
inTxAttempt1.State = txmgrtypes.TxAttemptInProgress // reset
inTxAttempt2.State = txmgrtypes.TxAttemptInProgress // reset

// wrong from address
inTx1.FromAddress = cltest.NewEIP55Address().Address()
inTx2.FromAddress = cltest.NewEIP55Address().Address()
expErr = persistentStore.UpdateTxUnstartedToInProgress(testutils.Context(t), &inTx1, &inTxAttempt1)
actErr = inMemoryStore.UpdateTxUnstartedToInProgress(testutils.Context(t), &inTx2, &inTxAttempt2)
assert.NoError(t, actErr)
assert.NoError(t, expErr)
inTx1.FromAddress = fromAddress // reset
inTx2.FromAddress = fromAddress // reset
})
}

// assertTxEqual asserts that two transactions are equal
func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) {
assert.Equal(t, exp.ID, act.ID)
Expand Down
Loading