Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TXM In-memory: add address state framework : STEP 2 #12122

Closed
Closed
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8e329ed
add address state
poopoothegorilla Feb 21, 2024
b69865e
change to better naming scheme and remove old TODOs
poopoothegorilla Feb 23, 2024
36574fa
change naming of transactions from Txn to Tx to match the struct
poopoothegorilla Feb 23, 2024
788d9b7
goimports
poopoothegorilla Feb 23, 2024
885cfb7
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Feb 29, 2024
25186b5
fix incorrect note
poopoothegorilla Feb 29, 2024
e0d9939
change naming from fetch to find
poopoothegorilla Feb 29, 2024
cd5f156
remove empty line
poopoothegorilla Feb 29, 2024
dcb01d9
update MoveTxToFatalError
poopoothegorilla Feb 29, 2024
777d53a
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Mar 6, 2024
c364858
update comment for addressState
poopoothegorilla Mar 6, 2024
44854b0
address comments
poopoothegorilla Mar 6, 2024
e992b20
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Mar 6, 2024
751788e
change txAttempt to pointers in addressState
poopoothegorilla Mar 6, 2024
0f91f8c
unexport all addressState methods
poopoothegorilla Mar 6, 2024
0f266ef
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Mar 7, 2024
a3cb0de
remove implicit for loop stuff
poopoothegorilla Mar 7, 2024
4831b0a
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Mar 8, 2024
6e1e9bc
add check for max queue size
poopoothegorilla Mar 8, 2024
87b196c
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Mar 8, 2024
abe7d1b
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Mar 13, 2024
99cfc64
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Mar 21, 2024
f470445
Merge remote-tracking branch 'origin/jtw/step-1-in-memory-work' into …
amit-momin Jun 27, 2024
b10e97e
Suppressed unused function warnings while adding framework
amit-momin Jun 27, 2024
c1d40af
Fixed linting error
amit-momin Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 288 additions & 0 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
package txmgr

import (
"sync"
"time"

"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/common/internal/queues"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

// addressState is the state of all transactions for a given address.
// It holds information about all transactions for a given address, including unstarted, in-progress, unconfirmed, confirmed, and fatal errored transactions.
// It is designed to help transition transactions between states and to provide information about the current state of transactions for a given address.
type addressState[
CHAIN_ID types.ID,
ADDR, TX_HASH, BLOCK_HASH types.Hashable,
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
SEQ types.Sequence,
FEE feetypes.Fee,
] struct {
lggr logger.SugaredLogger
chainID CHAIN_ID
fromAddress ADDR

sync.RWMutex
idempotencyKeyToTx map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
attemptHashToTxAttempt map[TX_HASH]*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
unstartedTxs *queues.TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
inprogressTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
// NOTE: below each map's key is the transaction ID that is assigned via the persistent datastore
unconfirmedTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
confirmedMissingReceiptTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
confirmedTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
allTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
fatalErroredTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
}

// newAddressState returns a new addressState instance with initialized transaction state
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func newAddressState[
CHAIN_ID types.ID,
ADDR, TX_HASH, BLOCK_HASH types.Hashable,
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
SEQ types.Sequence,
FEE feetypes.Fee,
](
lggr logger.SugaredLogger,
chainID CHAIN_ID,
fromAddress ADDR,
maxUnstarted uint64,
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
if maxUnstarted == 0 {
panic("new_address_state: MaxUnstarted queue size must be greater than 0")
}

// Count the number of transactions in each state to reduce the number of map resizes
counts := map[txmgrtypes.TxState]int{
TxUnstarted: 0,
TxInProgress: 0,
TxUnconfirmed: 0,
TxConfirmedMissingReceipt: 0,
TxConfirmed: 0,
TxFatalError: 0,
}
var idempotencyKeysCount int
var txAttemptCount int
for _, tx := range txs {
counts[tx.State]++
if tx.IdempotencyKey != nil {
idempotencyKeysCount++
}
if len(tx.TxAttempts) > 0 {
txAttemptCount += len(tx.TxAttempts)
}
}

as := addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
lggr: lggr,
chainID: chainID,
fromAddress: fromAddress,

idempotencyKeyToTx: make(map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], idempotencyKeysCount),
unstartedTxs: queues.NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](int(maxUnstarted)),
inprogressTx: nil,
unconfirmedTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxUnconfirmed]),
confirmedMissingReceiptTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmedMissingReceipt]),
confirmedTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmed]),
allTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)),
fatalErroredTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxFatalError]),
attemptHashToTxAttempt: make(map[TX_HASH]*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttemptCount),
}

