Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feed LatestPrice query cache #11326

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache"
"github.com/smartcontractkit/chainlink/v2/core/services/versioning"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
Expand Down Expand Up @@ -157,11 +159,19 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
eventBroadcaster := pg.NewEventBroadcaster(cfg.Database().URL(), dbListener.MinReconnectInterval(), dbListener.MaxReconnectDuration(), appLggr, cfg.AppID())
loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Tracing())

mercuryPool := wsrpc.NewPool(appLggr, cache.Config{
Logger: appLggr,
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
MaxStaleAge: cfg.Mercury().Cache().MaxStaleAge(),
LatestReportDeadline: cfg.Mercury().Cache().LatestReportDeadline(),
})

// create the relayer-chain interoperators from application configuration
relayerFactory := chainlink.RelayerFactory{
Logger: appLggr,
LoopRegistry: loopRegistry,
GRPCOpts: grpcOpts,
MercuryPool: mercuryPool,
}

evmFactoryCfg := chainlink.EVMFactoryConfig{
Expand Down Expand Up @@ -227,6 +237,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
SecretGenerator: chainlink.FilePersistedSecretGenerator{},
LoopRegistry: loopRegistry,
GRPCOpts: grpcOpts,
MercuryPool: mercuryPool,
})
}

Expand Down
23 changes: 23 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -601,3 +601,26 @@ TLSCertPath = '/path/to/cert.pem' # Example
[Tracing.Attributes]
# env is an example user specified key-value pair
env = 'test' # Example

[Mercury]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to name this LLO? or will those settings be separate?

Suggested change
[Mercury]
[LLO]

Copy link
Collaborator Author

@samsondav samsondav Nov 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked about this and there is still ambiguity about naming. I thought we were renaming it to LLO, now it may be rename to Streams.

We already have config named Mercury for keepers related to this, I think the path of least resistance may be to leave it called Mercury for now which is at least unambiguous and easy to grep for, and we can change it later if necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine I guess, but we'll have to figure out how to navigate those breaking config changes carefully.


# Mercury.Cache controls settings for the price retrieval cache querying a mercury server
[Mercury.Cache]
# LatestReportTTL controls how "stale" we will allow a price to be e.g. if
# set to 1s, a new price will always be fetched if the last result was
# from 1 second ago or older.
#
# Another way of looking at it is such: the cache will _never_ return a
# price that was queried from now-LatestReportTTL or before.
#
# Setting to zero disables caching entirely.
LatestReportTTL = "1s" # Default
# MaxStaleAge is that maximum amount of time that a value can be stale
# before it is deleted from the cache (a form of garbage collection).
#
# This should generally be set to something much larger than
# LatestReportTTL. Setting to zero disables garbage collection.
MaxStaleAge = "1h" # Default
# LatestReportDeadline controls how long to wait for a response from the
# mercury server before retrying. Setting this to zero will wait indefinitely.
LatestReportDeadline = "5s" # Default
13 changes: 12 additions & 1 deletion core/config/mercury_config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
package config

import ocr2models "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models"
import (
"time"

ocr2models "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models"
)

type MercuryCache interface {
LatestReportTTL() time.Duration
MaxStaleAge() time.Duration
LatestReportDeadline() time.Duration
}

type Mercury interface {
Credentials(credName string) *ocr2models.MercuryCredentials
Cache() MercuryCache
}
28 changes: 28 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Core struct {
Sentry Sentry `toml:",omitempty"`
Insecure Insecure `toml:",omitempty"`
Tracing Tracing `toml:",omitempty"`
Mercury Mercury `toml:",omitempty"`
}

// SetFrom updates c with any non-nil values from f. (currently TOML field only!)
Expand Down Expand Up @@ -82,6 +83,7 @@ func (c *Core) SetFrom(f *Core) {
c.OCR.setFrom(&f.OCR)
c.P2P.setFrom(&f.P2P)
c.Keeper.setFrom(&f.Keeper)
c.Mercury.setFrom(&f.Mercury)

c.AutoPprof.setFrom(&f.AutoPprof)
c.Pyroscope.setFrom(&f.Pyroscope)
Expand Down Expand Up @@ -1358,6 +1360,32 @@ func (ins *Insecure) setFrom(f *Insecure) {
}
}

