Skip to content

Commit

Permalink
[BCF-2674] Make BalanceMonitor test deterministic; add WorkDone to *s…
Browse files Browse the repository at this point in the history
…leeperTask for tests. (#10821)

* Fix balance monitor flakey test

* Refactor session reaper test to use the WorkDone() test helper method
  • Loading branch information
cedric-cordenier authored Sep 28, 2023
1 parent 17772f3 commit 71d314f
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 48 deletions.
4 changes: 3 additions & 1 deletion core/chains/evm/monitor/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ type (
NullBalanceMonitor struct{}
)

var _ BalanceMonitor = (*balanceMonitor)(nil)

// NewBalanceMonitor returns a new balanceMonitor
func NewBalanceMonitor(ethClient evmclient.Client, ethKeyStore keystore.Eth, logger logger.Logger) BalanceMonitor {
func NewBalanceMonitor(ethClient evmclient.Client, ethKeyStore keystore.Eth, logger logger.Logger) *balanceMonitor {
chainId := ethClient.ConfiguredChainID()
bm := &balanceMonitor{
utils.StartStopOnce{},
Expand Down
7 changes: 7 additions & 0 deletions core/chains/evm/monitor/balance_helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package monitor

func (bm *balanceMonitor) WorkDone() <-chan struct{} {
return bm.sleeperTask.(interface {
WorkDone() <-chan struct{}
}).WorkDone()
}
18 changes: 6 additions & 12 deletions core/chains/evm/monitor/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,9 @@ func TestBalanceMonitor_OnNewLongestChain_UpdatesBalance(t *testing.T) {
// Do the thing
bm.OnNewLongestChain(testutils.Context(t), head)

gomega.NewWithT(t).Eventually(func() *big.Int {
return bm.GetEthBalance(k0Addr).ToInt()
}).Should(gomega.Equal(k0bal))
gomega.NewWithT(t).Eventually(func() *big.Int {
return bm.GetEthBalance(k1Addr).ToInt()
}).Should(gomega.Equal(k1bal))
<-bm.WorkDone()
assert.Equal(t, k0bal, bm.GetEthBalance(k0Addr).ToInt())
assert.Equal(t, k1bal, bm.GetEthBalance(k1Addr).ToInt())

// Do it again
k0bal2 := big.NewInt(142)
Expand All @@ -187,12 +184,9 @@ func TestBalanceMonitor_OnNewLongestChain_UpdatesBalance(t *testing.T) {

bm.OnNewLongestChain(testutils.Context(t), head)

gomega.NewWithT(t).Eventually(func() *big.Int {
return bm.GetEthBalance(k0Addr).ToInt()
}).Should(gomega.Equal(k0bal2))
gomega.NewWithT(t).Eventually(func() *big.Int {
return bm.GetEthBalance(k1Addr).ToInt()
}).Should(gomega.Equal(k1bal2))
<-bm.WorkDone()
assert.Equal(t, k0bal2, bm.GetEthBalance(k0Addr).ToInt())
assert.Equal(t, k1bal2, bm.GetEthBalance(k1Addr).ToInt())
})
}

Expand Down
20 changes: 2 additions & 18 deletions core/sessions/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ type sessionReaper struct {
db *sql.DB
config SessionReaperConfig
lggr logger.Logger

// Receive from this for testing via sr.RunSignal()
// to be notified after each reaper run.
runSignal chan struct{}
}

type SessionReaperConfig interface {
Expand All @@ -26,18 +22,11 @@ type SessionReaperConfig interface {

// NewSessionReaper creates a reaper that cleans stale sessions from the store.
func NewSessionReaper(db *sql.DB, config SessionReaperConfig, lggr logger.Logger) utils.SleeperTask {
return utils.NewSleeperTask(NewSessionReaperWorker(db, config, lggr))
}

func NewSessionReaperWorker(db *sql.DB, config SessionReaperConfig, lggr logger.Logger) *sessionReaper {
return &sessionReaper{
return utils.NewSleeperTask(&sessionReaper{
db,
config,
lggr.Named("SessionReaper"),

// For testing only.
make(chan struct{}, 10),
}
})
}

func (sr *sessionReaper) Name() string {
Expand All @@ -51,11 +40,6 @@ func (sr *sessionReaper) Work() {
if err != nil {
sr.lggr.Error("unable to reap stale sessions: ", err)
}

select {
case sr.runSignal <- struct{}{}:
default:
}
}

// DeleteStaleSessions deletes all sessions before the passed time.
Expand Down
5 changes: 0 additions & 5 deletions core/sessions/reaper_helper_test.go

This file was deleted.

8 changes: 4 additions & 4 deletions core/sessions/reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger/audit"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
"github.com/smartcontractkit/chainlink/v2/core/utils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -34,8 +33,7 @@ func TestSessionReaper_ReapSessions(t *testing.T) {
lggr := logger.TestLogger(t)
orm := sessions.NewORM(db, config.SessionTimeout().Duration(), lggr, pgtest.NewQConfig(true), audit.NoopLogger)

rw := sessions.NewSessionReaperWorker(db.DB, config, lggr)
r := utils.NewSleeperTask(rw)
r := sessions.NewSessionReaper(db.DB, config, lggr)

t.Cleanup(func() {
assert.NoError(t, r.Stop())
Expand Down Expand Up @@ -70,7 +68,9 @@ func TestSessionReaper_ReapSessions(t *testing.T) {
})

r.WakeUp()
<-rw.RunSignal()
<-r.(interface {
WorkDone() <-chan struct{}
}).WorkDone()
sessions, err := orm.Sessions(0, 10)
assert.NoError(t, err)

Expand Down
32 changes: 24 additions & 8 deletions core/utils/sleeper_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ type Worker interface {
}

type sleeperTask struct {
worker Worker
chQueue chan struct{}
chStop chan struct{}
chDone chan struct{}
worker Worker
chQueue chan struct{}
chStop chan struct{}
chDone chan struct{}
chWorkDone chan struct{}
StartStopOnce
}

Expand All @@ -36,10 +37,11 @@ type sleeperTask struct {
// WakeUp does not block.
func NewSleeperTask(worker Worker) SleeperTask {
s := &sleeperTask{
worker: worker,
chQueue: make(chan struct{}, 1),
chStop: make(chan struct{}),
chDone: make(chan struct{}),
worker: worker,
chQueue: make(chan struct{}, 1),
chStop: make(chan struct{}),
chDone: make(chan struct{}),
chWorkDone: make(chan struct{}, 10),
}

_ = s.StartOnce("SleeperTask-"+worker.Name(), func() error {
Expand Down Expand Up @@ -83,13 +85,27 @@ func (s *sleeperTask) WakeUp() {
}
}

func (s *sleeperTask) workDone() {
select {
case s.chWorkDone <- struct{}{}:
default:
}
}

// WorkDone isn't part of the SleeperTask interface, but can be
// useful in tests to assert that the work has been done.
func (s *sleeperTask) WorkDone() <-chan struct{} {
return s.chWorkDone
}

func (s *sleeperTask) workerLoop() {
defer close(s.chDone)

for {
select {
case <-s.chQueue:
s.worker.Work()
s.workDone()
case <-s.chStop:
return
}
Expand Down

0 comments on commit 71d314f

Please sign in to comment.