From 5f7aa4fa71aef0d4df3792b74a49fa99bc562fda Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 21 Feb 2024 15:06:26 -0500 Subject: [PATCH 01/18] add initial in-memory store --- common/txmgr/inmemory_store.go | 342 ++++++++++++++++++++++++++ common/txmgr/types/tx_store.go | 14 ++ core/chains/evm/txmgr/evm_tx_store.go | 16 ++ 3 files changed, 372 insertions(+) create mode 100644 common/txmgr/inmemory_store.go diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go new file mode 100644 index 00000000000..c57c295cb82 --- /dev/null +++ b/common/txmgr/inmemory_store.go @@ -0,0 +1,342 @@ +package txmgr + +import ( + "context" + "errors" + "fmt" + "math/big" + "sync" + "time" + + "github.com/google/uuid" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + "github.com/smartcontractkit/chainlink/v2/common/types" + "gopkg.in/guregu/null.v4" +) + +var ( + // ErrInvalidChainID is returned when the chain ID is invalid + ErrInvalidChainID = errors.New("invalid chain ID") + // ErrTxnNotFound is returned when a transaction is not found + ErrTxnNotFound = errors.New("transaction not found") + // ErrExistingIdempotencyKey is returned when a transaction with the same idempotency key already exists + ErrExistingIdempotencyKey = errors.New("transaction with idempotency key already exists") + // ErrAddressNotFound is returned when an address is not found + ErrAddressNotFound = errors.New("address not found") + // ErrSequenceNotFound is returned when a sequence is not found + ErrSequenceNotFound = errors.New("sequence not found") + // ErrCouldNotGetReceipt is the error string we save if we reach our finality depth for a confirmed transaction without ever getting a receipt + // This most likely happened because an external wallet used the account for this nonce + ErrCouldNotGetReceipt = errors.New("could not get receipt") +) + +type InMemoryStore[ + CHAIN_ID types.ID, + ADDR, TX_HASH, BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +] struct { + lggr logger.SugaredLogger + chainID CHAIN_ID + + keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] + txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + + addressStatesLock sync.RWMutex + addressStates map[ADDR]*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] +} + +// NewInMemoryStore returns a new InMemoryStore +func NewInMemoryStore[ + CHAIN_ID types.ID, + ADDR, TX_HASH, BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +]( + ctx context.Context, + lggr logger.SugaredLogger, + chainID CHAIN_ID, + keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], + txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE], +) (*InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], error) { + ms := InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ + lggr: lggr, + chainID: chainID, + keyStore: keyStore, + txStore: txStore, + + addressStates: map[ADDR]*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{}, + } + + maxUnstarted := 50 + addresses, err := keyStore.EnabledAddressesForChain(chainID) + if err != nil { + return nil, fmt.Errorf("new_in_memory_store: %w", err) + } + for _, fromAddr := range addresses { + txs, err := txStore.AllTransactions(ctx, fromAddr, chainID) + if err != nil { + return nil, fmt.Errorf("address_state: initialization: %w", err) + } + as, err := NewAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](lggr, chainID, fromAddr, maxUnstarted, txs) + if err != nil { + return nil, fmt.Errorf("new_in_memory_store: %w", err) + } + + ms.addressStates[fromAddr] = as + } + + return &ms, nil +} + +// CreateTransaction creates a new transaction for a given txRequest. +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction( + ctx context.Context, + txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], + chainID CHAIN_ID, +) ( + txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + error, +) { + return txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}, nil +} + +// FindTxWithIdempotencyKey returns a transaction with the given idempotency key +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithIdempotencyKey(ctx context.Context, idempotencyKey string, chainID CHAIN_ID) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + return nil, nil +} + +// CheckTxQueueCapacity checks if the queue capacity has been reached for a given address +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckTxQueueCapacity(ctx context.Context, fromAddress ADDR, maxQueuedTransactions uint64, chainID CHAIN_ID) error { + return nil +} + +// FindLatestSequence returns the latest sequence number for a given address +// It is used to initialize the in-memory sequence map in the broadcaster +// TODO(jtw): this is until we have a abstracted Sequencer Component which can be used instead +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindLatestSequence(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (seq SEQ, err error) { + return seq, nil +} + +// CountUnconfirmedTransactions returns the number of unconfirmed transactions for a given address. +// Unconfirmed transactions are transactions that have been broadcast but not confirmed on-chain. +// NOTE(jtw): used to calculate total inflight transactions +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountUnconfirmedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (uint32, error) { + return 0, nil +} + +// CountUnstartedTransactions returns the number of unstarted transactions for a given address. +// Unstarted transactions are transactions that have not been broadcast yet. +// NOTE(jtw): used to calculate total inflight transactions +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountUnstartedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (uint32, error) { + return 0, nil +} + +// UpdateTxUnstartedToInProgress updates a transaction from unstarted to in_progress. +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxUnstartedToInProgress( + ctx context.Context, + tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) error { + return nil +} + +// GetTxInProgress returns the in_progress transaction for a given address. +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxInProgress(ctx context.Context, fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + return nil, nil +} + +// UpdateTxAttemptInProgressToBroadcast updates a transaction attempt from in_progress to broadcast. +// It also updates the transaction state to unconfirmed. +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxAttemptInProgressToBroadcast( + ctx context.Context, + tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + newAttemptState txmgrtypes.TxAttemptState, +) error { + return nil +} + +// FindNextUnstartedTransactionFromAddress returns the next unstarted transaction for a given address. +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUnstartedTransactionFromAddress(_ context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error { + return nil +} + +// SaveReplacementInProgressAttempt saves a replacement attempt for a transaction that is in_progress. +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveReplacementInProgressAttempt( + ctx context.Context, + oldAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + replacementAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) error { + return nil +} + +// UpdateTxFatalError updates a transaction to fatal_error. +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxFatalError(ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + return nil +} + +// Close closes the InMemoryStore +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() { +} + +// Abandon removes all transactions for a given address +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Abandon(ctx context.Context, chainID CHAIN_ID, addr ADDR) error { + return nil +} + +// SetBroadcastBeforeBlockNum sets the broadcast_before_block_num for a given chain ID +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SetBroadcastBeforeBlockNum(ctx context.Context, blockNum int64, chainID CHAIN_ID) error { + return nil +} + +// FindTxAttemptsConfirmedMissingReceipt returns all transactions that are confirmed but missing a receipt +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) ( + []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + error, +) { + return nil, nil +} + +// UpdateBroadcastAts updates the broadcast_at time for a given set of attempts +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateBroadcastAts(ctx context.Context, now time.Time, txIDs []int64) error { + return nil +} + +// UpdateTxsUnconfirmed updates the unconfirmed transactions for a given set of ids +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxsUnconfirmed(ctx context.Context, txIDs []int64) error { + return nil +} + +// FindTxAttemptsRequiringReceiptFetch returns all transactions that are missing a receipt +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsRequiringReceiptFetch(ctx context.Context, chainID CHAIN_ID) ( + attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + err error, +) { + return nil, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) ( + []txmgrtypes.ReceiptPlus[R], + error, +) { + return nil, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error { + return nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveFetchedReceipts(ctx context.Context, receipts []R, chainID CHAIN_ID) error { + return nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesByMetaFieldAndStates(ctx context.Context, metaField string, metaValue string, states []txmgrtypes.TxState, chainID *big.Int) ( + []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + error, +) { + return nil, nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWithMetaFieldByStates(ctx context.Context, metaField string, states []txmgrtypes.TxState, chainID *big.Int) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + return nil, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWithMetaFieldByReceiptBlockNum(ctx context.Context, metaField string, blockNum int64, chainID *big.Int) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + return nil, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []txmgrtypes.TxState, chainID *big.Int) (tx []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { + return nil, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) ([]int64, error) { + return nil, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error { + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(_ context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) { + return 0, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInProgressAttempt(ctx context.Context, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + return nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringResubmissionDueToInsufficientFunds(_ context.Context, address ADDR, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + return nil, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsRequiringResend(_ context.Context, olderThan time.Time, maxInFlightTransactions uint32, chainID CHAIN_ID, address ADDR) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + return nil, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithSequence(_ context.Context, fromAddress ADDR, seq SEQ) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + return nil, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTransactionsConfirmedInBlockRange(_ context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + return nil, nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) { + return null.Time{}, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) { + return null.Int{}, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + return nil, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetNonFatalTransactions(ctx context.Context, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + return nil, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxByID(_ context.Context, id int64) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + return nil, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HasInProgressTransaction(_ context.Context, account ADDR, chainID CHAIN_ID) (bool, error) { + return false, nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) LoadTxAttempts(_ context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PreloadTxes(_ context.Context, attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveConfirmedMissingReceiptAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInProgressAttempt(ctx context.Context, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInsufficientFundsAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveSentAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxForRebroadcast(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (bool, error) { + return false, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringGasBump(ctx context.Context, address ADDR, blockNum, gasBumpThreshold, depth int64, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + return nil, nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkAllConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) error { + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error { + return nil +} diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 742a1740033..06560ba42c6 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -34,6 +34,7 @@ type TxStore[ UnstartedTxQueuePruner TxHistoryReaper[CHAIN_ID] TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE] + InMemoryInitializer[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] // Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error) @@ -55,6 +56,19 @@ type TxStore[ FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []TxState, chainID *big.Int) (tx []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) } +// InMemoryInitializer encapsulates the methods that are used by the txmgr to initialize the in memory tx store. +type InMemoryInitializer[ + ADDR types.Hashable, + CHAIN_ID types.ID, + TX_HASH types.Hashable, + BLOCK_HASH types.Hashable, + R ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +] interface { + AllTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) ([]Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) +} + // TransactionStore contains the persistence layer methods needed to manage Txs and TxAttempts type TransactionStore[ ADDR types.Hashable, diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index ae986acee27..b310f831c8c 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -465,6 +465,22 @@ func (o *evmTxStore) TransactionsWithAttempts(offset, limit int) (txs []Tx, coun return } +// AllTransactions returns all eth transactions +func (o *evmTxStore) AllTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) (txs []Tx, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + var dbEtxs []DbEthTx + sql := `SELECT * FROM evm.txes WHERE from_address = $1 AND evm_chain_id = $2 ORDER BY id desc` + if err = qq.Select(&dbEtxs, sql, fromAddress, chainID.String()); err != nil { + return + } + txs = dbEthTxsToEvmEthTxs(dbEtxs) + err = o.preloadTxAttempts(txs) + return +} + // TxAttempts returns the last tx attempts sorted by created_at descending. func (o *evmTxStore) TxAttempts(offset, limit int) (txs []TxAttempt, count int, err error) { sql := `SELECT count(*) FROM evm.tx_attempts` From 7da0d6b7a2323896181ea832577552ee61d429f6 Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 26 Feb 2024 15:49:44 -0500 Subject: [PATCH 02/18] add deepCopyTx and deepCopyTxAttempt --- common/txmgr/inmemory_store.go | 64 ++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index c57c295cb82..ef5ac67de20 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -340,3 +340,67 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkA func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error { return nil } + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTx( + tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + copyTx := txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ + ID: tx.ID, + IdempotencyKey: tx.IdempotencyKey, + Sequence: tx.Sequence, + FromAddress: tx.FromAddress, + ToAddress: tx.ToAddress, + EncodedPayload: make([]byte, len(tx.EncodedPayload)), + Value: *new(big.Int).Set(&tx.Value), + FeeLimit: tx.FeeLimit, + Error: tx.Error, + BroadcastAt: tx.BroadcastAt, + InitialBroadcastAt: tx.InitialBroadcastAt, + CreatedAt: tx.CreatedAt, + State: tx.State, + TxAttempts: make([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(tx.TxAttempts)), + Meta: tx.Meta, + Subject: tx.Subject, + ChainID: tx.ChainID, + PipelineTaskRunID: tx.PipelineTaskRunID, + MinConfirmations: tx.MinConfirmations, + TransmitChecker: tx.TransmitChecker, + SignalCallback: tx.SignalCallback, + CallbackCompleted: tx.CallbackCompleted, + } + + // Copy the EncodedPayload + copy(copyTx.EncodedPayload, tx.EncodedPayload) + + // Copy the TxAttempts + for i, attempt := range tx.TxAttempts { + copyTx.TxAttempts[i] = ms.deepCopyTxAttempt(copyTx, attempt) + } + + return ©Tx +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTxAttempt( + tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + copyAttempt := txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ + ID: attempt.ID, + TxID: attempt.TxID, + Tx: tx, + TxFee: attempt.TxFee, + ChainSpecificFeeLimit: attempt.ChainSpecificFeeLimit, + SignedRawTx: make([]byte, len(attempt.SignedRawTx)), + Hash: attempt.Hash, + CreatedAt: attempt.CreatedAt, + BroadcastBeforeBlockNum: attempt.BroadcastBeforeBlockNum, + State: attempt.State, + Receipts: make([]txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], len(attempt.Receipts)), + TxType: attempt.TxType, + } + + copy(copyAttempt.SignedRawTx, attempt.SignedRawTx) + copy(copyAttempt.Receipts, attempt.Receipts) + + return copyAttempt +} From 268f3d573edd2d6e726ca1f0716b13452c87baf0 Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 26 Feb 2024 16:15:50 -0500 Subject: [PATCH 03/18] rerun goimports --- common/txmgr/inmemory_store.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index ef5ac67de20..cd783a25210 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -9,11 +9,12 @@ import ( "time" "github.com/google/uuid" + "gopkg.in/guregu/null.v4" + "github.com/smartcontractkit/chainlink-common/pkg/logger" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" - "gopkg.in/guregu/null.v4" ) var ( From 67e94b3e9ae4f9100e55e43c0ad5bc54cee1ad2c Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 26 Feb 2024 17:26:27 -0500 Subject: [PATCH 04/18] add test helpers for future tests --- .../evm/txmgr/evm_inmemory_store_test.go | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 core/chains/evm/txmgr/evm_inmemory_store_test.go diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go new file mode 100644 index 00000000000..a102ee1c996 --- /dev/null +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -0,0 +1,70 @@ +package txmgr_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" +) + +// assertTxEqual asserts that two transactions are equal +func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { + assert.Equal(t, exp.ID, act.ID) + assert.Equal(t, exp.IdempotencyKey, act.IdempotencyKey) + assert.Equal(t, exp.Sequence, act.Sequence) + assert.Equal(t, exp.FromAddress, act.FromAddress) + assert.Equal(t, exp.ToAddress, act.ToAddress) + assert.Equal(t, exp.EncodedPayload, act.EncodedPayload) + assert.Equal(t, exp.Value, act.Value) + assert.Equal(t, exp.FeeLimit, act.FeeLimit) + assert.Equal(t, exp.Error, act.Error) + assert.Equal(t, exp.BroadcastAt, act.BroadcastAt) + assert.Equal(t, exp.InitialBroadcastAt, act.InitialBroadcastAt) + assert.Equal(t, exp.CreatedAt, act.CreatedAt) + assert.Equal(t, exp.State, act.State) + assert.Equal(t, exp.Meta, act.Meta) + assert.Equal(t, exp.Subject, act.Subject) + assert.Equal(t, exp.ChainID, act.ChainID) + assert.Equal(t, exp.PipelineTaskRunID, act.PipelineTaskRunID) + assert.Equal(t, exp.MinConfirmations, act.MinConfirmations) + assert.Equal(t, exp.TransmitChecker, act.TransmitChecker) + assert.Equal(t, exp.SignalCallback, act.SignalCallback) + assert.Equal(t, exp.CallbackCompleted, act.CallbackCompleted) + + require.Len(t, exp.TxAttempts, len(act.TxAttempts)) + for i := 0; i < len(exp.TxAttempts); i++ { + assertTxAttemptEqual(t, exp.TxAttempts[i], act.TxAttempts[i]) + } +} + +func assertTxAttemptEqual(t *testing.T, exp, act evmtxmgr.TxAttempt) { + assert.Equal(t, exp.ID, act.ID) + assert.Equal(t, exp.TxID, act.TxID) + assert.Equal(t, exp.Tx, act.Tx) + assert.Equal(t, exp.TxFee, act.TxFee) + assert.Equal(t, exp.ChainSpecificFeeLimit, act.ChainSpecificFeeLimit) + assert.Equal(t, exp.SignedRawTx, act.SignedRawTx) + assert.Equal(t, exp.Hash, act.Hash) + assert.Equal(t, exp.CreatedAt, act.CreatedAt) + assert.Equal(t, exp.BroadcastBeforeBlockNum, act.BroadcastBeforeBlockNum) + assert.Equal(t, exp.State, act.State) + assert.Equal(t, exp.TxType, act.TxType) + + require.Equal(t, len(exp.Receipts), len(act.Receipts)) + for i := 0; i < len(exp.Receipts); i++ { + assertChainReceiptEqual(t, exp.Receipts[i], act.Receipts[i]) + } +} + +func assertChainReceiptEqual(t *testing.T, exp, act evmtxmgr.ChainReceipt) { + assert.Equal(t, exp.GetStatus(), act.GetStatus()) + assert.Equal(t, exp.GetTxHash(), act.GetTxHash()) + assert.Equal(t, exp.GetBlockNumber(), act.GetBlockNumber()) + assert.Equal(t, exp.IsZero(), act.IsZero()) + assert.Equal(t, exp.IsUnmined(), act.IsUnmined()) + assert.Equal(t, exp.GetFeeUsed(), act.GetFeeUsed()) + assert.Equal(t, exp.GetTransactionIndex(), act.GetTransactionIndex()) + assert.Equal(t, exp.GetBlockHash(), act.GetBlockHash()) +} From c2418f2cbbb84313ee2d4464407c8c496cef0741 Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 28 Feb 2024 16:50:12 -0500 Subject: [PATCH 05/18] use config for maxQueued number --- common/txmgr/inmemory_store.go | 3 ++- common/txmgr/types/config.go | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index cd783a25210..acbeec961e9 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -63,6 +63,7 @@ func NewInMemoryStore[ chainID CHAIN_ID, keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE], + config txmgrtypes.InMemoryStoreConfig, ) (*InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], error) { ms := InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ lggr: lggr, @@ -73,7 +74,7 @@ func NewInMemoryStore[ addressStates: map[ADDR]*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{}, } - maxUnstarted := 50 + maxUnstarted := int(config.MaxQueued()) addresses, err := keyStore.EnabledAddressesForChain(chainID) if err != nil { return nil, fmt.Errorf("new_in_memory_store: %w", err) diff --git a/common/txmgr/types/config.go b/common/txmgr/types/config.go index 502a7f42d5c..4ee8100b499 100644 --- a/common/txmgr/types/config.go +++ b/common/txmgr/types/config.go @@ -23,6 +23,10 @@ type TransactionManagerTransactionsConfig interface { MaxQueued() uint64 } +type InMemoryStoreConfig interface { + MaxQueued() uint64 +} + type BroadcasterChainConfig interface { IsL2() bool } From b09d419e29edff3b1d2d6424dc945e54b99b7220 Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 28 Feb 2024 21:32:45 -0500 Subject: [PATCH 06/18] add test helpers --- common/txmgr/test_helpers.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/common/txmgr/test_helpers.go b/common/txmgr/test_helpers.go index 6c0c5680ea7..888e963e365 100644 --- a/common/txmgr/test_helpers.go +++ b/common/txmgr/test_helpers.go @@ -2,6 +2,7 @@ package txmgr import ( "context" + "fmt" "time" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" @@ -49,3 +50,36 @@ func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestRes func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestAbandon(addr ADDR) (err error) { return b.abandon(addr) } + +func (b *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestInsertTx(fromAddr ADDR, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + as, ok := b.addressStates[fromAddr] + if !ok { + return fmt.Errorf("address not found: %s", fromAddr) + } + + as.allTxs[tx.ID] = tx + if tx.IdempotencyKey != nil && *tx.IdempotencyKey != "" { + as.idempotencyKeyToTx[*tx.IdempotencyKey] = tx + } + for i := 0; i < len(tx.TxAttempts); i++ { + txAttempt := tx.TxAttempts[i] + as.attemptHashToTxAttempt[txAttempt.Hash] = txAttempt + } + + switch tx.State { + case TxUnstarted: + as.unstartedTxs.AddTx(tx) + case TxInProgress: + as.inprogressTx = tx + case TxUnconfirmed: + as.unconfirmedTxs[tx.ID] = tx + case TxConfirmed: + as.confirmedTxs[tx.ID] = tx + case TxConfirmedMissingReceipt: + as.confirmedMissingReceiptTxs[tx.ID] = tx + case TxFatalError: + as.fatalErroredTxs[tx.ID] = tx + } + + return nil +} From 6e5da588804aefac0dddaa5847ab26470959b945 Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 28 Feb 2024 21:41:26 -0500 Subject: [PATCH 07/18] add test helper for finding txs in memory --- common/txmgr/test_helpers.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/common/txmgr/test_helpers.go b/common/txmgr/test_helpers.go index 888e963e365..3aacf715c75 100644 --- a/common/txmgr/test_helpers.go +++ b/common/txmgr/test_helpers.go @@ -83,3 +83,16 @@ func (b *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTes return nil } + +func (b *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestFindTxs( + txStates []txmgrtypes.TxState, + filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, + txIDs ...int64, +) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + for _, as := range b.addressStates { + txs = append(txs, as.FetchTxs(txStates, filter, txIDs...)...) + } + + return txs +} From c0c8e6bb5fcb53691aa1c11890ef956d64af4257 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 29 Feb 2024 15:51:19 -0500 Subject: [PATCH 08/18] refactor fatal error methods --- common/txmgr/address_state.go | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 2bbb777c634..7954c85965e 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -217,21 +217,8 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUn return nil } -// MoveUnstartedToFatalError moves the unstarted transaction to the fatal error state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToFatalError( - etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], - txError null.String, -) error { - return nil -} - -// MoveInProgressToFatalError moves the in-progress transaction to the fatal error state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToFatalError(txError null.String) error { - return nil -} - -// MoveConfirmedMissingReceiptToFatalError moves the confirmed missing receipt transaction to the fatal error state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedMissingReceiptToFatalError( +// MoveTxToFatalError moves a transaction to the fatal error state. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveTxToFatalError( etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txError null.String, ) error { From 9c48a65591286ace6c5ccabe07e59659eafd28f0 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 29 Feb 2024 15:54:11 -0500 Subject: [PATCH 09/18] update MoveTxToFatalError --- common/txmgr/address_state.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 7954c85965e..7dd04eff485 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -219,8 +219,7 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUn // MoveTxToFatalError moves a transaction to the fatal error state. func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveTxToFatalError( - etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], - txError null.String, + txID int64, txError null.String, ) error { return nil } From 32189a2106eee1f3a2d590a0c1fef6b5ffb327f1 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 29 Feb 2024 15:55:20 -0500 Subject: [PATCH 10/18] add context for enabledAddressesForChain --- common/txmgr/inmemory_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index acbeec961e9..85df6ee99c3 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -75,7 +75,7 @@ func NewInMemoryStore[ } maxUnstarted := int(config.MaxQueued()) - addresses, err := keyStore.EnabledAddressesForChain(chainID) + addresses, err := keyStore.EnabledAddressesForChain(ctx, chainID) if err != nil { return nil, fmt.Errorf("new_in_memory_store: %w", err) } From c5e83de05a16a8b536851c3953925e10a07766dd Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 29 Feb 2024 16:12:13 -0500 Subject: [PATCH 11/18] fix FindTxs --- common/txmgr/test_helpers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/txmgr/test_helpers.go b/common/txmgr/test_helpers.go index 72b28411092..69a6cb84747 100644 --- a/common/txmgr/test_helpers.go +++ b/common/txmgr/test_helpers.go @@ -91,7 +91,7 @@ func (b *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTes ) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} for _, as := range b.addressStates { - txs = append(txs, as.FetchTxs(txStates, filter, txIDs...)...) + txs = append(txs, as.FindTxs(txStates, filter, txIDs...)...) } return txs From 363e78dcadcc6a8df769ecf20872752a4a3dd459 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 7 Mar 2024 15:45:57 -0500 Subject: [PATCH 12/18] change naming of AllTransactions to GetAllTransactions --- common/txmgr/types/tx_store.go | 2 +- core/chains/evm/txmgr/evm_tx_store.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index a48735ff08b..8fc7e9a4620 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -66,7 +66,7 @@ type InMemoryInitializer[ SEQ types.Sequence, FEE feetypes.Fee, ] interface { - AllTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) ([]Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) + GetAllTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) ([]Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) } // TransactionStore contains the persistence layer methods needed to manage Txs and TxAttempts diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 644f4828fd1..ba166a3f4e8 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -465,8 +465,8 @@ func (o *evmTxStore) TransactionsWithAttempts(offset, limit int) (txs []Tx, coun return } -// AllTransactions returns all eth transactions -func (o *evmTxStore) AllTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) (txs []Tx, err error) { +// GetAllTransactions returns all eth transactions +func (o *evmTxStore) GetAllTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) (txs []Tx, err error) { var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() From 17395f2fdd1b8476c853a872ef0c9a099a04f3f3 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 7 Mar 2024 16:00:06 -0500 Subject: [PATCH 13/18] unexport the InMemoryStore --- common/txmgr/inmemory_store.go | 150 ++++++++++++++++----------------- 1 file changed, 72 insertions(+), 78 deletions(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 85df6ee99c3..3c9b3829479 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "math/big" - "sync" "time" "github.com/google/uuid" @@ -33,7 +32,8 @@ var ( ErrCouldNotGetReceipt = errors.New("could not get receipt") ) -type InMemoryStore[ +// inMemoryStore is a simple wrapper around a persistent TxStore and KeyStore. It handles all the transaction state in memory. +type inMemoryStore[ CHAIN_ID types.ID, ADDR, TX_HASH, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], @@ -43,14 +43,13 @@ type InMemoryStore[ lggr logger.SugaredLogger chainID CHAIN_ID - keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] - txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] + persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - addressStatesLock sync.RWMutex - addressStates map[ADDR]*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] } -// NewInMemoryStore returns a new InMemoryStore +// NewInMemoryStore returns a new inMemoryStore func NewInMemoryStore[ CHAIN_ID types.ID, ADDR, TX_HASH, BLOCK_HASH types.Hashable, @@ -62,41 +61,36 @@ func NewInMemoryStore[ lggr logger.SugaredLogger, chainID CHAIN_ID, keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], - txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE], + persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE], config txmgrtypes.InMemoryStoreConfig, -) (*InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], error) { - ms := InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ - lggr: lggr, - chainID: chainID, - keyStore: keyStore, - txStore: txStore, - - addressStates: map[ADDR]*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{}, +) (*inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], error) { + ms := inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ + lggr: lggr, + chainID: chainID, + keyStore: keyStore, + persistentTxStore: persistentTxStore, + + addressStates: map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{}, } - maxUnstarted := int(config.MaxQueued()) + maxUnstarted := config.MaxQueued() addresses, err := keyStore.EnabledAddressesForChain(ctx, chainID) if err != nil { return nil, fmt.Errorf("new_in_memory_store: %w", err) } for _, fromAddr := range addresses { - txs, err := txStore.AllTransactions(ctx, fromAddr, chainID) + txs, err := persistentTxStore.GetAllTransactions(ctx, fromAddr, chainID) if err != nil { return nil, fmt.Errorf("address_state: initialization: %w", err) } - as, err := NewAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](lggr, chainID, fromAddr, maxUnstarted, txs) - if err != nil { - return nil, fmt.Errorf("new_in_memory_store: %w", err) - } - - ms.addressStates[fromAddr] = as + ms.addressStates[fromAddr] = newAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](lggr, chainID, fromAddr, maxUnstarted, txs) } return &ms, nil } // CreateTransaction creates a new transaction for a given txRequest. -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction( +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction( ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], chainID CHAIN_ID, @@ -108,38 +102,38 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Creat } // FindTxWithIdempotencyKey returns a transaction with the given idempotency key -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithIdempotencyKey(ctx context.Context, idempotencyKey string, chainID CHAIN_ID) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithIdempotencyKey(ctx context.Context, idempotencyKey string, chainID CHAIN_ID) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } // CheckTxQueueCapacity checks if the queue capacity has been reached for a given address -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckTxQueueCapacity(ctx context.Context, fromAddress ADDR, maxQueuedTransactions uint64, chainID CHAIN_ID) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckTxQueueCapacity(ctx context.Context, fromAddress ADDR, maxQueuedTransactions uint64, chainID CHAIN_ID) error { return nil } // FindLatestSequence returns the latest sequence number for a given address // It is used to initialize the in-memory sequence map in the broadcaster // TODO(jtw): this is until we have a abstracted Sequencer Component which can be used instead -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindLatestSequence(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (seq SEQ, err error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindLatestSequence(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (seq SEQ, err error) { return seq, nil } // CountUnconfirmedTransactions returns the number of unconfirmed transactions for a given address. // Unconfirmed transactions are transactions that have been broadcast but not confirmed on-chain. // NOTE(jtw): used to calculate total inflight transactions -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountUnconfirmedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (uint32, error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountUnconfirmedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (uint32, error) { return 0, nil } // CountUnstartedTransactions returns the number of unstarted transactions for a given address. // Unstarted transactions are transactions that have not been broadcast yet. // NOTE(jtw): used to calculate total inflight transactions -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountUnstartedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (uint32, error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountUnstartedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (uint32, error) { return 0, nil } // UpdateTxUnstartedToInProgress updates a transaction from unstarted to in_progress. -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxUnstartedToInProgress( +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxUnstartedToInProgress( ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], @@ -148,13 +142,13 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat } // GetTxInProgress returns the in_progress transaction for a given address. -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxInProgress(ctx context.Context, fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxInProgress(ctx context.Context, fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } // UpdateTxAttemptInProgressToBroadcast updates a transaction attempt from in_progress to broadcast. // It also updates the transaction state to unconfirmed. -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxAttemptInProgressToBroadcast( +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxAttemptInProgressToBroadcast( ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], @@ -164,12 +158,12 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat } // FindNextUnstartedTransactionFromAddress returns the next unstarted transaction for a given address. -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUnstartedTransactionFromAddress(_ context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUnstartedTransactionFromAddress(_ context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error { return nil } // SaveReplacementInProgressAttempt saves a replacement attempt for a transaction that is in_progress. -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveReplacementInProgressAttempt( +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveReplacementInProgressAttempt( ctx context.Context, oldAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], @@ -178,26 +172,26 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR } // UpdateTxFatalError updates a transaction to fatal_error. -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxFatalError(ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxFatalError(ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil } -// Close closes the InMemoryStore -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() { +// Close closes the inMemoryStore +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() { } // Abandon removes all transactions for a given address -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Abandon(ctx context.Context, chainID CHAIN_ID, addr ADDR) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Abandon(ctx context.Context, chainID CHAIN_ID, addr ADDR) error { return nil } // SetBroadcastBeforeBlockNum sets the broadcast_before_block_num for a given chain ID -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SetBroadcastBeforeBlockNum(ctx context.Context, blockNum int64, chainID CHAIN_ID) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SetBroadcastBeforeBlockNum(ctx context.Context, blockNum int64, chainID CHAIN_ID) error { return nil } // FindTxAttemptsConfirmedMissingReceipt returns all transactions that are confirmed but missing a receipt -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) ( +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) ( []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error, ) { @@ -205,145 +199,145 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT } // UpdateBroadcastAts updates the broadcast_at time for a given set of attempts -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateBroadcastAts(ctx context.Context, now time.Time, txIDs []int64) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateBroadcastAts(ctx context.Context, now time.Time, txIDs []int64) error { return nil } // UpdateTxsUnconfirmed updates the unconfirmed transactions for a given set of ids -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxsUnconfirmed(ctx context.Context, txIDs []int64) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxsUnconfirmed(ctx context.Context, txIDs []int64) error { return nil } // FindTxAttemptsRequiringReceiptFetch returns all transactions that are missing a receipt -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsRequiringReceiptFetch(ctx context.Context, chainID CHAIN_ID) ( +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsRequiringReceiptFetch(ctx context.Context, chainID CHAIN_ID) ( attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error, ) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) ( +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) ( []txmgrtypes.ReceiptPlus[R], error, ) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error { return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveFetchedReceipts(ctx context.Context, receipts []R, chainID CHAIN_ID) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveFetchedReceipts(ctx context.Context, receipts []R, chainID CHAIN_ID) error { return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesByMetaFieldAndStates(ctx context.Context, metaField string, metaValue string, states []txmgrtypes.TxState, chainID *big.Int) ( +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesByMetaFieldAndStates(ctx context.Context, metaField string, metaValue string, states []txmgrtypes.TxState, chainID *big.Int) ( []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error, ) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWithMetaFieldByStates(ctx context.Context, metaField string, states []txmgrtypes.TxState, chainID *big.Int) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWithMetaFieldByStates(ctx context.Context, metaField string, states []txmgrtypes.TxState, chainID *big.Int) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWithMetaFieldByReceiptBlockNum(ctx context.Context, metaField string, blockNum int64, chainID *big.Int) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWithMetaFieldByReceiptBlockNum(ctx context.Context, metaField string, blockNum int64, chainID *big.Int) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []txmgrtypes.TxState, chainID *big.Int) (tx []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []txmgrtypes.TxState, chainID *big.Int) (tx []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) ([]int64, error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) ([]int64, error) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error { return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(_ context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(_ context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) { return 0, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInProgressAttempt(ctx context.Context, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInProgressAttempt(ctx context.Context, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringResubmissionDueToInsufficientFunds(_ context.Context, address ADDR, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringResubmissionDueToInsufficientFunds(_ context.Context, address ADDR, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsRequiringResend(_ context.Context, olderThan time.Time, maxInFlightTransactions uint32, chainID CHAIN_ID, address ADDR) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsRequiringResend(_ context.Context, olderThan time.Time, maxInFlightTransactions uint32, chainID CHAIN_ID, address ADDR) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithSequence(_ context.Context, fromAddress ADDR, seq SEQ) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithSequence(_ context.Context, fromAddress ADDR, seq SEQ) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTransactionsConfirmedInBlockRange(_ context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTransactionsConfirmedInBlockRange(_ context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) { return null.Time{}, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) { return null.Int{}, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetNonFatalTransactions(ctx context.Context, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetNonFatalTransactions(ctx context.Context, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxByID(_ context.Context, id int64) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxByID(_ context.Context, id int64) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HasInProgressTransaction(_ context.Context, account ADDR, chainID CHAIN_ID) (bool, error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HasInProgressTransaction(_ context.Context, account ADDR, chainID CHAIN_ID) (bool, error) { return false, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) LoadTxAttempts(_ context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) LoadTxAttempts(_ context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PreloadTxes(_ context.Context, attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PreloadTxes(_ context.Context, attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveConfirmedMissingReceiptAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveConfirmedMissingReceiptAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInProgressAttempt(ctx context.Context, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInProgressAttempt(ctx context.Context, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInsufficientFundsAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInsufficientFundsAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveSentAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveSentAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxForRebroadcast(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxForRebroadcast(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (bool, error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (bool, error) { return false, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringGasBump(ctx context.Context, address ADDR, blockNum, gasBumpThreshold, depth int64, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringGasBump(ctx context.Context, address ADDR, blockNum, gasBumpThreshold, depth int64, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkAllConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkAllConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) error { return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error { return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTx( +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTx( tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { copyTx := txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ @@ -382,7 +376,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepC return ©Tx } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTxAttempt( +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTxAttempt( tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { From e1ec4146700dbcf1b7b93644407b98369ebe95ab Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 7 Mar 2024 16:01:54 -0500 Subject: [PATCH 14/18] fix test helpers --- common/txmgr/test_helpers.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/common/txmgr/test_helpers.go b/common/txmgr/test_helpers.go index 69a6cb84747..fa9af9a506a 100644 --- a/common/txmgr/test_helpers.go +++ b/common/txmgr/test_helpers.go @@ -51,7 +51,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestAba return b.abandon(addr) } -func (b *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestInsertTx(fromAddr ADDR, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +func (b *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestInsertTx(fromAddr ADDR, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { as, ok := b.addressStates[fromAddr] if !ok { return fmt.Errorf("address not found: %s", fromAddr) @@ -63,7 +63,7 @@ func (b *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTes } for i := 0; i < len(tx.TxAttempts); i++ { txAttempt := tx.TxAttempts[i] - as.attemptHashToTxAttempt[txAttempt.Hash] = txAttempt + as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt } switch tx.State { @@ -84,14 +84,14 @@ func (b *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTes return nil } -func (b *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestFindTxs( +func (b *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestFindTxs( txStates []txmgrtypes.TxState, filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, txIDs ...int64, ) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} for _, as := range b.addressStates { - txs = append(txs, as.FindTxs(txStates, filter, txIDs...)...) + txs = append(txs, as.findTxs(txStates, filter, txIDs...)...) } return txs From b7906c29fc34c5a8df20cc2378b76e71ae92ec14 Mon Sep 17 00:00:00 2001 From: James Walker Date: Fri, 8 Mar 2024 15:01:13 -0500 Subject: [PATCH 15/18] ensure that there is a safe unstarted max queue size if none is provided --- common/txmgr/inmemory_store.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 3c9b3829479..f1f4d1a9f30 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -74,6 +74,9 @@ func NewInMemoryStore[ } maxUnstarted := config.MaxQueued() + if maxUnstarted <= 0 { + maxUnstarted = 10000 + } addresses, err := keyStore.EnabledAddressesForChain(ctx, chainID) if err != nil { return nil, fmt.Errorf("new_in_memory_store: %w", err) From 35862a8a62625e73278413af38ab653719b69754 Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 20 Mar 2024 23:48:37 -0400 Subject: [PATCH 16/18] address comments --- common/txmgr/inmemory_store.go | 27 +++++++++++++++++---------- common/txmgr/types/tx_store.go | 2 +- core/chains/evm/txmgr/evm_tx_store.go | 12 ++++-------- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index f1f4d1a9f30..bd4e9a2f3a6 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -43,6 +43,7 @@ type inMemoryStore[ lggr logger.SugaredLogger chainID CHAIN_ID + maxUnstarted uint64 keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] @@ -73,20 +74,26 @@ func NewInMemoryStore[ addressStates: map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{}, } - maxUnstarted := config.MaxQueued() - if maxUnstarted <= 0 { - maxUnstarted = 10000 + ms.maxUnstarted = config.MaxQueued() + if ms.maxUnstarted <= 0 { + ms.maxUnstarted = 10000 } - addresses, err := keyStore.EnabledAddressesForChain(ctx, chainID) + + txs, err := persistentTxStore.GetAllTransactions(ctx, chainID) if err != nil { - return nil, fmt.Errorf("new_in_memory_store: %w", err) + return nil, fmt.Errorf("address_state: initialization: %w", err) } - for _, fromAddr := range addresses { - txs, err := persistentTxStore.GetAllTransactions(ctx, fromAddr, chainID) - if err != nil { - return nil, fmt.Errorf("address_state: initialization: %w", err) + addressesToTxs := map[ADDR][]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + for _, tx := range txs { + at, exists := addressesToTxs[tx.FromAddress] + if !exists { + at = []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} } - ms.addressStates[fromAddr] = newAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](lggr, chainID, fromAddr, maxUnstarted, txs) + at = append(at, tx) + addressesToTxs[tx.FromAddress] = at + } + for fromAddr, txs := range addressesToTxs { + ms.addressStates[fromAddr] = newAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](lggr, chainID, fromAddr, ms.maxUnstarted, txs) } return &ms, nil diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 8fc7e9a4620..4521d49ad6a 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -66,7 +66,7 @@ type InMemoryInitializer[ SEQ types.Sequence, FEE feetypes.Fee, ] interface { - GetAllTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) ([]Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) + GetAllTransactions(ctx context.Context, chainID CHAIN_ID) ([]Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) } // TransactionStore contains the persistence layer methods needed to manage Txs and TxAttempts diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 2132eb67c30..e57161c0f8f 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -466,18 +466,14 @@ func (o *evmTxStore) TransactionsWithAttempts(ctx context.Context, offset, limit } // GetAllTransactions returns all eth transactions -func (o *evmTxStore) GetAllTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) (txs []Tx, err error) { - var cancel context.CancelFunc - ctx, cancel = o.mergeContexts(ctx) - defer cancel() - qq := o.q.WithOpts(pg.WithParentCtx(ctx)) +func (o *evmTxStore) GetAllTransactions(ctx context.Context, chainID *big.Int) (txs []Tx, err error) { var dbEtxs []DbEthTx - sql := `SELECT * FROM evm.txes WHERE from_address = $1 AND evm_chain_id = $2 ORDER BY id desc` - if err = qq.Select(&dbEtxs, sql, fromAddress, chainID.String()); err != nil { + sql := `SELECT * FROM evm.txes WHERE evm_chain_id = $1 ORDER BY id desc` + if err = o.q.SelectContext(ctx, &dbEtxs, sql, chainID.String()); err != nil { return } txs = dbEthTxsToEvmEthTxs(dbEtxs) - err = o.preloadTxAttempts(txs) + err = o.preloadTxAttempts(ctx, txs) return } From 77f290e7843a56b756bffff74d7f7c88dd28bb8b Mon Sep 17 00:00:00 2001 From: amit-momin Date: Thu, 27 Jun 2024 14:37:20 -0500 Subject: [PATCH 17/18] Updated mocks --- common/txmgr/types/mocks/tx_store.go | 30 +++++++++++++++++++++ core/chains/evm/txmgr/mocks/evm_tx_store.go | 30 +++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 9c2edfa963f..228984fc612 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -730,6 +730,36 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetAbandone return r0, r1 } +// GetAllTransactions provides a mock function with given fields: ctx, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetAllTransactions(ctx context.Context, chainID CHAIN_ID) ([]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + ret := _m.Called(ctx, chainID) + + if len(ret) == 0 { + panic("no return value specified for GetAllTransactions") + } + + var r0 []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) ([]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]); ok { + r0 = rf(ctx, chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetInProgressTxAttempts provides a mock function with given fields: ctx, address, chainID func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { ret := _m.Called(ctx, address, chainID) diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index 5a699f71bf9..5d54b6ac334 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -881,6 +881,36 @@ func (_m *EvmTxStore) GetAbandonedTransactionsByBatch(ctx context.Context, chain return r0, r1 } +// GetAllTransactions provides a mock function with given fields: ctx, chainID +func (_m *EvmTxStore) GetAllTransactions(ctx context.Context, chainID *big.Int) ([]types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { + ret := _m.Called(ctx, chainID) + + if len(ret) == 0 { + panic("no return value specified for GetAllTransactions") + } + + var r0 []types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) ([]types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) []types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]); ok { + r0 = rf(ctx, chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetInProgressTxAttempts provides a mock function with given fields: ctx, address, chainID func (_m *EvmTxStore) GetInProgressTxAttempts(ctx context.Context, address common.Address, chainID *big.Int) ([]types.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { ret := _m.Called(ctx, address, chainID) From fdc6c4eefd033a848bc877fd993de9751192a37f Mon Sep 17 00:00:00 2001 From: amit-momin Date: Thu, 27 Jun 2024 18:25:58 -0500 Subject: [PATCH 18/18] Suppressed unused function warnings while adding framework --- common/txmgr/inmemory_store.go | 2 ++ core/chains/evm/txmgr/evm_inmemory_store_test.go | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index bd4e9a2f3a6..70cf972b5b5 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -347,6 +347,7 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkO return nil } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTx( tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { @@ -386,6 +387,7 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepC return ©Tx } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTxAttempt( tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index a102ee1c996..82512b1a447 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -10,6 +10,8 @@ import ( ) // assertTxEqual asserts that two transactions are equal +// +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { assert.Equal(t, exp.ID, act.ID) assert.Equal(t, exp.IdempotencyKey, act.IdempotencyKey) @@ -39,6 +41,7 @@ func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { } } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func assertTxAttemptEqual(t *testing.T, exp, act evmtxmgr.TxAttempt) { assert.Equal(t, exp.ID, act.ID) assert.Equal(t, exp.TxID, act.TxID) @@ -58,6 +61,7 @@ func assertTxAttemptEqual(t *testing.T, exp, act evmtxmgr.TxAttempt) { } } +//lint:ignore U1000 Ignore unused function temporarily while adding the framework func assertChainReceiptEqual(t *testing.T, exp, act evmtxmgr.ChainReceipt) { assert.Equal(t, exp.GetStatus(), act.GetStatus()) assert.Equal(t, exp.GetTxHash(), act.GetTxHash())