diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index bb0d99b3cb..fd263470a8 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -865,7 +865,7 @@ func (app *ChainlinkApplication) RunJobV2( }, } } - runID, _, err = app.pipelineRunner.ExecuteAndInsertFinishedRun(ctx, *jb.PipelineSpec, pipeline.NewVarsFrom(vars), app.logger, saveTasks) + runID, _, err = app.pipelineRunner.ExecuteAndInsertFinishedRun(ctx, *jb.PipelineSpec, pipeline.NewVarsFrom(vars), saveTasks) } return runID, err } diff --git a/core/services/cron/cron.go b/core/services/cron/cron.go index aa4d1d782f..9f9d2efbce 100644 --- a/core/services/cron/cron.go +++ b/core/services/cron/cron.go @@ -88,7 +88,7 @@ func (cr *Cron) runPipeline() { run := pipeline.NewRun(*cr.jobSpec.PipelineSpec, vars) - _, err := cr.pipelineRunner.Run(ctx, run, cr.logger, false, nil) + _, err := cr.pipelineRunner.Run(ctx, run, false, nil) if err != nil { cr.logger.Errorf("Error executing new run for jobSpec ID %v", cr.jobSpec.ID) } diff --git a/core/services/directrequest/delegate.go b/core/services/directrequest/delegate.go index 26f2c5f9c8..844bd0a12c 100644 --- a/core/services/directrequest/delegate.go +++ b/core/services/directrequest/delegate.go @@ -375,7 +375,7 @@ func (l *listener) handleOracleRequest(ctx context.Context, request *operator_wr }, }) run := pipeline.NewRun(*l.job.PipelineSpec, vars) - _, err := l.pipelineRunner.Run(ctx, run, l.logger, true, func(tx sqlutil.DataSource) error { + _, err := l.pipelineRunner.Run(ctx, run, true, func(tx sqlutil.DataSource) error { l.markLogConsumed(ctx, tx, lb) return nil }) diff --git a/core/services/directrequest/delegate_test.go b/core/services/directrequest/delegate_test.go index e754713b01..0876031fb9 100644 --- a/core/services/directrequest/delegate_test.go +++ b/core/services/directrequest/delegate_test.go @@ -163,11 +163,11 @@ func TestDelegate_ServicesListenerHandleLog(t *testing.T) { uni.logBroadcaster.On("MarkConsumed", mock.Anything, mock.Anything, mock.Anything).Return(nil) runBeganAwaiter := cltest.NewAwaiter() - uni.runner.On("Run", mock.Anything, mock.AnythingOfType("*pipeline.Run"), mock.Anything, mock.Anything, mock.Anything). + uni.runner.On("Run", mock.Anything, mock.AnythingOfType("*pipeline.Run"), mock.Anything, mock.Anything). Return(false, nil). Run(func(args mock.Arguments) { runBeganAwaiter.ItHappened() - fn := args.Get(4).(func(source sqlutil.DataSource) error) + fn := args.Get(3).(func(source sqlutil.DataSource) error) require.NoError(t, fn(nil)) }).Once() @@ -227,7 +227,7 @@ func TestDelegate_ServicesListenerHandleLog(t *testing.T) { uni.runner.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Run(func(args mock.Arguments) { runBeganAwaiter.ItHappened() - fn := args.Get(4).(func(sqlutil.DataSource) error) + fn := args.Get(3).(func(sqlutil.DataSource) error) require.NoError(t, fn(nil)) }).Once().Return(false, nil) @@ -393,9 +393,9 @@ func TestDelegate_ServicesListenerHandleLog(t *testing.T) { uni.logBroadcaster.On("MarkConsumed", mock.Anything, mock.Anything, mock.Anything).Return(nil) runBeganAwaiter := cltest.NewAwaiter() - uni.runner.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + uni.runner.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { runBeganAwaiter.ItHappened() - fn := args.Get(4).(func(sqlutil.DataSource) error) + fn := args.Get(3).(func(sqlutil.DataSource) error) require.NoError(t, fn(nil)) }).Once().Return(false, nil) @@ -492,9 +492,9 @@ func TestDelegate_ServicesListenerHandleLog(t *testing.T) { }).Return(nil) runBeganAwaiter := cltest.NewAwaiter() - uni.runner.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + uni.runner.On("Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { runBeganAwaiter.ItHappened() - fn := args.Get(4).(func(sqlutil.DataSource) error) + fn := args.Get(3).(func(sqlutil.DataSource) error) require.NoError(t, fn(nil)) }).Once().Return(false, nil) diff --git a/core/services/fluxmonitorv2/flux_monitor.go b/core/services/fluxmonitorv2/flux_monitor.go index 31db95f262..9175feb1a6 100644 --- a/core/services/fluxmonitorv2/flux_monitor.go +++ b/core/services/fluxmonitorv2/flux_monitor.go @@ -749,12 +749,12 @@ func (fm *FluxMonitor) respondToNewRoundLog(log flux_aggregator_wrapper.FluxAggr }) // Call the v2 pipeline to execute a new job run - run, results, err := fm.runner.ExecuteRun(ctx, fm.spec, vars, fm.logger) + run, results, err := fm.runner.ExecuteRun(ctx, fm.spec, vars) if err != nil { newRoundLogger.Errorw(fmt.Sprintf("error executing new run for job ID %v name %v", fm.spec.JobID, fm.spec.JobName), "err", err) return } - result, err := results.FinalResult(newRoundLogger).SingularResult() + result, err := results.FinalResult().SingularResult() if err != nil || result.Error != nil { newRoundLogger.Errorw("can't fetch answer", "err", err, "result", result) fm.jobORM.TryRecordError(ctx, fm.spec.JobID, "Error polling") @@ -956,13 +956,13 @@ func (fm *FluxMonitor) pollIfEligible(pollReq PollRequestType, deviationChecker }, }) - run, results, err := fm.runner.ExecuteRun(ctx, fm.spec, vars, fm.logger) + run, results, err := fm.runner.ExecuteRun(ctx, fm.spec, vars) if err != nil { l.Errorw("can't fetch answer", "err", err) fm.jobORM.TryRecordError(ctx, fm.spec.JobID, "Error polling") return } - result, err := results.FinalResult(l).SingularResult() + result, err := results.FinalResult().SingularResult() if err != nil || result.Error != nil { l.Errorw("can't fetch answer", "err", err, "result", result) fm.jobORM.TryRecordError(ctx, fm.spec.JobID, "Error polling") diff --git a/core/services/job/job_pipeline_orm_integration_test.go b/core/services/job/job_pipeline_orm_integration_test.go index f8a43bca34..ca6cc6f832 100644 --- a/core/services/job/job_pipeline_orm_integration_test.go +++ b/core/services/job/job_pipeline_orm_integration_test.go @@ -175,7 +175,7 @@ func TestPipelineORM_Integration(t *testing.T) { pipelineSpecID := pipelineSpecs[0].ID // Create the run - runID, _, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), pipelineSpecs[0], pipeline.NewVarsFrom(nil), lggr, true) + runID, _, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), pipelineSpecs[0], pipeline.NewVarsFrom(nil), true) require.NoError(t, err) // Check the DB for the pipeline.Run diff --git a/core/services/job/runner_integration_test.go b/core/services/job/runner_integration_test.go index 0232fbadd9..0b55b14250 100644 --- a/core/services/job/runner_integration_test.go +++ b/core/services/job/runner_integration_test.go @@ -127,10 +127,10 @@ func TestRunner(t *testing.T) { m, err := bridges.MarshalBridgeMetaData(big.NewInt(10), big.NewInt(100)) require.NoError(t, err) - runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(map[string]interface{}{"jobRun": map[string]interface{}{"meta": m}}), logger.TestLogger(t), true) + runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(map[string]interface{}{"jobRun": map[string]interface{}{"meta": m}}), true) require.NoError(t, err) - results := taskResults.FinalResult(logger.TestLogger(t)) + results := taskResults.FinalResult() require.Len(t, results.Values, 2) require.GreaterOrEqual(t, len(results.FatalErrors), 2) assert.Nil(t, results.FatalErrors[0]) @@ -318,10 +318,10 @@ answer1 [type=median index=0]; err := jobORM.CreateJob(ctx, jb) require.NoError(t, err) - runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true) + runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true) require.NoError(t, err) - results := taskResults.FinalResult(logger.TestLogger(t)) + results := taskResults.FinalResult() assert.Len(t, results.FatalErrors, 1) assert.Len(t, results.Values, 1) assert.Contains(t, results.FatalErrors[0].Error(), "type cannot be converted to decimal.Decimal") @@ -364,10 +364,10 @@ answer1 [type=median index=0]; err := jobORM.CreateJob(testutils.Context(t), jb) require.NoError(t, err) - runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true) + runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true) require.NoError(t, err) - results := taskResults.FinalResult(logger.TestLogger(t)) + results := taskResults.FinalResult() assert.Len(t, results.Values, 1) assert.Len(t, results.FatalErrors, 1) assert.Contains(t, results.FatalErrors[0].Error(), pipeline.ErrTooManyErrors.Error()) @@ -409,10 +409,10 @@ answer1 [type=median index=0]; err := jobORM.CreateJob(testutils.Context(t), jb) require.NoError(t, err) - runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true) + runID, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true) require.NoError(t, err) - results := taskResults.FinalResult(logger.TestLogger(t)) + results := taskResults.FinalResult() assert.Len(t, results.Values, 1) assert.Contains(t, results.FatalErrors[0].Error(), "type cannot be converted to decimal.Decimal") assert.Nil(t, results.Values[0]) @@ -697,9 +697,9 @@ answer1 [type=median index=0]; err := jobORM.CreateJob(ctx, jb) require.NoError(t, err) - _, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true) + _, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true) require.NoError(t, err) - results := taskResults.FinalResult(logger.TestLogger(t)) + results := taskResults.FinalResult() assert.Nil(t, results.Values[0]) // No task timeout should succeed. @@ -707,9 +707,9 @@ answer1 [type=median index=0]; jb.Name = null.NewString("a job 2", true) err = jobORM.CreateJob(ctx, jb) require.NoError(t, err) - _, taskResults, err = runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true) + _, taskResults, err = runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true) require.NoError(t, err) - results = taskResults.FinalResult(logger.TestLogger(t)) + results = taskResults.FinalResult() assert.Equal(t, 10.1, results.Values[0]) assert.Nil(t, results.FatalErrors[0]) @@ -720,9 +720,9 @@ answer1 [type=median index=0]; err = jobORM.CreateJob(ctx, jb) require.NoError(t, err) - _, taskResults, err = runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true) + _, taskResults, err = runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true) require.NoError(t, err) - resultsNoFatalErrs := taskResults.FinalResult(logger.TestLogger(t)) + resultsNoFatalErrs := taskResults.FinalResult() assert.NotNil(t, resultsNoFatalErrs.FatalErrors[0]) }) @@ -740,9 +740,9 @@ answer1 [type=median index=0]; err := jobORM.CreateJob(ctx, jb) require.NoError(t, err) - _, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true) + _, taskResults, err := runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true) require.NoError(t, err) - results := taskResults.FinalResult(logger.TestLogger(t)) + results := taskResults.FinalResult() assert.Len(t, results.Values, 1) assert.Nil(t, results.FatalErrors[0]) assert.Equal(t, "4242", results.Values[0].(decimal.Decimal).String()) @@ -752,7 +752,7 @@ answer1 [type=median index=0]; require.NoError(t, err) // Create another run, it should fail - _, _, err = runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), logger.TestLogger(t), true) + _, _, err = runner.ExecuteAndInsertFinishedRun(testutils.Context(t), *jb.PipelineSpec, pipeline.NewVarsFrom(nil), true) require.Error(t, err) }) } diff --git a/core/services/keeper/upkeep_executer.go b/core/services/keeper/upkeep_executer.go index c66f2d31c5..170546287e 100644 --- a/core/services/keeper/upkeep_executer.go +++ b/core/services/keeper/upkeep_executer.go @@ -227,7 +227,7 @@ func (ex *UpkeepExecuter) execute(upkeep UpkeepRegistration, head *evmtypes.Head ex.job.PipelineSpec.DotDagSource = pipeline.KeepersObservationSource run := pipeline.NewRun(*ex.job.PipelineSpec, vars) - if _, err := ex.pr.Run(ctxService, run, svcLogger, true, nil); err != nil { + if _, err := ex.pr.Run(ctxService, run, true, nil); err != nil { svcLogger.Error(errors.Wrap(err, "failed executing run")) return } diff --git a/core/services/ocr2/plugins/generic/pipeline_runner_adapter.go b/core/services/ocr2/plugins/generic/pipeline_runner_adapter.go index 22217b8366..994f043ebc 100644 --- a/core/services/ocr2/plugins/generic/pipeline_runner_adapter.go +++ b/core/services/ocr2/plugins/generic/pipeline_runner_adapter.go @@ -15,7 +15,7 @@ import ( var _ core.PipelineRunnerService = (*PipelineRunnerAdapter)(nil) type pipelineRunner interface { - ExecuteAndInsertFinishedRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, results pipeline.TaskRunResults, err error) + ExecuteAndInsertFinishedRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, saveSuccessfulTaskRuns bool) (runID int64, results pipeline.TaskRunResults, err error) } type PipelineRunnerAdapter struct { @@ -45,7 +45,7 @@ func (p *PipelineRunnerAdapter) ExecuteRun(ctx context.Context, spec string, var merge(defaultVars, vars.Vars) finalVars := pipeline.NewVarsFrom(defaultVars) - _, trrs, err := p.runner.ExecuteAndInsertFinishedRun(ctx, s, finalVars, p.logger, true) + _, trrs, err := p.runner.ExecuteAndInsertFinishedRun(ctx, s, finalVars, true) if err != nil { return nil, err } diff --git a/core/services/ocr2/plugins/generic/pipeline_runner_adapter_test.go b/core/services/ocr2/plugins/generic/pipeline_runner_adapter_test.go index a33ab2a1bd..f9c51cfb66 100644 --- a/core/services/ocr2/plugins/generic/pipeline_runner_adapter_test.go +++ b/core/services/ocr2/plugins/generic/pipeline_runner_adapter_test.go @@ -97,7 +97,7 @@ type mockPipelineRunner struct { vars pipeline.Vars } -func (m *mockPipelineRunner) ExecuteAndInsertFinishedRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, results pipeline.TaskRunResults, err error) { +func (m *mockPipelineRunner) ExecuteAndInsertFinishedRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, saveSuccessfulTaskRuns bool) (runID int64, results pipeline.TaskRunResults, err error) { m.spec = spec m.vars = vars // We never attach a run to the mock, so we can't return a runID diff --git a/core/services/ocrcommon/data_source.go b/core/services/ocrcommon/data_source.go index 336074f81f..9e3bde00dc 100644 --- a/core/services/ocrcommon/data_source.go +++ b/core/services/ocrcommon/data_source.go @@ -193,7 +193,7 @@ func (ds *inMemoryDataSource) executeRun(ctx context.Context) (*pipeline.Run, pi }, }) - run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars, ds.lggr) + run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars) if err != nil { return nil, pipeline.TaskRunResults{}, errors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID) } @@ -227,7 +227,7 @@ func (ds *inMemoryDataSource) Observe(ctx context.Context, timestamp ocr2types.R return nil, err } - finalResult := trrs.FinalResult(ds.lggr) + finalResult := trrs.FinalResult() setEATelemetry(ds, finalResult, trrs, ObservationTimestamp{ Round: timestamp.Round, Epoch: timestamp.Epoch, @@ -310,7 +310,7 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error { return errors.Wrapf(ds.latestUpdateErr, "error updating in memory data source cache for spec ID %v", ds.spec.ID) } - value, err := ds.inMemoryDataSource.parse(latestTrrs.FinalResult(ds.lggr)) + value, err := ds.inMemoryDataSource.parse(latestTrrs.FinalResult()) if err != nil { ds.latestUpdateErr = errors.Wrapf(err, "invalid result") return ds.latestUpdateErr @@ -318,7 +318,7 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error { // update cache values ds.latestTrrs = latestTrrs - ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr) + ds.latestResult = ds.latestTrrs.FinalResult() ds.latestUpdateErr = nil // backup in case data source fails continuously and node gets rebooted @@ -402,7 +402,7 @@ func (ds *dataSourceBase) observe(ctx context.Context, timestamp ObservationTime // a db write block that. ds.saver.Save(run) - finalResult := trrs.FinalResult(ds.lggr) + finalResult := trrs.FinalResult() setEATelemetry(&ds.inMemoryDataSource, finalResult, trrs, timestamp) return ds.inMemoryDataSource.parse(finalResult) diff --git a/core/services/pipeline/common.go b/core/services/pipeline/common.go index a15972bbdb..763e50546f 100644 --- a/core/services/pipeline/common.go +++ b/core/services/pipeline/common.go @@ -3,6 +3,7 @@ package pipeline import ( "context" "errors" + "fmt" "net/url" "reflect" "sort" @@ -229,7 +230,7 @@ func (trrs TaskRunResults) GetTaskRunResultsFinishedAt() time.Time { // FinalResult pulls the FinalResult for the pipeline_run from the task runs // It needs to respect the output index of each task -func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult { +func (trrs TaskRunResults) FinalResult() FinalResult { var found bool var fr FinalResult sort.Slice(trrs, func(i, j int) bool { @@ -245,7 +246,7 @@ func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult { } if !found { - l.Panicw("Expected at least one task to be final", "tasks", trrs) + panic(fmt.Sprintf("expected at least one task to be final: %v", trrs)) } return fr } diff --git a/core/services/pipeline/mocks/runner.go b/core/services/pipeline/mocks/runner.go index 222e9ead6d..bb424459ab 100644 --- a/core/services/pipeline/mocks/runner.go +++ b/core/services/pipeline/mocks/runner.go @@ -5,10 +5,8 @@ package mocks import ( context "context" - logger "github.com/smartcontractkit/chainlink/v2/core/logger" - mock "github.com/stretchr/testify/mock" - pipeline "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + mock "github.com/stretchr/testify/mock" sqlutil "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" @@ -73,9 +71,9 @@ func (_c *Runner_Close_Call) RunAndReturn(run func() error) *Runner_Close_Call { return _c } -// ExecuteAndInsertFinishedRun provides a mock function with given fields: ctx, spec, vars, l, saveSuccessfulTaskRuns -func (_m *Runner) ExecuteAndInsertFinishedRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger, saveSuccessfulTaskRuns bool) (int64, pipeline.TaskRunResults, error) { - ret := _m.Called(ctx, spec, vars, l, saveSuccessfulTaskRuns) +// ExecuteAndInsertFinishedRun provides a mock function with given fields: ctx, spec, vars, saveSuccessfulTaskRuns +func (_m *Runner) ExecuteAndInsertFinishedRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, saveSuccessfulTaskRuns bool) (int64, pipeline.TaskRunResults, error) { + ret := _m.Called(ctx, spec, vars, saveSuccessfulTaskRuns) if len(ret) == 0 { panic("no return value specified for ExecuteAndInsertFinishedRun") @@ -84,25 +82,25 @@ func (_m *Runner) ExecuteAndInsertFinishedRun(ctx context.Context, spec pipeline var r0 int64 var r1 pipeline.TaskRunResults var r2 error - if rf, ok := ret.Get(0).(func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger, bool) (int64, pipeline.TaskRunResults, error)); ok { - return rf(ctx, spec, vars, l, saveSuccessfulTaskRuns) + if rf, ok := ret.Get(0).(func(context.Context, pipeline.Spec, pipeline.Vars, bool) (int64, pipeline.TaskRunResults, error)); ok { + return rf(ctx, spec, vars, saveSuccessfulTaskRuns) } - if rf, ok := ret.Get(0).(func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger, bool) int64); ok { - r0 = rf(ctx, spec, vars, l, saveSuccessfulTaskRuns) + if rf, ok := ret.Get(0).(func(context.Context, pipeline.Spec, pipeline.Vars, bool) int64); ok { + r0 = rf(ctx, spec, vars, saveSuccessfulTaskRuns) } else { r0 = ret.Get(0).(int64) } - if rf, ok := ret.Get(1).(func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger, bool) pipeline.TaskRunResults); ok { - r1 = rf(ctx, spec, vars, l, saveSuccessfulTaskRuns) + if rf, ok := ret.Get(1).(func(context.Context, pipeline.Spec, pipeline.Vars, bool) pipeline.TaskRunResults); ok { + r1 = rf(ctx, spec, vars, saveSuccessfulTaskRuns) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(pipeline.TaskRunResults) } } - if rf, ok := ret.Get(2).(func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger, bool) error); ok { - r2 = rf(ctx, spec, vars, l, saveSuccessfulTaskRuns) + if rf, ok := ret.Get(2).(func(context.Context, pipeline.Spec, pipeline.Vars, bool) error); ok { + r2 = rf(ctx, spec, vars, saveSuccessfulTaskRuns) } else { r2 = ret.Error(2) } @@ -119,15 +117,14 @@ type Runner_ExecuteAndInsertFinishedRun_Call struct { // - ctx context.Context // - spec pipeline.Spec // - vars pipeline.Vars -// - l logger.Logger // - saveSuccessfulTaskRuns bool -func (_e *Runner_Expecter) ExecuteAndInsertFinishedRun(ctx interface{}, spec interface{}, vars interface{}, l interface{}, saveSuccessfulTaskRuns interface{}) *Runner_ExecuteAndInsertFinishedRun_Call { - return &Runner_ExecuteAndInsertFinishedRun_Call{Call: _e.mock.On("ExecuteAndInsertFinishedRun", ctx, spec, vars, l, saveSuccessfulTaskRuns)} +func (_e *Runner_Expecter) ExecuteAndInsertFinishedRun(ctx interface{}, spec interface{}, vars interface{}, saveSuccessfulTaskRuns interface{}) *Runner_ExecuteAndInsertFinishedRun_Call { + return &Runner_ExecuteAndInsertFinishedRun_Call{Call: _e.mock.On("ExecuteAndInsertFinishedRun", ctx, spec, vars, saveSuccessfulTaskRuns)} } -func (_c *Runner_ExecuteAndInsertFinishedRun_Call) Run(run func(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger, saveSuccessfulTaskRuns bool)) *Runner_ExecuteAndInsertFinishedRun_Call { +func (_c *Runner_ExecuteAndInsertFinishedRun_Call) Run(run func(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, saveSuccessfulTaskRuns bool)) *Runner_ExecuteAndInsertFinishedRun_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(pipeline.Spec), args[2].(pipeline.Vars), args[3].(logger.Logger), args[4].(bool)) + run(args[0].(context.Context), args[1].(pipeline.Spec), args[2].(pipeline.Vars), args[3].(bool)) }) return _c } @@ -137,14 +134,14 @@ func (_c *Runner_ExecuteAndInsertFinishedRun_Call) Return(runID int64, results p return _c } -func (_c *Runner_ExecuteAndInsertFinishedRun_Call) RunAndReturn(run func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger, bool) (int64, pipeline.TaskRunResults, error)) *Runner_ExecuteAndInsertFinishedRun_Call { +func (_c *Runner_ExecuteAndInsertFinishedRun_Call) RunAndReturn(run func(context.Context, pipeline.Spec, pipeline.Vars, bool) (int64, pipeline.TaskRunResults, error)) *Runner_ExecuteAndInsertFinishedRun_Call { _c.Call.Return(run) return _c } -// ExecuteRun provides a mock function with given fields: ctx, spec, vars, l -func (_m *Runner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (*pipeline.Run, pipeline.TaskRunResults, error) { - ret := _m.Called(ctx, spec, vars, l) +// ExecuteRun provides a mock function with given fields: ctx, spec, vars +func (_m *Runner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars) (*pipeline.Run, pipeline.TaskRunResults, error) { + ret := _m.Called(ctx, spec, vars) if len(ret) == 0 { panic("no return value specified for ExecuteRun") @@ -153,27 +150,27 @@ func (_m *Runner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipel var r0 *pipeline.Run var r1 pipeline.TaskRunResults var r2 error - if rf, ok := ret.Get(0).(func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger) (*pipeline.Run, pipeline.TaskRunResults, error)); ok { - return rf(ctx, spec, vars, l) + if rf, ok := ret.Get(0).(func(context.Context, pipeline.Spec, pipeline.Vars) (*pipeline.Run, pipeline.TaskRunResults, error)); ok { + return rf(ctx, spec, vars) } - if rf, ok := ret.Get(0).(func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger) *pipeline.Run); ok { - r0 = rf(ctx, spec, vars, l) + if rf, ok := ret.Get(0).(func(context.Context, pipeline.Spec, pipeline.Vars) *pipeline.Run); ok { + r0 = rf(ctx, spec, vars) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*pipeline.Run) } } - if rf, ok := ret.Get(1).(func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger) pipeline.TaskRunResults); ok { - r1 = rf(ctx, spec, vars, l) + if rf, ok := ret.Get(1).(func(context.Context, pipeline.Spec, pipeline.Vars) pipeline.TaskRunResults); ok { + r1 = rf(ctx, spec, vars) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(pipeline.TaskRunResults) } } - if rf, ok := ret.Get(2).(func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger) error); ok { - r2 = rf(ctx, spec, vars, l) + if rf, ok := ret.Get(2).(func(context.Context, pipeline.Spec, pipeline.Vars) error); ok { + r2 = rf(ctx, spec, vars) } else { r2 = ret.Error(2) } @@ -190,14 +187,13 @@ type Runner_ExecuteRun_Call struct { // - ctx context.Context // - spec pipeline.Spec // - vars pipeline.Vars -// - l logger.Logger -func (_e *Runner_Expecter) ExecuteRun(ctx interface{}, spec interface{}, vars interface{}, l interface{}) *Runner_ExecuteRun_Call { - return &Runner_ExecuteRun_Call{Call: _e.mock.On("ExecuteRun", ctx, spec, vars, l)} +func (_e *Runner_Expecter) ExecuteRun(ctx interface{}, spec interface{}, vars interface{}) *Runner_ExecuteRun_Call { + return &Runner_ExecuteRun_Call{Call: _e.mock.On("ExecuteRun", ctx, spec, vars)} } -func (_c *Runner_ExecuteRun_Call) Run(run func(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger)) *Runner_ExecuteRun_Call { +func (_c *Runner_ExecuteRun_Call) Run(run func(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars)) *Runner_ExecuteRun_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(pipeline.Spec), args[2].(pipeline.Vars), args[3].(logger.Logger)) + run(args[0].(context.Context), args[1].(pipeline.Spec), args[2].(pipeline.Vars)) }) return _c } @@ -207,7 +203,7 @@ func (_c *Runner_ExecuteRun_Call) Return(run *pipeline.Run, trrs pipeline.TaskRu return _c } -func (_c *Runner_ExecuteRun_Call) RunAndReturn(run func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger) (*pipeline.Run, pipeline.TaskRunResults, error)) *Runner_ExecuteRun_Call { +func (_c *Runner_ExecuteRun_Call) RunAndReturn(run func(context.Context, pipeline.Spec, pipeline.Vars) (*pipeline.Run, pipeline.TaskRunResults, error)) *Runner_ExecuteRun_Call { _c.Call.Return(run) return _c } @@ -587,9 +583,9 @@ func (_c *Runner_ResumeRun_Call) RunAndReturn(run func(context.Context, uuid.UUI return _c } -// Run provides a mock function with given fields: ctx, run, l, saveSuccessfulTaskRuns, fn -func (_m *Runner) Run(ctx context.Context, run *pipeline.Run, l logger.Logger, saveSuccessfulTaskRuns bool, fn func(sqlutil.DataSource) error) (bool, error) { - ret := _m.Called(ctx, run, l, saveSuccessfulTaskRuns, fn) +// Run provides a mock function with given fields: ctx, run, saveSuccessfulTaskRuns, fn +func (_m *Runner) Run(ctx context.Context, run *pipeline.Run, saveSuccessfulTaskRuns bool, fn func(sqlutil.DataSource) error) (bool, error) { + ret := _m.Called(ctx, run, saveSuccessfulTaskRuns, fn) if len(ret) == 0 { panic("no return value specified for Run") @@ -597,17 +593,17 @@ func (_m *Runner) Run(ctx context.Context, run *pipeline.Run, l logger.Logger, s var r0 bool var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *pipeline.Run, logger.Logger, bool, func(sqlutil.DataSource) error) (bool, error)); ok { - return rf(ctx, run, l, saveSuccessfulTaskRuns, fn) + if rf, ok := ret.Get(0).(func(context.Context, *pipeline.Run, bool, func(sqlutil.DataSource) error) (bool, error)); ok { + return rf(ctx, run, saveSuccessfulTaskRuns, fn) } - if rf, ok := ret.Get(0).(func(context.Context, *pipeline.Run, logger.Logger, bool, func(sqlutil.DataSource) error) bool); ok { - r0 = rf(ctx, run, l, saveSuccessfulTaskRuns, fn) + if rf, ok := ret.Get(0).(func(context.Context, *pipeline.Run, bool, func(sqlutil.DataSource) error) bool); ok { + r0 = rf(ctx, run, saveSuccessfulTaskRuns, fn) } else { r0 = ret.Get(0).(bool) } - if rf, ok := ret.Get(1).(func(context.Context, *pipeline.Run, logger.Logger, bool, func(sqlutil.DataSource) error) error); ok { - r1 = rf(ctx, run, l, saveSuccessfulTaskRuns, fn) + if rf, ok := ret.Get(1).(func(context.Context, *pipeline.Run, bool, func(sqlutil.DataSource) error) error); ok { + r1 = rf(ctx, run, saveSuccessfulTaskRuns, fn) } else { r1 = ret.Error(1) } @@ -623,16 +619,15 @@ type Runner_Run_Call struct { // Run is a helper method to define mock.On call // - ctx context.Context // - run *pipeline.Run -// - l logger.Logger // - saveSuccessfulTaskRuns bool // - fn func(sqlutil.DataSource) error -func (_e *Runner_Expecter) Run(ctx interface{}, run interface{}, l interface{}, saveSuccessfulTaskRuns interface{}, fn interface{}) *Runner_Run_Call { - return &Runner_Run_Call{Call: _e.mock.On("Run", ctx, run, l, saveSuccessfulTaskRuns, fn)} +func (_e *Runner_Expecter) Run(ctx interface{}, run interface{}, saveSuccessfulTaskRuns interface{}, fn interface{}) *Runner_Run_Call { + return &Runner_Run_Call{Call: _e.mock.On("Run", ctx, run, saveSuccessfulTaskRuns, fn)} } -func (_c *Runner_Run_Call) Run(run func(ctx context.Context, run *pipeline.Run, l logger.Logger, saveSuccessfulTaskRuns bool, fn func(sqlutil.DataSource) error)) *Runner_Run_Call { +func (_c *Runner_Run_Call) Run(run func(ctx context.Context, run *pipeline.Run, saveSuccessfulTaskRuns bool, fn func(sqlutil.DataSource) error)) *Runner_Run_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*pipeline.Run), args[2].(logger.Logger), args[3].(bool), args[4].(func(sqlutil.DataSource) error)) + run(args[0].(context.Context), args[1].(*pipeline.Run), args[2].(bool), args[3].(func(sqlutil.DataSource) error)) }) return _c } @@ -642,7 +637,7 @@ func (_c *Runner_Run_Call) Return(incomplete bool, err error) *Runner_Run_Call { return _c } -func (_c *Runner_Run_Call) RunAndReturn(run func(context.Context, *pipeline.Run, logger.Logger, bool, func(sqlutil.DataSource) error) (bool, error)) *Runner_Run_Call { +func (_c *Runner_Run_Call) RunAndReturn(run func(context.Context, *pipeline.Run, bool, func(sqlutil.DataSource) error) (bool, error)) *Runner_Run_Call { _c.Call.Return(run) return _c } diff --git a/core/services/pipeline/runner.go b/core/services/pipeline/runner.go index a66feca37d..586bb7738f 100644 --- a/core/services/pipeline/runner.go +++ b/core/services/pipeline/runner.go @@ -34,12 +34,12 @@ type Runner interface { // Run is a blocking call that will execute the run until no further progress can be made. // If `incomplete` is true, the run is only partially complete and is suspended, awaiting to be resumed when more data comes in. // Note that `saveSuccessfulTaskRuns` value is ignored if the run contains async tasks. - Run(ctx context.Context, run *Run, l logger.Logger, saveSuccessfulTaskRuns bool, fn func(tx sqlutil.DataSource) error) (incomplete bool, err error) + Run(ctx context.Context, run *Run, saveSuccessfulTaskRuns bool, fn func(tx sqlutil.DataSource) error) (incomplete bool, err error) ResumeRun(ctx context.Context, taskID uuid.UUID, value interface{}, err error) error // ExecuteRun executes a new run in-memory according to a spec and returns the results. // We expect spec.JobID and spec.JobName to be set for logging/prometheus. - ExecuteRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger) (run *Run, trrs TaskRunResults, err error) + ExecuteRun(ctx context.Context, spec Spec, vars Vars) (run *Run, trrs TaskRunResults, err error) // InsertFinishedRun saves the run results in the database. // ds is an optional override, for example when executing a transaction. InsertFinishedRun(ctx context.Context, ds sqlutil.DataSource, run *Run, saveSuccessfulTaskRuns bool) error @@ -49,7 +49,7 @@ type Runner interface { // It is a combination of ExecuteRun and InsertFinishedRun. // Note that the spec MUST have a DOT graph for this to work. // This will persist the Spec in the DB if it doesn't have an ID. - ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, results TaskRunResults, err error) + ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, vars Vars, saveSuccessfulTaskRuns bool) (runID int64, results TaskRunResults, err error) OnRunFinished(func(*Run)) InitializePipeline(spec Spec) (*Pipeline, error) @@ -288,12 +288,7 @@ func overtimeContext(ctx context.Context) (context.Context, context.CancelFunc) return context.WithoutCancel(ctx), func() {} } -func (r *runner) ExecuteRun( - ctx context.Context, - spec Spec, - vars Vars, - l logger.Logger, -) (*Run, TaskRunResults, error) { +func (r *runner) ExecuteRun(ctx context.Context, spec Spec, vars Vars) (*Run, TaskRunResults, error) { // Pipeline runs may return results after the context is cancelled, so we modify the // deadline to give them time to return before the parent context deadline. var cancel func() @@ -318,7 +313,7 @@ func (r *runner) ExecuteRun( } run := NewRun(spec, vars) - taskRunResults := r.run(ctx, pipeline, run, vars, l) + taskRunResults := r.run(ctx, pipeline, run, vars) if run.Pending { return run, nil, fmt.Errorf("unexpected async run for spec ID %v, tried executing via ExecuteRun", spec.ID) @@ -380,8 +375,8 @@ func (r *runner) InitializePipeline(spec Spec) (pipeline *Pipeline, err error) { return pipeline, nil } -func (r *runner) run(ctx context.Context, pipeline *Pipeline, run *Run, vars Vars, l logger.Logger) TaskRunResults { - l = l.With("run.ID", run.ID, "executionID", uuid.New(), "specID", run.PipelineSpecID, "jobID", run.PipelineSpec.JobID, "jobName", run.PipelineSpec.JobName) +func (r *runner) run(ctx context.Context, pipeline *Pipeline, run *Run, vars Vars) TaskRunResults { + l := r.lggr.With("run.ID", run.ID, "executionID", uuid.New(), "specID", run.PipelineSpecID, "jobID", run.PipelineSpec.JobID, "jobName", run.PipelineSpec.JobName) l.Debug("Initiating tasks for pipeline run of spec") scheduler := newScheduler(pipeline, run, vars, l) @@ -608,8 +603,8 @@ func logTaskRunToPrometheus(trr TaskRunResult, spec Spec) { } // ExecuteAndInsertFinishedRun executes a run in memory then inserts the finished run/task run records, returning the final result -func (r *runner) ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, results TaskRunResults, err error) { - run, trrs, err := r.ExecuteRun(ctx, spec, vars, l) +func (r *runner) ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, vars Vars, saveSuccessfulTaskRuns bool) (runID int64, results TaskRunResults, err error) { + run, trrs, err := r.ExecuteRun(ctx, spec, vars) if err != nil { return 0, trrs, pkgerrors.Wrapf(err, "error executing run for spec ID %v", spec.ID) } @@ -630,7 +625,7 @@ func (r *runner) ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, var return run.ID, trrs, nil } -func (r *runner) Run(ctx context.Context, run *Run, l logger.Logger, saveSuccessfulTaskRuns bool, fn func(tx sqlutil.DataSource) error) (incomplete bool, err error) { +func (r *runner) Run(ctx context.Context, run *Run, saveSuccessfulTaskRuns bool, fn func(tx sqlutil.DataSource) error) (incomplete bool, err error) { pipeline, err := r.InitializePipeline(run.PipelineSpec) if err != nil { return false, err @@ -681,7 +676,7 @@ func (r *runner) Run(ctx context.Context, run *Run, l logger.Logger, saveSuccess } for { - r.run(ctx, pipeline, run, NewVarsFrom(run.Inputs.Val.(map[string]interface{})), l) + r.run(ctx, pipeline, run, NewVarsFrom(run.Inputs.Val.(map[string]interface{}))) if preinsert { // FailSilently = run failed and task was marked failEarly. skip StoreRun and instead delete all trace of it @@ -737,7 +732,9 @@ func (r *runner) ResumeRun(ctx context.Context, taskID uuid.UUID, value interfac if start { // start the runner again go func() { - if _, err := r.Run(context.Background(), &run, r.lggr, false, nil); err != nil { + ctx, cancel := r.chStop.NewCtx() + defer cancel() + if _, err := r.Run(ctx, &run, false, nil); err != nil { r.lggr.Errorw("Resume run failure", "err", err) } r.lggr.Debug("Resume run success") @@ -799,7 +796,7 @@ func (r *runner) scheduleUnfinishedRuns() { go func() { defer wgRunsDone.Done() - _, err := r.Run(ctx, &run, r.lggr, false, nil) + _, err := r.Run(ctx, &run, false, nil) if ctx.Err() != nil { return } else if err != nil { diff --git a/core/services/pipeline/runner_test.go b/core/services/pipeline/runner_test.go index dddc84e736..022a77c947 100644 --- a/core/services/pipeline/runner_test.go +++ b/core/services/pipeline/runner_test.go @@ -106,12 +106,11 @@ ds5 [type=http method="GET" url="%s" index=2] spec := pipeline.Spec{DotDagSource: s} vars := pipeline.NewVarsFrom(nil) - lggr := logger.TestLogger(t) - _, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars, lggr) + _, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars) require.NoError(t, err) require.Len(t, trrs, len(d.Tasks)) - finalResults := trrs.FinalResult(lggr) + finalResults := trrs.FinalResult() require.Len(t, finalResults.Values, 3) require.Len(t, finalResults.AllErrors, 12) require.Len(t, finalResults.FatalErrors, 3) @@ -282,7 +281,7 @@ func Test_PipelineRunner_ExecuteTaskRunsWithVars(t *testing.T) { spec := pipeline.Spec{ DotDagSource: specStr, } - _, taskRunResults, err := runner.ExecuteRun(testutils.Context(t), spec, pipeline.NewVarsFrom(test.vars), logger.TestLogger(t)) + _, taskRunResults, err := runner.ExecuteRun(testutils.Context(t), spec, pipeline.NewVarsFrom(test.vars)) require.NoError(t, err) require.Len(t, taskRunResults, len(p.Tasks)) @@ -367,12 +366,11 @@ func Test_PipelineRunner_CBORParse(t *testing.T) { } vars := pipeline.NewVarsFrom(global) - lggr := logger.TestLogger(t) - _, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars, lggr) + _, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars) require.NoError(t, err) require.Len(t, trrs, len(d.Tasks)) - finalResults := trrs.FinalResult(lggr) + finalResults := trrs.FinalResult() require.Len(t, finalResults.Values, 1) assert.Equal(t, make(map[string]interface{}), finalResults.Values[0]) require.Len(t, finalResults.FatalErrors, 1) @@ -395,12 +393,11 @@ func Test_PipelineRunner_CBORParse(t *testing.T) { } vars := pipeline.NewVarsFrom(global) - lggr := logger.TestLogger(t) - _, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars, lggr) + _, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars) require.NoError(t, err) require.Len(t, trrs, len(d.Tasks)) - finalResults := trrs.FinalResult(lggr) + finalResults := trrs.FinalResult() require.Len(t, finalResults.Values, 1) assert.Equal(t, "foo", finalResults.Values[0]) require.Len(t, finalResults.FatalErrors, 1) @@ -450,7 +447,7 @@ answer1 [type=median index=0]; spec := pipeline.Spec{DotDagSource: s} vars := pipeline.NewVarsFrom(nil) - _, trrs, err := r.ExecuteRun(ctx, spec, vars, logger.TestLogger(t)) + _, trrs, err := r.ExecuteRun(ctx, spec, vars) require.NoError(t, err) for _, trr := range trrs { if trr.IsTerminal() { @@ -490,8 +487,8 @@ succeed2 -> final; `} vars := pipeline.NewVarsFrom(nil) - _, taskResults, err := r.ExecuteAndInsertFinishedRun(testutils.Context(t), spec, vars, lggr, false) - finalResult := taskResults.FinalResult(lggr) + _, taskResults, err := r.ExecuteAndInsertFinishedRun(testutils.Context(t), spec, vars, false) + finalResult := taskResults.FinalResult() require.NoError(t, err) assert.True(t, finalResult.HasErrors()) assert.False(t, finalResult.HasFatalErrors()) @@ -529,8 +526,8 @@ succeed2 -> final; `} vars := pipeline.NewVarsFrom(nil) - _, taskResults, err := r.ExecuteAndInsertFinishedRun(testutils.Context(t), spec, vars, lggr, false) - finalResult := taskResults.FinalResult(lggr) + _, taskResults, err := r.ExecuteAndInsertFinishedRun(testutils.Context(t), spec, vars, false) + finalResult := taskResults.FinalResult() require.NoError(t, err) assert.True(t, finalResult.HasErrors()) assert.False(t, finalResult.HasFatalErrors()) @@ -544,7 +541,6 @@ func Test_PipelineRunner_MultipleOutputs(t *testing.T) { btORM := bridgesMocks.NewORM(t) r, _ := newRunner(t, db, btORM, cfg) input := map[string]interface{}{"val": 2} - lggr := logger.TestLogger(t) _, trrs, err := r.ExecuteRun(testutils.Context(t), pipeline.Spec{ DotDagSource: ` a [type=multiply input="$(val)" times=2] @@ -553,16 +549,16 @@ b2 [type=multiply input="$(a)" times=3] c [type=median values=<[ $(b1), $(b2) ]> index=0] a->b1->c; a->b2->c;`, - }, pipeline.NewVarsFrom(input), lggr) + }, pipeline.NewVarsFrom(input)) require.NoError(t, err) require.Equal(t, 4, len(trrs)) - assert.Equal(t, false, trrs.FinalResult(lggr).HasFatalErrors()) + assert.Equal(t, false, trrs.FinalResult().HasFatalErrors()) // a = 4 // (b1 = 8) + (b2 = 12) // c = 20 / 2 - result, err := trrs.FinalResult(lggr).SingularResult() + result, err := trrs.FinalResult().SingularResult() require.NoError(t, err) assert.Equal(t, mustDecimal(t, "10").String(), result.Value.(decimal.Decimal).String()) } @@ -572,7 +568,6 @@ func Test_PipelineRunner_MultipleTerminatingOutputs(t *testing.T) { btORM := bridgesMocks.NewORM(t) r, _ := newRunner(t, pgtest.NewSqlxDB(t), btORM, cfg) input := map[string]interface{}{"val": 2} - lggr := logger.TestLogger(t) _, trrs, err := r.ExecuteRun(testutils.Context(t), pipeline.Spec{ DotDagSource: ` a [type=multiply input="$(val)" times=2] @@ -580,10 +575,10 @@ b1 [type=multiply input="$(a)" times=2 index=0] b2 [type=multiply input="$(a)" times=3 index=1] a->b1; a->b2;`, - }, pipeline.NewVarsFrom(input), lggr) + }, pipeline.NewVarsFrom(input)) require.NoError(t, err) require.Equal(t, 3, len(trrs)) - result := trrs.FinalResult(lggr) + result := trrs.FinalResult() assert.Equal(t, false, result.HasFatalErrors()) assert.Equal(t, mustDecimal(t, "8").String(), result.Values[0].(decimal.Decimal).String()) @@ -672,8 +667,7 @@ ds5 [type=http method="GET" url="%s" index=2] run.ID = 1 // give it a valid "id" }).Once() orm.On("StoreRun", mock.Anything, mock.AnythingOfType("*pipeline.Run")).Return(false, nil).Once() - lggr := logger.TestLogger(t) - incomplete, err := r.Run(testutils.Context(t), run, lggr, false, nil) + incomplete, err := r.Run(testutils.Context(t), run, false, nil) require.NoError(t, err) require.Len(t, run.PipelineTaskRuns, 9) // 3 tasks are suspended: ds1_parse, ds1_multiply, median. ds1 is present, but contains ErrPending require.Equal(t, true, incomplete) // still incomplete @@ -682,7 +676,7 @@ ds5 [type=http method="GET" url="%s" index=2] // Trigger run resumption with no new data orm.On("StoreRun", mock.Anything, mock.AnythingOfType("*pipeline.Run")).Return(false, nil).Once() - incomplete, err = r.Run(testutils.Context(t), run, lggr, false, nil) + incomplete, err = r.Run(testutils.Context(t), run, false, nil) require.NoError(t, err) require.Equal(t, true, incomplete) // still incomplete @@ -695,7 +689,7 @@ ds5 [type=http method="GET" url="%s" index=2] } // Trigger run resumption orm.On("StoreRun", mock.Anything, mock.AnythingOfType("*pipeline.Run")).Return(false, nil).Once() - incomplete, err = r.Run(testutils.Context(t), run, lggr, false, nil) + incomplete, err = r.Run(testutils.Context(t), run, false, nil) require.NoError(t, err) require.Equal(t, false, incomplete) // done require.Len(t, run.PipelineTaskRuns, 12) @@ -815,7 +809,7 @@ ds5 [type=http method="GET" url="%s" index=2] }).Once() // StoreRun is called again to store the final result orm.On("StoreRun", mock.Anything, mock.AnythingOfType("*pipeline.Run")).Return(false, nil).Once() - incomplete, err := r.Run(testutils.Context(t), run, logger.TestLogger(t), false, nil) + incomplete, err := r.Run(testutils.Context(t), run, false, nil) require.NoError(t, err) require.Len(t, run.PipelineTaskRuns, 12) require.Equal(t, false, incomplete) // run is complete @@ -849,17 +843,16 @@ func Test_PipelineRunner_LowercaseOutputs(t *testing.T) { "first": "camelCase", "second": "UPPERCASE", } - lggr := logger.TestLogger(t) _, trrs, err := r.ExecuteRun(testutils.Context(t), pipeline.Spec{ DotDagSource: ` a [type=lowercase input="$(first)"] `, - }, pipeline.NewVarsFrom(input), lggr) + }, pipeline.NewVarsFrom(input)) require.NoError(t, err) require.Equal(t, 1, len(trrs)) - assert.Equal(t, false, trrs.FinalResult(lggr).HasFatalErrors()) + assert.Equal(t, false, trrs.FinalResult().HasFatalErrors()) - result, err := trrs.FinalResult(lggr).SingularResult() + result, err := trrs.FinalResult().SingularResult() require.NoError(t, err) assert.Equal(t, "camelcase", result.Value.(string)) } @@ -872,17 +865,16 @@ func Test_PipelineRunner_UppercaseOutputs(t *testing.T) { input := map[string]interface{}{ "first": "somerAnDomTEST", } - lggr := logger.TestLogger(t) _, trrs, err := r.ExecuteRun(testutils.Context(t), pipeline.Spec{ DotDagSource: ` a [type=uppercase input="$(first)"] `, - }, pipeline.NewVarsFrom(input), lggr) + }, pipeline.NewVarsFrom(input)) require.NoError(t, err) require.Equal(t, 1, len(trrs)) - assert.Equal(t, false, trrs.FinalResult(lggr).HasFatalErrors()) + assert.Equal(t, false, trrs.FinalResult().HasFatalErrors()) - result, err := trrs.FinalResult(lggr).SingularResult() + result, err := trrs.FinalResult().SingularResult() require.NoError(t, err) assert.Equal(t, "SOMERANDOMTEST", result.Value.(string)) } @@ -895,17 +887,16 @@ func Test_PipelineRunner_HexDecodeOutputs(t *testing.T) { input := map[string]interface{}{ "astring": "0x12345678", } - lggr := logger.TestLogger(t) _, trrs, err := r.ExecuteRun(testutils.Context(t), pipeline.Spec{ DotDagSource: ` a [type=hexdecode input="$(astring)"] `, - }, pipeline.NewVarsFrom(input), lggr) + }, pipeline.NewVarsFrom(input)) require.NoError(t, err) require.Equal(t, 1, len(trrs)) - assert.Equal(t, false, trrs.FinalResult(lggr).HasFatalErrors()) + assert.Equal(t, false, trrs.FinalResult().HasFatalErrors()) - result, err := trrs.FinalResult(lggr).SingularResult() + result, err := trrs.FinalResult().SingularResult() require.NoError(t, err) assert.Equal(t, []byte{0x12, 0x34, 0x56, 0x78}, result.Value) } @@ -919,19 +910,18 @@ func Test_PipelineRunner_HexEncodeAndDecode(t *testing.T) { input := map[string]interface{}{ "input_val": inputBytes, } - lggr := logger.TestLogger(t) _, trrs, err := r.ExecuteRun(testutils.Context(t), pipeline.Spec{ DotDagSource: ` en [type=hexencode input="$(input_val)"] de [type=hexdecode] en->de `, - }, pipeline.NewVarsFrom(input), lggr) + }, pipeline.NewVarsFrom(input)) require.NoError(t, err) require.Equal(t, 2, len(trrs)) - assert.Equal(t, false, trrs.FinalResult(lggr).HasFatalErrors()) + assert.Equal(t, false, trrs.FinalResult().HasFatalErrors()) - result, err := trrs.FinalResult(lggr).SingularResult() + result, err := trrs.FinalResult().SingularResult() require.NoError(t, err) assert.Equal(t, inputBytes, result.Value) } @@ -944,17 +934,16 @@ func Test_PipelineRunner_Base64DecodeOutputs(t *testing.T) { input := map[string]interface{}{ "astring": "SGVsbG8sIHBsYXlncm91bmQ=", } - lggr := logger.TestLogger(t) _, trrs, err := r.ExecuteRun(testutils.Context(t), pipeline.Spec{ DotDagSource: ` a [type=base64decode input="$(astring)"] `, - }, pipeline.NewVarsFrom(input), lggr) + }, pipeline.NewVarsFrom(input)) require.NoError(t, err) require.Equal(t, 1, len(trrs)) - assert.Equal(t, false, trrs.FinalResult(lggr).HasFatalErrors()) + assert.Equal(t, false, trrs.FinalResult().HasFatalErrors()) - result, err := trrs.FinalResult(lggr).SingularResult() + result, err := trrs.FinalResult().SingularResult() require.NoError(t, err) assert.Equal(t, []byte("Hello, playground"), result.Value) } @@ -968,19 +957,18 @@ func Test_PipelineRunner_Base64EncodeAndDecode(t *testing.T) { input := map[string]interface{}{ "input_val": inputBytes, } - lggr := logger.TestLogger(t) _, trrs, err := r.ExecuteRun(testutils.Context(t), pipeline.Spec{ DotDagSource: ` en [type=base64encode input="$(input_val)"] de [type=base64decode] en->de `, - }, pipeline.NewVarsFrom(input), lggr) + }, pipeline.NewVarsFrom(input)) require.NoError(t, err) require.Equal(t, 2, len(trrs)) - assert.Equal(t, false, trrs.FinalResult(lggr).HasFatalErrors()) + assert.Equal(t, false, trrs.FinalResult().HasFatalErrors()) - result, err := trrs.FinalResult(lggr).SingularResult() + result, err := trrs.FinalResult().SingularResult() require.NoError(t, err) assert.Equal(t, inputBytes, result.Value) } @@ -1003,7 +991,7 @@ succeed; spec := pipeline.Spec{DotDagSource: fmt.Sprintf(template, 1)} vars := pipeline.NewVarsFrom(nil) - _, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars, lggr) + _, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars) require.NoError(t, err) require.Len(t, trrs, 1) assert.Equal(t, "1", trrs[0].Result.Value.(pipeline.ObjectParam).DecimalValue.Decimal().String()) @@ -1018,7 +1006,7 @@ succeed; // even though this is set to 2, it should use the cached version spec.DotDagSource = fmt.Sprintf(template, 2) - _, trrs, err = r.ExecuteRun(testutils.Context(t), spec, vars, lggr) + _, trrs, err = r.ExecuteRun(testutils.Context(t), spec, vars) require.NoError(t, err) require.Len(t, trrs, 1) assert.Equal(t, "1", trrs[0].Result.Value.(pipeline.ObjectParam).DecimalValue.Decimal().String()) diff --git a/core/services/pipeline/task.bridge_test.go b/core/services/pipeline/task.bridge_test.go index b076148c28..d7519232eb 100644 --- a/core/services/pipeline/task.bridge_test.go +++ b/core/services/pipeline/task.bridge_test.go @@ -1258,8 +1258,7 @@ ds [type=bridge name="adapter-error-bridge" timeout="50ms" requestData="{\"data\ spec := pipeline.Spec{DotDagSource: dag} vars := pipeline.NewVarsFrom(nil) - lggr := logger.TestLogger(t) - _, trrs, err := r.ExecuteRun(ctx, spec, vars, lggr) + _, trrs, err := r.ExecuteRun(ctx, spec, vars) require.NoError(t, err) require.Len(t, trrs, 1) diff --git a/core/services/pipeline/task.divide_test.go b/core/services/pipeline/task.divide_test.go index 8eb8e4de06..e13f451279 100644 --- a/core/services/pipeline/task.divide_test.go +++ b/core/services/pipeline/task.divide_test.go @@ -220,8 +220,7 @@ ds1 -> div_by_ds2 -> multiply; spec := pipeline.Spec{DotDagSource: dag} vars := pipeline.NewVarsFrom(nil) - lggr := logger.TestLogger(t) - _, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars, lggr) + _, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars) require.NoError(t, err) require.Len(t, trrs, 4) diff --git a/core/services/relay/evm/mercury/mocks/pipeline.go b/core/services/relay/evm/mercury/mocks/pipeline.go index f553ba9850..44be1377ae 100644 --- a/core/services/relay/evm/mercury/mocks/pipeline.go +++ b/core/services/relay/evm/mercury/mocks/pipeline.go @@ -13,7 +13,7 @@ type MockRunner struct { Err error } -func (m *MockRunner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) { +func (m *MockRunner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) { return &pipeline.Run{ID: 42}, m.Trrs, m.Err } diff --git a/core/services/relay/evm/mercury/v1/data_source.go b/core/services/relay/evm/mercury/v1/data_source.go index 7f41bd1e36..5a9a11deca 100644 --- a/core/services/relay/evm/mercury/v1/data_source.go +++ b/core/services/relay/evm/mercury/v1/data_source.go @@ -45,7 +45,7 @@ var ( const nBlocksObservation int = v1.MaxAllowedBlocks type Runner interface { - ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) + ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) } // Fetcher fetcher data from Mercury server @@ -283,7 +283,7 @@ func (ds *datasource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.T }, }) - run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars, ds.lggr) + run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars) if err != nil { return nil, nil, pkgerrors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID) } diff --git a/core/services/relay/evm/mercury/v2/data_source.go b/core/services/relay/evm/mercury/v2/data_source.go index 7c2d6424fa..28487ec714 100644 --- a/core/services/relay/evm/mercury/v2/data_source.go +++ b/core/services/relay/evm/mercury/v2/data_source.go @@ -26,7 +26,7 @@ import ( ) type Runner interface { - ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) + ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) } type LatestReportFetcher interface { @@ -228,7 +228,7 @@ func (ds *datasource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.T }, }) - run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars, ds.lggr) + run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars) if err != nil { return nil, nil, pkgerrors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID) } diff --git a/core/services/relay/evm/mercury/v3/data_source.go b/core/services/relay/evm/mercury/v3/data_source.go index befe513d22..644f4e775e 100644 --- a/core/services/relay/evm/mercury/v3/data_source.go +++ b/core/services/relay/evm/mercury/v3/data_source.go @@ -28,7 +28,7 @@ import ( const adapterLWBAErrorName = "AdapterLWBAError" type Runner interface { - ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) + ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) } type LatestReportFetcher interface { @@ -281,7 +281,7 @@ func (ds *datasource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.T }, }) - run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars, ds.lggr) + run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars) if err != nil { return nil, nil, pkgerrors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID) } diff --git a/core/services/streams/stream.go b/core/services/streams/stream.go index cb168c11bc..8825cd3b34 100644 --- a/core/services/streams/stream.go +++ b/core/services/streams/stream.go @@ -12,7 +12,7 @@ import ( ) type Runner interface { - ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) + ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) InitializePipeline(spec pipeline.Spec) (*pipeline.Pipeline, error) } @@ -87,7 +87,7 @@ func (s *stream) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRu }, }) - run, trrs, err := s.runner.ExecuteRun(ctx, *s.spec, vars, s.lggr) + run, trrs, err := s.runner.ExecuteRun(ctx, *s.spec, vars) if err != nil { return nil, nil, fmt.Errorf("error executing run for spec ID %v: %w", s.spec.ID, err) } diff --git a/core/services/streams/stream_test.go b/core/services/streams/stream_test.go index 3e8f58cd58..61e5187880 100644 --- a/core/services/streams/stream_test.go +++ b/core/services/streams/stream_test.go @@ -26,7 +26,7 @@ type mockRunner struct { err error } -func (m *mockRunner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) { +func (m *mockRunner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) { return m.run, m.trrs, m.err } func (m *mockRunner) InitializePipeline(spec pipeline.Spec) (p *pipeline.Pipeline, err error) { diff --git a/core/services/vrf/v1/listener_v1.go b/core/services/vrf/v1/listener_v1.go index c802940308..74b522d408 100644 --- a/core/services/vrf/v1/listener_v1.go +++ b/core/services/vrf/v1/listener_v1.go @@ -484,7 +484,7 @@ func (lsn *Listener) ProcessRequest(ctx context.Context, req request) bool { run := pipeline.NewRun(*lsn.Job.PipelineSpec, vars) // The VRF pipeline has no async tasks, so we don't need to check for `incomplete` - if _, err = lsn.PipelineRunner.Run(ctx, run, lggr, true, func(tx sqlutil.DataSource) error { + if _, err = lsn.PipelineRunner.Run(ctx, run, true, func(tx sqlutil.DataSource) error { // Always mark consumed regardless of whether the proof failed or not. if err = lsn.Chain.LogBroadcaster().MarkConsumed(ctx, tx, req.lb); err != nil { lggr.Errorw("Failed mark consumed", "err", err) diff --git a/core/services/vrf/v2/listener_v2_log_processor.go b/core/services/vrf/v2/listener_v2_log_processor.go index 9408768d4d..3e9acf7760 100644 --- a/core/services/vrf/v2/listener_v2_log_processor.go +++ b/core/services/vrf/v2/listener_v2_log_processor.go @@ -1126,7 +1126,7 @@ func (lsn *listenerV2) simulateFulfillment( }, }) var trrs pipeline.TaskRunResults - res.run, trrs, err = lsn.pipelineRunner.ExecuteRun(ctx, *lsn.job.PipelineSpec, vars, lg) + res.run, trrs, err = lsn.pipelineRunner.ExecuteRun(ctx, *lsn.job.PipelineSpec, vars) if err != nil { res.err = fmt.Errorf("executing run: %w", err) return res @@ -1168,7 +1168,7 @@ func (lsn *listenerV2) simulateFulfillment( return res } - finalResult := trrs.FinalResult(lg) + finalResult := trrs.FinalResult() if len(finalResult.Values) != 1 { res.err = errors.Errorf("unexpected number of outputs, expected 1, was %d", len(finalResult.Values)) return res diff --git a/core/services/webhook/delegate.go b/core/services/webhook/delegate.go index 690ae38d08..2c6d597f2a 100644 --- a/core/services/webhook/delegate.go +++ b/core/services/webhook/delegate.go @@ -179,7 +179,7 @@ func (r *webhookJobRunner) RunJob(ctx context.Context, jobUUID uuid.UUID, reques run := pipeline.NewRun(*spec.PipelineSpec, vars) - _, err := r.runner.Run(ctx, run, jobLggr, true, nil) + _, err := r.runner.Run(ctx, run, true, nil) if err != nil { jobLggr.Errorw("Error running pipeline for webhook job", "err", err) return 0, err