Commit 70f90a3

samsondav committed Dec 16, 2024
1 parent 4bf0bf7
Showing 6 changed files with 167 additions and 53 deletions.
2 changes: 1 addition & 1 deletion core/services/relay/evm/mercury/mocks/pipeline.go
@@ -41,4 +41,4 @@ func (m *MockTask) TaskTimeout() (time.Duration, bool) { return 0, false }
 func (m *MockTask) TaskRetries() uint32 { return 0 }
 func (m *MockTask) TaskMinBackoff() time.Duration { return 0 }
 func (m *MockTask) TaskMaxBackoff() time.Duration { return 0 }
-func (m *MockTask) TaskStreamID() *int32 { return nil }
+func (m *MockTask) TaskStreamID() *uint32 { return nil }
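
Note: the only change here is the return type, since stream IDs are now handled as uint32 rather than int32. Returning a pointer keeps "no streamID tag" (nil) distinguishable from a literal stream ID of 0. A standalone sketch of that convention, using stand-in types rather than the package's real ones:

package main

import "fmt"

// task is a stand-in for the pipeline task interface; only the method changed
// in the diff above is modelled here.
type task interface{ TaskStreamID() *uint32 }

type memoTask struct{ streamID *uint32 }

func (t memoTask) TaskStreamID() *uint32 { return t.streamID }

func main() {
	one := uint32(1)
	tasks := []task{memoTask{streamID: &one}, memoTask{}} // second task carries no streamID tag
	var ids []uint32
	for _, tsk := range tasks {
		if id := tsk.TaskStreamID(); id != nil { // nil means "no tag", not "stream 0"
			ids = append(ids, *id)
		}
	}
	fmt.Println(ids) // [1]
}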
2 changes: 1 addition & 1 deletion core/services/streams/delegate_test.go
@@ -167,7 +167,7 @@ answer1 [type=median index=0];
 """
 `,
 			assertion: func(t *testing.T, jb job.Job, err error) {
-				assert.EqualError(t, err, "jobs of type 'stream' require streamID to be specified")
+				assert.EqualError(t, err, "no streamID found in spec (must be either specified as top-level key 'streamID' or at least one streamID tag must be provided in the pipeline)")
 			},
 		},
 	}
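
Note: the new error text spells out the rule being enforced: a stream job needs a streamID either as a top-level key or as at least one streamID tag inside the pipeline DAG. Purely as an illustration (mirroring the tagged sources used in the tests further down), a minimal pipeline source that satisfies the tag variant looks like:

package main

// dotDagSource carries a streamID tag on one task, which is enough to satisfy
// the new validation; the alternative is a top-level streamID key on the job.
const dotDagSource = `
result1 [type=memo value="900.0022" streamID=1];
`

func main() { println(dotDagSource) }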
14 changes: 14 additions & 0 deletions core/services/streams/pipeline.go
@@ -63,6 +63,9 @@ func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R
 	if jb.StreamID != nil {
 		streamIDs = append(streamIDs, *jb.StreamID)
 	}
+	if err := validateStreamIDs(streamIDs); err != nil {
+		return nil, fmt.Errorf("invalid stream IDs: %w", err)
+	}
 	vars := pipeline.NewVarsFrom(map[string]interface{}{
 		"pipelineSpec": map[string]interface{}{
 			"id": jb.PipelineSpecID,
@@ -83,6 +86,17 @@ func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R
 		vars}, nil
 }
 
+func validateStreamIDs(streamIDs []StreamID) error {
+	seen := make(map[StreamID]struct{})
+	for _, id := range streamIDs {
+		if _, ok := seen[id]; ok {
+			return fmt.Errorf("duplicate stream ID: %v", id)
+		}
+		seen[id] = struct{}{}
+	}
+	return nil
+}
+
 func (s *multiStreamPipeline) Run(ctx context.Context) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) {
 	run, trrs, err = s.executeRun(ctx)
 
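
Note: validateStreamIDs only has to catch duplicates, because the IDs are gathered from both the top-level streamID key and the per-task streamID tags before this call. A runnable sketch of the same check in isolation (StreamID is assumed to alias uint32 here, matching the mock change above):

package main

import "fmt"

type StreamID = uint32 // stand-in alias; the real type lives in the streams package

// validateStreamIDs mirrors the helper added above: it rejects any duplicate
// ID collected from the job's top-level streamID key and the pipeline tags.
func validateStreamIDs(streamIDs []StreamID) error {
	seen := make(map[StreamID]struct{})
	for _, id := range streamIDs {
		if _, ok := seen[id]; ok {
			return fmt.Errorf("duplicate stream ID: %v", id)
		}
		seen[id] = struct{}{}
	}
	return nil
}

func main() {
	fmt.Println(validateStreamIDs([]StreamID{1, 2, 3}))    // <nil>
	fmt.Println(validateStreamIDs([]StreamID{1, 2, 2, 3})) // duplicate stream ID: 2
}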
15 changes: 8 additions & 7 deletions core/services/streams/stream_registry.go
@@ -58,22 +58,23 @@ func (s *streamRegistry) Register(jb job.Job, rrs ResultRunSaver) error {
 	if jb.Type != job.Stream {
 		return fmt.Errorf("cannot register job type %s; only Stream jobs are supported", jb.Type)
 	}
+	p, err := NewMultiStreamPipeline(s.lggr, jb, s.runner, rrs)
+	if err != nil {
+		return fmt.Errorf("cannot register job with ID: %d; %w", jb.ID, err)
+	}
 	s.Lock()
 	defer s.Unlock()
 	if _, exists := s.pipelinesByJobID[jb.ID]; exists {
 		return fmt.Errorf("cannot register job with ID: %d; it is already registered", jb.ID)
 	}
-	p, err := NewMultiStreamPipeline(s.lggr, jb, s.runner, rrs)
-	if err != nil {
-		return fmt.Errorf("cannot register job with ID: %d; %w", jb.ID, err)
+	for _, strmID := range p.StreamIDs() {
+		if _, exists := s.pipelines[strmID]; exists {
+			return fmt.Errorf("cannot register job with ID: %d; stream id %d is already registered", jb.ID, strmID)
+		}
 	}
 	s.pipelinesByJobID[jb.ID] = p
-	// FIXME: Naming is so awkward, call it a Multistream or something instead? Or combistream?
 	streamIDs := p.StreamIDs()
 	for _, strmID := range streamIDs {
-		if _, exists := s.pipelines[strmID]; exists {
-			return fmt.Errorf("cannot register job with ID: %d; stream id %d is already registered", jb.ID, strmID)
-		}
 		s.pipelines[strmID] = p
 	}
 	return nil
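
Note: the reordering in Register follows a validate-then-commit shape: the fallible pipeline construction happens before the lock is taken, and every stream ID is checked before either map is written, so a rejected job leaves the registry untouched. A minimal standalone sketch of that pattern (hypothetical names, not the package's actual types):

package main

import (
	"fmt"
	"sync"
)

// registry is a stand-in for streamRegistry: one map keyed by job ID, one by
// stream ID, both guarded by a single mutex.
type registry struct {
	mu       sync.Mutex
	byJobID  map[int32]string
	byStream map[uint32]string
}

// register checks every key first and only then writes, so a failed
// registration never leaves the maps half-updated.
func (r *registry) register(jobID int32, name string, streamIDs []uint32) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.byJobID[jobID]; exists {
		return fmt.Errorf("job %d already registered", jobID)
	}
	for _, id := range streamIDs {
		if _, exists := r.byStream[id]; exists {
			return fmt.Errorf("stream id %d already registered", id)
		}
	}
	r.byJobID[jobID] = name
	for _, id := range streamIDs {
		r.byStream[id] = name
	}
	return nil
}

func main() {
	r := &registry{byJobID: map[int32]string{}, byStream: map[uint32]string{}}
	fmt.Println(r.register(100, "job-a", []uint32{1, 2, 3})) // <nil>
	fmt.Println(r.register(101, "job-b", []uint32{3, 4}))    // stream id 3 already registered
	fmt.Println(len(r.byStream))                             // 3: the failed call wrote nothing
}

The same idea explains why the duplicate-stream check moved out of the registration loop above: checking and writing in the same pass could leave earlier stream IDs registered when a later one collides.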
161 changes: 127 additions & 34 deletions core/services/streams/stream_registry_test.go
@@ -18,14 +18,16 @@ type mockPipeline struct {
 	run  *pipeline.Run
 	trrs pipeline.TaskRunResults
 	err  error
+
+	streamIDs []StreamID
 }
 
 func (m *mockPipeline) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) {
 	return m.run, m.trrs, m.err
 }
 
 func (m *mockPipeline) StreamIDs() []StreamID {
-	return nil
+	return m.streamIDs
 }
 
 func Test_Registry(t *testing.T) {
@@ -58,12 +60,12 @@ func Test_Registry(t *testing.T) {
 	t.Run("Register", func(t *testing.T) {
 		sr := newRegistry(lggr, runner)
 
-		t.Run("registers new pipeline with multiple stream IDs", func(t *testing.T) {
-			assert.Len(t, sr.pipelines, 0)
-			// err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: "source"}}, nil)
-			// TODO: what if the dag is unparseable?
-			// err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil)
-			err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: `
+		// registers new pipeline with multiple stream IDs
+		assert.Len(t, sr.pipelines, 0)
+		// err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: "source"}}, nil)
+		// TODO: what if the dag is unparseable?
+		// err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil)
+		err := sr.Register(job.Job{ID: 100, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: `
 result1 [type=memo value="900.0022"];
 multiply2 [type=multiply times=1 streamID=1 index=0]; // force conversion to decimal
 result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"];
@@ -75,45 +77,136 @@ result1 -> multiply2;
 result2 -> result2_parse;
 result3 -> result3_parse -> multiply3;
 `}}, nil)
-			require.NoError(t, err)
-			assert.Len(t, sr.pipelines, 1)
-
-			v, exists := sr.Get(1)
-			require.True(t, exists)
-			msp := v.(*multiStreamPipeline)
-			assert.Equal(t, "foo", msp.StreamIDs())
-			assert.Equal(t, int32(32), msp.spec.ID)
-		})
+		require.NoError(t, err)
+		assert.Len(t, sr.pipelines, 3) // three streams, one pipeline
+		assert.Contains(t, sr.pipelines, StreamID(1))
+		assert.Contains(t, sr.pipelines, StreamID(2))
+		assert.Contains(t, sr.pipelines, StreamID(3))
+		p := sr.pipelines[1]
+		assert.Equal(t, p, sr.pipelines[2])
+		assert.Equal(t, p, sr.pipelines[3])
 
-		t.Run("errors when attempt to re-register a stream with an existing ID", func(t *testing.T) {
-			assert.Len(t, sr.pipelines, 1)
-			err := sr.Register(1, pipeline.Spec{ID: 33, DotDagSource: "source"}, nil)
-			require.Error(t, err)
-			assert.Len(t, sr.pipelines, 1)
-			assert.EqualError(t, err, "stream already registered for id: 1")
-
-			v, exists := sr.Get(1)
-			require.True(t, exists)
-			msp := v.(*multiStreamPipeline)
-			assert.Equal(t, StreamID(1), msp.id)
-			assert.Equal(t, int32(32), msp.spec.ID)
-		})
+		v, exists := sr.Get(1)
+		require.True(t, exists)
+		msp := v.(*multiStreamPipeline)
+		assert.Equal(t, []StreamID{1, 2, 3}, msp.StreamIDs())
+		assert.Equal(t, int32(32), msp.spec.ID)
+
+		// errors when attempt to re-register a stream with an existing job ID
+		err = sr.Register(job.Job{ID: 100, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: `
+result1 [type=memo value="900.0022"];
+`}}, nil)
+		require.Error(t, err)
+		assert.EqualError(t, err, "cannot register job with ID: 100; it is already registered")
+
+		// errors when attempt to register a new job with duplicates stream IDs within ig
+		err = sr.Register(job.Job{ID: 101, StreamID: ptr(StreamID(100)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: `
+result1 [type=memo value="900.0022" streamID=100];
+`}}, nil)
+		require.Error(t, err)
+		assert.EqualError(t, err, "cannot register job with ID: 101; invalid stream IDs: duplicate stream ID: 100")
+
+		// errors with unparseable pipeline
+		err = sr.Register(job.Job{ID: 101, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: "source"}}, nil)
+		require.Error(t, err)
+		assert.EqualError(t, err, "cannot register job with ID: 101; unparseable pipeline: UnmarshalTaskFromMap: unknown task type: \"\"")
+
+		// errors when attempt to re-register a stream with an existing streamID at top-level
+		err = sr.Register(job.Job{ID: 101, StreamID: ptr(StreamID(3)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: `
+result1 [type=memo value="900.0022"];
+multiply2 [type=multiply times=1 streamID=4 index=0]; // force conversion to decimal
+result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"];
+result2_parse [type=jsonparse path="result" streamID=5 index=1];
+result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"];
+result3_parse [type=jsonparse path="result"];
+multiply3 [type=multiply times=1 streamID=6 index=2]; // force conversion to decimal
+result1 -> multiply2;
+result2 -> result2_parse;
+result3 -> result3_parse -> multiply3;
+`}}, nil)
+		require.Error(t, err)
+		assert.EqualError(t, err, "cannot register job with ID: 101; stream id 3 is already registered")
+
+		// errors when attempt to re-register a stream with an existing streamID in DAG
+		err = sr.Register(job.Job{ID: 101, StreamID: ptr(StreamID(4)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: `
+result1 [type=memo value="900.0022"];
+multiply2 [type=multiply times=1 streamID=1 index=0]; // force conversion to decimal
+result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"];
+result2_parse [type=jsonparse path="result" streamID=5 index=1];
+result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"];
+result3_parse [type=jsonparse path="result"];
+multiply3 [type=multiply times=1 streamID=6 index=2]; // force conversion to decimal
+result1 -> multiply2;
+result2 -> result2_parse;
+result3 -> result3_parse -> multiply3;
+`}}, nil)
+		require.Error(t, err)
+		assert.EqualError(t, err, "cannot register job with ID: 101; stream id 1 is already registered")
+
+		// registers new job with all new stream IDs
+		err = sr.Register(job.Job{ID: 101, StreamID: ptr(StreamID(4)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: `
+result1 [type=memo value="900.0022"];
+multiply2 [type=multiply times=1 streamID=5 index=0]; // force conversion to decimal
+result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"];
+result2_parse [type=jsonparse path="result" streamID=6 index=1];
+result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"];
+result3_parse [type=jsonparse path="result"];
+multiply3 [type=multiply times=1 streamID=7 index=2]; // force conversion to decimal
+result1 -> multiply2;
+result2 -> result2_parse;
+result3 -> result3_parse -> multiply3;
+`}}, nil)
+		require.NoError(t, err)
+
+		// did not overwrite existing stream
+		assert.Len(t, sr.pipelines, 7)
+		assert.Equal(t, p, sr.pipelines[1])
+		assert.Equal(t, p, sr.pipelines[2])
+		assert.Equal(t, p, sr.pipelines[3])
+		p2 := sr.pipelines[4]
+		assert.NotEqual(t, p, p2)
+		assert.Equal(t, p2, sr.pipelines[5])
+		assert.Equal(t, p2, sr.pipelines[6])
+		assert.Equal(t, p2, sr.pipelines[7])
+
+		v, exists = sr.Get(1)
+		require.True(t, exists)
+		msp = v.(*multiStreamPipeline)
+		assert.ElementsMatch(t, []StreamID{1, 2, 3}, msp.StreamIDs())
+		assert.Equal(t, int32(32), msp.spec.ID)
+
+		v, exists = sr.Get(4)
+		require.True(t, exists)
+		msp = v.(*multiStreamPipeline)
+		assert.ElementsMatch(t, []StreamID{4, 5, 6, 7}, msp.StreamIDs())
+		assert.Equal(t, int32(33), msp.spec.ID)
 	})
 	t.Run("Unregister", func(t *testing.T) {
 		sr := newRegistry(lggr, runner)
 
-		sr.pipelines[1] = &mockPipeline{run: &pipeline.Run{ID: 1}}
-		sr.pipelines[2] = &mockPipeline{run: &pipeline.Run{ID: 2}}
-		sr.pipelines[3] = &mockPipeline{run: &pipeline.Run{ID: 3}}
+		err := sr.Register(job.Job{ID: 100, StreamID: ptr(StreamID(1)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: `
+result1 [type=memo value="900.0022" streamID=2];
+`}}, nil)
+		require.NoError(t, err)
+		err = sr.Register(job.Job{ID: 101, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: `
+result1 [type=memo value="900.0022" streamID=3];
+`}}, nil)
+		require.NoError(t, err)
+		err = sr.Register(job.Job{ID: 102, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: `
+result1 [type=memo value="900.0022" streamID=4];
+`}}, nil)
+		require.NoError(t, err)
 
 		t.Run("unregisters a stream", func(t *testing.T) {
-			assert.Len(t, sr.pipelines, 3)
+			assert.Len(t, sr.pipelines, 4)
 
-			sr.Unregister(1)
+			sr.Unregister(100)
 
 			assert.Len(t, sr.pipelines, 2)
 			_, exists := sr.pipelines[1]
 			assert.False(t, exists)
+			_, exists = sr.pipelines[2]
+			assert.False(t, exists)
 		})
 		t.Run("no effect when unregistering a non-existent stream", func(t *testing.T) {
 			assert.Len(t, sr.pipelines, 2)
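
Note: these tests pass optional stream IDs with a small ptr helper (e.g. ptr(StreamID(100))). If it is not already defined among the package's test helpers, a minimal generic version would be:

package main

import "fmt"

// ptr returns a pointer to any value; handy for optional fields such as
// job.Job.StreamID in table tests. Requires Go 1.18+ generics.
func ptr[T any](v T) *T { return &v }

func main() {
	id := ptr(uint32(100))
	fmt.Println(*id) // 100
}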
26 changes: 16 additions & 10 deletions core/services/streams/stream_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
 	"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
 	"github.com/smartcontractkit/chainlink/v2/core/logger"
+	"github.com/smartcontractkit/chainlink/v2/core/services/job"
 	"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
 )
 
@@ -57,24 +58,29 @@ func (m *MockTask) TaskMaxBackoff() time.Duration { return 0 }
 func Test_Stream(t *testing.T) {
 	lggr := logger.TestLogger(t)
 	runner := &mockRunner{}
-	spec := pipeline.Spec{}
-	id := StreamID(123)
 	ctx := testutils.Context(t)
 
-	t.Run("Run", func(t *testing.T) {
-		strm := newStream(lggr, id, spec, runner, nil)
+	t.Run("errors with empty pipeline", func(t *testing.T) {
+		jbInvalid := job.Job{StreamID: ptr(StreamID(123)), PipelineSpec: &pipeline.Spec{DotDagSource: ``}}
+		_, err := newMultiStreamPipeline(lggr, jbInvalid, runner, nil)
+		require.EqualError(t, err, "unparseable pipeline: empty pipeline")
+	})
 
-		t.Run("errors with empty pipeline", func(t *testing.T) {
-			_, _, err := strm.Run(ctx)
-			assert.EqualError(t, err, "Run failed: Run failed due to unparseable pipeline: empty pipeline")
-		})
+	jb := job.Job{StreamID: ptr(StreamID(123)), PipelineSpec: &pipeline.Spec{DotDagSource: `
+result1 [type=memo value="900.0022" streamID=124];
+`}}
 
+	t.Run("Run", func(t *testing.T) {
+		strm, err := newMultiStreamPipeline(lggr, jb, runner, nil)
[CI annotation: GitHub Actions / lint, check failure on line 74 in core/services/streams/stream_test.go: ineffectual assignment to strm (ineffassign)]
+		require.NoError(t, err)
+
-		spec.DotDagSource = `
+		jb.PipelineSpec.DotDagSource = `
 succeed [type=memo value=42]
 succeed;
 `
 
-		strm = newStream(lggr, id, spec, runner, nil)
+		strm, err = newMultiStreamPipeline(lggr, jb, runner, nil)
+		require.NoError(t, err)
 
 		t.Run("executes the pipeline (success)", func(t *testing.T) {
 			runner.run = &pipeline.Run{ID: 42}
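
Note: the lint annotation embedded above is the usual ineffassign pattern: the strm value from the first newMultiStreamPipeline call is overwritten before it is ever read, so only its error is meaningful. One possible fix, presumably, is to discard the first value with the blank identifier. A tiny self-contained illustration of the shape the linter flags:

package main

import "fmt"

func build(n int) (int, error) { return n, nil }

func main() {
	// The first value assigned to v is overwritten before it is ever read.
	v, err := build(1)
	if err != nil {
		panic(err)
	}
	v, err = build(2) // linter: "ineffectual assignment to v"
	if err != nil {
		panic(err)
	}
	fmt.Println(v) // 2
}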
