From 8fa07347db8e8f35ef086d75414448b8144127a0 Mon Sep 17 00:00:00 2001
From: Jordan Krage
Date: Sat, 18 Nov 2023 07:30:07 -0600
Subject: [PATCH] core/chains/evm/logpoller: fix context wiring

---
 .../v21/logprovider/integration_test.go | 53 +++++++++----------
 1 file changed, 24 insertions(+), 29 deletions(-)

diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go
index 5ef06f1bd08..e4a1ae28b4e 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go
@@ -39,8 +39,7 @@ import (
 )
 
 func TestIntegration_LogEventProvider(t *testing.T) {
-	ctx, cancel := context.WithCancel(testutils.Context(t))
-	defer cancel()
+	ctx := testutils.Context(t)
 
 	backend, stopMining, accounts := setupBackend(t)
 	defer stopMining()
@@ -64,12 +63,10 @@ func TestIntegration_LogEventProvider(t *testing.T) {
 	ids, addrs, contracts := deployUpkeepCounter(ctx, t, n, ethClient, backend, carrol, logProvider)
 	lp.PollAndSaveLogs(ctx, int64(n))
 
-	go func() {
-		if err := logProvider.Start(ctx); err != nil {
-			t.Logf("error starting log provider: %s", err)
-			t.Fail()
-		}
-	}()
+	if err := logProvider.Start(ctx); err != nil {
+		t.Logf("error starting log provider: %s", err)
+		t.Fail()
+	}
 	defer logProvider.Close()
 
 	logsRounds := 10
@@ -89,6 +86,7 @@ func TestIntegration_LogEventProvider(t *testing.T) {
 		"failed to get logs after restart")
 
 	t.Run("Restart", func(t *testing.T) {
+		ctx = testutils.Context(t)
 		t.Log("restarting log provider")
 		// assuming that our service was closed and restarted,
 		// we should be able to backfill old logs and fetch new ones
@@ -96,12 +94,10 @@ func TestIntegration_LogEventProvider(t *testing.T) {
 		logProvider2 := logprovider.NewLogProvider(logger.TestLogger(t), lp, logprovider.NewLogEventsPacker(), filterStore, opts)
 
 		poll(backend.Commit())
-		go func() {
-			if err2 := logProvider2.Start(ctx); err2 != nil {
-				t.Logf("error starting log provider: %s", err2)
-				t.Fail()
-			}
-		}()
+		if err2 := logProvider2.Start(ctx); err2 != nil {
+			t.Logf("error starting log provider: %s", err2)
+			t.Fail()
+		}
 		defer logProvider2.Close()
 
 		// re-register filters
@@ -126,8 +122,7 @@ func TestIntegration_LogEventProvider(t *testing.T) {
 }
 
 func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) {
-	ctx, cancel := context.WithCancel(testutils.Context(t))
-	defer cancel()
+	ctx := testutils.Context(t)
 
 	backend, stopMining, accounts := setupBackend(t)
 	defer stopMining()
@@ -152,6 +147,7 @@ func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) {
 	require.Equal(t, 1, len(addrs))
 
 	t.Run("update filter config", func(t *testing.T) {
+		ctx = testutils.Context(t)
 		upkeepID := evmregistry21.GenUpkeepID(types.LogTrigger, "111")
 		id := upkeepID.BigInt()
 		cfg := newPlainLogTriggerConfig(addrs[0])
@@ -184,6 +180,7 @@ func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) {
 	})
 
 	t.Run("register same log filter", func(t *testing.T) {
+		ctx = testutils.Context(t)
 		upkeepID := evmregistry21.GenUpkeepID(types.LogTrigger, "222")
 		id := upkeepID.BigInt()
 		cfg := newPlainLogTriggerConfig(addrs[0])
@@ -235,12 +232,10 @@ func TestIntegration_LogEventProvider_Backfill(t *testing.T) {
 	waitLogPoller(ctx, t, backend, lp, ethClient)
 
 	// starting the log provider should backfill logs
-	go func() {
-		if startErr := logProvider.Start(ctx); startErr != nil {
-			t.Logf("error starting log provider: %s", startErr)
-			t.Fail()
-		}
-	}()
+	if startErr := logProvider.Start(ctx); startErr != nil {
+		t.Logf("error starting log provider: %s", startErr)
+		t.Fail()
+	}
 	defer logProvider.Close()
 
 	waitLogProvider(ctx, t, logProvider, 3)
@@ -505,12 +500,10 @@ func TestIntegration_LogRecoverer_Backfill(t *testing.T) {
 		blockNumber = bn.Int64()
 	}
 	// starting the log recoverer should backfill logs
-	go func() {
-		if startErr := recoverer.Start(ctx); startErr != nil {
-			t.Logf("error starting log provider: %s", startErr)
-			t.Fail()
-		}
-	}()
+	if startErr := recoverer.Start(ctx); startErr != nil {
+		t.Logf("error starting log provider: %s", startErr)
+		t.Fail()
+	}
 	defer recoverer.Close()
 
 	var allProposals []ocr2keepers.UpkeepPayload
@@ -535,7 +528,7 @@ func collectPayloads(ctx context.Context, t *testing.T, logProvider logprovider.
 	for ctx.Err() == nil && len(allPayloads) < n && rounds > 0 {
 		logs, err := logProvider.GetLatestPayloads(ctx)
 		require.NoError(t, err)
-		require.LessOrEqual(t, len(logs), logprovider.AllowedLogsPerUpkeep, "failed to get all logs")
+		require.LessOrEqual(t, len(logs), logprovider.AllowedLogsPerUpkeep)
 		allPayloads = append(allPayloads, logs...)
 		rounds--
 	}
@@ -662,6 +655,8 @@ func setupDependencies(t *testing.T, db *sqlx.DB, backend *backends.SimulatedBac
 	pollerLggr.SetLogLevel(zapcore.WarnLevel)
 	lorm := logpoller.NewORM(big.NewInt(1337), db, pollerLggr, pgtest.NewQConfig(false))
 	lp := logpoller.NewLogPoller(lorm, ethClient, pollerLggr, 100*time.Millisecond, false, 1, 2, 2, 1000)
+	require.NoError(t, lp.Start(testutils.Context(t)))
+	t.Cleanup(func() { require.NoError(t, lp.Close()) })
 
 	return lp, ethClient
 }
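
Notes: testutils.Context(t) already returns a context that is cancelled
during test cleanup, so the removed context.WithCancel/defer cancel
wrapping added nothing. Start on these components returns once they have
spun up rather than blocking, so the dropped goroutines only served to
call t.Logf/t.Fail from a goroutine that could outlive the test. A
minimal sketch of the start-in-test pattern the diff converges on, with
svc as an illustrative stand-in for any component exposing the usual
Start(context.Context) error / Close() error service methods:

	ctx := testutils.Context(t)        // cancelled automatically via t.Cleanup
	require.NoError(t, svc.Start(ctx)) // non-blocking; no goroutine needed
	t.Cleanup(func() { require.NoError(t, svc.Close()) })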