Skip to content

Commit

Permalink
#5256: added horizon_ingest_error_restarts metric output
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed May 7, 2024
1 parent 9808f37 commit 2b44923
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 2 deletions.
20 changes: 19 additions & 1 deletion services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ingest
import (
"context"
"fmt"
"reflect"
"strings"
"time"

Expand Down Expand Up @@ -405,7 +406,19 @@ func (resumeState) GetState() State {
return Resume
}

func (r resumeState) run(s *system) (transition, error) {
func (r resumeState) run(s *system) (transitionResult transition, errorResult error) {
defer func() {
if errorResult != nil {
// capture any restarts that are being triggered by the state
switch reflect.TypeOf(transitionResult.node) {
case (reflect.TypeFor[startState]()):
r.incrementRestartMetric(s, "start")
case (reflect.TypeFor[resumeState]()):
r.incrementRestartMetric(s, "retry")
}
}
}()

if r.latestSuccessfullyProcessedLedger == 0 {
return start(), errors.New("unexpected latestSuccessfullyProcessedLedger value")
}
Expand Down Expand Up @@ -574,6 +587,11 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string]
}
}

func (r resumeState) incrementRestartMetric(s *system, restartType string) {
s.Metrics().IngestionErrorRestartCounter.
With(prometheus.Labels{"type": restartType}).Inc()
}

func (r resumeState) addLoaderDurationsMetricFromMap(s *system, m map[string]time.Duration) {
for loaderName, value := range m {
s.Metrics().LoadersRunDurationSummary.
Expand Down
18 changes: 18 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ type Metrics struct {

// ArchiveRequestCounter counts how many http requests are sent to history server
HistoryArchiveStatsCounter *prometheus.CounterVec

// IngestionErrorRestartCounter counts the number of times the live/forward ingestion state machine
// initiates a restart or retry.
IngestionErrorRestartCounter *prometheus.CounterVec
}

type System interface {
Expand Down Expand Up @@ -443,6 +447,19 @@ func (s *system) initMetrics() {
},
[]string{"source", "type"},
)

s.metrics.IngestionErrorRestartCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "error_restarts",
Help: "Counters of the number of times the live/forward ingestion state machine initiates a restart. " +
"when 'type' label is 'start' means some aspect of ledger order is out of sync between data from " +
"captive core meta pipe and horizon's db, restarting to see if condition resolves. " +
"when 'type' label is 'retry' means ingestion is getting an unexpected error while " +
"processing network ledger data which it can't resolve. If this metric is constantly increasing, " +
"it means ingestion is stuck in a retry loop on an error it can't resolve, effectively halted.",
},
[]string{"type"},
)
}

func (s *system) GetCurrentState() State {
Expand Down Expand Up @@ -471,6 +488,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(s.metrics.LoadersStatsSummary)
registry.MustRegister(s.metrics.StateVerifyLedgerEntriesCount)
registry.MustRegister(s.metrics.HistoryArchiveStatsCounter)
registry.MustRegister(s.metrics.IngestionErrorRestartCounter)
s.ledgerBackend = ledgerbackend.WithMetrics(s.ledgerBackend, registry, "horizon")
}

Expand Down
71 changes: 70 additions & 1 deletion services/horizon/internal/ingest/resume_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"context"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

Expand All @@ -28,6 +30,7 @@ type ResumeTestTestSuite struct {
runner *mockProcessorsRunner
stellarCoreClient *mockStellarCoreClient
system *system
registry *prometheus.Registry
}

