diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 1551035da0e..854f378e4c0 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -22,9 +22,7 @@ import ( // BIG TODO LIST // TODO: make sure that all state transitions are handled by the address state to ensure that the in-memory store is always in a consistent state // TODO: figure out if multiple tx attempts are actually stored in the db for each tx -// TODO: check that txns are deep copied when returned from the in-memory store // TODO: need a way to get id for a tx attempt. since there are some methods where the persistent store creates a tx attempt and doesnt returns it -// TODO: make sure all address states are locked when updating the in-memory store var ( // ErrInvalidChainID is returned when the chain ID is invalid @@ -127,6 +125,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Creat if ms.chainID.String() != chainID.String() { return tx, fmt.Errorf("create_transaction: %w", ErrInvalidChainID) } + + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[tx.FromAddress] if !ok { return tx, fmt.Errorf("create_transaction: %w", ErrAddressNotFound) @@ -182,6 +183,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Check if ms.chainID.String() != chainID.String() { return fmt.Errorf("check_tx_queue_capacity: %w", ErrInvalidChainID) } + + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[fromAddress] if !ok { return fmt.Errorf("check_tx_queue_capacity: %w", ErrAddressNotFound) @@ -208,6 +212,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindL if ms.chainID.String() != chainID.String() { return seq, fmt.Errorf("find_latest_sequence: %w", ErrInvalidChainID) } + + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[fromAddress] if !ok { return seq, fmt.Errorf("find_latest_sequence: %w", ErrAddressNotFound) @@ -228,6 +235,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Count if ms.chainID.String() != chainID.String() { return 0, fmt.Errorf("count_unstarted_transactions: %w", ErrInvalidChainID) } + + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[fromAddress] if !ok { return 0, fmt.Errorf("count_unstarted_transactions: %w", ErrAddressNotFound) @@ -243,6 +253,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Count if ms.chainID.String() != chainID.String() { return 0, fmt.Errorf("count_unstarted_transactions: %w", ErrInvalidChainID) } + + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[fromAddress] if !ok { return 0, fmt.Errorf("count_unstarted_transactions: %w", ErrAddressNotFound) @@ -267,6 +280,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat if attempt.State != txmgrtypes.TxAttemptInProgress { return fmt.Errorf("update_tx_unstarted_to_in_progress: attempt state must be in_progress") } + + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[tx.FromAddress] if !ok { return fmt.Errorf("update_tx_unstarted_to_in_progress: %w", ErrAddressNotFound) @@ -289,6 +305,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat // GetTxInProgress returns the in_progress transaction for a given address. func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxInProgress(ctx context.Context, fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[fromAddress] if !ok { return nil, fmt.Errorf("get_tx_in_progress: %w", ErrAddressNotFound) @@ -331,6 +349,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: new attempt state must be broadcast, got: %s", newAttemptState) } + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[tx.FromAddress] if !ok { return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: %w", ErrAddressNotFound) @@ -354,6 +374,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindN if ms.chainID.String() != chainID.String() { return fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrInvalidChainID) } + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[fromAddress] if !ok { return fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrAddressNotFound) @@ -386,6 +408,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR return fmt.Errorf("save_replacement_in_progress_attempt: expected oldattempt to have an ID") } + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[oldAttempt.Tx.FromAddress] if !ok { return fmt.Errorf("save_replacement_in_progress_attempt: %w", ErrAddressNotFound) @@ -415,6 +439,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat return fmt.Errorf("update_tx_fatal_error: expected error field to be set") } + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[tx.FromAddress] if !ok { return fmt.Errorf("update_tx_fatal_error: %w", ErrAddressNotFound) @@ -466,6 +492,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Aband } // check that the address exists in the unstarted transactions + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[addr] if !ok { return fmt.Errorf("abandon: %w", ErrAddressNotFound) @@ -498,6 +526,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SetBr } } } + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { as.ApplyToTxs(nil, fn) } @@ -519,6 +549,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT } states := []txmgrtypes.TxState{TxConfirmedMissingReceipt} attempts := []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { attempts = append(attempts, as.FetchTxAttempts(states, filter)...) } @@ -550,6 +582,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat } } + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { as.ApplyToTxs(nil, fn, txIDs...) } @@ -565,6 +599,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat } // Update in memory store + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() wg := sync.WaitGroup{} for _, as := range ms.addressStates { wg.Add(1) @@ -601,6 +637,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT } states := []txmgrtypes.TxState{TxUnconfirmed, TxConfirmedMissingReceipt} attempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { attempts = append(attempts, as.FetchTxAttempts(states, filterFn)...) } @@ -646,6 +684,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT } states := []txmgrtypes.TxState{TxConfirmed} txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { txs = append(txs, as.FetchTxs(states, filterFn)...) } @@ -689,7 +729,10 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat } } wg := sync.WaitGroup{} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { + wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { as.ApplyToTxs(nil, fn) wg.Done() @@ -714,6 +757,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveF errsLock := sync.Mutex{} var errs error wg := sync.WaitGroup{} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { @@ -757,6 +802,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT txsLock := sync.Mutex{} txs := []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} wg := sync.WaitGroup{} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { @@ -795,6 +842,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT txsLock := sync.Mutex{} txs := []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} wg := sync.WaitGroup{} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { @@ -844,6 +893,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT txsLock := sync.Mutex{} txs := []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} wg := sync.WaitGroup{} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { @@ -877,6 +928,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT txsLock := sync.Mutex{} txs := []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} wg := sync.WaitGroup{} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { @@ -909,6 +962,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Prune return tx.Subject.UUID == subject } var m int + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { m += as.PruneUnstartedTxQueue(queueSize, filter) } @@ -957,6 +1012,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapT } wg := sync.WaitGroup{} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { @@ -970,6 +1027,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapT return tx.State == TxFatalError && tx.CreatedAt.Before(timeThreshold) } states = []txmgrtypes.TxState{TxFatalError} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { @@ -987,6 +1046,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Count } var total int + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { total += as.CountTransactionsByState(state) } @@ -1003,6 +1064,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Delet } // Check if fromaddress enabled + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[attempt.Tx.FromAddress] if !ok { return fmt.Errorf("delete_in_progress_attempt: %w", ErrAddressNotFound) @@ -1031,6 +1094,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT return nil, fmt.Errorf("find_txs_requiring_resubmission_due_to_insufficient_funds: %w", ErrInvalidChainID) } + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[address] if !ok { return nil, fmt.Errorf("find_txs_requiring_resubmission_due_to_insufficient_funds: %w", ErrAddressNotFound) @@ -1064,6 +1129,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT return nil, fmt.Errorf("find_tx_attempts_requiring_resend: %w", ErrInvalidChainID) } + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[address] if !ok { return nil, fmt.Errorf("find_tx_attempts_requiring_resend: %w", ErrAddressNotFound) @@ -1104,6 +1171,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithSequence(_ context.Context, fromAddress ADDR, seq SEQ) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[fromAddress] if !ok { return nil, fmt.Errorf("find_tx_with_sequence: %w", ErrAddressNotFound) @@ -1151,6 +1220,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT txsLock := sync.Mutex{} txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} wg := sync.WaitGroup{} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { @@ -1186,6 +1257,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindE txsLock := sync.Mutex{} txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} wg := sync.WaitGroup{} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { @@ -1224,6 +1297,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindE txsLock := sync.Mutex{} txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} wg := sync.WaitGroup{} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { @@ -1250,6 +1325,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetIn return nil, fmt.Errorf("get_in_progress_tx_attempts: %w", ErrInvalidChainID) } + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[address] if !ok { return nil, fmt.Errorf("get_in_progress_tx_attempts: %w", ErrAddressNotFound) @@ -1285,6 +1362,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetNo txsLock := sync.Mutex{} txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} wg := sync.WaitGroup{} + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { @@ -1308,6 +1387,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTx filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { return tx.ID == id } + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { txs := as.FetchTxs(nil, filter, id) if len(txs) > 0 { @@ -1324,6 +1405,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HasIn return false, fmt.Errorf("has_in_progress_transaction: %w", ErrInvalidChainID) } + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[account] if !ok { return false, fmt.Errorf("has_in_progress_transaction: %w", ErrAddressNotFound) @@ -1335,6 +1418,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HasIn } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) LoadTxAttempts(_ context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[etx.FromAddress] if !ok { return fmt.Errorf("load_tx_attempts: %w", ErrAddressNotFound) @@ -1358,6 +1443,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Prelo return nil } + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[attempts[0].Tx.FromAddress] if !ok { return fmt.Errorf("preload_txes: %w", ErrAddressNotFound) @@ -1382,6 +1469,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Prelo return nil } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveConfirmedMissingReceiptAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[attempt.Tx.FromAddress] if !ok { return fmt.Errorf("save_confirmed_missing_receipt_attempt: %w", ErrAddressNotFound) @@ -1403,6 +1492,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveC return nil } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInProgressAttempt(ctx context.Context, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[attempt.Tx.FromAddress] if !ok { return fmt.Errorf("save_in_progress_attempt: %w", ErrAddressNotFound) @@ -1437,6 +1528,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveI return nil } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInsufficientFundsAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[attempt.Tx.FromAddress] if !ok { return fmt.Errorf("save_insufficient_funds_attempt: %w", ErrAddressNotFound) @@ -1469,6 +1562,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveI return nil } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveSentAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[attempt.Tx.FromAddress] if !ok { return fmt.Errorf("save_sent_attempt: %w", ErrAddressNotFound) @@ -1502,6 +1597,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveS return nil } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxForRebroadcast(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[etx.FromAddress] if !ok { return fmt.Errorf("update_tx_for_rebroadcast: %w", ErrAddressNotFound) @@ -1541,6 +1638,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxF return attempt.Receipts[0].GetBlockNumber().Int64() <= (blockHeight - int64(tx.MinConfirmations.Uint32)) } + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { txas := as.FetchTxAttempts(nil, fn, txID) if len(txas) > 0 { @@ -1559,6 +1658,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT return nil, nil } + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[address] if !ok { return nil, fmt.Errorf("find_txs_requiring_gas_bump: %w", ErrAddressNotFound) @@ -1616,6 +1717,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkA wg := sync.WaitGroup{} errsLock := sync.Mutex{} var errs error + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { @@ -1672,6 +1775,8 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkO wg := sync.WaitGroup{} errsLock := sync.Mutex{} var errs error + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) {