Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CAPPL] Add mode quorum configuration option to Reduce Aggregator #972

Merged
merged 4 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ const (
DEVIATION_TYPE_ANY = "any"
// DEVIATION_TYPE_PERCENT is a numeric percentage difference
DEVIATION_TYPE_PERCENT = "percent"
// DEVIATION_TYPE_ABSOLUTE is a numeric absolute difference
// DEVIATION_TYPE_ABSOLUTE is a numeric unsigned difference
DEVIATION_TYPE_ABSOLUTE = "absolute"
REPORT_FORMAT_MAP = "map"
REPORT_FORMAT_ARRAY = "array"
REPORT_FORMAT_VALUE = "value"
MODE_QUORUM_OCR = "ocr"
MODE_QUORUM_ANY = "any"

DEFAULT_REPORT_FORMAT = REPORT_FORMAT_MAP
DEFAULT_OUTPUT_FIELD_NAME = "Reports"
DEFAULT_MODE_QUORUM = MODE_QUORUM_ANY
)

type ReduceAggConfig struct {
Expand Down Expand Up @@ -70,8 +73,12 @@ type AggregationField struct {
InputKey string `mapstructure:"inputKey" json:"inputKey"`
// How the data set should be aggregated to a single value
// * median - take the centermost value of the sorted data set of observations. can only be used on numeric types. not a true median, because no average if two middle values.
// * mode - take the most frequent value. if tied, use the "first".
// * mode - take the most frequent value. if tied, use the "first". use "ModeQuorom" to configure the minimum number of seen values.
Method string `mapstructure:"method" json:"method" jsonschema:"enum=median,enum=mode" required:"true"`
// When using Method=mode, this will configure the minimum number of values that must be seen
// * ocr - (default) enforces that the number of matching values must be at least f+1, otherwise consensus fails
// * any - do not enforce any limit on the minimum viable count. this may result in unexpected answers if every observation is unique.
ModeQuorum string `mapstructure:"modeQuorum" json:"modeQuorum,omitempty" jsonschema:"enum=ocr,enum=any" default:"ocr"`
// The key that the aggregated data is put under
// If omitted, the InputKey will be used
OutputKey string `mapstructure:"outputKey" json:"outputKey"`
Expand Down Expand Up @@ -108,7 +115,7 @@ func (a *reduceAggregator) Aggregate(lggr logger.Logger, previousOutcome *types.
return nil, fmt.Errorf("not enough observations provided %s, have %d want %d", field.InputKey, len(vals), 2*f+1)
}

singleValue, err := reduce(field.Method, vals)
singleValue, err := reduce(field.Method, vals, f, field.ModeQuorum)
if err != nil {
return nil, fmt.Errorf("unable to reduce on method %s, err: %s", field.Method, err.Error())
}
Expand Down Expand Up @@ -335,12 +342,20 @@ func (a *reduceAggregator) extractValues(lggr logger.Logger, observations map[oc
return vals
}

func reduce(method string, items []values.Value) (values.Value, error) {
func reduce(method string, items []values.Value, f int, modeQuorum string) (values.Value, error) {
switch method {
case AGGREGATION_METHOD_MEDIAN:
return median(items)
case AGGREGATION_METHOD_MODE:
return mode(items)
value, count, err := mode(items)
if err != nil {
return value, err
}
err = modeHasQuorum(modeQuorum, count, f)
if err != nil {
return value, err
}
return value, err
default:
// invariant, config should be validated
return nil, fmt.Errorf("unsupported aggregation method %s", method)
Expand Down Expand Up @@ -408,18 +423,18 @@ func toDecimal(item values.Value) (decimal.Decimal, error) {
}
}

func mode(items []values.Value) (values.Value, error) {
func mode(items []values.Value) (values.Value, int, error) {
if len(items) == 0 {
// invariant, as long as f > 0 there should be items
return nil, errors.New("items cannot be empty")
return nil, 0, errors.New("items cannot be empty")
}

counts := make(map[[32]byte]*counter)
for _, item := range items {
marshalled, err := proto.MarshalOptions{Deterministic: true}.Marshal(values.Proto(item))
if err != nil {
// invariant: values should always be able to be proto marshalled
return nil, err
return nil, 0, err
}
sha := sha256.Sum256(marshalled)
elem, ok := counts[sha]
Expand Down Expand Up @@ -449,7 +464,22 @@ func mode(items []values.Value) (values.Value, error) {

// If more than one mode found, choose first

return modes[0], nil
return modes[0], maxCount, nil
}

func modeHasQuorum(quorumType string, count int, f int) error {
switch quorumType {
case MODE_QUORUM_ANY:
return nil
case MODE_QUORUM_OCR:
if count < f+1 {
return fmt.Errorf("mode quorum not reached. have: %d, want: %d", count, f+1)
}
return nil
default:
// invariant, config should be validated
return fmt.Errorf("unsupported mode quorum %s", quorumType)
}
}

func deviation(method string, previousValue values.Value, nextValue values.Value) (decimal.Decimal, error) {
Expand Down Expand Up @@ -561,6 +591,13 @@ func ParseConfigReduceAggregator(config values.Map) (ReduceAggConfig, error) {
if len(field.Method) == 0 || !isOneOf(field.Method, []string{AGGREGATION_METHOD_MEDIAN, AGGREGATION_METHOD_MODE}) {
return ReduceAggConfig{}, fmt.Errorf("aggregation field must contain a method. options: [%s, %s]", AGGREGATION_METHOD_MEDIAN, AGGREGATION_METHOD_MODE)
}
if field.Method == AGGREGATION_METHOD_MODE && len(field.ModeQuorum) == 0 {
field.ModeQuorum = MODE_QUORUM_OCR
parsedConfig.Fields[i].ModeQuorum = MODE_QUORUM_OCR
}
if field.Method == AGGREGATION_METHOD_MODE && !isOneOf(field.ModeQuorum, []string{MODE_QUORUM_ANY, MODE_QUORUM_OCR}) {
return ReduceAggConfig{}, fmt.Errorf("mode quorum must be one of options: [%s, %s]", MODE_QUORUM_ANY, MODE_QUORUM_OCR)
}
if len(field.DeviationString) > 0 && isOneOf(field.DeviationType, []string{DEVIATION_TYPE_NONE, DEVIATION_TYPE_ANY}) {
return ReduceAggConfig{}, fmt.Errorf("aggregation field cannot have deviation with a deviation type of %s", field.DeviationType)
}
Expand Down
49 changes: 49 additions & 0 deletions pkg/capabilities/consensus/ocr3/aggregators/reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,25 @@ func TestReduceAggregator_Aggregate(t *testing.T) {
return map[commontypes.OracleID][]values.Value{1: {mockValue}, 2: {mockValue}, 3: {mockValue}}
},
},
{
name: "reduce error mode with mode quorum of ocr",
previousOutcome: nil,
fields: []aggregators.AggregationField{
{
Method: "mode",
ModeQuorum: "ocr",
OutputKey: "Price",
},
},
extraConfig: map[string]any{},
observationsFactory: func() map[commontypes.OracleID][]values.Value {
mockValue, err := values.Wrap(true)
require.NoError(t, err)
mockValue2, err := values.Wrap(true)
require.NoError(t, err)
return map[commontypes.OracleID][]values.Value{1: {mockValue}, 2: {mockValue2}}
},
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -825,6 +844,7 @@ func TestMedianAggregator_ParseConfig(t *testing.T) {
InputKey: "FeedID",
OutputKey: "FeedId",
Method: "mode",
ModeQuorum: "ocr",
DeviationString: "1.1",
Deviation: decimal.NewFromFloat(1.1),
DeviationType: "absolute",
Expand Down Expand Up @@ -1153,6 +1173,23 @@ func TestMedianAggregator_ParseConfig(t *testing.T) {
return vMap
},
},
{
name: "invalid mode quorum",
configFactory: func() *values.Map {
vMap, err := values.NewMap(map[string]any{
"fields": []aggregators.AggregationField{
{
InputKey: "Price",
Method: "mode",
ModeQuorum: "invalid",
OutputKey: "Price",
},
},
})
require.NoError(t, err)
return vMap
},
},
}

for _, tt := range cases {
Expand Down Expand Up @@ -1233,6 +1270,7 @@ func TestAggregateShouldReport(t *testing.T) {
InputKey: "FeedID",
OutputKey: "FeedID",
Method: "mode",
ModeQuorum: "any",
DeviationType: "any",
},
{
Expand Down Expand Up @@ -1278,6 +1316,7 @@ func TestAggregateShouldReport(t *testing.T) {
InputKey: "BoolField",
OutputKey: "BoolField",
Method: "mode",
ModeQuorum: "any",
DeviationType: "any",
},
{
Expand Down Expand Up @@ -1323,6 +1362,7 @@ func TestAggregateShouldReport(t *testing.T) {
InputKey: "FeedID",
OutputKey: "FeedID",
Method: "mode",
ModeQuorum: "any",
DeviationType: "any",
},
{
Expand Down Expand Up @@ -1368,6 +1408,7 @@ func TestAggregateShouldReport(t *testing.T) {
InputKey: "BoolField",
OutputKey: "BoolField",
Method: "mode",
ModeQuorum: "any",
DeviationType: "any",
},
{
Expand Down Expand Up @@ -1413,6 +1454,7 @@ func TestAggregateShouldReport(t *testing.T) {
InputKey: "FeedID",
OutputKey: "FeedID",
Method: "mode",
ModeQuorum: "any",
DeviationType: "any",
},
{
Expand Down Expand Up @@ -1458,6 +1500,7 @@ func TestAggregateShouldReport(t *testing.T) {
InputKey: "FeedID",
OutputKey: "FeedID",
Method: "mode",
ModeQuorum: "any",
DeviationType: "any",
},
{
Expand Down Expand Up @@ -1503,6 +1546,7 @@ func TestAggregateShouldReport(t *testing.T) {
InputKey: "FeedID",
OutputKey: "FeedID",
Method: "mode",
ModeQuorum: "any",
DeviationType: "any",
},
{
Expand Down Expand Up @@ -1548,6 +1592,7 @@ func TestAggregateShouldReport(t *testing.T) {
InputKey: "FeedID",
OutputKey: "FeedID",
Method: "mode",
ModeQuorum: "any",
DeviationType: "any",
},
{
Expand Down Expand Up @@ -1593,6 +1638,7 @@ func TestAggregateShouldReport(t *testing.T) {
InputKey: "FeedID",
OutputKey: "FeedID",
Method: "mode",
ModeQuorum: "any",
DeviationType: "any",
},
{
Expand Down Expand Up @@ -1638,6 +1684,7 @@ func TestAggregateShouldReport(t *testing.T) {
InputKey: "FeedID",
OutputKey: "FeedID",
Method: "mode",
ModeQuorum: "any",
DeviationType: "any",
},
{
Expand Down Expand Up @@ -1683,6 +1730,7 @@ func TestAggregateShouldReport(t *testing.T) {
InputKey: "FeedID",
OutputKey: "FeedID",
Method: "mode",
ModeQuorum: "any",
DeviationType: "any",
},
{
Expand Down Expand Up @@ -1728,6 +1776,7 @@ func TestAggregateShouldReport(t *testing.T) {
InputKey: "FeedID",
OutputKey: "FeedID",
Method: "mode",
ModeQuorum: "any",
DeviationType: "any",
},
{
Expand Down
Loading