diff --git a/core/services/llo/data_source.go b/core/services/llo/data_source.go index f673e1caf1d..855ac7d9940 100644 --- a/core/services/llo/data_source.go +++ b/core/services/llo/data_source.go @@ -117,10 +117,11 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, defer wg.Done() val, err := oc.Observe(ctx, streamID, opts) if err != nil { - if errors.As(err, &ErrMissingStream{}) { - promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() + strmIDStr := strconv.FormatUint(uint64(streamID), 10) + if errors.As(err, &MissingStreamError{}) { + promMissingStreamCount.WithLabelValues(strmIDStr).Inc() } - promObservationErrorCount.WithLabelValues(strconv.FormatUint(uint64(streamID), 10)).Inc() + promObservationErrorCount.WithLabelValues(strmIDStr).Inc() mu.Lock() errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"}) mu.Unlock() diff --git a/core/services/llo/data_source_test.go b/core/services/llo/data_source_test.go index 87370d4442a..3f9a4b48fbe 100644 --- a/core/services/llo/data_source_test.go +++ b/core/services/llo/data_source_test.go @@ -186,7 +186,7 @@ func Test_DataSource(t *testing.T) { vals := makeStreamValues() err := ds.Observe(ctx, vals, opts) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, llo.StreamValues{ 2: llo.ToDecimal(decimal.NewFromInt(40602)), @@ -205,7 +205,7 @@ func Test_DataSource(t *testing.T) { assert.Equal(t, 1, int(pkt.streamID)) assert.Equal(t, opts, pkt.opts) assert.Nil(t, pkt.val) - assert.NotNil(t, pkt.err) + assert.Error(t, pkt.err) }) }) } diff --git a/core/services/llo/observation_context.go b/core/services/llo/observation_context.go index 4ae4485d9e7..ab022452629 100644 --- a/core/services/llo/observation_context.go +++ b/core/services/llo/observation_context.go @@ -61,7 +61,7 @@ func (oc *observationContext) Observe(ctx context.Context, streamID streams.Stre // Extract stream value based on streamID attribute for _, trr := range trrs { if trr.Task.TaskStreamID() != nil && *trr.Task.TaskStreamID() == streamID { - val, err := resultToStreamValue(trr.Result.Value) + val, err = resultToStreamValue(trr.Result.Value) if err != nil { return nil, fmt.Errorf("failed to convert result to StreamValue for streamID %d: %w", streamID, err) } @@ -148,18 +148,18 @@ func toDecimal(val interface{}) (decimal.Decimal, error) { return utils.ToDecimal(val) } -type ErrMissingStream struct { +type MissingStreamError struct { StreamID streams.StreamID } -func (e ErrMissingStream) Error() string { +func (e MissingStreamError) Error() string { return fmt.Sprintf("no pipeline for stream: %d", e.StreamID) } func (oc *observationContext) run(ctx context.Context, streamID streams.StreamID) (*pipeline.Run, pipeline.TaskRunResults, error) { strm, exists := oc.r.Get(streamID) if !exists { - return nil, nil, ErrMissingStream{StreamID: streamID} + return nil, nil, MissingStreamError{StreamID: streamID} } // In case of multiple streamIDs per pipeline then the diff --git a/core/services/llo/observation_context_test.go b/core/services/llo/observation_context_test.go index 6b00766c87f..67af24c2a7b 100644 --- a/core/services/llo/observation_context_test.go +++ b/core/services/llo/observation_context_test.go @@ -123,7 +123,7 @@ type mockPipelineConfig struct{} func (m *mockPipelineConfig) DefaultHTTPLimit() int64 { return 10000 } func (m *mockPipelineConfig) DefaultHTTPTimeout() commonconfig.Duration { - return *commonconfig.MustNewDuration(time.Duration(1 * time.Hour)) + return *commonconfig.MustNewDuration(1 * time.Hour) } func (m *mockPipelineConfig) MaxRunDuration() time.Duration { return 1 * time.Hour } func (m *mockPipelineConfig) ReaperInterval() time.Duration { return 0 } @@ -147,12 +147,16 @@ func createBridge(t *testing.T, name string, val string, borm bridges.ORM) { t.Fatal("expected only one call to the bridge") } _, herr := io.ReadAll(req.Body) - require.NoError(t, herr) + if herr != nil { + t.Fatal(herr) + } res.WriteHeader(http.StatusOK) resp := fmt.Sprintf(`{"result": %s}`, val) _, herr = res.Write([]byte(resp)) - require.NoError(t, herr) + if herr != nil { + t.Fatal(herr) + } })) t.Cleanup(bridge.Close) u, _ := url.Parse(bridge.URL) diff --git a/core/services/streams/pipeline.go b/core/services/streams/pipeline.go index 2dc031a0338..9fa6c25d36d 100644 --- a/core/services/streams/pipeline.go +++ b/core/services/streams/pipeline.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "github.com/pkg/errors" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" @@ -39,7 +41,7 @@ func NewMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs RunResultSaver) (*multiStreamPipeline, error) { if jb.PipelineSpec == nil { // should never happen - return nil, fmt.Errorf("job has no pipeline spec") + return nil, errors.New("job has no pipeline spec") } spec := *jb.PipelineSpec if spec.Pipeline == nil { diff --git a/core/services/streams/stream_registry_test.go b/core/services/streams/stream_registry_test.go index c0affc5b230..8a8abfc89b5 100644 --- a/core/services/streams/stream_registry_test.go +++ b/core/services/streams/stream_registry_test.go @@ -61,7 +61,7 @@ func Test_Registry(t *testing.T) { sr := newRegistry(lggr, runner) // registers new pipeline with multiple stream IDs - assert.Len(t, sr.pipelines, 0) + assert.Empty(t, sr.pipelines) // err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: "source"}}, nil) // TODO: what if the dag is unparseable? // err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil) @@ -96,15 +96,13 @@ result3 -> result3_parse -> multiply3; err = sr.Register(job.Job{ID: 100, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` result1 [type=memo value="900.0022"]; `}}, nil) - require.Error(t, err) - assert.EqualError(t, err, "cannot register job with ID: 100; it is already registered") + require.EqualError(t, err, "cannot register job with ID: 100; it is already registered") // errors when attempt to register a new job with duplicates stream IDs within ig err = sr.Register(job.Job{ID: 101, StreamID: ptr(StreamID(100)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` result1 [type=memo value="900.0022" streamID=100]; `}}, nil) - require.Error(t, err) - assert.EqualError(t, err, "cannot register job with ID: 101; invalid stream IDs: duplicate stream ID: 100") + require.EqualError(t, err, "cannot register job with ID: 101; invalid stream IDs: duplicate stream ID: 100") // errors with unparseable pipeline err = sr.Register(job.Job{ID: 101, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: "source"}}, nil) diff --git a/core/services/streams/stream_test.go b/core/services/streams/stream_test.go index 0c248d91ae1..905d421bddf 100644 --- a/core/services/streams/stream_test.go +++ b/core/services/streams/stream_test.go @@ -67,21 +67,14 @@ func Test_Stream(t *testing.T) { }) jb := job.Job{StreamID: ptr(StreamID(123)), PipelineSpec: &pipeline.Spec{DotDagSource: ` -result1 [type=memo value="900.0022" streamID=124]; +succeed [type=memo value=42 streamID=124]; +succeed; `}} t.Run("Run", func(t *testing.T) { strm, err := newMultiStreamPipeline(lggr, jb, runner, nil) require.NoError(t, err) - jb.PipelineSpec.DotDagSource = ` -succeed [type=memo value=42] -succeed; -` - - strm, err = newMultiStreamPipeline(lggr, jb, runner, nil) - require.NoError(t, err) - t.Run("executes the pipeline (success)", func(t *testing.T) { runner.run = &pipeline.Run{ID: 42} runner.trrs = []pipeline.TaskRunResult{pipeline.TaskRunResult{ID: UUID}}