Skip to content

Commit

Permalink
Add metrics to reaper
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Jun 3, 2024
1 parent 083b7bb commit 468fc6c
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 35 deletions.
1 change: 1 addition & 0 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ func (a *App) init() error {
a.config.HistoryRetentionReapCount,
a.HorizonSession(),
a.ledgerState)
a.reaper.RegisterMetrics(a.prometheusRegistry)

// go metrics
initGoMetrics(a)
Expand Down
12 changes: 7 additions & 5 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ type IngestionQ interface {
GetOfferCompactionSequence(context.Context) (uint32, error)
GetLiquidityPoolCompactionSequence(context.Context) (uint32, error)
TruncateIngestStateTables(context.Context) error
DeleteRangeAll(ctx context.Context, start, end int64) error
DeleteRangeAll(ctx context.Context, start, end int64) (int64, error)
DeleteTransactionsFilteredTmpOlderThan(ctx context.Context, howOldInSeconds uint64) (int64, error)
TryStateVerificationLock(ctx context.Context) (bool, error)
}
Expand Down Expand Up @@ -1154,7 +1154,8 @@ func constructReapLookupTablesQuery(table string, historyTables []tableObjectFie

// DeleteRangeAll deletes rows from all history tables whose id falls in the
// range from `start` (inclusive) to `end` (exclusive). It returns the total
// number of rows deleted across all tables.
func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) error {
func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) (int64, error) {
var total int64
for table, column := range map[string]string{
"history_effects": "history_operation_id",
"history_ledgers": "id",
Expand All @@ -1169,12 +1170,13 @@ func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) error {
"history_transaction_liquidity_pools": "history_transaction_id",
"history_transactions": "id",
} {
err := q.DeleteRange(ctx, start, end, table, column)
count, err := q.DeleteRange(ctx, start, end, table, column)
if err != nil {
return errors.Wrapf(err, "Error clearing %s", table)
return 0, errors.Wrapf(err, "Error clearing %s", table)
}
total += count
}
return nil
return total, nil
}

// upsertRows builds and executes an upsert query that allows very fast upserts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger u
return errors.Wrap(err, "Invalid range")
}

err = s.historyQ.DeleteRangeAll(s.ctx, start, end)
_, err = s.historyQ.DeleteRangeAll(s.ctx, start, end)
if err != nil {
return errors.Wrap(err, "error in DeleteRangeAll")
}
Expand Down
3 changes: 2 additions & 1 deletion services/horizon/internal/integration/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,8 @@ func TestFillGaps(t *testing.T) {
var oldestLedger, latestLedger int64
tt.NoError(historyQ.ElderLedger(context.Background(), &oldestLedger))
tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger))
tt.NoError(historyQ.DeleteRangeAll(context.Background(), oldestLedger, latestLedger))
_, err = historyQ.DeleteRangeAll(context.Background(), oldestLedger, latestLedger)
tt.NoError(err)