// Load all transactions supplied
for i := 0; i < len(txs); i++ {
tx := txs[i]
switch tx.State {
case TxUnstarted:
as.unstartedTxs.AddTx(&tx)
case TxInProgress:
as.inprogressTx = &tx
case TxUnconfirmed:
as.unconfirmedTxs[tx.ID] = &tx
case TxConfirmedMissingReceipt:
as.confirmedMissingReceiptTxs[tx.ID] = &tx
case TxConfirmed:
as.confirmedTxs[tx.ID] = &tx
case TxFatalError:
as.fatalErroredTxs[tx.ID] = &tx
default:
panic("unknown transaction state")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prashantkumar1982 @poopoothegorilla does panic make sense here? I doubt we will ever hit this, but if by some miracle we do, wouldn't it be better to just drop the TX, log it, and proceed? Or is this error only possible through some sort of dev error that we want to catch ASAP?

}
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
as.allTxs[tx.ID] = &tx
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx
}
for i := 0; i < len(tx.TxAttempts); i++ {
txAttempt := tx.TxAttempts[i]
as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt
}
}

return &as
}

// countTransactionsByState returns the number of transactions that are in the given state
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) countTransactionsByState(txState txmgrtypes.TxState) int {
return 0
}

// findTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
return nil
}

// applyToTxsByState calls the given function for each transaction in the given states.
// If txIDs are provided, only the transactions with those IDs are considered.
// If no txIDs are provided, all transactions in the given states are considered.
// If no txStates are provided, all transactions are considered.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxsByState(
txStates []txmgrtypes.TxState,
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
) {
}

// findTxAttempts returns all attempts for the given transactions that match the given filters.
// If txIDs are provided, only the transactions with those IDs are considered.
// If no txIDs are provided, all transactions are considered.
// If no txStates are provided, all transactions are considered.
// The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxAttempts(
txStates []txmgrtypes.TxState,
txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txIDs ...int64,
) []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
return nil
}

// findTxs returns all transactions that match the given filters.
// If txIDs are provided, only the transactions with those IDs are considered.
// If no txIDs are provided, all transactions are considered.
// If no txStates are provided, all transactions are considered.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxs(
txStates []txmgrtypes.TxState,
filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txIDs ...int64,
) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
return nil
}

// pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) {
}

// deleteTxs removes the transactions with the given IDs from the address state.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
}

// peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
return nil, nil
}

// peekInProgressTx returns the in-progress transaction without removing it from the in-progress state.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
return nil, nil
}

// addTxToUnstarted adds the given transaction to the unstarted queue.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
return nil
}

// moveUnstartedToInProgress moves the next unstarted transaction to the in-progress state.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnstartedToInProgress(
etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
return nil
}

// moveConfirmedMissingReceiptToUnconfirmed moves the confirmed missing receipt transaction to the unconfirmed state.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedMissingReceiptToUnconfirmed(
txID int64,
) error {
return nil
}

// moveInProgressToUnconfirmed moves the in-progress transaction to the unconfirmed state.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToUnconfirmed(
etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
return nil
}

// moveUnconfirmedToConfirmed moves the unconfirmed transaction to the confirmed state.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnconfirmedToConfirmed(
receipt txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
) error {
return nil
}

// moveTxToFatalError moves a transaction to the fatal error state.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTxToFatalError(
txID int64, txError null.String,
) error {
return nil
}

// moveUnconfirmedToConfirmedMissingReceipt moves the unconfirmed transaction to the confirmed missing receipt state.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnconfirmedToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error {
return nil
}

// moveInProgressToConfirmedMissingReceipt moves the in-progress transaction to the confirmed missing receipt state.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error {
return nil
}

// moveConfirmedToUnconfirmed moves the confirmed transaction to the unconfirmed state.
//
//lint:ignore U1000 Ignore unused function temporarily while adding the framework
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
return nil
}
Loading