From 2c17f4fed0c7f98c031a690cc11422b579b907cf Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 27 Aug 2024 15:17:12 +0100 Subject: [PATCH] add concurrency modes --- .../internal/db2/history/account_loader.go | 81 +++++++++++++------ .../db2/history/account_loader_test.go | 12 ++- .../internal/db2/history/asset_loader.go | 5 +- .../internal/db2/history/asset_loader_test.go | 11 ++- .../db2/history/claimable_balance_loader.go | 5 +- .../history/claimable_balance_loader_test.go | 11 ++- .../effect_batch_insert_builder_test.go | 2 +- .../internal/db2/history/effect_test.go | 6 +- .../internal/db2/history/fee_bump_scenario.go | 2 +- .../db2/history/liquidity_pool_loader.go | 7 +- .../db2/history/liquidity_pool_loader_test.go | 11 ++- ...n_participant_batch_insert_builder_test.go | 2 +- .../internal/db2/history/operation_test.go | 2 +- .../internal/db2/history/participants_test.go | 2 +- .../internal/db2/history/transaction_test.go | 8 +- .../internal/ingest/processor_runner.go | 18 ++--- .../internal/ingest/processor_runner_test.go | 2 +- ...ble_balances_transaction_processor_test.go | 2 +- .../processors/effects_processor_test.go | 12 +-- ...uidity_pools_transaction_processor_test.go | 2 +- .../processors/participants_processor_test.go | 2 +- 21 files changed, 133 insertions(+), 72 deletions(-) diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index fad6931d8a..e402129a44 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -17,6 +17,14 @@ import ( var errSealed = errors.New("cannot register more entries to Loader after calling Exec()") +type ConcurrencyMode int + +const ( + _ ConcurrencyMode = iota + ConcurrentInserts + ConcurrentDeletes +) + // LoaderStats describes the result of executing a history lookup id Loader type LoaderStats struct { // Total is the number of elements registered to the Loader @@ -38,7 +46,7 @@ type FutureAccountID = future[string, Account] type AccountLoader = loader[string, Account] // NewAccountLoader will construct a new AccountLoader instance. -func NewAccountLoader() *AccountLoader { +func NewAccountLoader(concurrencyMode ConcurrencyMode) *AccountLoader { return &AccountLoader{ sealed: false, set: set.Set[string]{}, @@ -58,20 +66,22 @@ func NewAccountLoader() *AccountLoader { mappingFromRow: func(account Account) (string, int64) { return account.Address, account.ID }, - less: cmp.Less[string], + less: cmp.Less[string], + concurrencyMode: concurrencyMode, } } type loader[K comparable, T any] struct { - sealed bool - set set.Set[K] - ids map[K]int64 - stats LoaderStats - name string - table string - columnsForKeys func([]K) []columnValues - mappingFromRow func(T) (K, int64) - less func(K, K) bool + sealed bool + set set.Set[K] + ids map[K]int64 + stats LoaderStats + name string + table string + columnsForKeys func([]K) []columnValues + mappingFromRow func(T) (K, int64) + less func(K, K) bool + concurrencyMode ConcurrencyMode } type future[K comparable, T any] struct { @@ -134,17 +144,34 @@ func (l *loader[K, T]) Exec(ctx context.Context, session db.SessionInterface) er return l.less(keys[i], keys[j]) }) - if count, err := l.query(ctx, q, keys); err != nil { - return err - } else { - l.stats.Total += count - } + if l.concurrencyMode == ConcurrentInserts { + if count, err := l.insert(ctx, q, keys); err != nil { + return err + } else { + l.stats.Total += count + l.stats.Inserted += count + } - if count, err := l.insert(ctx, q, keys); err != nil { - return err + if count, err := l.query(ctx, q, keys, false); err != nil { + return err + } else { + l.stats.Total += count + } + } else if l.concurrencyMode == ConcurrentDeletes { + if count, err := l.query(ctx, q, keys, true); err != nil { + return err + } else { + l.stats.Total += count + } + + if count, err := l.insert(ctx, q, keys); err != nil { + return err + } else { + l.stats.Total += count + l.stats.Inserted += count + } } else { - l.stats.Total += count - l.stats.Inserted += count + return fmt.Errorf("concurrency mode %v is invalid", l.concurrencyMode) } return nil @@ -204,11 +231,15 @@ func (l *loader[K, T]) insert(ctx context.Context, q *Q, keys []K) (int, error) return len(rows), nil } -func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K) (int, error) { +func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K, lockRows bool) (int, error) { keys = l.filter(keys) if len(keys) == 0 { return 0, nil } + var suffix string + if lockRows { + suffix = "ORDER BY id ASC FOR KEY SHARE" + } var rows []T err := bulkGet( @@ -217,6 +248,7 @@ func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K) (int, error) { l.table, l.columnsForKeys(keys), &rows, + suffix, ) if err != nil { return 0, err @@ -293,7 +325,7 @@ func bulkInsert(ctx context.Context, q *Q, table string, fields []columnValues, ) } -func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error { +func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}, suffix string) error { unnestPart := make([]string, 0, len(fields)) columns := make([]string, 0, len(fields)) pqArrays := make([]interface{}, 0, len(fields)) @@ -328,9 +360,8 @@ func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, res pq.Array(field.objects), ) } - lockSuffix := "ORDER BY id ASC FOR KEY SHARE" sql := `SELECT * FROM ` + table + ` WHERE (` + strings.Join(columns, ",") + `) IN - (SELECT ` + strings.Join(unnestPart, ",") + `) ` + lockSuffix + (SELECT ` + strings.Join(unnestPart, ",") + `) ` + suffix return q.SelectRaw( ctx, @@ -348,7 +379,7 @@ type AccountLoaderStub struct { // NewAccountLoaderStub returns a new AccountLoaderStub instance func NewAccountLoaderStub() AccountLoaderStub { - return AccountLoaderStub{Loader: NewAccountLoader()} + return AccountLoaderStub{Loader: NewAccountLoader(ConcurrentInserts)} } // Insert updates the wrapped AccountLoader so that the given account diff --git a/services/horizon/internal/db2/history/account_loader_test.go b/services/horizon/internal/db2/history/account_loader_test.go index 9a9fb30445..83b172b40b 100644 --- a/services/horizon/internal/db2/history/account_loader_test.go +++ b/services/horizon/internal/db2/history/account_loader_test.go @@ -8,6 +8,7 @@ import ( "github.com/stellar/go/keypair" "github.com/stellar/go/services/horizon/internal/test" + "github.com/stellar/go/support/db" ) func TestAccountLoader(t *testing.T) { @@ -16,12 +17,18 @@ func TestAccountLoader(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) session := tt.HorizonSession() + testAccountLoader(t, session, ConcurrentInserts) + test.ResetHorizonDB(t, tt.HorizonDB) + testAccountLoader(t, session, ConcurrentDeletes) +} + +func testAccountLoader(t *testing.T, session *db.Session, mode ConcurrencyMode) { var addresses []string for i := 0; i < 100; i++ { addresses = append(addresses, keypair.MustRandom().Address()) } - loader := NewAccountLoader() + loader := NewAccountLoader(mode) for _, address := range addresses { future := loader.GetFuture(address) _, err := future.Value() @@ -58,7 +65,7 @@ func TestAccountLoader(t *testing.T) { // check that Loader works when all the previous values are already // present in the db and also add 10 more rows to insert - loader = NewAccountLoader() + loader = NewAccountLoader(mode) for i := 0; i < 10; i++ { addresses = append(addresses, keypair.MustRandom().Address()) } @@ -85,5 +92,4 @@ func TestAccountLoader(t *testing.T) { assert.Equal(t, account.ID, internalId) assert.Equal(t, account.Address, address) } - } diff --git a/services/horizon/internal/db2/history/asset_loader.go b/services/horizon/internal/db2/history/asset_loader.go index cdd2a0d714..33c5c333dd 100644 --- a/services/horizon/internal/db2/history/asset_loader.go +++ b/services/horizon/internal/db2/history/asset_loader.go @@ -42,7 +42,7 @@ type FutureAssetID = future[AssetKey, Asset] type AssetLoader = loader[AssetKey, Asset] // NewAssetLoader will construct a new AssetLoader instance. -func NewAssetLoader() *AssetLoader { +func NewAssetLoader(concurrencyMode ConcurrencyMode) *AssetLoader { return &AssetLoader{ sealed: false, set: set.Set[AssetKey]{}, @@ -88,6 +88,7 @@ func NewAssetLoader() *AssetLoader { less: func(a AssetKey, b AssetKey) bool { return a.String() < b.String() }, + concurrencyMode: concurrencyMode, } } @@ -99,7 +100,7 @@ type AssetLoaderStub struct { // NewAssetLoaderStub returns a new AssetLoaderStub instance func NewAssetLoaderStub() AssetLoaderStub { - return AssetLoaderStub{Loader: NewAssetLoader()} + return AssetLoaderStub{Loader: NewAssetLoader(ConcurrentInserts)} } // Insert updates the wrapped AssetLoaderStub so that the given asset diff --git a/services/horizon/internal/db2/history/asset_loader_test.go b/services/horizon/internal/db2/history/asset_loader_test.go index ca65cebb7e..e7a0495cad 100644 --- a/services/horizon/internal/db2/history/asset_loader_test.go +++ b/services/horizon/internal/db2/history/asset_loader_test.go @@ -9,6 +9,7 @@ import ( "github.com/stellar/go/keypair" "github.com/stellar/go/services/horizon/internal/test" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -40,6 +41,12 @@ func TestAssetLoader(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) session := tt.HorizonSession() + testAssetLoader(t, session, ConcurrentInserts) + test.ResetHorizonDB(t, tt.HorizonDB) + testAssetLoader(t, session, ConcurrentDeletes) +} + +func testAssetLoader(t *testing.T, session *db.Session, mode ConcurrencyMode) { var keys []AssetKey for i := 0; i < 100; i++ { var key AssetKey @@ -66,7 +73,7 @@ func TestAssetLoader(t *testing.T) { keys = append(keys, key) } - loader := NewAssetLoader() + loader := NewAssetLoader(mode) for _, key := range keys { future := loader.GetFuture(key) _, err := future.Value() @@ -109,7 +116,7 @@ func TestAssetLoader(t *testing.T) { // check that Loader works when all the previous values are already // present in the db and also add 10 more rows to insert - loader = NewAssetLoader() + loader = NewAssetLoader(mode) for i := 0; i < 10; i++ { var key AssetKey if i%2 == 0 { diff --git a/services/horizon/internal/db2/history/claimable_balance_loader.go b/services/horizon/internal/db2/history/claimable_balance_loader.go index f775ea4b24..9107d4fb9f 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader.go @@ -19,7 +19,7 @@ type FutureClaimableBalanceID = future[string, HistoryClaimableBalance] type ClaimableBalanceLoader = loader[string, HistoryClaimableBalance] // NewClaimableBalanceLoader will construct a new ClaimableBalanceLoader instance. -func NewClaimableBalanceLoader() *ClaimableBalanceLoader { +func NewClaimableBalanceLoader(concurrencyMode ConcurrencyMode) *ClaimableBalanceLoader { return &ClaimableBalanceLoader{ sealed: false, set: set.Set[string]{}, @@ -39,6 +39,7 @@ func NewClaimableBalanceLoader() *ClaimableBalanceLoader { mappingFromRow: func(row HistoryClaimableBalance) (string, int64) { return row.BalanceID, row.InternalID }, - less: cmp.Less[string], + less: cmp.Less[string], + concurrencyMode: concurrencyMode, } } diff --git a/services/horizon/internal/db2/history/claimable_balance_loader_test.go b/services/horizon/internal/db2/history/claimable_balance_loader_test.go index f5759015c7..490d5a0f70 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader_test.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stellar/go/services/horizon/internal/test" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -17,6 +18,12 @@ func TestClaimableBalanceLoader(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) session := tt.HorizonSession() + testCBLoader(t, tt, session, ConcurrentInserts) + test.ResetHorizonDB(t, tt.HorizonDB) + testCBLoader(t, tt, session, ConcurrentDeletes) +} + +func testCBLoader(t *testing.T, tt *test.T, session *db.Session, mode ConcurrencyMode) { var ids []string for i := 0; i < 100; i++ { balanceID := xdr.ClaimableBalanceId{ @@ -28,7 +35,7 @@ func TestClaimableBalanceLoader(t *testing.T) { ids = append(ids, id) } - loader := NewClaimableBalanceLoader() + loader := NewClaimableBalanceLoader(mode) var futures []FutureClaimableBalanceID for _, id := range ids { future := loader.GetFuture(id) @@ -70,7 +77,7 @@ func TestClaimableBalanceLoader(t *testing.T) { // check that Loader works when all the previous values are already // present in the db and also add 10 more rows to insert - loader = NewClaimableBalanceLoader() + loader = NewClaimableBalanceLoader(mode) for i := 100; i < 110; i++ { balanceID := xdr.ClaimableBalanceId{ Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, diff --git a/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go b/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go index e1ac998953..e917983b8f 100644 --- a/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go +++ b/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go @@ -20,7 +20,7 @@ func TestAddEffect(t *testing.T) { address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY" muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26" - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) builder := q.NewEffectBatchInsertBuilder() sequence := int32(56) diff --git a/services/horizon/internal/db2/history/effect_test.go b/services/horizon/internal/db2/history/effect_test.go index 19af0ceff8..ba59cf3fd4 100644 --- a/services/horizon/internal/db2/history/effect_test.go +++ b/services/horizon/internal/db2/history/effect_test.go @@ -23,7 +23,7 @@ func TestEffectsForLiquidityPool(t *testing.T) { // Insert Effect address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY" muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26" - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) builder := q.NewEffectBatchInsertBuilder() sequence := int32(56) @@ -47,7 +47,7 @@ func TestEffectsForLiquidityPool(t *testing.T) { // Insert Liquidity Pool history liquidityPoolID := "abcde" - lpLoader := NewLiquidityPoolLoader() + lpLoader := NewLiquidityPoolLoader(ConcurrentInserts) operationBuilder := q.NewOperationLiquidityPoolBatchInsertBuilder() tt.Assert.NoError(operationBuilder.Add(opID, lpLoader.GetFuture(liquidityPoolID))) @@ -78,7 +78,7 @@ func TestEffectsForTrustlinesSponsorshipEmptyAssetType(t *testing.T) { address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY" muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26" - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) builder := q.NewEffectBatchInsertBuilder() sequence := int32(56) diff --git a/services/horizon/internal/db2/history/fee_bump_scenario.go b/services/horizon/internal/db2/history/fee_bump_scenario.go index da6563c732..e161d686d1 100644 --- a/services/horizon/internal/db2/history/fee_bump_scenario.go +++ b/services/horizon/internal/db2/history/fee_bump_scenario.go @@ -288,7 +288,7 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { details, err = json.Marshal(map[string]interface{}{"new_seq": 98}) tt.Assert.NoError(err) - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) err = effectBuilder.Add( accountLoader.GetFuture(account.Address()), diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader.go b/services/horizon/internal/db2/history/liquidity_pool_loader.go index a03caaa988..5da2a7b6fd 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader.go @@ -19,7 +19,7 @@ type FutureLiquidityPoolID = future[string, HistoryLiquidityPool] type LiquidityPoolLoader = loader[string, HistoryLiquidityPool] // NewLiquidityPoolLoader will construct a new LiquidityPoolLoader instance. -func NewLiquidityPoolLoader() *LiquidityPoolLoader { +func NewLiquidityPoolLoader(concurrencyMode ConcurrencyMode) *LiquidityPoolLoader { return &LiquidityPoolLoader{ sealed: false, set: set.Set[string]{}, @@ -39,7 +39,8 @@ func NewLiquidityPoolLoader() *LiquidityPoolLoader { mappingFromRow: func(row HistoryLiquidityPool) (string, int64) { return row.PoolID, row.InternalID }, - less: cmp.Less[string], + less: cmp.Less[string], + concurrencyMode: concurrencyMode, } } @@ -51,7 +52,7 @@ type LiquidityPoolLoaderStub struct { // NewLiquidityPoolLoaderStub returns a new LiquidityPoolLoader instance func NewLiquidityPoolLoaderStub() LiquidityPoolLoaderStub { - return LiquidityPoolLoaderStub{Loader: NewLiquidityPoolLoader()} + return LiquidityPoolLoaderStub{Loader: NewLiquidityPoolLoader(ConcurrentInserts)} } // Insert updates the wrapped LiquidityPoolLoader so that the given liquidity pool diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go index aec2fcd886..c7a1282760 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stellar/go/services/horizon/internal/test" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -16,6 +17,12 @@ func TestLiquidityPoolLoader(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) session := tt.HorizonSession() + testLPLoader(t, tt, session, ConcurrentInserts) + test.ResetHorizonDB(t, tt.HorizonDB) + testLPLoader(t, tt, session, ConcurrentDeletes) +} + +func testLPLoader(t *testing.T, tt *test.T, session *db.Session, mode ConcurrencyMode) { var ids []string for i := 0; i < 100; i++ { poolID := xdr.PoolId{byte(i)} @@ -24,7 +31,7 @@ func TestLiquidityPoolLoader(t *testing.T) { ids = append(ids, id) } - loader := NewLiquidityPoolLoader() + loader := NewLiquidityPoolLoader(mode) for _, id := range ids { future := loader.GetFuture(id) _, err := future.Value() @@ -62,7 +69,7 @@ func TestLiquidityPoolLoader(t *testing.T) { // check that Loader works when all the previous values are already // present in the db and also add 10 more rows to insert - loader = NewLiquidityPoolLoader() + loader = NewLiquidityPoolLoader(mode) for i := 100; i < 110; i++ { poolID := xdr.PoolId{byte(i)} var id string diff --git a/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go b/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go index 7e823064f2..eb30fe6659 100644 --- a/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go +++ b/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go @@ -15,7 +15,7 @@ func TestAddOperationParticipants(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) address := keypair.MustRandom().Address() tt.Assert.NoError(q.Begin(tt.Ctx)) builder := q.NewOperationParticipantBatchInsertBuilder() diff --git a/services/horizon/internal/db2/history/operation_test.go b/services/horizon/internal/db2/history/operation_test.go index 1d20a9cb10..8899bfcdf6 100644 --- a/services/horizon/internal/db2/history/operation_test.go +++ b/services/horizon/internal/db2/history/operation_test.go @@ -125,7 +125,7 @@ func TestOperationByLiquidityPool(t *testing.T) { // Insert Liquidity Pool history liquidityPoolID := "a2f38836a839de008cf1d782c81f45e1253cc5d3dad9110b872965484fec0a49" - lpLoader := NewLiquidityPoolLoader() + lpLoader := NewLiquidityPoolLoader(ConcurrentInserts) lpOperationBuilder := q.NewOperationLiquidityPoolBatchInsertBuilder() tt.Assert.NoError(lpOperationBuilder.Add(opID1, lpLoader.GetFuture(liquidityPoolID))) diff --git a/services/horizon/internal/db2/history/participants_test.go b/services/horizon/internal/db2/history/participants_test.go index 37f7654abb..15d09da0ac 100644 --- a/services/horizon/internal/db2/history/participants_test.go +++ b/services/horizon/internal/db2/history/participants_test.go @@ -35,7 +35,7 @@ func TestTransactionParticipantsBatch(t *testing.T) { q := &Q{tt.HorizonSession()} batch := q.NewTransactionParticipantsBatchInsertBuilder() - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) transactionID := int64(1) otherTransactionID := int64(2) diff --git a/services/horizon/internal/db2/history/transaction_test.go b/services/horizon/internal/db2/history/transaction_test.go index 65c6734644..bd8cb1673c 100644 --- a/services/horizon/internal/db2/history/transaction_test.go +++ b/services/horizon/internal/db2/history/transaction_test.go @@ -79,7 +79,7 @@ func TestTransactionByLiquidityPool(t *testing.T) { // Insert Liquidity Pool history liquidityPoolID := "a2f38836a839de008cf1d782c81f45e1253cc5d3dad9110b872965484fec0a49" - lpLoader := NewLiquidityPoolLoader() + lpLoader := NewLiquidityPoolLoader(ConcurrentInserts) lpTransactionBuilder := q.NewTransactionLiquidityPoolBatchInsertBuilder() tt.Assert.NoError(lpTransactionBuilder.Add(txID, lpLoader.GetFuture(liquidityPoolID))) tt.Assert.NoError(lpLoader.Exec(tt.Ctx, q)) @@ -940,15 +940,15 @@ func TestTransactionQueryBuilder(t *testing.T) { tt.Assert.NoError(q.Begin(tt.Ctx)) address := "GBXGQJWVLWOYHFLVTKWV5FGHA3LNYY2JQKM7OAJAUEQFU6LPCSEFVXON" - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) accountLoader.GetFuture(address) cbID := "00000000178826fbfe339e1f5c53417c6fedfe2c05e8bec14303143ec46b38981b09c3f9" - cbLoader := NewClaimableBalanceLoader() + cbLoader := NewClaimableBalanceLoader(ConcurrentInserts) cbLoader.GetFuture(cbID) lpID := "0000a8198b5e25994c1ca5b0556faeb27325ac746296944144e0a7406d501e8a" - lpLoader := NewLiquidityPoolLoader() + lpLoader := NewLiquidityPoolLoader(ConcurrentInserts) lpLoader.GetFuture(lpID) tt.Assert.NoError(accountLoader.Exec(tt.Ctx, q)) diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index e6f0e0cf74..75b8645953 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -133,11 +133,11 @@ func buildChangeProcessor( }) } -func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors.LedgersProcessor) (groupLoaders, *groupTransactionProcessors) { - accountLoader := history.NewAccountLoader() - assetLoader := history.NewAssetLoader() - lpLoader := history.NewLiquidityPoolLoader() - cbLoader := history.NewClaimableBalanceLoader() +func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors.LedgersProcessor, concurrencyMode history.ConcurrencyMode) (groupLoaders, *groupTransactionProcessors) { + accountLoader := history.NewAccountLoader(concurrencyMode) + assetLoader := history.NewAssetLoader(concurrencyMode) + lpLoader := history.NewLiquidityPoolLoader(concurrencyMode) + cbLoader := history.NewClaimableBalanceLoader(concurrencyMode) loaders := newGroupLoaders([]horizonLazyLoader{accountLoader, assetLoader, lpLoader, cbLoader}) statsLedgerTransactionProcessor := processors.NewStatsLedgerTransactionProcessor() @@ -366,7 +366,7 @@ func (s *ProcessorRunner) streamLedger(ledger xdr.LedgerCloseMeta, return nil } -func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry, ledger xdr.LedgerCloseMeta) ( +func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry, ledger xdr.LedgerCloseMeta, concurrencyMode history.ConcurrencyMode) ( transactionStats processors.StatsLedgerTransactionProcessorResults, transactionDurations runDurations, tradeStats processors.TradeStats, @@ -381,7 +381,7 @@ func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry groupTransactionFilterers := s.buildTransactionFilterer() // when in online mode, the submission result processor must always run (regardless of whether filter rules exist or not) groupFilteredOutProcessors := s.buildFilteredOutProcessor() - loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) + loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor, concurrencyMode) if err = registerTransactionProcessors( registry, @@ -494,7 +494,7 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.Ledger groupTransactionFilterers := s.buildTransactionFilterer() // intentionally skip filtered out processor groupFilteredOutProcessors := newGroupTransactionProcessors(nil, nil, nil) - loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) + loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor, history.ConcurrentInserts) startTime := time.Now() curHeap, sysHeap := getMemStats() @@ -611,7 +611,7 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( return } - transactionStats, transactionDurations, tradeStats, loaderDurations, loaderStats, err := s.runTransactionProcessorsOnLedger(registry, ledger) + transactionStats, transactionDurations, tradeStats, loaderDurations, loaderStats, err := s.runTransactionProcessorsOnLedger(registry, ledger, history.ConcurrentDeletes) stats.changeStats = changeStatsProcessor.GetResults() stats.changeDurations = groupChangeProcessors.processorsRunDurations diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index e6ce6b512c..82c712b737 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -248,7 +248,7 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { ledgersProcessor := &processors.LedgersProcessor{} - _, processor := runner.buildTransactionProcessor(ledgersProcessor) + _, processor := runner.buildTransactionProcessor(ledgersProcessor, history.ConcurrentInserts) assert.IsType(t, &groupTransactionProcessors{}, processor) assert.IsType(t, &processors.StatsLedgerTransactionProcessor{}, processor.processors[0]) assert.IsType(t, &processors.EffectProcessor{}, processor.processors[1]) diff --git a/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go b/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go index ca918e08ea..5967ef41b1 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go @@ -44,7 +44,7 @@ func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) SetupTest() { }, }, } - s.cbLoader = history.NewClaimableBalanceLoader() + s.cbLoader = history.NewClaimableBalanceLoader(history.ConcurrentInserts) s.processor = NewClaimableBalancesTransactionProcessor( s.cbLoader, diff --git a/services/horizon/internal/ingest/processors/effects_processor_test.go b/services/horizon/internal/ingest/processors/effects_processor_test.go index 0243768fde..276f6fcb03 100644 --- a/services/horizon/internal/ingest/processors/effects_processor_test.go +++ b/services/horizon/internal/ingest/processors/effects_processor_test.go @@ -7,11 +7,11 @@ import ( "crypto/rand" "encoding/hex" "encoding/json" + "math/big" + "strings" "testing" "github.com/guregu/null" - "math/big" - "strings" "github.com/stellar/go/keypair" "github.com/stellar/go/protocols/horizon/base" @@ -62,7 +62,7 @@ func TestEffectsProcessorTestSuiteLedger(t *testing.T) { func (s *EffectsProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() - s.accountLoader = history.NewAccountLoader() + s.accountLoader = history.NewAccountLoader(history.ConcurrentInserts) s.mockBatchInsertBuilder = &history.MockEffectBatchInsertBuilder{} s.lcm = xdr.LedgerCloseMeta{ @@ -446,7 +446,7 @@ func TestEffectsCoversAllOperationTypes(t *testing.T) { } assert.True(t, err2 != nil || err == nil, s) }() - err = operation.ingestEffects(history.NewAccountLoader(), &history.MockEffectBatchInsertBuilder{}) + err = operation.ingestEffects(history.NewAccountLoader(history.ConcurrentInserts), &history.MockEffectBatchInsertBuilder{}) }() } @@ -468,7 +468,7 @@ func TestEffectsCoversAllOperationTypes(t *testing.T) { ledgerSequence: 1, } // calling effects should error due to the unknown operation - err := operation.ingestEffects(history.NewAccountLoader(), &history.MockEffectBatchInsertBuilder{}) + err := operation.ingestEffects(history.NewAccountLoader(history.ConcurrentInserts), &history.MockEffectBatchInsertBuilder{}) assert.Contains(t, err.Error(), "Unknown operation type") } @@ -2558,7 +2558,7 @@ type effect struct { } func assertIngestEffects(t *testing.T, operation transactionOperationWrapper, expected []effect) { - accountLoader := history.NewAccountLoader() + accountLoader := history.NewAccountLoader(history.ConcurrentInserts) mockBatchInsertBuilder := &history.MockEffectBatchInsertBuilder{} for _, expectedEffect := range expected { diff --git a/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go b/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go index 8d08e44d44..cdafc5bcc3 100644 --- a/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go +++ b/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go @@ -44,7 +44,7 @@ func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) SetupTest() { }, }, } - s.lpLoader = history.NewLiquidityPoolLoader() + s.lpLoader = history.NewLiquidityPoolLoader(history.ConcurrentInserts) s.processor = NewLiquidityPoolsTransactionProcessor( s.lpLoader, diff --git a/services/horizon/internal/ingest/processors/participants_processor_test.go b/services/horizon/internal/ingest/processors/participants_processor_test.go index b81bd22f67..f6154b2b39 100644 --- a/services/horizon/internal/ingest/processors/participants_processor_test.go +++ b/services/horizon/internal/ingest/processors/participants_processor_test.go @@ -86,7 +86,7 @@ func (s *ParticipantsProcessorTestSuiteLedger) SetupTest() { s.thirdTx.Envelope.V1.Tx.SourceAccount = aid.ToMuxedAccount() s.thirdTxID = toid.New(int32(sequence), 3, 0).ToInt64() - s.accountLoader = history.NewAccountLoader() + s.accountLoader = history.NewAccountLoader(history.ConcurrentInserts) s.addressToFuture = map[string]history.FutureAccountID{} for _, address := range s.addresses { s.addressToFuture[address] = s.accountLoader.GetFuture(address)