From f6f2457d9367c543bef20491a26785266849c154 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Fri, 6 Dec 2024 08:31:42 +0100 Subject: [PATCH] CCIP-4447 Promwrapper for OCR3 plugins and factories (#15521) * PoC * Basic implementation * Basic implementation * Basic implementation * fixes * fixes * fixes * fixes * fixes * fixes * fixes * fixes * fixes * fixes * fixes --- .changeset/fuzzy-hairs-appear.md | 5 + .../capabilities/ccip/oraclecreator/plugin.go | 9 + core/services/ocr3/promwrapper/factory.go | 42 +++++ .../services/ocr3/promwrapper/factory_test.go | 41 +++++ core/services/ocr3/promwrapper/plugin.go | 122 +++++++++++++ core/services/ocr3/promwrapper/plugin_test.go | 171 ++++++++++++++++++ core/services/ocr3/promwrapper/types.go | 51 ++++++ 7 files changed, 441 insertions(+) create mode 100644 .changeset/fuzzy-hairs-appear.md create mode 100644 core/services/ocr3/promwrapper/factory.go create mode 100644 core/services/ocr3/promwrapper/factory_test.go create mode 100644 core/services/ocr3/promwrapper/plugin.go create mode 100644 core/services/ocr3/promwrapper/plugin_test.go create mode 100644 core/services/ocr3/promwrapper/types.go diff --git a/.changeset/fuzzy-hairs-appear.md b/.changeset/fuzzy-hairs-appear.md new file mode 100644 index 00000000000..a4797462546 --- /dev/null +++ b/.changeset/fuzzy-hairs-appear.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Prometheus observability layer added to OCR3 Reporting Plugins #internal diff --git a/core/capabilities/ccip/oraclecreator/plugin.go b/core/capabilities/ccip/oraclecreator/plugin.go index f8868b5d9b9..1b8c6344349 100644 --- a/core/capabilities/ccip/oraclecreator/plugin.go +++ b/core/capabilities/ccip/oraclecreator/plugin.go @@ -22,6 +22,7 @@ import ( cctypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr3/promwrapper" "github.com/smartcontractkit/libocr/commontypes" libocr3 "github.com/smartcontractkit/libocr/offchainreporting2plus" @@ -229,6 +230,12 @@ func (i *pluginOracleCreator) createFactoryAndTransmitter( ) (ocr3types.ReportingPluginFactory[[]byte], ocr3types.ContractTransmitter[[]byte], error) { var factory ocr3types.ReportingPluginFactory[[]byte] var transmitter ocr3types.ContractTransmitter[[]byte] + + chainID, err := chainsel.GetChainIDFromSelector(uint64(config.Config.ChainSelector)) + if err != nil { + return nil, nil, fmt.Errorf("unsupported chain selector %d %w", config.Config.ChainSelector, err) + } + if config.Config.PluginType == uint8(cctypes.PluginTypeCCIPCommit) { if !i.peerWrapper.IsStarted() { return nil, nil, fmt.Errorf("peer wrapper is not started") @@ -263,6 +270,7 @@ func (i *pluginOracleCreator) createFactoryAndTransmitter( rmnPeerClient, rmnCrypto, ) + factory = promwrapper.NewReportingPluginFactory[[]byte](factory, chainID, "CCIPCommit") transmitter = ocrimpls.NewCommitContractTransmitter[[]byte](destChainWriter, ocrtypes.Account(destFromAccounts[0]), hexutil.Encode(config.Config.OfframpAddress), // TODO: this works for evm only, how about non-evm? @@ -283,6 +291,7 @@ func (i *pluginOracleCreator) createFactoryAndTransmitter( contractReaders, chainWriters, ) + factory = promwrapper.NewReportingPluginFactory[[]byte](factory, chainID, "CCIPExec") transmitter = ocrimpls.NewExecContractTransmitter[[]byte](destChainWriter, ocrtypes.Account(destFromAccounts[0]), hexutil.Encode(config.Config.OfframpAddress), // TODO: this works for evm only, how about non-evm? diff --git a/core/services/ocr3/promwrapper/factory.go b/core/services/ocr3/promwrapper/factory.go new file mode 100644 index 00000000000..0dabd346112 --- /dev/null +++ b/core/services/ocr3/promwrapper/factory.go @@ -0,0 +1,42 @@ +package promwrapper + +import ( + "context" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" +) + +var _ ocr3types.ReportingPluginFactory[any] = &ReportingPluginFactory[any]{} + +type ReportingPluginFactory[RI any] struct { + origin ocr3types.ReportingPluginFactory[RI] + chainID string + plugin string +} + +func NewReportingPluginFactory[RI any]( + origin ocr3types.ReportingPluginFactory[RI], + chainID string, + plugin string, +) *ReportingPluginFactory[RI] { + return &ReportingPluginFactory[RI]{ + origin: origin, + chainID: chainID, + plugin: plugin, + } +} + +func (r ReportingPluginFactory[RI]) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[RI], ocr3types.ReportingPluginInfo, error) { + plugin, info, err := r.origin.NewReportingPlugin(ctx, config) + if err != nil { + return nil, ocr3types.ReportingPluginInfo{}, err + } + wrapped := newReportingPlugin( + plugin, + r.chainID, + r.plugin, + promOCR3ReportsGenerated, + promOCR3Durations, + ) + return wrapped, info, err +} diff --git a/core/services/ocr3/promwrapper/factory_test.go b/core/services/ocr3/promwrapper/factory_test.go new file mode 100644 index 00000000000..72f35aad172 --- /dev/null +++ b/core/services/ocr3/promwrapper/factory_test.go @@ -0,0 +1,41 @@ +package promwrapper + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" +) + +func Test_WrapperFactory(t *testing.T) { + validFactory := NewReportingPluginFactory(fakeFactory[uint]{}, "solana", "plugin") + failingFactory := NewReportingPluginFactory(fakeFactory[uint]{err: errors.New("error")}, "123", "plugin") + + plugin, _, err := validFactory.NewReportingPlugin(tests.Context(t), ocr3types.ReportingPluginConfig{}) + require.NoError(t, err) + + _, err = plugin.Outcome(tests.Context(t), ocr3types.OutcomeContext{}, nil, nil) + require.NoError(t, err) + + require.Equal(t, 1, counterFromHistogramByLabels(t, promOCR3Durations, "solana", "plugin", "outcome", "true")) + require.Equal(t, 0, counterFromHistogramByLabels(t, promOCR3Durations, "solana", "plugin", "outcome", "false")) + + _, _, err = failingFactory.NewReportingPlugin(tests.Context(t), ocr3types.ReportingPluginConfig{}) + require.Error(t, err) +} + +type fakeFactory[RI any] struct { + err error +} + +func (f fakeFactory[RI]) NewReportingPlugin(context.Context, ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[RI], ocr3types.ReportingPluginInfo, error) { + if f.err != nil { + return nil, ocr3types.ReportingPluginInfo{}, f.err + } + return fakePlugin[RI]{}, ocr3types.ReportingPluginInfo{}, nil +} diff --git a/core/services/ocr3/promwrapper/plugin.go b/core/services/ocr3/promwrapper/plugin.go new file mode 100644 index 00000000000..e4e0c3d35d5 --- /dev/null +++ b/core/services/ocr3/promwrapper/plugin.go @@ -0,0 +1,122 @@ +package promwrapper + +import ( + "context" + "strconv" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" +) + +var _ ocr3types.ReportingPlugin[any] = &reportingPlugin[any]{} + +type reportingPlugin[RI any] struct { + ocr3types.ReportingPlugin[RI] + chainID string + plugin string + + // Prometheus components for tracking metrics + reportsGenerated *prometheus.CounterVec + durations *prometheus.HistogramVec +} + +func newReportingPlugin[RI any]( + origin ocr3types.ReportingPlugin[RI], + chainID string, + plugin string, + reportsGenerated *prometheus.CounterVec, + durations *prometheus.HistogramVec, +) *reportingPlugin[RI] { + return &reportingPlugin[RI]{ + ReportingPlugin: origin, + chainID: chainID, + plugin: plugin, + reportsGenerated: reportsGenerated, + durations: durations, + } +} + +func (p *reportingPlugin[RI]) Query(ctx context.Context, outctx ocr3types.OutcomeContext) (ocrtypes.Query, error) { + return withObservedExecution(p, query, func() (ocrtypes.Query, error) { + return p.ReportingPlugin.Query(ctx, outctx) + }) +} + +func (p *reportingPlugin[RI]) Observation(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query) (ocrtypes.Observation, error) { + return withObservedExecution(p, observation, func() (ocrtypes.Observation, error) { + return p.ReportingPlugin.Observation(ctx, outctx, query) + }) +} + +func (p *reportingPlugin[RI]) ValidateObservation(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query, ao ocrtypes.AttributedObservation) error { + _, err := withObservedExecution(p, validateObservation, func() (any, error) { + err := p.ReportingPlugin.ValidateObservation(ctx, outctx, query, ao) + return nil, err + }) + return err +} + +func (p *reportingPlugin[RI]) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query, aos []ocrtypes.AttributedObservation) (ocr3types.Outcome, error) { + return withObservedExecution(p, outcome, func() (ocr3types.Outcome, error) { + return p.ReportingPlugin.Outcome(ctx, outctx, query, aos) + }) +} + +func (p *reportingPlugin[RI]) Reports(ctx context.Context, seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportPlus[RI], error) { + result, err := withObservedExecution(p, reports, func() ([]ocr3types.ReportPlus[RI], error) { + return p.ReportingPlugin.Reports(ctx, seqNr, outcome) + }) + p.trackReports(reports, len(result)) + return result, err +} + +func (p *reportingPlugin[RI]) ShouldAcceptAttestedReport(ctx context.Context, seqNr uint64, reportWithInfo ocr3types.ReportWithInfo[RI]) (bool, error) { + result, err := withObservedExecution(p, shouldAccept, func() (bool, error) { + return p.ReportingPlugin.ShouldAcceptAttestedReport(ctx, seqNr, reportWithInfo) + }) + p.trackReports(shouldAccept, boolToInt(result)) + return result, err +} + +func (p *reportingPlugin[RI]) ShouldTransmitAcceptedReport(ctx context.Context, seqNr uint64, reportWithInfo ocr3types.ReportWithInfo[RI]) (bool, error) { + result, err := withObservedExecution(p, shouldTransmit, func() (bool, error) { + return p.ReportingPlugin.ShouldTransmitAcceptedReport(ctx, seqNr, reportWithInfo) + }) + p.trackReports(shouldTransmit, boolToInt(result)) + return result, err +} + +func (p *reportingPlugin[RI]) trackReports( + function functionType, + count int, +) { + p.reportsGenerated. + WithLabelValues(p.chainID, p.plugin, string(function)). + Add(float64(count)) +} + +func boolToInt(arg bool) int { + if arg { + return 1 + } + return 0 +} + +func withObservedExecution[RI, R any]( + p *reportingPlugin[RI], + function functionType, + exec func() (R, error), +) (R, error) { + start := time.Now() + result, err := exec() + + success := err == nil + + p.durations. + WithLabelValues(p.chainID, p.plugin, string(function), strconv.FormatBool(success)). + Observe(float64(time.Since(start))) + + return result, err +} diff --git a/core/services/ocr3/promwrapper/plugin_test.go b/core/services/ocr3/promwrapper/plugin_test.go new file mode 100644 index 00000000000..35a97d109aa --- /dev/null +++ b/core/services/ocr3/promwrapper/plugin_test.go @@ -0,0 +1,171 @@ +package promwrapper + +import ( + "context" + "errors" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" +) + +func Test_ReportsGeneratedGauge(t *testing.T) { + plugin1 := newReportingPlugin( + fakePlugin[uint]{reports: make([]ocr3types.ReportPlus[uint], 2)}, + "123", "empty", promOCR3ReportsGenerated, promOCR3Durations, + ) + plugin2 := newReportingPlugin( + fakePlugin[bool]{reports: make([]ocr3types.ReportPlus[bool], 10)}, + "solana", "different_plugin", promOCR3ReportsGenerated, promOCR3Durations, + ) + plugin3 := newReportingPlugin( + fakePlugin[string]{err: errors.New("error")}, + "1234", "empty", promOCR3ReportsGenerated, promOCR3Durations, + ) + + r1, err := plugin1.Reports(tests.Context(t), 1, nil) + require.NoError(t, err) + require.Len(t, r1, 2) + + for i := 0; i < 10; i++ { + r2, err1 := plugin2.Reports(tests.Context(t), 1, nil) + require.NoError(t, err1) + require.Len(t, r2, 10) + } + + _, err = plugin2.ShouldAcceptAttestedReport(tests.Context(t), 1, ocr3types.ReportWithInfo[bool]{}) + require.NoError(t, err) + + _, err = plugin3.Reports(tests.Context(t), 1, nil) + require.Error(t, err) + + g1 := testutil.ToFloat64(promOCR3ReportsGenerated.WithLabelValues("123", "empty", "reports")) + require.Equal(t, 2, int(g1)) + + g2 := testutil.ToFloat64(promOCR3ReportsGenerated.WithLabelValues("solana", "different_plugin", "reports")) + require.Equal(t, 100, int(g2)) + + g3 := testutil.ToFloat64(promOCR3ReportsGenerated.WithLabelValues("solana", "different_plugin", "shouldAccept")) + require.Equal(t, 1, int(g3)) + + g4 := testutil.ToFloat64(promOCR3ReportsGenerated.WithLabelValues("1234", "empty", "reports")) + require.Equal(t, 0, int(g4)) +} + +func Test_DurationHistograms(t *testing.T) { + plugin1 := newReportingPlugin( + fakePlugin[uint]{}, + "123", "empty", promOCR3ReportsGenerated, promOCR3Durations, + ) + plugin2 := newReportingPlugin( + fakePlugin[uint]{err: errors.New("error")}, + "123", "empty", promOCR3ReportsGenerated, promOCR3Durations, + ) + plugin3 := newReportingPlugin( + fakePlugin[uint]{}, + "solana", "commit", promOCR3ReportsGenerated, promOCR3Durations, + ) + + for _, p := range []*reportingPlugin[uint]{plugin1, plugin2, plugin3} { + _, _ = p.Query(tests.Context(t), ocr3types.OutcomeContext{}) + for i := 0; i < 2; i++ { + _, _ = p.Observation(tests.Context(t), ocr3types.OutcomeContext{}, nil) + } + _ = p.ValidateObservation(tests.Context(t), ocr3types.OutcomeContext{}, nil, ocrtypes.AttributedObservation{}) + _, _ = p.Outcome(tests.Context(t), ocr3types.OutcomeContext{}, nil, nil) + _, _ = p.Reports(tests.Context(t), 0, nil) + _, _ = p.ShouldAcceptAttestedReport(tests.Context(t), 0, ocr3types.ReportWithInfo[uint]{}) + _, _ = p.ShouldTransmitAcceptedReport(tests.Context(t), 0, ocr3types.ReportWithInfo[uint]{}) + } + + require.Equal(t, 1, counterFromHistogramByLabels(t, promOCR3Durations, "123", "empty", "query", "true")) + require.Equal(t, 1, counterFromHistogramByLabels(t, promOCR3Durations, "123", "empty", "query", "false")) + require.Equal(t, 1, counterFromHistogramByLabels(t, promOCR3Durations, "solana", "commit", "query", "true")) + + require.Equal(t, 2, counterFromHistogramByLabels(t, promOCR3Durations, "123", "empty", "observation", "true")) + require.Equal(t, 2, counterFromHistogramByLabels(t, promOCR3Durations, "123", "empty", "observation", "false")) + require.Equal(t, 2, counterFromHistogramByLabels(t, promOCR3Durations, "solana", "commit", "observation", "true")) +} + +type fakePlugin[RI any] struct { + reports []ocr3types.ReportPlus[RI] + err error +} + +func (f fakePlugin[RI]) Query(context.Context, ocr3types.OutcomeContext) (ocrtypes.Query, error) { + if f.err != nil { + return nil, f.err + } + return ocrtypes.Query{}, nil +} + +func (f fakePlugin[RI]) Observation(context.Context, ocr3types.OutcomeContext, ocrtypes.Query) (ocrtypes.Observation, error) { + if f.err != nil { + return nil, f.err + } + return ocrtypes.Observation{}, nil +} + +func (f fakePlugin[RI]) ValidateObservation(context.Context, ocr3types.OutcomeContext, ocrtypes.Query, ocrtypes.AttributedObservation) error { + return f.err +} + +func (f fakePlugin[RI]) ObservationQuorum(context.Context, ocr3types.OutcomeContext, ocrtypes.Query, []ocrtypes.AttributedObservation) (quorumReached bool, err error) { + return false, f.err +} + +func (f fakePlugin[RI]) Outcome(context.Context, ocr3types.OutcomeContext, ocrtypes.Query, []ocrtypes.AttributedObservation) (ocr3types.Outcome, error) { + if f.err != nil { + return nil, f.err + } + return ocr3types.Outcome{}, nil +} + +func (f fakePlugin[RI]) Reports(context.Context, uint64, ocr3types.Outcome) ([]ocr3types.ReportPlus[RI], error) { + if f.err != nil { + return nil, f.err + } + return f.reports, nil +} + +func (f fakePlugin[RI]) ShouldAcceptAttestedReport(context.Context, uint64, ocr3types.ReportWithInfo[RI]) (bool, error) { + if f.err != nil { + return false, f.err + } + return true, nil +} + +func (f fakePlugin[RI]) ShouldTransmitAcceptedReport(context.Context, uint64, ocr3types.ReportWithInfo[RI]) (bool, error) { + if f.err != nil { + return false, f.err + } + return true, nil +} + +func (f fakePlugin[RI]) Close() error { + return f.err +} + +func counterFromHistogramByLabels(t *testing.T, histogramVec *prometheus.HistogramVec, labels ...string) int { + observer, err := histogramVec.GetMetricWithLabelValues(labels...) + require.NoError(t, err) + + metricCh := make(chan prometheus.Metric, 1) + observer.(prometheus.Histogram).Collect(metricCh) + close(metricCh) + + metric := <-metricCh + pb := &io_prometheus_client.Metric{} + err = metric.Write(pb) + require.NoError(t, err) + + //nolint:gosec // we don't care about that in tests + return int(pb.GetHistogram().GetSampleCount()) +} diff --git a/core/services/ocr3/promwrapper/types.go b/core/services/ocr3/promwrapper/types.go new file mode 100644 index 00000000000..bf6a1b2a39c --- /dev/null +++ b/core/services/ocr3/promwrapper/types.go @@ -0,0 +1,51 @@ +package promwrapper + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type functionType string + +const ( + query functionType = "query" + observation functionType = "observation" + validateObservation functionType = "validateObservation" + outcome functionType = "outcome" + reports functionType = "reports" + shouldAccept functionType = "shouldAccept" + shouldTransmit functionType = "shouldTransmit" +) + +var ( + buckets = []float64{ + float64(10 * time.Millisecond), + float64(50 * time.Millisecond), + float64(100 * time.Millisecond), + float64(200 * time.Millisecond), + float64(500 * time.Millisecond), + float64(700 * time.Millisecond), + float64(time.Second), + float64(2 * time.Second), + float64(5 * time.Second), + float64(10 * time.Second), + } + + promOCR3ReportsGenerated = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "ocr3_reporting_plugin_reports_processed", + Help: "Tracks number of reports processed/generated within by different OCR3 functions", + }, + []string{"chainID", "plugin", "function"}, + ) + promOCR3Durations = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "ocr3_reporting_plugin_duration", + Help: "The amount of time elapsed during the OCR3 plugin's function", + Buckets: buckets, + }, + []string{"chainID", "plugin", "function", "success"}, + ) +)