Skip to content

Commit

Permalink
fix asset loader
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Aug 7, 2024
1 parent 1aa2c4a commit 473bab5
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 13 deletions.
10 changes: 5 additions & 5 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) e
ctx,
q,
"history_accounts",
[]string{"address"},
[]bulkInsertField{
{
name: "address",
Expand Down Expand Up @@ -146,7 +145,7 @@ type bulkInsertField struct {
objects []string
}

func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string, fields []bulkInsertField, response interface{}) error {
func bulkInsert(ctx context.Context, q *Q, table string, fields []bulkInsertField, response interface{}) error {
unnestPart := make([]string, 0, len(fields))
insertFieldsPart := make([]string, 0, len(fields))
pqArrays := make([]interface{}, 0, len(fields))
Expand All @@ -166,19 +165,20 @@ func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string
)
}

columns := strings.Join(insertFieldsPart, ",")
sql := `
WITH rows AS
(SELECT ` + strings.Join(unnestPart, ",") + `),
inserted_rows AS (
INSERT INTO ` + table + `
(` + strings.Join(insertFieldsPart, ",") + `)
(` + columns + `)
SELECT * FROM rows
ON CONFLICT (` + strings.Join(conflictFields, ",") + `) DO NOTHING
ON CONFLICT (` + columns + `) DO NOTHING
RETURNING *
)
SELECT * FROM inserted_rows
UNION ALL
SELECT * FROM ` + table + ` WHERE (` + strings.Join(conflictFields, ",") + `) IN
SELECT * FROM ` + table + ` WHERE (` + columns + `) IN
(SELECT * FROM rows)`

return q.SelectRaw(
Expand Down
25 changes: 25 additions & 0 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,29 @@ func TestAccountLoader(t *testing.T) {
_, err = loader.GetNow("not present")
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)

// check that loader works when all the values are already present in the db
loader = NewAccountLoader()
for _, address := range addresses {
future := loader.GetFuture(address)
_, err := future.Value()

Check failure on line 62 in services/horizon/internal/db2/history/account_loader_test.go

View workflow job for this annotation

GitHub Actions / check (ubuntu-22.04, 1.22.1)

declaration of "err" shadows declaration at line 34
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid account loader state,`)
}

assert.NoError(t, loader.Exec(context.Background(), session))
assert.Equal(t, LoaderStats{
Total: 100,
}, loader.Stats())

for _, address := range addresses {
var internalId int64
internalId, err = loader.GetNow(address)
assert.NoError(t, err)
var account Account
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
assert.Equal(t, account.ID, internalId)
assert.Equal(t, account.Address, address)
}

}
11 changes: 5 additions & 6 deletions services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,23 +132,22 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
ctx,
q,
"history_assets",
[]string{"asset_code", "asset_type", "asset_issuer"},
[]bulkInsertField{
{
name: "asset_code",
dbType: "character varying(12)",
objects: assetCodes,
},
{
name: "asset_issuer",
dbType: "character varying(56)",
objects: assetIssuers,
},
{
name: "asset_type",
dbType: "character varying(64)",
objects: assetTypes,
},
{
name: "asset_issuer",
dbType: "character varying(56)",
objects: assetIssuers,
},
},
&rows,
)
Expand Down
29 changes: 29 additions & 0 deletions services/horizon/internal/db2/history/asset_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,33 @@ func TestAssetLoader(t *testing.T) {
_, err = loader.GetNow(AssetKey{})
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)

// check that loader works when all the values are already present in the db
loader = NewAssetLoader()
for _, key := range keys {
future := loader.GetFuture(key)
_, err := future.Value()

Check failure on line 113 in services/horizon/internal/db2/history/asset_loader_test.go

View workflow job for this annotation

GitHub Actions / check (ubuntu-22.04, 1.22.1)

declaration of "err" shadows declaration at line 79
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid asset loader state,`)
}
assert.NoError(t, loader.Exec(context.Background(), session))
assert.Equal(t, LoaderStats{
Total: 100,
}, loader.Stats())

for _, key := range keys {
var internalID int64
internalID, err = loader.GetNow(key)
assert.NoError(t, err)
var assetXDR xdr.Asset
if key.Type == "native" {
assetXDR = xdr.MustNewNativeAsset()
} else {
assetXDR = xdr.MustNewCreditAsset(key.Code, key.Issuer)
}
var assetID int64
assetID, err = q.GetAssetID(context.Background(), assetXDR)
assert.NoError(t, err)
assert.Equal(t, assetID, internalID)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func (a *ClaimableBalanceLoader) Exec(ctx context.Context, session db.SessionInt
ctx,
q,
"history_claimable_balances",
[]string{"claimable_balance_id"},
[]bulkInsertField{
{
name: "claimable_balance_id",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,28 @@ func TestClaimableBalanceLoader(t *testing.T) {
_, err = futureCb.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)

// check that loader works when all the values are already present in the db
loader = NewClaimableBalanceLoader()
for _, id := range ids {
future := loader.GetFuture(id)
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid claimable balance loader state,`)
}

assert.NoError(t, loader.Exec(context.Background(), session))
assert.Equal(t, LoaderStats{
Total: 100,
}, loader.Stats())

for _, id := range ids {
internalID, err := loader.getNow(id)
assert.NoError(t, err)
var cb HistoryClaimableBalance
cb, err = q.ClaimableBalanceByID(context.Background(), id)
assert.NoError(t, err)
assert.Equal(t, cb.BalanceID, id)
assert.Equal(t, cb.InternalID, internalID)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func (a *LiquidityPoolLoader) Exec(ctx context.Context, session db.SessionInterf
ctx,
q,
"history_liquidity_pools",
[]string{"liquidity_pool_id"},
[]bulkInsertField{
{
name: "liquidity_pool_id",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,29 @@ func TestLiquidityPoolLoader(t *testing.T) {
_, err = loader.GetNow("not present")
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)

// check that loader works when all the values are already present in the db
loader = NewLiquidityPoolLoader()
for _, id := range ids {
future := loader.GetFuture(id)
_, err := future.Value()

Check failure on line 66 in services/horizon/internal/db2/history/liquidity_pool_loader_test.go

View workflow job for this annotation

GitHub Actions / check (ubuntu-22.04, 1.22.1)

declaration of "err" shadows declaration at line 37
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid liquidity pool loader state,`)
}

assert.NoError(t, loader.Exec(context.Background(), session))
assert.Equal(t, LoaderStats{
Total: 100,
}, loader.Stats())

for _, id := range ids {
var internalID int64
internalID, err = loader.GetNow(id)
assert.NoError(t, err)
var lp HistoryLiquidityPool
lp, err = q.LiquidityPoolByID(context.Background(), id)
assert.NoError(t, err)
assert.Equal(t, lp.PoolID, id)
assert.Equal(t, lp.InternalID, internalID)
}
}

0 comments on commit 473bab5

Please sign in to comment.