From 661d3797136a71b630552e1a0bdb97d7cc7435f6 Mon Sep 17 00:00:00 2001 From: billettc Date: Thu, 25 Jan 2024 07:59:28 -0500 Subject: [PATCH] added upgrade-merged-blocks --- cmd/firesol/block/upgrader.go | 93 +++++++++++++++++++++++++++++++++++ cmd/firesol/main.go | 15 ++---- 2 files changed, 98 insertions(+), 10 deletions(-) create mode 100644 cmd/firesol/block/upgrader.go diff --git a/cmd/firesol/block/upgrader.go b/cmd/firesol/block/upgrader.go new file mode 100644 index 00000000..c1ffb125 --- /dev/null +++ b/cmd/firesol/block/upgrader.go @@ -0,0 +1,93 @@ +package block + +import ( + "context" + "errors" + "fmt" + "io" + "strconv" + + "github.com/spf13/cobra" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/bstream/stream" + "github.com/streamingfast/dstore" + firecore "github.com/streamingfast/firehose-core" + pbsol "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1" + "github.com/streamingfast/logging" + "go.uber.org/zap" +) + +func NewBlockCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command { + cmd := &cobra.Command{ + Use: "block ", + Short: "upgrade-merged-blocks from legacy to new format using anypb.Any as payload", + } + + cmd.AddCommand(NewFetchCmd(logger, tracer)) + + return cmd +} + +func NewFetchCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command { + cmd := &cobra.Command{ + Use: "upgrade-merged-blocks ", + Short: "upgrade-merged-blocks from legacy to new format using anypb.Any as payload", + Args: cobra.ExactArgs(4), + RunE: getMergedBlockUpgrader(logger), + } + + return cmd +} + +func getMergedBlockUpgrader(rootLog *zap.Logger) func(cmd *cobra.Command, args []string) error { + return func(cmd *cobra.Command, args []string) error { + source := args[0] + sourceStore, err := dstore.NewDBinStore(source) + if err != nil { + return fmt.Errorf("reading source store: %w", err) + } + + dest := args[1] + destStore, err := dstore.NewStore(dest, "dbin.zst", "zstd", true) + if err != nil { + return fmt.Errorf("reading destination store: %w", err) + } + + start, err := strconv.ParseUint(args[2], 10, 64) + if err != nil { + return fmt.Errorf("parsing start block num: %w", err) + } + stop, err := strconv.ParseUint(args[3], 10, 64) + if err != nil { + return fmt.Errorf("parsing stop block num: %w", err) + } + + rootLog.Info("starting block upgrader process", zap.Uint64("start", start), zap.Uint64("stop", stop), zap.String("source", source), zap.String("dest", dest)) + writer := &firecore.MergedBlocksWriter{ + Cmd: cmd, + Store: destStore, + LowBlockNum: firecore.LowBoundary(start), + StopBlockNum: stop, + TweakBlock: setParentBlockNumber, + } + blockStream := stream.New(nil, sourceStore, nil, int64(start), writer, stream.WithFinalBlocksOnly()) + + err = blockStream.Run(context.Background()) + if errors.Is(err, io.EOF) { + rootLog.Info("Complete!") + return nil + } + return err + } +} + +func setParentBlockNumber(block *pbbstream.Block) (*pbbstream.Block, error) { + b := &pbsol.Block{} + err := block.Payload.UnmarshalTo(b) + if err != nil { + return nil, fmt.Errorf("unmarshaling solana block %d: %w", block.Number, err) + } + + block.ParentNum = b.ParentSlot + return block, nil +} diff --git a/cmd/firesol/main.go b/cmd/firesol/main.go index 39df8c5a..3cb2ebd1 100644 --- a/cmd/firesol/main.go +++ b/cmd/firesol/main.go @@ -4,14 +4,14 @@ import ( "fmt" "os" - "github.com/streamingfast/firehose-core/cmd/tools" + "github.com/streamingfast/firehose-solana/cmd/firesol/block" + "github.com/spf13/cobra" firecore "github.com/streamingfast/firehose-core" + "github.com/streamingfast/firehose-core/cmd/tools" "github.com/streamingfast/firehose-core/cmd/tools/compare" - pbsol "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1" - - "github.com/spf13/cobra" "github.com/streamingfast/firehose-solana/cmd/firesol/rpc" + pbsol "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1" "github.com/streamingfast/logging" "go.uber.org/zap" ) @@ -26,16 +26,11 @@ func init() { logging.InstantiateLoggers(logging.WithDefaultLevel(zap.InfoLevel)) rootCmd.AddCommand(newFetchCmd(logger, tracer)) chain := &firecore.Chain[*pbsol.Block]{ - //ShortName: "sol", - //LongName: "Solana", - //ExecutableName: "firesol", - //FullyQualifiedModule: "github.com/streamingfast/firehose-solana", - //Protocol: "SOL", - //ProtocolVersion: 1, BlockFactory: func() firecore.Block { return new(pbsol.Block) }, } rootCmd.AddCommand(tools.ToolsCmd) + rootCmd.AddCommand(block.NewBlockCmd(logger, tracer)) tools.ToolsCmd.AddCommand(compare.NewToolsCompareBlocksCmd(chain)) }