Skip to content

Commit

Permalink
feat: directly subscribe logs from indexer (#34)
Browse files Browse the repository at this point in the history
* change filter system to directly subscribe from indexer

* use mutex and pass logs in bulk
  • Loading branch information
beer-1 authored Jul 12, 2024
1 parent 4d2f2a2 commit 4488b5b
Show file tree
Hide file tree
Showing 27 changed files with 775 additions and 2,812 deletions.
48 changes: 48 additions & 0 deletions app/ante/account_number.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package ante

import (
storetypes "cosmossdk.io/store/types"

sdk "github.com/cosmos/cosmos-sdk/types"
cosmosante "github.com/cosmos/cosmos-sdk/x/auth/ante"
authkeeper "github.com/cosmos/cosmos-sdk/x/auth/keeper"
)

// AccountNumberDecorator is a custom ante handler that increments the account number depending on
// the execution mode (Simulate, CheckTx, Finalize).
//
// This is to avoid account number conflicts when running concurrent Simulate, CheckTx, and Finalize.
type AccountNumberDecorator struct {
ak cosmosante.AccountKeeper
}

// NewAccountNumberDecorator creates a new instance of AccountNumberDecorator.
func NewAccountNumberDecorator(ak cosmosante.AccountKeeper) AccountNumberDecorator {
return AccountNumberDecorator{ak}
}

// AnteHandle is the AnteHandler implementation for AccountNumberDecorator.
func (and AccountNumberDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate bool, next sdk.AnteHandler) (sdk.Context, error) {
if !ctx.IsCheckTx() && !ctx.IsReCheckTx() && !simulate {
return next(ctx, tx, simulate)
}

ak := and.ak.(*authkeeper.AccountKeeper)

gasFreeCtx := ctx.WithGasMeter(storetypes.NewInfiniteGasMeter())
num, err := ak.AccountNumber.Peek(gasFreeCtx)
if err != nil {
return ctx, err
}

accountNumAddition := uint64(1_000_000)
if simulate {
accountNumAddition += 1_000_000
}

if err := ak.AccountNumber.Set(gasFreeCtx, num+accountNumAddition); err != nil {
return ctx, err
}

return next(ctx, tx, simulate)
}
1 change: 1 addition & 0 deletions app/ante/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func NewAnteHandler(options HandlerOptions) (sdk.AnteHandler, error) {
}

anteDecorators := []sdk.AnteDecorator{
NewAccountNumberDecorator(options.AccountKeeper),
ante.NewSetUpContextDecorator(), // outermost AnteDecorator. SetUpContext must be called first
ante.NewExtensionOptionsDecorator(options.ExtensionOptionChecker),
ante.NewValidateBasicDecorator(),
Expand Down
67 changes: 35 additions & 32 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ type MinitiaApp struct {
func NewMinitiaApp(
logger log.Logger,
db dbm.DB,
indexerDB dbm.DB,
traceStore io.Writer,
loadLatest bool,
evmConfig evmconfig.EVMConfig,
Expand Down Expand Up @@ -767,7 +768,7 @@ func NewMinitiaApp(
marketmap.NewAppModule(appCodec, app.MarketMapKeeper),
)

if err := app.setupIndexer(appOpts, homePath, ac, vc, appCodec); err != nil {
if err := app.setupIndexer(indexerDB, appOpts, homePath, ac, vc, appCodec); err != nil {
panic(err)
}

Expand Down Expand Up @@ -909,7 +910,8 @@ func NewMinitiaApp(
panic(err)
}

app.SetMempool(mempool)
// wrap mempool to receive pending txs from the indexer
app.SetMempool(app.evmIndexer.MempoolWrapper(mempool))
anteHandler := app.setAnteHandler(mevLane, freeLane)

// NOTE seems this optional, to reduce mempool logic cost
Expand Down Expand Up @@ -1267,11 +1269,11 @@ func VerifyAddressLen() func(addr []byte) error {
}
}

func (app *MinitiaApp) setupIndexer(appOpts servertypes.AppOptions, homePath string, ac, vc address.Codec, appCodec codec.Codec) error {
func (app *MinitiaApp) setupIndexer(indexerDB dbm.DB, appOpts servertypes.AppOptions, homePath string, ac, vc address.Codec, appCodec codec.Codec) error {
// initialize the indexer fake-keeper
indexerConfig, err := indexerconfig.NewConfig(appOpts)
if err != nil {
panic(err)
return err
}
app.indexerKeeper = indexerkeeper.NewKeeper(
appCodec,
Expand All @@ -1284,65 +1286,66 @@ func (app *MinitiaApp) setupIndexer(appOpts servertypes.AppOptions, homePath str

smBlock, err := blocksubmodule.NewBlockSubmodule(appCodec, app.indexerKeeper, app.OPChildKeeper)
if err != nil {
panic(err)
return err
}
smTx, err := tx.NewTxSubmodule(appCodec, app.indexerKeeper)
if err != nil {
panic(err)
return err
}
smPair, err := pair.NewPairSubmodule(appCodec, app.indexerKeeper, app.IBCKeeper.ChannelKeeper, app.TransferKeeper)
if err != nil {
panic(err)
return err
}
/*
smNft, err := nft.NewMoveNftSubmodule(ac, appCodec, app.indexerKeeper, app.EvmKeeper, smPair)
if err != nil {
panic(err)
}
err = app.indexerKeeper.RegisterSubmodules(smBlock, smTx, smPair, smNft)
*/

err = app.indexerKeeper.RegisterSubmodules(smBlock, smTx, smPair)
if err != nil {
panic(err)
return err
}

app.indexerModule = indexermodule.NewAppModuleBasic(app.indexerKeeper)

// Add your implementation here

indexer, err := indexer.NewIndexer(app.GetBaseApp().Logger(), app.indexerKeeper)
if err != nil || indexer == nil {
if err != nil {
return nil
}

if err = indexer.Validate(); err != nil {
return err
}
liseners := []storetypes.ABCIListener{}
if indexer != nil {
if err = indexer.Validate(); err != nil {
return err
}

if err = indexer.Prepare(nil); err != nil {
return err
}
if err = indexer.Prepare(nil); err != nil {
return err
}

if err = app.indexerKeeper.Seal(); err != nil {
return err
}
if err = app.indexerKeeper.Seal(); err != nil {
return err
}

if err = indexer.Start(nil); err != nil {
return err
if err = indexer.Start(nil); err != nil {
return err
}

liseners = append(liseners, indexer)
}

// add evm indexer
evmIndexer, err := evmindexer.NewEVMIndexer(appOpts, appCodec, app.Logger(), app.txConfig, app.EVMKeeper)
evmIndexer, err := evmindexer.NewEVMIndexer(indexerDB, appCodec, app.Logger(), app.txConfig, app.EVMKeeper)
if err != nil {
return err
}

// register evm indexer to app
app.evmIndexer = evmIndexer
liseners = append(liseners, evmIndexer)

streamingManager := storetypes.StreamingManager{
ABCIListeners: []storetypes.ABCIListener{indexer, evmIndexer},
app.SetStreamingManager(storetypes.StreamingManager{
ABCIListeners: liseners,
StopNodeOnErr: true,
}
app.SetStreamingManager(streamingManager)
})

return nil
}
Expand Down
10 changes: 8 additions & 2 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ func TestInitGenesisOnMigration(t *testing.T) {
db := dbm.NewMemDB()
logger := log.NewLogger(os.Stdout)
app := NewMinitiaApp(
logger, db, nil, true, evmconfig.DefaultEVMConfig(), EmptyAppOptions{})
logger, db, dbm.NewMemDB(),
nil, true, evmconfig.DefaultEVMConfig(),
EmptyAppOptions{},
)
ctx := app.NewContextLegacy(true, cmtproto.Header{Height: app.LastBlockHeight()})

// Create a mock module. This module will serve as the new module we're
Expand Down Expand Up @@ -121,7 +124,10 @@ func TestGetKey(t *testing.T) {
db := dbm.NewMemDB()
app := NewMinitiaApp(
log.NewLogger(os.Stdout),
db, nil, true, evmconfig.DefaultEVMConfig(), EmptyAppOptions{})
db, dbm.NewMemDB(), nil, true,
evmconfig.DefaultEVMConfig(),
EmptyAppOptions{},
)

require.NotEmpty(t, app.GetKey(banktypes.StoreKey))
require.NotEmpty(t, app.GetMemKey(capabilitytypes.MemStoreKey))
Expand Down
6 changes: 3 additions & 3 deletions app/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

// MakeEncodingConfig creates an EncodingConfig for testing
func MakeEncodingConfig() params.EncodingConfig {
tempApp := NewMinitiaApp(log.NewNopLogger(), dbm.NewMemDB(), nil, true, evmconfig.DefaultEVMConfig(), EmptyAppOptions{})
tempApp := NewMinitiaApp(log.NewNopLogger(), dbm.NewMemDB(), dbm.NewMemDB(), nil, true, evmconfig.DefaultEVMConfig(), EmptyAppOptions{})
encodingConfig := params.EncodingConfig{
InterfaceRegistry: tempApp.InterfaceRegistry(),
Codec: tempApp.AppCodec(),
Expand All @@ -30,7 +30,7 @@ func MakeEncodingConfig() params.EncodingConfig {
}

func AutoCliOpts() autocli.AppOptions {
tempApp := NewMinitiaApp(log.NewNopLogger(), dbm.NewMemDB(), nil, true, evmconfig.DefaultEVMConfig(), EmptyAppOptions{})
tempApp := NewMinitiaApp(log.NewNopLogger(), dbm.NewMemDB(), dbm.NewMemDB(), nil, true, evmconfig.DefaultEVMConfig(), EmptyAppOptions{})
modules := make(map[string]appmodule.AppModule, 0)
for _, m := range tempApp.ModuleManager.Modules {
if moduleWithName, ok := m.(module.HasName); ok {
Expand All @@ -51,7 +51,7 @@ func AutoCliOpts() autocli.AppOptions {
}

func BasicManager() module.BasicManager {
tempApp := NewMinitiaApp(log.NewNopLogger(), dbm.NewMemDB(), nil, true, evmconfig.DefaultEVMConfig(), EmptyAppOptions{})
tempApp := NewMinitiaApp(log.NewNopLogger(), dbm.NewMemDB(), dbm.NewMemDB(), nil, true, evmconfig.DefaultEVMConfig(), EmptyAppOptions{})
return tempApp.BasicModuleManager
}

Expand Down
1 change: 1 addition & 0 deletions app/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func setup(db *dbm.DB, withGenesis bool) (*MinitiaApp, GenesisState) {
app := NewMinitiaApp(
log.NewNopLogger(),
getOrCreateMemDB(db),
dbm.NewMemDB(),
nil,
true,
evmconfig.DefaultEVMConfig(),
Expand Down
32 changes: 29 additions & 3 deletions cmd/minitiad/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"io"
"os"
"path"
"path/filepath"

tmcli "github.com/cometbft/cometbft/libs/cli"
"golang.org/x/sync/errgroup"

"github.com/spf13/cast"
"github.com/spf13/cobra"
"github.com/spf13/viper"

Expand Down Expand Up @@ -268,8 +270,15 @@ func (a *appCreator) AppCreator() servertypes.AppCreator {
return func(logger log.Logger, db dbm.DB, traceStore io.Writer, appOpts servertypes.AppOptions) servertypes.Application {
baseappOptions := server.DefaultBaseappOptions(appOpts)

// create EVM indexer db
dbDir, dbBackend := getDBConfig(appOpts)
indexerDB, err := dbm.NewDB("eth_index", dbBackend, dbDir)
if err != nil {
panic(err)
}

app := minitiaapp.NewMinitiaApp(
logger, db, traceStore, true,
logger, db, indexerDB, traceStore, true,
evmconfig.GetConfig(appOpts),
appOpts,
baseappOptions...,
Expand Down Expand Up @@ -305,13 +314,13 @@ func (a appCreator) appExport(

var initiaApp *minitiaapp.MinitiaApp
if height != -1 {
initiaApp = minitiaapp.NewMinitiaApp(logger, db, traceStore, false, evmconfig.DefaultEVMConfig(), appOpts)
initiaApp = minitiaapp.NewMinitiaApp(logger, db, dbm.NewMemDB(), traceStore, false, evmconfig.DefaultEVMConfig(), appOpts)

if err := initiaApp.LoadHeight(height); err != nil {
return servertypes.ExportedApp{}, err
}
} else {
initiaApp = minitiaapp.NewMinitiaApp(logger, db, traceStore, true, evmconfig.DefaultEVMConfig(), appOpts)
initiaApp = minitiaapp.NewMinitiaApp(logger, db, dbm.NewMemDB(), traceStore, true, evmconfig.DefaultEVMConfig(), appOpts)
}

return initiaApp.ExportAppStateAndValidators(forZeroHeight, jailAllowedAddrs, modulesToExport)
Expand Down Expand Up @@ -360,3 +369,20 @@ func readEnv(clientCtx client.Context) (client.Context, error) {

return clientCtx, nil
}

// getDBConfig returns the database configuration for the EVM indexer
func getDBConfig(appOpts servertypes.AppOptions) (string, dbm.BackendType) {
rootDir := cast.ToString(appOpts.Get("home"))
dbDir := cast.ToString(appOpts.Get("db_dir"))
dbBackend := server.GetAppDBBackend(appOpts)

return rootify(dbDir, rootDir), dbBackend
}

// helper function to make config creation independent of root dir
func rootify(path, root string) string {
if filepath.IsAbs(path) {
return path
}
return filepath.Join(root, path)
}
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
github.com/hashicorp/go-metrics v0.5.3
github.com/holiman/uint256 v1.2.4
github.com/initia-labs/OPinit v0.3.2
github.com/initia-labs/initia v0.3.3
github.com/initia-labs/initia v0.3.4
github.com/initia-labs/kvindexer v0.1.3
github.com/initia-labs/kvindexer/submodules/block v0.1.0
github.com/initia-labs/kvindexer/submodules/pair v0.1.1
Expand Down Expand Up @@ -86,6 +86,7 @@ require (
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/btcsuite/btcd/btcutil v1.1.5 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect
github.com/celestiaorg/go-square v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand Down Expand Up @@ -173,7 +174,8 @@ require (
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/improbable-eng/grpc-web v0.15.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/initia-labs/movevm v0.3.3 // indirect
github.com/initia-labs/OPinit/api v0.3.0 // indirect
github.com/initia-labs/movevm v0.3.4 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jmhodges/levigo v1.0.0 // indirect
Expand Down Expand Up @@ -282,8 +284,7 @@ replace (

// initia custom
replace (
github.com/cometbft/cometbft => github.com/initia-labs/cometbft v0.0.0-20240704071957-c46468756c01 //v0.0.0-20240621094738-408dc5262680
github.com/cosmos/cosmos-sdk => github.com/initia-labs/cosmos-sdk v0.0.0-20240627065534-d2180fcfd501
github.com/cometbft/cometbft => github.com/initia-labs/cometbft v0.0.0-20240704071917-6c77a401128c
github.com/cosmos/ibc-go/v8 => github.com/initia-labs/ibc-go/v8 v8.0.0-20240419124350-4275a05abe2c
github.com/ethereum/go-ethereum => github.com/initia-labs/evm v0.0.0-20240620024053-f13ebda716b7
)
Loading

0 comments on commit 4488b5b

Please sign in to comment.