Skip to content

Commit

Permalink
Merge pull request #67 from streamingfast/feature/first_streamable_bl…
Browse files Browse the repository at this point in the history
…ock_refactor

Refactor blockpoller initialization to use first streamable block
  • Loading branch information
billettc authored Dec 11, 2024
2 parents b7cf709 + 7278c1c commit 841201c
Show file tree
Hide file tree
Showing 16 changed files with 618 additions and 100 deletions.
8 changes: 6 additions & 2 deletions blockpoller/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import (
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
)

type BlockFetcher interface {
type BlockFetcher[C any] interface {
IsBlockAvailable(requestedSlot uint64) bool
Fetch(ctx context.Context, blkNum uint64) (b *pbbstream.Block, skipped bool, err error)
Fetch(ctx context.Context, client C, blkNum uint64) (b *pbbstream.Block, skipped bool, err error)
}

type HeadBlockNumberFetcher[C any] interface {
FetchHeadBlockNumber(ctx context.Context, client C) (uint64, error)
}
16 changes: 8 additions & 8 deletions blockpoller/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,31 @@ type TestBlock struct {
send *pbbstream.Block
}

var _ BlockFetcher = &TestBlockFetcher{}
var _ BlockFetcher[any] = &TestBlockFetcher[any]{}

type TestBlockFetcher struct {
type TestBlockFetcher[C any] struct {
t *testing.T
blocks []*TestBlock
idx uint64
completed bool
}

func newTestBlockFetcher(t *testing.T, blocks []*TestBlock) *TestBlockFetcher {
return &TestBlockFetcher{
func newTestBlockFetcher[C any](t *testing.T, blocks []*TestBlock) *TestBlockFetcher[C] {
return &TestBlockFetcher[C]{
t: t,
blocks: blocks,
}
}

func (b *TestBlockFetcher) PollingInterval() time.Duration {
func (b *TestBlockFetcher[C]) PollingInterval() time.Duration {
return 0
}

func (b *TestBlockFetcher) IsBlockAvailable(requestedSlot uint64) bool {
func (b *TestBlockFetcher[C]) IsBlockAvailable(requestedSlot uint64) bool {
return true
}

func (b *TestBlockFetcher) Fetch(_ context.Context, blkNum uint64) (*pbbstream.Block, bool, error) {
func (b *TestBlockFetcher[C]) Fetch(ctx context.Context, c C, blkNum uint64) (*pbbstream.Block, bool, error) {
if len(b.blocks) == 0 {
assert.Fail(b.t, fmt.Sprintf("should not have fetched block %d", blkNum))
}
Expand All @@ -69,7 +69,7 @@ func (b *TestBlockFetcher) Fetch(_ context.Context, blkNum uint64) (*pbbstream.B
return blkToSend, false, nil
}

func (b *TestBlockFetcher) check(t *testing.T) {
func (b *TestBlockFetcher[C]) check(t *testing.T) {
t.Helper()
require.Equal(b.t, uint64(len(b.blocks)), b.idx, "we should have fetched all %d blocks, only fired %d blocks", len(b.blocks), b.idx)
}
Expand Down
18 changes: 9 additions & 9 deletions blockpoller/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,30 @@ package blockpoller

import "go.uber.org/zap"

type Option func(*BlockPoller)
type Option[C any] func(*BlockPoller[C])

func WithBlockFetchRetryCount(v uint64) Option {
return func(p *BlockPoller) {
func WithBlockFetchRetryCount[C any](v uint64) Option[C] {
return func(p *BlockPoller[C]) {
p.fetchBlockRetryCount = v
}
}

func WithStoringState(stateStorePath string) Option {
return func(p *BlockPoller) {
func WithStoringState[C any](stateStorePath string) Option[C] {
return func(p *BlockPoller[C]) {
p.stateStorePath = stateStorePath
}
}

// IgnoreCursor ensures the poller will ignore the cursor and start from the startBlockNum
// the cursor will still be saved as the poller progresses
func IgnoreCursor() Option {
return func(p *BlockPoller) {
func IgnoreCursor[C any]() Option[C] {
return func(p *BlockPoller[C]) {
p.ignoreCursor = true
}
}

func WithLogger(logger *zap.Logger) Option {
return func(p *BlockPoller) {
func WithLogger[C any](logger *zap.Logger) Option[C] {
return func(p *BlockPoller[C]) {
p.logger = logger
}
}
122 changes: 80 additions & 42 deletions blockpoller/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"time"

"github.com/streamingfast/bstream"

"github.com/streamingfast/bstream/forkable"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/derr"
"github.com/streamingfast/dhammer"
"github.com/streamingfast/firehose-core/internal/utils"
"github.com/streamingfast/firehose-core/rpc"
"github.com/streamingfast/shutter"
"go.uber.org/zap"
)
Expand All @@ -27,17 +27,19 @@ func newBlock(block2 *pbbstream.Block) *block {
return &block{block2, false}
}

type BlockPoller struct {
type BlockPoller[C any] struct {
*shutter.Shutter
startBlockNumGate uint64
fetchBlockRetryCount uint64
stateStorePath string
ignoreCursor bool
forceFinalityAfterBlocks *uint64

blockFetcher BlockFetcher
blockFetcher BlockFetcher[C]
blockHandler BlockHandler
forkDB *forkable.ForkDB
clients *rpc.Clients[C]

forkDB *forkable.ForkDB

logger *zap.Logger

Expand All @@ -47,16 +49,18 @@ type BlockPoller struct {
optimisticallyPolledBlocksLock sync.Mutex
}

func New(
blockFetcher BlockFetcher,
func New[C any](
blockFetcher BlockFetcher[C],
blockHandler BlockHandler,
opts ...Option,
) *BlockPoller {
clients *rpc.Clients[C],
opts ...Option[C],
) *BlockPoller[C] {

b := &BlockPoller{
b := &BlockPoller[C]{
Shutter: shutter.New(),
blockFetcher: blockFetcher,
blockHandler: blockHandler,
clients: clients,
fetchBlockRetryCount: math.MaxUint64,
logger: zap.NewNop(),
forceFinalityAfterBlocks: utils.GetEnvForceFinalityAfterBlocks(),
Expand All @@ -69,7 +73,7 @@ func New(
return b
}

func (p *BlockPoller) Run(ctx context.Context, firstStreamableBlockNum uint64, blockFetchBatchSize int) error {
func (p *BlockPoller[C]) Run(firstStreamableBlockNum uint64, stopBlock *uint64, blockFetchBatchSize int) error {
p.startBlockNumGate = firstStreamableBlockNum
p.logger.Info("starting poller",
zap.Uint64("first_streamable_block", firstStreamableBlockNum),
Expand All @@ -83,14 +87,25 @@ func (p *BlockPoller) Run(ctx context.Context, firstStreamableBlockNum uint64, b
}
p.forkDB = forkDB

return p.run(resolvedStartBlock, blockFetchBatchSize)
resolveStopBlock := uint64(math.MaxUint64)
if stopBlock != nil {
resolveStopBlock = *stopBlock
}

return p.run(resolvedStartBlock, resolveStopBlock, blockFetchBatchSize)
}

func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, blockFetchBatchSize int) (err error) {
func (p *BlockPoller[C]) run(resolvedStartBlock bstream.BlockRef, stopBlock uint64, blockFetchBatchSize int) (err error) {
currentCursor := &cursor{state: ContinuousSegState, logger: p.logger}
blockToFetch := resolvedStartBlock.Num()
var hashToFetch *string
for {

if blockToFetch >= stopBlock {
p.logger.Info("stop block reach", zap.Uint64("stop_block", stopBlock))
return nil
}

if p.IsTerminating() {
p.logger.Info("block poller is terminating")
}
Expand Down Expand Up @@ -133,7 +148,7 @@ func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, blockFetchBatchSi
}
}

func (p *BlockPoller) processBlock(currentState *cursor, block *pbbstream.Block) (uint64, *string, error) {
func (p *BlockPoller[C]) processBlock(currentState *cursor, block *pbbstream.Block) (uint64, *string, error) {
p.logger.Info("processing block", zap.Stringer("block", block.AsRef()), zap.Uint64("lib_num", block.LibNum))
if block.Number < p.forkDB.LIBNum() {
panic(fmt.Errorf("unexpected error block %d is below the current LIB num %d. There should be no re-org above the current LIB num", block.Number, p.forkDB.LIBNum()))
Expand Down Expand Up @@ -189,31 +204,40 @@ type BlockItem struct {
skipped bool
}

func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch int) error {
func (p *BlockPoller[C]) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch int) error {
p.optimisticallyPolledBlocks = map[uint64]*BlockItem{}
p.fetching = true

nailer := dhammer.NewNailer(10, func(ctx context.Context, blockToFetch uint64) (*BlockItem, error) {
var blockItem *BlockItem
err := derr.Retry(p.fetchBlockRetryCount, func(ctx context.Context) error {
b, skip, err := p.blockFetcher.Fetch(ctx, blockToFetch)
if err != nil {
return fmt.Errorf("unable to fetch block %d: %w", blockToFetch, err)
}
if skip {
blockItem = &BlockItem{
blockNumber: blockToFetch,
block: nil,
skipped: true,

bi, err := rpc.WithClients(p.clients, func(ctx context.Context, client C) (*BlockItem, error) {
b, skipped, err := p.blockFetcher.Fetch(ctx, client, blockToFetch)
if err != nil {
return nil, fmt.Errorf("fetching block %d: %w", blockToFetch, err)
}
return nil
}
//todo: add block to cache
blockItem = &BlockItem{
blockNumber: blockToFetch,
block: b,
skipped: false,

if skipped {
return &BlockItem{
blockNumber: blockToFetch,
block: nil,
skipped: true,
}, nil
}

return &BlockItem{
blockNumber: blockToFetch,
block: b,
skipped: false,
}, nil
})

if err != nil {
return fmt.Errorf("fetching block %d with retry : %w", blockToFetch, err)
}
blockItem = bi

return nil

})
Expand Down Expand Up @@ -272,7 +296,7 @@ func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch
return nil
}

func (p *BlockPoller) requestBlock(blockNumber uint64, numberOfBlockToFetch int) chan *BlockItem {
func (p *BlockPoller[C]) requestBlock(blockNumber uint64, numberOfBlockToFetch int) chan *BlockItem {
p.logger.Info("requesting block", zap.Uint64("block_num", blockNumber))
requestedBlock := make(chan *BlockItem)

Expand Down Expand Up @@ -314,24 +338,38 @@ func (p *BlockPoller) requestBlock(blockNumber uint64, numberOfBlockToFetch int)
return requestedBlock
}

func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream.Block, error) {
type FetchResponse struct {
Block *pbbstream.Block
Skipped bool
}

func (p *BlockPoller[C]) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream.Block, error) {
p.logger.Info("fetching block with hash", zap.Uint64("block_num", blkNum), zap.String("hash", hash))
_ = hash //todo: hash will be used to fetch block from cache

p.optimisticallyPolledBlocks = map[uint64]*BlockItem{}

var out *pbbstream.Block
var skipped bool

err := derr.Retry(p.fetchBlockRetryCount, func(ctx context.Context) error {
//todo: get block from cache
var fetchErr error
out, skipped, fetchErr = p.blockFetcher.Fetch(ctx, blkNum)
if fetchErr != nil {
return fmt.Errorf("unable to fetch block %d: %w", blkNum, fetchErr)
}
if skipped {
return nil
br, err := rpc.WithClients(p.clients, func(ctx context.Context, client C) (br *FetchResponse, err error) {
b, skipped, err := p.blockFetcher.Fetch(ctx, client, blkNum)
if err != nil {
return nil, fmt.Errorf("fetching block block %d: %w", blkNum, err)
}
return &FetchResponse{
Block: b,
Skipped: skipped,
}, nil
})

if err != nil {
return fmt.Errorf("fetching block with retry %d: %w", blkNum, err)
}

out = br.Block
skipped = br.Skipped
return nil
})

Expand All @@ -350,7 +388,7 @@ func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream
return out, nil
}

func (p *BlockPoller) fireCompleteSegment(blocks []*forkable.Block) error {
func (p *BlockPoller[C]) fireCompleteSegment(blocks []*forkable.Block) error {
for _, blk := range blocks {
b := blk.Object.(*block)
if _, err := p.fire(b); err != nil {
Expand All @@ -360,7 +398,7 @@ func (p *BlockPoller) fireCompleteSegment(blocks []*forkable.Block) error {
return nil
}

func (p *BlockPoller) fire(blk *block) (bool, error) {
func (p *BlockPoller[C]) fire(blk *block) (bool, error) {
if blk.fired {
return false, nil
}
Expand Down
Loading

0 comments on commit 841201c

Please sign in to comment.