Skip to content

Commit

Permalink
Fix lint
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Dec 16, 2024
1 parent 70f90a3 commit 8d8e72b
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 27 deletions.
7 changes: 4 additions & 3 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,11 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
defer wg.Done()
val, err := oc.Observe(ctx, streamID, opts)
if err != nil {
if errors.As(err, &ErrMissingStream{}) {
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
strmIDStr := strconv.FormatUint(uint64(streamID), 10)
if errors.As(err, &MissingStreamError{}) {
promMissingStreamCount.WithLabelValues(strmIDStr).Inc()
}
promObservationErrorCount.WithLabelValues(strconv.FormatUint(uint64(streamID), 10)).Inc()
promObservationErrorCount.WithLabelValues(strmIDStr).Inc()
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"})
mu.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions core/services/llo/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func Test_DataSource(t *testing.T) {

vals := makeStreamValues()
err := ds.Observe(ctx, vals, opts)
assert.NoError(t, err)
require.NoError(t, err)

assert.Equal(t, llo.StreamValues{
2: llo.ToDecimal(decimal.NewFromInt(40602)),
Expand All @@ -205,7 +205,7 @@ func Test_DataSource(t *testing.T) {
assert.Equal(t, 1, int(pkt.streamID))
assert.Equal(t, opts, pkt.opts)
assert.Nil(t, pkt.val)
assert.NotNil(t, pkt.err)
assert.Error(t, pkt.err)
})
})
}
8 changes: 4 additions & 4 deletions core/services/llo/observation_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (oc *observationContext) Observe(ctx context.Context, streamID streams.Stre
// Extract stream value based on streamID attribute
for _, trr := range trrs {
if trr.Task.TaskStreamID() != nil && *trr.Task.TaskStreamID() == streamID {
val, err := resultToStreamValue(trr.Result.Value)
val, err = resultToStreamValue(trr.Result.Value)
if err != nil {
return nil, fmt.Errorf("failed to convert result to StreamValue for streamID %d: %w", streamID, err)
}
Expand Down Expand Up @@ -148,18 +148,18 @@ func toDecimal(val interface{}) (decimal.Decimal, error) {
return utils.ToDecimal(val)
}

type ErrMissingStream struct {
type MissingStreamError struct {
StreamID streams.StreamID
}

func (e ErrMissingStream) Error() string {
func (e MissingStreamError) Error() string {
return fmt.Sprintf("no pipeline for stream: %d", e.StreamID)
}

func (oc *observationContext) run(ctx context.Context, streamID streams.StreamID) (*pipeline.Run, pipeline.TaskRunResults, error) {
strm, exists := oc.r.Get(streamID)
if !exists {
return nil, nil, ErrMissingStream{StreamID: streamID}
return nil, nil, MissingStreamError{StreamID: streamID}
}

// In case of multiple streamIDs per pipeline then the
Expand Down
10 changes: 7 additions & 3 deletions core/services/llo/observation_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type mockPipelineConfig struct{}

func (m *mockPipelineConfig) DefaultHTTPLimit() int64 { return 10000 }
func (m *mockPipelineConfig) DefaultHTTPTimeout() commonconfig.Duration {
return *commonconfig.MustNewDuration(time.Duration(1 * time.Hour))
return *commonconfig.MustNewDuration(1 * time.Hour)
}
func (m *mockPipelineConfig) MaxRunDuration() time.Duration { return 1 * time.Hour }
func (m *mockPipelineConfig) ReaperInterval() time.Duration { return 0 }
Expand All @@ -147,12 +147,16 @@ func createBridge(t *testing.T, name string, val string, borm bridges.ORM) {
t.Fatal("expected only one call to the bridge")
}
_, herr := io.ReadAll(req.Body)
require.NoError(t, herr)
if herr != nil {
t.Fatal(herr)
}

res.WriteHeader(http.StatusOK)
resp := fmt.Sprintf(`{"result": %s}`, val)
_, herr = res.Write([]byte(resp))
require.NoError(t, herr)
if herr != nil {
t.Fatal(herr)
}
}))
t.Cleanup(bridge.Close)
u, _ := url.Parse(bridge.URL)
Expand Down
4 changes: 3 additions & 1 deletion core/services/streams/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
Expand Down Expand Up @@ -39,7 +41,7 @@ func NewMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R
func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs RunResultSaver) (*multiStreamPipeline, error) {
if jb.PipelineSpec == nil {
// should never happen
return nil, fmt.Errorf("job has no pipeline spec")
return nil, errors.New("job has no pipeline spec")
}
spec := *jb.PipelineSpec
if spec.Pipeline == nil {
Expand Down
8 changes: 3 additions & 5 deletions core/services/streams/stream_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Test_Registry(t *testing.T) {
sr := newRegistry(lggr, runner)

// registers new pipeline with multiple stream IDs
assert.Len(t, sr.pipelines, 0)
assert.Empty(t, sr.pipelines)
// err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: "source"}}, nil)
// TODO: what if the dag is unparseable?
// err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil)
Expand Down Expand Up @@ -96,15 +96,13 @@ result3 -> result3_parse -> multiply3;
err = sr.Register(job.Job{ID: 100, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: `
result1 [type=memo value="900.0022"];
`}}, nil)
require.Error(t, err)
assert.EqualError(t, err, "cannot register job with ID: 100; it is already registered")
require.EqualError(t, err, "cannot register job with ID: 100; it is already registered")

// errors when attempt to register a new job with duplicates stream IDs within ig
err = sr.Register(job.Job{ID: 101, StreamID: ptr(StreamID(100)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: `
result1 [type=memo value="900.0022" streamID=100];
`}}, nil)
require.Error(t, err)
assert.EqualError(t, err, "cannot register job with ID: 101; invalid stream IDs: duplicate stream ID: 100")
require.EqualError(t, err, "cannot register job with ID: 101; invalid stream IDs: duplicate stream ID: 100")

// errors with unparseable pipeline
err = sr.Register(job.Job{ID: 101, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: "source"}}, nil)
Expand Down
11 changes: 2 additions & 9 deletions core/services/streams/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,14 @@ func Test_Stream(t *testing.T) {
})

jb := job.Job{StreamID: ptr(StreamID(123)), PipelineSpec: &pipeline.Spec{DotDagSource: `
result1 [type=memo value="900.0022" streamID=124];
succeed [type=memo value=42 streamID=124];
succeed;
`}}

t.Run("Run", func(t *testing.T) {
strm, err := newMultiStreamPipeline(lggr, jb, runner, nil)
require.NoError(t, err)

jb.PipelineSpec.DotDagSource = `
succeed [type=memo value=42]
succeed;
`

strm, err = newMultiStreamPipeline(lggr, jb, runner, nil)
require.NoError(t, err)

t.Run("executes the pipeline (success)", func(t *testing.T) {
runner.run = &pipeline.Run{ID: 42}
runner.trrs = []pipeline.TaskRunResult{pipeline.TaskRunResult{ID: UUID}}
Expand Down

0 comments on commit 8d8e72b

Please sign in to comment.