Skip to content

Commit

Permalink
Cleanup and refactor mercury v0.3 (#10303)
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav authored Aug 24, 2023
1 parent 2ad6189 commit fae804e
Show file tree
Hide file tree
Showing 22 changed files with 236 additions and 264 deletions.
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230811192642-2299ce672665 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230817164916-93440e96411d // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230824125819-215fd09979a2 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230802143301-165000751a85 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20230802150127-d2c95679d61a // indirect
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230823081604-f2a0e6b108bb // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1371,8 +1371,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230811192642-2299ce672665 h1:gtgDzthL8YLz+0SMfqj0SM6BdTJ/79UuhNbHmL8/3tA=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230811192642-2299ce672665/go.mod h1:FivgQBChVNSUUMKdOtzRJFSto2g40nfOkWVAA65nHOI=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230817164916-93440e96411d h1:gUaXYfDsXMAjMJj1k1Nm3EGZIfLVigMibi+4aj8az7o=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230817164916-93440e96411d/go.mod h1:gWclxGW7rLkbjXn7FGizYlyKhp/boekto4MEYGyiMG4=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230824125819-215fd09979a2 h1:z9PIgm0klhunwPy+KZYR4E9vCpjgJaMOyQRLCYgfoLk=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230824125819-215fd09979a2/go.mod h1:gWclxGW7rLkbjXn7FGizYlyKhp/boekto4MEYGyiMG4=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230802143301-165000751a85 h1:/fm02hYSUdhbSh7xPn7os9yHj7dnl8aLs2+nFXPiB4g=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230802143301-165000751a85/go.mod h1:H3/j2l84FsxYevCLNERdVasI7FVr+t2mkpv+BCJLSVw=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20230802150127-d2c95679d61a h1:b3rjvZLpTV45TmCV+ALX+EDDslf91pnDUugP54Lu9FA=
Expand Down
7 changes: 4 additions & 3 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ func (mt *mercuryTransmitter) LatestPrice(ctx context.Context, feedID [32]byte)
return price, nil
}