horizonConfig.CaptiveCoreConfigPath = filepath.Join(
filepath.Dir(horizonConfig.CaptiveCoreConfigPath),
Expand Down
21 changes: 18 additions & 3 deletions services/horizon/internal/reap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package reap
import (
"context"

"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ledger"
"github.com/stellar/go/support/db"
Expand All @@ -18,6 +20,9 @@ type System struct {
RetentionCount uint
RetentionBatch uint

deleteBatchDuration prometheus.Summary
rowsDeleted prometheus.Summary

ledgerState *ledger.State
ctx context.Context
cancel context.CancelFunc
Expand All @@ -32,9 +37,19 @@ func New(retention, retentionBatchSize uint, dbSession db.SessionInterface, ledg
HistoryQ: &history.Q{dbSession.Clone()},
RetentionCount: retention,
RetentionBatch: retentionBatchSize,
ledgerState: ledgerState,
ctx: ctx,
cancel: cancel,
deleteBatchDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "reap", Name: "delete_batch_duration",
Help: "reap batch duration in seconds, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}),
rowsDeleted: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "reap", Name: "rows_deleted",
Help: "rows deleted during reap batch , sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}),
ledgerState: ledgerState,
ctx: ctx,
cancel: cancel,
}

return r
Expand Down
49 changes: 33 additions & 16 deletions services/horizon/internal/reap/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"

herrors "github.com/stellar/go/services/horizon/internal/errors"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
Expand Down Expand Up @@ -52,6 +54,11 @@ func (r *System) Run() {
}
}

// RegisterMetrics registers the reaper's prometheus metrics — the
// delete-batch duration summary and the rows-deleted summary — with the
// given registry. It uses MustRegister, which panics if a collector cannot
// be registered, so it should be called once per registry.
func (r *System) RegisterMetrics(registry *prometheus.Registry) {
	registry.MustRegister(r.deleteBatchDuration, r.rowsDeleted)
}

// Shutdown cancels the context held by the System by invoking the cancel
// function stored alongside it when the System was constructed.
// NOTE(review): presumably this is what stops a running reap loop — confirm
// against Run, which is not fully visible here.
func (r *System) Shutdown() {
r.cancel()
}
Expand Down Expand Up @@ -97,29 +104,39 @@ func (r *System) clearBefore(ctx context.Context, startSeq, endSeq int32) error
WithField("end_ledger", batchEndSeq).
Info("reaper: clearing")

batchStart, batchEnd, err := toid.LedgerRangeInclusive(batchStartSeq, batchEndSeq)
if err != nil {
if err := r.deleteBatch(ctx, batchStartSeq, batchEndSeq); err != nil {
return err
}
time.Sleep(sleep)
}

err = r.HistoryQ.Begin(ctx)
if err != nil {
return errors.Wrap(err, "Error in begin")
}
defer r.HistoryQ.Rollback()
return nil
}

err = r.HistoryQ.DeleteRangeAll(ctx, batchStart, batchEnd)
if err != nil {
return errors.Wrap(err, "Error in DeleteRangeAll")
}
func (r *System) deleteBatch(ctx context.Context, batchStartSeq int32, batchEndSeq int32) error {
batchStart, batchEnd, err := toid.LedgerRangeInclusive(batchStartSeq, batchEndSeq)
if err != nil {
return err
}

err = r.HistoryQ.Commit()
if err != nil {
return errors.Wrap(err, "Error in commit")
}
startTime := time.Now()
err = r.HistoryQ.Begin(ctx)
if err != nil {
return errors.Wrap(err, "Error in begin")
}
defer r.HistoryQ.Rollback()

time.Sleep(sleep)
count, err := r.HistoryQ.DeleteRangeAll(ctx, batchStart, batchEnd)
if err != nil {
return errors.Wrap(err, "Error in DeleteRangeAll")
}

err = r.HistoryQ.Commit()
if err != nil {
return errors.Wrap(err, "Error in commit")
}

r.rowsDeleted.Observe(float64(count))
r.deleteBatchDuration.Observe(time.Since(startTime).Seconds())
return nil
}
2 changes: 1 addition & 1 deletion support/db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type SessionInterface interface {
start, end int64,
table string,
idCol string,
) error
) (int64, error)
}

// Table helps to build sql queries against a given table. It logically
Expand Down
6 changes: 3 additions & 3 deletions support/db/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func (s *SessionWithMetrics) DeleteRange(
start, end int64,
table string,
idCol string,
) (err error) {
) (count int64, err error) {
queryType := "delete"
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
s.queryDurationSummary.With(prometheus.Labels{
Expand All @@ -538,6 +538,6 @@ func (s *SessionWithMetrics) DeleteRange(
}).Inc()
}()

err = s.SessionInterface.DeleteRange(ctx, start, end, table, idCol)
return err
count, err = s.SessionInterface.DeleteRange(ctx, start, end, table, idCol)
return count, err
}
5 changes: 3 additions & 2 deletions support/db/mock_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (m *MockSession) DeleteRange(
start, end int64,
table string,
idCol string,
) (err error) {
return m.Called(ctx, start, end, table, idCol).Error(0)
) (int64, error) {
args := m.Called(ctx, start, end, table, idCol)
return args.Get(0).(int64), args.Error(1)
}
9 changes: 6 additions & 3 deletions support/db/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,17 @@ func (s *Session) DeleteRange(
start, end int64,
table string,
idCol string,
) error {
) (int64, error) {
del := sq.Delete(table).Where(
fmt.Sprintf("%s >= ? AND %s < ?", idCol, idCol),
start,
end,
)
_, err := s.Exec(ctx, del)
return err
result, err := s.Exec(ctx, del)
if err != nil {
return 0, err
}
return result.RowsAffected()
}

// Get runs `query`, setting the first result found on `dest`, if
Expand Down

0 comments on commit 468fc6c

Please sign in to comment.