Skip to content

Commit

Permalink
services/horizon: Remove --parallel-job-size config parameter used fo…
Browse files Browse the repository at this point in the history
…r reingestion. (stellar#5484)

* Remove --parallel-job-size config parameter used for reingestion.
* Introduce minimum and maximum batch sizes for parallel workers. The min batch size is set to the HistoryCheckpointLedgerInterval for both Captive Core and Buffered Storage backends. The max batch size is larger for the Captive Core backend as compared to the Buffered Storage backend. These batch sizes are estimated to optimize throughput for each ledger backend.
  • Loading branch information
urvisavla authored Oct 14, 2024
1 parent fe25b61 commit 64efc32
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 124 deletions.
44 changes: 44 additions & 0 deletions ingest/ledgerbackend/buffered_storage_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,50 @@ func TestNewLedgerBuffer(t *testing.T) {
assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
}

func TestNewLedgerBufferSizeLessThanRangeSize(t *testing.T) {
startLedger := uint32(10)
endLedger := uint32(30)
bsb := createBufferedStorageBackendForTesting()
bsb.config.NumWorkers = 2
bsb.config.BufferSize = 10
ledgerRange := BoundedRange(startLedger, endLedger)
mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount)
bsb.dataStore = mockDataStore

ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange)
assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 10 }, time.Second*1, time.Millisecond*50)
assert.NoError(t, err)

for i := startLedger; i < endLedger; i++ {
lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background())
assert.NoError(t, err)
assert.Equal(t, xdr.Uint32(i), lcm.StartSequence)
}
assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
}

func TestNewLedgerBufferSizeLargerThanRangeSize(t *testing.T) {
startLedger := uint32(1)
endLedger := uint32(15)
bsb := createBufferedStorageBackendForTesting()
bsb.config.NumWorkers = 2
bsb.config.BufferSize = 100
ledgerRange := BoundedRange(startLedger, endLedger)
mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount)
bsb.dataStore = mockDataStore

ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange)
assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 15 }, time.Second*1, time.Millisecond*50)
assert.NoError(t, err)

for i := startLedger; i < endLedger; i++ {
lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background())
assert.NoError(t, err)
assert.Equal(t, xdr.Uint32(i), lcm.StartSequence)
}
assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
}

