Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TXM In-memory: address_state methods: step 3-03 #12179

Closed
wants to merge 50 commits into from
Closed
Changes from 10 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
c740102
implement MoveUnstartedToInProgress
poopoothegorilla Feb 23, 2024
8f538a0
implement MoveConfirmedMissingReceiptToUnconfirmed
poopoothegorilla Feb 23, 2024
13f3959
implement MoveInProgressToUnconfirmed
poopoothegorilla Feb 23, 2024
feabfb7
update attemptHashToTxAttempt map when moving unstarted to inprogress
poopoothegorilla Feb 23, 2024
105ec2d
implement MoveUnconfirmedToConfirmed
poopoothegorilla Feb 23, 2024
54644db
implement MoveUnstartedToFatalError
poopoothegorilla Feb 23, 2024
99a0839
update comments
poopoothegorilla Feb 23, 2024
e5ed75e
Merge branch 'jtw/step-3-in-memory-work' into jtw/step-3-03
poopoothegorilla Feb 27, 2024
fd8fb43
implement UpdateTxUnstartedToInProgress
poopoothegorilla Feb 27, 2024
8aebf02
implement UpdateTxAttemptInProgressToBroadcast
poopoothegorilla Feb 27, 2024
c038b3d
revert MoveUnstartedToFatalError and move to another branch
poopoothegorilla Feb 27, 2024
063bd25
implement UpdateTxsUnconfirmed
poopoothegorilla Feb 27, 2024
b932bd2
implement SaveFetchedReceipts
poopoothegorilla Feb 27, 2024
a77c3bf
Merge branch 'jtw/step-3-in-memory-work' into jtw/step-3-03
poopoothegorilla Mar 7, 2024
d320f2c
Merge branch 'jtw/step-3-in-memory-work' into jtw/step-3-03
poopoothegorilla Mar 12, 2024
42017d6
remove old comment
poopoothegorilla Mar 12, 2024
43f8f9d
Merge branch 'jtw/step-3-03' into jtw/step-3-03-update-tx-unstarted-t…
poopoothegorilla Mar 12, 2024
b60751b
fix merge conflicts
poopoothegorilla Mar 12, 2024
dd82fd2
add basic test framework
poopoothegorilla Mar 12, 2024
4926aaa
add findTxs
poopoothegorilla Mar 12, 2024
2a8c010
Merge branch 'jtw/step-3-03' into jtw/step-3-03-update-tx-unstarted-t…
poopoothegorilla Mar 12, 2024
5a76c07
implement tests for UpdateTxUnstartedToInProgress
poopoothegorilla Mar 12, 2024
8e4887a
Merge branch 'jtw/step-3-03' into jtw/step-3-03-update-txs-unconfirmed
poopoothegorilla Mar 12, 2024
8be0b28
implement tests for UpdateTxsUnconfirmed
poopoothegorilla Mar 12, 2024
ca8ed94
Merge branch 'jtw/step-3-03' into jtw/step-3-03-update-tx-attempt-in-…
poopoothegorilla Mar 12, 2024
65de839
fix merge conflicts
poopoothegorilla Mar 12, 2024
3e18438
implement testing for UpdateTxAttemptInProgressToBroadcast
poopoothegorilla Mar 13, 2024
c6deb3f
Merge branch 'jtw/step-3-03' into jtw/step-3-03-save-fetched-receipts
poopoothegorilla Mar 13, 2024
fb54bf8
implement tests for SaveFetchReceipts
poopoothegorilla Mar 13, 2024
14b935c
Merge branch 'jtw/step-3-in-memory-work' into jtw/step-3-03
poopoothegorilla Mar 21, 2024
5dfe080
Merge branch 'jtw/step-3-03' into jtw/step-3-03-update-tx-attempt-in-…
poopoothegorilla Mar 25, 2024
4ccf558
clean up merge conflicts
poopoothegorilla Mar 25, 2024
30d8146
implement tests for UpdateTxAttemptInProgressToBroadcast
poopoothegorilla Mar 25, 2024
9831e0f
Merge branch 'jtw/step-3-03' into jtw/step-3-03-update-tx-unstarted-t…
poopoothegorilla Apr 1, 2024
0931887
cleanup and address comments
poopoothegorilla Apr 1, 2024
48ee5fb
fix context
poopoothegorilla Apr 1, 2024
eced7c7
Merge branch 'jtw/step-3-03' into jtw/step-3-03-update-txs-unconfirmed
poopoothegorilla Apr 3, 2024
e9aaffe
clean up context in tests
poopoothegorilla Apr 3, 2024
289e524
Merge pull request #12228 from smartcontractkit/jtw/step-3-03-update-…
poopoothegorilla Apr 3, 2024
bd22a27
Merge branch 'jtw/step-3-03' into jtw/step-3-03-update-tx-attempt-in-…
poopoothegorilla Apr 3, 2024
2e6c50c
Merge pull request #12225 from smartcontractkit/jtw/step-3-03-update-…
poopoothegorilla Apr 3, 2024
999d23f
Merge branch 'jtw/step-3-03' into jtw/step-3-03-save-fetched-receipts
poopoothegorilla Apr 3, 2024
bdabb35
small cleanup
poopoothegorilla Apr 3, 2024
85290d1
fix issue where tx is not moved to confirmed from unconfirmed
poopoothegorilla Apr 3, 2024
37dc31b
clean up naming
poopoothegorilla Apr 3, 2024
24beab3
Merge branch 'jtw/step-3-03' into jtw/step-3-03-update-tx-unstarted-t…
poopoothegorilla Apr 4, 2024
fa0ff89
Merge pull request #12229 from smartcontractkit/jtw/step-3-03-save-fe…
poopoothegorilla Apr 4, 2024
9b12c13
add check for if the transaction exists in memory
poopoothegorilla Apr 4, 2024
698bbf8
Merge branch 'jtw/step-3-03' into jtw/step-3-03-update-tx-unstarted-t…
poopoothegorilla Apr 4, 2024
a4b6104
Merge pull request #12217 from smartcontractkit/jtw/step-3-03-update-…
poopoothegorilla Apr 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 95 additions & 4 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package txmgr

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -124,12 +125,12 @@
}

