Skip to content

Commit

Permalink
[KS-196] Validate Signatures in Mercury Aggregator (#522)
Browse files Browse the repository at this point in the history
1. Add Metadata field to TriggerEvent. Use it to pass Streams DON info ("SignersMetadata" struct) between trigger receiver and OCR aggregator.
2. Extend feeds OCR aggregator with an initial phase of agreeing on a set of valid signers and the F value.
3. Add ReportContext field to FeedReport - Streams DON signs both fields together so we need to pass them to validate signatures.
  • Loading branch information
bolekk authored May 21, 2024
1 parent 82080be commit 6a2cfa1
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 57 deletions.
5 changes: 3 additions & 2 deletions pkg/capabilities/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ type TriggerEvent struct {
TriggerType string
ID string
Timestamp string
// Trigger-specific payload
Payload values.Value
// Trigger-specific payload+metadata
Metadata values.Value
Payload values.Value
}

type RegisterToWorkflowRequest struct {
Expand Down
101 changes: 86 additions & 15 deletions pkg/capabilities/consensus/ocr3/datafeeds/feeds_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ import (

ocrcommon "github.com/smartcontractkit/libocr/commontypes"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

const OutputFieldName = "mercury_reports"
const (
	// OutputFieldName is the output field holding the list of aggregated reports.
	OutputFieldName = "mercury_reports"
	// addrLen is the expected byte length of each signer address (EVM address size).
	addrLen = 20
)

type aggregatorConfig struct {
Feeds map[datastreams.FeedID]feedConfig
Expand All @@ -37,23 +41,24 @@ type dataFeedsAggregator struct {

var _ types.Aggregator = (*dataFeedsAggregator)(nil)

// EncodableOutcome is a list of AggregatedPricePoints
// This Aggregator has two phases:
// 1. Agree on valid trigger signers by extracting them from event metadata and aggregating using MODE (at least F+1 copies needed).
// 2. For each FeedID, select latest valid report, using signers list obtained in phase 1.
//
// EncodableOutcome is a list of aggregated price points.
// Metadata is a map of feedID -> (timestamp, price) representing onchain state (see DataFeedsOutcomeMetadata proto)
func (a *dataFeedsAggregator) Aggregate(previousOutcome *types.AggregationOutcome, observations map[ocrcommon.OracleID][]values.Value) (*types.AggregationOutcome, error) {
// find latest valid Mercury report for each feed ID
func (a *dataFeedsAggregator) Aggregate(previousOutcome *types.AggregationOutcome, observations map[ocrcommon.OracleID][]values.Value, f int) (*types.AggregationOutcome, error) {
allowedSigners, minRequiredSignatures, payloads := a.extractSignersAndPayloads(observations, f)
if len(payloads) > 0 && minRequiredSignatures == 0 {
return nil, fmt.Errorf("cannot process non-empty observation payloads with minRequiredSignatures set to 0")
}
a.lggr.Debugw("extracted signers", "nAllowedSigners", len(allowedSigners), "minRequired", minRequiredSignatures, "nPayloads", len(payloads))
// find latest valid report for each feed ID
latestReportPerFeed := make(map[datastreams.FeedID]datastreams.FeedReport)
for nodeID, nodeObservations := range observations {
// we only expect a single observation per node - new Mercury data
if len(nodeObservations) == 0 || nodeObservations[0] == nil {
a.lggr.Warnf("node %d contributed with empty observations", nodeID)
continue
}
if len(nodeObservations) > 1 {
a.lggr.Warnf("node %d contributed with more than one observation", nodeID)
}
mercuryReports, err := a.reportCodec.Unwrap(nodeObservations[0])
for nodeID, payload := range payloads {
mercuryReports, err := a.reportCodec.UnwrapValid(payload, allowedSigners, minRequiredSignatures)
if err != nil {
a.lggr.Errorf("node %d contributed with invalid Mercury reports: %v", nodeID, err)
a.lggr.Errorf("node %d contributed with invalid reports: %v", nodeID, err)
continue
}
for _, report := range mercuryReports {
Expand Down Expand Up @@ -140,6 +145,72 @@ func (a *dataFeedsAggregator) Aggregate(previousOutcome *types.AggregationOutcom
}, nil
}

// extractSignersAndPayloads parses each node's observation as a wrapped
// TriggerEvent and tallies the signer addresses and MinRequiredSignatures
// values carried in the event metadata. It returns:
//   - the signers reported by at least fConsensus+1 nodes,
//   - the largest MinRequiredSignatures value reported by at least fConsensus+1 nodes
//     (0 if none reached consensus),
//   - the trigger payloads, keyed by the node that contributed them.
func (a *dataFeedsAggregator) extractSignersAndPayloads(observations map[ocrcommon.OracleID][]values.Value, fConsensus int) ([][]byte, int, map[ocrcommon.OracleID]values.Value) {
	eventPayloads := make(map[ocrcommon.OracleID]values.Value)
	signerCounts := make(map[[addrLen]byte]int)
	minSigCounts := make(map[int]int)
	for oracleID, obs := range observations {
		// each node is expected to contribute exactly one observation - a Streams trigger event
		if len(obs) == 0 || obs[0] == nil {
			a.lggr.Warnf("node %d contributed with empty observations", oracleID)
			continue
		}
		if len(obs) > 1 {
			a.lggr.Warnf("node %d contributed with more than one observation", oracleID)
			continue
		}
		event := &capabilities.TriggerEvent{}
		if err := obs[0].UnwrapTo(event); err != nil {
			a.lggr.Warnf("could not parse observations from node %d: %v", oracleID, err)
			continue
		}
		meta := &datastreams.SignersMetadata{}
		if err := event.Metadata.UnwrapTo(meta); err != nil {
			a.lggr.Warnf("could not parse trigger metadata from node %d: %v", oracleID, err)
			continue
		}
		uniqueSigners, err := extractUniqueSigners(meta.Signers)
		if err != nil {
			a.lggr.Warnf("could not extract signers from node %d: %v", oracleID, err)
			continue
		}
		for s := range uniqueSigners {
			signerCounts[s]++
		}
		minSigCounts[meta.MinRequiredSignatures]++
		eventPayloads[oracleID] = event.Payload
	}
	// Agree on signers list and min-required. It's technically possible to have F+1 valid values from one trigger DON and F+1 from another trigger DON.
	// In that case both values are legitimate and signers list will contain nodes from both DONs. However, min-required value will be the higher one (if different).
	allowedSigners := [][]byte{}
	for signer, count := range signerCounts {
		if count < fConsensus+1 {
			continue
		}
		signerCopy := signer // fresh array per iteration so the appended slice never aliases the loop variable
		allowedSigners = append(allowedSigners, signerCopy[:])
	}
	minRequired := 0
	for candidate, count := range minSigCounts {
		if count >= fConsensus+1 && candidate > minRequired {
			minRequired = candidate
		}
	}
	return allowedSigners, minRequired, eventPayloads
}

// extractUniqueSigners converts a list of raw signer addresses into a set,
// dropping duplicates. It fails if any address is not exactly addrLen bytes.
func extractUniqueSigners(signers [][]byte) (map[[addrLen]byte]struct{}, error) {
	unique := make(map[[addrLen]byte]struct{}, len(signers))
	for _, raw := range signers {
		if len(raw) != addrLen {
			return nil, fmt.Errorf("invalid signer length: %d", len(raw))
		}
		var addr [addrLen]byte
		copy(addr[:], raw)
		unique[addr] = struct{}{}
	}
	return unique, nil
}

func deviation(oldBytes, newBytes []byte) float64 {
oldV := big.NewInt(0).SetBytes(oldBytes)
newV := big.NewInt(0).SetBytes(newBytes)
Expand Down
53 changes: 48 additions & 5 deletions pkg/capabilities/consensus/ocr3/datafeeds/feeds_aggregator_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datafeeds_test

import (
"crypto/rand"
"math/big"
"testing"

Expand All @@ -27,8 +28,20 @@ var (
)

func TestDataFeedsAggregator_Aggregate_TwoRounds(t *testing.T) {
mockTriggerEvent, err := values.Wrap(capabilities.TriggerEvent{
Payload: &values.Map{},
meta := datastreams.SignersMetadata{
Signers: [][]byte{newSigner(t), newSigner(t)},
MinRequiredSignatures: 1,
}
metaVal, err := values.Wrap(meta)
require.NoError(t, err)
mockTriggerEvent1, err := values.Wrap(capabilities.TriggerEvent{
Metadata: metaVal,
Payload: &values.Map{},
})
require.NoError(t, err)
mockTriggerEvent2, err := values.Wrap(capabilities.TriggerEvent{
Metadata: metaVal,
Payload: &values.Map{},
})
require.NoError(t, err)
config := getConfig(t, feedIDA.String(), deviationA, heartbeatA)
Expand All @@ -37,7 +50,7 @@ func TestDataFeedsAggregator_Aggregate_TwoRounds(t *testing.T) {
require.NoError(t, err)

// first round, empty previous Outcome, empty observations
outcome, err := agg.Aggregate(nil, map[commontypes.OracleID][]values.Value{})
outcome, err := agg.Aggregate(nil, map[commontypes.OracleID][]values.Value{}, 1)
require.NoError(t, err)
require.False(t, outcome.ShouldReport)

Expand All @@ -59,8 +72,8 @@ func TestDataFeedsAggregator_Aggregate_TwoRounds(t *testing.T) {
FullReport: mercuryFullReportA,
},
}
codec.On("Unwrap", mock.Anything).Return(latestMercuryReports, nil)
outcome, err = agg.Aggregate(outcome, map[commontypes.OracleID][]values.Value{1: {mockTriggerEvent}})
codec.On("UnwrapValid", mock.Anything, mock.Anything, mock.Anything).Return(latestMercuryReports, nil)
outcome, err = agg.Aggregate(outcome, map[commontypes.OracleID][]values.Value{1: {mockTriggerEvent1}, 2: {mockTriggerEvent2}}, 1)
require.NoError(t, err)
require.True(t, outcome.ShouldReport)

Expand Down Expand Up @@ -88,6 +101,29 @@ func TestDataFeedsAggregator_Aggregate_TwoRounds(t *testing.T) {
require.Equal(t, string(mercuryFullReportA), string(reportBytes))
}

// Aggregation must fail when no signer reaches F+1 copies across observations.
func TestDataFeedsAggregator_Aggregate_Failures(t *testing.T) {
	signersMeta := datastreams.SignersMetadata{
		Signers:               [][]byte{newSigner(t), newSigner(t)},
		MinRequiredSignatures: 1,
	}
	wrappedMeta, err := values.Wrap(signersMeta)
	require.NoError(t, err)
	triggerEvent, err := values.Wrap(capabilities.TriggerEvent{
		Metadata: wrappedMeta,
		Payload:  &values.Map{},
	})
	require.NoError(t, err)

	config := getConfig(t, feedIDA.String(), deviationA, heartbeatA)
	codec := mocks.NewReportCodec(t)
	agg, err := datafeeds.NewDataFeedsAggregator(*config, codec, logger.Nop())
	require.NoError(t, err)

	// no valid signers - with F=1 each one should appear at least twice to be valid,
	// and a single observation cannot provide that
	_, err = agg.Aggregate(nil, map[commontypes.OracleID][]values.Value{1: {triggerEvent}}, 1)
	require.Error(t, err)
}

func TestDataFeedsAggregator_ParseConfig(t *testing.T) {
t.Run("happy path", func(t *testing.T) {
config := getConfig(t, feedIDA.String(), deviationA, heartbeatA)
Expand Down Expand Up @@ -132,3 +168,10 @@ func getConfig(t *testing.T, feedID string, deviation decimal.Decimal, heartbeat
require.NoError(t, err)
return config
}

// newSigner returns a random 20-byte (EVM-address-sized) signer for tests.
func newSigner(t *testing.T) []byte {
	t.Helper() // report failures at the caller's line, not inside this helper
	buf := make([]byte, 20)
	_, err := rand.Read(buf)
	require.NoError(t, err)
	return buf
}
2 changes: 1 addition & 1 deletion pkg/capabilities/consensus/ocr3/reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (r *reportingPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Q
continue
}

outcome, err2 := agg.Aggregate(workflowOutcome, obs)
outcome, err2 := agg.Aggregate(workflowOutcome, obs, r.config.F)
if err2 != nil {
r.lggr.Errorw("error aggregating outcome", "error", err, "workflowID", weid.WorkflowId)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/capabilities/consensus/ocr3/reporting_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type aggregator struct {
outcome *pbtypes.AggregationOutcome
}

func (a *aggregator) Aggregate(pout *pbtypes.AggregationOutcome, observations map[commontypes.OracleID][]values.Value) (*pbtypes.AggregationOutcome, error) {
func (a *aggregator) Aggregate(pout *pbtypes.AggregationOutcome, observations map[commontypes.OracleID][]values.Value, _ int) (*pbtypes.AggregationOutcome, error) {
a.gotObs = observations
nm, err := values.NewMap(
map[string]any{
Expand Down
2 changes: 1 addition & 1 deletion pkg/capabilities/consensus/ocr3/types/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (
// Aggregator turns per-oracle observations for one workflow into a single
// aggregated outcome.
type Aggregator interface {
	// Aggregate is called by the Outcome() phase of OCR reporting.
	// The inner array of observations corresponds to elements listed in the
	// "inputs.observations" section. f is the fault-tolerance parameter taken
	// from the reporting plugin config (up to f oracles may be faulty).
	Aggregate(previousOutcome *AggregationOutcome, observations map[ocrcommon.OracleID][]values.Value, f int) (*AggregationOutcome, error)
}

func AppendWorkflowIDs(outcome *AggregationOutcome, workflowID string, workflowExecutionID string) (*AggregationOutcome, error) {
Expand Down
20 changes: 10 additions & 10 deletions pkg/capabilities/datastreams/mocks/report_codec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 13 additions & 6 deletions pkg/capabilities/datastreams/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ func FeedIDFromBytes(b [FeedIDBytesLen]byte) FeedID {
}

type FeedReport struct {
FeedID string
FullReport []byte
Signatures [][]byte
FeedID string
FullReport []byte
ReportContext []byte
Signatures [][]byte

// Fields below are derived from FullReport
// NOTE: BenchmarkPrice is a byte representation of big.Int. We can't use big.Int
Expand All @@ -65,11 +66,17 @@ type FeedReport struct {
ObservationTimestamp int64
}

// SignersMetadata is passed alongside Streams trigger events. It carries the
// trigger DON's signer addresses and the minimum number of signatures required
// for a report to be accepted.
type SignersMetadata struct {
	Signers               [][]byte
	MinRequiredSignatures int
}

//go:generate mockery --quiet --name ReportCodec --output ./mocks/ --case=underscore

// ReportCodec translates between the wrapped (values.Value) representation of
// Streams reports and the decoded []FeedReport form.
type ReportCodec interface {
	// UnwrapValid unwraps and validates each report — checking its signatures
	// against allowedSigners and requiring at least minRequiredSignatures valid
	// signatures — then converts the result to a list of Feed reports.
	UnwrapValid(wrapped values.Value, allowedSigners [][]byte, minRequiredSignatures int) ([]FeedReport, error)

	// Wrap converts a list of Feed reports back to a Value.
	Wrap(reports []FeedReport) (values.Value, error)
}
25 changes: 18 additions & 7 deletions pkg/capabilities/triggers/mercury_remote_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,24 @@ import (
)

// mercuryRemoteAggregator aggregates trigger events received from remote nodes,
// validating report signatures against a configured signer set.
type mercuryRemoteAggregator struct {
	codec                 datastreams.ReportCodec // decodes and validates wrapped trigger payloads
	allowedSigners        [][]byte                // signer addresses accepted during validation
	minRequiredSignatures int                     // minimum valid signatures per report
	lggr                  logger.Logger
}

// This aggregator is used by TriggerSubscriber to aggregate trigger events from multiple remote nodes.
// NOTE: Once Mercury supports parallel composition (and thus guarantee identical sets of reports),
// this will be replaced by the default MODE aggregator.
func NewMercuryRemoteAggregator(codec datastreams.ReportCodec, lggr logger.Logger) *mercuryRemoteAggregator {
func NewMercuryRemoteAggregator(codec datastreams.ReportCodec, allowedSigners [][]byte, minRequiredSignatures int, lggr logger.Logger) *mercuryRemoteAggregator {
if allowedSigners == nil {
allowedSigners = [][]byte{}
}
return &mercuryRemoteAggregator{
codec: codec,
lggr: lggr,
codec: codec,
allowedSigners: allowedSigners,
minRequiredSignatures: minRequiredSignatures,
lggr: lggr,
}
}

Expand All @@ -40,7 +47,7 @@ func (a *mercuryRemoteAggregator) Aggregate(triggerEventID string, responses [][
a.lggr.Errorw("could not unwrap one of trigger events", "error", err)
continue
}
feedReports, err := a.codec.Unwrap(triggerEvent.Payload)
feedReports, err := a.codec.UnwrapValid(triggerEvent.Payload, a.allowedSigners, a.minRequiredSignatures)
if err != nil {
a.lggr.Errorw("could not unwrap one of capability responses", "error", err)
continue
Expand Down Expand Up @@ -69,5 +76,9 @@ func (a *mercuryRemoteAggregator) Aggregate(triggerEventID string, responses [][
for _, feedID := range allIDs {
reportList = append(reportList, latestReports[datastreams.FeedID(feedID)])
}
return wrapReports(reportList, triggerEventID, latestGlobalTs)
meta := datastreams.SignersMetadata{
Signers: a.allowedSigners,
MinRequiredSignatures: a.minRequiredSignatures,
}
return wrapReports(reportList, triggerEventID, latestGlobalTs, meta)
}
Loading

0 comments on commit 6a2cfa1

Please sign in to comment.