From 8cdac2d270567ace51a85527c847c3ab591cd940 Mon Sep 17 00:00:00 2001 From: Urvi Date: Thu, 3 Oct 2024 00:50:06 -0700 Subject: [PATCH 1/5] Remove --parallel-job-size config parameter used for reingestion. --- .../buffered_storage_backend_test.go | 44 +++++++++++++ ingest/ledgerbackend/ledger_buffer.go | 6 ++ services/horizon/cmd/db.go | 22 +------ services/horizon/cmd/db_test.go | 62 ------------------- services/horizon/internal/ingest/parallel.go | 14 ++--- .../horizon/internal/ingest/parallel_test.go | 44 ++++++------- 6 files changed, 77 insertions(+), 115 deletions(-) diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go index 0d461cff07..f183e927ef 100644 --- a/ingest/ledgerbackend/buffered_storage_backend_test.go +++ b/ingest/ledgerbackend/buffered_storage_backend_test.go @@ -160,6 +160,50 @@ func TestNewLedgerBuffer(t *testing.T) { assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange) } +func TestNewLedgerBufferSizeLessThanRangeSize(t *testing.T) { + startLedger := uint32(10) + endLedger := uint32(30) + bsb := createBufferedStorageBackendForTesting() + bsb.config.NumWorkers = 2 + bsb.config.BufferSize = 10 + ledgerRange := BoundedRange(startLedger, endLedger) + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange) + assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 10 }, time.Second*1, time.Millisecond*50) + assert.NoError(t, err) + + for i := startLedger; i < endLedger; i++ { + lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background()) + assert.NoError(t, err) + assert.Equal(t, xdr.Uint32(i), lcm.StartSequence) + } + assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange) +} + +func TestNewLedgerBufferSizeLargerThanRangeSize(t *testing.T) { + startLedger := uint32(1) + endLedger := uint32(15) + bsb := createBufferedStorageBackendForTesting() + bsb.config.NumWorkers = 2 + bsb.config.BufferSize = 100 + ledgerRange := BoundedRange(startLedger, endLedger) + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange) + assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 15 }, time.Second*1, time.Millisecond*50) + assert.NoError(t, err) + + for i := uint32(0); i < endLedger; i++ { + lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background()) + assert.NoError(t, err) + assert.Equal(t, xdr.Uint32(startLedger+i), lcm.StartSequence) + } + assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange) +} + func TestBSBGetLatestLedgerSequence(t *testing.T) { startLedger := uint32(3) endLedger := uint32(5) diff --git a/ingest/ledgerbackend/ledger_buffer.go b/ingest/ledgerbackend/ledger_buffer.go index d23bf0bfbd..7ee9dda083 100644 --- a/ingest/ledgerbackend/ledger_buffer.go +++ b/ingest/ledgerbackend/ledger_buffer.go @@ -13,6 +13,8 @@ import ( "github.com/stellar/go/support/collections/heap" "github.com/stellar/go/support/compressxdr" "github.com/stellar/go/support/datastore" + "github.com/stellar/go/support/ordered" + "github.com/stellar/go/xdr" ) @@ -54,6 +56,10 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu less := func(a, b ledgerBatchObject) bool { return a.startLedger < b.startLedger } + // ensure BufferSize does not exceed the total range + if ledgerRange.bounded { + 
bsb.config.BufferSize = uint32(ordered.Min(int(bsb.config.BufferSize), int(ledgerRange.to-ledgerRange.from)+1)) + } pq := heap.New(less, int(bsb.config.BufferSize)) ledgerBuffer := &ledgerBuffer{ diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 92a732e002..f36c730880 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -42,7 +42,6 @@ var ( dbDetectGapsCmd *cobra.Command reingestForce bool parallelWorkers uint - parallelJobSize uint32 retries uint retryBackoffSeconds uint ledgerBackendStr string @@ -118,14 +117,6 @@ func ingestRangeCmdOpts() support.ConfigOptions { FlagDefault: uint(1), Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers", }, - { - Name: "parallel-job-size", - ConfigKey: ¶llelJobSize, - OptType: types.Uint32, - Required: false, - FlagDefault: uint32(100000), - Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size", - }, { Name: "retries", ConfigKey: &retries, @@ -186,9 +177,6 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, } maxLedgersPerFlush := ingest.MaxLedgersPerFlush - if parallelJobSize < maxLedgersPerFlush { - maxLedgersPerFlush = parallelJobSize - } ingestConfig := ingest.Config{ NetworkPassphrase: config.NetworkPassphrase, @@ -219,10 +207,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, return systemErr } - return system.ReingestRange( - ledgerRanges, - parallelJobSize, - ) + return system.ReingestRange(ledgerRanges) } system, systemErr := ingest.NewSystem(ingestConfig) @@ -486,11 +471,6 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil { return err } - // when using buffered storage, performance observations have noted optimal parallel batch size - // of 100, apply that as default if the flag was absent. 
- if !viper.IsSet("parallel-job-size") { - parallelJobSize = 100 - } options.NoCaptiveCore = true } diff --git a/services/horizon/cmd/db_test.go b/services/horizon/cmd/db_test.go index 6a00576bd3..694535492c 100644 --- a/services/horizon/cmd/db_test.go +++ b/services/horizon/cmd/db_test.go @@ -45,68 +45,6 @@ func (s *DBCommandsTestSuite) BeforeTest(suiteName string, testName string) { s.rootCmd = NewRootCmd() } -func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForBufferedBackend() { - s.rootCmd.SetArgs([]string{ - "db", "reingest", "range", - "--db-url", s.db.DSN, - "--network", "testnet", - "--parallel-workers", "2", - "--ledgerbackend", "datastore", - "--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml", - "2", - "10"}) - - require.NoError(s.T(), s.rootCmd.Execute()) - require.Equal(s.T(), parallelJobSize, uint32(100)) -} - -func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForCaptiveBackend() { - s.rootCmd.SetArgs([]string{ - "db", "reingest", "range", - "--db-url", s.db.DSN, - "--network", "testnet", - "--stellar-core-binary-path", "/test/core/bin/path", - "--parallel-workers", "2", - "--ledgerbackend", "captive-core", - "2", - "10"}) - - require.NoError(s.T(), s.rootCmd.Execute()) - require.Equal(s.T(), parallelJobSize, uint32(100_000)) -} - -func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForCaptive() { - s.rootCmd.SetArgs([]string{ - "db", "reingest", "range", - "--db-url", s.db.DSN, - "--network", "testnet", - "--stellar-core-binary-path", "/test/core/bin/path", - "--parallel-workers", "2", - "--parallel-job-size", "5", - "--ledgerbackend", "captive-core", - "2", - "10"}) - - require.NoError(s.T(), s.rootCmd.Execute()) - require.Equal(s.T(), parallelJobSize, uint32(5)) -} - -func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForBuffered() { - s.rootCmd.SetArgs([]string{ - "db", "reingest", "range", - "--db-url", s.db.DSN, - "--network", "testnet", - "--parallel-workers", "2", - "--parallel-job-size", "5", - "--ledgerbackend", "datastore", - "--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml", - "2", - "10"}) - - require.NoError(s.T(), s.rootCmd.Execute()) - require.Equal(s.T(), parallelJobSize, uint32(5)) -} - func (s *DBCommandsTestSuite) TestDbReingestAndFillGapsCmds() { tests := []struct { name string diff --git a/services/horizon/internal/ingest/parallel.go b/services/horizon/internal/ingest/parallel.go index 4f07c21cc4..679224b176 100644 --- a/services/horizon/internal/ingest/parallel.go +++ b/services/horizon/internal/ingest/parallel.go @@ -113,12 +113,10 @@ func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32, return lowestLedger } -func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 { - batchSize := batchSizeSuggestion - if batchSize == 0 || rangeSize/batchSize < uint32(workerCount) { - // let's try to make use of all the workers - batchSize = rangeSize / uint32(workerCount) - } +func calculateParallelLedgerBatchSize(rangeSize uint32, workerCount uint) uint32 { + // let's try to make use of all the workers + batchSize := rangeSize / uint32(workerCount) + // Use a minimum batch size to make it worth it in terms of overhead if batchSize < minBatchSize { batchSize = minBatchSize @@ -136,9 +134,9 @@ func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 { return sum } -func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, batchSizeSuggestion uint32) error { +func (ps 
*ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange) error { var ( - batchSize = calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges), batchSizeSuggestion, ps.workerCount) + batchSize = calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges), ps.workerCount) reingestJobQueue = make(chan history.LedgerRange) wg sync.WaitGroup diff --git a/services/horizon/internal/ingest/parallel_test.go b/services/horizon/internal/ingest/parallel_test.go index 8004a4048c..1a8e04980b 100644 --- a/services/horizon/internal/ingest/parallel_test.go +++ b/services/horizon/internal/ingest/parallel_test.go @@ -15,13 +15,13 @@ import ( ) func TestCalculateParallelLedgerBatchSize(t *testing.T) { - assert.Equal(t, uint32(6656), calculateParallelLedgerBatchSize(20096, 20096, 3)) - assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 20096, 4)) - assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 0, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 256, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 32, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 256, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(20096, 64, 1)) + assert.Equal(t, uint32(6656), calculateParallelLedgerBatchSize(20096, 3)) + assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 4)) + assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 4)) + assert.Equal(t, uint32(20096), calculateParallelLedgerBatchSize(20096, 1)) } func TestParallelReingestRange(t *testing.T) { @@ -45,15 +45,14 @@ func TestParallelReingestRange(t *testing.T) { } system, err := newParallelSystems(config, 3, factory) assert.NoError(t, err) - err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258) + err = system.ReingestRange([]history.LedgerRange{{1, 2050}}) assert.NoError(t, err) sort.Slice(rangesCalled, func(i, j int) bool { return rangesCalled[i].StartSequence < rangesCalled[j].StartSequence }) expected := []history.LedgerRange{ - {StartSequence: 1, EndSequence: 256}, {StartSequence: 257, EndSequence: 512}, {StartSequence: 513, EndSequence: 768}, {StartSequence: 769, EndSequence: 1024}, {StartSequence: 1025, EndSequence: 1280}, - {StartSequence: 1281, EndSequence: 1536}, {StartSequence: 1537, EndSequence: 1792}, {StartSequence: 1793, EndSequence: 2048}, {StartSequence: 2049, EndSequence: 2050}, + {StartSequence: 1, EndSequence: 640}, {StartSequence: 641, EndSequence: 1280}, {StartSequence: 1281, EndSequence: 1920}, {StartSequence: 1921, EndSequence: 2050}, } assert.Equal(t, expected, rangesCalled) @@ -61,13 +60,10 @@ func TestParallelReingestRange(t *testing.T) { system, err = newParallelSystems(config, 1, factory) assert.NoError(t, err) result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1024)).Return(nil).Once() - err = system.ReingestRange([]history.LedgerRange{{1, 1024}}, 64) + err = system.ReingestRange([]history.LedgerRange{{1, 1024}}) result.AssertExpectations(t) expected = []history.LedgerRange{ - {StartSequence: 1, EndSequence: 64}, {StartSequence: 65, EndSequence: 128}, {StartSequence: 129, EndSequence: 192}, {StartSequence: 193, EndSequence: 256}, {StartSequence: 257, EndSequence: 320}, - {StartSequence: 321, EndSequence: 384}, {StartSequence: 385, 
EndSequence: 448}, {StartSequence: 449, EndSequence: 512}, {StartSequence: 513, EndSequence: 576}, {StartSequence: 577, EndSequence: 640}, - {StartSequence: 641, EndSequence: 704}, {StartSequence: 705, EndSequence: 768}, {StartSequence: 769, EndSequence: 832}, {StartSequence: 833, EndSequence: 896}, {StartSequence: 897, EndSequence: 960}, - {StartSequence: 961, EndSequence: 1024}, + {StartSequence: 1, EndSequence: 1024}, } assert.NoError(t, err) assert.Equal(t, expected, rangesCalled) @@ -77,19 +73,19 @@ func TestParallelReingestRangeError(t *testing.T) { config := Config{} result := &mockSystem{} // Fail on the second range - result.On("ReingestRange", []history.LedgerRange{{1537, 1792}}, false, false).Return(errors.New("failed because of foo")).Once() + result.On("ReingestRange", []history.LedgerRange{{641, 1280}}, false, false).Return(errors.New("failed because of foo")).Once() result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), false, false).Return(nil) - result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1537)).Return(nil).Once() + result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(641)).Return(nil).Once() factory := func(c Config) (System, error) { return result, nil } system, err := newParallelSystems(config, 3, factory) assert.NoError(t, err) - err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258) + err = system.ReingestRange([]history.LedgerRange{{1, 2050}}) result.AssertExpectations(t) assert.Error(t, err) - assert.Equal(t, "job failed, recommended restart range: [1537, 2050]: error when processing [1537, 1792] range: failed because of foo", err.Error()) + assert.Equal(t, "job failed, recommended restart range: [641, 2050]: error when processing [641, 1280] range: failed because of foo", err.Error()) } func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) { @@ -98,27 +94,27 @@ func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) { wg.Add(1) result := &mockSystem{} // Fail on an lower subrange after the first error - result.On("ReingestRange", []history.LedgerRange{{1025, 1280}}, false, false).Run(func(mock.Arguments) { + result.On("ReingestRange", []history.LedgerRange{{641, 1280}}, false, false).Run(func(mock.Arguments) { // Wait for a more recent range to error wg.Wait() // This sleep should help making sure the result of this range is processed later than the one below // (there are no guarantees without instrumenting ReingestRange(), but that's too complicated) time.Sleep(50 * time.Millisecond) }).Return(errors.New("failed because of foo")).Once() - result.On("ReingestRange", []history.LedgerRange{{1537, 1792}}, false, false).Run(func(mock.Arguments) { + result.On("ReingestRange", []history.LedgerRange{{1281, 1920}}, false, false).Run(func(mock.Arguments) { wg.Done() }).Return(errors.New("failed because of bar")).Once() result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), false, false).Return(error(nil)) - result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1025)).Return(nil).Once() + result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(641)).Return(nil).Once() factory := func(c Config) (System, error) { return result, nil } system, err := newParallelSystems(config, 3, factory) assert.NoError(t, err) - err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258) + err = system.ReingestRange([]history.LedgerRange{{1, 2050}}) result.AssertExpectations(t) assert.Error(t, err) - assert.Equal(t, "job failed, recommended restart range: [1025, 2050]: error when 
processing [1025, 1280] range: failed because of foo", err.Error()) + assert.Equal(t, "job failed, recommended restart range: [641, 2050]: error when processing [641, 1280] range: failed because of foo", err.Error()) } From 168344dcb5808e7fb3d40aea45cd1dbdc398f7d0 Mon Sep 17 00:00:00 2001 From: Urvi Date: Thu, 3 Oct 2024 16:22:02 -0700 Subject: [PATCH 2/5] Update changelog. Add unit test --- services/horizon/CHANGELOG.md | 8 ++++++++ services/horizon/cmd/db_test.go | 15 +++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 6a8b0a708d..e3af62d580 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -2,6 +2,14 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## Pending + +### Breaking Changes + +- `--parallel-job-size` configuration parameter for the `stellar-horizon db reingest` command has been removed. + Job size will be automatically determined based on the number of workers (configuration parameter --parallel-workers), distributing + the range equally among them. The minimum job size will remain 64 ledgers and the start and end ledger range will be rounded to + the nearest checkpoint.([5484](https://github.com/stellar/go/pull/5484)) ## 2.32.0 diff --git a/services/horizon/cmd/db_test.go b/services/horizon/cmd/db_test.go index 694535492c..9eb83e15b6 100644 --- a/services/horizon/cmd/db_test.go +++ b/services/horizon/cmd/db_test.go @@ -45,6 +45,21 @@ func (s *DBCommandsTestSuite) BeforeTest(suiteName string, testName string) { s.rootCmd = NewRootCmd() } +func (s *DBCommandsTestSuite) TestInvalidParameterParallelJobSize() { + s.rootCmd.SetArgs([]string{ + "db", "reingest", "range", + "--db-url", s.db.DSN, + "--network", "testnet", + "--parallel-workers", "2", + "--parallel-job-size", "10", + "--ledgerbackend", "datastore", + "--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml", + "2", + "10"}) + + require.Equal(s.T(), "unknown flag: --parallel-job-size", s.rootCmd.Execute().Error()) +} + func (s *DBCommandsTestSuite) TestDbReingestAndFillGapsCmds() { tests := []struct { name string From 45d9ed6ba4bbb5770a71c819b6bafbe6da998a13 Mon Sep 17 00:00:00 2001 From: Urvi Date: Fri, 4 Oct 2024 06:16:46 -0700 Subject: [PATCH 3/5] update unit test --- ingest/ledgerbackend/buffered_storage_backend_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go index f183e927ef..fe3ca0266c 100644 --- a/ingest/ledgerbackend/buffered_storage_backend_test.go +++ b/ingest/ledgerbackend/buffered_storage_backend_test.go @@ -196,10 +196,10 @@ func TestNewLedgerBufferSizeLargerThanRangeSize(t *testing.T) { assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 15 }, time.Second*1, time.Millisecond*50) assert.NoError(t, err) - for i := uint32(0); i < endLedger; i++ { + for i := startLedger; i < endLedger; i++ { lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background()) assert.NoError(t, err) - assert.Equal(t, xdr.Uint32(startLedger+i), lcm.StartSequence) + assert.Equal(t, xdr.Uint32(i), lcm.StartSequence) } assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange) } From a136126c6f3d6cb2f93ac34045681e03d1c36aab Mon Sep 17 00:00:00 2001 From: Urvi Date: Sun, 13 Oct 2024 21:11:19 -0700 Subject: [PATCH 4/5] Introduce minimum and maximum batch sizes 
for parallel workers. The min batch size is set to the HistoryCheckpointLedgerInterval for both Captive Core and Buffered Storage backends. The max batch size is larger for the Captive Core backend as compared to the Buffered Storage backend. These batch sizes are estimated to optimize throughput for each ledger backend. --- services/horizon/cmd/db.go | 12 ++- services/horizon/cmd/db_test.go | 2 +- services/horizon/internal/ingest/main.go | 10 ++ services/horizon/internal/ingest/parallel.go | 49 +++++---- .../horizon/internal/ingest/parallel_test.go | 99 ++++++++++++++++--- 5 files changed, 139 insertions(+), 33 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index f36c730880..58c096a782 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -169,7 +169,7 @@ func ingestRangeCmdOpts() support.ConfigOptions { var dbReingestRangeCmdOpts = ingestRangeCmdOpts() var dbFillGapsCmdOpts = ingestRangeCmdOpts() -func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error { +func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, minBatchSize, maxBatchSize uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error { var err error if reingestForce && parallelWorkers > 1 { @@ -202,7 +202,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, } if parallelWorkers > 1 { - system, systemErr := ingest.NewParallelSystems(ingestConfig, parallelWorkers) + system, systemErr := ingest.NewParallelSystems(ingestConfig, parallelWorkers, minBatchSize, maxBatchSize) if systemErr != nil { return systemErr } @@ -464,6 +464,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor } } + maxBatchSize := ingest.MaxCaptiveCoreBackendBatchSize var err error var storageBackendConfig ingest.StorageBackendConfig options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false} @@ -472,6 +473,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor return err } options.NoCaptiveCore = true + maxBatchSize = ingest.MaxBufferedStorageBackendBatchSize } if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil { @@ -481,6 +483,8 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor []history.LedgerRange{{StartSequence: argsUInt32[0], EndSequence: argsUInt32[1]}}, reingestForce, parallelWorkers, + ingest.MinBatchSize, + maxBatchSize, *horizonConfig, storageBackendConfig, ) @@ -521,6 +525,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor withRange = true } + maxBatchSize := ingest.MaxCaptiveCoreBackendBatchSize var err error var storageBackendConfig ingest.StorageBackendConfig options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false} @@ -529,6 +534,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor return err } options.NoCaptiveCore = true + maxBatchSize = ingest.MaxBufferedStorageBackendBatchSize } if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil { @@ -549,7 +555,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor hlog.Infof("found gaps %v", gaps) } - return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, *horizonConfig, storageBackendConfig) + return runDBReingestRangeFn(gaps, reingestForce, 
parallelWorkers, ingest.MinBatchSize, maxBatchSize, *horizonConfig, storageBackendConfig) }, } diff --git a/services/horizon/cmd/db_test.go b/services/horizon/cmd/db_test.go index 9eb83e15b6..a2dd5f014c 100644 --- a/services/horizon/cmd/db_test.go +++ b/services/horizon/cmd/db_test.go @@ -25,7 +25,7 @@ type DBCommandsTestSuite struct { } func (s *DBCommandsTestSuite) SetupSuite() { - runDBReingestRangeFn = func([]history.LedgerRange, bool, uint, + runDBReingestRangeFn = func([]history.LedgerRange, bool, uint, uint, uint, horizon.Config, ingest.StorageBackendConfig) error { return nil } diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 63ee7ba457..38f9fe1d3a 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -100,6 +100,16 @@ func (s LedgerBackendType) String() string { return "" } +const ( + HistoryCheckpointLedgerInterval uint = 64 + // MinBatchSize is the minimum batch size for reingestion + MinBatchSize uint = HistoryCheckpointLedgerInterval + // MaxBufferedStorageBackendBatchSize is the maximum batch size for Buffered Storage reingestion + MaxBufferedStorageBackendBatchSize uint = 200 * HistoryCheckpointLedgerInterval + // MaxCaptiveCoreBackendBatchSize is the maximum batch size for Captive Core reingestion + MaxCaptiveCoreBackendBatchSize uint = 20_000 * HistoryCheckpointLedgerInterval +) + type StorageBackendConfig struct { DataStoreConfig datastore.DataStoreConfig `toml:"datastore_config"` BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig `toml:"buffered_storage_backend_config"` diff --git a/services/horizon/internal/ingest/parallel.go b/services/horizon/internal/ingest/parallel.go index 679224b176..ecdaae1c89 100644 --- a/services/horizon/internal/ingest/parallel.go +++ b/services/horizon/internal/ingest/parallel.go @@ -10,11 +10,6 @@ import ( logpkg "github.com/stellar/go/support/log" ) -const ( - historyCheckpointLedgerInterval = 64 - minBatchSize = historyCheckpointLedgerInterval -) - type rangeError struct { err error ledgerRange history.LedgerRange @@ -27,23 +22,32 @@ func (e rangeError) Error() string { type ParallelSystems struct { config Config workerCount uint + minBatchSize uint + maxBatchSize uint systemFactory func(Config) (System, error) } -func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) { +func NewParallelSystems(config Config, workerCount uint, minBatchSize, maxBatchSize uint) (*ParallelSystems, error) { // Leaving this because used in tests, will update after a code review. 
- return newParallelSystems(config, workerCount, NewSystem) + return newParallelSystems(config, workerCount, minBatchSize, maxBatchSize, NewSystem) } // private version of NewParallel systems, allowing to inject a mock system -func newParallelSystems(config Config, workerCount uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) { +func newParallelSystems(config Config, workerCount uint, minBatchSize, maxBatchSize uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) { if workerCount < 1 { return nil, errors.New("workerCount must be > 0") } - + if minBatchSize != 0 && minBatchSize < HistoryCheckpointLedgerInterval { + return nil, fmt.Errorf("minBatchSize must be at least the %d", HistoryCheckpointLedgerInterval) + } + if minBatchSize != 0 && maxBatchSize != 0 && maxBatchSize < minBatchSize { + return nil, errors.New("maxBatchSize cannot be less than minBatchSize") + } return &ParallelSystems{ config: config, workerCount: workerCount, + maxBatchSize: maxBatchSize, + minBatchSize: minBatchSize, systemFactory: systemFactory, }, nil } @@ -112,18 +116,27 @@ func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32, } return lowestLedger } +func (ps *ParallelSystems) calculateParallelLedgerBatchSize(rangeSize uint32) uint32 { + // calculate the initial batch size based on available workers + batchSize := rangeSize / uint32(ps.workerCount) + + // ensure the batch size meets min threshold + if ps.minBatchSize > 0 { + batchSize = max(batchSize, uint32(ps.minBatchSize)) + } -func calculateParallelLedgerBatchSize(rangeSize uint32, workerCount uint) uint32 { - // let's try to make use of all the workers - batchSize := rangeSize / uint32(workerCount) + // ensure the batch size does not exceed max threshold + if ps.maxBatchSize > 0 { + batchSize = min(batchSize, uint32(ps.maxBatchSize)) + } - // Use a minimum batch size to make it worth it in terms of overhead - if batchSize < minBatchSize { - batchSize = minBatchSize + // round down to the nearest multiple of HistoryCheckpointLedgerInterval + if batchSize > uint32(HistoryCheckpointLedgerInterval) { + return batchSize / uint32(HistoryCheckpointLedgerInterval) * uint32(HistoryCheckpointLedgerInterval) } - // Also, round the batch size to the closest, lower or equal 64 multiple - return (batchSize / historyCheckpointLedgerInterval) * historyCheckpointLedgerInterval + // HistoryCheckpointLedgerInterval is the minimum batch size. 
+ return uint32(HistoryCheckpointLedgerInterval) } func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 { @@ -136,7 +149,7 @@ func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 { func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange) error { var ( - batchSize = calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges), ps.workerCount) + batchSize = ps.calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges)) reingestJobQueue = make(chan history.LedgerRange) wg sync.WaitGroup diff --git a/services/horizon/internal/ingest/parallel_test.go b/services/horizon/internal/ingest/parallel_test.go index 1a8e04980b..f011f51021 100644 --- a/services/horizon/internal/ingest/parallel_test.go +++ b/services/horizon/internal/ingest/parallel_test.go @@ -1,6 +1,8 @@ package ingest import ( + "fmt" + "math" "math/rand" "sort" "sync" @@ -15,13 +17,88 @@ import ( ) func TestCalculateParallelLedgerBatchSize(t *testing.T) { - assert.Equal(t, uint32(6656), calculateParallelLedgerBatchSize(20096, 3)) - assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 4)) - assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 4)) - assert.Equal(t, uint32(20096), calculateParallelLedgerBatchSize(20096, 1)) + config := Config{} + result := &mockSystem{} + factory := func(c Config) (System, error) { + return result, nil + } + + // worker count 0 + system, err := newParallelSystems(config, 0, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory) + assert.EqualError(t, err, "workerCount must be > 0") + + // worker count 1, range smaller than HistoryCheckpointLedgerInterval + system, err = newParallelSystems(config, 1, 50, 200, factory) + assert.EqualError(t, err, fmt.Sprintf("minBatchSize must be at least the %d", HistoryCheckpointLedgerInterval)) + + // worker count 1, max batch size smaller than min batch size + system, err = newParallelSystems(config, 1, 5000, 200, factory) + assert.EqualError(t, err, "maxBatchSize cannot be less than minBatchSize") + + // worker count 1, captive core batch size + system, _ = newParallelSystems(config, 1, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory) + assert.Equal(t, uint32(MaxCaptiveCoreBackendBatchSize), system.calculateParallelLedgerBatchSize(uint32(MaxCaptiveCoreBackendBatchSize)+10)) + assert.Equal(t, uint32(MinBatchSize), system.calculateParallelLedgerBatchSize(0)) + assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10048)) // exact multiple + assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10090)) // round down + + // worker count 1, buffered storage batch size + system, _ = newParallelSystems(config, 1, MinBatchSize, MaxBufferedStorageBackendBatchSize, factory) + assert.Equal(t, uint32(MaxBufferedStorageBackendBatchSize), system.calculateParallelLedgerBatchSize(uint32(MaxBufferedStorageBackendBatchSize)+10)) + assert.Equal(t, uint32(MinBatchSize), system.calculateParallelLedgerBatchSize(0)) + assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10048)) // exact multiple + assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10090)) // round down + + // worker count 1, no min/max batch size + system, _ = newParallelSystems(config, 1, 0, 0, factory) + assert.Equal(t, uint32(20096), 
system.calculateParallelLedgerBatchSize(20096)) // exact multiple + assert.Equal(t, uint32(20032), system.calculateParallelLedgerBatchSize(20090)) // round down + + // worker count 1, min/max batch size + system, _ = newParallelSystems(config, 1, 64, 20000, factory) + assert.Equal(t, uint32(19968), system.calculateParallelLedgerBatchSize(20096)) // round down + system, _ = newParallelSystems(config, 1, 64, 30000, factory) + assert.Equal(t, uint32(20096), system.calculateParallelLedgerBatchSize(20096)) // exact multiple + + // Tests for worker count 2 + + // no min/max batch size + system, _ = newParallelSystems(config, 2, 0, 0, factory) + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(60)) // range smaller than 64 + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(128)) // exact multiple + assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(20096)) + + // range larger than max batch size + system, _ = newParallelSystems(config, 2, 64, 10000, factory) + assert.Equal(t, uint32(9984), system.calculateParallelLedgerBatchSize(20096)) // round down + + // range smaller than min batch size + system, _ = newParallelSystems(config, 2, 64, 0, factory) + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(50)) // min batch size + assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(20096)) // exact multiple + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(100)) // min batch size + + // batch size equal to min + system, _ = newParallelSystems(config, 2, 100, 0, factory) + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(100)) // round down + + // equal min/max batch size + system, _ = newParallelSystems(config, 2, 5000, 5000, factory) + assert.Equal(t, uint32(4992), system.calculateParallelLedgerBatchSize(20096)) // round down + + // worker count 3 + system, _ = newParallelSystems(config, 3, 64, 7000, factory) + assert.Equal(t, uint32(6656), system.calculateParallelLedgerBatchSize(20096)) + + // worker count 4 + system, _ = newParallelSystems(config, 4, 64, 20000, factory) + assert.Equal(t, uint32(4992), system.calculateParallelLedgerBatchSize(20096)) //round down + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(64)) + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(2)) + + // max possible workers + system, _ = newParallelSystems(config, math.MaxUint32, 0, 0, factory) + assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(math.MaxUint32)) } func TestParallelReingestRange(t *testing.T) { @@ -43,7 +120,7 @@ func TestParallelReingestRange(t *testing.T) { factory := func(c Config) (System, error) { return result, nil } - system, err := newParallelSystems(config, 3, factory) + system, err := newParallelSystems(config, 3, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory) assert.NoError(t, err) err = system.ReingestRange([]history.LedgerRange{{1, 2050}}) assert.NoError(t, err) @@ -57,7 +134,7 @@ func TestParallelReingestRange(t *testing.T) { assert.Equal(t, expected, rangesCalled) rangesCalled = nil - system, err = newParallelSystems(config, 1, factory) + system, err = newParallelSystems(config, 1, 0, 0, factory) assert.NoError(t, err) result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1024)).Return(nil).Once() err = system.ReingestRange([]history.LedgerRange{{1, 1024}}) @@ -80,7 +157,7 @@ func TestParallelReingestRangeError(t *testing.T) { factory := func(c Config) (System, error) { return result, nil } - 
system, err := newParallelSystems(config, 3, factory) + system, err := newParallelSystems(config, 3, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory) assert.NoError(t, err) err = system.ReingestRange([]history.LedgerRange{{1, 2050}}) result.AssertExpectations(t) @@ -110,7 +187,7 @@ func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) { factory := func(c Config) (System, error) { return result, nil } - system, err := newParallelSystems(config, 3, factory) + system, err := newParallelSystems(config, 3, 0, 0, factory) assert.NoError(t, err) err = system.ReingestRange([]history.LedgerRange{{1, 2050}}) result.AssertExpectations(t) From 6e049f4e80c12079db1531777cde27cd31586bc0 Mon Sep 17 00:00:00 2001 From: Urvi Date: Mon, 14 Oct 2024 09:22:14 -0700 Subject: [PATCH 5/5] fix ci warning --- services/horizon/internal/ingest/parallel.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/services/horizon/internal/ingest/parallel.go b/services/horizon/internal/ingest/parallel.go index ecdaae1c89..a2a641c5cf 100644 --- a/services/horizon/internal/ingest/parallel.go +++ b/services/horizon/internal/ingest/parallel.go @@ -8,6 +8,7 @@ import ( "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" logpkg "github.com/stellar/go/support/log" + "github.com/stellar/go/support/ordered" ) type rangeError struct { @@ -122,12 +123,12 @@ func (ps *ParallelSystems) calculateParallelLedgerBatchSize(rangeSize uint32) ui // ensure the batch size meets min threshold if ps.minBatchSize > 0 { - batchSize = max(batchSize, uint32(ps.minBatchSize)) + batchSize = ordered.Max(batchSize, uint32(ps.minBatchSize)) } // ensure the batch size does not exceed max threshold if ps.maxBatchSize > 0 { - batchSize = min(batchSize, uint32(ps.maxBatchSize)) + batchSize = ordered.Min(batchSize, uint32(ps.maxBatchSize)) } // round down to the nearest multiple of HistoryCheckpointLedgerInterval
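
For reference, the batch-size behaviour this series converges on can be summarised in a minimal standalone sketch. It assumes the constants added to services/horizon/internal/ingest/main.go in patch 4; the batchSize helper and the main harness below are illustrative only and are not part of the diff:

package main

import "fmt"

const (
	historyCheckpointLedgerInterval uint32 = 64
	minBatchSize                    uint32 = historyCheckpointLedgerInterval
	maxBufferedStorageBatchSize     uint32 = 200 * historyCheckpointLedgerInterval    // 12_800
	maxCaptiveCoreBatchSize         uint32 = 20_000 * historyCheckpointLedgerInterval // 1_280_000
)

// batchSize splits the requested range across the workers, clamps the result
// to the configured min/max (when non-zero), and rounds down to a checkpoint
// multiple, never going below one checkpoint interval.
func batchSize(rangeSize, workers, minSize, maxSize uint32) uint32 {
	size := rangeSize / workers
	if minSize > 0 && size < minSize {
		size = minSize
	}
	if maxSize > 0 && size > maxSize {
		size = maxSize
	}
	if size > historyCheckpointLedgerInterval {
		return size / historyCheckpointLedgerInterval * historyCheckpointLedgerInterval
	}
	return historyCheckpointLedgerInterval
}

func main() {
	// 20096 ledgers over 3 workers -> 6698, rounded down to a multiple of 64.
	fmt.Println(batchSize(20096, 3, minBatchSize, maxCaptiveCoreBatchSize)) // 6656
	// A tiny range still gets the 64-ledger minimum.
	fmt.Println(batchSize(2, 4, minBatchSize, maxBufferedStorageBatchSize)) // 64
}

Running this prints 6656 and 64, matching the expectations asserted in parallel_test.go above.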