-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TXM In-memory: address_state methods: step 3-04 #12178
Changes from 14 commits
2af9ffa
1aa98d3
3c301dd
659b2fb
debfdc9
c3611e3
e8ae7f4
66be6d3
5153d24
f91d5ab
42c82ac
ef987f4
a65204a
b971ee6
b43bdd3
07a2760
f139fe6
fbd76d6
8d3960f
18b8eb3
5d473d8
433177a
afe6280
0833ddd
0583a9f
5af2106
09e2eac
82521c3
8f4d145
1cae6ff
f74f45a
051ebe2
cbacd55
4e12e66
da6d235
0cc19ea
39566e9
b8d09d8
53e0657
0693806
337f2e1
2c485aa
90f1726
9d59e02
d22d62c
6a0b5ad
ae2d267
991a5f1
52faeec
3381426
0f1c005
9ea64b8
26d432a
f9bd7e6
056eaf5
b634183
f2c2e0b
6deab0d
d4684a2
8b7af52
b03e6fb
661f877
516c11a
8ae9d72
3a93992
d2e3152
8e8aea5
9978e9b
a186281
7de69a4
6d7923d
493ffed
4a48dc4
33b085b
09e6d61
17fe719
d0f6376
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package txmgr | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -124,12 +125,12 @@ | |
} | ||
|
||
// countTransactionsByState returns the number of transactions that are in the given state | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) countTransactionsByState(txState txmgrtypes.TxState) int { | ||
return 0 | ||
} | ||
|
||
// findTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned. | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { | ||
return nil | ||
} | ||
|
||
|
@@ -137,11 +138,40 @@ | |
// If txIDs are provided, only the transactions with those IDs are considered. | ||
// If no txIDs are provided, all transactions in the given states are considered. | ||
// If no txStates are provided, all transactions are considered. | ||
// This method does not handle transactions in the UnstartedTx state. | ||
// Any transaction states that are unknown will cause a panic including UnstartedTx. | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxsByState( | ||
txStates []txmgrtypes.TxState, | ||
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]), | ||
txIDs ...int64, | ||
) { | ||
as.Lock() | ||
defer as.Unlock() | ||
|
||
// if txStates is empty then apply the filter to only the as.allTransactions map | ||
if len(txStates) == 0 { | ||
as._applyToTxs(as.allTxs, fn, txIDs...) | ||
return | ||
} | ||
|
||
for _, txState := range txStates { | ||
switch txState { | ||
case TxInProgress: | ||
if as.inprogressTx != nil { | ||
fn(as.inprogressTx) | ||
} | ||
case TxUnconfirmed: | ||
as._applyToTxs(as.unconfirmedTxs, fn, txIDs...) | ||
case TxConfirmedMissingReceipt: | ||
as._applyToTxs(as.confirmedMissingReceiptTxs, fn, txIDs...) | ||
case TxConfirmed: | ||
as._applyToTxs(as.confirmedTxs, fn, txIDs...) | ||
case TxFatalError: | ||
as._applyToTxs(as.fatalErroredTxs, fn, txIDs...) | ||
default: | ||
panic("apply_to_txs_by_state: unknown transaction state") | ||
} | ||
} | ||
} | ||
|
||
// findTxAttempts returns all attempts for the given transactions that match the given filters. | ||
|
@@ -149,7 +179,7 @@ | |
// If no txIDs are provided, all transactions are considered. | ||
// If no txStates are provided, all transactions are considered. | ||
// The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts. | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxAttempts( | ||
txStates []txmgrtypes.TxState, | ||
txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, | ||
txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, | ||
|
@@ -167,24 +197,56 @@ | |
filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, | ||
txIDs ...int64, | ||
) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { | ||
return nil | ||
as.RLock() | ||
defer as.RUnlock() | ||
|
||
// if txStates is empty then apply the filter to only the as.allTransactions map | ||
if len(txStates) == 0 { | ||
return as._findTxs(as.allTxs, filter, txIDs...) | ||
poopoothegorilla marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
var txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] | ||
for _, txState := range txStates { | ||
switch txState { | ||
case TxInProgress: | ||
if as.inprogressTx != nil && filter(as.inprogressTx) { | ||
txs = append(txs, *as.inprogressTx) | ||
} | ||
case TxUnconfirmed: | ||
txs = append(txs, as._findTxs(as.unconfirmedTxs, filter, txIDs...)...) | ||
case TxConfirmedMissingReceipt: | ||
txs = append(txs, as._findTxs(as.confirmedMissingReceiptTxs, filter, txIDs...)...) | ||
case TxConfirmed: | ||
txs = append(txs, as._findTxs(as.confirmedTxs, filter, txIDs...)...) | ||
case TxFatalError: | ||
txs = append(txs, as._findTxs(as.fatalErroredTxs, filter, txIDs...)...) | ||
default: | ||
panic("find_txs: unknown transaction state") | ||
} | ||
} | ||
|
||
return txs | ||
} | ||
|
||
// pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue. | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) { | ||
} | ||
|
||
// deleteTxs removes the transactions with the given IDs from the address state. | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { | ||
as.Lock() | ||
defer as.Unlock() | ||
|
||
as._deleteTxs(txs...) | ||
} | ||
|
||
// peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { | ||
return nil, nil | ||
} | ||
|
||
// peekInProgressTx returns the in-progress transaction without removing it from the in-progress state. | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { | ||
return nil, nil | ||
} | ||
|
||
|
@@ -224,23 +286,194 @@ | |
} | ||
|
||
// moveTxToFatalError moves a transaction to the fatal error state. | ||
// It returns an error if there is no transaction with the given ID. | ||
// It returns an error if the transaction is not in an expected state. | ||
// Unknown transaction states will cause a panic this includes Unconfirmed and Confirmed transactions. | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTxToFatalError( | ||
txID int64, txError null.String, | ||
) error { | ||
as.Lock() | ||
defer as.Unlock() | ||
|
||
tx := as.allTxs[txID] | ||
if tx == nil { | ||
return fmt.Errorf("move_tx_to_fatal_error: no transaction with ID %d", txID) | ||
} | ||
|
||
as._moveTxToFatalError(tx, txError) | ||
|
||
switch tx.State { | ||
case TxUnstarted: | ||
_ = as.unstartedTxs.RemoveTxByID(txID) | ||
case TxInProgress: | ||
as.inprogressTx = nil | ||
case TxConfirmedMissingReceipt: | ||
delete(as.confirmedMissingReceiptTxs, tx.ID) | ||
default: | ||
panic("move_tx_to_fatal_error: unknown transaction state") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: could you include the state we panic on in the message for easier debugging? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good call out |
||
} | ||
|
||
return nil | ||
} | ||
|
||
// moveUnconfirmedToConfirmedMissingReceipt moves the unconfirmed transaction to the confirmed missing receipt state. | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnconfirmedToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { | ||
// If there is no unconfirmed transaction with the given ID, an error is returned. | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnconfirmedToConfirmedMissingReceipt(txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My paranoia is getting the better of me here. Should we be wary of accepting the entire txAttempt here and updating our state with this object? If someone inadvertently makes a change to the txAttempt before calling this function, we'd store those changes without validation. It might be safer just accepting the hash and looking it up in our internal map and making the changes on that object. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also I noticed we append the attempt to the tx's attempt list. Wouldn't the attempt already exist in the list if it's an Unconfirmed tx? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fair point cleaned up a bit |
||
as.Lock() | ||
defer as.Unlock() | ||
|
||
tx, ok := as.unconfirmedTxs[txAttempt.TxID] | ||
if !ok || tx == nil { | ||
return fmt.Errorf("move_unconfirmed_to_confirmed_missing_receipt: no unconfirmed transaction with ID %d", txAttempt.TxID) | ||
} | ||
if len(tx.TxAttempts) == 0 { | ||
return fmt.Errorf("move_unconfirmed_to_confirmed_missing_receipt: no attempts for transaction with ID %d", txAttempt.TxID) | ||
} | ||
if tx.BroadcastAt.Before(broadcastAt) { | ||
tx.BroadcastAt = &broadcastAt | ||
} | ||
tx.State = TxConfirmedMissingReceipt | ||
txAttempt.State = txmgrtypes.TxAttemptBroadcast | ||
tx.TxAttempts = append(tx.TxAttempts, txAttempt) | ||
|
||
as.confirmedMissingReceiptTxs[tx.ID] = tx | ||
delete(as.unconfirmedTxs, tx.ID) | ||
|
||
return nil | ||
} | ||
|
||
// moveInProgressToConfirmedMissingReceipt moves the in-progress transaction to the confirmed missing receipt state. | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { | ||
// If there is no in-progress transaction, an error is returned. | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToConfirmedMissingReceipt(txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { | ||
as.Lock() | ||
defer as.Unlock() | ||
|
||
tx := as.inprogressTx | ||
if tx == nil { | ||
return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: no transaction in progress") | ||
} | ||
if len(tx.TxAttempts) == 0 { | ||
return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: no attempts for transaction with ID %d", tx.ID) | ||
} | ||
if tx.BroadcastAt.Before(broadcastAt) { | ||
tx.BroadcastAt = &broadcastAt | ||
} | ||
tx.State = TxConfirmedMissingReceipt | ||
txAttempt.State = txmgrtypes.TxAttemptBroadcast | ||
tx.TxAttempts = append(tx.TxAttempts, txAttempt) | ||
|
||
as.confirmedMissingReceiptTxs[tx.ID] = tx | ||
as.inprogressTx = nil | ||
|
||
return nil | ||
} | ||
|
||
// moveConfirmedToUnconfirmed moves the confirmed transaction to the unconfirmed state. | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { | ||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedToUnconfirmed(txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { | ||
as.Lock() | ||
defer as.Unlock() | ||
|
||
if txAttempt.State != txmgrtypes.TxAttemptBroadcast { | ||
return fmt.Errorf("move_confirmed_to_unconfirmed: attempt must be in broadcast state") | ||
} | ||
|
||
tx, ok := as.confirmedTxs[txAttempt.TxID] | ||
if !ok || tx == nil { | ||
return fmt.Errorf("move_confirmed_to_unconfirmed: no confirmed transaction with ID %d", txAttempt.TxID) | ||
} | ||
if len(tx.TxAttempts) == 0 { | ||
return fmt.Errorf("move_confirmed_to_unconfirmed: no attempts for transaction with ID %d", txAttempt.TxID) | ||
} | ||
tx.State = TxUnconfirmed | ||
|
||
// Delete the receipt from the attempt | ||
txAttempt.Receipts = nil | ||
// Reset the broadcast information for the attempt | ||
txAttempt.State = txmgrtypes.TxAttemptInProgress | ||
txAttempt.BroadcastBeforeBlockNum = nil | ||
tx.TxAttempts = append(tx.TxAttempts, txAttempt) | ||
|
||
as.unconfirmedTxs[tx.ID] = tx | ||
delete(as.confirmedTxs, tx.ID) | ||
|
||
return nil | ||
} | ||
|
||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _applyToTxs( | ||
txIDsToTx map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], | ||
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]), | ||
txIDs ...int64, | ||
) { | ||
// if txIDs is not empty then only apply the filter to those transactions | ||
if len(txIDs) > 0 { | ||
for _, txID := range txIDs { | ||
tx := txIDsToTx[txID] | ||
if tx != nil { | ||
fn(tx) | ||
} | ||
} | ||
return | ||
} | ||
|
||
// if txIDs is empty then apply the filter to all transactions | ||
for _, tx := range txIDsToTx { | ||
fn(tx) | ||
} | ||
} | ||
|
||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _findTxs( | ||
txIDsToTx map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], | ||
filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, | ||
txIDs ...int64, | ||
) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { | ||
var txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] | ||
// if txIDs is not empty then only apply the filter to those transactions | ||
if len(txIDs) > 0 { | ||
for _, txID := range txIDs { | ||
tx := txIDsToTx[txID] | ||
if tx != nil && filter(tx) { | ||
txs = append(txs, *tx) | ||
} | ||
} | ||
return txs | ||
} | ||
|
||
// if txIDs is empty then apply the filter to all transactions | ||
for _, tx := range txIDsToTx { | ||
if filter(tx) { | ||
txs = append(txs, *tx) | ||
} | ||
} | ||
|
||
return txs | ||
} | ||
|
||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { | ||
for _, tx := range txs { | ||
if tx.IdempotencyKey != nil { | ||
delete(as.idempotencyKeyToTx, *tx.IdempotencyKey) | ||
} | ||
txID := tx.ID | ||
if as.inprogressTx != nil && as.inprogressTx.ID == txID { | ||
as.inprogressTx = nil | ||
} | ||
delete(as.allTxs, txID) | ||
delete(as.unconfirmedTxs, txID) | ||
delete(as.confirmedMissingReceiptTxs, txID) | ||
delete(as.confirmedTxs, txID) | ||
delete(as.fatalErroredTxs, txID) | ||
as.unstartedTxs.RemoveTxByID(txID) | ||
} | ||
} | ||
|
||
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _moveTxToFatalError( | ||
tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], | ||
txError null.String, | ||
) { | ||
tx.State = TxFatalError | ||
tx.Sequence = nil | ||
tx.TxAttempts = nil | ||
tx.InitialBroadcastAt = nil | ||
tx.Error = txError | ||
as.fatalErroredTxs[tx.ID] = tx | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So you are applying this func to allTxs, but in above comment on the function, you said that if applied to UnstartedTx, then this will panic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeh i think we may want to think about this in the future, but I will the capability