Skip to content

Commit

Permalink
services/horizon: Fixing Claimable Balances Query Limit Issue (#5032)
Browse files Browse the repository at this point in the history
  • Loading branch information
urvisavla authored and 2opremio committed Oct 16, 2023
1 parent 83fa763 commit 6d75a08
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 12 deletions.
33 changes: 21 additions & 12 deletions services/horizon/internal/db2/history/claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,21 +247,30 @@ func (q *Q) GetClaimableBalances(ctx context.Context, query ClaimableBalancesQue
return nil, errors.Wrap(err, "could not apply query to page")
}

if query.Asset != nil {
// when search by asset, profiling has shown best performance to have the LIMIT on inner query
sql = sql.Where("cb.asset = ?", query.Asset)
}
if query.Asset != nil || query.Sponsor != nil {

if query.Sponsor != nil {
sql = sql.Where("cb.sponsor = ?", query.Sponsor.Address())
}
// JOIN with claimable_balance_claimants table to query by claimants
if query.Claimant != nil {
sql = sql.Join("claimable_balance_claimants on claimable_balance_claimants.id = cb.id")
sql = sql.Where("claimable_balance_claimants.destination = ?", query.Claimant.Address())
}

// Apply filters for asset and sponsor
if query.Asset != nil {
sql = sql.Where("cb.asset = ?", query.Asset)
}
if query.Sponsor != nil {
sql = sql.Where("cb.sponsor = ?", query.Sponsor.Address())
}

} else if query.Claimant != nil {
// If only the claimant is provided without additional filters, a JOIN with claimable_balance_claimants
// does not perform efficiently. Instead, use a subquery (with LIMIT) to retrieve claimable balances based on
// the claimant's address.

if query.Claimant != nil {
var selectClaimableBalanceClaimants = sq.Select("id").From("claimable_balance_claimants").
Where("destination = ?", query.Claimant.Address()).
// Given that each destination can be a claimant for each balance maximum once
// we can LIMIT the subquery.
Limit(query.PageQuery.Limit)
Where("destination = ?", query.Claimant.Address()).Limit(query.PageQuery.Limit)

subSql, err := applyClaimableBalancesQueriesCursor(selectClaimableBalanceClaimants, l, r, query.PageQuery.Order)
if err != nil {
return nil, errors.Wrap(err, "could not apply subquery to page")
Expand Down
181 changes: 181 additions & 0 deletions services/horizon/internal/db2/history/claimable_balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,187 @@ func TestFindClaimableBalancesByDestination(t *testing.T) {
tt.Assert.Len(cbs, 1)
}

func insertClaimants(q *Q, tt *test.T, cBalance ClaimableBalance) error {
claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10)
for _, claimant := range cBalance.Claimants {
claimant := ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err := claimantsInsertBuilder.Add(tt.Ctx, claimant)
if err != nil {
return err
}
}
return claimantsInsertBuilder.Exec(tt.Ctx)
}

type claimableBalanceQueryResult struct {
Claimants []string
Asset string
Sponsor string
}

func validateClaimableBalanceQuery(t *test.T, q *Q, query ClaimableBalancesQuery, expectedQueryResult []claimableBalanceQueryResult) {
cbs, err := q.GetClaimableBalances(t.Ctx, query)
t.Assert.NoError(err)
for i, expected := range expectedQueryResult {
for j, claimant := range expected.Claimants {
t.Assert.Equal(claimant, cbs[i].Claimants[j].Destination)
}
if expected.Asset != "" {
t.Assert.Equal(expected.Asset, cbs[i].Asset.String())
}
if expected.Sponsor != "" {
t.Assert.Equal(expected.Sponsor, cbs[i].Sponsor.String)
}
}
}

// TestFindClaimableBalancesByDestinationWithLimit tests querying claimable balances by destination and asset
func TestFindClaimableBalancesByDestinationWithLimit(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()

test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

assetIssuer := "GA25GQLHJU3LPEJXEIAXK23AWEA5GWDUGRSHTQHDFT6HXHVMRULSQJUJ"
asset1 := xdr.MustNewCreditAsset("ASSET1", assetIssuer)
asset2 := xdr.MustNewCreditAsset("ASSET2", assetIssuer)

sponsor1 := "GA25GQLHJU3LPEJXEIAXK23AWEA5GWDUGRSHTQHDFT6HXHVMRULSQJUJ"
sponsor2 := "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"

dest1 := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"
dest2 := "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"

claimants := []Claimant{
{
Destination: dest1,
Predicate: xdr.ClaimPredicate{
Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
},
},
{
Destination: dest2,
Predicate: xdr.ClaimPredicate{
Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
},
},
}

balanceID1 := xdr.ClaimableBalanceId{
Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
V0: &xdr.Hash{1, 2, 3},
}
id, err := xdr.MarshalHex(balanceID1)
tt.Assert.NoError(err)
cBalance1 := ClaimableBalance{
BalanceID: id,
Claimants: claimants,
Asset: asset1,
Sponsor: null.StringFrom(sponsor1),
LastModifiedLedger: 123,
Amount: 10,
}
err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance1})
tt.Assert.NoError(err)

claimants2 := []Claimant{
{
Destination: dest2,
Predicate: xdr.ClaimPredicate{
Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
},
},
}

balanceID2 := xdr.ClaimableBalanceId{
Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
V0: &xdr.Hash{4, 5, 6},
}
id, err = xdr.MarshalHex(balanceID2)
tt.Assert.NoError(err)
cBalance2 := ClaimableBalance{
BalanceID: id,
Claimants: claimants2,
Asset: asset2,
Sponsor: null.StringFrom(sponsor2),

LastModifiedLedger: 456,
Amount: 10,
}
err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance2})
tt.Assert.NoError(err)

