Skip to content

Commit

Permalink
added the ability to ignore the cursor on the block poller
Browse files Browse the repository at this point in the history
  • Loading branch information
jubeless committed Nov 30, 2023
1 parent f5e9f30 commit 7aa2064
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 6 deletions.
8 changes: 8 additions & 0 deletions blockpoller/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ func WithStoringState(stateStorePath string) Option {
}
}

// WithCursorPurged ensures the poller will ignore the cursor and start from the startBlockNum
// the cursor will still be saved as the poller progresses
func WithCursorPurged() Option {
return func(p *BlockPoller) {
p.ignoreCursor = true
}
}

func WithLogger(logger *zap.Logger) Option {
return func(p *BlockPoller) {
p.logger = logger
Expand Down
3 changes: 2 additions & 1 deletion blockpoller/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type BlockPoller struct {
startBlockNumGate uint64
fetchBlockRetryCount uint64
stateStorePath string
ignoreCursor bool

blockFetcher BlockFetcher
blockHandler BlockHandler
Expand Down Expand Up @@ -77,7 +78,7 @@ func (p *BlockPoller) Run(ctx context.Context, startBlockNum uint64, chainLatest

func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef) (err error) {

p.forkDB, resolvedStartBlock, err = initState(resolvedStartBlock, p.stateStorePath, p.logger)
p.forkDB, resolvedStartBlock, err = initState(resolvedStartBlock, p.stateStorePath, p.ignoreCursor, p.logger)
if err != nil {
return fmt.Errorf("unable to initialize cursor: %w", err)
}
Expand Down
18 changes: 15 additions & 3 deletions blockpoller/state_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,30 @@ func (p *BlockPoller) saveState(blocks []*forkable.Block) error {
return nil
}

func initState(resolvedStartBlock bstream.BlockRef, stateStorePath string, logger *zap.Logger) (*forkable.ForkDB, bstream.BlockRef, error) {
func initState(resolvedStartBlock bstream.BlockRef, stateStorePath string, ignoreCursor bool, logger *zap.Logger) (*forkable.ForkDB, bstream.BlockRef, error) {
forkDB := forkable.NewForkDB(forkable.ForkDBWithLogger(logger))

useStartBlockFunc := func() (*forkable.ForkDB, bstream.BlockRef, error) {
forkDB.InitLIB(resolvedStartBlock)
return forkDB, resolvedStartBlock, nil
}

if ignoreCursor {
logger.Info("ignorign cursor",
zap.Stringer("start_block", resolvedStartBlock),
zap.Stringer("lib", resolvedStartBlock),
)
return useStartBlockFunc()
}

sf, err := getState(stateStorePath)
if err != nil {
logger.Warn("unable to load cursor file, initializing a new forkdb",
zap.Stringer("start_block", resolvedStartBlock),
zap.Stringer("lib", resolvedStartBlock),
zap.Error(err),
)
forkDB.InitLIB(resolvedStartBlock)
return forkDB, resolvedStartBlock, nil
return useStartBlockFunc()
}

forkDB.InitLIB(bstream.NewBlockRef(sf.Lib.Id, sf.Lib.Num))
Expand Down
4 changes: 2 additions & 2 deletions blockpoller/state_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestFireBlockFinalizer_state(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, expectedStateFileCnt, string(cnt))

forkDB, startBlock, err := initState(bstream.NewBlockRef("60a", 60), dirName, zap.NewNop())
forkDB, startBlock, err := initState(bstream.NewBlockRef("60a", 60), dirName, false, zap.NewNop())
require.NoError(t, err)

blocks, reachedLib := forkDB.CompleteSegment(bstream.NewBlockRef("105a", 105))
Expand All @@ -70,7 +70,7 @@ func TestFireBlockFinalizer_noSstate(t *testing.T) {
require.NoError(t, err)
defer os.Remove(dirName)

forkDB, startBlock, err := initState(bstream.NewBlockRef("60a", 60), dirName, logger)
forkDB, startBlock, err := initState(bstream.NewBlockRef("60a", 60), dirName, false, logger)
require.NoError(t, err)

blocks, reachedLib := forkDB.CompleteSegment(bstream.NewBlockRef("60a", 60))
Expand Down

0 comments on commit 7aa2064

Please sign in to comment.