Skip to content

Commit

Permalink
core/services/pipeline: remove logger from method params (#13887)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 authored Jul 18, 2024
1 parent 81a21bb commit 3782a9c
Show file tree
Hide file tree
Showing 26 changed files with 163 additions and 184 deletions.
2 changes: 1 addition & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ func (app *ChainlinkApplication) RunJobV2(
},
}
}
runID, _, err = app.pipelineRunner.ExecuteAndInsertFinishedRun(ctx, *jb.PipelineSpec, pipeline.NewVarsFrom(vars), app.logger, saveTasks)
runID, _, err = app.pipelineRunner.ExecuteAndInsertFinishedRun(ctx, *jb.PipelineSpec, pipeline.NewVarsFrom(vars), saveTasks)
}
return runID, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (cr *Cron) runPipeline() {

run := pipeline.NewRun(*cr.jobSpec.PipelineSpec, vars)

_, err := cr.pipelineRunner.Run(ctx, run, cr.logger, false, nil)
_, err := cr.pipelineRunner.Run(ctx, run, false, nil)
if err != nil {
cr.logger.Errorf("Error executing new run for jobSpec ID %v", cr.jobSpec.ID)
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/directrequest/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (l *listener) handleOracleRequest(ctx context.Context, request *operator_wr
},
})
run := pipeline.NewRun(*l.job.PipelineSpec, vars)
_, err := l.pipelineRunner.Run(ctx, run, l.logger, true, func(tx sqlutil.DataSource) error {
_, err := l.pipelineRunner.Run(ctx, run, true, func(tx sqlutil.DataSource) error {
l.markLogConsumed(ctx, tx, lb)
return nil
})
Expand Down
14 changes: 7 additions & 7 deletions core/services/directrequest/delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ func TestDelegate_ServicesListenerHandleLog(t *testing.T) {
uni.logBroadcaster.On("MarkConsumed", mock.Anything, mock.Anything, mock.Anything).Return(nil)

runBeganAwaiter := cltest.NewAwaiter()
uni.runner.On("Run", mock.Anything, mock.AnythingOfType("*pipeline.Run"), mock.Anything, mock.Anything, mock.Anything).
uni.runner.On("Run", mock.Anything, mock.AnythingOfType("*pipeline.Run"), mock.Anything, mock.Anything).
Return(false, nil).
Run(func(args mock.Arguments) {
runBeganAwaiter.ItHappened()
fn := args.Get(4).(func(source sqlutil.DataSource) error)
fn := args.Get(3).(func(source sqlutil.DataSource) error)
require.NoError(t, fn(nil))
}).Once()

Expand Down Expand Up @@ -227,7 +227,7 @@ func TestDelegate_ServicesListenerHandleLog(t *testing.T) {
uni.runner.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
runBeganAwaiter.ItHappened()
fn := args.Get(4).(func(sqlutil.DataSource) error)
fn := args.Get(3).(func(sqlutil.DataSource) error)
require.NoError(t, fn(nil))
}).Once().Return(false, nil)

Expand Down Expand Up @@ -393,9 +393,9 @@ func TestDelegate_ServicesListenerHandleLog(t *testing.T) {
uni.logBroadcaster.On("MarkConsumed", mock.Anything, mock.Anything, mock.Anything).Return(nil)

runBeganAwaiter := cltest.NewAwaiter()
uni.runner.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
uni.runner.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
runBeganAwaiter.ItHappened()
fn := args.Get(4).(func(sqlutil.DataSource) error)
fn := args.Get(3).(func(sqlutil.DataSource) error)
require.NoError(t, fn(nil))
}).Once().Return(false, nil)

Expand Down Expand Up @@ -492,9 +492,9 @@ func TestDelegate_ServicesListenerHandleLog(t *testing.T) {
}).Return(nil)

runBeganAwaiter := cltest.NewAwaiter()
uni.runner.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
uni.runner.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
runBeganAwaiter.ItHappened()
fn := args.Get(4).(func(sqlutil.DataSource) error)
fn := args.Get(3).(func(sqlutil.DataSource) error)
require.NoError(t, fn(nil))
}).Once().Return(false, nil)

