Skip to content

Commit

Permalink
made code more testable and add some unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Jan 17, 2024
1 parent f441bd0 commit 990aaae
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,4 @@ venv
/firehose-data/storage/one-blocks/0240816598-nRQqTFc22RED3W5g-AQipdgoR9gVQLc2k-240816597-default.dbin.zst
/firehose-data/storage/one-blocks/0240816599-fUUN3FS7js2v2wci-nRQqTFc22RED3W5g-240816598-default.dbin.zst
/firesol-legacy
/block/fetcher/secret.go
52 changes: 34 additions & 18 deletions block/fetcher/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"math"
"time"
Expand All @@ -28,6 +29,8 @@ var GetBlockOpts = &rpc.GetBlockOpts{
MaxSupportedTransactionVersion: &MaxSupportedTransactionVersion,
}

type fetchBlock func(ctx context.Context, requestedSlot uint64) (slot uint64, out *rpc.GetBlockResult, err error)

type RPCFetcher struct {
rpcClient *rpc.Client
latestConfirmedSlot uint64
Expand All @@ -36,15 +39,18 @@ type RPCFetcher struct {
fetchInterval time.Duration
lastFetchAt time.Time
logger *zap.Logger
fetchBlock fetchBlock
}

func NewRPC(rpcClient *rpc.Client, fetchInterval time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *RPCFetcher {
return &RPCFetcher{
f := &RPCFetcher{
rpcClient: rpcClient,
fetchInterval: fetchInterval,
latestBlockRetryInterval: latestBlockRetryInterval,
logger: logger,
}
f.fetchBlock = f.fetchRpcBlock
return f
}

func (f *RPCFetcher) Fetch(ctx context.Context, requestedSlot uint64) (out *pbbstream.Block, err error) {
Expand Down Expand Up @@ -79,32 +85,42 @@ func (f *RPCFetcher) Fetch(ctx context.Context, requestedSlot uint64) (out *pbbs
break
}

//todo : if err is a type skipped block error here, requestedSlot will be requestSlot + 1 while it's returning no skipped error
var blockResult *rpc.GetBlockResult

for {
blockResult, err = f.rpcClient.GetBlockWithOpts(ctx, requestedSlot, GetBlockOpts)
if err != nil {
rpcErr := err.(*jsonrpc.RPCError)
if rpcErr != nil && rpcErr.Code == -32009 {
requestedSlot += 1
continue
}
return nil, fmt.Errorf("fetching block %d: %w", requestedSlot, err)
}
break
resolvedSlot, blockResult, err := f.fetch(ctx, requestedSlot)
if err != nil {
return nil, fmt.Errorf("fetching block %d: %w", requestedSlot, err)
}

block, err := blockFromBlockResult(requestedSlot, f.latestConfirmedSlot, f.latestFinalizedSlot, blockResult)
block, err := blockFromBlockResult(resolvedSlot, f.latestConfirmedSlot, f.latestFinalizedSlot, blockResult)
if err != nil {
return nil, fmt.Errorf("decoding block %d: %w", requestedSlot, err)
return nil, fmt.Errorf("decoding block %d: %w", resolvedSlot, err)
}

return block, nil
}

func blockFromBlockResult(requestedSlot uint64, confirmedSlot uint64, finalizedSlot uint64, result *rpc.GetBlockResult) (*pbbstream.Block, error) {
func (f *RPCFetcher) fetchRpcBlock(ctx context.Context, requestedSlot uint64) (slot uint64, out *rpc.GetBlockResult, err error) {
b, err := f.rpcClient.GetBlockWithOpts(ctx, requestedSlot, GetBlockOpts)
return requestedSlot, b, err
}

func (f *RPCFetcher) fetch(ctx context.Context, requestedSlot uint64) (slot uint64, out *rpc.GetBlockResult, err error) {
for {
resolvedSlot, blockResult, err := f.fetchBlock(ctx, requestedSlot)
if err != nil {
var rpcErr *jsonrpc.RPCError
if errors.As(err, &rpcErr) {
if rpcErr.Code == -32009 {
requestedSlot += 1
continue
}
}
return 0, nil, fmt.Errorf("fetching block %d: %w", requestedSlot, err)
}
return resolvedSlot, blockResult, nil
}
}

func blockFromBlockResult(requestedSlot uint64, confirmedSlot uint64, finalizedSlot uint64, result *rpc.GetBlockResult) (*pbbstream.Block, error) {
libNum := finalizedSlot

if finalizedSlot > requestedSlot {
Expand Down
46 changes: 42 additions & 4 deletions block/fetcher/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,61 @@ package fetcher
import (
"bytes"
"context"
"errors"
"testing"
"time"

"go.uber.org/zap"

"github.com/gagliardetto/solana-go/rpc"
"github.com/gagliardetto/solana-go/rpc/jsonrpc"
bin "github.com/streamingfast/binary"
"github.com/test-go/testify/require"
"go.uber.org/zap"
)

func Test_ToPBTransaction(t *testing.T) {
func Test_DoIt(t *testing.T) {
t.Skip("TODO: fix this test")
ctx := context.Background()
rpcClient := rpc.New("https://icy-old-cloud.solana-mainnet.quiknode.pro/75682dc94e37cd35ccf9351e11669dcabc3ad176/")
rpcClient := rpc.New(quicknodeURL) //put your own URL in a file call secret.go that will be ignore by git
f := NewRPC(rpcClient, 0*time.Millisecond, 0*time.Millisecond, zap.NewNop())
_, err := f.Fetch(ctx, 240816644)

require.NoError(t, err)
}

func Test_fetchBlock(t *testing.T) {
ctx := context.Background()

f := NewRPC(nil, 0*time.Millisecond, 0*time.Millisecond, zap.NewNop())
f.fetchBlock = func(ctx context.Context, slot uint64) (uint64, *rpc.GetBlockResult, error) {
if slot == 240816644 || slot == 240816645 {
return 0, nil, &jsonrpc.RPCError{
Code: -32009,
}
}
return slot, &rpc.GetBlockResult{}, nil
}

slot, _, err := f.fetch(ctx, 240816644)
require.NoError(t, err)
require.Equal(t, uint64(240816646), slot)

}

func Test_fetchBlockFailing(t *testing.T) {
ctx := context.Background()

f := NewRPC(nil, 0*time.Millisecond, 0*time.Millisecond, zap.NewNop())
f.fetchBlock = func(ctx context.Context, slot uint64) (uint64, *rpc.GetBlockResult, error) {
return 0, nil, &jsonrpc.RPCError{
Code: -00001,
}
}

slot, b, err := f.fetch(ctx, 240816644)
var rpcErr *jsonrpc.RPCError
require.True(t, errors.As(err, &rpcErr))
require.Nil(t, b)
require.Equal(t, uint64(0), slot)

}

Expand Down

0 comments on commit 990aaae

Please sign in to comment.