diff --git a/exp/services/ledgerexporter/internal/app.go b/exp/services/ledgerexporter/internal/app.go index fb4a5f788b..8239b9c72e 100644 --- a/exp/services/ledgerexporter/internal/app.go +++ b/exp/services/ledgerexporter/internal/app.go @@ -10,8 +10,13 @@ import ( "syscall" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/stellar/go/ingest/ledgerbackend" _ "github.com/stellar/go/network" + supporthttp "github.com/stellar/go/support/http" "github.com/stellar/go/support/log" ) @@ -20,11 +25,12 @@ var ( ) type App struct { - config Config - ledgerBackend ledgerbackend.LedgerBackend - dataStore DataStore - exportManager ExportManager - uploader Uploader + config Config + ledgerBackend ledgerbackend.LedgerBackend + dataStore DataStore + exportManager *ExportManager + uploader Uploader + prometheusRegistry *prometheus.Registry } func NewApp() *App { @@ -34,15 +40,25 @@ func NewApp() *App { err := config.LoadConfig() logFatalIf(err, "Could not load configuration") - app := &App{config: config} + app := &App{config: config, prometheusRegistry: prometheus.NewRegistry()} + app.prometheusRegistry.MustRegister( + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{Namespace: "ledger_exporter"}), + collectors.NewGoCollector(), + ) return app } func (a *App) init(ctx context.Context) { - a.dataStore = mustNewDataStore(ctx, &a.config) - a.ledgerBackend = mustNewLedgerBackend(ctx, a.config) - a.exportManager = NewExportManager(a.config.ExporterConfig, a.ledgerBackend) - a.uploader = NewUploader(a.dataStore, a.exportManager.GetMetaArchiveChannel()) + a.dataStore = mustNewDataStore(ctx, a.config) + a.ledgerBackend = mustNewLedgerBackend(ctx, a.config, a.prometheusRegistry) + // TODO: make number of upload workers configurable instead of hard coding it to 1 + queue := NewUploadQueue(1, a.prometheusRegistry) + 
a.exportManager = NewExportManager(a.config.ExporterConfig, a.ledgerBackend, queue, a.prometheusRegistry) + a.uploader = NewUploader( + a.dataStore, + queue, + a.prometheusRegistry, + ) } func (a *App) close() { @@ -54,6 +70,24 @@ func (a *App) close() { } } +func (a *App) serveAdmin() { + if a.config.AdminPort == 0 { + return + } + + mux := supporthttp.NewMux(logger) + mux.Handle("/metrics", promhttp.HandlerFor(a.prometheusRegistry, promhttp.HandlerOpts{})) + + addr := fmt.Sprintf(":%d", a.config.AdminPort) + supporthttp.Run(supporthttp.Config{ + ListenAddr: addr, + Handler: mux, + OnStarting: func() { + logger.Infof("Starting admin port server on %s", addr) + }, + }) +} + func (a *App) Run() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -84,6 +118,8 @@ func (a *App) Run() { } }() + go a.serveAdmin() + // Handle OS signals to gracefully terminate the service sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) @@ -97,7 +133,7 @@ func (a *App) Run() { logger.Info("Shutting down ledger-exporter") } -func mustNewDataStore(ctx context.Context, config *Config) DataStore { +func mustNewDataStore(ctx context.Context, config Config) DataStore { dataStore, err := NewDataStore(ctx, fmt.Sprintf("%s/%s", config.DestinationURL, config.Network)) logFatalIf(err, "Could not connect to destination data store") return dataStore @@ -105,12 +141,15 @@ func mustNewDataStore(ctx context.Context, config *Config) DataStore { // mustNewLedgerBackend Creates and initializes captive core ledger backend // Currently, only supports captive-core as ledger backend -func mustNewLedgerBackend(ctx context.Context, config Config) ledgerbackend.LedgerBackend { +func mustNewLedgerBackend(ctx context.Context, config Config, prometheusRegistry *prometheus.Registry) ledgerbackend.LedgerBackend { captiveConfig := config.GenerateCaptiveCoreConfig() + var backend ledgerbackend.LedgerBackend + var err error // Create a new captive 
core backend - backend, err := ledgerbackend.NewCaptive(captiveConfig) + backend, err = ledgerbackend.NewCaptive(captiveConfig) logFatalIf(err, "Failed to create captive-core instance") + backend = ledgerbackend.WithMetrics(backend, prometheusRegistry, "ledger_exporter") var ledgerRange ledgerbackend.Range if config.EndLedger == 0 { diff --git a/exp/services/ledgerexporter/internal/config.go b/exp/services/ledgerexporter/internal/config.go index 640841b1d9..17e76a5d68 100644 --- a/exp/services/ledgerexporter/internal/config.go +++ b/exp/services/ledgerexporter/internal/config.go @@ -10,6 +10,7 @@ import ( "github.com/stellar/go/network" "github.com/pelletier/go-toml" + "github.com/stellar/go/support/errors" "github.com/stellar/go/support/ordered" ) @@ -25,6 +26,8 @@ type StellarCoreConfig struct { } type Config struct { + AdminPort int `toml:"admin_port"` + Network string `toml:"network"` DestinationURL string `toml:"destination_url"` ExporterConfig ExporterConfig `toml:"exporter_config"` @@ -41,6 +44,7 @@ func (config *Config) LoadConfig() error { startLedger := flag.Uint("start", 0, "Starting ledger") endLedger := flag.Uint("end", 0, "Ending ledger (inclusive)") startFromLastNLedger := flag.Uint("from-last", 0, "Start streaming from last N ledgers") + adminPort := flag.Int("admin-port", 0, "Admin HTTP port for prometheus metrics") configFilePath := flag.String("config-file", "config.toml", "Path to the TOML config file") flag.Parse() @@ -48,6 +52,7 @@ func (config *Config) LoadConfig() error { config.StartLedger = uint32(*startLedger) config.EndLedger = uint32(*endLedger) config.StartFromLastLedgers = uint32(*startFromLastNLedger) + config.AdminPort = *adminPort // Load config TOML file cfg, err := toml.LoadFile(*configFilePath) diff --git a/exp/services/ledgerexporter/internal/datastore.go b/exp/services/ledgerexporter/internal/datastore.go index 0367e9008e..a53d97465a 100644 --- a/exp/services/ledgerexporter/internal/datastore.go +++ 
b/exp/services/ledgerexporter/internal/datastore.go @@ -6,16 +6,17 @@ import ( "strings" "cloud.google.com/go/storage" + "google.golang.org/api/option" + "github.com/stellar/go/support/errors" "github.com/stellar/go/support/url" - "google.golang.org/api/option" ) // DataStore defines an interface for interacting with data storage type DataStore interface { GetFile(ctx context.Context, path string) (io.ReadCloser, error) PutFile(ctx context.Context, path string, in io.WriterTo) error - PutFileIfNotExists(ctx context.Context, path string, in io.WriterTo) error + PutFileIfNotExists(ctx context.Context, path string, in io.WriterTo) (bool, error) Exists(ctx context.Context, path string) (bool, error) Size(ctx context.Context, path string) (int64, error) Close() error diff --git a/exp/services/ledgerexporter/internal/exportmanager.go b/exp/services/ledgerexporter/internal/exportmanager.go index de322aa30c..a918a16e4e 100644 --- a/exp/services/ledgerexporter/internal/exportmanager.go +++ b/exp/services/ledgerexporter/internal/exportmanager.go @@ -2,8 +2,11 @@ package ledgerexporter import ( "context" + "strconv" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/xdr" ) @@ -14,35 +17,32 @@ type ExporterConfig struct { } // ExportManager manages the creation and handling of export objects. -type ExportManager interface { - GetMetaArchiveChannel() chan *LedgerMetaArchive - Run(ctx context.Context, startLedger uint32, endLedger uint32) error - AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta xdr.LedgerCloseMeta) error -} - -type exportManager struct { +type ExportManager struct { config ExporterConfig ledgerBackend ledgerbackend.LedgerBackend currentMetaArchive *LedgerMetaArchive - metaArchiveCh chan *LedgerMetaArchive + queue UploadQueue + latestLedgerMetric *prometheus.GaugeVec } // NewExportManager creates a new ExportManager with the provided configuration. 
-func NewExportManager(config ExporterConfig, backend ledgerbackend.LedgerBackend) ExportManager { - return &exportManager{ - config: config, - ledgerBackend: backend, - metaArchiveCh: make(chan *LedgerMetaArchive, 1), - } -} +func NewExportManager(config ExporterConfig, backend ledgerbackend.LedgerBackend, queue UploadQueue, prometheusRegistry *prometheus.Registry) *ExportManager { + latestLedgerMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ledger_exporter", Subsystem: "export_manager", Name: "latest_ledger", + Help: "sequence number of the latest ledger consumed by the export manager", + }, []string{"start_ledger", "end_ledger"}) + prometheusRegistry.MustRegister(latestLedgerMetric) -// GetMetaArchiveChannel returns a channel that receives LedgerMetaArchive objects. -func (e *exportManager) GetMetaArchiveChannel() chan *LedgerMetaArchive { - return e.metaArchiveCh + return &ExportManager{ + config: config, + ledgerBackend: backend, + queue: queue, + latestLedgerMetric: latestLedgerMetric, + } } // AddLedgerCloseMeta adds ledger metadata to the current export object -func (e *exportManager) AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta xdr.LedgerCloseMeta) error { +func (e *ExportManager) AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta xdr.LedgerCloseMeta) error { ledgerSeq := ledgerCloseMeta.LedgerSequence() // Determine the object key for the given ledger sequence @@ -67,19 +67,16 @@ func (e *exportManager) AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta e.currentMetaArchive = NewLedgerMetaArchive(objectKey, ledgerSeq, endSeq) } - err = e.currentMetaArchive.AddLedger(ledgerCloseMeta) - if err != nil { + if err = e.currentMetaArchive.AddLedger(ledgerCloseMeta); err != nil { return errors.Wrapf(err, "failed to add ledger %d", ledgerSeq) } if ledgerSeq >= e.currentMetaArchive.GetEndLedgerSequence() { // Current archive is full, send it for upload - select { - case e.metaArchiveCh <- e.currentMetaArchive: - 
e.currentMetaArchive = nil - case <-ctx.Done(): - return ctx.Err() + if err = e.queue.Enqueue(ctx, e.currentMetaArchive); err != nil { + return err } + e.currentMetaArchive = nil } return nil } @@ -88,10 +85,12 @@ func (e *exportManager) AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta // from the backend, and processes the corresponding ledger close metadata. // The process continues until the ending ledger number is reached or a cancellation // signal is received. -func (e *exportManager) Run(ctx context.Context, startLedger, endLedger uint32) error { - - // Close the object channel - defer close(e.metaArchiveCh) +func (e *ExportManager) Run(ctx context.Context, startLedger, endLedger uint32) error { + defer e.queue.Close() + labels := prometheus.Labels{ + "start_ledger": strconv.FormatUint(uint64(startLedger), 10), + "end_ledger": strconv.FormatUint(uint64(endLedger), 10), + } for nextLedger := startLedger; endLedger < 1 || nextLedger <= endLedger; nextLedger++ { select { @@ -103,6 +102,7 @@ func (e *exportManager) Run(ctx context.Context, startLedger, endLedger uint32) if err != nil { return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger) } + e.latestLedgerMetric.With(labels).Set(float64(nextLedger)) err = e.AddLedgerCloseMeta(ctx, ledgerCloseMeta) if err != nil { return errors.Wrapf(err, "failed to add ledgerCloseMeta for ledger %d", nextLedger) diff --git a/exp/services/ledgerexporter/internal/exportmanager_test.go b/exp/services/ledgerexporter/internal/exportmanager_test.go index f6f330ec08..b082a8512e 100644 --- a/exp/services/ledgerexporter/internal/exportmanager_test.go +++ b/exp/services/ledgerexporter/internal/exportmanager_test.go @@ -6,12 +6,14 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/mock" - "github.com/stellar/go/ingest/ledgerbackend" - "github.com/stellar/go/support/collections/set" "github.com/stretchr/testify/require" 
"github.com/stretchr/testify/suite" + + "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/support/collections/set" ) func TestExporterSuite(t *testing.T) { @@ -36,7 +38,9 @@ func (s *ExportManagerSuite) TearDownTest() { func (s *ExportManagerSuite) TestRun() { config := ExporterConfig{LedgersPerFile: 64, FilesPerPartition: 10} - exporter := NewExportManager(config, &s.mockBackend) + registry := prometheus.NewRegistry() + queue := NewUploadQueue(1, registry) + exporter := NewExportManager(config, &s.mockBackend, queue, registry) start := uint32(0) end := uint32(255) @@ -53,7 +57,12 @@ func (s *ExportManagerSuite) TestRun() { wg.Add(1) go func() { defer wg.Done() - for v := range exporter.GetMetaArchiveChannel() { + for { + v, ok, err := queue.Dequeue(s.ctx) + s.Assert().NoError(err) + if !ok { + break + } actualKeys.Add(v.objectKey) } }() @@ -64,11 +73,23 @@ func (s *ExportManagerSuite) TestRun() { wg.Wait() require.Equal(s.T(), expectedKeys, actualKeys) + require.Equal( + s.T(), + float64(255), + getMetricValue(exporter.latestLedgerMetric.With( + prometheus.Labels{ + "start_ledger": "0", + "end_ledger": "255", + }), + ).GetGauge().GetValue(), + ) } func (s *ExportManagerSuite) TestRunContextCancel() { config := ExporterConfig{LedgersPerFile: 1, FilesPerPartition: 1} - exporter := NewExportManager(config, &s.mockBackend) + registry := prometheus.NewRegistry() + queue := NewUploadQueue(1, registry) + exporter := NewExportManager(config, &s.mockBackend, queue, registry) ctx, cancel := context.WithCancel(context.Background()) s.mockBackend.On("GetLedger", mock.Anything, mock.Anything). 
@@ -80,9 +101,10 @@ func (s *ExportManagerSuite) TestRunContextCancel() { }() go func() { - ch := exporter.GetMetaArchiveChannel() for i := 0; i < 127; i++ { - <-ch + _, ok, err := queue.Dequeue(s.ctx) + s.Assert().NoError(err) + s.Assert().True(ok) } }() @@ -93,7 +115,9 @@ func (s *ExportManagerSuite) TestRunContextCancel() { func (s *ExportManagerSuite) TestRunWithCanceledContext() { config := ExporterConfig{LedgersPerFile: 1, FilesPerPartition: 10} - exporter := NewExportManager(config, &s.mockBackend) + registry := prometheus.NewRegistry() + queue := NewUploadQueue(1, registry) + exporter := NewExportManager(config, &s.mockBackend, queue, registry) ctx, cancel := context.WithCancel(context.Background()) cancel() err := exporter.Run(ctx, 1, 10) @@ -102,8 +126,9 @@ func (s *ExportManagerSuite) TestRunWithCanceledContext() { func (s *ExportManagerSuite) TestAddLedgerCloseMeta() { config := ExporterConfig{LedgersPerFile: 1, FilesPerPartition: 10} - exporter := NewExportManager(config, &s.mockBackend) - objectCh := exporter.GetMetaArchiveChannel() + registry := prometheus.NewRegistry() + queue := NewUploadQueue(1, registry) + exporter := NewExportManager(config, &s.mockBackend, queue, registry) expectedkeys := set.NewSet[string](10) actualKeys := set.NewSet[string](10) @@ -111,7 +136,12 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() { wg.Add(1) go func() { defer wg.Done() - for v := range objectCh { + for { + v, ok, err := queue.Dequeue(s.ctx) + s.Assert().NoError(err) + if !ok { + break + } actualKeys.Add(v.objectKey) } }() @@ -126,14 +156,16 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() { expectedkeys.Add(key) } - close(objectCh) + queue.Close() wg.Wait() require.Equal(s.T(), expectedkeys, actualKeys) } func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() { config := ExporterConfig{LedgersPerFile: 1, FilesPerPartition: 10} - exporter := NewExportManager(config, &s.mockBackend) + registry := prometheus.NewRegistry() + queue := 
NewUploadQueue(1, registry) + exporter := NewExportManager(config, &s.mockBackend, queue, registry) ctx, cancel := context.WithCancel(context.Background()) go func() { @@ -148,7 +180,9 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() { func (s *ExportManagerSuite) TestAddLedgerCloseMetaKeyMismatch() { config := ExporterConfig{LedgersPerFile: 10, FilesPerPartition: 1} - exporter := NewExportManager(config, &s.mockBackend) + registry := prometheus.NewRegistry() + queue := NewUploadQueue(1, registry) + exporter := NewExportManager(config, &s.mockBackend, queue, registry) require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(16))) require.EqualError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(21)), diff --git a/exp/services/ledgerexporter/internal/gcs_datastore.go b/exp/services/ledgerexporter/internal/gcs_datastore.go index 4fa1287e94..927130fe6a 100644 --- a/exp/services/ledgerexporter/internal/gcs_datastore.go +++ b/exp/services/ledgerexporter/internal/gcs_datastore.go @@ -10,6 +10,7 @@ import ( "google.golang.org/api/googleapi" "cloud.google.com/go/storage" + "github.com/stellar/go/support/errors" ) @@ -21,7 +22,7 @@ type GCSDataStore struct { } // GetFile retrieves a file from the GCS bucket. -func (b *GCSDataStore) GetFile(ctx context.Context, filePath string) (io.ReadCloser, error) { +func (b GCSDataStore) GetFile(ctx context.Context, filePath string) (io.ReadCloser, error) { filePath = path.Join(b.prefix, filePath) r, err := b.bucket.Object(filePath).NewReader(ctx) if err != nil { @@ -35,26 +36,26 @@ func (b *GCSDataStore) GetFile(ctx context.Context, filePath string) (io.ReadClo } // PutFileIfNotExists uploads a file to GCS only if it doesn't already exist. 
-func (b *GCSDataStore) PutFileIfNotExists(ctx context.Context, filePath string, in io.WriterTo) error { +func (b GCSDataStore) PutFileIfNotExists(ctx context.Context, filePath string, in io.WriterTo) (bool, error) { err := b.putFile(ctx, filePath, in, &storage.Conditions{DoesNotExist: true}) if err != nil { if gcsError, ok := err.(*googleapi.Error); ok { switch gcsError.Code { case http.StatusPreconditionFailed: logger.Infof("Precondition failed: %s already exists in the bucket", filePath) - return nil // Treat as success + return false, nil // Treat as success default: logger.Errorf("GCS error: %s %s", gcsError.Message, gcsError.Body) } } - return errors.Wrapf(err, "error uploading file: %s", filePath) + return false, errors.Wrapf(err, "error uploading file: %s", filePath) } logger.Infof("File uploaded successfully: %s", filePath) - return nil + return true, nil } // PutFile uploads a file to GCS -func (b *GCSDataStore) PutFile(ctx context.Context, filePath string, in io.WriterTo) error { +func (b GCSDataStore) PutFile(ctx context.Context, filePath string, in io.WriterTo) error { err := b.putFile(ctx, filePath, in, nil) // No conditions for regular PutFile if err != nil { @@ -68,7 +69,7 @@ func (b *GCSDataStore) PutFile(ctx context.Context, filePath string, in io.Write } // Size retrieves the size of a file in the GCS bucket. -func (b *GCSDataStore) Size(ctx context.Context, pth string) (int64, error) { +func (b GCSDataStore) Size(ctx context.Context, pth string) (int64, error) { pth = path.Join(b.prefix, pth) attrs, err := b.bucket.Object(pth).Attrs(ctx) if err == storage.ErrObjectNotExist { @@ -81,17 +82,17 @@ func (b *GCSDataStore) Size(ctx context.Context, pth string) (int64, error) { } // Exists checks if a file exists in the GCS bucket. 
-func (b *GCSDataStore) Exists(ctx context.Context, pth string) (bool, error) { +func (b GCSDataStore) Exists(ctx context.Context, pth string) (bool, error) { _, err := b.Size(ctx, pth) return err == nil, err } // Close closes the GCS client connection. -func (b *GCSDataStore) Close() error { +func (b GCSDataStore) Close() error { return b.client.Close() } -func (b *GCSDataStore) putFile(ctx context.Context, filePath string, in io.WriterTo, conditions *storage.Conditions) error { +func (b GCSDataStore) putFile(ctx context.Context, filePath string, in io.WriterTo, conditions *storage.Conditions) error { filePath = path.Join(b.prefix, filePath) o := b.bucket.Object(filePath) if conditions != nil { diff --git a/exp/services/ledgerexporter/internal/mock_datastore.go b/exp/services/ledgerexporter/internal/mock_datastore.go index 7675a87461..705df45a26 100644 --- a/exp/services/ledgerexporter/internal/mock_datastore.go +++ b/exp/services/ledgerexporter/internal/mock_datastore.go @@ -32,9 +32,9 @@ func (m *MockDataStore) PutFile(ctx context.Context, path string, in io.WriterTo return args.Error(0) } -func (m *MockDataStore) PutFileIfNotExists(ctx context.Context, path string, in io.WriterTo) error { +func (m *MockDataStore) PutFileIfNotExists(ctx context.Context, path string, in io.WriterTo) (bool, error) { args := m.Called(ctx, path, in) - return args.Error(0) + return args.Bool(0), args.Error(1) } func (m *MockDataStore) Close() error { diff --git a/exp/services/ledgerexporter/internal/queue.go b/exp/services/ledgerexporter/internal/queue.go new file mode 100644 index 0000000000..372ccb0056 --- /dev/null +++ b/exp/services/ledgerexporter/internal/queue.go @@ -0,0 +1,58 @@ +package ledgerexporter + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" +) + +// UploadQueue is a queue of LedgerMetaArchive objects which are scheduled for upload +type UploadQueue struct { + metaArchiveCh chan *LedgerMetaArchive + queueLengthMetric prometheus.Gauge +} + 
+// NewUploadQueue constructs a new UploadQueue +func NewUploadQueue(size int, prometheusRegistry *prometheus.Registry) UploadQueue { + queueLengthMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "ledger_exporter", + Subsystem: "upload_queue", + Name: "length", + Help: "The number of objects queued for upload", + }) + prometheusRegistry.MustRegister(queueLengthMetric) + return UploadQueue{ + metaArchiveCh: make(chan *LedgerMetaArchive, size), + queueLengthMetric: queueLengthMetric, + } +} + +// Enqueue will add an upload task to the queue. Enqueue may block if the queue is full. +func (u UploadQueue) Enqueue(ctx context.Context, archive *LedgerMetaArchive) error { + u.queueLengthMetric.Inc() + select { + case u.metaArchiveCh <- archive: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// Dequeue will pop a task off the queue. Dequeue may block if the queue is empty. +func (u UploadQueue) Dequeue(ctx context.Context) (*LedgerMetaArchive, bool, error) { + select { + case <-ctx.Done(): + return nil, false, ctx.Err() + + case metaObject, ok := <-u.metaArchiveCh: + if ok { + u.queueLengthMetric.Dec() + } + return metaObject, ok, nil + } +} + +// Close will close the queue. 
+func (u UploadQueue) Close() { + close(u.metaArchiveCh) +} diff --git a/exp/services/ledgerexporter/internal/queue_test.go b/exp/services/ledgerexporter/internal/queue_test.go new file mode 100644 index 0000000000..1d001765ce --- /dev/null +++ b/exp/services/ledgerexporter/internal/queue_test.go @@ -0,0 +1,63 @@ +package ledgerexporter + +import ( + "context" + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" +) + +func TestQueueContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + queue := NewUploadQueue(0, prometheus.NewRegistry()) + cancel() + + require.ErrorIs(t, queue.Enqueue(ctx, nil), context.Canceled) + _, _, err := queue.Dequeue(ctx) + require.ErrorIs(t, err, context.Canceled) +} + +func getMetricValue(metric prometheus.Metric) *dto.Metric { + value := &dto.Metric{} + err := metric.Write(value) + if err != nil { + panic(err) + } + return value +} + +func TestQueue(t *testing.T) { + queue := NewUploadQueue(3, prometheus.NewRegistry()) + + require.NoError(t, queue.Enqueue(context.Background(), NewLedgerMetaArchive("test", 1, 1))) + require.NoError(t, queue.Enqueue(context.Background(), NewLedgerMetaArchive("test", 2, 2))) + require.NoError(t, queue.Enqueue(context.Background(), NewLedgerMetaArchive("test", 3, 3))) + + require.Equal(t, float64(3), getMetricValue(queue.queueLengthMetric).GetGauge().GetValue()) + queue.Close() + + l, ok, err := queue.Dequeue(context.Background()) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, float64(2), getMetricValue(queue.queueLengthMetric).GetGauge().GetValue()) + require.Equal(t, uint32(1), l.GetStartLedgerSequence()) + + l, ok, err = queue.Dequeue(context.Background()) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, float64(1), getMetricValue(queue.queueLengthMetric).GetGauge().GetValue()) + require.Equal(t, uint32(2), l.GetStartLedgerSequence()) + + l, 
ok, err = queue.Dequeue(context.Background()) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, float64(0), getMetricValue(queue.queueLengthMetric).GetGauge().GetValue()) + require.Equal(t, uint32(3), l.GetStartLedgerSequence()) + + l, ok, err = queue.Dequeue(context.Background()) + require.NoError(t, err) + require.False(t, ok) + require.Nil(t, l) +} diff --git a/exp/services/ledgerexporter/internal/uploader.go b/exp/services/ledgerexporter/internal/uploader.go index 633db4ead7..04da703fd6 100644 --- a/exp/services/ledgerexporter/internal/uploader.go +++ b/exp/services/ledgerexporter/internal/uploader.go @@ -2,39 +2,108 @@ package ledgerexporter import ( "context" + "io" + "strconv" "time" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" ) // Uploader is responsible for uploading data to a storage destination. -type Uploader interface { - Run(ctx context.Context) error - Upload(ctx context.Context, metaArchive *LedgerMetaArchive) error +type Uploader struct { + dataStore DataStore + queue UploadQueue + uploadDurationMetric *prometheus.SummaryVec + objectSizeMetrics *prometheus.SummaryVec } -type uploader struct { - dataStore DataStore - metaArchiveCh chan *LedgerMetaArchive +// NewUploader constructs a new Uploader instance +func NewUploader( + destination DataStore, + queue UploadQueue, + prometheusRegistry *prometheus.Registry, +) Uploader { + uploadDurationMetric := prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: "ledger_exporter", Subsystem: "uploader", Name: "put_duration_seconds", + Help: "duration for uploading a ledger batch, sliding window = 10m", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, + []string{"already_exists", "ledgers"}, + ) + objectSizeMetrics := prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: "ledger_exporter", Subsystem: "uploader", Name: "object_size_bytes", + Help: "size of a ledger batch in bytes, sliding window = 10m", + 
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, + []string{"ledgers", "already_exists", "compression"}, + ) + prometheusRegistry.MustRegister(uploadDurationMetric, objectSizeMetrics) + return Uploader{ + dataStore: destination, + queue: queue, + uploadDurationMetric: uploadDurationMetric, + objectSizeMetrics: objectSizeMetrics, + } } -func NewUploader(destination DataStore, metaArchiveCh chan *LedgerMetaArchive) Uploader { - return &uploader{ - dataStore: destination, - metaArchiveCh: metaArchiveCh, - } +type writerRecorder struct { + io.Writer + count *int64 +} + +func (r writerRecorder) Write(p []byte) (int, error) { + total, err := r.Writer.Write(p) + *r.count += int64(total) + return total, err +} + +type writerToRecorder struct { + io.WriterTo + totalCompressed int64 + totalUncompressed int64 +} + +func (r *writerToRecorder) WriteTo(w io.Writer) (int64, error) { + uncompressedCount, err := r.WriterTo.WriteTo(writerRecorder{ + Writer: w, + count: &r.totalCompressed, + }) + r.totalUncompressed += uncompressedCount + return uncompressedCount, err } // Upload uploads the serialized binary data of ledger TxMeta to the specified destination. -// TODO: Add retry logic. 
-func (u *uploader) Upload(ctx context.Context, metaArchive *LedgerMetaArchive) error { +func (u Uploader) Upload(ctx context.Context, metaArchive *LedgerMetaArchive) error { logger.Infof("Uploading: %s", metaArchive.GetObjectKey()) + startTime := time.Now() + numLedgers := strconv.FormatUint(uint64(metaArchive.GetLedgerCount()), 10) - err := u.dataStore.PutFileIfNotExists(ctx, metaArchive.GetObjectKey(), - &XDRGzipEncoder{XdrPayload: &metaArchive.data}) + writerTo := &writerToRecorder{ + WriterTo: &XDRGzipEncoder{XdrPayload: &metaArchive.data}, + } + ok, err := u.dataStore.PutFileIfNotExists(ctx, metaArchive.GetObjectKey(), writerTo) if err != nil { return errors.Wrapf(err, "error uploading %s", metaArchive.GetObjectKey()) } + alreadyExists := strconv.FormatBool(!ok) + + u.uploadDurationMetric.With(prometheus.Labels{ + "ledgers": numLedgers, + "already_exists": alreadyExists, + }).Observe(time.Since(startTime).Seconds()) + u.objectSizeMetrics.With(prometheus.Labels{ + "compression": "none", + "ledgers": numLedgers, + "already_exists": alreadyExists, + }).Observe(float64(writerTo.totalUncompressed)) + u.objectSizeMetrics.With(prometheus.Labels{ + "compression": "gzip", + "ledgers": numLedgers, + "already_exists": alreadyExists, + }).Observe(float64(writerTo.totalCompressed)) return nil } @@ -42,7 +111,7 @@ func (u *uploader) Upload(ctx context.Context, metaArchive *LedgerMetaArchive) e var uploaderShutdownWaitTime = 10 * time.Second // Run starts the uploader, continuously listening for LedgerMetaArchive objects to upload. 
-func (u *uploader) Run(ctx context.Context) error { +func (u Uploader) Run(ctx context.Context) error { uploadCtx, cancel := context.WithCancel(context.Background()) go func() { <-ctx.Done() @@ -54,21 +123,19 @@ func (u *uploader) Run(ctx context.Context) error { }() for { - select { - case <-uploadCtx.Done(): - return uploadCtx.Err() + metaObject, ok, err := u.queue.Dequeue(uploadCtx) + if err != nil { + return err + } + if !ok { + logger.Info("Meta archive channel closed, stopping uploader") + return nil + } - case metaObject, ok := <-u.metaArchiveCh: - if !ok { - logger.Info("Meta archive channel closed, stopping uploader") - return nil - } - //Upload the received LedgerMetaArchive. - err := u.Upload(uploadCtx, metaObject) - if err != nil { - return err - } - logger.Infof("Uploaded %s successfully", metaObject.objectKey) + // Upload the received LedgerMetaArchive. + if err = u.Upload(uploadCtx, metaObject); err != nil { + return err } + logger.Infof("Uploaded %s successfully", metaObject.objectKey) } } diff --git a/exp/services/ledgerexporter/internal/uploader_test.go b/exp/services/ledgerexporter/internal/uploader_test.go index c2a0fb96ab..935b4445e8 100644 --- a/exp/services/ledgerexporter/internal/uploader_test.go +++ b/exp/services/ledgerexporter/internal/uploader_test.go @@ -5,13 +5,16 @@ import ( "context" "fmt" "io" + "strconv" "testing" "time" - "github.com/stellar/go/support/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + + "github.com/stellar/go/support/errors" ) func TestUploaderSuite(t *testing.T) { @@ -31,97 +34,214 @@ func (s *UploaderSuite) SetupTest() { } func (s *UploaderSuite) TestUpload() { + s.testUpload(false) + s.testUpload(true) +} + +func (s *UploaderSuite) testUpload(putOkReturnVal bool) { key, start, end := "test-1-100", uint32(1), uint32(100) archive := NewLedgerMetaArchive(key, start, end) for i := start; i <= 
end; i++ { _ = archive.AddLedger(createLedgerCloseMeta(i)) } - var capturedWriterTo io.WriterTo + var capturedBuf bytes.Buffer var capturedKey string s.mockDataStore.On("PutFileIfNotExists", mock.Anything, key, mock.Anything). Run(func(args mock.Arguments) { capturedKey = args.Get(1).(string) - capturedWriterTo = args.Get(2).(io.WriterTo) - }).Return(nil).Once() + _, err := args.Get(2).(io.WriterTo).WriteTo(&capturedBuf) + require.NoError(s.T(), err) + }).Return(putOkReturnVal, nil).Once() - dataUploader := uploader{dataStore: &s.mockDataStore} + registry := prometheus.NewRegistry() + queue := NewUploadQueue(1, registry) + dataUploader := NewUploader(&s.mockDataStore, queue, registry) require.NoError(s.T(), dataUploader.Upload(context.Background(), archive)) - var capturedBuf bytes.Buffer - _, err := capturedWriterTo.WriteTo(&capturedBuf) - require.NoError(s.T(), err) - + expectedCompressedLength := capturedBuf.Len() var decodedArchive LedgerMetaArchive decoder := &XDRGzipDecoder{XdrPayload: &decodedArchive.data} - _, err = decoder.ReadFrom(&capturedBuf) + _, err := decoder.ReadFrom(&capturedBuf) require.NoError(s.T(), err) // require that the decoded data matches the original test data require.Equal(s.T(), key, capturedKey) require.Equal(s.T(), archive.data, decodedArchive.data) + + alreadyExists := !putOkReturnVal + metric, err := dataUploader.uploadDurationMetric.MetricVec.GetMetricWith(prometheus.Labels{ + "ledgers": "100", + "already_exists": strconv.FormatBool(alreadyExists), + }) + require.NoError(s.T(), err) + require.Equal( + s.T(), + uint64(1), + getMetricValue(metric).GetSummary().GetSampleCount(), + ) + require.Positive(s.T(), getMetricValue(metric).GetSummary().GetSampleSum()) + metric, err = dataUploader.uploadDurationMetric.MetricVec.GetMetricWith(prometheus.Labels{ + "ledgers": "100", + "already_exists": strconv.FormatBool(!alreadyExists), + }) + require.NoError(s.T(), err) + require.Equal( + s.T(), + uint64(0), + 
getMetricValue(metric).GetSummary().GetSampleCount(), + ) + + metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{ + "ledgers": "100", + "compression": "gzip", + "already_exists": strconv.FormatBool(alreadyExists), + }) + require.NoError(s.T(), err) + require.Equal( + s.T(), + uint64(1), + getMetricValue(metric).GetSummary().GetSampleCount(), + ) + require.Equal( + s.T(), + float64(expectedCompressedLength), + getMetricValue(metric).GetSummary().GetSampleSum(), + ) + metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{ + "ledgers": "100", + "compression": "gzip", + "already_exists": strconv.FormatBool(!alreadyExists), + }) + require.NoError(s.T(), err) + require.Equal( + s.T(), + uint64(0), + getMetricValue(metric).GetSummary().GetSampleCount(), + ) + + metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{ + "ledgers": "100", + "compression": "none", + "already_exists": strconv.FormatBool(alreadyExists), + }) + require.NoError(s.T(), err) + require.Equal( + s.T(), + uint64(1), + getMetricValue(metric).GetSummary().GetSampleCount(), + ) + uncompressedPayload, err := decodedArchive.data.MarshalBinary() + require.NoError(s.T(), err) + require.Equal( + s.T(), + float64(len(uncompressedPayload)), + getMetricValue(metric).GetSummary().GetSampleSum(), + ) + metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{ + "ledgers": "100", + "compression": "none", + "already_exists": strconv.FormatBool(!alreadyExists), + }) + require.NoError(s.T(), err) + require.Equal( + s.T(), + uint64(0), + getMetricValue(metric).GetSummary().GetSampleCount(), + ) } func (s *UploaderSuite) TestUploadPutError() { + s.testUploadPutError(true) + s.testUploadPutError(false) +} + +func (s *UploaderSuite) testUploadPutError(putOkReturnVal bool) { key, start, end := "test-1-100", uint32(1), uint32(100) archive := NewLedgerMetaArchive(key, start, end) 
s.mockDataStore.On("PutFileIfNotExists", context.Background(), key, - mock.Anything).Return(errors.New("error in PutFileIfNotExists")) + mock.Anything).Return(putOkReturnVal, errors.New("error in PutFileIfNotExists")) - dataUploader := uploader{dataStore: &s.mockDataStore} + registry := prometheus.NewRegistry() + queue := NewUploadQueue(1, registry) + dataUploader := NewUploader(&s.mockDataStore, queue, registry) err := dataUploader.Upload(context.Background(), archive) require.Equal(s.T(), fmt.Sprintf("error uploading %s: error in PutFileIfNotExists", key), err.Error()) + + for _, alreadyExists := range []string{"true", "false"} { + metric, err := dataUploader.uploadDurationMetric.MetricVec.GetMetricWith(prometheus.Labels{ + "ledgers": "100", + "already_exists": alreadyExists, + }) + require.NoError(s.T(), err) + require.Equal( + s.T(), + uint64(0), + getMetricValue(metric).GetSummary().GetSampleCount(), + ) + + for _, compression := range []string{"gzip", "none"} { + metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{ + "ledgers": "100", + "compression": compression, + "already_exists": alreadyExists, + }) + require.NoError(s.T(), err) + require.Equal( + s.T(), + uint64(0), + getMetricValue(metric).GetSummary().GetSampleCount(), + ) + } + } } func (s *UploaderSuite) TestRunChannelClose() { s.mockDataStore.On("PutFileIfNotExists", mock.Anything, - mock.Anything, mock.Anything).Return(nil) + mock.Anything, mock.Anything).Return(true, nil) - objectCh := make(chan *LedgerMetaArchive, 1) + registry := prometheus.NewRegistry() + queue := NewUploadQueue(1, registry) go func() { key, start, end := "test", uint32(1), uint32(100) for i := start; i <= end; i++ { - objectCh <- NewLedgerMetaArchive(key, i, i) + s.Assert().NoError(queue.Enqueue(s.ctx, NewLedgerMetaArchive(key, i, i))) } <-time.After(time.Second * 2) - close(objectCh) + queue.Close() }() - dataUploader := uploader{dataStore: &s.mockDataStore, metaArchiveCh: objectCh} + 
dataUploader := NewUploader(&s.mockDataStore, queue, registry) require.NoError(s.T(), dataUploader.Run(context.Background())) } func (s *UploaderSuite) TestRunContextCancel() { - objectCh := make(chan *LedgerMetaArchive, 1) - s.mockDataStore.On("PutFileIfNotExists", mock.Anything, mock.Anything, mock.Anything).Return(nil) + s.mockDataStore.On("PutFileIfNotExists", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) ctx, cancel := context.WithCancel(context.Background()) + registry := prometheus.NewRegistry() + queue := NewUploadQueue(1, registry) - go func() { - for { - objectCh <- NewLedgerMetaArchive("test", 1, 1) - } - }() + s.Assert().NoError(queue.Enqueue(s.ctx, NewLedgerMetaArchive("test", 1, 1))) go func() { <-time.After(time.Second * 2) cancel() }() - dataUploader := uploader{dataStore: &s.mockDataStore, metaArchiveCh: objectCh} - err := dataUploader.Run(ctx) - - require.EqualError(s.T(), err, "context canceled") + dataUploader := NewUploader(&s.mockDataStore, queue, registry) + require.EqualError(s.T(), dataUploader.Run(ctx), "context canceled") } func (s *UploaderSuite) TestRunUploadError() { - objectCh := make(chan *LedgerMetaArchive, 10) - objectCh <- NewLedgerMetaArchive("test", 1, 1) + registry := prometheus.NewRegistry() + queue := NewUploadQueue(1, registry) + s.Assert().NoError(queue.Enqueue(s.ctx, NewLedgerMetaArchive("test", 1, 1))) s.mockDataStore.On("PutFileIfNotExists", mock.Anything, "test", - mock.Anything).Return(errors.New("Put error")) + mock.Anything).Return(false, errors.New("Put error")) - dataUploader := uploader{dataStore: &s.mockDataStore, metaArchiveCh: objectCh} + dataUploader := NewUploader(&s.mockDataStore, queue, registry) err := dataUploader.Run(context.Background()) require.Equal(s.T(), "error uploading test: Put error", err.Error()) } diff --git a/exp/services/ledgerexporter/internal/utils.go b/exp/services/ledgerexporter/internal/utils.go index d1bc8e20d1..8b556c5814 100644 --- 
a/exp/services/ledgerexporter/internal/utils.go +++ b/exp/services/ledgerexporter/internal/utils.go @@ -6,6 +6,7 @@ import ( "io" xdr3 "github.com/stellar/go-xdr/xdr3" + "github.com/stellar/go/historyarchive" "github.com/stellar/go/support/errors" "github.com/stellar/go/support/storage"