Skip to content

Commit

Permalink
exp/services/ledgerexporter: Add prometheus metrics for ledger export…
Browse files Browse the repository at this point in the history
…er (#5265)
  • Loading branch information
tamirms authored Apr 24, 2024
1 parent 1042b62 commit 9f7f9c2
Show file tree
Hide file tree
Showing 12 changed files with 522 additions and 133 deletions.
65 changes: 52 additions & 13 deletions exp/services/ledgerexporter/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ import (
"syscall"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/stellar/go/ingest/ledgerbackend"
_ "github.com/stellar/go/network"
supporthttp "github.com/stellar/go/support/http"
"github.com/stellar/go/support/log"
)

Expand All @@ -20,11 +25,12 @@ var (
)

type App struct {
config Config
ledgerBackend ledgerbackend.LedgerBackend
dataStore DataStore
exportManager ExportManager
uploader Uploader
config Config
ledgerBackend ledgerbackend.LedgerBackend
dataStore DataStore
exportManager *ExportManager
uploader Uploader
prometheusRegistry *prometheus.Registry
}

func NewApp() *App {
Expand All @@ -34,15 +40,25 @@ func NewApp() *App {
err := config.LoadConfig()
logFatalIf(err, "Could not load configuration")

app := &App{config: config}
app := &App{config: config, prometheusRegistry: prometheus.NewRegistry()}
app.prometheusRegistry.MustRegister(
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{Namespace: "ledger_exporter"}),
collectors.NewGoCollector(),
)
return app
}

func (a *App) init(ctx context.Context) {
a.dataStore = mustNewDataStore(ctx, &a.config)
a.ledgerBackend = mustNewLedgerBackend(ctx, a.config)
a.exportManager = NewExportManager(a.config.ExporterConfig, a.ledgerBackend)
a.uploader = NewUploader(a.dataStore, a.exportManager.GetMetaArchiveChannel())
a.dataStore = mustNewDataStore(ctx, a.config)
a.ledgerBackend = mustNewLedgerBackend(ctx, a.config, a.prometheusRegistry)
// TODO: make number of upload workers configurable instead of hard coding it to 1
queue := NewUploadQueue(1, a.prometheusRegistry)
a.exportManager = NewExportManager(a.config.ExporterConfig, a.ledgerBackend, queue, a.prometheusRegistry)
a.uploader = NewUploader(
a.dataStore,
queue,
a.prometheusRegistry,
)
}

func (a *App) close() {
Expand All @@ -54,6 +70,24 @@ func (a *App) close() {
}
}

func (a *App) serveAdmin() {
if a.config.AdminPort == 0 {
return
}

mux := supporthttp.NewMux(logger)
mux.Handle("/metrics", promhttp.HandlerFor(a.prometheusRegistry, promhttp.HandlerOpts{}))

addr := fmt.Sprintf(":%d", a.config.AdminPort)
supporthttp.Run(supporthttp.Config{
ListenAddr: addr,
Handler: mux,
OnStarting: func() {
logger.Infof("Starting admin port server on %s", addr)
},
})
}

func (a *App) Run() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -84,6 +118,8 @@ func (a *App) Run() {
}
}()

go a.serveAdmin()

// Handle OS signals to gracefully terminate the service
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -97,20 +133,23 @@ func (a *App) Run() {
logger.Info("Shutting down ledger-exporter")
}

