Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TXM In-memory: add address state framework : STEP 2 #12122

Closed
Closed
Changes from 4 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8e329ed
add address state
poopoothegorilla Feb 21, 2024
b69865e
change to better naming scheme and remove old TODOs
poopoothegorilla Feb 23, 2024
36574fa
change naming of transactions from Txn to Tx to match the struct
poopoothegorilla Feb 23, 2024
788d9b7
goimports
poopoothegorilla Feb 23, 2024
885cfb7
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Feb 29, 2024
25186b5
fix incorrect note
poopoothegorilla Feb 29, 2024
e0d9939
change naming from fetch to find
poopoothegorilla Feb 29, 2024
cd5f156
remove empty line
poopoothegorilla Feb 29, 2024
dcb01d9
update MoveTxToFatalError
poopoothegorilla Feb 29, 2024
777d53a
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Mar 6, 2024
c364858
update comment for addressState
poopoothegorilla Mar 6, 2024
44854b0
address comments
poopoothegorilla Mar 6, 2024
e992b20
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Mar 6, 2024
751788e
change txAttempt to pointers in addressState
poopoothegorilla Mar 6, 2024
0f91f8c
unexport all addressState methods
poopoothegorilla Mar 6, 2024
0f266ef
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Mar 7, 2024
a3cb0de
remove implicit for loop stuff
poopoothegorilla Mar 7, 2024
4831b0a
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Mar 8, 2024
6e1e9bc
add check for max queue size
poopoothegorilla Mar 8, 2024
87b196c
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Mar 8, 2024
abe7d1b
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Mar 13, 2024
99cfc64
Merge branch 'jtw/step-1-in-memory-work' into jtw/step-2-in-memory-work
poopoothegorilla Mar 21, 2024
f470445
Merge remote-tracking branch 'origin/jtw/step-1-in-memory-work' into …
amit-momin Jun 27, 2024
b10e97e
Suppressed unused function warnings while adding framework
amit-momin Jun 27, 2024
c1d40af
Fixed linting error
amit-momin Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 254 additions & 0 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
package txmgr

import (
"sync"
"time"

"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

// AddressState is the state of all transactions for a given address
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
type AddressState[
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
CHAIN_ID types.ID,
ADDR, TX_HASH, BLOCK_HASH types.Hashable,
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
SEQ types.Sequence,
FEE feetypes.Fee,
] struct {
lggr logger.SugaredLogger
chainID CHAIN_ID
fromAddress ADDR

sync.RWMutex
idempotencyKeyToTx map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
unstartedTxs *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
inprogressTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
// NOTE: currently the unconfirmed map's key is the transaction ID that is assigned via the postgres DB
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
unconfirmedTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
confirmedMissingReceiptTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
confirmedTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
allTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
fatalErroredTxs map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
attemptHashToTxAttempt map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
}

// NewAddressState returns a new AddressState instance with initialized transaction state
func NewAddressState[
CHAIN_ID types.ID,
ADDR, TX_HASH, BLOCK_HASH types.Hashable,
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
SEQ types.Sequence,
FEE feetypes.Fee,
](
lggr logger.SugaredLogger,
chainID CHAIN_ID,
fromAddress ADDR,
maxUnstarted int,
amit-momin marked this conversation as resolved.
Show resolved Hide resolved
txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) (*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], error) {
// Count the number of transactions in each state to reduce the number of map resizes
counts := map[txmgrtypes.TxState]int{
TxUnstarted: 0,
TxInProgress: 0,
TxUnconfirmed: 0,
TxConfirmedMissingReceipt: 0,
TxConfirmed: 0,
TxFatalError: 0,
}
var idempotencyKeysCount int
var txAttemptCount int
for _, tx := range txs {
counts[tx.State]++
if tx.IdempotencyKey != nil {
idempotencyKeysCount++
}
if tx.State == TxUnconfirmed {
amit-momin marked this conversation as resolved.
Show resolved Hide resolved
txAttemptCount += len(tx.TxAttempts)
}
}

as := AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
lggr: lggr,
chainID: chainID,
fromAddress: fromAddress,

idempotencyKeyToTx: make(map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], idempotencyKeysCount),
unstartedTxs: NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted),
inprogressTx: nil,
unconfirmedTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxUnconfirmed]),
confirmedMissingReceiptTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmedMissingReceipt]),
confirmedTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmed]),
allTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)),
fatalErroredTxs: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxFatalError]),
attemptHashToTxAttempt: make(map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttemptCount),
}

