Skip to content

Commit

Permalink
Instead of adding extra columns to claimable_balance_claimants table,…
Browse files Browse the repository at this point in the history
… use SQL JOIN to filter by destination
  • Loading branch information
urvisavla committed Sep 19, 2023
1 parent c4890ec commit ae514ba
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 90 deletions.
4 changes: 0 additions & 4 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
### Breaking Changes
- The command line flag `--remote-captive-core-url` has been removed, as remote captive core functionality is now deprecated ([4940](https://github.com/stellar/go/pull/4940)).

### DB Schema Migration

- Added `asset` column and an index to `claimable_balance_claimant` table ([5032](https://github.com/stellar/go/pull/5032)).

### Added
- Added new command-line flag `--network` to specify the Stellar network (pubnet or testnet), aiming at simplifying the configuration process by automatically configuring the following parameters based on the chosen network: `--history-archive-urls`, `--network-passphrase`, and `--captive-core-config-path` ([4949](https://github.com/stellar/go/pull/4949)).

Expand Down
40 changes: 23 additions & 17 deletions services/horizon/internal/db2/history/claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ func applyClaimableBalancesQueriesCursor(sql sq.SelectBuilder, lCursor int64, rC
// ClaimableBalanceClaimant is a row of data from the `claimable_balances_claimants` table.
// This table exists to allow faster querying for claimable balances for a specific claimant.
type ClaimableBalanceClaimant struct {
BalanceID string `db:"id"`
Destination string `db:"destination"`
LastModifiedLedger uint32 `db:"last_modified_ledger"`
Asset xdr.Asset `db:"asset"`
BalanceID string `db:"id"`
Destination string `db:"destination"`
LastModifiedLedger uint32 `db:"last_modified_ledger"`
}

// ClaimableBalance is a row of data from the `claimable_balances` table.
Expand Down Expand Up @@ -248,23 +247,30 @@ func (q *Q) GetClaimableBalances(ctx context.Context, query ClaimableBalancesQue
return nil, errors.Wrap(err, "could not apply query to page")
}

if query.Asset != nil {
// when search by asset, profiling has shown best performance to have the LIMIT on inner query
sql = sql.Where("cb.asset = ?", query.Asset)
}
if query.Asset != nil || query.Sponsor != nil {

if query.Sponsor != nil {
sql = sql.Where("cb.sponsor = ?", query.Sponsor.Address())
}
// JOIN with claimable_balance_claimants table to query by claimants
if query.Claimant != nil {
sql = sql.Join("claimable_balance_claimants on claimable_balance_claimants.id = cb.id")
sql = sql.Where("claimable_balance_claimants.destination = ?", query.Claimant.Address())
}

// Apply filters for asset and sponsor
if query.Asset != nil {
sql = sql.Where("cb.asset = ?", query.Asset)
}
if query.Sponsor != nil {
sql = sql.Where("cb.sponsor = ?", query.Sponsor.Address())
}

} else if query.Claimant != nil {
// If only the claimant is provided without additional filters, a JOIN with claimable_balance_claimants
// does not perform efficiently. Instead, use a subquery (with LIMIT) to retrieve claimable balances based on
// the claimant's address.

if query.Claimant != nil {
var selectClaimableBalanceClaimants = sq.Select("id").From("claimable_balance_claimants").
Where("destination = ?", query.Claimant.Address())

// https://github.com/stellar/go/issues/4907
if query.Asset != nil {
selectClaimableBalanceClaimants = selectClaimableBalanceClaimants.Where("asset = ?", query.Asset)
}
selectClaimableBalanceClaimants.Limit(query.PageQuery.Limit)

subSql, err := applyClaimableBalancesQueriesCursor(selectClaimableBalanceClaimants, l, r, query.PageQuery.Order)
Expand All @@ -285,7 +291,7 @@ func (q *Q) GetClaimableBalances(ctx context.Context, query ClaimableBalancesQue

var results []ClaimableBalance
if err := q.Select(ctx, &results, sql); err != nil {
return nil, errors.Wrap(err, "could not run select query")
return nil, errors.Wrap(err, fmt.Sprintf("could not run select query"))

Check failure on line 294 in services/horizon/internal/db2/history/claimable_balances.go

View workflow job for this annotation

GitHub Actions / golangci

S1039: unnecessary use of fmt.Sprintf (gosimple)
}

return results, nil
Expand Down
107 changes: 82 additions & 25 deletions services/horizon/internal/db2/history/claimable_balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ func insertClaimants(q *Q, tt *test.T, cBalance ClaimableBalance) error {
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
Asset: cBalance.Asset,
}
err := claimantsInsertBuilder.Add(tt.Ctx, claimant)
if err != nil {
Expand All @@ -250,7 +249,29 @@ func insertClaimants(q *Q, tt *test.T, cBalance ClaimableBalance) error {
return claimantsInsertBuilder.Exec(tt.Ctx)
}

// TestFindClaimableBalancesByDestinationWithLimit tests querying claimable balances by destination and asset with query limit = 1.
func validateClaimableBalanceQuery(t *test.T, q *Q, query ClaimableBalancesQuery, expectedLen int, expectedClaimants []string, expectedAsset string, expectedSponsor string) {

Check failure on line 252 in services/horizon/internal/db2/history/claimable_balances_test.go

View workflow job for this annotation

GitHub Actions / golangci

line is 174 characters (lll)
cbs, err := q.GetClaimableBalances(t.Ctx, query)
t.Assert.NoError(err)
t.Assert.Len(cbs, expectedLen)

if expectedLen > 0 {
t.Assert.Equal(expectedClaimants[0], cbs[0].Claimants[0].Destination)
}

if expectedLen > 1 {
t.Assert.Equal(expectedClaimants[1], cbs[0].Claimants[1].Destination)
}

if expectedAsset != "" {
t.Assert.Equal(expectedAsset, cbs[0].Asset.String())
}

if expectedSponsor != "" {
t.Assert.Equal(expectedSponsor, cbs[0].Sponsor.String)
}
}

// TestFindClaimableBalancesByDestinationWithLimit tests querying claimable balances by destination and asset
func TestFindClaimableBalancesByDestinationWithLimit(t *testing.T) {

Check failure on line 275 in services/horizon/internal/db2/history/claimable_balances_test.go

View workflow job for this annotation

GitHub Actions / golangci

Function 'TestFindClaimableBalancesByDestinationWithLimit' is too long (129 > 100) (funlen)
tt := test.Start(t)
defer tt.Finish()
Expand All @@ -262,9 +283,11 @@ func TestFindClaimableBalancesByDestinationWithLimit(t *testing.T) {
asset1 := xdr.MustNewCreditAsset("ASSET1", assetIssuer)
asset2 := xdr.MustNewCreditAsset("ASSET2", assetIssuer)

sponsor1 := "GA25GQLHJU3LPEJXEIAXK23AWEA5GWDUGRSHTQHDFT6HXHVMRULSQJUJ"
sponsor2 := "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"

dest1 := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"
dest2 := "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"
dest3 := "GA25GQLHJU3LPEJXEIAXK23AWEA5GWDUGRSHTQHDFT6HXHVMRULSQJUJ"

claimants := []Claimant{
{
Expand All @@ -291,22 +314,34 @@ func TestFindClaimableBalancesByDestinationWithLimit(t *testing.T) {
BalanceID: id,
Claimants: claimants,
Asset: asset1,
Sponsor: null.StringFrom(sponsor1),
LastModifiedLedger: 123,
Amount: 10,
}
err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance1})
tt.Assert.NoError(err)

claimants2 := []Claimant{
{
Destination: dest2,
Predicate: xdr.ClaimPredicate{
Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
},
},
}

balanceID2 := xdr.ClaimableBalanceId{
Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
V0: &xdr.Hash{4, 5, 6},
}
id, err = xdr.MarshalHex(balanceID2)
tt.Assert.NoError(err)
cBalance2 := ClaimableBalance{
BalanceID: id,
Claimants: claimants,
Asset: asset2,
BalanceID: id,
Claimants: claimants2,
Asset: asset2,
Sponsor: null.StringFrom(sponsor2),

LastModifiedLedger: 456,
Amount: 10,
}
Expand All @@ -319,32 +354,54 @@ func TestFindClaimableBalancesByDestinationWithLimit(t *testing.T) {
err = insertClaimants(q, tt, cBalance2)
tt.Assert.NoError(err)

pageQuery := db2.MustPageQuery("", false, "", 1)

// no claimant parameter, no filters
query := ClaimableBalancesQuery{
PageQuery: pageQuery,
}
validateClaimableBalanceQuery(tt, q, query, 1, []string{dest1, dest2}, "", "")

// invalid claimant parameter
query = ClaimableBalancesQuery{
PageQuery: pageQuery,
Claimant: xdr.MustAddressPtr("GA25GQLHJU3LPEJXEIAXK23AWEA5GWDUGRSHTQHDFT6HXHVMRULSQJUJ"),
Asset: &asset2,
PageQuery: db2.MustPageQuery("", false, "", 1),
Sponsor: xdr.MustAddressPtr(sponsor1),
}
validateClaimableBalanceQuery(tt, q, query, 0, []string{}, "", "")

// claimant parameter, no filters
query = ClaimableBalancesQuery{
PageQuery: pageQuery,
Claimant: xdr.MustAddressPtr(dest1),
}
validateClaimableBalanceQuery(tt, q, query, 1, []string{dest1, dest2}, "", "")

// Validate the query with claimant parameter
cbs, err := q.GetClaimableBalances(tt.Ctx, query)
tt.Assert.NoError(err)
tt.Assert.Len(cbs, 1)
tt.Assert.Equal(dest1, cbs[0].Claimants[0].Destination)
tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
// claimant parameter, asset filter
query = ClaimableBalancesQuery{
PageQuery: pageQuery,
Claimant: xdr.MustAddressPtr(dest2),
Asset: &asset1,
}
validateClaimableBalanceQuery(tt, q, query, 1, []string{dest1, dest2}, asset1.String(), "")

// Validate the query with a different claimant parameter
query.Claimant = xdr.MustAddressPtr(dest1)
cbs, err = q.GetClaimableBalances(tt.Ctx, query)
tt.Assert.NoError(err)
tt.Assert.Len(cbs, 1)
tt.Assert.Equal(dest1, cbs[0].Claimants[0].Destination)
tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
// claimant parameter, sponsor filter
query = ClaimableBalancesQuery{
PageQuery: pageQuery,
Claimant: xdr.MustAddressPtr(dest2),
Sponsor: xdr.MustAddressPtr(sponsor1),
}
validateClaimableBalanceQuery(tt, q, query, 1, []string{dest1, dest2}, "", sponsor1)

// Validate the query with unknown claimant parameter
query.Claimant = xdr.MustAddressPtr(dest3)
cbs, err = q.GetClaimableBalances(tt.Ctx, query)
tt.Assert.NoError(err)
tt.Assert.Len(cbs, 0)
//claimant parameter, asset filter, sponsor filter
query = ClaimableBalancesQuery{
PageQuery: pageQuery,
Claimant: xdr.MustAddressPtr(dest2),
Asset: &asset2,
Sponsor: xdr.MustAddressPtr(sponsor2),
}
validateClaimableBalanceQuery(tt, q, query, 1, []string{dest2}, asset2.String(), sponsor2)
}

func TestUpdateClaimableBalance(t *testing.T) {
Expand Down
23 changes: 0 additions & 23 deletions services/horizon/internal/db2/schema/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

3 changes: 1 addition & 2 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ const (
// claimable balances for claimant queries.
// - 17: Add contract_id column to exp_asset_stats table which is derived by ingesting
// contract data ledger entries.
// - 18: Add asset column to claimable_balance_claimants table to support searching for claimants by asset (#4907)
CurrentVersion = 18
CurrentVersion = 17

// MaxDBConnections is the size of the postgres connection pool dedicated to Horizon ingestion:
// * Ledger ingestion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error {
BalanceID: cb.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cb.LastModifiedLedger,
Asset: cb.Asset,
}
if err := p.claimantsInsertBuilder.Add(ctx, claimant); err != nil {
return errors.Wrap(err, "error adding to claimantsInsertBuilder")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteState) TestCreatesClaimableBal
BalanceID: id,
Destination: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
Asset: cBalance.Asset,
}).Return(nil).Once()

s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once()
Expand Down
11 changes: 3 additions & 8 deletions services/horizon/internal/ingest/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const assetStatsBatchSize = 500
// check them.
// There is a test that checks it, to fix it: update the actual `verifyState`
// method instead of just updating this value!
const stateVerifierExpectedIngestionVersion = 18
const stateVerifierExpectedIngestionVersion = 17

// verifyState is called as a go routine from pipeline post hook every 64
// ledgers. It checks if the state is correct. If another go routine is already
Expand Down Expand Up @@ -774,19 +774,14 @@ func addClaimableBalanceToStateVerifier(

for i, claimant := range claimants {
if claimant.MustV0().Destination.Address() != cBalancesClaimants[row.BalanceID][i].Destination ||
row.LastModifiedLedger != cBalancesClaimants[row.BalanceID][i].LastModifiedLedger ||
!row.Asset.Equals(cBalancesClaimants[row.BalanceID][i].Asset) {
row.LastModifiedLedger != cBalancesClaimants[row.BalanceID][i].LastModifiedLedger {
return fmt.Errorf(
"claimable_balance_claimants table for balance %s does not match. expectedDestination=%s"+
" actualDestination=%s, expectedLastModifiedLedger=%d actualLastModifiedLedger=%d,"+
" expectedAsset=%s actualAsset=%s",
"claimable_balance_claimants table for balance %s does not match. expectedDestination=%s actualDestination=%s, expectedLastModifiedLedger=%d actualLastModifiedLedger=%d",
row.BalanceID,
claimant.MustV0().Destination.Address(),
cBalancesClaimants[row.BalanceID][i].Destination,
row.LastModifiedLedger,
cBalancesClaimants[row.BalanceID][i].LastModifiedLedger,
row.Asset,
cBalancesClaimants[row.BalanceID][i].Asset,
)
}
}
Expand Down

0 comments on commit ae514ba

Please sign in to comment.