Skip to content

Commit

Permalink
PrepareRange() in exporter manager goroutine (#5321)
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms authored May 17, 2024
1 parent 1389949 commit 7d6f7e7
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 46 deletions.
14 changes: 2 additions & 12 deletions exp/services/ledgerexporter/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (a *App) init(ctx context.Context) error {

logger.Infof("Final computed ledger range for backend retrieval and export, start=%d, end=%d", a.config.StartLedger, a.config.EndLedger)

if a.ledgerBackend, err = newLedgerBackend(ctx, a.config, registry); err != nil {
if a.ledgerBackend, err = newLedgerBackend(a.config, registry); err != nil {
return err
}

Expand Down Expand Up @@ -247,7 +247,7 @@ func (a *App) Run() {

// newLedgerBackend Creates and initializes captive core ledger backend
// Currently, only supports captive-core as ledger backend
func newLedgerBackend(ctx context.Context, config *Config, prometheusRegistry *prometheus.Registry) (ledgerbackend.LedgerBackend, error) {
func newLedgerBackend(config *Config, prometheusRegistry *prometheus.Registry) (ledgerbackend.LedgerBackend, error) {
captiveConfig, err := config.GenerateCaptiveCoreConfig()
if err != nil {
return nil, err
Expand All @@ -261,15 +261,5 @@ func newLedgerBackend(ctx context.Context, config *Config, prometheusRegistry *p
}
backend = ledgerbackend.WithMetrics(backend, prometheusRegistry, "ledger_exporter")

var ledgerRange ledgerbackend.Range
if config.EndLedger == 0 {
ledgerRange = ledgerbackend.UnboundedRange(config.StartLedger)
} else {
ledgerRange = ledgerbackend.BoundedRange(config.StartLedger, config.EndLedger)
}

if err = backend.PrepareRange(ctx, ledgerRange); err != nil {
return nil, errors.Wrap(err, "Could not prepare captive core ledger backend")
}
return backend, nil
}
32 changes: 18 additions & 14 deletions exp/services/ledgerexporter/internal/exportmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,25 @@ func (e *ExportManager) Run(ctx context.Context, startLedger, endLedger uint32)
"end_ledger": strconv.FormatUint(uint64(endLedger), 10),
}

var ledgerRange ledgerbackend.Range
if endLedger < 1 {
ledgerRange = ledgerbackend.UnboundedRange(startLedger)
} else {
ledgerRange = ledgerbackend.BoundedRange(startLedger, endLedger)
}
if err := e.ledgerBackend.PrepareRange(ctx, ledgerRange); err != nil {
return errors.Wrap(err, "Could not prepare captive core ledger backend")
}

for nextLedger := startLedger; endLedger < 1 || nextLedger <= endLedger; nextLedger++ {
select {
case <-ctx.Done():
logger.Info("Stopping ExportManager due to context cancellation")
return ctx.Err()
default:
ledgerCloseMeta, err := e.ledgerBackend.GetLedger(ctx, nextLedger)
if err != nil {
return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger)
}
e.latestLedgerMetric.With(labels).Set(float64(nextLedger))
err = e.AddLedgerCloseMeta(ctx, ledgerCloseMeta)
if err != nil {
return errors.Wrapf(err, "failed to add ledgerCloseMeta for ledger %d", nextLedger)
}
ledgerCloseMeta, err := e.ledgerBackend.GetLedger(ctx, nextLedger)
if err != nil {
return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger)
}
e.latestLedgerMetric.With(labels).Set(float64(nextLedger))
err = e.AddLedgerCloseMeta(ctx, ledgerCloseMeta)
if err != nil {
return errors.Wrapf(err, "failed to add ledgerCloseMeta for ledger %d", nextLedger)
}
}
logger.Infof("ExportManager successfully exported ledgers from %d to %d", startLedger, endLedger)
Expand Down
46 changes: 26 additions & 20 deletions exp/services/ledgerexporter/internal/exportmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/mock"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/stellar/go/ingest/ledgerbackend"
Expand Down Expand Up @@ -61,19 +60,20 @@ func (s *ExportManagerSuite) TestInvalidExportConfig() {
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
_, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.Error(s.T(), err)
s.Require().Error(err)
}

func (s *ExportManagerSuite) TestRun() {
config := datastore.LedgerBatchConfig{LedgersPerFile: 64, FilesPerPartition: 10}
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.NoError(s.T(), err)
s.Require().NoError(err)

start := uint32(0)
end := uint32(255)
expectedKeys := set.NewSet[string](10)
s.mockBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(start, end)).Return(nil)
for i := start; i <= end; i++ {
s.mockBackend.On("GetLedger", s.ctx, i).
Return(createLedgerCloseMeta(i), nil)
Expand All @@ -97,13 +97,12 @@ func (s *ExportManagerSuite) TestRun() {
}()

err = exporter.Run(s.ctx, start, end)
require.NoError(s.T(), err)
s.Require().NoError(err)

wg.Wait()

require.Equal(s.T(), expectedKeys, actualKeys)
require.Equal(
s.T(),
s.Require().Equal(expectedKeys, actualKeys)
s.Require().Equal(
float64(255),
getMetricValue(exporter.latestLedgerMetric.With(
prometheus.Labels{
Expand All @@ -119,9 +118,10 @@ func (s *ExportManagerSuite) TestRunContextCancel() {
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.NoError(s.T(), err)
s.Require().NoError(err)
ctx, cancel := context.WithCancel(context.Background())

s.mockBackend.On("PrepareRange", ctx, ledgerbackend.BoundedRange(0, 255)).Return(nil)
s.mockBackend.On("GetLedger", mock.Anything, mock.Anything).
Return(createLedgerCloseMeta(1), nil)

Expand All @@ -139,7 +139,7 @@ func (s *ExportManagerSuite) TestRunContextCancel() {
}()

err = exporter.Run(ctx, 0, 255)
require.EqualError(s.T(), err, "failed to add ledgerCloseMeta for ledger 128: context canceled")
s.Require().EqualError(err, "failed to add ledgerCloseMeta for ledger 128: context canceled")

}

Expand All @@ -148,20 +148,26 @@ func (s *ExportManagerSuite) TestRunWithCanceledContext() {
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.NoError(s.T(), err)
s.Require().NoError(err)

ctx, cancel := context.WithCancel(context.Background())
cancel()

s.mockBackend.On("PrepareRange", ctx, ledgerbackend.BoundedRange(1, 10)).
Return(context.Canceled).Run(func(args mock.Arguments) {
ctx := args.Get(0).(context.Context)
s.Require().ErrorIs(ctx.Err(), context.Canceled)
})
err = exporter.Run(ctx, 1, 10)
require.EqualError(s.T(), err, "context canceled")
s.Require().ErrorIs(err, context.Canceled)
}

func (s *ExportManagerSuite) TestAddLedgerCloseMeta() {
config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10}
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.NoError(s.T(), err)
s.Require().NoError(err)

expectedKeys := set.NewSet[string](10)
actualKeys := set.NewSet[string](10)
Expand All @@ -183,43 +189,43 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() {
start := uint32(0)
end := uint32(255)
for i := start; i <= end; i++ {
require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(i)))
s.Require().NoError(exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(i)))

key := config.GetObjectKeyFromSequenceNumber(i)
expectedKeys.Add(key)
}

queue.Close()
wg.Wait()
require.Equal(s.T(), expectedKeys, actualKeys)
s.Require().Equal(expectedKeys, actualKeys)
}

func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() {
config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10}
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.NoError(s.T(), err)
s.Require().NoError(err)

ctx, cancel := context.WithCancel(context.Background())
go func() {
<-time.After(time.Second * 1)
cancel()
}()

require.NoError(s.T(), exporter.AddLedgerCloseMeta(ctx, createLedgerCloseMeta(1)))
s.Require().NoError(exporter.AddLedgerCloseMeta(ctx, createLedgerCloseMeta(1)))
err = exporter.AddLedgerCloseMeta(ctx, createLedgerCloseMeta(2))
require.EqualError(s.T(), err, "context canceled")
s.Require().EqualError(err, "context canceled")
}

func (s *ExportManagerSuite) TestAddLedgerCloseMetaKeyMismatch() {
config := datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 1}
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.NoError(s.T(), err)
s.Require().NoError(err)

require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(16)))
require.EqualError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(21)),
s.Require().NoError(exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(16)))
s.Require().EqualError(exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(21)),
"Current meta archive object key mismatch")
}

0 comments on commit 7d6f7e7

Please sign in to comment.