Skip to content

Commit

Permalink
Introduce minimum and maximum batch sizes for parallel workers.
Browse files Browse the repository at this point in the history
The min batch size is set to the HistoryCheckpointLedgerInterval for both Captive Core and Buffered Storage backends.
The max batch size is larger for the Captive Core backend as compared to the Buffered Storage backend. These batch sizes
are estimated to optimize throughput for each ledger backend.
  • Loading branch information
urvisavla committed Oct 14, 2024
1 parent 45d9ed6 commit a136126
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 33 deletions.
12 changes: 9 additions & 3 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func ingestRangeCmdOpts() support.ConfigOptions {
var dbReingestRangeCmdOpts = ingestRangeCmdOpts()
var dbFillGapsCmdOpts = ingestRangeCmdOpts()

func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error {
func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, minBatchSize, maxBatchSize uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error {
var err error

if reingestForce && parallelWorkers > 1 {
Expand Down Expand Up @@ -202,7 +202,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
}

if parallelWorkers > 1 {
system, systemErr := ingest.NewParallelSystems(ingestConfig, parallelWorkers)
system, systemErr := ingest.NewParallelSystems(ingestConfig, parallelWorkers, minBatchSize, maxBatchSize)
if systemErr != nil {
return systemErr
}
Expand Down Expand Up @@ -464,6 +464,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
}
}

maxBatchSize := ingest.MaxCaptiveCoreBackendBatchSize
var err error
var storageBackendConfig ingest.StorageBackendConfig
options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}
Expand All @@ -472,6 +473,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
return err
}
options.NoCaptiveCore = true
maxBatchSize = ingest.MaxBufferedStorageBackendBatchSize
}

if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil {
Expand All @@ -481,6 +483,8 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
[]history.LedgerRange{{StartSequence: argsUInt32[0], EndSequence: argsUInt32[1]}},
reingestForce,
parallelWorkers,
ingest.MinBatchSize,
maxBatchSize,
*horizonConfig,
storageBackendConfig,
)
Expand Down Expand Up @@ -521,6 +525,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
withRange = true
}

maxBatchSize := ingest.MaxCaptiveCoreBackendBatchSize
var err error
var storageBackendConfig ingest.StorageBackendConfig
options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}
Expand All @@ -529,6 +534,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
return err
}
options.NoCaptiveCore = true
maxBatchSize = ingest.MaxBufferedStorageBackendBatchSize
}

if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil {
Expand All @@ -549,7 +555,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
hlog.Infof("found gaps %v", gaps)
}

return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, *horizonConfig, storageBackendConfig)
return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, ingest.MinBatchSize, maxBatchSize, *horizonConfig, storageBackendConfig)
},
}

Expand Down
2 changes: 1 addition & 1 deletion services/horizon/cmd/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type DBCommandsTestSuite struct {
}

