Skip to content

Commit

Permalink
Price retrieval cache for Mercury
Browse files Browse the repository at this point in the history
Problem: Core nodes are querying LatestReport a lot (~70rps). This is not only wasteful, it potentially increases latency if Mercury server falls behind and is also completely unnecessary. We do not need such up-to-the-millisecond accuracy for billing,

Proposed solution: Introduce a price cache for mercury with a TTL e.g. of 1 minute or whatever tolerance we have for billing price and only fetch from the server when the cache becomes stale.
  • Loading branch information
samsondav committed Nov 22, 2023
1 parent e67a76c commit c8a74c5
Show file tree
Hide file tree
Showing 40 changed files with 984 additions and 20 deletions.
11 changes: 11 additions & 0 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache"
"github.com/smartcontractkit/chainlink/v2/core/services/versioning"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
Expand Down Expand Up @@ -157,11 +159,19 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
eventBroadcaster := pg.NewEventBroadcaster(cfg.Database().URL(), dbListener.MinReconnectInterval(), dbListener.MaxReconnectDuration(), appLggr, cfg.AppID())
loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Tracing())

mercuryPool := wsrpc.NewPool(appLggr, cache.Config{
Logger: appLggr,
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
MaxStaleAge: cfg.Mercury().Cache().MaxStaleAge(),
LatestReportDeadline: cfg.Mercury().Cache().LatestReportDeadline(),
})

// create the relayer-chain interoperators from application configuration
relayerFactory := chainlink.RelayerFactory{
Logger: appLggr,
LoopRegistry: loopRegistry,
GRPCOpts: grpcOpts,
MercuryPool: mercuryPool,
}

evmFactoryCfg := chainlink.EVMFactoryConfig{
Expand Down Expand Up @@ -227,6 +237,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
SecretGenerator: chainlink.FilePersistedSecretGenerator{},
LoopRegistry: loopRegistry,
GRPCOpts: grpcOpts,
MercuryPool: mercuryPool,
})
}

Expand Down
23 changes: 23 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -601,3 +601,26 @@ TLSCertPath = '/path/to/cert.pem' # Example
[Tracing.Attributes]
# env is an example user specified key-value pair
env = 'test' # Example

[Mercury]

# Mercury.Cache controls settings for the price retrieval cache querying a mercury server
[Mercury.Cache]
# LatestReportTTL controls how "stale" we will allow a price to be e.g. if
# set to 1s, a new price will always be fetched if the last result was
# from 1 second ago or older.
#
# Another way of looking at it is such: the cache will _never_ return a
# price that was queried from now-LatestReportTTL or before.
#
# Setting to zero disables caching entirely.
LatestReportTTL = "5s" # Default
# MaxStaleAge is that maximum amount of time that a value can be stale
# before it is deleted from the cache (a form of garbage collection).
#
# This should generally be set to something much larger than
# LatestReportTTL. Setting to zero disables garbage collection.
MaxStaleAge = "1h" # Default
# LatestReportDeadline controls how long to wait for a response from the
# mercury server before retrying. Setting this to zero will wait indefinitely.
LatestReportDeadline = "5s" # Default
13 changes: 12 additions & 1 deletion core/config/mercury_config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
package config

import ocr2models "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models"
import (
"time"

ocr2models "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models"
)

type MercuryCache interface {
LatestReportTTL() time.Duration
MaxStaleAge() time.Duration
LatestReportDeadline() time.Duration
}

type Mercury interface {
Credentials(credName string) *ocr2models.MercuryCredentials
Cache() MercuryCache
}
28 changes: 28 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Core struct {
Sentry Sentry `toml:",omitempty"`
Insecure Insecure `toml:",omitempty"`
Tracing Tracing `toml:",omitempty"`
Mercury Mercury `toml:",omitempty"`
}

// SetFrom updates c with any non-nil values from f. (currently TOML field only!)
Expand Down Expand Up @@ -82,6 +83,7 @@ func (c *Core) SetFrom(f *Core) {
c.OCR.setFrom(&f.OCR)
c.P2P.setFrom(&f.P2P)
c.Keeper.setFrom(&f.Keeper)
c.Mercury.setFrom(&f.Mercury)

c.AutoPprof.setFrom(&f.AutoPprof)
c.Pyroscope.setFrom(&f.Pyroscope)
Expand Down Expand Up @@ -1358,6 +1360,32 @@ func (ins *Insecure) setFrom(f *Insecure) {
}
}

type MercuryCache struct {
LatestReportTTL *models.Duration
MaxStaleAge *models.Duration
LatestReportDeadline *models.Duration
}

