diff --git a/core/cmd/shell.go b/core/cmd/shell.go index 35659aa7797..daf936b36c8 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -43,6 +43,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup" "github.com/smartcontractkit/chainlink/v2/core/services/pg" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache" "github.com/smartcontractkit/chainlink/v2/core/services/versioning" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" "github.com/smartcontractkit/chainlink/v2/core/sessions" @@ -157,11 +159,19 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G eventBroadcaster := pg.NewEventBroadcaster(cfg.Database().URL(), dbListener.MinReconnectInterval(), dbListener.MaxReconnectDuration(), appLggr, cfg.AppID()) loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Tracing()) + mercuryPool := wsrpc.NewPool(appLggr, cache.Config{ + Logger: appLggr, + LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(), + MaxStaleAge: cfg.Mercury().Cache().MaxStaleAge(), + LatestReportDeadline: cfg.Mercury().Cache().LatestReportDeadline(), + }) + // create the relayer-chain interoperators from application configuration relayerFactory := chainlink.RelayerFactory{ Logger: appLggr, LoopRegistry: loopRegistry, GRPCOpts: grpcOpts, + MercuryPool: mercuryPool, } evmFactoryCfg := chainlink.EVMFactoryConfig{ @@ -227,6 +237,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G SecretGenerator: chainlink.FilePersistedSecretGenerator{}, LoopRegistry: loopRegistry, GRPCOpts: grpcOpts, + MercuryPool: mercuryPool, }) } diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index 79801c2c52b..e438f4553fe 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -601,3 +601,26 @@ TLSCertPath = '/path/to/cert.pem' # Example [Tracing.Attributes] # env is an example user specified key-value pair env = 'test' # Example + +[Mercury] + +# Mercury.Cache controls settings for the price retrieval cache querying a mercury server +[Mercury.Cache] +# LatestReportTTL controls how "stale" we will allow a price to be e.g. if +# set to 1s, a new price will always be fetched if the last result was +# from 1 second ago or older. +# +# Another way of looking at it is such: the cache will _never_ return a +# price that was queried from now-LatestReportTTL or before. +# +# Setting to zero disables caching entirely. +LatestReportTTL = "1s" # Default +# MaxStaleAge is that maximum amount of time that a value can be stale +# before it is deleted from the cache (a form of garbage collection). +# +# This should generally be set to something much larger than +# LatestReportTTL. Setting to zero disables garbage collection. +MaxStaleAge = "1h" # Default +# LatestReportDeadline controls how long to wait for a response from the +# mercury server before retrying. Setting this to zero will wait indefinitely. +LatestReportDeadline = "5s" # Default diff --git a/core/config/mercury_config.go b/core/config/mercury_config.go index d7e985d14e6..e530af6338f 100644 --- a/core/config/mercury_config.go +++ b/core/config/mercury_config.go @@ -1,7 +1,18 @@ package config -import ocr2models "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models" +import ( + "time" + + ocr2models "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models" +) + +type MercuryCache interface { + LatestReportTTL() time.Duration + MaxStaleAge() time.Duration + LatestReportDeadline() time.Duration +} type Mercury interface { Credentials(credName string) *ocr2models.MercuryCredentials + Cache() MercuryCache } diff --git a/core/config/toml/types.go b/core/config/toml/types.go index 31076c1f1de..c420d7f3f47 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -54,6 +54,7 @@ type Core struct { Sentry Sentry `toml:",omitempty"` Insecure Insecure `toml:",omitempty"` Tracing Tracing `toml:",omitempty"` + Mercury Mercury `toml:",omitempty"` } // SetFrom updates c with any non-nil values from f. (currently TOML field only!) @@ -82,6 +83,7 @@ func (c *Core) SetFrom(f *Core) { c.OCR.setFrom(&f.OCR) c.P2P.setFrom(&f.P2P) c.Keeper.setFrom(&f.Keeper) + c.Mercury.setFrom(&f.Mercury) c.AutoPprof.setFrom(&f.AutoPprof) c.Pyroscope.setFrom(&f.Pyroscope) @@ -1358,6 +1360,32 @@ func (ins *Insecure) setFrom(f *Insecure) { } } +type MercuryCache struct { + LatestReportTTL *models.Duration + MaxStaleAge *models.Duration + LatestReportDeadline *models.Duration +} + +func (mc *MercuryCache) setFrom(f *MercuryCache) { + if v := f.LatestReportTTL; v != nil { + mc.LatestReportTTL = v + } + if v := f.MaxStaleAge; v != nil { + mc.MaxStaleAge = v + } + if v := f.LatestReportDeadline; v != nil { + mc.LatestReportDeadline = v + } +} + +type Mercury struct { + Cache MercuryCache `toml:",omitempty"` +} + +func (m *Mercury) setFrom(f *Mercury) { + m.Cache.setFrom(&f.Cache) +} + type MercuryCredentials struct { // LegacyURL is the legacy base URL for mercury v0.2 API LegacyURL *models.SecretURL diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 17a1d4fadd4..fc648486a03 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -76,6 +76,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/vrfkey" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" clsessions "github.com/smartcontractkit/chainlink/v2/core/sessions" "github.com/smartcontractkit/chainlink/v2/core/static" @@ -342,10 +344,18 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn mailMon := utils.NewMailboxMonitor(cfg.AppID().String()) loopRegistry := plugins.NewLoopRegistry(lggr, nil) + mercuryPool := wsrpc.NewPool(lggr, cache.Config{ + Logger: lggr, + LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(), + MaxStaleAge: cfg.Mercury().Cache().MaxStaleAge(), + LatestReportDeadline: cfg.Mercury().Cache().LatestReportDeadline(), + }) + relayerFactory := chainlink.RelayerFactory{ Logger: lggr, LoopRegistry: loopRegistry, GRPCOpts: loop.GRPCOpts{}, + MercuryPool: mercuryPool, } evmOpts := chainlink.EVMFactoryConfig{ @@ -419,6 +429,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn UnrestrictedHTTPClient: c, SecretGenerator: MockSecretGenerator{}, LoopRegistry: plugins.NewLoopRegistry(lggr, nil), + MercuryPool: mercuryPool, }) require.NoError(t, err) app := appInstance.(*chainlink.ChainlinkApplication) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 29679ee92fb..1ecf95d3a5e 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -48,6 +48,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/promreporter" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" "github.com/smartcontractkit/chainlink/v2/core/services/telemetry" "github.com/smartcontractkit/chainlink/v2/core/services/vrf" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" @@ -160,6 +161,7 @@ type ApplicationOpts struct { SecretGenerator SecretGenerator LoopRegistry *plugins.LoopRegistry GRPCOpts loop.GRPCOpts + MercuryPool wsrpc.Pool } // NewApplication initializes a new store if one is not already @@ -241,6 +243,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) { promReporter := promreporter.NewPromReporter(db.DB, globalLogger) srvcs = append(srvcs, promReporter) + // pool must be started before all relayers and stopped after them + srvcs = append(srvcs, opts.MercuryPool) + // EVM chains are used all over the place. This will need to change for fully EVM extraction // TODO: BCF-2510, BCF-2511 @@ -457,6 +462,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) { } for _, s := range srvcs { + if s == nil { + panic("service unexpectedly nil") + } if err := healthChecker.Register(s); err != nil { return nil, err } diff --git a/core/services/chainlink/config_general.go b/core/services/chainlink/config_general.go index fff7822a814..c7d9cc6ce5d 100644 --- a/core/services/chainlink/config_general.go +++ b/core/services/chainlink/config_general.go @@ -507,7 +507,7 @@ func (g *generalConfig) Prometheus() coreconfig.Prometheus { } func (g *generalConfig) Mercury() coreconfig.Mercury { - return &mercuryConfig{s: g.secrets.Mercury} + return &mercuryConfig{c: g.c.Mercury, s: g.secrets.Mercury} } func (g *generalConfig) Threshold() coreconfig.Threshold { diff --git a/core/services/chainlink/config_mercury.go b/core/services/chainlink/config_mercury.go index 1a20dd069d8..61e48d340e4 100644 --- a/core/services/chainlink/config_mercury.go +++ b/core/services/chainlink/config_mercury.go @@ -1,11 +1,31 @@ package chainlink import ( + "time" + + "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/config/toml" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models" ) +var _ config.MercuryCache = (*mercuryCacheConfig)(nil) + +type mercuryCacheConfig struct { + c toml.MercuryCache +} + +func (m *mercuryCacheConfig) LatestReportTTL() time.Duration { + return m.c.LatestReportTTL.Duration() +} +func (m *mercuryCacheConfig) MaxStaleAge() time.Duration { + return m.c.MaxStaleAge.Duration() +} +func (m *mercuryCacheConfig) LatestReportDeadline() time.Duration { + return m.c.LatestReportDeadline.Duration() +} + type mercuryConfig struct { + c toml.Mercury s toml.MercurySecrets } @@ -23,3 +43,7 @@ func (m *mercuryConfig) Credentials(credName string) *models.MercuryCredentials } return nil } + +func (m *mercuryConfig) Cache() config.MercuryCache { + return &mercuryCacheConfig{c: m.c.Cache} +} diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 33fda221285..d777e34abf7 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -668,6 +668,13 @@ func TestConfig_Marshal(t *testing.T) { }, }, } + full.Mercury = toml.Mercury{ + Cache: toml.MercuryCache{ + LatestReportTTL: models.MustNewDuration(100 * time.Second), + MaxStaleAge: models.MustNewDuration(101 * time.Second), + LatestReportDeadline: models.MustNewDuration(102 * time.Second), + }, + } for _, tt := range []struct { name string @@ -1103,6 +1110,12 @@ ConfirmationPoll = '42s' [[Starknet.Nodes]] Name = 'primary' URL = 'http://stark.node' +`}, + {"Mercury", Config{Core: toml.Core{Mercury: full.Mercury}}, `[Mercury] +[Mercury.Cache] +LatestReportTTL = '1m40s' +MaxStaleAge = '1m41s' +LatestReportDeadline = '1m42s' `}, {"full", full, fullTOML}, {"multi-chain", multiChain, multiChainTOML}, diff --git a/core/services/chainlink/relayer_factory.go b/core/services/chainlink/relayer_factory.go index c15643551e9..8b8749013fc 100644 --- a/core/services/chainlink/relayer_factory.go +++ b/core/services/chainlink/relayer_factory.go @@ -25,6 +25,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/relay" evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" "github.com/smartcontractkit/chainlink/v2/plugins" ) @@ -32,6 +33,7 @@ type RelayerFactory struct { logger.Logger *plugins.LoopRegistry loop.GRPCOpts + MercuryPool wsrpc.Pool } type EVMFactoryConfig struct { @@ -68,6 +70,7 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m QConfig: ccOpts.AppConfig.Database(), CSAETHKeystore: config.CSAETHKeystore, EventBroadcaster: ccOpts.EventBroadcaster, + MercuryPool: r.MercuryPool, } relayer, err2 := evmrelay.NewRelayer(r.Logger.Named("EVM").Named(relayID.ChainID), chain, relayerOpts) if err2 != nil { diff --git a/core/services/chainlink/testdata/config-empty-effective.toml b/core/services/chainlink/testdata/config-empty-effective.toml index 8f3135b34e4..2531e7c281d 100644 --- a/core/services/chainlink/testdata/config-empty-effective.toml +++ b/core/services/chainlink/testdata/config-empty-effective.toml @@ -234,3 +234,9 @@ NodeID = '' SamplingRatio = 0.0 Mode = 'tls' TLSCertPath = '' + +[Mercury] +[Mercury.Cache] +LatestReportTTL = '1s' +MaxStaleAge = '1h0m0s' +LatestReportDeadline = '5s' diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index eca5f6f96d2..8036165d6e8 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -245,6 +245,12 @@ TLSCertPath = '/path/to/cert.pem' env = 'dev' test = 'load' +[Mercury] +[Mercury.Cache] +LatestReportTTL = '1m40s' +MaxStaleAge = '1m41s' +LatestReportDeadline = '1m42s' + [[EVM]] ChainID = '1' Enabled = false diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index 6a60dfd419a..371cc50a170 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -235,6 +235,12 @@ SamplingRatio = 0.0 Mode = 'tls' TLSCertPath = '' +[Mercury] +[Mercury.Cache] +LatestReportTTL = '1s' +MaxStaleAge = '1h0m0s' +LatestReportDeadline = '5s' + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 45ee4e0f8fb..eebc95903ed 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -189,6 +189,7 @@ type jobPipelineConfig interface { type mercuryConfig interface { Credentials(credName string) *models.MercuryCredentials + Cache() coreconfig.MercuryCache } type thresholdConfig interface { diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 952c1869bfa..088a69a2582 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -66,6 +66,7 @@ type RelayerOpts struct { pg.QConfig CSAETHKeystore pg.EventBroadcaster + MercuryPool wsrpc.Pool } func (c RelayerOpts) Validate() error { @@ -100,7 +101,7 @@ func NewRelayer(lggr logger.Logger, chain legacyevm.Chain, opts RelayerOpts) (*R chain: chain, lggr: lggr, ks: opts.CSAETHKeystore, - mercuryPool: wsrpc.NewPool(lggr), + mercuryPool: opts.MercuryPool, eventBroadcaster: opts.EventBroadcaster, pgCfg: opts.QConfig, }, nil @@ -116,17 +117,16 @@ func (r *Relayer) Start(context.Context) error { } func (r *Relayer) Close() error { - return r.mercuryPool.Close() + return nil } // Ready does noop: always ready func (r *Relayer) Ready() error { - return r.mercuryPool.Ready() + return nil } func (r *Relayer) HealthReport() (report map[string]error) { report = make(map[string]error) - services.CopyHealth(report, r.mercuryPool.HealthReport()) return } diff --git a/core/services/relay/evm/mercury/utils/feeds.go b/core/services/relay/evm/mercury/utils/feeds.go index b1a20618ef6..6f8978bbf0d 100644 --- a/core/services/relay/evm/mercury/utils/feeds.go +++ b/core/services/relay/evm/mercury/utils/feeds.go @@ -88,6 +88,10 @@ const ( type FeedID [32]byte +func BytesToFeedID(b []byte) FeedID { + return (FeedID)(utils.BytesToHash(b)) +} + func (f FeedID) Hex() string { return (utils.Hash)(f).Hex() } func (f FeedID) String() string { return (utils.Hash)(f).String() } diff --git a/core/services/relay/evm/mercury/wsrpc/cache/cache.go b/core/services/relay/evm/mercury/wsrpc/cache/cache.go new file mode 100644 index 00000000000..4962210d58f --- /dev/null +++ b/core/services/relay/evm/mercury/wsrpc/cache/cache.go @@ -0,0 +1,395 @@ +package cache + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/jpillora/backoff" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/logger" + mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +var ( + promFetchFailedCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mercury_cache_fetch_failure_count", + Help: "Number of times we tried to call LatestReport from the mercury server, but some kind of error occurred", + }, + []string{"serverURL", "feedID"}, + ) + promCacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mercury_cache_hit_count", + Help: "Running count of cache hits", + }, + []string{"serverURL", "feedID"}, + ) + promCacheWaitCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mercury_cache_wait_count", + Help: "Running count of times that we had to wait for a fetch to complete before reading from cache", + }, + []string{"serverURL", "feedID"}, + ) + promCacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mercury_cache_miss_count", + Help: "Running count of cache misses", + }, + []string{"serverURL", "feedID"}, + ) +) + +type Fetcher interface { + LatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) +} + +type Client interface { + Fetcher + ServerURL() string + RawClient() pb.MercuryClient +} + +// Cache is scoped to one particular mercury server +// Use CacheSet to hold lookups for multiple servers +type Cache interface { + Fetcher + services.Service +} + +type Config struct { + Logger logger.Logger + // LatestReportTTL controls how "stale" we will allow a price to be e.g. if + // set to 1s, a new price will always be fetched if the last result was + // from more than 1 second ago. + // + // Another way of looking at it is such: the cache will _never_ return a + // price that was queried from before now-LatestReportTTL. + // + // Setting to zero disables caching entirely. + LatestReportTTL time.Duration + // MaxStaleAge is that maximum amount of time that a value can be stale + // before it is deleted from the cache (a form of garbage collection). + // + // This should generally be set to something much larger than + // LatestReportTTL. Setting to zero disables garbage collection. + MaxStaleAge time.Duration + // LatestReportDeadline controls how long to wait for a response before + // retrying. Setting this to zero will wait indefinitely. + LatestReportDeadline time.Duration +} + +func NewCache(client Client, cfg Config) Cache { + return newMemCache(client, cfg) +} + +type cacheVal struct { + sync.RWMutex + + fetching bool + fetchCh chan (struct{}) + + val *pb.LatestReportResponse + err error + + expiresAt time.Time +} + +func (v *cacheVal) read() (*pb.LatestReportResponse, error) { + v.RLock() + defer v.RUnlock() + return v.val, v.err +} + +// caller expected to hold lock +func (v *cacheVal) initiateFetch() <-chan struct{} { + if v.fetching { + panic("cannot initiateFetch on cache val that is already fetching") + } + v.fetching = true + v.fetchCh = make(chan struct{}) + return v.fetchCh +} + +func (v *cacheVal) setError(err error) { + v.Lock() + defer v.Unlock() + v.err = err +} + +func (v *cacheVal) completeFetch(val *pb.LatestReportResponse, err error, expiresAt time.Time) { + v.Lock() + defer v.Unlock() + if !v.fetching { + panic("can only completeFetch on cache val that is fetching") + } + v.val = val + v.err = err + if err == nil { + v.expiresAt = expiresAt + } + close(v.fetchCh) + v.fetchCh = nil + v.fetching = false +} + +func (v *cacheVal) abandonFetch(err error) { + v.completeFetch(nil, err, time.Now()) +} + +func (v *cacheVal) waitForResult(ctx context.Context, chResult <-chan struct{}, chStop <-chan struct{}) (*pb.LatestReportResponse, error) { + select { + case <-ctx.Done(): + _, err := v.read() + return nil, errors.Join(err, ctx.Err()) + case <-chStop: + return nil, errors.New("stopped") + case <-chResult: + return v.read() + } +} + +// memCache stores values in memory +// it will never return a stale value older than latestPriceTTL, instead +// waiting for a successful fetch or caller context cancels, whichever comes +// first +type memCache struct { + services.StateMachine + lggr logger.Logger + + client Client + + latestPriceTTL time.Duration + maxStaleAge time.Duration + latestReportDeadline time.Duration + + cache sync.Map + + wg sync.WaitGroup + chStop services.StopChan +} + +func newMemCache(client Client, cfg Config) *memCache { + return &memCache{ + services.StateMachine{}, + cfg.Logger.Named("MercuryMemCache"), + client, + cfg.LatestReportTTL, + cfg.MaxStaleAge, + cfg.LatestReportDeadline, + sync.Map{}, + sync.WaitGroup{}, + make(chan (struct{})), + } +} + +// LatestReport +// NOTE: This will actually block on all types of errors, even non-timeouts. +// Context should be set carefully and timed to be the maximum time we are +// willing to wait for a result, the background thread will keep re-querying +// until it gets one even on networking errors etc. +func (m *memCache) LatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) { + if req == nil { + return nil, errors.New("req must not be nil") + } + if m.latestPriceTTL <= 0 { + return m.client.RawClient().LatestReport(ctx, req) + } + vi, _ := m.cache.LoadOrStore(req, &cacheVal{ + sync.RWMutex{}, + false, + nil, + nil, + nil, + time.Now(), // first result is always "expired" and requires fetch + }) + v := vi.(*cacheVal) + + // HOT PATH + v.RLock() + if time.Now().Before(v.expiresAt) { + // CACHE HIT + promCacheHitCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc() + + defer v.RUnlock() + return v.val, nil + } else if v.fetching { + // CACHE WAIT + promCacheWaitCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc() + // if someone else is fetching then wait for the fetch to complete + ch := v.fetchCh + v.RUnlock() + return v.waitForResult(ctx, ch, m.chStop) + } + // CACHE MISS + promCacheMissCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc() + // fallthrough to cold path and fetch + v.RUnlock() + + // COLD PATH + v.Lock() + if time.Now().Before(v.expiresAt) { + // CACHE HIT + promCacheHitCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc() + defer v.RUnlock() + return v.val, nil + } else if v.fetching { + // CACHE WAIT + promCacheWaitCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc() + // if someone else is fetching then wait for the fetch to complete + ch := v.fetchCh + v.Unlock() + return v.waitForResult(ctx, ch, m.chStop) + } + // CACHE MISS + promCacheMissCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc() + // initiate the fetch and wait for result + ch := v.initiateFetch() + v.Unlock() + + ok := m.IfStarted(func() { + m.wg.Add(1) + go m.fetch(req, v) + }) + if !ok { + err := fmt.Errorf("memCache must be started, but is: %v", m.State()) + v.abandonFetch(err) + return nil, err + } + return v.waitForResult(ctx, ch, m.chStop) +} + +const minBackoffRetryInterval = 50 * time.Millisecond + +// newBackoff creates a backoff for retrying +func (m *memCache) newBackoff() backoff.Backoff { + min := minBackoffRetryInterval + max := m.latestPriceTTL / 2 + if min > max { + // avoid setting a min that is greater than max + min = max + } + return backoff.Backoff{ + Min: min, + Max: max, + Factor: 2, + Jitter: true, + } +} + +// fetch continually tries to call FetchLatestReport and write the result to v +// it writes errors as they come up +func (m *memCache) fetch(req *pb.LatestReportRequest, v *cacheVal) { + defer m.wg.Done() + b := m.newBackoff() + memcacheCtx, cancel := m.chStop.NewCtx() + defer cancel() + var t time.Time + var val *pb.LatestReportResponse + var err error + defer func() { + v.completeFetch(val, err, t.Add(m.latestPriceTTL)) + }() + + for { + t = time.Now() + + ctx := memcacheCtx + cancel := func() {} + if m.latestReportDeadline > 0 { + ctx, cancel = context.WithTimeoutCause(memcacheCtx, m.latestReportDeadline, errors.New("latest report fetch deadline exceeded")) + } + + // NOTE: must drop down to RawClient here otherwise we enter an + // infinite loop of calling a client that calls back to this same cache + // and on and on + val, err = m.client.RawClient().LatestReport(ctx, req) + cancel() + v.setError(err) + if memcacheCtx.Err() != nil { + // stopped + return + } else if err != nil { + m.lggr.Warnw("FetchLatestReport failed", "err", err) + promFetchFailedCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc() + select { + case <-m.chStop: + return + case <-time.After(b.Duration()): + continue + } + } + return + } +} + +func (m *memCache) Start(context.Context) error { + return m.StartOnce(m.Name(), func() error { + m.wg.Add(1) + go m.runloop() + return nil + }) +} + +func (m *memCache) runloop() { + defer m.wg.Done() + + if m.maxStaleAge == 0 { + return + } + t := time.NewTicker(utils.WithJitter(m.maxStaleAge)) + + for { + select { + case <-t.C: + m.cleanup() + t.Reset(utils.WithJitter(m.maxStaleAge)) + case <-m.chStop: + return + } + } +} + +// remove anything that has been stale for longer than maxStaleAge so that +// cache doesn't grow forever and cause memory leaks +// +// NOTE: This should be concurrent-safe with LatestReport. The only time they +// can race is if the cache item has expired past the stale age between +// creation of the cache item and start of fetch. This is unlikely, and even if +// it does occur, the worst case is that we discard a cache item early and +// double fetch, which isn't bad at all. +func (m *memCache) cleanup() { + m.cache.Range(func(k, vi any) bool { + v := vi.(*cacheVal) + v.RLock() + defer v.RUnlock() + if v.fetching { + // skip cleanup if fetching + return true + } + if time.Now().After(v.expiresAt.Add(m.maxStaleAge)) { + // garbage collection + m.cache.Delete(k) + } + return true + }) +} + +func (m *memCache) Close() error { + return m.StopOnce(m.Name(), func() error { + close(m.chStop) + m.wg.Wait() + return nil + }) +} +func (m *memCache) HealthReport() map[string]error { + return map[string]error{ + m.Name(): m.Ready(), + } +} +func (m *memCache) Name() string { return m.lggr.Name() } diff --git a/core/services/relay/evm/mercury/wsrpc/cache/cache_set.go b/core/services/relay/evm/mercury/wsrpc/cache/cache_set.go new file mode 100644 index 00000000000..f06f1137fb3 --- /dev/null +++ b/core/services/relay/evm/mercury/wsrpc/cache/cache_set.go @@ -0,0 +1,120 @@ +package cache + +import ( + "context" + "fmt" + "sync" + "time" + + "golang.org/x/exp/maps" + + "github.com/smartcontractkit/chainlink-common/pkg/services" + + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +// CacheSet holds a set of mercury caches keyed by server URL +type CacheSet interface { + services.Service + Get(ctx context.Context, client Client) (Fetcher, error) +} + +var _ CacheSet = (*cacheSet)(nil) + +type cacheSet struct { + sync.RWMutex + services.StateMachine + + lggr logger.Logger + caches map[string]Cache + + latestPriceTTL time.Duration + maxStaleAge time.Duration +} + +func NewCacheSet(cfg Config) CacheSet { + return newCacheSet(cfg) +} + +func newCacheSet(cfg Config) *cacheSet { + return &cacheSet{ + sync.RWMutex{}, + services.StateMachine{}, + cfg.Logger.Named("CacheSet"), + make(map[string]Cache), + cfg.LatestReportTTL, + cfg.MaxStaleAge, + } +} + +func (cs *cacheSet) Start(context.Context) error { + return cs.StartOnce("CacheSet", func() error { + return nil + }) +} + +func (cs *cacheSet) Close() error { + return cs.StopOnce("CacheSet", func() error { + cs.Lock() + defer cs.Unlock() + caches := maps.Values(cs.caches) + if err := services.MultiCloser(caches).Close(); err != nil { + return err + } + cs.caches = nil + return nil + }) +} + +func (cs *cacheSet) Get(ctx context.Context, client Client) (f Fetcher, err error) { + ok := cs.IfStarted(func() { + f, err = cs.get(ctx, client) + }) + if !ok { + return nil, fmt.Errorf("cacheSet must be started, but is: %v", cs.State()) + } + return +} + +func (cs *cacheSet) get(ctx context.Context, client Client) (Fetcher, error) { + sURL := client.ServerURL() + // HOT PATH + cs.RLock() + c, exists := cs.caches[sURL] + cs.RUnlock() + if exists { + return c, nil + } + + // COLD PATH + cs.Lock() + defer cs.Unlock() + c, exists = cs.caches[sURL] + if exists { + return c, nil + } + cfg := Config{ + Logger: cs.lggr.With("serverURL", sURL), + LatestReportTTL: cs.latestPriceTTL, + MaxStaleAge: cs.maxStaleAge, + } + c = newMemCache(client, cfg) + if err := c.Start(ctx); err != nil { + return nil, err + } + cs.caches[sURL] = c + return c, nil +} + +func (cs *cacheSet) HealthReport() map[string]error { + report := map[string]error{ + cs.Name(): cs.Ready(), + } + cs.RLock() + defer cs.RUnlock() + for _, c := range cs.caches { + services.CopyHealth(report, c.HealthReport()) + } + return report +} +func (cs *cacheSet) Name() string { return cs.lggr.Name() } diff --git a/core/services/relay/evm/mercury/wsrpc/cache/cache_set_test.go b/core/services/relay/evm/mercury/wsrpc/cache/cache_set_test.go new file mode 100644 index 00000000000..9262fb52d31 --- /dev/null +++ b/core/services/relay/evm/mercury/wsrpc/cache/cache_set_test.go @@ -0,0 +1,48 @@ +package cache + +import ( + "testing" + + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_CacheSet(t *testing.T) { + lggr := logger.TestLogger(t) + cs := newCacheSet(Config{Logger: lggr}) + ctx := testutils.Context(t) + require.NoError(t, cs.Start(ctx)) + t.Cleanup(func() { + assert.NoError(t, cs.Close()) + }) + + t.Run("Get", func(t *testing.T) { + c := &mockClient{} + + var err error + var f Fetcher + t.Run("with virgin cacheset, makes new entry and returns it", func(t *testing.T) { + assert.Len(t, cs.caches, 0) + + f, err = cs.Get(ctx, c) + require.NoError(t, err) + + assert.IsType(t, f, &memCache{}) + assert.Len(t, cs.caches, 1) + }) + t.Run("with existing cache for value, returns that", func(t *testing.T) { + var f2 Fetcher + assert.Len(t, cs.caches, 1) + + f2, err = cs.Get(ctx, c) + require.NoError(t, err) + + assert.IsType(t, f, &memCache{}) + assert.Equal(t, f, f2) + assert.Len(t, cs.caches, 1) + }) + }) +} diff --git a/core/services/relay/evm/mercury/wsrpc/cache/cache_test.go b/core/services/relay/evm/mercury/wsrpc/cache/cache_test.go new file mode 100644 index 00000000000..71f74b3b3cb --- /dev/null +++ b/core/services/relay/evm/mercury/wsrpc/cache/cache_test.go @@ -0,0 +1,199 @@ +package cache + +import ( + "context" + "errors" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" +) + +const neverExpireTTL = 1000 * time.Hour // some massive value that will never expire during a test + +func Test_Cache(t *testing.T) { + client := &mockClient{} + cfg := Config{ + Logger: logger.TestLogger(t), + } + ctx := testutils.Context(t) + + req1 := &pb.LatestReportRequest{FeedId: []byte{1}} + req2 := &pb.LatestReportRequest{FeedId: []byte{2}} + req3 := &pb.LatestReportRequest{FeedId: []byte{3}} + + t.Run("errors with nil req", func(t *testing.T) { + c := newMemCache(client, cfg) + + _, err := c.LatestReport(ctx, nil) + assert.EqualError(t, err, "req must not be nil") + }) + + t.Run("with LatestReportTTL=0 does no caching", func(t *testing.T) { + c := newMemCache(client, cfg) + + req := &pb.LatestReportRequest{} + for i := 0; i < 5; i++ { + client.resp = &pb.LatestReportResponse{Report: &pb.Report{Price: []byte(strconv.Itoa(i))}} + + resp, err := c.LatestReport(ctx, req) + require.NoError(t, err) + assert.Equal(t, client.resp, resp) + } + + client.resp = nil + client.err = errors.New("something exploded") + + resp, err := c.LatestReport(ctx, req) + assert.EqualError(t, err, "something exploded") + assert.Nil(t, resp) + }) + + t.Run("caches repeated calls to LatestReport, keyed by request", func(t *testing.T) { + cfg.LatestReportTTL = neverExpireTTL + client.err = nil + c := newMemCache(client, cfg) + + t.Run("if cache is unstarted, returns error", func(t *testing.T) { + // starting the cache is required for state management if we + // actually cache results, since fetches are initiated async and + // need to be cleaned up properly on close + _, err := c.LatestReport(ctx, &pb.LatestReportRequest{}) + assert.EqualError(t, err, "memCache must be started, but is: Unstarted") + }) + + err := c.StartOnce("test start", func() error { return nil }) + require.NoError(t, err) + + t.Run("returns cached value for key", func(t *testing.T) { + var firstResp *pb.LatestReportResponse + for i := 0; i < 5; i++ { + client.resp = &pb.LatestReportResponse{Report: &pb.Report{Price: []byte(strconv.Itoa(i))}} + if firstResp == nil { + firstResp = client.resp + } + + resp, err := c.LatestReport(ctx, req1) + require.NoError(t, err) + assert.Equal(t, firstResp, resp) + } + }) + + t.Run("cache keys do not conflict", func(t *testing.T) { + var firstResp1 *pb.LatestReportResponse + for i := 5; i < 10; i++ { + client.resp = &pb.LatestReportResponse{Report: &pb.Report{Price: []byte(strconv.Itoa(i))}} + if firstResp1 == nil { + firstResp1 = client.resp + } + + resp, err := c.LatestReport(ctx, req2) + require.NoError(t, err) + assert.Equal(t, firstResp1, resp) + } + + var firstResp2 *pb.LatestReportResponse + for i := 10; i < 15; i++ { + client.resp = &pb.LatestReportResponse{Report: &pb.Report{Price: []byte(strconv.Itoa(i))}} + if firstResp2 == nil { + firstResp2 = client.resp + } + + resp, err := c.LatestReport(ctx, req3) + require.NoError(t, err) + assert.Equal(t, firstResp2, resp) + } + + // req1 key still has same value + resp, err := c.LatestReport(ctx, req1) + require.NoError(t, err) + assert.Equal(t, []byte(strconv.Itoa(0)), resp.Report.Price) + + // req2 key still has same value + resp, err = c.LatestReport(ctx, req2) + require.NoError(t, err) + assert.Equal(t, []byte(strconv.Itoa(5)), resp.Report.Price) + }) + + t.Run("re-queries when a cache item has expired", func(t *testing.T) { + vi, exists := c.cache.Load(req1) + assert.True(t, exists) + v := vi.(*cacheVal) + v.expiresAt = time.Now().Add(-1 * time.Second) + + client.resp = &pb.LatestReportResponse{Report: &pb.Report{Price: []byte(strconv.Itoa(15))}} + + resp, err := c.LatestReport(ctx, req1) + require.NoError(t, err) + assert.Equal(t, client.resp, resp) + + // querying again yields the same cached item + resp, err = c.LatestReport(ctx, req1) + require.NoError(t, err) + assert.Equal(t, client.resp, resp) + }) + }) + + t.Run("complete fetch", func(t *testing.T) { + t.Run("does not change expiry if fetch returns error", func(t *testing.T) { + expires := time.Now().Add(-1 * time.Second) + v := &cacheVal{ + fetching: true, + fetchCh: make(chan (struct{})), + val: nil, + err: nil, + expiresAt: expires, + } + v.completeFetch(nil, errors.New("foo"), time.Now().Add(neverExpireTTL)) + assert.Equal(t, expires, v.expiresAt) + + v = &cacheVal{ + fetching: true, + fetchCh: make(chan (struct{})), + val: nil, + err: nil, + expiresAt: expires, + } + expires = time.Now().Add(neverExpireTTL) + v.completeFetch(nil, nil, expires) + assert.Equal(t, expires, v.expiresAt) + }) + }) + + t.Run("timeouts", func(t *testing.T) { + c := newMemCache(client, cfg) + // simulate fetch already executing in background + v := &cacheVal{ + fetching: true, + fetchCh: make(chan (struct{})), + val: nil, + err: nil, + expiresAt: time.Now().Add(-1 * time.Second), + } + c.cache.Store(req1, v) + + canceledCtx, cancel := context.WithCancel(testutils.Context(t)) + cancel() + + t.Run("returns context deadline exceeded error if fetch takes too long", func(t *testing.T) { + _, err := c.LatestReport(canceledCtx, req1) + require.Error(t, err) + assert.True(t, errors.Is(err, context.Canceled)) + assert.EqualError(t, err, "context canceled") + }) + t.Run("returns wrapped context deadline exceeded error if fetch has errored and is in the retry loop", func(t *testing.T) { + v.err = errors.New("some background fetch error") + + _, err := c.LatestReport(canceledCtx, req1) + require.Error(t, err) + assert.True(t, errors.Is(err, context.Canceled)) + assert.EqualError(t, err, "some background fetch error\ncontext canceled") + }) + }) +} diff --git a/core/services/relay/evm/mercury/wsrpc/cache/helpers_test.go b/core/services/relay/evm/mercury/wsrpc/cache/helpers_test.go new file mode 100644 index 00000000000..4cc08bdd52e --- /dev/null +++ b/core/services/relay/evm/mercury/wsrpc/cache/helpers_test.go @@ -0,0 +1,38 @@ +package cache + +import ( + "context" + + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" +) + +var _ Client = &mockClient{} + +type mockClient struct { + resp *pb.LatestReportResponse + err error +} + +func (m *mockClient) LatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) { + return m.resp, m.err +} + +func (m *mockClient) ServerURL() string { + return "mock client url" +} + +func (m *mockClient) RawClient() pb.MercuryClient { + return &mockRawClient{m.resp, m.err} +} + +type mockRawClient struct { + resp *pb.LatestReportResponse + err error +} + +func (m *mockRawClient) Transmit(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { + return nil, nil +} +func (m *mockRawClient) LatestReport(ctx context.Context, in *pb.LatestReportRequest) (*pb.LatestReportResponse, error) { + return m.resp, m.err +} diff --git a/core/services/relay/evm/mercury/wsrpc/client.go b/core/services/relay/evm/mercury/wsrpc/client.go index c04c00074a2..5cdf1f44e96 100644 --- a/core/services/relay/evm/mercury/wsrpc/client.go +++ b/core/services/relay/evm/mercury/wsrpc/client.go @@ -19,6 +19,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/csakey" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -63,6 +64,8 @@ var ( type Client interface { services.Service pb.MercuryClient + ServerURL() string + RawClient() pb.MercuryClient } type Conn interface { @@ -78,15 +81,18 @@ type client struct { serverPubKey []byte serverURL string - logger logger.Logger - conn Conn - client pb.MercuryClient + logger logger.Logger + conn Conn + rawClient pb.MercuryClient consecutiveTimeoutCnt atomic.Int32 wg sync.WaitGroup chStop services.StopChan chResetTransport chan struct{} + cacheSet cache.CacheSet + cache cache.Fetcher + timeoutCountMetric prometheus.Counter dialCountMetric prometheus.Counter dialSuccessCountMetric prometheus.Counter @@ -95,17 +101,18 @@ type client struct { } // Consumers of wsrpc package should not usually call NewClient directly, but instead use the Pool -func NewClient(lggr logger.Logger, clientPrivKey csakey.KeyV2, serverPubKey []byte, serverURL string) Client { - return newClient(lggr, clientPrivKey, serverPubKey, serverURL) +func NewClient(lggr logger.Logger, clientPrivKey csakey.KeyV2, serverPubKey []byte, serverURL string, cacheSet cache.CacheSet) Client { + return newClient(lggr, clientPrivKey, serverPubKey, serverURL, cacheSet) } -func newClient(lggr logger.Logger, clientPrivKey csakey.KeyV2, serverPubKey []byte, serverURL string) *client { +func newClient(lggr logger.Logger, clientPrivKey csakey.KeyV2, serverPubKey []byte, serverURL string, cacheSet cache.CacheSet) *client { return &client{ csaKey: clientPrivKey, serverPubKey: serverPubKey, serverURL: serverURL, logger: lggr.Named("WSRPC").With("mercuryServerURL", serverURL), chResetTransport: make(chan struct{}, 1), + cacheSet: cacheSet, chStop: make(services.StopChan), timeoutCountMetric: timeoutCount.WithLabelValues(serverURL), dialCountMetric: dialCount.WithLabelValues(serverURL), @@ -115,9 +122,15 @@ func newClient(lggr logger.Logger, clientPrivKey csakey.KeyV2, serverPubKey []by } } -func (w *client) Start(_ context.Context) error { - return w.StartOnce("WSRPC Client", func() error { - if err := w.dial(context.Background()); err != nil { +func (w *client) Start(ctx context.Context) error { + return w.StartOnce("WSRPC Client", func() (err error) { + // NOTE: This is not a mistake, dial is non-blocking so it should use a + // background context, not the Start context + if err = w.dial(context.Background()); err != nil { + return err + } + w.cache, err = w.cacheSet.Get(ctx, w) + if err != nil { return err } w.wg.Add(1) @@ -148,7 +161,7 @@ func (w *client) dial(ctx context.Context, opts ...wsrpc.DialOption) error { w.dialSuccessCountMetric.Inc() setLivenessMetric(true) w.conn = conn - w.client = pb.NewMercuryClient(conn) + w.rawClient = pb.NewMercuryClient(conn) return nil } @@ -242,7 +255,7 @@ func (w *client) Transmit(ctx context.Context, req *pb.TransmitRequest) (resp *p if err = w.waitForReady(ctx); err != nil { return nil, errors.Wrap(err, "Transmit call failed") } - resp, err = w.client.Transmit(ctx, req) + resp, err = w.rawClient.Transmit(ctx, req) if errors.Is(err, context.DeadlineExceeded) { w.timeoutCountMetric.Inc() cnt := w.consecutiveTimeoutCnt.Add(1) @@ -290,7 +303,11 @@ func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest) if err = w.waitForReady(ctx); err != nil { return nil, errors.Wrap(err, "LatestReport failed") } - resp, err = w.client.LatestReport(ctx, req) + if w.cache == nil { + resp, err = w.rawClient.LatestReport(ctx, req) + } else { + resp, err = w.cache.LatestReport(ctx, req) + } if err != nil { lggr.Errorw("LatestReport failed", "err", err, "resp", resp) } else if resp.Error != "" { @@ -300,3 +317,11 @@ func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest) } return } + +func (w *client) ServerURL() string { + return w.serverURL +} + +func (w *client) RawClient() pb.MercuryClient { + return w.rawClient +} diff --git a/core/services/relay/evm/mercury/wsrpc/client_test.go b/core/services/relay/evm/mercury/wsrpc/client_test.go index 2410cbc52f1..d5d1b1b5ee5 100644 --- a/core/services/relay/evm/mercury/wsrpc/client_test.go +++ b/core/services/relay/evm/mercury/wsrpc/client_test.go @@ -3,6 +3,7 @@ package wsrpc import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -10,15 +11,48 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/csakey" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache" mocks "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" ) +var _ cache.CacheSet = &mockCacheSet{} + +type mockCacheSet struct{} + +func (m *mockCacheSet) Get(ctx context.Context, client cache.Client) (cache.Fetcher, error) { + return nil, nil +} +func (m *mockCacheSet) Start(context.Context) error { return nil } +func (m *mockCacheSet) Ready() error { return nil } +func (m *mockCacheSet) HealthReport() map[string]error { return nil } +func (m *mockCacheSet) Name() string { return "" } +func (m *mockCacheSet) Close() error { return nil } + +var _ cache.Cache = &mockCache{} + +type mockCache struct{} + +func (m *mockCache) LatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) { + return nil, nil +} +func (m *mockCache) Start(context.Context) error { return nil } +func (m *mockCache) Ready() error { return nil } +func (m *mockCache) HealthReport() map[string]error { return nil } +func (m *mockCache) Name() string { return "" } +func (m *mockCache) Close() error { return nil } + +func newNoopCacheSet() cache.CacheSet { + return &mockCacheSet{} +} + func Test_Client_Transmit(t *testing.T) { lggr := logger.TestLogger(t) ctx := testutils.Context(t) req := &pb.TransmitRequest{} + noopCacheSet := newNoopCacheSet() + t.Run("sends on reset channel after MaxConsecutiveTransmitFailures timed out transmits", func(t *testing.T) { calls := 0 transmitErr := context.DeadlineExceeded @@ -31,9 +65,9 @@ func Test_Client_Transmit(t *testing.T) { conn := &mocks.MockConn{ Ready: true, } - c := newClient(lggr, csakey.KeyV2{}, nil, "") + c := newClient(lggr, csakey.KeyV2{}, nil, "", noopCacheSet) c.conn = conn - c.client = wsrpcClient + c.rawClient = wsrpcClient require.NoError(t, c.StartOnce("Mock WSRPC Client", func() error { return nil })) for i := 1; i < MaxConsecutiveTransmitFailures; i++ { _, err := c.Transmit(ctx, req) @@ -73,3 +107,109 @@ func Test_Client_Transmit(t *testing.T) { }) }) } + +func Test_Client_LatestReport(t *testing.T) { + lggr := logger.TestLogger(t) + ctx := testutils.Context(t) + + t.Run("with nil cache", func(t *testing.T) { + req := &pb.LatestReportRequest{} + noopCacheSet := newNoopCacheSet() + resp := &pb.LatestReportResponse{} + + wsrpcClient := &mocks.MockWSRPCClient{ + LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (*pb.LatestReportResponse, error) { + assert.Equal(t, req, in) + return resp, nil + }, + } + + conn := &mocks.MockConn{ + Ready: true, + } + c := newClient(lggr, csakey.KeyV2{}, nil, "", noopCacheSet) + c.conn = conn + c.rawClient = wsrpcClient + require.NoError(t, c.StartOnce("Mock WSRPC Client", func() error { return nil })) + + r, err := c.LatestReport(ctx, req) + + require.NoError(t, err) + assert.Equal(t, resp, r) + }) + + t.Run("with cache disabled", func(t *testing.T) { + req := &pb.LatestReportRequest{} + cacheSet := cache.NewCacheSet(cache.Config{LatestReportTTL: 0, Logger: lggr}) + resp := &pb.LatestReportResponse{} + + var calls int + wsrpcClient := &mocks.MockWSRPCClient{ + LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (*pb.LatestReportResponse, error) { + calls++ + assert.Equal(t, req, in) + return resp, nil + }, + } + + conn := &mocks.MockConn{ + Ready: true, + } + c := newClient(lggr, csakey.KeyV2{}, nil, "", cacheSet) + c.conn = conn + c.rawClient = wsrpcClient + + // simulate start without dialling + require.NoError(t, c.StartOnce("Mock WSRPC Client", func() error { return nil })) + var err error + require.NoError(t, cacheSet.Start(ctx)) + c.cache, err = cacheSet.Get(ctx, c) + require.NoError(t, err) + + for i := 0; i < 5; i++ { + r, err := c.LatestReport(ctx, req) + + require.NoError(t, err) + assert.Equal(t, resp, r) + } + assert.Equal(t, 5, calls, "expected 5 calls to LatestReport but it was called %d times", calls) + }) + + t.Run("with caching", func(t *testing.T) { + req := &pb.LatestReportRequest{} + const neverExpireTTL = 1000 * time.Hour // some massive value that will never expire during a test + cacheSet := cache.NewCacheSet(cache.Config{LatestReportTTL: neverExpireTTL, Logger: lggr}) + resp := &pb.LatestReportResponse{} + + var calls int + wsrpcClient := &mocks.MockWSRPCClient{ + LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (*pb.LatestReportResponse, error) { + calls++ + assert.Equal(t, req, in) + return resp, nil + }, + } + + conn := &mocks.MockConn{ + Ready: true, + } + c := newClient(lggr, csakey.KeyV2{}, nil, "", cacheSet) + c.conn = conn + c.rawClient = wsrpcClient + + // simulate start without dialling + require.NoError(t, c.StartOnce("Mock WSRPC Client", func() error { return nil })) + var err error + require.NoError(t, cacheSet.Start(ctx)) + c.cache, err = cacheSet.Get(ctx, c) + require.NoError(t, err) + + for i := 0; i < 5; i++ { + r, err := c.LatestReport(ctx, req) + + require.NoError(t, err) + assert.Equal(t, resp, r) + } + assert.Equal(t, 1, calls, "expected only 1 call to LatestReport but it was called %d times", calls) + }) +} diff --git a/core/services/relay/evm/mercury/wsrpc/mocks/mocks.go b/core/services/relay/evm/mercury/wsrpc/mocks/mocks.go index d764143c5fc..c0caf0dee12 100644 --- a/core/services/relay/evm/mercury/wsrpc/mocks/mocks.go +++ b/core/services/relay/evm/mercury/wsrpc/mocks/mocks.go @@ -24,6 +24,9 @@ func (m MockWSRPCClient) Transmit(ctx context.Context, in *pb.TransmitRequest) ( func (m MockWSRPCClient) LatestReport(ctx context.Context, in *pb.LatestReportRequest) (*pb.LatestReportResponse, error) { return m.LatestReportF(ctx, in) } +func (m MockWSRPCClient) ServerURL() string { return "mock server url" } + +func (m MockWSRPCClient) RawClient() pb.MercuryClient { return nil } type MockConn struct { State connectivity.State diff --git a/core/services/relay/evm/mercury/wsrpc/pool.go b/core/services/relay/evm/mercury/wsrpc/pool.go index 6630a78437e..6c525741ddc 100644 --- a/core/services/relay/evm/mercury/wsrpc/pool.go +++ b/core/services/relay/evm/mercury/wsrpc/pool.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/csakey" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -58,7 +59,7 @@ func (conn *connection) checkout(ctx context.Context) (cco *clientCheckout, err // not thread-safe, access must be serialized func (conn *connection) ensureStartedClient(ctx context.Context) error { if len(conn.checkouts) == 0 { - conn.Client = conn.pool.newClient(conn.lggr, conn.clientPrivKey, conn.serverPubKey, conn.serverURL) + conn.Client = conn.pool.newClient(conn.lggr, conn.clientPrivKey, conn.serverPubKey, conn.serverURL, conn.pool.cacheSet) return conn.Client.Start(ctx) } return nil @@ -119,16 +120,19 @@ type pool struct { connections map[string]map[credentials.StaticSizedPublicKey]*connection // embedding newClient makes testing/mocking easier - newClient func(lggr logger.Logger, privKey csakey.KeyV2, serverPubKey []byte, serverURL string) Client + newClient func(lggr logger.Logger, privKey csakey.KeyV2, serverPubKey []byte, serverURL string, cacheSet cache.CacheSet) Client mu sync.RWMutex + cacheSet cache.CacheSet + closed bool } -func NewPool(lggr logger.Logger) Pool { +func NewPool(lggr logger.Logger, cacheCfg cache.Config) Pool { p := newPool(lggr.Named("Mercury.WSRPCPool")) p.newClient = NewClient + p.cacheSet = cache.NewCacheSet(cacheCfg) return p } @@ -188,7 +192,9 @@ func (p *pool) newConnection(lggr logger.Logger, clientPrivKey csakey.KeyV2, ser } } -func (p *pool) Start(_ context.Context) error { return nil } +func (p *pool) Start(ctx context.Context) error { + return p.cacheSet.Start(ctx) +} func (p *pool) Close() (merr error) { p.mu.Lock() @@ -199,6 +205,7 @@ func (p *pool) Close() (merr error) { merr = errors.Join(merr, conn.forceCloseAll()) } } + merr = errors.Join(merr, p.cacheSet.Close()) return } diff --git a/core/services/relay/evm/mercury/wsrpc/pool_test.go b/core/services/relay/evm/mercury/wsrpc/pool_test.go index 980b9f4d742..3d418d39d87 100644 --- a/core/services/relay/evm/mercury/wsrpc/pool_test.go +++ b/core/services/relay/evm/mercury/wsrpc/pool_test.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/csakey" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -19,8 +20,9 @@ import ( var _ Client = &mockClient{} type mockClient struct { - started bool - closed bool + started bool + closed bool + rawClient pb.MercuryClient } func (c *mockClient) Transmit(ctx context.Context, in *pb.TransmitRequest) (out *pb.TransmitResponse, err error) { @@ -40,6 +42,8 @@ func (c *mockClient) Close() error { func (c *mockClient) Name() string { return "mock client" } func (c *mockClient) Ready() error { return nil } func (c *mockClient) HealthReport() map[string]error { return nil } +func (c *mockClient) ServerURL() string { return "mock client url" } +func (c *mockClient) RawClient() pb.MercuryClient { return c.rawClient } func newMockClient(lggr logger.Logger) *mockClient { return &mockClient{} @@ -52,6 +56,7 @@ func Test_Pool(t *testing.T) { t.Run("Checkout", func(t *testing.T) { p := newPool(lggr) + p.cacheSet = &mockCacheSet{} t.Run("checks out one started client", func(t *testing.T) { clientPrivKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(rand.Int63())) @@ -59,7 +64,7 @@ func Test_Pool(t *testing.T) { serverURL := "example.com:443/ws" client := newMockClient(lggr) - p.newClient = func(lggr logger.Logger, cprivk csakey.KeyV2, spubk []byte, surl string) Client { + p.newClient = func(lggr logger.Logger, cprivk csakey.KeyV2, spubk []byte, surl string, cs cache.CacheSet) Client { assert.Equal(t, clientPrivKey, cprivk) assert.Equal(t, serverPubKey, spubk) assert.Equal(t, serverURL, surl) @@ -105,7 +110,7 @@ func Test_Pool(t *testing.T) { "example.invalid:8000/ws", } - p.newClient = func(lggr logger.Logger, cprivk csakey.KeyV2, spubk []byte, surl string) Client { + p.newClient = func(lggr logger.Logger, cprivk csakey.KeyV2, spubk []byte, surl string, cs cache.CacheSet) Client { return newMockClient(lggr) } @@ -199,6 +204,7 @@ func Test_Pool(t *testing.T) { }) p := newPool(lggr) + p.cacheSet = &mockCacheSet{} t.Run("Name", func(t *testing.T) { assert.Equal(t, "PoolTestLogger", p.Name()) @@ -220,7 +226,7 @@ func Test_Pool(t *testing.T) { } var clients []*mockClient - p.newClient = func(lggr logger.Logger, cprivk csakey.KeyV2, spubk []byte, surl string) Client { + p.newClient = func(lggr logger.Logger, cprivk csakey.KeyV2, spubk []byte, surl string, cs cache.CacheSet) Client { c := newMockClient(lggr) clients = append(clients, c) return c diff --git a/core/utils/hash.go b/core/utils/hash.go index bcae823770e..b0a32454e2f 100644 --- a/core/utils/hash.go +++ b/core/utils/hash.go @@ -8,12 +8,32 @@ import ( "github.com/pkg/errors" ) +const HashLength = 32 + // Hash is a simplified version of go-ethereum's common.Hash to avoid // go-ethereum dependency // It represents a 32 byte fixed size array that marshals/unmarshals assuming a // 0x prefix type Hash [32]byte +// BytesToHash sets b to hash. +// If b is larger than len(h), b will be cropped from the left. +func BytesToHash(b []byte) Hash { + var h Hash + h.SetBytes(b) + return h +} + +// SetBytes sets the hash to the value of b. +// If b is larger than len(h), b will be cropped from the left. +func (h *Hash) SetBytes(b []byte) { + if len(b) > len(h) { + b = b[len(b)-HashLength:] + } + + copy(h[HashLength-len(b):], b) +} + // Hex converts a hash to a hex string. func (h Hash) Hex() string { return fmt.Sprintf("0x%s", hex.EncodeToString(h[:])) } diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml index 8f3135b34e4..2531e7c281d 100644 --- a/core/web/resolver/testdata/config-empty-effective.toml +++ b/core/web/resolver/testdata/config-empty-effective.toml @@ -234,3 +234,9 @@ NodeID = '' SamplingRatio = 0.0 Mode = 'tls' TLSCertPath = '' + +[Mercury] +[Mercury.Cache] +LatestReportTTL = '1s' +MaxStaleAge = '1h0m0s' +LatestReportDeadline = '5s' diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index 7e9872e9554..cd0bce3cc73 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -245,6 +245,12 @@ TLSCertPath = '/path/to/cert.pem' env = 'dev' test = 'load' +[Mercury] +[Mercury.Cache] +LatestReportTTL = '1m40s' +MaxStaleAge = '1m41s' +LatestReportDeadline = '1m42s' + [[EVM]] ChainID = '1' Enabled = false diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index 6a60dfd419a..371cc50a170 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -235,6 +235,12 @@ SamplingRatio = 0.0 Mode = 'tls' TLSCertPath = '' +[Mercury] +[Mercury.Cache] +LatestReportTTL = '1s' +MaxStaleAge = '1h0m0s' +LatestReportDeadline = '5s' + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 265222c8bec..d9a785ff518 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -13,11 +13,42 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added distributed tracing in the OpenTelemetry trace format to the node, currently focused at the LOOPP Plugin development effort. This includes a new set of `Tracing` TOML configurations. The default for collecting traces is off - you must explicitly enable traces and setup a valid OpenTelemetry collector. Refer to `.github/tracing/README.md` for more details. - Added a new, optional WebServer authentication option that supports LDAP as a user identity provider. This enables user login access and user roles to be managed and provisioned via a centralized remote server that supports the LDAP protocol, which can be helpful when running multiple nodes. See the documentation for more information and config setup instructions. There is a new `[WebServer].AuthenticationMethod` config option, when set to `ldap` requires the new `[WebServer.LDAP]` config section to be defined, see the reference `docs/core.toml`. -- New prom metrics for mercury: +- New prom metrics for mercury transmit queue: `mercury_transmit_queue_delete_error_count` `mercury_transmit_queue_insert_error_count` `mercury_transmit_queue_push_error_count` Nops should consider alerting on these. +- Mercury now implements a local cache for fetching prices for fees, which ought to reduce latency and load on the mercury server, as well as increasing performance. It is enabled by default and can be configured with the following new config variables: +``` +[Mercury] + +# Mercury.Cache controls settings for the price retrieval cache querying a mercury server +[Mercury.Cache] +# LatestReportTTL controls how "stale" we will allow a price to be e.g. if +# set to 1s, a new price will always be fetched if the last result was +# from 1 second ago or older. +# +# Another way of looking at it is such: the cache will _never_ return a +# price that was queried from now-LatestReportTTL or before. +# +# Setting to zero disables caching entirely. +LatestReportTTL = "1s" # Default +# MaxStaleAge is that maximum amount of time that a value can be stale +# before it is deleted from the cache (a form of garbage collection). +# +# This should generally be set to something much larger than +# LatestReportTTL. Setting to zero disables garbage collection. +MaxStaleAge = "1h" # Default +# LatestReportDeadline controls how long to wait for a response from the +# mercury server before retrying. Setting this to zero will wait indefinitely. +LatestReportDeadline = "5s" # Default +``` +- New prom metrics for the mercury cache: + `mercury_cache_fetch_failure_count` + `mercury_cache_hit_count` + `mercury_cache_wait_count` + `mercury_cache_miss_count` + ### Changed diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 63c20bdf4a5..61d079fa4a6 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1652,6 +1652,51 @@ env = 'test' # Example ``` env is an example user specified key-value pair +## Mercury +```toml +[Mercury] +``` + + +## Mercury.Cache +```toml +[Mercury.Cache] +LatestReportTTL = "1s" # Default +MaxStaleAge = "1h" # Default +LatestReportDeadline = "5s" # Default +``` +Mercury.Cache controls settings for the price retrieval cache querying a mercury server + +### LatestReportTTL +```toml +LatestReportTTL = "1s" # Default +``` +LatestReportTTL controls how "stale" we will allow a price to be e.g. if +set to 1s, a new price will always be fetched if the last result was +from 1 second ago or older. + +Another way of looking at it is such: the cache will _never_ return a +price that was queried from now-LatestReportTTL or before. + +Setting to zero disables caching entirely. + +### MaxStaleAge +```toml +MaxStaleAge = "1h" # Default +``` +MaxStaleAge is that maximum amount of time that a value can be stale +before it is deleted from the cache (a form of garbage collection). + +This should generally be set to something much larger than +LatestReportTTL. Setting to zero disables garbage collection. + +### LatestReportDeadline +```toml +LatestReportDeadline = "5s" # Default +``` +LatestReportDeadline controls how long to wait for a response from the +mercury server before retrying. Setting this to zero will wait indefinitely. + ## EVM EVM defaults depend on ChainID: diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index 267a760f08c..0b841f694be 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -247,6 +247,12 @@ SamplingRatio = 0.0 Mode = 'tls' TLSCertPath = '' +[Mercury] +[Mercury.Cache] +LatestReportTTL = '1s' +MaxStaleAge = '1h0m0s' +LatestReportDeadline = '5s' + Invalid configuration: invalid secrets: 2 errors: - Database.URL: empty: must be provided and non-empty - Password.Keystore: empty: must be provided and non-empty diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index e6281e8d221..45b08f0e52f 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -291,6 +291,12 @@ SamplingRatio = 0.0 Mode = 'tls' TLSCertPath = '' +[Mercury] +[Mercury.Cache] +LatestReportTTL = '1s' +MaxStaleAge = '1h0m0s' +LatestReportDeadline = '5s' + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index 65d832aa82e..2869af3e2de 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -291,6 +291,12 @@ SamplingRatio = 0.0 Mode = 'tls' TLSCertPath = '' +[Mercury] +[Mercury.Cache] +LatestReportTTL = '1s' +MaxStaleAge = '1h0m0s' +LatestReportDeadline = '5s' + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index 6b9e3d56ce6..fb705819fc2 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -291,6 +291,12 @@ SamplingRatio = 0.0 Mode = 'tls' TLSCertPath = '' +[Mercury] +[Mercury.Cache] +LatestReportTTL = '1s' +MaxStaleAge = '1h0m0s' +LatestReportDeadline = '5s' + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index aa2036413c7..7b82d3323b1 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -281,6 +281,12 @@ SamplingRatio = 0.0 Mode = 'tls' TLSCertPath = '' +[Mercury] +[Mercury.Cache] +LatestReportTTL = '1s' +MaxStaleAge = '1h0m0s' +LatestReportDeadline = '5s' + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index 4ceb9d5df35..91fe0952dd8 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -288,6 +288,12 @@ SamplingRatio = 0.0 Mode = 'tls' TLSCertPath = '' +[Mercury] +[Mercury.Cache] +LatestReportTTL = '1s' +MaxStaleAge = '1h0m0s' +LatestReportDeadline = '5s' + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar index e4ff2aa35ea..a10d6df537c 100644 --- a/testdata/scripts/node/validate/warnings.txtar +++ b/testdata/scripts/node/validate/warnings.txtar @@ -296,6 +296,12 @@ SamplingRatio = 0.0 Mode = 'unencrypted' TLSCertPath = 'something' +[Mercury] +[Mercury.Cache] +LatestReportTTL = '1s' +MaxStaleAge = '1h0m0s' +LatestReportDeadline = '5s' + # Configuration warning: 3 errors: - P2P.V1: is deprecated and will be removed in a future version