Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/services/ledgerexporter/internal: Call PrepareRange() in exporter manager goroutine #5321

Merged
merged 1 commit into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions exp/services/ledgerexporter/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (a *App) init(ctx context.Context) error {

logger.Infof("Final computed ledger range for backend retrieval and export, start=%d, end=%d", a.config.StartLedger, a.config.EndLedger)

if a.ledgerBackend, err = newLedgerBackend(ctx, a.config, registry); err != nil {
if a.ledgerBackend, err = newLedgerBackend(a.config, registry); err != nil {
return err
}

Expand Down Expand Up @@ -247,7 +247,7 @@ func (a *App) Run() {

// newLedgerBackend Creates and initializes captive core ledger backend
// Currently, only supports captive-core as ledger backend
func newLedgerBackend(ctx context.Context, config *Config, prometheusRegistry *prometheus.Registry) (ledgerbackend.LedgerBackend, error) {
func newLedgerBackend(config *Config, prometheusRegistry *prometheus.Registry) (ledgerbackend.LedgerBackend, error) {
captiveConfig, err := config.GenerateCaptiveCoreConfig()
if err != nil {
return nil, err
Expand All @@ -261,15 +261,5 @@ func newLedgerBackend(ctx context.Context, config *Config, prometheusRegistry *p
}
backend = ledgerbackend.WithMetrics(backend, prometheusRegistry, "ledger_exporter")

var ledgerRange ledgerbackend.Range
if config.EndLedger == 0 {
ledgerRange = ledgerbackend.UnboundedRange(config.StartLedger)
} else {
ledgerRange = ledgerbackend.BoundedRange(config.StartLedger, config.EndLedger)
}

if err = backend.PrepareRange(ctx, ledgerRange); err != nil {
return nil, errors.Wrap(err, "Could not prepare captive core ledger backend")
}
return backend, nil
}
32 changes: 18 additions & 14 deletions exp/services/ledgerexporter/internal/exportmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,25 @@ func (e *ExportManager) Run(ctx context.Context, startLedger, endLedger uint32)
"end_ledger": strconv.FormatUint(uint64(endLedger), 10),
}

var ledgerRange ledgerbackend.Range
if endLedger < 1 {
ledgerRange = ledgerbackend.UnboundedRange(startLedger)
} else {
ledgerRange = ledgerbackend.BoundedRange(startLedger, endLedger)
}
if err := e.ledgerBackend.PrepareRange(ctx, ledgerRange); err != nil {
return errors.Wrap(err, "Could not prepare captive core ledger backend")
}

for nextLedger := startLedger; endLedger < 1 || nextLedger <= endLedger; nextLedger++ {
select {
case <-ctx.Done():
logger.Info("Stopping ExportManager due to context cancellation")
return ctx.Err()
tamirms marked this conversation as resolved.
Show resolved Hide resolved
default:
ledgerCloseMeta, err := e.ledgerBackend.GetLedger(ctx, nextLedger)
if err != nil {
return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger)
}
e.latestLedgerMetric.With(labels).Set(float64(nextLedger))
err = e.AddLedgerCloseMeta(ctx, ledgerCloseMeta)
if err != nil {
return errors.Wrapf(err, "failed to add ledgerCloseMeta for ledger %d", nextLedger)
}
ledgerCloseMeta, err := e.ledgerBackend.GetLedger(ctx, nextLedger)
if err != nil {
return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger)
}
e.latestLedgerMetric.With(labels).Set(float64(nextLedger))
err = e.AddLedgerCloseMeta(ctx, ledgerCloseMeta)
if err != nil {
return errors.Wrapf(err, "failed to add ledgerCloseMeta for ledger %d", nextLedger)
}
}
logger.Infof("ExportManager successfully exported ledgers from %d to %d", startLedger, endLedger)
Expand Down
46 changes: 26 additions & 20 deletions exp/services/ledgerexporter/internal/exportmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/mock"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/stellar/go/ingest/ledgerbackend"
Expand Down Expand Up @@ -61,19 +60,20 @@
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
_, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.Error(s.T(), err)
s.Require().Error(err)

Check failure on line 63 in exp/services/ledgerexporter/internal/exportmanager_test.go

View workflow job for this annotation

GitHub Actions / golangci

s.Require undefined (type *ExportManagerSuite has no field or method Require) (typecheck)
}

func (s *ExportManagerSuite) TestRun() {
config := datastore.LedgerBatchConfig{LedgersPerFile: 64, FilesPerPartition: 10}
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.NoError(s.T(), err)
s.Require().NoError(err)

Check failure on line 71 in exp/services/ledgerexporter/internal/exportmanager_test.go

View workflow job for this annotation

GitHub Actions / golangci

s.Require undefined (type *ExportManagerSuite has no field or method Require) (typecheck)

start := uint32(0)
end := uint32(255)
expectedKeys := set.NewSet[string](10)
s.mockBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(start, end)).Return(nil)
for i := start; i <= end; i++ {
s.mockBackend.On("GetLedger", s.ctx, i).
Return(createLedgerCloseMeta(i), nil)
Expand All @@ -97,13 +97,12 @@
}()

err = exporter.Run(s.ctx, start, end)
require.NoError(s.T(), err)
s.Require().NoError(err)

Check failure on line 100 in exp/services/ledgerexporter/internal/exportmanager_test.go

View workflow job for this annotation

GitHub Actions / golangci

s.Require undefined (type *ExportManagerSuite has no field or method Require) (typecheck)

wg.Wait()

require.Equal(s.T(), expectedKeys, actualKeys)
require.Equal(
s.T(),
s.Require().Equal(expectedKeys, actualKeys)
s.Require().Equal(
float64(255),
getMetricValue(exporter.latestLedgerMetric.With(
prometheus.Labels{
Expand All @@ -119,9 +118,10 @@
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.NoError(s.T(), err)
s.Require().NoError(err)
ctx, cancel := context.WithCancel(context.Background())

s.mockBackend.On("PrepareRange", ctx, ledgerbackend.BoundedRange(0, 255)).Return(nil)
s.mockBackend.On("GetLedger", mock.Anything, mock.Anything).
Return(createLedgerCloseMeta(1), nil)

Expand All @@ -139,7 +139,7 @@
}()

err = exporter.Run(ctx, 0, 255)
require.EqualError(s.T(), err, "failed to add ledgerCloseMeta for ledger 128: context canceled")
s.Require().EqualError(err, "failed to add ledgerCloseMeta for ledger 128: context canceled")

}

Expand All @@ -148,20 +148,26 @@
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.NoError(s.T(), err)
s.Require().NoError(err)

ctx, cancel := context.WithCancel(context.Background())
cancel()

s.mockBackend.On("PrepareRange", ctx, ledgerbackend.BoundedRange(1, 10)).
Return(context.Canceled).Run(func(args mock.Arguments) {
ctx := args.Get(0).(context.Context)
s.Require().ErrorIs(ctx.Err(), context.Canceled)
})
err = exporter.Run(ctx, 1, 10)
require.EqualError(s.T(), err, "context canceled")
s.Require().ErrorIs(err, context.Canceled)
}

func (s *ExportManagerSuite) TestAddLedgerCloseMeta() {
config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10}
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.NoError(s.T(), err)
s.Require().NoError(err)

expectedKeys := set.NewSet[string](10)
actualKeys := set.NewSet[string](10)
Expand All @@ -183,43 +189,43 @@
start := uint32(0)
end := uint32(255)
for i := start; i <= end; i++ {
require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(i)))
s.Require().NoError(exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(i)))

key := config.GetObjectKeyFromSequenceNumber(i)
expectedKeys.Add(key)
}

queue.Close()
wg.Wait()
require.Equal(s.T(), expectedKeys, actualKeys)
s.Require().Equal(expectedKeys, actualKeys)
}

func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() {
config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10}
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.NoError(s.T(), err)
s.Require().NoError(err)

ctx, cancel := context.WithCancel(context.Background())
go func() {
<-time.After(time.Second * 1)
cancel()
}()

require.NoError(s.T(), exporter.AddLedgerCloseMeta(ctx, createLedgerCloseMeta(1)))
s.Require().NoError(exporter.AddLedgerCloseMeta(ctx, createLedgerCloseMeta(1)))
err = exporter.AddLedgerCloseMeta(ctx, createLedgerCloseMeta(2))
require.EqualError(s.T(), err, "context canceled")
s.Require().EqualError(err, "context canceled")
}

func (s *ExportManagerSuite) TestAddLedgerCloseMetaKeyMismatch() {
config := datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 1}
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)
exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.NoError(s.T(), err)
s.Require().NoError(err)

require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(16)))
require.EqualError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(21)),
s.Require().NoError(exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(16)))
s.Require().EqualError(exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(21)),
"Current meta archive object key mismatch")
}
Loading