func (mc *MercuryCache) setFrom(f *MercuryCache) {
if v := f.LatestReportTTL; v != nil {
mc.LatestReportTTL = v
}
if v := f.MaxStaleAge; v != nil {
mc.MaxStaleAge = v
}
if v := f.LatestReportDeadline; v != nil {
mc.LatestReportDeadline = v
}
}

type Mercury struct {
Cache MercuryCache `toml:",omitempty"`
}

func (m *Mercury) setFrom(f *Mercury) {
m.Cache.setFrom(&f.Cache)
}

type MercuryCredentials struct {
// LegacyURL is the legacy base URL for mercury v0.2 API
LegacyURL *models.SecretURL
Expand Down
11 changes: 11 additions & 0 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/vrfkey"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
clsessions "github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/static"
Expand Down Expand Up @@ -380,10 +382,18 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
mailMon := utils.NewMailboxMonitor(cfg.AppID().String())
loopRegistry := plugins.NewLoopRegistry(lggr, nil)

mercuryPool := wsrpc.NewPool(lggr, cache.Config{
Logger: lggr,
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
MaxStaleAge: cfg.Mercury().Cache().MaxStaleAge(),
LatestReportDeadline: cfg.Mercury().Cache().LatestReportDeadline(),
})

relayerFactory := chainlink.RelayerFactory{
Logger: lggr,
LoopRegistry: loopRegistry,
GRPCOpts: loop.GRPCOpts{},
MercuryPool: mercuryPool,
}

evmOpts := chainlink.EVMFactoryConfig{
Expand Down Expand Up @@ -457,6 +467,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
UnrestrictedHTTPClient: c,
SecretGenerator: MockSecretGenerator{},
LoopRegistry: plugins.NewLoopRegistry(lggr, nil),
MercuryPool: mercuryPool,
})
require.NoError(t, err)
app := appInstance.(*chainlink.ChainlinkApplication)
Expand Down
8 changes: 8 additions & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/promreporter"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/telemetry"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
Expand Down Expand Up @@ -160,6 +161,7 @@ type ApplicationOpts struct {
SecretGenerator SecretGenerator
LoopRegistry *plugins.LoopRegistry
GRPCOpts loop.GRPCOpts
MercuryPool wsrpc.Pool
}

// NewApplication initializes a new store if one is not already
Expand Down Expand Up @@ -241,6 +243,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
promReporter := promreporter.NewPromReporter(db.DB, globalLogger)
srvcs = append(srvcs, promReporter)

// pool must be started before all relayers and stopped after them
srvcs = append(srvcs, opts.MercuryPool)

// EVM chains are used all over the place. This will need to change for fully EVM extraction
// TODO: BCF-2510, BCF-2511

Expand Down Expand Up @@ -457,6 +462,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
}

for _, s := range srvcs {
if s == nil {
panic("service unexpectedly nil")
}
if err := healthChecker.Register(s); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/chainlink/config_general.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (g *generalConfig) Prometheus() coreconfig.Prometheus {
}

func (g *generalConfig) Mercury() coreconfig.Mercury {
return &mercuryConfig{s: g.secrets.Mercury}
return &mercuryConfig{c: g.c.Mercury, s: g.secrets.Mercury}
}

func (g *generalConfig) Threshold() coreconfig.Threshold {
Expand Down
24 changes: 24 additions & 0 deletions core/services/chainlink/config_mercury.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
package chainlink

import (
"time"

"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/config/toml"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models"
)

var _ config.MercuryCache = (*mercuryCacheConfig)(nil)

type mercuryCacheConfig struct {
c toml.MercuryCache
}

func (m *mercuryCacheConfig) LatestReportTTL() time.Duration {
return m.c.LatestReportTTL.Duration()
}
func (m *mercuryCacheConfig) MaxStaleAge() time.Duration {
return m.c.MaxStaleAge.Duration()
}
func (m *mercuryCacheConfig) LatestReportDeadline() time.Duration {
return m.c.LatestReportDeadline.Duration()
}

type mercuryConfig struct {
c toml.Mercury
s toml.MercurySecrets
}

Expand All @@ -23,3 +43,7 @@ func (m *mercuryConfig) Credentials(credName string) *models.MercuryCredentials
}
return nil
}

func (m *mercuryConfig) Cache() config.MercuryCache {
return &mercuryCacheConfig{c: m.c.Cache}
}
13 changes: 13 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,13 @@ func TestConfig_Marshal(t *testing.T) {
},
},
}
full.Mercury = toml.Mercury{
Cache: toml.MercuryCache{
LatestReportTTL: models.MustNewDuration(100 * time.Second),
MaxStaleAge: models.MustNewDuration(101 * time.Second),
LatestReportDeadline: models.MustNewDuration(102 * time.Second),
},
}

