Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update fluxmonitor to use IdempotencyKey when creating Tx #10589

Merged
merged 3 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions core/services/fluxmonitorv2/contract_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/flux_aggregator_wrapper"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

//go:generate mockery --quiet --name ContractSubmitter --output ./mocks/ --case=underscore
Expand All @@ -17,7 +16,7 @@ var FluxAggregatorABI = evmtypes.MustGetABI(flux_aggregator_wrapper.FluxAggregat

// ContractSubmitter defines an interface to submit an eth tx.
type ContractSubmitter interface {
Submit(roundID *big.Int, submission *big.Int, qopts ...pg.QOpt) error
Submit(roundID *big.Int, submission *big.Int, idempotencyKey *string) error
}

// FluxAggregatorContractSubmitter submits the polled answer in an eth tx.
Expand Down Expand Up @@ -51,7 +50,7 @@ func NewFluxAggregatorContractSubmitter(

// Submit submits the answer by writing a EthTx for the txmgr to
// pick up
func (c *FluxAggregatorContractSubmitter) Submit(roundID *big.Int, submission *big.Int, qopts ...pg.QOpt) error {
func (c *FluxAggregatorContractSubmitter) Submit(roundID *big.Int, submission *big.Int, idempotencyKey *string) error {
fromAddress, err := c.keyStore.GetRoundRobinAddress(c.chainID)
if err != nil {
return err
Expand All @@ -63,7 +62,7 @@ func (c *FluxAggregatorContractSubmitter) Submit(roundID *big.Int, submission *b
}

return errors.Wrap(
c.orm.CreateEthTransaction(fromAddress, c.Address(), payload, c.gasLimit, qopts...),
c.orm.CreateEthTransaction(fromAddress, c.Address(), payload, c.gasLimit, idempotencyKey),
"failed to send Eth transaction",
)
}
7 changes: 5 additions & 2 deletions core/services/fluxmonitorv2/contract_submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"math/big"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink/v2/core/internal/mocks"
Expand Down Expand Up @@ -32,8 +33,10 @@ func TestFluxAggregatorContractSubmitter_Submit(t *testing.T) {

keyStore.On("GetRoundRobinAddress", testutils.FixtureChainID).Return(fromAddress, nil)
fluxAggregator.On("Address").Return(toAddress)
orm.On("CreateEthTransaction", fromAddress, toAddress, payload, gasLimit).Return(nil)

err = submitter.Submit(roundID, submission)
idempotencyKey := uuid.New().String()
orm.On("CreateEthTransaction", fromAddress, toAddress, payload, gasLimit, &idempotencyKey).Return(nil)

err = submitter.Submit(roundID, submission, &idempotencyKey)
assert.NoError(t, err)
}
5 changes: 4 additions & 1 deletion core/services/fluxmonitorv2/flux_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,13 +1073,16 @@ func (fm *FluxMonitor) initialRoundState() flux_aggregator_wrapper.OracleRoundSt
}

func (fm *FluxMonitor) queueTransactionForTxm(tx pg.Queryer, runID int64, answer decimal.Decimal, roundID uint32, log *flux_aggregator_wrapper.FluxAggregatorNewRound) error {
// Use pipeline run ID to generate globally unique key that can correlate this run to a Tx
idempotencyKey := fmt.Sprintf("fluxmonitor-%d", runID)
// Submit the Eth Tx
err := fm.contractSubmitter.Submit(
new(big.Int).SetInt64(int64(roundID)),
answer.BigInt(),
pg.WithQueryer(tx),
&idempotencyKey,
)
if err != nil {
fm.logger.Errorw("failed to submit Tx to TXM", "err", err)
return err
}

Expand Down
60 changes: 37 additions & 23 deletions core/services/fluxmonitorv2/flux_monitor_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fluxmonitorv2_test

import (
"fmt"
"math/big"
"strings"
"testing"
Expand Down Expand Up @@ -130,6 +131,11 @@ func setupMocks(t *testing.T) *testMocks {
return tm
}

func buildIdempotencyKey(ID int64) *string {
key := fmt.Sprintf("fluxmonitor-%d", ID)
return &key
}

type setupOptions struct {
pollTickerDisabled bool
idleTimerDisabled bool
Expand Down Expand Up @@ -449,7 +455,7 @@ func TestFluxMonitor_PollIfEligible(t *testing.T) {
},
},
), mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(run, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(answers.polledAnswer),
Expand All @@ -468,7 +474,7 @@ func TestFluxMonitor_PollIfEligible(t *testing.T) {
}).
Once()
tm.contractSubmitter.
On("Submit", big.NewInt(reportableRoundID), big.NewInt(answers.polledAnswer), mock.Anything).
On("Submit", big.NewInt(reportableRoundID), big.NewInt(answers.polledAnswer), buildIdempotencyKey(run.ID)).
Return(nil).
Once()

Expand Down Expand Up @@ -578,6 +584,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) {
tm.orm.On("MostRecentFluxMonitorRoundID", contractAddress).Return(uint32(4), nil)

// Round 1
run := pipeline.Run{ID: 1}
tm.orm.
On("FindOrCreateFluxMonitorRoundStats", contractAddress, uint32(1), mock.Anything).
Return(fluxmonitorv2.FluxMonitorRoundStatsV2{
Expand All @@ -586,23 +593,23 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) {
}, nil)
tm.pipelineRunner.
On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(run, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(fetchedValue),
Error: nil,
},
Task: &pipeline.HTTPTask{},
},
}, nil)
}, nil).Once()
tm.pipelineRunner.
On("InsertFinishedRun", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
args.Get(0).(*pipeline.Run).ID = 1
})
}).Once()
tm.contractSubmitter.
On("Submit", big.NewInt(1), big.NewInt(fetchedValue), mock.Anything).
On("Submit", big.NewInt(1), big.NewInt(fetchedValue), buildIdempotencyKey(run.ID)).
Return(nil).
Once()