func (mt *mercuryTransmitter) LatestTimestamp(ctx context.Context) (uint32, error) {
// LatestTimestamp will return -1, nil if the feed is missing
func (mt *mercuryTransmitter) LatestTimestamp(ctx context.Context) (int64, error) {
mt.lggr.Trace("LatestTimestamp")

report, err := mt.latestReport(ctx, mt.feedID)
Expand All @@ -347,12 +348,12 @@ func (mt *mercuryTransmitter) LatestTimestamp(ctx context.Context) (uint32, erro

if report == nil {
mt.lggr.Debugw("LatestTimestamp success; got nil report")
return 0, nil
return -1, nil
}

mt.lggr.Debugw("LatestTimestamp success", "timestamp", report.ObservationsTimestamp)

return uint32(report.ObservationsTimestamp), nil
return report.ObservationsTimestamp, nil
}

func (mt *mercuryTransmitter) latestReport(ctx context.Context, feedID [32]byte) (*pb.Report, error) {
Expand Down
6 changes: 3 additions & 3 deletions core/services/relay/evm/mercury/transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
ts, err := mt.LatestTimestamp(testutils.Context(t))
require.NoError(t, err)

assert.Equal(t, ts, uint32(42))
assert.Equal(t, int64(42), ts)
})

t.Run("successful query returning nil report (new feed)", func(t *testing.T) {
t.Run("successful query returning nil report (new feed) gives latest timestamp = -1", func(t *testing.T) {
c := mocks.MockWSRPCClient{
LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) {
out = new(pb.LatestReportResponse)
Expand All @@ -112,7 +112,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
ts, err := mt.LatestTimestamp(testutils.Context(t))
require.NoError(t, err)

assert.Zero(t, ts)
assert.Equal(t, int64(-1), ts)
})

t.Run("failing query", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion core/services/relay/evm/mercury/v1/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (m *mockFetcher) LatestPrice(ctx context.Context, feedID [32]byte) (*big.In
return nil, nil
}

func (m *mockFetcher) LatestTimestamp(context.Context) (uint32, error) {
func (m *mockFetcher) LatestTimestamp(context.Context) (int64, error) {
return 0, nil
}

Expand Down
103 changes: 32 additions & 71 deletions core/services/relay/evm/mercury/v1/reportcodec/report_codec.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package reportcodec

import (
"errors"
"fmt"
"math"

"github.com/pkg/errors"
"github.com/ethereum/go-ethereum/common"
pkgerrors "github.com/pkg/errors"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

relaymercury "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury"
reportcodec "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v1"

"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand All @@ -32,44 +34,28 @@ func NewReportCodec(feedID [32]byte, lggr logger.Logger) *ReportCodec {
return &ReportCodec{lggr, feedID}
}

func (r *ReportCodec) BuildReport(paos []reportcodec.ParsedAttributedObservation, f int, validFromBlockNum int64) (ocrtypes.Report, error) {
if len(paos) == 0 {
return nil, errors.Errorf("cannot build report from empty attributed observations")
func (r *ReportCodec) BuildReport(rf reportcodec.ReportFields) (ocrtypes.Report, error) {
var merr error
if rf.BenchmarkPrice == nil {
merr = errors.Join(merr, errors.New("benchmarkPrice may not be nil"))
}

mPaos := reportcodec.Convert(paos)

timestamp := relaymercury.GetConsensusTimestamp(mPaos)
benchmarkPrice, err := relaymercury.GetConsensusBenchmarkPrice(mPaos, f)
if err != nil {
return nil, errors.Wrap(err, "GetConsensusBenchmarkPrice failed")
if rf.Bid == nil {
merr = errors.Join(merr, errors.New("bid may not be nil"))
}
bid, err := relaymercury.GetConsensusBid(mPaos, f)
if err != nil {
return nil, errors.Wrap(err, "GetConsensusBid failed")
if rf.Ask == nil {
merr = errors.Join(merr, errors.New("ask may not be nil"))
}
ask, err := relaymercury.GetConsensusAsk(mPaos, f)
if err != nil {
return nil, errors.Wrap(err, "GetConsensusAsk failed")
if len(rf.CurrentBlockHash) != 32 {
merr = errors.Join(merr, fmt.Errorf("invalid length for currentBlockHash, expected: 32, got: %d", len(rf.CurrentBlockHash)))
}

currentBlockHash, currentBlockNum, currentBlockTimestamp, err := reportcodec.GetConsensusCurrentBlock(paos, f)
if err != nil {
return nil, errors.Wrap(err, "GetConsensusCurrentBlock failed")
if merr != nil {
return nil, merr
}
var currentBlockHash common.Hash
copy(currentBlockHash[:], rf.CurrentBlockHash)

if validFromBlockNum > currentBlockNum {
return nil, errors.Errorf("validFromBlockNum=%d may not be greater than currentBlockNum=%d", validFromBlockNum, currentBlockNum)
}

if len(currentBlockHash) != 32 {
return nil, errors.Errorf("invalid length for currentBlockHash, expected: 32, got: %d", len(currentBlockHash))
}
currentBlockHashArray := [32]byte{}
copy(currentBlockHashArray[:], currentBlockHash)

reportBytes, err := ReportTypes.Pack(r.feedID, timestamp, benchmarkPrice, bid, ask, uint64(currentBlockNum), currentBlockHashArray, uint64(validFromBlockNum), currentBlockTimestamp)
return ocrtypes.Report(reportBytes), errors.Wrap(err, "failed to pack report blob")
reportBytes, err := ReportTypes.Pack(r.feedID, rf.Timestamp, rf.BenchmarkPrice, rf.Bid, rf.Ask, uint64(rf.CurrentBlockNum), currentBlockHash, uint64(rf.ValidFromBlockNum), rf.CurrentBlockTimestamp)
return ocrtypes.Report(reportBytes), pkgerrors.Wrap(err, "failed to pack report blob")
}

// Maximum length in bytes of Report returned by BuildReport. Used for
Expand All @@ -79,52 +65,27 @@ func (r *ReportCodec) MaxReportLength(n int) (int, error) {
}

func (r *ReportCodec) CurrentBlockNumFromReport(report ocrtypes.Report) (int64, error) {
reportElems := map[string]interface{}{}
if err := ReportTypes.UnpackIntoMap(reportElems, report); err != nil {
return 0, errors.Errorf("error during unpack: %v", err)
}

blockNumIface, ok := reportElems["currentBlockNum"]
if !ok {
return 0, errors.Errorf("unpacked report has no 'currentBlockNum' field")
}

blockNum, ok := blockNumIface.(uint64)
if !ok {
return 0, errors.Errorf("cannot cast blockNum to int64, type is %T", blockNumIface)
decoded, err := r.Decode(report)
if err != nil {
return 0, err
}

if blockNum > math.MaxInt64 {
return 0, errors.Errorf("blockNum overflows max int64, got: %d", blockNum)
if decoded.CurrentBlockNum > math.MaxInt64 {
return 0, fmt.Errorf("CurrentBlockNum=%d overflows max int64", decoded.CurrentBlockNum)
}

return int64(blockNum), nil
return int64(decoded.CurrentBlockNum), nil
}

func (r *ReportCodec) ValidFromBlockNumFromReport(report ocrtypes.Report) (int64, error) {
reportElems := map[string]interface{}{}
if err := ReportTypes.UnpackIntoMap(reportElems, report); err != nil {
return 0, errors.Errorf("error during unpack: %v", err)
}

blockNumIface, ok := reportElems["validFromBlockNum"]
if !ok {
return 0, errors.Errorf("unpacked report has no 'validFromBlockNum' field")
}

blockNum, ok := blockNumIface.(uint64)
if !ok {
return 0, errors.Errorf("cannot cast blockNum to int64, type is %T", blockNumIface)
decoded, err := r.Decode(report)
if err != nil {
return 0, err
}

if blockNum > math.MaxInt64 {
return 0, errors.Errorf("blockNum overflows max int64, got: %d", blockNum)
if decoded.ValidFromBlockNum > math.MaxInt64 {
return 0, fmt.Errorf("ValidFromBlockNum=%d overflows max int64", decoded.ValidFromBlockNum)
}

return int64(blockNum), nil
return int64(decoded.ValidFromBlockNum), nil
}

// Decode is made available to external users (i.e. mercury server)
func (r *ReportCodec) Decode(report ocrtypes.Report) (*reporttypes.Report, error) {
return reporttypes.Decode(report)
}
65 changes: 47 additions & 18 deletions core/services/relay/evm/mercury/v1/reportcodec/report_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

relaymercuryv1 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v1"
Expand All @@ -18,28 +17,37 @@ import (
)

var hash = hexutil.MustDecode("0x552c2cea3ab43bae137d89ee6142a01db3ae2b5678bc3c9bd5f509f537bea57b")
var paos = []relaymercuryv1.ParsedAttributedObservation{
relaymercuryv1.NewParsedAttributedObservation(42, commontypes.OracleID(49), big.NewInt(43), big.NewInt(44), big.NewInt(45), true, 248, hash, 123, true, 10, true),
relaymercuryv1.NewParsedAttributedObservation(142, commontypes.OracleID(149), big.NewInt(143), big.NewInt(144), big.NewInt(145), true, 248, hash, 123, true, 11, true),
relaymercuryv1.NewParsedAttributedObservation(242, commontypes.OracleID(249), big.NewInt(243), big.NewInt(244), big.NewInt(245), true, 248, hash, 123, true, 12, true),
relaymercuryv1.NewParsedAttributedObservation(342, commontypes.OracleID(250), big.NewInt(343), big.NewInt(344), big.NewInt(345), true, 842, hash, 456, true, 13, true),

func newValidReportFields() relaymercuryv1.ReportFields {
return relaymercuryv1.ReportFields{
Timestamp: 242,
BenchmarkPrice: big.NewInt(243),
Bid: big.NewInt(244),
Ask: big.NewInt(245),
CurrentBlockNum: 248,
CurrentBlockHash: hash,
ValidFromBlockNum: 46,
CurrentBlockTimestamp: 123,
}
}

func Test_ReportCodec(t *testing.T) {
r := ReportCodec{}
f := 1

t.Run("BuildReport errors if observations are empty", func(t *testing.T) {
ps := []relaymercuryv1.ParsedAttributedObservation{}
_, err := r.BuildReport(ps, f, 1)
t.Run("BuildReport errors on zero fields", func(t *testing.T) {
_, err := r.BuildReport(relaymercuryv1.ReportFields{})
require.Error(t, err)
assert.Contains(t, err.Error(), "cannot build report from empty attributed observation")
assert.Contains(t, err.Error(), "benchmarkPrice may not be nil")
assert.Contains(t, err.Error(), "bid may not be nil")
assert.Contains(t, err.Error(), "ask may not be nil")
assert.Contains(t, err.Error(), "invalid length for currentBlockHash, expected: 32, got: 0")
})

t.Run("BuildReport constructs a report from observations", func(t *testing.T) {
rf := newValidReportFields()
// only need to test happy path since validations are done in relaymercury

report, err := r.BuildReport(paos, f, 46)
report, err := r.BuildReport(rf)
require.NoError(t, err)

reportElems := make(map[string]interface{})
Expand Down Expand Up @@ -91,17 +99,17 @@ func Test_ReportCodec(t *testing.T) {
})
}

func buildSampleReport(bn int64, feedID [32]byte) []byte {
func buildSampleReport(bn, validFromBn int64, feedID [32]byte) []byte {
timestamp := uint32(42)
bp := big.NewInt(242)
bid := big.NewInt(243)
ask := big.NewInt(244)
currentBlockNumber := uint64(bn)
currentBlockHash := utils.NewHash()
currentBlockTimestamp := uint64(123)
validFromBlockNum := uint64(143)
validFromBlockNum := uint64(validFromBn)

b, err := ReportTypes.Pack(feedID, timestamp, bp, bid, ask, currentBlockNumber, currentBlockHash, currentBlockTimestamp, validFromBlockNum)
b, err := ReportTypes.Pack(feedID, timestamp, bp, bid, ask, currentBlockNumber, currentBlockHash, validFromBlockNum, currentBlockTimestamp)
if err != nil {
panic(err)
}
Expand All @@ -116,19 +124,40 @@ func Test_ReportCodec_CurrentBlockNumFromReport(t *testing.T) {
var invalidBn int64 = -1

t.Run("CurrentBlockNumFromReport extracts the current block number from a valid report", func(t *testing.T) {
report := buildSampleReport(validBn, feedID)
report := buildSampleReport(validBn, 143, feedID)

bn, err := r.CurrentBlockNumFromReport(report)
require.NoError(t, err)

assert.Equal(t, validBn, bn)
})
t.Run("CurrentBlockNumFromReport returns error if block num is too large", func(t *testing.T) {
report := buildSampleReport(invalidBn, feedID)
report := buildSampleReport(invalidBn, 143, feedID)

_, err := r.CurrentBlockNumFromReport(report)
require.Error(t, err)

assert.Contains(t, err.Error(), "blockNum overflows max int64, got: 18446744073709551615")
assert.Contains(t, err.Error(), "CurrentBlockNum=18446744073709551615 overflows max int64")
})
}
func Test_ReportCodec_ValidFromBlockNumFromReport(t *testing.T) {
r := ReportCodec{}
feedID := utils.NewHash()

t.Run("ValidFromBlockNumFromReport extracts the valid from block number from a valid report", func(t *testing.T) {
report := buildSampleReport(42, 999, feedID)

bn, err := r.ValidFromBlockNumFromReport(report)
require.NoError(t, err)

assert.Equal(t, int64(999), bn)
})
t.Run("ValidFromBlockNumFromReport returns error if valid from block number is too large", func(t *testing.T) {
report := buildSampleReport(42, -1, feedID)

_, err := r.ValidFromBlockNumFromReport(report)
require.Error(t, err)

assert.Contains(t, err.Error(), "ValidFromBlockNum=18446744073709551615 overflows max int64")
})
}
1 change: 1 addition & 0 deletions core/services/relay/evm/mercury/v1/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Report struct {
CurrentBlockTimestamp uint64
}

// Decode is made available to external users (i.e. mercury server)
func Decode(report []byte) (*Report, error) {
values, err := schema.Unpack(report)
if err != nil {
Expand Down
8 changes: 3 additions & 5 deletions core/services/relay/evm/mercury/v2/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Runner interface {

type LatestReportFetcher interface {
LatestPrice(ctx context.Context, feedID [32]byte) (*big.Int, error)
LatestTimestamp(context.Context) (uint32, error)
LatestTimestamp(context.Context) (int64, error)
}

type datasource struct {
Expand All @@ -49,8 +49,6 @@ type datasource struct {

var _ relaymercuryv2.DataSource = &datasource{}

var maxInt192 = new(big.Int).Exp(big.NewInt(2), big.NewInt(191), nil)

func NewDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, feedID mercuryutils.FeedID, lggr logger.Logger, rr chan pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, fetcher LatestReportFetcher, linkFeedID, nativeFeedID mercuryutils.FeedID) *datasource {
return &datasource{pr, jb, spec, feedID, lggr, rr, fetcher, linkFeedID, nativeFeedID, sync.RWMutex{}, enhancedTelemChan}
}
Expand Down Expand Up @@ -106,7 +104,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
obs.LinkPrice.Val, obs.LinkPrice.Err = ds.fetcher.LatestPrice(ctx, ds.linkFeedID)
if obs.LinkPrice.Val == nil && obs.LinkPrice.Err == nil {
ds.lggr.Warnw("Mercury server was missing LINK feed, falling back to max int192", "linkFeedID", ds.linkFeedID)
obs.LinkPrice.Val = maxInt192
obs.LinkPrice.Val = relaymercury.MaxInt192
}
}()
}
Expand All @@ -120,7 +118,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
obs.NativePrice.Val, obs.NativePrice.Err = ds.fetcher.LatestPrice(ctx, ds.nativeFeedID)
if obs.NativePrice.Val == nil && obs.NativePrice.Err == nil {
ds.lggr.Warnw("Mercury server was missing native feed, falling back to max int192", "nativeFeedID", ds.nativeFeedID)
obs.NativePrice.Val = maxInt192
obs.NativePrice.Val = relaymercury.MaxInt192
}
}()
}
Expand Down
Loading

0 comments on commit fae804e

Please sign in to comment.