diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index dcb21fb20d..b45632cce2 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -9,6 +9,7 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "strconv" "strings" "sync" "time" @@ -1120,11 +1121,10 @@ var historyLookupTables = map[string][]tableObjectFieldPair{ // when it reaches the table size so eventually all orphaned rows are // deleted. func (q *Q) deleteLookupTableRows(ctx context.Context, table string, ids []int64) (int64, error) { - deleteQuery, args := constructDeleteLookupTableRowsQuery(table, ids) + deleteQuery := constructDeleteLookupTableRowsQuery(table, ids) result, err := q.ExecRaw( context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType), deleteQuery, - args, ) if err != nil { return 0, fmt.Errorf("error running query: %w", err) @@ -1137,7 +1137,7 @@ func (q *Q) deleteLookupTableRows(ctx context.Context, table string, ids []int64 return deletedCount, nil } -func constructDeleteLookupTableRowsQuery(table string, ids []int64) (string, []interface{}) { +func constructDeleteLookupTableRowsQuery(table string, ids []int64) string { var conditions []string for _, referencedTable := range historyLookupTables[table] { conditions = append( @@ -1150,9 +1150,15 @@ func constructDeleteLookupTableRowsQuery(table string, ids []int64) (string, []i ) } - innerQuery, args := sq.Select("id").From(table).Where(map[string]interface{}{ - "id": ids, - }).OrderBy("id asc").Suffix("FOR UPDATE").MustSql() + stringIds := make([]string, len(ids)) + for i, id := range ids { + stringIds[i] = strconv.FormatInt(id, 10) + } + innerQuery := fmt.Sprintf( + "SELECT id FROM %s WHERE id IN (%s) ORDER BY id asc FOR UPDATE", + table, + strings.Join(stringIds, ", "), + ) deleteQuery := fmt.Sprintf( "DELETE FROM %s WHERE id IN ("+ @@ -1160,7 +1166,7 @@ func constructDeleteLookupTableRowsQuery(table string, ids []int64) (string, []i "SELECT e1.id as id FROM ha_batch e1 WHERE ", table, innerQuery, ) + strings.Join(conditions, " AND ") + ")" - return deleteQuery, args + return deleteQuery } func constructFindReapLookupTablesQuery(table string, batchSize int, offset int64) string { diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go index b15ca25250..3778ca9346 100644 --- a/services/horizon/internal/db2/history/main_test.go +++ b/services/horizon/internal/db2/history/main_test.go @@ -70,19 +70,18 @@ func TestElderLedger(t *testing.T) { } func TestConstructDeleteLookupTableRowsQuery(t *testing.T) { - query, args := constructDeleteLookupTableRowsQuery( + query := constructDeleteLookupTableRowsQuery( "history_accounts", []int64{100, 20, 30}, ) assert.Equal(t, - "DELETE FROM history_accounts WHERE id IN (WITH ha_batch AS (SELECT id FROM history_accounts WHERE id IN (?,?,?) ORDER BY id asc FOR UPDATE) SELECT e1.id as id FROM ha_batch e1 "+ + "DELETE FROM history_accounts WHERE id IN (WITH ha_batch AS (SELECT id FROM history_accounts WHERE id IN (100, 20, 30) ORDER BY id asc FOR UPDATE) SELECT e1.id as id FROM ha_batch e1 "+ "WHERE NOT EXISTS ( SELECT 1 as row FROM history_transaction_participants WHERE history_transaction_participants.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.base_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = id LIMIT 1))", query) - assert.Equal(t, []interface{}([]interface{}{int64(100), int64(20), int64(30)}), args) } func TestConstructReapLookupTablesQuery(t *testing.T) { diff --git a/services/horizon/internal/ingest/reap.go b/services/horizon/internal/ingest/reap.go index b767007a7c..681e312ef5 100644 --- a/services/horizon/internal/ingest/reap.go +++ b/services/horizon/internal/ingest/reap.go @@ -316,7 +316,7 @@ func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error { startTime := time.Now() ids, offset, err := r.historyQ.FindLookupTableRowsToReap(ctx, table, reapLookupTablesBatchSize) if err != nil { - log.WithField("table", table).WithError(err).Warn("Error finding orphaned rows") + r.logger.WithField("table", table).WithError(err).Warn("Error finding orphaned rows") return err } queryDuration := time.Since(startTime) @@ -326,7 +326,7 @@ func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error { var rowsDeleted int64 rowsDeleted, err = r.historyQ.ReapLookupTable(ctx, table, ids, offset) if err != nil { - log.WithField("table", table).WithError(err).Warn("Error deleting orphaned rows") + r.logger.WithField("table", table).WithError(err).Warn("Error deleting orphaned rows") return err } deleteDuration := time.Since(deleteStartTime) @@ -341,7 +341,7 @@ func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error { r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "total"}). Observe(float64((queryDuration + deleteDuration).Seconds())) - log.WithField("table", table). + r.logger.WithField("table", table). WithField("offset", offset). WithField(table+"rows_deleted", rowsDeleted). WithField("query_duration", queryDuration.Seconds()).