Commit 9f3015c: refactor

tamirms committed Jul 30, 2024
1 parent 3512e45

Showing 5 changed files with 139 additions and 77 deletions.
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/main.go
@@ -308,6 +308,7 @@ type IngestionQ interface {
GetNextLedgerSequence(context.Context, uint32) (uint32, bool, error)
TryStateVerificationLock(context.Context) (bool, error)
TryReaperLock(context.Context) (bool, error)
TryLookupTableReaperLock(ctx context.Context) (bool, error)
ElderLedger(context.Context, interface{}) error
}

7 changes: 6 additions & 1 deletion services/horizon/internal/db2/history/verify_lock.go
@@ -15,7 +15,8 @@ const (
// reaperLockId is the objid for the advisory lock acquired during
// reaping. The value is arbitrary. The only requirement is that
// all ingesting nodes use the same value which is why it's hard coded here.
reaperLockId = 944670730
reaperLockId = 944670730
lookupTableReaperLockId = 329518896
)

// TryStateVerificationLock attempts to acquire the state verification lock
@@ -34,6 +35,10 @@ func (q *Q) TryReaperLock(ctx context.Context) (bool, error) {
return q.tryAdvisoryLock(ctx, reaperLockId)
}

func (q *Q) TryLookupTableReaperLock(ctx context.Context) (bool, error) {
return q.tryAdvisoryLock(ctx, lookupTableReaperLockId)
}

func (q *Q) tryAdvisoryLock(ctx context.Context, lockId int) (bool, error) {
if tx := q.GetTx(); tx == nil {
return false, errors.New("cannot be called outside of a transaction")
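For context on the locking pattern above: Postgres advisory locks are identified by an arbitrary application-chosen integer, which is why reaperLockId and lookupTableReaperLockId only need to be distinct and shared by all ingesting nodes. Below is a minimal sketch of how such a lock is typically acquired with plain database/sql; tryAdvisoryXactLock is a hypothetical standalone helper, not Horizon's actual tryAdvisoryLock. The transaction-scoped variant releases automatically when the transaction ends, which is consistent with the requirement above that the caller already be inside a transaction.

package main

import (
	"context"
	"database/sql"
)

// tryAdvisoryXactLock is a sketch. pg_try_advisory_xact_lock is
// non-blocking: it returns true if the lock was acquired and false if
// another session already holds it. The lock is bound to the current
// transaction and released automatically on commit or rollback.
func tryAdvisoryXactLock(ctx context.Context, tx *sql.Tx, lockId int64) (bool, error) {
	var acquired bool
	err := tx.QueryRowContext(ctx,
		"SELECT pg_try_advisory_xact_lock($1)", lockId,
	).Scan(&acquired)
	if err != nil {
		return false, err
	}
	return acquired, nil
}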
77 changes: 8 additions & 69 deletions services/horizon/internal/ingest/main.go
@@ -70,16 +70,15 @@ const (
// * Ledger ingestion,
// * State verifications,
// * Metrics updates.
// * Reaping (requires 2 connections, the extra connection is used for holding the advisory lock)
MaxDBConnections = 5
// * Reaping of history (requires 2 connections, the extra connection is used for holding the advisory lock)
// * Reaping of lookup tables (requires 2 connections, the extra connection is used for holding the advisory lock)
MaxDBConnections = 7

stateVerificationErrorThreshold = 3

// 100 ledgers per flush has shown in stress tests
// to be best point on performance curve, default to that.
MaxLedgersPerFlush uint32 = 100

reapLookupTablesBatchSize = 1000
)

var log = logpkg.DefaultLogger.WithField("service", "ingest")
@@ -172,9 +171,6 @@ type Metrics struct {
// duration of rebuilding trade aggregation buckets.
LedgerIngestionTradeAggregationDuration prometheus.Summary

ReapDurationByLookupTable *prometheus.SummaryVec
RowsReapedByLookupTable *prometheus.SummaryVec

// StateVerifyDuration exposes timing metrics about the rate and
// duration of state verification.
StateVerifyDuration prometheus.Summary
@@ -256,7 +252,8 @@ type system struct {

maxLedgerPerFlush uint32

reaper *Reaper
reaper *Reaper
lookupTableReaper *lookupTableReaper

currentStateMutex sync.Mutex
currentState State
@@ -369,6 +366,7 @@ func NewSystem(config Config) (System, error) {
config.ReapConfig,
config.HistorySession,
),
lookupTableReaper: newLookupTableReaper(config.HistorySession),
}

system.initMetrics()
@@ -409,18 +407,6 @@ func (s *system) initMetrics() {
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})

s.metrics.ReapDurationByLookupTable = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_duration_seconds",
Help: "reap lookup tables durations, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{"table", "type"})

s.metrics.RowsReapedByLookupTable = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_rows_reaped",
Help: "rows deleted during lookup tables reap, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{"table"})

s.metrics.StateVerifyDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "state_verify_duration_seconds",
Help: "state verification durations, sliding window = 10m",
@@ -538,8 +524,6 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(s.metrics.LocalLatestLedger)
registry.MustRegister(s.metrics.LedgerIngestionDuration)
registry.MustRegister(s.metrics.LedgerIngestionTradeAggregationDuration)
registry.MustRegister(s.metrics.ReapDurationByLookupTable)
registry.MustRegister(s.metrics.RowsReapedByLookupTable)
registry.MustRegister(s.metrics.StateVerifyDuration)
registry.MustRegister(s.metrics.StateInvalidGauge)
registry.MustRegister(s.metrics.LedgerStatsCounter)
@@ -552,6 +536,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(s.metrics.IngestionErrorCounter)
s.ledgerBackend = ledgerbackend.WithMetrics(s.ledgerBackend, registry, "horizon")
s.reaper.RegisterMetrics(registry)
s.lookupTableReaper.RegisterMetrics(registry)
}

// Run starts ingestion system. Ingestion system supports distributed ingestion
@@ -825,53 +810,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
s.wg.Add(1)
go func() {
defer s.wg.Done()

reapStart := time.Now()
var totalQueryDuration, totalDeleteDuration time.Duration
var totalDeleted int64
for _, table := range []string{
"history_accounts", "history_claimable_balances",
"history_assets", "history_liquidity_pools",
} {
startTime := time.Now()
ids, offset, err := s.historyQ.FindLookupTableRowsToReap(s.ctx, table, reapLookupTablesBatchSize)
if err != nil {
log.WithField("table", table).WithError(err).Warn("Error finding orphaned rows")
return
}
queryDuration := time.Since(startTime)
totalQueryDuration += queryDuration

deleteStartTime := time.Now()
var rowsDeleted int64
rowsDeleted, err = s.historyQ.ReapLookupTable(s.ctx, table, ids, offset)
deleteDuration := time.Since(deleteStartTime)
totalDeleteDuration += deleteDuration

s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": table}).
Observe(float64(rowsDeleted))
s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "query"}).
Observe(float64(queryDuration))
s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "delete"}).
Observe(float64(deleteDuration))
s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "total"}).
Observe(float64(queryDuration + deleteDuration))

log.WithField("table", table).
WithField("offset", offset).
WithField(table+"rows_deleted", rowsDeleted).
WithField("query_duration", queryDuration.Seconds()).
WithField("delete_duration", deleteDuration.Seconds())
}

s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": "total"}).
Observe(float64(totalDeleted))
s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "query"}).
Observe(float64(totalQueryDuration))
s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "delete"}).
Observe(float64(totalDeleteDuration))
s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "total"}).
Observe(time.Since(reapStart).Seconds())
s.lookupTableReaper.deleteOrphanedRows(s.ctx)
}()
}

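To make the connection accounting in the MaxDBConnections comment above concrete: each reaper needs two connections because one session performs the deletes while a second session holds the advisory lock for the whole run. A hypothetical illustration of sizing a database/sql pool to that budget (Horizon's own session type differs; the names here are illustrative only):

package main

import "database/sql"

// configureIngestPool sizes the pool to the documented budget:
// 1 connection each for ledger ingestion, state verification, and
// metrics updates, plus 2 each for history reaping and lookup-table
// reaping (worker + advisory-lock holder), for a total of 7.
func configureIngestPool(db *sql.DB) {
	const maxDBConnections = 7
	db.SetMaxOpenConns(maxDBConnections)
}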
20 changes: 13 additions & 7 deletions services/horizon/internal/ingest/main_test.go
@@ -462,6 +462,11 @@ func (m *mockDBQ) TryReaperLock(ctx context.Context) (bool, error) {
return args.Get(0).(bool), args.Error(1)
}

func (m *mockDBQ) TryLookupTableReaperLock(ctx context.Context) (bool, error) {
args := m.Called(ctx)
return args.Get(0).(bool), args.Error(1)
}

func (m *mockDBQ) GetNextLedgerSequence(ctx context.Context, start uint32) (uint32, bool, error) {
args := m.Called(ctx, start)
return args.Get(0).(uint32), args.Get(1).(bool), args.Error(2)
@@ -562,13 +567,14 @@ func (m *mockDBQ) NewTradeBatchInsertBuilder() history.TradeBatchInsertBuilder {
return args.Get(0).(history.TradeBatchInsertBuilder)
}

func (m *mockDBQ) ReapLookupTables(ctx context.Context, batchSize int) (map[string]history.LookupTableReapResult, error) {
args := m.Called(ctx, batchSize)
var r1 map[string]history.LookupTableReapResult
if args.Get(0) != nil {
r1 = args.Get(0).(map[string]history.LookupTableReapResult)
}
return r1, args.Error(2)
func (m *mockDBQ) FindLookupTableRowsToReap(ctx context.Context, table string, batchSize int) ([]int64, int64, error) {
args := m.Called(ctx, table, batchSize)
return args.Get(0).([]int64), args.Get(1).(int64), args.Error(2)
}

func (m *mockDBQ) ReapLookupTable(ctx context.Context, table string, ids []int64, offset int64) (int64, error) {
args := m.Called(ctx, table, ids, offset)
return args.Get(0).(int64), args.Error(1)
}

func (m *mockDBQ) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error {
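These mocks follow the standard testify/mock pattern: each method forwards its arguments to m.Called and replays whatever the test registered with On(...).Return(...). A hypothetical usage sketch follows; the expectation values are illustrative and not taken from this commit's tests.

func TestLookupTableReaperMocks(t *testing.T) {
	q := &mockDBQ{}
	// Pretend the advisory lock is acquired on the first attempt.
	q.On("TryLookupTableReaperLock", mock.Anything).Return(true, nil)
	// Pretend one batch of orphaned history_accounts rows is found...
	q.On("FindLookupTableRowsToReap", mock.Anything, "history_accounts", 1000).
		Return([]int64{100, 101}, int64(2), nil)
	// ...and then deleted.
	q.On("ReapLookupTable", mock.Anything, "history_accounts", []int64{100, 101}, int64(2)).
		Return(int64(2), nil)

	// ... exercise the code under test with q here ...

	q.AssertExpectations(t)
}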
111 changes: 111 additions & 0 deletions services/horizon/internal/ingest/reap.go
@@ -16,6 +16,8 @@ import (
"github.com/stellar/go/toid"
)

const reapLookupTablesBatchSize = 1000

// Reaper represents the history reaping subsystem of horizon.
type Reaper struct {
historyQ history.IngestionQ
@@ -243,3 +245,112 @@ func (r *Reaper) deleteBatch(ctx context.Context, batchStartSeq, batchEndSeq uin
r.deleteBatchDuration.Observe(elapsedSeconds)
return count, nil
}

type lookupTableReaper struct {
historyQ history.IngestionQ
reapLockQ history.IngestionQ
pending atomic.Bool
logger *logpkg.Entry

reapDurationByLookupTable *prometheus.SummaryVec
rowsReapedByLookupTable *prometheus.SummaryVec
}

func newLookupTableReaper(dbSession db.SessionInterface) *lookupTableReaper {
return &lookupTableReaper{
historyQ: &history.Q{dbSession.Clone()},
reapLockQ: &history.Q{dbSession.Clone()},
pending: atomic.Bool{},
logger: log.WithField("subservice", "lookuptable-reaper"),
reapDurationByLookupTable: prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_duration_seconds",
Help: "reap lookup tables durations, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{"table", "type"}),
rowsReapedByLookupTable: prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_rows_reaped",
Help: "rows deleted during lookup tables reap, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{"table"}),
}
}

func (r *lookupTableReaper) RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(
r.reapDurationByLookupTable,
r.rowsReapedByLookupTable,
)
}

func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error {
// check if reap is already in progress on this horizon node
if !r.pending.CompareAndSwap(false, true) {
r.logger.Infof("existing reap already in progress, skipping request to start a new one")
return nil
}
defer r.pending.Store(false)

if err := r.reapLockQ.Begin(ctx); err != nil {
return errors.Wrap(err, "error while starting reaper lock transaction")
}
defer func() {
if err := r.reapLockQ.Rollback(); err != nil {
r.logger.WithField("error", err).Error("failed to release reaper lock")
}
}()
// check if reap is already in progress on another horizon node
if acquired, err := r.reapLockQ.TryLookupTableReaperLock(ctx); err != nil {
return errors.Wrap(err, "error while acquiring reaper database lock")
} else if !acquired {
r.logger.Info("reap already in progress on another node")
return nil
}

reapStart := time.Now()
var totalQueryDuration, totalDeleteDuration time.Duration
var totalDeleted int64
for _, table := range []string{
"history_accounts", "history_claimable_balances",
"history_assets", "history_liquidity_pools",
} {
startTime := time.Now()
ids, offset, err := r.historyQ.FindLookupTableRowsToReap(ctx, table, reapLookupTablesBatchSize)
if err != nil {
log.WithField("table", table).WithError(err).Warn("Error finding orphaned rows")
return err
}
queryDuration := time.Since(startTime)
totalQueryDuration += queryDuration

deleteStartTime := time.Now()
var rowsDeleted int64
rowsDeleted, err = r.historyQ.ReapLookupTable(ctx, table, ids, offset)

Check failure on line 327 in services/horizon/internal/ingest/reap.go (GitHub Actions / check, ubuntu-22.04, 1.22.1): this value of err is never used (SA4006)
deleteDuration := time.Since(deleteStartTime)
totalDeleteDuration += deleteDuration

r.rowsReapedByLookupTable.With(prometheus.Labels{"table": table}).
Observe(float64(rowsDeleted))
r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "query"}).
Observe(float64(queryDuration))
r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "delete"}).
Observe(float64(deleteDuration))
r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "total"}).
Observe(float64(queryDuration + deleteDuration))

log.WithField("table", table).
WithField("offset", offset).
WithField(table+"rows_deleted", rowsDeleted).
WithField("query_duration", queryDuration.Seconds()).
WithField("delete_duration", deleteDuration.Seconds())
}

r.rowsReapedByLookupTable.With(prometheus.Labels{"table": "total"}).
Observe(float64(totalDeleted))
r.reapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "query"}).
Observe(float64(totalQueryDuration))
r.reapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "delete"}).
Observe(float64(totalDeleteDuration))
r.reapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "total"}).
Observe(time.Since(reapStart).Seconds())
return nil
}
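The check failure flagged above is genuine: the error returned by ReapLookupTable is assigned but never inspected, so a failed delete would still be recorded in the metrics as a success. Two adjacent quirks are also visible in this loop: totalDeleted is declared and observed but never incremented, and the log.WithField chain is never terminated with a call such as .Info, so it emits nothing. A minimal sketch of the error-handling fix, not part of this commit:

	deleteStartTime := time.Now()
	rowsDeleted, err := r.historyQ.ReapLookupTable(ctx, table, ids, offset)
	if err != nil {
		r.logger.WithField("table", table).WithError(err).
			Warn("Error deleting orphaned rows")
		return err
	}
	totalDeleted += rowsDeleted // without this, the "total" summary always observes 0
	deleteDuration := time.Since(deleteStartTime)
	totalDeleteDuration += deleteDuration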
