Skip to content

Commit

Permalink
Implement Data Streams plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Feb 22, 2024
1 parent 2b99f07 commit dde08ae
Show file tree
Hide file tree
Showing 48 changed files with 3,430 additions and 57 deletions.
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ require (
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect
github.com/smartcontractkit/chain-selectors v1.0.10 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240214203158-47dae5de1336 // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 // indirect
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240119021347-3c541a78cdb8 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240216142700-c5869534c19e // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240213121419-1272736c2ac0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1177,8 +1177,8 @@ github.com/smartcontractkit/chainlink-common v0.1.7-0.20240221153538-1ea85cf3dc6
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240221153538-1ea85cf3dc6c/go.mod h1:6aXWSEQawX2oZXcPPOdxnEGufAhj7PqPKolXf6ijRGA=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240214203158-47dae5de1336 h1:j00D0/EqE9HRu+63v7KwUOe4ZxLc4AN5SOJFiinkkH0=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240214203158-47dae5de1336/go.mod h1:umLyYLRGqyIuFfGpEREZP3So6+O8iL35cCCqW+OxX5w=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540/go.mod h1:sjAmX8K2kbQhvDarZE1ZZgDgmHJ50s0BBc/66vKY2ek=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240119021347-3c541a78cdb8 h1:1BcjXuviSAKttOX7BZoVHRZZGfxqoA2+AL8tykmkdoc=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240119021347-3c541a78cdb8/go.mod h1:vy1L7NybTy2F/Yv7BOh+oZBa1MACD6gzd1+DkcSkfp8=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240216142700-c5869534c19e h1:k8HS3GsAFZnxXIW3141VsQP2+EL1XrTtOi/HDt7sdBE=
Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
bridgeORM,
mercuryORM,
pipelineRunner,
streamRegistry,
peerWrapper,
telemetryManager,
legacyEVMChains,
Expand Down
58 changes: 32 additions & 26 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,34 +467,40 @@ func (o *orm) CreateJob(jb *Job, qopts ...pg.QOpt) error {
}

// ValidateKeyStoreMatch confirms that the key has a valid match in the keystore
func ValidateKeyStoreMatch(spec *OCR2OracleSpec, keyStore keystore.Master, key string) error {
if spec.PluginType == types.Mercury {
_, err := keyStore.CSA().Get(key)
func ValidateKeyStoreMatch(spec *OCR2OracleSpec, keyStore keystore.Master, key string) (err error) {
switch spec.PluginType {
case types.Mercury, types.LLO:
_, err = keyStore.CSA().Get(key)
if err != nil {
return errors.Errorf("no CSA key matching: %q", key)
err = errors.Errorf("no CSA key matching: %q", key)
}
} else {
switch spec.Relay {
case relay.EVM:
_, err := keyStore.Eth().Get(key)
if err != nil {
return errors.Errorf("no EVM key matching: %q", key)
}
case relay.Cosmos:
_, err := keyStore.Cosmos().Get(key)
if err != nil {
return errors.Errorf("no Cosmos key matching: %q", key)
}
case relay.Solana:
_, err := keyStore.Solana().Get(key)
if err != nil {
return errors.Errorf("no Solana key matching: %q", key)
}
case relay.StarkNet:
_, err := keyStore.StarkNet().Get(key)
if err != nil {
return errors.Errorf("no Starknet key matching: %q", key)
}
default:
err = validateKeyStoreMatchForRelay(spec.Relay, keyStore, key)
}
return
}

func validateKeyStoreMatchForRelay(network relay.Network, keyStore keystore.Master, key string) error {
switch network {
case relay.EVM:
_, err := keyStore.Eth().Get(key)
if err != nil {
return errors.Errorf("no EVM key matching: %q", key)
}
case relay.Cosmos:
_, err := keyStore.Cosmos().Get(key)
if err != nil {
return errors.Errorf("no Cosmos key matching: %q", key)
}
case relay.Solana:
_, err := keyStore.Solana().Get(key)
if err != nil {
return errors.Errorf("no Solana key matching: %q", key)
}
case relay.StarkNet:
_, err := keyStore.StarkNet().Get(key)
if err != nil {
return errors.Errorf("no Starknet key matching: %q", key)
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion core/services/job/spawner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) {
processConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, func(name string) (*plugins.RegisteredLoop, error) { return nil, nil })
ocr2DelegateConfig := ocr2.NewDelegateConfig(config.OCR2(), config.Mercury(), config.Threshold(), config.Insecure(), config.JobPipeline(), config.Database(), processConfig)

d := ocr2.NewDelegate(nil, orm, nil, nil, nil, nil, monitoringEndpoint, legacyChains, lggr, ocr2DelegateConfig,
d := ocr2.NewDelegate(nil, orm, nil, nil, nil, nil, nil, monitoringEndpoint, legacyChains, lggr, ocr2DelegateConfig,
keyStore.OCR2(), keyStore.DKGSign(), keyStore.DKGEncrypt(), ethKeyStore, testRelayGetter, mailMon, capabilities.NewRegistry(lggr))
delegateOCR2 := &delegate{jobOCR2VRF.Type, []job.ServiceCtx{}, 0, nil, d}

Expand Down
79 changes: 79 additions & 0 deletions core/services/llo/bm/dummy_transmitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package bm

import (
"context"
"crypto/ed25519"
"fmt"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/services"
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// A dummy transmitter useful for benchmarking and testing

var (
transmitSuccessCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "llo_transmit_success_count",
Help: "Running count of successful transmits",
})
)

type Transmitter interface {
llotypes.Transmitter
services.Service
}

type transmitter struct {
lggr logger.Logger
fromAccount string
}

func NewTransmitter(lggr logger.Logger, fromAccount ed25519.PublicKey) Transmitter {
return &transmitter{
lggr.Named("DummyTransmitter"),
fmt.Sprintf("%x", fromAccount),
}
}

func (t *transmitter) Start(context.Context) error {
return nil
}

func (t *transmitter) Close() error {
return nil
}

func (t *transmitter) Transmit(
ctx context.Context,
digest types.ConfigDigest,
seqNr uint64,
report ocr3types.ReportWithInfo[llotypes.ReportInfo],
sigs []types.AttributedOnchainSignature,
) error {
transmitSuccessCount.Inc()
t.lggr.Debugw("Transmit", "digest", digest, "seqNr", seqNr, "report.Report", report.Report, "report.Info", report.Info, "sigs", sigs)
return nil
}

// FromAccount returns the stringified (hex) CSA public key
func (t *transmitter) FromAccount() (ocr2types.Account, error) {
return ocr2types.Account(t.fromAccount), nil
}

func (t *transmitter) Ready() error { return nil }

func (t *transmitter) HealthReport() map[string]error {
report := map[string]error{t.Name(): nil}
return report
}

func (t *transmitter) Name() string { return t.lggr.Name() }
36 changes: 36 additions & 0 deletions core/services/llo/bm/dummy_transmitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package bm

import (
"crypto/ed25519"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

func Test_DummyTransmitter(t *testing.T) {
lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.DebugLevel)
tr := NewTransmitter(lggr, ed25519.PublicKey("dummy"))

servicetest.Run(t, tr)

err := tr.Transmit(
testutils.Context(t),
types.ConfigDigest{},
42,
ocr3types.ReportWithInfo[llotypes.ReportInfo]{},
[]types.AttributedOnchainSignature{},
)
require.NoError(t, err)

testutils.RequireLogMessage(t, observedLogs, "Transmit")
}
58 changes: 58 additions & 0 deletions core/services/llo/channel_definition_cache_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package llo

import (
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/logger"
lloconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/llo/config"
)

type ChannelDefinitionCacheFactory interface {
NewCache(cfg lloconfig.PluginConfig) (llotypes.ChannelDefinitionCache, error)
}

var _ ChannelDefinitionCacheFactory = &channelDefinitionCacheFactory{}

func NewChannelDefinitionCacheFactory(lggr logger.Logger, orm ChannelDefinitionCacheORM, lp logpoller.LogPoller) ChannelDefinitionCacheFactory {
return &channelDefinitionCacheFactory{
lggr,
orm,
lp,
make(map[common.Address]struct{}),
sync.Mutex{},
}
}

type channelDefinitionCacheFactory struct {
lggr logger.Logger
orm ChannelDefinitionCacheORM
lp logpoller.LogPoller

caches map[common.Address]struct{}
mu sync.Mutex
}

func (f *channelDefinitionCacheFactory) NewCache(cfg lloconfig.PluginConfig) (llotypes.ChannelDefinitionCache, error) {
if cfg.ChannelDefinitions != "" {
return NewStaticChannelDefinitionCache(f.lggr, cfg.ChannelDefinitions)
}

addr := cfg.ChannelDefinitionsContractAddress
fromBlock := cfg.ChannelDefinitionsContractFromBlock

f.mu.Lock()
defer f.mu.Unlock()

if _, exists := f.caches[addr]; exists {
// This shouldn't really happen and isn't supported
return nil, fmt.Errorf("cache already exists for contract address %s", addr.Hex())
}
f.caches[addr] = struct{}{}
return NewChannelDefinitionCache(f.lggr, f.orm, f.lp, addr, fromBlock), nil
}
103 changes: 103 additions & 0 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package llo

import (
"context"
"fmt"
"math/big"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-data-streams/llo"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
)

var (
promMissingStreamCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "llo_stream_missing_count",
Help: "Number of times we tried to observe a stream, but it was missing",
},
[]string{"streamID"},
)
promObservationErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "llo_stream_observation_error_count",
Help: "Number of times we tried to observe a stream, but it failed with an error",
},
[]string{"streamID"},
)
)

type ErrMissingStream struct {
id string
}

type Registry interface {
Get(streamID streams.StreamID) (strm streams.Stream, exists bool)
}

func (e ErrMissingStream) Error() string {
return fmt.Sprintf("missing stream definition for: %q", e.id)
}

var _ llo.DataSource = &dataSource{}

type dataSource struct {
lggr logger.Logger
registry Registry
}

func newDataSource(lggr logger.Logger, registry Registry) llo.DataSource {
return &dataSource{lggr.Named("DataSource"), registry}
}

// Observe looks up all streams in the registry and returns a map of stream ID => value
func (d *dataSource) Observe(ctx context.Context, streamIDs map[llotypes.StreamID]struct{}) (llo.StreamValues, error) {
var wg sync.WaitGroup
wg.Add(len(streamIDs))
sv := make(llo.StreamValues)
var mu sync.Mutex

for streamID := range streamIDs {
go func(streamID llotypes.StreamID) {
defer wg.Done()

var res llo.ObsResult[*big.Int]

stream, exists := d.registry.Get(streamID)
if exists {
run, trrs, err := stream.Run(ctx)
if err != nil {
var runID int64
if run != nil {
runID = run.ID
}
d.lggr.Debugw("Observation failed for stream", "err", err, "streamID", streamID, "runID", runID)
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
} else {
// TODO: support types other than *big.Int
// https://smartcontract-it.atlassian.net/browse/MERC-3525
val, err := streams.ExtractBigInt(trrs)
if err == nil {
res.Val = val
res.Valid = true
}
}
} else {
d.lggr.Errorw(fmt.Sprintf("Missing stream: %q", streamID), "streamID", streamID)
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
}

mu.Lock()
defer mu.Unlock()
sv[streamID] = res
}(streamID)
}

wg.Wait()

return sv, nil
}
Loading

0 comments on commit dde08ae

Please sign in to comment.