Expand Down
8 changes: 4 additions & 4 deletions core/services/fluxmonitorv2/flux_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,12 +749,12 @@ func (fm *FluxMonitor) respondToNewRoundLog(log flux_aggregator_wrapper.FluxAggr
})

// Call the v2 pipeline to execute a new job run
run, results, err := fm.runner.ExecuteRun(ctx, fm.spec, vars, fm.logger)
run, results, err := fm.runner.ExecuteRun(ctx, fm.spec, vars)
if err != nil {
newRoundLogger.Errorw(fmt.Sprintf("error executing new run for job ID %v name %v", fm.spec.JobID, fm.spec.JobName), "err", err)
return
}
result, err := results.FinalResult(newRoundLogger).SingularResult()
result, err := results.FinalResult().SingularResult()
if err != nil || result.Error != nil {
newRoundLogger.Errorw("can't fetch answer", "err", err, "result", result)
fm.jobORM.TryRecordError(ctx, fm.spec.JobID, "Error polling")
Expand Down Expand Up @@ -956,13 +956,13 @@ func (fm *FluxMonitor) pollIfEligible(pollReq PollRequestType, deviationChecker
},
})

run, results, err := fm.runner.ExecuteRun(ctx, fm.spec, vars, fm.logger)
run, results, err := fm.runner.ExecuteRun(ctx, fm.spec, vars)
if err != nil {
l.Errorw("can't fetch answer", "err", err)
fm.jobORM.TryRecordError(ctx, fm.spec.JobID, "Error polling")
return
}
result, err := results.FinalResult(l).SingularResult()
result, err := results.FinalResult().SingularResult()
if err != nil || result.Error != nil {
l.Errorw("can't fetch answer", "err", err, "result", result)
fm.jobORM.TryRecordError(ctx, fm.spec.JobID, "Error polling")
Expand Down
2 changes: 1 addition & 1 deletion core/services/job/job_pipeline_orm_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestPipelineORM_Integration(t *testing.T) {
pipelineSpecID := pipelineSpecs[0].ID

// Create the run
runID, _, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), pipelineSpecs[0], pipeline.NewVarsFrom(nil), lggr, true)
runID, _, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), pipelineSpecs[0], pipeline.NewVarsFrom(nil), true)
require.NoError(t, err)

// Check the DB for the pipeline.Run
Expand Down
34 changes: 17 additions & 17 deletions core/services/job/runner_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ func TestRunner(t *testing.T) {

m, err := bridges.MarshalBridgeMetaData(big.NewInt(10), big.NewInt(100))
require.NoError(t, err)
runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(map[string]interface{}{"jobRun": map[string]interface{}{"meta": m}}), logger.TestLogger(t), true)
runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(map[string]interface{}{"jobRun": map[string]interface{}{"meta": m}}), true)
require.NoError(t, err)

results := taskResults.FinalResult(logger.TestLogger(t))
results := taskResults.FinalResult()
require.Len(t, results.Values, 2)
require.GreaterOrEqual(t, len(results.FatalErrors), 2)
assert.Nil(t, results.FatalErrors[0])
Expand Down Expand Up @@ -318,10 +318,10 @@ answer1 [type=median index=0];
err := jobORM.CreateJob(ctx, jb)
require.NoError(t, err)

runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true)
runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true)
require.NoError(t, err)

results := taskResults.FinalResult(logger.TestLogger(t))
results := taskResults.FinalResult()
assert.Len(t, results.FatalErrors, 1)
assert.Len(t, results.Values, 1)
assert.Contains(t, results.FatalErrors[0].Error(), "type <nil> cannot be converted to decimal.Decimal")
Expand Down Expand Up @@ -364,10 +364,10 @@ answer1 [type=median index=0];
err := jobORM.CreateJob(testutils.Context(t), jb)
require.NoError(t, err)

runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true)
runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true)
require.NoError(t, err)