Expand All @@ -617,6 +624,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) {
Return(nil).Once()

// Round 3
run = pipeline.Run{ID: 2}
tm.orm.
On("FindOrCreateFluxMonitorRoundStats", contractAddress, uint32(3), mock.Anything).
Return(fluxmonitorv2.FluxMonitorRoundStatsV2{
Expand All @@ -625,23 +633,23 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) {
}, nil)
tm.pipelineRunner.
On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(run, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(fetchedValue),
Error: nil,
},
Task: &pipeline.HTTPTask{},
},
}, nil)
}, nil).Once()
tm.pipelineRunner.
On("InsertFinishedRun", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
args.Get(0).(*pipeline.Run).ID = 2
})
}).Once()
tm.contractSubmitter.
On("Submit", big.NewInt(3), big.NewInt(fetchedValue), mock.Anything).
On("Submit", big.NewInt(3), big.NewInt(fetchedValue), buildIdempotencyKey(run.ID)).
Return(nil).
Once()
tm.orm.
Expand All @@ -655,6 +663,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) {
Return(nil).Once()

// Round 4
run = pipeline.Run{ID: 3}
tm.orm.
On("FindOrCreateFluxMonitorRoundStats", contractAddress, uint32(4), mock.Anything).
Return(fluxmonitorv2.FluxMonitorRoundStatsV2{
Expand All @@ -663,23 +672,23 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) {
}, nil)
tm.pipelineRunner.
On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(run, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(fetchedValue),
Error: nil,
},
Task: &pipeline.HTTPTask{},
},
}, nil)
}, nil).Once()
tm.pipelineRunner.
On("InsertFinishedRun", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
args.Get(0).(*pipeline.Run).ID = 3
})
}).Once()
tm.contractSubmitter.
On("Submit", big.NewInt(4), big.NewInt(fetchedValue), mock.Anything).
On("Submit", big.NewInt(4), big.NewInt(fetchedValue), buildIdempotencyKey(run.ID)).
Return(nil).
Once()
tm.orm.
Expand Down Expand Up @@ -1475,6 +1484,8 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
answer = 100
)

