Skip to content

Commit

Permalink
fix substreams block-type
Browse files Browse the repository at this point in the history
  • Loading branch information
jubeless committed Nov 30, 2023
1 parent 1d6f3c3 commit 2e7b9f9
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 8 deletions.
5 changes: 0 additions & 5 deletions substreams_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/spf13/cobra"
"github.com/streamingfast/substreams/pipeline"
"github.com/streamingfast/substreams/wasm"
"google.golang.org/protobuf/proto"
)

var registerSSOnce sync.Once
Expand All @@ -20,10 +19,6 @@ func registerCommonSubstreamsFlags(cmd *cobra.Command) {
})
}

func getSubstreamsBlockMessageType[B Block](chain *Chain[B]) string {
return string(proto.MessageName(chain.BlockFactory()))
}

func getSubstreamsExtensions[B Block](chain *Chain[B]) ([]wasm.WASMExtensioner, []pipeline.PipelineOptioner, error) {
var wasmExtensions []wasm.WASMExtensioner
var pipelineOptions []pipeline.PipelineOptioner
Expand Down
9 changes: 7 additions & 2 deletions substreams_tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func registerSubstreamsTier1App[B Block](chain *Chain[B]) {
RegisterFlags: func(cmd *cobra.Command) error {
cmd.Flags().String("substreams-tier1-grpc-listen-addr", SubstreamsTier1GRPCServingAddr, "Address on which the Substreams tier1 will listen, listen by default in plain text, appending a '*' to the end of the address make it listen in snake-oil (inscure) TLS")
cmd.Flags().String("substreams-tier1-subrequests-endpoint", SubstreamsTier2GRPCServingAddr, "Address on which the Substreans tier1 can reach the tier2")

// communication with tier2
cmd.Flags().String("substreams-tier1-discovery-service-url", "", "URL to configure the grpc discovery service, used for communication with tier2") //traffic-director://xds?vpc_network=vpc-global&use_xds_reds=true
cmd.Flags().Bool("substreams-tier1-subrequests-insecure", false, "Connect to tier2 without checking certificate validity")
cmd.Flags().Bool("substreams-tier1-subrequests-plaintext", true, "Connect to tier2 without client in plaintext mode")
cmd.Flags().Int("substreams-tier1-max-subrequests", 4, "number of parallel subrequests that the tier1 can make to the tier2 per request")
cmd.Flags().String("substreams-tier1-block-type", "", "fully qualified name of the block type to use for the substreams tier1 (i.e. sf.ethereum.v1.Block)")

// all substreams
registerCommonSubstreamsFlags(cmd)
Expand Down Expand Up @@ -81,6 +81,11 @@ func registerSubstreamsTier1App[B Block](chain *Chain[B]) {
subrequestsInsecure := viper.GetBool("substreams-tier1-subrequests-insecure")
subrequestsPlaintext := viper.GetBool("substreams-tier1-subrequests-plaintext")
maxSubrequests := viper.GetUint64("substreams-tier1-max-subrequests")
substreamsBlockType := viper.GetString("substreams-tier1-block-type")

if substreamsBlockType == "" {
return nil, fmt.Errorf("substreams-tier1-block-type is required")
}

tracing := os.Getenv("SUBSTREAMS_TRACING") == "modules_exec"

Expand Down Expand Up @@ -111,7 +116,7 @@ func registerSubstreamsTier1App[B Block](chain *Chain[B]) {
StateStoreURL: stateStoreURL,
StateStoreDefaultTag: stateStoreDefaultTag,
StateBundleSize: stateBundleSize,
BlockType: getSubstreamsBlockMessageType(chain),
BlockType: substreamsBlockType,
MaxSubrequests: maxSubrequests,
SubrequestsEndpoint: subrequestsEndpoint,
SubrequestsInsecure: subrequestsInsecure,
Expand Down
8 changes: 7 additions & 1 deletion substreams_tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func registerSubstreamsTier2App[B Block](chain *Chain[B]) {
RegisterFlags: func(cmd *cobra.Command) error {
cmd.Flags().String("substreams-tier2-grpc-listen-addr", SubstreamsTier2GRPCServingAddr, "Address on which the substreams tier2 will listen. Default is plain-text, appending a '*' to the end to jkkkj")
cmd.Flags().String("substreams-tier2-discovery-service-url", "", "URL to advertise presence to the grpc discovery service") //traffic-director://xds?vpc_network=vpc-global&use_xds_reds=true
cmd.Flags().String("substreams-tier2-block-type", "", "fully qualified name of the block type to use for the substreams tier1 (i.e. sf.ethereum.v1.Block)")

// all substreams
registerCommonSubstreamsFlags(cmd)
Expand All @@ -61,6 +62,11 @@ func registerSubstreamsTier2App[B Block](chain *Chain[B]) {
stateStoreDefaultTag := viper.GetString("substreams-state-store-default-tag")

stateBundleSize := viper.GetUint64("substreams-state-bundle-size")
substreamsBlockType := viper.GetString("substreams-tier2-block-type")

if substreamsBlockType == "" {
return nil, fmt.Errorf("substreams-tier2-block-type is required")
}

tracing := os.Getenv("SUBSTREAMS_TRACING") == "modules_exec"

Expand Down Expand Up @@ -88,7 +94,7 @@ func registerSubstreamsTier2App[B Block](chain *Chain[B]) {
StateStoreURL: stateStoreURL,
StateStoreDefaultTag: stateStoreDefaultTag,
StateBundleSize: stateBundleSize,
BlockType: getSubstreamsBlockMessageType(chain),
BlockType: substreamsBlockType,

WASMExtensions: wasmExtensions,
PipelineOptions: pipelineOptioner,
Expand Down

0 comments on commit 2e7b9f9

Please sign in to comment.