Skip to content

Commit

Permalink
Pipeline Data Corruption (#13286)
Browse files Browse the repository at this point in the history
* Pipeline Data Corruption

The unit test `TestDivide_Example` was flaky in the CI pipeline, which suggested a flaw in the divide and
multiply operations. In failure cases, the value the test read as the final result was one of the input values or the
division result instead of the expected multiplication result. This implied that results were either received out of order or were being sorted incorrectly.

The pipeline runner does a final sort on the results, which ruled out the possibility that they were received out of order. On
inspection, the sorting index on every task was the zero value. As a result, the sort order was sometimes correct
and sometimes incorrect, which caused the test to flake.

To correct the problem, the test was updated so that the task producing the expected result has an index of `1`, while all
other tasks keep an index of `0`.

* fix test

* updated changeset
  • Loading branch information
EasterTheBunny authored May 22, 2024
1 parent 087e2a7 commit 6139126
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 15 deletions.
5 changes: 5 additions & 0 deletions .changeset/blue-camels-begin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

enforce proper result indexing on pipeline results #breaking_change
52 changes: 52 additions & 0 deletions core/services/ocr/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,58 @@ func TestValidateOracleSpec(t *testing.T) {
overrides func(c *chainlink.Config, s *chainlink.Secrets)
assertion func(t *testing.T, os job.Job, err error)
}{
{
name: "invalid result sorting index",
toml: `
ds1 [type=memo value=10000.1234];
ds2 [type=memo value=100];
div_by_ds2 [type=divide divisor="$(ds2)"];
ds1 -> div_by_ds2 -> answer1;
answer1 [type=multiply times=10000 index=-1];
`,
assertion: func(t *testing.T, os job.Job, err error) {
require.Error(t, err)
},
},
{
name: "duplicate sorting indexes not allowed",
toml: `
ds1 [type=memo value=10000.1234];
ds2 [type=memo value=100];
div_by_ds2 [type=divide divisor="$(ds2)"];
ds1 -> div_by_ds2 -> answer1;
ds1 -> div_by_ds2 -> answer2;
answer1 [type=multiply times=10000 index=0];
answer2 [type=multiply times=10000 index=0];
`,
assertion: func(t *testing.T, os job.Job, err error) {
require.Error(t, err)
},
},
{
name: "invalid result sorting index",
toml: `
type = "offchainreporting"
schemaVersion = 1
contractAddress = "0x613a38AC1659769640aaE063C651F48E0250454C"
isBootstrapPeer = false
observationSource = """
ds1 [type=bridge name=voter_turnout];
ds1_parse [type=jsonparse path="one,two"];
ds1_multiply [type=multiply times=1.23];
ds1 -> ds1_parse -> ds1_multiply -> answer1;
answer1 [type=median index=-1];
"""`,
assertion: func(t *testing.T, os job.Job, err error) {
require.Error(t, err)
},
},
{
name: "minimal non-bootstrap oracle spec",
toml: `
Expand Down
19 changes: 19 additions & 0 deletions core/services/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,11 @@ func UnmarshalTaskFromMap(taskType TaskType, taskMap interface{}, ID int, dotID
return nil, pkgerrors.Errorf(`unknown task type: "%v"`, taskType)
}

metadata := mapstructure.Metadata{}
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
Result: task,
WeaklyTypedInput: true,
Metadata: &metadata,
DecodeHook: mapstructure.ComposeDecodeHookFunc(
mapstructure.StringToTimeDurationHookFunc(),
func(from reflect.Type, to reflect.Type, data interface{}) (interface{}, error) {
Expand All @@ -441,6 +443,23 @@ func UnmarshalTaskFromMap(taskType TaskType, taskMap interface{}, ID int, dotID
if err != nil {
return nil, err
}

// valid explicit index values are 0-based
for _, key := range metadata.Keys {
if key == "index" {
if task.OutputIndex() < 0 {
return nil, errors.New("result sorting indexes should start with 0")
}
}
}

// the 'unset' value should be -1 to allow explicit indexes to be 0-based
for _, key := range metadata.Unset {
if key == "index" {
task.Base().Index = -1
}
}

return task, nil
}

Expand Down
11 changes: 11 additions & 0 deletions core/services/pipeline/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ func Parse(text string) (*Pipeline, error) {
// we need a temporary mapping of graph.IDs to positional ids after toposort
ids := make(map[int64]int)

resultIdxs := make(map[int32]struct{})

// use the new ordering as the id so that we can easily reproduce the original toposort
for id, node := range nodes {
node, is := node.(*GraphNode)
Expand All @@ -251,6 +253,15 @@ func Parse(text string) (*Pipeline, error) {
return nil, err
}

if task.OutputIndex() > 0 {
_, exists := resultIdxs[task.OutputIndex()]
if exists {
return nil, errors.New("duplicate sorting indexes detected")
}

resultIdxs[task.OutputIndex()] = struct{}{}
}

// re-link the edges
for inputs := g.To(node.ID()); inputs.Next(); {
isImplicitEdge := g.IsImplicitEdge(inputs.Node().ID(), node.ID())
Expand Down
12 changes: 6 additions & 6 deletions core/services/pipeline/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,27 +171,27 @@ func TestGraph_TasksInDependencyOrder(t *testing.T) {
"ds1_multiply",
[]pipeline.TaskDependency{{PropagateResult: true, InputTask: pipeline.Task(ds1_parse)}},
[]pipeline.Task{answer1},
0)
-1)
ds2_multiply.BaseTask = pipeline.NewBaseTask(
5,
"ds2_multiply",
[]pipeline.TaskDependency{{PropagateResult: true, InputTask: pipeline.Task(ds2_parse)}},
[]pipeline.Task{answer1},
0)
-1)
ds1_parse.BaseTask = pipeline.NewBaseTask(
1,
"ds1_parse",
[]pipeline.TaskDependency{{PropagateResult: true, InputTask: pipeline.Task(ds1)}},
[]pipeline.Task{ds1_multiply},
0)
-1)
ds2_parse.BaseTask = pipeline.NewBaseTask(
4,
"ds2_parse",
[]pipeline.TaskDependency{{PropagateResult: true, InputTask: pipeline.Task(ds2)}},
[]pipeline.Task{ds2_multiply},
0)
ds1.BaseTask = pipeline.NewBaseTask(0, "ds1", nil, []pipeline.Task{ds1_parse}, 0)
ds2.BaseTask = pipeline.NewBaseTask(3, "ds2", nil, []pipeline.Task{ds2_parse}, 0)
-1)
ds1.BaseTask = pipeline.NewBaseTask(0, "ds1", nil, []pipeline.Task{ds1_parse}, -1)
ds2.BaseTask = pipeline.NewBaseTask(3, "ds2", nil, []pipeline.Task{ds2_parse}, -1)

for i, task := range p.Tasks {
// Make sure inputs appear before the task, and outputs don't
Expand Down
19 changes: 10 additions & 9 deletions core/services/pipeline/task.divide_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pipeline_test
import (
"fmt"
"math"
"reflect"
"testing"

"github.com/pkg/errors"
Expand Down Expand Up @@ -198,19 +199,17 @@ func TestDivideTask_Overflow(t *testing.T) {
}

func TestDivide_Example(t *testing.T) {
testutils.SkipFlakey(t, "BCF-3236")
t.Parallel()

dag := `
ds1 [type=memo value=10000.1234]
ds1 [type=memo value=10000.1234];
ds2 [type=memo value=100];
ds2 [type=memo value=100]
div_by_ds2 [type=divide divisor="$(ds2)"];
multiply [type=multiply times=10000 index=0];
div_by_ds2 [type=divide divisor="$(ds2)"]
ds1 -> div_by_ds2 -> multiply;
multiply [type=multiply times=10000 index=0]
ds1->div_by_ds2->multiply;
`

db := pgtest.NewSqlxDB(t)
Expand All @@ -223,12 +222,14 @@ ds1->div_by_ds2->multiply;

lggr := logger.TestLogger(t)
_, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars, lggr)
require.NoError(t, err)

require.NoError(t, err)
require.Len(t, trrs, 4)

finalResult := trrs[3]

assert.Nil(t, finalResult.Result.Error)
require.NoError(t, finalResult.Result.Error)
require.Equal(t, reflect.TypeOf(decimal.Decimal{}), reflect.TypeOf(finalResult.Result.Value))

assert.Equal(t, "1000012.34", finalResult.Result.Value.(decimal.Decimal).String())
}

0 comments on commit 6139126

Please sign in to comment.