type MercuryCache struct {
LatestReportTTL *models.Duration
MaxStaleAge *models.Duration
LatestReportDeadline *models.Duration
}

func (mc *MercuryCache) setFrom(f *MercuryCache) {
if v := f.LatestReportTTL; v != nil {
mc.LatestReportTTL = v
}
if v := f.MaxStaleAge; v != nil {
mc.MaxStaleAge = v
}
if v := f.LatestReportDeadline; v != nil {
mc.LatestReportDeadline = v
}
}

type Mercury struct {
Cache MercuryCache `toml:",omitempty"`
}

func (m *Mercury) setFrom(f *Mercury) {
m.Cache.setFrom(&f.Cache)
}

type MercuryCredentials struct {
// LegacyURL is the legacy base URL for mercury v0.2 API
LegacyURL *models.SecretURL
Expand Down
11 changes: 11 additions & 0 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/vrfkey"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
clsessions "github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/static"
Expand Down Expand Up @@ -342,10 +344,18 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
mailMon := utils.NewMailboxMonitor(cfg.AppID().String())
loopRegistry := plugins.NewLoopRegistry(lggr, nil)

mercuryPool := wsrpc.NewPool(lggr, cache.Config{
Logger: lggr,
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
MaxStaleAge: cfg.Mercury().Cache().MaxStaleAge(),
LatestReportDeadline: cfg.Mercury().Cache().LatestReportDeadline(),
})

relayerFactory := chainlink.RelayerFactory{
Logger: lggr,
LoopRegistry: loopRegistry,
GRPCOpts: loop.GRPCOpts{},
MercuryPool: mercuryPool,
}

evmOpts := chainlink.EVMFactoryConfig{
Expand Down Expand Up @@ -419,6 +429,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
UnrestrictedHTTPClient: c,
SecretGenerator: MockSecretGenerator{},
LoopRegistry: plugins.NewLoopRegistry(lggr, nil),
MercuryPool: mercuryPool,
})
require.NoError(t, err)
app := appInstance.(*chainlink.ChainlinkApplication)
Expand Down
8 changes: 8 additions & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/promreporter"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/telemetry"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
Expand Down Expand Up @@ -160,6 +161,7 @@ type ApplicationOpts struct {
SecretGenerator SecretGenerator
LoopRegistry *plugins.LoopRegistry
GRPCOpts loop.GRPCOpts
MercuryPool wsrpc.Pool
}

// NewApplication initializes a new store if one is not already
Expand Down Expand Up @@ -241,6 +243,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
promReporter := promreporter.NewPromReporter(db.DB, globalLogger)
srvcs = append(srvcs, promReporter)

// pool must be started before all relayers and stopped after them
srvcs = append(srvcs, opts.MercuryPool)

// EVM chains are used all over the place. This will need to change for fully EVM extraction
// TODO: BCF-2510, BCF-2511

Expand Down Expand Up @@ -457,6 +462,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
}

for _, s := range srvcs {
if s == nil {
panic("service unexpectedly nil")
jmank88 marked this conversation as resolved.
Show resolved Hide resolved
}
if err := healthChecker.Register(s); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/chainlink/config_general.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (g *generalConfig) Prometheus() coreconfig.Prometheus {
}

func (g *generalConfig) Mercury() coreconfig.Mercury {
return &mercuryConfig{s: g.secrets.Mercury}
return &mercuryConfig{c: g.c.Mercury, s: g.secrets.Mercury}
}

func (g *generalConfig) Threshold() coreconfig.Threshold {
Expand Down
24 changes: 24 additions & 0 deletions core/services/chainlink/config_mercury.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
package chainlink

import (
"time"

"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/config/toml"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models"
)

var _ config.MercuryCache = (*mercuryCacheConfig)(nil)

type mercuryCacheConfig struct {
c toml.MercuryCache
}

func (m *mercuryCacheConfig) LatestReportTTL() time.Duration {
return m.c.LatestReportTTL.Duration()
}
func (m *mercuryCacheConfig) MaxStaleAge() time.Duration {
return m.c.MaxStaleAge.Duration()
}
func (m *mercuryCacheConfig) LatestReportDeadline() time.Duration {
return m.c.LatestReportDeadline.Duration()
}

type mercuryConfig struct {
c toml.Mercury
s toml.MercurySecrets
}

Expand All @@ -23,3 +43,7 @@ func (m *mercuryConfig) Credentials(credName string) *models.MercuryCredentials
}
return nil
}

