Skip to content

Commit

Permalink
reap lookup tables without blocking ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Jul 30, 2024
1 parent ecd28b6 commit 3512e45
Show file tree
Hide file tree
Showing 9 changed files with 394 additions and 280 deletions.
19 changes: 18 additions & 1 deletion services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,23 @@ func (a *AccountLoader) GetNow(address string) (int64, error) {
}
}

// lookupKeysForKeyShare selects the history account rows whose address is in
// addresses and records each address -> id mapping in a.ids.
//
// The rows are read in ascending id order with a FOR KEY SHARE suffix.
// NOTE(review): presumably the row locks stop a concurrent lookup-table reaper
// from deleting these rows before the caller's transaction ends, and the fixed
// ordering keeps lock acquisition order consistent — confirm against callers.
//
// Addresses with no matching row are simply left out of a.ids.
func (a *AccountLoader) lookupKeysForKeyShare(ctx context.Context, q *Q, addresses []string) error {
	lockingSelect := selectAccount.
		Where(map[string]interface{}{
			"ha.address": addresses, // ha.address IN (...)
		}).
		OrderBy("ha.id asc").
		Suffix("FOR KEY SHARE")

	var found []Account
	if err := q.Select(ctx, &found, lockingSelect); err != nil {
		return errors.Wrap(err, "could not select accounts")
	}

	for i := range found {
		a.ids[found[i].Address] = found[i].ID
	}
	return nil
}

func (a *AccountLoader) lookupKeys(ctx context.Context, q *Q, addresses []string) error {
for i := 0; i < len(addresses); i += loaderLookupBatchSize {
end := ordered.Min(len(addresses), i+loaderLookupBatchSize)
Expand Down Expand Up @@ -123,7 +140,7 @@ func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) e
addresses = append(addresses, address)
}

if err := a.lookupKeys(ctx, q, addresses); err != nil {
if err := a.lookupKeysForKeyShare(ctx, q, addresses); err != nil {
return err
}
a.stats.Total += len(addresses)
Expand Down
32 changes: 31 additions & 1 deletion services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,36 @@ func (a *AssetLoader) GetNow(asset AssetKey) (int64, error) {
}
}

// lookupKeysForKeyShare selects the history_assets rows matching keys and
// records each row's id in a.ids under its (Type, Code, Issuer) key.
//
// The rows are read in ascending id order with a FOR KEY SHARE suffix.
// NOTE(review): presumably the row locks stop a concurrent lookup-table reaper
// from deleting these rows before the caller's transaction ends — confirm
// against callers.
//
// An empty keys slice is a no-op: without this guard the tuple list would
// render as "in ()", which is not valid SQL. Keys with no matching row are
// left out of a.ids.
func (a *AssetLoader) lookupKeysForKeyShare(ctx context.Context, q *Q, keys []AssetKey) error {
	if len(keys) == 0 {
		return nil
	}

	// One "(?, ?, ?)" tuple per key; the argument order must match the column
	// order in the WHERE clause: asset_code, asset_type, asset_issuer.
	args := make([]interface{}, 0, 3*len(keys))
	placeHolders := make([]string, 0, len(keys))
	for _, key := range keys {
		args = append(args, key.Code, key.Type, key.Issuer)
		placeHolders = append(placeHolders, "(?, ?, ?)")
	}

	var rows []Asset
	rawSQL := fmt.Sprintf(
		"SELECT * FROM history_assets WHERE (asset_code, asset_type, asset_issuer) in (%s) "+
			"ORDER BY id asc FOR KEY SHARE",
		strings.Join(placeHolders, ", "),
	)
	if err := q.SelectRaw(ctx, &rows, rawSQL, args...); err != nil {
		return errors.Wrap(err, "could not select assets")
	}

	for _, row := range rows {
		a.ids[AssetKey{
			Type:   row.Type,
			Code:   row.Code,
			Issuer: row.Issuer,
		}] = row.ID
	}

	return nil
}

