-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mercury 1.0 (parallel composition) #10810
Changes from all commits
dde08ae
c6338be
176c78c
b3a916a
d2f4a0d
03d8386
8086d31
53da28e
a845f37
3e314ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package bm | ||
|
||
import ( | ||
"context" | ||
"crypto/ed25519" | ||
"fmt" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promauto" | ||
|
||
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" | ||
"github.com/smartcontractkit/libocr/offchainreporting2plus/types" | ||
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/services" | ||
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
) | ||
|
||
// A dummy transmitter useful for benchmarking and testing | ||
|
||
var ( | ||
transmitSuccessCount = promauto.NewCounter(prometheus.CounterOpts{ | ||
Name: "llo_transmit_success_count", | ||
Help: "Running count of successful transmits", | ||
}) | ||
) | ||
|
||
type Transmitter interface { | ||
llotypes.Transmitter | ||
services.Service | ||
} | ||
|
||
type transmitter struct { | ||
lggr logger.Logger | ||
fromAccount string | ||
} | ||
|
||
func NewTransmitter(lggr logger.Logger, fromAccount ed25519.PublicKey) Transmitter { | ||
return &transmitter{ | ||
lggr.Named("DummyTransmitter"), | ||
fmt.Sprintf("%x", fromAccount), | ||
} | ||
} | ||
|
||
func (t *transmitter) Start(context.Context) error { | ||
return nil | ||
} | ||
|
||
func (t *transmitter) Close() error { | ||
return nil | ||
} | ||
|
||
func (t *transmitter) Transmit( | ||
ctx context.Context, | ||
digest types.ConfigDigest, | ||
seqNr uint64, | ||
report ocr3types.ReportWithInfo[llotypes.ReportInfo], | ||
sigs []types.AttributedOnchainSignature, | ||
) error { | ||
transmitSuccessCount.Inc() | ||
t.lggr.Debugw("Transmit", "digest", digest, "seqNr", seqNr, "report.Report", report.Report, "report.Info", report.Info, "sigs", sigs) | ||
return nil | ||
} | ||
|
||
// FromAccount returns the stringified (hex) CSA public key | ||
func (t *transmitter) FromAccount() (ocr2types.Account, error) { | ||
return ocr2types.Account(t.fromAccount), nil | ||
} | ||
|
||
func (t *transmitter) Ready() error { return nil } | ||
|
||
func (t *transmitter) HealthReport() map[string]error { | ||
report := map[string]error{t.Name(): nil} | ||
return report | ||
} | ||
|
||
func (t *transmitter) Name() string { return t.lggr.Name() } |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package bm | ||
|
||
import ( | ||
"crypto/ed25519" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
"go.uber.org/zap/zapcore" | ||
|
||
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" | ||
"github.com/smartcontractkit/libocr/offchainreporting2plus/types" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" | ||
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
) | ||
|
||
func Test_DummyTransmitter(t *testing.T) { | ||
lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.DebugLevel) | ||
tr := NewTransmitter(lggr, ed25519.PublicKey("dummy")) | ||
|
||
servicetest.Run(t, tr) | ||
|
||
err := tr.Transmit( | ||
testutils.Context(t), | ||
types.ConfigDigest{}, | ||
42, | ||
ocr3types.ReportWithInfo[llotypes.ReportInfo]{}, | ||
[]types.AttributedOnchainSignature{}, | ||
) | ||
require.NoError(t, err) | ||
|
||
testutils.RequireLogMessage(t, observedLogs, "Transmit") | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package llo | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
|
||
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
lloconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/llo/config" | ||
) | ||
|
||
type ChannelDefinitionCacheFactory interface { | ||
NewCache(cfg lloconfig.PluginConfig) (llotypes.ChannelDefinitionCache, error) | ||
} | ||
|
||
var _ ChannelDefinitionCacheFactory = &channelDefinitionCacheFactory{} | ||
|
||
func NewChannelDefinitionCacheFactory(lggr logger.Logger, orm ChannelDefinitionCacheORM, lp logpoller.LogPoller) ChannelDefinitionCacheFactory { | ||
return &channelDefinitionCacheFactory{ | ||
lggr, | ||
orm, | ||
lp, | ||
make(map[common.Address]struct{}), | ||
sync.Mutex{}, | ||
} | ||
} | ||
|
||
type channelDefinitionCacheFactory struct { | ||
lggr logger.Logger | ||
orm ChannelDefinitionCacheORM | ||
lp logpoller.LogPoller | ||
|
||
caches map[common.Address]struct{} | ||
mu sync.Mutex | ||
} | ||
|
||
func (f *channelDefinitionCacheFactory) NewCache(cfg lloconfig.PluginConfig) (llotypes.ChannelDefinitionCache, error) { | ||
if cfg.ChannelDefinitions != "" { | ||
return NewStaticChannelDefinitionCache(f.lggr, cfg.ChannelDefinitions) | ||
} | ||
|
||
addr := cfg.ChannelDefinitionsContractAddress | ||
fromBlock := cfg.ChannelDefinitionsContractFromBlock | ||
|
||
f.mu.Lock() | ||
defer f.mu.Unlock() | ||
|
||
if _, exists := f.caches[addr]; exists { | ||
// This shouldn't really happen and isn't supported | ||
return nil, fmt.Errorf("cache already exists for contract address %s", addr.Hex()) | ||
} | ||
f.caches[addr] = struct{}{} | ||
return NewChannelDefinitionCache(f.lggr, f.orm, f.lp, addr, fromBlock), nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package llo | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math/big" | ||
"sync" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promauto" | ||
|
||
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" | ||
"github.com/smartcontractkit/chainlink-data-streams/llo" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/streams" | ||
) | ||
|
||
var ( | ||
promMissingStreamCount = promauto.NewCounterVec(prometheus.CounterOpts{ | ||
Name: "llo_stream_missing_count", | ||
Help: "Number of times we tried to observe a stream, but it was missing", | ||
}, | ||
[]string{"streamID"}, | ||
) | ||
promObservationErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ | ||
Name: "llo_stream_observation_error_count", | ||
Help: "Number of times we tried to observe a stream, but it failed with an error", | ||
}, | ||
[]string{"streamID"}, | ||
) | ||
) | ||
|
||
type ErrMissingStream struct { | ||
id string | ||
} | ||
|
||
type Registry interface { | ||
Get(streamID streams.StreamID) (strm streams.Stream, exists bool) | ||
} | ||
|
||
func (e ErrMissingStream) Error() string { | ||
return fmt.Sprintf("missing stream definition for: %q", e.id) | ||
} | ||
|
||
var _ llo.DataSource = &dataSource{} | ||
|
||
type dataSource struct { | ||
lggr logger.Logger | ||
registry Registry | ||
} | ||
|
||
func newDataSource(lggr logger.Logger, registry Registry) llo.DataSource { | ||
return &dataSource{lggr.Named("DataSource"), registry} | ||
} | ||
|
||
// Observe looks up all streams in the registry and returns a map of stream ID => value | ||
func (d *dataSource) Observe(ctx context.Context, streamIDs map[llotypes.StreamID]struct{}) (llo.StreamValues, error) { | ||
var wg sync.WaitGroup | ||
wg.Add(len(streamIDs)) | ||
sv := make(llo.StreamValues) | ||
var mu sync.Mutex | ||
|
||
for streamID := range streamIDs { | ||
go func(streamID llotypes.StreamID) { | ||
defer wg.Done() | ||
|
||
var res llo.ObsResult[*big.Int] | ||
|
||
stream, exists := d.registry.Get(streamID) | ||
if exists { | ||
run, trrs, err := stream.Run(ctx) | ||
if err != nil { | ||
var runID int64 | ||
if run != nil { | ||
runID = run.ID | ||
} | ||
d.lggr.Debugw("Observation failed for stream", "err", err, "streamID", streamID, "runID", runID) | ||
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() | ||
} else { | ||
// TODO: support types other than *big.Int | ||
// https://smartcontract-it.atlassian.net/browse/MERC-3525 | ||
val, err := streams.ExtractBigInt(trrs) | ||
if err == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what about err!=nil case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handled automatically since Valid is false by default |
||
res.Val = val | ||
res.Valid = true | ||
} | ||
} | ||
} else { | ||
d.lggr.Errorw(fmt.Sprintf("Missing stream: %q", streamID), "streamID", streamID) | ||
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() | ||
} | ||
|
||
mu.Lock() | ||
defer mu.Unlock() | ||
sv[streamID] = res | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if there are a lot of streams this lock might be contentious. could instead send to chan and have another go routine be responsible for serializing write by reading the chan There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even if there are 100,000 I think it will be ok, writing to a map ought to be very fast. Not sure if putting that through a channel would result in any speedups. |
||
}(streamID) | ||
} | ||
|
||
wg.Wait() | ||
|
||
return sv, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wen gomods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know what that means... do you need me to do something here? I added this because it was missing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing to do. This is necessary for now.
I was referring to https://github.com/jmank88/gomods