// countTransactionsByState returns the number of transactions that are in the given state
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) countTransactionsByState(txState txmgrtypes.TxState) int {

Check failure on line 128 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).countTransactionsByState is unused (unused)
return 0
}

// findTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {

Check failure on line 133 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxWithIdempotencyKey is unused (unused)
return nil
}

Expand All @@ -137,7 +138,7 @@
// If txIDs are provided, only the transactions with those IDs are considered.
// If no txIDs are provided, all transactions in the given states are considered.
// If no txStates are provided, all transactions are considered.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxsByState(

Check failure on line 141 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).applyToTxsByState is unused (unused)
txStates []txmgrtypes.TxState,
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
Expand All @@ -149,7 +150,7 @@
// If no txIDs are provided, all transactions are considered.
// If no txStates are provided, all transactions are considered.
// The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxAttempts(

Check failure on line 153 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxAttempts is unused (unused)
txStates []txmgrtypes.TxState,
txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
Expand All @@ -171,55 +172,145 @@
}

// pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) {

Check failure on line 175 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).pruneUnstartedTxQueue is unused (unused)
}

// deleteTxs removes the transactions with the given IDs from the address state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {

Check failure on line 179 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).deleteTxs is unused (unused)
}

// peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 183 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekNextUnstartedTx is unused (unused)
return nil, nil
}

// peekInProgressTx returns the in-progress transaction without removing it from the in-progress state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 188 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekInProgressTx is unused (unused)
return nil, nil
}

// addTxToUnstarted adds the given transaction to the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {

Check failure on line 193 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).addTxToUnstarted is unused (unused)
return nil
}

// moveUnstartedToInProgress moves the next unstarted transaction to the in-progress state.
// The supplied txAttempt is added to the transaction.
// It returns an error if there is already a transaction in progress.
// It returns an error if there is no unstarted transaction to move to in_progress.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnstartedToInProgress(