results := taskResults.FinalResult(logger.TestLogger(t))
results := taskResults.FinalResult()
assert.Len(t, results.Values, 1)
assert.Len(t, results.FatalErrors, 1)
assert.Contains(t, results.FatalErrors[0].Error(), pipeline.ErrTooManyErrors.Error())
Expand Down Expand Up @@ -409,10 +409,10 @@ answer1 [type=median index=0];
err := jobORM.CreateJob(testutils.Context(t), jb)
require.NoError(t, err)

runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true)
runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true)
require.NoError(t, err)

results := taskResults.FinalResult(logger.TestLogger(t))
results := taskResults.FinalResult()
assert.Len(t, results.Values, 1)
assert.Contains(t, results.FatalErrors[0].Error(), "type <nil> cannot be converted to decimal.Decimal")
assert.Nil(t, results.Values[0])
Expand Down Expand Up @@ -697,19 +697,19 @@ answer1 [type=median index=0];
err := jobORM.CreateJob(ctx, jb)
require.NoError(t, err)

_, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true)
_, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true)
require.NoError(t, err)
results := taskResults.FinalResult(logger.TestLogger(t))
results := taskResults.FinalResult()
assert.Nil(t, results.Values[0])

// No task timeout should succeed.
jb = makeMinimalHTTPOracleSpec(t, db, config, cltest.NewEIP55Address().String(), transmitterAddress.Hex(), cltest.DefaultOCRKeyBundleID, serv.URL, "")
jb.Name = null.NewString("a job 2", true)
err = jobORM.CreateJob(ctx, jb)
require.NoError(t, err)
_, taskResults, err = runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true)
_, taskResults, err = runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true)
require.NoError(t, err)
results = taskResults.FinalResult(logger.TestLogger(t))
results = taskResults.FinalResult()
assert.Equal(t, 10.1, results.Values[0])
assert.Nil(t, results.FatalErrors[0])

Expand All @@ -720,9 +720,9 @@ answer1 [type=median index=0];
err = jobORM.CreateJob(ctx, jb)
require.NoError(t, err)

_, taskResults, err = runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true)
_, taskResults, err = runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true)
require.NoError(t, err)
resultsNoFatalErrs := taskResults.FinalResult(logger.TestLogger(t))
resultsNoFatalErrs := taskResults.FinalResult()
assert.NotNil(t, resultsNoFatalErrs.FatalErrors[0])
})

Expand All @@ -740,9 +740,9 @@ answer1 [type=median index=0];
err := jobORM.CreateJob(ctx, jb)
require.NoError(t, err)

_, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true)
_, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true)
require.NoError(t, err)
results := taskResults.FinalResult(logger.TestLogger(t))
results := taskResults.FinalResult()
assert.Len(t, results.Values, 1)
assert.Nil(t, results.FatalErrors[0])
assert.Equal(t, "4242", results.Values[0].(decimal.Decimal).String())
Expand All @@ -752,7 +752,7 @@ answer1 [type=median index=0];
require.NoError(t, err)

// Create another run, it should fail
_, _, err = runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true)
_, _, err = runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true)
require.Error(t, err)
})
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/keeper/upkeep_executer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (ex *UpkeepExecuter) execute(upkeep UpkeepRegistration, head *evmtypes.Head
ex.job.PipelineSpec.DotDagSource = pipeline.KeepersObservationSource
run := pipeline.NewRun(*ex.job.PipelineSpec, vars)

if _, err := ex.pr.Run(ctxService, run, svcLogger, true, nil); err != nil {
if _, err := ex.pr.Run(ctxService, run, true, nil); err != nil {
svcLogger.Error(errors.Wrap(err, "failed executing run"))
return
}
Expand Down
4 changes: 2 additions & 2 deletions core/services/ocr2/plugins/generic/pipeline_runner_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
var _ core.PipelineRunnerService = (*PipelineRunnerAdapter)(nil)

type pipelineRunner interface {
ExecuteAndInsertFinishedRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, results pipeline.TaskRunResults, err error)
ExecuteAndInsertFinishedRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, saveSuccessfulTaskRuns bool) (runID int64, results pipeline.TaskRunResults, err error)
}

