Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/reingest_precomputed' into rei…
Browse files Browse the repository at this point in the history
…ngest_job_size
  • Loading branch information
sreuland committed Jul 9, 2024
2 parents 3d6aaca + a873570 commit fb23f8c
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 130 deletions.
10 changes: 7 additions & 3 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ var dbReapCmd = &cobra.Command{
Long: "reap removes any historical data that is earlier than the configured retention cutoff",
RunE: func(cmd *cobra.Command, args []string) error {

err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: false})
err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false})
if err != nil {
return err
}
Expand Down Expand Up @@ -382,6 +382,7 @@ var dbReingestRangeCmd = &cobra.Command{
}

var storageBackendConfig ingest.StorageBackendConfig
options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}
if ledgerBackendType == ingest.BufferedStorageBackend {
cfg, err := toml.LoadFile(storageBackendConfigPath)
if err != nil {
Expand All @@ -397,9 +398,10 @@ var dbReingestRangeCmd = &cobra.Command{
if !viper.IsSet("parallel-job-size") {
parallelJobSize = 100
}
options.NoCaptiveCore = true
}

err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: false})
err := horizon.ApplyFlags(globalConfig, globalFlags, options)
if err != nil {
return err
}
Expand Down Expand Up @@ -449,6 +451,7 @@ var dbFillGapsCmd = &cobra.Command{
}

var storageBackendConfig ingest.StorageBackendConfig
options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}
if ledgerBackendType == ingest.BufferedStorageBackend {
cfg, err := toml.LoadFile(storageBackendConfigPath)
if err != nil {
Expand All @@ -459,9 +462,10 @@ var dbFillGapsCmd = &cobra.Command{
}
storageBackendConfig.BufferedStorageBackendFactory = ledgerbackend.NewBufferedStorageBackend
storageBackendConfig.DataStoreFactory = datastore.NewDataStore
options.NoCaptiveCore = true
}

err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: false})
err := horizon.ApplyFlags(globalConfig, globalFlags, options)
if err != nil {
return err
}
Expand Down
11 changes: 6 additions & 5 deletions services/horizon/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/stellar/go/historyarchive"
horizon "github.com/stellar/go/services/horizon/internal"
"github.com/stellar/go/services/horizon/internal/db2/history"
Expand Down Expand Up @@ -94,7 +95,7 @@ var ingestVerifyRangeCmd = &cobra.Command{
co.SetValue()
}

