Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Remove --parallel-job-size config parameter used for reingestion. #5484

Merged
merged 5 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions ingest/ledgerbackend/buffered_storage_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,50 @@ func TestNewLedgerBuffer(t *testing.T) {
assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
}

func TestNewLedgerBufferSizeLessThanRangeSize(t *testing.T) {
startLedger := uint32(10)
endLedger := uint32(30)
bsb := createBufferedStorageBackendForTesting()
bsb.config.NumWorkers = 2
bsb.config.BufferSize = 10
ledgerRange := BoundedRange(startLedger, endLedger)
mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount)
bsb.dataStore = mockDataStore

ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange)
assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 10 }, time.Second*1, time.Millisecond*50)
assert.NoError(t, err)

for i := startLedger; i < endLedger; i++ {
lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background())
assert.NoError(t, err)
assert.Equal(t, xdr.Uint32(i), lcm.StartSequence)
}
assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
}

func TestNewLedgerBufferSizeLargerThanRangeSize(t *testing.T) {
startLedger := uint32(1)
endLedger := uint32(15)
bsb := createBufferedStorageBackendForTesting()
bsb.config.NumWorkers = 2
bsb.config.BufferSize = 100
ledgerRange := BoundedRange(startLedger, endLedger)
mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount)
bsb.dataStore = mockDataStore

ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange)
assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 15 }, time.Second*1, time.Millisecond*50)
assert.NoError(t, err)

for i := startLedger; i < endLedger; i++ {
lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background())
assert.NoError(t, err)
assert.Equal(t, xdr.Uint32(i), lcm.StartSequence)
}
assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
}

func TestBSBGetLatestLedgerSequence(t *testing.T) {
startLedger := uint32(3)
endLedger := uint32(5)
Expand Down
6 changes: 6 additions & 0 deletions ingest/ledgerbackend/ledger_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/stellar/go/support/collections/heap"
"github.com/stellar/go/support/compressxdr"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/ordered"

"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -54,6 +56,10 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu
less := func(a, b ledgerBatchObject) bool {
return a.startLedger < b.startLedger
}
// ensure BufferSize does not exceed the total range
if ledgerRange.bounded {
bsb.config.BufferSize = uint32(ordered.Min(int(bsb.config.BufferSize), int(ledgerRange.to-ledgerRange.from)+1))
}
pq := heap.New(less, int(bsb.config.BufferSize))

ledgerBuffer := &ledgerBuffer{
Expand Down
8 changes: 8 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).
## Pending

### Breaking Changes

- `--parallel-job-size` configuration parameter for the `stellar-horizon db reingest` command has been removed.
Job size will be automatically determined based on the number of workers (configuration parameter --parallel-workers), distributing
the range equally among them. The minimum job size will remain 64 ledgers and the start and end ledger range will be rounded to
the nearest checkpoint.([5484](https://github.com/stellar/go/pull/5484))

## 2.32.0

Expand Down
34 changes: 10 additions & 24 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ var (
dbDetectGapsCmd *cobra.Command
reingestForce bool
parallelWorkers uint
parallelJobSize uint32
retries uint
retryBackoffSeconds uint
ledgerBackendStr string
Expand Down Expand Up @@ -118,14 +117,6 @@ func ingestRangeCmdOpts() support.ConfigOptions {
FlagDefault: uint(1),
Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers",
},
{
urvisavla marked this conversation as resolved.
Show resolved Hide resolved
Name: "parallel-job-size",
ConfigKey: &parallelJobSize,
OptType: types.Uint32,
Required: false,
FlagDefault: uint32(100000),
Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size",
},
{
Name: "retries",
ConfigKey: &retries,
Expand Down Expand Up @@ -178,17 +169,14 @@ func ingestRangeCmdOpts() support.ConfigOptions {
var dbReingestRangeCmdOpts = ingestRangeCmdOpts()
var dbFillGapsCmdOpts = ingestRangeCmdOpts()

func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error {
func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, minBatchSize, maxBatchSize uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error {
var err error

if reingestForce && parallelWorkers > 1 {
return errors.New("--force is incompatible with --parallel-workers > 1")
}

maxLedgersPerFlush := ingest.MaxLedgersPerFlush
if parallelJobSize < maxLedgersPerFlush {
maxLedgersPerFlush = parallelJobSize
}

ingestConfig := ingest.Config{
NetworkPassphrase: config.NetworkPassphrase,
Expand All @@ -214,15 +202,12 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
}

if parallelWorkers > 1 {
system, systemErr := ingest.NewParallelSystems(ingestConfig, parallelWorkers)
system, systemErr := ingest.NewParallelSystems(ingestConfig, parallelWorkers, minBatchSize, maxBatchSize)
if systemErr != nil {
return systemErr
}

return system.ReingestRange(
ledgerRanges,
parallelJobSize,
)
return system.ReingestRange(ledgerRanges)
}

system, systemErr := ingest.NewSystem(ingestConfig)
Expand Down Expand Up @@ -479,19 +464,16 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
}
}

maxBatchSize := ingest.MaxCaptiveCoreBackendBatchSize
var err error
var storageBackendConfig ingest.StorageBackendConfig
options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}
if ledgerBackendType == ingest.BufferedStorageBackend {
if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil {
return err
}
// when using buffered storage, performance observations have noted optimal parallel batch size
// of 100, apply that as default if the flag was absent.
if !viper.IsSet("parallel-job-size") {
parallelJobSize = 100
}
options.NoCaptiveCore = true
maxBatchSize = ingest.MaxBufferedStorageBackendBatchSize
}

if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil {
Expand All @@ -501,6 +483,8 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
[]history.LedgerRange{{StartSequence: argsUInt32[0], EndSequence: argsUInt32[1]}},
reingestForce,
parallelWorkers,
ingest.MinBatchSize,
maxBatchSize,
*horizonConfig,
storageBackendConfig,
)
Expand Down Expand Up @@ -541,6 +525,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
withRange = true
}

maxBatchSize := ingest.MaxCaptiveCoreBackendBatchSize
var err error
var storageBackendConfig ingest.StorageBackendConfig
options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}
Expand All @@ -549,6 +534,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
return err
}
options.NoCaptiveCore = true
maxBatchSize = ingest.MaxBufferedStorageBackendBatchSize
}

if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil {
Expand All @@ -569,7 +555,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
hlog.Infof("found gaps %v", gaps)
}

return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, *horizonConfig, storageBackendConfig)
return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, ingest.MinBatchSize, maxBatchSize, *horizonConfig, storageBackendConfig)
},
}

