Skip to content

Commit

Permalink
fix streamer config
Browse files Browse the repository at this point in the history
  • Loading branch information
yihuang committed Nov 28, 2022
1 parent 3bcb198 commit 7d1201a
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 212 deletions.
104 changes: 43 additions & 61 deletions app/app.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package app

import (
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"sync"

"github.com/crypto-org-chain/cronos/x/cronos"
"github.com/crypto-org-chain/cronos/x/cronos/middleware"
Expand All @@ -29,7 +28,7 @@ import (
"github.com/cosmos/cosmos-sdk/server/api"
"github.com/cosmos/cosmos-sdk/server/config"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
"github.com/cosmos/cosmos-sdk/store/streaming"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/testutil/testdata"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -150,8 +149,6 @@ const (
//
// NOTE: In the SDK, the default value is 255.
AddrLen = 20

FileStreamerDirectory = "file_streamer"
)

// this line is used by starport scaffolding # stargate/wasm/app/enabledProposals
Expand Down Expand Up @@ -351,65 +348,50 @@ func New(
tkeys := sdk.NewTransientStoreKeys(paramstypes.TStoreKey, evmtypes.TransientKey, feemarkettypes.TransientKey)
memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey)

// configure state listening capabilities using AppOptions
// we are doing nothing with the returned streamingServices and waitGroup in this case
// Only support file streamer right now.
streamers := cast.ToString(appOpts.Get(cronosappclient.FlagStreamers))
if strings.Contains(streamers, "file") {
streamingDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data", FileStreamerDirectory)
if err := os.MkdirAll(streamingDir, os.ModePerm); err != nil {
panic(err)
}

// default to exposing all
exposeStoreKeys := make([]storetypes.StoreKey, 0, len(keys))
for _, storeKey := range keys {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
service, err := file.NewStreamingService(streamingDir, "", exposeStoreKeys, appCodec, false)
if err != nil {
panic(err)
}
bApp.SetStreamingService(service)

wg := new(sync.WaitGroup)
if err := service.Stream(wg); err != nil {
panic(err)
}
// load state streaming if enabled
if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, logger, keys, homePath); err != nil {
fmt.Printf("failed to load state streaming: %s", err)
os.Exit(1)
}

if strings.Contains(streamers, "versiondb") {
rootDir := cast.ToString(appOpts.Get(flags.FlagHome))
dataDir := filepath.Join(rootDir, "data", "versiondb")
if err := os.MkdirAll(dataDir, os.ModePerm); err != nil {
panic(err)
}
backendType := server.GetAppDBBackend(appOpts)
plainDB, err := dbm.NewDB("plain", backendType, dataDir)
if err != nil {
panic(err)
}
historyDB, err := dbm.NewDB("history", backendType, dataDir)
if err != nil {
panic(err)
}
changesetDB, err := dbm.NewDB("changeset", backendType, dataDir)
if err != nil {
panic(err)
}
versionDB := tmdb.NewStore(plainDB, historyDB, changesetDB)

// default to exposing all
exposeStoreKeys := make([]storetypes.StoreKey, 0, len(keys))
for _, storeKey := range keys {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
// configure state listening capabilities using AppOptions
// we are doing nothing with the returned streamingServices and waitGroup in this case
streamers := cast.ToStringSlice(appOpts.Get(cronosappclient.FlagStreamers))
for _, streamerName := range streamers {
if streamerName == "versiondb" {
rootDir := cast.ToString(appOpts.Get(flags.FlagHome))
dataDir := filepath.Join(rootDir, "data", "versiondb")
if err := os.MkdirAll(dataDir, os.ModePerm); err != nil {
panic(err)
}
backendType := server.GetAppDBBackend(appOpts)
plainDB, err := dbm.NewDB("plain", backendType, dataDir)
if err != nil {
panic(err)
}
historyDB, err := dbm.NewDB("history", backendType, dataDir)
if err != nil {
panic(err)
}
changesetDB, err := dbm.NewDB("changeset", backendType, dataDir)
if err != nil {
panic(err)
}
versionDB := tmdb.NewStore(plainDB, historyDB, changesetDB)

// default to exposing all
exposeStoreKeys := make([]storetypes.StoreKey, 0, len(keys))
for _, storeKey := range keys {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
service := versiondb.NewStreamingService(versionDB, exposeStoreKeys)
bApp.SetStreamingService(service)
qms := versiondb.NewMultiStore(versionDB, exposeStoreKeys)
qms.MountTransientStores(tkeys)
qms.MountMemoryStores(memKeys)
bApp.SetQueryMultiStore(qms)
break
}
service := versiondb.NewStreamingService(versionDB, exposeStoreKeys)
bApp.SetStreamingService(service)
qms := versiondb.NewMultiStore(versionDB, exposeStoreKeys)
qms.MountTransientStores(tkeys)
qms.MountMemoryStores(memKeys)
bApp.SetQueryMultiStore(qms)
}

app := &App{
Expand Down
2 changes: 1 addition & 1 deletion client/flags.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package client

const FlagStreamers = "streamers"
const FlagStreamers = "store.streamers"
4 changes: 1 addition & 3 deletions cmd/cronosd/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
ethermint "github.com/evmos/ethermint/types"

"github.com/crypto-org-chain/cronos/app"
cronosclient "github.com/crypto-org-chain/cronos/client"
// this line is used by starport scaffolding # stargate/root/import
)

Expand Down Expand Up @@ -148,7 +147,6 @@ func initRootCmd(rootCmd *cobra.Command, encodingConfig params.EncodingConfig) {
func addModuleInitFlags(startCmd *cobra.Command) {
crisis.AddModuleInitFlags(startCmd)
cronos.AddModuleInitFlags(startCmd)
startCmd.Flags().String(cronosclient.FlagStreamers, "", "Enable streamers, only file streamer is supported right now")
// this line is used by starport scaffolding # stargate/root/initFlags
}

Expand Down Expand Up @@ -269,7 +267,7 @@ func (a appCreator) newApp(logger log.Logger, db dbm.DB, traceStore io.Writer, a
baseapp.SetIndexEvents(cast.ToStringSlice(appOpts.Get(server.FlagIndexEvents))),
baseapp.SetSnapshot(snapshotStore, snapshotOptions),
baseapp.SetIAVLCacheSize(cast.ToInt(appOpts.Get(server.FlagIAVLCacheSize))),
baseapp.SetIAVLDisableFastNode(cast.ToBool(appOpts.Get(server.FlagIAVLFastNode))),
baseapp.SetIAVLDisableFastNode(cast.ToBool(appOpts.Get(server.FlagDisableIAVLFastNode))),
)
}

Expand Down
34 changes: 17 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/RoaringBitmap/roaring v1.2.1
github.com/armon/go-metrics v0.4.1
github.com/cosmos/cosmos-sdk v0.46.3
github.com/cosmos/gogoproto v1.4.3
github.com/cosmos/ibc-go/v5 v5.0.0
github.com/ethereum/go-ethereum v1.10.19
github.com/evmos/ethermint v0.6.1-0.20221003153722-491c3da7ebd7
Expand All @@ -17,14 +18,14 @@ require (
github.com/peggyjv/gravity-bridge/module/v2 v2.0.0-20220420162017-838c0d25e974
github.com/rakyll/statik v0.1.7
github.com/spf13/cast v1.5.0
github.com/spf13/cobra v1.5.0
github.com/spf13/cobra v1.6.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.0
github.com/tendermint/tendermint v0.34.22
github.com/tendermint/tendermint v0.34.24
github.com/tendermint/tm-db v0.6.7
google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc
google.golang.org/grpc v1.50.0
google.golang.org/protobuf v1.28.1
google.golang.org/genproto v0.0.0-20221014213838-99cd37c6964a
google.golang.org/grpc v1.50.1
google.golang.org/protobuf v1.28.2-0.20220831092852-f930b1dc76e8
gopkg.in/yaml.v2 v2.4.0
)

Expand Down Expand Up @@ -63,7 +64,7 @@ require (
github.com/cosmos/cosmos-proto v1.0.0-alpha7 // indirect
github.com/cosmos/go-bip39 v1.0.0 // indirect
github.com/cosmos/gorocksdb v1.2.0 // indirect
github.com/cosmos/iavl v0.19.3 // indirect
github.com/cosmos/iavl v0.19.4 // indirect
github.com/cosmos/ledger-cosmos-go v0.11.1 // indirect
github.com/cosmos/ledger-go v0.9.2 // indirect
github.com/creachadair/taskgroup v0.3.2 // indirect
Expand Down Expand Up @@ -95,7 +96,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/flatbuffers v2.0.0+incompatible // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/orderedcode v0.0.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
Expand Down Expand Up @@ -124,7 +125,7 @@ require (
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jmhodges/levigo v1.0.0 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/compress v1.15.11 // indirect
github.com/lib/pq v1.10.6 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
Expand All @@ -140,7 +141,6 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/natefinch/atomic v1.0.1 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
Expand All @@ -150,7 +150,7 @@ require (
github.com/prometheus/client_golang v1.12.2 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.34.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/prometheus/tsdb v0.7.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/regen-network/cosmos-proto v0.3.1 // indirect
Expand All @@ -176,14 +176,14 @@ require (
github.com/zondax/hid v0.9.1-0.20220302062450-5552068d2266 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.opencensus.io v0.23.0 // indirect
golang.org/x/crypto v0.0.0-20220824171710-5757bc0c5503 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/net v0.0.0-20220812174116-3211cb980234 // indirect
golang.org/x/net v0.1.0 // indirect
golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2 // indirect
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde // indirect
golang.org/x/sys v0.0.0-20220818161305-2296e01440c6 // indirect
golang.org/x/term v0.0.0-20220722155259-a9ba230a4035 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0 // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/term v0.1.0 // indirect
golang.org/x/text v0.4.0 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/api v0.93.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand All @@ -195,7 +195,7 @@ require (
)

replace (
github.com/cosmos/cosmos-sdk => github.com/yihuang/cosmos-sdk v0.43.0-beta1.0.20221014023203-2c6b9d06b12d
github.com/cosmos/cosmos-sdk => github.com/yihuang/cosmos-sdk v0.43.0-beta1.0.20221128050842-6df48c3ab38b
github.com/ethereum/go-ethereum => github.com/ethereum/go-ethereum v1.10.19

// Fix upstream GHSA-h395-qcrw-5vmq vulnerability.
Expand Down
Loading

0 comments on commit 7d1201a

Please sign in to comment.