Skip to content

Commit

Permalink
Implement dynamic protobuf parsing in block printing
Browse files Browse the repository at this point in the history
This update introduces dynamic parsing of Protocol Buffer files in the block-printing functionality. A new `dynamicPrinter` structure and its factory method have been added to facilitate protobuf file parsing.
  • Loading branch information
billettc committed Nov 28, 2023
1 parent bc92305 commit f5e9f30
Showing 1 changed file with 127 additions and 14 deletions.
141 changes: 127 additions & 14 deletions tools_print.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,24 @@ import (
"fmt"
"io"
"os"
"os/user"
"path/filepath"
"strconv"
"strings"

"github.com/go-json-experiment/json"
"github.com/go-json-experiment/json/jsontext"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/protoparse"
"github.com/jhump/protoreflect/dynamic"
"github.com/mr-tron/base58"

pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"

"github.com/spf13/cobra"
"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/dstore"
"github.com/streamingfast/firehose-core/tools"
"google.golang.org/protobuf/proto"
)

var toolsPrintCmd = &cobra.Command{
Expand All @@ -58,6 +63,7 @@ func init() {
toolsPrintCmd.AddCommand(toolsPrintMergedBlocksCmd)

toolsPrintCmd.PersistentFlags().StringP("output", "o", "text", "Output mode for block printing, either 'text', 'json' or 'jsonl'")
toolsPrintCmd.PersistentFlags().StringSlice("proto-paths", []string{"~/.proto"}, "Paths to proto files to use for dynamic decoding of blocks")
toolsPrintCmd.PersistentFlags().Bool("transactions", false, "When in 'text' output mode, also print transactions summary")
}

Expand All @@ -76,6 +82,7 @@ func createToolsPrintMergedBlocksE[B Block](chain *Chain[B]) CommandExecutor {
}

printTransactions := sflags.MustGetBool(cmd, "transactions")
protoPaths := sflags.MustGetStringSlice(cmd, "proto-paths")

storeURL := args[0]
store, err := dstore.NewDBinStore(storeURL)
Expand Down Expand Up @@ -103,6 +110,11 @@ func createToolsPrintMergedBlocksE[B Block](chain *Chain[B]) CommandExecutor {
return err
}

dPrinter, err := newDynamicPrinter(protoPaths)
if err != nil {
return fmt.Errorf("unable to create dynamic printer: %w", err)
}

seenBlockCount := 0
for {
block, err := readerFactory.Read()
Expand All @@ -116,7 +128,7 @@ func createToolsPrintMergedBlocksE[B Block](chain *Chain[B]) CommandExecutor {

seenBlockCount++

if err := printBlock(block, chain, outputMode, printTransactions); err != nil {
if err := printBlock(block, chain, outputMode, printTransactions, dPrinter); err != nil {
// Error is ready to be passed to the user as-is
return err
}
Expand All @@ -139,6 +151,7 @@ func createToolsPrintOneBlockE[B Block](chain *Chain[B]) CommandExecutor {
}

printTransactions := sflags.MustGetBool(cmd, "transactions")
protoPaths := sflags.MustGetStringSlice(cmd, "proto-paths")

storeURL := args[0]
store, err := dstore.NewDBinStore(storeURL)
Expand All @@ -161,6 +174,10 @@ func createToolsPrintOneBlockE[B Block](chain *Chain[B]) CommandExecutor {
return fmt.Errorf("unable to find on block files: %w", err)
}

dPrinter, err := newDynamicPrinter(protoPaths)
if err != nil {
return fmt.Errorf("unable to create dynamic printer: %w", err)
}
for _, filepath := range files {
reader, err := store.OpenObject(ctx, filepath)
if err != nil {
Expand All @@ -183,7 +200,7 @@ func createToolsPrintOneBlockE[B Block](chain *Chain[B]) CommandExecutor {
return fmt.Errorf("reading block: %w", err)
}

if err := printBlock(block, chain, outputMode, printTransactions); err != nil {
if err := printBlock(block, chain, outputMode, printTransactions, dPrinter); err != nil {
// Error is ready to be passed to the user as-is
return err
}
Expand All @@ -194,13 +211,6 @@ func createToolsPrintOneBlockE[B Block](chain *Chain[B]) CommandExecutor {

//go:generate go-enum -f=$GOFILE --marshal --names --nocase

// ENUM(
//
// Text
// JSON
// JSONL
//
// )
type PrintOutputMode uint

func toolsPrintCmdGetOutputMode(cmd *cobra.Command) (PrintOutputMode, error) {
Expand All @@ -214,7 +224,10 @@ func toolsPrintCmdGetOutputMode(cmd *cobra.Command) (PrintOutputMode, error) {
return out, nil
}

func printBlock[B Block](pbBlock *pbbstream.Block, chain *Chain[B], outputMode PrintOutputMode, printTransactions bool) error {
func printBlock[B Block](pbBlock *pbbstream.Block, chain *Chain[B], outputMode PrintOutputMode, printTransactions bool, dPrinter *dynamicPrinter) error {
if pbBlock == nil {
return fmt.Errorf("block is nil")
}
switch outputMode {
case PrintOutputModeText:
err := pbBlock.PrintBlock(printTransactions, os.Stdout)
Expand Down Expand Up @@ -247,9 +260,20 @@ func printBlock[B Block](pbBlock *pbbstream.Block, chain *Chain[B], outputMode P

var marshallableBlock Block = pbBlock
chainBlock := chain.BlockFactory()
if _, ok := chainBlock.(*pbbstream.Block); !ok {
isLegacyBlock := chainBlock == nil
if isLegacyBlock {
err := proto.Unmarshal(pbBlock.GetPayloadBuffer(), chainBlock)
if err != nil {
return fmt.Errorf("unmarshalling legacy pb block : %w", err)
}
marshallableBlock = chainBlock

} else if _, ok := chainBlock.(*pbbstream.Block); ok {
return dPrinter.printBlock(pbBlock, encoder, marshallers)

} else {
marshallableBlock = chainBlock

err := pbBlock.Payload.UnmarshalTo(marshallableBlock)
if err != nil {
return fmt.Errorf("pbBlock payload unmarshal: %w", err)
Expand All @@ -264,3 +288,92 @@ func printBlock[B Block](pbBlock *pbbstream.Block, chain *Chain[B], outputMode P

return nil
}

// dynamicPrinter prints block payloads using message descriptors obtained
// from proto files parsed at runtime. It is created by newDynamicPrinter and
// used through its printBlock method.
type dynamicPrinter struct {
// fileDescriptors holds the descriptors of the parsed proto files;
// printBlock searches them, in order, for the payload's message type.
fileDescriptors []*desc.FileDescriptor
}

// newDynamicPrinter builds a dynamicPrinter backed by the file descriptors
// that parseProtoFiles returns for importPaths. When parsing fails, the error
// is wrapped and a nil printer is returned.
func newDynamicPrinter(importPaths []string) (*dynamicPrinter, error) {
	descriptors, parseErr := parseProtoFiles(importPaths)
	if parseErr != nil {
		return nil, fmt.Errorf("parsing proto files: %w", parseErr)
	}

	printer := new(dynamicPrinter)
	printer.fileDescriptors = descriptors
	return printer, nil
}

// printBlock decodes the payload of block with the message descriptor that
// matches the payload's type URL and writes the decoded message to encoder as
// JSON, using marshalers.
//
// The parsed proto files are searched in order and the first one declaring
// the message is used. An error is returned when the block carries no
// payload, when no parsed file declares the payload's type, when the matching
// symbol is not a message, or when decoding or JSON encoding fails.
func (d *dynamicPrinter) printBlock(block *pbbstream.Block, encoder *jsontext.Encoder, marshalers *json.Marshalers) error {
	if block == nil || block.Payload == nil {
		return fmt.Errorf("block has no payload to print")
	}

	typeURL := block.Payload.TypeUrl

	// Descriptors are looked up by fully-qualified message name, whereas a type
	// URL usually carries a host prefix ("type.googleapis.com/pkg.Message").
	// Keep only what follows the last '/', a URL without any '/' is used as-is.
	messageName := typeURL
	if idx := strings.LastIndex(typeURL, "/"); idx >= 0 {
		messageName = typeURL[idx+1:]
	}

	for _, fd := range d.fileDescriptors {
		symbol := fd.FindSymbol(messageName)
		if symbol == nil {
			continue
		}

		// FindSymbol can also resolve enums, services, fields, etc. Report those
		// as an error instead of panicking on the type assertion.
		md, ok := symbol.(*desc.MessageDescriptor)
		if !ok {
			return fmt.Errorf("symbol %q for type url %q is not a message (got %T)", messageName, typeURL, symbol)
		}

		dynMsg := dynamic.NewMessageFactoryWithDefaults().NewDynamicMessage(md)
		if err := dynMsg.Unmarshal(block.Payload.Value); err != nil {
			return fmt.Errorf("unmarshalling block: %w", err)
		}

		if err := json.MarshalEncode(encoder, dynMsg, json.WithMarshalers(marshalers)); err != nil {
			return fmt.Errorf("pbBlock JSON printing: json marshal: %w", err)
		}
		return nil
	}

	return fmt.Errorf("no message descriptor in proto paths for type url %q", typeURL)
}

// parseProtoFiles parses every ".proto" file found under importPaths and
// returns the resulting file descriptors.
//
// An import path equal to "~" or starting with "~/" is expanded to the
// current user's home directory; every path is then made absolute. Each
// directory is walked recursively and the proto files found are handed to the
// parser by their path relative to the directory they were found under. The
// same directories are given to the parser as import paths.
//
// An error is returned when the home directory is needed but cannot be
// determined, when a path cannot be made absolute, when walking a directory
// fails (which includes a directory that does not exist), or when parsing
// fails.
func parseProtoFiles(importPaths []string) (fds []*desc.FileDescriptor, err error) {
	separator := string(filepath.Separator)

	var (
		roots   []string
		homeDir string
	)
	for _, importPath := range importPaths {
		expanded := importPath
		if importPath == "~" || strings.HasPrefix(importPath, "~/") {
			// Resolve the home directory only when a path actually needs it, so
			// plain paths keep working even if the user lookup fails.
			if homeDir == "" {
				usr, userErr := user.Current()
				if userErr != nil {
					return nil, fmt.Errorf("getting current user: %w", userErr)
				}
				homeDir = usr.HomeDir
			}
			expanded = filepath.Join(homeDir, strings.TrimPrefix(importPath, "~"))
		}

		// Keep the input in its own variable: on failure filepath.Abs returns an
		// empty string, and the error must name the path that was rejected.
		absolute, absErr := filepath.Abs(expanded)
		if absErr != nil {
			return nil, fmt.Errorf("getting absolute path for %q: %w", expanded, absErr)
		}

		// The trailing separator lets the walk below derive relative file names
		// with a simple prefix trim.
		if !strings.HasSuffix(absolute, separator) {
			absolute += separator
		}
		roots = append(roots, absolute)
	}

	parser := protoparse.Parser{
		ImportPaths: roots,
	}

	var protoFiles []string
	for _, root := range roots {
		walkErr := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.IsDir() && strings.HasSuffix(path, ".proto") {
				// Proto import statements always use forward slashes, so normalize
				// the relative name (a no-op where the separator is already '/').
				protoFiles = append(protoFiles, filepath.ToSlash(strings.TrimPrefix(path, root)))
			}
			return nil
		})
		if walkErr != nil {
			return nil, fmt.Errorf("walking import path %q: %w", root, walkErr)
		}
	}

	parsed, parseErr := parser.ParseFiles(protoFiles...)
	if parseErr != nil {
		return nil, fmt.Errorf("parsing proto files: %w", parseErr)
	}
	return parsed, nil
}

0 comments on commit f5e9f30

Please sign in to comment.