Skip to content

Commit

Permalink
PoC exposing isHealthy function in LogPoller
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Mar 6, 2024
1 parent 9442992 commit 815df30
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 0 deletions.
2 changes: 2 additions & 0 deletions core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ var (

type disabled struct{}

func (disabled) IsHealthy() bool { return false }

func (disabled) Name() string { return "disabledLogPoller" }

func (disabled) Start(ctx context.Context) error { return ErrDisabled }
Expand Down
12 changes: 12 additions & 0 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum"
Expand All @@ -34,6 +35,7 @@ import (
//go:generate mockery --quiet --name LogPoller --output ./mocks/ --case=underscore --structname LogPoller --filename log_poller.go
type LogPoller interface {
services.Service
IsHealthy() bool
Replay(ctx context.Context, fromBlock int64) error
ReplayAsync(fromBlock int64)
RegisterFilter(filter Filter, qopts ...pg.QOpt) error
Expand Down Expand Up @@ -118,6 +120,7 @@ type logPoller struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
isReorged *atomic.Bool
}

// NewLogPoller creates a log poller. Note there is an assumption
Expand All @@ -133,6 +136,8 @@ type logPoller struct {
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration,
useFinalityTag bool, finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepFinalizedBlocksDepth int64) *logPoller {
ctx, cancel := context.WithCancel(context.Background())
isReorged := atomic.Bool{}
isReorged.Store(false)
return &logPoller{
ctx: ctx,
cancel: cancel,
Expand All @@ -149,6 +154,7 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Durati
keepFinalizedBlocksDepth: keepFinalizedBlocksDepth,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
isReorged: &isReorged,
}
}

Expand Down Expand Up @@ -203,6 +209,10 @@ func (filter *Filter) Contains(other *Filter) bool {
return true
}

func (lp *logPoller) IsHealthy() bool {
return !lp.isReorged.Load()
}

// RegisterFilter adds the provided EventSigs and Addresses to the log poller's log filter query.
// If any eventSig is emitted from any address, it will be captured by the log poller.
// If an event matching any of the given event signatures is emitted from any of the provided Addresses,
Expand Down Expand Up @@ -744,6 +754,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
return nil, errors.New("Unable to find LCA after reorg, retrying")
}

lp.isReorged.Store(false)
lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlockNumber)
// We truncate all the blocks and logs after the LCA.
// We could preserve the logs for forensics, since its possible
Expand Down Expand Up @@ -924,6 +935,7 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber)
rerr := errors.New("Reorg greater than finality depth")
lp.SvcErrBuffer.Append(rerr)
lp.isReorged.Store(true)
return nil, rerr
}

Expand Down
18 changes: 18 additions & 0 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 815df30

Please sign in to comment.