Skip to content

Commit

Permalink
Revert "Configurable Mercury transmitter parameters (#12680)"
Browse files Browse the repository at this point in the history
This reverts commit f55d8be.
  • Loading branch information
HenryNguyen5 committed Apr 27, 2024
1 parent 7fb6f7c commit c5f77ba
Show file tree
Hide file tree
Showing 29 changed files with 45 additions and 234 deletions.
13 changes: 0 additions & 13 deletions .changeset/sour-jars-cross.md

This file was deleted.

5 changes: 2 additions & 3 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,8 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
}

evmFactoryCfg := chainlink.EVMFactoryConfig{
CSAETHKeystore: keyStore,
ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds},
MercuryTransmitter: cfg.Mercury().Transmitter(),
CSAETHKeystore: keyStore,
ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds},
}
// evm always enabled for backward compatibility
// TODO BCF-2510 this needs to change in order to clear the path for EVM extraction
Expand Down
14 changes: 0 additions & 14 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -622,17 +622,3 @@ LatestReportDeadline = "5s" # Default
[Mercury.TLS]
# CertFile is the path to a PEM file of trusted root certificate authority certificates
CertFile = "/path/to/client/certs.pem" # Example

# Mercury.Transmitter controls settings for the mercury transmitter
[Mercury.Transmitter]
# TransmitQueueMaxSize controls the size of the transmit queue. This is scoped
# per OCR instance. If the queue is full, the transmitter will start dropping
# the oldest messages in order to make space.
#
# This is useful if mercury server goes offline and the nop needs to buffer
# transmissions.
TransmitQueueMaxSize = 10_000 # Default
# TransmitTimeout controls how long the transmitter will wait for a response
# when sending a message to the mercury server, before aborting and considering
# the transmission to be failed.
TransmitTimeout = "5s" # Default
7 changes: 0 additions & 7 deletions core/config/mercury_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package config
import (
"time"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/types"
)

Expand All @@ -17,14 +16,8 @@ type MercuryTLS interface {
CertFile() string
}

type MercuryTransmitter interface {
TransmitQueueMaxSize() uint32
TransmitTimeout() commonconfig.Duration
}

type Mercury interface {
Credentials(credName string) *types.MercuryCredentials
Cache() MercuryCache
TLS() MercuryTLS
Transmitter() MercuryTransmitter
}
20 changes: 2 additions & 18 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,30 +1312,14 @@ func (m *MercuryTLS) ValidateConfig() (err error) {
return
}

type MercuryTransmitter struct {
TransmitQueueMaxSize *uint32
TransmitTimeout *commonconfig.Duration
}

func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
if v := f.TransmitQueueMaxSize; v != nil {
m.TransmitQueueMaxSize = v
}
if v := f.TransmitTimeout; v != nil {
m.TransmitTimeout = v
}
}

type Mercury struct {
Cache MercuryCache `toml:",omitempty"`
TLS MercuryTLS `toml:",omitempty"`
Transmitter MercuryTransmitter `toml:",omitempty"`
Cache MercuryCache `toml:",omitempty"`
TLS MercuryTLS `toml:",omitempty"`
}

func (m *Mercury) setFrom(f *Mercury) {
m.Cache.setFrom(&f.Cache)
m.TLS.setFrom(&f.TLS)
m.Transmitter.setFrom(&f.Transmitter)
}

func (m *Mercury) ValidateConfig() (err error) {
Expand Down
3 changes: 1 addition & 2 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
MailMon: mailMon,
DS: ds,
},
CSAETHKeystore: keyStore,
MercuryTransmitter: cfg.Mercury().Transmitter(),
CSAETHKeystore: keyStore,
}