func (a *AssetLoader) lookupKeys(ctx context.Context, q *Q, keys []AssetKey) error {
var rows []Asset
for i := 0; i < len(keys); i += loaderLookupBatchSize {
Expand Down Expand Up @@ -146,7 +176,7 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
keys = append(keys, key)
}

if err := a.lookupKeys(ctx, q, keys); err != nil {
if err := a.lookupKeysForKeyShare(ctx, q, keys); err != nil {
return err
}
a.stats.Total += len(keys)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,23 @@ func (a *ClaimableBalanceLoader) getNow(id string) (int64, error) {
}
}

// lookupKeysForKeyShare selects the history claimable balance rows whose
// claimable_balance_id is in ids and records each balance id -> internal id
// mapping in a.ids.
//
// The rows are read in ascending id order with a FOR KEY SHARE suffix.
// NOTE(review): presumably the row locks stop a concurrent lookup-table reaper
// from deleting these rows before the caller's transaction ends — confirm
// against callers.
//
// Ids with no matching row are left out of a.ids.
func (a *ClaimableBalanceLoader) lookupKeysForKeyShare(ctx context.Context, q *Q, ids []string) error {
	filter := map[string]interface{}{
		"hcb.claimable_balance_id": ids, // hcb.claimable_balance_id IN (...)
	}
	lockingSelect := selectHistoryClaimableBalance.
		Where(filter).
		OrderBy("hcb.id asc").
		Suffix("FOR KEY SHARE")

	var balances []HistoryClaimableBalance
	if err := q.Select(ctx, &balances, lockingSelect); err != nil {
		return errors.Wrap(err, "could not select claimable balances")
	}

	for _, balance := range balances {
		a.ids[balance.BalanceID] = balance.InternalID
	}
	return nil
}

func (a *ClaimableBalanceLoader) lookupKeys(ctx context.Context, q *Q, ids []string) error {
for i := 0; i < len(ids); i += loaderLookupBatchSize {
end := ordered.Min(len(ids), i+loaderLookupBatchSize)
Expand Down Expand Up @@ -108,7 +125,7 @@ func (a *ClaimableBalanceLoader) Exec(ctx context.Context, session db.SessionInt
ids = append(ids, id)
}

if err := a.lookupKeys(ctx, q, ids); err != nil {
if err := a.lookupKeysForKeyShare(ctx, q, ids); err != nil {
return err
}
a.stats.Total += len(ids)
Expand Down
44 changes: 14 additions & 30 deletions services/horizon/internal/db2/history/key_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"database/sql"
"fmt"
"strconv"
"strings"

sq "github.com/Masterminds/squirrel"

Expand Down Expand Up @@ -207,41 +206,26 @@ func (q *Q) getValueFromStore(ctx context.Context, key string, forUpdate bool) (
return value, nil
}

type KeyValuePair struct {
Key string `db:"key"`
Value string `db:"value"`
}

func (q *Q) getLookupTableReapOffsets(ctx context.Context) (map[string]int64, error) {
keys := make([]string, 0, len(historyLookupTables))
for table := range historyLookupTables {
keys = append(keys, table+lookupTableReapOffsetSuffix)
}
offsets := map[string]int64{}
var pairs []KeyValuePair
query := sq.Select("key", "value").
func (q *Q) getLookupTableReapOffset(ctx context.Context, table string) (int64, error) {
query := sq.Select("value").
From("key_value_store").
Where(map[string]interface{}{
"key": keys,
"key": table + lookupTableReapOffsetSuffix,
})
err := q.Select(ctx, &pairs, query)
var text string
err := q.Get(ctx, &text, query)
if err != nil {
return nil, err
}
for _, pair := range pairs {
table := strings.TrimSuffix(pair.Key, lookupTableReapOffsetSuffix)
if _, ok := historyLookupTables[table]; !ok {
return nil, fmt.Errorf("invalid key: %s", pair.Key)
}

var offset int64
offset, err = strconv.ParseInt(pair.Value, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid offset: %s", pair.Value)
if errors.Cause(err) == sql.ErrNoRows {
return 0, nil
}
offsets[table] = offset
return 0, err
}
var offset int64
offset, err = strconv.ParseInt(text, 10, 64)
if err != nil {
return 0, fmt.Errorf("invalid offset: %s", text)
}
return offsets, err
return offset, nil
}

func (q *Q) updateLookupTableReapOffset(ctx context.Context, table string, offset int64) error {
Expand Down
19 changes: 18 additions & 1 deletion services/horizon/internal/db2/history/liquidity_pool_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,23 @@ func (a *LiquidityPoolLoader) GetNow(id string) (int64, error) {
}
}

// lookupKeysForKeyShare selects the history liquidity pool rows whose
// liquidity_pool_id is in ids and records each pool id -> internal id mapping
// in a.ids.
//
// The rows are read in ascending id order with a FOR KEY SHARE suffix.
// NOTE(review): presumably the row locks stop a concurrent lookup-table reaper
// from deleting these rows before the caller's transaction ends — confirm
// against callers.
//
// Ids with no matching row are left out of a.ids.
func (a *LiquidityPoolLoader) lookupKeysForKeyShare(ctx context.Context, q *Q, ids []string) error {
	byPoolID := map[string]interface{}{
		"hlp.liquidity_pool_id": ids, // hlp.liquidity_pool_id IN (...)
	}

	var pools []HistoryLiquidityPool
	selectErr := q.Select(
		ctx,
		&pools,
		selectHistoryLiquidityPool.Where(byPoolID).OrderBy("hlp.id asc").Suffix("FOR KEY SHARE"),
	)
	if selectErr != nil {
		return errors.Wrap(selectErr, "could not select liquidity pools")
	}

	for idx := range pools {
		a.ids[pools[idx].PoolID] = pools[idx].InternalID
	}
	return nil
}

func (a *LiquidityPoolLoader) lookupKeys(ctx context.Context, q *Q, ids []string) error {
for i := 0; i < len(ids); i += loaderLookupBatchSize {
end := ordered.Min(len(ids), i+loaderLookupBatchSize)
Expand Down Expand Up @@ -108,7 +125,7 @@ func (a *LiquidityPoolLoader) Exec(ctx context.Context, session db.SessionInterf
ids = append(ids, id)
}

if err := a.lookupKeys(ctx, q, ids); err != nil {
if err := a.lookupKeysForKeyShare(ctx, q, ids); err != nil {
return err
}
a.stats.Total += len(ids)
Expand Down
141 changes: 88 additions & 53 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ type IngestionQ interface {
NewTradeBatchInsertBuilder() TradeBatchInsertBuilder
RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error
RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error
ReapLookupTables(ctx context.Context, batchSize int) (map[string]LookupTableReapResult, error)
ReapLookupTable(ctx context.Context, table string, ids []int64, newOffset int64) (int64, error)
FindLookupTableRowsToReap(ctx context.Context, table string, batchSize int) ([]int64, int64, error)
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
Expand Down Expand Up @@ -977,63 +978,55 @@ type LookupTableReapResult struct {
Duration time.Duration
}

// ReapLookupTables removes rows from lookup tables like history_claimable_balances
// which aren't used (orphaned), i.e. history entries for them were reaped.
// This method must be executed inside ingestion transaction. Otherwise it may
// create invalid state in lookup and history tables.
func (q *Q) ReapLookupTables(ctx context.Context, batchSize int) (
map[string]LookupTableReapResult,
error,
) {
if q.GetTx() == nil {
return nil, errors.New("cannot be called outside of an ingestion transaction")
func (q *Q) FindLookupTableRowsToReap(ctx context.Context, table string, batchSize int) ([]int64, int64, error) {
offset, err := q.getLookupTableReapOffset(ctx, table)
if err != nil {
return nil, 0, fmt.Errorf("could not obtain offsets: %w", err)
}

offsets, err := q.getLookupTableReapOffsets(ctx)
// Find new offset before removing the rows
var newOffset int64
err = q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offset, batchSize))
if err != nil {
return nil, fmt.Errorf("could not obtain offsets: %w", err)
if q.NoRows(err) {
newOffset = 0
} else {
return nil, 0, err
}
}

results := map[string]LookupTableReapResult{}
for table, historyTables := range historyLookupTables {
startTime := time.Now()
query := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table])
var ids []int64
err = q.SelectRaw(ctx, &ids, constructFindReapLookupTablesQuery(table, batchSize, offset))
if err != nil {
return nil, 0, fmt.Errorf("could not query orphaned rows: %w", err)
}

// Find new offset before removing the rows
var newOffset int64
err := q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize))
if err != nil {
if q.NoRows(err) {
newOffset = 0
} else {
return nil, err
}
}
return ids, newOffset, nil
}

res, err := q.ExecRaw(
context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType),
query,
)
if err != nil {
return nil, errors.Wrapf(err, "error running query: %s", query)
}
func (q *Q) ReapLookupTable(ctx context.Context, table string, ids []int64, newOffset int64) (int64, error) {
if err := q.Begin(ctx); err != nil {
return 0, fmt.Errorf("could not start transaction: %w", err)
}
defer q.Rollback()

if err = q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil {
return nil, fmt.Errorf("error updating offset: %w", err)
}
if err := q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil {
return 0, fmt.Errorf("error updating offset: %w", err)
}

rows, err := res.RowsAffected()
var rowsDeleted int64
if len(ids) > 0 {
var err error
rowsDeleted, err = q.deleteLookupTableRows(ctx, table, ids)
if err != nil {
return nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query)
return 0, fmt.Errorf("could not delete orphaned rows: %w", err)
}
}

results[table] = LookupTableReapResult{
Offset: newOffset,
RowsDeleted: rows,
Duration: time.Since(startTime),
}
if err := q.Commit(); err != nil {
return 0, fmt.Errorf("could not commit transaction: %w", err)
}
return results, nil
return rowsDeleted, nil
}

var historyLookupTables = map[string][]tableObjectFieldPair{
Expand Down Expand Up @@ -1125,29 +1118,71 @@ var historyLookupTables = map[string][]tableObjectFieldPair{
// possible that rows will be skipped from deletion. But offset is reset
// when it reaches the table size so eventually all orphaned rows are
// deleted.
func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize int, offset int64) string {
//
// deleteLookupTableRows executes the statement built by
// constructDeleteLookupTableRowsQuery for table and ids and returns the number
// of rows the database reports as affected.
//
// Errors from running the statement or from reading the affected-row count
// are wrapped and returned with a zero count.
func (q *Q) deleteLookupTableRows(ctx context.Context, table string, ids []int64) (int64, error) {
	deleteQuery, args := constructDeleteLookupTableRowsQuery(table, ids)
	result, err := q.ExecRaw(
		// Tag the context with the delete query type for this statement.
		context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType),
		deleteQuery,
		// Spread the bind values: passing the slice itself would bind the
		// whole []interface{} as a single parameter instead of one value per
		// placeholder.
		args...,
	)
	if err != nil {
		return 0, fmt.Errorf("error running query: %w", err)
	}

	deletedCount, err := result.RowsAffected()
	if err != nil {
		return 0, fmt.Errorf("error getting deleted count: %w", err)
	}
	return deletedCount, nil
}

// constructDeleteLookupTableRowsQuery builds a DELETE statement for table and
// the bind arguments that go with it.
//
// The statement first selects, inside the ha_batch CTE, the rows of table
// whose id is in ids (ascending id order, FOR UPDATE suffix). It then deletes
// only those batch rows for which every table listed in
// historyLookupTables[table] has no row referencing the id.
//
// It returns the SQL text and the arguments generated for the id list.
func constructDeleteLookupTableRowsQuery(table string, ids []int64) (string, []interface{}) {
	referencing := historyLookupTables[table]

	// One NOT EXISTS check per table that may still reference the lookup row.
	orphanChecks := make([]string, 0, len(referencing))
	for _, ref := range referencing {
		check := fmt.Sprintf(
			"NOT EXISTS ( SELECT 1 as row FROM %s WHERE %s.%s = id LIMIT 1)",
			ref.name,
			ref.name, ref.objectField,
		)
		orphanChecks = append(orphanChecks, check)
	}

	batchSelect := sq.Select("id").
		From(table).
		Where(map[string]interface{}{"id": ids}).
		OrderBy("id asc").
		Suffix("FOR UPDATE")
	batchSQL, batchArgs := batchSelect.MustSql()

	prefix := fmt.Sprintf(
		"DELETE FROM %s WHERE id IN ("+
			"WITH ha_batch AS (%s) "+
			"SELECT e1.id as id FROM ha_batch e1 WHERE ",
		table, batchSQL,
	)
	return prefix + strings.Join(orphanChecks, " AND ") + ")", batchArgs
}

func constructFindReapLookupTablesQuery(table string, batchSize int, offset int64) string {
var conditions []string

for _, historyTable := range historyTables {
for _, referencedTable := range historyLookupTables[table] {
conditions = append(
conditions,
fmt.Sprintf(
"NOT EXISTS ( SELECT 1 as row FROM %s WHERE %s.%s = id LIMIT 1)",
historyTable.name,
historyTable.name, historyTable.objectField,
referencedTable.name,
referencedTable.name, referencedTable.objectField,
),
)
}

return fmt.Sprintf(
"DELETE FROM %s WHERE id IN ("+
"WITH ha_batch AS (SELECT id FROM %s WHERE id >= %d ORDER BY id limit %d) "+
"WITH ha_batch AS (SELECT id FROM %s WHERE id >= %d ORDER BY id limit %d) "+
"SELECT e1.id as id FROM ha_batch e1 WHERE ",
table,
table,
offset,
batchSize,
) + strings.Join(conditions, " AND ") + ")"
) + strings.Join(conditions, " AND ")
}

// DeleteRangeAll deletes a range of rows from all history tables between
Expand Down
Loading

0 comments on commit 3512e45

Please sign in to comment.