diff --git a/.github/log_poller_on_demand.yml b/.github/log_poller_on_demand.yml new file mode 100644 index 00000000000..856d1e02349 --- /dev/null +++ b/.github/log_poller_on_demand.yml @@ -0,0 +1,66 @@ +name: On Demand Log Poller Consistency Test +on: + workflow_dispatch: + inputs: + contracts: + description: Number of test contracts + default: "2" + required: true + eventsPerTx: + description: Number of events to emit per transaction + default: "10" + required: true + useFinalityTag: + description: Use finality tag + default: "false" + required: true + loadDuration: + description: Load duration (e.g. 10s, 10m, 1h) + default: "10m" + required: true + chainlinkImage: + description: Chainlink image to use + default: "public.ecr.aws/chainlink/chainlink" + required: true + chainlinkVersion: + description: Chainlink version to use + default: "v2.7.0-beta0" + required: true + selectedNetworks: + description: Network to use (only Sepolia or Mumbai) + default: "Sepolia" + required: true + fundingKey: + description: Private key used to fund the contracts + required: true + rpcURL: + description: RPC URL to use + required: true + wsURL: + description: WS URL to use + required: true + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version-file: "integration-tests/go.mod" + cache: true + - name: Show overrides + env: + CONTRACTS: ${{ inputs.contracts }} + EVENTS_PER_TX: ${{ inputs.eventsPerTx }} + LOAD_DURATION: ${{ inputs.loadDuration }} + USE_FINALITY_TAG: ${{ inputs.useFinalityTag }} + CHAINLINK_IMAGE: ${{ inputs.chainlinkImage }} + CHAINLINK_VERSION: ${{ inputs.chainlinkVersion }} + SELECTED_NETWORKS: ${{ inputs.selectedNetworks }} + EVM_KEYS: ${{ inputs.fundingKey }} + EVM_HTTP_URLS: ${{ inputs.rpcURL }} + EVM_URLS: ${{ inputs.wsURL }} + run: | + go test -v -timeout 5h -run=TestLogPollerFromEnv integration-tests/reorg/log_poller_maybe_reorg_test.go \ No newline at end of file diff --git a/.gitignore b/.gitignore index bfd66e2a39a..61ebfab0e9b 100644 --- a/.gitignore +++ b/.gitignore @@ -65,7 +65,7 @@ tests-*.xml tmp-manifest-*.yaml ztarrepo.tar.gz **/test-ledger/* -__debug_bin +__debug_bin* # goreleaser builds cosign.* @@ -82,3 +82,4 @@ contracts/yarn.lock # Ignore DevSpace cache and log folder .devspace/ +go.work* \ No newline at end of file diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 4cd2804d9f3..01d6a2aad47 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -466,6 +466,7 @@ func (lp *logPoller) run() { // Serially process replay requests. lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq) lp.PollAndSaveLogs(lp.ctx, fromBlock) + lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq) } } else { lp.lggr.Errorw("Error executing replay, could not get fromBlock", "err", err) @@ -574,13 +575,14 @@ func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context, backupPollerBloc lastSafeBackfillBlock := latestFinalizedBlockNumber - 1 if lastSafeBackfillBlock >= lp.backupPollerNextBlock { - lp.lggr.Infow("Backup poller backfilling logs", "start", lp.backupPollerNextBlock, "end", lastSafeBackfillBlock) + lp.lggr.Infow("Backup poller started backfilling logs", "start", lp.backupPollerNextBlock, "end", lastSafeBackfillBlock) if err = lp.backfill(ctx, lp.backupPollerNextBlock, lastSafeBackfillBlock); err != nil { // If there's an error backfilling, we can just return and retry from the last block saved // since we don't save any blocks on backfilling. We may re-insert the same logs but thats ok. lp.lggr.Warnw("Backup poller failed", "err", err) return } + lp.lggr.Infow("Backup poller finished backfilling", "start", lp.backupPollerNextBlock, "end", lastSafeBackfillBlock) lp.backupPollerNextBlock = lastSafeBackfillBlock + 1 } } diff --git a/integration-tests/client/chainlink.go b/integration-tests/client/chainlink.go index 8a79cb3ec95..3638fa11c7f 100644 --- a/integration-tests/client/chainlink.go +++ b/integration-tests/client/chainlink.go @@ -1213,3 +1213,23 @@ func (c *ChainlinkClient) GetForwarders() (*Forwarders, *http.Response, error) { } return response, resp.RawResponse, err } + +// Replays log poller from block number +func (c *ChainlinkClient) ReplayLogPollerFromBlock(fromBlock, evmChainID int64) (*ReplayResponse, *http.Response, error) { + specObj := &ReplayResponse{} + c.l.Info().Str(NodeURL, c.Config.URL).Int64("From block", fromBlock).Int64("EVM chain ID", evmChainID).Msg("Replaying Log Poller from block") + resp, err := c.APIClient.R(). + SetResult(&specObj). + SetQueryParams(map[string]string{ + "evmChainID": fmt.Sprint(evmChainID), + }). + SetPathParams(map[string]string{ + "fromBlock": fmt.Sprint(fromBlock), + }). + Post("/v2/replay_from_block/{fromBlock}") + if err != nil { + return nil, nil, err + } + + return specObj, resp.RawResponse, err +} diff --git a/integration-tests/client/chainlink_models.go b/integration-tests/client/chainlink_models.go index 6013e13e0fa..c6d1209d2ea 100644 --- a/integration-tests/client/chainlink_models.go +++ b/integration-tests/client/chainlink_models.go @@ -9,6 +9,7 @@ import ( "gopkg.in/guregu/null.v4" "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/utils" ) // EIServiceConfig represents External Initiator service config @@ -1407,3 +1408,16 @@ type ForwarderAttributes struct { CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` } + +type ReplayResponse struct { + Data ReplayResponseData `json:"data"` +} + +type ReplayResponseData struct { + Attributes ReplayResponseAttributes `json:"attributes"` +} + +type ReplayResponseAttributes struct { + Message string `json:"message"` + EVMChainID *utils.Big `json:"evmChainID"` +} diff --git a/integration-tests/contracts/contract_deployer.go b/integration-tests/contracts/contract_deployer.go index e203d8318f2..5a3fad256e4 100644 --- a/integration-tests/contracts/contract_deployer.go +++ b/integration-tests/contracts/contract_deployer.go @@ -45,6 +45,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/keeper_registry_wrapper2_0" registry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/keeper_registry_wrapper_2_1" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/link_token_interface" + le "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_triggered_streams_lookup_wrapper" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_upkeep_counter_wrapper" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/mock_aggregator_proxy" @@ -138,6 +139,7 @@ type ContractDeployer interface { DeployMercuryVerifierProxyContract(accessControllerAddr common.Address) (MercuryVerifierProxy, error) DeployMercuryFeeManager(linkAddress common.Address, nativeAddress common.Address, proxyAddress common.Address, rewardManagerAddress common.Address) (MercuryFeeManager, error) DeployMercuryRewardManager(linkAddress common.Address) (MercuryRewardManager, error) + DeployLogEmitterContract() (LogEmitter, error) } // NewContractDeployer returns an instance of a contract deployer based on the client type @@ -1613,3 +1615,21 @@ func (e *EthereumContractDeployer) DeployWERC20Mock() (WERC20Mock, error) { l: e.l, }, err } + +func (e *EthereumContractDeployer) DeployLogEmitterContract() (LogEmitter, error) { + address, _, instance, err := e.client.DeployContract("Log Emitter", func( + auth *bind.TransactOpts, + backend bind.ContractBackend, + ) (common.Address, *types.Transaction, interface{}, error) { + return le.DeployLogEmitter(auth, backend) + }) + if err != nil { + return nil, err + } + return &LogEmitterContract{ + client: e.client, + instance: instance.(*le.LogEmitter), + address: *address, + l: e.l, + }, err +} diff --git a/integration-tests/contracts/contract_models.go b/integration-tests/contracts/contract_models.go index 51fce7cb120..4c8d610fa1b 100644 --- a/integration-tests/contracts/contract_models.go +++ b/integration-tests/contracts/contract_models.go @@ -400,3 +400,13 @@ type WERC20Mock interface { Transfer(to string, amount *big.Int) error Mint(account common.Address, amount *big.Int) (*types.Transaction, error) } + +type LogEmitter interface { + Address() common.Address + EmitLogInts(ints []int) (*types.Transaction, error) + EmitLogIntsIndexed(ints []int) (*types.Transaction, error) + EmitLogStrings(strings []string) (*types.Transaction, error) + EmitLogInt(payload int) (*types.Transaction, error) + EmitLogIntIndexed(payload int) (*types.Transaction, error) + EmitLogString(strings string) (*types.Transaction, error) +} diff --git a/integration-tests/contracts/test_contracts.go b/integration-tests/contracts/test_contracts.go new file mode 100644 index 00000000000..ccdd2989e49 --- /dev/null +++ b/integration-tests/contracts/test_contracts.go @@ -0,0 +1,79 @@ +package contracts + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/rs/zerolog" + "github.com/smartcontractkit/chainlink-testing-framework/blockchain" + + le "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter" +) + +type LogEmitterContract struct { + address common.Address + client blockchain.EVMClient + instance *le.LogEmitter + l zerolog.Logger +} + +func (e *LogEmitterContract) Address() common.Address { + return e.address +} + +func (e *LogEmitterContract) EmitLogInts(ints []int) (*types.Transaction, error) { + opts, err := e.client.TransactionOpts(e.client.GetDefaultWallet()) + if err != nil { + return nil, err + } + bigInts := make([]*big.Int, len(ints)) + for i, v := range ints { + bigInts[i] = big.NewInt(int64(v)) + } + tx, err := e.instance.EmitLog1(opts, bigInts) + if err != nil { + return nil, err + } + return tx, e.client.ProcessTransaction(tx) +} + +func (e *LogEmitterContract) EmitLogIntsIndexed(ints []int) (*types.Transaction, error) { + opts, err := e.client.TransactionOpts(e.client.GetDefaultWallet()) + if err != nil { + return nil, err + } + bigInts := make([]*big.Int, len(ints)) + for i, v := range ints { + bigInts[i] = big.NewInt(int64(v)) + } + tx, err := e.instance.EmitLog2(opts, bigInts) + if err != nil { + return nil, err + } + return tx, e.client.ProcessTransaction(tx) +} + +func (e *LogEmitterContract) EmitLogStrings(strings []string) (*types.Transaction, error) { + opts, err := e.client.TransactionOpts(e.client.GetDefaultWallet()) + if err != nil { + return nil, err + } + tx, err := e.instance.EmitLog3(opts, strings) + if err != nil { + return nil, err + } + return tx, e.client.ProcessTransaction(tx) +} + +func (e *LogEmitterContract) EmitLogInt(payload int) (*types.Transaction, error) { + return e.EmitLogInts([]int{payload}) +} + +func (e *LogEmitterContract) EmitLogIntIndexed(payload int) (*types.Transaction, error) { + return e.EmitLogIntsIndexed([]int{payload}) +} + +func (e *LogEmitterContract) EmitLogString(strings string) (*types.Transaction, error) { + return e.EmitLogStrings([]string{strings}) +} diff --git a/integration-tests/docker/cmd/test_env.go b/integration-tests/docker/cmd/test_env.go index 31b7de5dcdd..f760f45f8d0 100644 --- a/integration-tests/docker/cmd/test_env.go +++ b/integration-tests/docker/cmd/test_env.go @@ -50,6 +50,7 @@ func main() { return nil }, } + startEnvCmd.AddCommand(startFullEnvCmd) // Set default log level for non-testcontainer code diff --git a/integration-tests/docker/test_env/test_env.go b/integration-tests/docker/test_env/test_env.go index 40ed0d4d535..e067e46090d 100644 --- a/integration-tests/docker/test_env/test_env.go +++ b/integration-tests/docker/test_env/test_env.go @@ -166,6 +166,7 @@ func (te *CLClusterTestEnv) FundChainlinkNodes(amount *big.Float) error { if err := cl.Fund(te.EVMClient, amount); err != nil { return errors.Wrap(err, ErrFundCLNode) } + time.Sleep(5 * time.Second) } return te.EVMClient.WaitForEvents() } diff --git a/integration-tests/docker/test_env/test_env_builder.go b/integration-tests/docker/test_env/test_env_builder.go index d1550240500..c07ea762623 100644 --- a/integration-tests/docker/test_env/test_env_builder.go +++ b/integration-tests/docker/test_env/test_env_builder.go @@ -19,6 +19,7 @@ import ( "github.com/smartcontractkit/chainlink/integration-tests/contracts" "github.com/smartcontractkit/chainlink/integration-tests/types/config/node" + evmcfg "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml" ) type CleanUpType string @@ -30,22 +31,24 @@ const ( ) type CLTestEnvBuilder struct { - hasLogWatch bool - hasGeth bool - hasKillgrave bool - hasForwarders bool - clNodeConfig *chainlink.Config - secretsConfig string - nonDevGethNetworks []blockchain.EVMNetwork - clNodesCount int - customNodeCsaKeys []string - defaultNodeCsaKeys []string - l zerolog.Logger - t *testing.T - te *CLClusterTestEnv - isNonEVM bool - cleanUpType CleanUpType - cleanUpCustomFn func() + hasLogWatch bool + hasGeth bool + hasKillgrave bool + hasForwarders bool + clNodeConfig *chainlink.Config + secretsConfig string + nonDevGethNetworks []blockchain.EVMNetwork + clNodesCount int + customNodeCsaKeys []string + defaultNodeCsaKeys []string + l zerolog.Logger + t *testing.T + te *CLClusterTestEnv + isNonEVM bool + cleanUpType CleanUpType + cleanUpCustomFn func() + chainOptionsFn []ChainOption + evmClientNetworkOption []EVMClientNetworkOption /* funding */ ETHFunds *big.Float @@ -162,6 +165,24 @@ func (b *CLTestEnvBuilder) WithCustomCleanup(customFn func()) *CLTestEnvBuilder return b } +type ChainOption = func(*evmcfg.Chain) *evmcfg.Chain + +func (b *CLTestEnvBuilder) WithChainOptions(opts ...ChainOption) *CLTestEnvBuilder { + b.chainOptionsFn = make([]ChainOption, 0, 0) + b.chainOptionsFn = append(b.chainOptionsFn, opts...) + + return b +} + +type EVMClientNetworkOption = func(*blockchain.EVMNetwork) *blockchain.EVMNetwork + +func (b *CLTestEnvBuilder) EVMClientNetworkOptions(opts ...EVMClientNetworkOption) *CLTestEnvBuilder { + b.evmClientNetworkOption = make([]EVMClientNetworkOption, 0, 0) + b.evmClientNetworkOption = append(b.evmClientNetworkOption, opts...) + + return b +} + func (b *CLTestEnvBuilder) Build() (*CLClusterTestEnv, error) { if b.te == nil { var err error @@ -245,10 +266,14 @@ func (b *CLTestEnvBuilder) Build() (*CLClusterTestEnv, error) { if err != nil { return nil, err } - } if !b.isNonEVM { + if b.evmClientNetworkOption != nil && len(b.evmClientNetworkOption) > 0 { + for _, fn := range b.evmClientNetworkOption { + fn(&networkConfig) + } + } bc, err := blockchain.NewEVMClientFromNetwork(networkConfig, b.l) if err != nil { return nil, err @@ -294,6 +319,14 @@ func (b *CLTestEnvBuilder) Build() (*CLClusterTestEnv, error) { } node.SetChainConfig(cfg, wsUrls, httpUrls, networkConfig, b.hasForwarders) + + if b.chainOptionsFn != nil && len(b.chainOptionsFn) > 0 { + for _, fn := range b.chainOptionsFn { + for _, evmCfg := range cfg.EVM { + fn(&evmCfg.Chain) + } + } + } } err := b.te.StartClCluster(cfg, b.clNodesCount, b.secretsConfig) diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 33beae119ae..127980a2cb9 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -6,6 +6,7 @@ go 1.21 replace github.com/smartcontractkit/chainlink/v2 => ../ require ( + cosmossdk.io/errors v1.0.0 github.com/K-Phoen/grabana v0.21.17 github.com/cli/go-gh/v2 v2.0.0 github.com/ethereum/go-ethereum v1.12.0 @@ -18,13 +19,15 @@ require ( github.com/pelletier/go-toml/v2 v2.1.0 github.com/pkg/errors v0.9.1 github.com/rs/zerolog v1.30.0 + github.com/scylladb/go-reflectx v1.0.1 github.com/segmentio/ksuid v1.0.4 github.com/slack-go/slack v0.12.2 - github.com/smartcontractkit/chainlink-testing-framework v1.18.2 + github.com/smartcontractkit/chainlink-testing-framework v1.18.2-0.20231030212542-5fb562e774a5 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545 github.com/smartcontractkit/ocr2keepers v0.7.27 github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 + github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20230906073235-9e478e5e19f1 github.com/smartcontractkit/wasp v0.3.0 github.com/spf13/cobra v1.6.1 @@ -49,7 +52,6 @@ require ( cosmossdk.io/api v0.3.1 // indirect cosmossdk.io/core v0.5.1 // indirect cosmossdk.io/depinject v1.0.0-alpha.3 // indirect - cosmossdk.io/errors v1.0.0 // indirect cosmossdk.io/math v1.0.1 // indirect dario.cat/mergo v1.0.0 // indirect filippo.io/edwards25519 v1.0.0 // indirect @@ -375,7 +377,6 @@ require ( github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/russross/blackfriday v1.6.0 // indirect github.com/sasha-s/go-deadlock v0.3.1 // indirect - github.com/scylladb/go-reflectx v1.0.1 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect github.com/sercand/kuberesolver v2.4.0+incompatible // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect @@ -387,7 +388,6 @@ require ( github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231101203911-c686b4d48672 // indirect github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 // indirect github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb // indirect - github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb // indirect github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1 // indirect github.com/smartcontractkit/wsrpc v0.7.2 // indirect github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 7969e82144c..24da9467176 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -2368,8 +2368,8 @@ github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab0 github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU= github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8= github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb/go.mod h1:/30flFG4L/iCYAFeA3DUzR0xuHSxAMONiWTzyzvsNwo= -github.com/smartcontractkit/chainlink-testing-framework v1.18.2 h1:Ac/wdRDF4L479wpFT3yqn6ujb6kFTn7aq8gj9giyFHM= -github.com/smartcontractkit/chainlink-testing-framework v1.18.2/go.mod h1:lMdEUTdSmzldCwqf+todFEyebE9Vlb23+5rvIHJBPOk= +github.com/smartcontractkit/chainlink-testing-framework v1.18.2-0.20231030212542-5fb562e774a5 h1:4hTf8pvtdtwoaeKFSEYjBZPvDbZ05WgiHsb0TPL6HqQ= +github.com/smartcontractkit/chainlink-testing-framework v1.18.2-0.20231030212542-5fb562e774a5/go.mod h1:lMdEUTdSmzldCwqf+todFEyebE9Vlb23+5rvIHJBPOk= github.com/smartcontractkit/go-plugin v0.0.0-20231003134350-e49dad63b306 h1:ko88+ZznniNJZbZPWAvHQU8SwKAdHngdDZ+pvVgB5ss= github.com/smartcontractkit/go-plugin v0.0.0-20231003134350-e49dad63b306/go.mod h1:w1sAEES3g3PuV/RzUrgow20W2uErMly84hhD3um1WL4= github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJif132UCdjo8u43i7iPN1/MFnu49hv7lFGFftCHKU= diff --git a/integration-tests/load/log_poller/config.toml b/integration-tests/load/log_poller/config.toml new file mode 100644 index 00000000000..2e328001943 --- /dev/null +++ b/integration-tests/load/log_poller/config.toml @@ -0,0 +1,22 @@ +[general] +generator = "looped" +contracts = 10 +events_per_tx = 10 + +[chaos] +experiment_count = 10 + +[looped] +[looped.contract] +execution_count = 300 + +[looped.fuzz] +min_emit_wait_time_ms = 100 +max_emit_wait_time_ms = 500 + +[wasp] +[wasp.load] +call_timeout = "3m" +rate_limit_unit_duration = "2s" +LPS = 30 +duration = "1m" \ No newline at end of file diff --git a/integration-tests/load/log_poller/log_poller_test.go b/integration-tests/load/log_poller/log_poller_test.go new file mode 100644 index 00000000000..ec67815832c --- /dev/null +++ b/integration-tests/load/log_poller/log_poller_test.go @@ -0,0 +1,24 @@ +package logpoller + +import ( + "testing" + + "github.com/ethereum/go-ethereum/accounts/abi" + + lp_helpers "github.com/smartcontractkit/chainlink/integration-tests/universal/log_poller" + "github.com/stretchr/testify/require" +) + +func TestLoadTestLogPoller(t *testing.T) { + cfg, err := lp_helpers.ReadConfig(lp_helpers.DefaultConfigFilename) + require.NoError(t, err) + + eventsToEmit := []abi.Event{} + for _, event := range lp_helpers.EmitterABI.Events { + eventsToEmit = append(eventsToEmit, event) + } + + cfg.General.EventsToEmit = eventsToEmit + + lp_helpers.ExecuteBasicLogPollerTest(t, cfg) +} diff --git a/integration-tests/reorg/log_poller_maybe_reorg_test.go b/integration-tests/reorg/log_poller_maybe_reorg_test.go new file mode 100644 index 00000000000..4e802bdb09c --- /dev/null +++ b/integration-tests/reorg/log_poller_maybe_reorg_test.go @@ -0,0 +1,42 @@ +package reorg + +import ( + "testing" + + "github.com/ethereum/go-ethereum/accounts/abi" + logpoller "github.com/smartcontractkit/chainlink/integration-tests/universal/log_poller" +) + +func TestLogPollerFromEnv(t *testing.T) { + cfg := logpoller.Config{ + General: &logpoller.General{ + Generator: logpoller.GeneratorType_Looped, + Contracts: 2, + EventsPerTx: 100, + UseFinalityTag: true, + }, + LoopedConfig: &logpoller.LoopedConfig{ + ContractConfig: logpoller.ContractConfig{ + ExecutionCount: 100, + }, + FuzzConfig: logpoller.FuzzConfig{ + MinEmitWaitTimeMs: 800, + MaxEmitWaitTimeMs: 1200, + }, + }, + } + + eventsToEmit := []abi.Event{} + for _, event := range logpoller.EmitterABI.Events { + eventsToEmit = append(eventsToEmit, event) + } + + cfg.General.EventsToEmit = eventsToEmit + err := cfg.OverrideFromEnv() + if err != nil { + t.Errorf("failed to override config from env: %v", err) + t.FailNow() + } + + logpoller.ExecuteCILogPollerTest(t, &cfg) +} diff --git a/integration-tests/smoke/automation_test.go b/integration-tests/smoke/automation_test.go index 17373e6a95f..9e35b24df1e 100644 --- a/integration-tests/smoke/automation_test.go +++ b/integration-tests/smoke/automation_test.go @@ -11,9 +11,8 @@ import ( "testing" "time" - "github.com/kelseyhightower/envconfig" - "github.com/ethereum/go-ethereum/common" + "github.com/kelseyhightower/envconfig" "github.com/onsi/gomega" "github.com/stretchr/testify/require" diff --git a/integration-tests/smoke/log_poller_test.go b/integration-tests/smoke/log_poller_test.go new file mode 100644 index 00000000000..0df7817f1e3 --- /dev/null +++ b/integration-tests/smoke/log_poller_test.go @@ -0,0 +1,140 @@ +package smoke + +import ( + "testing" + + "github.com/ethereum/go-ethereum/accounts/abi" + logpoller "github.com/smartcontractkit/chainlink/integration-tests/universal/log_poller" +) + +// consistency test with no network disruptions with approximate emission of 1500-1600 logs per second for ~110-120 seconds +// 6 filters are registered +func TestLogPollerFewFilters(t *testing.T) { + cfg := logpoller.Config{ + General: &logpoller.General{ + Generator: logpoller.GeneratorType_Looped, + Contracts: 2, + EventsPerTx: 4, + UseFinalityTag: false, + }, + LoopedConfig: &logpoller.LoopedConfig{ + ContractConfig: logpoller.ContractConfig{ + ExecutionCount: 100, + }, + FuzzConfig: logpoller.FuzzConfig{ + MinEmitWaitTimeMs: 200, + MaxEmitWaitTimeMs: 500, + }, + }, + } + + eventsToEmit := []abi.Event{} + for _, event := range logpoller.EmitterABI.Events { + eventsToEmit = append(eventsToEmit, event) + } + + cfg.General.EventsToEmit = eventsToEmit + + logpoller.ExecuteBasicLogPollerTest(t, &cfg) +} + +// consistency test with no network disruptions with approximate emission of 1000-1100 logs per second for ~110-120 seconds +// 900 filters are registered +func TestLogManyFiltersPoller(t *testing.T) { + cfg := logpoller.Config{ + General: &logpoller.General{ + Generator: logpoller.GeneratorType_Looped, + Contracts: 300, + EventsPerTx: 3, + UseFinalityTag: false, + }, + LoopedConfig: &logpoller.LoopedConfig{ + ContractConfig: logpoller.ContractConfig{ + ExecutionCount: 30, + }, + FuzzConfig: logpoller.FuzzConfig{ + MinEmitWaitTimeMs: 200, + MaxEmitWaitTimeMs: 500, + }, + }, + } + + eventsToEmit := []abi.Event{} + for _, event := range logpoller.EmitterABI.Events { + eventsToEmit = append(eventsToEmit, event) + } + + cfg.General.EventsToEmit = eventsToEmit + + logpoller.ExecuteBasicLogPollerTest(t, &cfg) +} + +// consistency test that introduces random distruptions by pausing either Chainlink or Postgres containers for random interval of 5-20 seconds +// with approximate emission of 520-550 logs per second for ~110 seconds +// 6 filters are registered +func TestLogPollerWithChaos(t *testing.T) { + cfg := logpoller.Config{ + General: &logpoller.General{ + Generator: logpoller.GeneratorType_Looped, + Contracts: 2, + EventsPerTx: 100, + UseFinalityTag: false, + }, + LoopedConfig: &logpoller.LoopedConfig{ + ContractConfig: logpoller.ContractConfig{ + ExecutionCount: 100, + }, + FuzzConfig: logpoller.FuzzConfig{ + MinEmitWaitTimeMs: 200, + MaxEmitWaitTimeMs: 500, + }, + }, + ChaosConfig: &logpoller.ChaosConfig{ + ExperimentCount: 10, + }, + } + + eventsToEmit := []abi.Event{} + for _, event := range logpoller.EmitterABI.Events { + eventsToEmit = append(eventsToEmit, event) + } + + cfg.General.EventsToEmit = eventsToEmit + + logpoller.ExecuteBasicLogPollerTest(t, &cfg) +} + +// consistency test that registers filters after events were emitted and then triggers replay via API +// unfortunately there is no way to make sure that logs that are indexed are only picked up by replay +// and not by backup poller +// with approximate emission of 24 logs per second for ~110 seconds +// 6 filters are registered +func TestLogPollerReplay(t *testing.T) { + cfg := logpoller.Config{ + General: &logpoller.General{ + Generator: logpoller.GeneratorType_Looped, + Contracts: 2, + EventsPerTx: 4, + UseFinalityTag: false, + }, + LoopedConfig: &logpoller.LoopedConfig{ + ContractConfig: logpoller.ContractConfig{ + ExecutionCount: 100, + }, + FuzzConfig: logpoller.FuzzConfig{ + MinEmitWaitTimeMs: 200, + MaxEmitWaitTimeMs: 500, + }, + }, + } + + eventsToEmit := []abi.Event{} + for _, event := range logpoller.EmitterABI.Events { + eventsToEmit = append(eventsToEmit, event) + } + + cfg.General.EventsToEmit = eventsToEmit + consistencyTimeout := "5m" + + logpoller.ExecuteLogPollerReplay(t, &cfg, consistencyTimeout) +} diff --git a/integration-tests/universal/log_poller/config.go b/integration-tests/universal/log_poller/config.go new file mode 100644 index 00000000000..623fa6606ed --- /dev/null +++ b/integration-tests/universal/log_poller/config.go @@ -0,0 +1,247 @@ +package logpoller + +import ( + "fmt" + "os" + "strconv" + + "cosmossdk.io/errors" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/pelletier/go-toml/v2" + "github.com/rs/zerolog/log" + "github.com/smartcontractkit/chainlink/v2/core/store/models" +) + +const ( + DefaultConfigFilename = "config.toml" + + ErrReadPerfConfig = "failed to read TOML config for performance tests" + ErrUnmarshalPerfConfig = "failed to unmarshal TOML config for performance tests" +) + +type GeneratorType = string + +const ( + GeneratorType_WASP = "wasp" + GeneratorType_Looped = "looped" +) + +type Config struct { + General *General `toml:"general"` + ChaosConfig *ChaosConfig `toml:"chaos"` + Wasp *WaspConfig `toml:"wasp"` + LoopedConfig *LoopedConfig `toml:"looped"` +} + +type LoopedConfig struct { + ContractConfig `toml:"contract"` + FuzzConfig `toml:"fuzz"` +} + +type ContractConfig struct { + ExecutionCount int `toml:"execution_count"` +} + +type FuzzConfig struct { + MinEmitWaitTimeMs int `toml:"min_emit_wait_time_ms"` + MaxEmitWaitTimeMs int `toml:"max_emit_wait_time_ms"` +} + +type General struct { + Generator string `toml:"generator"` + EventsToEmit []abi.Event `toml:"-"` + Contracts int `toml:"contracts"` + EventsPerTx int `toml:"events_per_tx"` + UseFinalityTag bool `toml:"use_finality_tag"` +} + +type ChaosConfig struct { + ExperimentCount int `toml:"experiment_count"` +} + +type WaspConfig struct { + Load *Load `toml:"load"` +} + +type Load struct { + RPS int64 `toml:"rps"` + LPS int64 `toml:"lps"` + RateLimitUnitDuration *models.Duration `toml:"rate_limit_unit_duration"` + Duration *models.Duration `toml:"duration"` + CallTimeout *models.Duration `toml:"call_timeout"` +} + +func ReadConfig(configName string) (*Config, error) { + var cfg *Config + d, err := os.ReadFile(configName) + if err != nil { + return nil, errors.Wrap(err, ErrReadPerfConfig) + } + err = toml.Unmarshal(d, &cfg) + if err != nil { + return nil, errors.Wrap(err, ErrUnmarshalPerfConfig) + } + + if err := cfg.validate(); err != nil { + return nil, err + } + + log.Debug().Interface("Config", cfg).Msg("Parsed config") + return cfg, nil +} + +func (c *Config) OverrideFromEnv() error { + if contr := os.Getenv("CONTRACTS"); contr != "" { + c.General.Contracts = mustParseInt(contr) + } + + if eventsPerTx := os.Getenv("EVENTS_PER_TX"); eventsPerTx != "" { + c.General.EventsPerTx = mustParseInt(eventsPerTx) + } + + if useFinalityTag := os.Getenv("USE_FINALITY_TAG"); useFinalityTag != "" { + c.General.UseFinalityTag = mustParseBool(useFinalityTag) + } + + if duration := os.Getenv("LOAD_DURATION"); duration != "" { + d, err := models.ParseDuration(duration) + if err != nil { + return err + } + + if c.General.Generator == GeneratorType_WASP { + c.Wasp.Load.Duration = &d + } else { + // make the looped generator approximately run for desired duration + // on average we will emit 1 event per second + c.LoopedConfig.FuzzConfig.MinEmitWaitTimeMs = 900 + c.LoopedConfig.FuzzConfig.MaxEmitWaitTimeMs = 1100 + c.LoopedConfig.ContractConfig.ExecutionCount = int(d.Duration().Seconds()) + } + } + + return nil +} + +func (c *Config) validate() error { + if c.General == nil { + return fmt.Errorf("General config is nil") + } + + err := c.General.validate() + if err != nil { + return fmt.Errorf("General config validation failed: %v", err) + } + + switch c.General.Generator { + case GeneratorType_WASP: + if c.Wasp == nil { + return fmt.Errorf("Wasp config is nil") + } + if c.Wasp.Load == nil { + return fmt.Errorf("Wasp load config is nil") + } + + err = c.Wasp.validate() + if err != nil { + return fmt.Errorf("Wasp config validation failed: %v", err) + } + case GeneratorType_Looped: + if c.LoopedConfig == nil { + return fmt.Errorf("Looped config is nil") + } + + err = c.LoopedConfig.validate() + if err != nil { + return fmt.Errorf("Looped config validation failed: %v", err) + } + default: + return fmt.Errorf("Unknown generator type: %s", c.General.Generator) + } + + return nil +} + +func (g *General) validate() error { + if g.Generator == "" { + return fmt.Errorf("Generator is empty") + } + + if g.Contracts == 0 { + return fmt.Errorf("Contracts is 0, but must be > 0") + } + + if g.EventsPerTx == 0 { + return fmt.Errorf("Events_per_tx is 0, but must be > 0") + } + + return nil +} + +func (w *WaspConfig) validate() error { + if w.Load == nil { + return fmt.Errorf("Load config is nil") + } + + err := w.Load.validate() + if err != nil { + return fmt.Errorf("Load config validation failed: %v", err) + } + + return nil +} + +func (l *Load) validate() error { + if l.RPS == 0 && l.LPS == 0 { + return fmt.Errorf("Either RPS or LPS needs to be set") + } + + if l.RPS != 0 && l.LPS != 0 { + return fmt.Errorf("Only one of RPS or LPS can be set") + } + + if l.Duration == nil { + return fmt.Errorf("duration is nil") + } + + if l.CallTimeout == nil { + return fmt.Errorf("call_timeout is nil") + } + if l.RateLimitUnitDuration == nil { + return fmt.Errorf("rate_limit_unit_duration is nil") + } + + return nil +} + +func (l *LoopedConfig) validate() error { + if l.ExecutionCount == 0 { + return fmt.Errorf("execution_count is 0, but must be > 0") + } + + if l.MinEmitWaitTimeMs == 0 { + return fmt.Errorf("min_emit_wait_time_ms is 0, but must be > 0") + } + + if l.MaxEmitWaitTimeMs == 0 { + return fmt.Errorf("max_emit_wait_time_ms is 0, but must be > 0") + } + + return nil +} + +func mustParseInt(s string) int { + i, err := strconv.Atoi(s) + if err != nil { + panic(err) + } + return i +} + +func mustParseBool(s string) bool { + b, err := strconv.ParseBool(s) + if err != nil { + panic(err) + } + return b +} diff --git a/integration-tests/universal/log_poller/gun.go b/integration-tests/universal/log_poller/gun.go new file mode 100644 index 00000000000..11932330a3b --- /dev/null +++ b/integration-tests/universal/log_poller/gun.go @@ -0,0 +1,78 @@ +package logpoller + +import ( + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/rs/zerolog" + + "github.com/smartcontractkit/chainlink/integration-tests/contracts" + "github.com/smartcontractkit/wasp" +) + +/* LogEmitterGun is a gun that constantly emits logs from a contract */ +type LogEmitterGun struct { + contract *contracts.LogEmitter + eventsToEmit []abi.Event + logger zerolog.Logger + eventsPerTx int +} + +type Counter struct { + mu *sync.Mutex + value int +} + +func NewLogEmitterGun( + contract *contracts.LogEmitter, + eventsToEmit []abi.Event, + eventsPerTx int, + logger zerolog.Logger, +) *LogEmitterGun { + return &LogEmitterGun{ + contract: contract, + eventsToEmit: eventsToEmit, + eventsPerTx: eventsPerTx, + logger: logger, + } +} + +func (m *LogEmitterGun) Call(l *wasp.Generator) *wasp.CallResult { + localCounter := 0 + logEmitter := (*m.contract) + address := logEmitter.Address() + for _, event := range m.eventsToEmit { + m.logger.Debug().Str("Emitter address", address.String()).Str("Event type", event.Name).Msg("Emitting log from emitter") + var err error + switch event.Name { + case "Log1": + _, err = logEmitter.EmitLogInts(getIntSlice(m.eventsPerTx)) + case "Log2": + _, err = logEmitter.EmitLogIntsIndexed(getIntSlice(m.eventsPerTx)) + case "Log3": + _, err = logEmitter.EmitLogStrings(getStringSlice(m.eventsPerTx)) + default: + err = fmt.Errorf("Unknown event name: %s", event.Name) + } + + if err != nil { + return &wasp.CallResult{Error: err.Error(), Failed: true} + } + localCounter += 1 + } + + // I don't think that will work as expected, I should atomically read the value and save it, so maybe just a mutex? + if counter, ok := l.InputSharedData().(*Counter); ok { + counter.mu.Lock() + defer counter.mu.Unlock() + counter.value += localCounter + } else { + return &wasp.CallResult{ + Error: "SharedData did not contain a Counter", + Failed: true, + } + } + + return &wasp.CallResult{} +} diff --git a/integration-tests/universal/log_poller/helpers.go b/integration-tests/universal/log_poller/helpers.go new file mode 100644 index 00000000000..aa488eb1be5 --- /dev/null +++ b/integration-tests/universal/log_poller/helpers.go @@ -0,0 +1,1136 @@ +package logpoller + +import ( + "bytes" + "context" + "errors" + "fmt" + "math/big" + "math/rand" + "sort" + "strings" + "sync" + "testing" + "time" + + geth "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + geth_types "github.com/ethereum/go-ethereum/core/types" + "github.com/rs/zerolog" + "github.com/smartcontractkit/chainlink-testing-framework/blockchain" + ctf_blockchain "github.com/smartcontractkit/chainlink-testing-framework/blockchain" + "github.com/smartcontractkit/chainlink-testing-framework/logging" + "github.com/smartcontractkit/chainlink-testing-framework/networks" + "github.com/smartcontractkit/chainlink/integration-tests/actions" + "github.com/smartcontractkit/chainlink/integration-tests/client" + "github.com/smartcontractkit/chainlink/integration-tests/contracts/ethereum" + "github.com/smartcontractkit/chainlink/integration-tests/types/config/node" + "github.com/smartcontractkit/wasp" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + cltypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_utils_2_1" + "github.com/smartcontractkit/chainlink/v2/core/store/models" + + ctf_test_env "github.com/smartcontractkit/chainlink-testing-framework/docker/test_env" + "github.com/smartcontractkit/chainlink/integration-tests/contracts" + "github.com/smartcontractkit/chainlink/integration-tests/docker/test_env" + "github.com/stretchr/testify/require" + + "github.com/scylladb/go-reflectx" + it_utils "github.com/smartcontractkit/chainlink/integration-tests/utils" + utils2 "github.com/smartcontractkit/chainlink/integration-tests/utils" + evmcfg "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml" + lpEvm "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + le "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter" + core_logger "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" + "github.com/smartcontractkit/sqlx" +) + +var ( + EmitterABI, _ = abi.JSON(strings.NewReader(le.LogEmitterABI)) + automationUtilsABI = cltypes.MustGetABI(automation_utils_2_1.AutomationUtilsABI) + bytes0 = [32]byte{ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + } // bytes representation of 0x0000000000000000000000000000000000000000000000000000000000000000 + +) + +var registerSingleTopicFilter = func(registry contracts.KeeperRegistry, upkeepID *big.Int, emitterAddress common.Address, topic common.Hash) error { + logTriggerConfigStruct := automation_utils_2_1.LogTriggerConfig{ + ContractAddress: emitterAddress, + FilterSelector: 0, + Topic0: topic, + Topic1: bytes0, + Topic2: bytes0, + Topic3: bytes0, + } + encodedLogTriggerConfig, err := automationUtilsABI.Methods["_logTriggerConfig"].Inputs.Pack(&logTriggerConfigStruct) + if err != nil { + return err + } + + err = registry.SetUpkeepTriggerConfig(upkeepID, encodedLogTriggerConfig) + if err != nil { + return err + } + + return nil +} + +// this is not really possible, log trigger doesn't support multiple topics, even if log poller does +var registerMultipleTopicsFilter = func(registry contracts.KeeperRegistry, upkeepID *big.Int, emitterAddress common.Address, topics []abi.Event) error { + if len(topics) > 4 { + return errors.New("Cannot register more than 4 topics") + } + + var getTopic = func(topics []abi.Event, i int) common.Hash { + if i > len(topics)-1 { + return bytes0 + } + + return topics[i].ID + } + + var getFilterSelector = func(topics []abi.Event) (uint8, error) { + switch len(topics) { + case 0: + return 0, errors.New("Cannot register filter with 0 topics") + case 1: + return 0, nil + case 2: + return 1, nil + case 3: + return 3, nil + case 4: + return 7, nil + default: + return 0, errors.New("Cannot register filter with more than 4 topics") + } + } + + filterSelector, err := getFilterSelector(topics) + if err != nil { + return err + } + + logTriggerConfigStruct := automation_utils_2_1.LogTriggerConfig{ + ContractAddress: emitterAddress, + FilterSelector: filterSelector, + Topic0: getTopic(topics, 0), + Topic1: getTopic(topics, 1), + Topic2: getTopic(topics, 2), + Topic3: getTopic(topics, 3), + } + encodedLogTriggerConfig, err := automationUtilsABI.Methods["_logTriggerConfig"].Inputs.Pack(&logTriggerConfigStruct) + if err != nil { + return err + } + + err = registry.SetUpkeepTriggerConfig(upkeepID, encodedLogTriggerConfig) + if err != nil { + return err + } + + return nil +} + +func NewOrm(logger core_logger.SugaredLogger, chainID *big.Int, postgresDb *ctf_test_env.PostgresDb) (*lpEvm.DbORM, *sqlx.DB, error) { + dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable", "127.0.0.1", postgresDb.ExternalPort, postgresDb.User, postgresDb.Password, postgresDb.DbName) + db, err := sqlx.Open("postgres", dsn) + if err != nil { + return nil, db, err + } + + db.MapperFunc(reflectx.CamelToSnakeASCII) + return lpEvm.NewORM(chainID, db, logger, pg.NewQConfig(false)), db, nil +} + +type ExpectedFilter struct { + emitterAddress common.Address + topic common.Hash +} + +func getExpectedFilters(logEmitters []*contracts.LogEmitter, cfg *Config) []ExpectedFilter { + expectedFilters := make([]ExpectedFilter, 0) + for _, emitter := range logEmitters { + for _, event := range cfg.General.EventsToEmit { + expectedFilters = append(expectedFilters, ExpectedFilter{ + emitterAddress: (*emitter).Address(), + topic: event.ID, + }) + } + } + + return expectedFilters +} + +var nodeHasExpectedFilters = func(expectedFilters []ExpectedFilter, logger core_logger.SugaredLogger, chainID *big.Int, postgresDb *ctf_test_env.PostgresDb) (bool, error) { + orm, db, err := NewOrm(logger, chainID, postgresDb) + if err != nil { + return false, err + } + + defer db.Close() + knownFilters, err := orm.LoadFilters() + if err != nil { + return false, err + } + + for _, expectedFilter := range expectedFilters { + filterFound := false + for _, knownFilter := range knownFilters { + if bytes.Equal(expectedFilter.emitterAddress.Bytes(), knownFilter.Addresses[0].Bytes()) && bytes.Equal(expectedFilter.topic.Bytes(), knownFilter.EventSigs[0].Bytes()) { + filterFound = true + break + } + } + + if !filterFound { + return false, fmt.Errorf("No filter found for emitter %s and topic %s", expectedFilter.emitterAddress.String(), expectedFilter.topic.Hex()) + } + } + + return true, nil +} + +var randomWait = func(minMilliseconds, maxMilliseconds int) { + rand.New(rand.NewSource(time.Now().UnixNano())) + randomMilliseconds := rand.Intn(maxMilliseconds-minMilliseconds+1) + minMilliseconds + time.Sleep(time.Duration(randomMilliseconds) * time.Millisecond) +} + +type LogEmitterChannel struct { + logsEmitted int + err error + currentIndex int +} + +func getIntSlice(length int) []int { + result := make([]int, length) + for i := 0; i < length; i++ { + result[i] = i + } + + return result +} + +func getStringSlice(length int) []string { + result := make([]string, length) + for i := 0; i < length; i++ { + result[i] = "amazing event" + } + + return result +} + +var emitEvents = func(ctx context.Context, l zerolog.Logger, logEmitter *contracts.LogEmitter, cfg *Config, wg *sync.WaitGroup, results chan LogEmitterChannel) { + address := (*logEmitter).Address().String() + localCounter := 0 + select { + case <-ctx.Done(): + l.Warn().Str("Emitter address", address).Msg("Context cancelled, not emitting events") + return + default: + defer wg.Done() + for i := 0; i < cfg.LoopedConfig.ExecutionCount; i++ { + for _, event := range cfg.General.EventsToEmit { + l.Debug().Str("Emitter address", address).Str("Event type", event.Name).Str("index", fmt.Sprintf("%d/%d", (i+1), cfg.LoopedConfig.ExecutionCount)).Msg("Emitting log from emitter") + var err error + switch event.Name { + case "Log1": + _, err = (*logEmitter).EmitLogInts(getIntSlice(cfg.General.EventsPerTx)) + case "Log2": + _, err = (*logEmitter).EmitLogIntsIndexed(getIntSlice(cfg.General.EventsPerTx)) + case "Log3": + _, err = (*logEmitter).EmitLogStrings(getStringSlice(cfg.General.EventsPerTx)) + default: + err = fmt.Errorf("Unknown event name: %s", event.Name) + } + + if err != nil { + results <- LogEmitterChannel{ + logsEmitted: 0, + err: err, + } + return + } + localCounter += cfg.General.EventsPerTx + + randomWait(cfg.LoopedConfig.FuzzConfig.MinEmitWaitTimeMs, cfg.LoopedConfig.FuzzConfig.MaxEmitWaitTimeMs) + } + + if (i+1)%10 == 0 { + l.Info().Str("Emitter address", address).Str("Index", fmt.Sprintf("%d/%d", i+1, cfg.LoopedConfig.ExecutionCount)).Msg("Emitted all three events") + } + } + + l.Info().Str("Emitter address", address).Int("Total logs emitted", localCounter).Msg("Finished emitting events") + + results <- LogEmitterChannel{ + logsEmitted: localCounter, + err: nil, + } + } +} + +var waitForEndBlockInLogPoller = func(endBlock int64, chainID *big.Int, l zerolog.Logger, coreLogger core_logger.SugaredLogger, nodes *test_env.ClCluster) (bool, error) { + for i := 1; i < len(nodes.Nodes); i++ { + clNode := nodes.Nodes[i] + orm, db, err := NewOrm(coreLogger, chainID, clNode.PostgresDb) + if err != nil { + return false, err + } + + defer db.Close() + block, err := orm.SelectBlockByNumber(endBlock) + if err != nil { + return false, err + } + + if block == nil { + return false, nil + } + } + + return true, nil +} + +var chainHasFinalisedEndBlock = func(l zerolog.Logger, evmClient ctf_blockchain.EVMClient, endBlock int64) (bool, error) { + effectiveEndBlock := endBlock + 1 + lastFinalisedBlockHeader, err := evmClient.GetLatestFinalizedBlockHeader(context.Background()) + if err != nil { + return false, err + } + + l.Info().Int64("Last finalised block header", lastFinalisedBlockHeader.Number.Int64()).Int64("End block", effectiveEndBlock).Int64("Blocks left till end block", effectiveEndBlock-lastFinalisedBlockHeader.Number.Int64()).Msg("Waiting for the finalized block to move beyond end block") + + return lastFinalisedBlockHeader.Number.Int64() > effectiveEndBlock, nil +} + +var logPollerHasFinalisedEndBlock = func(endBlock int64, chainID *big.Int, l zerolog.Logger, coreLogger core_logger.SugaredLogger, nodes *test_env.ClCluster) (bool, error) { + wg := &sync.WaitGroup{} + + type boolQueryResult struct { + nodeName string + hasFinalised bool + err error + } + + endBlockCh := make(chan boolQueryResult, len(nodes.Nodes)-1) + ctx, cancelFn := context.WithCancel(context.Background()) + + for i := 1; i < len(nodes.Nodes); i++ { + wg.Add(1) + + go func(clNode *test_env.ClNode, r chan boolQueryResult) { + defer wg.Done() + select { + case <-ctx.Done(): + return + default: + orm, db, err := NewOrm(coreLogger, chainID, clNode.PostgresDb) + if err != nil { + r <- boolQueryResult{ + nodeName: clNode.ContainerName, + hasFinalised: false, + err: err, + } + } + + defer db.Close() + + latestBlock, err := orm.SelectLatestBlock() + if err != nil { + r <- boolQueryResult{ + nodeName: clNode.ContainerName, + hasFinalised: false, + err: err, + } + } + + r <- boolQueryResult{ + nodeName: clNode.ContainerName, + hasFinalised: latestBlock.FinalizedBlockNumber > endBlock, + err: nil, + } + + } + }(nodes.Nodes[i], endBlockCh) + } + + var err error + allFinalisedCh := make(chan bool, 1) + + go func() { + foundMap := make(map[string]bool, 0) + for r := range endBlockCh { + if r.err != nil { + err = r.err + cancelFn() + return + } + + foundMap[r.nodeName] = r.hasFinalised + if r.hasFinalised { + l.Info().Str("Node name", r.nodeName).Msg("CL node has finalised end block") + } else { + l.Warn().Str("Node name", r.nodeName).Msg("CL node has not finalised end block yet") + } + + if len(foundMap) == len(nodes.Nodes)-1 { + allFinalised := true + for _, v := range foundMap { + if !v { + allFinalised = false + break + } + } + + allFinalisedCh <- allFinalised + return + } + } + }() + + wg.Wait() + close(endBlockCh) + + return <-allFinalisedCh, err +} + +var clNodesHaveExpectedLogCount = func(startBlock, endBlock int64, chainID *big.Int, expectedLogCount int, expectedFilters []ExpectedFilter, l zerolog.Logger, coreLogger core_logger.SugaredLogger, nodes *test_env.ClCluster) (bool, error) { + wg := &sync.WaitGroup{} + + type logQueryResult struct { + nodeName string + logCount int + hasExpectedCount bool + err error + } + + queryCh := make(chan logQueryResult, len(nodes.Nodes)-1) + ctx, cancelFn := context.WithCancel(context.Background()) + + for i := 1; i < len(nodes.Nodes); i++ { + wg.Add(1) + + go func(clNode *test_env.ClNode, r chan logQueryResult) { + defer wg.Done() + select { + case <-ctx.Done(): + return + default: + orm, db, err := NewOrm(coreLogger, chainID, clNode.PostgresDb) + if err != nil { + r <- logQueryResult{ + nodeName: clNode.ContainerName, + logCount: 0, + hasExpectedCount: false, + err: err, + } + } + + defer db.Close() + foundLogsCount := 0 + + for _, filter := range expectedFilters { + logs, err := orm.SelectLogs(startBlock, endBlock, filter.emitterAddress, filter.topic) + if err != nil { + r <- logQueryResult{ + nodeName: clNode.ContainerName, + logCount: 0, + hasExpectedCount: false, + err: err, + } + } + + foundLogsCount += len(logs) + } + + r <- logQueryResult{ + nodeName: clNode.ContainerName, + logCount: foundLogsCount, + hasExpectedCount: foundLogsCount >= expectedLogCount, + err: err, + } + } + }(nodes.Nodes[i], queryCh) + } + + var err error + allFoundCh := make(chan bool, 1) + + go func() { + foundMap := make(map[string]bool, 0) + for r := range queryCh { + if r.err != nil { + err = r.err + cancelFn() + return + } + + foundMap[r.nodeName] = r.hasExpectedCount + if r.hasExpectedCount { + l.Info().Str("Node name", r.nodeName).Int("Logs count", r.logCount).Msg("Expected log count found in CL node") + } else { + l.Warn().Str("Node name", r.nodeName).Str("Found/Expected logs", fmt.Sprintf("%d/%d", r.logCount, expectedLogCount)).Int("Missing logs", expectedLogCount-r.logCount).Msg("Too low log count found in CL node") + } + + if len(foundMap) == len(nodes.Nodes)-1 { + allFound := true + for _, v := range foundMap { + if !v { + allFound = false + break + } + } + + allFoundCh <- allFound + return + } + } + }() + + wg.Wait() + close(queryCh) + + return <-allFoundCh, err +} + +type MissingLogs map[string][]geth_types.Log + +func (m *MissingLogs) IsEmpty() bool { + for _, v := range *m { + if len(v) > 0 { + return false + } + } + + return true +} + +var getMissingLogs = func(startBlock, endBlock int64, logEmitters []*contracts.LogEmitter, evmClient ctf_blockchain.EVMClient, clnodeCluster *test_env.ClCluster, l zerolog.Logger, coreLogger core_logger.SugaredLogger, cfg *Config) (MissingLogs, error) { + wg := &sync.WaitGroup{} + + type dbQueryResult struct { + err error + nodeName string + logs []logpoller.Log + } + + ctx, cancelFn := context.WithCancel(context.Background()) + resultCh := make(chan dbQueryResult, len(clnodeCluster.Nodes)-1) + + for i := 1; i < len(clnodeCluster.Nodes); i++ { + wg.Add(1) + + go func(ctx context.Context, i int, r chan dbQueryResult) { + defer wg.Done() + select { + case <-ctx.Done(): + l.Warn().Msg("Context cancelled. Terminating fetching logs from log poller's DB") + return + default: + nodeName := clnodeCluster.Nodes[i].ContainerName + + l.Info().Str("Node name", nodeName).Msg("Fetching log poller logs") + orm, db, err := NewOrm(coreLogger, evmClient.GetChainID(), clnodeCluster.Nodes[i].PostgresDb) + if err != nil { + r <- dbQueryResult{ + err: err, + nodeName: nodeName, + logs: []logpoller.Log{}, + } + } + + defer db.Close() + logs := make([]logpoller.Log, 0) + + for j := 0; j < len(logEmitters); j++ { + address := (*logEmitters[j]).Address() + + for _, event := range cfg.General.EventsToEmit { + l.Debug().Str("Event name", event.Name).Str("Emitter address", address.String()).Msg("Fetching single emitter's logs") + result, err := orm.SelectLogs(startBlock, endBlock, address, event.ID) + if err != nil { + r <- dbQueryResult{ + err: err, + nodeName: nodeName, + logs: []logpoller.Log{}, + } + } + + sort.Slice(result, func(i, j int) bool { + return result[i].BlockNumber < result[j].BlockNumber + }) + + logs = append(logs, result...) + + l.Debug().Str("Event name", event.Name).Str("Emitter address", address.String()).Int("Log count", len(result)).Msg("Logs found per node") + } + } + + l.Warn().Int("Count", len(logs)).Str("Node name", nodeName).Msg("Fetched log poller logs") + + r <- dbQueryResult{ + err: nil, + nodeName: nodeName, + logs: logs, + } + } + }(ctx, i, resultCh) + } + + allLogPollerLogs := make(map[string][]logpoller.Log, 0) + missingLogs := map[string][]geth_types.Log{} + var dbError error + + go func() { + for r := range resultCh { + if r.err != nil { + l.Err(r.err).Str("Node name", r.nodeName).Msg("Error fetching logs from log poller's DB") + dbError = r.err + cancelFn() + return + } + // use channel for aggregation and then for := range over it after closing resultCh? + allLogPollerLogs[r.nodeName] = r.logs + } + }() + + wg.Wait() + close(resultCh) + + if dbError != nil { + return nil, dbError + } + + allLogsInEVMNode, err := getEVMLogs(startBlock, endBlock, logEmitters, evmClient, l, cfg) + if err != nil { + return nil, err + } + + wg = &sync.WaitGroup{} + + type missingLogResult struct { + nodeName string + logs []geth_types.Log + } + + l.Info().Msg("Started comparison of logs from EVM node and CL nodes. This may take a while if there's a lot of logs") + missingCh := make(chan missingLogResult, len(clnodeCluster.Nodes)-1) + evmLogCount := len(allLogsInEVMNode) + for i := 1; i < len(clnodeCluster.Nodes); i++ { + wg.Add(1) + + go func(i int, result chan missingLogResult) { + defer wg.Done() + nodeName := clnodeCluster.Nodes[i].ContainerName + l.Info().Str("Node name", nodeName).Str("Progress", fmt.Sprintf("0/%d", evmLogCount)).Msg("Comparing single CL node's logs with EVM logs") + + missingLogs := make([]geth_types.Log, 0) + for i, evmLog := range allLogsInEVMNode { + logFound := false + for _, logPollerLog := range allLogPollerLogs[nodeName] { + if logPollerLog.BlockNumber == int64(evmLog.BlockNumber) && logPollerLog.TxHash == evmLog.TxHash && bytes.Equal(logPollerLog.Data, evmLog.Data) && logPollerLog.LogIndex == int64(evmLog.Index) && + logPollerLog.Address == evmLog.Address && logPollerLog.BlockHash == evmLog.BlockHash && bytes.Equal(logPollerLog.Topics[0][:], evmLog.Topics[0].Bytes()) { + logFound = true + continue + } + } + + if i%10000 == 0 && i != 0 { + l.Info().Str("Node name", nodeName).Str("Progress", fmt.Sprintf("%d/%d", i, evmLogCount)).Msg("Comparing single CL node's logs with EVM logs") + } + + if !logFound { + missingLogs = append(missingLogs, evmLog) + } + } + + if len(missingLogs) > 0 { + l.Warn().Int("Count", len(missingLogs)).Str("Node name", nodeName).Msg("Some EMV logs were missing from CL node") + } else { + l.Info().Str("Node name", nodeName).Msg("All EVM logs were found in CL node") + } + + result <- missingLogResult{ + nodeName: nodeName, + logs: missingLogs, + } + }(i, missingCh) + } + + wg.Wait() + close(missingCh) + + for v := range missingCh { + if len(v.logs) > 0 { + missingLogs[v.nodeName] = v.logs + } + } + + expectedTotalLogsEmitted := getExpectedLogCount(cfg) + if int64(len(allLogsInEVMNode)) != expectedTotalLogsEmitted { + l.Warn().Str("Actual/Expected", fmt.Sprintf("%d/%d", expectedTotalLogsEmitted, len(allLogsInEVMNode))).Msg("Some of the test logs were not found in EVM node. This is a bug in the test") + } + + return missingLogs, nil +} + +var printMissingLogsByType = func(missingLogs map[string][]geth_types.Log, l zerolog.Logger, cfg *Config) { + var findHumanName = func(topic common.Hash) string { + for _, event := range cfg.General.EventsToEmit { + if event.ID == topic { + return event.Name + } + } + + return "Unknown event" + } + + missingByType := make(map[string]int) + for _, logs := range missingLogs { + for _, v := range logs { + humanName := findHumanName(v.Topics[0]) + if _, ok := missingByType[humanName]; ok { + missingByType[humanName] += 1 + } else { + missingByType[humanName] = 1 + } + } + } + + for k, v := range missingByType { + l.Warn().Str("Event name", k).Int("Missing count", v).Msg("Missing logs by type") + } +} + +var getEVMLogs = func(startBlock, endBlock int64, logEmitters []*contracts.LogEmitter, evmClient ctf_blockchain.EVMClient, l zerolog.Logger, cfg *Config) ([]geth_types.Log, error) { + allLogsInEVMNode := make([]geth_types.Log, 0) + for j := 0; j < len(logEmitters); j++ { + address := (*logEmitters[j]).Address() + for _, event := range cfg.General.EventsToEmit { + l.Debug().Str("Event name", event.Name).Str("Emitter address", address.String()).Msg("Fetching logs from EVM node") + logsInEVMNode, err := evmClient.FilterLogs(context.Background(), geth.FilterQuery{ + Addresses: []common.Address{(address)}, + Topics: [][]common.Hash{{event.ID}}, + FromBlock: big.NewInt(startBlock), + ToBlock: big.NewInt(endBlock), + }) + if err != nil { + return nil, err + } + + sort.Slice(logsInEVMNode, func(i, j int) bool { + return logsInEVMNode[i].BlockNumber < logsInEVMNode[j].BlockNumber + }) + + allLogsInEVMNode = append(allLogsInEVMNode, logsInEVMNode...) + l.Debug().Str("Event name", event.Name).Str("Emitter address", address.String()).Int("Log count", len(logsInEVMNode)).Msg("Logs found in EVM node") + } + } + + l.Warn().Int("Count", len(allLogsInEVMNode)).Msg("Logs in EVM node") + + return allLogsInEVMNode, nil +} + +func executeGenerator(t *testing.T, cfg *Config, logEmitters []*contracts.LogEmitter) (int, error) { + if cfg.General.Generator == GeneratorType_WASP { + return runWaspGenerator(t, cfg, logEmitters) + } + + return runLoopedGenerator(t, cfg, logEmitters) +} + +func runWaspGenerator(t *testing.T, cfg *Config, logEmitters []*contracts.LogEmitter) (int, error) { + l := logging.GetTestLogger(t) + + var RPSprime int64 + + // if LPS is set, we need to calculate based on countract count and events per transaction + if cfg.Wasp.Load.LPS > 0 { + RPSprime = cfg.Wasp.Load.LPS / int64(cfg.General.Contracts) / int64(cfg.General.EventsPerTx) / int64(len(cfg.General.EventsToEmit)) + + if RPSprime < 1 { + return 0, fmt.Errorf("Invalid load configuration, effective RPS would have been zero. Adjust LPS, contracts count, events per tx or events to emit") + } + } + + // if RPS is set simply split it between contracts + if cfg.Wasp.Load.RPS > 0 { + RPSprime = cfg.Wasp.Load.RPS / int64(cfg.General.Contracts) + } + + counter := &Counter{ + mu: &sync.Mutex{}, + value: 0, + } + + p := wasp.NewProfile() + + for _, logEmitter := range logEmitters { + g, err := wasp.NewGenerator(&wasp.Config{ + T: t, + LoadType: wasp.RPS, + GenName: fmt.Sprintf("log_poller_gen_%s", (*logEmitter).Address().String()), + RateLimitUnitDuration: cfg.Wasp.Load.RateLimitUnitDuration.Duration(), + CallTimeout: cfg.Wasp.Load.CallTimeout.Duration(), + Schedule: wasp.Plain( + RPSprime, + cfg.Wasp.Load.Duration.Duration(), + ), + Gun: NewLogEmitterGun( + logEmitter, + cfg.General.EventsToEmit, + cfg.General.EventsPerTx, + l, + ), + SharedData: counter, + }) + p.Add(g, err) + } + + _, err := p.Run(true) + + if err != nil { + return 0, err + } + + return counter.value, nil +} + +func runLoopedGenerator(t *testing.T, cfg *Config, logEmitters []*contracts.LogEmitter) (int, error) { + l := logging.GetTestLogger(t) + + // Start emitting events in parallel, each contract is emitting events in a separate goroutine + // We will stop as soon as we encounter an error + wg := &sync.WaitGroup{} + emitterCh := make(chan LogEmitterChannel, len(logEmitters)) + + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + + for i := 0; i < len(logEmitters); i++ { + wg.Add(1) + go emitEvents(ctx, l, logEmitters[i], cfg, wg, emitterCh) + } + + var emitErr error + total := 0 + + aggrChan := make(chan int, len(logEmitters)) + + go func() { + for emitter := range emitterCh { + if emitter.err != nil { + emitErr = emitter.err + cancelFn() + return + } + aggrChan <- emitter.logsEmitted + } + }() + + wg.Wait() + close(emitterCh) + + for i := 0; i < len(logEmitters); i++ { + total += <-aggrChan + } + + if emitErr != nil { + return 0, emitErr + } + + return int(total), nil +} + +func getExpectedLogCount(cfg *Config) int64 { + if cfg.General.Generator == GeneratorType_WASP { + if cfg.Wasp.Load.RPS != 0 { + return cfg.Wasp.Load.RPS * int64(cfg.Wasp.Load.Duration.Duration().Seconds()) * int64(cfg.General.EventsPerTx) + } else { + return cfg.Wasp.Load.LPS * int64(cfg.Wasp.Load.Duration.Duration().Seconds()) + } + } + + return int64(len(cfg.General.EventsToEmit) * cfg.LoopedConfig.ExecutionCount * cfg.General.Contracts * cfg.General.EventsPerTx) +} + +var chaosPauseSyncFn = func(l zerolog.Logger, testEnv *test_env.CLClusterTestEnv) error { + rand.New(rand.NewSource(time.Now().UnixNano())) + randomBool := rand.Intn(2) == 0 + + randomNode := testEnv.ClCluster.Nodes[rand.Intn(len(testEnv.ClCluster.Nodes)-1)+1] + var component ctf_test_env.EnvComponent + + if randomBool { + component = randomNode.EnvComponent + } else { + component = randomNode.PostgresDb.EnvComponent + } + + pauseTimeSec := rand.Intn(20-5) + 5 + l.Info().Str("Container", component.ContainerName).Int("Pause time", pauseTimeSec).Msg("Pausing component") + pauseTimeDur := time.Duration(pauseTimeSec) * time.Second + err := component.ChaosPause(l, pauseTimeDur) + l.Info().Str("Container", component.ContainerName).Msg("Component unpaused") + + if err != nil { + return err + } + + return nil +} + +var executeChaosExperiment = func(l zerolog.Logger, testEnv *test_env.CLClusterTestEnv, cfg *Config, errorCh chan error) { + if cfg.ChaosConfig == nil || cfg.ChaosConfig.ExperimentCount == 0 { + errorCh <- nil + return + } + + chaosChan := make(chan error, cfg.ChaosConfig.ExperimentCount) + + wg := &sync.WaitGroup{} + + go func() { + // if we wanted to have more than 1 container paused, we'd need to make sure we aren't trying to pause an already paused one + guardChan := make(chan struct{}, 1) + + for i := 0; i < cfg.ChaosConfig.ExperimentCount; i++ { + wg.Add(1) + guardChan <- struct{}{} + go func() { + defer func() { + <-guardChan + wg.Done() + l.Info().Str("Current/Total", fmt.Sprintf("%d/%d", i, cfg.ChaosConfig.ExperimentCount)).Msg("Done with experiment") + }() + chaosChan <- chaosPauseSyncFn(l, testEnv) + }() + } + + wg.Wait() + + close(chaosChan) + }() + + go func() { + for { + select { + case err, ok := <-chaosChan: + if !ok { + l.Info().Msg("All chaos experiments finished") + errorCh <- nil + return + } else { + if err != nil { + l.Err(err).Msg("Error encountered during chaos experiment") + errorCh <- err + return + } + } + } + } + }() +} + +var GetFinalityDepth = func(chainId int64) (int64, error) { + var finalityDepth int64 + switch chainId { + // Ethereum Sepolia + case 11155111: + finalityDepth = 50 + // Polygon Mumbai + case 80001: + finalityDepth = 500 + // Simulated network + case 1337: + finalityDepth = 10 + default: + return 0, fmt.Errorf("No known finality depth for chain %d", chainId) + } + + return finalityDepth, nil +} + +var GetEndBlockToWaitFor = func(endBlock, chainId int64, cfg *Config) (int64, error) { + if cfg.General.UseFinalityTag { + return endBlock + 1, nil + } + + finalityDepth, err := GetFinalityDepth(chainId) + if err != nil { + return 0, err + } + + return endBlock + finalityDepth, nil +} + +const ( + automationDefaultUpkeepGasLimit = uint32(2500000) + automationDefaultLinkFunds = int64(9e18) + automationDefaultUpkeepsToDeploy = 10 + automationExpectedData = "abcdef" + defaultAmountOfUpkeeps = 2 +) + +var ( + defaultOCRRegistryConfig = contracts.KeeperRegistrySettings{ + PaymentPremiumPPB: uint32(200000000), + FlatFeeMicroLINK: uint32(0), + BlockCountPerTurn: big.NewInt(10), + CheckGasLimit: uint32(2500000), + StalenessSeconds: big.NewInt(90000), + GasCeilingMultiplier: uint16(1), + MinUpkeepSpend: big.NewInt(0), + MaxPerformGas: uint32(5000000), + FallbackGasPrice: big.NewInt(2e11), + FallbackLinkPrice: big.NewInt(2e18), + MaxCheckDataSize: uint32(5000), + MaxPerformDataSize: uint32(5000), + } + + automationDefaultRegistryConfig = contracts.KeeperRegistrySettings{ + PaymentPremiumPPB: uint32(200000000), + FlatFeeMicroLINK: uint32(0), + BlockCountPerTurn: big.NewInt(10), + CheckGasLimit: uint32(2500000), + StalenessSeconds: big.NewInt(90000), + GasCeilingMultiplier: uint16(1), + MinUpkeepSpend: big.NewInt(0), + MaxPerformGas: uint32(5000000), + FallbackGasPrice: big.NewInt(2e11), + FallbackLinkPrice: big.NewInt(2e18), + MaxCheckDataSize: uint32(5000), + MaxPerformDataSize: uint32(5000), + } +) + +func setupLogPollerTestDocker( + t *testing.T, + testName string, + registryVersion ethereum.KeeperRegistryVersion, + registryConfig contracts.KeeperRegistrySettings, + upkeepsNeeded int, + lpPollingInterval time.Duration, + finalityTagEnabled bool, +) ( + blockchain.EVMClient, + []*client.ChainlinkClient, + contracts.ContractDeployer, + contracts.LinkToken, + contracts.KeeperRegistry, + contracts.KeeperRegistrar, + *test_env.CLClusterTestEnv, +) { + l := logging.GetTestLogger(t) + // Add registry version to config + registryConfig.RegistryVersion = registryVersion + network := networks.MustGetSelectedNetworksFromEnv()[0] + + finalityDepth, err := GetFinalityDepth(network.ChainID) + require.NoError(t, err, "Error getting finality depth") + + // build the node config + clNodeConfig := node.NewConfig(node.NewBaseConfig()) + syncInterval := models.MustMakeDuration(5 * time.Minute) + clNodeConfig.Feature.LogPoller = it_utils.Ptr[bool](true) + clNodeConfig.OCR2.Enabled = it_utils.Ptr[bool](true) + clNodeConfig.Keeper.TurnLookBack = it_utils.Ptr[int64](int64(0)) + clNodeConfig.Keeper.Registry.SyncInterval = &syncInterval + clNodeConfig.Keeper.Registry.PerformGasOverhead = it_utils.Ptr[uint32](uint32(150000)) + clNodeConfig.P2P.V2.Enabled = it_utils.Ptr[bool](true) + clNodeConfig.P2P.V2.AnnounceAddresses = &[]string{"0.0.0.0:6690"} + clNodeConfig.P2P.V2.ListenAddresses = &[]string{"0.0.0.0:6690"} + + //launch the environment + var env *test_env.CLClusterTestEnv + chainlinkNodeFunding := 0.5 + l.Debug().Msgf("Funding amount: %f", chainlinkNodeFunding) + clNodesCount := 5 + + var logPolllerSettingsFn = func(chain *evmcfg.Chain) *evmcfg.Chain { + chain.LogPollInterval = models.MustNewDuration(lpPollingInterval) + chain.FinalityDepth = utils2.Ptr[uint32](uint32(finalityDepth)) + chain.FinalityTagEnabled = utils2.Ptr[bool](finalityTagEnabled) + return chain + } + + var evmClientSettingsFn = func(network *blockchain.EVMNetwork) *blockchain.EVMNetwork { + network.FinalityDepth = uint64(finalityDepth) + network.FinalityTag = finalityTagEnabled + return network + } + + env, err = test_env.NewCLTestEnvBuilder(). + WithTestLogger(t). + WithGeth(). + WithCLNodes(clNodesCount). + WithCLNodeConfig(clNodeConfig). + WithFunding(big.NewFloat(chainlinkNodeFunding)). + WithChainOptions(logPolllerSettingsFn). + EVMClientNetworkOptions(evmClientSettingsFn). + WithStandardCleanup(). + Build() + require.NoError(t, err, "Error deploying test environment") + + env.ParallelTransactions(true) + nodeClients := env.ClCluster.NodeAPIs() + workerNodes := nodeClients[1:] + + var linkToken contracts.LinkToken + + switch network.ChainID { + // Simulated + case 1337: + linkToken, err = env.ContractDeployer.DeployLinkTokenContract() + // Ethereum Sepolia + case 11155111: + linkToken, err = env.ContractLoader.LoadLINKToken("0x779877A7B0D9E8603169DdbD7836e478b4624789") + // Polygon Mumbai + case 80001: + linkToken, err = env.ContractLoader.LoadLINKToken("0x326C977E6efc84E512bB9C30f76E30c160eD06FB") + default: + panic("Not implemented") + } + require.NoError(t, err, "Error loading/deploying LINK token") + + linkBalance, err := env.EVMClient.BalanceAt(context.Background(), common.HexToAddress(linkToken.Address())) + require.NoError(t, err, "Error getting LINK balance") + + l.Info().Str("Balance", big.NewInt(0).Div(linkBalance, big.NewInt(1e18)).String()).Msg("LINK balance") + minLinkBalanceSingleNode := big.NewInt(0).Mul(big.NewInt(1e18), big.NewInt(9)) + minLinkBalance := big.NewInt(0).Mul(minLinkBalanceSingleNode, big.NewInt(int64(upkeepsNeeded))) + if minLinkBalance.Cmp(linkBalance) < 0 { + require.FailNowf(t, "Not enough LINK", "Not enough LINK to run the test. Need at least %s", big.NewInt(0).Div(minLinkBalance, big.NewInt(1e18)).String()) + } + + registry, registrar := actions.DeployAutoOCRRegistryAndRegistrar( + t, + registryVersion, + registryConfig, + linkToken, + env.ContractDeployer, + env.EVMClient, + ) + + // Fund the registry with LINK + err = linkToken.Transfer(registry.Address(), big.NewInt(0).Mul(big.NewInt(1e18), big.NewInt(int64(defaultAmountOfUpkeeps)))) + require.NoError(t, err, "Funding keeper registry contract shouldn't fail") + + err = actions.CreateOCRKeeperJobsLocal(l, nodeClients, registry.Address(), network.ChainID, 0, registryVersion) + require.NoError(t, err, "Error creating OCR Keeper Jobs") + ocrConfig, err := actions.BuildAutoOCR2ConfigVarsLocal(l, workerNodes, registryConfig, registrar.Address(), 30*time.Second, registry.RegistryOwnerAddress()) + require.NoError(t, err, "Error building OCR config vars") + err = registry.SetConfig(automationDefaultRegistryConfig, ocrConfig) + require.NoError(t, err, "Registry config should be set successfully") + require.NoError(t, env.EVMClient.WaitForEvents(), "Waiting for config to be set") + + return env.EVMClient, nodeClients, env.ContractDeployer, linkToken, registry, registrar, env +} diff --git a/integration-tests/universal/log_poller/scenarios.go b/integration-tests/universal/log_poller/scenarios.go new file mode 100644 index 00000000000..d14a3bcb2a7 --- /dev/null +++ b/integration-tests/universal/log_poller/scenarios.go @@ -0,0 +1,498 @@ +package logpoller + +import ( + "context" + "fmt" + "math/big" + "testing" + "time" + + "github.com/onsi/gomega" + "github.com/smartcontractkit/chainlink-testing-framework/logging" + "github.com/smartcontractkit/chainlink/integration-tests/actions" + "github.com/smartcontractkit/chainlink/integration-tests/contracts" + "github.com/smartcontractkit/chainlink/integration-tests/contracts/ethereum" + core_logger "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/stretchr/testify/require" +) + +func ExecuteBasicLogPollerTest(t *testing.T, cfg *Config) { + l := logging.GetTestLogger(t) + coreLogger := core_logger.TestLogger(t) //needed by ORM ¯\_(ツ)_/¯ + + if cfg.General.EventsToEmit == nil || len(cfg.General.EventsToEmit) == 0 { + l.Warn().Msg("No events to emit specified, using all events from log emitter contract") + for _, event := range EmitterABI.Events { + cfg.General.EventsToEmit = append(cfg.General.EventsToEmit, event) + } + } + + l.Info().Msg("Starting basic log poller test") + + var ( + err error + testName = "basic-log-poller" + upKeepsNeeded = cfg.General.Contracts * len(cfg.General.EventsToEmit) + ) + + chainClient, _, contractDeployer, linkToken, registry, registrar, testEnv := setupLogPollerTestDocker( + t, testName, ethereum.RegistryVersion_2_1, defaultOCRRegistryConfig, upKeepsNeeded, time.Duration(500*time.Millisecond), cfg.General.UseFinalityTag, + ) + + _, upkeepIDs := actions.DeployConsumers( + t, + registry, + registrar, + linkToken, + contractDeployer, + chainClient, + upKeepsNeeded, + big.NewInt(automationDefaultLinkFunds), + automationDefaultUpkeepGasLimit, + true, + false, + ) + + // Deploy Log Emitter contracts + logEmitters := make([]*contracts.LogEmitter, 0) + for i := 0; i < cfg.General.Contracts; i++ { + logEmitter, err := testEnv.ContractDeployer.DeployLogEmitterContract() + logEmitters = append(logEmitters, &logEmitter) + require.NoError(t, err, "Error deploying log emitter contract") + l.Info().Str("Contract address", logEmitter.Address().Hex()).Msg("Log emitter contract deployed") + time.Sleep(200 * time.Millisecond) + } + + // Register log triggered upkeep for each combination of log emitter contract and event signature (topic) + // We need to register a separate upkeep for each event signature, because log trigger doesn't support multiple topics (even if log poller does) + for i := 0; i < len(upkeepIDs); i++ { + emitterAddress := (*logEmitters[i%cfg.General.Contracts]).Address() + upkeepID := upkeepIDs[i] + topicId := cfg.General.EventsToEmit[i%len(cfg.General.EventsToEmit)].ID + + l.Info().Int("Upkeep id", int(upkeepID.Int64())).Str("Emitter address", emitterAddress.String()).Str("Topic", topicId.Hex()).Msg("Registering log trigger for log emitter") + err = registerSingleTopicFilter(registry, upkeepID, emitterAddress, topicId) + randomWait(50, 200) + require.NoError(t, err, "Error registering log trigger for log emitter") + } + + err = chainClient.WaitForEvents() + require.NoError(t, err, "Error encountered when waiting for setting trigger config for upkeeps") + + // Make sure that all nodes have expected filters registered before starting to emit events + expectedFilters := getExpectedFilters(logEmitters, cfg) + gom := gomega.NewGomegaWithT(t) + gom.Eventually(func(g gomega.Gomega) { + for i := 1; i < len(testEnv.ClCluster.Nodes); i++ { + nodeName := testEnv.ClCluster.Nodes[i].ContainerName + l.Info().Str("Node name", nodeName).Msg("Fetching filters from log poller's DB") + + hasFilters, err := nodeHasExpectedFilters(expectedFilters, coreLogger, testEnv.EVMClient.GetChainID(), testEnv.ClCluster.Nodes[i].PostgresDb) + if err != nil { + l.Warn().Err(err).Msg("Error checking if node has expected filters. Retrying...") + return + } + + g.Expect(hasFilters).To(gomega.BeTrue(), "Not all expected filters were found in the DB") + } + }, "30s", "1s").Should(gomega.Succeed()) + l.Info().Msg("All nodes have expected filters registered") + l.Info().Int("Count", len(expectedFilters)).Msg("Expected filters count") + + // Save block number before starting to emit events, so that we can later use it when querying logs + sb, err := testEnv.EVMClient.LatestBlockNumber(context.Background()) + require.NoError(t, err, "Error getting latest block number") + startBlock := int64(sb) + + l.Info().Msg("STARTING EVENT EMISSION") + startTime := time.Now() + + // Start chaos experimnents by randomly pausing random containers (Chainlink nodes or their DBs) + chaosDoneCh := make(chan error, 1) + go func() { + executeChaosExperiment(l, testEnv, cfg, chaosDoneCh) + }() + + totalLogsEmitted, err := executeGenerator(t, cfg, logEmitters) + endTime := time.Now() + require.NoError(t, err, "Error executing event generator") + + expectedLogsEmitted := getExpectedLogCount(cfg) + duration := int(endTime.Sub(startTime).Seconds()) + l.Info().Int("Total logs emitted", totalLogsEmitted).Int64("Expected total logs emitted", expectedLogsEmitted).Str("Duration", fmt.Sprintf("%d sec", duration)).Str("LPS", fmt.Sprintf("%d/sec", totalLogsEmitted/duration)).Msg("FINISHED EVENT EMISSION") + + // Save block number after finishing to emit events, so that we can later use it when querying logs + eb, err := testEnv.EVMClient.LatestBlockNumber(context.Background()) + require.NoError(t, err, "Error getting latest block number") + + endBlock, err := GetEndBlockToWaitFor(int64(eb), testEnv.EVMClient.GetChainID().Int64(), cfg) + require.NoError(t, err, "Error getting end block to wait for") + + l.Info().Msg("Waiting before proceeding with test until all chaos experiments finish") + chaosError := <-chaosDoneCh + require.NoError(t, chaosError, "Error encountered during chaos experiment") + + // Wait until last block in which events were emitted has been finalised + // how long should we wait here until all logs are processed? wait for block X to be processed by all nodes? + waitDuration := "15m" + l.Warn().Str("Duration", waitDuration).Msg("Waiting for logs to be processed by all nodes and for chain to advance beyond finality") + + gom.Eventually(func(g gomega.Gomega) { + hasAdvanced, err := chainHasFinalisedEndBlock(l, testEnv.EVMClient, endBlock) + if err != nil { + l.Warn().Err(err).Msg("Error checking if chain has advanced beyond finality. Retrying...") + } + g.Expect(hasAdvanced).To(gomega.BeTrue(), "Chain has not advanced beyond finality") + }, waitDuration, "30s").Should(gomega.Succeed()) + + l.Warn().Str("Duration", "1m").Msg("Waiting for all CL nodes to have end block finalised") + gom.Eventually(func(g gomega.Gomega) { + hasFinalised, err := logPollerHasFinalisedEndBlock(endBlock, testEnv.EVMClient.GetChainID(), l, coreLogger, testEnv.ClCluster) + if err != nil { + l.Warn().Err(err).Msg("Error checking if nodes have finalised end block. Retrying...") + } + g.Expect(hasFinalised).To(gomega.BeTrue(), "Some nodes have not finalised end block") + }, "1m", "30s").Should(gomega.Succeed()) + + gom.Eventually(func(g gomega.Gomega) { + logCountMatches, err := clNodesHaveExpectedLogCount(startBlock, endBlock, testEnv.EVMClient.GetChainID(), totalLogsEmitted, expectedFilters, l, coreLogger, testEnv.ClCluster) + if err != nil { + l.Warn().Err(err).Msg("Error checking if CL nodes have expected log count. Retrying...") + } + g.Expect(logCountMatches).To(gomega.BeTrue(), "Not all CL nodes have expected log count") + }, waitDuration, "5s").Should(gomega.Succeed()) + + // Wait until all CL nodes have exactly the same logs emitted by test contracts as the EVM node has + logConsistencyWaitDuration := "1m" + l.Warn().Str("Duration", logConsistencyWaitDuration).Msg("Waiting for CL nodes to have all the logs that EVM node has") + + gom.Eventually(func(g gomega.Gomega) { + missingLogs, err := getMissingLogs(startBlock, endBlock, logEmitters, testEnv.EVMClient, testEnv.ClCluster, l, coreLogger, cfg) + if err != nil { + l.Warn().Err(err).Msg("Error getting missing logs. Retrying...") + } + + if !missingLogs.IsEmpty() { + printMissingLogsByType(missingLogs, l, cfg) + } + g.Expect(missingLogs.IsEmpty()).To(gomega.BeTrue(), "Some CL nodes were missing logs") + }, logConsistencyWaitDuration, "5s").Should(gomega.Succeed()) +} + +func ExecuteLogPollerReplay(t *testing.T, cfg *Config, consistencyTimeout string) { + l := logging.GetTestLogger(t) + coreLogger := core_logger.TestLogger(t) //needed by ORM ¯\_(ツ)_/¯ + + if cfg.General.EventsToEmit == nil || len(cfg.General.EventsToEmit) == 0 { + l.Warn().Msg("No events to emit specified, using all events from log emitter contract") + for _, event := range EmitterABI.Events { + cfg.General.EventsToEmit = append(cfg.General.EventsToEmit, event) + } + } + + l.Info().Msg("Starting replay log poller test") + + var ( + err error + testName = "replay-log-poller" + upKeepsNeeded = cfg.General.Contracts * len(cfg.General.EventsToEmit) + ) + + // we set blockBackfillDepth to 0, to make sure nothing will be backfilled and won't interfere with our test + chainClient, _, contractDeployer, linkToken, registry, registrar, testEnv := setupLogPollerTestDocker( + t, testName, ethereum.RegistryVersion_2_1, defaultOCRRegistryConfig, upKeepsNeeded, time.Duration(1000*time.Millisecond), cfg.General.UseFinalityTag) + + _, upkeepIDs := actions.DeployConsumers( + t, + registry, + registrar, + linkToken, + contractDeployer, + chainClient, + upKeepsNeeded, + big.NewInt(automationDefaultLinkFunds), + automationDefaultUpkeepGasLimit, + true, + false, + ) + + // Deploy Log Emitter contracts + logEmitters := make([]*contracts.LogEmitter, 0) + for i := 0; i < cfg.General.Contracts; i++ { + logEmitter, err := testEnv.ContractDeployer.DeployLogEmitterContract() + logEmitters = append(logEmitters, &logEmitter) + require.NoError(t, err, "Error deploying log emitter contract") + l.Info().Str("Contract address", logEmitter.Address().Hex()).Msg("Log emitter contract deployed") + time.Sleep(200 * time.Millisecond) + } + + //wait for contracts to be uploaded to chain, TODO: could make this wait fluent + time.Sleep(5 * time.Second) + + // Save block number before starting to emit events, so that we can later use it when querying logs + sb, err := testEnv.EVMClient.LatestBlockNumber(context.Background()) + require.NoError(t, err, "Error getting latest block number") + startBlock := int64(sb) + + l.Info().Msg("STARTING EVENT EMISSION") + startTime := time.Now() + totalLogsEmitted, err := executeGenerator(t, cfg, logEmitters) + endTime := time.Now() + require.NoError(t, err, "Error executing event generator") + expectedLogsEmitted := getExpectedLogCount(cfg) + duration := int(endTime.Sub(startTime).Seconds()) + l.Info().Int("Total logs emitted", totalLogsEmitted).Int64("Expected total logs emitted", expectedLogsEmitted).Str("Duration", fmt.Sprintf("%d sec", duration)).Str("LPS", fmt.Sprintf("%d/sec", totalLogsEmitted/duration)).Msg("FINISHED EVENT EMISSION") + + // Save block number after finishing to emit events, so that we can later use it when querying logs + eb, err := testEnv.EVMClient.LatestBlockNumber(context.Background()) + require.NoError(t, err, "Error getting latest block number") + + endBlock, err := GetEndBlockToWaitFor(int64(eb), testEnv.EVMClient.GetChainID().Int64(), cfg) + require.NoError(t, err, "Error getting end block to wait for") + + // Lets make sure no logs are in DB yet + expectedFilters := getExpectedFilters(logEmitters, cfg) + logCountMatches, err := clNodesHaveExpectedLogCount(startBlock, endBlock, testEnv.EVMClient.GetChainID(), 0, expectedFilters, l, coreLogger, testEnv.ClCluster) + require.NoError(t, err, "Error checking if CL nodes have expected log count") + require.True(t, logCountMatches, "Some CL nodes already had logs in DB") + l.Info().Msg("No logs were saved by CL nodes yet, as expected. Proceeding.") + + // Register log triggered upkeep for each combination of log emitter contract and event signature (topic) + // We need to register a separate upkeep for each event signature, because log trigger doesn't support multiple topics (even if log poller does) + for i := 0; i < len(upkeepIDs); i++ { + emitterAddress := (*logEmitters[i%cfg.General.Contracts]).Address() + upkeepID := upkeepIDs[i] + topicId := cfg.General.EventsToEmit[i%len(cfg.General.EventsToEmit)].ID + + l.Info().Int("Upkeep id", int(upkeepID.Int64())).Str("Emitter address", emitterAddress.String()).Str("Topic", topicId.Hex()).Msg("Registering log trigger for log emitter") + err = registerSingleTopicFilter(registry, upkeepID, emitterAddress, topicId) + require.NoError(t, err, "Error registering log trigger for log emitter") + } + + err = chainClient.WaitForEvents() + require.NoError(t, err, "Error encountered when waiting for setting trigger config for upkeeps") + + // Make sure that all nodes have expected filters registered before starting to emit events + gom := gomega.NewGomegaWithT(t) + gom.Eventually(func(g gomega.Gomega) { + for i := 1; i < len(testEnv.ClCluster.Nodes); i++ { + nodeName := testEnv.ClCluster.Nodes[i].ContainerName + l.Info().Str("Node name", nodeName).Msg("Fetching filters from log poller's DB") + + hasFilters, err := nodeHasExpectedFilters(expectedFilters, coreLogger, testEnv.EVMClient.GetChainID(), testEnv.ClCluster.Nodes[i].PostgresDb) + if err != nil { + l.Warn().Err(err).Msg("Error checking if node has expected filters. Retrying...") + return + } + + g.Expect(hasFilters).To(gomega.BeTrue(), "Not all expected filters were found in the DB") + } + }, "30s", "1s").Should(gomega.Succeed()) + l.Info().Msg("All nodes have expected filters registered") + l.Info().Int("Count", len(expectedFilters)).Msg("Expected filters count") + + l.Warn().Str("Duration", "1m").Msg("Waiting for all CL nodes to have end block finalised") + gom.Eventually(func(g gomega.Gomega) { + hasFinalised, err := logPollerHasFinalisedEndBlock(endBlock, testEnv.EVMClient.GetChainID(), l, coreLogger, testEnv.ClCluster) + if err != nil { + l.Warn().Err(err).Msg("Error checking if nodes have finalised end block. Retrying...") + } + g.Expect(hasFinalised).To(gomega.BeTrue(), "Some nodes have not finalised end block") + }, "1m", "30s").Should(gomega.Succeed()) + + // Trigger replay + l.Info().Msg("Triggering log poller's replay") + for i := 1; i < len(testEnv.ClCluster.Nodes); i++ { + nodeName := testEnv.ClCluster.Nodes[i].ContainerName + response, _, err := testEnv.ClCluster.Nodes[i].API.ReplayLogPollerFromBlock(startBlock, testEnv.EVMClient.GetChainID().Int64()) + require.NoError(t, err, "Error triggering log poller's replay on node %s", nodeName) + require.Equal(t, "Replay started", response.Data.Attributes.Message, "Unexpected response message from log poller's replay") + } + + l.Warn().Str("Duration", consistencyTimeout).Msg("Waiting for logs to be processed by all nodes and for chain to advance beyond finality") + + gom.Eventually(func(g gomega.Gomega) { + logCountMatches, err := clNodesHaveExpectedLogCount(startBlock, endBlock, testEnv.EVMClient.GetChainID(), totalLogsEmitted, expectedFilters, l, coreLogger, testEnv.ClCluster) + if err != nil { + l.Warn().Err(err).Msg("Error checking if CL nodes have expected log count. Retrying...") + } + g.Expect(logCountMatches).To(gomega.BeTrue(), "Not all CL nodes have expected log count") + }, consistencyTimeout, "30s").Should(gomega.Succeed()) + + // Wait until all CL nodes have exactly the same logs emitted by test contracts as the EVM node has + l.Warn().Str("Duration", consistencyTimeout).Msg("Waiting for CL nodes to have all the logs that EVM node has") + + gom.Eventually(func(g gomega.Gomega) { + missingLogs, err := getMissingLogs(startBlock, endBlock, logEmitters, testEnv.EVMClient, testEnv.ClCluster, l, coreLogger, cfg) + if err != nil { + l.Warn().Err(err).Msg("Error getting missing logs. Retrying...") + } + + if !missingLogs.IsEmpty() { + printMissingLogsByType(missingLogs, l, cfg) + } + g.Expect(missingLogs.IsEmpty()).To(gomega.BeTrue(), "Some CL nodes were missing logs") + }, consistencyTimeout, "10s").Should(gomega.Succeed()) +} + +type FinalityBlockFn = func(chainId int64, endBlock int64) (int64, error) + +func ExecuteCILogPollerTest(t *testing.T, cfg *Config) { + l := logging.GetTestLogger(t) + coreLogger := core_logger.TestLogger(t) //needed by ORM ¯\_(ツ)_/¯ + + if cfg.General.EventsToEmit == nil || len(cfg.General.EventsToEmit) == 0 { + l.Warn().Msg("No events to emit specified, using all events from log emitter contract") + for _, event := range EmitterABI.Events { + cfg.General.EventsToEmit = append(cfg.General.EventsToEmit, event) + } + } + + l.Info().Msg("Starting CI log poller test") + + var ( + err error + testName = "ci-log-poller" + upKeepsNeeded = cfg.General.Contracts * len(cfg.General.EventsToEmit) + ) + + chainClient, _, contractDeployer, linkToken, registry, registrar, testEnv := setupLogPollerTestDocker( + t, testName, ethereum.RegistryVersion_2_1, defaultOCRRegistryConfig, upKeepsNeeded, time.Duration(1000*time.Millisecond), cfg.General.UseFinalityTag, + ) + + _, upkeepIDs := actions.DeployConsumers( + t, + registry, + registrar, + linkToken, + contractDeployer, + chainClient, + upKeepsNeeded, + big.NewInt(automationDefaultLinkFunds), + automationDefaultUpkeepGasLimit, + true, + false, + ) + + // Deploy Log Emitter contracts + logEmitters := make([]*contracts.LogEmitter, 0) + for i := 0; i < cfg.General.Contracts; i++ { + logEmitter, err := testEnv.ContractDeployer.DeployLogEmitterContract() + logEmitters = append(logEmitters, &logEmitter) + require.NoError(t, err, "Error deploying log emitter contract") + l.Info().Str("Contract address", logEmitter.Address().Hex()).Msg("Log emitter contract deployed") + time.Sleep(200 * time.Millisecond) + } + + // Register log triggered upkeep for each combination of log emitter contract and event signature (topic) + // We need to register a separate upkeep for each event signature, because log trigger doesn't support multiple topics (even if log poller does) + for i := 0; i < len(upkeepIDs); i++ { + emitterAddress := (*logEmitters[i%cfg.General.Contracts]).Address() + upkeepID := upkeepIDs[i] + topicId := cfg.General.EventsToEmit[i%len(cfg.General.EventsToEmit)].ID + + l.Info().Int("Upkeep id", int(upkeepID.Int64())).Str("Emitter address", emitterAddress.String()).Str("Topic", topicId.Hex()).Msg("Registering log trigger for log emitter") + err = registerSingleTopicFilter(registry, upkeepID, emitterAddress, topicId) + randomWait(50, 200) + require.NoError(t, err, "Error registering log trigger for log emitter") + } + + err = chainClient.WaitForEvents() + require.NoError(t, err, "Error encountered when waiting for setting trigger config for upkeeps") + + // Make sure that all nodes have expected filters registered before starting to emit events + expectedFilters := getExpectedFilters(logEmitters, cfg) + gom := gomega.NewGomegaWithT(t) + gom.Eventually(func(g gomega.Gomega) { + for i := 1; i < len(testEnv.ClCluster.Nodes); i++ { + nodeName := testEnv.ClCluster.Nodes[i].ContainerName + l.Info().Str("Node name", nodeName).Msg("Fetching filters from log poller's DB") + + hasFilters, err := nodeHasExpectedFilters(expectedFilters, coreLogger, testEnv.EVMClient.GetChainID(), testEnv.ClCluster.Nodes[i].PostgresDb) + if err != nil { + l.Warn().Err(err).Msg("Error checking if node has expected filters. Retrying...") + return + } + + g.Expect(hasFilters).To(gomega.BeTrue(), "Not all expected filters were found in the DB") + } + }, "1m", "1s").Should(gomega.Succeed()) + l.Info().Msg("All nodes have expected filters registered") + l.Info().Int("Count", len(expectedFilters)).Msg("Expected filters count") + + // Save block number before starting to emit events, so that we can later use it when querying logs + sb, err := testEnv.EVMClient.LatestBlockNumber(context.Background()) + require.NoError(t, err, "Error getting latest block number") + startBlock := int64(sb) + + l.Info().Msg("STARTING EVENT EMISSION") + startTime := time.Now() + + // Start chaos experimnents by randomly pausing random containers (Chainlink nodes or their DBs) + chaosDoneCh := make(chan error, 1) + go func() { + executeChaosExperiment(l, testEnv, cfg, chaosDoneCh) + }() + + totalLogsEmitted, err := executeGenerator(t, cfg, logEmitters) + endTime := time.Now() + require.NoError(t, err, "Error executing event generator") + + expectedLogsEmitted := getExpectedLogCount(cfg) + duration := int(endTime.Sub(startTime).Seconds()) + l.Info().Int("Total logs emitted", totalLogsEmitted).Int64("Expected total logs emitted", expectedLogsEmitted).Str("Duration", fmt.Sprintf("%d sec", duration)).Str("LPS", fmt.Sprintf("%d/sec", totalLogsEmitted/duration)).Msg("FINISHED EVENT EMISSION") + + // Save block number after finishing to emit events, so that we can later use it when querying logs + eb, err := testEnv.EVMClient.LatestBlockNumber(context.Background()) + require.NoError(t, err, "Error getting latest block number") + + endBlock, err := GetEndBlockToWaitFor(int64(eb), testEnv.EVMClient.GetChainID().Int64(), cfg) + require.NoError(t, err, "Error getting end block to wait for") + + l.Info().Msg("Waiting before proceeding with test until all chaos experiments finish") + chaosError := <-chaosDoneCh + require.NoError(t, chaosError, "Error encountered during chaos experiment") + + // Wait until last block in which events were emitted has been finalised (with buffer) + waitDuration := "45m" + l.Warn().Str("Duration", waitDuration).Msg("Waiting for chain to advance beyond finality") + + gom.Eventually(func(g gomega.Gomega) { + hasAdvanced, err := chainHasFinalisedEndBlock(l, testEnv.EVMClient, endBlock) + if err != nil { + l.Warn().Err(err).Msg("Error checking if chain has advanced beyond finality. Retrying...") + } + g.Expect(hasAdvanced).To(gomega.BeTrue(), "Chain has not advanced beyond finality") + }, waitDuration, "30s").Should(gomega.Succeed()) + + l.Warn().Str("Duration", waitDuration).Msg("Waiting for all CL nodes to have end block finalised") + gom.Eventually(func(g gomega.Gomega) { + hasFinalised, err := logPollerHasFinalisedEndBlock(endBlock, testEnv.EVMClient.GetChainID(), l, coreLogger, testEnv.ClCluster) + if err != nil { + l.Warn().Err(err).Msg("Error checking if nodes have finalised end block. Retrying...") + } + g.Expect(hasFinalised).To(gomega.BeTrue(), "Some nodes have not finalised end block") + }, waitDuration, "30s").Should(gomega.Succeed()) + + // Wait until all CL nodes have exactly the same logs emitted by test contracts as the EVM node has + logConsistencyWaitDuration := "10m" + l.Warn().Str("Duration", logConsistencyWaitDuration).Msg("Waiting for CL nodes to have all the logs that EVM node has") + + gom.Eventually(func(g gomega.Gomega) { + missingLogs, err := getMissingLogs(startBlock, endBlock, logEmitters, testEnv.EVMClient, testEnv.ClCluster, l, coreLogger, cfg) + if err != nil { + l.Warn().Err(err).Msg("Error getting missing logs. Retrying...") + } + + if !missingLogs.IsEmpty() { + printMissingLogsByType(missingLogs, l, cfg) + } + g.Expect(missingLogs.IsEmpty()).To(gomega.BeTrue(), "Some CL nodes were missing logs") + }, logConsistencyWaitDuration, "20s").Should(gomega.Succeed()) + + evmLogs, _ := getEVMLogs(startBlock, endBlock, logEmitters, testEnv.EVMClient, l, cfg) + + if totalLogsEmitted != len(evmLogs) { + l.Warn().Int("Total logs emitted", totalLogsEmitted).Int("Total logs in EVM", len(evmLogs)).Msg("Test passed, but total logs emitted does not match total logs in EVM") + } +}