Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ledgerbackend: Add cloud_storage_ledger backend #5260

Merged
merged 58 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
b12f3e8
Create cloud_storage_backend to process ledgerexporter ledgerclosemeta
chowbao Mar 17, 2024
d8c2634
Fully form file path
chowbao Mar 17, 2024
1f6ee7d
unzip
chowbao Mar 17, 2024
c50b067
Remove storage url concat
chowbao Mar 17, 2024
f34a842
Test get latest change
chowbao Mar 17, 2024
75262f6
Remove validators
chowbao Mar 19, 2024
13d0e23
Merge branch 'master' into add_cloud_storage_ledger_backend
chowbao Apr 18, 2024
d478c10
Add GetLatestLedgerSequence
chowbao Apr 18, 2024
65b1f89
Fix filename path issues
chowbao Apr 19, 2024
d1a4126
debug statements
chowbao Apr 19, 2024
70b2b29
debug
chowbao Apr 19, 2024
f9cf5b6
debug
chowbao Apr 19, 2024
2d7308b
Remove debug prints
chowbao Apr 19, 2024
2069535
Add tests
chowbao Apr 19, 2024
9a60b1f
Merge branch 'master' into add_cloud_storage_ledger_backend
chowbao Apr 19, 2024
dad2ee5
Update filesuffix
chowbao Apr 19, 2024
f18e32f
Address comments/fixes
chowbao Apr 19, 2024
2346a6c
Update ingest/ledgerbackend/cloud_storage_backend.go
chowbao Apr 19, 2024
3a79646
Address comments
chowbao Apr 19, 2024
e1c5206
Move GetObjectKeyFromSequenceNumber
chowbao Apr 23, 2024
1965400
Update PrepareRange
chowbao Apr 24, 2024
af6df57
Merge branch 'master' into add_cloud_storage_ledger_backend
chowbao Apr 24, 2024
a9ac66b
Fix merge issues
chowbao Apr 24, 2024
53b4319
Update how backend buffers ledgers
chowbao Apr 26, 2024
b45cbc2
Updates
chowbao Apr 29, 2024
8ba7a73
Merge branch 'master' into add_cloud_storage_ledger_backend
chowbao Apr 30, 2024
fff0122
Update GetLatestLedgerSequence
chowbao May 1, 2024
ed8f702
WIP; add heap and addressing comments
chowbao May 1, 2024
af00f1c
WIP
chowbao May 2, 2024
fdd3637
some unit tests
chowbao May 2, 2024
4b1b50d
wip
chowbao May 2, 2024
ae23275
Update unit tests
chowbao May 3, 2024
ee64b81
fixing unit tests
chowbao May 3, 2024
d532482
Update unit tests
chowbao May 3, 2024
5eba324
Fix unit tests
chowbao May 3, 2024
5c54f57
Address comments
chowbao May 3, 2024
41844d2
Address comments
chowbao May 3, 2024
899aeab
address comments
chowbao May 3, 2024
68ffa21
fix not exist error
chowbao May 3, 2024
c109121
address comments
chowbao May 3, 2024
3b6cb9f
address comments
chowbao May 6, 2024
4f60a7f
rename to BufferedStorageBackend
chowbao May 6, 2024
eca6d65
Add comment header
chowbao May 6, 2024
2a4e073
Remove ResumableManager
chowbao May 6, 2024
3ada03b
Update error message
chowbao May 6, 2024
333bbaa
Update context in ledger buffer
chowbao May 6, 2024
e1e83c9
add test cast
chowbao May 6, 2024
7753b33
Merge branch 'master' into add_cloud_storage_ledger_backend
chowbao May 6, 2024
34b81ae
Update ingest/ledgerbackend/ledger_buffer.go
chowbao May 6, 2024
d7b4704
Update ingest/ledgerbackend/ledger_buffer.go
chowbao May 6, 2024
acd57df
Update ingest/ledgerbackend/ledger_buffer.go
chowbao May 6, 2024
9348f2e
updates
chowbao May 6, 2024
1724559
Update unbounded test
chowbao May 6, 2024
0311b45
Fix tests
chowbao May 6, 2024
046c3ef
Add tests on ledger buffer termination logic
tamirms May 7, 2024
010e254
make error message more descriptive
tamirms May 7, 2024
241c802
fix data race in TestLedgerBufferClose
tamirms May 7, 2024
33f7d2d
configure STELLAR_CORE_VERSION for ledger-exporter CI job
tamirms May 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exp/services/ledgerexporter/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ destination_bucket_path = "exporter-test/ledgers"
[exporter_config]
ledgers_per_file = 1
files_per_partition = 64000
file_suffix = ".xdr.gz"

