core/chains/evm/logpoller: fix context wiring
jmank88 committed Nov 27, 2023
1 parent a5e1873 commit 3713078
Showing 1 changed file with 24 additions and 29 deletions.
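
Taken together, the hunks below swap manually cancelled contexts (context.WithCancel plus defer cancel()) for the test-scoped testutils.Context(t), rebind ctx to a fresh test-scoped context inside t.Run subtests, and call Start directly instead of wrapping it in go func() { ... }(). What follows is a minimal sketch of that wiring pattern, not code from the repository: testContext only approximates what testutils.Context(t) appears to provide (a context cancelled at test cleanup), and startable and startService are hypothetical helper names.

package example

import (
	"context"
	"testing"
)

// startable stands in for any dependency with a Start/Close lifecycle
// (log provider, log recoverer, log poller). Illustrative only.
type startable interface {
	Start(context.Context) error
	Close() error
}

// testContext approximates what testutils.Context(t) appears to provide:
// a context cancelled automatically when the test or subtest finishes,
// so tests no longer need their own context.WithCancel plus defer cancel().
func testContext(t *testing.T) context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)
	return ctx
}

// startService mirrors the shape of the updated call sites: Start is called
// inline, a failure is reported on the test, and the caller defers Close.
func startService(ctx context.Context, t *testing.T, svc startable) {
	t.Helper()
	if err := svc.Start(ctx); err != nil {
		t.Logf("error starting service: %s", err)
		t.Fail()
	}
}

With helpers of this shape, a call site reduces to ctx := testContext(t); startService(ctx, t, logProvider); defer logProvider.Close(), which mirrors the updated tests.
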
@@ -37,8 +37,7 @@ import (
 )
 
 func TestIntegration_LogEventProvider(t *testing.T) {
-	ctx, cancel := context.WithCancel(testutils.Context(t))
-	defer cancel()
+	ctx := testutils.Context(t)
 
 	backend, stopMining, accounts := setupBackend(t)
 	defer stopMining()
@@ -62,12 +61,10 @@ func TestIntegration_LogEventProvider(t *testing.T) {
 	ids, addrs, contracts := deployUpkeepCounter(ctx, t, n, ethClient, backend, carrol, logProvider)
 	lp.PollAndSaveLogs(ctx, int64(n))
 
-	go func() {
-		if err := logProvider.Start(ctx); err != nil {
-			t.Logf("error starting log provider: %s", err)
-			t.Fail()
-		}
-	}()
+	if err := logProvider.Start(ctx); err != nil {
+		t.Logf("error starting log provider: %s", err)
+		t.Fail()
+	}
 	defer logProvider.Close()
 
 	logsRounds := 10
@@ -87,19 +84,18 @@ func TestIntegration_LogEventProvider(t *testing.T) {
 		"failed to get logs after restart")
 
 	t.Run("Restart", func(t *testing.T) {
+		ctx = testutils.Context(t)
 		t.Log("restarting log provider")
 		// assuming that our service was closed and restarted,
 		// we should be able to backfill old logs and fetch new ones
 		filterStore := logprovider.NewUpkeepFilterStore()
 		logProvider2 := logprovider.NewLogProvider(logger.TestLogger(t), lp, logprovider.NewLogEventsPacker(), filterStore, opts)
 
 		poll(backend.Commit())
-		go func() {
-			if err2 := logProvider2.Start(ctx); err2 != nil {
-				t.Logf("error starting log provider: %s", err2)
-				t.Fail()
-			}
-		}()
+		if err2 := logProvider2.Start(ctx); err2 != nil {
+			t.Logf("error starting log provider: %s", err2)
+			t.Fail()
+		}
 		defer logProvider2.Close()
 
 		// re-register filters
@@ -124,8 +120,7 @@ func TestIntegration_LogEventProvider(t *testing.T) {
 }
 
 func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) {
-	ctx, cancel := context.WithCancel(testutils.Context(t))
-	defer cancel()
+	ctx := testutils.Context(t)
 
 	backend, stopMining, accounts := setupBackend(t)
 	defer stopMining()
@@ -150,6 +145,7 @@ func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) {
 	require.Equal(t, 1, len(addrs))
 
 	t.Run("update filter config", func(t *testing.T) {
+		ctx = testutils.Context(t)
 		upkeepID := kevmcore.GenUpkeepID(ocr2keepers.LogTrigger, "111")
 		id := upkeepID.BigInt()
 		cfg := newPlainLogTriggerConfig(addrs[0])
@@ -182,6 +178,7 @@ func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) {
 	})
 
 	t.Run("register same log filter", func(t *testing.T) {
+		ctx = testutils.Context(t)
 		upkeepID := kevmcore.GenUpkeepID(ocr2keepers.LogTrigger, "222")
 		id := upkeepID.BigInt()
 		cfg := newPlainLogTriggerConfig(addrs[0])
@@ -233,12 +230,10 @@ func TestIntegration_LogEventProvider_Backfill(t *testing.T) {
 	waitLogPoller(ctx, t, backend, lp, ethClient)
 
 	// starting the log provider should backfill logs
-	go func() {
-		if startErr := logProvider.Start(ctx); startErr != nil {
-			t.Logf("error starting log provider: %s", startErr)
-			t.Fail()
-		}
-	}()
+	if startErr := logProvider.Start(ctx); startErr != nil {
+		t.Logf("error starting log provider: %s", startErr)
+		t.Fail()
+	}
 	defer logProvider.Close()
 
 	waitLogProvider(ctx, t, logProvider, 3)
@@ -503,12 +498,10 @@ func TestIntegration_LogRecoverer_Backfill(t *testing.T) {
 		blockNumber = bn.Int64()
 	}
 	// starting the log recoverer should backfill logs
-	go func() {
-		if startErr := recoverer.Start(ctx); startErr != nil {
-			t.Logf("error starting log provider: %s", startErr)
-			t.Fail()
-		}
-	}()
+	if startErr := recoverer.Start(ctx); startErr != nil {
+		t.Logf("error starting log provider: %s", startErr)
+		t.Fail()
+	}
 	defer recoverer.Close()
 
 	var allProposals []ocr2keepers.UpkeepPayload
@@ -533,7 +526,7 @@ func collectPayloads(ctx context.Context, t *testing.T, logProvider logprovider.
 	for ctx.Err() == nil && len(allPayloads) < n && rounds > 0 {
 		logs, err := logProvider.GetLatestPayloads(ctx)
 		require.NoError(t, err)
-		require.LessOrEqual(t, len(logs), logprovider.AllowedLogsPerUpkeep, "failed to get all logs")
+		require.LessOrEqual(t, len(logs), logprovider.AllowedLogsPerUpkeep)
 		allPayloads = append(allPayloads, logs...)
 		rounds--
 	}
@@ -660,6 +653,8 @@ func setupDependencies(t *testing.T, db *sqlx.DB, backend *backends.SimulatedBac
 	pollerLggr.SetLogLevel(zapcore.WarnLevel)
 	lorm := logpoller.NewORM(big.NewInt(1337), db, pollerLggr, pgtest.NewQConfig(false))
 	lp := logpoller.NewLogPoller(lorm, ethClient, pollerLggr, 100*time.Millisecond, false, 1, 2, 2, 1000)
+	require.NoError(t, lp.Start(testutils.Context(t)))
+	t.Cleanup(func() { require.NoError(t, lp.Close()) })
 	return lp, ethClient
 }
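
The final hunk also moves the log poller's lifecycle into setupDependencies: lp is started with a test-scoped context and its Close is registered via t.Cleanup, so each test receives a poller that is already running. Below is a rough sketch of that helper-owned lifecycle, assuming only that the dependency exposes a Start/Close pair as logpoller.LogPoller does here; runner and setupRunner are illustrative names, not repository code.

package example

import (
	"context"
	"testing"
)

// runner is a hypothetical stand-in for a dependency with a Start/Close
// lifecycle, such as the log poller constructed in setupDependencies.
type runner interface {
	Start(context.Context) error
	Close() error
}

// setupRunner starts the dependency and ties its shutdown to the test's
// cleanup phase, in the spirit of the lp.Start / t.Cleanup(lp.Close) lines
// added in this commit. Callers receive a dependency that is already running.
func setupRunner(t *testing.T, r runner) runner {
	t.Helper()
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel) // approximates the test-scoped context from testutils.Context(t)
	if err := r.Start(ctx); err != nil {
		t.Fatalf("failed to start dependency: %s", err)
	}
	t.Cleanup(func() {
		if err := r.Close(); err != nil {
			t.Errorf("failed to close dependency: %s", err)
		}
	})
	return r
}

Registering Close through t.Cleanup keeps shutdown tied to the test itself, even if a caller forgets to add a defer.
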
