From 990aaae7413612ac0240de0557ba415337df467d Mon Sep 17 00:00:00 2001 From: billettc Date: Wed, 17 Jan 2024 08:54:49 -0500 Subject: [PATCH] made code more testable and add some unit tests --- .gitignore | 1 + block/fetcher/rpc.go | 52 +++++++++++++++++++++++++-------------- block/fetcher/rpc_test.go | 46 +++++++++++++++++++++++++++++++--- 3 files changed, 77 insertions(+), 22 deletions(-) diff --git a/.gitignore b/.gitignore index 50b10de0..47b564c7 100644 --- a/.gitignore +++ b/.gitignore @@ -93,3 +93,4 @@ venv /firehose-data/storage/one-blocks/0240816598-nRQqTFc22RED3W5g-AQipdgoR9gVQLc2k-240816597-default.dbin.zst /firehose-data/storage/one-blocks/0240816599-fUUN3FS7js2v2wci-nRQqTFc22RED3W5g-240816598-default.dbin.zst /firesol-legacy +/block/fetcher/secret.go diff --git a/block/fetcher/rpc.go b/block/fetcher/rpc.go index 64b38f54..9c0be2ce 100644 --- a/block/fetcher/rpc.go +++ b/block/fetcher/rpc.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/base64" + "errors" "fmt" "math" "time" @@ -28,6 +29,8 @@ var GetBlockOpts = &rpc.GetBlockOpts{ MaxSupportedTransactionVersion: &MaxSupportedTransactionVersion, } +type fetchBlock func(ctx context.Context, requestedSlot uint64) (slot uint64, out *rpc.GetBlockResult, err error) + type RPCFetcher struct { rpcClient *rpc.Client latestConfirmedSlot uint64 @@ -36,15 +39,18 @@ type RPCFetcher struct { fetchInterval time.Duration lastFetchAt time.Time logger *zap.Logger + fetchBlock fetchBlock } func NewRPC(rpcClient *rpc.Client, fetchInterval time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *RPCFetcher { - return &RPCFetcher{ + f := &RPCFetcher{ rpcClient: rpcClient, fetchInterval: fetchInterval, latestBlockRetryInterval: latestBlockRetryInterval, logger: logger, } + f.fetchBlock = f.fetchRpcBlock + return f } func (f *RPCFetcher) Fetch(ctx context.Context, requestedSlot uint64) (out *pbbstream.Block, err error) { @@ -79,32 +85,42 @@ func (f *RPCFetcher) Fetch(ctx context.Context, requestedSlot uint64) (out *pbbs break } - //todo : if err is a type skipped block error here, requestedSlot will be requestSlot + 1 while it's returning no skipped error - var blockResult *rpc.GetBlockResult - - for { - blockResult, err = f.rpcClient.GetBlockWithOpts(ctx, requestedSlot, GetBlockOpts) - if err != nil { - rpcErr := err.(*jsonrpc.RPCError) - if rpcErr != nil && rpcErr.Code == -32009 { - requestedSlot += 1 - continue - } - return nil, fmt.Errorf("fetching block %d: %w", requestedSlot, err) - } - break + resolvedSlot, blockResult, err := f.fetch(ctx, requestedSlot) + if err != nil { + return nil, fmt.Errorf("fetching block %d: %w", requestedSlot, err) } - block, err := blockFromBlockResult(requestedSlot, f.latestConfirmedSlot, f.latestFinalizedSlot, blockResult) + block, err := blockFromBlockResult(resolvedSlot, f.latestConfirmedSlot, f.latestFinalizedSlot, blockResult) if err != nil { - return nil, fmt.Errorf("decoding block %d: %w", requestedSlot, err) + return nil, fmt.Errorf("decoding block %d: %w", resolvedSlot, err) } return block, nil } -func blockFromBlockResult(requestedSlot uint64, confirmedSlot uint64, finalizedSlot uint64, result *rpc.GetBlockResult) (*pbbstream.Block, error) { +func (f *RPCFetcher) fetchRpcBlock(ctx context.Context, requestedSlot uint64) (slot uint64, out *rpc.GetBlockResult, err error) { + b, err := f.rpcClient.GetBlockWithOpts(ctx, requestedSlot, GetBlockOpts) + return requestedSlot, b, err +} +func (f *RPCFetcher) fetch(ctx context.Context, requestedSlot uint64) (slot uint64, out *rpc.GetBlockResult, err error) { + for { + resolvedSlot, blockResult, err := f.fetchBlock(ctx, requestedSlot) + if err != nil { + var rpcErr *jsonrpc.RPCError + if errors.As(err, &rpcErr) { + if rpcErr.Code == -32009 { + requestedSlot += 1 + continue + } + } + return 0, nil, fmt.Errorf("fetching block %d: %w", requestedSlot, err) + } + return resolvedSlot, blockResult, nil + } +} + +func blockFromBlockResult(requestedSlot uint64, confirmedSlot uint64, finalizedSlot uint64, result *rpc.GetBlockResult) (*pbbstream.Block, error) { libNum := finalizedSlot if finalizedSlot > requestedSlot { diff --git a/block/fetcher/rpc_test.go b/block/fetcher/rpc_test.go index 8123918c..b64aa898 100644 --- a/block/fetcher/rpc_test.go +++ b/block/fetcher/rpc_test.go @@ -3,23 +3,61 @@ package fetcher import ( "bytes" "context" + "errors" "testing" "time" - "go.uber.org/zap" - "github.com/gagliardetto/solana-go/rpc" + "github.com/gagliardetto/solana-go/rpc/jsonrpc" bin "github.com/streamingfast/binary" "github.com/test-go/testify/require" + "go.uber.org/zap" ) -func Test_ToPBTransaction(t *testing.T) { +func Test_DoIt(t *testing.T) { + t.Skip("TODO: fix this test") ctx := context.Background() - rpcClient := rpc.New("https://icy-old-cloud.solana-mainnet.quiknode.pro/75682dc94e37cd35ccf9351e11669dcabc3ad176/") + rpcClient := rpc.New(quicknodeURL) //put your own URL in a file call secret.go that will be ignore by git f := NewRPC(rpcClient, 0*time.Millisecond, 0*time.Millisecond, zap.NewNop()) _, err := f.Fetch(ctx, 240816644) require.NoError(t, err) +} + +func Test_fetchBlock(t *testing.T) { + ctx := context.Background() + + f := NewRPC(nil, 0*time.Millisecond, 0*time.Millisecond, zap.NewNop()) + f.fetchBlock = func(ctx context.Context, slot uint64) (uint64, *rpc.GetBlockResult, error) { + if slot == 240816644 || slot == 240816645 { + return 0, nil, &jsonrpc.RPCError{ + Code: -32009, + } + } + return slot, &rpc.GetBlockResult{}, nil + } + + slot, _, err := f.fetch(ctx, 240816644) + require.NoError(t, err) + require.Equal(t, uint64(240816646), slot) + +} + +func Test_fetchBlockFailing(t *testing.T) { + ctx := context.Background() + + f := NewRPC(nil, 0*time.Millisecond, 0*time.Millisecond, zap.NewNop()) + f.fetchBlock = func(ctx context.Context, slot uint64) (uint64, *rpc.GetBlockResult, error) { + return 0, nil, &jsonrpc.RPCError{ + Code: -00001, + } + } + + slot, b, err := f.fetch(ctx, 240816644) + var rpcErr *jsonrpc.RPCError + require.True(t, errors.As(err, &rpcErr)) + require.Nil(t, b) + require.Equal(t, uint64(0), slot) }