diff --git a/core/services/relay/evm/mercury/persistence_manager.go b/core/services/relay/evm/mercury/persistence_manager.go index b8ae9bf72c0..9e8df72a155 100644 --- a/core/services/relay/evm/mercury/persistence_manager.go +++ b/core/services/relay/evm/mercury/persistence_manager.go @@ -94,11 +94,12 @@ func (pm *PersistenceManager) runFlushDeletesLoop() { ticker.Stop() return case <-ticker.C: - pm.lggr.Trace("Deleting queued requests from transmit requests table") queuedReqs := pm.resetDeleteQueue() if err := pm.orm.DeleteTransmitRequests(queuedReqs, pg.WithParentCtx(ctx)); err != nil { pm.lggr.Errorw("Failed to delete queued transmit requests", "err", err) pm.addToDeleteQueue(queuedReqs...) + } else { + pm.lggr.Debugw("Deleted queued transmit requests") } } } @@ -117,9 +118,10 @@ func (pm *PersistenceManager) runPruneLoop() { ticker.Stop() return case <-ticker.C: - pm.lggr.Trace("Pruning transmit requests table") if err := pm.orm.PruneTransmitRequests(pm.maxTransmitQueueSize, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout()); err != nil { pm.lggr.Errorw("Failed to prune transmit requests table", "err", err) + } else { + pm.lggr.Debugw("Pruned transmit requests table") } } } diff --git a/core/services/relay/evm/mercury/persistence_manager_test.go b/core/services/relay/evm/mercury/persistence_manager_test.go index 211e2c20c32..97628ed9c2b 100644 --- a/core/services/relay/evm/mercury/persistence_manager_test.go +++ b/core/services/relay/evm/mercury/persistence_manager_test.go @@ -7,25 +7,28 @@ import ( ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" ) -func bootstrapPersistenceManager(t *testing.T) *PersistenceManager { +func bootstrapPersistenceManager(t *testing.T) (*PersistenceManager, *observer.ObservedLogs) { t.Helper() db := pgtest.NewSqlxDB(t) pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) - lggr := logger.TestLogger(t) + lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.DebugLevel) orm := NewORM(db, lggr, pgtest.NewQConfig(true)) - return NewPersistenceManager(lggr, orm, 0, 2, 10*time.Millisecond, 10*time.Millisecond) + return NewPersistenceManager(lggr, orm, 0, 2, 5*time.Millisecond, 5*time.Millisecond), observedLogs } func TestPersistenceManager(t *testing.T) { ctx := context.Background() - pm := bootstrapPersistenceManager(t) + pm, _ := bootstrapPersistenceManager(t) reports := sampleReports @@ -53,7 +56,7 @@ func TestPersistenceManager(t *testing.T) { func TestPersistenceManagerAsyncDelete(t *testing.T) { ctx := context.Background() - pm := bootstrapPersistenceManager(t) + pm, observedLogs := bootstrapPersistenceManager(t) reports := sampleReports @@ -67,7 +70,9 @@ func TestPersistenceManagerAsyncDelete(t *testing.T) { pm.AsyncDelete(&pb.TransmitRequest{Payload: reports[0]}) - time.Sleep(15 * time.Millisecond) + // Wait for next poll. + observedLogs.TakeAll() + testutils.WaitForLogMessage(t, observedLogs, "Deleted queued transmit requests") transmissions, err := pm.Load(ctx) require.NoError(t, err) @@ -92,7 +97,7 @@ func TestPersistenceManagerAsyncDelete(t *testing.T) { func TestPersistenceManagerPrune(t *testing.T) { ctx := context.Background() - pm := bootstrapPersistenceManager(t) + pm, observedLogs := bootstrapPersistenceManager(t) reports := sampleReports @@ -106,7 +111,9 @@ func TestPersistenceManagerPrune(t *testing.T) { err = pm.Start(ctx) require.NoError(t, err) - time.Sleep(15 * time.Millisecond) + // Wait for next poll. + observedLogs.TakeAll() + testutils.WaitForLogMessage(t, observedLogs, "Pruned transmit requests table") transmissions, err := pm.Load(ctx) require.NoError(t, err)