Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mercury 1.0 (parallel composition) #10810

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ gomodtidy: ## Run go mod tidy on all modules.
go mod tidy
cd ./core/scripts && go mod tidy
cd ./integration-tests && go mod tidy
cd ./integration-tests/load && go mod tidy
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wen gomods?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what that means... do you need me to do something here? I added this because it was missing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing to do. This is necessary for now.
I was referring to https://github.com/jmank88/gomods


.PHONY: godoc
godoc: ## Install and run godoc
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ require (
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect
github.com/smartcontractkit/chain-selectors v1.0.10 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240214203158-47dae5de1336 // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 // indirect
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240119021347-3c541a78cdb8 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240216142700-c5869534c19e // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240213121419-1272736c2ac0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1177,8 +1177,8 @@ github.com/smartcontractkit/chainlink-common v0.1.7-0.20240221153538-1ea85cf3dc6
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240221153538-1ea85cf3dc6c/go.mod h1:6aXWSEQawX2oZXcPPOdxnEGufAhj7PqPKolXf6ijRGA=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240214203158-47dae5de1336 h1:j00D0/EqE9HRu+63v7KwUOe4ZxLc4AN5SOJFiinkkH0=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240214203158-47dae5de1336/go.mod h1:umLyYLRGqyIuFfGpEREZP3So6+O8iL35cCCqW+OxX5w=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540/go.mod h1:sjAmX8K2kbQhvDarZE1ZZgDgmHJ50s0BBc/66vKY2ek=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240119021347-3c541a78cdb8 h1:1BcjXuviSAKttOX7BZoVHRZZGfxqoA2+AL8tykmkdoc=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240119021347-3c541a78cdb8/go.mod h1:vy1L7NybTy2F/Yv7BOh+oZBa1MACD6gzd1+DkcSkfp8=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240216142700-c5869534c19e h1:k8HS3GsAFZnxXIW3141VsQP2+EL1XrTtOi/HDt7sdBE=
Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
bridgeORM,
mercuryORM,
pipelineRunner,
streamRegistry,
peerWrapper,
telemetryManager,
legacyEVMChains,
Expand Down
58 changes: 32 additions & 26 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,34 +467,40 @@ func (o *orm) CreateJob(jb *Job, qopts ...pg.QOpt) error {
}

// ValidateKeyStoreMatch confirms that the key has a valid match in the keystore
func ValidateKeyStoreMatch(spec *OCR2OracleSpec, keyStore keystore.Master, key string) error {
if spec.PluginType == types.Mercury {
_, err := keyStore.CSA().Get(key)
func ValidateKeyStoreMatch(spec *OCR2OracleSpec, keyStore keystore.Master, key string) (err error) {
switch spec.PluginType {
case types.Mercury, types.LLO:
_, err = keyStore.CSA().Get(key)
if err != nil {
return errors.Errorf("no CSA key matching: %q", key)
err = errors.Errorf("no CSA key matching: %q", key)
}
} else {
switch spec.Relay {
case relay.EVM:
_, err := keyStore.Eth().Get(key)
if err != nil {
return errors.Errorf("no EVM key matching: %q", key)
}
case relay.Cosmos:
_, err := keyStore.Cosmos().Get(key)
if err != nil {
return errors.Errorf("no Cosmos key matching: %q", key)
}
case relay.Solana:
_, err := keyStore.Solana().Get(key)
if err != nil {
return errors.Errorf("no Solana key matching: %q", key)
}
case relay.StarkNet:
_, err := keyStore.StarkNet().Get(key)
if err != nil {
return errors.Errorf("no Starknet key matching: %q", key)
}
default:
err = validateKeyStoreMatchForRelay(spec.Relay, keyStore, key)
}
return
}

func validateKeyStoreMatchForRelay(network relay.Network, keyStore keystore.Master, key string) error {
switch network {
case relay.EVM:
_, err := keyStore.Eth().Get(key)
if err != nil {
return errors.Errorf("no EVM key matching: %q", key)
}
case relay.Cosmos:
_, err := keyStore.Cosmos().Get(key)
if err != nil {
return errors.Errorf("no Cosmos key matching: %q", key)
}
case relay.Solana:
_, err := keyStore.Solana().Get(key)
if err != nil {
return errors.Errorf("no Solana key matching: %q", key)
}
case relay.StarkNet:
_, err := keyStore.StarkNet().Get(key)
if err != nil {
return errors.Errorf("no Starknet key matching: %q", key)
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion core/services/job/spawner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) {
processConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, func(name string) (*plugins.RegisteredLoop, error) { return nil, nil })
ocr2DelegateConfig := ocr2.NewDelegateConfig(config.OCR2(), config.Mercury(), config.Threshold(), config.Insecure(), config.JobPipeline(), config.Database(), processConfig)

d := ocr2.NewDelegate(nil, orm, nil, nil, nil, nil, monitoringEndpoint, legacyChains, lggr, ocr2DelegateConfig,
d := ocr2.NewDelegate(nil, orm, nil, nil, nil, nil, nil, monitoringEndpoint, legacyChains, lggr, ocr2DelegateConfig,
keyStore.OCR2(), keyStore.DKGSign(), keyStore.DKGEncrypt(), ethKeyStore, testRelayGetter, mailMon, capabilities.NewRegistry(lggr))
delegateOCR2 := &delegate{jobOCR2VRF.Type, []job.ServiceCtx{}, 0, nil, d}

Expand Down
79 changes: 79 additions & 0 deletions core/services/llo/bm/dummy_transmitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package bm

import (
"context"
"crypto/ed25519"
"fmt"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/services"
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// A dummy transmitter useful for benchmarking and testing

var (
transmitSuccessCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "llo_transmit_success_count",
Help: "Running count of successful transmits",
})
)

type Transmitter interface {
llotypes.Transmitter
services.Service
}

type transmitter struct {
lggr logger.Logger
fromAccount string
}

func NewTransmitter(lggr logger.Logger, fromAccount ed25519.PublicKey) Transmitter {
return &transmitter{
lggr.Named("DummyTransmitter"),
fmt.Sprintf("%x", fromAccount),
}
}

func (t *transmitter) Start(context.Context) error {
return nil
}

func (t *transmitter) Close() error {
return nil
}

func (t *transmitter) Transmit(
ctx context.Context,
digest types.ConfigDigest,
seqNr uint64,
report ocr3types.ReportWithInfo[llotypes.ReportInfo],
sigs []types.AttributedOnchainSignature,
) error {
transmitSuccessCount.Inc()
t.lggr.Debugw("Transmit", "digest", digest, "seqNr", seqNr, "report.Report", report.Report, "report.Info", report.Info, "sigs", sigs)
return nil
}

// FromAccount returns the stringified (hex) CSA public key
func (t *transmitter) FromAccount() (ocr2types.Account, error) {
return ocr2types.Account(t.fromAccount), nil
}

func (t *transmitter) Ready() error { return nil }

func (t *transmitter) HealthReport() map[string]error {
report := map[string]error{t.Name(): nil}
return report
}

func (t *transmitter) Name() string { return t.lggr.Name() }
36 changes: 36 additions & 0 deletions core/services/llo/bm/dummy_transmitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package bm

import (
"crypto/ed25519"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

func Test_DummyTransmitter(t *testing.T) {
lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.DebugLevel)
tr := NewTransmitter(lggr, ed25519.PublicKey("dummy"))

servicetest.Run(t, tr)

err := tr.Transmit(
testutils.Context(t),
types.ConfigDigest{},
42,
ocr3types.ReportWithInfo[llotypes.ReportInfo]{},
[]types.AttributedOnchainSignature{},
)
require.NoError(t, err)

testutils.RequireLogMessage(t, observedLogs, "Transmit")
}
58 changes: 58 additions & 0 deletions core/services/llo/channel_definition_cache_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package llo

import (
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/logger"
lloconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/llo/config"
)

type ChannelDefinitionCacheFactory interface {
NewCache(cfg lloconfig.PluginConfig) (llotypes.ChannelDefinitionCache, error)
}

var _ ChannelDefinitionCacheFactory = &channelDefinitionCacheFactory{}

func NewChannelDefinitionCacheFactory(lggr logger.Logger, orm ChannelDefinitionCacheORM, lp logpoller.LogPoller) ChannelDefinitionCacheFactory {
return &channelDefinitionCacheFactory{
lggr,
orm,
lp,
make(map[common.Address]struct{}),
sync.Mutex{},
}
}

type channelDefinitionCacheFactory struct {
lggr logger.Logger
orm ChannelDefinitionCacheORM
lp logpoller.LogPoller

caches map[common.Address]struct{}
mu sync.Mutex
}

func (f *channelDefinitionCacheFactory) NewCache(cfg lloconfig.PluginConfig) (llotypes.ChannelDefinitionCache, error) {
if cfg.ChannelDefinitions != "" {
return NewStaticChannelDefinitionCache(f.lggr, cfg.ChannelDefinitions)
}

addr := cfg.ChannelDefinitionsContractAddress
fromBlock := cfg.ChannelDefinitionsContractFromBlock

f.mu.Lock()
defer f.mu.Unlock()

if _, exists := f.caches[addr]; exists {
// This shouldn't really happen and isn't supported
return nil, fmt.Errorf("cache already exists for contract address %s", addr.Hex())
}
f.caches[addr] = struct{}{}
return NewChannelDefinitionCache(f.lggr, f.orm, f.lp, addr, fromBlock), nil
}
103 changes: 103 additions & 0 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package llo

import (
"context"
"fmt"
"math/big"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-data-streams/llo"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
)

var (
promMissingStreamCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "llo_stream_missing_count",
Help: "Number of times we tried to observe a stream, but it was missing",
},
[]string{"streamID"},
)
promObservationErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "llo_stream_observation_error_count",
Help: "Number of times we tried to observe a stream, but it failed with an error",
},
[]string{"streamID"},
)
)

type ErrMissingStream struct {
id string
}

type Registry interface {
Get(streamID streams.StreamID) (strm streams.Stream, exists bool)
}

func (e ErrMissingStream) Error() string {
return fmt.Sprintf("missing stream definition for: %q", e.id)
}

var _ llo.DataSource = &dataSource{}

type dataSource struct {
lggr logger.Logger
registry Registry
}

func newDataSource(lggr logger.Logger, registry Registry) llo.DataSource {
return &dataSource{lggr.Named("DataSource"), registry}
}

// Observe looks up all streams in the registry and returns a map of stream ID => value
func (d *dataSource) Observe(ctx context.Context, streamIDs map[llotypes.StreamID]struct{}) (llo.StreamValues, error) {
var wg sync.WaitGroup
wg.Add(len(streamIDs))
sv := make(llo.StreamValues)
var mu sync.Mutex

for streamID := range streamIDs {
go func(streamID llotypes.StreamID) {
defer wg.Done()

var res llo.ObsResult[*big.Int]

stream, exists := d.registry.Get(streamID)
if exists {
run, trrs, err := stream.Run(ctx)
if err != nil {
var runID int64
if run != nil {
runID = run.ID
}
d.lggr.Debugw("Observation failed for stream", "err", err, "streamID", streamID, "runID", runID)
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
} else {
// TODO: support types other than *big.Int
// https://smartcontract-it.atlassian.net/browse/MERC-3525
val, err := streams.ExtractBigInt(trrs)
if err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about err!=nil case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled automatically since Valid is false by default

res.Val = val
res.Valid = true
}
}
} else {
d.lggr.Errorw(fmt.Sprintf("Missing stream: %q", streamID), "streamID", streamID)
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
}

mu.Lock()
defer mu.Unlock()
sv[streamID] = res
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there are a lot of streams this lock might be contentious. could instead send to chan and have another go routine be responsible for serializing write by reading the chan

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if there are 100,000 I think it will be ok, writing to a map ought to be very fast. Not sure if putting that through a channel would result in any speedups.

}(streamID)
}

wg.Wait()

return sv, nil
}
Loading
Loading