Expand Down
55 changes: 4 additions & 51 deletions services/horizon/cmd/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type DBCommandsTestSuite struct {
}

func (s *DBCommandsTestSuite) SetupSuite() {
runDBReingestRangeFn = func([]history.LedgerRange, bool, uint,
runDBReingestRangeFn = func([]history.LedgerRange, bool, uint, uint, uint,
horizon.Config, ingest.StorageBackendConfig) error {
return nil
}
Expand All @@ -45,66 +45,19 @@ func (s *DBCommandsTestSuite) BeforeTest(suiteName string, testName string) {
s.rootCmd = NewRootCmd()
}

func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForBufferedBackend() {
func (s *DBCommandsTestSuite) TestInvalidParameterParallelJobSize() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--parallel-workers", "2",
"--parallel-job-size", "10",
"--ledgerbackend", "datastore",
"--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(100))
}

func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForCaptiveBackend() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--stellar-core-binary-path", "/test/core/bin/path",
"--parallel-workers", "2",
"--ledgerbackend", "captive-core",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(100_000))
}

func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForCaptive() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--stellar-core-binary-path", "/test/core/bin/path",
"--parallel-workers", "2",
"--parallel-job-size", "5",
"--ledgerbackend", "captive-core",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(5))
}

func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForBuffered() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--parallel-workers", "2",
"--parallel-job-size", "5",
"--ledgerbackend", "datastore",
"--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(5))
require.Equal(s.T(), "unknown flag: --parallel-job-size", s.rootCmd.Execute().Error())
}

