Skip to content

Commit

Permalink
[KS-196] ReportCodec implementation for Streams trigger (#13218)
Browse files Browse the repository at this point in the history
1. Implement Codec, which validates report signatures and decodes needed fields.
2. Pass report context from Merucry Transmitter, which is needed to validate signatures.
3. Update fake Syncer to run successful e2e tests.
  • Loading branch information
bolekk authored May 21, 2024
1 parent a25ce4d commit 4938ef3
Show file tree
Hide file tree
Showing 15 changed files with 285 additions and 61 deletions.
5 changes: 5 additions & 0 deletions .changeset/lemon-windows-invite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal ReportCodec for Streams trigger
2 changes: 1 addition & 1 deletion core/capabilities/aggregator_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func NewAggregator(name string, config values.Map, lggr logger.Logger) (types.Aggregator, error) {
switch name {
case "data_feeds":
mc := streams.NewCodec()
mc := streams.NewCodec(lggr)
return datafeeds.NewDataFeedsAggregator(config, mc, lggr)
default:
return nil, fmt.Errorf("aggregator %s not supported", name)
Expand Down
65 changes: 57 additions & 8 deletions core/capabilities/streams/codec.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,73 @@
package streams

import (
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec"
)

type Codec struct {
type codec struct {
lggr logger.Logger
}

func (c Codec) Unwrap(raw values.Value) ([]datastreams.FeedReport, error) {
var _ datastreams.ReportCodec = &codec{}

func (c *codec) UnwrapValid(wrapped values.Value, allowedSigners [][]byte, minRequiredSignatures int) ([]datastreams.FeedReport, error) {
signersMap := make(map[common.Address]struct{})
for _, signer := range allowedSigners {
signersMap[common.BytesToAddress(signer)] = struct{}{}
}
dest := []datastreams.FeedReport{}
err := raw.UnwrapTo(&dest)
// TODO (KS-196): validate reports
return dest, err
err := wrapped.UnwrapTo(&dest)
if err != nil {
return nil, fmt.Errorf("failed to unwrap: %v", err)
}
for i, report := range dest {
// signatures (report and context are signed together)
sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...)
fullHash := crypto.Keccak256(sigData)
validated := map[common.Address]struct{}{}
for _, sig := range report.Signatures {
signerPubkey, err2 := crypto.SigToPub(fullHash, sig)
if err2 != nil {
return nil, fmt.Errorf("malformed signer: %v", err2)
}
signerAddr := crypto.PubkeyToAddress(*signerPubkey)
if _, ok := signersMap[signerAddr]; !ok {
c.lggr.Debugw("invalid signer", "signerAddr", signerAddr)
continue
}
validated[signerAddr] = struct{}{}
}
if len(validated) < minRequiredSignatures {
return nil, fmt.Errorf("not enough valid signatures %d, needed %d", len(validated), minRequiredSignatures)
}
// decoding fields
id, err2 := datastreams.NewFeedID(report.FeedID)
if err2 != nil {
return nil, fmt.Errorf("malformed feed ID: %v", err2)
}
v3Codec := reportcodec.NewReportCodec(id.Bytes(), nil)
decoded, err2 := v3Codec.Decode(report.FullReport)
if err2 != nil {
return nil, fmt.Errorf("failed to decode: %v", err2)
}
dest[i].BenchmarkPrice = decoded.BenchmarkPrice.Bytes()
dest[i].ObservationTimestamp = int64(decoded.ObservationsTimestamp)
}
return dest, nil
}

func (c Codec) Wrap(reports []datastreams.FeedReport) (values.Value, error) {
func (c *codec) Wrap(reports []datastreams.FeedReport) (values.Value, error) {
return values.Wrap(reports)
}

func NewCodec() Codec {
return Codec{}
func NewCodec(lggr logger.Logger) *codec {
return &codec{lggr: lggr}
}
114 changes: 114 additions & 0 deletions core/capabilities/streams/codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package streams_test

import (
"crypto/rand"
"encoding/hex"
"math/big"
"testing"

"github.com/stretchr/testify/require"

ocrTypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams"
v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/streams"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/chaintype"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ocr2key"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec"
)

func TestCodec_WrapUnwrap(t *testing.T) {
codec := streams.NewCodec(logger.TestLogger(t))

id1, id1Str := newFeedID(t)
id2, id2Str := newFeedID(t)
price1, price2 := big.NewInt(1), big.NewInt(2)
timestamp1, timestamp2 := int64(100), int64(200)
report1, report2 := newReport(t, id1, price1, timestamp1), newReport(t, id2, price2, timestamp2)
reportCtx := ocrTypes.ReportContext{}
rawCtx := rawReportContext(reportCtx)

keyBundle1, err := ocr2key.New(chaintype.EVM)
require.NoError(t, err)
keyBundle2, err := ocr2key.New(chaintype.EVM)
require.NoError(t, err)

signatureK1R1, err := keyBundle1.Sign(reportCtx, report1)
require.NoError(t, err)
signatureK1R2, err := keyBundle1.Sign(reportCtx, report2)
require.NoError(t, err)
signatureK2R1, err := keyBundle2.Sign(reportCtx, report1)
require.NoError(t, err)
signatureK2R2, err := keyBundle2.Sign(reportCtx, report2)
require.NoError(t, err)

allowedSigners := [][]byte{keyBundle1.PublicKey(), keyBundle2.PublicKey()} // bad name - see comment on evmKeyring.PublicKey

wrapped, err := codec.Wrap([]datastreams.FeedReport{
{
FeedID: id1Str,
FullReport: report1,
ReportContext: rawCtx,
Signatures: [][]byte{signatureK1R1, signatureK2R1},
},
{
FeedID: id2Str,
FullReport: report2,
ReportContext: rawCtx,
Signatures: [][]byte{signatureK1R2, signatureK2R2},
},
})
require.NoError(t, err)

// wrong type
_, err = codec.UnwrapValid(values.NewBool(true), nil, 0)
require.Error(t, err)

// wrong signatures
_, err = codec.UnwrapValid(wrapped, nil, 1)
require.Error(t, err)

// success
reports, err := codec.UnwrapValid(wrapped, allowedSigners, 2)
require.NoError(t, err)
require.Equal(t, 2, len(reports))
require.Equal(t, price1.Bytes(), reports[0].BenchmarkPrice)
require.Equal(t, price2.Bytes(), reports[1].BenchmarkPrice)
require.Equal(t, timestamp1, reports[0].ObservationTimestamp)
require.Equal(t, timestamp2, reports[1].ObservationTimestamp)
}

func newFeedID(t *testing.T) ([32]byte, string) {
buf := [32]byte{}
_, err := rand.Read(buf[:])
require.NoError(t, err)
return buf, "0x" + hex.EncodeToString(buf[:])
}

func newReport(t *testing.T, feedID [32]byte, price *big.Int, timestamp int64) []byte {
v3Codec := reportcodec.NewReportCodec(feedID, logger.TestLogger(t))
raw, err := v3Codec.BuildReport(v3.ReportFields{
BenchmarkPrice: price,
Timestamp: uint32(timestamp),
Bid: big.NewInt(0),
Ask: big.NewInt(0),
LinkFee: big.NewInt(0),
NativeFee: big.NewInt(0),
})
require.NoError(t, err)
return raw
}

func rawReportContext(reportCtx ocrTypes.ReportContext) []byte {
rc := evmutil.RawReportContext(reportCtx)
flat := []byte{}
for _, r := range rc {
flat = append(flat, r[:]...)
}
return flat
}
Loading

0 comments on commit 4938ef3

Please sign in to comment.