Skip to content

Commit

Permalink
Merge pull request #12230 from smartcontractkit/jtw/step-3-04-save-co…
Browse files Browse the repository at this point in the history
…nfirmed-missing-receipt-attempt

TXM In-memory: 3-04-SaveConfirmedMissingReceiptAttempts
  • Loading branch information
poopoothegorilla authored Apr 4, 2024
2 parents 09e6d61 + 17fe719 commit d0f6376
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 20 deletions.
50 changes: 31 additions & 19 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,36 +449,40 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUn
if !ok || tx == nil {
return fmt.Errorf("move_unconfirmed_to_confirmed_missing_receipt: no unconfirmed transaction with ID %d", txID)
}
tx.State = TxConfirmedMissingReceipt

as.confirmedMissingReceiptTxs[tx.ID] = tx
delete(as.unconfirmedTxs, tx.ID)
as._moveUnconfirmedToConfirmedMissingReceipt(tx)

return nil
}

// moveInProgressToConfirmedMissingReceipt moves the in-progress transaction to the confirmed missing receipt state.
// If there is no in-progress transaction, an error is returned.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToConfirmedMissingReceipt(txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error {
// moveTxAttemptToBroadcast moves the transaction attempt to the broadcast state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTxAttemptToBroadcast(
txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
broadcastAt time.Time,
) error {
as.Lock()
defer as.Unlock()

tx := as.inprogressTx
if tx == nil {
return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: no transaction in progress")
tx, ok := as.allTxs[txAttempt.TxID]
if !ok || tx == nil {
return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: no transaction with ID %d", txAttempt.TxID)
}
if len(tx.TxAttempts) == 0 {
return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: no attempts for transaction with ID %d", tx.ID)

var found bool
for i := 0; i < len(tx.TxAttempts); i++ {
if tx.TxAttempts[i].ID == txAttempt.ID {
tx.TxAttempts[i].State = txmgrtypes.TxAttemptBroadcast
as.attemptHashToTxAttempt[txAttempt.Hash] = &tx.TxAttempts[i]
found = true
break
}
}
if !found {
return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: transaction with ID %d has no attempt with ID: %q", txAttempt.TxID, txAttempt.ID)
}
if tx.BroadcastAt.Before(broadcastAt) {
if tx.BroadcastAt != nil && tx.BroadcastAt.Before(broadcastAt) {
tx.BroadcastAt = &broadcastAt
}
tx.State = TxConfirmedMissingReceipt
txAttempt.State = txmgrtypes.TxAttemptBroadcast
tx.TxAttempts = append(tx.TxAttempts, txAttempt)

as.confirmedMissingReceiptTxs[tx.ID] = tx
as.inprogressTx = nil
as._moveUnconfirmedToConfirmedMissingReceipt(tx)

return nil
}
Expand Down Expand Up @@ -622,3 +626,11 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _moveT
tx.Error = txError
as.fatalErroredTxs[tx.ID] = tx
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _moveUnconfirmedToConfirmedMissingReceipt(
tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) {
tx.State = TxConfirmedMissingReceipt
as.confirmedMissingReceiptTxs[tx.ID] = tx
delete(as.unconfirmedTxs, tx.ID)
}
26 changes: 25 additions & 1 deletion common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,31 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Prelo
return nil
}
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveConfirmedMissingReceiptAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error {
return nil
ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()

if attempt.State != txmgrtypes.TxAttemptInProgress {
return fmt.Errorf("expected state to be in_progress")
}

var as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
for _, vas := range ms.addressStates {
if vas.hasTx(attempt.TxID) {
as = vas
break
}
}
if as == nil {
return fmt.Errorf("save_confirmed_missing_receipt_attempt: %w: %q", ErrTxnNotFound, attempt.TxID)
}

// Persist to persistent storage
if err := ms.persistentTxStore.SaveConfirmedMissingReceiptAttempt(ctx, timeout, attempt, broadcastAt); err != nil {
return err
}

// Update in memory store
return as.moveTxAttemptToBroadcast(*attempt, broadcastAt)
}
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInProgressAttempt(ctx context.Context, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
return nil
Expand Down
48 changes: 48 additions & 0 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,54 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_SaveConfirmedMissingReceiptAttempt(t *testing.T) {
t.Parallel()

t.Run("updates attempt to 'broadcast' and transaction to 'confirm_missing_receipt'", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertUnconfirmedEthTxWithAttemptState(t, persistentStore, 1, fromAddress, txmgrtypes.TxAttemptInProgress)
now := time.Now()
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

defaultDuration := 5 * time.Second
err = inMemoryStore.SaveConfirmedMissingReceiptAttempt(ctx, defaultDuration, &inTx.TxAttempts[0], now)
require.NoError(t, err)

expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID)
require.NoError(t, err)

fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]

assertTxEqual(t, expTx, actTx)
assert.Equal(t, txmgrtypes.TxAttemptBroadcast, actTx.TxAttempts[0].State)
assert.Equal(t, commontxmgr.TxConfirmedMissingReceipt, actTx.State)
})
}

func TestInMemoryStore_UpdateTxFatalError(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit d0f6376

Please sign in to comment.