diff --git a/cmd/export_assets.go b/cmd/export_assets.go index 7ea33a69..2160d6eb 100644 --- a/cmd/export_assets.go +++ b/cmd/export_assets.go @@ -20,7 +20,7 @@ var assetsCmd = &cobra.Command{ cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) outFile := mustOutFile(path) diff --git a/cmd/export_diagnostic_events.go b/cmd/export_diagnostic_events.go index 3a5e9ad2..66ed6438 100644 --- a/cmd/export_diagnostic_events.go +++ b/cmd/export_diagnostic_events.go @@ -20,7 +20,7 @@ var diagnosticEventsCmd = &cobra.Command{ cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { diff --git a/cmd/export_effects.go b/cmd/export_effects.go index 03e0f4d4..b93aaf1c 100644 --- a/cmd/export_effects.go +++ b/cmd/export_effects.go @@ -18,7 +18,7 @@ var effectsCmd = &cobra.Command{ cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { diff --git a/cmd/export_ledger_entry_changes.go b/cmd/export_ledger_entry_changes.go index aec74de1..b227defe 100644 --- a/cmd/export_ledger_entry_changes.go +++ b/cmd/export_ledger_entry_changes.go @@ -30,7 +30,7 @@ be exported.`, Run: func(cmd *cobra.Command, args []string) { commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) cmdLogger.StrictExport = commonArgs.StrictExport - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) _, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger) exports := utils.MustExportTypeFlags(cmd.Flags(), cmdLogger) diff --git a/cmd/export_ledger_transaction.go b/cmd/export_ledger_transaction.go index 4054b63f..7d07b9ec 100644 --- a/cmd/export_ledger_transaction.go +++ b/cmd/export_ledger_transaction.go @@ -20,7 +20,7 @@ var ledgerTransactionCmd = &cobra.Command{ cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) ledgerTransaction, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { diff --git a/cmd/export_ledgers.go b/cmd/export_ledgers.go index 501e649b..e1dce45b 100644 --- a/cmd/export_ledgers.go +++ b/cmd/export_ledgers.go @@ -20,7 +20,7 @@ var ledgersCmd = &cobra.Command{ cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) var ledgers []utils.HistoryArchiveLedgerAndLCM var err error diff --git a/cmd/export_operations.go b/cmd/export_operations.go index e8418e6d..cbfb8d84 100644 --- a/cmd/export_operations.go +++ b/cmd/export_operations.go @@ -20,7 +20,7 @@ var operationsCmd = &cobra.Command{ cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) operations, err := input.GetOperations(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { diff --git a/cmd/export_trades.go b/cmd/export_trades.go index 94441860..748cdb66 100644 --- a/cmd/export_trades.go +++ b/cmd/export_trades.go @@ -22,7 +22,7 @@ var tradesCmd = &cobra.Command{ commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) trades, err := input.GetTrades(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) diff --git a/cmd/export_transactions.go b/cmd/export_transactions.go index 966cd0fa..35f82bd2 100644 --- a/cmd/export_transactions.go +++ b/cmd/export_transactions.go @@ -20,7 +20,7 @@ var transactionsCmd = &cobra.Command{ cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { diff --git a/internal/input/ledger_range.go b/internal/input/ledger_range.go index f4ec07ee..2f778095 100644 --- a/internal/input/ledger_range.go +++ b/internal/input/ledger_range.go @@ -32,7 +32,11 @@ const avgCloseTime = time.Second * 5 // average time to close a stellar ledger func GetLedgerRange(startTime, endTime time.Time, isTest bool, isFuture bool) (int64, int64, error) { startTime = startTime.UTC() endTime = endTime.UTC() - env := utils.GetEnvironmentDetails(isTest, isFuture, "") + commonFlagValues := utils.CommonFlagValues{ + IsTest: isTest, + IsFuture: isFuture, + } + env := utils.GetEnvironmentDetails(commonFlagValues) if startTime.After(endTime) { return 0, 0, fmt.Errorf("start time must be less than or equal to the end time") diff --git a/internal/utils/main.go b/internal/utils/main.go index 069ac0f8..e54fe920 100644 --- a/internal/utils/main.go +++ b/internal/utils/main.go @@ -236,6 +236,7 @@ func AddCommonFlags(flags *pflag.FlagSet) { flags.StringToStringP("extra-fields", "u", map[string]string{}, "Additional fields to append to output jsons. Used for appending metadata") flags.Bool("captive-core", false, "If set, run captive core to retrieve data. Otherwise use TxMeta file datastore.") flags.String("datastore-path", "ledger-exporter/ledgers", "Datastore bucket path to read txmeta files from.") + flags.Uint32("buffer-size", 5, "Buffer size sets the max limit for the number of txmeta files that can be held in memory.") flags.Uint32("num-workers", 5, "Number of workers to spawn that read txmeta files from the datastore.") flags.Uint32("retry-limit", 3, "Datastore GetLedger retry limit.") flags.Uint32("retry-wait", 5, "Time in seconds to wait for GetLedger retry.") @@ -293,6 +294,7 @@ type CommonFlagValues struct { Extra map[string]string UseCaptiveCore bool DatastorePath string + BufferSize uint32 NumWorkers uint32 RetryLimit uint32 RetryWait uint32 @@ -336,6 +338,11 @@ func MustCommonFlags(flags *pflag.FlagSet, logger *EtlLogger) CommonFlagValues { logger.Fatal("could not get datastore-bucket-path string: ", err) } + bufferSize, err := flags.GetUint32("buffer-size") + if err != nil { + logger.Fatal("could not get buffer-size uint32: ", err) + } + numWorkers, err := flags.GetUint32("num-workers") if err != nil { logger.Fatal("could not get num-workers uint32: ", err) @@ -359,6 +366,7 @@ func MustCommonFlags(flags *pflag.FlagSet, logger *EtlLogger) CommonFlagValues { Extra: extra, UseCaptiveCore: useCaptiveCore, DatastorePath: datastorePath, + BufferSize: bufferSize, NumWorkers: numWorkers, RetryLimit: retryLimit, RetryWait: retryWait, @@ -679,29 +687,29 @@ type EnvironmentDetails struct { ArchiveURLs []string BinaryPath string CoreConfig string - DatastorePath string Network string + CommonFlagValues CommonFlagValues } // GetPassphrase returns the correct Network Passphrase based on env preference -func GetEnvironmentDetails(isTest bool, isFuture bool, datastorePath string) (details EnvironmentDetails) { - if isTest { +func GetEnvironmentDetails(commonFlags CommonFlagValues) (details EnvironmentDetails) { + if commonFlags.IsTest { // testnet passphrase to be used for testing details.NetworkPassphrase = network.TestNetworkPassphrase details.ArchiveURLs = testArchiveURLs details.BinaryPath = "/usr/bin/stellar-core" details.CoreConfig = "/etl/docker/stellar-core_testnet.cfg" - details.DatastorePath = datastorePath details.Network = "testnet" + details.CommonFlagValues = commonFlags return details - } else if isFuture { + } else if commonFlags.IsFuture { // details.NetworkPassphrase = network.FutureNetworkPassphrase details.NetworkPassphrase = "Test SDF Future Network ; October 2022" details.ArchiveURLs = futureArchiveURLs details.BinaryPath = "/usr/bin/stellar-core" details.CoreConfig = "/etl/docker/stellar-core_futurenet.cfg" - details.DatastorePath = datastorePath details.Network = "futurenet" + details.CommonFlagValues = commonFlags return details } else { // default: mainnet @@ -709,8 +717,8 @@ func GetEnvironmentDetails(isTest bool, isFuture bool, datastorePath string) (de details.ArchiveURLs = mainArchiveURLs details.BinaryPath = "/usr/bin/stellar-core" details.CoreConfig = "/etl/docker/stellar-core.cfg" - details.DatastorePath = datastorePath details.Network = "pubnet" + details.CommonFlagValues = commonFlags return details } } @@ -796,7 +804,7 @@ func CreateLedgerBackend(ctx context.Context, useCaptiveCore bool, env Environme // Create ledger backend from datastore params := make(map[string]string) - params["destination_bucket_path"] = env.DatastorePath + params["destination_bucket_path"] = env.CommonFlagValues.DatastorePath dataStoreConfig := datastore.DataStoreConfig{ Type: "GCS", Params: params, @@ -820,10 +828,10 @@ func CreateLedgerBackend(ctx context.Context, useCaptiveCore bool, env Environme LedgerBatchConfig: ledgerBatchConfig, CompressionType: "gzip", DataStore: dataStore, - BufferSize: 1000, - NumWorkers: 5, - RetryLimit: 3, - RetryWait: 5, + BufferSize: env.CommonFlagValues.BufferSize, + NumWorkers: env.CommonFlagValues.NumWorkers, + RetryLimit: env.CommonFlagValues.RetryLimit, + RetryWait: time.Duration(env.CommonFlagValues.RetryWait) * time.Second, } backend, err := ledgerbackend.NewBufferedStorageBackend(ctx, BSBackendConfig)