func (s *DBCommandsTestSuite) SetupSuite() {
runDBReingestRangeFn = func([]history.LedgerRange, bool, uint,
runDBReingestRangeFn = func([]history.LedgerRange, bool, uint, uint, uint,
horizon.Config, ingest.StorageBackendConfig) error {
return nil
}
Expand Down
10 changes: 10 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ func (s LedgerBackendType) String() string {
return ""
}

const (
HistoryCheckpointLedgerInterval uint = 64
// MinBatchSize is the minimum batch size for reingestion
MinBatchSize uint = HistoryCheckpointLedgerInterval
// MaxBufferedStorageBackendBatchSize is the maximum batch size for Buffered Storage reingestion
MaxBufferedStorageBackendBatchSize uint = 200 * HistoryCheckpointLedgerInterval
// MaxCaptiveCoreBackendBatchSize is the maximum batch size for Captive Core reingestion
MaxCaptiveCoreBackendBatchSize uint = 20_000 * HistoryCheckpointLedgerInterval
)

type StorageBackendConfig struct {
DataStoreConfig datastore.DataStoreConfig `toml:"datastore_config"`
BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig `toml:"buffered_storage_backend_config"`
Expand Down
49 changes: 31 additions & 18 deletions services/horizon/internal/ingest/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ import (
logpkg "github.com/stellar/go/support/log"
)

const (
historyCheckpointLedgerInterval = 64
minBatchSize = historyCheckpointLedgerInterval
)

type rangeError struct {
err error
ledgerRange history.LedgerRange
Expand All @@ -27,23 +22,32 @@ func (e rangeError) Error() string {
type ParallelSystems struct {
config Config
workerCount uint
minBatchSize uint
maxBatchSize uint
systemFactory func(Config) (System, error)
}

func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) {
func NewParallelSystems(config Config, workerCount uint, minBatchSize, maxBatchSize uint) (*ParallelSystems, error) {
// Leaving this because used in tests, will update after a code review.
return newParallelSystems(config, workerCount, NewSystem)
return newParallelSystems(config, workerCount, minBatchSize, maxBatchSize, NewSystem)
}

// private version of NewParallel systems, allowing to inject a mock system
func newParallelSystems(config Config, workerCount uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) {
func newParallelSystems(config Config, workerCount uint, minBatchSize, maxBatchSize uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) {
if workerCount < 1 {
return nil, errors.New("workerCount must be > 0")
}

if minBatchSize != 0 && minBatchSize < HistoryCheckpointLedgerInterval {
return nil, fmt.Errorf("minBatchSize must be at least the %d", HistoryCheckpointLedgerInterval)
}
if minBatchSize != 0 && maxBatchSize != 0 && maxBatchSize < minBatchSize {
return nil, errors.New("maxBatchSize cannot be less than minBatchSize")
}
return &ParallelSystems{
config: config,
workerCount: workerCount,
maxBatchSize: maxBatchSize,
minBatchSize: minBatchSize,
systemFactory: systemFactory,
}, nil
}
Expand Down Expand Up @@ -112,18 +116,27 @@ func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32,
}
return lowestLedger
}
func (ps *ParallelSystems) calculateParallelLedgerBatchSize(rangeSize uint32) uint32 {
// calculate the initial batch size based on available workers
batchSize := rangeSize / uint32(ps.workerCount)

// ensure the batch size meets min threshold
if ps.minBatchSize > 0 {
batchSize = max(batchSize, uint32(ps.minBatchSize))

Check failure on line 125 in services/horizon/internal/ingest/parallel.go

View workflow job for this annotation

GitHub Actions / golangci

undefined: max (typecheck)
}

func calculateParallelLedgerBatchSize(rangeSize uint32, workerCount uint) uint32 {
// let's try to make use of all the workers
batchSize := rangeSize / uint32(workerCount)
// ensure the batch size does not exceed max threshold
if ps.maxBatchSize > 0 {
batchSize = min(batchSize, uint32(ps.maxBatchSize))

Check failure on line 130 in services/horizon/internal/ingest/parallel.go

View workflow job for this annotation

GitHub Actions / golangci

undefined: min (typecheck)
}

// Use a minimum batch size to make it worth it in terms of overhead
if batchSize < minBatchSize {
batchSize = minBatchSize
// round down to the nearest multiple of HistoryCheckpointLedgerInterval
if batchSize > uint32(HistoryCheckpointLedgerInterval) {
return batchSize / uint32(HistoryCheckpointLedgerInterval) * uint32(HistoryCheckpointLedgerInterval)
}

// Also, round the batch size to the closest, lower or equal 64 multiple
return (batchSize / historyCheckpointLedgerInterval) * historyCheckpointLedgerInterval
// HistoryCheckpointLedgerInterval is the minimum batch size.
return uint32(HistoryCheckpointLedgerInterval)
}

func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 {
Expand All @@ -136,7 +149,7 @@ func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 {

func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange) error {
var (
batchSize = calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges), ps.workerCount)
batchSize = ps.calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges))
reingestJobQueue = make(chan history.LedgerRange)
wg sync.WaitGroup

Expand Down
99 changes: 88 additions & 11 deletions services/horizon/internal/ingest/parallel_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ingest

import (
"fmt"
"math"
"math/rand"
"sort"
"sync"
Expand All @@ -15,13 +17,88 @@ import (
)

func TestCalculateParallelLedgerBatchSize(t *testing.T) {
assert.Equal(t, uint32(6656), calculateParallelLedgerBatchSize(20096, 3))
assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 4))
assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 4))
assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 4))
assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 4))
assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 4))
assert.Equal(t, uint32(20096), calculateParallelLedgerBatchSize(20096, 1))
config := Config{}
result := &mockSystem{}
factory := func(c Config) (System, error) {
return result, nil
}

// worker count 0
system, err := newParallelSystems(config, 0, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory)
assert.EqualError(t, err, "workerCount must be > 0")

// worker count 1, range smaller than HistoryCheckpointLedgerInterval
system, err = newParallelSystems(config, 1, 50, 200, factory)
assert.EqualError(t, err, fmt.Sprintf("minBatchSize must be at least the %d", HistoryCheckpointLedgerInterval))

// worker count 1, max batch size smaller than min batch size
system, err = newParallelSystems(config, 1, 5000, 200, factory)
assert.EqualError(t, err, "maxBatchSize cannot be less than minBatchSize")

// worker count 1, captive core batch size
system, _ = newParallelSystems(config, 1, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory)
assert.Equal(t, uint32(MaxCaptiveCoreBackendBatchSize), system.calculateParallelLedgerBatchSize(uint32(MaxCaptiveCoreBackendBatchSize)+10))
assert.Equal(t, uint32(MinBatchSize), system.calculateParallelLedgerBatchSize(0))
assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10048)) // exact multiple
assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10090)) // round down