func TestBSBGetLatestLedgerSequence(t *testing.T) {
startLedger := uint32(3)
endLedger := uint32(5)
Expand Down
6 changes: 6 additions & 0 deletions ingest/ledgerbackend/ledger_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/stellar/go/support/collections/heap"
"github.com/stellar/go/support/compressxdr"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/ordered"

"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -54,6 +56,10 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu
less := func(a, b ledgerBatchObject) bool {
return a.startLedger < b.startLedger
}
// ensure BufferSize does not exceed the total range
if ledgerRange.bounded {
bsb.config.BufferSize = uint32(ordered.Min(int(bsb.config.BufferSize), int(ledgerRange.to-ledgerRange.from)+1))
}
pq := heap.New(less, int(bsb.config.BufferSize))

ledgerBuffer := &ledgerBuffer{
Expand Down
8 changes: 8 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).
## Pending

### Breaking Changes

- `--parallel-job-size` configuration parameter for the `stellar-horizon db reingest` command has been removed.
Job size will be automatically determined based on the number of workers (configuration parameter --parallel-workers), distributing
the range equally among them. The minimum job size will remain 64 ledgers and the start and end ledger range will be rounded to
the nearest checkpoint.([5484](https://github.com/stellar/go/pull/5484))

## 2.32.0

Expand Down
34 changes: 10 additions & 24 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ var (
dbDetectGapsCmd *cobra.Command
reingestForce bool
parallelWorkers uint
parallelJobSize uint32
retries uint
retryBackoffSeconds uint
ledgerBackendStr string
Expand Down Expand Up @@ -118,14 +117,6 @@ func ingestRangeCmdOpts() support.ConfigOptions {
FlagDefault: uint(1),
Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers",
},
{
Name: "parallel-job-size",
ConfigKey: &parallelJobSize,
OptType: types.Uint32,
Required: false,
FlagDefault: uint32(100000),
Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size",
},
{
Name: "retries",
ConfigKey: &retries,
Expand Down Expand Up @@ -178,17 +169,14 @@ func ingestRangeCmdOpts() support.ConfigOptions {
var dbReingestRangeCmdOpts = ingestRangeCmdOpts()
var dbFillGapsCmdOpts = ingestRangeCmdOpts()

func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error {
func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, minBatchSize, maxBatchSize uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error {
var err error

if reingestForce && parallelWorkers > 1 {
return errors.New("--force is incompatible with --parallel-workers > 1")
}

maxLedgersPerFlush := ingest.MaxLedgersPerFlush
if parallelJobSize < maxLedgersPerFlush {
maxLedgersPerFlush = parallelJobSize
}

ingestConfig := ingest.Config{
NetworkPassphrase: config.NetworkPassphrase,
Expand All @@ -214,15 +202,12 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
}

if parallelWorkers > 1 {
system, systemErr := ingest.NewParallelSystems(ingestConfig, parallelWorkers)
system, systemErr := ingest.NewParallelSystems(ingestConfig, parallelWorkers, minBatchSize, maxBatchSize)
if systemErr != nil {
return systemErr
}

return system.ReingestRange(
ledgerRanges,
parallelJobSize,
)
return system.ReingestRange(ledgerRanges)
}

system, systemErr := ingest.NewSystem(ingestConfig)
Expand Down Expand Up @@ -479,19 +464,16 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
}
}

maxBatchSize := ingest.MaxCaptiveCoreBackendBatchSize
var err error
var storageBackendConfig ingest.StorageBackendConfig
options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}
if ledgerBackendType == ingest.BufferedStorageBackend {
if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil {
return err
}
// when using buffered storage, performance observations have noted optimal parallel batch size
// of 100, apply that as default if the flag was absent.
if !viper.IsSet("parallel-job-size") {
parallelJobSize = 100
}
options.NoCaptiveCore = true
maxBatchSize = ingest.MaxBufferedStorageBackendBatchSize
}

if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil {
Expand All @@ -501,6 +483,8 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
[]history.LedgerRange{{StartSequence: argsUInt32[0], EndSequence: argsUInt32[1]}},
reingestForce,
parallelWorkers,
ingest.MinBatchSize,
maxBatchSize,
*horizonConfig,
storageBackendConfig,
)
Expand Down Expand Up @@ -541,6 +525,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
withRange = true
}

maxBatchSize := ingest.MaxCaptiveCoreBackendBatchSize
var err error
var storageBackendConfig ingest.StorageBackendConfig
options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}
Expand All @@ -549,6 +534,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
return err
}
options.NoCaptiveCore = true
maxBatchSize = ingest.MaxBufferedStorageBackendBatchSize
}

if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil {
Expand All @@ -569,7 +555,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
hlog.Infof("found gaps %v", gaps)
}

return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, *horizonConfig, storageBackendConfig)
return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, ingest.MinBatchSize, maxBatchSize, *horizonConfig, storageBackendConfig)
},
}

Expand Down
55 changes: 4 additions & 51 deletions services/horizon/cmd/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type DBCommandsTestSuite struct {
}

func (s *DBCommandsTestSuite) SetupSuite() {
runDBReingestRangeFn = func([]history.LedgerRange, bool, uint,
runDBReingestRangeFn = func([]history.LedgerRange, bool, uint, uint, uint,
horizon.Config, ingest.StorageBackendConfig) error {
return nil
}
Expand All @@ -45,66 +45,19 @@ func (s *DBCommandsTestSuite) BeforeTest(suiteName string, testName string) {
s.rootCmd = NewRootCmd()
}

func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForBufferedBackend() {
func (s *DBCommandsTestSuite) TestInvalidParameterParallelJobSize() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--parallel-workers", "2",
"--parallel-job-size", "10",
"--ledgerbackend", "datastore",
"--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(100))
}

func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForCaptiveBackend() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--stellar-core-binary-path", "/test/core/bin/path",
"--parallel-workers", "2",
"--ledgerbackend", "captive-core",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(100_000))
}

func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForCaptive() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--stellar-core-binary-path", "/test/core/bin/path",
"--parallel-workers", "2",
"--parallel-job-size", "5",
"--ledgerbackend", "captive-core",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(5))
}

func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForBuffered() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--parallel-workers", "2",
"--parallel-job-size", "5",
"--ledgerbackend", "datastore",
"--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(5))
require.Equal(s.T(), "unknown flag: --parallel-job-size", s.rootCmd.Execute().Error())
}

