Skip to content

Commit

Permalink
Insert time interval based heartbeats in BHS store (#10682)
Browse files Browse the repository at this point in the history
* Insert time interval based heartbeats in BHS store

* Use BlockhashStoreSpec.HeartbeatPeriodTime in service delegation

* Updated Job ORM test

* Used Go timer and storeEarliest instead of headBroadcaster

* Remove headBroadcaster from NewFeeder

* Adding hearbeatPeriodTime to web view layer

* Start heartbeat in delegate

* Disable heartbeat by default

* Test context

* Get Test context

* Refactor heartbeatPeriodTime (int32 seconds) to heartbeatPeriod time.Duration

* Use custom timer to make it easy to test

* Mock timer and add unit test for StartHeartbeats

* Add db migration

* Added unit test for sad path of StartHeartbeats

* Minor fix

* Integration test for bhs feeder heartbeat service

* Refactoring to address comments

* Renamed after merge

* Refactoring based on PR comments

* Added comments explaining blockhash verification

* Minor fix

---------

Co-authored-by: Sri Kidambi <[email protected]>
  • Loading branch information
kidambisrinivas and kidambisrinivas authored Sep 21, 2023
1 parent 6f7e6b8 commit 5b80ba6
Show file tree
Hide file tree
Showing 24 changed files with 812 additions and 11 deletions.
1 change: 1 addition & 0 deletions core/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func init() {
var _ relaylogger.Logger = (Logger)(nil)

//go:generate mockery --quiet --name Logger --output . --filename logger_mock_test.go --inpackage --case=underscore
//go:generate mockery --quiet --name Logger --output ./mocks/ --case=underscore

// Logger is the main interface of this package.
// It implements uber/zap's SugaredLogger interface and adds conditional logging helpers.
Expand Down
302 changes: 302 additions & 0 deletions core/logger/mocks/logger.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/services/blockhashstore/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Event struct {
}

// BHS defines an interface for interacting with a BlockhashStore contract.
//
//go:generate mockery --quiet --name BHS --output ./mocks/ --case=underscore
type BHS interface {
// Store the hash associated with blockNum.
Store(ctx context.Context, blockNum uint64) error
Expand Down
14 changes: 10 additions & 4 deletions core/services/blockhashstore/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blockhashstore
import (
"context"
"fmt"
"sync"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -165,6 +166,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) ([]job.ServiceC
jb.BlockhashStoreSpec.TrustedBlockhashStoreBatchSize,
int(jb.BlockhashStoreSpec.WaitBlocks),
int(jb.BlockhashStoreSpec.LookbackBlocks),
jb.BlockhashStoreSpec.HeartbeatPeriod,
func(ctx context.Context) (uint64, error) {
head, err := lp.LatestBlock(pg.WithParentCtx(ctx))
if err != nil {
Expand All @@ -178,7 +180,6 @@ func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) ([]job.ServiceC
pollPeriod: jb.BlockhashStoreSpec.PollPeriod,
runTimeout: jb.BlockhashStoreSpec.RunTimeout,
logger: log,
done: make(chan struct{}),
}}, nil
}

Expand All @@ -198,7 +199,7 @@ func (d *Delegate) OnDeleteJob(spec job.Job, q pg.Queryer) error { return nil }
type service struct {
utils.StartStopOnce
feeder *Feeder
done chan struct{}
wg sync.WaitGroup
pollPeriod time.Duration
runTimeout time.Duration
logger logger.Logger
Expand All @@ -212,8 +213,13 @@ func (s *service) Start(context.Context) error {
s.logger.Infow("Starting BHS feeder")
ticker := time.NewTicker(utils.WithJitter(s.pollPeriod))
s.parentCtx, s.cancel = context.WithCancel(context.Background())
s.wg.Add(2)
go func() {
defer close(s.done)
defer s.wg.Done()
s.feeder.StartHeartbeats(s.parentCtx, &realTimer{})
}()
go func() {
defer s.wg.Done()
defer ticker.Stop()
for {
select {
Expand All @@ -233,7 +239,7 @@ func (s *service) Close() error {
return s.StopOnce("BHS Feeder Service", func() error {
s.logger.Infow("Stopping BHS feeder")
s.cancel()
<-s.done
s.wg.Wait()
return nil
})
}
Expand Down
Loading

0 comments on commit 5b80ba6

Please sign in to comment.