err = insertClaimants(q, tt, cBalance1)
tt.Assert.NoError(err)

err = insertClaimants(q, tt, cBalance2)
tt.Assert.NoError(err)

pageQuery := db2.MustPageQuery("", false, "", 1)

// no claimant parameter, no filters
query := ClaimableBalancesQuery{
PageQuery: pageQuery,
}
validateClaimableBalanceQuery(tt, q, query, []claimableBalanceQueryResult{
{Claimants: []string{dest1, dest2}},
})

// invalid claimant parameter
query = ClaimableBalancesQuery{
PageQuery: pageQuery,
Claimant: xdr.MustAddressPtr("GA25GQLHJU3LPEJXEIAXK23AWEA5GWDUGRSHTQHDFT6HXHVMRULSQJUJ"),
Asset: &asset2,
Sponsor: xdr.MustAddressPtr(sponsor1),
}
validateClaimableBalanceQuery(tt, q, query, []claimableBalanceQueryResult{})

// claimant parameter, no filters
query = ClaimableBalancesQuery{
PageQuery: pageQuery,
Claimant: xdr.MustAddressPtr(dest1),
}
validateClaimableBalanceQuery(tt, q, query, []claimableBalanceQueryResult{
{Claimants: []string{dest1, dest2}},
})

// claimant parameter, asset filter
query = ClaimableBalancesQuery{
PageQuery: pageQuery,
Claimant: xdr.MustAddressPtr(dest2),
Asset: &asset1,
}
validateClaimableBalanceQuery(tt, q, query, []claimableBalanceQueryResult{
{Claimants: []string{dest1, dest2}, Asset: asset1.String()},
})

// claimant parameter, sponsor filter
query = ClaimableBalancesQuery{
PageQuery: pageQuery,
Claimant: xdr.MustAddressPtr(dest2),
Sponsor: xdr.MustAddressPtr(sponsor1),
}
validateClaimableBalanceQuery(tt, q, query, []claimableBalanceQueryResult{
{Claimants: []string{dest1, dest2}, Sponsor: sponsor1},
})

//claimant parameter, asset filter, sponsor filter
query = ClaimableBalancesQuery{
PageQuery: pageQuery,
Claimant: xdr.MustAddressPtr(dest2),
Asset: &asset2,
Sponsor: xdr.MustAddressPtr(sponsor2),
}
validateClaimableBalanceQuery(tt, q, query, []claimableBalanceQueryResult{
{Claimants: []string{dest2}, Asset: asset2.String(), Sponsor: sponsor2},
})
}

func TestUpdateClaimableBalance(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
Expand Down

0 comments on commit 6d75a08

Please sign in to comment.