services/horizon: Remove --parallel-job-size config parameter used for reingestion. #5484

Merged (5 commits) on Oct 14, 2024
44 changes: 44 additions & 0 deletions ingest/ledgerbackend/buffered_storage_backend_test.go
@@ -160,6 +160,50 @@ func TestNewLedgerBuffer(t *testing.T) {
assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
}

func TestNewLedgerBufferSizeLessThanRangeSize(t *testing.T) {
startLedger := uint32(10)
endLedger := uint32(30)
bsb := createBufferedStorageBackendForTesting()
bsb.config.NumWorkers = 2
bsb.config.BufferSize = 10
ledgerRange := BoundedRange(startLedger, endLedger)
mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount)
bsb.dataStore = mockDataStore

ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange)
assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 10 }, time.Second*1, time.Millisecond*50)
assert.NoError(t, err)

for i := startLedger; i < endLedger; i++ {
lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background())
assert.NoError(t, err)
assert.Equal(t, xdr.Uint32(i), lcm.StartSequence)
}
assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
}

func TestNewLedgerBufferSizeLargerThanRangeSize(t *testing.T) {
startLedger := uint32(1)
endLedger := uint32(15)
bsb := createBufferedStorageBackendForTesting()
bsb.config.NumWorkers = 2
bsb.config.BufferSize = 100
ledgerRange := BoundedRange(startLedger, endLedger)
mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount)
bsb.dataStore = mockDataStore

ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange)
assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 15 }, time.Second*1, time.Millisecond*50)
assert.NoError(t, err)

for i := startLedger; i < endLedger; i++ {
lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background())
assert.NoError(t, err)
assert.Equal(t, xdr.Uint32(i), lcm.StartSequence)
}
assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
}

func TestBSBGetLatestLedgerSequence(t *testing.T) {
startLedger := uint32(3)
endLedger := uint32(5)
6 changes: 6 additions & 0 deletions ingest/ledgerbackend/ledger_buffer.go
@@ -13,6 +13,8 @@ import (
"github.com/stellar/go/support/collections/heap"
"github.com/stellar/go/support/compressxdr"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/ordered"

"github.com/stellar/go/xdr"
)

@@ -54,6 +56,10 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu
less := func(a, b ledgerBatchObject) bool {
return a.startLedger < b.startLedger
}
// ensure BufferSize does not exceed the total range
if ledgerRange.bounded {
bsb.config.BufferSize = uint32(ordered.Min(int(bsb.config.BufferSize), int(ledgerRange.to-ledgerRange.from)+1))
}
pq := heap.New(less, int(bsb.config.BufferSize))

ledgerBuffer := &ledgerBuffer{
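This clamp is what the two new tests at the top of the diff exercise: a bounded range of 10 through 30 spans 21 ledgers, so a BufferSize of 10 is kept as configured, while a range of 1 through 15 spans 15 ledgers, so a BufferSize of 100 is reduced to 15 and the ledgerQueue never holds more slots than there are ledgers to download. A minimal stand-alone sketch of the same rule, using a hypothetical helper name rather than the package API:

```go
package main

import "fmt"

// clampBufferSize mirrors the cap applied in newLedgerBuffer for bounded ranges:
// the buffer never exceeds the number of ledgers in the range (inclusive).
func clampBufferSize(bufferSize, from, to uint32) uint32 {
	rangeSize := to - from + 1
	if bufferSize > rangeSize {
		return rangeSize
	}
	return bufferSize
}

func main() {
	fmt.Println(clampBufferSize(10, 10, 30)) // 10: smaller than the 21-ledger range, unchanged
	fmt.Println(clampBufferSize(100, 1, 15)) // 15: capped at the range size
}
```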
8 changes: 8 additions & 0 deletions services/horizon/CHANGELOG.md
@@ -2,6 +2,14 @@

All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).
## Pending

### Breaking Changes

- The `--parallel-job-size` configuration parameter for the `stellar-horizon db reingest` command has been removed.
Job size is now determined automatically from the number of workers (configuration parameter `--parallel-workers`), distributing
the range equally among them. The minimum job size remains 64 ledgers, and the start and end of each job are rounded to
the nearest checkpoint. ([5484](https://github.com/stellar/go/pull/5484))

## 2.32.0

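To illustrate the new behavior (the numbers here are chosen for this example, not taken from the PR): reingesting ledgers 1 through 100,000 with `--parallel-workers 4` splits the range into jobs of roughly 100,000 / 4 = 25,000 ledgers, which are then clamped to the per-backend minimum and maximum batch sizes added in `internal/ingest/main.go` below and rounded down to a multiple of the 64-ledger checkpoint interval, giving 24,960-ledger jobs on the captive core backend.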
34 changes: 10 additions & 24 deletions services/horizon/cmd/db.go
@@ -42,7 +42,6 @@ var (
dbDetectGapsCmd *cobra.Command
reingestForce bool
parallelWorkers uint
parallelJobSize uint32
retries uint
retryBackoffSeconds uint
ledgerBackendStr string
@@ -118,14 +117,6 @@ func ingestRangeCmdOpts() support.ConfigOptions {
FlagDefault: uint(1),
Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers",
},
{
Name: "parallel-job-size",
ConfigKey: &parallelJobSize,
OptType: types.Uint32,
Required: false,
FlagDefault: uint32(100000),
Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size",
},
{
Name: "retries",
ConfigKey: &retries,
@@ -178,17 +169,14 @@ func ingestRangeCmdOpts() support.ConfigOptions {
var dbReingestRangeCmdOpts = ingestRangeCmdOpts()
var dbFillGapsCmdOpts = ingestRangeCmdOpts()

func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error {
func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, minBatchSize, maxBatchSize uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error {
var err error

if reingestForce && parallelWorkers > 1 {
return errors.New("--force is incompatible with --parallel-workers > 1")
}

maxLedgersPerFlush := ingest.MaxLedgersPerFlush
if parallelJobSize < maxLedgersPerFlush {
maxLedgersPerFlush = parallelJobSize
}

ingestConfig := ingest.Config{
NetworkPassphrase: config.NetworkPassphrase,
@@ -214,15 +202,12 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
}

if parallelWorkers > 1 {
system, systemErr := ingest.NewParallelSystems(ingestConfig, parallelWorkers)
system, systemErr := ingest.NewParallelSystems(ingestConfig, parallelWorkers, minBatchSize, maxBatchSize)
if systemErr != nil {
return systemErr
}

return system.ReingestRange(
ledgerRanges,
parallelJobSize,
)
return system.ReingestRange(ledgerRanges)
}

system, systemErr := ingest.NewSystem(ingestConfig)
@@ -479,19 +464,16 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
}
}

maxBatchSize := ingest.MaxCaptiveCoreBackendBatchSize
var err error
var storageBackendConfig ingest.StorageBackendConfig
options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}
if ledgerBackendType == ingest.BufferedStorageBackend {
if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil {
return err
}
// when using buffered storage, performance observations have noted optimal parallel batch size
// of 100, apply that as default if the flag was absent.
if !viper.IsSet("parallel-job-size") {
parallelJobSize = 100
}
options.NoCaptiveCore = true
maxBatchSize = ingest.MaxBufferedStorageBackendBatchSize
}

if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil {
@@ -501,6 +483,8 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
[]history.LedgerRange{{StartSequence: argsUInt32[0], EndSequence: argsUInt32[1]}},
reingestForce,
parallelWorkers,
ingest.MinBatchSize,
maxBatchSize,
*horizonConfig,
storageBackendConfig,
)
@@ -541,6 +525,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
withRange = true
}

maxBatchSize := ingest.MaxCaptiveCoreBackendBatchSize
var err error
var storageBackendConfig ingest.StorageBackendConfig
options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}
@@ -549,6 +534,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
return err
}
options.NoCaptiveCore = true
maxBatchSize = ingest.MaxBufferedStorageBackendBatchSize
}

if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil {
@@ -569,7 +555,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
hlog.Infof("found gaps %v", gaps)
}

return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, *horizonConfig, storageBackendConfig)
return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, ingest.MinBatchSize, maxBatchSize, *horizonConfig, storageBackendConfig)
},
}

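Net effect of the db.go changes: the `parallelJobSize` variable and its flag definition are gone, and `runDBReingestRange` now receives `ingest.MinBatchSize` together with a per-backend maximum, `ingest.MaxCaptiveCoreBackendBatchSize` by default or `ingest.MaxBufferedStorageBackendBatchSize` when `--ledgerbackend datastore` is selected, and forwards both to `ingest.NewParallelSystems`.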
55 changes: 4 additions & 51 deletions services/horizon/cmd/db_test.go
@@ -25,7 +25,7 @@
}

func (s *DBCommandsTestSuite) SetupSuite() {
runDBReingestRangeFn = func([]history.LedgerRange, bool, uint,
runDBReingestRangeFn = func([]history.LedgerRange, bool, uint, uint, uint,
horizon.Config, ingest.StorageBackendConfig) error {
return nil
}
@@ -45,66 +45,19 @@
s.rootCmd = NewRootCmd()
}

func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForBufferedBackend() {
func (s *DBCommandsTestSuite) TestInvalidParameterParallelJobSize() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--parallel-workers", "2",
"--parallel-job-size", "10",
"--ledgerbackend", "datastore",
"--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(100))
}

func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForCaptiveBackend() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--stellar-core-binary-path", "/test/core/bin/path",
"--parallel-workers", "2",
"--ledgerbackend", "captive-core",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(100_000))
}

func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForCaptive() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--stellar-core-binary-path", "/test/core/bin/path",
"--parallel-workers", "2",
"--parallel-job-size", "5",
"--ledgerbackend", "captive-core",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(5))
}

func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForBuffered() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--parallel-workers", "2",
"--parallel-job-size", "5",
"--ledgerbackend", "datastore",
"--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(5))
require.Equal(s.T(), "unknown flag: --parallel-job-size", s.rootCmd.Execute().Error())

[Check failure, GitHub Actions / golangci, line 60 in services/horizon/cmd/db_test.go: s.T undefined (type *DBCommandsTestSuite has no field or method T) (typecheck)]
}

func (s *DBCommandsTestSuite) TestDbReingestAndFillGapsCmds() {
10 changes: 10 additions & 0 deletions services/horizon/internal/ingest/main.go
@@ -100,6 +100,16 @@ func (s LedgerBackendType) String() string {
return ""
}

const (
HistoryCheckpointLedgerInterval uint = 64
// MinBatchSize is the minimum batch size for reingestion
MinBatchSize uint = HistoryCheckpointLedgerInterval
// MaxBufferedStorageBackendBatchSize is the maximum batch size for Buffered Storage reingestion
MaxBufferedStorageBackendBatchSize uint = 200 * HistoryCheckpointLedgerInterval
// MaxCaptiveCoreBackendBatchSize is the maximum batch size for Captive Core reingestion
MaxCaptiveCoreBackendBatchSize uint = 20_000 * HistoryCheckpointLedgerInterval
)

type StorageBackendConfig struct {
DataStoreConfig datastore.DataStoreConfig `toml:"datastore_config"`
BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig `toml:"buffered_storage_backend_config"`
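In ledger terms these constants cap a single reingestion batch at 200 * 64 = 12,800 ledgers for the buffered storage backend and 20,000 * 64 = 1,280,000 ledgers for captive core, with one 64-ledger checkpoint as the floor in both cases.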
53 changes: 32 additions & 21 deletions services/horizon/internal/ingest/parallel.go
@@ -10,11 +10,6 @@
logpkg "github.com/stellar/go/support/log"
)

const (
historyCheckpointLedgerInterval = 64
minBatchSize = historyCheckpointLedgerInterval
)

type rangeError struct {
err error
ledgerRange history.LedgerRange
@@ -27,23 +22,32 @@
type ParallelSystems struct {
config Config
workerCount uint
minBatchSize uint
maxBatchSize uint
systemFactory func(Config) (System, error)
}

func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) {
func NewParallelSystems(config Config, workerCount uint, minBatchSize, maxBatchSize uint) (*ParallelSystems, error) {
// Leaving this because used in tests, will update after a code review.
return newParallelSystems(config, workerCount, NewSystem)
return newParallelSystems(config, workerCount, minBatchSize, maxBatchSize, NewSystem)
}

// private version of NewParallel systems, allowing to inject a mock system
func newParallelSystems(config Config, workerCount uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) {
func newParallelSystems(config Config, workerCount uint, minBatchSize, maxBatchSize uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) {
if workerCount < 1 {
return nil, errors.New("workerCount must be > 0")
}

if minBatchSize != 0 && minBatchSize < HistoryCheckpointLedgerInterval {
return nil, fmt.Errorf("minBatchSize must be at least the %d", HistoryCheckpointLedgerInterval)
}
if minBatchSize != 0 && maxBatchSize != 0 && maxBatchSize < minBatchSize {
return nil, errors.New("maxBatchSize cannot be less than minBatchSize")
}
return &ParallelSystems{
config: config,
workerCount: workerCount,
maxBatchSize: maxBatchSize,
minBatchSize: minBatchSize,
systemFactory: systemFactory,
}, nil
}
@@ -112,20 +116,27 @@
}
return lowestLedger
}
func (ps *ParallelSystems) calculateParallelLedgerBatchSize(rangeSize uint32) uint32 {
// calculate the initial batch size based on available workers
batchSize := rangeSize / uint32(ps.workerCount)

// ensure the batch size meets min threshold
if ps.minBatchSize > 0 {
batchSize = max(batchSize, uint32(ps.minBatchSize))

[Check failure, GitHub Actions / golangci, line 125 in services/horizon/internal/ingest/parallel.go: undefined: max (typecheck)]
}

func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 {
batchSize := batchSizeSuggestion
if batchSize == 0 || rangeSize/batchSize < uint32(workerCount) {
// let's try to make use of all the workers
batchSize = rangeSize / uint32(workerCount)
// ensure the batch size does not exceed max threshold
if ps.maxBatchSize > 0 {
batchSize = min(batchSize, uint32(ps.maxBatchSize))

[Check failure, GitHub Actions / golangci, line 130 in services/horizon/internal/ingest/parallel.go: undefined: min (typecheck)]
}
// Use a minimum batch size to make it worth it in terms of overhead
if batchSize < minBatchSize {
batchSize = minBatchSize

// round down to the nearest multiple of HistoryCheckpointLedgerInterval
if batchSize > uint32(HistoryCheckpointLedgerInterval) {
return batchSize / uint32(HistoryCheckpointLedgerInterval) * uint32(HistoryCheckpointLedgerInterval)
}

// Also, round the batch size to the closest, lower or equal 64 multiple
return (batchSize / historyCheckpointLedgerInterval) * historyCheckpointLedgerInterval
// HistoryCheckpointLedgerInterval is the minimum batch size.
return uint32(HistoryCheckpointLedgerInterval)
}

func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 {
Expand All @@ -136,9 +147,9 @@
return sum
}

func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, batchSizeSuggestion uint32) error {
func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange) error {
var (
batchSize = calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges), batchSizeSuggestion, ps.workerCount)
batchSize = ps.calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges))
reingestJobQueue = make(chan history.LedgerRange)
wg sync.WaitGroup

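To make the new rule concrete, here is a minimal stand-alone sketch of what `calculateParallelLedgerBatchSize` computes. The function and constant names are illustrative rather than the package API, and explicit comparisons stand in for the `max`/`min` built-ins that golangci flags in this diff:

```go
package main

import "fmt"

const checkpointInterval uint32 = 64 // HistoryCheckpointLedgerInterval

func batchSize(rangeSize, workers, minBatch, maxBatch uint32) uint32 {
	// split the total range evenly across the workers
	size := rangeSize / workers

	// clamp to the configured minimum and maximum, when set
	if minBatch > 0 && size < minBatch {
		size = minBatch
	}
	if maxBatch > 0 && size > maxBatch {
		size = maxBatch
	}

	// round down to a checkpoint multiple, but never below one checkpoint
	if size > checkpointInterval {
		return size / checkpointInterval * checkpointInterval
	}
	return checkpointInterval
}

func main() {
	// captive core limits (64 .. 1,280,000): 100000/4 = 25000, rounded down to 24960
	fmt.Println(batchSize(100_000, 4, 64, 1_280_000))
	// buffered storage limits (64 .. 12,800): 25000 is clamped to the 12800 cap
	fmt.Println(batchSize(100_000, 4, 64, 12_800))
}
```

Run as-is it prints 24960 and 12800: a 100,000-ledger range split across 4 workers lands on the nearest lower checkpoint multiple under the captive core cap, and is clamped to the 12,800-ledger cap under the buffered storage limits from `internal/ingest/main.go`.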