Skip to content

Commit

Permalink
Drop unused queryTimeout config from TXM strategy (#12859)
Browse files Browse the repository at this point in the history
* Drop unused queryTimeout config from TXM strategy

* Add changeset

* Fix changeset

* Fix changeset error

* Add internal tag
  • Loading branch information
dimriou authored Apr 17, 2024
1 parent e77c458 commit 44c9b40
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 25 deletions.
5 changes: 5 additions & 0 deletions .changeset/new-forks-grab.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": removed
---

Drop unused queryTimeout config from TXM strategy #internal
18 changes: 6 additions & 12 deletions common/txmgr/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package txmgr
import (
"context"
"fmt"
"time"

"github.com/google/uuid"

Expand All @@ -14,9 +13,9 @@ var _ txmgrtypes.TxStrategy = SendEveryStrategy{}

// NewQueueingTxStrategy creates a new TxStrategy that drops the oldest transactions after the
// queue size is exceeded if a queue size is specified, and otherwise does not drop transactions.
func NewQueueingTxStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) (strategy txmgrtypes.TxStrategy) {
func NewQueueingTxStrategy(subject uuid.UUID, queueSize uint32) (strategy txmgrtypes.TxStrategy) {
if queueSize > 0 {
strategy = NewDropOldestStrategy(subject, queueSize, queryTimeout)
strategy = NewDropOldestStrategy(subject, queueSize)
} else {
strategy = SendEveryStrategy{}
}
Expand All @@ -41,26 +40,21 @@ var _ txmgrtypes.TxStrategy = DropOldestStrategy{}
// DropOldestStrategy will send the newest N transactions, older ones will be
// removed from the queue
type DropOldestStrategy struct {
subject uuid.UUID
queueSize uint32
queryTimeout time.Duration
subject uuid.UUID
queueSize uint32
}

// NewDropOldestStrategy creates a new TxStrategy that drops the oldest transactions after the
// queue size is exceeded.
func NewDropOldestStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) DropOldestStrategy {
return DropOldestStrategy{subject, queueSize, queryTimeout}
func NewDropOldestStrategy(subject uuid.UUID, queueSize uint32) DropOldestStrategy {
return DropOldestStrategy{subject, queueSize}
}

func (s DropOldestStrategy) Subject() uuid.NullUUID {
return uuid.NullUUID{UUID: s.subject, Valid: true}
}

func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (ids []int64, err error) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, s.queryTimeout)
defer cancel()

// NOTE: We prune one less than the queue size to prevent the queue from exceeding the max queue size. Which could occur if a new transaction is added to the queue right after we prune.
ids, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize-1, s.subject)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,7 +1841,7 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) {

t.Run("does not prune if queue has not exceeded capacity-1", func(t *testing.T) {
subject1 := uuid.New()
strategy1 := txmgrcommon.NewDropOldestStrategy(subject1, uint32(5), cfg.Database().DefaultQueryTimeout())
strategy1 := txmgrcommon.NewDropOldestStrategy(subject1, uint32(5))
for i := 0; i < 5; i++ {
mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy1))
}
Expand All @@ -1850,7 +1850,7 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) {

t.Run("prunes if queue has exceeded capacity-1", func(t *testing.T) {
subject2 := uuid.New()
strategy2 := txmgrcommon.NewDropOldestStrategy(subject2, uint32(3), cfg.Database().DefaultQueryTimeout())
strategy2 := txmgrcommon.NewDropOldestStrategy(subject2, uint32(3))
for i := 0; i < 5; i++ {
mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy2))
}
Expand Down
8 changes: 2 additions & 6 deletions core/chains/evm/txmgr/strategies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr/mocks"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
)