[stellar_core_config]
stellar_core_binary_path = "/usr/local/bin/stellar-core"
Expand Down
11 changes: 6 additions & 5 deletions exp/services/ledgerexporter/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/stellar/go/ingest/ledgerbackend"
_ "github.com/stellar/go/network"
"github.com/stellar/go/support/datastore"
supporthttp "github.com/stellar/go/support/http"
"github.com/stellar/go/support/log"
)
Expand Down Expand Up @@ -63,7 +64,7 @@ func (m InvalidDataStoreError) Error() string {
type App struct {
config *Config
ledgerBackend ledgerbackend.LedgerBackend
dataStore DataStore
dataStore datastore.DataStore
exportManager *ExportManager
uploader Uploader
flags Flags
Expand All @@ -88,17 +89,17 @@ func (a *App) init(ctx context.Context) error {
if a.config, err = NewConfig(ctx, a.flags); err != nil {
return errors.Wrap(err, "Could not load configuration")
}
if archive, err = createHistoryArchiveFromNetworkName(ctx, a.config.Network); err != nil {
if archive, err = datastore.CreateHistoryArchiveFromNetworkName(ctx, a.config.Network); err != nil {
return err
}
a.config.ValidateAndSetLedgerRange(ctx, archive)

if a.dataStore, err = NewDataStore(ctx, a.config.DataStoreConfig, a.config.Network); err != nil {
if a.dataStore, err = datastore.NewDataStore(ctx, a.config.DataStoreConfig, a.config.Network); err != nil {
return errors.Wrap(err, "Could not connect to destination data store")
}
if a.config.Resume {
if err = a.applyResumability(ctx,
NewResumableManager(a.dataStore, a.config.Network, a.config.LedgerBatchConfig, archive)); err != nil {
datastore.NewResumableManager(a.dataStore, a.config.Network, a.config.LedgerBatchConfig, archive)); err != nil {
return err
}
}
Expand All @@ -119,7 +120,7 @@ func (a *App) init(ctx context.Context) error {
return nil
}

func (a *App) applyResumability(ctx context.Context, resumableManager ResumableManager) error {
func (a *App) applyResumability(ctx context.Context, resumableManager datastore.ResumableManager) error {
absentLedger, ok, err := resumableManager.FindStart(ctx, a.config.StartLedger, a.config.EndLedger)
if err != nil {
return err
Expand Down
21 changes: 11 additions & 10 deletions exp/services/ledgerexporter/internal/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"testing"

"github.com/pkg/errors"
"github.com/stellar/go/support/datastore"
"github.com/stretchr/testify/require"
)

func TestApplyResumeHasStartError(t *testing.T) {
ctx := context.Background()
app := &App{}
app.config = &Config{StartLedger: 10, EndLedger: 19, Resume: true}
mockResumableManager := &MockResumableManager{}
mockResumableManager := &datastore.MockResumableManager{}
mockResumableManager.On("FindStart", ctx, uint32(10), uint32(19)).Return(uint32(0), false, errors.New("start error")).Once()

err := app.applyResumability(ctx, mockResumableManager)
Expand All @@ -24,7 +25,7 @@ func TestApplyResumeDatastoreComplete(t *testing.T) {
ctx := context.Background()
app := &App{}
app.config = &Config{StartLedger: 10, EndLedger: 19, Resume: true}
mockResumableManager := &MockResumableManager{}
mockResumableManager := &datastore.MockResumableManager{}
mockResumableManager.On("FindStart", ctx, uint32(10), uint32(19)).Return(uint32(0), false, nil).Once()

var alreadyExported *DataAlreadyExportedError
Expand All @@ -40,9 +41,9 @@ func TestApplyResumeInvalidDataStoreLedgersPerFileBoundary(t *testing.T) {
StartLedger: 3,
EndLedger: 9,
Resume: true,
LedgerBatchConfig: LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
}
mockResumableManager := &MockResumableManager{}
mockResumableManager := &datastore.MockResumableManager{}
// simulate the datastore has inconsistent data,
// with last ledger not aligned to starting boundary
mockResumableManager.On("FindStart", ctx, uint32(3), uint32(9)).Return(uint32(6), true, nil).Once()
Expand All @@ -60,9 +61,9 @@ func TestApplyResumeWithPartialRemoteDataPresent(t *testing.T) {
StartLedger: 10,
EndLedger: 99,
Resume: true,
LedgerBatchConfig: LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
}
mockResumableManager := &MockResumableManager{}
mockResumableManager := &datastore.MockResumableManager{}
// simulates a data store that had ledger files populated up to seq=49, so the first absent ledger would be 50
mockResumableManager.On("FindStart", ctx, uint32(10), uint32(99)).Return(uint32(50), true, nil).Once()

Expand All @@ -79,9 +80,9 @@ func TestApplyResumeWithNoRemoteDataPresent(t *testing.T) {
StartLedger: 10,
EndLedger: 99,
Resume: true,
LedgerBatchConfig: LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
}
mockResumableManager := &MockResumableManager{}
mockResumableManager := &datastore.MockResumableManager{}
// simulates a data store that had no data in the requested range
mockResumableManager.On("FindStart", ctx, uint32(10), uint32(99)).Return(uint32(2), true, nil).Once()

Expand All @@ -101,9 +102,9 @@ func TestApplyResumeWithNoRemoteDataAndRequestFromGenesis(t *testing.T) {
StartLedger: 2,
EndLedger: 99,
Resume: true,
LedgerBatchConfig: LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
}
mockResumableManager := &MockResumableManager{}
mockResumableManager := &datastore.MockResumableManager{}
// simulates a data store that had no data in the requested range
mockResumableManager.On("FindStart", ctx, uint32(2), uint32(99)).Return(uint32(2), true, nil).Once()

Expand Down
53 changes: 7 additions & 46 deletions exp/services/ledgerexporter/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

"github.com/pelletier/go-toml"

"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/ordered"
"github.com/stellar/go/support/storage"
)

const Pubnet = "pubnet"
Expand All @@ -34,58 +34,19 @@ type StellarCoreConfig struct {
CaptiveCoreTomlPath string `toml:"captive_core_toml_path"`
}

type DataStoreConfig struct {
Type string `toml:"type"`
Params map[string]string `toml:"params"`
}

type Config struct {
AdminPort int `toml:"admin_port"`

Network string `toml:"network"`
DataStoreConfig DataStoreConfig `toml:"datastore_config"`
LedgerBatchConfig LedgerBatchConfig `toml:"exporter_config"`
StellarCoreConfig StellarCoreConfig `toml:"stellar_core_config"`
Network string `toml:"network"`
DataStoreConfig datastore.DataStoreConfig `toml:"datastore_config"`
LedgerBatchConfig datastore.LedgerBatchConfig `toml:"exporter_config"`
StellarCoreConfig StellarCoreConfig `toml:"stellar_core_config"`

StartLedger uint32
EndLedger uint32
Resume bool
}

func createHistoryArchiveFromNetworkName(ctx context.Context, networkName string) (historyarchive.ArchiveInterface, error) {
var historyArchiveUrls []string
switch networkName {
case Pubnet:
historyArchiveUrls = network.PublicNetworkhistoryArchiveURLs
case Testnet:
historyArchiveUrls = network.TestNetworkhistoryArchiveURLs
default:
return nil, errors.Errorf("Invalid network name %s", networkName)
}

return historyarchive.NewArchivePool(historyArchiveUrls, historyarchive.ArchiveOptions{
ConnectOptions: storage.ConnectOptions{
UserAgent: "ledger-exporter",
Context: ctx,
},
})
}

func getLatestLedgerSequenceFromHistoryArchives(archive historyarchive.ArchiveInterface) (uint32, error) {
has, err := archive.GetRootHAS()
if err != nil {
logger.WithError(err).Warnf("Error getting root HAS from archives")
return 0, errors.Wrap(err, "failed to retrieve the latest ledger sequence from any history archive")
}

return has.CurrentLedger, nil
}

func getHistoryArchivesCheckPointFrequency() uint32 {
// this could evolve to use other sources for checkpoint freq
return historyarchive.DefaultCheckpointFrequency
}

// This will generate the config based on commandline flags and toml
//
// ctx - the caller context
Expand Down Expand Up @@ -113,7 +74,7 @@ func NewConfig(ctx context.Context, flags Flags) (*Config, error) {
// Validates requested ledger range, and will automatically adjust it
// to be ledgers-per-file boundary aligned
func (config *Config) ValidateAndSetLedgerRange(ctx context.Context, archive historyarchive.ArchiveInterface) error {
latestNetworkLedger, err := getLatestLedgerSequenceFromHistoryArchives(archive)
latestNetworkLedger, err := datastore.GetLatestLedgerSequenceFromHistoryArchives(archive)

if err != nil {
return errors.Wrap(err, "Failed to retrieve the latest ledger sequence from history archives.")
Expand Down Expand Up @@ -182,7 +143,7 @@ func (config *Config) GenerateCaptiveCoreConfig() (ledgerbackend.CaptiveCoreConf
BinaryPath: coreConfig.StellarCoreBinaryPath,
NetworkPassphrase: params.NetworkPassphrase,
HistoryArchiveURLs: params.HistoryArchiveURLs,
CheckpointFrequency: getHistoryArchivesCheckPointFrequency(),
CheckpointFrequency: datastore.GetHistoryArchivesCheckPointFrequency(),
Log: logger.WithField("subservice", "stellar-core"),
Toml: captiveCoreToml,
UserAgent: "ledger-exporter",
Expand Down
1 change: 1 addition & 0 deletions exp/services/ledgerexporter/internal/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func TestNewConfigResumeEnabled(t *testing.T) {
require.Equal(t, config.DataStoreConfig.Type, "ABC")
require.Equal(t, config.LedgerBatchConfig.FilesPerPartition, uint32(1))
require.Equal(t, config.LedgerBatchConfig.LedgersPerFile, uint32(3))
require.Equal(t, config.LedgerBatchConfig.FileSuffix, ".xdr.gz")
require.True(t, config.Resume)
url, ok := config.DataStoreConfig.Params["destination_bucket_path"]
require.True(t, ok)
Expand Down
9 changes: 5 additions & 4 deletions exp/services/ledgerexporter/internal/exportmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/xdr"
)

type ExportManager struct {
config LedgerBatchConfig
config datastore.LedgerBatchConfig
ledgerBackend ledgerbackend.LedgerBackend
currentMetaArchive *LedgerMetaArchive
currentMetaArchive *datastore.LedgerMetaArchive
queue UploadQueue
latestLedgerMetric *prometheus.GaugeVec
}

// NewExportManager creates a new ExportManager with the provided configuration.
func NewExportManager(config LedgerBatchConfig, backend ledgerbackend.LedgerBackend, queue UploadQueue, prometheusRegistry *prometheus.Registry) (*ExportManager, error) {
func NewExportManager(config datastore.LedgerBatchConfig, backend ledgerbackend.LedgerBackend, queue UploadQueue, prometheusRegistry *prometheus.Registry) (*ExportManager, error) {
if config.LedgersPerFile < 1 {
return nil, errors.Errorf("Invalid ledgers per file (%d): must be at least 1", config.LedgersPerFile)
}
Expand Down Expand Up @@ -60,7 +61,7 @@ func (e *ExportManager) AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta
}

// Create a new LedgerMetaArchive and add it to the map.
e.currentMetaArchive = NewLedgerMetaArchive(objectKey, ledgerSeq, endSeq)
e.currentMetaArchive = datastore.NewLedgerMetaArchive(objectKey, ledgerSeq, endSeq)
}

if err := e.currentMetaArchive.AddLedger(ledgerCloseMeta); err != nil {
Expand Down
Loading
Loading