if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: true}); err != nil {
if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil {
return err
}

Expand Down Expand Up @@ -189,7 +190,7 @@ var ingestStressTestCmd = &cobra.Command{
co.SetValue()
}

if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: true}); err != nil {
if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil {
return err
}

Expand Down Expand Up @@ -239,7 +240,7 @@ var ingestTriggerStateRebuildCmd = &cobra.Command{
Short: "updates a database to trigger state rebuild, state will be rebuilt by a running Horizon instance, DO NOT RUN production DB, some endpoints will be unavailable until state is rebuilt",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: true}); err != nil {
if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil {
return err
}

Expand All @@ -263,7 +264,7 @@ var ingestInitGenesisStateCmd = &cobra.Command{
Short: "ingests genesis state (ledger 1)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: true}); err != nil {
if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil {
return err
}

Expand Down Expand Up @@ -320,7 +321,7 @@ var ingestBuildStateCmd = &cobra.Command{
co.SetValue()
}

if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: true}); err != nil {
if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil {
return err
}

Expand Down
129 changes: 54 additions & 75 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ func Flags() (*Config, support.ConfigOptions) {

// NewAppFromFlags constructs a new Horizon App from the given command line flags
func NewAppFromFlags(config *Config, flags support.ConfigOptions) (*App, error) {
err := ApplyFlags(config, flags, ApplyOptions{RequireCaptiveCoreFullConfig: true, AlwaysIngest: false})
err := ApplyFlags(config, flags, ApplyOptions{RequireCaptiveCoreFullConfig: true})
if err != nil {
return nil, err
}
Expand All @@ -850,30 +850,10 @@ func NewAppFromFlags(config *Config, flags support.ConfigOptions) (*App, error)
}

type ApplyOptions struct {
AlwaysIngest bool
RequireCaptiveCoreFullConfig bool
NoCaptiveCore bool
}

type networkConfig struct {
defaultConfig []byte
HistoryArchiveURLs []string
NetworkPassphrase string
}

var (
PubnetConf = networkConfig{
defaultConfig: ledgerbackend.PubnetDefaultConfig,
HistoryArchiveURLs: network.PublicNetworkhistoryArchiveURLs,
NetworkPassphrase: network.PublicNetworkPassphrase,
}

TestnetConf = networkConfig{
defaultConfig: ledgerbackend.TestnetDefaultConfig,
HistoryArchiveURLs: network.TestNetworkhistoryArchiveURLs,
NetworkPassphrase: network.TestNetworkPassphrase,
}
)

// getCaptiveCoreBinaryPath retrieves the path of the Captive Core binary
// Returns the path or an error if the binary is not found
func getCaptiveCoreBinaryPath() (string, error) {
Expand All @@ -884,69 +864,32 @@ func getCaptiveCoreBinaryPath() (string, error) {
return result, nil
}

// getCaptiveCoreConfigFromNetworkParameter returns the default Captive Core configuration based on the network.
func getCaptiveCoreConfigFromNetworkParameter(config *Config) (networkConfig, error) {
var defaultNetworkConfig networkConfig

if config.NetworkPassphrase != "" {
return defaultNetworkConfig, fmt.Errorf("invalid config: %s parameter not allowed with the %s parameter",
NetworkPassphraseFlagName, NetworkFlagName)
}

if len(config.HistoryArchiveURLs) > 0 {
return defaultNetworkConfig, fmt.Errorf("invalid config: %s parameter not allowed with the %s parameter",
HistoryArchiveURLsFlagName, NetworkFlagName)
}

switch config.Network {
case StellarPubnet:
defaultNetworkConfig = PubnetConf
case StellarTestnet:
defaultNetworkConfig = TestnetConf
default:
return defaultNetworkConfig, fmt.Errorf("no default configuration found for network %s", config.Network)
}

return defaultNetworkConfig, nil
}

// setCaptiveCoreConfiguration prepares configuration for the Captive Core
func setCaptiveCoreConfiguration(config *Config, options ApplyOptions) error {
stdLog.Println("Preparing captive core...")

var err error
// If the user didn't specify a Stellar Core binary, we can check the
// $PATH and possibly fill it in for them.
if config.CaptiveCoreBinaryPath == "" {
var err error
if config.CaptiveCoreBinaryPath, err = getCaptiveCoreBinaryPath(); err != nil {
return fmt.Errorf("captive core requires %s", StellarCoreBinaryPathName)
}
}

var defaultNetworkConfig networkConfig
if config.Network != "" {
var err error
defaultNetworkConfig, err = getCaptiveCoreConfigFromNetworkParameter(config)
if err != nil {
return err
}
config.NetworkPassphrase = defaultNetworkConfig.NetworkPassphrase
config.HistoryArchiveURLs = defaultNetworkConfig.HistoryArchiveURLs
} else {
if config.NetworkPassphrase == "" {
return fmt.Errorf("%s must be set", NetworkPassphraseFlagName)
}
var defaultCaptiveCoreConfig []byte
switch config.Network {
case StellarPubnet:
defaultCaptiveCoreConfig = ledgerbackend.PubnetDefaultConfig
case StellarTestnet:

if len(config.HistoryArchiveURLs) == 0 {
return fmt.Errorf("%s must be set", HistoryArchiveURLsFlagName)
}
defaultCaptiveCoreConfig = ledgerbackend.TestnetDefaultConfig
}

config.CaptiveCoreTomlParams.CoreBinaryPath = config.CaptiveCoreBinaryPath
config.CaptiveCoreTomlParams.HistoryArchiveURLs = config.HistoryArchiveURLs
config.CaptiveCoreTomlParams.NetworkPassphrase = config.NetworkPassphrase

var err error
if config.CaptiveCoreConfigPath != "" {
config.CaptiveCoreToml, err = ledgerbackend.NewCaptiveCoreTomlFromFile(config.CaptiveCoreConfigPath,
config.CaptiveCoreTomlParams)
Expand All @@ -960,8 +903,8 @@ func setCaptiveCoreConfiguration(config *Config, options ApplyOptions) error {
if err != nil {
return errors.Wrap(err, "invalid captive core toml file")
}
} else if len(defaultNetworkConfig.defaultConfig) != 0 {
config.CaptiveCoreToml, err = ledgerbackend.NewCaptiveCoreTomlFromData(defaultNetworkConfig.defaultConfig,
} else if len(defaultCaptiveCoreConfig) != 0 {
config.CaptiveCoreToml, err = ledgerbackend.NewCaptiveCoreTomlFromData(defaultCaptiveCoreConfig,
config.CaptiveCoreTomlParams)
if err != nil {
return errors.Wrap(err, "invalid captive core toml file")
Expand Down Expand Up @@ -1004,10 +947,6 @@ func ApplyFlags(config *Config, flags support.ConfigOptions, options ApplyOption
return err
}

if options.AlwaysIngest {
config.Ingest = true
}

if config.Ingest {
// Migrations should be checked as early as possible. Apply and check
// only on ingesting instances which are required to have write-access
Expand All @@ -1023,9 +962,15 @@ func ApplyFlags(config *Config, flags support.ConfigOptions, options ApplyOption
return err
}

err := setCaptiveCoreConfiguration(config, options)
if err != nil {
return errors.Wrap(err, "error generating captive core configuration")
if err := setNetworkConfiguration(config); err != nil {
return err
}

if !options.NoCaptiveCore {
err := setCaptiveCoreConfiguration(config, options)
if err != nil {
return errors.Wrap(err, "error generating captive core configuration")
}
}
}

Expand Down Expand Up @@ -1061,3 +1006,37 @@ func ApplyFlags(config *Config, flags support.ConfigOptions, options ApplyOption

return nil
}

func setNetworkConfiguration(config *Config) error {
if config.Network != "" {
if config.NetworkPassphrase != "" {
return fmt.Errorf("invalid config: %s parameter not allowed with the %s parameter",
NetworkPassphraseFlagName, NetworkFlagName)
}

if len(config.HistoryArchiveURLs) > 0 {
return fmt.Errorf("invalid config: %s parameter not allowed with the %s parameter",
HistoryArchiveURLsFlagName, NetworkFlagName)
}

switch config.Network {
case StellarPubnet:
config.NetworkPassphrase = network.PublicNetworkPassphrase
config.HistoryArchiveURLs = network.PublicNetworkhistoryArchiveURLs
case StellarTestnet:
config.NetworkPassphrase = network.TestNetworkPassphrase
config.HistoryArchiveURLs = network.TestNetworkhistoryArchiveURLs
default:
return fmt.Errorf("no default configuration found for network %s", config.Network)
}
}

if config.NetworkPassphrase == "" {
return fmt.Errorf("%s must be set", NetworkPassphraseFlagName)
}

if len(config.HistoryArchiveURLs) == 0 {
return fmt.Errorf("%s must be set", HistoryArchiveURLsFlagName)
}
return nil
}
Loading

0 comments on commit fb23f8c

Please sign in to comment.