diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 8a61038b92..a9e87aeca6 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -27,6 +27,8 @@ import ( hlog "github.com/stellar/go/support/log" ) +var runDBReingestRangeFn = runDBReingestRange + var dbCmd = &cobra.Command{ Use: "db [command]", Short: "commands to manage horizon's postgres db", @@ -390,13 +392,18 @@ var dbReingestRangeCmd = &cobra.Command{ } storageBackendConfig.BufferedStorageBackendFactory = ledgerbackend.NewBufferedStorageBackend storageBackendConfig.DataStoreFactory = datastore.NewDataStore + // when using buffered storage, performance observations have noted optimal parallel batch size + // of 100, apply that as default if the flag was absent. + if !viper.IsSet("parallel-job-size") { + parallelJobSize = 100 + } } err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: false}) if err != nil { return err } - return runDBReingestRange( + return runDBReingestRangeFn( []history.LedgerRange{{StartSequence: argsUInt32[0], EndSequence: argsUInt32[1]}}, reingestForce, parallelWorkers, diff --git a/services/horizon/cmd/db_test.go b/services/horizon/cmd/db_test.go new file mode 100644 index 0000000000..1763d5ea29 --- /dev/null +++ b/services/horizon/cmd/db_test.go @@ -0,0 +1,80 @@ +package cmd + +import ( + "testing" + + horizon "github.com/stellar/go/services/horizon/internal" + "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/services/horizon/internal/ingest" + "github.com/stellar/go/support/db/dbtest" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +func TestDBCommandsTestSuite(t *testing.T) { + dbCmdSuite := &DBCommandsTestSuite{} + suite.Run(t, dbCmdSuite) +} + +type DBCommandsTestSuite struct { + suite.Suite + dsn string +} + +func (s *DBCommandsTestSuite) SetupSuite() { + runDBReingestRangeFn = func([]history.LedgerRange, bool, uint, + horizon.Config, ingest.StorageBackendConfig) error { + return nil + } + + newDB := dbtest.Postgres(s.T()) + s.dsn = newDB.DSN + + RootCmd.SetArgs([]string{ + "db", "migrate", "up", "--db-url", s.dsn}) + require.NoError(s.T(), RootCmd.Execute()) +} + +func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForBufferedBackend() { + RootCmd.SetArgs([]string{ + "db", "reingest", "range", + "--db-url", s.dsn, + "--network", "testnet", + "--parallel-workers", "2", + "--ledgerbackend", "datastore", + "--datastore-config", "../config.storagebackend.toml", + "2", + "10"}) + + require.NoError(s.T(), dbReingestRangeCmd.Execute()) + require.Equal(s.T(), parallelJobSize, uint32(100)) +} + +func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForCaptiveBackend() { + RootCmd.SetArgs([]string{ + "db", "reingest", "range", + "--db-url", s.dsn, + "--network", "testnet", + "--parallel-workers", "2", + "--ledgerbackend", "captive-core", + "2", + "10"}) + + require.NoError(s.T(), RootCmd.Execute()) + require.Equal(s.T(), parallelJobSize, uint32(100_000)) +} + +func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSet() { + RootCmd.SetArgs([]string{ + "db", "reingest", "range", + "--db-url", s.dsn, + "--network", "testnet", + "--parallel-workers", "2", + "--parallel-job-size", "5", + "--ledgerbackend", "captive-core", + "2", + "10"}) + + require.NoError(s.T(), RootCmd.Execute()) + require.Equal(s.T(), parallelJobSize, uint32(5)) +}