func (s *DBCommandsTestSuite) TestDbReingestAndFillGapsCmds() {
Expand Down
10 changes: 10 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ func (s LedgerBackendType) String() string {
return ""
}

const (
HistoryCheckpointLedgerInterval uint = 64
// MinBatchSize is the minimum batch size for reingestion
MinBatchSize uint = HistoryCheckpointLedgerInterval
// MaxBufferedStorageBackendBatchSize is the maximum batch size for Buffered Storage reingestion
MaxBufferedStorageBackendBatchSize uint = 200 * HistoryCheckpointLedgerInterval
// MaxCaptiveCoreBackendBatchSize is the maximum batch size for Captive Core reingestion
MaxCaptiveCoreBackendBatchSize uint = 20_000 * HistoryCheckpointLedgerInterval
)

type StorageBackendConfig struct {
DataStoreConfig datastore.DataStoreConfig `toml:"datastore_config"`
BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig `toml:"buffered_storage_backend_config"`
Expand Down
54 changes: 33 additions & 21 deletions services/horizon/internal/ingest/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ import (
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
logpkg "github.com/stellar/go/support/log"
)

const (
historyCheckpointLedgerInterval = 64
minBatchSize = historyCheckpointLedgerInterval
"github.com/stellar/go/support/ordered"
)

type rangeError struct {
Expand All @@ -27,23 +23,32 @@ func (e rangeError) Error() string {
type ParallelSystems struct {
config Config
workerCount uint
minBatchSize uint
maxBatchSize uint
systemFactory func(Config) (System, error)
}

func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) {
func NewParallelSystems(config Config, workerCount uint, minBatchSize, maxBatchSize uint) (*ParallelSystems, error) {
// Leaving this because used in tests, will update after a code review.
return newParallelSystems(config, workerCount, NewSystem)
return newParallelSystems(config, workerCount, minBatchSize, maxBatchSize, NewSystem)
}

// private version of NewParallel systems, allowing to inject a mock system
func newParallelSystems(config Config, workerCount uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) {
func newParallelSystems(config Config, workerCount uint, minBatchSize, maxBatchSize uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) {
if workerCount < 1 {
return nil, errors.New("workerCount must be > 0")
}

if minBatchSize != 0 && minBatchSize < HistoryCheckpointLedgerInterval {
return nil, fmt.Errorf("minBatchSize must be at least the %d", HistoryCheckpointLedgerInterval)
}
if minBatchSize != 0 && maxBatchSize != 0 && maxBatchSize < minBatchSize {
return nil, errors.New("maxBatchSize cannot be less than minBatchSize")
}
return &ParallelSystems{
config: config,
workerCount: workerCount,
maxBatchSize: maxBatchSize,
minBatchSize: minBatchSize,
systemFactory: systemFactory,
}, nil
}
Expand Down Expand Up @@ -112,20 +117,27 @@ func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32,
}
return lowestLedger
}
func (ps *ParallelSystems) calculateParallelLedgerBatchSize(rangeSize uint32) uint32 {
// calculate the initial batch size based on available workers
batchSize := rangeSize / uint32(ps.workerCount)

// ensure the batch size meets min threshold
if ps.minBatchSize > 0 {
batchSize = ordered.Max(batchSize, uint32(ps.minBatchSize))
}

func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 {
batchSize := batchSizeSuggestion
if batchSize == 0 || rangeSize/batchSize < uint32(workerCount) {
// let's try to make use of all the workers
batchSize = rangeSize / uint32(workerCount)
// ensure the batch size does not exceed max threshold
if ps.maxBatchSize > 0 {
batchSize = ordered.Min(batchSize, uint32(ps.maxBatchSize))
}
// Use a minimum batch size to make it worth it in terms of overhead
if batchSize < minBatchSize {
batchSize = minBatchSize

// round down to the nearest multiple of HistoryCheckpointLedgerInterval
if batchSize > uint32(HistoryCheckpointLedgerInterval) {
return batchSize / uint32(HistoryCheckpointLedgerInterval) * uint32(HistoryCheckpointLedgerInterval)
}

// Also, round the batch size to the closest, lower or equal 64 multiple
return (batchSize / historyCheckpointLedgerInterval) * historyCheckpointLedgerInterval
// HistoryCheckpointLedgerInterval is the minimum batch size.
return uint32(HistoryCheckpointLedgerInterval)
}

func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 {
Expand All @@ -136,9 +148,9 @@ func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 {
return sum
}

func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, batchSizeSuggestion uint32) error {
func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange) error {
var (
batchSize = calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges), batchSizeSuggestion, ps.workerCount)
batchSize = ps.calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges))
reingestJobQueue = make(chan history.LedgerRange)
wg sync.WaitGroup

Expand Down
Loading
Loading