From f5e9f304eac25a5635adcb08f730de43c03ed369 Mon Sep 17 00:00:00 2001 From: billettc Date: Tue, 28 Nov 2023 14:05:42 -0500 Subject: [PATCH] Implement dynamic protobuf parsing in block printing The update introduces dynamic parsing of Protocol Buffer files in block printing functionalities. New `dynamicPrinter` structure and factory method have been incorporated to facilitate protobuf file parsing. --- tools_print.go | 141 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 127 insertions(+), 14 deletions(-) diff --git a/tools_print.go b/tools_print.go index bba8e4d..db83e3b 100644 --- a/tools_print.go +++ b/tools_print.go @@ -19,19 +19,24 @@ import ( "fmt" "io" "os" + "os/user" + "path/filepath" "strconv" + "strings" "github.com/go-json-experiment/json" "github.com/go-json-experiment/json/jsontext" + "github.com/jhump/protoreflect/desc" + "github.com/jhump/protoreflect/desc/protoparse" + "github.com/jhump/protoreflect/dynamic" "github.com/mr-tron/base58" - - pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" - "github.com/spf13/cobra" "github.com/streamingfast/bstream" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" "github.com/streamingfast/cli/sflags" "github.com/streamingfast/dstore" "github.com/streamingfast/firehose-core/tools" + "google.golang.org/protobuf/proto" ) var toolsPrintCmd = &cobra.Command{ @@ -58,6 +63,7 @@ func init() { toolsPrintCmd.AddCommand(toolsPrintMergedBlocksCmd) toolsPrintCmd.PersistentFlags().StringP("output", "o", "text", "Output mode for block printing, either 'text', 'json' or 'jsonl'") + toolsPrintCmd.PersistentFlags().StringSlice("proto-paths", []string{"~/.proto"}, "Paths to proto files to use for dynamic decoding of blocks") toolsPrintCmd.PersistentFlags().Bool("transactions", false, "When in 'text' output mode, also print transactions summary") } @@ -76,6 +82,7 @@ func createToolsPrintMergedBlocksE[B Block](chain *Chain[B]) CommandExecutor { } printTransactions := sflags.MustGetBool(cmd, "transactions") + protoPaths := sflags.MustGetStringSlice(cmd, "proto-paths") storeURL := args[0] store, err := dstore.NewDBinStore(storeURL) @@ -103,6 +110,11 @@ func createToolsPrintMergedBlocksE[B Block](chain *Chain[B]) CommandExecutor { return err } + dPrinter, err := newDynamicPrinter(protoPaths) + if err != nil { + return fmt.Errorf("unable to create dynamic printer: %w", err) + } + seenBlockCount := 0 for { block, err := readerFactory.Read() @@ -116,7 +128,7 @@ func createToolsPrintMergedBlocksE[B Block](chain *Chain[B]) CommandExecutor { seenBlockCount++ - if err := printBlock(block, chain, outputMode, printTransactions); err != nil { + if err := printBlock(block, chain, outputMode, printTransactions, dPrinter); err != nil { // Error is ready to be passed to the user as-is return err } @@ -139,6 +151,7 @@ func createToolsPrintOneBlockE[B Block](chain *Chain[B]) CommandExecutor { } printTransactions := sflags.MustGetBool(cmd, "transactions") + protoPaths := sflags.MustGetStringSlice(cmd, "proto-paths") storeURL := args[0] store, err := dstore.NewDBinStore(storeURL) @@ -161,6 +174,10 @@ func createToolsPrintOneBlockE[B Block](chain *Chain[B]) CommandExecutor { return fmt.Errorf("unable to find on block files: %w", err) } + dPrinter, err := newDynamicPrinter(protoPaths) + if err != nil { + return fmt.Errorf("unable to create dynamic printer: %w", err) + } for _, filepath := range files { reader, err := store.OpenObject(ctx, filepath) if err != nil { @@ -183,7 +200,7 @@ func createToolsPrintOneBlockE[B Block](chain *Chain[B]) CommandExecutor { return fmt.Errorf("reading block: %w", err) } - if err := printBlock(block, chain, outputMode, printTransactions); err != nil { + if err := printBlock(block, chain, outputMode, printTransactions, dPrinter); err != nil { // Error is ready to be passed to the user as-is return err } @@ -194,13 +211,6 @@ func createToolsPrintOneBlockE[B Block](chain *Chain[B]) CommandExecutor { //go:generate go-enum -f=$GOFILE --marshal --names --nocase -// ENUM( -// -// Text -// JSON -// JSONL -// -// ) type PrintOutputMode uint func toolsPrintCmdGetOutputMode(cmd *cobra.Command) (PrintOutputMode, error) { @@ -214,7 +224,10 @@ func toolsPrintCmdGetOutputMode(cmd *cobra.Command) (PrintOutputMode, error) { return out, nil } -func printBlock[B Block](pbBlock *pbbstream.Block, chain *Chain[B], outputMode PrintOutputMode, printTransactions bool) error { +func printBlock[B Block](pbBlock *pbbstream.Block, chain *Chain[B], outputMode PrintOutputMode, printTransactions bool, dPrinter *dynamicPrinter) error { + if pbBlock == nil { + return fmt.Errorf("block is nil") + } switch outputMode { case PrintOutputModeText: err := pbBlock.PrintBlock(printTransactions, os.Stdout) @@ -247,9 +260,20 @@ func printBlock[B Block](pbBlock *pbbstream.Block, chain *Chain[B], outputMode P var marshallableBlock Block = pbBlock chainBlock := chain.BlockFactory() - if _, ok := chainBlock.(*pbbstream.Block); !ok { + isLegacyBlock := chainBlock == nil + if isLegacyBlock { + err := proto.Unmarshal(pbBlock.GetPayloadBuffer(), chainBlock) + if err != nil { + return fmt.Errorf("unmarshalling legacy pb block : %w", err) + } + marshallableBlock = chainBlock + } else if _, ok := chainBlock.(*pbbstream.Block); ok { + return dPrinter.printBlock(pbBlock, encoder, marshallers) + + } else { marshallableBlock = chainBlock + err := pbBlock.Payload.UnmarshalTo(marshallableBlock) if err != nil { return fmt.Errorf("pbBlock payload unmarshal: %w", err) @@ -264,3 +288,92 @@ func printBlock[B Block](pbBlock *pbbstream.Block, chain *Chain[B], outputMode P return nil } + +type dynamicPrinter struct { + fileDescriptors []*desc.FileDescriptor +} + +func newDynamicPrinter(importPaths []string) (*dynamicPrinter, error) { + fileDescriptors, err := parseProtoFiles(importPaths) + if err != nil { + return nil, fmt.Errorf("parsing proto files: %w", err) + } + return &dynamicPrinter{ + fileDescriptors: fileDescriptors, + }, nil +} + +func (d *dynamicPrinter) printBlock(block *pbbstream.Block, encoder *jsontext.Encoder, marshalers *json.Marshalers) error { + for _, fd := range d.fileDescriptors { + md := fd.FindSymbol(block.Payload.TypeUrl) + if md != nil { + dynMsg := dynamic.NewMessageFactoryWithDefaults().NewDynamicMessage(md.(*desc.MessageDescriptor)) + if err := dynMsg.Unmarshal(block.Payload.Value); err != nil { + return fmt.Errorf("unmarshalling block: %w", err) + } + err := json.MarshalEncode(encoder, dynMsg, json.WithMarshalers(marshalers)) + if err != nil { + return fmt.Errorf("pbBlock JSON printing: json marshal: %w", err) + } + return nil + } + } + return fmt.Errorf("no message descriptor in proto paths for type url %q", block.Payload.TypeUrl) +} + +func parseProtoFiles(importPaths []string) (fds []*desc.FileDescriptor, err error) { + usr, err := user.Current() + if err != nil { + return nil, fmt.Errorf("getting current user: %w", err) + } + userDir := usr.HomeDir + + var ip []string + for _, importPath := range importPaths { + if importPath == "~" { + importPath = userDir + } else if strings.HasPrefix(importPath, "~/") { + importPath = filepath.Join(userDir, importPath[2:]) + } + + importPath, err = filepath.Abs(importPath) + if err != nil { + return nil, fmt.Errorf("getting absolute path for %q: %w", importPath, err) + } + + if !strings.HasSuffix(importPath, "/") { + importPath += "/" + } + ip = append(ip, importPath) + } + + fmt.Println("importPaths", importPaths) + + parser := protoparse.Parser{ + ImportPaths: ip, + } + + var protoFiles []string + for _, importPath := range ip { + err := filepath.Walk(importPath, + func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if strings.HasSuffix(path, ".proto") && !info.IsDir() { + protoFiles = append(protoFiles, strings.TrimPrefix(path, importPath)) + } + return nil + }) + if err != nil { + return nil, fmt.Errorf("walking import path %q: %w", importPath, err) + } + } + + fds, err = parser.ParseFiles(protoFiles...) + if err != nil { + return nil, fmt.Errorf("parsing proto files: %w", err) + } + return + +}