From 7d6f7e789ac62096260177aa822d33e24a9fc52a Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 17 May 2024 17:06:59 +0100 Subject: [PATCH] PrepareRange() in exporter manager goroutine (#5321) --- exp/services/ledgerexporter/internal/app.go | 14 +----- .../ledgerexporter/internal/exportmanager.go | 32 +++++++------ .../internal/exportmanager_test.go | 46 +++++++++++-------- 3 files changed, 46 insertions(+), 46 deletions(-) diff --git a/exp/services/ledgerexporter/internal/app.go b/exp/services/ledgerexporter/internal/app.go index b90c7a33c3..fe84022d65 100644 --- a/exp/services/ledgerexporter/internal/app.go +++ b/exp/services/ledgerexporter/internal/app.go @@ -112,7 +112,7 @@ func (a *App) init(ctx context.Context) error { logger.Infof("Final computed ledger range for backend retrieval and export, start=%d, end=%d", a.config.StartLedger, a.config.EndLedger) - if a.ledgerBackend, err = newLedgerBackend(ctx, a.config, registry); err != nil { + if a.ledgerBackend, err = newLedgerBackend(a.config, registry); err != nil { return err } @@ -247,7 +247,7 @@ func (a *App) Run() { // newLedgerBackend Creates and initializes captive core ledger backend // Currently, only supports captive-core as ledger backend -func newLedgerBackend(ctx context.Context, config *Config, prometheusRegistry *prometheus.Registry) (ledgerbackend.LedgerBackend, error) { +func newLedgerBackend(config *Config, prometheusRegistry *prometheus.Registry) (ledgerbackend.LedgerBackend, error) { captiveConfig, err := config.GenerateCaptiveCoreConfig() if err != nil { return nil, err @@ -261,15 +261,5 @@ func newLedgerBackend(ctx context.Context, config *Config, prometheusRegistry *p } backend = ledgerbackend.WithMetrics(backend, prometheusRegistry, "ledger_exporter") - var ledgerRange ledgerbackend.Range - if config.EndLedger == 0 { - ledgerRange = ledgerbackend.UnboundedRange(config.StartLedger) - } else { - ledgerRange = ledgerbackend.BoundedRange(config.StartLedger, config.EndLedger) - } - - if err = backend.PrepareRange(ctx, ledgerRange); err != nil { - return nil, errors.Wrap(err, "Could not prepare captive core ledger backend") - } return backend, nil } diff --git a/exp/services/ledgerexporter/internal/exportmanager.go b/exp/services/ledgerexporter/internal/exportmanager.go index 5e6c87bbf5..55f85b9c46 100644 --- a/exp/services/ledgerexporter/internal/exportmanager.go +++ b/exp/services/ledgerexporter/internal/exportmanager.go @@ -89,21 +89,25 @@ func (e *ExportManager) Run(ctx context.Context, startLedger, endLedger uint32) "end_ledger": strconv.FormatUint(uint64(endLedger), 10), } + var ledgerRange ledgerbackend.Range + if endLedger < 1 { + ledgerRange = ledgerbackend.UnboundedRange(startLedger) + } else { + ledgerRange = ledgerbackend.BoundedRange(startLedger, endLedger) + } + if err := e.ledgerBackend.PrepareRange(ctx, ledgerRange); err != nil { + return errors.Wrap(err, "Could not prepare captive core ledger backend") + } + for nextLedger := startLedger; endLedger < 1 || nextLedger <= endLedger; nextLedger++ { - select { - case <-ctx.Done(): - logger.Info("Stopping ExportManager due to context cancellation") - return ctx.Err() - default: - ledgerCloseMeta, err := e.ledgerBackend.GetLedger(ctx, nextLedger) - if err != nil { - return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger) - } - e.latestLedgerMetric.With(labels).Set(float64(nextLedger)) - err = e.AddLedgerCloseMeta(ctx, ledgerCloseMeta) - if err != nil { - return errors.Wrapf(err, "failed to add ledgerCloseMeta for ledger %d", nextLedger) - } + ledgerCloseMeta, err := e.ledgerBackend.GetLedger(ctx, nextLedger) + if err != nil { + return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger) + } + e.latestLedgerMetric.With(labels).Set(float64(nextLedger)) + err = e.AddLedgerCloseMeta(ctx, ledgerCloseMeta) + if err != nil { + return errors.Wrapf(err, "failed to add ledgerCloseMeta for ledger %d", nextLedger) } } logger.Infof("ExportManager successfully exported ledgers from %d to %d", startLedger, endLedger) diff --git a/exp/services/ledgerexporter/internal/exportmanager_test.go b/exp/services/ledgerexporter/internal/exportmanager_test.go index a84e20fb52..d99f88cd14 100644 --- a/exp/services/ledgerexporter/internal/exportmanager_test.go +++ b/exp/services/ledgerexporter/internal/exportmanager_test.go @@ -9,7 +9,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/stellar/go/ingest/ledgerbackend" @@ -61,7 +60,7 @@ func (s *ExportManagerSuite) TestInvalidExportConfig() { registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) _, err := NewExportManager(config, &s.mockBackend, queue, registry) - require.Error(s.T(), err) + s.Require().Error(err) } func (s *ExportManagerSuite) TestRun() { @@ -69,11 +68,12 @@ func (s *ExportManagerSuite) TestRun() { registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) - require.NoError(s.T(), err) + s.Require().NoError(err) start := uint32(0) end := uint32(255) expectedKeys := set.NewSet[string](10) + s.mockBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(start, end)).Return(nil) for i := start; i <= end; i++ { s.mockBackend.On("GetLedger", s.ctx, i). Return(createLedgerCloseMeta(i), nil) @@ -97,13 +97,12 @@ func (s *ExportManagerSuite) TestRun() { }() err = exporter.Run(s.ctx, start, end) - require.NoError(s.T(), err) + s.Require().NoError(err) wg.Wait() - require.Equal(s.T(), expectedKeys, actualKeys) - require.Equal( - s.T(), + s.Require().Equal(expectedKeys, actualKeys) + s.Require().Equal( float64(255), getMetricValue(exporter.latestLedgerMetric.With( prometheus.Labels{ @@ -119,9 +118,10 @@ func (s *ExportManagerSuite) TestRunContextCancel() { registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) - require.NoError(s.T(), err) + s.Require().NoError(err) ctx, cancel := context.WithCancel(context.Background()) + s.mockBackend.On("PrepareRange", ctx, ledgerbackend.BoundedRange(0, 255)).Return(nil) s.mockBackend.On("GetLedger", mock.Anything, mock.Anything). Return(createLedgerCloseMeta(1), nil) @@ -139,7 +139,7 @@ func (s *ExportManagerSuite) TestRunContextCancel() { }() err = exporter.Run(ctx, 0, 255) - require.EqualError(s.T(), err, "failed to add ledgerCloseMeta for ledger 128: context canceled") + s.Require().EqualError(err, "failed to add ledgerCloseMeta for ledger 128: context canceled") } @@ -148,12 +148,18 @@ func (s *ExportManagerSuite) TestRunWithCanceledContext() { registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) - require.NoError(s.T(), err) + s.Require().NoError(err) ctx, cancel := context.WithCancel(context.Background()) cancel() + + s.mockBackend.On("PrepareRange", ctx, ledgerbackend.BoundedRange(1, 10)). + Return(context.Canceled).Run(func(args mock.Arguments) { + ctx := args.Get(0).(context.Context) + s.Require().ErrorIs(ctx.Err(), context.Canceled) + }) err = exporter.Run(ctx, 1, 10) - require.EqualError(s.T(), err, "context canceled") + s.Require().ErrorIs(err, context.Canceled) } func (s *ExportManagerSuite) TestAddLedgerCloseMeta() { @@ -161,7 +167,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() { registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) - require.NoError(s.T(), err) + s.Require().NoError(err) expectedKeys := set.NewSet[string](10) actualKeys := set.NewSet[string](10) @@ -183,7 +189,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() { start := uint32(0) end := uint32(255) for i := start; i <= end; i++ { - require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(i))) + s.Require().NoError(exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(i))) key := config.GetObjectKeyFromSequenceNumber(i) expectedKeys.Add(key) @@ -191,7 +197,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() { queue.Close() wg.Wait() - require.Equal(s.T(), expectedKeys, actualKeys) + s.Require().Equal(expectedKeys, actualKeys) } func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() { @@ -199,7 +205,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() { registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) - require.NoError(s.T(), err) + s.Require().NoError(err) ctx, cancel := context.WithCancel(context.Background()) go func() { @@ -207,9 +213,9 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() { cancel() }() - require.NoError(s.T(), exporter.AddLedgerCloseMeta(ctx, createLedgerCloseMeta(1))) + s.Require().NoError(exporter.AddLedgerCloseMeta(ctx, createLedgerCloseMeta(1))) err = exporter.AddLedgerCloseMeta(ctx, createLedgerCloseMeta(2)) - require.EqualError(s.T(), err, "context canceled") + s.Require().EqualError(err, "context canceled") } func (s *ExportManagerSuite) TestAddLedgerCloseMetaKeyMismatch() { @@ -217,9 +223,9 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMetaKeyMismatch() { registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) - require.NoError(s.T(), err) + s.Require().NoError(err) - require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(16))) - require.EqualError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(21)), + s.Require().NoError(exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(16))) + s.Require().EqualError(exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(21)), "Current meta archive object key mismatch") }