diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 8338a097c6c..e53d1a11bbb 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -189,7 +189,8 @@ jobs: runs-on: ubuntu-latest name: Compare/Build Automation Test List outputs: - matrix: ${{ env.MATRIX_JSON }} + automation-matrix: ${{ env.AUTOMATION_JOB_MATRIX_JSON }} + lp-matrix: ${{ env.LP_JOB_MATRIX_JSON }} steps: - name: Check for Skip Tests Label if: contains(join(github.event.pull_request.labels.*.name, ' '), 'skip-smoke-tests') @@ -203,6 +204,7 @@ jobs: cd ./integration-tests ./scripts/compareTestList.sh ./smoke/automation_test.go ./scripts/compareTestList.sh ./smoke/keeper_test.go + ./scripts/compareTestList.sh ./smoke/log_poller_test.go - name: Build Test Matrix Lists id: build-test-matrix-list run: | @@ -211,19 +213,22 @@ jobs: MATRIX_JSON_KEEPER=$(./scripts/buildTestMatrixList.sh ./smoke/keeper_test.go keeper ubuntu-latest 1) COMBINED_ARRAY=$(jq -c -n "$MATRIX_JSON_AUTOMATION + $MATRIX_JSON_KEEPER") + LOG_POLLER_MATRIX_JSON=$(./scripts/buildTestMatrixList.sh ./smoke/log_poller_test.go log_poller ubuntu-latest 1) + echo "LP_JOB_MATRIX_JSON=${LOG_POLLER_MATRIX_JSON}" >> $GITHUB_ENV + # if we running a PR against the develop branch we should only run the automation tests unless we are in the merge group event if [[ "$GITHUB_EVENT_NAME" == "merge_group" ]]; then echo "We are in a merge_group event, run both automation and keepers tests" - echo "MATRIX_JSON=${COMBINED_ARRAY}" >> $GITHUB_ENV + echo "AUTOMATION_JOB_MATRIX_JSON=${COMBINED_ARRAY}" >> $GITHUB_ENV else echo "we are not in a merge_group event, if this is a PR to develop run only automation tests, otherwise run everything because we could be running against a release branch" target_branch=$(cat $GITHUB_EVENT_PATH | jq -r .pull_request.base.ref) if [[ "$target_branch" == "develop" ]]; then echo "only run automation tests" - echo "MATRIX_JSON=${MATRIX_JSON_AUTOMATION}" >> $GITHUB_ENV + echo "AUTOMATION_JOB_MATRIX_JSON=${MATRIX_JSON_AUTOMATION}" >> $GITHUB_ENV else echo "run both automation and keepers tests" - echo "MATRIX_JSON=${COMBINED_ARRAY}" >> $GITHUB_ENV + echo "AUTOMATION_JOB_MATRIX_JSON=${COMBINED_ARRAY}" >> $GITHUB_ENV fi fi @@ -245,7 +250,7 @@ jobs: strategy: fail-fast: false matrix: - product: ${{fromJson(needs.compare-tests.outputs.matrix)}} + product: ${{fromJson(needs.compare-tests.outputs.automation-matrix)}} runs-on: ${{ matrix.product.os }} name: ETH Smoke Tests ${{ matrix.product.name }} steps: @@ -286,7 +291,7 @@ jobs: LOGSTREAM_LOG_TARGETS: ${{ vars.LOGSTREAM_LOG_TARGETS }} GRAFANA_URL: ${{ vars.GRAFANA_URL }} GRAFANA_DATASOURCE: ${{ vars.GRAFANA_DATASOURCE }} - RUN_ID: ${{ github.run_id }} + RUN_ID: ${{ github.run_id }} with: test_command_to_run: cd ./integration-tests && go test -timeout 30m -count=1 -json -test.parallel=${{ matrix.product.nodes }} ${{ steps.build-go-test-command.outputs.run_command }} 2>&1 | tee /tmp/gotest.log | gotestfmt test_download_vendor_packages_command: cd ./integration-tests && go mod download @@ -306,6 +311,75 @@ jobs: if: always() uses: smartcontractkit/chainlink-github-actions/chainlink-testing-framework/show-test-summary@ea889b3133bd7f16ab19ba4ba130de5d9162c669 # v2.3.4 + eth-smoke-tests-matrix-log-poller: + if: ${{ !(contains(join(github.event.pull_request.labels.*.name, ' '), 'skip-smoke-tests') || github.event_name == 'workflow_dispatch') }} + environment: integration + permissions: + checks: write + pull-requests: write + id-token: write + contents: read + needs: + [build-chainlink, changes, compare-tests, build-lint-integration-tests] + env: + SELECTED_NETWORKS: SIMULATED,SIMULATED_1,SIMULATED_2 + CHAINLINK_COMMIT_SHA: ${{ github.sha }} + CHAINLINK_ENV_USER: ${{ github.actor }} + TEST_LOG_LEVEL: debug + strategy: + fail-fast: false + matrix: + product: ${{fromJson(needs.compare-tests.outputs.lp-matrix)}} + runs-on: ${{ matrix.product.os }} + name: ETH Smoke Tests ${{ matrix.product.name }} + steps: + - name: Collect Metrics + if: needs.changes.outputs.src == 'true' + id: collect-gha-metrics + uses: smartcontractkit/push-gha-metrics-action@d1618b772a97fd87e6505de97b872ee0b1f1729a # v2.0.2 + with: + basic-auth: ${{ secrets.GRAFANA_CLOUD_BASIC_AUTH }} + hostname: ${{ secrets.GRAFANA_CLOUD_HOST }} + this-job-name: ETH Smoke Tests ${{ matrix.product.name }} + test-results-file: '{"testType":"go","filePath":"/tmp/gotest.log"}' + continue-on-error: true + - name: Checkout the repo + uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 + with: + ref: ${{ github.event.pull_request.head.sha || github.event.merge_group.head_sha }} + - name: Build Go Test Command + id: build-go-test-command + run: | + # if the matrix.product.run is set, use it for a different command + if [ "${{ matrix.product.run }}" != "" ]; then + echo "run_command=${{ matrix.product.run }} ./smoke/${{ matrix.product.file }}_test.go" >> "$GITHUB_OUTPUT" + else + echo "run_command=./smoke/${{ matrix.product.name }}_test.go" >> "$GITHUB_OUTPUT" + fi + ## Run this step when changes that require tests to be run are made + - name: Run Tests + if: needs.changes.outputs.src == 'true' + uses: smartcontractkit/chainlink-github-actions/chainlink-testing-framework/run-tests@e865e376b8c2d594028c8d645dd6c47169b72974 # v2.2.16 + env: + PYROSCOPE_SERVER: ${{ matrix.product.pyroscope_env == '' && '' || !startsWith(github.ref, 'refs/tags/') && '' || secrets.QA_PYROSCOPE_INSTANCE }} # Avoid sending blank envs https://github.com/orgs/community/discussions/25725 + PYROSCOPE_ENVIRONMENT: ${{ matrix.product.pyroscope_env }} + PYROSCOPE_KEY: ${{ secrets.QA_PYROSCOPE_KEY }} + with: + test_command_to_run: cd ./integration-tests && go test -timeout 30m -count=1 -json -test.parallel=${{ matrix.product.nodes }} ${{ steps.build-go-test-command.outputs.run_command }} 2>&1 | tee /tmp/gotest.log | gotestfmt + test_download_vendor_packages_command: cd ./integration-tests && go mod download + cl_repo: ${{ env.CHAINLINK_IMAGE }} + cl_image_tag: ${{ github.sha }} + aws_registries: ${{ secrets.QA_AWS_ACCOUNT_NUMBER }} + artifacts_location: ./integration-tests/smoke/logs/ + publish_check_name: ${{ matrix.product.name }} + token: ${{ secrets.GITHUB_TOKEN }} + go_mod_path: ./integration-tests/go.mod + cache_key_id: core-e2e-${{ env.MOD_CACHE_VERSION }} + cache_restore_only: "true" + QA_AWS_REGION: ${{ secrets.QA_AWS_REGION }} + QA_AWS_ROLE_TO_ASSUME: ${{ secrets.QA_AWS_ROLE_TO_ASSUME }} + QA_KUBECONFIG: "" + eth-smoke-tests-matrix: if: ${{ !contains(join(github.event.pull_request.labels.*.name, ' '), 'skip-smoke-tests') }} environment: integration @@ -505,7 +579,7 @@ jobs: uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 with: name: trace-data - path: ./integration-tests/smoke/traces/trace-data.json + path: ./integration-tests/smoke/traces/trace-data.json - name: Print failed test summary if: always() uses: smartcontractkit/chainlink-github-actions/chainlink-testing-framework/show-test-summary@ea889b3133bd7f16ab19ba4ba130de5d9162c669 # v2.3.4 diff --git a/.github/workflows/on-demand-log-poller.yml b/.github/workflows/on-demand-log-poller.yml index 4658e188bac..4b2d67bcf9a 100644 --- a/.github/workflows/on-demand-log-poller.yml +++ b/.github/workflows/on-demand-log-poller.yml @@ -28,21 +28,22 @@ on: required: true selectedNetworks: type: choice + description: Networks to test on options: - "SIMULATED" - "SEPOLIA" - "MUMBAI" fundingPrivateKey: description: Private funding key (Skip for Simulated) - required: true + required: false type: string wsURL: description: WS URL for the network (Skip for Simulated) - required: true + required: false type: string httpURL: description: HTTP URL for the network (Skip for Simulated) - required: true + required: false type: string jobs: @@ -84,4 +85,4 @@ jobs: run: | cd integration-tests go mod download - go test -v -timeout 5h -v -count=1 -run ^TestLogPollerFromEnv$ ./reorg/log_poller_maybe_reorg_test.go + go test -v -timeout 5h -v -count=1 -run ^TestLogPollerFewFiltersFixedDepth$ ./smoke/log_poller_test.go diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 8e47c411123..064a489770b 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -6,7 +6,6 @@ go 1.21.4 replace github.com/smartcontractkit/chainlink/v2 => ../ require ( - cosmossdk.io/errors v1.0.0 github.com/K-Phoen/grabana v0.21.17 github.com/cli/go-gh/v2 v2.0.0 github.com/ethereum/go-ethereum v1.13.8 @@ -60,6 +59,7 @@ require ( cosmossdk.io/api v0.3.1 // indirect cosmossdk.io/core v0.5.1 // indirect cosmossdk.io/depinject v1.0.0-alpha.3 // indirect + cosmossdk.io/errors v1.0.0 // indirect cosmossdk.io/math v1.0.1 // indirect dario.cat/mergo v1.0.0 // indirect filippo.io/edwards25519 v1.0.0 // indirect diff --git a/integration-tests/load/log_poller/config.toml b/integration-tests/load/log_poller/config.toml deleted file mode 100644 index 2e328001943..00000000000 --- a/integration-tests/load/log_poller/config.toml +++ /dev/null @@ -1,22 +0,0 @@ -[general] -generator = "looped" -contracts = 10 -events_per_tx = 10 - -[chaos] -experiment_count = 10 - -[looped] -[looped.contract] -execution_count = 300 - -[looped.fuzz] -min_emit_wait_time_ms = 100 -max_emit_wait_time_ms = 500 - -[wasp] -[wasp.load] -call_timeout = "3m" -rate_limit_unit_duration = "2s" -LPS = 30 -duration = "1m" \ No newline at end of file diff --git a/integration-tests/load/log_poller/log_poller_test.go b/integration-tests/load/log_poller/log_poller_test.go deleted file mode 100644 index 04366848f0e..00000000000 --- a/integration-tests/load/log_poller/log_poller_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package logpoller - -import ( - "testing" - - "github.com/ethereum/go-ethereum/accounts/abi" - - "github.com/stretchr/testify/require" - - lp_helpers "github.com/smartcontractkit/chainlink/integration-tests/universal/log_poller" -) - -func TestLoadTestLogPoller(t *testing.T) { - cfg, err := lp_helpers.ReadConfig(lp_helpers.DefaultConfigFilename) - require.NoError(t, err) - - eventsToEmit := []abi.Event{} - for _, event := range lp_helpers.EmitterABI.Events { - eventsToEmit = append(eventsToEmit, event) - } - - cfg.General.EventsToEmit = eventsToEmit - - lp_helpers.ExecuteBasicLogPollerTest(t, cfg) -} diff --git a/integration-tests/reorg/log_poller_maybe_reorg_test.go b/integration-tests/reorg/log_poller_maybe_reorg_test.go deleted file mode 100644 index d319e39aa20..00000000000 --- a/integration-tests/reorg/log_poller_maybe_reorg_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package reorg - -import ( - "testing" - - "github.com/ethereum/go-ethereum/accounts/abi" - - logpoller "github.com/smartcontractkit/chainlink/integration-tests/universal/log_poller" -) - -func TestLogPollerFromEnv(t *testing.T) { - cfg := logpoller.Config{ - General: &logpoller.General{ - Generator: logpoller.GeneratorType_Looped, - Contracts: 2, - EventsPerTx: 100, - UseFinalityTag: true, - }, - LoopedConfig: &logpoller.LoopedConfig{ - ContractConfig: logpoller.ContractConfig{ - ExecutionCount: 100, - }, - FuzzConfig: logpoller.FuzzConfig{ - MinEmitWaitTimeMs: 400, - MaxEmitWaitTimeMs: 600, - }, - }, - } - - eventsToEmit := []abi.Event{} - for _, event := range logpoller.EmitterABI.Events { - eventsToEmit = append(eventsToEmit, event) - } - - cfg.General.EventsToEmit = eventsToEmit - err := cfg.OverrideFromEnv() - if err != nil { - t.Errorf("failed to override config from env: %v", err) - t.FailNow() - } - - logpoller.ExecuteCILogPollerTest(t, &cfg) -} diff --git a/integration-tests/smoke/log_poller_test.go b/integration-tests/smoke/log_poller_test.go index 03a287ee6b7..4eadf6b3913 100644 --- a/integration-tests/smoke/log_poller_test.go +++ b/integration-tests/smoke/log_poller_test.go @@ -1,11 +1,25 @@ package smoke import ( + "fmt" + "math/big" "testing" + "time" "github.com/ethereum/go-ethereum/accounts/abi" - + "github.com/onsi/gomega" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-testing-framework/logging" + "github.com/smartcontractkit/chainlink-testing-framework/utils/testcontext" + "github.com/smartcontractkit/chainlink/integration-tests/actions" + "github.com/smartcontractkit/chainlink/integration-tests/contracts" + "github.com/smartcontractkit/chainlink/integration-tests/contracts/ethereum" + "github.com/smartcontractkit/chainlink/integration-tests/docker/test_env" logpoller "github.com/smartcontractkit/chainlink/integration-tests/universal/log_poller" + + core_logger "github.com/smartcontractkit/chainlink/v2/core/logger" ) // consistency test with no network disruptions with approximate emission of 1500-1600 logs per second for ~110-120 seconds @@ -36,7 +50,11 @@ func TestLogPollerFewFiltersFixedDepth(t *testing.T) { cfg.General.EventsToEmit = eventsToEmit - logpoller.ExecuteBasicLogPollerTest(t, &cfg) + // this was added only to support triggering tests from CI with custom values + err := cfg.OverrideFromEnv() + require.NoError(t, err, fmt.Sprintf("Error overriding config from env: %v", err)) + + executeBasicLogPollerTest(t, &cfg) } func TestLogPollerFewFiltersFinalityTag(t *testing.T) { @@ -65,12 +83,13 @@ func TestLogPollerFewFiltersFinalityTag(t *testing.T) { cfg.General.EventsToEmit = eventsToEmit - logpoller.ExecuteBasicLogPollerTest(t, &cfg) + executeBasicLogPollerTest(t, &cfg) } // consistency test with no network disruptions with approximate emission of 1000-1100 logs per second for ~110-120 seconds // 900 filters are registered -func TestLogManyFiltersPollerFixedDepth(t *testing.T) { +func TestLogPollerManyFiltersFixedDepth(t *testing.T) { + t.Skip("Execute manually, when needed as it runs for a long time") cfg := logpoller.Config{ General: &logpoller.General{ Generator: logpoller.GeneratorType_Looped, @@ -96,10 +115,11 @@ func TestLogManyFiltersPollerFixedDepth(t *testing.T) { cfg.General.EventsToEmit = eventsToEmit - logpoller.ExecuteBasicLogPollerTest(t, &cfg) + executeBasicLogPollerTest(t, &cfg) } -func TestLogManyFiltersPollerFinalityTag(t *testing.T) { +func TestLogPollerManyFiltersFinalityTag(t *testing.T) { + t.Skip("Execute manually, when needed as it runs for a long time") cfg := logpoller.Config{ General: &logpoller.General{ Generator: logpoller.GeneratorType_Looped, @@ -125,7 +145,7 @@ func TestLogManyFiltersPollerFinalityTag(t *testing.T) { cfg.General.EventsToEmit = eventsToEmit - logpoller.ExecuteBasicLogPollerTest(t, &cfg) + executeBasicLogPollerTest(t, &cfg) } // consistency test that introduces random distruptions by pausing either Chainlink or Postgres containers for random interval of 5-20 seconds @@ -141,15 +161,16 @@ func TestLogPollerWithChaosFixedDepth(t *testing.T) { }, LoopedConfig: &logpoller.LoopedConfig{ ContractConfig: logpoller.ContractConfig{ - ExecutionCount: 100, + ExecutionCount: 70, }, FuzzConfig: logpoller.FuzzConfig{ - MinEmitWaitTimeMs: 200, - MaxEmitWaitTimeMs: 500, + MinEmitWaitTimeMs: 100, + MaxEmitWaitTimeMs: 300, }, }, ChaosConfig: &logpoller.ChaosConfig{ - ExperimentCount: 10, + ExperimentCount: 4, + TargetComponent: "chainlink", }, } @@ -160,7 +181,7 @@ func TestLogPollerWithChaosFixedDepth(t *testing.T) { cfg.General.EventsToEmit = eventsToEmit - logpoller.ExecuteBasicLogPollerTest(t, &cfg) + executeBasicLogPollerTest(t, &cfg) } func TestLogPollerWithChaosFinalityTag(t *testing.T) { @@ -173,15 +194,16 @@ func TestLogPollerWithChaosFinalityTag(t *testing.T) { }, LoopedConfig: &logpoller.LoopedConfig{ ContractConfig: logpoller.ContractConfig{ - ExecutionCount: 100, + ExecutionCount: 120, }, FuzzConfig: logpoller.FuzzConfig{ - MinEmitWaitTimeMs: 200, - MaxEmitWaitTimeMs: 500, + MinEmitWaitTimeMs: 100, + MaxEmitWaitTimeMs: 300, }, }, ChaosConfig: &logpoller.ChaosConfig{ - ExperimentCount: 10, + ExperimentCount: 6, + TargetComponent: "chainlink", }, } @@ -192,7 +214,73 @@ func TestLogPollerWithChaosFinalityTag(t *testing.T) { cfg.General.EventsToEmit = eventsToEmit - logpoller.ExecuteBasicLogPollerTest(t, &cfg) + executeBasicLogPollerTest(t, &cfg) +} + +func TestLogPollerWithChaosPostgresFinalityTag(t *testing.T) { + cfg := logpoller.Config{ + General: &logpoller.General{ + Generator: logpoller.GeneratorType_Looped, + Contracts: 2, + EventsPerTx: 100, + UseFinalityTag: true, + }, + LoopedConfig: &logpoller.LoopedConfig{ + ContractConfig: logpoller.ContractConfig{ + ExecutionCount: 120, + }, + FuzzConfig: logpoller.FuzzConfig{ + MinEmitWaitTimeMs: 100, + MaxEmitWaitTimeMs: 300, + }, + }, + ChaosConfig: &logpoller.ChaosConfig{ + ExperimentCount: 6, + TargetComponent: "postgres", + }, + } + + eventsToEmit := []abi.Event{} + for _, event := range logpoller.EmitterABI.Events { + eventsToEmit = append(eventsToEmit, event) + } + + cfg.General.EventsToEmit = eventsToEmit + + executeBasicLogPollerTest(t, &cfg) +} + +func TestLogPollerWithChaosPostgresFixedDepth(t *testing.T) { + cfg := logpoller.Config{ + General: &logpoller.General{ + Generator: logpoller.GeneratorType_Looped, + Contracts: 2, + EventsPerTx: 100, + UseFinalityTag: false, + }, + LoopedConfig: &logpoller.LoopedConfig{ + ContractConfig: logpoller.ContractConfig{ + ExecutionCount: 120, + }, + FuzzConfig: logpoller.FuzzConfig{ + MinEmitWaitTimeMs: 100, + MaxEmitWaitTimeMs: 300, + }, + }, + ChaosConfig: &logpoller.ChaosConfig{ + ExperimentCount: 6, + TargetComponent: "postgres", + }, + } + + eventsToEmit := []abi.Event{} + for _, event := range logpoller.EmitterABI.Events { + eventsToEmit = append(eventsToEmit, event) + } + + cfg.General.EventsToEmit = eventsToEmit + + executeBasicLogPollerTest(t, &cfg) } // consistency test that registers filters after events were emitted and then triggers replay via API @@ -227,7 +315,7 @@ func TestLogPollerReplayFixedDepth(t *testing.T) { cfg.General.EventsToEmit = eventsToEmit consistencyTimeout := "5m" - logpoller.ExecuteLogPollerReplay(t, &cfg, consistencyTimeout) + executeLogPollerReplay(t, &cfg, consistencyTimeout) } func TestLogPollerReplayFinalityTag(t *testing.T) { @@ -236,7 +324,7 @@ func TestLogPollerReplayFinalityTag(t *testing.T) { Generator: logpoller.GeneratorType_Looped, Contracts: 2, EventsPerTx: 4, - UseFinalityTag: false, + UseFinalityTag: true, }, LoopedConfig: &logpoller.LoopedConfig{ ContractConfig: logpoller.ContractConfig{ @@ -257,5 +345,293 @@ func TestLogPollerReplayFinalityTag(t *testing.T) { cfg.General.EventsToEmit = eventsToEmit consistencyTimeout := "5m" - logpoller.ExecuteLogPollerReplay(t, &cfg, consistencyTimeout) + executeLogPollerReplay(t, &cfg, consistencyTimeout) +} + +func executeBasicLogPollerTest(t *testing.T, cfg *logpoller.Config) { + l := logging.GetTestLogger(t) + coreLogger := core_logger.TestLogger(t) //needed by ORM ¯\_(ツ)_/¯ + + lpTestEnv := prepareEnvironment(l, t, cfg) + testEnv := lpTestEnv.testEnv + var err error + + // Register log triggered upkeep for each combination of log emitter contract and event signature (topic) + // We need to register a separate upkeep for each event signature, because log trigger doesn't support multiple topics (even if log poller does) + err = logpoller.RegisterFiltersAndAssertUniquness(l, lpTestEnv.registry, lpTestEnv.upkeepIDs, lpTestEnv.logEmitters, cfg, lpTestEnv.upKeepsNeeded) + require.NoError(t, err, "Error registering filters") + + l.Info().Msg("No duplicate filters found. OK!") + + err = testEnv.EVMClient.WaitForEvents() + require.NoError(t, err, "Error encountered when waiting for setting trigger config for upkeeps") + + expectedFilters := logpoller.GetExpectedFilters(lpTestEnv.logEmitters, cfg) + waitForAllNodesToHaveExpectedFiltersRegisteredOrFail(l, coreLogger, t, testEnv, expectedFilters) + + // Save block number before starting to emit events, so that we can later use it when querying logs + sb, err := testEnv.EVMClient.LatestBlockNumber(testcontext.Get(t)) + require.NoError(t, err, "Error getting latest block number") + startBlock := int64(sb) + + l.Info().Int64("Starting Block", startBlock).Msg("STARTING EVENT EMISSION") + startTime := time.Now() + + // Start chaos experimnents by randomly pausing random containers (Chainlink nodes or their DBs) + chaosDoneCh := make(chan error, 1) + go func() { + logpoller.ExecuteChaosExperiment(l, testEnv, cfg, chaosDoneCh) + }() + + totalLogsEmitted, err := logpoller.ExecuteGenerator(t, cfg, lpTestEnv.logEmitters) + endTime := time.Now() + require.NoError(t, err, "Error executing event generator") + + expectedLogsEmitted := logpoller.GetExpectedLogCount(cfg) + duration := int(endTime.Sub(startTime).Seconds()) + + eb, err := testEnv.EVMClient.LatestBlockNumber(testcontext.Get(t)) + require.NoError(t, err, "Error getting latest block number") + + l.Info(). + Int("Total logs emitted", totalLogsEmitted). + Uint64("Probable last block with logs", eb). + Int64("Expected total logs emitted", expectedLogsEmitted). + Str("Duration", fmt.Sprintf("%d sec", duration)). + Str("LPS", fmt.Sprintf("~%d/sec", totalLogsEmitted/duration)). + Msg("FINISHED EVENT EMISSION") + + l.Info().Msg("Waiting before proceeding with test until all chaos experiments finish") + chaosError := <-chaosDoneCh + require.NoError(t, chaosError, "Error encountered during chaos experiment") + + // use ridciuously high end block so that we don't have to find out the block number of the last block in which logs were emitted + // as that's not trivial to do (i.e. just because chain was at block X when log emission ended it doesn't mean all events made it to that block) + endBlock := int64(eb) + 10000 + + // logCountWaitDuration, err := time.ParseDuration("5m") + // require.NoError(t, err, "Error parsing log count wait duration") + allNodesLogCountMatches, err := logpoller.FluentlyCheckIfAllNodesHaveLogCount("5m", startBlock, endBlock, totalLogsEmitted, expectedFilters, l, coreLogger, testEnv) + require.NoError(t, err, "Error checking if CL nodes have expected log count") + + conditionallyWaitUntilNodesHaveTheSameLogsAsEvm(l, coreLogger, t, allNodesLogCountMatches, lpTestEnv, cfg, startBlock, endBlock, "5m") +} + +func executeLogPollerReplay(t *testing.T, cfg *logpoller.Config, consistencyTimeout string) { + l := logging.GetTestLogger(t) + coreLogger := core_logger.TestLogger(t) //needed by ORM ¯\_(ツ)_/¯ + + lpTestEnv := prepareEnvironment(l, t, cfg) + testEnv := lpTestEnv.testEnv + var err error + + // Save block number before starting to emit events, so that we can later use it when querying logs + sb, err := testEnv.EVMClient.LatestBlockNumber(testcontext.Get(t)) + require.NoError(t, err, "Error getting latest block number") + startBlock := int64(sb) + + l.Info().Int64("Starting Block", startBlock).Msg("STARTING EVENT EMISSION") + startTime := time.Now() + totalLogsEmitted, err := logpoller.ExecuteGenerator(t, cfg, lpTestEnv.logEmitters) + endTime := time.Now() + require.NoError(t, err, "Error executing event generator") + expectedLogsEmitted := logpoller.GetExpectedLogCount(cfg) + duration := int(endTime.Sub(startTime).Seconds()) + + // Save block number after finishing to emit events, so that we can later use it when querying logs + eb, err := testEnv.EVMClient.LatestBlockNumber(testcontext.Get(t)) + require.NoError(t, err, "Error getting latest block number") + + endBlock, err := logpoller.GetEndBlockToWaitFor(int64(eb), testEnv.EVMClient.GetChainID().Int64(), cfg) + require.NoError(t, err, "Error getting end block to wait for") + + l.Info().Int64("Ending Block", endBlock).Int("Total logs emitted", totalLogsEmitted).Int64("Expected total logs emitted", expectedLogsEmitted).Str("Duration", fmt.Sprintf("%d sec", duration)).Str("LPS", fmt.Sprintf("%d/sec", totalLogsEmitted/duration)).Msg("FINISHED EVENT EMISSION") + + // Lets make sure no logs are in DB yet + expectedFilters := logpoller.GetExpectedFilters(lpTestEnv.logEmitters, cfg) + logCountMatches, err := logpoller.ClNodesHaveExpectedLogCount(startBlock, endBlock, testEnv.EVMClient.GetChainID(), 0, expectedFilters, l, coreLogger, testEnv.ClCluster) + require.NoError(t, err, "Error checking if CL nodes have expected log count") + require.True(t, logCountMatches, "Some CL nodes already had logs in DB") + l.Info().Msg("No logs were saved by CL nodes yet, as expected. Proceeding.") + + // Register log triggered upkeep for each combination of log emitter contract and event signature (topic) + // We need to register a separate upkeep for each event signature, because log trigger doesn't support multiple topics (even if log poller does) + err = logpoller.RegisterFiltersAndAssertUniquness(l, lpTestEnv.registry, lpTestEnv.upkeepIDs, lpTestEnv.logEmitters, cfg, lpTestEnv.upKeepsNeeded) + require.NoError(t, err, "Error registering filters") + + err = testEnv.EVMClient.WaitForEvents() + require.NoError(t, err, "Error encountered when waiting for setting trigger config for upkeeps") + + waitForAllNodesToHaveExpectedFiltersRegisteredOrFail(l, coreLogger, t, testEnv, expectedFilters) + + blockFinalisationWaitDuration := "5m" + l.Warn().Str("Duration", blockFinalisationWaitDuration).Msg("Waiting for all CL nodes to have end block finalised") + gom := gomega.NewGomegaWithT(t) + gom.Eventually(func(g gomega.Gomega) { + hasFinalised, err := logpoller.LogPollerHasFinalisedEndBlock(endBlock, testEnv.EVMClient.GetChainID(), l, coreLogger, testEnv.ClCluster) + if err != nil { + l.Warn().Err(err).Msg("Error checking if nodes have finalised end block. Retrying...") + } + g.Expect(hasFinalised).To(gomega.BeTrue(), "Some nodes have not finalised end block") + }, blockFinalisationWaitDuration, "10s").Should(gomega.Succeed()) + + // Trigger replay + l.Info().Msg("Triggering log poller's replay") + for i := 1; i < len(testEnv.ClCluster.Nodes); i++ { + nodeName := testEnv.ClCluster.Nodes[i].ContainerName + response, _, err := testEnv.ClCluster.Nodes[i].API.ReplayLogPollerFromBlock(startBlock, testEnv.EVMClient.GetChainID().Int64()) + require.NoError(t, err, "Error triggering log poller's replay on node %s", nodeName) + require.Equal(t, "Replay started", response.Data.Attributes.Message, "Unexpected response message from log poller's replay") + } + + // so that we don't have to look for block number of the last block in which logs were emitted as that's not trivial to do + endBlock = endBlock + 10000 + l.Warn().Str("Duration", consistencyTimeout).Msg("Waiting for replay logs to be processed by all nodes") + + // logCountWaitDuration, err := time.ParseDuration("5m") + allNodesLogCountMatches, err := logpoller.FluentlyCheckIfAllNodesHaveLogCount("5m", startBlock, endBlock, totalLogsEmitted, expectedFilters, l, coreLogger, testEnv) + require.NoError(t, err, "Error checking if CL nodes have expected log count") + + conditionallyWaitUntilNodesHaveTheSameLogsAsEvm(l, coreLogger, t, allNodesLogCountMatches, lpTestEnv, cfg, startBlock, endBlock, "5m") +} + +type logPollerEnvironment struct { + logEmitters []*contracts.LogEmitter + testEnv *test_env.CLClusterTestEnv + registry contracts.KeeperRegistry + upkeepIDs []*big.Int + upKeepsNeeded int +} + +// prepareEnvironment prepares environment for log poller tests by starting DON, private Ethereum network, +// deploying registry and log emitter contracts and registering log triggered upkeeps +func prepareEnvironment(l zerolog.Logger, t *testing.T, cfg *logpoller.Config) logPollerEnvironment { + if cfg.General.EventsToEmit == nil || len(cfg.General.EventsToEmit) == 0 { + l.Warn().Msg("No events to emit specified, using all events from log emitter contract") + for _, event := range logpoller.EmitterABI.Events { + cfg.General.EventsToEmit = append(cfg.General.EventsToEmit, event) + } + } + + l.Info().Msg("Starting basic log poller test") + + var ( + err error + upKeepsNeeded = cfg.General.Contracts * len(cfg.General.EventsToEmit) + ) + + chainClient, _, contractDeployer, linkToken, registry, registrar, testEnv := logpoller.SetupLogPollerTestDocker( + t, ethereum.RegistryVersion_2_1, logpoller.DefaultOCRRegistryConfig, upKeepsNeeded, time.Duration(500*time.Millisecond), cfg.General.UseFinalityTag, + ) + + _, upkeepIDs := actions.DeployConsumers( + t, + registry, + registrar, + linkToken, + contractDeployer, + chainClient, + upKeepsNeeded, + big.NewInt(int64(9e18)), + uint32(2500000), + true, + false, + ) + + err = logpoller.AssertUpkeepIdsUniqueness(upkeepIDs) + require.NoError(t, err, "Error asserting upkeep ids uniqueness") + l.Info().Msg("No duplicate upkeep IDs found. OK!") + + // Deploy Log Emitter contracts + logEmitters := logpoller.UploadLogEmitterContractsAndWaitForFinalisation(l, t, testEnv, cfg) + err = logpoller.AssertContractAddressUniquneness(logEmitters) + require.NoError(t, err, "Error asserting contract addresses uniqueness") + l.Info().Msg("No duplicate contract addresses found. OK!") + + return logPollerEnvironment{ + logEmitters: logEmitters, + registry: registry, + upkeepIDs: upkeepIDs, + upKeepsNeeded: upKeepsNeeded, + testEnv: testEnv, + } +} + +// waitForAllNodesToHaveExpectedFiltersRegisteredOrFail waits until all nodes have expected filters registered until timeout +func waitForAllNodesToHaveExpectedFiltersRegisteredOrFail(l zerolog.Logger, coreLogger core_logger.SugaredLogger, t *testing.T, testEnv *test_env.CLClusterTestEnv, expectedFilters []logpoller.ExpectedFilter) { + // Make sure that all nodes have expected filters registered before starting to emit events + gom := gomega.NewGomegaWithT(t) + gom.Eventually(func(g gomega.Gomega) { + hasFilters := false + for i := 1; i < len(testEnv.ClCluster.Nodes); i++ { + nodeName := testEnv.ClCluster.Nodes[i].ContainerName + l.Info(). + Str("Node name", nodeName). + Msg("Fetching filters from log poller's DB") + var message string + var err error + + hasFilters, message, err = logpoller.NodeHasExpectedFilters(expectedFilters, coreLogger, testEnv.EVMClient.GetChainID(), testEnv.ClCluster.Nodes[i].PostgresDb) + if !hasFilters || err != nil { + l.Warn(). + Str("Details", message). + Msg("Some filters were missing, but we will retry") + break + } + } + g.Expect(hasFilters).To(gomega.BeTrue(), "Not all expected filters were found in the DB") + }, "5m", "10s").Should(gomega.Succeed()) + + l.Info(). + Msg("All nodes have expected filters registered") + l.Info(). + Int("Count", len(expectedFilters)). + Msg("Expected filters count") +} + +// conditionallyWaitUntilNodesHaveTheSameLogsAsEvm checks whether all CL nodes have the same number of logs as EVM node +// if not, then it prints missing logs and wait for some time and checks again +func conditionallyWaitUntilNodesHaveTheSameLogsAsEvm(l zerolog.Logger, coreLogger core_logger.SugaredLogger, t *testing.T, allNodesLogCountMatches bool, lpTestEnv logPollerEnvironment, cfg *logpoller.Config, startBlock, endBlock int64, waitDuration string) { + logCountWaitDuration, err := time.ParseDuration(waitDuration) + require.NoError(t, err, "Error parsing log count wait duration") + + allNodesHaveAllExpectedLogs := false + if !allNodesLogCountMatches { + missingLogs, err := logpoller.GetMissingLogs(startBlock, endBlock, lpTestEnv.logEmitters, lpTestEnv.testEnv.EVMClient, lpTestEnv.testEnv.ClCluster, l, coreLogger, cfg) + if err == nil { + if !missingLogs.IsEmpty() { + logpoller.PrintMissingLogsInfo(missingLogs, l, cfg) + } else { + allNodesHaveAllExpectedLogs = true + l.Info().Msg("All CL nodes have all the logs that EVM node has") + } + } + } + + require.True(t, allNodesLogCountMatches, "Not all CL nodes had expected log count afer %s", logCountWaitDuration) + + // Wait until all CL nodes have exactly the same logs emitted by test contracts as the EVM node has + // but only in the rare case that first attempt to do it failed (basically here want to know not only + // if log count matches, but whether details of every single log match) + if !allNodesHaveAllExpectedLogs { + logConsistencyWaitDuration := "5m" + l.Info(). + Str("Duration", logConsistencyWaitDuration). + Msg("Waiting for CL nodes to have all the logs that EVM node has") + + gom := gomega.NewGomegaWithT(t) + gom.Eventually(func(g gomega.Gomega) { + missingLogs, err := logpoller.GetMissingLogs(startBlock, endBlock, lpTestEnv.logEmitters, lpTestEnv.testEnv.EVMClient, lpTestEnv.testEnv.ClCluster, l, coreLogger, cfg) + if err != nil { + l.Warn(). + Err(err). + Msg("Error getting missing logs. Retrying...") + } + + if !missingLogs.IsEmpty() { + logpoller.PrintMissingLogsInfo(missingLogs, l, cfg) + } + g.Expect(missingLogs.IsEmpty()).To(gomega.BeTrue(), "Some CL nodes were missing logs") + }, logConsistencyWaitDuration, "10s").Should(gomega.Succeed()) + } } diff --git a/integration-tests/smoke/log_poller_test.go_test_list.json b/integration-tests/smoke/log_poller_test.go_test_list.json new file mode 100644 index 00000000000..5193c3fd06c --- /dev/null +++ b/integration-tests/smoke/log_poller_test.go_test_list.json @@ -0,0 +1,44 @@ +{ + "tests": [ + { + "name": "TestLogPollerFewFiltersFixedDepth", + "label": "ubuntu-latest" + }, + { + "name": "TestLogPollerFewFiltersFinalityTag", + "label": "ubuntu-latest" + }, + { + "name": "TestLogPollerWithChaosFixedDepth", + "label": "ubuntu-latest" + }, + { + "name": "TestLogPollerWithChaosFinalityTag", + "label": "ubuntu-latest" + }, + { + "name": "TestLogPollerWithChaosPostgresFinalityTag", + "label": "ubuntu-latest" + }, + { + "name": "TestLogPollerWithChaosPostgresFixedDepth", + "label": "ubuntu-latest" + }, + { + "name": "TestLogPollerReplayFixedDepth", + "label": "ubuntu-latest" + }, + { + "name": "TestLogPollerReplayFinalityTag", + "label": "ubuntu-latest" + }, + { + "name": "TestLogPollerManyFiltersFixedDepth", + "label": "ubuntu-latest" + }, + { + "name": "TestLogPollerManyFiltersFinalityTag", + "label": "ubuntu-latest" + } + ] +} \ No newline at end of file diff --git a/integration-tests/universal/log_poller/config.go b/integration-tests/universal/log_poller/config.go index d42520397e8..b8773d836f6 100644 --- a/integration-tests/universal/log_poller/config.go +++ b/integration-tests/universal/log_poller/config.go @@ -5,7 +5,6 @@ import ( "os" "strconv" - "cosmossdk.io/errors" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/pelletier/go-toml/v2" "github.com/rs/zerolog/log" @@ -57,7 +56,8 @@ type General struct { } type ChaosConfig struct { - ExperimentCount int `toml:"experiment_count"` + ExperimentCount int `toml:"experiment_count"` + TargetComponent string `toml:"target_component"` } type WaspConfig struct { @@ -76,11 +76,11 @@ func ReadConfig(configName string) (*Config, error) { var cfg *Config d, err := os.ReadFile(configName) if err != nil { - return nil, errors.Wrap(err, ErrReadPerfConfig) + return nil, fmt.Errorf("%w: %s", err, ErrReadPerfConfig) } err = toml.Unmarshal(d, &cfg) if err != nil { - return nil, errors.Wrap(err, ErrUnmarshalPerfConfig) + return nil, fmt.Errorf("%w: %s", err, ErrUnmarshalPerfConfig) } if err := cfg.validate(); err != nil { diff --git a/integration-tests/universal/log_poller/helpers.go b/integration-tests/universal/log_poller/helpers.go index 8ad115ecec8..597b82ef6c3 100644 --- a/integration-tests/universal/log_poller/helpers.go +++ b/integration-tests/universal/log_poller/helpers.go @@ -17,6 +17,7 @@ import ( "github.com/ethereum/go-ethereum/common" geth_types "github.com/ethereum/go-ethereum/core/types" "github.com/jmoiron/sqlx" + "github.com/onsi/gomega" "github.com/rs/zerolog" "github.com/scylladb/go-reflectx" "github.com/stretchr/testify/require" @@ -29,6 +30,7 @@ import ( "github.com/smartcontractkit/chainlink-testing-framework/logging" "github.com/smartcontractkit/chainlink-testing-framework/networks" "github.com/smartcontractkit/chainlink-testing-framework/utils/ptr" + "github.com/smartcontractkit/chainlink-testing-framework/utils/testcontext" "github.com/smartcontractkit/chainlink/integration-tests/actions" "github.com/smartcontractkit/chainlink/integration-tests/client" "github.com/smartcontractkit/chainlink/integration-tests/contracts" @@ -133,6 +135,7 @@ var registerSingleTopicFilter = func(registry contracts.KeeperRegistry, upkeepID // return nil // } +// NewOrm returns a new logpoller.DbORM instance func NewOrm(logger core_logger.SugaredLogger, chainID *big.Int, postgresDb *ctf_test_env.PostgresDb) (*logpoller.DbORM, *sqlx.DB, error) { dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable", "127.0.0.1", postgresDb.ExternalPort, postgresDb.User, postgresDb.Password, postgresDb.DbName) db, err := sqlx.Open("postgres", dsn) @@ -149,7 +152,8 @@ type ExpectedFilter struct { topic common.Hash } -func getExpectedFilters(logEmitters []*contracts.LogEmitter, cfg *Config) []ExpectedFilter { +// GetExpectedFilters returns a slice of ExpectedFilter structs based on the provided log emitters and config +func GetExpectedFilters(logEmitters []*contracts.LogEmitter, cfg *Config) []ExpectedFilter { expectedFilters := make([]ExpectedFilter, 0) for _, emitter := range logEmitters { for _, event := range cfg.General.EventsToEmit { @@ -163,16 +167,17 @@ func getExpectedFilters(logEmitters []*contracts.LogEmitter, cfg *Config) []Expe return expectedFilters } -var nodeHasExpectedFilters = func(expectedFilters []ExpectedFilter, logger core_logger.SugaredLogger, chainID *big.Int, postgresDb *ctf_test_env.PostgresDb) (bool, error) { +// NodeHasExpectedFilters returns true if the provided node has all the expected filters registered +func NodeHasExpectedFilters(expectedFilters []ExpectedFilter, logger core_logger.SugaredLogger, chainID *big.Int, postgresDb *ctf_test_env.PostgresDb) (bool, string, error) { orm, db, err := NewOrm(logger, chainID, postgresDb) if err != nil { - return false, err + return false, "", err } defer db.Close() knownFilters, err := orm.LoadFilters() if err != nil { - return false, err + return false, "", err } for _, expectedFilter := range expectedFilters { @@ -185,14 +190,15 @@ var nodeHasExpectedFilters = func(expectedFilters []ExpectedFilter, logger core_ } if !filterFound { - return false, fmt.Errorf("no filter found for emitter %s and topic %s", expectedFilter.emitterAddress.String(), expectedFilter.topic.Hex()) + return false, fmt.Sprintf("no filter found for emitter %s and topic %s", expectedFilter.emitterAddress.String(), expectedFilter.topic.Hex()), nil } } - return true, nil + return true, "", nil } -var randomWait = func(minMilliseconds, maxMilliseconds int) { +// randomWait waits for a random amount of time between minMilliseconds and maxMilliseconds +func randomWait(minMilliseconds, maxMilliseconds int) { rand.New(rand.NewSource(time.Now().UnixNano())) randomMilliseconds := rand.Intn(maxMilliseconds-minMilliseconds+1) + minMilliseconds time.Sleep(time.Duration(randomMilliseconds) * time.Millisecond) @@ -201,10 +207,9 @@ var randomWait = func(minMilliseconds, maxMilliseconds int) { type LogEmitterChannel struct { logsEmitted int err error - // unused - // currentIndex int } +// getIntSlice returns a slice of ints of the provided length func getIntSlice(length int) []int { result := make([]int, length) for i := 0; i < length; i++ { @@ -214,6 +219,7 @@ func getIntSlice(length int) []int { return result } +// getStringSlice returns a slice of strings of the provided length func getStringSlice(length int) []string { result := make([]string, length) for i := 0; i < length; i++ { @@ -223,17 +229,18 @@ func getStringSlice(length int) []string { return result } -var emitEvents = func(ctx context.Context, l zerolog.Logger, logEmitter *contracts.LogEmitter, cfg *Config, wg *sync.WaitGroup, results chan LogEmitterChannel) { +// emitEvents emits events from the provided log emitter concurrently according to the provided config +func emitEvents(ctx context.Context, l zerolog.Logger, logEmitter *contracts.LogEmitter, cfg *Config, wg *sync.WaitGroup, results chan LogEmitterChannel) { address := (*logEmitter).Address().String() localCounter := 0 - select { - case <-ctx.Done(): - l.Warn().Str("Emitter address", address).Msg("Context cancelled, not emitting events") - return - default: - defer wg.Done() - for i := 0; i < cfg.LoopedConfig.ExecutionCount; i++ { - for _, event := range cfg.General.EventsToEmit { + defer wg.Done() + for i := 0; i < cfg.LoopedConfig.ExecutionCount; i++ { + for _, event := range cfg.General.EventsToEmit { + select { + case <-ctx.Done(): + l.Warn().Str("Emitter address", address).Msg("Context cancelled, not emitting events") + return + default: l.Debug().Str("Emitter address", address).Str("Event type", event.Name).Str("index", fmt.Sprintf("%d/%d", (i+1), cfg.LoopedConfig.ExecutionCount)).Msg("Emitting log from emitter") var err error switch event.Name { @@ -243,14 +250,15 @@ var emitEvents = func(ctx context.Context, l zerolog.Logger, logEmitter *contrac _, err = (*logEmitter).EmitLogIntsIndexed(getIntSlice(cfg.General.EventsPerTx)) case "Log3": _, err = (*logEmitter).EmitLogStrings(getStringSlice(cfg.General.EventsPerTx)) + case "Log4": + _, err = (*logEmitter).EmitLogIntMultiIndexed(1, 1, cfg.General.EventsPerTx) default: err = fmt.Errorf("unknown event name: %s", event.Name) } if err != nil { results <- LogEmitterChannel{ - logsEmitted: 0, - err: err, + err: err, } return } @@ -263,35 +271,25 @@ var emitEvents = func(ctx context.Context, l zerolog.Logger, logEmitter *contrac l.Info().Str("Emitter address", address).Str("Index", fmt.Sprintf("%d/%d", i+1, cfg.LoopedConfig.ExecutionCount)).Msg("Emitted all three events") } } - - l.Info().Str("Emitter address", address).Int("Total logs emitted", localCounter).Msg("Finished emitting events") - - results <- LogEmitterChannel{ - logsEmitted: localCounter, - err: nil, - } - } -} - -var chainHasFinalisedEndBlock = func(l zerolog.Logger, evmClient blockchain.EVMClient, endBlock int64) (bool, error) { - effectiveEndBlock := endBlock + 1 - lastFinalisedBlockHeader, err := evmClient.GetLatestFinalizedBlockHeader(context.Background()) - if err != nil { - return false, err } - l.Info().Int64("Last finalised block header", lastFinalisedBlockHeader.Number.Int64()).Int64("End block", effectiveEndBlock).Int64("Blocks left till end block", effectiveEndBlock-lastFinalisedBlockHeader.Number.Int64()).Msg("Waiting for the finalized block to move beyond end block") + l.Info().Str("Emitter address", address).Int("Total logs emitted", localCounter).Msg("Finished emitting events") - return lastFinalisedBlockHeader.Number.Int64() > effectiveEndBlock, nil + results <- LogEmitterChannel{ + logsEmitted: localCounter, + err: nil, + } } -var logPollerHasFinalisedEndBlock = func(endBlock int64, chainID *big.Int, l zerolog.Logger, coreLogger core_logger.SugaredLogger, nodes *test_env.ClCluster) (bool, error) { +// LogPollerHasFinalisedEndBlock returns true if all CL nodes have finalised processing the provided end block +func LogPollerHasFinalisedEndBlock(endBlock int64, chainID *big.Int, l zerolog.Logger, coreLogger core_logger.SugaredLogger, nodes *test_env.ClCluster) (bool, error) { wg := &sync.WaitGroup{} type boolQueryResult struct { - nodeName string - hasFinalised bool - err error + nodeName string + hasFinalised bool + finalizedBlock int64 + err error } endBlockCh := make(chan boolQueryResult, len(nodes.Nodes)-1) @@ -327,9 +325,10 @@ var logPollerHasFinalisedEndBlock = func(endBlock int64, chainID *big.Int, l zer } r <- boolQueryResult{ - nodeName: clNode.ContainerName, - hasFinalised: latestBlock.FinalizedBlockNumber > endBlock, - err: nil, + nodeName: clNode.ContainerName, + finalizedBlock: latestBlock.FinalizedBlockNumber, + hasFinalised: latestBlock.FinalizedBlockNumber > endBlock, + err: nil, } } @@ -352,7 +351,7 @@ var logPollerHasFinalisedEndBlock = func(endBlock int64, chainID *big.Int, l zer if r.hasFinalised { l.Info().Str("Node name", r.nodeName).Msg("CL node has finalised end block") } else { - l.Warn().Str("Node name", r.nodeName).Msg("CL node has not finalised end block yet") + l.Warn().Int64("Has", r.finalizedBlock).Int64("Want", endBlock).Str("Node name", r.nodeName).Msg("CL node has not finalised end block yet") } if len(foundMap) == len(nodes.Nodes)-1 { @@ -376,7 +375,8 @@ var logPollerHasFinalisedEndBlock = func(endBlock int64, chainID *big.Int, l zer return <-allFinalisedCh, err } -var clNodesHaveExpectedLogCount = func(startBlock, endBlock int64, chainID *big.Int, expectedLogCount int, expectedFilters []ExpectedFilter, l zerolog.Logger, coreLogger core_logger.SugaredLogger, nodes *test_env.ClCluster) (bool, error) { +// ClNodesHaveExpectedLogCount returns true if all CL nodes have the expected log count in the provided block range and matching the provided filters +func ClNodesHaveExpectedLogCount(startBlock, endBlock int64, chainID *big.Int, expectedLogCount int, expectedFilters []ExpectedFilter, l zerolog.Logger, coreLogger core_logger.SugaredLogger, nodes *test_env.ClCluster) (bool, error) { wg := &sync.WaitGroup{} type logQueryResult struct { @@ -386,13 +386,13 @@ var clNodesHaveExpectedLogCount = func(startBlock, endBlock int64, chainID *big. err error } - queryCh := make(chan logQueryResult, len(nodes.Nodes)-1) + resultChan := make(chan logQueryResult, len(nodes.Nodes)-1) ctx, cancelFn := context.WithCancel(context.Background()) for i := 1; i < len(nodes.Nodes); i++ { wg.Add(1) - go func(clNode *test_env.ClNode, r chan logQueryResult) { + go func(clNode *test_env.ClNode, resultChan chan logQueryResult) { defer wg.Done() select { case <-ctx.Done(): @@ -400,7 +400,7 @@ var clNodesHaveExpectedLogCount = func(startBlock, endBlock int64, chainID *big. default: orm, db, err := NewOrm(coreLogger, chainID, clNode.PostgresDb) if err != nil { - r <- logQueryResult{ + resultChan <- logQueryResult{ nodeName: clNode.ContainerName, logCount: 0, hasExpectedCount: false, @@ -414,7 +414,7 @@ var clNodesHaveExpectedLogCount = func(startBlock, endBlock int64, chainID *big. for _, filter := range expectedFilters { logs, err := orm.SelectLogs(startBlock, endBlock, filter.emitterAddress, filter.topic) if err != nil { - r <- logQueryResult{ + resultChan <- logQueryResult{ nodeName: clNode.ContainerName, logCount: 0, hasExpectedCount: false, @@ -425,14 +425,14 @@ var clNodesHaveExpectedLogCount = func(startBlock, endBlock int64, chainID *big. foundLogsCount += len(logs) } - r <- logQueryResult{ + resultChan <- logQueryResult{ nodeName: clNode.ContainerName, logCount: foundLogsCount, hasExpectedCount: foundLogsCount >= expectedLogCount, - err: err, + err: nil, } } - }(nodes.Nodes[i], queryCh) + }(nodes.Nodes[i], resultChan) } var err error @@ -440,7 +440,7 @@ var clNodesHaveExpectedLogCount = func(startBlock, endBlock int64, chainID *big. go func() { foundMap := make(map[string]bool, 0) - for r := range queryCh { + for r := range resultChan { if r.err != nil { err = r.err cancelFn() @@ -449,15 +449,22 @@ var clNodesHaveExpectedLogCount = func(startBlock, endBlock int64, chainID *big. foundMap[r.nodeName] = r.hasExpectedCount if r.hasExpectedCount { - l.Info().Str("Node name", r.nodeName).Int("Logs count", r.logCount).Msg("Expected log count found in CL node") + l.Debug(). + Str("Node name", r.nodeName). + Int("Logs count", r.logCount). + Msg("Expected log count found in CL node") } else { - l.Warn().Str("Node name", r.nodeName).Str("Found/Expected logs", fmt.Sprintf("%d/%d", r.logCount, expectedLogCount)).Int("Missing logs", expectedLogCount-r.logCount).Msg("Too low log count found in CL node") + l.Debug(). + Str("Node name", r.nodeName). + Str("Found/Expected logs", fmt.Sprintf("%d/%d", r.logCount, expectedLogCount)). + Int("Missing logs", expectedLogCount-r.logCount). + Msg("Too low log count found in CL node") } if len(foundMap) == len(nodes.Nodes)-1 { allFound := true - for _, v := range foundMap { - if !v { + for _, hadAllLogs := range foundMap { + if !hadAllLogs { allFound = false break } @@ -470,13 +477,14 @@ var clNodesHaveExpectedLogCount = func(startBlock, endBlock int64, chainID *big. }() wg.Wait() - close(queryCh) + close(resultChan) return <-allFoundCh, err } type MissingLogs map[string][]geth_types.Log +// IsEmpty returns true if there are no missing logs func (m *MissingLogs) IsEmpty() bool { for _, v := range *m { if len(v) > 0 { @@ -487,7 +495,8 @@ func (m *MissingLogs) IsEmpty() bool { return true } -var getMissingLogs = func(startBlock, endBlock int64, logEmitters []*contracts.LogEmitter, evmClient blockchain.EVMClient, clnodeCluster *test_env.ClCluster, l zerolog.Logger, coreLogger core_logger.SugaredLogger, cfg *Config) (MissingLogs, error) { +// GetMissingLogs returns a map of CL node name to missing logs in that node compared to EVM node to which the provided evm client is connected +func GetMissingLogs(startBlock, endBlock int64, logEmitters []*contracts.LogEmitter, evmClient blockchain.EVMClient, clnodeCluster *test_env.ClCluster, l zerolog.Logger, coreLogger core_logger.SugaredLogger, cfg *Config) (MissingLogs, error) { wg := &sync.WaitGroup{} type dbQueryResult struct { @@ -511,7 +520,7 @@ var getMissingLogs = func(startBlock, endBlock int64, logEmitters []*contracts.L default: nodeName := clnodeCluster.Nodes[i].ContainerName - l.Info().Str("Node name", nodeName).Msg("Fetching log poller logs") + l.Debug().Str("Node name", nodeName).Msg("Fetching log poller logs") orm, db, err := NewOrm(coreLogger, evmClient.GetChainID(), clnodeCluster.Nodes[i].PostgresDb) if err != nil { r <- dbQueryResult{ @@ -528,7 +537,7 @@ var getMissingLogs = func(startBlock, endBlock int64, logEmitters []*contracts.L address := (*logEmitters[j]).Address() for _, event := range cfg.General.EventsToEmit { - l.Debug().Str("Event name", event.Name).Str("Emitter address", address.String()).Msg("Fetching single emitter's logs") + l.Trace().Str("Event name", event.Name).Str("Emitter address", address.String()).Msg("Fetching single emitter's logs") result, err := orm.SelectLogs(startBlock, endBlock, address, event.ID) if err != nil { r <- dbQueryResult{ @@ -544,11 +553,11 @@ var getMissingLogs = func(startBlock, endBlock int64, logEmitters []*contracts.L logs = append(logs, result...) - l.Debug().Str("Event name", event.Name).Str("Emitter address", address.String()).Int("Log count", len(result)).Msg("Logs found per node") + l.Trace().Str("Event name", event.Name).Str("Emitter address", address.String()).Int("Log count", len(result)).Msg("Logs found per node") } } - l.Warn().Int("Count", len(logs)).Str("Node name", nodeName).Msg("Fetched log poller logs") + l.Info().Int("Count", len(logs)).Str("Node name", nodeName).Msg("Fetched log poller logs") r <- dbQueryResult{ err: nil, @@ -595,16 +604,17 @@ var getMissingLogs = func(startBlock, endBlock int64, logEmitters []*contracts.L logs []geth_types.Log } - l.Info().Msg("Started comparison of logs from EVM node and CL nodes. This may take a while if there's a lot of logs") - missingCh := make(chan missingLogResult, len(clnodeCluster.Nodes)-1) evmLogCount := len(allLogsInEVMNode) + l.Info().Int("Log count", evmLogCount).Msg("Started comparison of logs from EVM node and CL nodes. This may take a while if there's a lot of logs") + + missingCh := make(chan missingLogResult, len(clnodeCluster.Nodes)-1) for i := 1; i < len(clnodeCluster.Nodes); i++ { wg.Add(1) go func(i int, result chan missingLogResult) { defer wg.Done() nodeName := clnodeCluster.Nodes[i].ContainerName - l.Info().Str("Node name", nodeName).Str("Progress", fmt.Sprintf("0/%d", evmLogCount)).Msg("Comparing single CL node's logs with EVM logs") + l.Debug().Str("Node name", nodeName).Str("Progress", fmt.Sprintf("0/%d", evmLogCount)).Msg("Comparing single CL node's logs with EVM logs") missingLogs := make([]geth_types.Log, 0) for i, evmLog := range allLogsInEVMNode { @@ -618,7 +628,7 @@ var getMissingLogs = func(startBlock, endBlock int64, logEmitters []*contracts.L } if i%10000 == 0 && i != 0 { - l.Info().Str("Node name", nodeName).Str("Progress", fmt.Sprintf("%d/%d", i, evmLogCount)).Msg("Comparing single CL node's logs with EVM logs") + l.Debug().Str("Node name", nodeName).Str("Progress", fmt.Sprintf("%d/%d", i, evmLogCount)).Msg("Comparing single CL node's logs with EVM logs") } if !logFound { @@ -648,15 +658,18 @@ var getMissingLogs = func(startBlock, endBlock int64, logEmitters []*contracts.L } } - expectedTotalLogsEmitted := getExpectedLogCount(cfg) + expectedTotalLogsEmitted := GetExpectedLogCount(cfg) if int64(len(allLogsInEVMNode)) != expectedTotalLogsEmitted { - l.Warn().Str("Actual/Expected", fmt.Sprintf("%d/%d", expectedTotalLogsEmitted, len(allLogsInEVMNode))).Msg("Some of the test logs were not found in EVM node. This is a bug in the test") + l.Warn(). + Str("Actual/Expected", fmt.Sprintf("%d/%d", expectedTotalLogsEmitted, len(allLogsInEVMNode))). + Msg("Some of the test logs were not found in EVM node. This is a bug in the test") } return missingLogs, nil } -var printMissingLogsByType = func(missingLogs map[string][]geth_types.Log, l zerolog.Logger, cfg *Config) { +// PrintMissingLogsInfo prints various useful information about the missing logs +func PrintMissingLogsInfo(missingLogs map[string][]geth_types.Log, l zerolog.Logger, cfg *Config) { var findHumanName = func(topic common.Hash) string { for _, event := range cfg.General.EventsToEmit { if event.ID == topic { @@ -675,12 +688,39 @@ var printMissingLogsByType = func(missingLogs map[string][]geth_types.Log, l zer } } + l.Debug().Msg("Missing log by event name") for k, v := range missingByType { - l.Warn().Str("Event name", k).Int("Missing count", v).Msg("Missing logs by type") + l.Debug().Str("Event name", k).Int("Missing count", v).Msg("Missing logs by type") + } + + missingByBlock := make(map[uint64]int) + for _, logs := range missingLogs { + for _, l := range logs { + missingByBlock[l.BlockNumber]++ + } + } + + l.Debug().Msg("Missing logs by block") + for k, v := range missingByBlock { + l.Debug().Uint64("Block number", k).Int("Missing count", v).Msg("Missing logs by block") + } + + missingByEmitter := make(map[string]int) + for _, logs := range missingLogs { + for _, l := range logs { + missingByEmitter[l.Address.String()]++ + } + } + + l.Debug().Msg("Missing logs by emitter") + for k, v := range missingByEmitter { + l.Debug().Str("Emitter address", k).Int("Missing count", v).Msg("Missing logs by emitter") } } -var getEVMLogs = func(startBlock, endBlock int64, logEmitters []*contracts.LogEmitter, evmClient blockchain.EVMClient, l zerolog.Logger, cfg *Config) ([]geth_types.Log, error) { +// getEVMLogs returns a slice of all logs emitted by the provided log emitters in the provided block range, +// which are present in the EVM node to which the provided evm client is connected +func getEVMLogs(startBlock, endBlock int64, logEmitters []*contracts.LogEmitter, evmClient blockchain.EVMClient, l zerolog.Logger, cfg *Config) ([]geth_types.Log, error) { allLogsInEVMNode := make([]geth_types.Log, 0) for j := 0; j < len(logEmitters); j++ { address := (*logEmitters[j]).Address() @@ -705,12 +745,13 @@ var getEVMLogs = func(startBlock, endBlock int64, logEmitters []*contracts.LogEm } } - l.Warn().Int("Count", len(allLogsInEVMNode)).Msg("Logs in EVM node") + l.Info().Int("Count", len(allLogsInEVMNode)).Msg("Logs in EVM node") return allLogsInEVMNode, nil } -func executeGenerator(t *testing.T, cfg *Config, logEmitters []*contracts.LogEmitter) (int, error) { +// ExecuteGenerator executes the configured generator and returns the total number of logs emitted +func ExecuteGenerator(t *testing.T, cfg *Config, logEmitters []*contracts.LogEmitter) (int, error) { if cfg.General.Generator == GeneratorType_WASP { return runWaspGenerator(t, cfg, logEmitters) } @@ -718,6 +759,7 @@ func executeGenerator(t *testing.T, cfg *Config, logEmitters []*contracts.LogEmi return runLoopedGenerator(t, cfg, logEmitters) } +// runWaspGenerator runs the wasp generator and returns the total number of logs emitted func runWaspGenerator(t *testing.T, cfg *Config, logEmitters []*contracts.LogEmitter) (int, error) { l := logging.GetTestLogger(t) @@ -775,6 +817,7 @@ func runWaspGenerator(t *testing.T, cfg *Config, logEmitters []*contracts.LogEmi return counter.value, nil } +// runLoopedGenerator runs the looped generator and returns the total number of logs emitted func runLoopedGenerator(t *testing.T, cfg *Config, logEmitters []*contracts.LogEmitter) (int, error) { l := logging.GetTestLogger(t) @@ -797,31 +840,37 @@ func runLoopedGenerator(t *testing.T, cfg *Config, logEmitters []*contracts.LogE aggrChan := make(chan int, len(logEmitters)) go func() { - for emitter := range emitterCh { - if emitter.err != nil { - emitErr = emitter.err - cancelFn() + for { + select { + case <-ctx.Done(): return + case emitter := <-emitterCh: + if emitter.err != nil { + emitErr = emitter.err + cancelFn() + return + } + aggrChan <- emitter.logsEmitted } - aggrChan <- emitter.logsEmitted } }() wg.Wait() close(emitterCh) - for i := 0; i < len(logEmitters); i++ { - total += <-aggrChan - } - if emitErr != nil { return 0, emitErr } + for i := 0; i < len(logEmitters); i++ { + total += <-aggrChan + } + return int(total), nil } -func getExpectedLogCount(cfg *Config) int64 { +// GetExpectedLogCount returns the expected number of logs to be emitted based on the provided config +func GetExpectedLogCount(cfg *Config) int64 { if cfg.General.Generator == GeneratorType_WASP { if cfg.Wasp.Load.RPS != 0 { return cfg.Wasp.Load.RPS * int64(cfg.Wasp.Load.Duration.Duration().Seconds()) * int64(cfg.General.EventsPerTx) @@ -832,40 +881,71 @@ func getExpectedLogCount(cfg *Config) int64 { return int64(len(cfg.General.EventsToEmit) * cfg.LoopedConfig.ExecutionCount * cfg.General.Contracts * cfg.General.EventsPerTx) } -var chaosPauseSyncFn = func(l zerolog.Logger, testEnv *test_env.CLClusterTestEnv) error { +type PauseData struct { + StartBlock uint64 + EndBlock uint64 + TargetComponent string + ContaineName string +} + +var ChaosPauses = []PauseData{} + +// chaosPauseSyncFn pauses ranom container of the provided type for a random amount of time between 5 and 20 seconds +func chaosPauseSyncFn(l zerolog.Logger, testEnv *test_env.CLClusterTestEnv, targetComponent string) ChaosPauseData { rand.New(rand.NewSource(time.Now().UnixNano())) - randomBool := rand.Intn(2) == 0 randomNode := testEnv.ClCluster.Nodes[rand.Intn(len(testEnv.ClCluster.Nodes)-1)+1] var component ctf_test_env.EnvComponent - if randomBool { + switch strings.ToLower(targetComponent) { + case "chainlink": component = randomNode.EnvComponent - } else { + case "postgres": component = randomNode.PostgresDb.EnvComponent + default: + return ChaosPauseData{Err: fmt.Errorf("unknown component %s", targetComponent)} } + ctx := context.Background() + pauseStartBlock, err := testEnv.EVMClient.LatestBlockNumber(ctx) + if err != nil { + return ChaosPauseData{Err: err} + } pauseTimeSec := rand.Intn(20-5) + 5 l.Info().Str("Container", component.ContainerName).Int("Pause time", pauseTimeSec).Msg("Pausing component") pauseTimeDur := time.Duration(pauseTimeSec) * time.Second - err := component.ChaosPause(l, pauseTimeDur) + err = component.ChaosPause(l, pauseTimeDur) + if err != nil { + return ChaosPauseData{Err: err} + } l.Info().Str("Container", component.ContainerName).Msg("Component unpaused") + pauseEndBlock, err := testEnv.EVMClient.LatestBlockNumber(ctx) if err != nil { - return err + return ChaosPauseData{Err: err} } - return nil + return ChaosPauseData{PauseData: PauseData{ + StartBlock: pauseStartBlock, + EndBlock: pauseEndBlock, + TargetComponent: targetComponent, + ContaineName: component.ContainerName, + }} +} + +type ChaosPauseData struct { + Err error + PauseData PauseData } -var executeChaosExperiment = func(l zerolog.Logger, testEnv *test_env.CLClusterTestEnv, cfg *Config, errorCh chan error) { +// ExecuteChaosExperiment executes the configured chaos experiment, which consist of pausing CL node or Postgres containers +func ExecuteChaosExperiment(l zerolog.Logger, testEnv *test_env.CLClusterTestEnv, cfg *Config, errorCh chan error) { if cfg.ChaosConfig == nil || cfg.ChaosConfig.ExperimentCount == 0 { errorCh <- nil return } - chaosChan := make(chan error, cfg.ChaosConfig.ExperimentCount) - + chaosChan := make(chan ChaosPauseData, cfg.ChaosConfig.ExperimentCount) wg := &sync.WaitGroup{} go func() { @@ -880,9 +960,11 @@ var executeChaosExperiment = func(l zerolog.Logger, testEnv *test_env.CLClusterT defer func() { <-guardChan wg.Done() - l.Info().Str("Current/Total", fmt.Sprintf("%d/%d", i, cfg.ChaosConfig.ExperimentCount)).Msg("Done with experiment") + current := i + 1 + l.Info().Str("Current/Total", fmt.Sprintf("%d/%d", current, cfg.ChaosConfig.ExperimentCount)).Msg("Done with experiment") }() - chaosChan <- chaosPauseSyncFn(l, testEnv) + chaosChan <- chaosPauseSyncFn(l, testEnv, cfg.ChaosConfig.TargetComponent) + time.Sleep(10 * time.Second) }() } @@ -892,25 +974,28 @@ var executeChaosExperiment = func(l zerolog.Logger, testEnv *test_env.CLClusterT }() go func() { - for err := range chaosChan { - // This will receive errors until chaosChan is closed - if err != nil { - // If an error is encountered, log it, send it to the error channel, and return from the function - l.Err(err).Msg("Error encountered during chaos experiment") - errorCh <- err + var pauseData []PauseData + for result := range chaosChan { + if result.Err != nil { + l.Err(result.Err).Msg("Error encountered during chaos experiment") + errorCh <- result.Err return // Return on actual error } - // No need for an else block here, because if err is nil (which happens when the channel is closed), - // the loop will exit and the following log and nil send will execute. + + pauseData = append(pauseData, result.PauseData) } - // After the loop exits, which it will do when chaosChan is closed, log that all experiments are finished. l.Info().Msg("All chaos experiments finished") errorCh <- nil // Only send nil once, after all errors have been handled and the channel is closed + + for _, p := range pauseData { + l.Debug().Str("Target component", p.TargetComponent).Str("Container", p.ContaineName).Str("Block range", fmt.Sprintf("%d - %d", p.StartBlock, p.EndBlock)).Msgf("Details of executed chaos pause") + } }() } -var GetFinalityDepth = func(chainId int64) (int64, error) { +// GetFinalityDepth returns the finality depth for the provided chain ID +func GetFinalityDepth(chainId int64) (int64, error) { var finalityDepth int64 switch chainId { // Ethereum Sepolia @@ -929,7 +1014,8 @@ var GetFinalityDepth = func(chainId int64) (int64, error) { return finalityDepth, nil } -var GetEndBlockToWaitFor = func(endBlock, chainId int64, cfg *Config) (int64, error) { +// GetEndBlockToWaitFor returns the end block to wait for based on chain id and finality tag provided in config +func GetEndBlockToWaitFor(endBlock, chainId int64, cfg *Config) (int64, error) { if cfg.General.UseFinalityTag { return endBlock + 1, nil } @@ -951,7 +1037,7 @@ const ( ) var ( - defaultOCRRegistryConfig = contracts.KeeperRegistrySettings{ + DefaultOCRRegistryConfig = contracts.KeeperRegistrySettings{ PaymentPremiumPPB: uint32(200000000), FlatFeeMicroLINK: uint32(0), BlockCountPerTurn: big.NewInt(10), @@ -982,7 +1068,8 @@ var ( } ) -func setupLogPollerTestDocker( +// SetupLogPollerTestDocker starts the DON and private Ethereum network +func SetupLogPollerTestDocker( t *testing.T, registryVersion ethereum.KeeperRegistryVersion, registryConfig contracts.KeeperRegistrySettings, @@ -1042,9 +1129,8 @@ func setupLogPollerTestDocker( WithConsensusType(ctf_test_env.ConsensusType_PoS). WithConsensusLayer(ctf_test_env.ConsensusLayer_Prysm). WithExecutionLayer(ctf_test_env.ExecutionLayer_Geth). - WithWaitingForFinalization(). WithEthereumChainConfig(ctf_test_env.EthereumChainConfig{ - SecondsPerSlot: 8, + SecondsPerSlot: 4, SlotsPerEpoch: 2, }). Build() @@ -1116,3 +1202,131 @@ func setupLogPollerTestDocker( return env.EVMClient, nodeClients, env.ContractDeployer, linkToken, registry, registrar, env } + +// UploadLogEmitterContractsAndWaitForFinalisation uploads the configured number of log emitter contracts and waits for the upload blocks to be finalised +func UploadLogEmitterContractsAndWaitForFinalisation(l zerolog.Logger, t *testing.T, testEnv *test_env.CLClusterTestEnv, cfg *Config) []*contracts.LogEmitter { + logEmitters := make([]*contracts.LogEmitter, 0) + for i := 0; i < cfg.General.Contracts; i++ { + logEmitter, err := testEnv.ContractDeployer.DeployLogEmitterContract() + logEmitters = append(logEmitters, &logEmitter) + require.NoError(t, err, "Error deploying log emitter contract") + l.Info().Str("Contract address", logEmitter.Address().Hex()).Msg("Log emitter contract deployed") + time.Sleep(200 * time.Millisecond) + } + afterUploadBlock, err := testEnv.EVMClient.LatestBlockNumber(testcontext.Get(t)) + require.NoError(t, err, "Error getting latest block number") + + gom := gomega.NewGomegaWithT(t) + gom.Eventually(func(g gomega.Gomega) { + targetBlockNumber := int64(afterUploadBlock + 1) + finalized, err := testEnv.EVMClient.GetLatestFinalizedBlockHeader(testcontext.Get(t)) + if err != nil { + l.Warn().Err(err).Msg("Error checking if contract were uploaded. Retrying...") + return + } + finalizedBlockNumber := finalized.Number.Int64() + + if finalizedBlockNumber < targetBlockNumber { + l.Debug().Int64("Finalized block", finalized.Number.Int64()).Int64("After upload block", int64(afterUploadBlock+1)).Msg("Waiting for contract upload to finalise") + } + + g.Expect(finalizedBlockNumber >= targetBlockNumber).To(gomega.BeTrue(), "Contract upload did not finalize in time") + }, "2m", "10s").Should(gomega.Succeed()) + + return logEmitters +} + +// AssertUpkeepIdsUniqueness asserts that the provided upkeep IDs are unique +func AssertUpkeepIdsUniqueness(upkeepIDs []*big.Int) error { + upKeepIdSeen := make(map[int64]bool) + for _, upkeepID := range upkeepIDs { + if _, ok := upKeepIdSeen[upkeepID.Int64()]; ok { + return fmt.Errorf("Duplicate upkeep ID %d", upkeepID.Int64()) + } + upKeepIdSeen[upkeepID.Int64()] = true + } + + return nil +} + +// AssertContractAddressUniquneness asserts that the provided contract addresses are unique +func AssertContractAddressUniquneness(logEmitters []*contracts.LogEmitter) error { + contractAddressSeen := make(map[string]bool) + for _, logEmitter := range logEmitters { + address := (*logEmitter).Address().String() + if _, ok := contractAddressSeen[address]; ok { + return fmt.Errorf("Duplicate contract address %s", address) + } + contractAddressSeen[address] = true + } + + return nil +} + +// RegisterFiltersAndAssertUniquness registers the configured log filters and asserts that the filters are unique +// meaning that for each log emitter address and topic there is only one filter +func RegisterFiltersAndAssertUniquness(l zerolog.Logger, registry contracts.KeeperRegistry, upkeepIDs []*big.Int, logEmitters []*contracts.LogEmitter, cfg *Config, upKeepsNeeded int) error { + uniqueFilters := make(map[string]bool) + + upkeepIdIndex := 0 + for i := 0; i < len(logEmitters); i++ { + for j := 0; j < len(cfg.General.EventsToEmit); j++ { + emitterAddress := (*logEmitters[i]).Address() + topicId := cfg.General.EventsToEmit[j].ID + + upkeepID := upkeepIDs[upkeepIdIndex] + l.Debug().Int("Upkeep id", int(upkeepID.Int64())).Str("Emitter address", emitterAddress.String()).Str("Topic", topicId.Hex()).Msg("Registering log trigger for log emitter") + err := registerSingleTopicFilter(registry, upkeepID, emitterAddress, topicId) + randomWait(150, 300) + if err != nil { + return fmt.Errorf("%w: Error registering log trigger for log emitter %s", err, emitterAddress.String()) + } + + if i%10 == 0 { + l.Info().Msgf("Registered log trigger for topic %d for log emitter %d/%d", j, i, len(logEmitters)) + } + + key := fmt.Sprintf("%s-%s", emitterAddress.String(), topicId.Hex()) + if _, ok := uniqueFilters[key]; ok { + return fmt.Errorf("Duplicate filter %s", key) + } + uniqueFilters[key] = true + upkeepIdIndex++ + } + } + + if upKeepsNeeded != len(uniqueFilters) { + return fmt.Errorf("Number of unique filters should be equal to number of upkeeps. Expected %d. Got %d", upKeepsNeeded, len(uniqueFilters)) + } + + return nil +} + +// FluentlyCheckIfAllNodesHaveLogCount checks if all CL nodes have the expected log count for the provided block range and expected filters +// It will retry until the provided duration is reached or until all nodes have the expected log count +func FluentlyCheckIfAllNodesHaveLogCount(duration string, startBlock, endBlock int64, expectedLogCount int, expectedFilters []ExpectedFilter, l zerolog.Logger, coreLogger core_logger.SugaredLogger, testEnv *test_env.CLClusterTestEnv) (bool, error) { + logCountWaitDuration, err := time.ParseDuration(duration) + if err != nil { + return false, err + } + endTime := time.Now().Add(logCountWaitDuration) + + // not using gomega here, because I want to see which logs were missing + allNodesLogCountMatches := false + for time.Now().Before(endTime) { + logCountMatches, clErr := ClNodesHaveExpectedLogCount(startBlock, endBlock, testEnv.EVMClient.GetChainID(), expectedLogCount, expectedFilters, l, coreLogger, testEnv.ClCluster) + if clErr != nil { + l.Warn(). + Err(clErr). + Msg("Error checking if CL nodes have expected log count. Retrying...") + } + if logCountMatches { + allNodesLogCountMatches = true + break + } + l.Warn(). + Msg("At least one CL node did not have expected log count. Retrying...") + } + + return allNodesLogCountMatches, nil +} diff --git a/integration-tests/universal/log_poller/scenarios.go b/integration-tests/universal/log_poller/scenarios.go deleted file mode 100644 index 5331c63f752..00000000000 --- a/integration-tests/universal/log_poller/scenarios.go +++ /dev/null @@ -1,496 +0,0 @@ -package logpoller - -import ( - "fmt" - "math/big" - "testing" - "time" - - "github.com/onsi/gomega" - "github.com/stretchr/testify/require" - - "github.com/smartcontractkit/chainlink-testing-framework/logging" - "github.com/smartcontractkit/chainlink-testing-framework/utils/testcontext" - "github.com/smartcontractkit/chainlink/integration-tests/actions" - "github.com/smartcontractkit/chainlink/integration-tests/contracts" - "github.com/smartcontractkit/chainlink/integration-tests/contracts/ethereum" - core_logger "github.com/smartcontractkit/chainlink/v2/core/logger" -) - -func ExecuteBasicLogPollerTest(t *testing.T, cfg *Config) { - l := logging.GetTestLogger(t) - coreLogger := core_logger.TestLogger(t) //needed by ORM ¯\_(ツ)_/¯ - - if cfg.General.EventsToEmit == nil || len(cfg.General.EventsToEmit) == 0 { - l.Warn().Msg("No events to emit specified, using all events from log emitter contract") - for _, event := range EmitterABI.Events { - cfg.General.EventsToEmit = append(cfg.General.EventsToEmit, event) - } - } - - l.Info().Msg("Starting basic log poller test") - - var ( - err error - upKeepsNeeded = cfg.General.Contracts * len(cfg.General.EventsToEmit) - ) - - chainClient, _, contractDeployer, linkToken, registry, registrar, testEnv := setupLogPollerTestDocker( - t, ethereum.RegistryVersion_2_1, defaultOCRRegistryConfig, upKeepsNeeded, time.Duration(500*time.Millisecond), cfg.General.UseFinalityTag, - ) - - _, upkeepIDs := actions.DeployConsumers( - t, - registry, - registrar, - linkToken, - contractDeployer, - chainClient, - upKeepsNeeded, - big.NewInt(automationDefaultLinkFunds), - automationDefaultUpkeepGasLimit, - true, - false, - ) - - // Deploy Log Emitter contracts - logEmitters := make([]*contracts.LogEmitter, 0) - for i := 0; i < cfg.General.Contracts; i++ { - logEmitter, err := testEnv.ContractDeployer.DeployLogEmitterContract() - logEmitters = append(logEmitters, &logEmitter) - require.NoError(t, err, "Error deploying log emitter contract") - l.Info().Str("Contract address", logEmitter.Address().Hex()).Msg("Log emitter contract deployed") - time.Sleep(200 * time.Millisecond) - } - - // Register log triggered upkeep for each combination of log emitter contract and event signature (topic) - // We need to register a separate upkeep for each event signature, because log trigger doesn't support multiple topics (even if log poller does) - for i := 0; i < len(upkeepIDs); i++ { - emitterAddress := (*logEmitters[i%cfg.General.Contracts]).Address() - upkeepID := upkeepIDs[i] - topicId := cfg.General.EventsToEmit[i%len(cfg.General.EventsToEmit)].ID - - l.Info().Int("Upkeep id", int(upkeepID.Int64())).Str("Emitter address", emitterAddress.String()).Str("Topic", topicId.Hex()).Msg("Registering log trigger for log emitter") - err = registerSingleTopicFilter(registry, upkeepID, emitterAddress, topicId) - randomWait(50, 200) - require.NoError(t, err, "Error registering log trigger for log emitter") - } - - err = chainClient.WaitForEvents() - require.NoError(t, err, "Error encountered when waiting for setting trigger config for upkeeps") - - // Make sure that all nodes have expected filters registered before starting to emit events - expectedFilters := getExpectedFilters(logEmitters, cfg) - gom := gomega.NewGomegaWithT(t) - gom.Eventually(func(g gomega.Gomega) { - for i := 1; i < len(testEnv.ClCluster.Nodes); i++ { - nodeName := testEnv.ClCluster.Nodes[i].ContainerName - l.Info().Str("Node name", nodeName).Msg("Fetching filters from log poller's DB") - - hasFilters, err := nodeHasExpectedFilters(expectedFilters, coreLogger, testEnv.EVMClient.GetChainID(), testEnv.ClCluster.Nodes[i].PostgresDb) - if err != nil { - l.Warn().Err(err).Msg("Error checking if node has expected filters. Retrying...") - return - } - - g.Expect(hasFilters).To(gomega.BeTrue(), "Not all expected filters were found in the DB") - } - }, "30s", "1s").Should(gomega.Succeed()) - l.Info().Msg("All nodes have expected filters registered") - l.Info().Int("Count", len(expectedFilters)).Msg("Expected filters count") - - // Save block number before starting to emit events, so that we can later use it when querying logs - sb, err := testEnv.EVMClient.LatestBlockNumber(testcontext.Get(t)) - require.NoError(t, err, "Error getting latest block number") - startBlock := int64(sb) - - l.Info().Msg("STARTING EVENT EMISSION") - startTime := time.Now() - - // Start chaos experimnents by randomly pausing random containers (Chainlink nodes or their DBs) - chaosDoneCh := make(chan error, 1) - go func() { - executeChaosExperiment(l, testEnv, cfg, chaosDoneCh) - }() - - totalLogsEmitted, err := executeGenerator(t, cfg, logEmitters) - endTime := time.Now() - require.NoError(t, err, "Error executing event generator") - - expectedLogsEmitted := getExpectedLogCount(cfg) - duration := int(endTime.Sub(startTime).Seconds()) - l.Info().Int("Total logs emitted", totalLogsEmitted).Int64("Expected total logs emitted", expectedLogsEmitted).Str("Duration", fmt.Sprintf("%d sec", duration)).Str("LPS", fmt.Sprintf("%d/sec", totalLogsEmitted/duration)).Msg("FINISHED EVENT EMISSION") - - // Save block number after finishing to emit events, so that we can later use it when querying logs - eb, err := testEnv.EVMClient.LatestBlockNumber(testcontext.Get(t)) - require.NoError(t, err, "Error getting latest block number") - - endBlock, err := GetEndBlockToWaitFor(int64(eb), testEnv.EVMClient.GetChainID().Int64(), cfg) - require.NoError(t, err, "Error getting end block to wait for") - - l.Info().Msg("Waiting before proceeding with test until all chaos experiments finish") - chaosError := <-chaosDoneCh - require.NoError(t, chaosError, "Error encountered during chaos experiment") - - // Wait until last block in which events were emitted has been finalised - // how long should we wait here until all logs are processed? wait for block X to be processed by all nodes? - waitDuration := "15m" - l.Warn().Str("Duration", waitDuration).Msg("Waiting for logs to be processed by all nodes and for chain to advance beyond finality") - - gom.Eventually(func(g gomega.Gomega) { - hasAdvanced, err := chainHasFinalisedEndBlock(l, testEnv.EVMClient, endBlock) - if err != nil { - l.Warn().Err(err).Msg("Error checking if chain has advanced beyond finality. Retrying...") - } - g.Expect(hasAdvanced).To(gomega.BeTrue(), "Chain has not advanced beyond finality") - }, waitDuration, "30s").Should(gomega.Succeed()) - - l.Warn().Str("Duration", "1m").Msg("Waiting for all CL nodes to have end block finalised") - gom.Eventually(func(g gomega.Gomega) { - hasFinalised, err := logPollerHasFinalisedEndBlock(endBlock, testEnv.EVMClient.GetChainID(), l, coreLogger, testEnv.ClCluster) - if err != nil { - l.Warn().Err(err).Msg("Error checking if nodes have finalised end block. Retrying...") - } - g.Expect(hasFinalised).To(gomega.BeTrue(), "Some nodes have not finalised end block") - }, "1m", "30s").Should(gomega.Succeed()) - - gom.Eventually(func(g gomega.Gomega) { - logCountMatches, err := clNodesHaveExpectedLogCount(startBlock, endBlock, testEnv.EVMClient.GetChainID(), totalLogsEmitted, expectedFilters, l, coreLogger, testEnv.ClCluster) - if err != nil { - l.Warn().Err(err).Msg("Error checking if CL nodes have expected log count. Retrying...") - } - g.Expect(logCountMatches).To(gomega.BeTrue(), "Not all CL nodes have expected log count") - }, waitDuration, "5s").Should(gomega.Succeed()) - - // Wait until all CL nodes have exactly the same logs emitted by test contracts as the EVM node has - logConsistencyWaitDuration := "1m" - l.Warn().Str("Duration", logConsistencyWaitDuration).Msg("Waiting for CL nodes to have all the logs that EVM node has") - - gom.Eventually(func(g gomega.Gomega) { - missingLogs, err := getMissingLogs(startBlock, endBlock, logEmitters, testEnv.EVMClient, testEnv.ClCluster, l, coreLogger, cfg) - if err != nil { - l.Warn().Err(err).Msg("Error getting missing logs. Retrying...") - } - - if !missingLogs.IsEmpty() { - printMissingLogsByType(missingLogs, l, cfg) - } - g.Expect(missingLogs.IsEmpty()).To(gomega.BeTrue(), "Some CL nodes were missing logs") - }, logConsistencyWaitDuration, "5s").Should(gomega.Succeed()) -} - -func ExecuteLogPollerReplay(t *testing.T, cfg *Config, consistencyTimeout string) { - l := logging.GetTestLogger(t) - coreLogger := core_logger.TestLogger(t) //needed by ORM ¯\_(ツ)_/¯ - - if cfg.General.EventsToEmit == nil || len(cfg.General.EventsToEmit) == 0 { - l.Warn().Msg("No events to emit specified, using all events from log emitter contract") - for _, event := range EmitterABI.Events { - cfg.General.EventsToEmit = append(cfg.General.EventsToEmit, event) - } - } - - l.Info().Msg("Starting replay log poller test") - - var ( - err error - upKeepsNeeded = cfg.General.Contracts * len(cfg.General.EventsToEmit) - ) - - // we set blockBackfillDepth to 0, to make sure nothing will be backfilled and won't interfere with our test - chainClient, _, contractDeployer, linkToken, registry, registrar, testEnv := setupLogPollerTestDocker( - t, ethereum.RegistryVersion_2_1, defaultOCRRegistryConfig, upKeepsNeeded, time.Duration(1000*time.Millisecond), cfg.General.UseFinalityTag) - - _, upkeepIDs := actions.DeployConsumers( - t, - registry, - registrar, - linkToken, - contractDeployer, - chainClient, - upKeepsNeeded, - big.NewInt(automationDefaultLinkFunds), - automationDefaultUpkeepGasLimit, - true, - false, - ) - - // Deploy Log Emitter contracts - logEmitters := make([]*contracts.LogEmitter, 0) - for i := 0; i < cfg.General.Contracts; i++ { - logEmitter, err := testEnv.ContractDeployer.DeployLogEmitterContract() - logEmitters = append(logEmitters, &logEmitter) - require.NoError(t, err, "Error deploying log emitter contract") - l.Info().Str("Contract address", logEmitter.Address().Hex()).Msg("Log emitter contract deployed") - time.Sleep(200 * time.Millisecond) - } - - //wait for contracts to be uploaded to chain, TODO: could make this wait fluent - time.Sleep(5 * time.Second) - - // Save block number before starting to emit events, so that we can later use it when querying logs - sb, err := testEnv.EVMClient.LatestBlockNumber(testcontext.Get(t)) - require.NoError(t, err, "Error getting latest block number") - startBlock := int64(sb) - - l.Info().Msg("STARTING EVENT EMISSION") - startTime := time.Now() - totalLogsEmitted, err := executeGenerator(t, cfg, logEmitters) - endTime := time.Now() - require.NoError(t, err, "Error executing event generator") - expectedLogsEmitted := getExpectedLogCount(cfg) - duration := int(endTime.Sub(startTime).Seconds()) - l.Info().Int("Total logs emitted", totalLogsEmitted).Int64("Expected total logs emitted", expectedLogsEmitted).Str("Duration", fmt.Sprintf("%d sec", duration)).Str("LPS", fmt.Sprintf("%d/sec", totalLogsEmitted/duration)).Msg("FINISHED EVENT EMISSION") - - // Save block number after finishing to emit events, so that we can later use it when querying logs - eb, err := testEnv.EVMClient.LatestBlockNumber(testcontext.Get(t)) - require.NoError(t, err, "Error getting latest block number") - - endBlock, err := GetEndBlockToWaitFor(int64(eb), testEnv.EVMClient.GetChainID().Int64(), cfg) - require.NoError(t, err, "Error getting end block to wait for") - - // Lets make sure no logs are in DB yet - expectedFilters := getExpectedFilters(logEmitters, cfg) - logCountMatches, err := clNodesHaveExpectedLogCount(startBlock, endBlock, testEnv.EVMClient.GetChainID(), 0, expectedFilters, l, coreLogger, testEnv.ClCluster) - require.NoError(t, err, "Error checking if CL nodes have expected log count") - require.True(t, logCountMatches, "Some CL nodes already had logs in DB") - l.Info().Msg("No logs were saved by CL nodes yet, as expected. Proceeding.") - - // Register log triggered upkeep for each combination of log emitter contract and event signature (topic) - // We need to register a separate upkeep for each event signature, because log trigger doesn't support multiple topics (even if log poller does) - for i := 0; i < len(upkeepIDs); i++ { - emitterAddress := (*logEmitters[i%cfg.General.Contracts]).Address() - upkeepID := upkeepIDs[i] - topicId := cfg.General.EventsToEmit[i%len(cfg.General.EventsToEmit)].ID - - l.Info().Int("Upkeep id", int(upkeepID.Int64())).Str("Emitter address", emitterAddress.String()).Str("Topic", topicId.Hex()).Msg("Registering log trigger for log emitter") - err = registerSingleTopicFilter(registry, upkeepID, emitterAddress, topicId) - require.NoError(t, err, "Error registering log trigger for log emitter") - } - - err = chainClient.WaitForEvents() - require.NoError(t, err, "Error encountered when waiting for setting trigger config for upkeeps") - - // Make sure that all nodes have expected filters registered before starting to emit events - gom := gomega.NewGomegaWithT(t) - gom.Eventually(func(g gomega.Gomega) { - for i := 1; i < len(testEnv.ClCluster.Nodes); i++ { - nodeName := testEnv.ClCluster.Nodes[i].ContainerName - l.Info().Str("Node name", nodeName).Msg("Fetching filters from log poller's DB") - - hasFilters, err := nodeHasExpectedFilters(expectedFilters, coreLogger, testEnv.EVMClient.GetChainID(), testEnv.ClCluster.Nodes[i].PostgresDb) - if err != nil { - l.Warn().Err(err).Msg("Error checking if node has expected filters. Retrying...") - return - } - - g.Expect(hasFilters).To(gomega.BeTrue(), "Not all expected filters were found in the DB") - } - }, "30s", "1s").Should(gomega.Succeed()) - l.Info().Msg("All nodes have expected filters registered") - l.Info().Int("Count", len(expectedFilters)).Msg("Expected filters count") - - l.Warn().Str("Duration", "1m").Msg("Waiting for all CL nodes to have end block finalised") - gom.Eventually(func(g gomega.Gomega) { - hasFinalised, err := logPollerHasFinalisedEndBlock(endBlock, testEnv.EVMClient.GetChainID(), l, coreLogger, testEnv.ClCluster) - if err != nil { - l.Warn().Err(err).Msg("Error checking if nodes have finalised end block. Retrying...") - } - g.Expect(hasFinalised).To(gomega.BeTrue(), "Some nodes have not finalised end block") - }, "1m", "30s").Should(gomega.Succeed()) - - // Trigger replay - l.Info().Msg("Triggering log poller's replay") - for i := 1; i < len(testEnv.ClCluster.Nodes); i++ { - nodeName := testEnv.ClCluster.Nodes[i].ContainerName - response, _, err := testEnv.ClCluster.Nodes[i].API.ReplayLogPollerFromBlock(startBlock, testEnv.EVMClient.GetChainID().Int64()) - require.NoError(t, err, "Error triggering log poller's replay on node %s", nodeName) - require.Equal(t, "Replay started", response.Data.Attributes.Message, "Unexpected response message from log poller's replay") - } - - l.Warn().Str("Duration", consistencyTimeout).Msg("Waiting for replay logs to be processed by all nodes") - - gom.Eventually(func(g gomega.Gomega) { - logCountMatches, err := clNodesHaveExpectedLogCount(startBlock, endBlock, testEnv.EVMClient.GetChainID(), totalLogsEmitted, expectedFilters, l, coreLogger, testEnv.ClCluster) - if err != nil { - l.Warn().Err(err).Msg("Error checking if CL nodes have expected log count. Retrying...") - } - g.Expect(logCountMatches).To(gomega.BeTrue(), "Not all CL nodes have expected log count") - }, consistencyTimeout, "30s").Should(gomega.Succeed()) - - // Wait until all CL nodes have exactly the same logs emitted by test contracts as the EVM node has - l.Warn().Str("Duration", consistencyTimeout).Msg("Waiting for CL nodes to have all the logs that EVM node has") - - gom.Eventually(func(g gomega.Gomega) { - missingLogs, err := getMissingLogs(startBlock, endBlock, logEmitters, testEnv.EVMClient, testEnv.ClCluster, l, coreLogger, cfg) - if err != nil { - l.Warn().Err(err).Msg("Error getting missing logs. Retrying...") - } - - if !missingLogs.IsEmpty() { - printMissingLogsByType(missingLogs, l, cfg) - } - g.Expect(missingLogs.IsEmpty()).To(gomega.BeTrue(), "Some CL nodes were missing logs") - }, consistencyTimeout, "10s").Should(gomega.Succeed()) -} - -type FinalityBlockFn = func(chainId int64, endBlock int64) (int64, error) - -func ExecuteCILogPollerTest(t *testing.T, cfg *Config) { - l := logging.GetTestLogger(t) - coreLogger := core_logger.TestLogger(t) //needed by ORM ¯\_(ツ)_/¯ - - if cfg.General.EventsToEmit == nil || len(cfg.General.EventsToEmit) == 0 { - l.Warn().Msg("No events to emit specified, using all events from log emitter contract") - for _, event := range EmitterABI.Events { - cfg.General.EventsToEmit = append(cfg.General.EventsToEmit, event) - } - } - - l.Info().Msg("Starting CI log poller test") - - var ( - err error - upKeepsNeeded = cfg.General.Contracts * len(cfg.General.EventsToEmit) - ) - - chainClient, _, contractDeployer, linkToken, registry, registrar, testEnv := setupLogPollerTestDocker( - t, ethereum.RegistryVersion_2_1, defaultOCRRegistryConfig, upKeepsNeeded, time.Duration(1000*time.Millisecond), cfg.General.UseFinalityTag, - ) - - _, upkeepIDs := actions.DeployConsumers( - t, - registry, - registrar, - linkToken, - contractDeployer, - chainClient, - upKeepsNeeded, - big.NewInt(automationDefaultLinkFunds), - automationDefaultUpkeepGasLimit, - true, - false, - ) - - // Deploy Log Emitter contracts - logEmitters := make([]*contracts.LogEmitter, 0) - for i := 0; i < cfg.General.Contracts; i++ { - logEmitter, err := testEnv.ContractDeployer.DeployLogEmitterContract() - logEmitters = append(logEmitters, &logEmitter) - require.NoError(t, err, "Error deploying log emitter contract") - l.Info().Str("Contract address", logEmitter.Address().Hex()).Msg("Log emitter contract deployed") - time.Sleep(200 * time.Millisecond) - } - - // Register log triggered upkeep for each combination of log emitter contract and event signature (topic) - // We need to register a separate upkeep for each event signature, because log trigger doesn't support multiple topics (even if log poller does) - for i := 0; i < len(upkeepIDs); i++ { - emitterAddress := (*logEmitters[i%cfg.General.Contracts]).Address() - upkeepID := upkeepIDs[i] - topicId := cfg.General.EventsToEmit[i%len(cfg.General.EventsToEmit)].ID - - l.Info().Int("Upkeep id", int(upkeepID.Int64())).Str("Emitter address", emitterAddress.String()).Str("Topic", topicId.Hex()).Msg("Registering log trigger for log emitter") - err = registerSingleTopicFilter(registry, upkeepID, emitterAddress, topicId) - randomWait(50, 200) - require.NoError(t, err, "Error registering log trigger for log emitter") - } - - err = chainClient.WaitForEvents() - require.NoError(t, err, "Error encountered when waiting for setting trigger config for upkeeps") - - // Make sure that all nodes have expected filters registered before starting to emit events - expectedFilters := getExpectedFilters(logEmitters, cfg) - gom := gomega.NewGomegaWithT(t) - gom.Eventually(func(g gomega.Gomega) { - for i := 1; i < len(testEnv.ClCluster.Nodes); i++ { - nodeName := testEnv.ClCluster.Nodes[i].ContainerName - l.Info().Str("Node name", nodeName).Msg("Fetching filters from log poller's DB") - - hasFilters, err := nodeHasExpectedFilters(expectedFilters, coreLogger, testEnv.EVMClient.GetChainID(), testEnv.ClCluster.Nodes[i].PostgresDb) - if err != nil { - l.Warn().Err(err).Msg("Error checking if node has expected filters. Retrying...") - return - } - - g.Expect(hasFilters).To(gomega.BeTrue(), "Not all expected filters were found in the DB") - } - }, "1m", "1s").Should(gomega.Succeed()) - l.Info().Msg("All nodes have expected filters registered") - l.Info().Int("Count", len(expectedFilters)).Msg("Expected filters count") - - // Save block number before starting to emit events, so that we can later use it when querying logs - sb, err := testEnv.EVMClient.LatestBlockNumber(testcontext.Get(t)) - require.NoError(t, err, "Error getting latest block number") - startBlock := int64(sb) - - l.Info().Msg("STARTING EVENT EMISSION") - startTime := time.Now() - - // Start chaos experimnents by randomly pausing random containers (Chainlink nodes or their DBs) - chaosDoneCh := make(chan error, 1) - go func() { - executeChaosExperiment(l, testEnv, cfg, chaosDoneCh) - }() - - totalLogsEmitted, err := executeGenerator(t, cfg, logEmitters) - endTime := time.Now() - require.NoError(t, err, "Error executing event generator") - - expectedLogsEmitted := getExpectedLogCount(cfg) - duration := int(endTime.Sub(startTime).Seconds()) - l.Info().Int("Total logs emitted", totalLogsEmitted).Int64("Expected total logs emitted", expectedLogsEmitted).Str("Duration", fmt.Sprintf("%d sec", duration)).Str("LPS", fmt.Sprintf("%d/sec", totalLogsEmitted/duration)).Msg("FINISHED EVENT EMISSION") - - // Save block number after finishing to emit events, so that we can later use it when querying logs - eb, err := testEnv.EVMClient.LatestBlockNumber(testcontext.Get(t)) - require.NoError(t, err, "Error getting latest block number") - - endBlock, err := GetEndBlockToWaitFor(int64(eb), testEnv.EVMClient.GetChainID().Int64(), cfg) - require.NoError(t, err, "Error getting end block to wait for") - - l.Info().Msg("Waiting before proceeding with test until all chaos experiments finish") - chaosError := <-chaosDoneCh - require.NoError(t, chaosError, "Error encountered during chaos experiment") - - // Wait until last block in which events were emitted has been finalised (with buffer) - waitDuration := "45m" - l.Warn().Str("Duration", waitDuration).Msg("Waiting for chain to advance beyond finality") - - gom.Eventually(func(g gomega.Gomega) { - hasAdvanced, err := chainHasFinalisedEndBlock(l, testEnv.EVMClient, endBlock) - if err != nil { - l.Warn().Err(err).Msg("Error checking if chain has advanced beyond finality. Retrying...") - } - g.Expect(hasAdvanced).To(gomega.BeTrue(), "Chain has not advanced beyond finality") - }, waitDuration, "30s").Should(gomega.Succeed()) - - l.Warn().Str("Duration", waitDuration).Msg("Waiting for all CL nodes to have end block finalised") - gom.Eventually(func(g gomega.Gomega) { - hasFinalised, err := logPollerHasFinalisedEndBlock(endBlock, testEnv.EVMClient.GetChainID(), l, coreLogger, testEnv.ClCluster) - if err != nil { - l.Warn().Err(err).Msg("Error checking if nodes have finalised end block. Retrying...") - } - g.Expect(hasFinalised).To(gomega.BeTrue(), "Some nodes have not finalised end block") - }, waitDuration, "30s").Should(gomega.Succeed()) - - // Wait until all CL nodes have exactly the same logs emitted by test contracts as the EVM node has - logConsistencyWaitDuration := "10m" - l.Warn().Str("Duration", logConsistencyWaitDuration).Msg("Waiting for CL nodes to have all the logs that EVM node has") - - gom.Eventually(func(g gomega.Gomega) { - missingLogs, err := getMissingLogs(startBlock, endBlock, logEmitters, testEnv.EVMClient, testEnv.ClCluster, l, coreLogger, cfg) - if err != nil { - l.Warn().Err(err).Msg("Error getting missing logs. Retrying...") - } - - if !missingLogs.IsEmpty() { - printMissingLogsByType(missingLogs, l, cfg) - } - g.Expect(missingLogs.IsEmpty()).To(gomega.BeTrue(), "Some CL nodes were missing logs") - }, logConsistencyWaitDuration, "20s").Should(gomega.Succeed()) - - evmLogs, _ := getEVMLogs(startBlock, endBlock, logEmitters, testEnv.EVMClient, l, cfg) - - if totalLogsEmitted != len(evmLogs) { - l.Warn().Int("Total logs emitted", totalLogsEmitted).Int("Total logs in EVM", len(evmLogs)).Msg("Test passed, but total logs emitted does not match total logs in EVM") - } -}