func mustNewDataStore(ctx context.Context, config *Config) DataStore {
func mustNewDataStore(ctx context.Context, config Config) DataStore {
dataStore, err := NewDataStore(ctx, fmt.Sprintf("%s/%s", config.DestinationURL, config.Network))
logFatalIf(err, "Could not connect to destination data store")
return dataStore
}

// mustNewLedgerBackend Creates and initializes captive core ledger backend
// Currently, only supports captive-core as ledger backend
func mustNewLedgerBackend(ctx context.Context, config Config) ledgerbackend.LedgerBackend {
func mustNewLedgerBackend(ctx context.Context, config Config, prometheusRegistry *prometheus.Registry) ledgerbackend.LedgerBackend {
captiveConfig := config.GenerateCaptiveCoreConfig()

var backend ledgerbackend.LedgerBackend
var err error
// Create a new captive core backend
backend, err := ledgerbackend.NewCaptive(captiveConfig)
backend, err = ledgerbackend.NewCaptive(captiveConfig)
logFatalIf(err, "Failed to create captive-core instance")
backend = ledgerbackend.WithMetrics(backend, prometheusRegistry, "ledger_exporter")

var ledgerRange ledgerbackend.Range
if config.EndLedger == 0 {
Expand Down
5 changes: 5 additions & 0 deletions exp/services/ledgerexporter/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stellar/go/network"

"github.com/pelletier/go-toml"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/ordered"
)
Expand All @@ -25,6 +26,8 @@ type StellarCoreConfig struct {
}

type Config struct {
AdminPort int `toml:"admin_port"`

Network string `toml:"network"`
DestinationURL string `toml:"destination_url"`
ExporterConfig ExporterConfig `toml:"exporter_config"`
Expand All @@ -41,13 +44,15 @@ func (config *Config) LoadConfig() error {
startLedger := flag.Uint("start", 0, "Starting ledger")
endLedger := flag.Uint("end", 0, "Ending ledger (inclusive)")
startFromLastNLedger := flag.Uint("from-last", 0, "Start streaming from last N ledgers")
adminPort := flag.Int("admin-port", 0, "Admin HTTP port for prometheus metrics")

configFilePath := flag.String("config-file", "config.toml", "Path to the TOML config file")
flag.Parse()

config.StartLedger = uint32(*startLedger)
config.EndLedger = uint32(*endLedger)
config.StartFromLastLedgers = uint32(*startFromLastNLedger)
config.AdminPort = *adminPort

// Load config TOML file
cfg, err := toml.LoadFile(*configFilePath)
Expand Down
5 changes: 3 additions & 2 deletions exp/services/ledgerexporter/internal/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ import (
"strings"

"cloud.google.com/go/storage"
"google.golang.org/api/option"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/url"
"google.golang.org/api/option"
)

// DataStore defines an interface for interacting with data storage
type DataStore interface {
GetFile(ctx context.Context, path string) (io.ReadCloser, error)
PutFile(ctx context.Context, path string, in io.WriterTo) error
PutFileIfNotExists(ctx context.Context, path string, in io.WriterTo) error
PutFileIfNotExists(ctx context.Context, path string, in io.WriterTo) (bool, error)
Exists(ctx context.Context, path string) (bool, error)
Size(ctx context.Context, path string) (int64, error)
Close() error
Expand Down
60 changes: 30 additions & 30 deletions exp/services/ledgerexporter/internal/exportmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package ledgerexporter

import (
"context"
"strconv"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/xdr"
)
Expand All @@ -14,35 +17,32 @@ type ExporterConfig struct {
}

// ExportManager manages the creation and handling of export objects.
type ExportManager interface {
GetMetaArchiveChannel() chan *LedgerMetaArchive
Run(ctx context.Context, startLedger uint32, endLedger uint32) error
AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta xdr.LedgerCloseMeta) error
}

type exportManager struct {
type ExportManager struct {
config ExporterConfig
ledgerBackend ledgerbackend.LedgerBackend
currentMetaArchive *LedgerMetaArchive
metaArchiveCh chan *LedgerMetaArchive
queue UploadQueue
latestLedgerMetric *prometheus.GaugeVec
}

// NewExportManager creates a new ExportManager with the provided configuration.
func NewExportManager(config ExporterConfig, backend ledgerbackend.LedgerBackend) ExportManager {
return &exportManager{
config: config,
ledgerBackend: backend,
metaArchiveCh: make(chan *LedgerMetaArchive, 1),
}
}
func NewExportManager(config ExporterConfig, backend ledgerbackend.LedgerBackend, queue UploadQueue, prometheusRegistry *prometheus.Registry) *ExportManager {
latestLedgerMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "ledger_exporter", Subsystem: "export_manager", Name: "latest_ledger",
Help: "sequence number of the latest ledger consumed by the export manager",
}, []string{"start_ledger", "end_ledger"})
prometheusRegistry.MustRegister(latestLedgerMetric)

// GetMetaArchiveChannel returns a channel that receives LedgerMetaArchive objects.
func (e *exportManager) GetMetaArchiveChannel() chan *LedgerMetaArchive {
return e.metaArchiveCh
return &ExportManager{
config: config,
ledgerBackend: backend,
queue: queue,
latestLedgerMetric: latestLedgerMetric,
}
}

// AddLedgerCloseMeta adds ledger metadata to the current export object
func (e *exportManager) AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta xdr.LedgerCloseMeta) error {
func (e *ExportManager) AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta xdr.LedgerCloseMeta) error {
ledgerSeq := ledgerCloseMeta.LedgerSequence()

// Determine the object key for the given ledger sequence
Expand All @@ -67,19 +67,16 @@ func (e *exportManager) AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta
e.currentMetaArchive = NewLedgerMetaArchive(objectKey, ledgerSeq, endSeq)
}

err = e.currentMetaArchive.AddLedger(ledgerCloseMeta)
if err != nil {
if err = e.currentMetaArchive.AddLedger(ledgerCloseMeta); err != nil {
return errors.Wrapf(err, "failed to add ledger %d", ledgerSeq)
}

if ledgerSeq >= e.currentMetaArchive.GetEndLedgerSequence() {
// Current archive is full, send it for upload
select {
case e.metaArchiveCh <- e.currentMetaArchive:
e.currentMetaArchive = nil
case <-ctx.Done():
return ctx.Err()
if err = e.queue.Enqueue(ctx, e.currentMetaArchive); err != nil {
return err
}
e.currentMetaArchive = nil
}
return nil
}
Expand All @@ -88,10 +85,12 @@ func (e *exportManager) AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta
// from the backend, and processes the corresponding ledger close metadata.
// The process continues until the ending ledger number is reached or a cancellation
// signal is received.
func (e *exportManager) Run(ctx context.Context, startLedger, endLedger uint32) error {

// Close the object channel
defer close(e.metaArchiveCh)
func (e *ExportManager) Run(ctx context.Context, startLedger, endLedger uint32) error {
defer e.queue.Close()
labels := prometheus.Labels{
"start_ledger": strconv.FormatUint(uint64(startLedger), 10),
"end_ledger": strconv.FormatUint(uint64(endLedger), 10),
}

for nextLedger := startLedger; endLedger < 1 || nextLedger <= endLedger; nextLedger++ {
select {
Expand All @@ -103,6 +102,7 @@ func (e *exportManager) Run(ctx context.Context, startLedger, endLedger uint32)
if err != nil {
return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger)
}
e.latestLedgerMetric.With(labels).Set(float64(nextLedger))
err = e.AddLedgerCloseMeta(ctx, ledgerCloseMeta)
if err != nil {
return errors.Wrapf(err, "failed to add ledgerCloseMeta for ledger %d", nextLedger)
Expand Down
Loading

0 comments on commit 9f7f9c2

Please sign in to comment.