From 473bab58a82159030ae0ca7cb9d00de4fa565e02 Mon Sep 17 00:00:00 2001 From: tamirms Date: Wed, 7 Aug 2024 14:16:23 +0100 Subject: [PATCH] fix asset loader --- .../internal/db2/history/account_loader.go | 10 +++---- .../db2/history/account_loader_test.go | 25 ++++++++++++++++ .../internal/db2/history/asset_loader.go | 11 ++++--- .../internal/db2/history/asset_loader_test.go | 29 +++++++++++++++++++ .../db2/history/claimable_balance_loader.go | 1 - .../history/claimable_balance_loader_test.go | 24 +++++++++++++++ .../db2/history/liquidity_pool_loader.go | 1 - .../db2/history/liquidity_pool_loader_test.go | 25 ++++++++++++++++ 8 files changed, 113 insertions(+), 13 deletions(-) diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index cd30a70bf9..aedd7e726c 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -110,7 +110,6 @@ func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) e ctx, q, "history_accounts", - []string{"address"}, []bulkInsertField{ { name: "address", @@ -146,7 +145,7 @@ type bulkInsertField struct { objects []string } -func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string, fields []bulkInsertField, response interface{}) error { +func bulkInsert(ctx context.Context, q *Q, table string, fields []bulkInsertField, response interface{}) error { unnestPart := make([]string, 0, len(fields)) insertFieldsPart := make([]string, 0, len(fields)) pqArrays := make([]interface{}, 0, len(fields)) @@ -166,19 +165,20 @@ func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string ) } + columns := strings.Join(insertFieldsPart, ",") sql := ` WITH rows AS (SELECT ` + strings.Join(unnestPart, ",") + `), inserted_rows AS ( INSERT INTO ` + table + ` - (` + strings.Join(insertFieldsPart, ",") + `) + (` + columns + `) SELECT * FROM rows - ON CONFLICT (` + strings.Join(conflictFields, ",") + `) DO NOTHING + ON CONFLICT (` + columns + `) DO NOTHING RETURNING * ) SELECT * FROM inserted_rows UNION ALL - SELECT * FROM ` + table + ` WHERE (` + strings.Join(conflictFields, ",") + `) IN + SELECT * FROM ` + table + ` WHERE (` + columns + `) IN (SELECT * FROM rows)` return q.SelectRaw( diff --git a/services/horizon/internal/db2/history/account_loader_test.go b/services/horizon/internal/db2/history/account_loader_test.go index f78c23172c..4952b909c5 100644 --- a/services/horizon/internal/db2/history/account_loader_test.go +++ b/services/horizon/internal/db2/history/account_loader_test.go @@ -54,4 +54,29 @@ func TestAccountLoader(t *testing.T) { _, err = loader.GetNow("not present") assert.Error(t, err) assert.Contains(t, err.Error(), `was not found`) + + // check that loader works when all the values are already present in the db + loader = NewAccountLoader() + for _, address := range addresses { + future := loader.GetFuture(address) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid account loader state,`) + } + + assert.NoError(t, loader.Exec(context.Background(), session)) + assert.Equal(t, LoaderStats{ + Total: 100, + }, loader.Stats()) + + for _, address := range addresses { + var internalId int64 + internalId, err = loader.GetNow(address) + assert.NoError(t, err) + var account Account + assert.NoError(t, q.AccountByAddress(context.Background(), &account, address)) + assert.Equal(t, account.ID, internalId) + assert.Equal(t, account.Address, address) + } + } diff --git a/services/horizon/internal/db2/history/asset_loader.go b/services/horizon/internal/db2/history/asset_loader.go index b9f3996cab..bfe82ac243 100644 --- a/services/horizon/internal/db2/history/asset_loader.go +++ b/services/horizon/internal/db2/history/asset_loader.go @@ -132,23 +132,22 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err ctx, q, "history_assets", - []string{"asset_code", "asset_type", "asset_issuer"}, []bulkInsertField{ { name: "asset_code", dbType: "character varying(12)", objects: assetCodes, }, - { - name: "asset_issuer", - dbType: "character varying(56)", - objects: assetIssuers, - }, { name: "asset_type", dbType: "character varying(64)", objects: assetTypes, }, + { + name: "asset_issuer", + dbType: "character varying(56)", + objects: assetIssuers, + }, }, &rows, ) diff --git a/services/horizon/internal/db2/history/asset_loader_test.go b/services/horizon/internal/db2/history/asset_loader_test.go index 46561da6a6..555360a5ce 100644 --- a/services/horizon/internal/db2/history/asset_loader_test.go +++ b/services/horizon/internal/db2/history/asset_loader_test.go @@ -105,4 +105,33 @@ func TestAssetLoader(t *testing.T) { _, err = loader.GetNow(AssetKey{}) assert.Error(t, err) assert.Contains(t, err.Error(), `was not found`) + + // check that loader works when all the values are already present in the db + loader = NewAssetLoader() + for _, key := range keys { + future := loader.GetFuture(key) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid asset loader state,`) + } + assert.NoError(t, loader.Exec(context.Background(), session)) + assert.Equal(t, LoaderStats{ + Total: 100, + }, loader.Stats()) + + for _, key := range keys { + var internalID int64 + internalID, err = loader.GetNow(key) + assert.NoError(t, err) + var assetXDR xdr.Asset + if key.Type == "native" { + assetXDR = xdr.MustNewNativeAsset() + } else { + assetXDR = xdr.MustNewCreditAsset(key.Code, key.Issuer) + } + var assetID int64 + assetID, err = q.GetAssetID(context.Background(), assetXDR) + assert.NoError(t, err) + assert.Equal(t, assetID, internalID) + } } diff --git a/services/horizon/internal/db2/history/claimable_balance_loader.go b/services/horizon/internal/db2/history/claimable_balance_loader.go index 289e8c2474..c1e1cda5c0 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader.go @@ -98,7 +98,6 @@ func (a *ClaimableBalanceLoader) Exec(ctx context.Context, session db.SessionInt ctx, q, "history_claimable_balances", - []string{"claimable_balance_id"}, []bulkInsertField{ { name: "claimable_balance_id", diff --git a/services/horizon/internal/db2/history/claimable_balance_loader_test.go b/services/horizon/internal/db2/history/claimable_balance_loader_test.go index ee852b1c7b..2f6268624b 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader_test.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader_test.go @@ -66,4 +66,28 @@ func TestClaimableBalanceLoader(t *testing.T) { _, err = futureCb.Value() assert.Error(t, err) assert.Contains(t, err.Error(), `was not found`) + + // check that loader works when all the values are already present in the db + loader = NewClaimableBalanceLoader() + for _, id := range ids { + future := loader.GetFuture(id) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid claimable balance loader state,`) + } + + assert.NoError(t, loader.Exec(context.Background(), session)) + assert.Equal(t, LoaderStats{ + Total: 100, + }, loader.Stats()) + + for _, id := range ids { + internalID, err := loader.getNow(id) + assert.NoError(t, err) + var cb HistoryClaimableBalance + cb, err = q.ClaimableBalanceByID(context.Background(), id) + assert.NoError(t, err) + assert.Equal(t, cb.BalanceID, id) + assert.Equal(t, cb.InternalID, internalID) + } } diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader.go b/services/horizon/internal/db2/history/liquidity_pool_loader.go index 538dd85444..6282d324b5 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader.go @@ -98,7 +98,6 @@ func (a *LiquidityPoolLoader) Exec(ctx context.Context, session db.SessionInterf ctx, q, "history_liquidity_pools", - []string{"liquidity_pool_id"}, []bulkInsertField{ { name: "liquidity_pool_id", diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go index 81e60b9cd6..7d1fef1396 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go @@ -58,4 +58,29 @@ func TestLiquidityPoolLoader(t *testing.T) { _, err = loader.GetNow("not present") assert.Error(t, err) assert.Contains(t, err.Error(), `was not found`) + + // check that loader works when all the values are already present in the db + loader = NewLiquidityPoolLoader() + for _, id := range ids { + future := loader.GetFuture(id) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid liquidity pool loader state,`) + } + + assert.NoError(t, loader.Exec(context.Background(), session)) + assert.Equal(t, LoaderStats{ + Total: 100, + }, loader.Stats()) + + for _, id := range ids { + var internalID int64 + internalID, err = loader.GetNow(id) + assert.NoError(t, err) + var lp HistoryLiquidityPool + lp, err = q.LiquidityPoolByID(context.Background(), id) + assert.NoError(t, err) + assert.Equal(t, lp.PoolID, id) + assert.Equal(t, lp.InternalID, internalID) + } }