-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
9a85999
commit 847de6d
Showing
7 changed files
with
431 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
package median | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"math/big" | ||
"sync" | ||
"time" | ||
|
||
"github.com/shopspring/decimal" | ||
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/logger" | ||
"github.com/smartcontractkit/chainlink-common/pkg/types" | ||
) | ||
|
||
type BridgeMetaData struct { | ||
LatestAnswer *big.Int `json:"latestAnswer"` | ||
UpdatedAt *big.Int `json:"updatedAt"` // A unix timestamp | ||
} | ||
|
||
func MarshalBridgeMetaData(latestAnswer *big.Int, updatedAt *big.Int) (map[string]interface{}, error) { | ||
b, err := json.Marshal(&BridgeMetaData{LatestAnswer: latestAnswer, UpdatedAt: updatedAt}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
var mp map[string]interface{} | ||
err = json.Unmarshal(b, &mp) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return mp, nil | ||
} | ||
|
||
type DataSource struct { | ||
pipelineRunner types.PipelineRunnerService | ||
spec string | ||
lggr logger.Logger | ||
|
||
current BridgeMetaData | ||
mu sync.RWMutex | ||
} | ||
|
||
func (d *DataSource) Observe(ctx context.Context, reportTimestamp ocrtypes.ReportTimestamp) (*big.Int, error) { | ||
md, err := MarshalBridgeMetaData(d.currentAnswer()) | ||
if err != nil { | ||
d.lggr.Warnw("unable to attach metadata for run", "err", err) | ||
} | ||
|
||
// NOTE: job metadata is automatically attached by the pipeline runner service | ||
vars := types.Vars{ | ||
Vars: map[string]interface{}{ | ||
"jobRun": md, | ||
}, | ||
} | ||
|
||
results, err := d.pipelineRunner.ExecuteRun(ctx, d.spec, vars, types.Options{}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
finalResults := results.FinalResults() | ||
if len(finalResults) == 0 { | ||
return nil, errors.New("pipeline execution failed: not enough results") | ||
} | ||
|
||
finalResult := finalResults[0] | ||
if finalResult.Error != nil { | ||
return nil, fmt.Errorf("pipeline execution failed: %w", finalResult.Error) | ||
} | ||
|
||
asDecimal, ok := (finalResult.Value).(decimal.Decimal) | ||
if !ok { | ||
return nil, errors.New("cannot convert observation to decimal") | ||
} | ||
|
||
resultAsBigInt := asDecimal.BigInt() | ||
d.updateAnswer(resultAsBigInt) | ||
return resultAsBigInt, nil | ||
} | ||
|
||
func (d *DataSource) currentAnswer() (*big.Int, *big.Int) { | ||
d.mu.RLock() | ||
defer d.mu.RUnlock() | ||
return d.current.LatestAnswer, d.current.UpdatedAt | ||
} | ||
|
||
func (d *DataSource) updateAnswer(latestAnswer *big.Int) { | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
d.current = BridgeMetaData{ | ||
LatestAnswer: latestAnswer, | ||
UpdatedAt: big.NewInt(time.Now().Unix()), | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
package median | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"math/big" | ||
"testing" | ||
|
||
"github.com/shopspring/decimal" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/logger" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/types" | ||
) | ||
|
||
type mockPipelineRunner struct { | ||
results types.TaskResults | ||
err error | ||
spec string | ||
vars types.Vars | ||
options types.Options | ||
} | ||
|
||
func (m *mockPipelineRunner) ExecuteRun(ctx context.Context, spec string, vars types.Vars, options types.Options) (types.TaskResults, error) { | ||
m.spec = spec | ||
m.vars = vars | ||
m.options = options | ||
return m.results, m.err | ||
} | ||
|
||
func TestDataSource(t *testing.T) { | ||
lggr := logger.Test(t) | ||
expect := int64(3) | ||
pr := &mockPipelineRunner{ | ||
results: types.TaskResults{ | ||
{ | ||
TaskValue: types.TaskValue{ | ||
Value: decimal.NewFromInt(expect), | ||
Error: nil, | ||
IsTerminal: true, | ||
}, | ||
Index: 2, | ||
}, | ||
{ | ||
TaskValue: types.TaskValue{ | ||
Value: decimal.NewFromInt(6), | ||
Error: nil, | ||
IsTerminal: false, | ||
}, | ||
Index: 1, | ||
}, | ||
}, | ||
} | ||
spec := "SPEC" | ||
ds := &DataSource{ | ||
pipelineRunner: pr, | ||
spec: spec, | ||
lggr: lggr, | ||
} | ||
res, err := ds.Observe(context.Background(), ocrtypes.ReportTimestamp{}) | ||
require.NoError(t, err) | ||
assert.Equal(t, big.NewInt(expect), res) | ||
assert.Equal(t, spec, pr.spec) | ||
assert.Equal(t, big.NewInt(expect), ds.current.LatestAnswer) | ||
} | ||
|
||
func TestDataSource_ResultErrors(t *testing.T) { | ||
lggr := logger.Test(t) | ||
pr := &mockPipelineRunner{ | ||
results: types.TaskResults{ | ||
{ | ||
TaskValue: types.TaskValue{ | ||
Error: errors.New("something went wrong"), | ||
IsTerminal: true, | ||
}, | ||
Index: 0, | ||
}, | ||
}, | ||
} | ||
spec := "SPEC" | ||
ds := &DataSource{ | ||
pipelineRunner: pr, | ||
spec: spec, | ||
lggr: lggr, | ||
} | ||
_, err := ds.Observe(context.Background(), ocrtypes.ReportTimestamp{}) | ||
assert.ErrorContains(t, err, "something went wrong") | ||
} | ||
|
||
func TestDataSource_ResultNotAnInt(t *testing.T) { | ||
lggr := logger.Test(t) | ||
|
||
expect := "string-result" | ||
pr := &mockPipelineRunner{ | ||
results: types.TaskResults{ | ||
{ | ||
TaskValue: types.TaskValue{ | ||
Value: expect, | ||
IsTerminal: true, | ||
}, | ||
Index: 0, | ||
}, | ||
}, | ||
} | ||
spec := "SPEC" | ||
ds := &DataSource{ | ||
pipelineRunner: pr, | ||
spec: spec, | ||
lggr: lggr, | ||
} | ||
_, err := ds.Observe(context.Background(), ocrtypes.ReportTimestamp{}) | ||
assert.ErrorContains(t, err, "cannot convert observation to decimal") | ||
} |
Oops, something went wrong.