From 8c8994e24284236645509b4c49152e6270ce0e35 Mon Sep 17 00:00:00 2001 From: george-dorin <120329946+george-dorin@users.noreply.github.com> Date: Fri, 26 Apr 2024 15:55:40 +0300 Subject: [PATCH 1/5] Validate config on NewApplication (#12997) * validate config on new application spawn * Add changeset * Check config only in the rebroadcast-transactions command --- .changeset/pink-schools-provide.md | 5 +++++ core/cmd/shell_local.go | 5 +++++ 2 files changed, 10 insertions(+) create mode 100644 .changeset/pink-schools-provide.md diff --git a/.changeset/pink-schools-provide.md b/.changeset/pink-schools-provide.md new file mode 100644 index 00000000000..6b2aa5ea0c4 --- /dev/null +++ b/.changeset/pink-schools-provide.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#bugfix Fixed an issue where the `rebroadcast-transactions` commands did not execute config validation. diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index 6dbffbe404a..24cb43e2090 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -580,6 +580,11 @@ func (s *Shell) RebroadcastTransactions(c *cli.Context) (err error) { } } + err = s.Config.Validate() + if err != nil { + return err + } + lggr := logger.Sugared(s.Logger.Named("RebroadcastTransactions")) db, err := pg.OpenUnlockedDB(s.Config.AppID(), s.Config.Database()) if err != nil { From f55d8be495a83c97ac5439672563400e12ec2ee7 Mon Sep 17 00:00:00 2001 From: Sam Date: Fri, 26 Apr 2024 10:10:40 -0400 Subject: [PATCH 2/5] Configurable Mercury transmitter parameters (#12680) * Configurable Mercury transmitter parameters * Changeset * Remove commented code * add tag * Rename --- .changeset/sour-jars-cross.md | 13 ++++++ core/cmd/shell.go | 5 ++- core/config/docs/core.toml | 14 +++++++ core/config/mercury_config.go | 7 ++++ core/config/toml/types.go | 20 ++++++++- core/internal/cltest/cltest.go | 3 +- core/services/chainlink/config_mercury.go | 21 ++++++++++ core/services/chainlink/config_test.go | 8 ++++ 
core/services/chainlink/relayer_factory.go | 9 ++-- .../testdata/config-empty-effective.toml | 4 ++ .../chainlink/testdata/config-full.toml | 4 ++ .../config-multi-chain-effective.toml | 4 ++ core/services/ocr2/delegate.go | 1 + core/services/relay/evm/evm.go | 25 ++++++----- .../services/relay/evm/mercury/transmitter.go | 27 +++++++----- .../relay/evm/mercury/transmitter_test.go | 42 ++++++++++++------- .../evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go | 1 + .../testdata/config-empty-effective.toml | 4 ++ core/web/resolver/testdata/config-full.toml | 4 ++ .../config-multi-chain-effective.toml | 4 ++ docs/CONFIG.md | 27 ++++++++++++ testdata/scripts/node/validate/default.txtar | 4 ++ .../disk-based-logging-disabled.txtar | 4 ++ .../validate/disk-based-logging-no-dir.txtar | 4 ++ .../node/validate/disk-based-logging.txtar | 4 ++ .../node/validate/invalid-ocr-p2p.txtar | 4 ++ testdata/scripts/node/validate/invalid.txtar | 4 ++ testdata/scripts/node/validate/valid.txtar | 4 ++ testdata/scripts/node/validate/warnings.txtar | 4 ++ 29 files changed, 234 insertions(+), 45 deletions(-) create mode 100644 .changeset/sour-jars-cross.md diff --git a/.changeset/sour-jars-cross.md b/.changeset/sour-jars-cross.md new file mode 100644 index 00000000000..b904e8e3dd0 --- /dev/null +++ b/.changeset/sour-jars-cross.md @@ -0,0 +1,13 @@ +--- +"chainlink": patch +--- + +#added + +Add configurability to mercury transmitter + +```toml +[Mercury.Transmitter] +TransmitQueueMaxSize = 10_000 # Default +TransmitTimeout = "5s" # Default +``` diff --git a/core/cmd/shell.go b/core/cmd/shell.go index 0372148e742..adbb66ce63f 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -174,8 +174,9 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G } evmFactoryCfg := chainlink.EVMFactoryConfig{ - CSAETHKeystore: keyStore, - ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds}, + CSAETHKeystore: keyStore, + ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, 
MailMon: mailMon, DS: ds}, + MercuryTransmitter: cfg.Mercury().Transmitter(), } // evm always enabled for backward compatibility // TODO BCF-2510 this needs to change in order to clear the path for EVM extraction diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index 605f6ced0bc..92d75430daf 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -622,3 +622,17 @@ LatestReportDeadline = "5s" # Default [Mercury.TLS] # CertFile is the path to a PEM file of trusted root certificate authority certificates CertFile = "/path/to/client/certs.pem" # Example + +# Mercury.Transmitter controls settings for the mercury transmitter +[Mercury.Transmitter] +# TransmitQueueMaxSize controls the size of the transmit queue. This is scoped +# per OCR instance. If the queue is full, the transmitter will start dropping +# the oldest messages in order to make space. +# +# This is useful if mercury server goes offline and the nop needs to buffer +# transmissions. +TransmitQueueMaxSize = 10_000 # Default +# TransmitTimeout controls how long the transmitter will wait for a response +# when sending a message to the mercury server, before aborting and considering +# the transmission to be failed. 
+TransmitTimeout = "5s" # Default diff --git a/core/config/mercury_config.go b/core/config/mercury_config.go index 1210fd282ef..f16fc4661a5 100644 --- a/core/config/mercury_config.go +++ b/core/config/mercury_config.go @@ -3,6 +3,7 @@ package config import ( "time" + commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/types" ) @@ -16,8 +17,14 @@ type MercuryTLS interface { CertFile() string } +type MercuryTransmitter interface { + TransmitQueueMaxSize() uint32 + TransmitTimeout() commonconfig.Duration +} + type Mercury interface { Credentials(credName string) *types.MercuryCredentials Cache() MercuryCache TLS() MercuryTLS + Transmitter() MercuryTransmitter } diff --git a/core/config/toml/types.go b/core/config/toml/types.go index ed52c21e34e..ba74528b3b6 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -1312,14 +1312,30 @@ func (m *MercuryTLS) ValidateConfig() (err error) { return } +type MercuryTransmitter struct { + TransmitQueueMaxSize *uint32 + TransmitTimeout *commonconfig.Duration +} + +func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) { + if v := f.TransmitQueueMaxSize; v != nil { + m.TransmitQueueMaxSize = v + } + if v := f.TransmitTimeout; v != nil { + m.TransmitTimeout = v + } +} + type Mercury struct { - Cache MercuryCache `toml:",omitempty"` - TLS MercuryTLS `toml:",omitempty"` + Cache MercuryCache `toml:",omitempty"` + TLS MercuryTLS `toml:",omitempty"` + Transmitter MercuryTransmitter `toml:",omitempty"` } func (m *Mercury) setFrom(f *Mercury) { m.Cache.setFrom(&f.Cache) m.TLS.setFrom(&f.TLS) + m.Transmitter.setFrom(&f.Transmitter) } func (m *Mercury) ValidateConfig() (err error) { diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index dc7079e44d9..58cedbb96e1 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -369,7 +369,8 @@ func NewApplicationWithConfig(t testing.TB, cfg 
chainlink.GeneralConfig, flagsAn MailMon: mailMon, DS: ds, }, - CSAETHKeystore: keyStore, + CSAETHKeystore: keyStore, + MercuryTransmitter: cfg.Mercury().Transmitter(), } if cfg.EVMEnabled() { diff --git a/core/services/chainlink/config_mercury.go b/core/services/chainlink/config_mercury.go index 27303a68899..1b64e0bde45 100644 --- a/core/services/chainlink/config_mercury.go +++ b/core/services/chainlink/config_mercury.go @@ -3,6 +3,7 @@ package chainlink import ( "time" + commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink/v2/core/config" @@ -25,6 +26,8 @@ func (m *mercuryCacheConfig) LatestReportDeadline() time.Duration { return m.c.LatestReportDeadline.Duration() } +var _ config.MercuryTLS = (*mercuryTLSConfig)(nil) + type mercuryTLSConfig struct { c toml.MercuryTLS } @@ -33,6 +36,20 @@ func (m *mercuryTLSConfig) CertFile() string { return *m.c.CertFile } +var _ config.MercuryTransmitter = (*mercuryTransmitterConfig)(nil) + +type mercuryTransmitterConfig struct { + c toml.MercuryTransmitter +} + +func (m *mercuryTransmitterConfig) TransmitQueueMaxSize() uint32 { + return *m.c.TransmitQueueMaxSize +} + +func (m *mercuryTransmitterConfig) TransmitTimeout() commonconfig.Duration { + return *m.c.TransmitTimeout +} + type mercuryConfig struct { c toml.Mercury s toml.MercurySecrets @@ -60,3 +77,7 @@ func (m *mercuryConfig) Cache() config.MercuryCache { func (m *mercuryConfig) TLS() config.MercuryTLS { return &mercuryTLSConfig{c: m.c.TLS} } + +func (m *mercuryConfig) Transmitter() config.MercuryTransmitter { + return &mercuryTransmitterConfig{c: m.c.Transmitter} +} diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index d02948fd07b..0d40697345d 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -710,6 +710,10 @@ func TestConfig_Marshal(t *testing.T) { TLS: 
toml.MercuryTLS{ CertFile: ptr("/path/to/cert.pem"), }, + Transmitter: toml.MercuryTransmitter{ + TransmitQueueMaxSize: ptr(uint32(123)), + TransmitTimeout: commoncfg.MustNewDuration(234 * time.Second), + }, } for _, tt := range []struct { @@ -1165,6 +1169,10 @@ LatestReportDeadline = '1m42s' [Mercury.TLS] CertFile = '/path/to/cert.pem' + +[Mercury.Transmitter] +TransmitQueueMaxSize = 123 +TransmitTimeout = '3m54s' `}, {"full", full, fullTOML}, {"multi-chain", multiChain, multiChainTOML}, diff --git a/core/services/chainlink/relayer_factory.go b/core/services/chainlink/relayer_factory.go index 00db81cce37..31645b7c54d 100644 --- a/core/services/chainlink/relayer_factory.go +++ b/core/services/chainlink/relayer_factory.go @@ -19,6 +19,7 @@ import ( "github.com/smartcontractkit/chainlink-starknet/relayer/pkg/chainlink/config" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" + coreconfig "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/config/env" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" @@ -38,6 +39,7 @@ type RelayerFactory struct { type EVMFactoryConfig struct { legacyevm.ChainOpts evmrelay.CSAETHKeystore + coreconfig.MercuryTransmitter } func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (map[types.RelayID]evmrelay.LoopRelayAdapter, error) { @@ -67,9 +69,10 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m } relayerOpts := evmrelay.RelayerOpts{ - DS: ccOpts.DS, - CSAETHKeystore: config.CSAETHKeystore, - MercuryPool: r.MercuryPool, + DS: ccOpts.DS, + CSAETHKeystore: config.CSAETHKeystore, + MercuryPool: r.MercuryPool, + TransmitterConfig: config.MercuryTransmitter, } relayer, err2 := evmrelay.NewRelayer(lggr.Named(relayID.ChainID), chain, relayerOpts) if err2 != nil { diff --git a/core/services/chainlink/testdata/config-empty-effective.toml 
b/core/services/chainlink/testdata/config-empty-effective.toml index 759a380d15c..38c3ed62017 100644 --- a/core/services/chainlink/testdata/config-empty-effective.toml +++ b/core/services/chainlink/testdata/config-empty-effective.toml @@ -230,6 +230,10 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Mercury.Transmitter] +TransmitQueueMaxSize = 10000 +TransmitTimeout = '5s' + [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index 8a016149e59..b199ae530f5 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -240,6 +240,10 @@ LatestReportDeadline = '1m42s' [Mercury.TLS] CertFile = '/path/to/cert.pem' +[Mercury.Transmitter] +TransmitQueueMaxSize = 123 +TransmitTimeout = '3m54s' + [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 13 diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index a6cba2aaac3..7aa3bb50b35 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -230,6 +230,10 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Mercury.Transmitter] +TransmitQueueMaxSize = 10000 +TransmitTimeout = '5s' + [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index dbde65efe40..4e1eb0cc623 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -198,6 +198,7 @@ type mercuryConfig interface { Credentials(credName string) *types.MercuryCredentials Cache() coreconfig.MercuryCache TLS() coreconfig.MercuryTLS + Transmitter() coreconfig.MercuryTransmitter } type thresholdConfig interface { diff --git 
a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 9097c217590..737a8e7561e 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -78,7 +78,8 @@ type Relayer struct { codec commontypes.Codec // Mercury - mercuryORM mercury.ORM + mercuryORM mercury.ORM + transmitterCfg mercury.TransmitterConfig // LLO/data streams cdcFactory llo.ChannelDefinitionCacheFactory @@ -93,7 +94,8 @@ type CSAETHKeystore interface { type RelayerOpts struct { DS sqlutil.DataSource CSAETHKeystore - MercuryPool wsrpc.Pool + MercuryPool wsrpc.Pool + TransmitterConfig mercury.TransmitterConfig } func (c RelayerOpts) Validate() error { @@ -122,14 +124,15 @@ func NewRelayer(lggr logger.Logger, chain legacyevm.Chain, opts RelayerOpts) (*R lloORM := llo.NewORM(opts.DS, chain.ID()) cdcFactory := llo.NewChannelDefinitionCacheFactory(lggr, lloORM, chain.LogPoller()) return &Relayer{ - ds: opts.DS, - chain: chain, - lggr: lggr, - ks: opts.CSAETHKeystore, - mercuryPool: opts.MercuryPool, - cdcFactory: cdcFactory, - lloORM: lloORM, - mercuryORM: mercuryORM, + ds: opts.DS, + chain: chain, + lggr: lggr, + ks: opts.CSAETHKeystore, + mercuryPool: opts.MercuryPool, + cdcFactory: cdcFactory, + lloORM: lloORM, + mercuryORM: mercuryORM, + transmitterCfg: opts.TransmitterConfig, }, nil } @@ -246,7 +249,7 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty default: return nil, fmt.Errorf("invalid feed version %d", feedID.Version()) } - transmitter := mercury.NewTransmitter(lggr, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec) + transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec) return NewMercuryProvider(cp, r.chainReader, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, lggr), nil } diff --git 
a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index 6f49ca91bfc..82a76450e5f 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -23,6 +23,7 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types/mercury" @@ -33,12 +34,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/utils" ) -var ( - maxTransmitQueueSize = 10_000 - maxDeleteQueueSize = 10_000 - transmitTimeout = 5 * time.Second -) - const ( // Mercury server error codes DuplicateReport = 2 @@ -104,9 +99,15 @@ type TransmitterReportDecoder interface { var _ Transmitter = (*mercuryTransmitter)(nil) +type TransmitterConfig interface { + TransmitQueueMaxSize() uint32 + TransmitTimeout() commonconfig.Duration +} + type mercuryTransmitter struct { services.StateMachine lggr logger.Logger + cfg TransmitterConfig servers map[string]*server @@ -142,6 +143,8 @@ func getPayloadTypes() abi.Arguments { type server struct { lggr logger.Logger + transmitTimeout time.Duration + c wsrpc.Client pm *PersistenceManager q *TransmitQueue @@ -221,7 +224,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed // queue was closed return } - ctx, cancel := context.WithTimeout(runloopCtx, utils.WithJitter(transmitTimeout)) + ctx, cancel := context.WithTimeout(runloopCtx, utils.WithJitter(s.transmitTimeout)) res, err := s.c.Transmit(ctx, t.Req) cancel() if runloopCtx.Err() != nil { @@ -272,18 +275,19 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed } } -func NewTransmitter(lggr logger.Logger, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID 
int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder) *mercuryTransmitter { +func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder) *mercuryTransmitter { feedIDHex := fmt.Sprintf("0x%x", feedID[:]) servers := make(map[string]*server, len(clients)) for serverURL, client := range clients { cLggr := lggr.Named(serverURL).With("serverURL", serverURL) - pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, maxTransmitQueueSize, flushDeletesFrequency, pruneFrequency) + pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, int(cfg.TransmitQueueMaxSize()), flushDeletesFrequency, pruneFrequency) servers[serverURL] = &server{ cLggr, + cfg.TransmitTimeout().Duration(), client, pm, - NewTransmitQueue(cLggr, serverURL, feedIDHex, maxTransmitQueueSize, pm), - make(chan *pb.TransmitRequest, maxDeleteQueueSize), + NewTransmitQueue(cLggr, serverURL, feedIDHex, int(cfg.TransmitQueueMaxSize()), pm), + make(chan *pb.TransmitRequest, int(cfg.TransmitQueueMaxSize())), transmitSuccessCount.WithLabelValues(feedIDHex, serverURL), transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL), transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL), @@ -295,6 +299,7 @@ func NewTransmitter(lggr logger.Logger, clients map[string]wsrpc.Client, fromAcc return &mercuryTransmitter{ services.StateMachine{}, lggr.Named("MercuryTransmitter").With("feedID", feedIDHex), + cfg, servers, codec, feedID, diff --git a/core/services/relay/evm/mercury/transmitter_test.go b/core/services/relay/evm/mercury/transmitter_test.go index 46bf116ed3a..b0da9bea635 100644 --- a/core/services/relay/evm/mercury/transmitter_test.go +++ b/core/services/relay/evm/mercury/transmitter_test.go @@ -4,6 +4,7 @@ import ( "context" "math/big" "testing" + "time" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" @@ -12,6 +13,7 @@ import ( 
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -21,6 +23,16 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" ) +type mockCfg struct{} + +func (m mockCfg) TransmitQueueMaxSize() uint32 { + return 10_000 +} + +func (m mockCfg) TransmitTimeout() commonconfig.Duration { + return *commonconfig.MustNewDuration(1 * time.Hour) +} + func Test_MercuryTransmitter_Transmit(t *testing.T) { lggr := logger.TestLogger(t) db := pgtest.NewSqlxDB(t) @@ -36,7 +48,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { report := sampleV1Report c := &mocks.MockWSRPCClient{} clients[sURL] = c - mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) // init the queue since we skipped starting transmitter mt.servers[sURL].q.Init([]*Transmission{}) err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) @@ -50,7 +62,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { report := sampleV2Report c := &mocks.MockWSRPCClient{} clients[sURL] = c - mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) // init the queue since we skipped starting transmitter mt.servers[sURL].q.Init([]*Transmission{}) err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) @@ -64,7 +76,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { report := sampleV3Report c := &mocks.MockWSRPCClient{} clients[sURL] = c - mt := NewTransmitter(lggr, 
clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) // init the queue since we skipped starting transmitter mt.servers[sURL].q.Init([]*Transmission{}) err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) @@ -83,7 +95,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { clients[sURL2] = c clients[sURL3] = c - mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) // init the queue since we skipped starting transmitter mt.servers[sURL].q.Init([]*Transmission{}) mt.servers[sURL2].q.Init([]*Transmission{}) @@ -125,7 +137,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) ts, err := mt.LatestTimestamp(testutils.Context(t)) require.NoError(t, err) @@ -141,7 +153,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) ts, err := mt.LatestTimestamp(testutils.Context(t)) require.NoError(t, err) @@ -155,7 +167,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) _, err := mt.LatestTimestamp(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") @@ -185,7 +197,7 @@ func 
Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) ts, err := mt.LatestTimestamp(testutils.Context(t)) require.NoError(t, err) @@ -228,7 +240,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) t.Run("BenchmarkPriceFromReport succeeds", func(t *testing.T) { codec.val = originalPrice @@ -259,7 +271,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) price, err := mt.LatestPrice(testutils.Context(t), sampleFeedID) require.NoError(t, err) @@ -273,7 +285,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) _, err := mt.LatestPrice(testutils.Context(t), sampleFeedID) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") @@ -303,7 +315,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.NoError(t, err) @@ -319,7 +331,7 @@ func 
Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.NoError(t, err) @@ -332,7 +344,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) _, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") @@ -350,7 +362,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) _, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "latestReport failed; mismatched feed IDs, expected: 0x1c916b4aa7e57ca7b68ae1bf45653f56b656fd3aa335ef7fae696b663f1b8472, got: 0x") diff --git a/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go b/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go index 4d05db4380f..0c31a1d7ac9 100644 --- a/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go +++ b/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go @@ -11,6 +11,7 @@ import ( ) // MercuryClient is the client API for Mercury service. 
+// type MercuryClient interface { Transmit(ctx context.Context, in *TransmitRequest) (*TransmitResponse, error) LatestReport(ctx context.Context, in *LatestReportRequest) (*LatestReportResponse, error) diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml index 759a380d15c..38c3ed62017 100644 --- a/core/web/resolver/testdata/config-empty-effective.toml +++ b/core/web/resolver/testdata/config-empty-effective.toml @@ -230,6 +230,10 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Mercury.Transmitter] +TransmitQueueMaxSize = 10000 +TransmitTimeout = '5s' + [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index 69d56974130..75fad4d2fc9 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -240,6 +240,10 @@ LatestReportDeadline = '1m42s' [Mercury.TLS] CertFile = '' +[Mercury.Transmitter] +TransmitQueueMaxSize = 123 +TransmitTimeout = '3m54s' + [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 13 diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index a6cba2aaac3..7aa3bb50b35 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -230,6 +230,10 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Mercury.Transmitter] +TransmitQueueMaxSize = 10000 +TransmitTimeout = '5s' + [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 0596fcdd84d..f93d990413f 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1698,6 +1698,33 @@ CertFile = "/path/to/client/certs.pem" # Example ``` CertFile is the path to a PEM file of trusted root certificate authority 
certificates +## Mercury.Transmitter +```toml +[Mercury.Transmitter] +TransmitQueueMaxSize = 10_000 # Default +TransmitTimeout = "5s" # Default +``` +Mercury.Transmitter controls settings for the mercury transmitter + +### TransmitQueueMaxSize +```toml +TransmitQueueMaxSize = 10_000 # Default +``` +TransmitQueueMaxSize controls the size of the transmit queue. This is scoped +per OCR instance. If the queue is full, the transmitter will start dropping +the oldest messages in order to make space. + +This is useful if mercury server goes offline and the nop needs to buffer +transmissions. + +### TransmitTimeout +```toml +TransmitTimeout = "5s" # Default +``` +TransmitTimeout controls how long the transmitter will wait for a response +when sending a message to the mercury server, before aborting and considering +the transmission to be failed. + ## EVM EVM defaults depend on ChainID: diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index dd3af5f91b6..a8e8e41750d 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -242,6 +242,10 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Mercury.Transmitter] +TransmitQueueMaxSize = 10000 +TransmitTimeout = '5s' + [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index 15a476460da..feaf546f022 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -286,6 +286,10 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Mercury.Transmitter] +TransmitQueueMaxSize = 10000 +TransmitTimeout = '5s' + [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar 
b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index cc8b4577bfb..b37fed41150 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -286,6 +286,10 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Mercury.Transmitter] +TransmitQueueMaxSize = 10000 +TransmitTimeout = '5s' + [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index c578d200923..6ae02ab38f4 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -286,6 +286,10 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Mercury.Transmitter] +TransmitQueueMaxSize = 10000 +TransmitTimeout = '5s' + [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar index 91ae520532d..45c97477bd5 100644 --- a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar +++ b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar @@ -271,6 +271,10 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Mercury.Transmitter] +TransmitQueueMaxSize = 10000 +TransmitTimeout = '5s' + [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index a5e4b766b6e..df0118bbbbf 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -276,6 +276,10 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Mercury.Transmitter] +TransmitQueueMaxSize = 10000 +TransmitTimeout = '5s' + [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git 
a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index c220d7f2e5f..edb07fd5e4f 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -283,6 +283,10 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Mercury.Transmitter] +TransmitQueueMaxSize = 10000 +TransmitTimeout = '5s' + [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar index 018aaf95f4c..cf121e959e1 100644 --- a/testdata/scripts/node/validate/warnings.txtar +++ b/testdata/scripts/node/validate/warnings.txtar @@ -265,6 +265,10 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Mercury.Transmitter] +TransmitQueueMaxSize = 10000 +TransmitTimeout = '5s' + [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 From 27d941328655e0cde608c1eff47de736c11e2e58 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com> Date: Fri, 26 Apr 2024 16:28:55 +0200 Subject: [PATCH 3/5] LogPoller CLI command to resolve reorg greater than finality depth (#12867) * find lca and remove block after CLI * fix sort.Find typo * make RemoveBlocks local cmd * tests * added changeset * added tags to the changeset * fixed tests * make cmds, vars cases consistent --- .changeset/brave-dots-breathe.md | 7 ++ core/chains/evm/logpoller/disabled.go | 8 ++ core/chains/evm/logpoller/log_poller.go | 99 +++++++++++++++ core/chains/evm/logpoller/log_poller_test.go | 116 ++++++++++++++++++ core/chains/evm/logpoller/mocks/log_poller.go | 48 ++++++++ core/chains/evm/logpoller/observability.go | 6 + core/chains/evm/logpoller/orm.go | 9 ++ core/chains/evm/logpoller/orm_test.go | 30 +++++ core/cmd/blocks_commands.go | 58 +++++++++ core/cmd/blocks_commands_test.go | 25 ++++ core/cmd/shell_local.go | 79 ++++++++++++ core/cmd/shell_local_test.go | 56 +++++++++ 
core/internal/mocks/application.go | 50 ++++++++ core/services/chainlink/application.go | 43 +++++++ core/web/api.go | 2 +- core/web/lca_controller.go | 74 +++++++++++ core/web/lca_controller_test.go | 29 +++++ core/web/router.go | 2 + testdata/scripts/blocks/help.txtar | 3 +- testdata/scripts/help-all/help-all.txtar | 2 + testdata/scripts/node/help.txtar | 1 + 21 files changed, 745 insertions(+), 2 deletions(-) create mode 100644 .changeset/brave-dots-breathe.md create mode 100644 core/web/lca_controller.go create mode 100644 core/web/lca_controller_test.go diff --git a/.changeset/brave-dots-breathe.md b/.changeset/brave-dots-breathe.md new file mode 100644 index 00000000000..f1ae4f4d21e --- /dev/null +++ b/.changeset/brave-dots-breathe.md @@ -0,0 +1,7 @@ +--- +"chainlink": minor +--- + +Added a new CLI command, `blocks find-lca`, which finds the latest block that is available in both the database and on the chain for the specified chain. +Added a new CLI command, `node remove-blocks`, which removes all blocks and logs greater than or equal to the specified block number.
+#nops #added diff --git a/core/chains/evm/logpoller/disabled.go b/core/chains/evm/logpoller/disabled.go index f3e64378384..6f95b9c55da 100644 --- a/core/chains/evm/logpoller/disabled.go +++ b/core/chains/evm/logpoller/disabled.go @@ -114,3 +114,11 @@ func (d disabled) LatestBlockByEventSigsAddrsWithConfs(ctx context.Context, from func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations) ([]Log, error) { return nil, ErrDisabled } + +func (d disabled) FindLCA(ctx context.Context) (*LogPollerBlock, error) { + return nil, ErrDisabled +} + +func (d disabled) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error { + return ErrDisabled +} diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 7592ec104c4..cd26889627f 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -44,6 +44,8 @@ type LogPoller interface { GetFilters() map[string]Filter LatestBlock(ctx context.Context) (LogPollerBlock, error) GetBlocksRange(ctx context.Context, numbers []uint64) ([]LogPollerBlock, error) + FindLCA(ctx context.Context) (*LogPollerBlock, error) + DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error // General querying Logs(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address) ([]Log, error) @@ -1422,6 +1424,103 @@ func (lp *logPoller) IndexedLogsWithSigsExcluding(ctx context.Context, address c return lp.orm.SelectIndexedLogsWithSigsExcluding(ctx, eventSigA, eventSigB, topicIndex, address, fromBlock, toBlock, confs) } +// DeleteLogsAndBlocksAfter - removes blocks and logs starting from the specified block +func (lp *logPoller) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error { + return lp.orm.DeleteLogsAndBlocksAfter(ctx, start) +} + +func (lp *logPoller) FindLCA(ctx context.Context) (*LogPollerBlock, 
error) { + latest, err := lp.orm.SelectLatestBlock(ctx) + if err != nil { + return nil, fmt.Errorf("failed to select the latest block: %w", err) + } + + oldest, err := lp.orm.SelectOldestBlock(ctx, 0) + if err != nil { + return nil, fmt.Errorf("failed to select the oldest block: %w", err) + } + + if latest == nil || oldest == nil { + return nil, fmt.Errorf("expected at least one block to be present in DB") + } + + lp.lggr.Debugf("Received request to find LCA. Searching in range [%d, %d]", oldest.BlockNumber, latest.BlockNumber) + + // Find the largest block number for which block hash stored in the DB matches one that we get from the RPC. + // `sort.Find` expects slice of following format s = [1, 0, -1] and returns smallest index i for which s[i] = 0. + // To utilise `sort.Find` we represent range of blocks as slice [latestBlock, latestBlock-1, ..., oldestBlock+1, oldestBlock] + // and return 1 if DB block was reorged or 0 if it's still present on chain. + lcaI, found := sort.Find(int(latest.BlockNumber-oldest.BlockNumber)+1, func(i int) int { + const notFound = 1 + const found = 0 + // if there is an error - stop the search + if err != nil { + return notFound + } + + // canceled search + if ctx.Err() != nil { + err = fmt.Errorf("aborted, FindLCA request cancelled: %w", ctx.Err()) + return notFound + } + iBlockNumber := latest.BlockNumber - int64(i) + var dbBlock *LogPollerBlock + // Block with specified block number might not exist in the database, to address that we check closest child + // of the iBlockNumber.
If the child is present on chain, it's safe to assume that iBlockNumber is present too + dbBlock, err = lp.orm.SelectOldestBlock(ctx, iBlockNumber) + if err != nil { + err = fmt.Errorf("failed to select block %d by number: %w", iBlockNumber, err) + return notFound + } + + if dbBlock == nil { + err = fmt.Errorf("expected block to exist with blockNumber >= %d as observed block with number %d", iBlockNumber, latest.BlockNumber) + return notFound + } + + lp.lggr.Debugf("Looking for matching block on chain blockNumber: %d blockHash: %s", + dbBlock.BlockNumber, dbBlock.BlockHash) + var chainBlock *evmtypes.Head + chainBlock, err = lp.ec.HeadByHash(ctx, dbBlock.BlockHash) + // our block in DB does not exist on chain + if (chainBlock == nil && err == nil) || errors.Is(err, ethereum.NotFound) { + err = nil + return notFound + } + if err != nil { + err = fmt.Errorf("failed to get block %s from RPC: %w", dbBlock.BlockHash, err) + return notFound + } + + if chainBlock.BlockNumber() != dbBlock.BlockNumber { + err = fmt.Errorf("expected block numbers to match (db: %d, chain: %d), if block hashes match "+ + "(db: %s, chain: %s)", dbBlock.BlockNumber, chainBlock.BlockNumber(), dbBlock.BlockHash, chainBlock.Hash) + return notFound + } + + return found + }) + if err != nil { + return nil, fmt.Errorf("failed to find: %w", err) + } + + if !found { + return nil, fmt.Errorf("failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured") + } + + lcaBlockNumber := latest.BlockNumber - int64(lcaI) + lca, err := lp.orm.SelectBlockByNumber(ctx, lcaBlockNumber) + if err != nil { + return nil, fmt.Errorf("failed to select lca from db: %w", err) + } + + if lca == nil { + return nil, fmt.Errorf("expected lca (blockNum: %d) to exist in DB", lcaBlockNumber) + } + + return lca, nil +} + func EvmWord(i uint64) common.Hash { var b = make([]byte, 8) binary.BigEndian.PutUint64(b, i) diff --git 
a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 74ec41fa85a..cb211043a4c 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -1921,3 +1921,119 @@ func markBlockAsFinalizedByHash(t *testing.T, th TestHarness, blockHash common.H require.NoError(t, err) th.Client.Blockchain().SetFinalized(b.Header()) } + +func TestFindLCA(t *testing.T) { + ctx := testutils.Context(t) + ec := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.Test(t) + chainID := testutils.NewRandomEVMChainID() + db := pgtest.NewSqlxDB(t) + + orm := logpoller.NewORM(chainID, db, lggr) + + lpOpts := logpoller.Opts{ + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 20, + RpcBatchSize: 10, + KeepFinalizedBlocksDepth: 1000, + } + + lp := logpoller.NewLogPoller(orm, ec, lggr, lpOpts) + t.Run("Fails, if failed to select oldest block", func(t *testing.T) { + _, err := lp.FindLCA(ctx) + require.ErrorContains(t, err, "failed to select the latest block") + }) + // oldest + require.NoError(t, orm.InsertBlock(ctx, common.HexToHash("0x123"), 10, time.Now(), 0)) + // latest + latestBlockHash := common.HexToHash("0x124") + require.NoError(t, orm.InsertBlock(ctx, latestBlockHash, 16, time.Now(), 0)) + t.Run("Fails, if caller's context canceled", func(t *testing.T) { + lCtx, cancel := context.WithCancel(ctx) + ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(nil, nil).Run(func(_ mock.Arguments) { + cancel() + }).Once() + _, err := lp.FindLCA(lCtx) + require.ErrorContains(t, err, "aborted, FindLCA request cancelled") + + }) + t.Run("Fails, if RPC returns an error", func(t *testing.T) { + expectedError := fmt.Errorf("failed to call RPC") + ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(nil, expectedError).Once() + _, err := lp.FindLCA(ctx) + require.ErrorContains(t, err, expectedError.Error()) + }) + t.Run("Fails, if block numbers do not match", func(t *testing.T) 
{ + ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(&evmtypes.Head{ + Number: 123, + }, nil).Once() + _, err := lp.FindLCA(ctx) + require.ErrorContains(t, err, "expected block numbers to match") + }) + t.Run("Fails, if none of the blocks in db matches on chain", func(t *testing.T) { + ec.On("HeadByHash", mock.Anything, mock.Anything).Return(nil, nil).Times(3) + _, err := lp.FindLCA(ctx) + require.ErrorContains(t, err, "failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured") + }) + + type block struct { + BN int + Exists bool + } + testCases := []struct { + Name string + Blocks []block + ExpectedBlockNumber int + ExpectedError error + }{ + { + Name: "All of the blocks are present on chain - returns the latest", + Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: true}, {BN: 3, Exists: true}, {BN: 4, Exists: true}}, + ExpectedBlockNumber: 4, + }, + { + Name: "None of the blocks exists on chain - returns an erro", + Blocks: []block{{BN: 1, Exists: false}, {BN: 2, Exists: false}, {BN: 3, Exists: false}, {BN: 4, Exists: false}}, + ExpectedBlockNumber: 0, + ExpectedError: fmt.Errorf("failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured"), + }, + { + Name: "Only latest block does not exist", + Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: true}, {BN: 3, Exists: true}, {BN: 4, Exists: false}}, + ExpectedBlockNumber: 3, + }, + { + Name: "Only oldest block exists on chain", + Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: false}, {BN: 3, Exists: false}, {BN: 4, Exists: false}}, + ExpectedBlockNumber: 1, + }, + } + + blockHashI := int64(0) + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + // reset the database + require.NoError(t, orm.DeleteLogsAndBlocksAfter(ctx, 0)) + for _, b := range tc.Blocks { + blockHashI++ + hash := common.BigToHash(big.NewInt(blockHashI)) + 
require.NoError(t, orm.InsertBlock(ctx, hash, int64(b.BN), time.Now(), 0)) + // Hashes are unique for all test cases + var onChainBlock *evmtypes.Head + if b.Exists { + onChainBlock = &evmtypes.Head{Number: int64(b.BN)} + } + ec.On("HeadByHash", mock.Anything, hash).Return(onChainBlock, nil).Maybe() + } + + result, err := lp.FindLCA(ctx) + if tc.ExpectedError != nil { + require.ErrorContains(t, err, tc.ExpectedError.Error()) + } else { + require.NotNil(t, result) + require.Equal(t, result.BlockNumber, int64(tc.ExpectedBlockNumber), "expected block numbers to match") + } + }) + } +} diff --git a/core/chains/evm/logpoller/mocks/log_poller.go b/core/chains/evm/logpoller/mocks/log_poller.go index 548e9ca3b90..ef3f4dbd428 100644 --- a/core/chains/evm/logpoller/mocks/log_poller.go +++ b/core/chains/evm/logpoller/mocks/log_poller.go @@ -37,6 +37,54 @@ func (_m *LogPoller) Close() error { return r0 } +// DeleteLogsAndBlocksAfter provides a mock function with given fields: ctx, start +func (_m *LogPoller) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error { + ret := _m.Called(ctx, start) + + if len(ret) == 0 { + panic("no return value specified for DeleteLogsAndBlocksAfter") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, start) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// FindLCA provides a mock function with given fields: ctx +func (_m *LogPoller) FindLCA(ctx context.Context) (*logpoller.LogPollerBlock, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for FindLCA") + } + + var r0 *logpoller.LogPollerBlock + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*logpoller.LogPollerBlock, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *logpoller.LogPollerBlock); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*logpoller.LogPollerBlock) + } + } + + if rf, ok := 
ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetBlocksRange provides a mock function with given fields: ctx, numbers func (_m *LogPoller) GetBlocksRange(ctx context.Context, numbers []uint64) ([]logpoller.LogPollerBlock, error) { ret := _m.Called(ctx, numbers) diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index 14dec5274ad..8f3cdfe185e 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -151,6 +151,12 @@ func (o *ObservedORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, e }) } +func (o *ObservedORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) { + return withObservedQuery(o, "SelectOldestBlock", func() (*LogPollerBlock, error) { + return o.ORM.SelectOldestBlock(ctx, minAllowedBlockNumber) + }) +} + func (o *ObservedORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs Confirmations) (*Log, error) { return withObservedQuery(o, "SelectLatestLogByEventSigWithConfs", func() (*Log, error) { return o.ORM.SelectLatestLogByEventSigWithConfs(ctx, eventSig, address, confs) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 838a38c8ebb..5e0a74a9183 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -38,6 +38,7 @@ type ORM interface { SelectBlockByNumber(ctx context.Context, blockNumber int64) (*LogPollerBlock, error) SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPollerBlock, error) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error) + SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error) SelectLogsWithSigs(ctx context.Context, 
start, end int64, address common.Address, eventSigs []common.Hash) ([]Log, error) @@ -202,6 +203,14 @@ func (o *DSORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error) return &b, nil } +func (o *DSORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) { + var b LogPollerBlock + if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 ORDER BY block_number ASC LIMIT 1`, ubig.New(o.chainID), minAllowedBlockNumber); err != nil { + return nil, err + } + return &b, nil +} + func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs Confirmations) (*Log, error) { args, err := newQueryArgsForEvent(o.chainID, address, eventSig). withConfs(confs). diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 8a45ff2f1c5..2a1be62dd5b 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -1759,3 +1759,33 @@ func Benchmark_DeleteExpiredLogs(b *testing.B) { assert.NoError(b, err1) } } + +func TestSelectOldestBlock(t *testing.T) { + th := SetupTH(t, lpOpts) + o1 := th.ORM + o2 := th.ORM2 + ctx := testutils.Context(t) + t.Run("Selects oldest within given chain", func(t *testing.T) { + // insert blocks + require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1231"), 11, time.Now(), 0)) + require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1232"), 12, time.Now(), 0)) + // insert newer block from different chain + require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1233"), 13, time.Now(), 0)) + require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1231"), 14, time.Now(), 0)) + block, err := o1.SelectOldestBlock(ctx, 0) + require.NoError(t, err) + require.NotNil(t, block) + require.Equal(t, block.BlockNumber, int64(13)) + require.Equal(t, block.BlockHash, common.HexToHash("0x1233")) + }) + t.Run("Does 
not select blocks older than specified limit", func(t *testing.T) { + require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1232"), 11, time.Now(), 0)) + require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1233"), 13, time.Now(), 0)) + require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1234"), 15, time.Now(), 0)) + block, err := o1.SelectOldestBlock(ctx, 12) + require.NoError(t, err) + require.NotNil(t, block) + require.Equal(t, block.BlockNumber, int64(13)) + require.Equal(t, block.BlockHash, common.HexToHash("0x1233")) + }) +} diff --git a/core/cmd/blocks_commands.go b/core/cmd/blocks_commands.go index 72b0523e18d..158caf253ab 100644 --- a/core/cmd/blocks_commands.go +++ b/core/cmd/blocks_commands.go @@ -9,6 +9,8 @@ import ( "github.com/pkg/errors" "github.com/urfave/cli" "go.uber.org/multierr" + + "github.com/smartcontractkit/chainlink/v2/core/web" ) func initBlocksSubCmds(s *Shell) []cli.Command { @@ -34,6 +36,18 @@ func initBlocksSubCmds(s *Shell) []cli.Command { }, }, }, + { + Name: "find-lca", + Usage: "Find latest common block stored in DB and on chain", + Action: s.FindLCA, + Flags: []cli.Flag{ + cli.Int64Flag{ + Name: "evm-chain-id", + Usage: "Chain ID of the EVM-based blockchain", + Required: true, + }, + }, + }, } } @@ -75,3 +89,47 @@ func (s *Shell) ReplayFromBlock(c *cli.Context) (err error) { fmt.Println("Replay started") return nil } + +// LCAPresenter implements TableRenderer for an LCAResponse. +type LCAPresenter struct { + web.LCAResponse +} + +// ToRow presents the LCAResponse as a slice of strings.
+func (p *LCAPresenter) ToRow() []string { + return []string{p.EVMChainID.String(), p.Hash, strconv.FormatInt(p.BlockNumber, 10)} +} + +// RenderTable implements TableRenderer +// Just renders a single row +func (p LCAPresenter) RenderTable(rt RendererTable) error { + renderList([]string{"ChainID", "Block Hash", "Block Number"}, [][]string{p.ToRow()}, rt.Writer) + + return nil +} + +// FindLCA finds last common block stored in DB and on chain. +func (s *Shell) FindLCA(c *cli.Context) (err error) { + v := url.Values{} + + if c.IsSet("evm-chain-id") { + v.Add("evmChainID", fmt.Sprintf("%d", c.Int64("evm-chain-id"))) + } + + resp, err := s.HTTP.Get(s.ctx(), + fmt.Sprintf( + "/v2/find_lca?%s", + v.Encode(), + )) + if err != nil { + return s.errorOut(err) + } + + defer func() { + if cerr := resp.Body.Close(); cerr != nil { + err = multierr.Append(err, cerr) + } + }() + + return s.renderAPIResponse(resp, &LCAPresenter{}, "Last Common Ancestor") +} diff --git a/core/cmd/blocks_commands_test.go b/core/cmd/blocks_commands_test.go index 30540748cb1..f7656b94ae1 100644 --- a/core/cmd/blocks_commands_test.go +++ b/core/cmd/blocks_commands_test.go @@ -41,3 +41,28 @@ func Test_ReplayFromBlock(t *testing.T) { c = cli.NewContext(nil, set, nil) require.NoError(t, client.ReplayFromBlock(c)) } + +func Test_FindLCA(t *testing.T) { + t.Parallel() + + //ethClient.On("BalanceAt", mock.Anything, mock.Anything, mock.Anything).Return(big.NewInt(42), nil) + app := startNewApplicationV2(t, func(c *chainlink.Config, s *chainlink.Secrets) { + c.EVM[0].ChainID = (*ubig.Big)(big.NewInt(5)) + c.EVM[0].Enabled = ptr(true) + }) + + client, _ := app.NewShellAndRenderer() + + set := flag.NewFlagSet("test", 0) + flagSetApplyFromAction(client.FindLCA, set, "") + + //Incorrect chain ID + require.NoError(t, set.Set("evm-chain-id", "1")) + c := cli.NewContext(nil, set, nil) + require.ErrorContains(t, client.FindLCA(c), "does not match any local chains") + + //Correct chain ID + require.NoError(t, 
set.Set("evm-chain-id", "5")) + c = cli.NewContext(nil, set, nil) + require.ErrorContains(t, client.FindLCA(c), "FindLCA is only available if LogPoller is enabled") +} diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index 24cb43e2090..7c9c025d4be 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -34,6 +34,7 @@ import ( "github.com/jmoiron/sqlx" cutils "github.com/smartcontractkit/chainlink-common/pkg/utils" + "github.com/smartcontractkit/chainlink/v2/core/build" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" @@ -253,6 +254,23 @@ func initLocalSubCmds(s *Shell, safe bool) []cli.Command { }, }, }, + { + Name: "remove-blocks", + Usage: "Deletes block range and all associated data", + Action: s.RemoveBlocks, + Flags: []cli.Flag{ + cli.IntFlag{ + Name: "start", + Usage: "Beginning of block range to be deleted", + Required: true, + }, + cli.Int64Flag{ + Name: "evm-chain-id", + Usage: "Chain ID of the EVM-based blockchain", + Required: true, + }, + }, + }, } } @@ -1180,3 +1198,64 @@ func insertFixtures(dbURL url.URL, pathToFixtures string) (err error) { _, err = db.Exec(string(fixturesSQL)) return err } + +// RemoveBlocks - removes blocks after the specified blocks number +func (s *Shell) RemoveBlocks(c *cli.Context) error { + start := c.Int64("start") + if start <= 0 { + return s.errorOut(errors.New("Must pass a positive value in '--start' parameter")) + } + + chainID := big.NewInt(0) + if c.IsSet("evm-chain-id") { + err := chainID.UnmarshalText([]byte(c.String("evm-chain-id"))) + if err != nil { + return s.errorOut(err) + } + } + + cfg := s.Config + err := cfg.Validate() + if err != nil { + return s.errorOut(fmt.Errorf("error validating configuration: %+v", err)) + } + + lggr := logger.Sugared(s.Logger.Named("RemoveBlocks")) + ldb := pg.NewLockedDB(cfg.AppID(), cfg.Database(), cfg.Database().Lock(), lggr) + ctx, cancel := 
context.WithCancel(context.Background()) + go shutdown.HandleShutdown(func(sig string) { + cancel() + lggr.Info("received signal to stop - closing the database and releasing lock") + + if cErr := ldb.Close(); cErr != nil { + lggr.Criticalf("Failed to close LockedDB: %v", cErr) + } + + if cErr := s.CloseLogger(); cErr != nil { + log.Printf("Failed to close Logger: %v", cErr) + } + }) + + if err = ldb.Open(ctx); err != nil { + // If not successful, we know neither locks nor connection remains opened + return s.errorOut(errors.Wrap(err, "opening db")) + } + defer lggr.ErrorIfFn(ldb.Close, "Error closing db") + + // From now on, DB locks and DB connection will be released on every return. + // Keep watching on logger.Fatal* calls and os.Exit(), because defer will not be executed. + + app, err := s.AppFactory.NewApplication(ctx, s.Config, s.Logger, ldb.DB()) + if err != nil { + return s.errorOut(errors.Wrap(err, "fatal error instantiating application")) + } + + err = app.DeleteLogPollerDataAfter(ctx, chainID, start) + if err != nil { + return s.errorOut(err) + } + + lggr.Infof("RemoveBlocks: successfully removed blocks") + + return nil +} diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index 7427e6caedb..e7322e513ae 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -2,6 +2,7 @@ package cmd_test import ( "flag" + "fmt" "math/big" "os" "strconv" @@ -514,3 +515,58 @@ func TestShell_CleanupChainTables(t *testing.T) { c := cli.NewContext(nil, set, nil) require.NoError(t, client.CleanupChainTables(c)) } + +func TestShell_RemoveBlocks(t *testing.T) { + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { + s.Password.Keystore = models.NewSecret("dummy") + c.EVM[0].Nodes[0].Name = ptr("fake") + c.EVM[0].Nodes[0].HTTPURL = commonconfig.MustParseURL("http://fake.com") + c.EVM[0].Nodes[0].WSURL = commonconfig.MustParseURL("WSS://fake.com/ws") + // seems to be 
needed for config validate + c.Insecure.OCRDevelopmentMode = nil + }) + + lggr := logger.TestLogger(t) + + app := mocks.NewApplication(t) + app.On("GetSqlxDB").Maybe().Return(db) + shell := cmd.Shell{ + Config: cfg, + AppFactory: cltest.InstanceAppFactory{App: app}, + FallbackAPIInitializer: cltest.NewMockAPIInitializer(t), + Runner: cltest.EmptyRunner{}, + Logger: lggr, + } + + t.Run("Returns error, if --start is not positive", func(t *testing.T) { + set := flag.NewFlagSet("test", 0) + flagSetApplyFromAction(shell.RemoveBlocks, set, "") + require.NoError(t, set.Set("start", "0")) + require.NoError(t, set.Set("evm-chain-id", "12")) + c := cli.NewContext(nil, set, nil) + err := shell.RemoveBlocks(c) + require.ErrorContains(t, err, "Must pass a positive value in '--start' parameter") + }) + t.Run("Returns error, if removal fails", func(t *testing.T) { + set := flag.NewFlagSet("test", 0) + flagSetApplyFromAction(shell.RemoveBlocks, set, "") + require.NoError(t, set.Set("start", "10000")) + require.NoError(t, set.Set("evm-chain-id", "12")) + expectedError := fmt.Errorf("failed to delete log poller's data") + app.On("DeleteLogPollerDataAfter", mock.Anything, big.NewInt(12), int64(10000)).Return(expectedError).Once() + c := cli.NewContext(nil, set, nil) + err := shell.RemoveBlocks(c) + require.ErrorContains(t, err, expectedError.Error()) + }) + t.Run("Happy path", func(t *testing.T) { + set := flag.NewFlagSet("test", 0) + flagSetApplyFromAction(shell.RemoveBlocks, set, "") + require.NoError(t, set.Set("start", "10000")) + require.NoError(t, set.Set("evm-chain-id", "12")) + app.On("DeleteLogPollerDataAfter", mock.Anything, big.NewInt(12), int64(10000)).Return(nil).Once() + c := cli.NewContext(nil, set, nil) + err := shell.RemoveBlocks(c) + require.NoError(t, err) + }) +} diff --git a/core/internal/mocks/application.go b/core/internal/mocks/application.go index c83b37a0e5d..f845d46ca8d 100644 --- a/core/internal/mocks/application.go +++ b/core/internal/mocks/application.go 
@@ -23,6 +23,8 @@ import ( logger "github.com/smartcontractkit/chainlink/v2/core/logger" + logpoller "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + mock "github.com/stretchr/testify/mock" pipeline "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" @@ -147,6 +149,24 @@ func (_m *Application) DeleteJob(ctx context.Context, jobID int32) error { return r0 } +// DeleteLogPollerDataAfter provides a mock function with given fields: ctx, chainID, start +func (_m *Application) DeleteLogPollerDataAfter(ctx context.Context, chainID *big.Int, start int64) error { + ret := _m.Called(ctx, chainID, start) + + if len(ret) == 0 { + panic("no return value specified for DeleteLogPollerDataAfter") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int, int64) error); ok { + r0 = rf(ctx, chainID, start) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // EVMORM provides a mock function with given fields: func (_m *Application) EVMORM() types.Configs { ret := _m.Called() @@ -167,6 +187,36 @@ func (_m *Application) EVMORM() types.Configs { return r0 } +// FindLCA provides a mock function with given fields: ctx, chainID +func (_m *Application) FindLCA(ctx context.Context, chainID *big.Int) (*logpoller.LogPollerBlock, error) { + ret := _m.Called(ctx, chainID) + + if len(ret) == 0 { + panic("no return value specified for FindLCA") + } + + var r0 *logpoller.LogPollerBlock + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*logpoller.LogPollerBlock, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) *logpoller.LogPollerBlock); ok { + r0 = rf(ctx, chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*logpoller.LogPollerBlock) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetAuditLogger provides a mock function with given 
fields: func (_m *Application) GetAuditLogger() audit.AuditLogger { ret := _m.Called() diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 2aebef3f8f7..ae3db2e7a73 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -22,7 +22,9 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable" "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" + "github.com/smartcontractkit/chainlink/v2/core/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/static" "github.com/smartcontractkit/chainlink/v2/core/bridges" @@ -115,6 +117,11 @@ type Application interface { ID() uuid.UUID SecretGenerator() SecretGenerator + + // FindLCA - finds last common ancestor for LogPoller's chain available in the database and RPC chain + FindLCA(ctx context.Context, chainID *big.Int) (*logpoller.LogPollerBlock, error) + // DeleteLogPollerDataAfter - delete LogPoller state starting from the specified block + DeleteLogPollerDataAfter(ctx context.Context, chainID *big.Int, start int64) error } // ChainlinkApplication contains fields for the JobSubscriber, Scheduler, @@ -886,3 +893,39 @@ func (app *ChainlinkApplication) GetWebAuthnConfiguration() sessions.WebAuthnCon func (app *ChainlinkApplication) ID() uuid.UUID { return app.Config.AppID() } + +// FindLCA - finds last common ancestor +func (app *ChainlinkApplication) FindLCA(ctx context.Context, chainID *big.Int) (*logpoller.LogPollerBlock, error) { + chain, err := app.GetRelayers().LegacyEVMChains().Get(chainID.String()) + if err != nil { + return nil, err + } + if !app.Config.Feature().LogPoller() { + return nil, fmt.Errorf("FindLCA is only available if LogPoller is enabled") + } + + lca, err := chain.LogPoller().FindLCA(ctx) + if err != nil { + return nil, fmt.Errorf("failed to find 
lca: %w", err) + } + + return lca, nil +} + +// DeleteLogPollerDataAfter - delete LogPoller state starting from the specified block +func (app *ChainlinkApplication) DeleteLogPollerDataAfter(ctx context.Context, chainID *big.Int, start int64) error { + chain, err := app.GetRelayers().LegacyEVMChains().Get(chainID.String()) + if err != nil { + return err + } + if !app.Config.Feature().LogPoller() { + return fmt.Errorf("DeleteLogPollerDataAfter is only available if LogPoller is enabled") + } + + err = chain.LogPoller().DeleteLogsAndBlocksAfter(ctx, start) + if err != nil { + return fmt.Errorf("failed to recover LogPoller: %w", err) + } + + return nil +} diff --git a/core/web/api.go b/core/web/api.go index 1f97d59c77d..51f7b855cd5 100644 --- a/core/web/api.go +++ b/core/web/api.go @@ -120,7 +120,7 @@ func ParsePaginatedResponse(input []byte, resource interface{}, links *jsonapi.L func parsePaginatedResponseToDocument(input []byte, resource interface{}, document *jsonapi.Document) error { err := ParseJSONAPIResponse(input, resource) if err != nil { - return errors.Wrap(err, "ParseJSONAPIResponse error") + return errors.Wrapf(err, "ParseJSONAPIResponse error body: %s", string(input)) } // Unmarshal using the stdlib Unmarshal to extract the links part of the document diff --git a/core/web/lca_controller.go b/core/web/lca_controller.go new file mode 100644 index 00000000000..bb4866c3d08 --- /dev/null +++ b/core/web/lca_controller.go @@ -0,0 +1,74 @@ +package web + +import ( + "errors" + "fmt" + "net/http" + + "github.com/gin-gonic/gin" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" + "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" +) + +type LCAController struct { + App chainlink.Application +} + +// FindLCA compares chain of blocks available in the DB with chain provided by an RPC and returns last common ancestor +// Example: +// +// "/v2/find_lca" +func (bdc *LCAController) FindLCA(c *gin.Context) { + chain, err := 
getChain(bdc.App.GetRelayers().LegacyEVMChains(), c.Query("evmChainID")) + if err != nil { + if errors.Is(err, ErrInvalidChainID) || errors.Is(err, ErrMultipleChains) || errors.Is(err, ErrMissingChainID) { + jsonAPIError(c, http.StatusUnprocessableEntity, err) + return + } + jsonAPIError(c, http.StatusInternalServerError, err) + return + } + chainID := chain.ID() + + lca, err := bdc.App.FindLCA(c.Request.Context(), chainID) + if err != nil { + jsonAPIError(c, http.StatusInternalServerError, err) + return + } + + if lca == nil { + jsonAPIError(c, http.StatusNotFound, fmt.Errorf("failed to find last common ancestor")) + return + } + + response := LCAResponse{ + BlockNumber: lca.BlockNumber, + Hash: lca.BlockHash.String(), + EVMChainID: big.New(chainID), + } + jsonAPIResponse(c, &response, "response") + +} + +type LCAResponse struct { + BlockNumber int64 `json:"blockNumber"` + Hash string `json:"hash"` + EVMChainID *big.Big `json:"evmChainID"` +} + +// GetID returns the jsonapi ID. +func (s LCAResponse) GetID() string { + return "LCAResponseID" +} + +// GetName returns the collection name for jsonapi. +func (LCAResponse) GetName() string { + return "lca_response" +} + +// SetID is used to conform to the UnmarshallIdentifier interface for +// deserializing from jsonapi documents. 
+func (*LCAResponse) SetID(string) error { + return nil +} diff --git a/core/web/lca_controller_test.go b/core/web/lca_controller_test.go new file mode 100644 index 00000000000..7ec476e8eca --- /dev/null +++ b/core/web/lca_controller_test.go @@ -0,0 +1,29 @@ +package web_test + +import ( + _ "embed" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" +) + +func TestLCAController_FindLCA(t *testing.T) { + cfg := configtest.NewTestGeneralConfig(t) + ec := setupEthClientForControllerTests(t) + app := cltest.NewApplicationWithConfigAndKey(t, cfg, cltest.DefaultP2PKey, ec) + require.NoError(t, app.Start(testutils.Context(t))) + client := app.NewHTTPClient(nil) + resp, cleanup := client.Get("/v2/find_lca?evmChainID=1") + t.Cleanup(cleanup) + assert.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) + b, err := io.ReadAll(resp.Body) + require.NoError(t, err) + assert.Contains(t, string(b), "chain id does not match any local chains") +} diff --git a/core/web/router.go b/core/web/router.go index c327583a005..158ea4b411f 100644 --- a/core/web/router.go +++ b/core/web/router.go @@ -292,6 +292,8 @@ func v2Routes(app chainlink.Application, r *gin.RouterGroup) { rc := ReplayController{app} authv2.POST("/replay_from_block/:number", auth.RequiresRunRole(rc.ReplayFromBlock)) + lcaC := LCAController{app} + authv2.GET("/find_lca", auth.RequiresRunRole(lcaC.FindLCA)) csakc := CSAKeysController{app} authv2.GET("/keys/csa", csakc.Index) diff --git a/testdata/scripts/blocks/help.txtar b/testdata/scripts/blocks/help.txtar index 55aaf71858d..5d362a082fd 100644 --- a/testdata/scripts/blocks/help.txtar +++ b/testdata/scripts/blocks/help.txtar @@ -9,7 +9,8 @@ USAGE: chainlink blocks command [command options] 
[arguments...] COMMANDS: - replay Replays block data from the given number + replay Replays block data from the given number + find-lca Find latest common block stored in DB and on chain OPTIONS: --help, -h show help diff --git a/testdata/scripts/help-all/help-all.txtar b/testdata/scripts/help-all/help-all.txtar index eeaf0da98d1..e111295abb4 100644 --- a/testdata/scripts/help-all/help-all.txtar +++ b/testdata/scripts/help-all/help-all.txtar @@ -16,6 +16,7 @@ admin users list # Lists all API users and their roles attempts # Commands for managing Ethereum Transaction Attempts attempts list # List the Transaction Attempts in descending order blocks # Commands for managing blocks +blocks find-lca # Find latest common block stored in DB and on chain blocks replay # Replays block data from the given number bridges # Commands for Bridges communicating with External Adapters bridges create # Create a new Bridge to an External Adapter @@ -132,6 +133,7 @@ node db status # Display the current database migration status. node db version # Display the current database version. node profile # Collects profile metrics from the node. node rebroadcast-transactions # Manually rebroadcast txs matching nonce range with the specified gas price. This is useful in emergencies e.g. high gas prices and/or network congestion to forcibly clear out the pending TX queue +node remove-blocks # Deletes block range and all associated data node start # Run the Chainlink node node status # Displays the health of various services running inside the node. node validate # Validate the TOML configuration and secrets that are passed as flags to the `node` command. 
Prints the full effective configuration, with defaults included diff --git a/testdata/scripts/node/help.txtar b/testdata/scripts/node/help.txtar index 33e1fdc90bc..875500b13df 100644 --- a/testdata/scripts/node/help.txtar +++ b/testdata/scripts/node/help.txtar @@ -13,6 +13,7 @@ COMMANDS: rebroadcast-transactions Manually rebroadcast txs matching nonce range with the specified gas price. This is useful in emergencies e.g. high gas prices and/or network congestion to forcibly clear out the pending TX queue validate Validate the TOML configuration and secrets that are passed as flags to the `node` command. Prints the full effective configuration, with defaults included db Commands for managing the database. + remove-blocks Deletes block range and all associated data OPTIONS: --config value, -c value TOML configuration file(s) via flag, or raw TOML via env var. If used, legacy env vars must not be set. Multiple files can be used (-c configA.toml -c configB.toml), and they are applied in order with duplicated fields overriding any earlier values. If the 'CL_CONFIG' env var is specified, it is always processed last with the effect of being the final override. 
[$CL_CONFIG] From 71eed2133fb86a065461787e31db5fb1b18b05de Mon Sep 17 00:00:00 2001 From: Tate Date: Fri, 26 Apr 2024 08:47:17 -0600 Subject: [PATCH 4/5] Fix Node Migration Test Check For Versions (#12982) --- .../action.yml | 1 + .github/workflows/integration-tests.yml | 90 ++++++++----------- 2 files changed, 40 insertions(+), 51 deletions(-) diff --git a/.github/actions/setup-create-base64-upgrade-config/action.yml b/.github/actions/setup-create-base64-upgrade-config/action.yml index ed25fd6375f..8f514784725 100644 --- a/.github/actions/setup-create-base64-upgrade-config/action.yml +++ b/.github/actions/setup-create-base64-upgrade-config/action.yml @@ -92,6 +92,7 @@ runs: [ChainlinkUpgradeImage] image="$UPGRADE_IMAGE" version="$UPGRADE_VERSION" + postgres_version="$CHAINLINK_POSTGRES_VERSION" [Logging] test_log_collect=$test_log_collect diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 1f3e093cfdc..8dcf32b127e 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -218,40 +218,6 @@ jobs: AWS_ROLE_TO_ASSUME: ${{ secrets.QA_AWS_ROLE_TO_ASSUME }} dep_evm_sha: ${{ inputs.evm-ref }} - build-test-image: - if: startsWith(github.ref, 'refs/tags/') || github.event_name == 'schedule' || contains(join(github.event.pull_request.labels.*.name, ' '), 'build-test-image') - environment: integration - permissions: - id-token: write - contents: read - name: Build Test Image - runs-on: ubuntu22.04-16cores-64GB - needs: [changes] - steps: - - name: Collect Metrics - if: needs.changes.outputs.src == 'true' || github.event_name == 'workflow_dispatch' - id: collect-gha-metrics - uses: smartcontractkit/push-gha-metrics-action@dea9b546553cb4ca936607c2267a09c004e4ab3f # v3.0.0 - with: - id: ${{ env.COLLECTION_ID }}-build-test-image - org-id: ${{ secrets.GRAFANA_INTERNAL_TENANT_ID }} - basic-auth: ${{ secrets.GRAFANA_INTERNAL_BASIC_AUTH }} - hostname: ${{ secrets.GRAFANA_INTERNAL_HOST }} - 
this-job-name: Build Test Image - continue-on-error: true - - name: Checkout the repo - uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 - with: - repository: smartcontractkit/chainlink - ref: ${{ inputs.cl_ref || github.event.pull_request.head.sha || github.event.merge_group.head_sha }} - - name: Build Test Image - if: needs.changes.outputs.src == 'true' || github.event_name == 'workflow_dispatch' - uses: ./.github/actions/build-test-image - with: - QA_AWS_ROLE_TO_ASSUME: ${{ secrets.QA_AWS_ROLE_TO_ASSUME }} - QA_AWS_REGION: ${{ secrets.QA_AWS_REGION }} - QA_AWS_ACCOUNT_NUMBER: ${{ secrets.QA_AWS_ACCOUNT_NUMBER }} - compare-tests: needs: [changes] runs-on: ubuntu-latest @@ -726,7 +692,7 @@ jobs: cache_restore_only: "true" QA_AWS_REGION: ${{ secrets.QA_AWS_REGION }} QA_AWS_ROLE_TO_ASSUME: ${{ secrets.QA_AWS_ROLE_TO_ASSUME }} - QA_KUBECONFIG: ${{ secrets.QA_KUBECONFIG }} + QA_KUBECONFIG: "" should_tidy: "false" - name: Show Otel-Collector Logs if: steps.check-label.outputs.trace == 'true' && matrix.product.name == 'ocr2' && matrix.product.tag_suffix == '-plugins' @@ -830,6 +796,7 @@ jobs: # Run the setup if the matrix finishes but this time save the cache if we have a cache hit miss # this will also only run if both of the matrix jobs pass eth-smoke-go-mod-cache: + environment: integration needs: [eth-smoke-tests] runs-on: ubuntu-latest @@ -863,7 +830,7 @@ jobs: id-token: write contents: read runs-on: ubuntu-latest - needs: [build-chainlink, changes, build-test-image] + needs: [build-chainlink, changes] # Only run migration tests on new tags if: startsWith(github.ref, 'refs/tags/') env: @@ -876,6 +843,17 @@ jobs: TEST_LOG_LEVEL: debug TEST_SUITE: migration steps: + - name: Collect Metrics + id: collect-gha-metrics + uses: smartcontractkit/push-gha-metrics-action@dea9b546553cb4ca936607c2267a09c004e4ab3f # v3.0.0 + with: + id: ${{ env.COLLECTION_ID }}-migration-tests + org-id: ${{ secrets.GRAFANA_INTERNAL_TENANT_ID }} + basic-auth: ${{ 
secrets.GRAFANA_INTERNAL_BASIC_AUTH }} + hostname: ${{ secrets.GRAFANA_INTERNAL_HOST }} + this-job-name: Version Migration Tests + test-results-file: '{"testType":"go","filePath":"/tmp/gotest.log"}' + continue-on-error: true - name: Checkout the repo uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 with: @@ -886,7 +864,12 @@ jobs: run: | untrimmed_ver=$(curl --header "Authorization: token ${{ secrets.GITHUB_TOKEN }}" --request GET https://api.github.com/repos/${{ github.repository }}/releases/latest | jq -r .name) latest_version="${untrimmed_ver:1}" - echo "latest_version=${latest_version} | tee -a $GITHUB_OUTPUT" + # Check if latest_version is empty + if [ -z "$latest_version" ]; then + echo "Error: The latest_version is empty. The migration tests need a verison to run." + exit 1 + fi + echo "latest_version=${latest_version}" >> "$GITHUB_OUTPUT" - name: Name Versions run: | echo "Running migration tests from version '${{ steps.get_latest_version.outputs.latest_version }}' to: '${{ inputs.evm-ref || github.sha }}'" @@ -898,13 +881,22 @@ jobs: chainlinkVersion: ${{ steps.get_latest_version.outputs.latest_version }} upgradeImage: ${{ env.UPGRADE_IMAGE }} upgradeVersion: ${{ env.UPGRADE_VERSION }} + runId: ${{ github.run_id }} + testLogCollect: ${{ vars.TEST_LOG_COLLECT }} + lokiEndpoint: https://${{ secrets.GRAFANA_INTERNAL_HOST }}/loki/api/v1/push + lokiTenantId: ${{ secrets.GRAFANA_INTERNAL_TENANT_ID }} + lokiBasicAuth: ${{ secrets.GRAFANA_INTERNAL_BASIC_AUTH }} + logstreamLogTargets: ${{ vars.LOGSTREAM_LOG_TARGETS }} + grafanaUrl: ${{ vars.GRAFANA_URL }} + grafanaDashboardUrl: "/d/ddf75041-1e39-42af-aa46-361fe4c36e9e/ci-e2e-tests-logs" - name: Run Migration Tests uses: smartcontractkit/chainlink-github-actions/chainlink-testing-framework/run-tests@519851800779323566b7b7c22cc21bff95dbb639 # v2.3.11 with: - test_command_to_run: cd ./integration-tests && go test -timeout 30m -count=1 -json ./migration 2>&1 | tee /tmp/gotest.log | 
gotestloghelper -ci -singlepackage + test_command_to_run: cd ./integration-tests && go test -timeout 20m -count=1 -json ./migration 2>&1 | tee /tmp/gotest.log | gotestloghelper -ci -singlepackage test_download_vendor_packages_command: cd ./integration-tests && go mod download cl_repo: ${{ env.CHAINLINK_IMAGE }} cl_image_tag: ${{ steps.get_latest_version.outputs.latest_version }} + aws_registries: ${{ secrets.QA_AWS_ACCOUNT_NUMBER }} artifacts_name: node-migration-test-logs artifacts_location: | ./integration-tests/migration/logs @@ -916,28 +908,24 @@ jobs: cache_restore_only: "true" QA_AWS_REGION: ${{ secrets.QA_AWS_REGION }} QA_AWS_ROLE_TO_ASSUME: ${{ secrets.QA_AWS_ROLE_TO_ASSUME }} - QA_KUBECONFIG: ${{ secrets.QA_KUBECONFIG }} + QA_KUBECONFIG: "" go_coverage_src_dir: /var/tmp/go-coverage go_coverage_dest_dir: ${{ github.workspace }}/.covdata + should_tidy: "false" - name: Upload Coverage Data uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3 with: name: cl-node-coverage-data-migration-tests path: .covdata retention-days: 1 - - - name: Collect Metrics - if: always() - id: collect-gha-metrics - uses: smartcontractkit/push-gha-metrics-action@dea9b546553cb4ca936607c2267a09c004e4ab3f # v3.0.0 + - name: Notify Slack + if: failure() && github.event_name != 'workflow_dispatch' + uses: slackapi/slack-github-action@6c661ce58804a1a20f6dc5fbee7f0381b469e001 # v1.25.0 + env: + SLACK_BOT_TOKEN: ${{ secrets.QA_SLACK_API_KEY }} with: - id: ${{ env.COLLECTION_ID }}-migration-tests - org-id: ${{ secrets.GRAFANA_INTERNAL_TENANT_ID }} - basic-auth: ${{ secrets.GRAFANA_INTERNAL_BASIC_AUTH }} - hostname: ${{ secrets.GRAFANA_INTERNAL_HOST }} - this-job-name: Version Migration Tests - test-results-file: '{"testType":"go","filePath":"/tmp/gotest.log"}' - continue-on-error: true + channel-id: "#team-test-tooling-internal" + slack-message: ":x: :mild-panic-intensifies: Node Migration Tests Failed: ${{ job.html_url }}\n${{ 
format('https://github.com/smartcontractkit/chainlink/actions/runs/{0}', github.run_id) }}" ## Solana Section get_solana_sha: From b1c8d74272e3b02e0a2a954c3d61b65ecb42f5cf Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Fri, 26 Apr 2024 16:53:31 +0200 Subject: [PATCH 5/5] fix: prevent query syntax error if allowlist is empty (#12912) Co-authored-by: Morgan Kuphal <87319522+KuphJr@users.noreply.github.com> --- .../services/gateway/handlers/functions/allowlist/orm.go | 5 +++++ .../gateway/handlers/functions/allowlist/orm_test.go | 9 +++++++++ 2 files changed, 14 insertions(+) diff --git a/core/services/gateway/handlers/functions/allowlist/orm.go b/core/services/gateway/handlers/functions/allowlist/orm.go index 7867c06d5d4..20a8ed15252 100644 --- a/core/services/gateway/handlers/functions/allowlist/orm.go +++ b/core/services/gateway/handlers/functions/allowlist/orm.go @@ -67,6 +67,11 @@ func (o *orm) GetAllowedSenders(ctx context.Context, offset, limit uint) ([]comm } func (o *orm) CreateAllowedSenders(ctx context.Context, allowedSenders []common.Address) error { + if len(allowedSenders) == 0 { + o.lggr.Debugf("empty allowed senders list: %v for routerContractAddress: %s. 
skipping...", allowedSenders, o.routerContractAddress) + return nil + } + var valuesPlaceholder []string for i := 1; i <= len(allowedSenders)*2; i += 2 { valuesPlaceholder = append(valuesPlaceholder, fmt.Sprintf("($%d, $%d)", i, i+1)) diff --git a/core/services/gateway/handlers/functions/allowlist/orm_test.go b/core/services/gateway/handlers/functions/allowlist/orm_test.go index 2584e131968..388d47a769b 100644 --- a/core/services/gateway/handlers/functions/allowlist/orm_test.go +++ b/core/services/gateway/handlers/functions/allowlist/orm_test.go @@ -128,6 +128,15 @@ func TestORM_CreateAllowedSenders(t *testing.T) { require.Equal(t, expected[0], results[0]) require.Equal(t, expected[1], results[1]) }) + + // this scenario can happen if the allowlist is empty but we call CreateAllowedSenders + t.Run("OK-empty_list", func(t *testing.T) { + ctx := testutils.Context(t) + orm, err := setupORM(t) + require.NoError(t, err) + err = orm.CreateAllowedSenders(ctx, []common.Address{}) + require.NoError(t, err) + }) } func TestORM_DeleteAllowedSenders(t *testing.T) {