diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go
index c8f28974f5..dc5365809b 100644
--- a/ingest/ledgerbackend/captive_core_backend.go
+++ b/ingest/ledgerbackend/captive_core_backend.go
@@ -112,9 +112,11 @@ type CaptiveStellarCore struct {
 	lastLedger         *uint32 // end of current segment if offline, nil if online
 	previousLedgerHash *string
 
-	config             CaptiveCoreConfig
-	stellarCoreClient  *stellarcore.Client
-	captiveCoreVersion string // Updates when captive-core restarts
+	config                   CaptiveCoreConfig
+	captiveCoreStartDuration prometheus.Summary
+	captiveCoreNewDBCounter  prometheus.Counter
+	stellarCoreClient        *stellarcore.Client
+	captiveCoreVersion       string // Updates when captive-core restarts
 }
 
 // CaptiveCoreConfig contains all the parameters required to create a CaptiveStellarCore instance
@@ -230,7 +232,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) {
 
 	c.stellarCoreRunnerFactory = func() stellarCoreRunnerInterface {
 		c.setCoreVersion()
-		return newStellarCoreRunner(config)
+		return newStellarCoreRunner(config, c.captiveCoreNewDBCounter)
 	}
 
 	if config.Toml != nil && config.Toml.HTTPPort != 0 {
@@ -315,7 +317,27 @@ func (c *CaptiveStellarCore) registerMetrics(registry *prometheus.Registry, name
 			return float64(latest)
 		},
 	)
-	registry.MustRegister(coreSynced, supportedProtocolVersion, latestLedger)
+	c.captiveCoreStartDuration = prometheus.NewSummary(prometheus.SummaryOpts{
+		Namespace:  namespace,
+		Subsystem:  "ingest",
+		Name:       "captive_stellar_core_start_duration_seconds",
+		Help:       "duration of start up time when running captive core on an unbounded range, sliding window = 10m",
+		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
+	})
+	c.captiveCoreNewDBCounter = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace: namespace,
+		Subsystem: "ingest",
+		Name:      "captive_stellar_core_new_db",
+		Help:      "counter for the number of times we start up captive core with a new buckets db, sliding window = 10m",
+	})
+
+	registry.MustRegister(
+		coreSynced,
+		supportedProtocolVersion,
+		latestLedger,
+		c.captiveCoreStartDuration,
+		c.captiveCoreNewDBCounter,
+	)
 }
 
 func (c *CaptiveStellarCore) getLatestCheckpointSequence() (uint32, error) {
@@ -521,12 +543,14 @@ func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRang
 // Please note that using a BoundedRange, currently, requires a full-trust on
 // history archive. This issue is being fixed in Stellar-Core.
 func (c *CaptiveStellarCore) PrepareRange(ctx context.Context, ledgerRange Range) error {
+	startTime := time.Now()
 	if alreadyPrepared, err := c.startPreparingRange(ctx, ledgerRange); err != nil {
 		return errors.Wrap(err, "error starting prepare range")
 	} else if alreadyPrepared {
 		return nil
 	}
 
+	var reportedStartTime bool
 	// the prepared range might be below ledgerRange.from so we
 	// need to seek ahead until we reach ledgerRange.from
 	for seq := c.prepared.from; seq <= ledgerRange.from; seq++ {
@@ -534,6 +558,12 @@ func (c *CaptiveStellarCore) PrepareRange(ctx context.Context, ledgerRange Range
 		if err != nil {
 			return errors.Wrapf(err, "Error fast-forwarding to %d", ledgerRange.from)
 		}
+		if !reportedStartTime {
+			reportedStartTime = true
+			if c.captiveCoreStartDuration != nil && !ledgerRange.bounded {
+				c.captiveCoreStartDuration.Observe(time.Since(startTime).Seconds())
+			}
+		}
 	}
 
 	return nil
diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go
index f8161aec25..a13ab36ce3 100644
--- a/ingest/ledgerbackend/captive_core_backend_test.go
+++ b/ingest/ledgerbackend/captive_core_backend_test.go
@@ -10,6 +10,8 @@ import (
 	"sync"
 	"testing"
 
+	"github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
@@ -234,6 +236,7 @@ func TestCaptivePrepareRange(t *testing.T) {
 			cancelCalled = true
 		}),
 	}
+	captiveBackend.registerMetrics(prometheus.NewRegistry(), "test")
 
 	err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200))
 	assert.NoError(t, err)
@@ -243,6 +246,8 @@ func TestCaptivePrepareRange(t *testing.T) {
 	assert.True(t, cancelCalled)
 	mockRunner.AssertExpectations(t)
 	mockArchive.AssertExpectations(t)
+
+	assert.Equal(t, uint64(0), getStartDurationMetric(captiveBackend).GetSampleCount())
 }
 
 func TestCaptivePrepareRangeCrash(t *testing.T) {
@@ -575,10 +580,13 @@ func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) {
 			cancelCalled = true
 		}),
 	}
+	captiveBackend.registerMetrics(prometheus.NewRegistry(), "test")
 
 	err := captiveBackend.PrepareRange(ctx, UnboundedRange(128))
 	assert.EqualError(t, err, "error starting prepare range: opening subprocess: error running stellar-core: transient error")
 
+	assert.Equal(t, uint64(0), getStartDurationMetric(captiveBackend).GetSampleCount())
+
 	// make sure we can Close without errors
 	assert.NoError(t, captiveBackend.Close())
 	assert.True(t, cancelCalled)
@@ -587,6 +595,15 @@ func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) {
 	mockRunner.AssertExpectations(t)
 }
 
+func getStartDurationMetric(captiveCore CaptiveStellarCore) *dto.Summary {
+	value := &dto.Metric{}
+	err := captiveCore.captiveCoreStartDuration.Write(value)
+	if err != nil {
+		panic(err)
+	}
+	return value.GetSummary()
+}
+
 func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
 	metaChan := make(chan metaResult, 100)
 
@@ -624,14 +641,20 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
 		},
 		checkpointManager: historyarchive.NewCheckpointManager(64),
 	}
+	captiveBackend.registerMetrics(prometheus.NewRegistry(), "test")
 
 	err := captiveBackend.PrepareRange(ctx, UnboundedRange(65))
 	assert.NoError(t, err)
 
+	assert.Equal(t, uint64(1), getStartDurationMetric(captiveBackend).GetSampleCount())
+	assert.Greater(t, getStartDurationMetric(captiveBackend).GetSampleSum(), float64(0))
+
 	captiveBackend.nextLedger = 64
 	err = captiveBackend.PrepareRange(ctx, UnboundedRange(65))
 	assert.NoError(t, err)
 
+	assert.Equal(t, uint64(1), getStartDurationMetric(captiveBackend).GetSampleCount())
+
 	mockArchive.AssertExpectations(t)
 	mockRunner.AssertExpectations(t)
 }
diff --git a/ingest/ledgerbackend/file_watcher_test.go b/ingest/ledgerbackend/file_watcher_test.go
index 7e84bbfcf2..c3e85d643f 100644
--- a/ingest/ledgerbackend/file_watcher_test.go
+++ b/ingest/ledgerbackend/file_watcher_test.go
@@ -65,7 +65,7 @@ func createFWFixtures(t *testing.T) (*mockHash, *stellarCoreRunner, *fileWatcher
 		Context:     context.Background(),
 		Toml:        captiveCoreToml,
 		StoragePath: storagePath,
-	})
+	}, nil)
 
 	fw, err := newFileWatcherWithOptions(runner, ms.hashFile, time.Millisecond)
 	assert.NoError(t, err)
@@ -96,7 +96,7 @@ func TestNewFileWatcherError(t *testing.T) {
 		Context:     context.Background(),
 		Toml:        captiveCoreToml,
 		StoragePath: storagePath,
-	})
+	}, nil)
 
 	_, err = newFileWatcherWithOptions(runner, ms.hashFile, time.Millisecond)
 	assert.EqualError(t, err, "could not hash captive core binary: test error")
diff --git a/ingest/ledgerbackend/run_from.go b/ingest/ledgerbackend/run_from.go
index 2d02322519..8a424da105 100644
--- a/ingest/ledgerbackend/run_from.go
+++ b/ingest/ledgerbackend/run_from.go
@@ -6,32 +6,36 @@ import (
 	"fmt"
 	"runtime"
 
+	"github.com/prometheus/client_golang/prometheus"
+
 	"github.com/stellar/go/protocols/stellarcore"
 	"github.com/stellar/go/support/log"
 )
 
 type runFromStream struct {
-	dir            workingDir
-	from           uint32
-	hash           string
-	coreCmdFactory coreCmdFactory
-	log            *log.Entry
-	useDB          bool
+	dir                     workingDir
+	from                    uint32
+	hash                    string
+	coreCmdFactory          coreCmdFactory
+	log                     *log.Entry
+	useDB                   bool
+	captiveCoreNewDBCounter prometheus.Counter
 }
 
-func newRunFromStream(r *stellarCoreRunner, from uint32, hash string) runFromStream {
+func newRunFromStream(r *stellarCoreRunner, from uint32, hash string, captiveCoreNewDBCounter prometheus.Counter) runFromStream {
 	// We only use ephemeral directories on windows because there is
 	// no way to terminate captive core gracefully on windows.
 	// Having an ephemeral directory ensures that it is wiped out
 	// whenever we terminate captive core
 	dir := newWorkingDir(r, runtime.GOOS == "windows")
 	return runFromStream{
-		dir:            dir,
-		from:           from,
-		hash:           hash,
-		coreCmdFactory: newCoreCmdFactory(r, dir),
-		log:            r.log,
-		useDB:          r.useDB,
+		dir:                     dir,
+		from:                    from,
+		hash:                    hash,
+		coreCmdFactory:          newCoreCmdFactory(r, dir),
+		log:                     r.log,
+		useDB:                   r.useDB,
+		captiveCoreNewDBCounter: captiveCoreNewDBCounter,
 	}
 }
 
@@ -79,6 +83,9 @@ func (s runFromStream) start(ctx context.Context) (cmd cmdI, captiveCorePipe pip
 	}
 
 	if createNewDB {
+		if s.captiveCoreNewDBCounter != nil {
+			s.captiveCoreNewDBCounter.Inc()
+		}
 		if err = s.dir.remove(); err != nil {
 			return nil, pipe{}, fmt.Errorf("error removing existing storage-dir contents: %w", err)
 		}
diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go
index 5245051dce..4f95e94f45 100644
--- a/ingest/ledgerbackend/stellar_core_runner.go
+++ b/ingest/ledgerbackend/stellar_core_runner.go
@@ -7,6 +7,8 @@ import (
 	"math/rand"
 	"sync"
 
+	"github.com/prometheus/client_golang/prometheus"
+
 	"github.com/stellar/go/support/log"
 )
 
@@ -67,6 +69,8 @@ type stellarCoreRunner struct {
 	toml  *CaptiveCoreToml
 	useDB bool
 
+	captiveCoreNewDBCounter prometheus.Counter
+
 	log *log.Entry
 }
 
@@ -79,7 +83,7 @@ func createRandomHexString(n int) string {
 	return string(b)
 }
 
-func newStellarCoreRunner(config CaptiveCoreConfig) *stellarCoreRunner {
+func newStellarCoreRunner(config CaptiveCoreConfig, captiveCoreNewDBCounter prometheus.Counter) *stellarCoreRunner {
 	ctx, cancel := context.WithCancel(config.Context)
 
 	runner := &stellarCoreRunner{
@@ -91,7 +95,8 @@ func newStellarCoreRunner(config CaptiveCoreConfig) *stellarCoreRunner {
 		log:  config.Log,
 		toml: config.Toml,
 
-		systemCaller: realSystemCaller{},
+		captiveCoreNewDBCounter: captiveCoreNewDBCounter,
+		systemCaller:            realSystemCaller{},
 	}
 
 	return runner
@@ -104,7 +109,7 @@ func (r *stellarCoreRunner) context() context.Context {
 
 // runFrom executes the run command with a starting ledger on the captive core subprocess
 func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
-	return r.startMetaStream(newRunFromStream(r, from, hash))
+	return r.startMetaStream(newRunFromStream(r, from, hash, r.captiveCoreNewDBCounter))
 }
 
 // catchup executes the catchup command on the captive core subprocess
diff --git a/ingest/ledgerbackend/stellar_core_runner_test.go b/ingest/ledgerbackend/stellar_core_runner_test.go
index f53cd88328..06b9c85fce 100644
--- a/ingest/ledgerbackend/stellar_core_runner_test.go
+++ b/ingest/ledgerbackend/stellar_core_runner_test.go
@@ -7,6 +7,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 
@@ -26,7 +28,7 @@ func TestCloseOffline(t *testing.T) {
 		Context:     context.Background(),
 		Toml:        captiveCoreToml,
 		StoragePath: "/tmp/captive-core",
-	})
+	}, nil)
 
 	cmdMock := simpleCommandMock()
 	cmdMock.On("Wait").Return(nil)
@@ -68,7 +70,7 @@ func TestCloseOnline(t *testing.T) {
 		Context:     context.Background(),
 		Toml:        captiveCoreToml,
 		StoragePath: "/tmp/captive-core",
-	})
+	}, nil)
 
 	cmdMock := simpleCommandMock()
 	cmdMock.On("Wait").Return(nil)
@@ -112,7 +114,7 @@ func TestCloseOnlineWithError(t *testing.T) {
 		Context:     context.Background(),
 		Toml:        captiveCoreToml,
 		StoragePath: "/tmp/captive-core",
-	})
+	}, nil)
 
 	cmdMock := simpleCommandMock()
 	cmdMock.On("Wait").Return(errors.New("wait error"))
@@ -166,7 +168,7 @@ func TestCloseConcurrency(t *testing.T) {
 		Context:     context.Background(),
 		Toml:        captiveCoreToml,
 		StoragePath: "/tmp/captive-core",
-	})
+	}, nil)
 
 	cmdMock := simpleCommandMock()
 	cmdMock.On("Wait").Return(errors.New("wait error")).WaitUntil(time.After(time.Millisecond * 300))
@@ -223,7 +225,7 @@ func TestRunFromUseDBLedgersMatch(t *testing.T) {
 		Toml:        captiveCoreToml,
 		StoragePath: "/tmp/captive-core",
 		UseDB:       true,
-	})
+	}, createNewDBCounter())
 
 	cmdMock := simpleCommandMock()
 	cmdMock.On("Wait").Return(nil)
@@ -263,6 +265,8 @@ func TestRunFromUseDBLedgersMatch(t *testing.T) {
 
 	assert.NoError(t, runner.runFrom(100, "hash"))
 	assert.NoError(t, runner.close())
+
+	assert.Equal(t, float64(0), getNewDBCounterMetric(runner))
 }
 
 func TestRunFromUseDBLedgersBehind(t *testing.T) {
@@ -279,7 +283,7 @@ func TestRunFromUseDBLedgersBehind(t *testing.T) {
 		Toml:        captiveCoreToml,
 		StoragePath: "/tmp/captive-core",
 		UseDB:       true,
-	})
+	}, createNewDBCounter())
 
 	newDBCmdMock := simpleCommandMock()
 	newDBCmdMock.On("Run").Return(nil)
@@ -325,6 +329,23 @@ func TestRunFromUseDBLedgersBehind(t *testing.T) {
 
 	assert.NoError(t, runner.runFrom(100, "hash"))
 	assert.NoError(t, runner.close())
+
+	assert.Equal(t, float64(0), getNewDBCounterMetric(runner))
+}
+
+func createNewDBCounter() prometheus.Counter {
+	return prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace: "test", Subsystem: "captive_core", Name: "new_db_counter",
+	})
+}
+
+func getNewDBCounterMetric(runner *stellarCoreRunner) float64 {
+	value := &dto.Metric{}
+	err := runner.captiveCoreNewDBCounter.Write(value)
+	if err != nil {
+		panic(err)
+	}
+	return value.GetCounter().GetValue()
 }
 
 func TestRunFromUseDBLedgersInFront(t *testing.T) {
@@ -341,7 +362,7 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) {
 		Toml:        captiveCoreToml,
 		StoragePath: "/tmp/captive-core",
 		UseDB:       true,
-	})
+	}, createNewDBCounter())
 
 	newDBCmdMock := simpleCommandMock()
 	newDBCmdMock.On("Run").Return(nil)
@@ -405,4 +426,5 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) {
 
 	assert.NoError(t, runner.runFrom(100, "hash"))
 	assert.NoError(t, runner.close())
+	assert.Equal(t, float64(1), getNewDBCounterMetric(runner))
 }
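Illustrative note (not part of the change set above): the sketch below shows one way to exercise the two new metrics end to end using only the Prometheus client APIs this diff relies on (NewSummary, NewCounter, MustRegister, and Write into a dto.Metric, which is how the tests read values back). The "horizon" namespace and the observed values are assumptions made up for the example; in the real code the namespace is whatever string the caller passes to registerMetrics, and the observations happen inside PrepareRange and runFromStream.start.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Stand-ins for the metrics created in registerMetrics; the "horizon"
	// namespace is an assumption for this sketch.
	startDuration := prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "horizon",
		Subsystem:  "ingest",
		Name:       "captive_stellar_core_start_duration_seconds",
		Help:       "duration of start up time when running captive core on an unbounded range",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
	})
	newDBCounter := prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "horizon",
		Subsystem: "ingest",
		Name:      "captive_stellar_core_new_db",
		Help:      "number of times captive core started up with a new buckets db",
	})

	registry := prometheus.NewRegistry()
	registry.MustRegister(startDuration, newDBCounter)

	// Simulate one unbounded-range start-up taking 12.5s and one bucket-db rebuild.
	startDuration.Observe(12.5)
	newDBCounter.Inc()

	// Read the values back through the dto types, mirroring what the tests do
	// with Summary.Write and Counter.Write.
	var m dto.Metric
	if err := startDuration.Write(&m); err != nil {
		panic(err)
	}
	fmt.Println("start-ups observed:", m.GetSummary().GetSampleCount())
	fmt.Println("total start time (s):", m.GetSummary().GetSampleSum())

	m.Reset()
	if err := newDBCounter.Write(&m); err != nil {
		panic(err)
	}
	fmt.Println("new bucket DBs:", m.GetCounter().GetValue())
}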