Skip to content

Commit

Permalink
TXM In-memory: step 3-04-UpdateTxForRebroadcast (#12231)
Browse files Browse the repository at this point in the history
* implement UpdateTxForRebroadcast

* implement tests for UpdateTxForRebroadcast

* address some comments

* address comments
  • Loading branch information
poopoothegorilla authored Mar 22, 2024
1 parent da6d235 commit 0693806
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 11 deletions.
19 changes: 11 additions & 8 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,24 +381,27 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveCo
defer as.Unlock()

if txAttempt.State != txmgrtypes.TxAttemptBroadcast {
return fmt.Errorf("move_confirmed_to_unconfirmed: attempt must be in broadcast state")
return fmt.Errorf("attempt must be in broadcast state")
}

tx, ok := as.confirmedTxs[txAttempt.TxID]
if !ok || tx == nil {
return fmt.Errorf("move_confirmed_to_unconfirmed: no confirmed transaction with ID %d", txAttempt.TxID)
return fmt.Errorf("no confirmed transaction with ID %d", txAttempt.TxID)
}
if len(tx.TxAttempts) == 0 {
return fmt.Errorf("move_confirmed_to_unconfirmed: no attempts for transaction with ID %d", txAttempt.TxID)
return fmt.Errorf("no attempts for transaction with ID %d", txAttempt.TxID)
}
tx.State = TxUnconfirmed

// Delete the receipt from the attempt
txAttempt.Receipts = nil
// Reset the broadcast information for the attempt
txAttempt.State = txmgrtypes.TxAttemptInProgress
txAttempt.BroadcastBeforeBlockNum = nil
tx.TxAttempts = append(tx.TxAttempts, txAttempt)
for i := 0; i < len(tx.TxAttempts); i++ {
if tx.TxAttempts[i].ID == txAttempt.ID {
tx.TxAttempts[i].Receipts = nil
tx.TxAttempts[i].State = txmgrtypes.TxAttemptInProgress
tx.TxAttempts[i].BroadcastBeforeBlockNum = nil
break
}
}

as.unconfirmedTxs[tx.ID] = tx
delete(as.confirmedTxs, tx.ID)
Expand Down
19 changes: 17 additions & 2 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/big"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -47,7 +48,8 @@ type inMemoryStore[
keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]

addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
addressStatesLock sync.RWMutex
addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
}

// NewInMemoryStore returns a new inMemoryStore
Expand Down Expand Up @@ -331,7 +333,20 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveS
return nil
}
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxForRebroadcast(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
return nil
ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
as, ok := ms.addressStates[etx.FromAddress]
if !ok {
return nil
}

// Persist to persistent storage
if err := ms.persistentTxStore.UpdateTxForRebroadcast(ctx, etx, etxAttempt); err != nil {
return err
}

// Update in memory store
return as.moveConfirmedToUnconfirmed(etxAttempt)
}
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (bool, error) {
return false, nil
Expand Down
117 changes: 116 additions & 1 deletion core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,129 @@
package txmgr_test

import (
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"

evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_UpdateTxForRebroadcast(t *testing.T) {
t.Parallel()

t.Run("delete all receipts for transaction", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertConfirmedEthTxWithReceipt(t, persistentStore, fromAddress, 777, 1)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

txAttempt := inTx.TxAttempts[0]
err = inMemoryStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt)
require.NoError(t, err)

expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID)
require.NoError(t, err)

fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]
assertTxEqual(t, expTx, actTx)
assert.Equal(t, commontxmgr.TxUnconfirmed, actTx.State)
assert.Equal(t, txmgrtypes.TxAttemptInProgress, actTx.TxAttempts[0].State)
assert.Nil(t, actTx.TxAttempts[0].BroadcastBeforeBlockNum)
assert.Equal(t, 0, len(actTx.TxAttempts[0].Receipts))
})

t.Run("error parity for in-memory vs persistent store", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertConfirmedEthTxWithReceipt(t, persistentStore, fromAddress, 777, 1)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

txAttempt := inTx.TxAttempts[0]

t.Run("error when attempt is not in Broadcast state", func(t *testing.T) {
txAttempt.State = txmgrtypes.TxAttemptInProgress
expErr := persistentStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt)
actErr := inMemoryStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt)
assert.Error(t, expErr)
assert.Error(t, actErr)
txAttempt.State = txmgrtypes.TxAttemptBroadcast
})
t.Run("error when transaction is not in confirmed state", func(t *testing.T) {
inTx.State = commontxmgr.TxUnconfirmed
expErr := persistentStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt)
actErr := inMemoryStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt)
assert.Error(t, expErr)
assert.Error(t, actErr)
inTx.State = commontxmgr.TxConfirmed
})
t.Run("wrong fromAddress has no error", func(t *testing.T) {
inTx.FromAddress = common.Address{}
expErr := persistentStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt)
actErr := inMemoryStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt)
assert.Equal(t, expErr, actErr)
assert.Nil(t, actErr)
inTx.FromAddress = fromAddress
})

})
}

// assertTxEqual asserts that two transactions are equal
func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) {
assert.Equal(t, exp.ID, act.ID)
Expand All @@ -33,7 +148,7 @@ func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) {
assert.Equal(t, exp.SignalCallback, act.SignalCallback)
assert.Equal(t, exp.CallbackCompleted, act.CallbackCompleted)

require.Len(t, exp.TxAttempts, len(act.TxAttempts))
require.Equal(t, len(exp.TxAttempts), len(act.TxAttempts))
for i := 0; i < len(exp.TxAttempts); i++ {
assertTxAttemptEqual(t, exp.TxAttempts[i], act.TxAttempts[i])
}
Expand Down

0 comments on commit 0693806

Please sign in to comment.