type PipelineRunnerAdapter struct {
Expand Down Expand Up @@ -45,7 +45,7 @@ func (p *PipelineRunnerAdapter) ExecuteRun(ctx context.Context, spec string, var
merge(defaultVars, vars.Vars)

finalVars := pipeline.NewVarsFrom(defaultVars)
_, trrs, err := p.runner.ExecuteAndInsertFinishedRun(ctx, s, finalVars, p.logger, true)
_, trrs, err := p.runner.ExecuteAndInsertFinishedRun(ctx, s, finalVars, true)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type mockPipelineRunner struct {
vars pipeline.Vars
}

func (m *mockPipelineRunner) ExecuteAndInsertFinishedRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, results pipeline.TaskRunResults, err error) {
func (m *mockPipelineRunner) ExecuteAndInsertFinishedRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, saveSuccessfulTaskRuns bool) (runID int64, results pipeline.TaskRunResults, err error) {
m.spec = spec
m.vars = vars
// We never attach a run to the mock, so we can't return a runID
Expand Down
10 changes: 5 additions & 5 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (ds *inMemoryDataSource) executeRun(ctx context.Context) (*pipeline.Run, pi
},
})

run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars, ds.lggr)
run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars)
if err != nil {
return nil, pipeline.TaskRunResults{}, errors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID)
}
Expand Down Expand Up @@ -227,7 +227,7 @@ func (ds *inMemoryDataSource) Observe(ctx context.Context, timestamp ocr2types.R
return nil, err
}

finalResult := trrs.FinalResult(ds.lggr)
finalResult := trrs.FinalResult()
setEATelemetry(ds, finalResult, trrs, ObservationTimestamp{
Round: timestamp.Round,
Epoch: timestamp.Epoch,
Expand Down Expand Up @@ -310,15 +310,15 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
return errors.Wrapf(ds.latestUpdateErr, "error updating in memory data source cache for spec ID %v", ds.spec.ID)
}

value, err := ds.inMemoryDataSource.parse(latestTrrs.FinalResult(ds.lggr))
value, err := ds.inMemoryDataSource.parse(latestTrrs.FinalResult())
if err != nil {
ds.latestUpdateErr = errors.Wrapf(err, "invalid result")
return ds.latestUpdateErr
}

// update cache values
ds.latestTrrs = latestTrrs
ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr)
ds.latestResult = ds.latestTrrs.FinalResult()
ds.latestUpdateErr = nil

// backup in case data source fails continuously and node gets rebooted
Expand Down Expand Up @@ -402,7 +402,7 @@ func (ds *dataSourceBase) observe(ctx context.Context, timestamp ObservationTime
// a db write block that.
ds.saver.Save(run)

finalResult := trrs.FinalResult(ds.lggr)
finalResult := trrs.FinalResult()
setEATelemetry(&ds.inMemoryDataSource, finalResult, trrs, timestamp)

return ds.inMemoryDataSource.parse(finalResult)
Expand Down
5 changes: 3 additions & 2 deletions core/services/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pipeline
import (
"context"
"errors"
"fmt"
"net/url"
"reflect"
"sort"
Expand Down Expand Up @@ -229,7 +230,7 @@ func (trrs TaskRunResults) GetTaskRunResultsFinishedAt() time.Time {

// FinalResult pulls the FinalResult for the pipeline_run from the task runs
// It needs to respect the output index of each task
func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult {
func (trrs TaskRunResults) FinalResult() FinalResult {
var found bool
var fr FinalResult
sort.Slice(trrs, func(i, j int) bool {
Expand All @@ -245,7 +246,7 @@ func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult {
}

if !found {
l.Panicw("Expected at least one task to be final", "tasks", trrs)
panic(fmt.Sprintf("expected at least one task to be final: %v", trrs))
}
return fr
}
Expand Down
Loading

0 comments on commit 3782a9c

Please sign in to comment.