// Load all transactions supplied
for i := 0; i < len(txs); i++ {
tx := txs[i]
switch tx.State {
case TxUnstarted:
as.unstartedTxs.AddTx(&tx)
case TxInProgress:
as.inprogressTx = &tx
case TxUnconfirmed:
as.unconfirmedTxs[tx.ID] = &tx
case TxConfirmedMissingReceipt:
as.confirmedMissingReceiptTxs[tx.ID] = &tx
case TxConfirmed:
as.confirmedTxs[tx.ID] = &tx
case TxFatalError:
as.fatalErroredTxs[tx.ID] = &tx
}
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
as.allTxs[tx.ID] = &tx
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx
}
for _, txAttempt := range tx.TxAttempts {
as.attemptHashToTxAttempt[txAttempt.Hash] = txAttempt
}
}

return &as, nil
}

// CountTransactionsByState returns the number of transactions that are in the given state
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(txState txmgrtypes.TxState) int {
return 0
}

// FindTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
return nil
}

// ApplyToTxsByState calls the given function for each transaction in the given states.
// If txIDs are provided, only the transactions with those IDs are considered.
// If no txIDs are provided, all transactions in the given states are considered.
// If no txStates are provided, all transactions are considered.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ApplyToTxsByState(
txStates []txmgrtypes.TxState,
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
) {
}

// FetchTxAttempts returns all attempts for the given transactions that match the given filters.
// If txIDs are provided, only the transactions with those IDs are considered.
// If no txIDs are provided, all transactions are considered.
// If no txStates are provided, all transactions are considered.
// The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FetchTxAttempts(
txStates []txmgrtypes.TxState,
txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txIDs ...int64,
) []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
return nil
}

// FetchTxs returns all transactions that match the given filters.
// If txIDs are provided, only the transactions with those IDs are considered.
// If no txIDs are provided, all transactions are considered.
// If no txStates are provided, all transactions are considered.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FetchTxs(
txStates []txmgrtypes.TxState,
filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txIDs ...int64,
) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
return nil
}

// PruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ids []int64) {
}

// DeleteTxs removes the transactions with the given IDs from the address state.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
}

// PeekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
return nil, nil
}

// PeekInProgressTx returns the in-progress transaction without removing it from the in-progress state.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
return nil, nil
}

// AddTxToUnstarted adds the given transaction to the unstarted queue.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
return nil
}

// MoveUnstartedToInProgress moves the next unstarted transaction to the in-progress state.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToInProgress(
etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
return nil
}

// MoveConfirmedMissingReceiptToUnconfirmed moves the confirmed missing receipt transaction to the unconfirmed state.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedMissingReceiptToUnconfirmed(
txID int64,
) error {
return nil
}

// MoveInProgressToUnconfirmed moves the in-progress transaction to the unconfirmed state.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToUnconfirmed(
etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
return nil
}

// MoveUnconfirmedToConfirmed moves the unconfirmed transaction to the confirmed state.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmed(
receipt txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
) error {
return nil
}

// MoveUnstartedToFatalError moves the unstarted transaction to the fatal error state.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToFatalError(
etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txError null.String,
) error {
return nil
}

// MoveInProgressToFatalError moves the in-progress transaction to the fatal error state.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToFatalError(txError null.String) error {
return nil
}

// MoveConfirmedMissingReceiptToFatalError moves the confirmed missing receipt transaction to the fatal error state.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedMissingReceiptToFatalError(
etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txError null.String,
) error {
return nil
}
amit-momin marked this conversation as resolved.
Show resolved Hide resolved

// MoveUnconfirmedToConfirmedMissingReceipt moves the unconfirmed transaction to the confirmed missing receipt state.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error {
return nil
}

// MoveInProgressToConfirmedMissingReceipt moves the in-progress transaction to the confirmed missing receipt state.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error {
return nil
}
amit-momin marked this conversation as resolved.
Show resolved Hide resolved

// MoveConfirmedToUnconfirmed moves the confirmed transaction to the unconfirmed state.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
return nil
}
Loading