
Commit

Add prometheus metrics to track captive core startup time
tamirms committed Aug 30, 2024
1 parent f10ea0f commit f724562
Showing 6 changed files with 117 additions and 30 deletions.
40 changes: 35 additions & 5 deletions ingest/ledgerbackend/captive_core_backend.go
@@ -112,9 +112,11 @@ type CaptiveStellarCore struct {
lastLedger *uint32 // end of current segment if offline, nil if online
previousLedgerHash *string

config CaptiveCoreConfig
stellarCoreClient *stellarcore.Client
captiveCoreVersion string // Updates when captive-core restarts
config CaptiveCoreConfig
captiveCoreStartDuration prometheus.Summary
captiveCoreNewDBCounter prometheus.Counter
stellarCoreClient *stellarcore.Client
captiveCoreVersion string // Updates when captive-core restarts
}

// CaptiveCoreConfig contains all the parameters required to create a CaptiveStellarCore instance
@@ -230,7 +232,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) {

c.stellarCoreRunnerFactory = func() stellarCoreRunnerInterface {
c.setCoreVersion()
return newStellarCoreRunner(config)
return newStellarCoreRunner(config, c.captiveCoreNewDBCounter)
}

if config.Toml != nil && config.Toml.HTTPPort != 0 {
@@ -315,7 +317,27 @@ func (c *CaptiveStellarCore) registerMetrics(registry *prometheus.Registry, name
return float64(latest)
},
)
registry.MustRegister(coreSynced, supportedProtocolVersion, latestLedger)
c.captiveCoreStartDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: "ingest",
Name: "captive_stellar_core_start_duration_seconds",
Help: "duration of start up time when running captive core on an unbounded range, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})
c.captiveCoreNewDBCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "ingest",
Name: "captive_stellar_core_new_db",
Help: "counter for the number of times we start up captive core with a new buckets db, sliding window = 10m",
})

registry.MustRegister(
coreSynced,
supportedProtocolVersion,
latestLedger,
c.captiveCoreStartDuration,
c.captiveCoreNewDBCounter,
)
}
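
For reference, the Objectives map above asks the Summary for the 50th, 90th, and 99th percentile of start-up time (with the listed absolute error tolerances), computed over client_golang's default 10-minute MaxAge window, which is what "sliding window = 10m" in the help text refers to. Below is a minimal, self-contained sketch of how such a Summary behaves; the namespace and the observed values are illustrative only, not taken from this commit.

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    dto "github.com/prometheus/client_model/go"
)

func main() {
    startDuration := prometheus.NewSummary(prometheus.SummaryOpts{
        Namespace:  "example",
        Subsystem:  "ingest",
        Name:       "captive_stellar_core_start_duration_seconds",
        Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
    })
    registry := prometheus.NewRegistry()
    registry.MustRegister(startDuration)

    // Record a few illustrative start-up durations, in seconds.
    for _, seconds := range []float64{12.5, 14.0, 30.2} {
        startDuration.Observe(seconds)
    }

    // Read the summary back the same way the tests in this commit do.
    m := &dto.Metric{}
    if err := startDuration.Write(m); err != nil {
        panic(err)
    }
    fmt.Println("count:", m.GetSummary().GetSampleCount()) // 3
    fmt.Println("sum:", m.GetSummary().GetSampleSum())     // about 56.7
    for _, q := range m.GetSummary().GetQuantile() {
        fmt.Printf("p%g: %g\n", q.GetQuantile()*100, q.GetValue())
    }
}
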

func (c *CaptiveStellarCore) getLatestCheckpointSequence() (uint32, error) {
@@ -521,19 +543,27 @@ func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRang
// Please note that using a BoundedRange, currently, requires a full-trust on
// history archive. This issue is being fixed in Stellar-Core.
func (c *CaptiveStellarCore) PrepareRange(ctx context.Context, ledgerRange Range) error {
startTime := time.Now()
if alreadyPrepared, err := c.startPreparingRange(ctx, ledgerRange); err != nil {
return errors.Wrap(err, "error starting prepare range")
} else if alreadyPrepared {
return nil
}

var reportedStartTime bool
// the prepared range might be below ledgerRange.from so we
// need to seek ahead until we reach ledgerRange.from
for seq := c.prepared.from; seq <= ledgerRange.from; seq++ {
_, err := c.GetLedger(ctx, seq)
if err != nil {
return errors.Wrapf(err, "Error fast-forwarding to %d", ledgerRange.from)
}
if !reportedStartTime {
reportedStartTime = true
if c.captiveCoreStartDuration != nil && !ledgerRange.bounded {
c.captiveCoreStartDuration.Observe(time.Since(startTime).Seconds())
}
}
}

return nil
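
Note that startTime above is captured before startPreparingRange, and the observation happens only after the first successful GetLedger call in the fast-forward loop, so the recorded duration covers both launching captive core and waiting for the first ledger to stream out of it. The metric is reported at most once per PrepareRange call, only for unbounded ranges, and only when registerMetrics has been called (otherwise captiveCoreStartDuration is nil). A rough, self-contained sketch of that one-shot pattern with hypothetical names, not code from this commit:

package main

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// onceObserver illustrates the guard used in PrepareRange: record a single
// observation, and only when a summary was actually registered.
type onceObserver struct {
    summary  prometheus.Summary // nil when metrics were never registered
    reported bool
}

func (o *onceObserver) observe(start time.Time, unbounded bool) {
    if o.reported {
        return
    }
    o.reported = true
    if o.summary != nil && unbounded {
        o.summary.Observe(time.Since(start).Seconds())
    }
}

func main() {
    o := &onceObserver{summary: prometheus.NewSummary(prometheus.SummaryOpts{Name: "example_start_seconds"})}
    start := time.Now()
    o.observe(start, true) // records once
    o.observe(start, true) // no-op: already reported
}
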
23 changes: 23 additions & 0 deletions ingest/ledgerbackend/captive_core_backend_test.go
@@ -10,6 +10,8 @@ import (
"sync"
"testing"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -234,6 +236,7 @@ func TestCaptivePrepareRange(t *testing.T) {
cancelCalled = true
}),
}
captiveBackend.registerMetrics(prometheus.NewRegistry(), "test")

err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200))
assert.NoError(t, err)
@@ -243,6 +246,8 @@
assert.True(t, cancelCalled)
mockRunner.AssertExpectations(t)
mockArchive.AssertExpectations(t)

assert.Equal(t, uint64(0), getStartDurationMetric(captiveBackend).GetSampleCount())

Check failure on line 250 in ingest/ledgerbackend/captive_core_backend_test.go (GitHub Actions / check, ubuntu-22.04, 1.22.1): call of getStartDurationMetric copies lock value: github.com/stellar/go/ingest/ledgerbackend.CaptiveStellarCore contains sync.RWMutex
}

func TestCaptivePrepareRangeCrash(t *testing.T) {
@@ -575,10 +580,13 @@ func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) {
cancelCalled = true
}),
}
captiveBackend.registerMetrics(prometheus.NewRegistry(), "test")

err := captiveBackend.PrepareRange(ctx, UnboundedRange(128))
assert.EqualError(t, err, "error starting prepare range: opening subprocess: error running stellar-core: transient error")

assert.Equal(t, uint64(0), getStartDurationMetric(captiveBackend).GetSampleCount())

Check failure on line 588 in ingest/ledgerbackend/captive_core_backend_test.go (GitHub Actions / check, ubuntu-22.04, 1.22.1): call of getStartDurationMetric copies lock value: github.com/stellar/go/ingest/ledgerbackend.CaptiveStellarCore contains sync.RWMutex

// make sure we can Close without errors
assert.NoError(t, captiveBackend.Close())
assert.True(t, cancelCalled)
@@ -587,6 +595,15 @@ func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) {
mockRunner.AssertExpectations(t)
}

func getStartDurationMetric(captiveCore CaptiveStellarCore) *dto.Summary {

Check failure on line 598 in ingest/ledgerbackend/captive_core_backend_test.go (GitHub Actions / check, ubuntu-22.04, 1.22.1): getStartDurationMetric passes lock by value: github.com/stellar/go/ingest/ledgerbackend.CaptiveStellarCore contains sync.RWMutex
value := &dto.Metric{}
err := captiveCore.captiveCoreStartDuration.Write(value)
if err != nil {
panic(err)
}
return value.GetSummary()
}
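
The go vet (copylocks) failures annotated in this file come from getStartDurationMetric taking CaptiveStellarCore by value, which copies the struct's embedded sync.RWMutex at every call site. A likely follow-up fix, not part of this commit, is to pass the backend by pointer and have the callers pass &captiveBackend instead:

func getStartDurationMetric(captiveCore *CaptiveStellarCore) *dto.Summary {
    value := &dto.Metric{}
    // Writing the summary into a dto.Metric exposes its sample count and sum.
    if err := captiveCore.captiveCoreStartDuration.Write(value); err != nil {
        panic(err)
    }
    return value.GetSummary()
}
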

func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
metaChan := make(chan metaResult, 100)

@@ -624,14 +641,20 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
captiveBackend.registerMetrics(prometheus.NewRegistry(), "test")

err := captiveBackend.PrepareRange(ctx, UnboundedRange(65))
assert.NoError(t, err)

assert.Equal(t, uint64(1), getStartDurationMetric(captiveBackend).GetSampleCount())

Check failure on line 649 in ingest/ledgerbackend/captive_core_backend_test.go (GitHub Actions / check, ubuntu-22.04, 1.22.1): call of getStartDurationMetric copies lock value: github.com/stellar/go/ingest/ledgerbackend.CaptiveStellarCore contains sync.RWMutex
assert.Greater(t, getStartDurationMetric(captiveBackend).GetSampleSum(), float64(0))

Check failure on line 650 in ingest/ledgerbackend/captive_core_backend_test.go (GitHub Actions / check, ubuntu-22.04, 1.22.1): call of getStartDurationMetric copies lock value: github.com/stellar/go/ingest/ledgerbackend.CaptiveStellarCore contains sync.RWMutex

captiveBackend.nextLedger = 64
err = captiveBackend.PrepareRange(ctx, UnboundedRange(65))
assert.NoError(t, err)

assert.Equal(t, uint64(1), getStartDurationMetric(captiveBackend).GetSampleCount())

Check failure on line 656 in ingest/ledgerbackend/captive_core_backend_test.go (GitHub Actions / check, ubuntu-22.04, 1.22.1): call of getStartDurationMetric copies lock value: github.com/stellar/go/ingest/ledgerbackend.CaptiveStellarCore contains sync.RWMutex

mockArchive.AssertExpectations(t)
mockRunner.AssertExpectations(t)
}
4 changes: 2 additions & 2 deletions ingest/ledgerbackend/file_watcher_test.go
@@ -65,7 +65,7 @@ func createFWFixtures(t *testing.T) (*mockHash, *stellarCoreRunner, *fileWatcher
Context: context.Background(),
Toml: captiveCoreToml,
StoragePath: storagePath,
})
}, nil)

fw, err := newFileWatcherWithOptions(runner, ms.hashFile, time.Millisecond)
assert.NoError(t, err)
@@ -96,7 +96,7 @@ func TestNewFileWatcherError(t *testing.T) {
Context: context.Background(),
Toml: captiveCoreToml,
StoragePath: storagePath,
})
}, nil)

_, err = newFileWatcherWithOptions(runner, ms.hashFile, time.Millisecond)
assert.EqualError(t, err, "could not hash captive core binary: test error")
33 changes: 20 additions & 13 deletions ingest/ledgerbackend/run_from.go
@@ -6,32 +6,36 @@ import (
"fmt"
"runtime"

"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/protocols/stellarcore"
"github.com/stellar/go/support/log"
)

type runFromStream struct {
dir workingDir
from uint32
hash string
coreCmdFactory coreCmdFactory
log *log.Entry
useDB bool
dir workingDir
from uint32
hash string
coreCmdFactory coreCmdFactory
log *log.Entry
useDB bool
captiveCoreNewDBCounter prometheus.Counter
}

func newRunFromStream(r *stellarCoreRunner, from uint32, hash string) runFromStream {
func newRunFromStream(r *stellarCoreRunner, from uint32, hash string, captiveCoreNewDBCounter prometheus.Counter) runFromStream {
// We only use ephemeral directories on windows because there is
// no way to terminate captive core gracefully on windows.
// Having an ephemeral directory ensures that it is wiped out
// whenever we terminate captive core
dir := newWorkingDir(r, runtime.GOOS == "windows")
return runFromStream{
dir: dir,
from: from,
hash: hash,
coreCmdFactory: newCoreCmdFactory(r, dir),
log: r.log,
useDB: r.useDB,
dir: dir,
from: from,
hash: hash,
coreCmdFactory: newCoreCmdFactory(r, dir),
log: r.log,
useDB: r.useDB,
captiveCoreNewDBCounter: captiveCoreNewDBCounter,
}
}

@@ -79,6 +83,9 @@ func (s runFromStream) start(ctx context.Context) (cmd cmdI, captiveCorePipe pip
}

if createNewDB {
if s.captiveCoreNewDBCounter != nil {
s.captiveCoreNewDBCounter.Inc()
}
if err = s.dir.remove(); err != nil {
return nil, pipe{}, fmt.Errorf("error removing existing storage-dir contents: %w", err)
}
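
Passing the counter down as a possibly-nil prometheus.Counter keeps metrics optional: callers that never register metrics (such as the file-watcher tests above, which pass nil) keep working, at the cost of a nil check before every Inc. A small self-contained sketch of that pattern; the names are illustrative, not from this commit.

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    dto "github.com/prometheus/client_model/go"
)

// recordNewDB mirrors the guard in runFromStream.start: increment only when a
// counter was actually wired in.
func recordNewDB(counter prometheus.Counter) {
    if counter != nil {
        counter.Inc()
    }
}

func main() {
    counter := prometheus.NewCounter(prometheus.CounterOpts{
        Namespace: "example", Subsystem: "ingest", Name: "captive_stellar_core_new_db",
    })
    recordNewDB(counter) // increments
    recordNewDB(nil)     // safe no-op when metrics are disabled

    m := &dto.Metric{}
    if err := counter.Write(m); err != nil {
        panic(err)
    }
    fmt.Println(m.GetCounter().GetValue()) // 1
}
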
11 changes: 8 additions & 3 deletions ingest/ledgerbackend/stellar_core_runner.go
@@ -7,6 +7,8 @@ import (
"math/rand"
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/support/log"
)

@@ -67,6 +69,8 @@ type stellarCoreRunner struct {
toml *CaptiveCoreToml
useDB bool

captiveCoreNewDBCounter prometheus.Counter

log *log.Entry
}

@@ -79,7 +83,7 @@ func createRandomHexString(n int) string {
return string(b)
}

func newStellarCoreRunner(config CaptiveCoreConfig) *stellarCoreRunner {
func newStellarCoreRunner(config CaptiveCoreConfig, captiveCoreNewDBCounter prometheus.Counter) *stellarCoreRunner {
ctx, cancel := context.WithCancel(config.Context)

runner := &stellarCoreRunner{
@@ -91,7 +95,8 @@ func newStellarCoreRunner(config CaptiveCoreConfig) *stellarCoreRunner {
log: config.Log,
toml: config.Toml,

systemCaller: realSystemCaller{},
captiveCoreNewDBCounter: captiveCoreNewDBCounter,
systemCaller: realSystemCaller{},
}

return runner
@@ -104,7 +109,7 @@ func (r *stellarCoreRunner) context() context.Context {

// runFrom executes the run command with a starting ledger on the captive core subprocess
func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
return r.startMetaStream(newRunFromStream(r, from, hash))
return r.startMetaStream(newRunFromStream(r, from, hash, r.captiveCoreNewDBCounter))
}

// catchup executes the catchup command on the captive core subprocess
36 changes: 29 additions & 7 deletions ingest/ledgerbackend/stellar_core_runner_test.go
@@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

@@ -26,7 +28,7 @@ func TestCloseOffline(t *testing.T) {
Context: context.Background(),
Toml: captiveCoreToml,
StoragePath: "/tmp/captive-core",
})
}, nil)

cmdMock := simpleCommandMock()
cmdMock.On("Wait").Return(nil)
@@ -68,7 +70,7 @@ func TestCloseOnline(t *testing.T) {
Context: context.Background(),
Toml: captiveCoreToml,
StoragePath: "/tmp/captive-core",
})
}, nil)

cmdMock := simpleCommandMock()
cmdMock.On("Wait").Return(nil)
@@ -112,7 +114,7 @@ func TestCloseOnlineWithError(t *testing.T) {
Context: context.Background(),
Toml: captiveCoreToml,
StoragePath: "/tmp/captive-core",
})
}, nil)

cmdMock := simpleCommandMock()
cmdMock.On("Wait").Return(errors.New("wait error"))
@@ -166,7 +168,7 @@ func TestCloseConcurrency(t *testing.T) {
Context: context.Background(),
Toml: captiveCoreToml,
StoragePath: "/tmp/captive-core",
})
}, nil)

cmdMock := simpleCommandMock()
cmdMock.On("Wait").Return(errors.New("wait error")).WaitUntil(time.After(time.Millisecond * 300))
@@ -223,7 +225,7 @@ func TestRunFromUseDBLedgersMatch(t *testing.T) {
Toml: captiveCoreToml,
StoragePath: "/tmp/captive-core",
UseDB: true,
})
}, createNewDBCounter())

cmdMock := simpleCommandMock()
cmdMock.On("Wait").Return(nil)
@@ -263,6 +265,8 @@

assert.NoError(t, runner.runFrom(100, "hash"))
assert.NoError(t, runner.close())

assert.Equal(t, float64(0), getNewDBCounterMetric(runner))
}

func TestRunFromUseDBLedgersBehind(t *testing.T) {
@@ -279,7 +283,7 @@ func TestRunFromUseDBLedgersBehind(t *testing.T) {
Toml: captiveCoreToml,
StoragePath: "/tmp/captive-core",
UseDB: true,
})
}, createNewDBCounter())

newDBCmdMock := simpleCommandMock()
newDBCmdMock.On("Run").Return(nil)
@@ -325,6 +329,23 @@

assert.NoError(t, runner.runFrom(100, "hash"))
assert.NoError(t, runner.close())

assert.Equal(t, float64(0), getNewDBCounterMetric(runner))
}

func createNewDBCounter() prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "test", Subsystem: "captive_core", Name: "new_db_counter",
})
}

func getNewDBCounterMetric(runner *stellarCoreRunner) float64 {
value := &dto.Metric{}
err := runner.captiveCoreNewDBCounter.Write(value)
if err != nil {
panic(err)
}
return value.GetCounter().GetValue()
}
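
As an aside, client_golang's testutil package offers an equivalent way to read back simple counters and gauges without filling in a dto.Metric by hand. The commit does not use it; the self-contained sketch below is shown only for comparison.

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "example_new_db_total"})
    counter.Inc()
    // testutil.ToFloat64 panics unless the collector yields exactly one metric,
    // so it is best suited to plain counters and gauges like this one.
    fmt.Println(testutil.ToFloat64(counter)) // 1
}
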

func TestRunFromUseDBLedgersInFront(t *testing.T) {
@@ -341,7 +362,7 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) {
Toml: captiveCoreToml,
StoragePath: "/tmp/captive-core",
UseDB: true,
})
}, createNewDBCounter())

newDBCmdMock := simpleCommandMock()
newDBCmdMock.On("Run").Return(nil)
@@ -405,4 +426,5 @@

assert.NoError(t, runner.runFrom(100, "hash"))
assert.NoError(t, runner.close())
assert.Equal(t, float64(1), getNewDBCounterMetric(runner))
}
