Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update TxStore to use parent context created at initialization #10735

Merged
merged 10 commits into from
Oct 4, 2023
Prev Previous commit
Next Next commit
Updated Fluxmonitor context usage
  • Loading branch information
amit-momin committed Sep 26, 2023
commit cd7bde31d1c2dfa742501b64736e1002ee13ed24
7 changes: 4 additions & 3 deletions core/services/fluxmonitorv2/contract_submitter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fluxmonitorv2

import (
"context"
"math/big"

"github.com/pkg/errors"
Expand All @@ -16,7 +17,7 @@ var FluxAggregatorABI = evmtypes.MustGetABI(flux_aggregator_wrapper.FluxAggregat

// ContractSubmitter defines an interface to submit an eth tx.
type ContractSubmitter interface {
Submit(roundID *big.Int, submission *big.Int, idempotencyKey *string) error
Submit(ctx context.Context, roundID *big.Int, submission *big.Int, idempotencyKey *string) error
}

// FluxAggregatorContractSubmitter submits the polled answer in an eth tx.
Expand Down Expand Up @@ -50,7 +51,7 @@ func NewFluxAggregatorContractSubmitter(

// Submit submits the answer by writing a EthTx for the txmgr to
// pick up
func (c *FluxAggregatorContractSubmitter) Submit(roundID *big.Int, submission *big.Int, idempotencyKey *string) error {
func (c *FluxAggregatorContractSubmitter) Submit(ctx context.Context, roundID *big.Int, submission *big.Int, idempotencyKey *string) error {
fromAddress, err := c.keyStore.GetRoundRobinAddress(c.chainID)
if err != nil {
return err
Expand All @@ -62,7 +63,7 @@ func (c *FluxAggregatorContractSubmitter) Submit(roundID *big.Int, submission *b
}

return errors.Wrap(
c.orm.CreateEthTransaction(fromAddress, c.Address(), payload, c.gasLimit, idempotencyKey),
c.orm.CreateEthTransaction(ctx, fromAddress, c.Address(), payload, c.gasLimit, idempotencyKey),
"failed to send Eth transaction",
)
}
2 changes: 1 addition & 1 deletion core/services/fluxmonitorv2/contract_submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ func TestFluxAggregatorContractSubmitter_Submit(t *testing.T) {
idempotencyKey := uuid.New().String()
orm.On("CreateEthTransaction", fromAddress, toAddress, payload, gasLimit, &idempotencyKey).Return(nil)

err = submitter.Submit(roundID, submission, &idempotencyKey)
err = submitter.Submit(testutils.Context(t), roundID, submission, &idempotencyKey)
assert.NoError(t, err)
}
15 changes: 10 additions & 5 deletions core/services/fluxmonitorv2/flux_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,8 @@ func (fm *FluxMonitor) respondToAnswerUpdatedLog(log flux_aggregator_wrapper.Flu
// need to poll and submit an answer to the contract regardless of the deviation.
func (fm *FluxMonitor) respondToNewRoundLog(log flux_aggregator_wrapper.FluxAggregatorNewRound, lb log.Broadcast) {
started := time.Now()
ctx, cancel := utils.StopChan(fm.chStop).NewCtx()
defer cancel()

newRoundLogger := fm.logger.With(
"round", log.RoundId,
Expand Down Expand Up @@ -743,7 +745,7 @@ func (fm *FluxMonitor) respondToNewRoundLog(log flux_aggregator_wrapper.FluxAggr
})

// Call the v2 pipeline to execute a new job run
run, results, err := fm.runner.ExecuteRun(context.Background(), fm.spec, vars, fm.logger)
run, results, err := fm.runner.ExecuteRun(ctx, fm.spec, vars, fm.logger)
if err != nil {
newRoundLogger.Errorw(fmt.Sprintf("error executing new run for job ID %v name %v", fm.spec.JobID, fm.spec.JobName), "err", err)
return
Expand Down Expand Up @@ -772,7 +774,7 @@ func (fm *FluxMonitor) respondToNewRoundLog(log flux_aggregator_wrapper.FluxAggr
if err2 := fm.runner.InsertFinishedRun(run, false, pg.WithQueryer(tx)); err2 != nil {
return err2
}
if err2 := fm.queueTransactionForTxm(tx, run.ID, answer, roundState.RoundId, &log); err2 != nil {
if err2 := fm.queueTransactionForTxm(ctx, tx, run.ID, answer, roundState.RoundId, &log); err2 != nil {
return err2
}
return fm.logBroadcaster.MarkConsumed(lb, pg.WithQueryer(tx))
Expand Down Expand Up @@ -811,6 +813,8 @@ func (fm *FluxMonitor) checkEligibilityAndAggregatorFunding(roundState flux_aggr

func (fm *FluxMonitor) pollIfEligible(pollReq PollRequestType, deviationChecker *DeviationChecker, broadcast log.Broadcast) {
started := time.Now()
ctx, cancel := utils.StopChan(fm.chStop).NewCtx()
defer cancel()

l := fm.logger.With(
"threshold", deviationChecker.Thresholds.Rel,
Expand Down Expand Up @@ -946,7 +950,7 @@ func (fm *FluxMonitor) pollIfEligible(pollReq PollRequestType, deviationChecker
},
})

run, results, err := fm.runner.ExecuteRun(context.Background(), fm.spec, vars, fm.logger)
run, results, err := fm.runner.ExecuteRun(ctx, fm.spec, vars, fm.logger)
if err != nil {
l.Errorw("can't fetch answer", "err", err)
fm.jobORM.TryRecordError(fm.spec.JobID, "Error polling")
Expand Down Expand Up @@ -996,7 +1000,7 @@ func (fm *FluxMonitor) pollIfEligible(pollReq PollRequestType, deviationChecker
if err2 := fm.runner.InsertFinishedRun(run, true, pg.WithQueryer(tx)); err2 != nil {
return err2
}
if err2 := fm.queueTransactionForTxm(tx, run.ID, answer, roundState.RoundId, nil); err2 != nil {
if err2 := fm.queueTransactionForTxm(ctx, tx, run.ID, answer, roundState.RoundId, nil); err2 != nil {
return err2
}
if broadcast != nil {
Expand Down Expand Up @@ -1072,11 +1076,12 @@ func (fm *FluxMonitor) initialRoundState() flux_aggregator_wrapper.OracleRoundSt
return latestRoundState
}

func (fm *FluxMonitor) queueTransactionForTxm(tx pg.Queryer, runID int64, answer decimal.Decimal, roundID uint32, log *flux_aggregator_wrapper.FluxAggregatorNewRound) error {
func (fm *FluxMonitor) queueTransactionForTxm(ctx context.Context, tx pg.Queryer, runID int64, answer decimal.Decimal, roundID uint32, log *flux_aggregator_wrapper.FluxAggregatorNewRound) error {
// Use pipeline run ID to generate globally unique key that can correlate this run to a Tx
idempotencyKey := fmt.Sprintf("fluxmonitor-%d", runID)
// Submit the Eth Tx
err := fm.contractSubmitter.Submit(
ctx,
new(big.Int).SetInt64(int64(roundID)),
answer.BigInt(),
&idempotencyKey,
Expand Down
11 changes: 6 additions & 5 deletions core/services/fluxmonitorv2/mocks/contract_submitter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 9 additions & 5 deletions core/services/fluxmonitorv2/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions core/services/fluxmonitorv2/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ORM interface {
DeleteFluxMonitorRoundsBackThrough(aggregator common.Address, roundID uint32) error
FindOrCreateFluxMonitorRoundStats(aggregator common.Address, roundID uint32, newRoundLogs uint) (FluxMonitorRoundStatsV2, error)
UpdateFluxMonitorRoundStats(aggregator common.Address, roundID uint32, runID int64, newRoundLogsAddition uint, qopts ...pg.QOpt) error
CreateEthTransaction(fromAddress, toAddress common.Address, payload []byte, gasLimit uint32, idempotencyKey *string) error
CreateEthTransaction(ctx context.Context, fromAddress, toAddress common.Address, payload []byte, gasLimit uint32, idempotencyKey *string) error
CountFluxMonitorRoundStats() (count int, err error)
}

Expand Down Expand Up @@ -115,14 +115,15 @@ func (o *orm) CountFluxMonitorRoundStats() (count int, err error) {

// CreateEthTransaction creates an ethereum transaction for the Txm to pick up
func (o *orm) CreateEthTransaction(
ctx context.Context,
fromAddress common.Address,
toAddress common.Address,
payload []byte,
gasLimit uint32,
idempotencyKey *string,
) (err error) {

_, err = o.txm.CreateTransaction(context.TODO(), txmgr.TxRequest{
_, err = o.txm.CreateTransaction(ctx, txmgr.TxRequest{
IdempotencyKey: idempotencyKey,
FromAddress: fromAddress,
ToAddress: toAddress,
Expand Down
2 changes: 1 addition & 1 deletion core/services/fluxmonitorv2/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,5 +197,5 @@ func TestORM_CreateEthTransaction(t *testing.T) {
Strategy: strategy,
}).Return(txmgr.Tx{}, nil).Once()

require.NoError(t, orm.CreateEthTransaction(from, to, payload, gasLimit, &idempotencyKey))
require.NoError(t, orm.CreateEthTransaction(testutils.Context(t), from, to, payload, gasLimit, &idempotencyKey))
}