Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TXM In-memory: add in-memory store framework: STEP 3 #12128

Closed
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5f7aa4f
add initial in-memory store
poopoothegorilla Feb 21, 2024
7371ca2
Merge branch 'jtw/step-2-in-memory-work' into jtw/step-3-in-memory-work
poopoothegorilla Feb 26, 2024
7da0d6b
add deepCopyTx and deepCopyTxAttempt
poopoothegorilla Feb 26, 2024
268f3d5
rerun goimports
poopoothegorilla Feb 26, 2024
67e94b3
add test helpers for future tests
poopoothegorilla Feb 26, 2024
c2418f2
use config for maxQueued number
poopoothegorilla Feb 28, 2024
b09d419
add test helpers
poopoothegorilla Feb 29, 2024
6e5da58
add test helper for finding txs in memory
poopoothegorilla Feb 29, 2024
e62808b
Merge branch 'jtw/step-2-in-memory-work' into jtw/step-3-in-memory-work
poopoothegorilla Feb 29, 2024
c0c8e6b
refactor fatal error methods
poopoothegorilla Feb 29, 2024
9c48a65
update MoveTxToFatalError
poopoothegorilla Feb 29, 2024
32189a2
add context for enabledAddressesForChain
poopoothegorilla Feb 29, 2024
a0cc5ef
Merge branch 'jtw/step-2-in-memory-work' into jtw/step-3-in-memory-work
poopoothegorilla Feb 29, 2024
c5e83de
fix FindTxs
poopoothegorilla Feb 29, 2024
ac03ebe
Merge branch 'jtw/step-2-in-memory-work' into jtw/step-3-in-memory-work
poopoothegorilla Mar 7, 2024
363e78d
change naming of AllTransactions to GetAllTransactions
poopoothegorilla Mar 7, 2024
17395f2
unexport the InMemoryStore
poopoothegorilla Mar 7, 2024
e1ec414
fix test helpers
poopoothegorilla Mar 7, 2024
7379e59
Merge branch 'jtw/step-2-in-memory-work' into jtw/step-3-in-memory-work
poopoothegorilla Mar 8, 2024
b7906c2
ensure that there is a safe unstarted max queue size if none is provided
poopoothegorilla Mar 8, 2024
2a9a205
Merge branch 'jtw/step-2-in-memory-work' into jtw/step-3-in-memory-work
poopoothegorilla Mar 13, 2024
fdccd51
Merge branch 'jtw/step-2-in-memory-work' into jtw/step-3-in-memory-work
poopoothegorilla Mar 21, 2024
35862a8
address comments
poopoothegorilla Mar 21, 2024
c0b23a3
Merge remote-tracking branch 'origin/jtw/step-2-in-memory-work' into …
amit-momin Jun 27, 2024
77f290e
Updated mocks
amit-momin Jun 27, 2024
ecce198
Merge remote-tracking branch 'origin/jtw/step-2-in-memory-work' into …
amit-momin Jun 27, 2024
fdc6c4e
Suppressed unused function warnings while adding framework
amit-momin Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
402 changes: 402 additions & 0 deletions common/txmgr/inmemory_store.go

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions common/txmgr/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package txmgr

import (
"context"
"fmt"
"time"

txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
Expand Down Expand Up @@ -49,3 +50,49 @@ func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestRes
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestAbandon(addr ADDR) (err error) {
return b.abandon(addr)
}

func (b *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestInsertTx(fromAddr ADDR, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
as, ok := b.addressStates[fromAddr]
if !ok {
return fmt.Errorf("address not found: %s", fromAddr)
}

as.allTxs[tx.ID] = tx
if tx.IdempotencyKey != nil && *tx.IdempotencyKey != "" {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = tx
}
for i := 0; i < len(tx.TxAttempts); i++ {
txAttempt := tx.TxAttempts[i]
as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt
}

switch tx.State {
case TxUnstarted:
as.unstartedTxs.AddTx(tx)
case TxInProgress:
as.inprogressTx = tx
case TxUnconfirmed:
as.unconfirmedTxs[tx.ID] = tx
case TxConfirmed:
as.confirmedTxs[tx.ID] = tx
case TxConfirmedMissingReceipt:
as.confirmedMissingReceiptTxs[tx.ID] = tx
case TxFatalError:
as.fatalErroredTxs[tx.ID] = tx
}

return nil
}

func (b *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestFindTxs(
txStates []txmgrtypes.TxState,
filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txIDs ...int64,
) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}
for _, as := range b.addressStates {
txs = append(txs, as.findTxs(txStates, filter, txIDs...)...)
}

return txs
}
4 changes: 4 additions & 0 deletions common/txmgr/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type TransactionManagerTransactionsConfig interface {
MaxQueued() uint64
}

type InMemoryStoreConfig interface {
MaxQueued() uint64
}

type BroadcasterChainConfig interface {
IsL2() bool
}
Expand Down
14 changes: 14 additions & 0 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type TxStore[
UnstartedTxQueuePruner
TxHistoryReaper[CHAIN_ID]
TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]
InMemoryInitializer[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]

// Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled
FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
Expand All @@ -55,6 +56,19 @@ type TxStore[
FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []TxState, chainID *big.Int) (tx []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
}

// InMemoryInitializer encapsulates the methods that are used by the txmgr to initialize the in memory tx store.
type InMemoryInitializer[
ADDR types.Hashable,
CHAIN_ID types.ID,
TX_HASH types.Hashable,
BLOCK_HASH types.Hashable,
R ChainReceipt[TX_HASH, BLOCK_HASH],
SEQ types.Sequence,
FEE feetypes.Fee,
] interface {
GetAllTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) ([]Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)
}

