diff --git a/core/services/fluxmonitorv2/contract_submitter.go b/core/services/fluxmonitorv2/contract_submitter.go index fe1db281393..d60f5b70e01 100644 --- a/core/services/fluxmonitorv2/contract_submitter.go +++ b/core/services/fluxmonitorv2/contract_submitter.go @@ -7,7 +7,6 @@ import ( evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/flux_aggregator_wrapper" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) //go:generate mockery --quiet --name ContractSubmitter --output ./mocks/ --case=underscore @@ -17,7 +16,7 @@ var FluxAggregatorABI = evmtypes.MustGetABI(flux_aggregator_wrapper.FluxAggregat // ContractSubmitter defines an interface to submit an eth tx. type ContractSubmitter interface { - Submit(roundID *big.Int, submission *big.Int, qopts ...pg.QOpt) error + Submit(roundID *big.Int, submission *big.Int, idempotencyKey *string) error } // FluxAggregatorContractSubmitter submits the polled answer in an eth tx. @@ -51,7 +50,7 @@ func NewFluxAggregatorContractSubmitter( // Submit submits the answer by writing a EthTx for the txmgr to // pick up -func (c *FluxAggregatorContractSubmitter) Submit(roundID *big.Int, submission *big.Int, qopts ...pg.QOpt) error { +func (c *FluxAggregatorContractSubmitter) Submit(roundID *big.Int, submission *big.Int, idempotencyKey *string) error { fromAddress, err := c.keyStore.GetRoundRobinAddress(c.chainID) if err != nil { return err @@ -63,7 +62,7 @@ func (c *FluxAggregatorContractSubmitter) Submit(roundID *big.Int, submission *b } return errors.Wrap( - c.orm.CreateEthTransaction(fromAddress, c.Address(), payload, c.gasLimit, qopts...), + c.orm.CreateEthTransaction(fromAddress, c.Address(), payload, c.gasLimit, idempotencyKey), "failed to send Eth transaction", ) } diff --git a/core/services/fluxmonitorv2/contract_submitter_test.go b/core/services/fluxmonitorv2/contract_submitter_test.go index 1585810ab8e..11fede3f692 100644 --- a/core/services/fluxmonitorv2/contract_submitter_test.go +++ b/core/services/fluxmonitorv2/contract_submitter_test.go @@ -32,8 +32,10 @@ func TestFluxAggregatorContractSubmitter_Submit(t *testing.T) { keyStore.On("GetRoundRobinAddress", testutils.FixtureChainID).Return(fromAddress, nil) fluxAggregator.On("Address").Return(toAddress) - orm.On("CreateEthTransaction", fromAddress, toAddress, payload, gasLimit).Return(nil) - err = submitter.Submit(roundID, submission) + idempotencyKey := "fluxmonitor-1" + orm.On("CreateEthTransaction", fromAddress, toAddress, payload, gasLimit, &idempotencyKey).Return(nil) + + err = submitter.Submit(roundID, submission, &idempotencyKey) assert.NoError(t, err) } diff --git a/core/services/fluxmonitorv2/flux_monitor.go b/core/services/fluxmonitorv2/flux_monitor.go index 34ca121e9a9..11e9b25be04 100644 --- a/core/services/fluxmonitorv2/flux_monitor.go +++ b/core/services/fluxmonitorv2/flux_monitor.go @@ -1073,13 +1073,16 @@ func (fm *FluxMonitor) initialRoundState() flux_aggregator_wrapper.OracleRoundSt } func (fm *FluxMonitor) queueTransactionForTxm(tx pg.Queryer, runID int64, answer decimal.Decimal, roundID uint32, log *flux_aggregator_wrapper.FluxAggregatorNewRound) error { + // Use pipeline run ID to generate globally unique key that can correlate this run to a Tx + idempotencyKey := fmt.Sprintf("fluxmonitor-%d", runID) // Submit the Eth Tx err := fm.contractSubmitter.Submit( new(big.Int).SetInt64(int64(roundID)), answer.BigInt(), - pg.WithQueryer(tx), + &idempotencyKey, ) if err != nil { + fm.logger.Errorw("failed to submit Tx to TXM", "err", err) return err } diff --git a/core/services/fluxmonitorv2/flux_monitor_test.go b/core/services/fluxmonitorv2/flux_monitor_test.go index 5a67c2fa1da..e165ce68205 100644 --- a/core/services/fluxmonitorv2/flux_monitor_test.go +++ b/core/services/fluxmonitorv2/flux_monitor_test.go @@ -1,6 +1,7 @@ package fluxmonitorv2_test import ( + "fmt" "math/big" "strings" "testing" @@ -130,6 +131,11 @@ func setupMocks(t *testing.T) *testMocks { return tm } +func buildIdempotencyKey(ID int64) *string { + key := fmt.Sprintf("fluxmonitor-%d", ID) + return &key +} + type setupOptions struct { pollTickerDisabled bool idleTimerDisabled bool @@ -449,7 +455,7 @@ func TestFluxMonitor_PollIfEligible(t *testing.T) { }, }, ), mock.Anything). - Return(pipeline.Run{}, pipeline.TaskRunResults{ + Return(run, pipeline.TaskRunResults{ { Result: pipeline.Result{ Value: decimal.NewFromInt(answers.polledAnswer), @@ -468,7 +474,7 @@ func TestFluxMonitor_PollIfEligible(t *testing.T) { }). Once() tm.contractSubmitter. - On("Submit", big.NewInt(reportableRoundID), big.NewInt(answers.polledAnswer), mock.Anything). + On("Submit", big.NewInt(reportableRoundID), big.NewInt(answers.polledAnswer), buildIdempotencyKey(run.ID)). Return(nil). Once() @@ -578,6 +584,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) { tm.orm.On("MostRecentFluxMonitorRoundID", contractAddress).Return(uint32(4), nil) // Round 1 + run := pipeline.Run{ID: 1} tm.orm. On("FindOrCreateFluxMonitorRoundStats", contractAddress, uint32(1), mock.Anything). Return(fluxmonitorv2.FluxMonitorRoundStatsV2{ @@ -586,7 +593,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) { }, nil) tm.pipelineRunner. On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything). - Return(pipeline.Run{}, pipeline.TaskRunResults{ + Return(run, pipeline.TaskRunResults{ { Result: pipeline.Result{ Value: decimal.NewFromInt(fetchedValue), @@ -594,15 +601,15 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) { }, Task: &pipeline.HTTPTask{}, }, - }, nil) + }, nil).Once() tm.pipelineRunner. On("InsertFinishedRun", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil). Run(func(args mock.Arguments) { args.Get(0).(*pipeline.Run).ID = 1 - }) + }).Once() tm.contractSubmitter. - On("Submit", big.NewInt(1), big.NewInt(fetchedValue), mock.Anything). + On("Submit", big.NewInt(1), big.NewInt(fetchedValue), buildIdempotencyKey(run.ID)). Return(nil). Once() @@ -617,6 +624,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) { Return(nil).Once() // Round 3 + run = pipeline.Run{ID: 2} tm.orm. On("FindOrCreateFluxMonitorRoundStats", contractAddress, uint32(3), mock.Anything). Return(fluxmonitorv2.FluxMonitorRoundStatsV2{ @@ -625,7 +633,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) { }, nil) tm.pipelineRunner. On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything). - Return(pipeline.Run{}, pipeline.TaskRunResults{ + Return(run, pipeline.TaskRunResults{ { Result: pipeline.Result{ Value: decimal.NewFromInt(fetchedValue), @@ -633,15 +641,15 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) { }, Task: &pipeline.HTTPTask{}, }, - }, nil) + }, nil).Once() tm.pipelineRunner. On("InsertFinishedRun", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil). Run(func(args mock.Arguments) { args.Get(0).(*pipeline.Run).ID = 2 - }) + }).Once() tm.contractSubmitter. - On("Submit", big.NewInt(3), big.NewInt(fetchedValue), mock.Anything). + On("Submit", big.NewInt(3), big.NewInt(fetchedValue), buildIdempotencyKey(run.ID)). Return(nil). Once() tm.orm. @@ -655,6 +663,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) { Return(nil).Once() // Round 4 + run = pipeline.Run{ID: 3} tm.orm. On("FindOrCreateFluxMonitorRoundStats", contractAddress, uint32(4), mock.Anything). Return(fluxmonitorv2.FluxMonitorRoundStatsV2{ @@ -663,7 +672,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) { }, nil) tm.pipelineRunner. On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything). - Return(pipeline.Run{}, pipeline.TaskRunResults{ + Return(run, pipeline.TaskRunResults{ { Result: pipeline.Result{ Value: decimal.NewFromInt(fetchedValue), @@ -671,15 +680,15 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) { }, Task: &pipeline.HTTPTask{}, }, - }, nil) + }, nil).Once() tm.pipelineRunner. On("InsertFinishedRun", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil). Run(func(args mock.Arguments) { args.Get(0).(*pipeline.Run).ID = 3 - }) + }).Once() tm.contractSubmitter. - On("Submit", big.NewInt(4), big.NewInt(fetchedValue), mock.Anything). + On("Submit", big.NewInt(4), big.NewInt(fetchedValue), buildIdempotencyKey(run.ID)). Return(nil). Once() tm.orm. @@ -1475,6 +1484,8 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { answer = 100 ) + run := pipeline.Run{ID: 1} + tm.keyStore.On("EnabledKeysForChain", testutils.FixtureChainID).Return([]ethkey.KeyV2{{Address: nodeAddr}}, nil).Once() tm.logBroadcaster.On("IsConnected").Return(true).Maybe() @@ -1488,7 +1499,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { }, nil).Once() tm.pipelineRunner. On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything). - Return(pipeline.Run{}, pipeline.TaskRunResults{ + Return(run, pipeline.TaskRunResults{ { Result: pipeline.Result{ Value: decimal.NewFromInt(answer), @@ -1504,7 +1515,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { args.Get(0).(*pipeline.Run).ID = 1 }) tm.logBroadcaster.On("MarkConsumed", mock.Anything, mock.Anything).Return(nil).Once() - tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), mock.Anything).Return(nil).Once() + tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), buildIdempotencyKey(run.ID)).Return(nil).Once() tm.orm. On("UpdateFluxMonitorRoundStats", contractAddress, @@ -1588,6 +1599,8 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { roundID = 3 answer = 100 ) + + run := pipeline.Run{ID: 1} tm.keyStore.On("EnabledKeysForChain", testutils.FixtureChainID).Return([]ethkey.KeyV2{{Address: nodeAddr}}, nil).Once() tm.logBroadcaster.On("IsConnected").Return(true).Maybe() @@ -1614,7 +1627,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { }, nil).Once() tm.pipelineRunner. On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything). - Return(pipeline.Run{}, pipeline.TaskRunResults{ + Return(run, pipeline.TaskRunResults{ { Result: pipeline.Result{ Value: decimal.NewFromInt(answer), @@ -1629,7 +1642,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { Run(func(args mock.Arguments) { args.Get(0).(*pipeline.Run).ID = 1 }) - tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), mock.Anything).Return(nil).Once() + tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), buildIdempotencyKey(run.ID)).Return(nil).Once() tm.orm. On("UpdateFluxMonitorRoundStats", contractAddress, @@ -1683,6 +1696,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { roundID = 3 answer = 100 ) + run := pipeline.Run{ID: 1} tm.keyStore.On("EnabledKeysForChain", testutils.FixtureChainID).Return([]ethkey.KeyV2{{Address: nodeAddr}}, nil).Once() tm.logBroadcaster.On("IsConnected").Return(true).Maybe() @@ -1709,7 +1723,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { }, nil).Once() tm.pipelineRunner. On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything). - Return(pipeline.Run{}, pipeline.TaskRunResults{ + Return(run, pipeline.TaskRunResults{ { Result: pipeline.Result{ Value: decimal.NewFromInt(answer), @@ -1724,7 +1738,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { Run(func(args mock.Arguments) { args.Get(0).(*pipeline.Run).ID = 1 }) - tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), mock.Anything).Return(nil).Once() + tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), buildIdempotencyKey(run.ID)).Return(nil).Once() tm.orm. On("UpdateFluxMonitorRoundStats", contractAddress, @@ -1797,7 +1811,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { Once() // and that should result in a new submission - tm.contractSubmitter.On("Submit", big.NewInt(olderRoundID), big.NewInt(answer), mock.Anything).Return(nil).Once() + tm.contractSubmitter.On("Submit", big.NewInt(olderRoundID), big.NewInt(answer), buildIdempotencyKey(run.ID)).Return(nil).Once() tm.orm. On("UpdateFluxMonitorRoundStats", @@ -1887,7 +1901,7 @@ func TestFluxMonitor_DrumbeatTicker(t *testing.T) { }, }, ), mock.Anything). - Return(pipeline.Run{}, pipeline.TaskRunResults{ + Return(pipeline.Run{ID: runID}, pipeline.TaskRunResults{ { Result: pipeline.Result{ Value: decimal.NewFromInt(fetchedAnswer), @@ -1905,7 +1919,7 @@ func TestFluxMonitor_DrumbeatTicker(t *testing.T) { }). Once() tm.contractSubmitter. - On("Submit", big.NewInt(int64(roundID)), answerBigInt, mock.Anything). + On("Submit", big.NewInt(int64(roundID)), answerBigInt, buildIdempotencyKey(runID)). Return(nil). Once() diff --git a/core/services/fluxmonitorv2/mocks/contract_submitter.go b/core/services/fluxmonitorv2/mocks/contract_submitter.go index fd1469c1b9b..6214255ff66 100644 --- a/core/services/fluxmonitorv2/mocks/contract_submitter.go +++ b/core/services/fluxmonitorv2/mocks/contract_submitter.go @@ -6,8 +6,6 @@ import ( big "math/big" mock "github.com/stretchr/testify/mock" - - pg "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) // ContractSubmitter is an autogenerated mock type for the ContractSubmitter type @@ -15,20 +13,13 @@ type ContractSubmitter struct { mock.Mock } -// Submit provides a mock function with given fields: roundID, submission, qopts -func (_m *ContractSubmitter) Submit(roundID *big.Int, submission *big.Int, qopts ...pg.QOpt) error { - _va := make([]interface{}, len(qopts)) - for _i := range qopts { - _va[_i] = qopts[_i] - } - var _ca []interface{} - _ca = append(_ca, roundID, submission) - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) +// Submit provides a mock function with given fields: roundID, submission, idempotencyKey +func (_m *ContractSubmitter) Submit(roundID *big.Int, submission *big.Int, idempotencyKey *string) error { + ret := _m.Called(roundID, submission, idempotencyKey) var r0 error - if rf, ok := ret.Get(0).(func(*big.Int, *big.Int, ...pg.QOpt) error); ok { - r0 = rf(roundID, submission, qopts...) + if rf, ok := ret.Get(0).(func(*big.Int, *big.Int, *string) error); ok { + r0 = rf(roundID, submission, idempotencyKey) } else { r0 = ret.Error(0) } diff --git a/core/services/fluxmonitorv2/mocks/orm.go b/core/services/fluxmonitorv2/mocks/orm.go index 705c2e8b2bb..1f2303fbf1a 100644 --- a/core/services/fluxmonitorv2/mocks/orm.go +++ b/core/services/fluxmonitorv2/mocks/orm.go @@ -39,20 +39,13 @@ func (_m *ORM) CountFluxMonitorRoundStats() (int, error) { return r0, r1 } -// CreateEthTransaction provides a mock function with given fields: fromAddress, toAddress, payload, gasLimit, qopts -func (_m *ORM) CreateEthTransaction(fromAddress common.Address, toAddress common.Address, payload []byte, gasLimit uint32, qopts ...pg.QOpt) error { - _va := make([]interface{}, len(qopts)) - for _i := range qopts { - _va[_i] = qopts[_i] - } - var _ca []interface{} - _ca = append(_ca, fromAddress, toAddress, payload, gasLimit) - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) +// CreateEthTransaction provides a mock function with given fields: fromAddress, toAddress, payload, gasLimit, idempotencyKey +func (_m *ORM) CreateEthTransaction(fromAddress common.Address, toAddress common.Address, payload []byte, gasLimit uint32, idempotencyKey *string) error { + ret := _m.Called(fromAddress, toAddress, payload, gasLimit, idempotencyKey) var r0 error - if rf, ok := ret.Get(0).(func(common.Address, common.Address, []byte, uint32, ...pg.QOpt) error); ok { - r0 = rf(fromAddress, toAddress, payload, gasLimit, qopts...) + if rf, ok := ret.Get(0).(func(common.Address, common.Address, []byte, uint32, *string) error); ok { + r0 = rf(fromAddress, toAddress, payload, gasLimit, idempotencyKey) } else { r0 = ret.Error(0) } diff --git a/core/services/fluxmonitorv2/orm.go b/core/services/fluxmonitorv2/orm.go index c2f99c8345e..70ebd2e7020 100644 --- a/core/services/fluxmonitorv2/orm.go +++ b/core/services/fluxmonitorv2/orm.go @@ -26,7 +26,7 @@ type ORM interface { DeleteFluxMonitorRoundsBackThrough(aggregator common.Address, roundID uint32) error FindOrCreateFluxMonitorRoundStats(aggregator common.Address, roundID uint32, newRoundLogs uint) (FluxMonitorRoundStatsV2, error) UpdateFluxMonitorRoundStats(aggregator common.Address, roundID uint32, runID int64, newRoundLogsAddition uint, qopts ...pg.QOpt) error - CreateEthTransaction(fromAddress, toAddress common.Address, payload []byte, gasLimit uint32, qopts ...pg.QOpt) error + CreateEthTransaction(fromAddress, toAddress common.Address, payload []byte, gasLimit uint32, idempotencyKey *string) error CountFluxMonitorRoundStats() (count int, err error) } @@ -118,16 +118,17 @@ func (o *orm) CreateEthTransaction( toAddress common.Address, payload []byte, gasLimit uint32, - qopts ...pg.QOpt, + idempotencyKey *string, ) (err error) { _, err = o.txm.CreateTransaction(txmgr.TxRequest{ + IdempotencyKey: idempotencyKey, FromAddress: fromAddress, ToAddress: toAddress, EncodedPayload: payload, FeeLimit: gasLimit, Strategy: o.strategy, Checker: o.checker, - }, qopts...) + }) return errors.Wrap(err, "Skipped Flux Monitor submission") } diff --git a/core/services/fluxmonitorv2/orm_test.go b/core/services/fluxmonitorv2/orm_test.go index 1377737c49b..0d44b36f169 100644 --- a/core/services/fluxmonitorv2/orm_test.go +++ b/core/services/fluxmonitorv2/orm_test.go @@ -185,8 +185,9 @@ func TestORM_CreateEthTransaction(t *testing.T) { payload = []byte{1, 0, 0} gasLimit = uint32(21000) ) - + idempotencyKey := "fluxmonitor-1" txm.On("CreateTransaction", txmgr.TxRequest{ + IdempotencyKey: &idempotencyKey, FromAddress: from, ToAddress: to, EncodedPayload: payload, @@ -195,5 +196,5 @@ func TestORM_CreateEthTransaction(t *testing.T) { Strategy: strategy, }).Return(txmgr.Tx{}, nil).Once() - require.NoError(t, orm.CreateEthTransaction(from, to, payload, gasLimit)) + require.NoError(t, orm.CreateEthTransaction(from, to, payload, gasLimit, &idempotencyKey)) }