func (s *ResumeTestTestSuite) SetupTest() {
Expand All @@ -37,6 +40,7 @@ func (s *ResumeTestTestSuite) SetupTest() {
s.historyAdapter = &mockHistoryArchiveAdapter{}
s.runner = &mockProcessorsRunner{}
s.stellarCoreClient = &mockStellarCoreClient{}
s.registry = prometheus.NewRegistry()
s.system = &system{
ctx: s.ctx,
historyQ: s.historyQ,
Expand All @@ -47,8 +51,8 @@ func (s *ResumeTestTestSuite) SetupTest() {
runStateVerificationOnLedger: ledgerEligibleForStateVerification(64, 1),
}
s.system.initMetrics()

s.historyQ.On("Rollback").Return(nil).Once()
s.registry.Register(s.system.Metrics().IngestionErrorRestartCounter)

s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(101)).Return(false, nil).Once()
s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(101)).Return(nil).Once()
Expand All @@ -72,6 +76,7 @@ func (s *ResumeTestTestSuite) TearDownTest() {
s.historyAdapter.AssertExpectations(t)
s.ledgerBackend.AssertExpectations(t)
s.stellarCoreClient.AssertExpectations(t)
s.registry.Unregister(s.system.Metrics().IngestionErrorRestartCounter)
}

func (s *ResumeTestTestSuite) TestInvalidParam() {
Expand All @@ -86,6 +91,7 @@ func (s *ResumeTestTestSuite) TestInvalidParam() {
transition{node: startState{}, sleepDuration: defaultSleep},
next,
)
assertErrorRestartMetrics(s.registry, "start", 1, s.Assert())

Check failure on line 94 in services/horizon/internal/ingest/resume_state_test.go

View workflow job for this annotation

GitHub Actions / golangci

s.Assert undefined (type *ResumeTestTestSuite has no field or method Assert) (typecheck)
}

func (s *ResumeTestTestSuite) TestRangeNotPreparedFailPrepare() {
Expand All @@ -103,6 +109,7 @@ func (s *ResumeTestTestSuite) TestRangeNotPreparedFailPrepare() {
transition{node: startState{}, sleepDuration: defaultSleep},
next,
)
assertErrorRestartMetrics(s.registry, "start", 1, s.Assert())

Check failure on line 112 in services/horizon/internal/ingest/resume_state_test.go

View workflow job for this annotation

GitHub Actions / golangci

s.Assert undefined (type *ResumeTestTestSuite has no field or method Assert) (typecheck)
}

func (s *ResumeTestTestSuite) TestRangeNotPreparedSuccessPrepareGetLedgerFail() {
Expand All @@ -118,6 +125,7 @@ func (s *ResumeTestTestSuite) TestRangeNotPreparedSuccessPrepareGetLedgerFail()
s.Assert().Error(err)
s.Assert().EqualError(err, "error getting ledger blocking: my error")
s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next)
assertErrorRestartMetrics(s.registry, "start", 1, s.Assert())

Check failure on line 128 in services/horizon/internal/ingest/resume_state_test.go

View workflow job for this annotation

GitHub Actions / golangci

s.Assert undefined (type *ResumeTestTestSuite has no field or method Assert) (typecheck)
}

func (s *ResumeTestTestSuite) TestBeginReturnsError() {
Expand All @@ -136,6 +144,7 @@ func (s *ResumeTestTestSuite) TestBeginReturnsError() {
},
next,
)
assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert())
}

func (s *ResumeTestTestSuite) TestGetLastLedgerIngestReturnsError() {
Expand All @@ -152,6 +161,7 @@ func (s *ResumeTestTestSuite) TestGetLastLedgerIngestReturnsError() {
},
next,
)
assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert())
}

func (s *ResumeTestTestSuite) TestGetLatestLedgerLessThanCurrent() {
Expand All @@ -165,6 +175,7 @@ func (s *ResumeTestTestSuite) TestGetLatestLedgerLessThanCurrent() {
transition{node: startState{}, sleepDuration: defaultSleep},
next,
)
assertErrorRestartMetrics(s.registry, "start", 1, s.Assert())
}

func (s *ResumeTestTestSuite) TestGetIngestionVersionError() {
Expand All @@ -182,6 +193,7 @@ func (s *ResumeTestTestSuite) TestGetIngestionVersionError() {
},
next,
)
assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert())
}

func (s *ResumeTestTestSuite) TestIngestionVersionLessThanCurrentVersion() {
Expand All @@ -195,6 +207,7 @@ func (s *ResumeTestTestSuite) TestIngestionVersionLessThanCurrentVersion() {
transition{node: startState{}, sleepDuration: defaultSleep},
next,
)
assertErrorRestartMetrics(s.registry, "", 0, s.Assert())
}

func (s *ResumeTestTestSuite) TestIngestionVersionGreaterThanCurrentVersion() {
Expand All @@ -208,6 +221,7 @@ func (s *ResumeTestTestSuite) TestIngestionVersionGreaterThanCurrentVersion() {
transition{node: startState{}, sleepDuration: defaultSleep},
next,
)
assertErrorRestartMetrics(s.registry, "", 0, s.Assert())
}

func (s *ResumeTestTestSuite) TestGetLatestLedgerError() {
Expand All @@ -226,6 +240,7 @@ func (s *ResumeTestTestSuite) TestGetLatestLedgerError() {
},
next,
)
assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert())
}

