diff --git a/substreams_common.go b/substreams_common.go index 3e5f73d..0183114 100644 --- a/substreams_common.go +++ b/substreams_common.go @@ -7,7 +7,6 @@ import ( "github.com/spf13/cobra" "github.com/streamingfast/substreams/pipeline" "github.com/streamingfast/substreams/wasm" - "google.golang.org/protobuf/proto" ) var registerSSOnce sync.Once @@ -20,10 +19,6 @@ func registerCommonSubstreamsFlags(cmd *cobra.Command) { }) } -func getSubstreamsBlockMessageType[B Block](chain *Chain[B]) string { - return string(proto.MessageName(chain.BlockFactory())) -} - func getSubstreamsExtensions[B Block](chain *Chain[B]) ([]wasm.WASMExtensioner, []pipeline.PipelineOptioner, error) { var wasmExtensions []wasm.WASMExtensioner var pipelineOptions []pipeline.PipelineOptioner diff --git a/substreams_tier1.go b/substreams_tier1.go index 0af1f9d..c45c479 100644 --- a/substreams_tier1.go +++ b/substreams_tier1.go @@ -42,12 +42,12 @@ func registerSubstreamsTier1App[B Block](chain *Chain[B]) { RegisterFlags: func(cmd *cobra.Command) error { cmd.Flags().String("substreams-tier1-grpc-listen-addr", SubstreamsTier1GRPCServingAddr, "Address on which the Substreams tier1 will listen, listen by default in plain text, appending a '*' to the end of the address make it listen in snake-oil (inscure) TLS") cmd.Flags().String("substreams-tier1-subrequests-endpoint", SubstreamsTier2GRPCServingAddr, "Address on which the Substreans tier1 can reach the tier2") - // communication with tier2 cmd.Flags().String("substreams-tier1-discovery-service-url", "", "URL to configure the grpc discovery service, used for communication with tier2") //traffic-director://xds?vpc_network=vpc-global&use_xds_reds=true cmd.Flags().Bool("substreams-tier1-subrequests-insecure", false, "Connect to tier2 without checking certificate validity") cmd.Flags().Bool("substreams-tier1-subrequests-plaintext", true, "Connect to tier2 without client in plaintext mode") cmd.Flags().Int("substreams-tier1-max-subrequests", 4, "number of parallel subrequests that the tier1 can make to the tier2 per request") + cmd.Flags().String("substreams-tier1-block-type", "", "fully qualified name of the block type to use for the substreams tier1 (i.e. sf.ethereum.v1.Block)") // all substreams registerCommonSubstreamsFlags(cmd) @@ -81,6 +81,11 @@ func registerSubstreamsTier1App[B Block](chain *Chain[B]) { subrequestsInsecure := viper.GetBool("substreams-tier1-subrequests-insecure") subrequestsPlaintext := viper.GetBool("substreams-tier1-subrequests-plaintext") maxSubrequests := viper.GetUint64("substreams-tier1-max-subrequests") + substreamsBlockType := viper.GetString("substreams-tier1-block-type") + + if substreamsBlockType == "" { + return nil, fmt.Errorf("substreams-tier1-block-type is required") + } tracing := os.Getenv("SUBSTREAMS_TRACING") == "modules_exec" @@ -111,7 +116,7 @@ func registerSubstreamsTier1App[B Block](chain *Chain[B]) { StateStoreURL: stateStoreURL, StateStoreDefaultTag: stateStoreDefaultTag, StateBundleSize: stateBundleSize, - BlockType: getSubstreamsBlockMessageType(chain), + BlockType: substreamsBlockType, MaxSubrequests: maxSubrequests, SubrequestsEndpoint: subrequestsEndpoint, SubrequestsInsecure: subrequestsInsecure, diff --git a/substreams_tier2.go b/substreams_tier2.go index 01e60e9..e9aaea9 100644 --- a/substreams_tier2.go +++ b/substreams_tier2.go @@ -40,6 +40,7 @@ func registerSubstreamsTier2App[B Block](chain *Chain[B]) { RegisterFlags: func(cmd *cobra.Command) error { cmd.Flags().String("substreams-tier2-grpc-listen-addr", SubstreamsTier2GRPCServingAddr, "Address on which the substreams tier2 will listen. Default is plain-text, appending a '*' to the end to jkkkj") cmd.Flags().String("substreams-tier2-discovery-service-url", "", "URL to advertise presence to the grpc discovery service") //traffic-director://xds?vpc_network=vpc-global&use_xds_reds=true + cmd.Flags().String("substreams-tier2-block-type", "", "fully qualified name of the block type to use for the substreams tier1 (i.e. sf.ethereum.v1.Block)") // all substreams registerCommonSubstreamsFlags(cmd) @@ -61,6 +62,11 @@ func registerSubstreamsTier2App[B Block](chain *Chain[B]) { stateStoreDefaultTag := viper.GetString("substreams-state-store-default-tag") stateBundleSize := viper.GetUint64("substreams-state-bundle-size") + substreamsBlockType := viper.GetString("substreams-tier2-block-type") + + if substreamsBlockType == "" { + return nil, fmt.Errorf("substreams-tier2-block-type is required") + } tracing := os.Getenv("SUBSTREAMS_TRACING") == "modules_exec" @@ -88,7 +94,7 @@ func registerSubstreamsTier2App[B Block](chain *Chain[B]) { StateStoreURL: stateStoreURL, StateStoreDefaultTag: stateStoreDefaultTag, StateBundleSize: stateBundleSize, - BlockType: getSubstreamsBlockMessageType(chain), + BlockType: substreamsBlockType, WASMExtensions: wasmExtensions, PipelineOptions: pipelineOptioner,