func (s *DBCommandsTestSuite) TestDbReingestAndFillGapsCmds() {
Expand Down
10 changes: 10 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ func (s LedgerBackendType) String() string {
return ""
}

const (
HistoryCheckpointLedgerInterval uint = 64
// MinBatchSize is the minimum batch size for reingestion
MinBatchSize uint = HistoryCheckpointLedgerInterval
// MaxBufferedStorageBackendBatchSize is the maximum batch size for Buffered Storage reingestion
MaxBufferedStorageBackendBatchSize uint = 200 * HistoryCheckpointLedgerInterval
// MaxCaptiveCoreBackendBatchSize is the maximum batch size for Captive Core reingestion
MaxCaptiveCoreBackendBatchSize uint = 20_000 * HistoryCheckpointLedgerInterval
)

type StorageBackendConfig struct {
DataStoreConfig datastore.DataStoreConfig `toml:"datastore_config"`
BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig `toml:"buffered_storage_backend_config"`
Expand Down
54 changes: 33 additions & 21 deletions services/horizon/internal/ingest/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ import (
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
logpkg "github.com/stellar/go/support/log"
)

const (
historyCheckpointLedgerInterval = 64
minBatchSize = historyCheckpointLedgerInterval
"github.com/stellar/go/support/ordered"
)

type rangeError struct {
Expand All @@ -27,23 +23,32 @@ func (e rangeError) Error() string {
type ParallelSystems struct {
config Config
workerCount uint
minBatchSize uint
maxBatchSize uint
systemFactory func(Config) (System, error)
}

func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) {
func NewParallelSystems(config Config, workerCount uint, minBatchSize, maxBatchSize uint) (*ParallelSystems, error) {
// Leaving this because used in tests, will update after a code review.
return newParallelSystems(config, workerCount, NewSystem)
return newParallelSystems(config, workerCount, minBatchSize, maxBatchSize, NewSystem)
}

// private version of NewParallel systems, allowing to inject a mock system
func newParallelSystems(config Config, workerCount uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) {
func newParallelSystems(config Config, workerCount uint, minBatchSize, maxBatchSize uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) {
if workerCount < 1 {
return nil, errors.New("workerCount must be > 0")
}

if minBatchSize != 0 && minBatchSize < HistoryCheckpointLedgerInterval {
return nil, fmt.Errorf("minBatchSize must be at least the %d", HistoryCheckpointLedgerInterval)
}
if minBatchSize != 0 && maxBatchSize != 0 && maxBatchSize < minBatchSize {
return nil, errors.New("maxBatchSize cannot be less than minBatchSize")
}
return &ParallelSystems{
config: config,
workerCount: workerCount,
maxBatchSize: maxBatchSize,
minBatchSize: minBatchSize,
systemFactory: systemFactory,
}, nil
}
Expand Down Expand Up @@ -112,20 +117,27 @@ func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32,
}
return lowestLedger
}
func (ps *ParallelSystems) calculateParallelLedgerBatchSize(rangeSize uint32) uint32 {
// calculate the initial batch size based on available workers
batchSize := rangeSize / uint32(ps.workerCount)

// ensure the batch size meets min threshold
if ps.minBatchSize > 0 {
batchSize = ordered.Max(batchSize, uint32(ps.minBatchSize))
}

func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 {
batchSize := batchSizeSuggestion
if batchSize == 0 || rangeSize/batchSize < uint32(workerCount) {
// let's try to make use of all the workers
batchSize = rangeSize / uint32(workerCount)
// ensure the batch size does not exceed max threshold
if ps.maxBatchSize > 0 {
batchSize = ordered.Min(batchSize, uint32(ps.maxBatchSize))
}
// Use a minimum batch size to make it worth it in terms of overhead
if batchSize < minBatchSize {
batchSize = minBatchSize

// round down to the nearest multiple of HistoryCheckpointLedgerInterval
if batchSize > uint32(HistoryCheckpointLedgerInterval) {
return batchSize / uint32(HistoryCheckpointLedgerInterval) * uint32(HistoryCheckpointLedgerInterval)
}

// Also, round the batch size to the closest, lower or equal 64 multiple
return (batchSize / historyCheckpointLedgerInterval) * historyCheckpointLedgerInterval
// HistoryCheckpointLedgerInterval is the minimum batch size.
return uint32(HistoryCheckpointLedgerInterval)
}

func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 {
Expand All @@ -136,9 +148,9 @@ func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 {
return sum
}

func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, batchSizeSuggestion uint32) error {
func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange) error {
var (
batchSize = calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges), batchSizeSuggestion, ps.workerCount)
batchSize = ps.calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges))
reingestJobQueue = make(chan history.LedgerRange)
wg sync.WaitGroup

Expand Down
Loading

0 comments on commit 64efc32

Please sign in to comment.