Skip to content

Commit

Permalink
fix delete query
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Aug 13, 2024
1 parent 9cca880 commit 6972b04
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 13 deletions.
20 changes: 13 additions & 7 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"database/sql/driver"
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -1120,11 +1121,10 @@ var historyLookupTables = map[string][]tableObjectFieldPair{
// when it reaches the table size so eventually all orphaned rows are
// deleted.
func (q *Q) deleteLookupTableRows(ctx context.Context, table string, ids []int64) (int64, error) {
deleteQuery, args := constructDeleteLookupTableRowsQuery(table, ids)
deleteQuery := constructDeleteLookupTableRowsQuery(table, ids)
result, err := q.ExecRaw(
context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType),
deleteQuery,
args,
)
if err != nil {
return 0, fmt.Errorf("error running query: %w", err)
Expand All @@ -1137,7 +1137,7 @@ func (q *Q) deleteLookupTableRows(ctx context.Context, table string, ids []int64
return deletedCount, nil
}

func constructDeleteLookupTableRowsQuery(table string, ids []int64) (string, []interface{}) {
func constructDeleteLookupTableRowsQuery(table string, ids []int64) string {
var conditions []string
for _, referencedTable := range historyLookupTables[table] {
conditions = append(
Expand All @@ -1150,17 +1150,23 @@ func constructDeleteLookupTableRowsQuery(table string, ids []int64) (string, []i
)
}

innerQuery, args := sq.Select("id").From(table).Where(map[string]interface{}{
"id": ids,
}).OrderBy("id asc").Suffix("FOR UPDATE").MustSql()
stringIds := make([]string, len(ids))
for i, id := range ids {
stringIds[i] = strconv.FormatInt(id, 10)
}
innerQuery := fmt.Sprintf(
"SELECT id FROM %s WHERE id IN (%s) ORDER BY id asc FOR UPDATE",
table,
strings.Join(stringIds, ", "),
)

deleteQuery := fmt.Sprintf(
"DELETE FROM %s WHERE id IN ("+
"WITH ha_batch AS (%s) "+
"SELECT e1.id as id FROM ha_batch e1 WHERE ",
table, innerQuery,
) + strings.Join(conditions, " AND ") + ")"
return deleteQuery, args
return deleteQuery
}

func constructFindReapLookupTablesQuery(table string, batchSize int, offset int64) string {
Expand Down
5 changes: 2 additions & 3 deletions services/horizon/internal/db2/history/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,18 @@ func TestElderLedger(t *testing.T) {
}

func TestConstructDeleteLookupTableRowsQuery(t *testing.T) {
query, args := constructDeleteLookupTableRowsQuery(
query := constructDeleteLookupTableRowsQuery(
"history_accounts",
[]int64{100, 20, 30},
)

assert.Equal(t,
"DELETE FROM history_accounts WHERE id IN (WITH ha_batch AS (SELECT id FROM history_accounts WHERE id IN (?,?,?) ORDER BY id asc FOR UPDATE) SELECT e1.id as id FROM ha_batch e1 "+
"DELETE FROM history_accounts WHERE id IN (WITH ha_batch AS (SELECT id FROM history_accounts WHERE id IN (100, 20, 30) ORDER BY id asc FOR UPDATE) SELECT e1.id as id FROM ha_batch e1 "+
"WHERE NOT EXISTS ( SELECT 1 as row FROM history_transaction_participants WHERE history_transaction_participants.history_account_id = id LIMIT 1) "+
"AND NOT EXISTS ( SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = id LIMIT 1) "+
"AND NOT EXISTS ( SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = id LIMIT 1) "+
"AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.base_account_id = id LIMIT 1) "+
"AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = id LIMIT 1))", query)
assert.Equal(t, []interface{}([]interface{}{int64(100), int64(20), int64(30)}), args)
}

func TestConstructReapLookupTablesQuery(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions services/horizon/internal/ingest/reap.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error {
startTime := time.Now()
ids, offset, err := r.historyQ.FindLookupTableRowsToReap(ctx, table, reapLookupTablesBatchSize)
if err != nil {
log.WithField("table", table).WithError(err).Warn("Error finding orphaned rows")
r.logger.WithField("table", table).WithError(err).Warn("Error finding orphaned rows")
return err
}
queryDuration := time.Since(startTime)
Expand All @@ -326,7 +326,7 @@ func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error {
var rowsDeleted int64
rowsDeleted, err = r.historyQ.ReapLookupTable(ctx, table, ids, offset)
if err != nil {
log.WithField("table", table).WithError(err).Warn("Error deleting orphaned rows")
r.logger.WithField("table", table).WithError(err).Warn("Error deleting orphaned rows")
return err
}
deleteDuration := time.Since(deleteStartTime)
Expand All @@ -341,7 +341,7 @@ func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error {
r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "total"}).
Observe(float64((queryDuration + deleteDuration).Seconds()))

log.WithField("table", table).
r.logger.WithField("table", table).
WithField("offset", offset).
WithField(table+"rows_deleted", rowsDeleted).
WithField("query_duration", queryDuration.Seconds()).
Expand Down

0 comments on commit 6972b04

Please sign in to comment.