if cfg.EVMEnabled() {
Expand Down
21 changes: 0 additions & 21 deletions core/services/chainlink/config_mercury.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package chainlink
import (
"time"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink/v2/core/config"
Expand All @@ -26,8 +25,6 @@ func (m *mercuryCacheConfig) LatestReportDeadline() time.Duration {
return m.c.LatestReportDeadline.Duration()
}

var _ config.MercuryTLS = (*mercuryTLSConfig)(nil)

type mercuryTLSConfig struct {
c toml.MercuryTLS
}
Expand All @@ -36,20 +33,6 @@ func (m *mercuryTLSConfig) CertFile() string {
return *m.c.CertFile
}

var _ config.MercuryTransmitter = (*mercuryTransmitterConfig)(nil)

type mercuryTransmitterConfig struct {
c toml.MercuryTransmitter
}

func (m *mercuryTransmitterConfig) TransmitQueueMaxSize() uint32 {
return *m.c.TransmitQueueMaxSize
}

func (m *mercuryTransmitterConfig) TransmitTimeout() commonconfig.Duration {
return *m.c.TransmitTimeout
}

type mercuryConfig struct {
c toml.Mercury
s toml.MercurySecrets
Expand Down Expand Up @@ -77,7 +60,3 @@ func (m *mercuryConfig) Cache() config.MercuryCache {
func (m *mercuryConfig) TLS() config.MercuryTLS {
return &mercuryTLSConfig{c: m.c.TLS}
}

func (m *mercuryConfig) Transmitter() config.MercuryTransmitter {
return &mercuryTransmitterConfig{c: m.c.Transmitter}
}
8 changes: 0 additions & 8 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,10 +710,6 @@ func TestConfig_Marshal(t *testing.T) {
TLS: toml.MercuryTLS{
CertFile: ptr("/path/to/cert.pem"),
},
Transmitter: toml.MercuryTransmitter{
TransmitQueueMaxSize: ptr(uint32(123)),
TransmitTimeout: commoncfg.MustNewDuration(234 * time.Second),
},
}

for _, tt := range []struct {
Expand Down Expand Up @@ -1169,10 +1165,6 @@ LatestReportDeadline = '1m42s'
[Mercury.TLS]
CertFile = '/path/to/cert.pem'
[Mercury.Transmitter]
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'
`},
{"full", full, fullTOML},
{"multi-chain", multiChain, multiChainTOML},
Expand Down
9 changes: 3 additions & 6 deletions core/services/chainlink/relayer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/smartcontractkit/chainlink-starknet/relayer/pkg/chainlink/config"

"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
coreconfig "github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/config/env"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
Expand All @@ -39,7 +38,6 @@ type RelayerFactory struct {
type EVMFactoryConfig struct {
legacyevm.ChainOpts
evmrelay.CSAETHKeystore
coreconfig.MercuryTransmitter
}

func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (map[types.RelayID]evmrelay.LoopRelayAdapter, error) {
Expand Down Expand Up @@ -69,10 +67,9 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m
}

relayerOpts := evmrelay.RelayerOpts{
DS: ccOpts.DS,
CSAETHKeystore: config.CSAETHKeystore,
MercuryPool: r.MercuryPool,
TransmitterConfig: config.MercuryTransmitter,
DS: ccOpts.DS,
CSAETHKeystore: config.CSAETHKeystore,
MercuryPool: r.MercuryPool,
}
relayer, err2 := evmrelay.NewRelayer(lggr.Named(relayID.ChainID), chain, relayerOpts)
if err2 != nil {
Expand Down
4 changes: 0 additions & 4 deletions core/services/chainlink/testdata/config-empty-effective.toml
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,6 @@ LatestReportDeadline = '5s'
[Mercury.TLS]
CertFile = ''

[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'

[Capabilities]
[Capabilities.Peering]
IncomingMessageBufferSize = 10
Expand Down
4 changes: 0 additions & 4 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,6 @@ LatestReportDeadline = '1m42s'
[Mercury.TLS]
CertFile = '/path/to/cert.pem'

[Mercury.Transmitter]
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'

[Capabilities]
[Capabilities.Peering]
IncomingMessageBufferSize = 13
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,6 @@ LatestReportDeadline = '5s'
[Mercury.TLS]
CertFile = ''

[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'

[Capabilities]
[Capabilities.Peering]
IncomingMessageBufferSize = 10
Expand Down
1 change: 0 additions & 1 deletion core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ type mercuryConfig interface {
Credentials(credName string) *types.MercuryCredentials
Cache() coreconfig.MercuryCache
TLS() coreconfig.MercuryTLS
Transmitter() coreconfig.MercuryTransmitter
}

type thresholdConfig interface {
Expand Down
25 changes: 11 additions & 14 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ type Relayer struct {
codec commontypes.Codec

// Mercury
mercuryORM mercury.ORM
transmitterCfg mercury.TransmitterConfig
mercuryORM mercury.ORM

// LLO/data streams
cdcFactory llo.ChannelDefinitionCacheFactory
Expand All @@ -94,8 +93,7 @@ type CSAETHKeystore interface {
type RelayerOpts struct {
DS sqlutil.DataSource
CSAETHKeystore
MercuryPool wsrpc.Pool
TransmitterConfig mercury.TransmitterConfig
MercuryPool wsrpc.Pool
}

func (c RelayerOpts) Validate() error {
Expand Down Expand Up @@ -124,15 +122,14 @@ func NewRelayer(lggr logger.Logger, chain legacyevm.Chain, opts RelayerOpts) (*R
lloORM := llo.NewORM(opts.DS, chain.ID())
cdcFactory := llo.NewChannelDefinitionCacheFactory(lggr, lloORM, chain.LogPoller())
return &Relayer{
ds: opts.DS,
chain: chain,
lggr: lggr,
ks: opts.CSAETHKeystore,
mercuryPool: opts.MercuryPool,
cdcFactory: cdcFactory,
lloORM: lloORM,
mercuryORM: mercuryORM,
transmitterCfg: opts.TransmitterConfig,
ds: opts.DS,
chain: chain,
lggr: lggr,
ks: opts.CSAETHKeystore,
mercuryPool: opts.MercuryPool,
cdcFactory: cdcFactory,
lloORM: lloORM,
mercuryORM: mercuryORM,
}, nil
}

Expand Down Expand Up @@ -249,7 +246,7 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty
default:
return nil, fmt.Errorf("invalid feed version %d", feedID.Version())
}
transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec)
transmitter := mercury.NewTransmitter(lggr, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec)

return NewMercuryProvider(cp, r.chainReader, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, lggr), nil
}
Expand Down
27 changes: 11 additions & 16 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/mercury"

Expand All @@ -34,6 +33,12 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var (
maxTransmitQueueSize = 10_000
maxDeleteQueueSize = 10_000
transmitTimeout = 5 * time.Second
)

const (
// Mercury server error codes
DuplicateReport = 2
Expand Down Expand Up @@ -99,15 +104,9 @@ type TransmitterReportDecoder interface {

var _ Transmitter = (*mercuryTransmitter)(nil)

type TransmitterConfig interface {
TransmitQueueMaxSize() uint32
TransmitTimeout() commonconfig.Duration
}

type mercuryTransmitter struct {
services.StateMachine
lggr logger.Logger
cfg TransmitterConfig

servers map[string]*server

Expand Down Expand Up @@ -143,8 +142,6 @@ func getPayloadTypes() abi.Arguments {
type server struct {
lggr logger.Logger

transmitTimeout time.Duration

c wsrpc.Client
pm *PersistenceManager
q *TransmitQueue
Expand Down Expand Up @@ -224,7 +221,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed
// queue was closed
return
}
ctx, cancel := context.WithTimeout(runloopCtx, utils.WithJitter(s.transmitTimeout))
ctx, cancel := context.WithTimeout(runloopCtx, utils.WithJitter(transmitTimeout))
res, err := s.c.Transmit(ctx, t.Req)
cancel()
if runloopCtx.Err() != nil {
Expand Down Expand Up @@ -275,19 +272,18 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed
}
}

func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder) *mercuryTransmitter {
func NewTransmitter(lggr logger.Logger, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder) *mercuryTransmitter {
feedIDHex := fmt.Sprintf("0x%x", feedID[:])
servers := make(map[string]*server, len(clients))
for serverURL, client := range clients {
cLggr := lggr.Named(serverURL).With("serverURL", serverURL)
pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, int(cfg.TransmitQueueMaxSize()), flushDeletesFrequency, pruneFrequency)
pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, maxTransmitQueueSize, flushDeletesFrequency, pruneFrequency)
servers[serverURL] = &server{
cLggr,
cfg.TransmitTimeout().Duration(),
client,
pm,
NewTransmitQueue(cLggr, serverURL, feedIDHex, int(cfg.TransmitQueueMaxSize()), pm),
make(chan *pb.TransmitRequest, int(cfg.TransmitQueueMaxSize())),
NewTransmitQueue(cLggr, serverURL, feedIDHex, maxTransmitQueueSize, pm),
make(chan *pb.TransmitRequest, maxDeleteQueueSize),
transmitSuccessCount.WithLabelValues(feedIDHex, serverURL),
transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL),
transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL),
Expand All @@ -299,7 +295,6 @@ func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[strin
return &mercuryTransmitter{
services.StateMachine{},
lggr.Named("MercuryTransmitter").With("feedID", feedIDHex),
cfg,
servers,
codec,
feedID,
Expand Down
Loading

0 comments on commit c5f77ba

Please sign in to comment.