From d273ae03da3c9efac7fb53a123f0de592ce22d92 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Mon, 23 Sep 2024 12:09:01 -0400 Subject: [PATCH] feat(schema/indexer)!: implement start indexing (#21636) Co-authored-by: Marko --- indexer/postgres/indexer.go | 34 +-- indexer/postgres/tests/config.go | 26 --- indexer/postgres/tests/init_schema_test.go | 23 +- indexer/postgres/tests/postgres_test.go | 28 ++- .../postgres/tests/testdata/init_schema.txt | 4 + .../testdata/init_schema_no_retain_delete.txt | 4 + schema/decoding/resolver.go | 6 +- schema/decoding/resolver_test.go | 4 +- schema/decoding/sync.go | 2 +- schema/indexer/config.go | 52 +++++ schema/indexer/indexer.go | 43 +--- schema/indexer/manager.go | 50 ---- schema/indexer/registry.go | 10 +- schema/indexer/registry_test.go | 16 +- schema/indexer/start.go | 212 +++++++++++++++++ schema/indexer/start_test.go | 219 ++++++++++++++++++ schema/logutil/logger.go | 7 + 17 files changed, 570 insertions(+), 170 deletions(-) delete mode 100644 indexer/postgres/tests/config.go create mode 100644 schema/indexer/config.go delete mode 100644 schema/indexer/manager.go create mode 100644 schema/indexer/start.go create mode 100644 schema/indexer/start_test.go diff --git a/indexer/postgres/indexer.go b/indexer/postgres/indexer.go index bfaac25842e5..a4a35d730b67 100644 --- a/indexer/postgres/indexer.go +++ b/indexer/postgres/indexer.go @@ -3,8 +3,8 @@ package postgres import ( "context" "database/sql" - "encoding/json" "errors" + "fmt" "cosmossdk.io/schema/indexer" "cosmossdk.io/schema/logutil" @@ -21,8 +21,6 @@ type Config struct { DisableRetainDeletions bool `json:"disable_retain_deletions"` } -type SqlLogger = func(msg, sql string, params ...interface{}) - type indexerImpl struct { ctx context.Context db *sql.DB @@ -32,10 +30,17 @@ type indexerImpl struct { logger logutil.Logger } -func StartIndexer(params indexer.InitParams) (indexer.InitResult, error) { - config, err := decodeConfig(params.Config.Config) - if err != nil { - return indexer.InitResult{}, err +func init() { + indexer.Register("postgres", indexer.Initializer{ + InitFunc: startIndexer, + ConfigType: Config{}, + }) +} + +func startIndexer(params indexer.InitParams) (indexer.InitResult, error) { + config, ok := params.Config.Config.(Config) + if !ok { + return indexer.InitResult{}, fmt.Errorf("invalid config type, expected %T got %T", Config{}, params.Config.Config) } ctx := params.Context @@ -89,18 +94,3 @@ func StartIndexer(params indexer.InitParams) (indexer.InitResult, error) { View: idx, }, nil } - -func decodeConfig(rawConfig map[string]interface{}) (*Config, error) { - bz, err := json.Marshal(rawConfig) - if err != nil { - return nil, err - } - - var config Config - err = json.Unmarshal(bz, &config) - if err != nil { - return nil, err - } - - return &config, nil -} diff --git a/indexer/postgres/tests/config.go b/indexer/postgres/tests/config.go deleted file mode 100644 index 78e41f6059b5..000000000000 --- a/indexer/postgres/tests/config.go +++ /dev/null @@ -1,26 +0,0 @@ -package tests - -import ( - "encoding/json" - - "cosmossdk.io/indexer/postgres" - "cosmossdk.io/schema/indexer" -) - -func postgresConfigToIndexerConfig(cfg postgres.Config) (indexer.Config, error) { - cfgBz, err := json.Marshal(cfg) - if err != nil { - return indexer.Config{}, err - } - - var cfgMap map[string]interface{} - err = json.Unmarshal(cfgBz, &cfgMap) - if err != nil { - return indexer.Config{}, err - } - - return indexer.Config{ - Type: "postgres", - Config: cfgMap, - }, nil -} diff --git a/indexer/postgres/tests/init_schema_test.go b/indexer/postgres/tests/init_schema_test.go index 4257f37d5ab7..e3fef06fc265 100644 --- a/indexer/postgres/tests/init_schema_test.go +++ b/indexer/postgres/tests/init_schema_test.go @@ -33,17 +33,20 @@ func testInitSchema(t *testing.T, disableRetainDeletions bool, goldenFileName st connectionUrl := createTestDB(t) buf := &strings.Builder{} - - cfg, err := postgresConfigToIndexerConfig(postgres.Config{ - DatabaseURL: connectionUrl, - DisableRetainDeletions: disableRetainDeletions, - }) - require.NoError(t, err) - - res, err := postgres.StartIndexer(indexer.InitParams{ - Config: cfg, + res, err := indexer.StartIndexing(indexer.IndexingOptions{ + Config: indexer.IndexingConfig{ + Target: map[string]indexer.Config{ + "postgres": { + Type: "postgres", + Config: postgres.Config{ + DatabaseURL: connectionUrl, + DisableRetainDeletions: disableRetainDeletions, + }, + }, + }, + }, Context: context.Background(), - Logger: &prettyLogger{buf}, + Logger: prettyLogger{buf}, }) require.NoError(t, err) listener := res.Listener diff --git a/indexer/postgres/tests/postgres_test.go b/indexer/postgres/tests/postgres_test.go index 0d5207ddcd92..6256f83db31c 100644 --- a/indexer/postgres/tests/postgres_test.go +++ b/indexer/postgres/tests/postgres_test.go @@ -52,24 +52,29 @@ func testPostgresIndexer(t *testing.T, retainDeletions bool) { require.NoError(t, err) }) - cfg, err := postgresConfigToIndexerConfig(postgres.Config{ - DatabaseURL: dbUrl, - DisableRetainDeletions: !retainDeletions, - }) - require.NoError(t, err) - debugLog := &strings.Builder{} - pgIndexer, err := postgres.StartIndexer(indexer.InitParams{ - Config: cfg, + res, err := indexer.StartIndexing(indexer.IndexingOptions{ + Config: indexer.IndexingConfig{ + Target: map[string]indexer.Config{ + "postgres": { + Type: "postgres", + Config: postgres.Config{ + DatabaseURL: dbUrl, + DisableRetainDeletions: !retainDeletions, + }, + }, + }, + }, Context: ctx, Logger: &prettyLogger{debugLog}, AddressCodec: addressutil.HexAddressCodec{}, }) require.NoError(t, err) + require.NoError(t, err) sim, err := appdatasim.NewSimulator(appdatasim.Options{ - Listener: pgIndexer.Listener, + Listener: res.Listener, AppSchema: indexertesting.ExampleAppSchema, StateSimOptions: statesim.Options{ CanRetainDeletions: retainDeletions, @@ -77,6 +82,9 @@ func testPostgresIndexer(t *testing.T, retainDeletions bool) { }) require.NoError(t, err) + pgIndexerView := res.IndexerInfos["postgres"].View + require.NotNil(t, pgIndexerView) + blockDataGen := sim.BlockDataGenN(10, 100) numBlocks := 200 if testing.Short() { @@ -93,7 +101,7 @@ func testPostgresIndexer(t *testing.T, retainDeletions bool) { require.NoError(t, sim.ProcessBlockData(blockData), debugLog.String()) // compare the expected state in the simulator to the actual state in the indexer and expect the diff to be empty - require.Empty(t, appdatasim.DiffAppData(sim, pgIndexer.View), debugLog.String()) + require.Empty(t, appdatasim.DiffAppData(sim, pgIndexerView), debugLog.String()) // reset the debug log after each successful block so that it doesn't get too long when debugging debugLog.Reset() diff --git a/indexer/postgres/tests/testdata/init_schema.txt b/indexer/postgres/tests/testdata/init_schema.txt index 4d18d2fb23b1..43b91a6a7a83 100644 --- a/indexer/postgres/tests/testdata/init_schema.txt +++ b/indexer/postgres/tests/testdata/init_schema.txt @@ -1,3 +1,7 @@ +INFO: Starting indexing +INFO: Starting indexer + target_name: postgres + type: postgres DEBUG: Creating enum type sql: CREATE TYPE "test_my_enum" AS ENUM ('a', 'b', 'c'); DEBUG: Creating enum type diff --git a/indexer/postgres/tests/testdata/init_schema_no_retain_delete.txt b/indexer/postgres/tests/testdata/init_schema_no_retain_delete.txt index 0ec17ae1ea1d..71dfd4d08290 100644 --- a/indexer/postgres/tests/testdata/init_schema_no_retain_delete.txt +++ b/indexer/postgres/tests/testdata/init_schema_no_retain_delete.txt @@ -1,3 +1,7 @@ +INFO: Starting indexing +INFO: Starting indexer + target_name: postgres + type: postgres DEBUG: Creating enum type sql: CREATE TYPE "test_my_enum" AS ENUM ('a', 'b', 'c'); DEBUG: Creating enum type diff --git a/schema/decoding/resolver.go b/schema/decoding/resolver.go index cb022dbb6947..5478573401f6 100644 --- a/schema/decoding/resolver.go +++ b/schema/decoding/resolver.go @@ -15,8 +15,8 @@ type DecoderResolver interface { // EncodeModuleName encodes a module name into a byte slice that can be used as the actor in a KVPairUpdate. EncodeModuleName(string) ([]byte, error) - // IterateAll iterates over all available module decoders. - IterateAll(func(moduleName string, cdc schema.ModuleCodec) error) error + // AllDecoders iterates over all available module decoders. + AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error) error // LookupDecoder looks up a specific module decoder. LookupDecoder(moduleName string) (decoder schema.ModuleCodec, found bool, err error) @@ -48,7 +48,7 @@ func (a moduleSetDecoderResolver) EncodeModuleName(s string) ([]byte, error) { return nil, fmt.Errorf("module %s not found", s) } -func (a moduleSetDecoderResolver) IterateAll(f func(string, schema.ModuleCodec) error) error { +func (a moduleSetDecoderResolver) AllDecoders(f func(string, schema.ModuleCodec) error) error { keys := make([]string, 0, len(a.moduleSet)) for k := range a.moduleSet { keys = append(keys, k) diff --git a/schema/decoding/resolver_test.go b/schema/decoding/resolver_test.go index e3a98863681a..2cf13b52b6cf 100644 --- a/schema/decoding/resolver_test.go +++ b/schema/decoding/resolver_test.go @@ -43,7 +43,7 @@ var testResolver = ModuleSetDecoderResolver(moduleSet) func TestModuleSetDecoderResolver_IterateAll(t *testing.T) { objectTypes := map[string]bool{} - err := testResolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error { + err := testResolver.AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error { cdc.Schema.AllTypes(func(t schema.Type) bool { objTyp, ok := t.(schema.StateObjectType) if ok { @@ -128,7 +128,7 @@ func TestModuleSetDecoderResolver_IterateAll_Error(t *testing.T) { resolver := ModuleSetDecoderResolver(map[string]interface{}{ "modD": modD{}, }) - err := resolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error { + err := resolver.AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error { if moduleName == "modD" { t.Fatalf("expected error") } diff --git a/schema/decoding/sync.go b/schema/decoding/sync.go index d8aee9884c6a..68690b5716c9 100644 --- a/schema/decoding/sync.go +++ b/schema/decoding/sync.go @@ -27,7 +27,7 @@ func Sync(listener appdata.Listener, source SyncSource, resolver DecoderResolver return nil } - return resolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error { + return resolver.AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error { if opts.ModuleFilter != nil && !opts.ModuleFilter(moduleName) { // ignore this module return nil diff --git a/schema/indexer/config.go b/schema/indexer/config.go new file mode 100644 index 000000000000..2b2ea13485bd --- /dev/null +++ b/schema/indexer/config.go @@ -0,0 +1,52 @@ +package indexer + +// Config species the configuration passed to an indexer initialization function. +// It includes both common configuration options related to include or excluding +// parts of the data stream as well as indexer specific options under the config +// subsection. +// +// NOTE: it is an error for an indexer to change its common options, such as adding +// or removing indexed modules, after the indexer has been initialized because this +// could result in an inconsistent state. +type Config struct { + // Type is the name of the indexer type as registered with Register. + Type string `mapstructure:"type" toml:"type" json:"type" comment:"The name of the registered indexer type."` + + // Config are the indexer specific config options specified by the user. + Config interface{} `mapstructure:"config" toml:"config" json:"config,omitempty" comment:"Indexer specific configuration options."` + + // Filter is the filter configuration for the indexer. + Filter *FilterConfig `mapstructure:"filter" toml:"filter" json:"filter,omitempty" comment:"Filter configuration for the indexer. Currently UNSUPPORTED!"` +} + +// FilterConfig specifies the configuration for filtering the data stream +type FilterConfig struct { + // ExcludeState specifies that the indexer will not receive state updates. + ExcludeState bool `mapstructure:"exclude_state" toml:"exclude_state" json:"exclude_state" comment:"Exclude all state updates."` + + // ExcludeEvents specifies that the indexer will not receive events. + ExcludeEvents bool `mapstructure:"exclude_events" toml:"exclude_events" json:"exclude_events" comment:"Exclude all events."` + + // ExcludeTxs specifies that the indexer will not receive transaction's. + ExcludeTxs bool `mapstructure:"exclude_txs" toml:"exclude_txs" json:"exclude_txs" comment:"Exclude all transactions."` + + // ExcludeBlockHeaders specifies that the indexer will not receive block headers, + // although it will still receive StartBlock and Commit callbacks, just without + // the header data. + ExcludeBlockHeaders bool `mapstructure:"exclude_block_headers" toml:"exclude_block_headers" json:"exclude_block_headers" comment:"Exclude all block headers."` + + Modules *ModuleFilterConfig `mapstructure:"modules" toml:"modules" json:"modules,omitempty" comment:"Module filter configuration."` +} + +// ModuleFilterConfig specifies the configuration for filtering modules. +type ModuleFilterConfig struct { + // Include specifies a list of modules whose state the indexer will + // receive state updates for. + // Only one of include or exclude modules should be specified. + Include []string `mapstructure:"include" toml:"include" json:"include" comment:"List of modules to include. Only one of include or exclude should be specified."` + + // Exclude specifies a list of modules whose state the indexer will not + // receive state updates for. + // Only one of include or exclude modules should be specified. + Exclude []string `mapstructure:"exclude" toml:"exclude" json:"exclude" comment:"List of modules to exclude. Only one of include or exclude should be specified."` +} diff --git a/schema/indexer/indexer.go b/schema/indexer/indexer.go index 3b82e3254e5a..6954dee082c7 100644 --- a/schema/indexer/indexer.go +++ b/schema/indexer/indexer.go @@ -9,44 +9,13 @@ import ( "cosmossdk.io/schema/view" ) -// Config species the configuration passed to an indexer initialization function. -// It includes both common configuration options related to include or excluding -// parts of the data stream as well as indexer specific options under the config -// subsection. -// -// NOTE: it is an error for an indexer to change its common options, such as adding -// or removing indexed modules, after the indexer has been initialized because this -// could result in an inconsistent state. -type Config struct { - // Type is the name of the indexer type as registered with Register. - Type string `json:"type"` +// Initializer describes an indexer initialization function and other metadata. +type Initializer struct { + // InitFunc is the function that initializes the indexer. + InitFunc InitFunc - // Config are the indexer specific config options specified by the user. - Config map[string]interface{} `json:"config"` - - // ExcludeState specifies that the indexer will not receive state updates. - ExcludeState bool `json:"exclude_state"` - - // ExcludeEvents specifies that the indexer will not receive events. - ExcludeEvents bool `json:"exclude_events"` - - // ExcludeTxs specifies that the indexer will not receive transaction's. - ExcludeTxs bool `json:"exclude_txs"` - - // ExcludeBlockHeaders specifies that the indexer will not receive block headers, - // although it will still receive StartBlock and Commit callbacks, just without - // the header data. - ExcludeBlockHeaders bool `json:"exclude_block_headers"` - - // IncludeModules specifies a list of modules whose state the indexer will - // receive state updates for. - // Only one of include or exclude modules should be specified. - IncludeModules []string `json:"include_modules"` - - // ExcludeModules specifies a list of modules whose state the indexer will not - // receive state updates for. - // Only one of include or exclude modules should be specified. - ExcludeModules []string `json:"exclude_modules"` + // ConfigType is the type of the configuration object that the indexer expects. + ConfigType interface{} } type InitFunc = func(InitParams) (InitResult, error) diff --git a/schema/indexer/manager.go b/schema/indexer/manager.go deleted file mode 100644 index 60c19b4dd5c7..000000000000 --- a/schema/indexer/manager.go +++ /dev/null @@ -1,50 +0,0 @@ -package indexer - -import ( - "context" - - "cosmossdk.io/schema/addressutil" - "cosmossdk.io/schema/appdata" - "cosmossdk.io/schema/decoding" - "cosmossdk.io/schema/logutil" -) - -// ManagerOptions are the options for starting the indexer manager. -type ManagerOptions struct { - // Config is the user configuration for all indexing. It should generally be an instance of map[string]interface{} - // and match the json structure of ManagerConfig. The manager will attempt to convert it to ManagerConfig. - Config interface{} - - // Resolver is the decoder resolver that will be used to decode the data. It is required. - Resolver decoding.DecoderResolver - - // SyncSource is a representation of the current state of key-value data to be used in a catch-up sync. - // Catch-up syncs will be performed at initialization when necessary. SyncSource is optional but if - // it is omitted, indexers will only be able to start indexing state from genesis. - SyncSource decoding.SyncSource - - // Logger is the logger that indexers can use to write logs. It is optional. - Logger logutil.Logger - - // Context is the context that indexers should use for shutdown signals via Context.Done(). It can also - // be used to pass down other parameters to indexers if necessary. If it is omitted, context.Background - // will be used. - Context context.Context - - // AddressCodec is the address codec that indexers can use to encode and decode addresses. It should always be - // provided, but if it is omitted, the indexer manager will use a default codec which encodes and decodes addresses - // as hex strings. - AddressCodec addressutil.AddressCodec -} - -// ManagerConfig is the configuration of the indexer manager and contains the configuration for each indexer target. -type ManagerConfig struct { - // Target is a map of named indexer targets to their configuration. - Target map[string]Config -} - -// StartManager starts the indexer manager with the given options. The state machine should write all relevant app data to -// the returned listener. -func StartManager(opts ManagerOptions) (appdata.Listener, error) { - panic("TODO: this will be implemented in a follow-up PR, this function is just a stub to demonstrate the API") -} diff --git a/schema/indexer/registry.go b/schema/indexer/registry.go index 445f56876add..0345ed6ad7ef 100644 --- a/schema/indexer/registry.go +++ b/schema/indexer/registry.go @@ -3,12 +3,16 @@ package indexer import "fmt" // Register registers an indexer type with the given initialization function. -func Register(indexerType string, initFunc InitFunc) { +func Register(indexerType string, descriptor Initializer) { if _, ok := indexerRegistry[indexerType]; ok { panic(fmt.Sprintf("indexer %s already registered", indexerType)) } - indexerRegistry[indexerType] = initFunc + if descriptor.InitFunc == nil { + panic(fmt.Sprintf("indexer %s has no initialization function", indexerType)) + } + + indexerRegistry[indexerType] = descriptor } -var indexerRegistry = map[string]InitFunc{} +var indexerRegistry = map[string]Initializer{} diff --git a/schema/indexer/registry_test.go b/schema/indexer/registry_test.go index b9f46910c8fd..0cd26b9629d5 100644 --- a/schema/indexer/registry_test.go +++ b/schema/indexer/registry_test.go @@ -3,15 +3,17 @@ package indexer import "testing" func TestRegister(t *testing.T) { - Register("test", func(params InitParams) (InitResult, error) { - return InitResult{}, nil + Register("test", Initializer{ + InitFunc: func(params InitParams) (InitResult, error) { + return InitResult{}, nil + }, }) - if indexerRegistry["test"] == nil { + if _, ok := indexerRegistry["test"]; !ok { t.Fatalf("expected to find indexer") } - if indexerRegistry["test2"] != nil { + if _, ok := indexerRegistry["test2"]; ok { t.Fatalf("expected not to find indexer") } @@ -20,7 +22,9 @@ func TestRegister(t *testing.T) { t.Fatalf("expected to panic") } }() - Register("test", func(params InitParams) (InitResult, error) { - return InitResult{}, nil + Register("test", Initializer{ + InitFunc: func(params InitParams) (InitResult, error) { + return InitResult{}, nil + }, }) } diff --git a/schema/indexer/start.go b/schema/indexer/start.go new file mode 100644 index 000000000000..48ab3f7fc7e0 --- /dev/null +++ b/schema/indexer/start.go @@ -0,0 +1,212 @@ +package indexer + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "sync" + + "cosmossdk.io/schema/addressutil" + "cosmossdk.io/schema/appdata" + "cosmossdk.io/schema/decoding" + "cosmossdk.io/schema/logutil" + "cosmossdk.io/schema/view" +) + +// IndexingOptions are the options for starting the indexer manager. +type IndexingOptions struct { + // Config is the user configuration for all indexing. It should generally be an instance map[string]interface{} + // or json.RawMessage and match the json structure of IndexingConfig, or it can be an instance of IndexingConfig. + // The manager will attempt to convert it to IndexingConfig. + Config interface{} + + // Resolver is the decoder resolver that will be used to decode the data. It is required. + Resolver decoding.DecoderResolver + + // SyncSource is a representation of the current state of key-value data to be used in a catch-up sync. + // Catch-up syncs will be performed at initialization when necessary. SyncSource is optional but if + // it is omitted, indexers will only be able to start indexing state from genesis. + SyncSource decoding.SyncSource + + // Logger is the logger that indexers can use to write logs. It is optional. + Logger logutil.Logger + + // Context is the context that indexers should use for shutdown signals via Context.Done(). It can also + // be used to pass down other parameters to indexers if necessary. If it is omitted, context.Background + // will be used. + Context context.Context + + // AddressCodec is the address codec that indexers can use to encode and decode addresses. It should always be + // provided, but if it is omitted, the indexer manager will use a default codec which encodes and decodes addresses + // as hex strings. + AddressCodec addressutil.AddressCodec + + // DoneWaitGroup is a wait group that all indexer manager go routines will wait on before returning when the context + // is done. + // It is optional. + DoneWaitGroup *sync.WaitGroup +} + +// IndexingConfig is the configuration of the indexer manager and contains the configuration for each indexer target. +type IndexingConfig struct { + // Target is a map of named indexer targets to their configuration. + Target map[string]Config + + // ChannelBufferSize is the buffer size of the channels used for buffering data sent to indexer go routines. + // It defaults to 1024. + ChannelBufferSize *int `json:"channel_buffer_size,omitempty"` +} + +// IndexingTarget returns the indexing target listener and associated data. +// The returned listener is the root listener to which app data should be sent. +type IndexingTarget struct { + // Listener is the root listener to which app data should be sent. + // It will do all processing in the background so updates should be sent synchronously. + Listener appdata.Listener + + // ModuleFilter returns the root module filter which an app can use to exclude modules at the storage level, + // if such a filter is set. + ModuleFilter *ModuleFilterConfig + + IndexerInfos map[string]IndexerInfo +} + +// IndexerInfo contains data returned by a specific indexer after initialization that maybe useful for the app. +type IndexerInfo struct { + // View is the view returned by the indexer in its InitResult. It is optional and may be nil. + View view.AppData +} + +// StartIndexing starts the indexer manager with the given options. The state machine should write all relevant app data to +// the returned listener. +func StartIndexing(opts IndexingOptions) (IndexingTarget, error) { + logger := opts.Logger + if logger == nil { + logger = logutil.NoopLogger{} + } + + logger.Info("Starting indexing") + + cfg, err := unmarshalIndexingConfig(opts.Config) + if err != nil { + return IndexingTarget{}, err + } + + ctx := opts.Context + if ctx == nil { + ctx = context.Background() + } + + listeners := make([]appdata.Listener, 0, len(cfg.Target)) + indexerInfos := make(map[string]IndexerInfo, len(cfg.Target)) + + for targetName, targetCfg := range cfg.Target { + init, ok := indexerRegistry[targetCfg.Type] + if !ok { + return IndexingTarget{}, fmt.Errorf("indexer type %q not found", targetCfg.Type) + } + + logger.Info("Starting indexer", "target_name", targetName, "type", targetCfg.Type) + + if targetCfg.Filter != nil { + return IndexingTarget{}, fmt.Errorf("indexer filter options are not supported yet") + } + + childLogger := logger + if scopeableLogger, ok := logger.(logutil.ScopeableLogger); ok { + childLogger = scopeableLogger.WithContext("indexer", targetName).(logutil.Logger) + } + + targetCfg.Config, err = unmarshalIndexerCustomConfig(targetCfg.Config, init.ConfigType) + if err != nil { + return IndexingTarget{}, fmt.Errorf("failed to unmarshal indexer config for target %q: %v", targetName, err) + } + + initRes, err := init.InitFunc(InitParams{ + Config: targetCfg, + Context: ctx, + Logger: childLogger, + AddressCodec: opts.AddressCodec, + }) + if err != nil { + return IndexingTarget{}, err + } + + listener := initRes.Listener + listeners = append(listeners, listener) + + indexerInfos[targetName] = IndexerInfo{ + View: initRes.View, + } + } + + bufSize := 1024 + if cfg.ChannelBufferSize != nil { + bufSize = *cfg.ChannelBufferSize + } + asyncOpts := appdata.AsyncListenerOptions{ + Context: ctx, + DoneWaitGroup: opts.DoneWaitGroup, + BufferSize: bufSize, + } + + rootListener := appdata.AsyncListenerMux( + asyncOpts, + listeners..., + ) + + rootListener, err = decoding.Middleware(rootListener, opts.Resolver, decoding.MiddlewareOptions{}) + if err != nil { + return IndexingTarget{}, err + } + rootListener = appdata.AsyncListener(asyncOpts, rootListener) + + return IndexingTarget{ + Listener: rootListener, + IndexerInfos: indexerInfos, + }, nil +} + +func unmarshalIndexingConfig(cfg interface{}) (*IndexingConfig, error) { + if x, ok := cfg.(*IndexingConfig); ok { + return x, nil + } + if x, ok := cfg.(IndexingConfig); ok { + return &x, nil + } + + var jsonBz []byte + var err error + + switch cfg := cfg.(type) { + case map[string]interface{}: + jsonBz, err = json.Marshal(cfg) + if err != nil { + return nil, err + } + case json.RawMessage: + jsonBz = cfg + default: + return nil, fmt.Errorf("can't convert %T to %T", cfg, IndexingConfig{}) + } + + var res IndexingConfig + err = json.Unmarshal(jsonBz, &res) + return &res, err +} + +func unmarshalIndexerCustomConfig(cfg interface{}, expectedType interface{}) (interface{}, error) { + typ := reflect.TypeOf(expectedType) + if reflect.TypeOf(cfg).AssignableTo(typ) { + return cfg, nil + } + + res := reflect.New(typ).Interface() + bz, err := json.Marshal(cfg) + if err != nil { + return nil, err + } + err = json.Unmarshal(bz, res) + return reflect.ValueOf(res).Elem().Interface(), err +} diff --git a/schema/indexer/start_test.go b/schema/indexer/start_test.go new file mode 100644 index 000000000000..e7efc9f62322 --- /dev/null +++ b/schema/indexer/start_test.go @@ -0,0 +1,219 @@ +package indexer + +import ( + "context" + "encoding/json" + "reflect" + "sync" + "testing" + + "cosmossdk.io/schema/appdata" +) + +func TestStart(t *testing.T) { + ctx, cancelFn := context.WithCancel(context.Background()) + var test1CommitCalled, test2CommitCalled int + Register("test1", Initializer{ + InitFunc: func(params InitParams) (InitResult, error) { + if params.Config.Config.(testConfig).SomeParam != "foobar" { + t.Fatalf("expected %q, got %q", "foobar", params.Config.Config.(testConfig).SomeParam) + } + return InitResult{ + Listener: appdata.Listener{ + Commit: func(data appdata.CommitData) (completionCallback func() error, err error) { + test1CommitCalled++ + return nil, nil + }, + }, + }, nil + }, + ConfigType: testConfig{}, + }) + Register("test2", Initializer{ + InitFunc: func(params InitParams) (InitResult, error) { + if params.Config.Config.(testConfig2).Foo != "bar" { + t.Fatalf("expected %q, got %q", "bar", params.Config.Config.(testConfig2).Foo) + } + return InitResult{ + Listener: appdata.Listener{ + Commit: func(data appdata.CommitData) (completionCallback func() error, err error) { + test2CommitCalled++ + return nil, nil + }, + }, + }, nil + }, + ConfigType: testConfig2{}, + }) + + var wg sync.WaitGroup + target, err := StartIndexing(IndexingOptions{ + Config: IndexingConfig{Target: map[string]Config{ + "t1": {Type: "test1", Config: testConfig{SomeParam: "foobar"}}, + "t2": {Type: "test2", Config: testConfig2{Foo: "bar"}}, + }}, + Resolver: nil, + SyncSource: nil, + Logger: nil, + Context: ctx, + AddressCodec: nil, + DoneWaitGroup: &wg, + }) + if err != nil { + t.Fatal(err) + } + + const COMMIT_COUNT = 10 + for i := 0; i < COMMIT_COUNT; i++ { + callCommit(t, target.Listener) + } + + cancelFn() + wg.Wait() + + if test1CommitCalled != COMMIT_COUNT { + t.Fatalf("expected %d, got %d", COMMIT_COUNT, test1CommitCalled) + } + if test2CommitCalled != COMMIT_COUNT { + t.Fatalf("expected %d, got %d", COMMIT_COUNT, test2CommitCalled) + } +} + +func callCommit(t *testing.T, listener appdata.Listener) { + cb, err := listener.Commit(appdata.CommitData{}) + if err != nil { + t.Fatal(err) + } + if cb != nil { + err = cb() + if err != nil { + t.Fatal(err) + } + } +} + +func TestUnmarshalIndexingConfig(t *testing.T) { + cfg := &IndexingConfig{Target: map[string]Config{"target": {Type: "type"}}} + jsonBz, err := json.Marshal(cfg) + if err != nil { + t.Fatal(err) + } + + t.Run("json", func(t *testing.T) { + res, err := unmarshalIndexingConfig(json.RawMessage(jsonBz)) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(res, cfg) { + t.Fatalf("expected %v, got %v", cfg, res) + } + }) + + t.Run("map", func(t *testing.T) { + var m map[string]interface{} + err := json.Unmarshal(jsonBz, &m) + if err != nil { + t.Fatal(err) + } + + res, err := unmarshalIndexingConfig(m) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(res, cfg) { + t.Fatalf("expected %v, got %v", cfg, res) + } + }) + + t.Run("ptr", func(t *testing.T) { + res, err := unmarshalIndexingConfig(cfg) + if err != nil { + t.Fatal(err) + } + if res != cfg { + t.Fatalf("expected %v, got %v", cfg, res) + } + }) + + t.Run("struct", func(t *testing.T) { + res, err := unmarshalIndexingConfig(*cfg) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(res, cfg) { + t.Fatalf("expected %v, got %v", cfg, res) + } + }) +} + +func TestUnmarshalIndexerConfig(t *testing.T) { + t.Run("struct", func(t *testing.T) { + cfg := testConfig{SomeParam: "foobar"} + cfg2, err := unmarshalIndexerCustomConfig(cfg, testConfig{}) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(cfg, cfg2) { + t.Fatalf("expected %v, got %v", cfg, cfg2) + } + }) + + t.Run("ptr", func(t *testing.T) { + cfg := &testConfig{SomeParam: "foobar"} + cfg2, err := unmarshalIndexerCustomConfig(cfg, &testConfig{}) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(cfg, cfg2) { + t.Fatalf("expected %v, got %v", cfg, cfg2) + } + }) + + t.Run("map -> struct", func(t *testing.T) { + cfg := testConfig{SomeParam: "foobar"} + jzonBz, err := json.Marshal(cfg) + if err != nil { + t.Fatal(err) + } + var m map[string]interface{} + err = json.Unmarshal(jzonBz, &m) + if err != nil { + t.Fatal(err) + } + cfg2, err := unmarshalIndexerCustomConfig(m, testConfig{}) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(cfg, cfg2) { + t.Fatalf("expected %v, got %v", cfg, cfg2) + } + }) + + t.Run("map -> ptr", func(t *testing.T) { + cfg := &testConfig{SomeParam: "foobar"} + jzonBz, err := json.Marshal(cfg) + if err != nil { + t.Fatal(err) + } + var m map[string]interface{} + err = json.Unmarshal(jzonBz, &m) + if err != nil { + t.Fatal(err) + } + cfg2, err := unmarshalIndexerCustomConfig(m, &testConfig{}) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(cfg, cfg2) { + t.Fatalf("expected %v, got %v", cfg, cfg2) + } + }) +} + +type testConfig struct { + SomeParam string `json:"some_param"` +} + +type testConfig2 struct { + Foo string `json:"foo"` +} diff --git a/schema/logutil/logger.go b/schema/logutil/logger.go index cb6b34ebfd2b..a93b91567df2 100644 --- a/schema/logutil/logger.go +++ b/schema/logutil/logger.go @@ -21,6 +21,13 @@ type Logger interface { Debug(msg string, keyVals ...interface{}) } +// ScopeableLogger is a logger that can be scoped with key/value pairs. +// It is implemented by all the loggers in cosmossdk.io/log. +type ScopeableLogger interface { + // WithContext returns a new logger with the provided key/value pairs set. + WithContext(keyVals ...interface{}) interface{} +} + // NoopLogger is a logger that doesn't do anything. type NoopLogger struct{}