Skip to content

Commit

Permalink
Pass new params correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
chowbao committed May 7, 2024
1 parent ed5a07f commit f825060
Show file tree
Hide file tree
Showing 11 changed files with 34 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cmd/export_assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var assetsCmd = &cobra.Command{
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
env := utils.GetEnvironmentDetails(commonArgs)

outFile := mustOutFile(path)

Expand Down
2 changes: 1 addition & 1 deletion cmd/export_diagnostic_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var diagnosticEventsCmd = &cobra.Command{
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
env := utils.GetEnvironmentDetails(commonArgs)

transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/export_effects.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var effectsCmd = &cobra.Command{
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
env := utils.GetEnvironmentDetails(commonArgs)

transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/export_ledger_entry_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ be exported.`,
Run: func(cmd *cobra.Command, args []string) {
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
env := utils.GetEnvironmentDetails(commonArgs)

_, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger)
exports := utils.MustExportTypeFlags(cmd.Flags(), cmdLogger)
Expand Down
2 changes: 1 addition & 1 deletion cmd/export_ledger_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var ledgerTransactionCmd = &cobra.Command{
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
env := utils.GetEnvironmentDetails(commonArgs)

ledgerTransaction, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/export_ledgers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var ledgersCmd = &cobra.Command{
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
env := utils.GetEnvironmentDetails(commonArgs)

var ledgers []utils.HistoryArchiveLedgerAndLCM
var err error
Expand Down
2 changes: 1 addition & 1 deletion cmd/export_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var operationsCmd = &cobra.Command{
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
env := utils.GetEnvironmentDetails(commonArgs)

operations, err := input.GetOperations(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/export_trades.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var tradesCmd = &cobra.Command{
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
env := utils.GetEnvironmentDetails(commonArgs)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)

trades, err := input.GetTrades(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
Expand Down
2 changes: 1 addition & 1 deletion cmd/export_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var transactionsCmd = &cobra.Command{
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
env := utils.GetEnvironmentDetails(commonArgs)

transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion internal/input/ledger_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ const avgCloseTime = time.Second * 5 // average time to close a stellar ledger
func GetLedgerRange(startTime, endTime time.Time, isTest bool, isFuture bool) (int64, int64, error) {
startTime = startTime.UTC()
endTime = endTime.UTC()
env := utils.GetEnvironmentDetails(isTest, isFuture, "")
commonFlagValues := utils.CommonFlagValues{
IsTest: isTest,
IsFuture: isFuture,
}
env := utils.GetEnvironmentDetails(commonFlagValues)

if startTime.After(endTime) {
return 0, 0, fmt.Errorf("start time must be less than or equal to the end time")
Expand Down
32 changes: 20 additions & 12 deletions internal/utils/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func AddCommonFlags(flags *pflag.FlagSet) {
flags.StringToStringP("extra-fields", "u", map[string]string{}, "Additional fields to append to output jsons. Used for appending metadata")
flags.Bool("captive-core", false, "If set, run captive core to retrieve data. Otherwise use TxMeta file datastore.")
flags.String("datastore-path", "ledger-exporter/ledgers", "Datastore bucket path to read txmeta files from.")
flags.Uint32("buffer-size", 5, "Buffer size sets the max limit for the number of txmeta files that can be held in memory.")
flags.Uint32("num-workers", 5, "Number of workers to spawn that read txmeta files from the datastore.")
flags.Uint32("retry-limit", 3, "Datastore GetLedger retry limit.")
flags.Uint32("retry-wait", 5, "Time in seconds to wait for GetLedger retry.")
Expand Down Expand Up @@ -293,6 +294,7 @@ type CommonFlagValues struct {
Extra map[string]string
UseCaptiveCore bool
DatastorePath string
BufferSize uint32
NumWorkers uint32
RetryLimit uint32
RetryWait uint32
Expand Down Expand Up @@ -336,6 +338,11 @@ func MustCommonFlags(flags *pflag.FlagSet, logger *EtlLogger) CommonFlagValues {
logger.Fatal("could not get datastore-bucket-path string: ", err)
}

bufferSize, err := flags.GetUint32("buffer-size")
if err != nil {
logger.Fatal("could not get buffer-size uint32: ", err)
}

numWorkers, err := flags.GetUint32("num-workers")
if err != nil {
logger.Fatal("could not get num-workers uint32: ", err)
Expand All @@ -359,6 +366,7 @@ func MustCommonFlags(flags *pflag.FlagSet, logger *EtlLogger) CommonFlagValues {
Extra: extra,
UseCaptiveCore: useCaptiveCore,
DatastorePath: datastorePath,
BufferSize: bufferSize,
NumWorkers: numWorkers,
RetryLimit: retryLimit,
RetryWait: retryWait,
Expand Down Expand Up @@ -679,38 +687,38 @@ type EnvironmentDetails struct {
ArchiveURLs []string
BinaryPath string
CoreConfig string
DatastorePath string
Network string
CommonFlagValues CommonFlagValues
}

// GetPassphrase returns the correct Network Passphrase based on env preference
func GetEnvironmentDetails(isTest bool, isFuture bool, datastorePath string) (details EnvironmentDetails) {
if isTest {
func GetEnvironmentDetails(commonFlags CommonFlagValues) (details EnvironmentDetails) {
if commonFlags.IsTest {
// testnet passphrase to be used for testing
details.NetworkPassphrase = network.TestNetworkPassphrase
details.ArchiveURLs = testArchiveURLs
details.BinaryPath = "/usr/bin/stellar-core"
details.CoreConfig = "/etl/docker/stellar-core_testnet.cfg"
details.DatastorePath = datastorePath
details.Network = "testnet"
details.CommonFlagValues = commonFlags
return details
} else if isFuture {
} else if commonFlags.IsFuture {
// details.NetworkPassphrase = network.FutureNetworkPassphrase
details.NetworkPassphrase = "Test SDF Future Network ; October 2022"
details.ArchiveURLs = futureArchiveURLs
details.BinaryPath = "/usr/bin/stellar-core"
details.CoreConfig = "/etl/docker/stellar-core_futurenet.cfg"
details.DatastorePath = datastorePath
details.Network = "futurenet"
details.CommonFlagValues = commonFlags
return details
} else {
// default: mainnet
details.NetworkPassphrase = network.PublicNetworkPassphrase
details.ArchiveURLs = mainArchiveURLs
details.BinaryPath = "/usr/bin/stellar-core"
details.CoreConfig = "/etl/docker/stellar-core.cfg"
details.DatastorePath = datastorePath
details.Network = "pubnet"
details.CommonFlagValues = commonFlags
return details
}
}
Expand Down Expand Up @@ -796,7 +804,7 @@ func CreateLedgerBackend(ctx context.Context, useCaptiveCore bool, env Environme

// Create ledger backend from datastore
params := make(map[string]string)
params["destination_bucket_path"] = env.DatastorePath
params["destination_bucket_path"] = env.CommonFlagValues.DatastorePath
dataStoreConfig := datastore.DataStoreConfig{
Type: "GCS",
Params: params,
Expand All @@ -820,10 +828,10 @@ func CreateLedgerBackend(ctx context.Context, useCaptiveCore bool, env Environme
LedgerBatchConfig: ledgerBatchConfig,
CompressionType: "gzip",
DataStore: dataStore,
BufferSize: 1000,
NumWorkers: 5,
RetryLimit: 3,
RetryWait: 5,
BufferSize: env.CommonFlagValues.BufferSize,
NumWorkers: env.CommonFlagValues.NumWorkers,
RetryLimit: env.CommonFlagValues.RetryLimit,
RetryWait: time.Duration(env.CommonFlagValues.RetryWait) * time.Second,
}

backend, err := ledgerbackend.NewBufferedStorageBackend(ctx, BSBackendConfig)
Expand Down

0 comments on commit f825060

Please sign in to comment.