run := pipeline.Run{ID: 1}

tm.keyStore.On("EnabledKeysForChain", testutils.FixtureChainID).Return([]ethkey.KeyV2{{Address: nodeAddr}}, nil).Once()
tm.logBroadcaster.On("IsConnected").Return(true).Maybe()

Expand All @@ -1488,7 +1499,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
}, nil).Once()
tm.pipelineRunner.
On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(run, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(answer),
Expand All @@ -1504,7 +1515,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
args.Get(0).(*pipeline.Run).ID = 1
})
tm.logBroadcaster.On("MarkConsumed", mock.Anything, mock.Anything).Return(nil).Once()
tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), mock.Anything).Return(nil).Once()
tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), buildIdempotencyKey(run.ID)).Return(nil).Once()
tm.orm.
On("UpdateFluxMonitorRoundStats",
contractAddress,
Expand Down Expand Up @@ -1588,6 +1599,8 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
roundID = 3
answer = 100
)

run := pipeline.Run{ID: 1}
tm.keyStore.On("EnabledKeysForChain", testutils.FixtureChainID).Return([]ethkey.KeyV2{{Address: nodeAddr}}, nil).Once()
tm.logBroadcaster.On("IsConnected").Return(true).Maybe()

Expand All @@ -1614,7 +1627,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
}, nil).Once()
tm.pipelineRunner.
On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(run, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(answer),
Expand All @@ -1629,7 +1642,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
Run(func(args mock.Arguments) {
args.Get(0).(*pipeline.Run).ID = 1
})
tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), mock.Anything).Return(nil).Once()
tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), buildIdempotencyKey(run.ID)).Return(nil).Once()
tm.orm.
On("UpdateFluxMonitorRoundStats",
contractAddress,
Expand Down Expand Up @@ -1683,6 +1696,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
roundID = 3
answer = 100
)
run := pipeline.Run{ID: 1}
tm.keyStore.On("EnabledKeysForChain", testutils.FixtureChainID).Return([]ethkey.KeyV2{{Address: nodeAddr}}, nil).Once()
tm.logBroadcaster.On("IsConnected").Return(true).Maybe()

Expand All @@ -1709,7 +1723,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
}, nil).Once()
tm.pipelineRunner.
On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(run, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(answer),
Expand All @@ -1724,7 +1738,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
Run(func(args mock.Arguments) {
args.Get(0).(*pipeline.Run).ID = 1
})
tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), mock.Anything).Return(nil).Once()
tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), buildIdempotencyKey(run.ID)).Return(nil).Once()
tm.orm.
On("UpdateFluxMonitorRoundStats",
contractAddress,
Expand Down Expand Up @@ -1797,7 +1811,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
Once()

// and that should result in a new submission
tm.contractSubmitter.On("Submit", big.NewInt(olderRoundID), big.NewInt(answer), mock.Anything).Return(nil).Once()
tm.contractSubmitter.On("Submit", big.NewInt(olderRoundID), big.NewInt(answer), buildIdempotencyKey(run.ID)).Return(nil).Once()

tm.orm.
On("UpdateFluxMonitorRoundStats",
Expand Down Expand Up @@ -1887,7 +1901,7 @@ func TestFluxMonitor_DrumbeatTicker(t *testing.T) {
},
},
), mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(pipeline.Run{ID: runID}, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(fetchedAnswer),
Expand All @@ -1905,7 +1919,7 @@ func TestFluxMonitor_DrumbeatTicker(t *testing.T) {
}).
Once()
tm.contractSubmitter.
On("Submit", big.NewInt(int64(roundID)), answerBigInt, mock.Anything).
On("Submit", big.NewInt(int64(roundID)), answerBigInt, buildIdempotencyKey(runID)).
Return(nil).
Once()

Expand Down
19 changes: 5 additions & 14 deletions core/services/fluxmonitorv2/mocks/contract_submitter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 5 additions & 12 deletions core/services/fluxmonitorv2/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading