diff --git a/go.mod b/go.mod index ac9c2e4..38498dc 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,9 @@ go 1.21.3 require ( github.com/hashicorp/go-plugin v1.5.2 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20231206181640-faad3f11cfad + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240119014733-5ea50bbe81be github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545 + github.com/stretchr/testify v1.8.4 ) require ( @@ -34,13 +35,13 @@ require ( github.com/mr-tron/base58 v1.2.0 // indirect github.com/mwitkow/grpc-proxy v0.0.0-20230212185441-f345521cb9c9 // indirect github.com/oklog/run v1.0.0 // indirect + github.com/pelletier/go-toml/v2 v2.1.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.17.0 // indirect github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.1 // indirect - github.com/stretchr/testify v1.8.4 // indirect github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0 // indirect go.opentelemetry.io/otel v1.19.0 // indirect diff --git a/go.sum b/go.sum index 5047c72..bed5d10 100644 --- a/go.sum +++ b/go.sum @@ -183,6 +183,8 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= +github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI= +github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -200,8 +202,10 @@ github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20231206181640-faad3f11cfad h1:ysPjfbCPJuVxxFZa1Ifv8OPE20pzvnEHjJrPDUo4gT0= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20231206181640-faad3f11cfad/go.mod h1:IdlfCN9rUs8Q/hrOYe8McNBIwEOHEsi0jilb3Cw77xs= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240118144452-7fcb73fb88c8 h1:6pL+DzkEGLT4NTd1SoGk1vL0/cbkDuvBsV0DRUb4lrc= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240118144452-7fcb73fb88c8/go.mod h1:f+0ei9N4PlTJHu7pbGzEjTnBUr45syPdGFu5+31lS5Q= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240119014733-5ea50bbe81be h1:ahwXHg5kImjhq85kWRxylXJBqClZpdrBvip9ApQJTP8= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240119014733-5ea50bbe81be/go.mod h1:f+0ei9N4PlTJHu7pbGzEjTnBUr45syPdGFu5+31lS5Q= github.com/smartcontractkit/go-plugin v0.0.0-20231003134350-e49dad63b306 h1:ko88+ZznniNJZbZPWAvHQU8SwKAdHngdDZ+pvVgB5ss= github.com/smartcontractkit/go-plugin v0.0.0-20231003134350-e49dad63b306/go.mod h1:w1sAEES3g3PuV/RzUrgow20W2uErMly84hhD3um1WL4= github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJif132UCdjo8u43i7iPN1/MFnu49hv7lFGFftCHKU= @@ -209,10 +213,14 @@ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545 h1:qOsw2ETQD/Sb/W2xuYn2KPWjvvsWA0C+l19rWFq8iNg= github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= diff --git a/median/aggregated_attribute_observation.go b/median/aggregated_attribute_observation.go new file mode 100644 index 0000000..681d541 --- /dev/null +++ b/median/aggregated_attribute_observation.go @@ -0,0 +1,45 @@ +package median + +import ( + "cmp" + "math/big" + "slices" + + "github.com/smartcontractkit/libocr/commontypes" + "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" +) + +type aggregatedAttributedObservation struct { + Timestamp uint32 + Observers [32]commontypes.OracleID + Observations []*big.Int + JuelsPerFeeCoin *big.Int +} + +func aggregate(observations []median.ParsedAttributedObservation) *aggregatedAttributedObservation { + // defensive copy + n := len(observations) + observations = slices.Clone(observations) + + aggregated := &aggregatedAttributedObservation{Observations: make([]*big.Int, len(observations))} + + slices.SortFunc(observations, func(a, b median.ParsedAttributedObservation) int { + return cmp.Compare(a.Timestamp, b.Timestamp) + }) + aggregated.Timestamp = observations[n/2].Timestamp + + slices.SortFunc(observations, func(a, b median.ParsedAttributedObservation) int { + return a.JuelsPerFeeCoin.Cmp(b.JuelsPerFeeCoin) + }) + aggregated.JuelsPerFeeCoin = observations[n/2].JuelsPerFeeCoin + + slices.SortFunc(observations, func(a, b median.ParsedAttributedObservation) int { + return a.Value.Cmp(b.Value) + }) + + for i, o := range observations { + aggregated.Observers[i] = o.Observer + aggregated.Observations[i] = o.Value + } + return aggregated +} diff --git a/median/plugin.go b/median/plugin.go index 54c729c..1780c60 100644 --- a/median/plugin.go +++ b/median/plugin.go @@ -2,6 +2,7 @@ package median import ( "context" + "errors" "math/big" "time" @@ -14,6 +15,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types" ) +const contractName = "median" + type Plugin struct { loop.Plugin stop services.StopChan @@ -27,6 +30,7 @@ func (p *Plugin) NewMedianFactory(ctx context.Context, provider types.MedianProv var ctxVals loop.ContextValues ctxVals.SetValues(ctx) lggr := logger.With(p.Logger, ctxVals.Args()...) + factory := median.NumericalMedianFactory{ DataSource: dataSource, JuelsPerFeeCoinDataSource: juelsPerFeeCoin, @@ -38,13 +42,21 @@ func (p *Plugin) NewMedianFactory(ctx context.Context, provider types.MedianProv } }), OnchainConfigCodec: provider.OnchainConfigCodec(), - ReportCodec: provider.ReportCodec(), } + if cr := provider.ChainReader(); cr != nil { - factory.ContractTransmitter = &chainReaderContract{cr, types.BoundContract{Name: "median"}} + factory.ContractTransmitter = &chainReaderContract{chainReader: cr, lggr: lggr} } else { factory.ContractTransmitter = provider.MedianContract() } + + if codec := provider.Codec(); codec != nil { + factory.ReportCodec = &reportCodec{codec: codec} + } else { + lggr.Info("No codec provided, defaulting back to median specific ReportCodec") + factory.ReportCodec = provider.ReportCodec() + } + s := &reportingPluginFactoryService{lggr: logger.Named(lggr, "ReportingPluginFactory"), ReportingPluginFactory: factory} p.SubService(s) @@ -75,7 +87,7 @@ func (r *reportingPluginFactoryService) HealthReport() map[string]error { // chainReaderContract adapts a [types.ChainReader] to [median.MedianContract]. type chainReaderContract struct { chainReader types.ChainReader - contract types.BoundContract + lggr logger.Logger } type latestTransmissionDetailsResponse struct { @@ -95,9 +107,22 @@ type latestRoundRequested struct { func (c *chainReaderContract) LatestTransmissionDetails(ctx context.Context) (configDigest ocrtypes.ConfigDigest, epoch uint32, round uint8, latestAnswer *big.Int, latestTimestamp time.Time, err error) { var resp latestTransmissionDetailsResponse - err = c.chainReader.GetLatestValue(ctx, c.contract, "LatestTransmissionDetails", nil, &resp) + err = c.chainReader.GetLatestValue(ctx, contractName, "LatestTransmissionDetails", nil, &resp) if err != nil { - return + if errors.Is(err, types.ErrNotFound) { + // If there's nothing transmitted yet, an implementation will not have emitted an event, + // or may not find details of a latest transmission on-chain if it's a function call. + // A zeroed out latestTransmissionDetailsResponse tells later parts of the system that there's no data yet. + c.lggr.Warn("LatestTransmissionDetails not found", "err", err) + } else { + return + } + } + + // Depending on if there is a LatestAnswer or not, and the implementation of the ChainReader, + // it's possible that this will be unset. The desired behaviour in that case is to have a zero value. + if resp.LatestAnswer == nil { + resp.LatestAnswer = new(big.Int) } return resp.ConfigDigest, resp.Epoch, resp.Round, resp.LatestAnswer, resp.LatestTimestamp, nil @@ -106,9 +131,16 @@ func (c *chainReaderContract) LatestTransmissionDetails(ctx context.Context) (co func (c *chainReaderContract) LatestRoundRequested(ctx context.Context, lookback time.Duration) (configDigest ocrtypes.ConfigDigest, epoch uint32, round uint8, err error) { var resp latestRoundRequested - err = c.chainReader.GetLatestValue(ctx, c.contract, "LatestRoundReported", map[string]any{"lookback": lookback}, &resp) + err = c.chainReader.GetLatestValue(ctx, contractName, "LatestRoundRequested", nil, &resp) if err != nil { - return + if errors.Is(err, types.ErrNotFound) { + // If there's nothing on-chain yet, an implementation will not have emitted an event, + // or may not find details of a latest transmission on-chain if it's a function call. + // A zeroed out LatestRoundRequested tells later parts of the system that there's no data yet. + c.lggr.Warn("LatestRoundRequested not found", "err", err) + } else { + return + } } return resp.ConfigDigest, resp.Epoch, resp.Round, nil diff --git a/median/report_codec.go b/median/report_codec.go new file mode 100644 index 0000000..d6137aa --- /dev/null +++ b/median/report_codec.go @@ -0,0 +1,42 @@ +package median + +import ( + "context" + "fmt" + "math/big" + + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" + ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" +) + +const typeName = "MedianReport" + +type reportCodec struct { + codec types.Codec +} + +var _ median.ReportCodec = &reportCodec{} + +func (r *reportCodec) BuildReport(observations []median.ParsedAttributedObservation) (ocrtypes.Report, error) { + if len(observations) == 0 { + return nil, fmt.Errorf("cannot build report from empty attributed observations") + } + + return r.codec.Encode(context.Background(), aggregate(observations), typeName) +} + +func (r *reportCodec) MedianFromReport(report ocrtypes.Report) (*big.Int, error) { + agg := &aggregatedAttributedObservation{} + if err := r.codec.Decode(context.Background(), report, agg, typeName); err != nil { + return nil, err + } + observations := make([]*big.Int, len(agg.Observations)) + copy(observations, agg.Observations) + medianObservation := len(agg.Observations) / 2 + return agg.Observations[medianObservation], nil +} + +func (r *reportCodec) MaxReportLength(n int) (int, error) { + return r.codec.GetMaxDecodingSize(context.Background(), n, typeName) +} diff --git a/median/report_codec_test.go b/median/report_codec_test.go new file mode 100644 index 0000000..ab7deaf --- /dev/null +++ b/median/report_codec_test.go @@ -0,0 +1,186 @@ +package median + +import ( + "context" + "errors" + "math/big" + "testing" + + "github.com/smartcontractkit/libocr/commontypes" + "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestReportCodec(t *testing.T) { + anyReports := []median.ParsedAttributedObservation{ + { + Timestamp: 123, + Value: big.NewInt(300), + JuelsPerFeeCoin: big.NewInt(100), + Observer: 0, + }, + { + Timestamp: 125, + Value: big.NewInt(200), + JuelsPerFeeCoin: big.NewInt(110), + Observer: 1, + }, + { + Timestamp: 124, + Value: big.NewInt(250), + JuelsPerFeeCoin: big.NewInt(90), + Observer: 2, + }, + } + + aggReports := aggregatedAttributedObservation{ + Timestamp: 124, + Observers: [32]commontypes.OracleID{1, 2, 0}, + Observations: []*big.Int{ + big.NewInt(200), + big.NewInt(250), + big.NewInt(300), + }, + JuelsPerFeeCoin: big.NewInt(100), + } + + anyEncodedReport := []byte{5, 6, 7, 8} + + anyError := errors.New("nope not today") + + t.Run("BuildReport builds the type and delegates to generic codec", func(t *testing.T) { + rc := reportCodec{ + codec: &testCodec{ + t: t, + expected: &aggReports, + result: anyEncodedReport, + }, + } + + encoded, err := rc.BuildReport(anyReports) + require.NoError(t, err) + assert.Equal(t, types.Report(anyEncodedReport), encoded) + }) + + t.Run("BuildReport returns error if there are no reports", func(t *testing.T) { + rc := reportCodec{ + codec: &testCodec{ + t: t, + expected: &aggReports, + result: anyEncodedReport, + }, + } + + _, err := rc.BuildReport(nil) + assert.Error(t, err) + + _, err = rc.BuildReport([]median.ParsedAttributedObservation{}) + assert.Error(t, err) + }) + + t.Run("BuildReport returns error if codec returns error", func(t *testing.T) { + rc := reportCodec{ + &testCodec{ + t: t, + expected: &aggReports, + result: anyEncodedReport, + err: anyError, + }, + } + + _, err := rc.BuildReport(anyReports) + assert.Equal(t, anyError, err) + }) + + t.Run("MedianFromReport delegates to codec and gets the median", func(t *testing.T) { + rc := reportCodec{ + &testCodec{ + t: t, + expected: anyEncodedReport, + result: aggReports, + }, + } + + medianVal, err := rc.MedianFromReport(anyEncodedReport) + require.NoError(t, err) + assert.Equal(t, big.NewInt(250), medianVal) + }) + + t.Run("MedianFromReport returns error if codec returns error", func(t *testing.T) { + rc := reportCodec{ + &testCodec{ + t: t, + expected: anyEncodedReport, + result: aggReports, + err: anyError, + }, + } + + _, err := rc.MedianFromReport(anyEncodedReport) + assert.Equal(t, anyError, err) + }) + + anyN := 10 + anyLen := 200 + t.Run("MaxReportLength delegates to codec", func(t *testing.T) { + rc := reportCodec{ + &testCodec{ + t: t, + expected: anyN, + result: anyLen, + }, + } + + length, err := rc.MaxReportLength(anyN) + require.NoError(t, err) + assert.Equal(t, anyLen, length) + }) + + t.Run("MaxReportLength returns error if codec returns error", func(t *testing.T) { + rc := reportCodec{&testCodec{ + t: t, + expected: 10, + result: anyLen, + err: anyError, + }, + } + + _, err := rc.MaxReportLength(10) + assert.Equal(t, anyError, err) + }) +} + +type testCodec struct { + t *testing.T + expected any + result any + err error +} + +func (t *testCodec) Encode(_ context.Context, item any, itemType string) ([]byte, error) { + assert.Equal(t.t, t.expected, item) + assert.Equal(t.t, typeName, itemType) + return t.result.([]byte), t.err +} + +func (t *testCodec) GetMaxEncodingSize(_ context.Context, n int, itemType string) (int, error) { + assert.Equal(t.t, t.expected, n) + assert.Equal(t.t, typeName, itemType) + return t.result.(int), t.err +} + +func (t *testCodec) Decode(_ context.Context, raw []byte, into any, itemType string) error { + assert.Equal(t.t, t.expected, raw) + assert.Equal(t.t, typeName, itemType) + set := into.(*aggregatedAttributedObservation) + *set = t.result.(aggregatedAttributedObservation) + return t.err +} + +func (t *testCodec) GetMaxDecodingSize(_ context.Context, n int, itemType string) (int, error) { + assert.Equal(t.t, t.expected, n) + assert.Equal(t.t, typeName, itemType) + return t.result.(int), t.err +}