diff --git a/] b/] new file mode 100644 index 00000000000..43374c0e6e8 --- /dev/null +++ b/] @@ -0,0 +1,118 @@ +package llo + +import ( + "context" + "fmt" + "math/big" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" +) + +type Runner interface { + ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) +} + +// TODO: Generalize to beyond simply an int +type DataPoint *big.Int + +type Stream interface { + Observe(ctx context.Context) (DataPoint, error) +} + +type stream struct { + id string + lggr logger.Logger + runResults chan<- *pipeline.Run + spec pipeline.Spec + runner Runner +} + +func NewStream(lggr logger.Logger, id string, runResults chan<- *pipeline.Run, pipelineSpec pipeline.Spec, pipelineRunner Runner) Stream { + return newStream(lggr, id, runResults, pipelineSpec, pipelineRunner) +} + +func newStream(lggr logger.Logger, id string, runResults chan<- *pipeline.Run, pipelineSpec pipeline.Spec, pipelineRunner Runner) *stream { + return &stream{id, lggr, runResults, pipelineSpec, pipelineRunner} +} + +func (s *stream) Observe(ctx context.Context) (DataPoint, error) { + var run *pipeline.Run + run, trrs, err := s.executeRun(ctx) + if err != nil { + return nil, fmt.Errorf("Observe failed while executing run: %w", err) + } + select { + case s.runResults <- run: + default: + s.lggr.Warnf("unable to enqueue run save for job ID %d, buffer full", s.spec.JobID) + } + + // NOTE: trrs comes back as _all_ tasks, but we only want the terminal ones + // They are guaranteed to be sorted by index asc so should be in the correct order + var finaltrrs []pipeline.TaskRunResult + for _, trr := range trrs { + if trr.IsTerminal() { + finaltrrs = append(finaltrrs, trr) + } + } + + // FIXME: How to handle arbitrary-shaped inputs? + // For now just assume everything is one *big.Int + var parsed parseOutput + parsed, pipelineExecutionErr = ds.parse(finaltrrs) + if pipelineExecutionErr != nil { + pipelineExecutionErr = fmt.Errorf("Observe failed while parsing run results: %w", pipelineExecutionErr) + return + } + obs.BenchmarkPrice = parsed.benchmarkPrice + obs.Bid = parsed.bid + obs.Ask = parsed.ask + +} + +// The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod). +// Upon context cancellation, its expected that we return any usable values within ObservationGracePeriod. +func (s *stream) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { + vars := pipeline.NewVarsFrom(map[string]interface{}{ + "jb": map[string]interface{}{ + "databaseID": ds.jb.ID, + "externalJobID": ds.jb.ExternalJobID, + "name": ds.jb.Name.ValueOrZero(), + }, + }) + + run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars, ds.lggr) + if err != nil { + return nil, nil, pkgerrors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID) + } + + return run, trrs, err +} + +// returns error on parse errors: if something is the wrong type +func (ds *datasource) parse(trrs pipeline.TaskRunResults) (*big.Int, error) { + var finaltrrs []pipeline.TaskRunResult + for _, trr := range trrs { + // only return terminal trrs from executeRun + if trr.IsTerminal() { + finaltrrs = append(finaltrrs, trr) + } + } + + // pipeline.TaskRunResults comes ordered asc by index, this is guaranteed + // by the pipeline executor + if len(finaltrrs) != 1 { + return o, fmt.Errorf("invalid number of results, expected: 1, got: %d", len(finaltrrs)) + } + res := finaltrrs[0].Result + if res.Error != nil { + o.benchmarkPrice.Err = res.Error + } else if val, err := toBigInt(res.Value); err != nil { + return fmt.Errorf("failed to parse BenchmarkPrice: %w", err) + } else { + o.benchmarkPrice.Val = val + } + + return o, merr +} diff --git a/contracts/src/v0.8/llo-feeds/ConfigurationStore.sol b/contracts/src/v0.8/llo-feeds/ConfigurationStore.sol new file mode 100644 index 00000000000..8eaa03f76c7 --- /dev/null +++ b/contracts/src/v0.8/llo-feeds/ConfigurationStore.sol @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: UNLICENSED +pragma solidity ^0.8.13; + +struct ChannelDefinition { + // e.g. evm, solana, CosmWasm, kalechain, etc... + string reportFormat; + // Specifies the chain on which this channel can be verified. Currently uses + // CCIP chain selectors, but lots of other schemes are possible as well. + uint64 chainSelector; + // We assume that StreamIDs is always non-empty and that the 0-th stream + // contains the verification price in LINK and the 1-st stream contains the + // verification price in the native coin. + string[] streamIDs; +} + +contract ConfigurationStore { + //////////////////////// + // protocol instance management + //////////////////////// + + ChannelDefinition[] private s_channelDefinitions; + + // setProductionConfig() onlyOwner -- the usual OCR way + // sets config for the production protocol instance + + // setStagingConfig() onlyOwner -- the usual OCR way + // sets config for the staging protocol instance + + // promoteStagingConfig() onlyOwner + // this will trigger the following: + // - offchain ShouldRetireCache will start returning true for the old (production) + // protocol instance + // - once the old production instance retires it will generate a handover + // retirement report + // - the staging instance will become the new production instance once + // any honest oracle that is on both instances forward the retirement + // report from the old instance to the new instace via the + // PredecessorRetirementReportCache + // + // Note: the promotion flow only works if the previous production instance + // is working correctly & generating reports. If that's not the case, the + // owner is expected to "setProductionConfig" directly instead. This will + // cause "gaps" to be created, but that seems unavoidable in such a scenario. + + //////////////////////// + // channel management + //////////////////////// + + addChannel(ChannelDefinition) onlyOwner { + // TODO + } + + removeChannel(bytes32 channelId) onlyOwner { + // TODO + + } + + getChannelDefinitions() onlyEOA public view returns (ChannelDefinition[] memory) { + // TODO + + } + // used by ChannelDefinitionCache +} \ No newline at end of file diff --git a/core/services/llo/channel_definition_cache.go b/core/services/llo/channel_definition_cache.go new file mode 100644 index 00000000000..f5941e5c2a2 --- /dev/null +++ b/core/services/llo/channel_definition_cache.go @@ -0,0 +1,52 @@ +package llo + +import ( + "context" + "maps" + + relayllo "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/llo" + "github.com/smartcontractkit/chainlink-relay/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +type ChannelDefinitionCache interface { + relayllo.ChannelDefinitionCache + services.Service +} + +var _ ChannelDefinitionCache = &channelDefinitionCache{} + +type channelDefinitionCache struct { + services.StateMachine + + lggr logger.Logger + definitions relayllo.ChannelDefinitions +} + +func NewChannelDefinitionCache() ChannelDefinitionCache { + return &channelDefinitionCache{} +} + +func (c *channelDefinitionCache) Start(ctx context.Context) error { + // TODO: Initial load, then poll + // TODO: needs to be populated asynchronously from onchain ConfigurationStore + return nil +} + +func (c *channelDefinitionCache) Close() error { + // TODO + return nil +} + +func (c *channelDefinitionCache) HealthReport() map[string]error { + report := map[string]error{c.Name(): c.Healthy()} + return report +} + +func (c *channelDefinitionCache) Name() string { return c.lggr.Name() } + +func (c *channelDefinitionCache) Definitions() relayllo.ChannelDefinitions { + c.StateMachine.RLock() + defer c.StateMachine.RUnlock() + return maps.Clone(c.definitions) +} diff --git a/core/services/llo/channel_definition_cache_test.go b/core/services/llo/channel_definition_cache_test.go new file mode 100644 index 00000000000..92716a53bba --- /dev/null +++ b/core/services/llo/channel_definition_cache_test.go @@ -0,0 +1,9 @@ +package llo + +import "testing" + +func Test_ChannelDefinitionCache(t *testing.T) { + t.Run("Definitions", func(t *testing.T) { + t.Fatal("TODO") + }) +} diff --git a/core/services/llo/data_source.go b/core/services/llo/data_source.go new file mode 100644 index 00000000000..4cce919ecd2 --- /dev/null +++ b/core/services/llo/data_source.go @@ -0,0 +1,91 @@ +package llo + +// TODO: llo datasource +import ( + "context" + "fmt" + "math/big" + "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + relayllo "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/llo" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +var ( + promMissingStreamCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "llo_stream_missing_count", + Help: "Number of times we tried to observe a stream, but it was missing", + }, + []string{"streamID"}, + ) + promObservationErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "llo_stream_observation_error_count", + Help: "Number of times we tried to observe a stream, but it failed with an error", + }, + []string{"streamID"}, + ) +) + +type ErrMissingStream struct { + id string +} + +func (e ErrMissingStream) Error() string { + return fmt.Sprintf("missing stream definition for: %q", e.id) +} + +var _ relayllo.DataSource = &dataSource{} + +type dataSource struct { + lggr logger.Logger + streamCache StreamCache +} + +func NewDataSource(lggr logger.Logger, streamCache StreamCache) relayllo.DataSource { + // TODO: lggr should include job ID + return &dataSource{lggr, streamCache} +} + +func (d *dataSource) Observe(ctx context.Context, streamIDs map[relayllo.StreamID]struct{}) (relayllo.StreamValues, error) { + // There is no "observationSource" (AKA pipeline) + // Need a concept of "streams" + // Streams are referenced by ID from the on-chain config + // Each stream contains its own pipeline + // See: https://docs.google.com/document/d/1l1IiDOL1QSteLTnhmiGnJAi6QpcSpyOe0nkqS7D3SvU/edit for stream ID naming + + var wg sync.WaitGroup + wg.Add(len(streamIDs)) + sv := make(relayllo.StreamValues) + var mu sync.Mutex + + for streamID := range streamIDs { + go func() { + defer wg.Done() + + var res relayllo.ObsResult[*big.Int] + + stream, exists := d.streamCache.Get(streamID) + if exists { + res.Val, res.Err = stream.Observe(ctx) + if res.Err != nil { + d.lggr.Debugw("Observation failed for stream", "err", res.Err, "streamID", streamID) + promObservationErrorCount.WithLabelValues(streamID.String()).Inc() + } + } else { + d.lggr.Errorw(fmt.Sprintf("Missing stream: %q"), "streamID", streamID) + promMissingStreamCount.WithLabelValues(streamID.String()).Inc() + res.Err = ErrMissingStream{streamID.String()} + } + + mu.Lock() + defer mu.Unlock() + sv[streamID] = res + }() + } + + wg.Wait() + + return sv, nil +} diff --git a/core/services/llo/data_source_test.go b/core/services/llo/data_source_test.go new file mode 100644 index 00000000000..148cc0b505c --- /dev/null +++ b/core/services/llo/data_source_test.go @@ -0,0 +1,9 @@ +package llo + +import "testing" + +func Test_DataSource(t *testing.T) { + t.Run("Observe", func(t *testing.T) { + t.Fatal("TODO") + }) +} diff --git a/core/services/llo/stream.go b/core/services/llo/stream.go new file mode 100644 index 00000000000..b5f37974816 --- /dev/null +++ b/core/services/llo/stream.go @@ -0,0 +1,125 @@ +package llo + +import ( + "context" + "fmt" + "math/big" + + relayllo "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/llo" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +type Runner interface { + ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) +} + +// TODO: Generalize to beyond simply an int +type DataPoint *big.Int + +type Stream interface { + Observe(ctx context.Context) (DataPoint, error) +} + +type stream struct { + id relayllo.StreamID + lggr logger.Logger + runResults chan<- *pipeline.Run + jb job.Job + spec pipeline.Spec + runner Runner +} + +func NewStream(lggr logger.Logger, id relayllo.StreamID, runResults chan<- *pipeline.Run, jb job.Job, spec pipeline.Spec, runner Runner) Stream { + return newStream(lggr, id, runResults, jb, spec, runner) +} + +func newStream(lggr logger.Logger, id relayllo.StreamID, runResults chan<- *pipeline.Run, jb job.Job, spec pipeline.Spec, runner Runner) *stream { + return &stream{id, lggr, runResults, jb, spec, runner} +} + +func (s *stream) Observe(ctx context.Context) (DataPoint, error) { + var run *pipeline.Run + run, trrs, err := s.executeRun(ctx) + if err != nil { + return nil, fmt.Errorf("Observe failed while executing run: %w", err) + } + select { + case s.runResults <- run: + default: + s.lggr.Warnf("unable to enqueue run save for job ID %d, buffer full", s.spec.JobID) + } + + // NOTE: trrs comes back as _all_ tasks, but we only want the terminal ones + // They are guaranteed to be sorted by index asc so should be in the correct order + var finaltrrs []pipeline.TaskRunResult + for _, trr := range trrs { + if trr.IsTerminal() { + finaltrrs = append(finaltrrs, trr) + } + } + + // FIXME: How to handle arbitrary-shaped inputs? + // For now just assume everything is one *big.Int + parsed, err := s.parse(finaltrrs) + if err != nil { + return nil, fmt.Errorf("Observe failed while parsing run results: %w", err) + } + return parsed, nil + +} + +// The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod). +// Upon context cancellation, its expected that we return any usable values within ObservationGracePeriod. +func (s *stream) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { + vars := pipeline.NewVarsFrom(map[string]interface{}{ + "jb": map[string]interface{}{ + "databaseID": s.jb.ID, + "externalJobID": s.jb.ExternalJobID, + "name": s.jb.Name.ValueOrZero(), + }, + }) + + run, trrs, err := s.runner.ExecuteRun(ctx, s.spec, vars, s.lggr) + if err != nil { + return nil, nil, fmt.Errorf("error executing run for spec ID %v: %w", s.spec.ID, err) + } + + return run, trrs, err +} + +// returns error on parse errors: if something is the wrong type +func (s *stream) parse(trrs pipeline.TaskRunResults) (*big.Int, error) { + var finaltrrs []pipeline.TaskRunResult + for _, trr := range trrs { + // only return terminal trrs from executeRun + if trr.IsTerminal() { + finaltrrs = append(finaltrrs, trr) + } + } + + // pipeline.TaskRunResults comes ordered asc by index, this is guaranteed + // by the pipeline executor + if len(finaltrrs) != 1 { + return nil, fmt.Errorf("invalid number of results, expected: 1, got: %d", len(finaltrrs)) + } + res := finaltrrs[0].Result + if res.Error != nil { + return nil, res.Error + } else if val, err := toBigInt(res.Value); err != nil { + return nil, fmt.Errorf("failed to parse BenchmarkPrice: %w", err) + } else { + return val, nil + } +} + +func toBigInt(val interface{}) (*big.Int, error) { + dec, err := utils.ToDecimal(val) + if err != nil { + return nil, err + } + return dec.BigInt(), nil +} diff --git a/core/services/llo/stream_cache.go b/core/services/llo/stream_cache.go new file mode 100644 index 00000000000..b3b963c6cb0 --- /dev/null +++ b/core/services/llo/stream_cache.go @@ -0,0 +1,46 @@ +package llo + +import ( + "context" + + relayllo "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/llo" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" +) + +type StreamCache interface { + Get(streamID relayllo.StreamID) (Stream, bool) +} + +type streamCache struct { + q pg.Queryer + streams map[relayllo.StreamID]Stream +} + +func NewStreamCache(q pg.Queryer) StreamCache { + return &streamCache{ + q, + make(map[relayllo.StreamID]Stream), + } +} + +func (s *streamCache) Get(streamID relayllo.StreamID) (Stream, bool) { + strm, exists := s.streams[streamID] + return strm, exists +} + +func (s *streamCache) Load(ctx context.Context) error { + rows, err := s.q.QueryContext(ctx, "SELECT s.id, ps.id, ps.dot_dag_source, ps.max_task_duration FROM streams s JOIN pipeline_specs ps ON ps.id = s.pipeline_spec_id") + if err != nil { + // TODO: retries? + return err + } + + for rows.Next() { + var strm stream + if err := rows.Scan(&strm.id, &strm.spec.ID, &strm.spec.DotDagSource, &strm.spec.MaxTaskDuration); err != nil { + return err + } + s.streams[strm.id] = &strm + } + return rows.Err() +} diff --git a/core/services/llo/stream_cache_test.go b/core/services/llo/stream_cache_test.go new file mode 100644 index 00000000000..563b700d1d1 --- /dev/null +++ b/core/services/llo/stream_cache_test.go @@ -0,0 +1,9 @@ +package llo + +import "testing" + +func Test_StreamCache(t *testing.T) { + t.Run("Load", func(t *testing.T) { + t.Fatal("TODO") + }) +} diff --git a/core/services/llo/stream_test.go b/core/services/llo/stream_test.go new file mode 100644 index 00000000000..4d78980a294 --- /dev/null +++ b/core/services/llo/stream_test.go @@ -0,0 +1,9 @@ +package llo + +import "testing" + +func Test_Stream(t *testing.T) { + t.Run("Observe", func(t *testing.T) { + t.Fatal("TODO") + }) +} diff --git a/core/services/llo/transmitter.go b/core/services/llo/transmitter.go new file mode 100644 index 00000000000..a6cc26f47de --- /dev/null +++ b/core/services/llo/transmitter.go @@ -0,0 +1,72 @@ +package llo + +// TODO: llo transmitter + +import ( + "context" + "crypto/ed25519" + "fmt" + + relayllo "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/llo" + "github.com/smartcontractkit/chainlink-relay/pkg/services" + ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" +) + +type Transmitter interface { + relayllo.Transmitter + services.Service +} + +type transmitter struct { + services.StateMachine + lggr logger.Logger + rpcClient wsrpc.Client + fromAccount string +} + +func NewTransmitter(lggr logger.Logger, rpcClient wsrpc.Client, fromAccount ed25519.PublicKey) Transmitter { + return &transmitter{ + services.StateMachine{}, + lggr, + rpcClient, + fmt.Sprintf("%x", fromAccount), + } +} + +func (t *transmitter) Start(ctx context.Context) error { + // TODO + return nil +} + +func (t *transmitter) Close() error { + // TODO + return nil +} + +func (t *transmitter) HealthReport() map[string]error { + report := map[string]error{t.Name(): t.Healthy()} + services.CopyHealth(report, t.rpcClient.HealthReport()) + // FIXME + // services.CopyHealth(report, t.queue.HealthReport()) + return report +} + +func (t *transmitter) Name() string { return t.lggr.Name() } + +func (t *transmitter) Transmit(ctx context.Context, reportCtx ocrtypes.ReportContext, report ocrtypes.Report, signatures []ocrtypes.AttributedOnchainSignature) error { + // TODO + return nil +} + +// FromAccount returns the stringified (hex) CSA public key +func (t *transmitter) FromAccount() (ocrtypes.Account, error) { + return ocrtypes.Account(t.fromAccount), nil +} + +// LatestConfigDigestAndEpoch retrieves the latest config digest and epoch from the OCR2 contract. +func (t *transmitter) LatestConfigDigestAndEpoch(ctx context.Context) (cd ocrtypes.ConfigDigest, epoch uint32, err error) { + panic("not needed for OCR3") +} diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index bbb3b5cf7ae..a93032bba9c 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -430,6 +430,9 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) { case types.Mercury: return d.newServicesMercury(ctx, lggr, jb, runResults, bootstrapPeers, kb, ocrDB, lc, ocrLogger) + case types.LLO: + return d.newServicesLLO(ctx, lggr, jb, runResults, bootstrapPeers, kb, ocrDB, lc, ocrLogger) + case types.Median: return d.newServicesMedian(ctx, lggr, jb, runResults, bootstrapPeers, kb, ocrDB, lc, ocrLogger) @@ -720,6 +723,91 @@ func (d *Delegate) newServicesMercury( return mercuryServices, err2 } +func (d *Delegate) newServicesLLO( + ctx context.Context, + lggr logger.SugaredLogger, + jb job.Job, + runResults chan *pipeline.Run, + bootstrapPeers []commontypes.BootstrapperLocator, + kb ocr2key.KeyBundle, + ocrDB *db, + lc ocrtypes.LocalConfig, + ocrLogger commontypes.Logger, +) ([]job.ServiceCtx, error) { + spec := jb.OCR2OracleSpec + transmitterID := spec.TransmitterID.String + if len(transmitterID) != 64 { + return nil, errors.Errorf("ServicesForSpec: llo job type requires transmitter ID to be a 32-byte hex string, got: %q", transmitterID) + } + if _, err := hex.DecodeString(transmitterID); err != nil { + return nil, errors.Wrapf(err, "ServicesForSpec: llo job type requires transmitter ID to be a 32-byte hex string, got: %q", transmitterID) + } + + rid, err := spec.RelayID() + if err != nil { + return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "llo"} + } + if rid.Network != relay.EVM { + return nil, fmt.Errorf("llo services: expected EVM relayer got %s", rid.Network) + } + relayer, err := d.RelayGetter.Get(rid) + if err != nil { + return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: "llo"} + } + chain, err := d.legacyChains.Get(rid.ChainID) + if err != nil { + return nil, fmt.Errorf("llo services: failed to get chain %s: %w", rid.ChainID, err) + } + + provider, err2 := relayer.NewPluginProvider(ctx, + types.RelayArgs{ + ExternalJobID: jb.ExternalJobID, + JobID: jb.ID, + ContractID: spec.ContractID, + New: d.isNewlyCreatedJob, + RelayConfig: spec.RelayConfig.Bytes(), + ProviderType: string(spec.PluginType), + }, types.PluginArgs{ + TransmitterID: transmitterID, + PluginConfig: spec.PluginConfig.Bytes(), + }) + if err2 != nil { + return nil, err2 + } + + lloProvider, ok := provider.(types.LLOProvider) + if !ok { + return nil, errors.New("could not coerce PluginProvider to LLOProvider") + } + + oracleArgsNoPlugin := libocr2.OCR3OracleArgs{ + // BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, + // V2Bootstrappers: bootstrapPeers, + // ContractTransmitter: lloProvider.ContractTransmitter(), + // ContractConfigTracker: lloProvider.ContractConfigTracker(), + // Database: ocrDB, + // LocalConfig: lc, + // Logger: ocrLogger, + // MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.OCR3Mercury), + // OffchainConfigDigester: lloProvider.OffchainConfigDigester(), + // OffchainKeyring: kb, + // OnchainKeyring: kb, + } + + chEnhancedTelem := make(chan ocrcommon.EnhancedTelemetryMercuryData, 100) + + mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, chain, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID)) + + if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) { + enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury")) + mercuryServices = append(mercuryServices, enhancedTelemService) + } else { + lggr.Infow("Enhanced telemetry is disabled for mercury job", "job", jb.Name) + } + + return mercuryServices, err2 +} + func (d *Delegate) newServicesMedian( ctx context.Context, lggr logger.SugaredLogger, diff --git a/core/services/ocr2/plugins/llo/config/config.go b/core/services/ocr2/plugins/llo/config/config.go new file mode 100644 index 00000000000..d0e01e27d1c --- /dev/null +++ b/core/services/ocr2/plugins/llo/config/config.go @@ -0,0 +1,52 @@ +// config is a separate package so that we can validate +// the config in other packages, for example in job at job create time. + +package config + +import ( + "errors" + "fmt" + "net/url" + "regexp" + + pkgerrors "github.com/pkg/errors" + + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +type PluginConfig struct { + RawServerURL string `json:"serverURL" toml:"serverURL"` + ServerPubKey utils.PlainHexBytes `json:"serverPubKey" toml:"serverPubKey"` +} + +func (p PluginConfig) Validate() (merr error) { + if p.RawServerURL == "" { + merr = errors.New("mercury: ServerURL must be specified") + } else { + var normalizedURI string + if schemeRegexp.MatchString(p.RawServerURL) { + normalizedURI = p.RawServerURL + } else { + normalizedURI = fmt.Sprintf("wss://%s", p.RawServerURL) + } + uri, err := url.ParseRequestURI(normalizedURI) + if err != nil { + merr = pkgerrors.Wrap(err, "Mercury: invalid value for ServerURL") + } else if uri.Scheme != "wss" { + merr = pkgerrors.Errorf(`Mercury: invalid scheme specified for MercuryServer, got: %q (scheme: %q) but expected a websocket url e.g. "192.0.2.2:4242" or "wss://192.0.2.2:4242"`, p.RawServerURL, uri.Scheme) + } + } + + if len(p.ServerPubKey) != 32 { + merr = errors.Join(merr, errors.New("mercury: ServerPubKey is required and must be a 32-byte hex string")) + } + + return merr +} + +var schemeRegexp = regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9+.-]*://`) +var wssRegexp = regexp.MustCompile(`^wss://`) + +func (p PluginConfig) ServerURL() string { + return wssRegexp.ReplaceAllString(p.RawServerURL, "") +} diff --git a/core/services/ocr2/plugins/llo/integration_test.go b/core/services/ocr2/plugins/llo/integration_test.go new file mode 100644 index 00000000000..ec8a9dea484 --- /dev/null +++ b/core/services/ocr2/plugins/llo/integration_test.go @@ -0,0 +1,433 @@ +package llo_test + +import ( + "context" + "crypto/ed25519" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "math/big" + "math/rand" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + "time" + + "github.com/ava-labs/coreth/accounts/abi/bind" + "github.com/ava-labs/coreth/accounts/abi/bind/backends" + "github.com/ava-labs/coreth/core" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/hashicorp/consul/sdk/freeport" + "github.com/shopspring/decimal" + "github.com/smartcontractkit/chainlink/v2/core/assets" + "github.com/smartcontractkit/chainlink/v2/core/bridges" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/llo-feeds/generated/fee_manager" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/llo-feeds/generated/reward_manager" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/llo-feeds/generated/verifier" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/llo-feeds/generated/verifier_proxy" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/csakey" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" + "github.com/smartcontractkit/chainlink/v2/core/store/models" + "github.com/smartcontractkit/libocr/offchainreporting/confighelper" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3confighelper" + "github.com/smartcontractkit/wsrpc/credentials" + "github.com/stretchr/testify/assert" + "github.com/test-go/testify/require" + "go.uber.org/zap/zaptest/observer" + "google.golang.org/grpc/peer" +) + +func setupBlockchain(t *testing.T) (*bind.TransactOpts, *backends.SimulatedBackend, *verifier.Verifier, common.Address) { + steve := testutils.MustNewSimTransactor(t) // config contract deployer and owner + genesisData := core.GenesisAlloc{steve.From: {Balance: assets.Ether(1000).ToInt()}} + backend := cltest.NewSimulatedBackend(t, genesisData, uint32(ethconfig.Defaults.Miner.GasCeil)) + backend.Commit() // ensure starting block number at least 1 + stopMining := cltest.Mine(backend, 1*time.Second) // Should be greater than deltaRound since we cannot access old blocks on simulated blockchain + t.Cleanup(stopMining) + + // Deploy contracts + linkTokenAddress, _, linkToken, err := token.DeployLinkToken(steve, backend) + require.NoError(t, err) + _, err = linkToken.Transfer(steve, steve.From, big.NewInt(1000)) + require.NoError(t, err) + nativeTokenAddress, _, nativeToken, err := token.DeployLinkToken(steve, backend) + require.NoError(t, err) + _, err = nativeToken.Transfer(steve, steve.From, big.NewInt(1000)) + require.NoError(t, err) + verifierProxyAddr, _, verifierProxy, err := verifier_proxy.DeployVerifierProxy(steve, backend, common.Address{}) // zero address for access controller disables access control + require.NoError(t, err) + verifierAddress, _, verifier, err := verifier.DeployVerifier(steve, backend, verifierProxyAddr) + require.NoError(t, err) + _, err = verifierProxy.InitializeVerifier(steve, verifierAddress) + require.NoError(t, err) + rewardManagerAddr, _, rewardManager, err := reward_manager.DeployRewardManager(steve, backend, linkTokenAddress) + require.NoError(t, err) + feeManagerAddr, _, _, err := fee_manager.DeployFeeManager(steve, backend, linkTokenAddress, nativeTokenAddress, verifierProxyAddr, rewardManagerAddr) + require.NoError(t, err) + _, err = verifierProxy.SetFeeManager(steve, feeManagerAddr) + require.NoError(t, err) + _, err = rewardManager.SetFeeManager(steve, feeManagerAddr) + require.NoError(t, err) + backend.Commit() + + return steve, backend, verifier, verifierAddress +} + +type mercuryServer struct { + privKey ed25519.PrivateKey + reqsCh chan request + t *testing.T +} + +func NewMercuryServer(t *testing.T, privKey ed25519.PrivateKey, reqsCh chan request) *mercuryServer { + return &mercuryServer{privKey, reqsCh, t} +} + +func (s *mercuryServer) Transmit(ctx context.Context, req *pb.TransmitRequest) (*pb.TransmitResponse, error) { + p, ok := peer.FromContext(ctx) + if !ok { + return nil, errors.New("could not extract public key") + } + r := request{p.PublicKey, req} + s.reqsCh <- r + + return &pb.TransmitResponse{ + Code: 1, + Error: "", + }, nil +} + +func (s *mercuryServer) LatestReport(ctx context.Context, lrr *pb.LatestReportRequest) (*pb.LatestReportResponse, error) { + panic("not needed for llo") +} + +func TestIntegration_LLO(t *testing.T) { + t.Fatal("TODO") + // TODO: + + t.Parallel() + + var logObservers []*observer.ObservedLogs + t.Cleanup(func() { + detectPanicLogs(t, logObservers) + }) + lggr := logger.TestLogger(t) + const fromBlock = 1 // cannot use zero, start from block 1 + testStartTimeStamp := uint32(time.Now().Unix()) + + // feeds + // feed1 := Feed{"BTC/USD", randomFeedID(1), big.NewInt(20_000 * multiplier), big.NewInt(19_997 * multiplier), big.NewInt(20_004 * multiplier)} + + reqs := make(chan request) + serverKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(-1)) + serverPubKey := serverKey.PublicKey + srv := NewMercuryServer(t, ed25519.PrivateKey(serverKey.Raw()), reqs) + clientCSAKeys := make([]csakey.KeyV2, n+1) + clientPubKeys := make([]ed25519.PublicKey, n+1) + for i := 0; i < n+1; i++ { + k := big.NewInt(int64(i)) + key := csakey.MustNewV2XXXTestingOnly(k) + clientCSAKeys[i] = key + clientPubKeys[i] = key.PublicKey + } + serverURL := startMercuryServer(t, srv, clientPubKeys) + chainID := testutils.SimulatedChainID + + steve, backend, verifier, verifierAddress := setupBlockchain(t) + + // Setup bootstrap + oracle nodes + bootstrapNodePort := freeport.GetOne(t) + appBootstrap, bootstrapPeerID, _, bootstrapKb, observedLogs := setupNode(t, bootstrapNodePort, "bootstrap_mercury", backend, clientCSAKeys[n]) + bootstrapNode := Node{App: appBootstrap, KeyBundle: bootstrapKb} + logObservers = append(logObservers, observedLogs) + + // Set up n oracles + var ( + oracles []confighelper.OracleIdentityExtra + nodes []Node + ) + ports := freeport.GetN(t, n) + for i := 0; i < n; i++ { + app, peerID, transmitter, kb, observedLogs := setupNode(t, ports[i], fmt.Sprintf("oracle_llo_%d", i), backend, clientCSAKeys[i]) + + nodes = append(nodes, Node{ + app, transmitter, kb, + }) + offchainPublicKey, _ := hex.DecodeString(strings.TrimPrefix(kb.OnChainPublicKey(), "0x")) + oracles = append(oracles, confighelper.OracleIdentityExtra{ + OracleIdentity: confighelper.OracleIdentity{ + OnchainPublicKey: offchainPublicKey, + TransmitAccount: ocr2types.Account(fmt.Sprintf("%x", transmitter[:])), + OffchainPublicKey: kb.OffchainPublicKey(), + PeerID: peerID, + }, + ConfigEncryptionPublicKey: kb.ConfigEncryptionPublicKey(), + }) + logObservers = append(logObservers, observedLogs) + } + + for _, feed := range feeds { + addBootstrapJob(t, bootstrapNode, chainID, verifierAddress, feed.name, feed.id) + } + + createBridge := func(name string, i int, p *big.Int, borm bridges.ORM) (bridgeName string) { + bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + b, err := io.ReadAll(req.Body) + require.NoError(t, err) + require.Equal(t, `{"data":{"from":"ETH","to":"USD"}}`, string(b)) + + r := rand.Int63n(101) + if r > pError.Load() { + res.WriteHeader(http.StatusOK) + val := decimal.NewFromBigInt(p, 0).Div(decimal.NewFromInt(multiplier)).Add(decimal.NewFromInt(int64(i)).Div(decimal.NewFromInt(100))).String() + resp := fmt.Sprintf(`{"result": %s}`, val) + _, err := res.Write([]byte(resp)) + require.NoError(t, err) + } else { + res.WriteHeader(http.StatusInternalServerError) + resp := `{"error": "pError test error"}` + _, err := res.Write([]byte(resp)) + require.NoError(t, err) + } + })) + t.Cleanup(bridge.Close) + u, _ := url.Parse(bridge.URL) + bridgeName = fmt.Sprintf("bridge-%s-%d", name, i) + require.NoError(t, borm.CreateBridgeType(&bridges.BridgeType{ + Name: bridges.BridgeName(bridgeName), + URL: models.WebURL(*u), + })) + + return bridgeName + } + + // Add OCR jobs - one per feed on each node + for i, node := range nodes { + for j, feed := range feeds { + bmBridge := createBridge(fmt.Sprintf("benchmarkprice-%d", j), i, feed.baseBenchmarkPrice, node.App.BridgeORM()) + askBridge := createBridge(fmt.Sprintf("ask-%d", j), i, feed.baseAsk, node.App.BridgeORM()) + bidBridge := createBridge(fmt.Sprintf("bid-%d", j), i, feed.baseBid, node.App.BridgeORM()) + + addV1MercuryJob( + t, + node, + i, + verifierAddress, + bootstrapPeerID, + bootstrapNodePort, + bmBridge, + bidBridge, + askBridge, + serverURL, + serverPubKey, + clientPubKeys[i], + feed.name, + feed.id, + chainID, + fromBlock, + ) + } + } + + // Setup config on contract + onchainConfig, err := (relaymercury.StandardOnchainConfigCodec{}).Encode(rawOnchainConfig) + require.NoError(t, err) + + reportingPluginConfig, err := json.Marshal(rawReportingPluginConfig) + require.NoError(t, err) + + signers, _, _, onchainConfig, offchainConfigVersion, offchainConfig, err := ocr3confighelper.ContractSetConfigArgsForTestsMercuryV02( + 2*time.Second, // DeltaProgress + 20*time.Second, // DeltaResend + 400*time.Millisecond, // DeltaInitial + 100*time.Millisecond, // DeltaRound + 0, // DeltaGrace + 300*time.Millisecond, // DeltaCertifiedCommitRequest + 1*time.Minute, // DeltaStage + 100, // rMax + []int{len(nodes)}, // S + oracles, + reportingPluginConfig, // reportingPluginConfig []byte, + 250*time.Millisecond, // Max duration observation + int(f), // f + onchainConfig, + ) + + require.NoError(t, err) + signerAddresses, err := evm.OnchainPublicKeyToAddress(signers) + require.NoError(t, err) + + offchainTransmitters := make([][32]byte, n) + for i := 0; i < n; i++ { + offchainTransmitters[i] = nodes[i].ClientPubKey + } + + for i, feed := range feeds { + lggr.Infow("Setting Config on Oracle Contract", + "i", i, + "feedID", feed.id, + "feedName", feed.name, + "signerAddresses", signerAddresses, + "offchainTransmitters", offchainTransmitters, + "f", f, + "onchainConfig", onchainConfig, + "offchainConfigVersion", offchainConfigVersion, + "offchainConfig", offchainConfig, + ) + + _, err = verifier.SetConfig( + steve, + feed.id, + signerAddresses, + offchainTransmitters, + f, + onchainConfig, + offchainConfigVersion, + offchainConfig, + nil, + ) + require.NoError(t, err) + backend.Commit() + } + + // Bury it with finality depth + ch, err := bootstrapNode.App.GetRelayers().LegacyEVMChains().Get(testutils.SimulatedChainID.String()) + require.NoError(t, err) + finalityDepth := ch.Config().EVM().FinalityDepth() + for i := 0; i < int(finalityDepth); i++ { + backend.Commit() + } + + t.Run("receives at least one report per feed from each oracle when EAs are at 100% reliability", func(t *testing.T) { + // Expect at least one report per feed from each oracle + seen := make(map[[32]byte]map[credentials.StaticSizedPublicKey]struct{}) + for i := range feeds { + // feedID will be deleted when all n oracles have reported + seen[feeds[i].id] = make(map[credentials.StaticSizedPublicKey]struct{}, n) + } + + for req := range reqs { + v := make(map[string]interface{}) + err := mercury.PayloadTypes.UnpackIntoMap(v, req.req.Payload) + require.NoError(t, err) + report, exists := v["report"] + if !exists { + t.Fatalf("expected payload %#v to contain 'report'", v) + } + reportElems := make(map[string]interface{}) + err = reportcodecv1.ReportTypes.UnpackIntoMap(reportElems, report.([]byte)) + require.NoError(t, err) + + feedID := ([32]byte)(reportElems["feedId"].([32]uint8)) + feed, exists := feedM[feedID] + require.True(t, exists) + + if _, exists := seen[feedID]; !exists { + continue // already saw all oracles for this feed + } + + num, err := (&reportcodecv1.ReportCodec{}).CurrentBlockNumFromReport(ocr2types.Report(report.([]byte))) + require.NoError(t, err) + currentBlock, err := backend.BlockByNumber(testutils.Context(t), nil) + require.NoError(t, err) + + assert.GreaterOrEqual(t, currentBlock.Number().Int64(), num) + + expectedBm := feed.baseBenchmarkPrice + expectedBid := feed.baseBid + expectedAsk := feed.baseAsk + + assert.GreaterOrEqual(t, int(reportElems["observationsTimestamp"].(uint32)), int(testStartTimeStamp)) + assert.InDelta(t, expectedBm.Int64(), reportElems["benchmarkPrice"].(*big.Int).Int64(), 5000000) + assert.InDelta(t, expectedBid.Int64(), reportElems["bid"].(*big.Int).Int64(), 5000000) + assert.InDelta(t, expectedAsk.Int64(), reportElems["ask"].(*big.Int).Int64(), 5000000) + assert.GreaterOrEqual(t, int(currentBlock.Number().Int64()), int(reportElems["currentBlockNum"].(uint64))) + assert.GreaterOrEqual(t, currentBlock.Time(), reportElems["currentBlockTimestamp"].(uint64)) + assert.NotEqual(t, common.Hash{}, common.Hash(reportElems["currentBlockHash"].([32]uint8))) + assert.LessOrEqual(t, int(reportElems["validFromBlockNum"].(uint64)), int(reportElems["currentBlockNum"].(uint64))) + assert.Less(t, int64(0), int64(reportElems["validFromBlockNum"].(uint64))) + + t.Logf("oracle %x reported for feed %s (0x%x)", req.pk, feed.name, feed.id) + + seen[feedID][req.pk] = struct{}{} + if len(seen[feedID]) == n { + t.Logf("all oracles reported for feed %s (0x%x)", feed.name, feed.id) + delete(seen, feedID) + if len(seen) == 0 { + break // saw all oracles; success! + } + } + } + }) +} + +func addLLOJob( + t *testing.T, + node Node, + i int, + verifierAddress common.Address, + bootstrapPeerID string, + bootstrapNodePort int, + bmBridge, + bidBridge, + askBridge, + serverURL string, + serverPubKey, + clientPubKey ed25519.PublicKey, + feedName string, + feedID [32]byte, + chainID *big.Int, + fromBlock int, +) { + node.AddJob(t, fmt.Sprintf(` +type = "offchainreporting2" +schemaVersion = 1 +name = "llo-%[1]d-%[14]s" +forwardingAllowed = false +maxTaskDuration = "1s" +contractID = "%[2]s" +feedID = "0x%[11]x" +contractConfigTrackerPollInterval = "1s" +ocrKeyBundleID = "%[3]s" +p2pv2Bootstrappers = [ + "%[4]s" +] +relay = "evm" +pluginType = "llo" +transmitterID = "%[10]x" + +[pluginConfig] +serverURL = "%[8]s" +serverPubKey = "%[9]x" +initialBlockNumber = %[13]d + +[relayConfig] +chainID = %[12]d + + `, + i, + verifierAddress, + node.KeyBundle.ID(), + fmt.Sprintf("%s@127.0.0.1:%d", bootstrapPeerID, bootstrapNodePort), + bmBridge, + bidBridge, + askBridge, + serverURL, + serverPubKey, + clientPubKey, + feedID, + chainID, + fromBlock, + feedName, + )) +} diff --git a/core/services/ocr2/plugins/llo/plugin.go b/core/services/ocr2/plugins/llo/plugin.go new file mode 100644 index 00000000000..e69de29bb2d diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index aa1d1d774bd..882f00222bb 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -30,6 +30,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" + "github.com/smartcontractkit/chainlink/v2/core/services/llo" + lloconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/llo/config" mercuryconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury/config" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" "github.com/smartcontractkit/chainlink/v2/core/services/pg" @@ -187,12 +189,56 @@ func (r *Relayer) NewMercuryProvider(rargs relaytypes.RelayArgs, pargs relaytype default: return nil, fmt.Errorf("invalid feed version %d", feedID.Version()) } - transmitter := mercury.NewTransmitter(lggr, cw.ContractConfigTracker(), client, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.db, r.pgCfg, transmitterCodec) + transmitter := mercury.NewTransmitter(lggr, client, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.db, r.pgCfg, transmitterCodec) chainReader := NewChainReader(r.chain.HeadTracker()) return NewMercuryProvider(cw, transmitter, reportCodecV1, reportCodecV2, reportCodecV3, chainReader, lggr), nil } +func (r *Relayer) NewLLOProvider(rargs relaytypes.RelayArgs, pargs relaytypes.PluginArgs) (relaytypes.LLOProvider, error) { + // TODO + relayOpts := types.NewRelayOpts(rargs) + relayConfig, err := relayOpts.RelayConfig() + if err != nil { + return nil, fmt.Errorf("failed to get relay config: %w", err) + } + + var lloConfig lloconfig.PluginConfig + if err := json.Unmarshal(pargs.PluginConfig, &lloConfig); err != nil { + return nil, pkgerrors.WithStack(err) + } + if err := lloConfig.Validate(); err != nil { + return nil, err + } + + if relayConfig.ChainID.String() != r.chain.ID().String() { + return nil, fmt.Errorf("internal error: chain id in spec does not match this relayer's chain: have %s expected %s", relayConfig.ChainID.String(), r.chain.ID().String()) + } + configWatcher, err := newConfigProvider(r.lggr, r.chain, relayOpts, r.eventBroadcaster) + if err != nil { + return nil, pkgerrors.WithStack(err) + } + + if !relayConfig.EffectiveTransmitterID.Valid { + return nil, pkgerrors.New("EffectiveTransmitterID must be specified") + } + privKey, err := r.ks.CSA().Get(relayConfig.EffectiveTransmitterID.String) + if err != nil { + return nil, pkgerrors.Wrap(err, "failed to get CSA key for mercury connection") + } + + client, err := r.mercuryPool.Checkout(context.Background(), privKey, lloConfig.ServerPubKey, lloConfig.ServerURL()) + if err != nil { + return nil, err + } + + // FIXME + // transmitter := llo.NewTransmitter(r.lggr, configWatcher.ContractConfigTracker(), client, privKey.PublicKey, rargs.JobID, r.db, r.pgCfg) + transmitter := llo.NewTransmitter(r.lggr, client, privKey.PublicKey) + + return NewLLOProvider(configWatcher, transmitter, r.lggr), nil +} + func (r *Relayer) NewFunctionsProvider(rargs relaytypes.RelayArgs, pargs relaytypes.PluginArgs) (relaytypes.FunctionsProvider, error) { lggr := r.lggr.Named("FunctionsProvider").Named(rargs.ExternalJobID.String()) // TODO(FUN-668): Not ready yet (doesn't implement FunctionsEvents() properly) diff --git a/core/services/relay/evm/llo_provider.go b/core/services/relay/evm/llo_provider.go new file mode 100644 index 00000000000..643ab5130dc --- /dev/null +++ b/core/services/relay/evm/llo_provider.go @@ -0,0 +1,82 @@ +package evm + +import ( + "context" + "errors" + + relayllo "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/llo" + "github.com/smartcontractkit/chainlink-relay/pkg/services" + relaytypes "github.com/smartcontractkit/chainlink-relay/pkg/types" + ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/llo" +) + +var _ relaytypes.LLOProvider = (*lloProvider)(nil) + +type lloProvider struct { + configWatcher *configWatcher + transmitter llo.Transmitter + logger logger.Logger + + ms services.MultiStart +} + +func NewLLOProvider( + configWatcher *configWatcher, + transmitter llo.Transmitter, + lggr logger.Logger, + channelDefinitionCache llo.ChannelDefinitionCache, +) relaytypes.LLOProvider { + return &lloProvider{ + configWatcher, + transmitter, + lggr, + services.MultiStart{}, + } +} + +func (p *lloProvider) Start(ctx context.Context) error { + return p.ms.Start(ctx, p.configWatcher, p.transmitter, p.channelDefinitionCache) +} + +func (p *lloProvider) Close() error { + return p.ms.Close() +} + +func (p *lloProvider) Ready() error { + return errors.Join(p.configWatcher.Ready(), p.transmitter.Ready(), p.channelDefinitionCache.Ready()) +} + +func (p *lloProvider) Name() string { + return p.logger.Name() +} + +func (p *lloProvider) HealthReport() map[string]error { + report := map[string]error{} + services.CopyHealth(report, p.configWatcher.HealthReport()) + services.CopyHealth(report, p.transmitter.HealthReport()) + services.CopyHealth(report, p.channelDefinitionCache.HealthReport()) + return report +} + +func (p *lloProvider) ContractConfigTracker() ocrtypes.ContractConfigTracker { + return p.configWatcher.ContractConfigTracker() +} + +func (p *lloProvider) OffchainConfigDigester() ocrtypes.OffchainConfigDigester { + return p.configWatcher.OffchainConfigDigester() +} + +func (p *lloProvider) OnchainConfigCodec() relayllo.OnchainConfigCodec { + return &relayllo.JSONOnchainConfigCodec{} +} + +func (p *lloProvider) ContractTransmitter() ocrtypes.ContractTransmitter { + return p.transmitter +} + +func (p *lloProvider) ChannelDefinitionCache() relayllo.ChannelDefinitionCache { + return p.channelDefinitionCache +} diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index 269f28b122d..66dc4bccb32 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -106,7 +106,6 @@ type mercuryTransmitter struct { services.StateMachine lggr logger.Logger rpcClient wsrpc.Client - cfgTracker ConfigTracker persistenceManager *PersistenceManager codec TransmitterReportDecoder @@ -147,14 +146,13 @@ func getPayloadTypes() abi.Arguments { }) } -func NewTransmitter(lggr logger.Logger, cfgTracker ConfigTracker, rpcClient wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, db *sqlx.DB, cfg pg.QConfig, codec TransmitterReportDecoder) *mercuryTransmitter { +func NewTransmitter(lggr logger.Logger, rpcClient wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, db *sqlx.DB, cfg pg.QConfig, codec TransmitterReportDecoder) *mercuryTransmitter { feedIDHex := fmt.Sprintf("0x%x", feedID[:]) persistenceManager := NewPersistenceManager(lggr, NewORM(db, lggr, cfg), jobID, maxTransmitQueueSize, flushDeletesFrequency, pruneFrequency) return &mercuryTransmitter{ services.StateMachine{}, lggr.Named("MercuryTransmitter").With("feedID", feedIDHex), rpcClient, - cfgTracker, persistenceManager, codec, feedID, diff --git a/core/services/relay/relay.go b/core/services/relay/relay.go index 5c7bf5cab57..3be4f7dc5c2 100644 --- a/core/services/relay/relay.go +++ b/core/services/relay/relay.go @@ -88,6 +88,8 @@ func (r *ServerAdapter) NewPluginProvider(ctx context.Context, rargs types.Relay return r.NewFunctionsProvider(ctx, rargs, pargs) case types.Mercury: return r.NewMercuryProvider(ctx, rargs, pargs) + case types.LLO: + return r.NewLLOProvider(ctx, rargs, pargs) case types.DKG, types.OCR2VRF, types.OCR2Keeper, types.GenericPlugin: return r.RelayerAdapter.NewPluginProvider(ctx, rargs, pargs) case types.CCIPCommit, types.CCIPExecution: diff --git a/core/store/migrate/migrations/0209_create_streams.sql b/core/store/migrate/migrations/0209_create_streams.sql new file mode 100644 index 00000000000..6bebd4b111d --- /dev/null +++ b/core/store/migrate/migrations/0209_create_streams.sql @@ -0,0 +1,9 @@ +-- +goose Up +CREATE TABLE llo_streams ( + id text PRIMARY KEY, + pipeline_spec_id INT REFENCES (pipeline_specs) id, + created_at timestamp with time zone NOT NULL +); + +-- +goose Down +DROP TABLE llo_streams; diff --git a/go.mod b/go.mod index 0a85fe7f488..4c66871ca49 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Depado/ginprom v1.7.11 github.com/Masterminds/semver/v3 v3.2.1 github.com/Masterminds/sprig/v3 v3.2.3 + github.com/ava-labs/coreth v0.12.7 github.com/avast/retry-go/v4 v4.5.0 github.com/btcsuite/btcd v0.23.4 github.com/cometbft/cometbft v0.37.2 @@ -77,6 +78,7 @@ require ( github.com/smartcontractkit/wsrpc v0.7.2 github.com/spf13/cast v1.5.1 github.com/stretchr/testify v1.8.4 + github.com/test-go/testify v1.1.4 github.com/theodesp/go-heaps v0.0.0-20190520121037-88e35354fe0a github.com/tidwall/gjson v1.17.0 github.com/ugorji/go/codec v1.2.11 @@ -103,6 +105,18 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) +require ( + github.com/ava-labs/avalanchego v1.10.14 // indirect + github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect + github.com/google/renameio/v2 v2.0.0 // indirect + github.com/hashicorp/go-bexpr v0.1.10 // indirect + github.com/holiman/big v0.0.0-20221017200358-a027dc42d04e // indirect + github.com/mitchellh/pointerstructure v1.2.0 // indirect + github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa // indirect + github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect + go.uber.org/mock v0.2.0 // indirect +) + require ( contrib.go.opencensus.io/exporter/stackdriver v0.13.5 // indirect cosmossdk.io/api v0.3.1 // indirect @@ -320,6 +334,8 @@ require ( github.com/sasha-s/go-deadlock v0.3.1 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/smartcontractkit/chain-selectors v1.0.1 // indirect + github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb // indirect github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/afero v1.9.3 // indirect @@ -388,4 +404,11 @@ replace ( // until merged upstream: https://github.com/mwitkow/grpc-proxy/pull/69 github.com/mwitkow/grpc-proxy => github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f + + // TODO: llo + github.com/smartcontractkit/chainlink-cosmos => /Users/sam/code/smartcontractkit/chainlink-cosmos + github.com/smartcontractkit/chainlink-relay => /Users/sam/code/smartcontractkit/chainlink-relay + github.com/smartcontractkit/chainlink-solana => /Users/sam/code/smartcontractkit/chainlink-solana + github.com/smartcontractkit/chainlink-starknet/relayer => /Users/sam/code/smartcontractkit/chainlink-starknet/relayer + ) diff --git a/go.sum b/go.sum index 7fe91a6b123..4380a66d0fb 100644 --- a/go.sum +++ b/go.sum @@ -147,6 +147,10 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA= github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/ava-labs/avalanchego v1.10.14 h1:1jTMrikYD49Pb64ZLUi2z2BnNGLzIGip4fValq6/YfE= +github.com/ava-labs/avalanchego v1.10.14/go.mod h1:En/ti2xoxQqJuN6t9ne2ogckU9leuZzTjl5mbEsfjTc= +github.com/ava-labs/coreth v0.12.7 h1:5jwqftaI6gUJyPB708XIFZ2FwtbPDFhJFOwPaD6WDkY= +github.com/ava-labs/coreth v0.12.7/go.mod h1:7wH4Mv5SIzZy9Ps1aQdif58p12vA8kbC96givIjrLyk= github.com/avast/retry-go/v4 v4.5.0 h1:QoRAZZ90cj5oni2Lsgl2GW8mNTnUCnmpx/iKpwVisHg= github.com/avast/retry-go/v4 v4.5.0/go.mod h1:7hLEXp0oku2Nir2xBAsg0PTphp9z71bN5Aq1fboC3+I= github.com/aws/aws-sdk-go v1.22.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= @@ -173,8 +177,8 @@ github.com/btcsuite/btcd v0.22.1 h1:CnwP9LM/M9xuRrGSCGeMVs9iv09uMqwsVX7EeIpgV2c= github.com/btcsuite/btcd v0.22.1/go.mod h1:wqgTSL29+50LRkmOVknEdmt8ZojIzhuWvgu/iptuN7Y= github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U= github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= -github.com/btcsuite/btcd/btcutil v1.1.2 h1:XLMbX8JQEiwMcYft2EGi8zPUkoa0abKIU6/BJSRsjzQ= -github.com/btcsuite/btcd/btcutil v1.1.2/go.mod h1:UR7dsSJzJUfMmFiiLlIrMq1lS9jh9EdCV7FStZSnpi0= +github.com/btcsuite/btcd/btcutil v1.1.3 h1:xfbtw8lwpp0G6NwSHb+UE67ryTFHJAiNuipusjXSohQ= +github.com/btcsuite/btcd/btcutil v1.1.3/go.mod h1:UR7dsSJzJUfMmFiiLlIrMq1lS9jh9EdCV7FStZSnpi0= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= @@ -596,6 +600,8 @@ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 h1:n6vlPhxsA+BW/XsS5+uqi7GyzaLa5MH7qlSLBZtRdiA= github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8/go.mod h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/renameio/v2 v2.0.0 h1:UifI23ZTGY8Tt29JbYFiuyIU3eX+RNFtUwefq9qAhxg= +github.com/google/renameio/v2 v2.0.0/go.mod h1:BtmJXm5YlszgC+TD4HOEEUFgkJP3nLxehU6hfe7jRt4= github.com/google/s2a-go v0.1.4 h1:1kZ/sQM3srePvKs3tXAvQzo66XfcReoqFpIpIccE7Oc= github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -711,6 +717,8 @@ github.com/hashicorp/yamux v0.0.0-20200609203250-aecfd211c9ce h1:7UnVY3T/ZnHUrfv github.com/hashicorp/yamux v0.0.0-20200609203250-aecfd211c9ce/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hdevalence/ed25519consensus v0.1.0 h1:jtBwzzcHuTmFrQN6xQZn6CQEO/V9f7HsjsjeEZ6auqU= github.com/hdevalence/ed25519consensus v0.1.0/go.mod h1:w3BHWjwJbFU29IRHL1Iqkw3sus+7FctEyM4RqDxYNzo= +github.com/holiman/big v0.0.0-20221017200358-a027dc42d04e h1:pIYdhNkDh+YENVNi3gto8n9hAmRxKxoar0iE6BLucjw= +github.com/holiman/big v0.0.0-20221017200358-a027dc42d04e/go.mod h1:j9cQbcqHQujT0oKJ38PylVfqohClLr3CvDC+Qcg+lhU= github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= github.com/holiman/uint256 v1.2.2 h1:TXKcSGc2WaxPD2+bmzAsVthL4+pEN0YwXcL5qED83vk= @@ -1215,6 +1223,7 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4 github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/pointerstructure v1.2.0 h1:O+i9nHnXS3l/9Wu7r4NrEdwA2VFTicjUEN1uBnDo34A= @@ -1463,14 +1472,8 @@ github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumvbfM1u/etVq42Afwq/jtNSBSOA8n5jntnNPo= github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M= -github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255 h1:Pt6c7bJU9wIN6PQQnmN8UmYYH6lpfiQ6U/B8yEC2s5s= -github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255/go.mod h1:EHppaccd/LTlTMI2o4dmBHe4BknEgEFFDjDGMNuGb3k= -github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231113174149-046d4ddaca1a h1:G/pD8uI1PULRJU8Y3eLLzjqQBp9ruG9hj+wWxtyrgTo= -github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231113174149-046d4ddaca1a/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ= -github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ= -github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU= -github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8= -github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb/go.mod h1:/30flFG4L/iCYAFeA3DUzR0xuHSxAMONiWTzyzvsNwo= +github.com/smartcontractkit/chain-selectors v1.0.1 h1:NrSTMpxiB0yEi3BDfiiCkKjUVmSV1Ti3ecsvKtwOwHg= +github.com/smartcontractkit/chain-selectors v1.0.1/go.mod h1:WBhLlODF5b95vvx2tdKK55vGACg1+qZpuBhOGu1UXVo= github.com/smartcontractkit/go-plugin v0.0.0-20231003134350-e49dad63b306 h1:ko88+ZznniNJZbZPWAvHQU8SwKAdHngdDZ+pvVgB5ss= github.com/smartcontractkit/go-plugin v0.0.0-20231003134350-e49dad63b306/go.mod h1:w1sAEES3g3PuV/RzUrgow20W2uErMly84hhD3um1WL4= github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJif132UCdjo8u43i7iPN1/MFnu49hv7lFGFftCHKU= @@ -1481,6 +1484,8 @@ github.com/smartcontractkit/ocr2keepers v0.7.28 h1:dufAiYl4+uly9aH0+6GkS2jYzHGuj github.com/smartcontractkit/ocr2keepers v0.7.28/go.mod h1:1QGzJURnoWpysguPowOe2bshV0hNp1YX10HHlhDEsas= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687/go.mod h1:YYZq52t4wcHoMQeITksYsorD+tZcOyuVU5+lvot3VFM= +github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb h1:OMaBUb4X9IFPLbGbCHsMU+kw/BPCrewaVwWGIBc0I4A= +github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb/go.mod h1:HNUu4cJekUdsJbwRBCiOybtkPJEfGRELQPe2tkoDEyk= github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1 h1:yiKnypAqP8l0OX0P3klzZ7SCcBUxy5KqTAKZmQOvSQE= github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1/go.mod h1:q6f4fe39oZPdsh1i57WznEZgxd8siidMaSFq3wdPmVg= github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20230906073235-9e478e5e19f1 h1:Dai1bn+Q5cpeGMQwRdjOdVjG8mmFFROVkSKuUgBErRQ= @@ -1691,6 +1696,8 @@ go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +go.uber.org/mock v0.2.0 h1:TaP3xedm7JaAgScZO7tlvlKrqT0p7I6OsdGB5YNSMDU= +go.uber.org/mock v0.2.0/go.mod h1:J0y0rp9L3xiff1+ZBfKxlC1fz2+aO16tw0tsDOixfuM= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=