Skip to content

Commit

Permalink
Fix data race in PersistenceManager tests
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-cll committed Aug 30, 2023
1 parent 093f570 commit f0cb137
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 19 deletions.
23 changes: 15 additions & 8 deletions core/services/relay/evm/mercury/persistence_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,21 @@ type PersistenceManager struct {
deleteQueue []*pb.TransmitRequest

jobID int32

maxTransmitQueueSize int
flushDeletesFrequency time.Duration
pruneFrequency time.Duration
}

func NewPersistenceManager(lggr logger.Logger, orm ORM, jobID int32) *PersistenceManager {
func NewPersistenceManager(lggr logger.Logger, orm ORM, jobID int32, maxTransmitQueueSize int, flushDeletesFrequency, pruneFrequency time.Duration) *PersistenceManager {
return &PersistenceManager{
lggr: lggr.Named("MercuryPersistenceManager"),
orm: orm,
stopCh: make(chan struct{}),
jobID: jobID,
lggr: lggr.Named("MercuryPersistenceManager"),
orm: orm,
stopCh: make(chan struct{}),
jobID: jobID,
maxTransmitQueueSize: maxTransmitQueueSize,
flushDeletesFrequency: flushDeletesFrequency,
pruneFrequency: pruneFrequency,
}
}

Expand Down Expand Up @@ -80,7 +87,7 @@ func (pm *PersistenceManager) runFlushDeletesLoop() {
ctx, cancel := pm.stopCh.Ctx(context.Background())
defer cancel()

ticker := time.NewTicker(utils.WithJitter(flushDeletesFrequency))
ticker := time.NewTicker(utils.WithJitter(pm.flushDeletesFrequency))
for {
select {
case <-ctx.Done():
Expand All @@ -103,15 +110,15 @@ func (pm *PersistenceManager) runPruneLoop() {
ctx, cancel := pm.stopCh.Ctx(context.Background())
defer cancel()

ticker := time.NewTicker(utils.WithJitter(pruneFrequency))
ticker := time.NewTicker(utils.WithJitter(pm.pruneFrequency))
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
pm.lggr.Trace("Pruning transmit requests table")
if err := pm.orm.PruneTransmitRequests(maxTransmitQueueSize, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout()); err != nil {
if err := pm.orm.PruneTransmitRequests(pm.maxTransmitQueueSize, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout()); err != nil {
pm.lggr.Errorw("Failed to prune transmit requests table", "err", err)
}
}
Expand Down
6 changes: 2 additions & 4 deletions core/services/relay/evm/mercury/persistence_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
)

func bootstrapPersistenceManager(t *testing.T) *PersistenceManager {
t.Helper()
db := pgtest.NewSqlxDB(t)
pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`)
lggr := logger.TestLogger(t)
orm := NewORM(db, lggr, pgtest.NewQConfig(true))
return NewPersistenceManager(lggr, orm, 0)
return NewPersistenceManager(lggr, orm, 0, 2, 10*time.Millisecond, 10*time.Millisecond)
}

func TestPersistenceManager(t *testing.T) {
Expand Down Expand Up @@ -61,7 +62,6 @@ func TestPersistenceManagerAsyncDelete(t *testing.T) {
err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[1]}, ocrtypes.ReportContext{})
require.NoError(t, err)

flushDeletesFrequency = 10 * time.Millisecond
err = pm.Start(ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -103,8 +103,6 @@ func TestPersistenceManagerPrune(t *testing.T) {
err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[2]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 3}})
require.NoError(t, err)

maxTransmitQueueSize = 2
pruneFrequency = 10 * time.Millisecond
err = pm.Start(ctx)
require.NoError(t, err)

Expand Down
11 changes: 4 additions & 7 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@ import (
pkgerrors "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/exp/maps"

relaymercury "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury"
"github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/smartcontractkit/sqlx"

relaymercury "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"golang.org/x/exp/maps"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"

"github.com/smartcontractkit/chainlink/v2/core/services/pg"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb"
"github.com/smartcontractkit/chainlink/v2/core/utils"
Expand Down Expand Up @@ -122,7 +119,7 @@ func getPayloadTypes() abi.Arguments {

func NewTransmitter(lggr logger.Logger, cfgTracker ConfigTracker, rpcClient wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, db *sqlx.DB, cfg pg.QConfig) *mercuryTransmitter {
feedIDHex := fmt.Sprintf("0x%x", feedID[:])
persistenceManager := NewPersistenceManager(lggr, NewORM(db, lggr, cfg), jobID)
persistenceManager := NewPersistenceManager(lggr, NewORM(db, lggr, cfg), jobID, maxTransmitQueueSize, flushDeletesFrequency, pruneFrequency)
return &mercuryTransmitter{
utils.StartStopOnce{},
lggr.Named("MercuryTransmitter").With("feedID", feedIDHex),
Expand Down

0 comments on commit f0cb137

Please sign in to comment.