Skip to content

Commit

Permalink
feat(schema/indexer)!: implement start indexing (#21636)
Browse files Browse the repository at this point in the history
Co-authored-by: Marko <[email protected]>
  • Loading branch information
aaronc and tac0turtle authored Sep 23, 2024
1 parent f3a295a commit d273ae0
Show file tree
Hide file tree
Showing 17 changed files with 570 additions and 170 deletions.
34 changes: 12 additions & 22 deletions indexer/postgres/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package postgres
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"

"cosmossdk.io/schema/indexer"
"cosmossdk.io/schema/logutil"
Expand All @@ -21,8 +21,6 @@ type Config struct {
DisableRetainDeletions bool `json:"disable_retain_deletions"`
}

type SqlLogger = func(msg, sql string, params ...interface{})

type indexerImpl struct {
ctx context.Context
db *sql.DB
Expand All @@ -32,10 +30,17 @@ type indexerImpl struct {
logger logutil.Logger
}

func StartIndexer(params indexer.InitParams) (indexer.InitResult, error) {
config, err := decodeConfig(params.Config.Config)
if err != nil {
return indexer.InitResult{}, err
func init() {
indexer.Register("postgres", indexer.Initializer{
InitFunc: startIndexer,
ConfigType: Config{},
})
}

func startIndexer(params indexer.InitParams) (indexer.InitResult, error) {
config, ok := params.Config.Config.(Config)
if !ok {
return indexer.InitResult{}, fmt.Errorf("invalid config type, expected %T got %T", Config{}, params.Config.Config)
}

ctx := params.Context
Expand Down Expand Up @@ -89,18 +94,3 @@ func StartIndexer(params indexer.InitParams) (indexer.InitResult, error) {
View: idx,
}, nil
}

func decodeConfig(rawConfig map[string]interface{}) (*Config, error) {
bz, err := json.Marshal(rawConfig)
if err != nil {
return nil, err
}

var config Config
err = json.Unmarshal(bz, &config)
if err != nil {
return nil, err
}

return &config, nil
}
26 changes: 0 additions & 26 deletions indexer/postgres/tests/config.go

This file was deleted.

23 changes: 13 additions & 10 deletions indexer/postgres/tests/init_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,20 @@ func testInitSchema(t *testing.T, disableRetainDeletions bool, goldenFileName st
connectionUrl := createTestDB(t)

buf := &strings.Builder{}

cfg, err := postgresConfigToIndexerConfig(postgres.Config{
DatabaseURL: connectionUrl,
DisableRetainDeletions: disableRetainDeletions,
})
require.NoError(t, err)

res, err := postgres.StartIndexer(indexer.InitParams{
Config: cfg,
res, err := indexer.StartIndexing(indexer.IndexingOptions{
Config: indexer.IndexingConfig{
Target: map[string]indexer.Config{
"postgres": {
Type: "postgres",
Config: postgres.Config{
DatabaseURL: connectionUrl,
DisableRetainDeletions: disableRetainDeletions,
},
},
},
},
Context: context.Background(),
Logger: &prettyLogger{buf},
Logger: prettyLogger{buf},
})
require.NoError(t, err)
listener := res.Listener
Expand Down
28 changes: 18 additions & 10 deletions indexer/postgres/tests/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,39 @@ func testPostgresIndexer(t *testing.T, retainDeletions bool) {
require.NoError(t, err)
})

cfg, err := postgresConfigToIndexerConfig(postgres.Config{
DatabaseURL: dbUrl,
DisableRetainDeletions: !retainDeletions,
})
require.NoError(t, err)

debugLog := &strings.Builder{}

pgIndexer, err := postgres.StartIndexer(indexer.InitParams{
Config: cfg,
res, err := indexer.StartIndexing(indexer.IndexingOptions{
Config: indexer.IndexingConfig{
Target: map[string]indexer.Config{
"postgres": {
Type: "postgres",
Config: postgres.Config{
DatabaseURL: dbUrl,
DisableRetainDeletions: !retainDeletions,
},
},
},
},
Context: ctx,
Logger: &prettyLogger{debugLog},
AddressCodec: addressutil.HexAddressCodec{},
})
require.NoError(t, err)
require.NoError(t, err)

sim, err := appdatasim.NewSimulator(appdatasim.Options{
Listener: pgIndexer.Listener,
Listener: res.Listener,
AppSchema: indexertesting.ExampleAppSchema,
StateSimOptions: statesim.Options{
CanRetainDeletions: retainDeletions,
},
})
require.NoError(t, err)

pgIndexerView := res.IndexerInfos["postgres"].View
require.NotNil(t, pgIndexerView)

blockDataGen := sim.BlockDataGenN(10, 100)
numBlocks := 200
if testing.Short() {
Expand All @@ -93,7 +101,7 @@ func testPostgresIndexer(t *testing.T, retainDeletions bool) {
require.NoError(t, sim.ProcessBlockData(blockData), debugLog.String())

// compare the expected state in the simulator to the actual state in the indexer and expect the diff to be empty
require.Empty(t, appdatasim.DiffAppData(sim, pgIndexer.View), debugLog.String())
require.Empty(t, appdatasim.DiffAppData(sim, pgIndexerView), debugLog.String())

// reset the debug log after each successful block so that it doesn't get too long when debugging
debugLog.Reset()
Expand Down
4 changes: 4 additions & 0 deletions indexer/postgres/tests/testdata/init_schema.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
INFO: Starting indexing
INFO: Starting indexer
target_name: postgres
type: postgres
DEBUG: Creating enum type
sql: CREATE TYPE "test_my_enum" AS ENUM ('a', 'b', 'c');
DEBUG: Creating enum type
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
INFO: Starting indexing
INFO: Starting indexer
target_name: postgres
type: postgres
DEBUG: Creating enum type
sql: CREATE TYPE "test_my_enum" AS ENUM ('a', 'b', 'c');
DEBUG: Creating enum type
Expand Down
6 changes: 3 additions & 3 deletions schema/decoding/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type DecoderResolver interface {
// EncodeModuleName encodes a module name into a byte slice that can be used as the actor in a KVPairUpdate.
EncodeModuleName(string) ([]byte, error)

// IterateAll iterates over all available module decoders.
IterateAll(func(moduleName string, cdc schema.ModuleCodec) error) error
// AllDecoders iterates over all available module decoders.
AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error) error

// LookupDecoder looks up a specific module decoder.
LookupDecoder(moduleName string) (decoder schema.ModuleCodec, found bool, err error)
Expand Down Expand Up @@ -48,7 +48,7 @@ func (a moduleSetDecoderResolver) EncodeModuleName(s string) ([]byte, error) {
return nil, fmt.Errorf("module %s not found", s)
}

func (a moduleSetDecoderResolver) IterateAll(f func(string, schema.ModuleCodec) error) error {
func (a moduleSetDecoderResolver) AllDecoders(f func(string, schema.ModuleCodec) error) error {
keys := make([]string, 0, len(a.moduleSet))
for k := range a.moduleSet {
keys = append(keys, k)
Expand Down
4 changes: 2 additions & 2 deletions schema/decoding/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var testResolver = ModuleSetDecoderResolver(moduleSet)

func TestModuleSetDecoderResolver_IterateAll(t *testing.T) {
objectTypes := map[string]bool{}
err := testResolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error {
err := testResolver.AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error {
cdc.Schema.AllTypes(func(t schema.Type) bool {
objTyp, ok := t.(schema.StateObjectType)
if ok {
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestModuleSetDecoderResolver_IterateAll_Error(t *testing.T) {
resolver := ModuleSetDecoderResolver(map[string]interface{}{
"modD": modD{},
})
err := resolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error {
err := resolver.AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error {
if moduleName == "modD" {
t.Fatalf("expected error")
}
Expand Down
2 changes: 1 addition & 1 deletion schema/decoding/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func Sync(listener appdata.Listener, source SyncSource, resolver DecoderResolver
return nil
}

return resolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error {
return resolver.AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error {
if opts.ModuleFilter != nil && !opts.ModuleFilter(moduleName) {
// ignore this module
return nil
Expand Down
52 changes: 52 additions & 0 deletions schema/indexer/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package indexer

// Config species the configuration passed to an indexer initialization function.
// It includes both common configuration options related to include or excluding
// parts of the data stream as well as indexer specific options under the config
// subsection.
//
// NOTE: it is an error for an indexer to change its common options, such as adding
// or removing indexed modules, after the indexer has been initialized because this
// could result in an inconsistent state.
type Config struct {
// Type is the name of the indexer type as registered with Register.
Type string `mapstructure:"type" toml:"type" json:"type" comment:"The name of the registered indexer type."`

// Config are the indexer specific config options specified by the user.
Config interface{} `mapstructure:"config" toml:"config" json:"config,omitempty" comment:"Indexer specific configuration options."`

// Filter is the filter configuration for the indexer.
Filter *FilterConfig `mapstructure:"filter" toml:"filter" json:"filter,omitempty" comment:"Filter configuration for the indexer. Currently UNSUPPORTED!"`
}

// FilterConfig specifies the configuration for filtering the data stream
type FilterConfig struct {
// ExcludeState specifies that the indexer will not receive state updates.
ExcludeState bool `mapstructure:"exclude_state" toml:"exclude_state" json:"exclude_state" comment:"Exclude all state updates."`

// ExcludeEvents specifies that the indexer will not receive events.
ExcludeEvents bool `mapstructure:"exclude_events" toml:"exclude_events" json:"exclude_events" comment:"Exclude all events."`

// ExcludeTxs specifies that the indexer will not receive transaction's.
ExcludeTxs bool `mapstructure:"exclude_txs" toml:"exclude_txs" json:"exclude_txs" comment:"Exclude all transactions."`

// ExcludeBlockHeaders specifies that the indexer will not receive block headers,
// although it will still receive StartBlock and Commit callbacks, just without
// the header data.
ExcludeBlockHeaders bool `mapstructure:"exclude_block_headers" toml:"exclude_block_headers" json:"exclude_block_headers" comment:"Exclude all block headers."`

Modules *ModuleFilterConfig `mapstructure:"modules" toml:"modules" json:"modules,omitempty" comment:"Module filter configuration."`
}

// ModuleFilterConfig specifies the configuration for filtering modules.
type ModuleFilterConfig struct {
// Include specifies a list of modules whose state the indexer will
// receive state updates for.
// Only one of include or exclude modules should be specified.
Include []string `mapstructure:"include" toml:"include" json:"include" comment:"List of modules to include. Only one of include or exclude should be specified."`

// Exclude specifies a list of modules whose state the indexer will not
// receive state updates for.
// Only one of include or exclude modules should be specified.
Exclude []string `mapstructure:"exclude" toml:"exclude" json:"exclude" comment:"List of modules to exclude. Only one of include or exclude should be specified."`
}
43 changes: 6 additions & 37 deletions schema/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,13 @@ import (
"cosmossdk.io/schema/view"
)

// Config species the configuration passed to an indexer initialization function.
// It includes both common configuration options related to include or excluding
// parts of the data stream as well as indexer specific options under the config
// subsection.
//
// NOTE: it is an error for an indexer to change its common options, such as adding
// or removing indexed modules, after the indexer has been initialized because this
// could result in an inconsistent state.
type Config struct {
// Type is the name of the indexer type as registered with Register.
Type string `json:"type"`
// Initializer describes an indexer initialization function and other metadata.
type Initializer struct {
// InitFunc is the function that initializes the indexer.
InitFunc InitFunc

// Config are the indexer specific config options specified by the user.
Config map[string]interface{} `json:"config"`

// ExcludeState specifies that the indexer will not receive state updates.
ExcludeState bool `json:"exclude_state"`

// ExcludeEvents specifies that the indexer will not receive events.
ExcludeEvents bool `json:"exclude_events"`

// ExcludeTxs specifies that the indexer will not receive transaction's.
ExcludeTxs bool `json:"exclude_txs"`

// ExcludeBlockHeaders specifies that the indexer will not receive block headers,
// although it will still receive StartBlock and Commit callbacks, just without
// the header data.
ExcludeBlockHeaders bool `json:"exclude_block_headers"`

// IncludeModules specifies a list of modules whose state the indexer will
// receive state updates for.
// Only one of include or exclude modules should be specified.
IncludeModules []string `json:"include_modules"`

// ExcludeModules specifies a list of modules whose state the indexer will not
// receive state updates for.
// Only one of include or exclude modules should be specified.
ExcludeModules []string `json:"exclude_modules"`
// ConfigType is the type of the configuration object that the indexer expects.
ConfigType interface{}
}

type InitFunc = func(InitParams) (InitResult, error)
Expand Down
50 changes: 0 additions & 50 deletions schema/indexer/manager.go

This file was deleted.

Loading

0 comments on commit d273ae0

Please sign in to comment.