Skip to content

Commit

Permalink
added upgrade-merged-blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Jan 25, 2024
1 parent a35335e commit 661d379
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 10 deletions.
93 changes: 93 additions & 0 deletions cmd/firesol/block/upgrader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package block

import (
"context"
"errors"
"fmt"
"io"
"strconv"

"github.com/spf13/cobra"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/bstream/stream"
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
pbsol "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

func NewBlockCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd := &cobra.Command{
Use: "block <source> <destination> <range>",
Short: "upgrade-merged-blocks from legacy to new format using anypb.Any as payload",
}

cmd.AddCommand(NewFetchCmd(logger, tracer))

return cmd
}

func NewFetchCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd := &cobra.Command{
Use: "upgrade-merged-blocks <source> <destination> <range>",
Short: "upgrade-merged-blocks from legacy to new format using anypb.Any as payload",
Args: cobra.ExactArgs(4),
RunE: getMergedBlockUpgrader(logger),
}

return cmd
}

func getMergedBlockUpgrader(rootLog *zap.Logger) func(cmd *cobra.Command, args []string) error {
return func(cmd *cobra.Command, args []string) error {
source := args[0]
sourceStore, err := dstore.NewDBinStore(source)
if err != nil {
return fmt.Errorf("reading source store: %w", err)
}

dest := args[1]
destStore, err := dstore.NewStore(dest, "dbin.zst", "zstd", true)
if err != nil {
return fmt.Errorf("reading destination store: %w", err)
}

start, err := strconv.ParseUint(args[2], 10, 64)
if err != nil {
return fmt.Errorf("parsing start block num: %w", err)
}
stop, err := strconv.ParseUint(args[3], 10, 64)
if err != nil {
return fmt.Errorf("parsing stop block num: %w", err)
}

rootLog.Info("starting block upgrader process", zap.Uint64("start", start), zap.Uint64("stop", stop), zap.String("source", source), zap.String("dest", dest))
writer := &firecore.MergedBlocksWriter{
Cmd: cmd,
Store: destStore,
LowBlockNum: firecore.LowBoundary(start),
StopBlockNum: stop,
TweakBlock: setParentBlockNumber,
}
blockStream := stream.New(nil, sourceStore, nil, int64(start), writer, stream.WithFinalBlocksOnly())

err = blockStream.Run(context.Background())
if errors.Is(err, io.EOF) {
rootLog.Info("Complete!")
return nil
}
return err
}
}

func setParentBlockNumber(block *pbbstream.Block) (*pbbstream.Block, error) {
b := &pbsol.Block{}
err := block.Payload.UnmarshalTo(b)
if err != nil {
return nil, fmt.Errorf("unmarshaling solana block %d: %w", block.Number, err)
}

block.ParentNum = b.ParentSlot
return block, nil
}
15 changes: 5 additions & 10 deletions cmd/firesol/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"fmt"
"os"

"github.com/streamingfast/firehose-core/cmd/tools"
"github.com/streamingfast/firehose-solana/cmd/firesol/block"

"github.com/spf13/cobra"
firecore "github.com/streamingfast/firehose-core"

Check failure on line 10 in cmd/firesol/main.go

View workflow job for this annotation

GitHub Actions / build (1.21.x)

github.com/streamingfast/[email protected]: replacement directory ../firehose-core does not exist
"github.com/streamingfast/firehose-core/cmd/tools"

Check failure on line 11 in cmd/firesol/main.go

View workflow job for this annotation

GitHub Actions / build (1.21.x)

github.com/streamingfast/[email protected]: replacement directory ../firehose-core does not exist
"github.com/streamingfast/firehose-core/cmd/tools/compare"

Check failure on line 12 in cmd/firesol/main.go

View workflow job for this annotation

GitHub Actions / build (1.21.x)

github.com/streamingfast/[email protected]: replacement directory ../firehose-core does not exist
pbsol "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"

"github.com/spf13/cobra"
"github.com/streamingfast/firehose-solana/cmd/firesol/rpc"
pbsol "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)
Expand All @@ -26,16 +26,11 @@ func init() {
logging.InstantiateLoggers(logging.WithDefaultLevel(zap.InfoLevel))
rootCmd.AddCommand(newFetchCmd(logger, tracer))
chain := &firecore.Chain[*pbsol.Block]{
//ShortName: "sol",
//LongName: "Solana",
//ExecutableName: "firesol",
//FullyQualifiedModule: "github.com/streamingfast/firehose-solana",
//Protocol: "SOL",
//ProtocolVersion: 1,
BlockFactory: func() firecore.Block { return new(pbsol.Block) },
}

rootCmd.AddCommand(tools.ToolsCmd)
rootCmd.AddCommand(block.NewBlockCmd(logger, tracer))
tools.ToolsCmd.AddCommand(compare.NewToolsCompareBlocksCmd(chain))
}

Expand Down

0 comments on commit 661d379

Please sign in to comment.