From 5153d24ab572d357831674d23e142c38397ead4c Mon Sep 17 00:00:00 2001 From: James Walker Date: Tue, 27 Feb 2024 14:47:25 -0500 Subject: [PATCH 1/4] implement SaveConfirmedMissingReceiptAttempts --- common/txmgr/inmemory_store.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index cd783a25210..481e41dee4e 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -314,7 +314,23 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Prelo return nil } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveConfirmedMissingReceiptAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { - return nil + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[attempt.Tx.FromAddress] + if !ok { + return fmt.Errorf("save_confirmed_missing_receipt_attempt: %w", ErrAddressNotFound) + } + if attempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("expected state to be in_progress") + } + + // Persist to persistent storage + if err := ms.txStore.SaveConfirmedMissingReceiptAttempt(ctx, timeout, attempt, broadcastAt); err != nil { + return err + } + + // Update in memory store + return as.MoveInProgressToConfirmedMissingReceipt(*attempt, broadcastAt) } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInProgressAttempt(ctx context.Context, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil From b6341838743b8aed5aa7715c6714abde9b8d5ebc Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 25 Mar 2024 21:40:58 -0400 Subject: [PATCH 2/4] implement tests for SaveConfirmedMissingReceiptAttempt --- common/txmgr/address_state.go | 34 ++++++++----- common/txmgr/inmemory_store.go | 22 +++++++-- .../evm/txmgr/evm_inmemory_store_test.go | 48 +++++++++++++++++++ 3 files changed, 88 insertions(+), 16 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 1c0d7cbfb66..bb9bea2ea2f 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -355,28 +355,40 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUn return nil } -// moveInProgressToConfirmedMissingReceipt moves the in-progress transaction to the confirmed missing receipt state. -// If there is no in-progress transaction, an error is returned. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToConfirmedMissingReceipt(txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { +// moveInProgressToConfirmedMissingReceipt moves transaction to the confirmed missing receipt state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTxToConfirmedMissingReceipt(txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { as.Lock() defer as.Unlock() - tx := as.inprogressTx - if tx == nil { - return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: no transaction in progress") + tx, ok := as.allTxs[txAttempt.TxID] + if !ok { + return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: no transaction with ID %d", txAttempt.TxID) } if len(tx.TxAttempts) == 0 { return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: no attempts for transaction with ID %d", tx.ID) } - if tx.BroadcastAt.Before(broadcastAt) { + if tx.BroadcastAt != nil && tx.BroadcastAt.Before(broadcastAt) { tx.BroadcastAt = &broadcastAt } - tx.State = TxConfirmedMissingReceipt - txAttempt.State = txmgrtypes.TxAttemptBroadcast - tx.TxAttempts = append(tx.TxAttempts, txAttempt) + for i := 0; i < len(tx.TxAttempts); i++ { + if tx.TxAttempts[i].ID == txAttempt.ID { + attempt := tx.TxAttempts[i] + attempt.State = txmgrtypes.TxAttemptBroadcast + } + } + + switch tx.State { + case TxInProgress: + as.inprogressTx = nil + case TxUnconfirmed: + delete(as.unconfirmedTxs, tx.ID) + default: + panic(fmt.Sprintf("unknown transaction state: %q", tx.State)) + } + + tx.State = TxConfirmedMissingReceipt as.confirmedMissingReceiptTxs[tx.ID] = tx - as.inprogressTx = nil return nil } diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 47819231b44..b1e276336e1 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -333,21 +333,33 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Prelo func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveConfirmedMissingReceiptAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { ms.addressStatesLock.RLock() defer ms.addressStatesLock.RUnlock() - as, ok := ms.addressStates[attempt.Tx.FromAddress] - if !ok { - return fmt.Errorf("save_confirmed_missing_receipt_attempt: %w", ErrAddressNotFound) - } + if attempt.State != txmgrtypes.TxAttemptInProgress { return fmt.Errorf("expected state to be in_progress") } + var tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + var as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { return true } + for _, vas := range ms.addressStates { + txs := vas.findTxs(nil, filter, attempt.TxID) + if len(txs) != 0 { + tx = &txs[0] + as = vas + break + } + } + if tx == nil { + return fmt.Errorf("save_confirmed_missing_receipt_attempt: %w", ErrTxnNotFound) + } + // Persist to persistent storage if err := ms.persistentTxStore.SaveConfirmedMissingReceiptAttempt(ctx, timeout, attempt, broadcastAt); err != nil { return err } // Update in memory store - return as.moveInProgressToConfirmedMissingReceipt(*attempt, broadcastAt) + return as.moveTxToConfirmedMissingReceipt(*attempt, broadcastAt) } func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInProgressAttempt(ctx context.Context, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index 217da333005..82152100acb 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -22,6 +22,54 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) +func TestInMemoryStore_SaveConfirmedMissingReceiptAttempt(t *testing.T) { + t.Parallel() + + t.Run("updates attempt to 'broadcast' and transaction to 'confirm_missing_receipt'", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertUnconfirmedEthTxWithAttemptState(t, persistentStore, 1, fromAddress, txmgrtypes.TxAttemptInProgress) + now := time.Now() + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + defaultDuration := 5 * time.Second + err = inMemoryStore.SaveConfirmedMissingReceiptAttempt(ctx, defaultDuration, &inTx.TxAttempts[0], now) + require.NoError(t, err) + + expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID) + require.NoError(t, err) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID) + require.Equal(t, 1, len(actTxs)) + actTx := actTxs[0] + + assertTxEqual(t, expTx, actTx) + assert.Equal(t, txmgrtypes.TxAttemptBroadcast, actTx.TxAttempts[0].State) + assert.Equal(t, commontxmgr.TxConfirmedMissingReceipt, actTx.State) + }) +} + func TestInMemoryStore_MarkOldTxesMissingReceiptAsErrored(t *testing.T) { t.Parallel() blockNum := int64(10) From f2c2e0b0a6d95f3efdbfd10553e2537ec23931b6 Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 25 Mar 2024 22:01:27 -0400 Subject: [PATCH 3/4] update --- common/txmgr/address_state.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index bb9bea2ea2f..9ff5967b0a1 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -377,18 +377,10 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTx attempt.State = txmgrtypes.TxAttemptBroadcast } } - - switch tx.State { - case TxInProgress: - as.inprogressTx = nil - case TxUnconfirmed: - delete(as.unconfirmedTxs, tx.ID) - default: - panic(fmt.Sprintf("unknown transaction state: %q", tx.State)) - } - tx.State = TxConfirmedMissingReceipt + as.confirmedMissingReceiptTxs[tx.ID] = tx + delete(as.unconfirmedTxs, tx.ID) return nil } From 7de69a4f44c23dbea1b55aed08d06709b6f6ddcc Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 3 Apr 2024 16:12:32 -0400 Subject: [PATCH 4/4] clean up --- common/txmgr/address_state.go | 46 ++++++++++++++++++++-------------- common/txmgr/inmemory_store.go | 12 +++------ 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index ff76bdc6daf..c35ab05b6de 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -420,40 +420,40 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUn if !ok || tx == nil { return fmt.Errorf("move_unconfirmed_to_confirmed_missing_receipt: no unconfirmed transaction with ID %d", txID) } - tx.State = TxConfirmedMissingReceipt - - as.confirmedMissingReceiptTxs[tx.ID] = tx - delete(as.unconfirmedTxs, tx.ID) + as._moveUnconfirmedToConfirmedMissingReceipt(tx) return nil } -// moveInProgressToConfirmedMissingReceipt moves transaction to the confirmed missing receipt state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTxToConfirmedMissingReceipt(txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { +// moveTxAttemptToBroadcast moves the transaction attempt to the broadcast state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTxAttemptToBroadcast( + txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + broadcastAt time.Time, +) error { as.Lock() defer as.Unlock() tx, ok := as.allTxs[txAttempt.TxID] - if !ok { + if !ok || tx == nil { return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: no transaction with ID %d", txAttempt.TxID) } - if len(tx.TxAttempts) == 0 { - return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: no attempts for transaction with ID %d", tx.ID) - } - if tx.BroadcastAt != nil && tx.BroadcastAt.Before(broadcastAt) { - tx.BroadcastAt = &broadcastAt - } + var found bool for i := 0; i < len(tx.TxAttempts); i++ { if tx.TxAttempts[i].ID == txAttempt.ID { - attempt := tx.TxAttempts[i] - attempt.State = txmgrtypes.TxAttemptBroadcast + tx.TxAttempts[i].State = txmgrtypes.TxAttemptBroadcast + as.attemptHashToTxAttempt[txAttempt.Hash] = &tx.TxAttempts[i] + found = true + break } } - tx.State = TxConfirmedMissingReceipt - - as.confirmedMissingReceiptTxs[tx.ID] = tx - delete(as.unconfirmedTxs, tx.ID) + if !found { + return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: transaction with ID %d has no attempt with ID: %q", txAttempt.TxID, txAttempt.ID) + } + if tx.BroadcastAt != nil && tx.BroadcastAt.Before(broadcastAt) { + tx.BroadcastAt = &broadcastAt + } + as._moveUnconfirmedToConfirmedMissingReceipt(tx) return nil } @@ -582,3 +582,11 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _moveT tx.Error = txError as.fatalErroredTxs[tx.ID] = tx } + +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _moveUnconfirmedToConfirmedMissingReceipt( + tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) { + tx.State = TxConfirmedMissingReceipt + as.confirmedMissingReceiptTxs[tx.ID] = tx + delete(as.unconfirmedTxs, tx.ID) +} diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index ee188340a49..784009784ba 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -384,19 +384,15 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveC return fmt.Errorf("expected state to be in_progress") } - var tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] var as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { return true } for _, vas := range ms.addressStates { - txs := vas.findTxs(nil, filter, attempt.TxID) - if len(txs) != 0 { - tx = &txs[0] + if vas.hasTx(attempt.TxID) { as = vas break } } - if tx == nil { - return fmt.Errorf("save_confirmed_missing_receipt_attempt: %w", ErrTxnNotFound) + if as == nil { + return fmt.Errorf("save_confirmed_missing_receipt_attempt: %w: %q", ErrTxnNotFound, attempt.TxID) } // Persist to persistent storage @@ -405,7 +401,7 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveC } // Update in memory store - return as.moveTxToConfirmedMissingReceipt(*attempt, broadcastAt) + return as.moveTxAttemptToBroadcast(*attempt, broadcastAt) } func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInProgressAttempt(ctx context.Context, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil