diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 9a19c47518..35f32f5434 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -304,7 +304,10 @@ type IngestionQ interface { TruncateIngestStateTables(context.Context) error DeleteRangeAll(ctx context.Context, start, end int64) (int64, error) DeleteTransactionsFilteredTmpOlderThan(ctx context.Context, howOldInSeconds uint64) (int64, error) - TryStateVerificationLock(ctx context.Context) (bool, error) + GetNextLedgerSequence(context.Context, uint32) (uint32, bool, error) + TryStateVerificationLock(context.Context) (bool, error) + TryReaperLock(context.Context) (bool, error) + ElderLedger(context.Context, interface{}) error } // QAccounts defines account related queries. diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 8acee8786b..ade7ffa234 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -716,9 +716,7 @@ func (s *system) maybeReapHistory(lastIngestedLedger uint32) { s.wg.Add(1) go func() { defer s.wg.Done() - if err := s.reaper.DeleteUnretainedHistory(s.ctx); err != nil { - log.WithError(err).Warn("could not reap history") - } + s.reaper.DeleteUnretainedHistory(s.ctx) }() } diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 7dc285366b..4f0e220ebe 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -457,6 +457,21 @@ func (m *mockDBQ) TryStateVerificationLock(ctx context.Context) (bool, error) { return args.Get(0).(bool), args.Error(1) } +func (m *mockDBQ) TryReaperLock(ctx context.Context) (bool, error) { + args := m.Called(ctx) + return args.Get(0).(bool), args.Error(1) +} + +func (m *mockDBQ) GetNextLedgerSequence(ctx context.Context, start uint32) (uint32, bool, error) { + args := m.Called(ctx, start) + return args.Get(0).(uint32), args.Get(1).(bool), args.Error(2) +} + +func (m *mockDBQ) ElderLedger(ctx context.Context, dest interface{}) error { + args := m.Called(ctx, dest) + return args.Error(0) +} + func (m *mockDBQ) GetTx() *sqlx.Tx { args := m.Called() if args.Get(0) == nil { diff --git a/services/horizon/internal/ingest/reap.go b/services/horizon/internal/ingest/reap.go index 162c4411a2..546f1715cb 100644 --- a/services/horizon/internal/ingest/reap.go +++ b/services/horizon/internal/ingest/reap.go @@ -18,13 +18,15 @@ import ( // Reaper represents the history reaping subsystem of horizon. type Reaper struct { - historyQ *history.Q - reapLockQ *history.Q + historyQ history.IngestionQ + reapLockQ history.IngestionQ config ReapConfig logger *logpkg.Entry + totalDuration *prometheus.SummaryVec + totalDeleted *prometheus.SummaryVec deleteBatchDuration prometheus.Summary - rowsDeleted prometheus.Summary + rowsInBatchDeleted prometheus.Summary lock sync.Mutex } @@ -36,24 +38,36 @@ type ReapConfig struct { // NewReaper creates a new Reaper instance func NewReaper(config ReapConfig, dbSession db.SessionInterface) *Reaper { - r := &Reaper{ - historyQ: &history.Q{dbSession.Clone()}, - reapLockQ: &history.Q{dbSession.Clone()}, + return newReaper(config, &history.Q{dbSession.Clone()}, &history.Q{dbSession.Clone()}) +} + +func newReaper(config ReapConfig, historyQ, reapLockQ history.IngestionQ) *Reaper { + return &Reaper{ + historyQ: historyQ, + reapLockQ: reapLockQ, config: config, deleteBatchDuration: prometheus.NewSummary(prometheus.SummaryOpts{ - Namespace: "horizon", Subsystem: "reap", Name: "delete_batch_duration", + Namespace: "horizon", Subsystem: "reap", Name: "batch_duration", Help: "reap batch duration in seconds, sliding window = 10m", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }), - rowsDeleted: prometheus.NewSummary(prometheus.SummaryOpts{ - Namespace: "horizon", Subsystem: "reap", Name: "rows_deleted", + rowsInBatchDeleted: prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: "horizon", Subsystem: "reap", Name: "batch_rows_deleted", Help: "rows deleted during reap batch , sliding window = 10m", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }), + totalDuration: prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "horizon", Subsystem: "reap", Name: "duration", + Help: "reap invocation duration in seconds, sliding window = 10m", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, []string{"complete"}), + totalDeleted: prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "horizon", Subsystem: "reap", Name: "rows_deleted", + Help: "rows deleted during reap invocation, sliding window = 10m", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, []string{"complete"}), logger: log.WithField("subservice", "reaper"), } - - return r } // DeleteUnretainedHistory removes all data associated with unretained ledgers. @@ -70,7 +84,7 @@ func (r *Reaper) DeleteUnretainedHistory(ctx context.Context) error { defer r.lock.Unlock() if err := r.reapLockQ.Begin(ctx); err != nil { - return errors.Wrap(err, "error while acquiring reaper lock transaction") + return errors.Wrap(err, "error while starting reaper lock transaction") } defer func() { if err := r.reapLockQ.Rollback(); err != nil { @@ -104,20 +118,40 @@ func (r *Reaper) DeleteUnretainedHistory(ctx context.Context) error { return nil } - if err = r.clearBefore(ctx, oldest, targetElder); err != nil { - return err - } + startTime := time.Now() + var totalDeleted int64 + var complete bool + totalDeleted, err = r.clearBefore(ctx, oldest, targetElder) + elapsedSeconds := time.Since(startTime).Seconds() + logger := r.logger. + WithField("duration", elapsedSeconds). + WithField("rows_deleted", totalDeleted) - r.logger. - WithField("new_elder", targetElder). - Info("reaper succeeded") + if err != nil { + logger.WithError(err).Warn("reaper failed") + } else { + complete = true + logger. + WithField("new_elder", targetElder). + Info("reaper succeeded") + } - return nil + labels := prometheus.Labels{ + "complete": strconv.FormatBool(complete), + } + r.totalDeleted.With(labels).Observe(float64(totalDeleted)) + r.totalDuration.With(labels).Observe(elapsedSeconds) + return err } // RegisterMetrics registers the prometheus metrics func (s *Reaper) RegisterMetrics(registry *prometheus.Registry) { - registry.MustRegister(s.deleteBatchDuration, s.rowsDeleted) + registry.MustRegister( + s.deleteBatchDuration, + s.rowsInBatchDeleted, + s.totalDuration, + s.totalDeleted, + ) } // Work backwards in 50k (by default, otherwise configurable via the CLI) ledger @@ -131,10 +165,11 @@ func (s *Reaper) RegisterMetrics(registry *prometheus.Registry) { // hour, and slowing it down enough to leave some CPU for other processes. var sleep = 1 * time.Second -func (r *Reaper) clearBefore(ctx context.Context, startSeq, endSeq uint32) error { +func (r *Reaper) clearBefore(ctx context.Context, startSeq, endSeq uint32) (int64, error) { batchSize := r.config.ReapBatchSize + var sum int64 if batchSize <= 0 { - return fmt.Errorf("invalid batch size for reaping (%d)", batchSize) + return sum, fmt.Errorf("invalid batch size for reaping (%d)", batchSize) } r.logger.WithField("start_ledger", startSeq). @@ -150,24 +185,25 @@ func (r *Reaper) clearBefore(ctx context.Context, startSeq, endSeq uint32) error count, err := r.deleteBatch(ctx, batchStartSeq, batchEndSeq) if err != nil { - return err + return sum, err } + sum += count if count == 0 { next, ok, err := r.historyQ.GetNextLedgerSequence(ctx, batchStartSeq) if err != nil { - return errors.Wrapf(err, "could not find next ledger sequence after %d", batchStartSeq) + return sum, errors.Wrapf(err, "could not find next ledger sequence after %d", batchStartSeq) } if !ok { break } batchStartSeq = next } else { - batchStartSeq += batchSize + batchStartSeq += batchSize + 1 } time.Sleep(sleep) } - return nil + return sum, nil } func (r *Reaper) deleteBatch(ctx context.Context, batchStartSeq, batchEndSeq uint32) (int64, error) { @@ -200,7 +236,7 @@ func (r *Reaper) deleteBatch(ctx context.Context, batchStartSeq, batchEndSeq uin WithField("duration", elapsedSeconds). Info("successfully deleted batch") - r.rowsDeleted.Observe(float64(count)) + r.rowsInBatchDeleted.Observe(float64(count)) r.deleteBatchDuration.Observe(elapsedSeconds) return count, nil } diff --git a/services/horizon/internal/ingest/reap_test.go b/services/horizon/internal/ingest/reap_test.go index dcb380a205..cad1397f5b 100644 --- a/services/horizon/internal/ingest/reap_test.go +++ b/services/horizon/internal/ingest/reap_test.go @@ -1,9 +1,16 @@ package ingest import ( + "context" + "fmt" "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" "github.com/stellar/go/services/horizon/internal/test" + "github.com/stellar/go/toid" ) func TestDeleteUnretainedHistory(t *testing.T) { @@ -19,7 +26,11 @@ func TestDeleteUnretainedHistory(t *testing.T) { }, db) // Disable sleeps for this. + prevSleep := sleep sleep = 0 + t.Cleanup(func() { + sleep = prevSleep + }) var ( prev int @@ -51,3 +62,280 @@ func TestDeleteUnretainedHistory(t *testing.T) { tt.Assert.Equal(1, cur) } } + +type ReaperTestSuite struct { + suite.Suite + ctx context.Context + historyQ *mockDBQ + reapLockQ *mockDBQ + reaper *Reaper + prevSleep time.Duration +} + +func TestReaper(t *testing.T) { + suite.Run(t, new(ReaperTestSuite)) +} + +func (t *ReaperTestSuite) SetupTest() { + t.ctx = context.Background() + t.historyQ = &mockDBQ{} + t.reapLockQ = &mockDBQ{} + t.reaper = newReaper(ReapConfig{ + RetentionCount: 30, + ReapBatchSize: 10, + }, t.historyQ, t.reapLockQ) + t.prevSleep = sleep + sleep = 0 +} + +func (t *ReaperTestSuite) TearDownTest() { + t.historyQ.AssertExpectations(t.T()) + t.reapLockQ.AssertExpectations(t.T()) + sleep = t.prevSleep +} + +func (t *ReaperTestSuite) TestDisabled() { + t.reaper.config.RetentionCount = 0 + t.Assert().NoError(t.reaper.DeleteUnretainedHistory(t.ctx)) +} + +func assertMocksInOrder(calls ...*mock.Call) { + for i := len(calls) - 1; i > 0; i-- { + calls[i].NotBefore(calls[i-1]) + } +} + +func (t *ReaperTestSuite) TestInProgressOnOtherNode() { + assertMocksInOrder( + t.reapLockQ.On("Begin", t.ctx).Return(nil).Once(), + t.reapLockQ.On("TryReaperLock", t.ctx).Return(false, nil).Once(), + t.reapLockQ.On("Rollback").Return(nil).Once(), + ) + t.Assert().NoError(t.reaper.DeleteUnretainedHistory(t.ctx)) +} + +func (t *ReaperTestSuite) TestInProgress() { + t.reapLockQ.On("Begin", t.ctx).Return(fmt.Errorf("transient error")).Once().Run( + func(args mock.Arguments) { + t.Assert().NoError(t.reaper.DeleteUnretainedHistory(t.ctx)) + }, + ) + t.Assert().EqualError( + t.reaper.DeleteUnretainedHistory(t.ctx), + "error while starting reaper lock transaction: transient error", + ) +} + +func (t *ReaperTestSuite) TestLatestLedgerTooSmall() { + assertMocksInOrder( + t.reapLockQ.On("Begin", t.ctx).Return(nil).Once(), + t.reapLockQ.On("TryReaperLock", t.ctx).Return(true, nil).Once(), + t.historyQ.On("GetLatestHistoryLedger", t.ctx).Return(uint32(30), nil).Once(), + t.historyQ.On("ElderLedger", t.ctx, mock.AnythingOfType("*uint32")). + Return(nil).Once().Run( + func(args mock.Arguments) { + ledger := args.Get(1).(*uint32) + *ledger = 1 + }), + t.reapLockQ.On("Rollback").Return(nil).Once(), + ) + t.Assert().NoError(t.reaper.DeleteUnretainedHistory(t.ctx)) +} + +func (t *ReaperTestSuite) TestNotEnoughHistory() { + assertMocksInOrder( + t.reapLockQ.On("Begin", t.ctx).Return(nil).Once(), + t.reapLockQ.On("TryReaperLock", t.ctx).Return(true, nil).Once(), + t.historyQ.On("GetLatestHistoryLedger", t.ctx).Return(uint32(90), nil).Once(), + t.historyQ.On("ElderLedger", t.ctx, mock.AnythingOfType("*uint32")). + Return(nil).Once().Run( + func(args mock.Arguments) { + ledger := args.Get(1).(*uint32) + *ledger = 85 + }), + t.reapLockQ.On("Rollback").Return(nil).Once(), + ) + t.Assert().NoError(t.reaper.DeleteUnretainedHistory(t.ctx)) +} + +func (t *ReaperTestSuite) TestSucceeds() { + assertMocksInOrder( + t.reapLockQ.On("Begin", t.ctx).Return(nil).Once(), + t.reapLockQ.On("TryReaperLock", t.ctx).Return(true, nil).Once(), + t.historyQ.On("GetLatestHistoryLedger", t.ctx).Return(uint32(90), nil).Once(), + t.historyQ.On("ElderLedger", t.ctx, mock.AnythingOfType("*uint32")). + Return(nil).Once().Run( + func(args mock.Arguments) { + ledger := args.Get(1).(*uint32) + *ledger = 55 + }), + t.historyQ.On("Begin", t.ctx).Return(nil).Once(), + t.historyQ.On("DeleteRangeAll", t.ctx, + toid.New(55, 0, 0).ToInt64(), toid.New(61, 0, 0).ToInt64(), + ).Return(int64(400), nil).Once(), + t.historyQ.On("Commit").Return(nil).Once(), + t.historyQ.On("Rollback").Return(nil).Once(), + t.reapLockQ.On("Rollback").Return(nil).Once(), + ) + t.Assert().NoError(t.reaper.DeleteUnretainedHistory(t.ctx)) +} + +func (t *ReaperTestSuite) TestFails() { + assertMocksInOrder( + t.reapLockQ.On("Begin", t.ctx).Return(nil).Once(), + t.reapLockQ.On("TryReaperLock", t.ctx).Return(true, nil).Once(), + t.historyQ.On("GetLatestHistoryLedger", t.ctx).Return(uint32(90), nil).Once(), + t.historyQ.On("ElderLedger", t.ctx, mock.AnythingOfType("*uint32")). + Return(nil).Once().Run( + func(args mock.Arguments) { + ledger := args.Get(1).(*uint32) + *ledger = 2 + }), + t.historyQ.On("Begin", t.ctx).Return(nil).Once(), + t.historyQ.On("DeleteRangeAll", t.ctx, + toid.New(2, 0, 0).ToInt64(), toid.New(13, 0, 0).ToInt64(), + ).Return(int64(0), fmt.Errorf("transient error")).Once(), + t.historyQ.On("Rollback").Return(nil).Once(), + t.reapLockQ.On("Rollback").Return(nil).Once(), + ) + t.Assert().EqualError(t.reaper.DeleteUnretainedHistory(t.ctx), "Error in DeleteRangeAll: transient error") +} + +func (t *ReaperTestSuite) TestPartiallySucceeds() { + assertMocksInOrder( + t.reapLockQ.On("Begin", t.ctx).Return(nil).Once(), + t.reapLockQ.On("TryReaperLock", t.ctx).Return(true, nil).Once(), + t.historyQ.On("GetLatestHistoryLedger", t.ctx).Return(uint32(90), nil).Once(), + t.historyQ.On("ElderLedger", t.ctx, mock.AnythingOfType("*uint32")). + Return(nil).Once().Run( + func(args mock.Arguments) { + ledger := args.Get(1).(*uint32) + *ledger = 30 + }), + + t.historyQ.On("Begin", t.ctx).Return(nil).Once(), + t.historyQ.On("DeleteRangeAll", t.ctx, + toid.New(30, 0, 0).ToInt64(), toid.New(41, 0, 0).ToInt64(), + ).Return(int64(200), nil).Once(), + t.historyQ.On("Commit").Return(nil).Once(), + t.historyQ.On("Rollback").Return(nil).Once(), + + t.historyQ.On("Begin", t.ctx).Return(nil).Once(), + t.historyQ.On("DeleteRangeAll", t.ctx, + toid.New(41, 0, 0).ToInt64(), toid.New(52, 0, 0).ToInt64(), + ).Return(int64(0), fmt.Errorf("transient error")).Once(), + t.historyQ.On("Rollback").Return(nil).Once(), + + t.reapLockQ.On("Rollback").Return(nil).Once(), + ) + t.Assert().EqualError(t.reaper.DeleteUnretainedHistory(t.ctx), "Error in DeleteRangeAll: transient error") +} + +func (t *ReaperTestSuite) TestSucceedsOnMultipleBatches() { + assertMocksInOrder( + t.reapLockQ.On("Begin", t.ctx).Return(nil).Once(), + t.reapLockQ.On("TryReaperLock", t.ctx).Return(true, nil).Once(), + t.historyQ.On("GetLatestHistoryLedger", t.ctx).Return(uint32(90), nil).Once(), + t.historyQ.On("ElderLedger", t.ctx, mock.AnythingOfType("*uint32")). + Return(nil).Once().Run( + func(args mock.Arguments) { + ledger := args.Get(1).(*uint32) + *ledger = 35 + }), + + t.historyQ.On("Begin", t.ctx).Return(nil).Once(), + t.historyQ.On("DeleteRangeAll", t.ctx, + toid.New(35, 0, 0).ToInt64(), toid.New(46, 0, 0).ToInt64(), + ).Return(int64(200), nil).Once(), + t.historyQ.On("Commit").Return(nil).Once(), + t.historyQ.On("Rollback").Return(nil).Once(), + + t.historyQ.On("Begin", t.ctx).Return(nil).Once(), + t.historyQ.On("DeleteRangeAll", t.ctx, + toid.New(46, 0, 0).ToInt64(), toid.New(57, 0, 0).ToInt64(), + ).Return(int64(150), nil).Once(), + t.historyQ.On("Commit").Return(nil).Once(), + t.historyQ.On("Rollback").Return(nil).Once(), + + t.historyQ.On("Begin", t.ctx).Return(nil).Once(), + t.historyQ.On("DeleteRangeAll", t.ctx, + toid.New(57, 0, 0).ToInt64(), toid.New(61, 0, 0).ToInt64(), + ).Return(int64(80), nil).Once(), + t.historyQ.On("Commit").Return(nil).Once(), + t.historyQ.On("Rollback").Return(nil).Once(), + + t.reapLockQ.On("Rollback").Return(nil).Once(), + ) + t.Assert().NoError(t.reaper.DeleteUnretainedHistory(t.ctx)) +} + +func (t *ReaperTestSuite) TestSkipGap() { + assertMocksInOrder( + t.reapLockQ.On("Begin", t.ctx).Return(nil).Once(), + t.reapLockQ.On("TryReaperLock", t.ctx).Return(true, nil).Once(), + t.historyQ.On("GetLatestHistoryLedger", t.ctx).Return(uint32(90), nil).Once(), + t.historyQ.On("ElderLedger", t.ctx, mock.AnythingOfType("*uint32")). + Return(nil).Once().Run( + func(args mock.Arguments) { + ledger := args.Get(1).(*uint32) + *ledger = 2 + }), + + t.historyQ.On("Begin", t.ctx).Return(nil).Once(), + t.historyQ.On("DeleteRangeAll", t.ctx, + toid.New(2, 0, 0).ToInt64(), toid.New(13, 0, 0).ToInt64(), + ).Return(int64(200), nil).Once(), + t.historyQ.On("Commit").Return(nil).Once(), + t.historyQ.On("Rollback").Return(nil).Once(), + + t.historyQ.On("Begin", t.ctx).Return(nil).Once(), + t.historyQ.On("DeleteRangeAll", t.ctx, + toid.New(13, 0, 0).ToInt64(), toid.New(24, 0, 0).ToInt64(), + ).Return(int64(0), nil).Once(), + t.historyQ.On("Commit").Return(nil).Once(), + t.historyQ.On("Rollback").Return(nil).Once(), + t.historyQ.On("GetNextLedgerSequence", t.ctx, uint32(13)).Return(uint32(55), true, nil).Once(), + + t.historyQ.On("Begin", t.ctx).Return(nil).Once(), + t.historyQ.On("DeleteRangeAll", t.ctx, + toid.New(55, 0, 0).ToInt64(), toid.New(61, 0, 0).ToInt64(), + ).Return(int64(20), nil).Once(), + t.historyQ.On("Commit").Return(nil).Once(), + t.historyQ.On("Rollback").Return(nil).Once(), + + t.reapLockQ.On("Rollback").Return(nil).Once(), + ) + t.Assert().NoError(t.reaper.DeleteUnretainedHistory(t.ctx)) +} + +func (t *ReaperTestSuite) TestSkipGapTerminatesEarly() { + assertMocksInOrder( + t.reapLockQ.On("Begin", t.ctx).Return(nil).Once(), + t.reapLockQ.On("TryReaperLock", t.ctx).Return(true, nil).Once(), + t.historyQ.On("GetLatestHistoryLedger", t.ctx).Return(uint32(90), nil).Once(), + t.historyQ.On("ElderLedger", t.ctx, mock.AnythingOfType("*uint32")). + Return(nil).Once().Run( + func(args mock.Arguments) { + ledger := args.Get(1).(*uint32) + *ledger = 2 + }), + + t.historyQ.On("Begin", t.ctx).Return(nil).Once(), + t.historyQ.On("DeleteRangeAll", t.ctx, + toid.New(2, 0, 0).ToInt64(), toid.New(13, 0, 0).ToInt64(), + ).Return(int64(200), nil).Once(), + t.historyQ.On("Commit").Return(nil).Once(), + t.historyQ.On("Rollback").Return(nil).Once(), + + t.historyQ.On("Begin", t.ctx).Return(nil).Once(), + t.historyQ.On("DeleteRangeAll", t.ctx, + toid.New(13, 0, 0).ToInt64(), toid.New(24, 0, 0).ToInt64(), + ).Return(int64(0), nil).Once(), + t.historyQ.On("Commit").Return(nil).Once(), + t.historyQ.On("Rollback").Return(nil).Once(), + t.historyQ.On("GetNextLedgerSequence", t.ctx, uint32(13)).Return(uint32(65), true, nil).Once(), + + t.reapLockQ.On("Rollback").Return(nil).Once(), + ) + t.Assert().NoError(t.reaper.DeleteUnretainedHistory(t.ctx)) +}