// TransactionStore contains the persistence layer methods needed to manage Txs and TxAttempts
type TransactionStore[
ADDR types.Hashable,
Expand Down
70 changes: 70 additions & 0 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package txmgr_test

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
)

// assertTxEqual asserts that two transactions are equal
func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) {
assert.Equal(t, exp.ID, act.ID)
assert.Equal(t, exp.IdempotencyKey, act.IdempotencyKey)
assert.Equal(t, exp.Sequence, act.Sequence)
assert.Equal(t, exp.FromAddress, act.FromAddress)
assert.Equal(t, exp.ToAddress, act.ToAddress)
assert.Equal(t, exp.EncodedPayload, act.EncodedPayload)
assert.Equal(t, exp.Value, act.Value)
assert.Equal(t, exp.FeeLimit, act.FeeLimit)
assert.Equal(t, exp.Error, act.Error)
assert.Equal(t, exp.BroadcastAt, act.BroadcastAt)
assert.Equal(t, exp.InitialBroadcastAt, act.InitialBroadcastAt)
assert.Equal(t, exp.CreatedAt, act.CreatedAt)
assert.Equal(t, exp.State, act.State)
assert.Equal(t, exp.Meta, act.Meta)
assert.Equal(t, exp.Subject, act.Subject)
assert.Equal(t, exp.ChainID, act.ChainID)
assert.Equal(t, exp.PipelineTaskRunID, act.PipelineTaskRunID)
assert.Equal(t, exp.MinConfirmations, act.MinConfirmations)
assert.Equal(t, exp.TransmitChecker, act.TransmitChecker)
assert.Equal(t, exp.SignalCallback, act.SignalCallback)
assert.Equal(t, exp.CallbackCompleted, act.CallbackCompleted)

require.Len(t, exp.TxAttempts, len(act.TxAttempts))
for i := 0; i < len(exp.TxAttempts); i++ {
assertTxAttemptEqual(t, exp.TxAttempts[i], act.TxAttempts[i])
amit-momin marked this conversation as resolved.
Show resolved Hide resolved
}
}

func assertTxAttemptEqual(t *testing.T, exp, act evmtxmgr.TxAttempt) {
assert.Equal(t, exp.ID, act.ID)
assert.Equal(t, exp.TxID, act.TxID)
assert.Equal(t, exp.Tx, act.Tx)
assert.Equal(t, exp.TxFee, act.TxFee)
assert.Equal(t, exp.ChainSpecificFeeLimit, act.ChainSpecificFeeLimit)
assert.Equal(t, exp.SignedRawTx, act.SignedRawTx)
assert.Equal(t, exp.Hash, act.Hash)
assert.Equal(t, exp.CreatedAt, act.CreatedAt)
assert.Equal(t, exp.BroadcastBeforeBlockNum, act.BroadcastBeforeBlockNum)
assert.Equal(t, exp.State, act.State)
assert.Equal(t, exp.TxType, act.TxType)

require.Equal(t, len(exp.Receipts), len(act.Receipts))
for i := 0; i < len(exp.Receipts); i++ {
assertChainReceiptEqual(t, exp.Receipts[i], act.Receipts[i])
amit-momin marked this conversation as resolved.
Show resolved Hide resolved
}
}

func assertChainReceiptEqual(t *testing.T, exp, act evmtxmgr.ChainReceipt) {
assert.Equal(t, exp.GetStatus(), act.GetStatus())
assert.Equal(t, exp.GetTxHash(), act.GetTxHash())
assert.Equal(t, exp.GetBlockNumber(), act.GetBlockNumber())
assert.Equal(t, exp.IsZero(), act.IsZero())
assert.Equal(t, exp.IsUnmined(), act.IsUnmined())
assert.Equal(t, exp.GetFeeUsed(), act.GetFeeUsed())
assert.Equal(t, exp.GetTransactionIndex(), act.GetTransactionIndex())
assert.Equal(t, exp.GetBlockHash(), act.GetBlockHash())
}
16 changes: 16 additions & 0 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,22 @@ func (o *evmTxStore) TransactionsWithAttempts(offset, limit int) (txs []Tx, coun
return
}

// GetAllTransactions returns all eth transactions
func (o *evmTxStore) GetAllTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) (txs []Tx, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
var dbEtxs []DbEthTx
sql := `SELECT * FROM evm.txes WHERE from_address = $1 AND evm_chain_id = $2 ORDER BY id desc`
if err = qq.Select(&dbEtxs, sql, fromAddress, chainID.String()); err != nil {
return
}
txs = dbEthTxsToEvmEthTxs(dbEtxs)
err = o.preloadTxAttempts(txs)
return
}

// TxAttempts returns the last tx attempts sorted by created_at descending.
func (o *evmTxStore) TxAttempts(offset, limit int) (txs []TxAttempt, count int, err error) {
sql := `SELECT count(*) FROM evm.tx_attempts`
Expand Down
Loading