Skip to content

Commit

Permalink
Fix Mercury Persistence Manager test flake (#10574)
Browse files Browse the repository at this point in the history
* Fix test flake

* Refactor tests using logs
  • Loading branch information
martin-cll authored Sep 12, 2023
1 parent cabe17c commit 7d74c39
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
6 changes: 4 additions & 2 deletions core/services/relay/evm/mercury/persistence_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ func (pm *PersistenceManager) runFlushDeletesLoop() {
ticker.Stop()
return
case <-ticker.C:
pm.lggr.Trace("Deleting queued requests from transmit requests table")
queuedReqs := pm.resetDeleteQueue()
if err := pm.orm.DeleteTransmitRequests(queuedReqs, pg.WithParentCtx(ctx)); err != nil {
pm.lggr.Errorw("Failed to delete queued transmit requests", "err", err)
pm.addToDeleteQueue(queuedReqs...)
} else {
pm.lggr.Debugw("Deleted queued transmit requests")
}
}
}
Expand All @@ -117,9 +118,10 @@ func (pm *PersistenceManager) runPruneLoop() {
ticker.Stop()
return
case <-ticker.C:
pm.lggr.Trace("Pruning transmit requests table")
if err := pm.orm.PruneTransmitRequests(pm.maxTransmitQueueSize, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout()); err != nil {
pm.lggr.Errorw("Failed to prune transmit requests table", "err", err)
} else {
pm.lggr.Debugw("Pruned transmit requests table")
}
}
}
Expand Down
23 changes: 15 additions & 8 deletions core/services/relay/evm/mercury/persistence_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,28 @@ import (

ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb"
)

func bootstrapPersistenceManager(t *testing.T) *PersistenceManager {
func bootstrapPersistenceManager(t *testing.T) (*PersistenceManager, *observer.ObservedLogs) {
t.Helper()
db := pgtest.NewSqlxDB(t)
pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`)
lggr := logger.TestLogger(t)
lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.DebugLevel)
orm := NewORM(db, lggr, pgtest.NewQConfig(true))
return NewPersistenceManager(lggr, orm, 0, 2, 10*time.Millisecond, 10*time.Millisecond)
return NewPersistenceManager(lggr, orm, 0, 2, 5*time.Millisecond, 5*time.Millisecond), observedLogs
}

func TestPersistenceManager(t *testing.T) {
ctx := context.Background()
pm := bootstrapPersistenceManager(t)
pm, _ := bootstrapPersistenceManager(t)

reports := sampleReports

Expand Down Expand Up @@ -53,7 +56,7 @@ func TestPersistenceManager(t *testing.T) {

func TestPersistenceManagerAsyncDelete(t *testing.T) {
ctx := context.Background()
pm := bootstrapPersistenceManager(t)
pm, observedLogs := bootstrapPersistenceManager(t)

reports := sampleReports

Expand All @@ -67,7 +70,9 @@ func TestPersistenceManagerAsyncDelete(t *testing.T) {

pm.AsyncDelete(&pb.TransmitRequest{Payload: reports[0]})

time.Sleep(15 * time.Millisecond)
// Wait for next poll.
observedLogs.TakeAll()
testutils.WaitForLogMessage(t, observedLogs, "Deleted queued transmit requests")

transmissions, err := pm.Load(ctx)
require.NoError(t, err)
Expand All @@ -92,7 +97,7 @@ func TestPersistenceManagerAsyncDelete(t *testing.T) {

func TestPersistenceManagerPrune(t *testing.T) {
ctx := context.Background()
pm := bootstrapPersistenceManager(t)
pm, observedLogs := bootstrapPersistenceManager(t)

reports := sampleReports

Expand All @@ -106,7 +111,9 @@ func TestPersistenceManagerPrune(t *testing.T) {
err = pm.Start(ctx)
require.NoError(t, err)

time.Sleep(15 * time.Millisecond)
// Wait for next poll.
observedLogs.TakeAll()
testutils.WaitForLogMessage(t, observedLogs, "Pruned transmit requests table")

transmissions, err := pm.Load(ctx)
require.NoError(t, err)
Expand Down

0 comments on commit 7d74c39

Please sign in to comment.