diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 2769273493..8f3e2ae9eb 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -27,6 +27,8 @@ import ( hlog "github.com/stellar/go/support/log" ) +var runDBReingestRangeFn = runDBReingestRange + var dbCmd = &cobra.Command{ Use: "db [command]", Short: "commands to manage horizon's postgres db", @@ -391,6 +393,11 @@ var dbReingestRangeCmd = &cobra.Command{ } storageBackendConfig.BufferedStorageBackendFactory = ledgerbackend.NewBufferedStorageBackend storageBackendConfig.DataStoreFactory = datastore.NewDataStore + // when using buffered storage, performance observations have noted optimal parallel batch size + // of 100, apply that as default if the flag was absent. + if !viper.IsSet("parallel-job-size") { + parallelJobSize = 100 + } options.NoCaptiveCore = true } @@ -398,7 +405,7 @@ var dbReingestRangeCmd = &cobra.Command{ if err != nil { return err } - return runDBReingestRange( + return runDBReingestRangeFn( []history.LedgerRange{{StartSequence: argsUInt32[0], EndSequence: argsUInt32[1]}}, reingestForce, parallelWorkers, @@ -477,7 +484,7 @@ var dbFillGapsCmd = &cobra.Command{ hlog.Infof("found gaps %v", gaps) } - return runDBReingestRange(gaps, reingestForce, parallelWorkers, *globalConfig, storageBackendConfig) + return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, *globalConfig, storageBackendConfig) }, } diff --git a/services/horizon/cmd/db_test.go b/services/horizon/cmd/db_test.go new file mode 100644 index 0000000000..3942f8d91a --- /dev/null +++ b/services/horizon/cmd/db_test.go @@ -0,0 +1,98 @@ +package cmd + +import ( + "testing" + + horizon "github.com/stellar/go/services/horizon/internal" + "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/services/horizon/internal/ingest" + "github.com/stellar/go/support/db/dbtest" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +func TestDBCommandsTestSuite(t *testing.T) { + dbCmdSuite := &DBCommandsTestSuite{} + suite.Run(t, dbCmdSuite) +} + +type DBCommandsTestSuite struct { + suite.Suite + dsn string +} + +func (s *DBCommandsTestSuite) SetupSuite() { + runDBReingestRangeFn = func([]history.LedgerRange, bool, uint, + horizon.Config, ingest.StorageBackendConfig) error { + return nil + } + + newDB := dbtest.Postgres(s.T()) + s.dsn = newDB.DSN + + RootCmd.SetArgs([]string{ + "db", "migrate", "up", "--db-url", s.dsn}) + require.NoError(s.T(), RootCmd.Execute()) +} + +func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForBufferedBackend() { + RootCmd.SetArgs([]string{ + "db", "reingest", "range", + "--db-url", s.dsn, + "--network", "testnet", + "--parallel-workers", "2", + "--ledgerbackend", "datastore", + "--datastore-config", "../config.storagebackend.toml", + "2", + "10"}) + + require.NoError(s.T(), dbReingestRangeCmd.Execute()) + require.Equal(s.T(), parallelJobSize, uint32(100)) +} + +func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForCaptiveBackend() { + RootCmd.SetArgs([]string{ + "db", "reingest", "range", + "--db-url", s.dsn, + "--network", "testnet", + "--stellar-core-binary-path", "/test/core/bin/path", + "--parallel-workers", "2", + "--ledgerbackend", "captive-core", + "2", + "10"}) + + require.NoError(s.T(), RootCmd.Execute()) + require.Equal(s.T(), parallelJobSize, uint32(100_000)) +} + +func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForCaptive() { + RootCmd.SetArgs([]string{ + "db", "reingest", "range", + "--db-url", s.dsn, + "--network", "testnet", + "--stellar-core-binary-path", "/test/core/bin/path", + "--parallel-workers", "2", + "--parallel-job-size", "5", + "--ledgerbackend", "captive-core", + "2", + "10"}) + + require.NoError(s.T(), RootCmd.Execute()) + require.Equal(s.T(), parallelJobSize, uint32(5)) +} + +func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForBuffered() { + RootCmd.SetArgs([]string{ + "db", "reingest", "range", + "--db-url", s.dsn, + "--network", "testnet", + "--parallel-workers", "2", + "--parallel-job-size", "5", + "--ledgerbackend", "datastore", + "--datastore-config", "../config.storagebackend.toml", + "2", + "10"}) + + require.NoError(s.T(), RootCmd.Execute()) + require.Equal(s.T(), parallelJobSize, uint32(5)) +} diff --git a/services/horizon/internal/integration/parameters_test.go b/services/horizon/internal/integration/parameters_test.go index f50647abc7..333ed744c6 100644 --- a/services/horizon/internal/integration/parameters_test.go +++ b/services/horizon/internal/integration/parameters_test.go @@ -127,8 +127,7 @@ func TestEnvironmentPreserved(t *testing.T) { // using NETWORK environment variables, history archive urls or network passphrase // parameters are also set. func TestInvalidNetworkParameters(t *testing.T) { - var captiveCoreConfigErrMsg = integration.HorizonInitErrStr + ": error generating captive " + - "core configuration: invalid config: %s parameter not allowed with the %s parameter" + var captiveCoreConfigErrMsg = integration.HorizonInitErrStr + ": invalid config: %s parameter not allowed with the %s parameter" testCases := []struct { name string errMsg string