for _, tt := range []struct {
name string
Expand Down Expand Up @@ -1103,6 +1110,12 @@ ConfirmationPoll = '42s'
[[Starknet.Nodes]]
Name = 'primary'
URL = 'http://stark.node'
`},
{"Mercury", Config{Core: toml.Core{Mercury: full.Mercury}}, `[Mercury]
[Mercury.Cache]
LatestReportTTL = '1m40s'
MaxStaleAge = '1m41s'
LatestReportDeadline = '1m42s'
`},
{"full", full, fullTOML},
{"multi-chain", multiChain, multiChainTOML},
Expand Down
3 changes: 3 additions & 0 deletions core/services/chainlink/relayer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/relay"
evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/plugins"
)

type RelayerFactory struct {
logger.Logger
*plugins.LoopRegistry
loop.GRPCOpts
MercuryPool wsrpc.Pool
}

type EVMFactoryConfig struct {
Expand Down Expand Up @@ -68,6 +70,7 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m
QConfig: ccOpts.AppConfig.Database(),
CSAETHKeystore: config.CSAETHKeystore,
EventBroadcaster: ccOpts.EventBroadcaster,
MercuryPool: r.MercuryPool,
}
relayer, err2 := evmrelay.NewRelayer(ccOpts.Logger.Named(relayID.ChainID), chain, relayerOpts)
if err2 != nil {
Expand Down
6 changes: 6 additions & 0 deletions core/services/chainlink/testdata/config-empty-effective.toml
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,9 @@ NodeID = ''
SamplingRatio = 0.0
Mode = 'tls'
TLSCertPath = ''

[Mercury]
[Mercury.Cache]
LatestReportTTL = '5s'
MaxStaleAge = '1h0m0s'
LatestReportDeadline = '5s'
6 changes: 6 additions & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@ TLSCertPath = '/path/to/cert.pem'
env = 'dev'
test = 'load'

[Mercury]
[Mercury.Cache]
LatestReportTTL = '1m40s'
MaxStaleAge = '1m41s'
LatestReportDeadline = '1m42s'

[[EVM]]
ChainID = '1'
Enabled = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ SamplingRatio = 0.0
Mode = 'tls'
TLSCertPath = ''

[Mercury]
[Mercury.Cache]
LatestReportTTL = '5s'
MaxStaleAge = '1h0m0s'
LatestReportDeadline = '5s'

[[EVM]]
ChainID = '1'
AutoCreateKey = true
Expand Down
1 change: 1 addition & 0 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ type jobPipelineConfig interface {

type mercuryConfig interface {
Credentials(credName string) *models.MercuryCredentials
Cache() coreconfig.MercuryCache
}

type thresholdConfig interface {
Expand Down
8 changes: 4 additions & 4 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type RelayerOpts struct {
pg.QConfig
CSAETHKeystore
pg.EventBroadcaster
MercuryPool wsrpc.Pool
}

func (c RelayerOpts) Validate() error {
Expand Down Expand Up @@ -100,7 +101,7 @@ func NewRelayer(lggr logger.Logger, chain evm.Chain, opts RelayerOpts) (*Relayer
chain: chain,
lggr: lggr,
ks: opts.CSAETHKeystore,
mercuryPool: wsrpc.NewPool(lggr),
mercuryPool: opts.MercuryPool,
eventBroadcaster: opts.EventBroadcaster,
pgCfg: opts.QConfig,
}, nil
Expand All @@ -116,17 +117,16 @@ func (r *Relayer) Start(context.Context) error {
}

func (r *Relayer) Close() error {
return r.mercuryPool.Close()
return nil
}

// Ready does noop: always ready
func (r *Relayer) Ready() error {
return r.mercuryPool.Ready()
return nil
}

func (r *Relayer) HealthReport() (report map[string]error) {
report = make(map[string]error)
services.CopyHealth(report, r.mercuryPool.HealthReport())
return
}

Expand Down
4 changes: 4 additions & 0 deletions core/services/relay/evm/mercury/utils/feeds.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ const (

type FeedID [32]byte

func BytesToFeedID(b []byte) FeedID {
return (FeedID)(utils.BytesToHash(b))
}

func (f FeedID) Hex() string { return (utils.Hash)(f).Hex() }

func (f FeedID) String() string { return (utils.Hash)(f).String() }
Expand Down
Loading

0 comments on commit c8a74c5

Please sign in to comment.