diff --git a/.github/workflows/horizon.yml b/.github/workflows/horizon.yml index 5ec7c934a5..ddf0da7f11 100644 --- a/.github/workflows/horizon.yml +++ b/.github/workflows/horizon.yml @@ -154,6 +154,8 @@ jobs: ledger-exporter: name: Test and push the Ledger Exporter images runs-on: ubuntu-latest + env: + STELLAR_CORE_VERSION: 21.0.0-1812.rc1.a10329cca.focal steps: - uses: actions/checkout@v3 with: diff --git a/exp/services/ledgerexporter/config.toml b/exp/services/ledgerexporter/config.toml index a56087427c..bc522ff6a8 100644 --- a/exp/services/ledgerexporter/config.toml +++ b/exp/services/ledgerexporter/config.toml @@ -9,6 +9,7 @@ destination_bucket_path = "exporter-test/ledgers" [exporter_config] ledgers_per_file = 1 files_per_partition = 64000 + file_suffix = ".xdr.gz" [stellar_core_config] stellar_core_binary_path = "/usr/local/bin/stellar-core" diff --git a/exp/services/ledgerexporter/internal/app.go b/exp/services/ledgerexporter/internal/app.go index 588164582c..e23cc7b746 100644 --- a/exp/services/ledgerexporter/internal/app.go +++ b/exp/services/ledgerexporter/internal/app.go @@ -17,6 +17,7 @@ import ( "github.com/stellar/go/ingest/ledgerbackend" _ "github.com/stellar/go/network" + "github.com/stellar/go/support/datastore" supporthttp "github.com/stellar/go/support/http" "github.com/stellar/go/support/log" ) @@ -63,7 +64,7 @@ func (m InvalidDataStoreError) Error() string { type App struct { config *Config ledgerBackend ledgerbackend.LedgerBackend - dataStore DataStore + dataStore datastore.DataStore exportManager *ExportManager uploader Uploader flags Flags @@ -88,17 +89,17 @@ func (a *App) init(ctx context.Context) error { if a.config, err = NewConfig(ctx, a.flags); err != nil { return errors.Wrap(err, "Could not load configuration") } - if archive, err = createHistoryArchiveFromNetworkName(ctx, a.config.Network); err != nil { + if archive, err = datastore.CreateHistoryArchiveFromNetworkName(ctx, a.config.Network); err != nil { return err } a.config.ValidateAndSetLedgerRange(ctx, archive) - if a.dataStore, err = NewDataStore(ctx, a.config.DataStoreConfig, a.config.Network); err != nil { + if a.dataStore, err = datastore.NewDataStore(ctx, a.config.DataStoreConfig, a.config.Network); err != nil { return errors.Wrap(err, "Could not connect to destination data store") } if a.config.Resume { if err = a.applyResumability(ctx, - NewResumableManager(a.dataStore, a.config.Network, a.config.LedgerBatchConfig, archive)); err != nil { + datastore.NewResumableManager(a.dataStore, a.config.Network, a.config.LedgerBatchConfig, archive)); err != nil { return err } } @@ -119,7 +120,7 @@ func (a *App) init(ctx context.Context) error { return nil } -func (a *App) applyResumability(ctx context.Context, resumableManager ResumableManager) error { +func (a *App) applyResumability(ctx context.Context, resumableManager datastore.ResumableManager) error { absentLedger, ok, err := resumableManager.FindStart(ctx, a.config.StartLedger, a.config.EndLedger) if err != nil { return err diff --git a/exp/services/ledgerexporter/internal/app_test.go b/exp/services/ledgerexporter/internal/app_test.go index fc63c3a387..e2db0e28d9 100644 --- a/exp/services/ledgerexporter/internal/app_test.go +++ b/exp/services/ledgerexporter/internal/app_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/pkg/errors" + "github.com/stellar/go/support/datastore" "github.com/stretchr/testify/require" ) @@ -12,7 +13,7 @@ func TestApplyResumeHasStartError(t *testing.T) { ctx := context.Background() app := &App{} app.config = 
&Config{StartLedger: 10, EndLedger: 19, Resume: true} - mockResumableManager := &MockResumableManager{} + mockResumableManager := &datastore.MockResumableManager{} mockResumableManager.On("FindStart", ctx, uint32(10), uint32(19)).Return(uint32(0), false, errors.New("start error")).Once() err := app.applyResumability(ctx, mockResumableManager) @@ -24,7 +25,7 @@ func TestApplyResumeDatastoreComplete(t *testing.T) { ctx := context.Background() app := &App{} app.config = &Config{StartLedger: 10, EndLedger: 19, Resume: true} - mockResumableManager := &MockResumableManager{} + mockResumableManager := &datastore.MockResumableManager{} mockResumableManager.On("FindStart", ctx, uint32(10), uint32(19)).Return(uint32(0), false, nil).Once() var alreadyExported *DataAlreadyExportedError @@ -40,9 +41,9 @@ func TestApplyResumeInvalidDataStoreLedgersPerFileBoundary(t *testing.T) { StartLedger: 3, EndLedger: 9, Resume: true, - LedgerBatchConfig: LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50}, + LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"}, } - mockResumableManager := &MockResumableManager{} + mockResumableManager := &datastore.MockResumableManager{} // simulate the datastore has inconsistent data, // with last ledger not aligned to starting boundary mockResumableManager.On("FindStart", ctx, uint32(3), uint32(9)).Return(uint32(6), true, nil).Once() @@ -60,9 +61,9 @@ func TestApplyResumeWithPartialRemoteDataPresent(t *testing.T) { StartLedger: 10, EndLedger: 99, Resume: true, - LedgerBatchConfig: LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50}, + LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"}, } - mockResumableManager := &MockResumableManager{} + mockResumableManager := &datastore.MockResumableManager{} // simulates a data store that had ledger files populated up to seq=49, so the first absent ledger would be 50 mockResumableManager.On("FindStart", ctx, uint32(10), uint32(99)).Return(uint32(50), true, nil).Once() @@ -79,9 +80,9 @@ func TestApplyResumeWithNoRemoteDataPresent(t *testing.T) { StartLedger: 10, EndLedger: 99, Resume: true, - LedgerBatchConfig: LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50}, + LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"}, } - mockResumableManager := &MockResumableManager{} + mockResumableManager := &datastore.MockResumableManager{} // simulates a data store that had no data in the requested range mockResumableManager.On("FindStart", ctx, uint32(10), uint32(99)).Return(uint32(2), true, nil).Once() @@ -101,9 +102,9 @@ func TestApplyResumeWithNoRemoteDataAndRequestFromGenesis(t *testing.T) { StartLedger: 2, EndLedger: 99, Resume: true, - LedgerBatchConfig: LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50}, + LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"}, } - mockResumableManager := &MockResumableManager{} + mockResumableManager := &datastore.MockResumableManager{} // simulates a data store that had no data in the requested range mockResumableManager.On("FindStart", ctx, uint32(2), uint32(99)).Return(uint32(2), true, nil).Once() diff --git a/exp/services/ledgerexporter/internal/config.go b/exp/services/ledgerexporter/internal/config.go index 0d3cb87cb4..2f6589a720 100644 --- a/exp/services/ledgerexporter/internal/config.go +++ 
b/exp/services/ledgerexporter/internal/config.go @@ -11,9 +11,9 @@ import ( "github.com/pelletier/go-toml" + "github.com/stellar/go/support/datastore" "github.com/stellar/go/support/errors" "github.com/stellar/go/support/ordered" - "github.com/stellar/go/support/storage" ) const Pubnet = "pubnet" @@ -34,58 +34,19 @@ type StellarCoreConfig struct { CaptiveCoreTomlPath string `toml:"captive_core_toml_path"` } -type DataStoreConfig struct { - Type string `toml:"type"` - Params map[string]string `toml:"params"` -} - type Config struct { AdminPort int `toml:"admin_port"` - Network string `toml:"network"` - DataStoreConfig DataStoreConfig `toml:"datastore_config"` - LedgerBatchConfig LedgerBatchConfig `toml:"exporter_config"` - StellarCoreConfig StellarCoreConfig `toml:"stellar_core_config"` + Network string `toml:"network"` + DataStoreConfig datastore.DataStoreConfig `toml:"datastore_config"` + LedgerBatchConfig datastore.LedgerBatchConfig `toml:"exporter_config"` + StellarCoreConfig StellarCoreConfig `toml:"stellar_core_config"` StartLedger uint32 EndLedger uint32 Resume bool } -func createHistoryArchiveFromNetworkName(ctx context.Context, networkName string) (historyarchive.ArchiveInterface, error) { - var historyArchiveUrls []string - switch networkName { - case Pubnet: - historyArchiveUrls = network.PublicNetworkhistoryArchiveURLs - case Testnet: - historyArchiveUrls = network.TestNetworkhistoryArchiveURLs - default: - return nil, errors.Errorf("Invalid network name %s", networkName) - } - - return historyarchive.NewArchivePool(historyArchiveUrls, historyarchive.ArchiveOptions{ - ConnectOptions: storage.ConnectOptions{ - UserAgent: "ledger-exporter", - Context: ctx, - }, - }) -} - -func getLatestLedgerSequenceFromHistoryArchives(archive historyarchive.ArchiveInterface) (uint32, error) { - has, err := archive.GetRootHAS() - if err != nil { - logger.WithError(err).Warnf("Error getting root HAS from archives") - return 0, errors.Wrap(err, "failed to retrieve the latest ledger sequence from any history archive") - } - - return has.CurrentLedger, nil -} - -func getHistoryArchivesCheckPointFrequency() uint32 { - // this could evolve to use other sources for checkpoint freq - return historyarchive.DefaultCheckpointFrequency -} - // This will generate the config based on commandline flags and toml // // ctx - the caller context @@ -113,7 +74,7 @@ func NewConfig(ctx context.Context, flags Flags) (*Config, error) { // Validates requested ledger range, and will automatically adjust it // to be ledgers-per-file boundary aligned func (config *Config) ValidateAndSetLedgerRange(ctx context.Context, archive historyarchive.ArchiveInterface) error { - latestNetworkLedger, err := getLatestLedgerSequenceFromHistoryArchives(archive) + latestNetworkLedger, err := datastore.GetLatestLedgerSequenceFromHistoryArchives(archive) if err != nil { return errors.Wrap(err, "Failed to retrieve the latest ledger sequence from history archives.") @@ -182,7 +143,7 @@ func (config *Config) GenerateCaptiveCoreConfig() (ledgerbackend.CaptiveCoreConf BinaryPath: coreConfig.StellarCoreBinaryPath, NetworkPassphrase: params.NetworkPassphrase, HistoryArchiveURLs: params.HistoryArchiveURLs, - CheckpointFrequency: getHistoryArchivesCheckPointFrequency(), + CheckpointFrequency: datastore.GetHistoryArchivesCheckPointFrequency(), Log: logger.WithField("subservice", "stellar-core"), Toml: captiveCoreToml, UserAgent: "ledger-exporter", diff --git a/exp/services/ledgerexporter/internal/config_test.go 
b/exp/services/ledgerexporter/internal/config_test.go index 86f6cfb5b3..036c121455 100644 --- a/exp/services/ledgerexporter/internal/config_test.go +++ b/exp/services/ledgerexporter/internal/config_test.go @@ -22,6 +22,7 @@ func TestNewConfigResumeEnabled(t *testing.T) { require.Equal(t, config.DataStoreConfig.Type, "ABC") require.Equal(t, config.LedgerBatchConfig.FilesPerPartition, uint32(1)) require.Equal(t, config.LedgerBatchConfig.LedgersPerFile, uint32(3)) + require.Equal(t, config.LedgerBatchConfig.FileSuffix, ".xdr.gz") require.True(t, config.Resume) url, ok := config.DataStoreConfig.Params["destination_bucket_path"] require.True(t, ok) diff --git a/exp/services/ledgerexporter/internal/exportmanager.go b/exp/services/ledgerexporter/internal/exportmanager.go index 51c3947506..5ee06b8cd6 100644 --- a/exp/services/ledgerexporter/internal/exportmanager.go +++ b/exp/services/ledgerexporter/internal/exportmanager.go @@ -8,19 +8,20 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/support/datastore" "github.com/stellar/go/xdr" ) type ExportManager struct { - config LedgerBatchConfig + config datastore.LedgerBatchConfig ledgerBackend ledgerbackend.LedgerBackend - currentMetaArchive *LedgerMetaArchive + currentMetaArchive *datastore.LedgerMetaArchive queue UploadQueue latestLedgerMetric *prometheus.GaugeVec } // NewExportManager creates a new ExportManager with the provided configuration. -func NewExportManager(config LedgerBatchConfig, backend ledgerbackend.LedgerBackend, queue UploadQueue, prometheusRegistry *prometheus.Registry) (*ExportManager, error) { +func NewExportManager(config datastore.LedgerBatchConfig, backend ledgerbackend.LedgerBackend, queue UploadQueue, prometheusRegistry *prometheus.Registry) (*ExportManager, error) { if config.LedgersPerFile < 1 { return nil, errors.Errorf("Invalid ledgers per file (%d): must be at least 1", config.LedgersPerFile) } @@ -60,7 +61,7 @@ func (e *ExportManager) AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta } // Create a new LedgerMetaArchive and add it to the map. 
- e.currentMetaArchive = NewLedgerMetaArchive(objectKey, ledgerSeq, endSeq) + e.currentMetaArchive = datastore.NewLedgerMetaArchive(objectKey, ledgerSeq, endSeq) } if err := e.currentMetaArchive.AddLedger(ledgerCloseMeta); err != nil { diff --git a/exp/services/ledgerexporter/internal/exportmanager_test.go b/exp/services/ledgerexporter/internal/exportmanager_test.go index 5eaa38bdcb..01e3595373 100644 --- a/exp/services/ledgerexporter/internal/exportmanager_test.go +++ b/exp/services/ledgerexporter/internal/exportmanager_test.go @@ -15,6 +15,7 @@ import ( "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/support/collections/set" + "github.com/stellar/go/support/datastore" ) func TestExporterSuite(t *testing.T) { @@ -38,7 +39,7 @@ func (s *ExportManagerSuite) TearDownTest() { } func (s *ExportManagerSuite) TestInvalidExportConfig() { - config := LedgerBatchConfig{LedgersPerFile: 0, FilesPerPartition: 10} + config := datastore.LedgerBatchConfig{LedgersPerFile: 0, FilesPerPartition: 10, FileSuffix: ".xdr.gz"} registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) _, err := NewExportManager(config, &s.mockBackend, queue, registry) @@ -46,7 +47,7 @@ func (s *ExportManagerSuite) TestInvalidExportConfig() { } func (s *ExportManagerSuite) TestRun() { - config := LedgerBatchConfig{LedgersPerFile: 64, FilesPerPartition: 10} + config := datastore.LedgerBatchConfig{LedgersPerFile: 64, FilesPerPartition: 10, FileSuffix: ".xdr.gz"} registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) @@ -57,7 +58,7 @@ func (s *ExportManagerSuite) TestRun() { expectedKeys := set.NewSet[string](10) for i := start; i <= end; i++ { s.mockBackend.On("GetLedger", s.ctx, i). - Return(createLedgerCloseMeta(i), nil) + Return(datastore.CreateLedgerCloseMeta(i), nil) key := config.GetObjectKeyFromSequenceNumber(i) expectedKeys.Add(key) } @@ -73,7 +74,7 @@ func (s *ExportManagerSuite) TestRun() { if !ok { break } - actualKeys.Add(v.objectKey) + actualKeys.Add(v.ObjectKey) } }() @@ -96,7 +97,7 @@ func (s *ExportManagerSuite) TestRun() { } func (s *ExportManagerSuite) TestRunContextCancel() { - config := LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 1} + config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 1, FileSuffix: ".xdr.gz"} registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) @@ -104,7 +105,7 @@ func (s *ExportManagerSuite) TestRunContextCancel() { ctx, cancel := context.WithCancel(context.Background()) s.mockBackend.On("GetLedger", mock.Anything, mock.Anything). 
- Return(createLedgerCloseMeta(1), nil) + Return(datastore.CreateLedgerCloseMeta(1), nil) go func() { <-time.After(time.Second * 1) @@ -125,7 +126,7 @@ func (s *ExportManagerSuite) TestRunContextCancel() { } func (s *ExportManagerSuite) TestRunWithCanceledContext() { - config := LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10} + config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"} registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) @@ -142,23 +143,24 @@ func (s *ExportManagerSuite) TestGetObjectKeyFromSequenceNumber() { filesPerPartition uint32 ledgerSeq uint32 ledgersPerFile uint32 + fileSuffix string expectedKey string }{ - {0, 5, 1, "5.xdr.gz"}, - {0, 5, 10, "0-9.xdr.gz"}, - {2, 10, 100, "0-199/0-99.xdr.gz"}, - {2, 150, 50, "100-199/150-199.xdr.gz"}, - {2, 300, 200, "0-399/200-399.xdr.gz"}, - {2, 1, 1, "0-1/1.xdr.gz"}, - {4, 10, 100, "0-399/0-99.xdr.gz"}, - {4, 250, 50, "200-399/250-299.xdr.gz"}, - {1, 300, 200, "200-399.xdr.gz"}, - {1, 1, 1, "1.xdr.gz"}, + {0, 5, 1, ".xdr.gz", "5.xdr.gz"}, + {0, 5, 10, ".xdr.gz", "0-9.xdr.gz"}, + {2, 10, 100, ".xdr.gz", "0-199/0-99.xdr.gz"}, + {2, 150, 50, ".xdr.gz", "100-199/150-199.xdr.gz"}, + {2, 300, 200, ".xdr.gz", "0-399/200-399.xdr.gz"}, + {2, 1, 1, ".xdr.gz", "0-1/1.xdr.gz"}, + {4, 10, 100, ".xdr.gz", "0-399/0-99.xdr.gz"}, + {4, 250, 50, ".xdr.gz", "200-399/250-299.xdr.gz"}, + {1, 300, 200, ".xdr.gz", "200-399.xdr.gz"}, + {1, 1, 1, ".xdr.gz", "1.xdr.gz"}, } for _, tc := range testCases { s.T().Run(fmt.Sprintf("LedgerSeq-%d-LedgersPerFile-%d", tc.ledgerSeq, tc.ledgersPerFile), func(t *testing.T) { - config := LedgerBatchConfig{FilesPerPartition: tc.filesPerPartition, LedgersPerFile: tc.ledgersPerFile} + config := datastore.LedgerBatchConfig{FilesPerPartition: tc.filesPerPartition, LedgersPerFile: tc.ledgersPerFile, FileSuffix: tc.fileSuffix} key := config.GetObjectKeyFromSequenceNumber(tc.ledgerSeq) require.Equal(t, tc.expectedKey, key) }) @@ -166,7 +168,7 @@ func (s *ExportManagerSuite) TestGetObjectKeyFromSequenceNumber() { } func (s *ExportManagerSuite) TestAddLedgerCloseMeta() { - config := LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10} + config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"} registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) @@ -185,14 +187,15 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() { if !ok { break } - actualKeys.Add(v.objectKey) + actualKeys.Add(v.ObjectKey) } }() start := uint32(0) end := uint32(255) for i := start; i <= end; i++ { - require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(i))) + require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), datastore.CreateLedgerCloseMeta(i))) + key := config.GetObjectKeyFromSequenceNumber(i) expectedkeys.Add(key) } @@ -203,7 +206,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() { } func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() { - config := LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10} + config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"} registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) @@ -215,19 
+218,19 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() { cancel() }() - require.NoError(s.T(), exporter.AddLedgerCloseMeta(ctx, createLedgerCloseMeta(1))) - err = exporter.AddLedgerCloseMeta(ctx, createLedgerCloseMeta(2)) + require.NoError(s.T(), exporter.AddLedgerCloseMeta(ctx, datastore.CreateLedgerCloseMeta(1))) + err = exporter.AddLedgerCloseMeta(ctx, datastore.CreateLedgerCloseMeta(2)) require.EqualError(s.T(), err, "context canceled") } func (s *ExportManagerSuite) TestAddLedgerCloseMetaKeyMismatch() { - config := LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 1} + config := datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 1, FileSuffix: ".xdr.gz"} registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) require.NoError(s.T(), err) - require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(16))) - require.EqualError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(21)), + require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), datastore.CreateLedgerCloseMeta(16))) + require.EqualError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), datastore.CreateLedgerCloseMeta(21)), "Current meta archive object key mismatch") } diff --git a/exp/services/ledgerexporter/internal/ledger_meta_archive.go b/exp/services/ledgerexporter/internal/ledger_meta_archive.go deleted file mode 100644 index 2a193f812c..0000000000 --- a/exp/services/ledgerexporter/internal/ledger_meta_archive.go +++ /dev/null @@ -1,65 +0,0 @@ -package ledgerexporter - -import ( - "fmt" - - "github.com/stellar/go/xdr" -) - -// LedgerMetaArchive represents a file with metadata and binary data. -type LedgerMetaArchive struct { - // file name - objectKey string - // Actual binary data - data xdr.LedgerCloseMetaBatch -} - -// NewLedgerMetaArchive creates a new LedgerMetaArchive instance. -func NewLedgerMetaArchive(key string, startSeq uint32, endSeq uint32) *LedgerMetaArchive { - return &LedgerMetaArchive{ - objectKey: key, - data: xdr.LedgerCloseMetaBatch{ - StartSequence: xdr.Uint32(startSeq), - EndSequence: xdr.Uint32(endSeq), - }, - } -} - -// AddLedger adds a LedgerCloseMeta to the archive. -func (f *LedgerMetaArchive) AddLedger(ledgerCloseMeta xdr.LedgerCloseMeta) error { - if ledgerCloseMeta.LedgerSequence() < uint32(f.data.StartSequence) || - ledgerCloseMeta.LedgerSequence() > uint32(f.data.EndSequence) { - return fmt.Errorf("ledger sequence %d is outside valid range [%d, %d]", - ledgerCloseMeta.LedgerSequence(), f.data.StartSequence, f.data.EndSequence) - } - - if len(f.data.LedgerCloseMetas) > 0 { - lastSequence := f.data.LedgerCloseMetas[len(f.data.LedgerCloseMetas)-1].LedgerSequence() - if ledgerCloseMeta.LedgerSequence() != lastSequence+1 { - return fmt.Errorf("ledgers must be added sequentially: expected sequence %d, got %d", - lastSequence+1, ledgerCloseMeta.LedgerSequence()) - } - } - f.data.LedgerCloseMetas = append(f.data.LedgerCloseMetas, ledgerCloseMeta) - return nil -} - -// GetLedgerCount returns the number of ledgers currently in the archive. -func (f *LedgerMetaArchive) GetLedgerCount() uint32 { - return uint32(len(f.data.LedgerCloseMetas)) -} - -// GetStartLedgerSequence returns the starting ledger sequence of the archive. 
-func (f *LedgerMetaArchive) GetStartLedgerSequence() uint32 { - return uint32(f.data.StartSequence) -} - -// GetEndLedgerSequence returns the ending ledger sequence of the archive. -func (f *LedgerMetaArchive) GetEndLedgerSequence() uint32 { - return uint32(f.data.EndSequence) -} - -// GetObjectKey returns the object key of the archive. -func (f *LedgerMetaArchive) GetObjectKey() string { - return f.objectKey -} diff --git a/exp/services/ledgerexporter/internal/queue.go b/exp/services/ledgerexporter/internal/queue.go index 372ccb0056..5b46b202ee 100644 --- a/exp/services/ledgerexporter/internal/queue.go +++ b/exp/services/ledgerexporter/internal/queue.go @@ -4,11 +4,12 @@ import ( "context" "github.com/prometheus/client_golang/prometheus" + "github.com/stellar/go/support/datastore" ) // UploadQueue is a queue of LedgerMetaArchive objects which are scheduled for upload type UploadQueue struct { - metaArchiveCh chan *LedgerMetaArchive + metaArchiveCh chan *datastore.LedgerMetaArchive queueLengthMetric prometheus.Gauge } @@ -22,13 +23,13 @@ func NewUploadQueue(size int, prometheusRegistry *prometheus.Registry) UploadQue }) prometheusRegistry.MustRegister(queueLengthMetric) return UploadQueue{ - metaArchiveCh: make(chan *LedgerMetaArchive, size), + metaArchiveCh: make(chan *datastore.LedgerMetaArchive, size), queueLengthMetric: queueLengthMetric, } } // Enqueue will add an upload task to the queue. Enqueue may block if the queue is full. -func (u UploadQueue) Enqueue(ctx context.Context, archive *LedgerMetaArchive) error { +func (u UploadQueue) Enqueue(ctx context.Context, archive *datastore.LedgerMetaArchive) error { u.queueLengthMetric.Inc() select { case u.metaArchiveCh <- archive: @@ -39,7 +40,7 @@ func (u UploadQueue) Enqueue(ctx context.Context, archive *LedgerMetaArchive) er } // Dequeue will pop a task off the queue. Dequeue may block if the queue is empty. 
-func (u UploadQueue) Dequeue(ctx context.Context) (*LedgerMetaArchive, bool, error) { +func (u UploadQueue) Dequeue(ctx context.Context) (*datastore.LedgerMetaArchive, bool, error) { select { case <-ctx.Done(): return nil, false, ctx.Err() diff --git a/exp/services/ledgerexporter/internal/queue_test.go b/exp/services/ledgerexporter/internal/queue_test.go index a791d29eae..4f5fdd263f 100644 --- a/exp/services/ledgerexporter/internal/queue_test.go +++ b/exp/services/ledgerexporter/internal/queue_test.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "github.com/stellar/go/support/datastore" "github.com/stretchr/testify/require" ) @@ -31,9 +32,9 @@ func getMetricValue(metric prometheus.Metric) *dto.Metric { func TestQueue(t *testing.T) { queue := NewUploadQueue(3, prometheus.NewRegistry()) - require.NoError(t, queue.Enqueue(context.Background(), NewLedgerMetaArchive("test", 1, 1))) - require.NoError(t, queue.Enqueue(context.Background(), NewLedgerMetaArchive("test", 2, 2))) - require.NoError(t, queue.Enqueue(context.Background(), NewLedgerMetaArchive("test", 3, 3))) + require.NoError(t, queue.Enqueue(context.Background(), datastore.NewLedgerMetaArchive("test", 1, 1))) + require.NoError(t, queue.Enqueue(context.Background(), datastore.NewLedgerMetaArchive("test", 2, 2))) + require.NoError(t, queue.Enqueue(context.Background(), datastore.NewLedgerMetaArchive("test", 3, 3))) require.Equal(t, float64(3), getMetricValue(queue.queueLengthMetric).GetGauge().GetValue()) queue.Close() diff --git a/exp/services/ledgerexporter/internal/test/10perfile.toml b/exp/services/ledgerexporter/internal/test/10perfile.toml index 8629bf4b5b..9b96927804 100644 --- a/exp/services/ledgerexporter/internal/test/10perfile.toml +++ b/exp/services/ledgerexporter/internal/test/10perfile.toml @@ -1,4 +1,5 @@ network = "test" [exporter_config] -ledgers_per_file = 10 \ No newline at end of file +ledgers_per_file = 10 +file_suffix = ".xdr.gz" \ No newline at end of file diff --git a/exp/services/ledgerexporter/internal/test/15perfile.toml b/exp/services/ledgerexporter/internal/test/15perfile.toml index eb76bac4d8..94df87a2e2 100644 --- a/exp/services/ledgerexporter/internal/test/15perfile.toml +++ b/exp/services/ledgerexporter/internal/test/15perfile.toml @@ -1,4 +1,5 @@ network = "test" [exporter_config] -ledgers_per_file = 15 \ No newline at end of file +ledgers_per_file = 15 +file_suffix = ".xdr.gz" \ No newline at end of file diff --git a/exp/services/ledgerexporter/internal/test/1perfile.toml b/exp/services/ledgerexporter/internal/test/1perfile.toml index cadef50df8..a15dc9bf41 100644 --- a/exp/services/ledgerexporter/internal/test/1perfile.toml +++ b/exp/services/ledgerexporter/internal/test/1perfile.toml @@ -1,4 +1,5 @@ network = "test" [exporter_config] -ledgers_per_file = 1 \ No newline at end of file +ledgers_per_file = 1 +file_suffix = ".xdr.gz" \ No newline at end of file diff --git a/exp/services/ledgerexporter/internal/test/64perfile.toml b/exp/services/ledgerexporter/internal/test/64perfile.toml index f4e30a71c0..8e8d122fcc 100644 --- a/exp/services/ledgerexporter/internal/test/64perfile.toml +++ b/exp/services/ledgerexporter/internal/test/64perfile.toml @@ -1,4 +1,5 @@ network = "test" [exporter_config] -ledgers_per_file = 64 \ No newline at end of file +ledgers_per_file = 64 +file_suffix = ".xdr.gz" \ No newline at end of file diff --git a/exp/services/ledgerexporter/internal/test/no_network.toml 
b/exp/services/ledgerexporter/internal/test/no_network.toml index f5815b9b9d..1cb591cdd4 100644 --- a/exp/services/ledgerexporter/internal/test/no_network.toml +++ b/exp/services/ledgerexporter/internal/test/no_network.toml @@ -7,4 +7,5 @@ destination_bucket_path = "your-bucket-name/subpath" [exporter_config] ledgers_per_file = 3 -files_per_partition = 1 \ No newline at end of file +files_per_partition = 1 +file_suffix = ".xdr.gz" \ No newline at end of file diff --git a/exp/services/ledgerexporter/internal/test/test.toml b/exp/services/ledgerexporter/internal/test/test.toml index 49d62384d7..58b5fc6df6 100644 --- a/exp/services/ledgerexporter/internal/test/test.toml +++ b/exp/services/ledgerexporter/internal/test/test.toml @@ -8,4 +8,5 @@ destination_bucket_path = "your-bucket-name/subpath" [exporter_config] ledgers_per_file = 3 -files_per_partition = 1 \ No newline at end of file +files_per_partition = 1 +file_suffix = ".xdr.gz" \ No newline at end of file diff --git a/exp/services/ledgerexporter/internal/test/validate_start_end.toml b/exp/services/ledgerexporter/internal/test/validate_start_end.toml index cadef50df8..a15dc9bf41 100644 --- a/exp/services/ledgerexporter/internal/test/validate_start_end.toml +++ b/exp/services/ledgerexporter/internal/test/validate_start_end.toml @@ -1,4 +1,5 @@ network = "test" [exporter_config] -ledgers_per_file = 1 \ No newline at end of file +ledgers_per_file = 1 +file_suffix = ".xdr.gz" \ No newline at end of file diff --git a/exp/services/ledgerexporter/internal/uploader.go b/exp/services/ledgerexporter/internal/uploader.go index 04da703fd6..ca07965b85 100644 --- a/exp/services/ledgerexporter/internal/uploader.go +++ b/exp/services/ledgerexporter/internal/uploader.go @@ -8,11 +8,13 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/stellar/go/support/compressxdr" + "github.com/stellar/go/support/datastore" ) // Uploader is responsible for uploading data to a storage destination. type Uploader struct { - dataStore DataStore + dataStore datastore.DataStore queue UploadQueue uploadDurationMetric *prometheus.SummaryVec objectSizeMetrics *prometheus.SummaryVec @@ -20,7 +22,7 @@ type Uploader struct { // NewUploader constructs a new Uploader instance func NewUploader( - destination DataStore, + destination datastore.DataStore, queue UploadQueue, prometheusRegistry *prometheus.Registry, ) Uploader { @@ -76,13 +78,20 @@ func (r *writerToRecorder) WriteTo(w io.Writer) (int64, error) { } // Upload uploads the serialized binary data of ledger TxMeta to the specified destination. 
-func (u Uploader) Upload(ctx context.Context, metaArchive *LedgerMetaArchive) error { +func (u Uploader) Upload(ctx context.Context, metaArchive *datastore.LedgerMetaArchive) error { logger.Infof("Uploading: %s", metaArchive.GetObjectKey()) startTime := time.Now() numLedgers := strconv.FormatUint(uint64(metaArchive.GetLedgerCount()), 10) + // TODO: Add compression config and optimize best compression algorithm + // JIRA https://stellarorg.atlassian.net/browse/HUBBLE-368 + xdrEncoder, err := compressxdr.NewXDREncoder(compressxdr.GZIP, &metaArchive.Data) + if err != nil { + return err + } + writerTo := &writerToRecorder{ - WriterTo: &XDRGzipEncoder{XdrPayload: &metaArchive.data}, + WriterTo: xdrEncoder, } ok, err := u.dataStore.PutFileIfNotExists(ctx, metaArchive.GetObjectKey(), writerTo) if err != nil { @@ -100,7 +109,7 @@ func (u Uploader) Upload(ctx context.Context, metaArchive *LedgerMetaArchive) er "already_exists": alreadyExists, }).Observe(float64(writerTo.totalUncompressed)) u.objectSizeMetrics.With(prometheus.Labels{ - "compression": "gzip", + "compression": compressxdr.GZIP, "ledgers": numLedgers, "already_exists": alreadyExists, }).Observe(float64(writerTo.totalCompressed)) @@ -136,6 +145,6 @@ func (u Uploader) Run(ctx context.Context) error { if err = u.Upload(uploadCtx, metaObject); err != nil { return err } - logger.Infof("Uploaded %s successfully", metaObject.objectKey) + logger.Infof("Uploaded %s successfully", metaObject.ObjectKey) } } diff --git a/exp/services/ledgerexporter/internal/uploader_test.go b/exp/services/ledgerexporter/internal/uploader_test.go index 935b4445e8..ee39245486 100644 --- a/exp/services/ledgerexporter/internal/uploader_test.go +++ b/exp/services/ledgerexporter/internal/uploader_test.go @@ -10,11 +10,12 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/stellar/go/support/compressxdr" + "github.com/stellar/go/support/datastore" + "github.com/stellar/go/support/errors" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - - "github.com/stellar/go/support/errors" ) func TestUploaderSuite(t *testing.T) { @@ -25,12 +26,12 @@ func TestUploaderSuite(t *testing.T) { type UploaderSuite struct { suite.Suite ctx context.Context - mockDataStore MockDataStore + mockDataStore datastore.MockDataStore } func (s *UploaderSuite) SetupTest() { s.ctx = context.Background() - s.mockDataStore = MockDataStore{} + s.mockDataStore = datastore.MockDataStore{} } func (s *UploaderSuite) TestUpload() { @@ -40,9 +41,9 @@ func (s *UploaderSuite) TestUpload() { func (s *UploaderSuite) testUpload(putOkReturnVal bool) { key, start, end := "test-1-100", uint32(1), uint32(100) - archive := NewLedgerMetaArchive(key, start, end) + archive := datastore.NewLedgerMetaArchive(key, start, end) for i := start; i <= end; i++ { - _ = archive.AddLedger(createLedgerCloseMeta(i)) + _ = archive.AddLedger(datastore.CreateLedgerCloseMeta(i)) } var capturedBuf bytes.Buffer @@ -60,14 +61,17 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) { require.NoError(s.T(), dataUploader.Upload(context.Background(), archive)) expectedCompressedLength := capturedBuf.Len() - var decodedArchive LedgerMetaArchive - decoder := &XDRGzipDecoder{XdrPayload: &decodedArchive.data} - _, err := decoder.ReadFrom(&capturedBuf) + var decodedArchive datastore.LedgerMetaArchive + xdrDecoder, err := compressxdr.NewXDRDecoder(compressxdr.GZIP, &decodedArchive.Data) + require.NoError(s.T(), err) + + decoder := xdrDecoder + _, err = 
decoder.ReadFrom(&capturedBuf) require.NoError(s.T(), err) // require that the decoded data matches the original test data require.Equal(s.T(), key, capturedKey) - require.Equal(s.T(), archive.data, decodedArchive.data) + require.Equal(s.T(), archive.Data, decodedArchive.Data) alreadyExists := !putOkReturnVal metric, err := dataUploader.uploadDurationMetric.MetricVec.GetMetricWith(prometheus.Labels{ @@ -94,7 +98,7 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) { metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{ "ledgers": "100", - "compression": "gzip", + "compression": compressxdr.GZIP, "already_exists": strconv.FormatBool(alreadyExists), }) require.NoError(s.T(), err) @@ -110,7 +114,7 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) { ) metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{ "ledgers": "100", - "compression": "gzip", + "compression": compressxdr.GZIP, "already_exists": strconv.FormatBool(!alreadyExists), }) require.NoError(s.T(), err) @@ -131,7 +135,7 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) { uint64(1), getMetricValue(metric).GetSummary().GetSampleCount(), ) - uncompressedPayload, err := decodedArchive.data.MarshalBinary() + uncompressedPayload, err := decodedArchive.Data.MarshalBinary() require.NoError(s.T(), err) require.Equal( s.T(), @@ -158,7 +162,7 @@ func (s *UploaderSuite) TestUploadPutError() { func (s *UploaderSuite) testUploadPutError(putOkReturnVal bool) { key, start, end := "test-1-100", uint32(1), uint32(100) - archive := NewLedgerMetaArchive(key, start, end) + archive := datastore.NewLedgerMetaArchive(key, start, end) s.mockDataStore.On("PutFileIfNotExists", context.Background(), key, mock.Anything).Return(putOkReturnVal, errors.New("error in PutFileIfNotExists")) @@ -181,7 +185,7 @@ func (s *UploaderSuite) testUploadPutError(putOkReturnVal bool) { getMetricValue(metric).GetSummary().GetSampleCount(), ) - for _, compression := range []string{"gzip", "none"} { + for _, compression := range []string{compressxdr.GZIP, "none"} { metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{ "ledgers": "100", "compression": compression, @@ -206,7 +210,7 @@ func (s *UploaderSuite) TestRunChannelClose() { go func() { key, start, end := "test", uint32(1), uint32(100) for i := start; i <= end; i++ { - s.Assert().NoError(queue.Enqueue(s.ctx, NewLedgerMetaArchive(key, i, i))) + s.Assert().NoError(queue.Enqueue(s.ctx, datastore.NewLedgerMetaArchive(key, i, i))) } <-time.After(time.Second * 2) queue.Close() @@ -222,7 +226,7 @@ func (s *UploaderSuite) TestRunContextCancel() { registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) - s.Assert().NoError(queue.Enqueue(s.ctx, NewLedgerMetaArchive("test", 1, 1))) + s.Assert().NoError(queue.Enqueue(s.ctx, datastore.NewLedgerMetaArchive("test", 1, 1))) go func() { <-time.After(time.Second * 2) @@ -237,7 +241,7 @@ func (s *UploaderSuite) TestRunUploadError() { registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) - s.Assert().NoError(queue.Enqueue(s.ctx, NewLedgerMetaArchive("test", 1, 1))) + s.Assert().NoError(queue.Enqueue(s.ctx, datastore.NewLedgerMetaArchive("test", 1, 1))) s.mockDataStore.On("PutFileIfNotExists", mock.Anything, "test", mock.Anything).Return(false, errors.New("Put error")) diff --git a/ingest/ledgerbackend/buffered_storage_backend.go b/ingest/ledgerbackend/buffered_storage_backend.go new file mode 100644 index 0000000000..b28e68b63c --- 
/dev/null +++ b/ingest/ledgerbackend/buffered_storage_backend.go @@ -0,0 +1,296 @@ +// BufferedStorageBackend is a ledger backend that provides buffered access over a given DataStore. +// The DataStore must contain files generated from a LedgerExporter. + +package ledgerbackend + +import ( + "context" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/stellar/go/support/compressxdr" + "github.com/stellar/go/support/datastore" + "github.com/stellar/go/xdr" +) + +// Ensure BufferedStorageBackend implements LedgerBackend +var _ LedgerBackend = (*BufferedStorageBackend)(nil) + +type BufferedStorageBackendConfig struct { + LedgerBatchConfig datastore.LedgerBatchConfig + CompressionType string + DataStore datastore.DataStore + BufferSize uint32 + NumWorkers uint32 + RetryLimit uint32 + RetryWait time.Duration +} + +// BufferedStorageBackend is a ledger backend that reads from a storage service. +// The storage service contains files generated from the ledgerExporter. +type BufferedStorageBackend struct { + config BufferedStorageBackendConfig + + bsBackendLock sync.RWMutex + + // ledgerBuffer is the buffer for LedgerCloseMeta data read in parallel. + ledgerBuffer *ledgerBuffer + + dataStore datastore.DataStore + prepared *Range // Non-nil if any range is prepared + closed bool // False until the backend is closed + ledgerMetaArchive *datastore.LedgerMetaArchive + decoder compressxdr.XDRDecoder + nextLedger uint32 + lastLedger uint32 +} + +// NewBufferedStorageBackend returns a new BufferedStorageBackend instance. +func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBackendConfig) (*BufferedStorageBackend, error) { + if config.BufferSize == 0 { + return nil, errors.New("buffer size must be > 0") + } + + if config.NumWorkers > config.BufferSize { + return nil, errors.New("number of workers must be <= BufferSize") + } + + if config.DataStore == nil { + return nil, errors.New("no DataStore provided") + } + + if config.LedgerBatchConfig.LedgersPerFile <= 0 { + return nil, errors.New("ledgersPerFile must be > 0") + } + + if config.LedgerBatchConfig.FileSuffix == "" { + return nil, errors.New("no file suffix provided in LedgerBatchConfig") + } + + if config.CompressionType == "" { + return nil, errors.New("no compression type provided in config") + } + + ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0) + decoder, err := compressxdr.NewXDRDecoder(config.CompressionType, nil) + if err != nil { + return nil, err + } + + bsBackend := &BufferedStorageBackend{ + config: config, + dataStore: config.DataStore, + ledgerMetaArchive: ledgerMetaArchive, + decoder: decoder, + } + + return bsBackend, nil +} + +// GetLatestLedgerSequence returns the most recent ledger sequence number available in the buffer. +func (bsb *BufferedStorageBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { + bsb.bsBackendLock.RLock() + defer bsb.bsBackendLock.RUnlock() + + if bsb.closed { + return 0, errors.New("BufferedStorageBackend is closed; cannot GetLatestLedgerSequence") + } + + if bsb.prepared == nil { + return 0, errors.New("BufferedStorageBackend must be prepared, call PrepareRange first") + } + + latestSeq, err := bsb.ledgerBuffer.getLatestLedgerSequence() + if err != nil { + return 0, err + } + + return latestSeq, nil +} + +// getBatchForSequence checks if the requested sequence is in the cached batch. +// Otherwise, it will continuously load the next LedgerCloseMetaBatch until the sequence is found.
+func (bsb *BufferedStorageBackend) getBatchForSequence(ctx context.Context, sequence uint32) error { + // Sequence inside the current cached LedgerCloseMetaBatch + if sequence >= bsb.ledgerMetaArchive.GetStartLedgerSequence() && sequence <= bsb.ledgerMetaArchive.GetEndLedgerSequence() { + return nil + } + + // Sequence is before the current LedgerCloseMetaBatch + // Does not support retrieving LedgerCloseMeta before the current cached batch + if sequence < bsb.ledgerMetaArchive.GetStartLedgerSequence() { + return errors.New("requested sequence precedes current LedgerCloseMetaBatch") + } + + // Sequence is beyond the current LedgerCloseMetaBatch + lcmBatchBinary, err := bsb.ledgerBuffer.getFromLedgerQueue(ctx) + if err != nil { + return errors.Wrap(err, "failed getting next ledger batch from queue") + } + + // Turn binary into xdr + err = bsb.ledgerMetaArchive.Data.UnmarshalBinary(lcmBatchBinary) + if err != nil { + return errors.Wrap(err, "failed unmarshalling lcmBatchBinary") + } + + return nil +} + +// nextExpectedSequence returns nextLedger (if currently set) or start of +// prepared range. Otherwise it returns 0. +// This is done because `nextLedger` is 0 between the moment the backend is +// created and streaming the first ledger (in such case we return first ledger +// in requested range). +func (bsb *BufferedStorageBackend) nextExpectedSequence() uint32 { + if bsb.nextLedger == 0 && bsb.prepared != nil { + return bsb.prepared.from + } + return bsb.nextLedger +} + +// GetLedger returns the LedgerCloseMeta for the specified ledger sequence number +func (bsb *BufferedStorageBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { + bsb.bsBackendLock.RLock() + defer bsb.bsBackendLock.RUnlock() + + if bsb.closed { + return xdr.LedgerCloseMeta{}, errors.New("BufferedStorageBackend is closed; cannot GetLedger") + } + + if bsb.prepared == nil { + return xdr.LedgerCloseMeta{}, errors.New("session is not prepared, call PrepareRange first") + } + + if sequence < bsb.ledgerBuffer.ledgerRange.from { + return xdr.LedgerCloseMeta{}, errors.New("requested sequence precedes current LedgerRange") + } + + if bsb.ledgerBuffer.ledgerRange.bounded { + if sequence > bsb.ledgerBuffer.ledgerRange.to { + return xdr.LedgerCloseMeta{}, errors.New("requested sequence beyond current LedgerRange") + } + } + + if sequence < bsb.lastLedger { + return xdr.LedgerCloseMeta{}, errors.New("requested sequence precedes the lastLedger") + } + + if sequence > bsb.nextExpectedSequence() { + return xdr.LedgerCloseMeta{}, errors.New("requested sequence is neither the lastLedger nor the next available ledger") + } + + err := bsb.getBatchForSequence(ctx, sequence) + if err != nil { + return xdr.LedgerCloseMeta{}, err + } + + ledgerCloseMeta, err := bsb.ledgerMetaArchive.GetLedger(sequence) + if err != nil { + return xdr.LedgerCloseMeta{}, err + } + bsb.lastLedger = bsb.nextLedger + bsb.nextLedger++ + + return ledgerCloseMeta, nil +} + +// PrepareRange checks if the starting and ending (if bounded) ledgers exist.
+func (bsb *BufferedStorageBackend) PrepareRange(ctx context.Context, ledgerRange Range) error { + bsb.bsBackendLock.Lock() + defer bsb.bsBackendLock.Unlock() + + if bsb.closed { + return errors.New("BufferedStorageBackend is closed; cannot PrepareRange") + } + + if alreadyPrepared, err := bsb.startPreparingRange(ledgerRange); err != nil { + return errors.Wrap(err, "error starting prepare range") + } else if alreadyPrepared { + return nil + } + + bsb.prepared = &ledgerRange + + return nil +} + +// IsPrepared returns true if a given ledgerRange is prepared. +func (bsb *BufferedStorageBackend) IsPrepared(ctx context.Context, ledgerRange Range) (bool, error) { + bsb.bsBackendLock.RLock() + defer bsb.bsBackendLock.RUnlock() + + if bsb.closed { + return false, errors.New("BufferedStorageBackend is closed; cannot IsPrepared") + } + + return bsb.isPrepared(ledgerRange), nil +} + +func (bsb *BufferedStorageBackend) isPrepared(ledgerRange Range) bool { + if bsb.closed { + return false + } + + if bsb.prepared == nil { + return false + } + + if bsb.ledgerBuffer.ledgerRange.from > ledgerRange.from { + return false + } + + if bsb.ledgerBuffer.ledgerRange.bounded && !ledgerRange.bounded { + return false + } + + if !bsb.ledgerBuffer.ledgerRange.bounded && !ledgerRange.bounded { + return true + } + + if !bsb.ledgerBuffer.ledgerRange.bounded && ledgerRange.bounded { + return true + } + + if bsb.ledgerBuffer.ledgerRange.to >= ledgerRange.to { + return true + } + + return false +} + +// Close closes existing BufferedStorageBackend processes. +// Note, once a BufferedStorageBackend instance is closed it can no longer be used and +// all subsequent calls to PrepareRange(), GetLedger(), etc will fail. +// Close is thread-safe and can be called from another go routine. +func (bsb *BufferedStorageBackend) Close() error { + bsb.bsBackendLock.RLock() + defer bsb.bsBackendLock.RUnlock() + + if bsb.ledgerBuffer != nil { + bsb.ledgerBuffer.close() + } + + bsb.closed = true + + return nil +} + +// startPreparingRange prepares the ledger range by setting the range in the ledgerBuffer +func (bsb *BufferedStorageBackend) startPreparingRange(ledgerRange Range) (bool, error) { + if bsb.isPrepared(ledgerRange) { + return true, nil + } + + var err error + bsb.ledgerBuffer, err = bsb.newLedgerBuffer(ledgerRange) + if err != nil { + return false, err + } + + bsb.nextLedger = ledgerRange.from + + return false, nil +} diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go new file mode 100644 index 0000000000..f132bc9419 --- /dev/null +++ b/ingest/ledgerbackend/buffered_storage_backend_test.go @@ -0,0 +1,549 @@ +package ledgerbackend + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/stellar/go/support/compressxdr" + "github.com/stellar/go/support/datastore" + "github.com/stellar/go/xdr" +) + +var partitionSize = uint32(64000) +var ledgerPerFileCount = uint32(1) + +func createBufferedStorageBackendConfigForTesting() BufferedStorageBackendConfig { + param := make(map[string]string) + param["destination_bucket_path"] = "testURL" + + ledgerBatchConfig := datastore.LedgerBatchConfig{ + LedgersPerFile: 1, + FilesPerPartition: 64000, + FileSuffix: ".xdr.gz", + } + + dataStore := new(datastore.MockDataStore) + + return BufferedStorageBackendConfig{ + LedgerBatchConfig: ledgerBatchConfig, + CompressionType: compressxdr.GZIP, + DataStore: 
dataStore, + BufferSize: 100, + NumWorkers: 5, + RetryLimit: 3, + RetryWait: time.Microsecond, + } +} + +func createBufferedStorageBackendForTesting() BufferedStorageBackend { + config := createBufferedStorageBackendConfigForTesting() + ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0) + decoder, _ := compressxdr.NewXDRDecoder(config.CompressionType, nil) + + return BufferedStorageBackend{ + config: config, + dataStore: config.DataStore, + ledgerMetaArchive: ledgerMetaArchive, + decoder: decoder, + } +} + +func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32) *datastore.MockDataStore { + mockDataStore := new(datastore.MockDataStore) + partition := count*partitionSize - 1 + for i := start; i <= end; i = i + count { + var objectName string + var readCloser io.ReadCloser + if count > 1 { + endFileSeq := i + count - 1 + readCloser = createLCMBatchReader(i, endFileSeq, count) + objectName = fmt.Sprintf("0-%d/%d-%d.xdr.gz", partition, i, endFileSeq) + } else { + readCloser = createLCMBatchReader(i, i, count) + objectName = fmt.Sprintf("0-%d/%d.xdr.gz", partition, i) + } + mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil) + } + + t.Cleanup(func() { + mockDataStore.AssertExpectations(t) + }) + + return mockDataStore +} + +func createLCMForTesting(start, end uint32) []xdr.LedgerCloseMeta { + var lcmArray []xdr.LedgerCloseMeta + for i := start; i <= end; i++ { + lcmArray = append(lcmArray, datastore.CreateLedgerCloseMeta(i)) + } + + return lcmArray +} + +func createTestLedgerCloseMetaBatch(startSeq, endSeq, count uint32) xdr.LedgerCloseMetaBatch { + var ledgerCloseMetas []xdr.LedgerCloseMeta + for i := uint32(0); i < count; i++ { + ledgerCloseMetas = append(ledgerCloseMetas, datastore.CreateLedgerCloseMeta(startSeq+uint32(i))) + } + return xdr.LedgerCloseMetaBatch{ + StartSequence: xdr.Uint32(startSeq), + EndSequence: xdr.Uint32(endSeq), + LedgerCloseMetas: ledgerCloseMetas, + } +} + +func createLCMBatchReader(start, end, count uint32) io.ReadCloser { + testData := createTestLedgerCloseMetaBatch(start, end, count) + encoder, _ := compressxdr.NewXDREncoder(compressxdr.GZIP, testData) + var buf bytes.Buffer + encoder.WriteTo(&buf) + capturedBuf := buf.Bytes() + reader := bytes.NewReader(capturedBuf) + return io.NopCloser(reader) +} + +func TestNewBufferedStorageBackend(t *testing.T) { + ctx := context.Background() + config := createBufferedStorageBackendConfigForTesting() + + bsb, err := NewBufferedStorageBackend(ctx, config) + assert.NoError(t, err) + + assert.Equal(t, bsb.dataStore, config.DataStore) + assert.Equal(t, ".xdr.gz", bsb.config.LedgerBatchConfig.FileSuffix) + assert.Equal(t, uint32(1), bsb.config.LedgerBatchConfig.LedgersPerFile) + assert.Equal(t, uint32(64000), bsb.config.LedgerBatchConfig.FilesPerPartition) + assert.Equal(t, uint32(100), bsb.config.BufferSize) + assert.Equal(t, uint32(5), bsb.config.NumWorkers) + assert.Equal(t, uint32(3), bsb.config.RetryLimit) + assert.Equal(t, time.Microsecond, bsb.config.RetryWait) +} + +func TestNewLedgerBuffer(t *testing.T) { + startLedger := uint32(3) + endLedger := uint32(7) + bsb := createBufferedStorageBackendForTesting() + bsb.config.NumWorkers = 2 + bsb.config.BufferSize = 5 + ledgerRange := BoundedRange(startLedger, endLedger) + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange) + assert.Eventually(t, func() bool { return 
len(ledgerBuffer.ledgerQueue) == 5 }, time.Second*5, time.Millisecond*50) + assert.NoError(t, err) + + latestSeq, err := ledgerBuffer.getLatestLedgerSequence() + assert.NoError(t, err) + assert.Equal(t, uint32(7), latestSeq) + assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange) +} + +func TestBSBGetLatestLedgerSequence(t *testing.T) { + startLedger := uint32(3) + endLedger := uint32(5) + ctx := context.Background() + bsb := createBufferedStorageBackendForTesting() + ledgerRange := BoundedRange(startLedger, endLedger) + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) + assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 3 }, time.Second*5, time.Millisecond*50) + + latestSeq, err := bsb.GetLatestLedgerSequence(ctx) + assert.NoError(t, err) + + assert.Equal(t, uint32(5), latestSeq) +} + +func TestBSBGetLedger_SingleLedgerPerFile(t *testing.T) { + startLedger := uint32(3) + endLedger := uint32(5) + ctx := context.Background() + lcmArray := createLCMForTesting(startLedger, endLedger) + bsb := createBufferedStorageBackendForTesting() + ledgerRange := BoundedRange(startLedger, endLedger) + + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) + assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 3 }, time.Second*5, time.Millisecond*50) + + lcm, err := bsb.GetLedger(ctx, uint32(3)) + assert.NoError(t, err) + assert.Equal(t, lcmArray[0], lcm) + lcm, err = bsb.GetLedger(ctx, uint32(4)) + assert.NoError(t, err) + assert.Equal(t, lcmArray[1], lcm) + lcm, err = bsb.GetLedger(ctx, uint32(5)) + assert.NoError(t, err) + assert.Equal(t, lcmArray[2], lcm) +} + +func TestCloudStorageGetLedger_MultipleLedgerPerFile(t *testing.T) { + startLedger := uint32(2) + endLedger := uint32(5) + lcmArray := createLCMForTesting(startLedger, endLedger) + bsb := createBufferedStorageBackendForTesting() + ctx := context.Background() + bsb.config.LedgerBatchConfig.LedgersPerFile = uint32(2) + ledgerRange := BoundedRange(startLedger, endLedger) + + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 2) + bsb.dataStore = mockDataStore + + assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) + assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50) + + lcm, err := bsb.GetLedger(ctx, uint32(2)) + assert.NoError(t, err) + assert.Equal(t, lcmArray[0], lcm) + lcm, err = bsb.GetLedger(ctx, uint32(3)) + assert.NoError(t, err) + assert.Equal(t, lcmArray[1], lcm) + lcm, err = bsb.GetLedger(ctx, uint32(4)) + assert.NoError(t, err) + assert.Equal(t, lcmArray[2], lcm) +} + +func TestBSBGetLedger_ErrorPreceedingLedger(t *testing.T) { + startLedger := uint32(3) + endLedger := uint32(5) + ctx := context.Background() + lcmArray := createLCMForTesting(startLedger, endLedger) + bsb := createBufferedStorageBackendForTesting() + ledgerRange := BoundedRange(startLedger, endLedger) + + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) + assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 3 }, time.Second*5, time.Millisecond*50) + + lcm, err := bsb.GetLedger(ctx, 
uint32(3)) + assert.NoError(t, err) + assert.Equal(t, lcmArray[0], lcm) + + _, err = bsb.GetLedger(ctx, uint32(2)) + assert.EqualError(t, err, "requested sequence precedes current LedgerRange") +} + +func TestBSBGetLedger_NotPrepared(t *testing.T) { + bsb := createBufferedStorageBackendForTesting() + ctx := context.Background() + + _, err := bsb.GetLedger(ctx, uint32(3)) + assert.EqualError(t, err, "session is not prepared, call PrepareRange first") +} + +func TestBSBGetLedger_SequenceNotInBatch(t *testing.T) { + startLedger := uint32(3) + endLedger := uint32(5) + ctx := context.Background() + bsb := createBufferedStorageBackendForTesting() + ledgerRange := BoundedRange(startLedger, endLedger) + + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) + assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 3 }, time.Second*5, time.Millisecond*50) + + _, err := bsb.GetLedger(ctx, uint32(2)) + assert.EqualError(t, err, "requested sequence precedes current LedgerRange") + + _, err = bsb.GetLedger(ctx, uint32(6)) + assert.EqualError(t, err, "requested sequence beyond current LedgerRange") +} + +func TestBSBPrepareRange(t *testing.T) { + startLedger := uint32(2) + endLedger := uint32(3) + ctx := context.Background() + bsb := createBufferedStorageBackendForTesting() + ledgerRange := BoundedRange(startLedger, endLedger) + + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) + assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50) + + assert.NotNil(t, bsb.prepared) + + // check alreadyPrepared + err := bsb.PrepareRange(ctx, ledgerRange) + assert.NoError(t, err) + assert.NotNil(t, bsb.prepared) +} + +func TestBSBIsPrepared_Bounded(t *testing.T) { + startLedger := uint32(3) + endLedger := uint32(5) + ctx := context.Background() + bsb := createBufferedStorageBackendForTesting() + ledgerRange := BoundedRange(startLedger, endLedger) + + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) + assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 3 }, time.Second*5, time.Millisecond*50) + + ok, err := bsb.IsPrepared(ctx, ledgerRange) + assert.NoError(t, err) + assert.True(t, ok) + + ok, err = bsb.IsPrepared(ctx, BoundedRange(2, 4)) + assert.NoError(t, err) + assert.False(t, ok) + + ok, err = bsb.IsPrepared(ctx, UnboundedRange(3)) + assert.NoError(t, err) + assert.False(t, ok) + + ok, err = bsb.IsPrepared(ctx, UnboundedRange(2)) + assert.NoError(t, err) + assert.False(t, ok) +} + +func TestBSBIsPrepared_Unbounded(t *testing.T) { + startLedger := uint32(3) + endLedger := uint32(8) + ctx := context.Background() + bsb := createBufferedStorageBackendForTesting() + bsb.config.NumWorkers = 2 + bsb.config.BufferSize = 5 + ledgerRange := UnboundedRange(3) + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) + assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 5 }, time.Second*5, time.Millisecond*50) + + ok, err := bsb.IsPrepared(ctx, 
ledgerRange) + assert.NoError(t, err) + assert.True(t, ok) + + ok, err = bsb.IsPrepared(ctx, BoundedRange(3, 4)) + assert.NoError(t, err) + assert.True(t, ok) + + ok, err = bsb.IsPrepared(ctx, BoundedRange(2, 4)) + assert.NoError(t, err) + assert.False(t, ok) + + ok, err = bsb.IsPrepared(ctx, UnboundedRange(4)) + assert.NoError(t, err) + assert.True(t, ok) + + ok, err = bsb.IsPrepared(ctx, UnboundedRange(2)) + assert.NoError(t, err) + assert.False(t, ok) +} + +func TestBSBClose(t *testing.T) { + startLedger := uint32(2) + endLedger := uint32(3) + ctx := context.Background() + bsb := createBufferedStorageBackendForTesting() + ledgerRange := BoundedRange(startLedger, endLedger) + + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) + assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50) + + err := bsb.Close() + assert.NoError(t, err) + assert.Equal(t, true, bsb.closed) + + _, err = bsb.GetLatestLedgerSequence(ctx) + assert.EqualError(t, err, "BufferedStorageBackend is closed; cannot GetLatestLedgerSequence") + + _, err = bsb.GetLedger(ctx, 3) + assert.EqualError(t, err, "BufferedStorageBackend is closed; cannot GetLedger") + + err = bsb.PrepareRange(ctx, ledgerRange) + assert.EqualError(t, err, "BufferedStorageBackend is closed; cannot PrepareRange") + + _, err = bsb.IsPrepared(ctx, ledgerRange) + assert.EqualError(t, err, "BufferedStorageBackend is closed; cannot IsPrepared") +} + +func TestLedgerBufferInvariant(t *testing.T) { + startLedger := uint32(3) + endLedger := uint32(6) + ctx := context.Background() + lcmArray := createLCMForTesting(startLedger, endLedger) + bsb := createBufferedStorageBackendForTesting() + bsb.config.NumWorkers = 2 + bsb.config.BufferSize = 2 + ledgerRange := BoundedRange(startLedger, endLedger) + + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) + assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50) + + // Buffer should have hit the BufferSize limit + assert.Equal(t, 2, len(bsb.ledgerBuffer.ledgerQueue)) + + lcm, err := bsb.GetLedger(ctx, uint32(3)) + assert.NoError(t, err) + assert.Equal(t, lcmArray[0], lcm) + lcm, err = bsb.GetLedger(ctx, uint32(4)) + assert.NoError(t, err) + assert.Equal(t, lcmArray[1], lcm) + + // Buffer should fill up with remaining ledgers + assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50) + assert.Equal(t, 2, len(bsb.ledgerBuffer.ledgerQueue)) + + lcm, err = bsb.GetLedger(ctx, uint32(5)) + assert.NoError(t, err) + assert.Equal(t, lcmArray[2], lcm) + + // Buffer should only have the final ledger + assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 1 }, time.Second*5, time.Millisecond*50) + assert.Equal(t, 1, len(bsb.ledgerBuffer.ledgerQueue)) + + lcm, err = bsb.GetLedger(ctx, uint32(6)) + assert.NoError(t, err) + assert.Equal(t, lcmArray[3], lcm) + + // Buffer should be empty + assert.Equal(t, 0, len(bsb.ledgerBuffer.ledgerQueue)) +} + +func TestLedgerBufferClose(t *testing.T) { + ctx := context.Background() + bsb := createBufferedStorageBackendForTesting() + bsb.config.NumWorkers = 1 + bsb.config.BufferSize = 5 + ledgerRange := 
UnboundedRange(3) + + mockDataStore := new(datastore.MockDataStore) + partition := ledgerPerFileCount*partitionSize - 1 + + objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3) + afterPrepareRange := make(chan struct{}) + mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), context.Canceled).Run(func(args mock.Arguments) { + <-afterPrepareRange + go bsb.ledgerBuffer.close() + }).Once() + + t.Cleanup(func() { + mockDataStore.AssertExpectations(t) + }) + + bsb.dataStore = mockDataStore + + assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) + close(afterPrepareRange) + + bsb.ledgerBuffer.wg.Wait() + + _, err := bsb.GetLedger(ctx, 3) + assert.EqualError(t, err, "failed getting next ledger batch from queue: context canceled") +} + +func TestLedgerBufferBoundedObjectNotFound(t *testing.T) { + ctx := context.Background() + bsb := createBufferedStorageBackendForTesting() + bsb.config.NumWorkers = 1 + bsb.config.BufferSize = 5 + ledgerRange := BoundedRange(3, 5) + + mockDataStore := new(datastore.MockDataStore) + partition := ledgerPerFileCount*partitionSize - 1 + + objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3) + mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Once() + t.Cleanup(func() { + mockDataStore.AssertExpectations(t) + }) + + bsb.dataStore = mockDataStore + + assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) + + bsb.ledgerBuffer.wg.Wait() + + _, err := bsb.GetLedger(ctx, 3) + assert.EqualError(t, err, "failed getting next ledger batch from queue: ledger object containing sequence 3 is missing: file does not exist") +} + +func TestLedgerBufferUnboundedObjectNotFound(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + bsb := createBufferedStorageBackendForTesting() + bsb.config.NumWorkers = 1 + bsb.config.BufferSize = 5 + ledgerRange := UnboundedRange(3) + + mockDataStore := new(datastore.MockDataStore) + partition := ledgerPerFileCount*partitionSize - 1 + + objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3) + iteration := &atomic.Int32{} + cancelAfter := int32(bsb.config.RetryLimit) + 2 + mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Run(func(args mock.Arguments) { + if iteration.Load() >= cancelAfter { + cancel() + } + iteration.Add(1) + }) + t.Cleanup(func() { + mockDataStore.AssertExpectations(t) + }) + + bsb.dataStore = mockDataStore + + assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) + + _, err := bsb.GetLedger(ctx, 3) + assert.EqualError(t, err, "failed getting next ledger batch from queue: context canceled") + assert.GreaterOrEqual(t, iteration.Load(), cancelAfter) + assert.NoError(t, bsb.Close()) +} + +func TestLedgerBufferRetryLimit(t *testing.T) { + bsb := createBufferedStorageBackendForTesting() + bsb.config.NumWorkers = 1 + bsb.config.BufferSize = 5 + ledgerRange := UnboundedRange(3) + + mockDataStore := new(datastore.MockDataStore) + partition := ledgerPerFileCount*partitionSize - 1 + + objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3) + mockDataStore.On("GetFile", mock.Anything, objectName). + Return(io.NopCloser(&bytes.Buffer{}), fmt.Errorf("transient error")). 
+ Times(int(bsb.config.RetryLimit) + 1) + t.Cleanup(func() { + mockDataStore.AssertExpectations(t) + }) + + bsb.dataStore = mockDataStore + + assert.NoError(t, bsb.PrepareRange(context.Background(), ledgerRange)) + + bsb.ledgerBuffer.wg.Wait() + + _, err := bsb.GetLedger(context.Background(), 3) + assert.EqualError(t, err, "failed getting next ledger batch from queue: maximum retries exceeded for downloading object containing sequence 3: transient error") +} diff --git a/ingest/ledgerbackend/configs/captive-core-pubnet.cfg b/ingest/ledgerbackend/configs/captive-core-pubnet.cfg index 6e61aa2603..5af59efaf9 100644 --- a/ingest/ledgerbackend/configs/captive-core-pubnet.cfg +++ b/ingest/ledgerbackend/configs/captive-core-pubnet.cfg @@ -192,4 +192,4 @@ NAME="Blockdaemon Validator 1" PUBLIC_KEY="GAAV2GCVFLNN522ORUYFV33E76VPC22E72S75AQ6MBR5V45Z5DWVPWEU" ADDRESS="stellar-full-validator1.bdnodes.net:11625" HISTORY="curl -sf https://stellar-full-history1.bdnodes.net/{0} -o {1}" -HOME_DOMAIN="stellar.blockdaemon.com" \ No newline at end of file +HOME_DOMAIN="stellar.blockdaemon.com" diff --git a/ingest/ledgerbackend/ledger_buffer.go b/ingest/ledgerbackend/ledger_buffer.go new file mode 100644 index 0000000000..a26aab8ba2 --- /dev/null +++ b/ingest/ledgerbackend/ledger_buffer.go @@ -0,0 +1,248 @@ +package ledgerbackend + +import ( + "bytes" + "context" + "io" + "os" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/stellar/go/support/collections/heap" + "github.com/stellar/go/support/compressxdr" + "github.com/stellar/go/support/datastore" +) + +type ledgerBatchObject struct { + payload []byte + startLedger int // Ledger sequence used as the priority for the priorityqueue. +} + +type ledgerBuffer struct { + // Passed through from BufferedStorageBackend to control lifetime of ledgerBuffer instance + config BufferedStorageBackendConfig + dataStore datastore.DataStore + decoder compressxdr.XDRDecoder + + // context used to cancel workers within the ledgerBuffer + context context.Context + cancel context.CancelCauseFunc + + wg sync.WaitGroup + + // The pipes and data structures below help establish the ledgerBuffer invariant which is + // the number of tasks (both pending and in-flight) + len(ledgerQueue) + ledgerPriorityQueue.Len() + // is always less than or equal to the config.BufferSize + taskQueue chan uint32 // Buffer next object read + ledgerQueue chan []byte // Order corrected lcm batches + ledgerPriorityQueue *heap.Heap[ledgerBatchObject] // Priority is set to the sequence number + priorityQueueLock sync.Mutex + + // Keep track of the ledgers to be processed and the next ordering + // the ledgers should be buffered + currentLedger uint32 // The current ledger that should be popped from ledgerPriorityQueue + nextTaskLedger uint32 // The next task ledger that should be added to taskQueue + ledgerRange Range + currentLedgerLock sync.RWMutex +} + +func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBuffer, error) { + ctx, cancel := context.WithCancelCause(context.Background()) + + less := func(a, b ledgerBatchObject) bool { + return a.startLedger < b.startLedger + } + pq := heap.New(less, int(bsb.config.BufferSize)) + + ledgerBuffer := &ledgerBuffer{ + config: bsb.config, + dataStore: bsb.dataStore, + taskQueue: make(chan uint32, bsb.config.BufferSize), + ledgerQueue: make(chan []byte, bsb.config.BufferSize), + ledgerPriorityQueue: pq, + currentLedger: ledgerRange.from, + nextTaskLedger: ledgerRange.from, + ledgerRange: ledgerRange, + context: ctx, + cancel: 
cancel, + decoder: bsb.decoder, + } + + // Start workers to read LCM files + ledgerBuffer.wg.Add(int(bsb.config.NumWorkers)) + for i := uint32(0); i < bsb.config.NumWorkers; i++ { + go ledgerBuffer.worker(ctx) + } + + // Upon initialization, the ledgerBuffer invariant is maintained because + // we create bsb.config.BufferSize tasks while the len(ledgerQueue) and ledgerPriorityQueue.Len() are 0. + // Effectively, this is len(taskQueue) + len(ledgerQueue) + ledgerPriorityQueue.Len() <= bsb.config.BufferSize + // which enforces a limit of max tasks (both pending and in-flight) to be less than or equal to bsb.config.BufferSize. + // Note: when a task is in-flight it is no longer in the taskQueue + // but for easier conceptualization, len(taskQueue) can be interpreted as both pending and in-flight tasks + // where we assume the workers are empty and not processing any tasks. + for i := 0; i <= int(bsb.config.BufferSize); i++ { + ledgerBuffer.pushTaskQueue() + } + + return ledgerBuffer, nil +} + +func (lb *ledgerBuffer) pushTaskQueue() { + // In bounded mode, don't queue past the end ledger + if lb.nextTaskLedger > lb.ledgerRange.to && lb.ledgerRange.bounded { + return + } + lb.taskQueue <- lb.nextTaskLedger + lb.nextTaskLedger += lb.config.LedgerBatchConfig.LedgersPerFile +} + +// sleepWithContext returns true upon sleeping without interruption from the context +func (lb *ledgerBuffer) sleepWithContext(ctx context.Context, d time.Duration) bool { + timer := time.NewTimer(d) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return false + case <-timer.C: + } + return true +} + +func (lb *ledgerBuffer) worker(ctx context.Context) { + defer lb.wg.Done() + + for { + select { + case <-ctx.Done(): + return + case sequence := <-lb.taskQueue: + for attempt := uint32(0); attempt <= lb.config.RetryLimit; { + ledgerObject, err := lb.downloadLedgerObject(ctx, sequence) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + // ledgerObject not found and unbounded + if !lb.ledgerRange.bounded { + if !lb.sleepWithContext(ctx, lb.config.RetryWait) { + return + } + continue + } + lb.cancel(errors.Wrapf(err, "ledger object containing sequence %v is missing", sequence)) + return + } + // don't bother retrying if we've received the signal to shut down + if errors.Is(err, context.Canceled) { + return + } + if attempt == lb.config.RetryLimit { + err = errors.Wrapf(err, "maximum retries exceeded for downloading object containing sequence %v", sequence) + lb.cancel(err) + return + } + attempt++ + if !lb.sleepWithContext(ctx, lb.config.RetryWait) { + return + } + continue + } + + // When we store an object we still maintain the ledger buffer invariant because + // at this point the current task is finished and we add 1 ledger object to the priority queue. + // Thus, the number of tasks decreases by 1 and the priority queue length increases by 1. + // This keeps the overall total the same (<= BufferSize). As long as the ledger buffer invariant + // was maintained in the previous state, it is still maintained during this state transition.
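The comments above reduce to a single inequality. The helper below is a minimal sketch of that check, not part of the change itself; the parameter names stand in for len(taskQueue), len(ledgerQueue), ledgerPriorityQueue.Len(), and config.BufferSize.

// Sketch only: the accounting the worker maintains. With bufferSize = 5,
// e.g. 3 pending/in-flight tasks + 1 ordered batch + 1 out-of-order batch = 5
// satisfies the bound, and finishing a download converts one task into one
// priority-queue entry, leaving the sum unchanged.
func bufferInvariantHolds(tasks, orderedBatches, outOfOrderBatches, bufferSize int) bool {
	return tasks+orderedBatches+outOfOrderBatches <= bufferSize
}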
+ lb.storeObject(ledgerObject, sequence) + break + } + } + } +} + +func (lb *ledgerBuffer) downloadLedgerObject(ctx context.Context, sequence uint32) ([]byte, error) { + objectKey := lb.config.LedgerBatchConfig.GetObjectKeyFromSequenceNumber(sequence) + + reader, err := lb.dataStore.GetFile(ctx, objectKey) + if err != nil { + return nil, err + } + + defer reader.Close() + + objectBytes, err := io.ReadAll(reader) + if err != nil { + return nil, errors.Wrapf(err, "failed reading file: %s", objectKey) + } + + return objectBytes, nil +} + +func (lb *ledgerBuffer) storeObject(ledgerObject []byte, sequence uint32) { + lb.priorityQueueLock.Lock() + defer lb.priorityQueueLock.Unlock() + + lb.currentLedgerLock.Lock() + defer lb.currentLedgerLock.Unlock() + + lb.ledgerPriorityQueue.Push(ledgerBatchObject{ + payload: ledgerObject, + startLedger: int(sequence), + }) + + // Check if the nextLedger is the next item in the ledgerPriorityQueue + // The ledgerBuffer invariant is maintained here because items are transferred from the ledgerPriorityQueue to the ledgerQueue. + // Thus the overall sum of ledgerPriorityQueue.Len() + len(lb.ledgerQueue) remains the same. + for lb.ledgerPriorityQueue.Len() > 0 && lb.currentLedger == uint32(lb.ledgerPriorityQueue.Peek().startLedger) { + item := lb.ledgerPriorityQueue.Pop() + lb.ledgerQueue <- item.payload + lb.currentLedger += lb.config.LedgerBatchConfig.LedgersPerFile + } +} + +func (lb *ledgerBuffer) getFromLedgerQueue(ctx context.Context) ([]byte, error) { + for { + select { + case <-lb.context.Done(): + return nil, context.Cause(lb.context) + case <-ctx.Done(): + return nil, ctx.Err() + case compressedBinary := <-lb.ledgerQueue: + // The ledger buffer invariant is maintained here because + // we create an extra task when consuming one item from the ledger queue. + // Thus len(ledgerQueue) decreases by 1 and the number of tasks increases by 1. + // The overall sum below remains the same: + // len(taskQueue) + len(ledgerQueue) + ledgerPriorityQueue.Len() <= bsb.config.BufferSize + lb.pushTaskQueue() + + reader := bytes.NewReader(compressedBinary) + lcmBinary, err := lb.decoder.Unzip(reader) + if err != nil { + return nil, err + } + + return lcmBinary, nil + } + } +} + +func (lb *ledgerBuffer) getLatestLedgerSequence() (uint32, error) { + lb.currentLedgerLock.Lock() + defer lb.currentLedgerLock.Unlock() + + if lb.currentLedger == lb.ledgerRange.from { + return 0, nil + } + + // Subtract 1 to get the latest ledger in buffer + return lb.currentLedger - 1, nil +} + +func (lb *ledgerBuffer) close() { + lb.cancel(context.Canceled) + // wait for all workers to finish terminating + lb.wg.Wait() +} diff --git a/ingest/ledgerbackend/toml.go b/ingest/ledgerbackend/toml.go index bc3ab2247a..84f84bfe4a 100644 --- a/ingest/ledgerbackend/toml.go +++ b/ingest/ledgerbackend/toml.go @@ -486,7 +486,7 @@ func (c *CaptiveCoreToml) checkCoreVersion(coreBinaryPath string) coreVersion { re := regexp.MustCompile(`\D*(\d*)\.(\d*).*`) versionStr := re.FindStringSubmatch(versionRaw) - if err == nil && len(versionStr) == 3 { + if len(versionStr) == 3 { for i := 1; i < len(versionStr); i++ { val, err := strconv.Atoi((versionStr[i])) if err != nil { diff --git a/support/collections/heap/heap.go b/support/collections/heap/heap.go new file mode 100644 index 0000000000..66d86ee493 --- /dev/null +++ b/support/collections/heap/heap.go @@ -0,0 +1,71 @@ +package heap + +import "container/heap" + +// A Heap is a min-heap backed by a slice. 
+type Heap[E any] struct { + s sliceHeap[E] +} + +// New constructs a new Heap with a comparison function. +func New[E any](less func(E, E) bool, capacity int) *Heap[E] { + return &Heap[E]{sliceHeap[E]{ + s: make([]E, 0, capacity), + less: less, + }} +} + +// Push pushes an element onto the heap. The complexity is O(log n) +// where n = h.Len(). +func (h *Heap[E]) Push(elem E) { + heap.Push(&h.s, elem) +} + +// Pop removes and returns the minimum element (according to the less function) +// from the heap. Pop panics if the heap is empty. +// The complexity is O(log n) where n = h.Len(). +func (h *Heap[E]) Pop() E { + return heap.Pop(&h.s).(E) +} + +// Peek returns the minimum element (according to the less function) in the heap. +// Peek panics if the heap is empty. +// The complexity is O(1). +func (h *Heap[E]) Peek() E { + return h.s.s[0] +} + +// Len returns the number of elements in the heap. +func (h *Heap[E]) Len() int { + return len(h.s.s) +} + +// sliceHeap just exists to use the existing heap.Interface as the +// implementation of Heap. +type sliceHeap[E any] struct { + s []E + less func(E, E) bool +} + +func (s *sliceHeap[E]) Len() int { return len(s.s) } + +func (s *sliceHeap[E]) Swap(i, j int) { + s.s[i], s.s[j] = s.s[j], s.s[i] +} + +func (s *sliceHeap[E]) Less(i, j int) bool { + return s.less(s.s[i], s.s[j]) +} + +func (s *sliceHeap[E]) Push(x interface{}) { + s.s = append(s.s, x.(E)) +} + +func (s *sliceHeap[E]) Pop() interface{} { + var zero E + e := s.s[len(s.s)-1] + // avoid memory leak by clearing out popped value in slice + s.s[len(s.s)-1] = zero + s.s = s.s[:len(s.s)-1] + return e +} diff --git a/support/collections/heap/heap_test.go b/support/collections/heap/heap_test.go new file mode 100644 index 0000000000..77f702a619 --- /dev/null +++ b/support/collections/heap/heap_test.go @@ -0,0 +1,49 @@ +package heap + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHeapPushAndPop(t *testing.T) { + less := func(a, b int) bool { return a < b } + h := New(less, 0) + + h.Push(3) + h.Push(1) + h.Push(2) + h.Push(1) + + assert.Equal(t, 1, h.Pop()) + assert.Equal(t, 1, h.Pop()) + assert.Equal(t, 2, h.Pop()) + assert.Equal(t, 3, h.Pop()) +} + +func TestHeapPeek(t *testing.T) { + less := func(a, b int) bool { return a < b } + h := New(less, 0) + + h.Push(3) + h.Push(1) + h.Push(2) + + assert.Equal(t, 1, h.Peek()) + assert.Equal(t, 1, h.Pop()) +} + +func TestHeapLen(t *testing.T) { + less := func(a, b int) bool { return a < b } + h := New(less, 0) + + assert.Equal(t, 0, h.Len()) + h.Push(5) + assert.Equal(t, 1, h.Len()) + h.Push(6) + h.Push(7) + assert.Equal(t, 3, h.Len()) + + h.Pop() + assert.Equal(t, 2, h.Len()) +} diff --git a/support/compressxdr/compress_xdr.go b/support/compressxdr/compress_xdr.go new file mode 100644 index 0000000000..c43a1bca77 --- /dev/null +++ b/support/compressxdr/compress_xdr.go @@ -0,0 +1,38 @@ +package compressxdr + +import ( + "io" + + "github.com/stellar/go/support/errors" +) + +const ( + GZIP = "gzip" +) + +type XDREncoder interface { + WriteTo(w io.Writer) (int64, error) +} + +type XDRDecoder interface { + ReadFrom(r io.Reader) (int64, error) + Unzip(r io.Reader) ([]byte, error) +} + +func NewXDREncoder(compressionType string, xdrPayload interface{}) (XDREncoder, error) { + switch compressionType { + case GZIP: + return &XDRGzipEncoder{XdrPayload: xdrPayload}, nil + default: + return nil, errors.Errorf("invalid compression type %s, not supported", compressionType) + } +} + +func NewXDRDecoder(compressionType string, 
xdrPayload interface{}) (XDRDecoder, error) { + switch compressionType { + case GZIP: + return &XDRGzipDecoder{XdrPayload: xdrPayload}, nil + default: + return nil, errors.Errorf("invalid compression type %s, not supported", compressionType) + } +} diff --git a/exp/services/ledgerexporter/internal/encoder_test.go b/support/compressxdr/compress_xdr_test.go similarity index 50% rename from exp/services/ledgerexporter/internal/encoder_test.go rename to support/compressxdr/compress_xdr_test.go index 2bf61bcee7..da94ab25ae 100644 --- a/exp/services/ledgerexporter/internal/encoder_test.go +++ b/support/compressxdr/compress_xdr_test.go @@ -1,4 +1,4 @@ -package ledgerexporter +package compressxdr import ( "bytes" @@ -11,7 +11,7 @@ import ( func createTestLedgerCloseMetaBatch(startSeq, endSeq uint32, count int) xdr.LedgerCloseMetaBatch { var ledgerCloseMetas []xdr.LedgerCloseMeta for i := 0; i < count; i++ { - ledgerCloseMetas = append(ledgerCloseMetas, createLedgerCloseMeta(startSeq+uint32(i))) + //ledgerCloseMetas = append(ledgerCloseMetas, datastore.CreateLedgerCloseMeta(startSeq+uint32(i))) } return xdr.LedgerCloseMetaBatch{ StartSequence: xdr.Uint32(startSeq), @@ -20,26 +20,27 @@ func createTestLedgerCloseMetaBatch(startSeq, endSeq uint32, count int) xdr.Ledg } } -func TestEncodeDecodeLedgerCloseMetaBatch(t *testing.T) { +func TestEncodeDecodeLedgerCloseMetaBatchGzip(t *testing.T) { testData := createTestLedgerCloseMetaBatch(1000, 1005, 6) // Encode the test data - var encoder XDRGzipEncoder - encoder.XdrPayload = testData + encoder, err := NewXDREncoder(GZIP, testData) + require.NoError(t, err) var buf bytes.Buffer - _, err := encoder.WriteTo(&buf) + _, err = encoder.WriteTo(&buf) require.NoError(t, err) // Decode the encoded data - var decoder XDRGzipDecoder - decoder.XdrPayload = &xdr.LedgerCloseMetaBatch{} + lcmBatch := xdr.LedgerCloseMetaBatch{} + decoder, err := NewXDRDecoder(GZIP, &lcmBatch) + require.NoError(t, err) _, err = decoder.ReadFrom(&buf) require.NoError(t, err) // Check if the decoded data matches the original test data - decodedData := decoder.XdrPayload.(*xdr.LedgerCloseMetaBatch) + decodedData := lcmBatch require.Equal(t, testData.StartSequence, decodedData.StartSequence) require.Equal(t, testData.EndSequence, decodedData.EndSequence) require.Equal(t, len(testData.LedgerCloseMetas), len(decodedData.LedgerCloseMetas)) @@ -47,3 +48,26 @@ func TestEncodeDecodeLedgerCloseMetaBatch(t *testing.T) { require.Equal(t, testData.LedgerCloseMetas[i], decodedData.LedgerCloseMetas[i]) } } + +func TestDecodeUnzipGzip(t *testing.T) { + expectedBinary := []byte{0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0} + testData := createTestLedgerCloseMetaBatch(2, 2, 1) + + // Encode the test data + encoder, err := NewXDREncoder(GZIP, testData) + require.NoError(t, err) + + var buf bytes.Buffer + _, err = encoder.WriteTo(&buf) + require.NoError(t, err) + + // Decode the encoded data + lcmBatch := xdr.LedgerCloseMetaBatch{} + decoder, err := NewXDRDecoder(GZIP, &lcmBatch) + require.NoError(t, err) + + binary, err := decoder.Unzip(&buf) + require.NoError(t, err) + + require.Equal(t, expectedBinary, binary) +} diff --git a/exp/services/ledgerexporter/internal/encoder.go b/support/compressxdr/gzip_compress_xdr.go similarity index 68% rename from exp/services/ledgerexporter/internal/encoder.go rename to support/compressxdr/gzip_compress_xdr.go index 33909ace75..9171b87ffd 100644 --- a/exp/services/ledgerexporter/internal/encoder.go +++ b/support/compressxdr/gzip_compress_xdr.go @@ -1,4 +1,4 
@@ -package ledgerexporter +package compressxdr import ( "compress/gzip" @@ -37,3 +37,19 @@ func (d *XDRGzipDecoder) ReadFrom(r io.Reader) (int64, error) { } return int64(n), nil } + +func (d *XDRGzipDecoder) Unzip(r io.Reader) ([]byte, error) { + gzipReader, err := gzip.NewReader(r) + if err != nil { + return nil, err + } + + defer gzipReader.Close() + + objectBytes, err := io.ReadAll(gzipReader) + if err != nil { + return nil, err + } + + return objectBytes, nil +} diff --git a/support/compressxdr/mocks.go b/support/compressxdr/mocks.go new file mode 100644 index 0000000000..2525ca4477 --- /dev/null +++ b/support/compressxdr/mocks.go @@ -0,0 +1,23 @@ +package compressxdr + +import ( + "io" + + "github.com/stretchr/testify/mock" +) + +type MockXDRDecoder struct { + mock.Mock +} + +func (m *MockXDRDecoder) ReadFrom(r io.Reader) (int64, error) { + args := m.Called(r) + return args.Get(0).(int64), args.Error(1) +} + +func (m *MockXDRDecoder) Unzip(r io.Reader) ([]byte, error) { + args := m.Called(r) + return args.Get(0).([]byte), args.Error(1) +} + +var _ XDRDecoder = &MockXDRDecoder{} diff --git a/exp/services/ledgerexporter/internal/datastore.go b/support/datastore/datastore.go similarity index 87% rename from exp/services/ledgerexporter/internal/datastore.go rename to support/datastore/datastore.go index 0529ba2290..2a26aa1328 100644 --- a/exp/services/ledgerexporter/internal/datastore.go +++ b/support/datastore/datastore.go @@ -1,4 +1,4 @@ -package ledgerexporter +package datastore import ( "context" @@ -7,6 +7,11 @@ import ( "github.com/stellar/go/support/errors" ) +type DataStoreConfig struct { + Type string `toml:"type"` + Params map[string]string `toml:"params"` +} + // DataStore defines an interface for interacting with data storage type DataStore interface { GetFile(ctx context.Context, path string) (io.ReadCloser, error) diff --git a/exp/services/ledgerexporter/internal/datastore_test.go b/support/datastore/datastore_test.go similarity index 90% rename from exp/services/ledgerexporter/internal/datastore_test.go rename to support/datastore/datastore_test.go index d3128a4cf3..12041729be 100644 --- a/exp/services/ledgerexporter/internal/datastore_test.go +++ b/support/datastore/datastore_test.go @@ -1,4 +1,4 @@ -package ledgerexporter +package datastore import ( "context" diff --git a/exp/services/ledgerexporter/internal/gcs_datastore.go b/support/datastore/gcs_datastore.go similarity index 85% rename from exp/services/ledgerexporter/internal/gcs_datastore.go rename to support/datastore/gcs_datastore.go index c68b7e7a5e..ca1c90abea 100644 --- a/exp/services/ledgerexporter/internal/gcs_datastore.go +++ b/support/datastore/gcs_datastore.go @@ -1,4 +1,4 @@ -package ledgerexporter +package datastore import ( "context" @@ -15,6 +15,7 @@ import ( "cloud.google.com/go/storage" "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/log" "github.com/stellar/go/support/url" ) @@ -43,7 +44,7 @@ func NewGCSDataStore(ctx context.Context, params map[string]string, network stri prefix := strings.TrimPrefix(parsed.Path, "/") bucketName := parsed.Host - logger.Infof("creating GCS client for bucket: %s, prefix: %s", bucketName, prefix) + log.Infof("creating GCS client for bucket: %s, prefix: %s", bucketName, prefix) var options []option.ClientOption client, err := storage.NewClient(ctx, options...) 
@@ -65,12 +66,15 @@ func (b GCSDataStore) GetFile(ctx context.Context, filePath string) (io.ReadClos filePath = path.Join(b.prefix, filePath) r, err := b.bucket.Object(filePath).NewReader(ctx) if err != nil { + if err == storage.ErrObjectNotExist { + return nil, os.ErrNotExist + } if gcsError, ok := err.(*googleapi.Error); ok { - logger.Errorf("GCS error: %s %s", gcsError.Message, gcsError.Body) + log.Errorf("GCS error: %s %s", gcsError.Message, gcsError.Body) } return nil, errors.Wrapf(err, "error retrieving file: %s", filePath) } - logger.Infof("File retrieved successfully: %s", filePath) + log.Infof("File retrieved successfully: %s", filePath) return r, nil } @@ -81,15 +85,15 @@ func (b GCSDataStore) PutFileIfNotExists(ctx context.Context, filePath string, i if gcsError, ok := err.(*googleapi.Error); ok { switch gcsError.Code { case http.StatusPreconditionFailed: - logger.Infof("Precondition failed: %s already exists in the bucket", filePath) + log.Infof("Precondition failed: %s already exists in the bucket", filePath) return false, nil // Treat as success default: - logger.Errorf("GCS error: %s %s", gcsError.Message, gcsError.Body) + log.Errorf("GCS error: %s %s", gcsError.Message, gcsError.Body) } } return false, errors.Wrapf(err, "error uploading file: %s", filePath) } - logger.Infof("File uploaded successfully: %s", filePath) + log.Infof("File uploaded successfully: %s", filePath) return true, nil } @@ -99,11 +103,11 @@ func (b GCSDataStore) PutFile(ctx context.Context, filePath string, in io.Writer if err != nil { if gcsError, ok := err.(*googleapi.Error); ok { - logger.Errorf("GCS error: %s %s", gcsError.Message, gcsError.Body) + log.Errorf("GCS error: %s %s", gcsError.Message, gcsError.Body) } return errors.Wrapf(err, "error uploading file: %v", filePath) } - logger.Infof("File uploaded successfully: %s", filePath) + log.Infof("File uploaded successfully: %s", filePath) return nil } diff --git a/support/datastore/history_archive.go b/support/datastore/history_archive.go new file mode 100644 index 0000000000..bc692fb77e --- /dev/null +++ b/support/datastore/history_archive.go @@ -0,0 +1,48 @@ +package datastore + +import ( + "context" + + "github.com/stellar/go/historyarchive" + "github.com/stellar/go/network" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/log" + "github.com/stellar/go/support/storage" +) + +const Pubnet = "pubnet" +const Testnet = "testnet" + +func CreateHistoryArchiveFromNetworkName(ctx context.Context, networkName string) (historyarchive.ArchiveInterface, error) { + var historyArchiveUrls []string + switch networkName { + case Pubnet: + historyArchiveUrls = network.PublicNetworkhistoryArchiveURLs + case Testnet: + historyArchiveUrls = network.TestNetworkhistoryArchiveURLs + default: + return nil, errors.Errorf("Invalid network name %s", networkName) + } + + return historyarchive.NewArchivePool(historyArchiveUrls, historyarchive.ArchiveOptions{ + ConnectOptions: storage.ConnectOptions{ + UserAgent: "ledger-exporter", + Context: ctx, + }, + }) +} + +func GetLatestLedgerSequenceFromHistoryArchives(archive historyarchive.ArchiveInterface) (uint32, error) { + has, err := archive.GetRootHAS() + if err != nil { + log.Error("Error getting root HAS from archives", err) + return 0, errors.Wrap(err, "failed to retrieve the latest ledger sequence from any history archive") + } + + return has.CurrentLedger, nil +} + +func GetHistoryArchivesCheckPointFrequency() uint32 { + // this could evolve to use other sources for checkpoint freq + return 
historyarchive.DefaultCheckpointFrequency +} diff --git a/support/datastore/ledger_meta_archive.go b/support/datastore/ledger_meta_archive.go new file mode 100644 index 0000000000..7942a8bdab --- /dev/null +++ b/support/datastore/ledger_meta_archive.go @@ -0,0 +1,96 @@ +package datastore + +import ( + "fmt" + + "github.com/stellar/go/xdr" +) + +// LedgerMetaArchive represents a file with metadata and binary data. +type LedgerMetaArchive struct { + // file name + ObjectKey string + // Actual binary data + Data xdr.LedgerCloseMetaBatch +} + +// NewLedgerMetaArchive creates a new LedgerMetaArchive instance. +func NewLedgerMetaArchive(key string, startSeq uint32, endSeq uint32) *LedgerMetaArchive { + return &LedgerMetaArchive{ + ObjectKey: key, + Data: xdr.LedgerCloseMetaBatch{ + StartSequence: xdr.Uint32(startSeq), + EndSequence: xdr.Uint32(endSeq), + }, + } +} + +// AddLedger adds a LedgerCloseMeta to the archive. +func (f *LedgerMetaArchive) AddLedger(ledgerCloseMeta xdr.LedgerCloseMeta) error { + if ledgerCloseMeta.LedgerSequence() < uint32(f.Data.StartSequence) || + ledgerCloseMeta.LedgerSequence() > uint32(f.Data.EndSequence) { + return fmt.Errorf("ledger sequence %d is outside valid range [%d, %d]", + ledgerCloseMeta.LedgerSequence(), f.Data.StartSequence, f.Data.EndSequence) + } + + if len(f.Data.LedgerCloseMetas) > 0 { + lastSequence := f.Data.LedgerCloseMetas[len(f.Data.LedgerCloseMetas)-1].LedgerSequence() + if ledgerCloseMeta.LedgerSequence() != lastSequence+1 { + return fmt.Errorf("ledgers must be added sequentially: expected sequence %d, got %d", + lastSequence+1, ledgerCloseMeta.LedgerSequence()) + } + } + f.Data.LedgerCloseMetas = append(f.Data.LedgerCloseMetas, ledgerCloseMeta) + return nil +} + +// GetLedgerCount returns the number of ledgers currently in the archive. +func (f *LedgerMetaArchive) GetLedgerCount() uint32 { + return uint32(len(f.Data.LedgerCloseMetas)) +} + +// GetStartLedgerSequence returns the starting ledger sequence of the archive. +func (f *LedgerMetaArchive) GetStartLedgerSequence() uint32 { + return uint32(f.Data.StartSequence) +} + +// GetEndLedgerSequence returns the ending ledger sequence of the archive. +func (f *LedgerMetaArchive) GetEndLedgerSequence() uint32 { + return uint32(f.Data.EndSequence) +} + +// GetObjectKey returns the object key of the archive. 
+func (f *LedgerMetaArchive) GetObjectKey() string { + return f.ObjectKey +} + +func (f *LedgerMetaArchive) GetLedger(sequence uint32) (xdr.LedgerCloseMeta, error) { + if sequence < uint32(f.Data.StartSequence) || sequence > uint32(f.Data.EndSequence) { + return xdr.LedgerCloseMeta{}, fmt.Errorf("ledger sequence %d is outside valid range [%d, %d]", + sequence, f.Data.StartSequence, f.Data.EndSequence) + } + + ledgerIndex := sequence - f.GetStartLedgerSequence() + if ledgerIndex >= uint32(len(f.Data.LedgerCloseMetas)) { + return xdr.LedgerCloseMeta{}, fmt.Errorf("LedgerCloseMeta for sequence %d not found", sequence) + } + return f.Data.LedgerCloseMetas[ledgerIndex], nil +} + +func CreateLedgerCloseMeta(ledgerSeq uint32) xdr.LedgerCloseMeta { + return xdr.LedgerCloseMeta{ + V: int32(0), + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(ledgerSeq), + }, + }, + TxSet: xdr.TransactionSet{}, + TxProcessing: nil, + UpgradesProcessing: nil, + ScpInfo: nil, + }, + V1: nil, + } +} diff --git a/exp/services/ledgerexporter/internal/ledger_meta_archive_test.go b/support/datastore/ledger_meta_archive_test.go similarity index 98% rename from exp/services/ledgerexporter/internal/ledger_meta_archive_test.go rename to support/datastore/ledger_meta_archive_test.go index 3403cbaafa..26eeadc313 100644 --- a/exp/services/ledgerexporter/internal/ledger_meta_archive_test.go +++ b/support/datastore/ledger_meta_archive_test.go @@ -1,4 +1,4 @@ -package ledgerexporter +package datastore import ( "fmt" diff --git a/exp/services/ledgerexporter/internal/ledgerbatch_config.go b/support/datastore/ledgerbatch_config.go similarity index 93% rename from exp/services/ledgerexporter/internal/ledgerbatch_config.go rename to support/datastore/ledgerbatch_config.go index 6ac3a3f36a..eca8bbe737 100644 --- a/exp/services/ledgerexporter/internal/ledgerbatch_config.go +++ b/support/datastore/ledgerbatch_config.go @@ -1,16 +1,13 @@ -package ledgerexporter +package datastore import ( "fmt" ) -const ( - fileSuffix = ".xdr.gz" -) - type LedgerBatchConfig struct { LedgersPerFile uint32 `toml:"ledgers_per_file"` FilesPerPartition uint32 `toml:"files_per_partition"` + FileSuffix string `toml:"file_suffix"` } func (ec LedgerBatchConfig) GetSequenceNumberStartBoundary(ledgerSeq uint32) uint32 { @@ -43,7 +40,7 @@ func (ec LedgerBatchConfig) GetObjectKeyFromSequenceNumber(ledgerSeq uint32) str if fileStart != fileEnd { objectKey += fmt.Sprintf("-%d", fileEnd) } - objectKey += fileSuffix + objectKey += ec.FileSuffix return objectKey } diff --git a/exp/services/ledgerexporter/internal/ledgerbatch_config_test.go b/support/datastore/ledgerbatch_config_test.go similarity index 50% rename from exp/services/ledgerexporter/internal/ledgerbatch_config_test.go rename to support/datastore/ledgerbatch_config_test.go index cad9249dc5..9b2e466677 100644 --- a/exp/services/ledgerexporter/internal/ledgerbatch_config_test.go +++ b/support/datastore/ledgerbatch_config_test.go @@ -1,4 +1,4 @@ -package ledgerexporter +package datastore import ( "fmt" @@ -12,23 +12,24 @@ func TestGetObjectKeyFromSequenceNumber(t *testing.T) { filesPerPartition uint32 ledgerSeq uint32 ledgersPerFile uint32 + fileSuffix string expectedKey string }{ - {0, 5, 1, "5.xdr.gz"}, - {0, 5, 10, "0-9.xdr.gz"}, - {2, 10, 100, "0-199/0-99.xdr.gz"}, - {2, 150, 50, "100-199/150-199.xdr.gz"}, - {2, 300, 200, "0-399/200-399.xdr.gz"}, - {2, 1, 1, "0-1/1.xdr.gz"}, - {4, 10, 100, "0-399/0-99.xdr.gz"}, - {4, 250, 50, 
"200-399/250-299.xdr.gz"}, - {1, 300, 200, "200-399.xdr.gz"}, - {1, 1, 1, "1.xdr.gz"}, + {0, 5, 1, ".xdr.gz", "5.xdr.gz"}, + {0, 5, 10, ".xdr.gz", "0-9.xdr.gz"}, + {2, 10, 100, ".xdr.gz", "0-199/0-99.xdr.gz"}, + {2, 150, 50, ".xdr.gz", "100-199/150-199.xdr.gz"}, + {2, 300, 200, ".xdr.gz", "0-399/200-399.xdr.gz"}, + {2, 1, 1, ".xdr.gz", "0-1/1.xdr.gz"}, + {4, 10, 100, ".xdr.gz", "0-399/0-99.xdr.gz"}, + {4, 250, 50, ".xdr.gz", "200-399/250-299.xdr.gz"}, + {1, 300, 200, ".xdr.gz", "200-399.xdr.gz"}, + {1, 1, 1, ".xdr.gz", "1.xdr.gz"}, } for _, tc := range testCases { t.Run(fmt.Sprintf("LedgerSeq-%d-LedgersPerFile-%d", tc.ledgerSeq, tc.ledgersPerFile), func(t *testing.T) { - config := LedgerBatchConfig{FilesPerPartition: tc.filesPerPartition, LedgersPerFile: tc.ledgersPerFile} + config := LedgerBatchConfig{FilesPerPartition: tc.filesPerPartition, LedgersPerFile: tc.ledgersPerFile, FileSuffix: tc.fileSuffix} key := config.GetObjectKeyFromSequenceNumber(tc.ledgerSeq) require.Equal(t, tc.expectedKey, key) }) diff --git a/exp/services/ledgerexporter/internal/mocks.go b/support/datastore/mocks.go similarity index 98% rename from exp/services/ledgerexporter/internal/mocks.go rename to support/datastore/mocks.go index 3f514a8f57..b77040d280 100644 --- a/exp/services/ledgerexporter/internal/mocks.go +++ b/support/datastore/mocks.go @@ -1,4 +1,4 @@ -package ledgerexporter +package datastore import ( "context" @@ -52,5 +52,5 @@ func (m *MockResumableManager) FindStart(ctx context.Context, start, end uint32) } // ensure that the MockClient implements ClientInterface -var _ DataStore = &MockDataStore{} var _ ResumableManager = &MockResumableManager{} +var _ DataStore = &MockDataStore{} diff --git a/exp/services/ledgerexporter/internal/resumablemanager_test.go b/support/datastore/resumablemanager_test.go similarity index 95% rename from exp/services/ledgerexporter/internal/resumablemanager_test.go rename to support/datastore/resumablemanager_test.go index 189136932a..34279682ef 100644 --- a/exp/services/ledgerexporter/internal/resumablemanager_test.go +++ b/support/datastore/resumablemanager_test.go @@ -1,4 +1,4 @@ -package ledgerexporter +package datastore import ( "context" @@ -32,6 +32,7 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), + FileSuffix: ".xdr.gz", }, networkName: "test", errorSnippet: "archive error", @@ -46,6 +47,7 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), + FileSuffix: ".xdr.gz", }, networkName: "test", }, @@ -58,6 +60,7 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), + FileSuffix: ".xdr.gz", }, networkName: "test", }, @@ -70,6 +73,7 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), + FileSuffix: ".xdr.gz", }, networkName: "test", errorSnippet: "datastore error happened", @@ -83,6 +87,7 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), + FileSuffix: ".xdr.gz", }, networkName: "test", }, @@ -95,6 +100,7 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), + FileSuffix: ".xdr.gz", }, networkName: "test", }, @@ -107,6 +113,7 @@ func TestResumability(t *testing.T) { 
ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), + FileSuffix: ".xdr.gz", }, networkName: "test", }, @@ -119,6 +126,7 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), + FileSuffix: ".xdr.gz", }, networkName: "test", }, @@ -131,6 +139,7 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), + FileSuffix: ".xdr.gz", }, networkName: "test", errorSnippet: "Invalid start value", @@ -144,6 +153,7 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), + FileSuffix: ".xdr.gz", }, networkName: "test2", latestLedger: uint32(2000), @@ -157,6 +167,7 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), + FileSuffix: ".xdr.gz", }, networkName: "test3", latestLedger: uint32(3000), @@ -170,6 +181,7 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), + FileSuffix: ".xdr.gz", }, networkName: "test4", latestLedger: uint32(4000), @@ -183,6 +195,7 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), + FileSuffix: ".xdr.gz", }, networkName: "test5", latestLedger: uint32(5000), diff --git a/exp/services/ledgerexporter/internal/resumablemanager.go b/support/datastore/resumeablemanager.go similarity index 92% rename from exp/services/ledgerexporter/internal/resumablemanager.go rename to support/datastore/resumeablemanager.go index 1d832f2ea4..ce0bd12717 100644 --- a/exp/services/ledgerexporter/internal/resumablemanager.go +++ b/support/datastore/resumeablemanager.go @@ -1,4 +1,4 @@ -package ledgerexporter +package datastore import ( "context" @@ -6,6 +6,7 @@ import ( "github.com/pkg/errors" "github.com/stellar/go/historyarchive" + "github.com/stellar/go/support/log" ) type ResumableManager interface { @@ -58,18 +59,18 @@ func (rm resumableManagerService) FindStart(ctx context.Context, start, end uint return 0, false, errors.New("Invalid start value, must be greater than zero") } - log := logger.WithField("start", start).WithField("end", end).WithField("network", rm.network) + log.WithField("start", start).WithField("end", end).WithField("network", rm.network) networkLatest := uint32(0) if end < 1 { var latestErr error - networkLatest, latestErr = getLatestLedgerSequenceFromHistoryArchives(rm.archive) + networkLatest, latestErr = GetLatestLedgerSequenceFromHistoryArchives(rm.archive) if latestErr != nil { err := errors.Wrap(latestErr, "Resumability of requested export ledger range, was not able to get latest ledger from network") return 0, false, err } - networkLatest = networkLatest + (getHistoryArchivesCheckPointFrequency() * 2) - logger.Infof("Resumability computed effective latest network ledger including padding of checkpoint frequency to be %d + for network=%v", networkLatest, rm.network) + networkLatest = networkLatest + (GetHistoryArchivesCheckPointFrequency() * 2) + log.Infof("Resumability computed effective latest network ledger including padding of checkpoint frequency to be %d + for network=%v", networkLatest, rm.network) if start > networkLatest { // requested to start at a point beyond the latest network, resume not applicable.
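The padding applied above can be read as a small arithmetic rule. A minimal sketch, assuming the checkpoint frequency returned by GetHistoryArchivesCheckPointFrequency is 64 (historyarchive.DefaultCheckpointFrequency); the function name and values are illustrative only.

// Pad the archives' latest ledger by two checkpoint windows so an export may
// extend slightly past the last published checkpoint,
// e.g. 1_000_000 + 64*2 = 1_000_128.
func effectiveNetworkLatest(archiveLatest, checkpointFrequency uint32) uint32 {
	return archiveLatest + checkpointFrequency*2
}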