-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Mercury 1.0 (parallel composition) (#10810)
* Implement Data Streams plugin * Try to make tests a bit more deterministic * lint * Increase DeltaGrace * gomodtidy] * Increase timeouts significantly * Attempt test fix * Add comments * Remove useless comment * Add fromblock to LLO job spec
- Loading branch information
Showing
51 changed files
with
3,436 additions
and
60 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package bm | ||
|
||
import ( | ||
"context" | ||
"crypto/ed25519" | ||
"fmt" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promauto" | ||
|
||
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" | ||
"github.com/smartcontractkit/libocr/offchainreporting2plus/types" | ||
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/services" | ||
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
) | ||
|
||
// A dummy transmitter useful for benchmarking and testing | ||
|
||
var ( | ||
transmitSuccessCount = promauto.NewCounter(prometheus.CounterOpts{ | ||
Name: "llo_transmit_success_count", | ||
Help: "Running count of successful transmits", | ||
}) | ||
) | ||
|
||
type Transmitter interface { | ||
llotypes.Transmitter | ||
services.Service | ||
} | ||
|
||
type transmitter struct { | ||
lggr logger.Logger | ||
fromAccount string | ||
} | ||
|
||
func NewTransmitter(lggr logger.Logger, fromAccount ed25519.PublicKey) Transmitter { | ||
return &transmitter{ | ||
lggr.Named("DummyTransmitter"), | ||
fmt.Sprintf("%x", fromAccount), | ||
} | ||
} | ||
|
||
func (t *transmitter) Start(context.Context) error { | ||
return nil | ||
} | ||
|
||
func (t *transmitter) Close() error { | ||
return nil | ||
} | ||
|
||
func (t *transmitter) Transmit( | ||
ctx context.Context, | ||
digest types.ConfigDigest, | ||
seqNr uint64, | ||
report ocr3types.ReportWithInfo[llotypes.ReportInfo], | ||
sigs []types.AttributedOnchainSignature, | ||
) error { | ||
transmitSuccessCount.Inc() | ||
t.lggr.Debugw("Transmit", "digest", digest, "seqNr", seqNr, "report.Report", report.Report, "report.Info", report.Info, "sigs", sigs) | ||
return nil | ||
} | ||
|
||
// FromAccount returns the stringified (hex) CSA public key | ||
func (t *transmitter) FromAccount() (ocr2types.Account, error) { | ||
return ocr2types.Account(t.fromAccount), nil | ||
} | ||
|
||
func (t *transmitter) Ready() error { return nil } | ||
|
||
func (t *transmitter) HealthReport() map[string]error { | ||
report := map[string]error{t.Name(): nil} | ||
return report | ||
} | ||
|
||
func (t *transmitter) Name() string { return t.lggr.Name() } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package bm | ||
|
||
import ( | ||
"crypto/ed25519" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
"go.uber.org/zap/zapcore" | ||
|
||
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" | ||
"github.com/smartcontractkit/libocr/offchainreporting2plus/types" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" | ||
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
) | ||
|
||
func Test_DummyTransmitter(t *testing.T) { | ||
lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.DebugLevel) | ||
tr := NewTransmitter(lggr, ed25519.PublicKey("dummy")) | ||
|
||
servicetest.Run(t, tr) | ||
|
||
err := tr.Transmit( | ||
testutils.Context(t), | ||
types.ConfigDigest{}, | ||
42, | ||
ocr3types.ReportWithInfo[llotypes.ReportInfo]{}, | ||
[]types.AttributedOnchainSignature{}, | ||
) | ||
require.NoError(t, err) | ||
|
||
testutils.RequireLogMessage(t, observedLogs, "Transmit") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package llo | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
|
||
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
lloconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/llo/config" | ||
) | ||
|
||
type ChannelDefinitionCacheFactory interface { | ||
NewCache(cfg lloconfig.PluginConfig) (llotypes.ChannelDefinitionCache, error) | ||
} | ||
|
||
var _ ChannelDefinitionCacheFactory = &channelDefinitionCacheFactory{} | ||
|
||
func NewChannelDefinitionCacheFactory(lggr logger.Logger, orm ChannelDefinitionCacheORM, lp logpoller.LogPoller) ChannelDefinitionCacheFactory { | ||
return &channelDefinitionCacheFactory{ | ||
lggr, | ||
orm, | ||
lp, | ||
make(map[common.Address]struct{}), | ||
sync.Mutex{}, | ||
} | ||
} | ||
|
||
type channelDefinitionCacheFactory struct { | ||
lggr logger.Logger | ||
orm ChannelDefinitionCacheORM | ||
lp logpoller.LogPoller | ||
|
||
caches map[common.Address]struct{} | ||
mu sync.Mutex | ||
} | ||
|
||
func (f *channelDefinitionCacheFactory) NewCache(cfg lloconfig.PluginConfig) (llotypes.ChannelDefinitionCache, error) { | ||
if cfg.ChannelDefinitions != "" { | ||
return NewStaticChannelDefinitionCache(f.lggr, cfg.ChannelDefinitions) | ||
} | ||
|
||
addr := cfg.ChannelDefinitionsContractAddress | ||
fromBlock := cfg.ChannelDefinitionsContractFromBlock | ||
|
||
f.mu.Lock() | ||
defer f.mu.Unlock() | ||
|
||
if _, exists := f.caches[addr]; exists { | ||
// This shouldn't really happen and isn't supported | ||
return nil, fmt.Errorf("cache already exists for contract address %s", addr.Hex()) | ||
} | ||
f.caches[addr] = struct{}{} | ||
return NewChannelDefinitionCache(f.lggr, f.orm, f.lp, addr, fromBlock), nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package llo | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math/big" | ||
"sync" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promauto" | ||
|
||
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" | ||
"github.com/smartcontractkit/chainlink-data-streams/llo" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/streams" | ||
) | ||
|
||
var ( | ||
promMissingStreamCount = promauto.NewCounterVec(prometheus.CounterOpts{ | ||
Name: "llo_stream_missing_count", | ||
Help: "Number of times we tried to observe a stream, but it was missing", | ||
}, | ||
[]string{"streamID"}, | ||
) | ||
promObservationErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ | ||
Name: "llo_stream_observation_error_count", | ||
Help: "Number of times we tried to observe a stream, but it failed with an error", | ||
}, | ||
[]string{"streamID"}, | ||
) | ||
) | ||
|
||
type ErrMissingStream struct { | ||
id string | ||
} | ||
|
||
type Registry interface { | ||
Get(streamID streams.StreamID) (strm streams.Stream, exists bool) | ||
} | ||
|
||
func (e ErrMissingStream) Error() string { | ||
return fmt.Sprintf("missing stream definition for: %q", e.id) | ||
} | ||
|
||
var _ llo.DataSource = &dataSource{} | ||
|
||
type dataSource struct { | ||
lggr logger.Logger | ||
registry Registry | ||
} | ||
|
||
func newDataSource(lggr logger.Logger, registry Registry) llo.DataSource { | ||
return &dataSource{lggr.Named("DataSource"), registry} | ||
} | ||
|
||
// Observe looks up all streams in the registry and returns a map of stream ID => value | ||
func (d *dataSource) Observe(ctx context.Context, streamIDs map[llotypes.StreamID]struct{}) (llo.StreamValues, error) { | ||
var wg sync.WaitGroup | ||
wg.Add(len(streamIDs)) | ||
sv := make(llo.StreamValues) | ||
var mu sync.Mutex | ||
|
||
for streamID := range streamIDs { | ||
go func(streamID llotypes.StreamID) { | ||
defer wg.Done() | ||
|
||
var res llo.ObsResult[*big.Int] | ||
|
||
stream, exists := d.registry.Get(streamID) | ||
if exists { | ||
run, trrs, err := stream.Run(ctx) | ||
if err != nil { | ||
var runID int64 | ||
if run != nil { | ||
runID = run.ID | ||
} | ||
d.lggr.Debugw("Observation failed for stream", "err", err, "streamID", streamID, "runID", runID) | ||
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() | ||
} else { | ||
// TODO: support types other than *big.Int | ||
// https://smartcontract-it.atlassian.net/browse/MERC-3525 | ||
val, err := streams.ExtractBigInt(trrs) | ||
if err == nil { | ||
res.Val = val | ||
res.Valid = true | ||
} | ||
} | ||
} else { | ||
d.lggr.Errorw(fmt.Sprintf("Missing stream: %q", streamID), "streamID", streamID) | ||
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() | ||
} | ||
|
||
mu.Lock() | ||
defer mu.Unlock() | ||
sv[streamID] = res | ||
}(streamID) | ||
} | ||
|
||
wg.Wait() | ||
|
||
return sv, nil | ||
} |
Oops, something went wrong.