func Test_SendEveryStrategy(t *testing.T) {
Expand All @@ -28,25 +27,22 @@ func Test_SendEveryStrategy(t *testing.T) {

func Test_DropOldestStrategy_Subject(t *testing.T) {
t.Parallel()
cfg := configtest.NewGeneralConfig(t, nil)

subject := uuid.New()
s := txmgrcommon.NewDropOldestStrategy(subject, 1, cfg.Database().DefaultQueryTimeout())
s := txmgrcommon.NewDropOldestStrategy(subject, 1)

assert.True(t, s.Subject().Valid)
assert.Equal(t, subject, s.Subject().UUID)
}

func Test_DropOldestStrategy_PruneQueue(t *testing.T) {
t.Parallel()
cfg := configtest.NewGeneralConfig(t, nil)
subject := uuid.New()
queueSize := uint32(2)
queryTimeout := cfg.Database().DefaultQueryTimeout()
mockTxStore := mocks.NewEvmTxStore(t)

t.Run("calls PrineUnstartedTxQueue for the given subject and queueSize, ignoring fromAddress", func(t *testing.T) {
strategy1 := txmgrcommon.NewDropOldestStrategy(subject, queueSize, queryTimeout)
strategy1 := txmgrcommon.NewDropOldestStrategy(subject, queueSize)
mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize-1, subject, mock.Anything, mock.Anything).Once().Return([]int64{1, 2}, nil)
ids, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion core/services/blockhashstore/bhs.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (c *BulletproofBHS) Store(ctx context.Context, blockNum uint64) error {

// Set a queue size of 256. At most we store the blockhash of every block, and only the
// latest 256 can possibly be stored.
Strategy: txmgrcommon.NewQueueingTxStrategy(c.jobID, 256, c.dbConfig.DefaultQueryTimeout()),
Strategy: txmgrcommon.NewQueueingTxStrategy(c.jobID, 256),
})
if err != nil {
return errors.Wrap(err, "creating transaction")
Expand Down
2 changes: 1 addition & 1 deletion core/services/fluxmonitorv2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) (services []
return nil, err
}
cfg := chain.Config()
strategy := txmgrcommon.NewQueueingTxStrategy(jb.ExternalJobID, cfg.FluxMonitor().DefaultTransactionQueueDepth(), cfg.Database().DefaultQueryTimeout())
strategy := txmgrcommon.NewQueueingTxStrategy(jb.ExternalJobID, cfg.FluxMonitor().DefaultTransactionQueueDepth())
var checker txmgr.TransmitCheckerSpec
if chain.Config().FluxMonitor().SimulateTransactions() {
checker.CheckerType = txmgr.TransmitCheckerTypeSimulate
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) (services []
}

cfg := chain.Config()
strategy := txmgrcommon.NewQueueingTxStrategy(jb.ExternalJobID, cfg.OCR().DefaultTransactionQueueDepth(), cfg.Database().DefaultQueryTimeout())
strategy := txmgrcommon.NewQueueingTxStrategy(jb.ExternalJobID, cfg.OCR().DefaultTransactionQueueDepth())

var checker txmgr.TransmitCheckerSpec
if chain.Config().OCR().SimulateTransactions() {
Expand Down
2 changes: 1 addition & 1 deletion core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rarg
subject = *opts.subjectID
}
scoped := configWatcher.chain.Config()
strategy := txmgrcommon.NewQueueingTxStrategy(subject, scoped.OCR2().DefaultTransactionQueueDepth(), scoped.Database().DefaultQueryTimeout())
strategy := txmgrcommon.NewQueueingTxStrategy(subject, scoped.OCR2().DefaultTransactionQueueDepth())

var checker txm.TransmitCheckerSpec
if configWatcher.chain.Config().OCR2().SimulateTransactions() {
Expand Down
2 changes: 1 addition & 1 deletion core/services/relay/evm/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func newFunctionsContractTransmitter(ctx context.Context, contractVersion uint32
}

scoped := configWatcher.chain.Config()
strategy := txmgrcommon.NewQueueingTxStrategy(rargs.ExternalJobID, scoped.OCR2().DefaultTransactionQueueDepth(), scoped.Database().DefaultQueryTimeout())
strategy := txmgrcommon.NewQueueingTxStrategy(rargs.ExternalJobID, scoped.OCR2().DefaultTransactionQueueDepth())

var checker txm.TransmitCheckerSpec
if configWatcher.chain.Config().OCR2().SimulateTransactions() {
Expand Down

0 comments on commit 44c9b40

Please sign in to comment.