Skip to content

Commit

Permalink
#4911: added buffered storage ledger backend to NewSystem factory method
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Jul 5, 2024
1 parent 54dfcc3 commit 5a020f3
Showing 1 changed file with 64 additions and 29 deletions.
93 changes: 64 additions & 29 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,20 @@ func (s LedgerMetaBackendType) String() string {
case LedgerBackendPrecomputed:
return "precomputed"
default:
return ""
return ""
}
}

type BufferedBackendConfig struct {
BufferSize uint32 `toml:"size"`
NumWorkers uint32 `toml:"num_workers"`
RetryLimit uint32 `toml:"retry_limit"`
RetryWait time.Duration `toml:"retry_wait"`
BufferSize uint32 `toml:"size"`
NumWorkers uint32 `toml:"num_workers"`
RetryLimit uint32 `toml:"retry_limit"`
RetryWait time.Duration `toml:"retry_wait"`
}

type PrecomputedLedgerMetaConfig struct {
DataStoreConfig datastore.DataStoreConfig `toml:"datastore_config"`
BufferedBackendConfig BufferedBackendConfig `toml:"buffered_backend_config"`
DataStoreConfig datastore.DataStoreConfig `toml:"datastore_config"`
BufferedBackendConfig BufferedBackendConfig `toml:"buffered_backend_config"`
}

type Config struct {
Expand Down Expand Up @@ -148,7 +148,7 @@ type Config struct {

ReapConfig ReapConfig

PrecomputedMetaConfig PrecomputedLedgerMetaConfig
PrecomputedMetaConfig *PrecomputedLedgerMetaConfig
}

const (
Expand Down Expand Up @@ -297,28 +297,63 @@ func NewSystem(config Config) (System, error) {
return nil, errors.Wrap(err, "error creating history archive")
}

// the only ingest option is local captive core config
logger := log.WithField("subservice", "stellar-core")
ledgerBackend, err := ledgerbackend.NewCaptive(
ledgerbackend.CaptiveCoreConfig{
BinaryPath: config.CaptiveCoreBinaryPath,
StoragePath: config.CaptiveCoreStoragePath,
UseDB: config.CaptiveCoreConfigUseDB,
Toml: config.CaptiveCoreToml,
NetworkPassphrase: config.NetworkPassphrase,
HistoryArchiveURLs: config.HistoryArchiveURLs,
CheckpointFrequency: config.CheckpointFrequency,
LedgerHashStore: ledgerbackend.NewHorizonDBLedgerHashStore(config.HistorySession),
Log: logger,
Context: ctx,
UserAgent: fmt.Sprintf("captivecore horizon/%s golang/%s", apkg.Version(), runtime.Version()),
CoreProtocolVersionFn: config.CoreProtocolVersionFn,
CoreBuildVersionFn: config.CoreBuildVersionFn,
},
)
if err != nil {
var ledgerBackend ledgerbackend.LedgerBackend

switch config.LedgerMetaBackendType {
case LedgerBackendCaptiveCore:
logger := log.WithField("subservice", "stellar-core")
ledgerBackend, err = ledgerbackend.NewCaptive(
ledgerbackend.CaptiveCoreConfig{
BinaryPath: config.CaptiveCoreBinaryPath,
StoragePath: config.CaptiveCoreStoragePath,
UseDB: config.CaptiveCoreConfigUseDB,
Toml: config.CaptiveCoreToml,
NetworkPassphrase: config.NetworkPassphrase,
HistoryArchiveURLs: config.HistoryArchiveURLs,
CheckpointFrequency: config.CheckpointFrequency,
LedgerHashStore: ledgerbackend.NewHorizonDBLedgerHashStore(config.HistorySession),
Log: logger,
Context: ctx,
UserAgent: fmt.Sprintf("captivecore horizon/%s golang/%s", apkg.Version(), runtime.Version()),
CoreProtocolVersionFn: config.CoreProtocolVersionFn,
CoreBuildVersionFn: config.CoreBuildVersionFn,
},
)
if err != nil {
cancel()
return nil, errors.Wrap(err, "error creating captive core backend")
}
log.Infof("successfully created ledger backend of type captive core")
case LedgerBackendPrecomputed:
if config.PrecomputedMetaConfig == nil {
cancel()
return nil, errors.New("error creating precomputed buffered backend, precomputed backend config is not present")
}
precompConfig := config.PrecomputedMetaConfig

dataStore, err := datastore.NewDataStore(ctx, precompConfig.DataStoreConfig)
if err != nil {
cancel()
return nil, errors.Wrapf(err, "error creating datastore from config, %v", precompConfig.DataStoreConfig)
}

bufferedConfig := ledgerbackend.BufferedStorageBackendConfig{
LedgerBatchConfig: precompConfig.DataStoreConfig.Schema,
DataStore: dataStore,
BufferSize: precompConfig.BufferedBackendConfig.BufferSize,
NumWorkers: precompConfig.BufferedBackendConfig.NumWorkers,
RetryLimit: precompConfig.BufferedBackendConfig.RetryLimit,
RetryWait: precompConfig.BufferedBackendConfig.RetryWait,
}

if ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(ctx, bufferedConfig); err != nil {
cancel()
return nil, errors.Wrapf(err, "error creating buffered storage backend, %v", bufferedConfig)
}
log.Infof("successfully created ledger backend of type buffered storage")
default:
cancel()
return nil, errors.Wrap(err, "error creating captive core backend")
return nil, errors.Errorf("unsupported ledger backend type %v", config.LedgerMetaBackendType.String())
}

historyQ := &history.Q{config.HistorySession.Clone()}
Expand Down

0 comments on commit 5a020f3

Please sign in to comment.