func (s *ResumeTestTestSuite) TestLatestHistoryLedgerLessThanIngestLedger() {
Expand All @@ -240,6 +255,7 @@ func (s *ResumeTestTestSuite) TestLatestHistoryLedgerLessThanIngestLedger() {
transition{node: startState{}, sleepDuration: defaultSleep},
next,
)
assertErrorRestartMetrics(s.registry, "", 0, s.Assert())
}

func (s *ResumeTestTestSuite) TestLatestHistoryLedgerGreaterThanIngestLedger() {
Expand All @@ -254,6 +270,7 @@ func (s *ResumeTestTestSuite) TestLatestHistoryLedgerGreaterThanIngestLedger() {
transition{node: startState{}, sleepDuration: defaultSleep},
next,
)
assertErrorRestartMetrics(s.registry, "", 0, s.Assert())
}

func (s *ResumeTestTestSuite) mockSuccessfulIngestion() {
Expand Down Expand Up @@ -313,6 +330,7 @@ func (s *ResumeTestTestSuite) TestBumpIngestLedger() {
},
next,
)
assertErrorRestartMetrics(s.registry, "", 0, s.Assert())
}

func (s *ResumeTestTestSuite) TestIngestAllMasterNode() {
Expand All @@ -327,6 +345,7 @@ func (s *ResumeTestTestSuite) TestIngestAllMasterNode() {
},
next,
)
assertErrorRestartMetrics(s.registry, "", 0, s.Assert())
}

func (s *ResumeTestTestSuite) TestRebuildTradeAggregationBucketsError() {
Expand Down Expand Up @@ -357,6 +376,28 @@ func (s *ResumeTestTestSuite) TestRebuildTradeAggregationBucketsError() {
},
next,
)
assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert())
}

func (s *ResumeTestTestSuite) TestRunAllProcessorsError() {
s.historyQ.On("Begin", s.ctx).Return(nil).Once()

Check failure on line 383 in services/horizon/internal/ingest/resume_state_test.go

View workflow job for this annotation

GitHub Actions / golangci

s.historyQ.On undefined (type *mockDBQ has no field or method On) (typecheck)
s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once()

Check failure on line 384 in services/horizon/internal/ingest/resume_state_test.go

View workflow job for this annotation

GitHub Actions / golangci

s.historyQ.On undefined (type *mockDBQ has no field or method On) (typecheck)
s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once()

Check failure on line 385 in services/horizon/internal/ingest/resume_state_test.go

View workflow job for this annotation

GitHub Actions / golangci

s.historyQ.On undefined (type *mockDBQ has no field or method On) (typecheck)
s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(100), nil)

s.runner.On("RunAllProcessorsOnLedger", mock.AnythingOfType("xdr.LedgerCloseMeta")).

Check failure on line 388 in services/horizon/internal/ingest/resume_state_test.go

View workflow job for this annotation

GitHub Actions / golangci

s.runner.On undefined (type *mockProcessorsRunner has no field or method On) (typecheck)
Return(ledgerStats{}, errors.New("processor error")).Once()

next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system)
s.Assert().ErrorContains(err, "processor error")
s.Assert().Equal(
transition{
node: resumeState{latestSuccessfullyProcessedLedger: 100},
sleepDuration: defaultSleep,
},
next,
)
assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert())
}

func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() {
Expand Down Expand Up @@ -398,6 +439,7 @@ func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() {
},
next,
)
assertErrorRestartMetrics(s.registry, "", 0, s.Assert())
}

func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() {
Expand Down Expand Up @@ -448,4 +490,31 @@ func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() {
},
next,
)
assertErrorRestartMetrics(s.registry, "", 0, s.Assert())
}

func assertErrorRestartMetrics(reg *prometheus.Registry, assertRestartType string, assertRestartCount float64, assert *assert.Assertions) {

metrics, err := reg.Gather()
assert.NoError(err)

for _, metricFamily := range metrics {
if metricFamily.GetName() == "horizon_ingest_error_restarts" {
assert.Len(metricFamily.GetMetric(), 1)
assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), assertRestartCount)
var restartType = ""
for _, label := range metricFamily.GetMetric()[0].GetLabel() {
if label.GetName() == "type" {
restartType = label.GetValue()
}
}

assert.Equal(restartType, assertRestartType)
return
}
}

if assertRestartCount > 0.0 {
assert.Fail("horizon_ingest_restarts metrics were not correct")
}
}

0 comments on commit 2b44923

Please sign in to comment.