func (m *mercuryConfig) Cache() config.MercuryCache {
return &mercuryCacheConfig{c: m.c.Cache}
}
13 changes: 13 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,13 @@ func TestConfig_Marshal(t *testing.T) {
},
},
}
full.Mercury = toml.Mercury{
Cache: toml.MercuryCache{
LatestReportTTL: models.MustNewDuration(100 * time.Second),
MaxStaleAge: models.MustNewDuration(101 * time.Second),
LatestReportDeadline: models.MustNewDuration(102 * time.Second),
},
}

for _, tt := range []struct {
name string
Expand Down Expand Up @@ -1103,6 +1110,12 @@ ConfirmationPoll = '42s'
[[Starknet.Nodes]]
Name = 'primary'
URL = 'http://stark.node'
`},
{"Mercury", Config{Core: toml.Core{Mercury: full.Mercury}}, `[Mercury]
[Mercury.Cache]
LatestReportTTL = '1m40s'
MaxStaleAge = '1m41s'
LatestReportDeadline = '1m42s'
`},
{"full", full, fullTOML},
{"multi-chain", multiChain, multiChainTOML},
Expand Down
3 changes: 3 additions & 0 deletions core/services/chainlink/relayer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/relay"
evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/plugins"
)

type RelayerFactory struct {
logger.Logger
*plugins.LoopRegistry
loop.GRPCOpts
MercuryPool wsrpc.Pool
}

type EVMFactoryConfig struct {
Expand Down Expand Up @@ -68,6 +70,7 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m
QConfig: ccOpts.AppConfig.Database(),
CSAETHKeystore: config.CSAETHKeystore,
EventBroadcaster: ccOpts.EventBroadcaster,
MercuryPool: r.MercuryPool,
}
relayer, err2 := evmrelay.NewRelayer(r.Logger.Named("EVM").Named(relayID.ChainID), chain, relayerOpts)
if err2 != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,9 @@ NodeID = ''
SamplingRatio = 0.0
Mode = 'tls'
TLSCertPath = ''

[Mercury]
[Mercury.Cache]
LatestReportTTL = '1s'
MaxStaleAge = '1h0m0s'
LatestReportDeadline = '5s'
6 changes: 6 additions & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@ TLSCertPath = '/path/to/cert.pem'
env = 'dev'
test = 'load'

[Mercury]
[Mercury.Cache]
LatestReportTTL = '1m40s'
MaxStaleAge = '1m41s'
LatestReportDeadline = '1m42s'

[[EVM]]
ChainID = '1'
Enabled = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ SamplingRatio = 0.0
Mode = 'tls'
TLSCertPath = ''

[Mercury]
[Mercury.Cache]
LatestReportTTL = '1s'
MaxStaleAge = '1h0m0s'
LatestReportDeadline = '5s'

[[EVM]]
ChainID = '1'
AutoCreateKey = true
Expand Down
1 change: 1 addition & 0 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ type jobPipelineConfig interface {

type mercuryConfig interface {
Credentials(credName string) *models.MercuryCredentials
Cache() coreconfig.MercuryCache
}

type thresholdConfig interface {
Expand Down
8 changes: 4 additions & 4 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type RelayerOpts struct {
pg.QConfig
CSAETHKeystore
pg.EventBroadcaster
MercuryPool wsrpc.Pool
}

func (c RelayerOpts) Validate() error {
Expand Down Expand Up @@ -100,7 +101,7 @@ func NewRelayer(lggr logger.Logger, chain legacyevm.Chain, opts RelayerOpts) (*R
chain: chain,
lggr: lggr,
ks: opts.CSAETHKeystore,
mercuryPool: wsrpc.NewPool(lggr),
mercuryPool: opts.MercuryPool,
Copy link
Contributor

@jmank88 jmank88 Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a top level pool now? Is the intention to share it across chains? That means it will cross the relay API, and I'm not sure how that is meant to work 🤔

Copy link
Collaborator Author

@samsondav samsondav Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is shared across chains. I asked @cedric-cordenier and this is what he suggested.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the relay API has to proxy the shared websocket connections? Why is that better than each plugin process having it's own local pool?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jmank88 The reason why I suggested adding this here was because I didn't really consider this part of the relayer LOOPP API. For me this isn't part of it -- see for example how we're passing in the database. In a LOOP world, we wouldn't do that: we'd either pass in the config values and create the pool in the relayer proper, or pass in a proxy service.

I'm trying to understand your objection: I guess you're saying that this would break the isolation LOOPPs are supposed to provide?

Copy link
Contributor

@jmank88 jmank88 Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm observing that this at least adds to the technical debt in the way of LOOPPifying mercury.

  1. If we intend to share these pools between host and plugins, then I guess that is just something to figure out how to do (but it does not immediately make sense to me).
  2. If we do not intend to share these pools between host and plugins, then there is no need to refactor things to be global like this in the first place since we'll just have to undo it in a few weeks. Instead, each relayer should simply own and manage a pool internally.

Copy link
Contributor

@cedric-cordenier cedric-cordenier Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good point.

One interesting thought that occurred to me is that you could look at this as a symptom of there not being a single "Mercury relayer", i.e. it's a symptom of there being a mercury provider in all relayers.

Instead, maybe a more natural way to look at this is that Mercury is a "chain", and we should give it its own relayer. If we do that, this problem goes away as calls to Mercury would go via the same relayer and therefore would benefit from the wsrpc pool which in that case would be a totally internal, but shared resource. This Mercury relayer would only implement the write APIs (i.e. the transmitter, the codec etc), and would leave the read side unimplemented or stubbed out.

The benefit of this is that it cleanly maps the relayer as a 1-1 abstraction over the underlying resource, in the same way that relayers do with the chains they target.

It does mean that we'd need to refine the relayer abstraction somewhat, to allow products to use a different relayer for reading and for writing, but that doesn't feel wrong to me, and might even be a better underlying fit (CCIP might benefit from this for example).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think that separate reading and writing relayers makes sense 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samsondav Do you think that is a good long-term direction for Mercury?

Copy link
Collaborator Author

@samsondav samsondav Nov 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, separating the read/write relayers makes sense to me. However, note that in this case the "write" relayer would also have to support reading. Since we send reports to mercury but also query reports from it.

Treating mercury server as a "chain" is definitely the right way to go.

eventBroadcaster: opts.EventBroadcaster,
pgCfg: opts.QConfig,
}, nil
Expand All @@ -116,17 +117,16 @@ func (r *Relayer) Start(context.Context) error {
}

func (r *Relayer) Close() error {
return r.mercuryPool.Close()
return nil
}

// Ready does noop: always ready
func (r *Relayer) Ready() error {
return r.mercuryPool.Ready()
return nil
}

func (r *Relayer) HealthReport() (report map[string]error) {
report = make(map[string]error)
services.CopyHealth(report, r.mercuryPool.HealthReport())
samsondav marked this conversation as resolved.
Show resolved Hide resolved
return
}

Expand Down
4 changes: 4 additions & 0 deletions core/services/relay/evm/mercury/utils/feeds.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ const (

type FeedID [32]byte

func BytesToFeedID(b []byte) FeedID {
return (FeedID)(utils.BytesToHash(b))
}

func (f FeedID) Hex() string { return (utils.Hash)(f).Hex() }

func (f FeedID) String() string { return (utils.Hash)(f).String() }
Expand Down
Loading
Loading