// worker count 1, buffered storage batch size
system, _ = newParallelSystems(config, 1, MinBatchSize, MaxBufferedStorageBackendBatchSize, factory)
assert.Equal(t, uint32(MaxBufferedStorageBackendBatchSize), system.calculateParallelLedgerBatchSize(uint32(MaxBufferedStorageBackendBatchSize)+10))
assert.Equal(t, uint32(MinBatchSize), system.calculateParallelLedgerBatchSize(0))
assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10048)) // exact multiple
assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10090)) // round down

// worker count 1, no min/max batch size
system, _ = newParallelSystems(config, 1, 0, 0, factory)
assert.Equal(t, uint32(20096), system.calculateParallelLedgerBatchSize(20096)) // exact multiple
assert.Equal(t, uint32(20032), system.calculateParallelLedgerBatchSize(20090)) // round down

// worker count 1, min/max batch size
system, _ = newParallelSystems(config, 1, 64, 20000, factory)
assert.Equal(t, uint32(19968), system.calculateParallelLedgerBatchSize(20096)) // round down
system, _ = newParallelSystems(config, 1, 64, 30000, factory)
assert.Equal(t, uint32(20096), system.calculateParallelLedgerBatchSize(20096)) // exact multiple

// Tests for worker count 2

// no min/max batch size
system, _ = newParallelSystems(config, 2, 0, 0, factory)
assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(60)) // range smaller than 64
assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(128)) // exact multiple
assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(20096))

// range larger than max batch size
system, _ = newParallelSystems(config, 2, 64, 10000, factory)
assert.Equal(t, uint32(9984), system.calculateParallelLedgerBatchSize(20096)) // round down

// range smaller than min batch size
system, _ = newParallelSystems(config, 2, 64, 0, factory)
assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(50)) // min batch size
assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(20096)) // exact multiple
assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(100)) // min batch size

// batch size equal to min
system, _ = newParallelSystems(config, 2, 100, 0, factory)
assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(100)) // round down

// equal min/max batch size
system, _ = newParallelSystems(config, 2, 5000, 5000, factory)
assert.Equal(t, uint32(4992), system.calculateParallelLedgerBatchSize(20096)) // round down

// worker count 3
system, _ = newParallelSystems(config, 3, 64, 7000, factory)
assert.Equal(t, uint32(6656), system.calculateParallelLedgerBatchSize(20096))

// worker count 4
system, _ = newParallelSystems(config, 4, 64, 20000, factory)
assert.Equal(t, uint32(4992), system.calculateParallelLedgerBatchSize(20096)) //round down
assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(64))
assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(2))

// max possible workers
system, _ = newParallelSystems(config, math.MaxUint32, 0, 0, factory)
assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(math.MaxUint32))
}

func TestParallelReingestRange(t *testing.T) {
Expand All @@ -43,7 +120,7 @@ func TestParallelReingestRange(t *testing.T) {
factory := func(c Config) (System, error) {
return result, nil
}
system, err := newParallelSystems(config, 3, factory)
system, err := newParallelSystems(config, 3, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory)
assert.NoError(t, err)
err = system.ReingestRange([]history.LedgerRange{{1, 2050}})
assert.NoError(t, err)
Expand All @@ -57,7 +134,7 @@ func TestParallelReingestRange(t *testing.T) {
assert.Equal(t, expected, rangesCalled)

rangesCalled = nil
system, err = newParallelSystems(config, 1, factory)
system, err = newParallelSystems(config, 1, 0, 0, factory)
assert.NoError(t, err)
result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1024)).Return(nil).Once()
err = system.ReingestRange([]history.LedgerRange{{1, 1024}})
Expand All @@ -80,7 +157,7 @@ func TestParallelReingestRangeError(t *testing.T) {
factory := func(c Config) (System, error) {
return result, nil
}
system, err := newParallelSystems(config, 3, factory)
system, err := newParallelSystems(config, 3, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory)
assert.NoError(t, err)
err = system.ReingestRange([]history.LedgerRange{{1, 2050}})
result.AssertExpectations(t)
Expand Down Expand Up @@ -110,7 +187,7 @@ func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) {
factory := func(c Config) (System, error) {
return result, nil
}
system, err := newParallelSystems(config, 3, factory)
system, err := newParallelSystems(config, 3, 0, 0, factory)
assert.NoError(t, err)
err = system.ReingestRange([]history.LedgerRange{{1, 2050}})
result.AssertExpectations(t)
Expand Down

0 comments on commit a136126

Please sign in to comment.