Check failure on line 201 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).moveUnstartedToInProgress is unused (unused)
etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txID int64, seq SEQ, broadcastAt time.Time, initialBroadcastAt time.Time,
txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
as.Lock()
defer as.Unlock()

if as.inprogressTx != nil {
return fmt.Errorf("move_unstarted_to_in_progress: address %s already has a transaction in progress", as.fromAddress)
}

tx := as.unstartedTxs.RemoveTxByID(txID)
if tx == nil {
return fmt.Errorf("move_unstarted_to_in_progress: no unstarted transaction to move to in_progress")
}
tx.State = TxInProgress
tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{txAttempt}
tx.Sequence = &seq
tx.BroadcastAt = &broadcastAt
tx.InitialBroadcastAt = &initialBroadcastAt

as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt
as.inprogressTx = tx

return nil
}

// moveConfirmedMissingReceiptToUnconfirmed moves the confirmed missing receipt transaction to the unconfirmed state.
// It returns an error if there is no confirmed missing receipt transaction with the given ID.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedMissingReceiptToUnconfirmed(
txID int64,
) error {
as.Lock()
defer as.Unlock()

tx, ok := as.confirmedMissingReceiptTxs[txID]
if !ok || tx == nil {
return fmt.Errorf("move_confirmed_missing_receipt_to_unconfirmed: no confirmed_missing_receipt transaction with ID %d", txID)
}

tx.State = TxUnconfirmed
as.unconfirmedTxs[tx.ID] = tx
delete(as.confirmedMissingReceiptTxs, tx.ID)

return nil
}

// moveInProgressToUnconfirmed moves the in-progress transaction to the unconfirmed state.
// It returns an error if there is no transaction in progress.
// It returns an error if the transaction in progress does not have an attempt with the given ID.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToUnconfirmed(
etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txError null.String, broadcastAt time.Time, initialBroadcastAt time.Time,
txAttemptID int64,
) error {
as.Lock()
defer as.Unlock()

tx := as.inprogressTx
if tx == nil {
return fmt.Errorf("move_in_progress_to_unconfirmed: no transaction in progress")
}

found := false
for i := 0; i < len(tx.TxAttempts); i++ {
txAttempt := tx.TxAttempts[i]
if txAttempt.ID == txAttemptID {
txAttempt.State = txmgrtypes.TxAttemptBroadcast
tx.TxAttempts[i] = txAttempt
found = true
break
}
}
if !found {
return fmt.Errorf("move_in_progress_to_unconfirmed: transaction in progress does not have an attempt with ID %d", txAttemptID)
}
amit-momin marked this conversation as resolved.
Show resolved Hide resolved

tx.State = TxUnconfirmed
tx.Error = txError
tx.BroadcastAt = &broadcastAt
tx.InitialBroadcastAt = &initialBroadcastAt

as.unconfirmedTxs[tx.ID] = tx
as.inprogressTx = nil

return nil
}

// moveUnconfirmedToConfirmed moves the unconfirmed transaction to the confirmed state.
// It returns an error if there is no unconfirmed transaction with the given ID.
// It returns an error if there is no attempt with the given receipt.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnconfirmedToConfirmed(
receipt txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
) error {
as.Lock()
defer as.Unlock()

txAttempt, ok := as.attemptHashToTxAttempt[receipt.GetTxHash()]
if !ok {
return fmt.Errorf("move_unconfirmed_to_confirmed: no attempt found with receipt %v", receipt)
}
// TODO(jtw): not sure how to set blocknumber, transactionindex, and receipt on conflict
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
txAttempt.Receipts = []txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH]{receipt}
tx, ok := as.unconfirmedTxs[txAttempt.TxID]
if !ok {
return fmt.Errorf("move_unconfirmed_to_confirmed: no unconfirmed transaction with ID %d", txAttempt.TxID)
}
txAttempt.State = txmgrtypes.TxAttemptBroadcast
if txAttempt.BroadcastBeforeBlockNum == nil {
blockNum := receipt.GetBlockNumber().Int64()
txAttempt.BroadcastBeforeBlockNum = &blockNum
}
tx.State = TxConfirmed

return nil
}

Expand Down
Loading