From fdc8857c2d1297c487c7057be781571cb822f2e8 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Tue, 7 May 2024 11:10:24 -0400 Subject: [PATCH 1/6] Update to use BufferedStorageBackend to read txmeta files --- cmd/export_account_signers.go | 10 +-- cmd/export_accounts.go | 10 +-- cmd/export_all_history.go | 18 ++-- cmd/export_assets.go | 14 ++-- cmd/export_claimable_balances.go | 10 +-- cmd/export_config_setting.go | 10 +-- cmd/export_contract_code.go | 10 +-- cmd/export_contract_data.go | 10 +-- cmd/export_diagnostic_events.go | 10 +-- cmd/export_effects.go | 12 +-- cmd/export_ledger_entry_changes.go | 29 ++++--- cmd/export_ledger_transaction.go | 10 +-- cmd/export_ledgers.go | 14 ++-- cmd/export_liquidity_pools.go | 10 +-- cmd/export_offers.go | 10 +-- cmd/export_operations.go | 10 +-- cmd/export_orderbooks.go | 24 +++--- cmd/export_trades.go | 10 +-- cmd/export_transactions.go | 10 +-- cmd/export_trustlines.go | 10 +-- cmd/export_ttl.go | 10 +-- go.mod | 2 +- go.sum | 4 +- internal/input/operations.go | 1 + internal/utils/main.go | 127 ++++++++++++++++++++--------- 25 files changed, 229 insertions(+), 166 deletions(-) diff --git a/cmd/export_account_signers.go b/cmd/export_account_signers.go index 9e4b9876..f5d7eb47 100644 --- a/cmd/export_account_signers.go +++ b/cmd/export_account_signers.go @@ -22,13 +22,13 @@ should be used in an initial data dump. In order to get account information with the export_ledger_entry_changes command.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - accounts, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeAccount, env.ArchiveURLs) + accounts, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeAccount, env.ArchiveURLs) if err != nil { cmdLogger.Fatal("could not read accounts: ", err) } @@ -48,7 +48,7 @@ the export_ledger_entry_changes command.`, } for _, entry := range transformed { - numBytes, err := exportEntry(entry, outFile, extra) + numBytes, err := exportEntry(entry, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export entry: %v", err)) numFailures += 1 diff --git a/cmd/export_accounts.go b/cmd/export_accounts.go index 85d9bcc1..257a68f7 100644 --- a/cmd/export_accounts.go +++ b/cmd/export_accounts.go @@ -22,13 +22,13 @@ should be used in an initial data dump. In order to get account information with the export_ledger_entry_changes command.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - accounts, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeAccount, env.ArchiveURLs) + accounts, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeAccount, env.ArchiveURLs) if err != nil { cmdLogger.Fatal("could not read accounts: ", err) } @@ -45,7 +45,7 @@ the export_ledger_entry_changes command.`, continue } - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export entry: %v", err)) numFailures += 1 diff --git a/cmd/export_all_history.go b/cmd/export_all_history.go index d2d86299..0b197157 100644 --- a/cmd/export_all_history.go +++ b/cmd/export_all_history.go @@ -20,23 +20,23 @@ This is a temporary command used to reduce the amount of requests to history arc in order to mitigate egress costs for the entity hosting history archives.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - allHistory, err := input.GetAllHistory(startNum, endNum, limit, env, useCaptiveCore) + allHistory, err := input.GetAllHistory(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { cmdLogger.Fatal("could not read all history: ", err) } cmdLogger.Info("start doing other exports") - getOperations(allHistory.Operations, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_operations.txt", env) - getTrades(allHistory.Trades, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_trades.txt") - getEffects(allHistory.Ledgers, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_effects.txt", env) - getTransactions(allHistory.Ledgers, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_transactions.txt") - getDiagnosticEvents(allHistory.Ledgers, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_diagnostic_events.txt") + getOperations(allHistory.Operations, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_operations.txt", env) + getTrades(allHistory.Trades, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_trades.txt") + getEffects(allHistory.Ledgers, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_effects.txt", env) + getTransactions(allHistory.Ledgers, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_transactions.txt") + getDiagnosticEvents(allHistory.Ledgers, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_diagnostic_events.txt") cmdLogger.Info("done doing other exports") }, } diff --git a/cmd/export_assets.go b/cmd/export_assets.go index 5de3aa4d..7ea33a69 100644 --- a/cmd/export_assets.go +++ b/cmd/export_assets.go @@ -16,21 +16,21 @@ var assetsCmd = &cobra.Command{ Long: `Exports the assets that are created from payment operations over a specified ledger range`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) outFile := mustOutFile(path) var paymentOps []input.AssetTransformInput var err error - if useCaptiveCore { - paymentOps, err = input.GetPaymentOperationsHistoryArchive(startNum, endNum, limit, env, useCaptiveCore) + if commonArgs.UseCaptiveCore { + paymentOps, err = input.GetPaymentOperationsHistoryArchive(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) } else { - paymentOps, err = input.GetPaymentOperations(startNum, endNum, limit, env, useCaptiveCore) + paymentOps, err = input.GetPaymentOperations(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) } if err != nil { cmdLogger.Fatal("could not read asset: ", err) @@ -55,7 +55,7 @@ var assetsCmd = &cobra.Command{ } seenIDs[transformed.AssetID] = true - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.Error(err) numFailures += 1 diff --git a/cmd/export_claimable_balances.go b/cmd/export_claimable_balances.go index f552c913..684c5113 100644 --- a/cmd/export_claimable_balances.go +++ b/cmd/export_claimable_balances.go @@ -22,13 +22,13 @@ var claimableBalancesCmd = &cobra.Command{ the export_ledger_entry_changes command.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - balances, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeClaimableBalance, env.ArchiveURLs) + balances, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeClaimableBalance, env.ArchiveURLs) if err != nil { cmdLogger.Fatal("could not read balances: ", err) } @@ -45,7 +45,7 @@ var claimableBalancesCmd = &cobra.Command{ continue } - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export balance %+v: %v", balance, err)) numFailures += 1 diff --git a/cmd/export_config_setting.go b/cmd/export_config_setting.go index 892d96eb..58c944c2 100644 --- a/cmd/export_config_setting.go +++ b/cmd/export_config_setting.go @@ -22,13 +22,13 @@ var configSettingCmd = &cobra.Command{ the export_ledger_entry_changes command.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - settings, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeConfigSetting, env.ArchiveURLs) + settings, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeConfigSetting, env.ArchiveURLs) if err != nil { cmdLogger.Fatal("Error getting ledger entries: ", err) } @@ -45,7 +45,7 @@ var configSettingCmd = &cobra.Command{ continue } - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export config setting %+v: %v", setting, err)) numFailures += 1 diff --git a/cmd/export_contract_code.go b/cmd/export_contract_code.go index 4ff1d037..c9b5978b 100644 --- a/cmd/export_contract_code.go +++ b/cmd/export_contract_code.go @@ -22,13 +22,13 @@ var codeCmd = &cobra.Command{ the export_ledger_entry_changes command.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - codes, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeContractCode, env.ArchiveURLs) + codes, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeContractCode, env.ArchiveURLs) if err != nil { cmdLogger.Fatal("Error getting ledger entries: ", err) } @@ -45,7 +45,7 @@ var codeCmd = &cobra.Command{ continue } - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export contract code %+v: %v", code, err)) numFailures += 1 diff --git a/cmd/export_contract_data.go b/cmd/export_contract_data.go index 46a427fb..dc7f8c97 100644 --- a/cmd/export_contract_data.go +++ b/cmd/export_contract_data.go @@ -22,13 +22,13 @@ var dataCmd = &cobra.Command{ the export_ledger_entry_changes command.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - datas, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeContractData, env.ArchiveURLs) + datas, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeContractData, env.ArchiveURLs) if err != nil { cmdLogger.Fatal("Error getting ledger entries: ", err) } @@ -50,7 +50,7 @@ var dataCmd = &cobra.Command{ continue } - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export contract data %+v: %v", data, err)) numFailures += 1 diff --git a/cmd/export_diagnostic_events.go b/cmd/export_diagnostic_events.go index 1a888ff9..3a5e9ad2 100644 --- a/cmd/export_diagnostic_events.go +++ b/cmd/export_diagnostic_events.go @@ -16,13 +16,13 @@ var diagnosticEventsCmd = &cobra.Command{ Long: `Exports the diagnostic events over a specified range to an output file.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - transactions, err := input.GetTransactions(startNum, endNum, limit, env, useCaptiveCore) + transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { cmdLogger.Fatal("could not read transactions: ", err) } @@ -42,7 +42,7 @@ var diagnosticEventsCmd = &cobra.Command{ continue } for _, diagnosticEvent := range transformed { - _, err := exportEntry(diagnosticEvent, outFile, extra) + _, err := exportEntry(diagnosticEvent, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export diagnostic event: %v", err)) numFailures += 1 diff --git a/cmd/export_effects.go b/cmd/export_effects.go index 3ed1df75..03e0f4d4 100644 --- a/cmd/export_effects.go +++ b/cmd/export_effects.go @@ -14,15 +14,15 @@ var effectsCmd = &cobra.Command{ Long: "Exports the effects data over a specified range to an output file.", Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - transactions, err := input.GetTransactions(startNum, endNum, limit, env, useCaptiveCore) + transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { - cmdLogger.Fatalf("could not read transactions in [%d, %d] (limit=%d): %v", startNum, endNum, limit, err) + cmdLogger.Fatalf("could not read transactions in [%d, %d] (limit=%d): %v", startNum, commonArgs.EndNum, limit, err) } outFile := mustOutFile(path) @@ -39,7 +39,7 @@ var effectsCmd = &cobra.Command{ } for _, transformed := range effects { - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(err) numFailures += 1 diff --git a/cmd/export_ledger_entry_changes.go b/cmd/export_ledger_entry_changes.go index 7b727680..aec74de1 100644 --- a/cmd/export_ledger_entry_changes.go +++ b/cmd/export_ledger_entry_changes.go @@ -28,9 +28,9 @@ confirmed by the Stellar network. If no data type flags are set, then by default all of them are exported. If any are set, it is assumed that the others should not be exported.`, Run: func(cmd *cobra.Command, args []string) { - endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) _, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger) exports := utils.MustExportTypeFlags(cmd.Flags(), cmdLogger) @@ -62,28 +62,28 @@ be exported.`, } } - if configPath == "" && endNum == 0 { + if configPath == "" && commonArgs.EndNum == 0 { cmdLogger.Fatal("stellar-core needs a config file path when exporting ledgers continuously (endNum = 0)") } ctx := context.Background() - backend, err := utils.CreateLedgerBackend(ctx, useCaptiveCore, env) + backend, err := utils.CreateLedgerBackend(ctx, commonArgs.UseCaptiveCore, env) if err != nil { cmdLogger.Fatal("error creating a cloud storage backend: ", err) } - err = backend.PrepareRange(ctx, ledgerbackend.BoundedRange(startNum, endNum)) + err = backend.PrepareRange(ctx, ledgerbackend.BoundedRange(startNum, commonArgs.EndNum)) if err != nil { cmdLogger.Fatal("error preparing ledger range for cloud storage backend: ", err) } - if endNum == 0 { - endNum = math.MaxInt32 + if commonArgs.EndNum == 0 { + commonArgs.EndNum = math.MaxInt32 } changeChan := make(chan input.ChangeBatch) closeChan := make(chan int) - go input.StreamChanges(&backend, startNum, endNum, batchSize, changeChan, closeChan, env, cmdLogger) + go input.StreamChanges(&backend, startNum, commonArgs.EndNum, batchSize, changeChan, closeChan, env, cmdLogger) for { select { @@ -252,7 +252,16 @@ be exported.`, } } - err := exportTransformedData(batch.BatchStart, batch.BatchEnd, outputFolder, transformedOutputs, cloudCredentials, cloudStorageBucket, cloudProvider, extra) + err := exportTransformedData( + batch.BatchStart, + batch.BatchEnd, + outputFolder, + transformedOutputs, + cloudCredentials, + cloudStorageBucket, + cloudProvider, + commonArgs.Extra, + ) if err != nil { cmdLogger.LogError(err) continue diff --git a/cmd/export_ledger_transaction.go b/cmd/export_ledger_transaction.go index c08a2017..4054b63f 100644 --- a/cmd/export_ledger_transaction.go +++ b/cmd/export_ledger_transaction.go @@ -16,13 +16,13 @@ var ledgerTransactionCmd = &cobra.Command{ Long: `Exports the ledger_transaction transaction data over a specified range to an output file.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - ledgerTransaction, err := input.GetTransactions(startNum, endNum, limit, env, useCaptiveCore) + ledgerTransaction, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { cmdLogger.Fatal("could not read ledger_transaction: ", err) } @@ -39,7 +39,7 @@ var ledgerTransactionCmd = &cobra.Command{ continue } - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export transaction: %v", err)) numFailures += 1 diff --git a/cmd/export_ledgers.go b/cmd/export_ledgers.go index f5000289..501e649b 100644 --- a/cmd/export_ledgers.go +++ b/cmd/export_ledgers.go @@ -16,19 +16,19 @@ var ledgersCmd = &cobra.Command{ Long: `Exports ledger data within the specified range to an output file. Encodes ledgers as JSON objects and exports them to the output file.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) var ledgers []utils.HistoryArchiveLedgerAndLCM var err error - if useCaptiveCore { - ledgers, err = input.GetLedgersHistoryArchive(startNum, endNum, limit, env, useCaptiveCore) + if commonArgs.UseCaptiveCore { + ledgers, err = input.GetLedgersHistoryArchive(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) } else { - ledgers, err = input.GetLedgers(startNum, endNum, limit, env, useCaptiveCore) + ledgers, err = input.GetLedgers(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) } if err != nil { cmdLogger.Fatal("could not read ledgers: ", err) @@ -46,7 +46,7 @@ var ledgersCmd = &cobra.Command{ continue } - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export ledger %d: %s", startNum+uint32(i), err)) numFailures += 1 diff --git a/cmd/export_liquidity_pools.go b/cmd/export_liquidity_pools.go index f05cab03..d5b3de65 100644 --- a/cmd/export_liquidity_pools.go +++ b/cmd/export_liquidity_pools.go @@ -22,13 +22,13 @@ should be used in an initial data dump. In order to get liqudity pools informati the export_ledger_entry_changes command.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - pools, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeLiquidityPool, env.ArchiveURLs) + pools, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeLiquidityPool, env.ArchiveURLs) if err != nil { cmdLogger.Fatal("could not read accounts: ", err) } @@ -45,7 +45,7 @@ the export_ledger_entry_changes command.`, continue } - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export pool %+v: %v", pool, err)) numFailures += 1 diff --git a/cmd/export_offers.go b/cmd/export_offers.go index 8f0ea5c6..cf7a425f 100644 --- a/cmd/export_offers.go +++ b/cmd/export_offers.go @@ -23,13 +23,13 @@ var offersCmd = &cobra.Command{ the export_ledger_entry_changes command.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - offers, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeOffer, env.ArchiveURLs) + offers, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeOffer, env.ArchiveURLs) if err != nil { cmdLogger.Fatal("could not read offers: ", err) } @@ -46,7 +46,7 @@ var offersCmd = &cobra.Command{ continue } - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export offer %+v: %v", offer, err)) numFailures += 1 diff --git a/cmd/export_operations.go b/cmd/export_operations.go index 9b84265a..e8418e6d 100644 --- a/cmd/export_operations.go +++ b/cmd/export_operations.go @@ -16,13 +16,13 @@ var operationsCmd = &cobra.Command{ Long: `Exports the operations data over a specified range. Each operation is an individual command that mutates the Stellar ledger.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - operations, err := input.GetOperations(startNum, endNum, limit, env, useCaptiveCore) + operations, err := input.GetOperations(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { cmdLogger.Fatal("could not read operations: ", err) } @@ -39,7 +39,7 @@ var operationsCmd = &cobra.Command{ continue } - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export operation: %v", err)) numFailures += 1 diff --git a/cmd/export_orderbooks.go b/cmd/export_orderbooks.go index 91e924ab..58816fae 100644 --- a/cmd/export_orderbooks.go +++ b/cmd/export_orderbooks.go @@ -27,9 +27,9 @@ var exportOrderbooksCmd = &cobra.Command{ If the end-ledger is omitted, then the stellar-core node will continue running and exporting information as new ledgers are confirmed by the Stellar network. In this unbounded case, a stellar-core config path is required to utilize the Captive Core toml.`, Run: func(cmd *cobra.Command, args []string) { - endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) execPath, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) @@ -38,7 +38,7 @@ var exportOrderbooksCmd = &cobra.Command{ cmdLogger.Fatalf("batch-size (%d) must be greater than 0", batchSize) } - if configPath == "" && endNum == 0 { + if configPath == "" && commonArgs.EndNum == 0 { cmdLogger.Fatal("stellar-core needs a config file path when exporting ledgers continuously (endNum = 0)") } @@ -54,7 +54,7 @@ var exportOrderbooksCmd = &cobra.Command{ } checkpointSeq := utils.GetMostRecentCheckpoint(startNum) - core, err := input.PrepareCaptiveCore(execPath, configPath, checkpointSeq, endNum, env) + core, err := input.PrepareCaptiveCore(execPath, configPath, checkpointSeq, commonArgs.EndNum, env) if err != nil { cmdLogger.Fatal("error creating a prepared captive core instance: ", err) } @@ -66,21 +66,21 @@ var exportOrderbooksCmd = &cobra.Command{ orderbookChannel := make(chan input.OrderbookBatch) - go input.StreamOrderbooks(core, startNum, endNum, batchSize, orderbookChannel, orderbook, env, cmdLogger) + go input.StreamOrderbooks(core, startNum, commonArgs.EndNum, batchSize, orderbookChannel, orderbook, env, cmdLogger) // If the end sequence number is defined, we work in a closed range and export a finite number of batches - if endNum != 0 { - batchCount := uint32(math.Ceil(float64(endNum-startNum+1) / float64(batchSize))) + if commonArgs.EndNum != 0 { + batchCount := uint32(math.Ceil(float64(commonArgs.EndNum-startNum+1) / float64(batchSize))) for i := uint32(0); i < batchCount; i++ { batchStart := startNum + i*batchSize // Subtract 1 from the end batch number because batches do not include the last batch in the range batchEnd := batchStart + batchSize - 1 - if batchEnd > endNum { - batchEnd = endNum + if batchEnd > commonArgs.EndNum { + batchEnd = commonArgs.EndNum } parser := input.ReceiveParsedOrderbooks(orderbookChannel, cmdLogger) - exportOrderbook(batchStart, batchEnd, outputFolder, parser, cloudCredentials, cloudStorageBucket, cloudProvider, extra) + exportOrderbook(batchStart, batchEnd, outputFolder, parser, cloudCredentials, cloudStorageBucket, cloudProvider, commonArgs.Extra) } } else { // otherwise, we export in an unbounded manner where batches are constantly exported @@ -89,7 +89,7 @@ var exportOrderbooksCmd = &cobra.Command{ batchStart := startNum + batchNum*batchSize batchEnd := batchStart + batchSize - 1 parser := input.ReceiveParsedOrderbooks(orderbookChannel, cmdLogger) - exportOrderbook(batchStart, batchEnd, outputFolder, parser, cloudCredentials, cloudStorageBucket, cloudProvider, extra) + exportOrderbook(batchStart, batchEnd, outputFolder, parser, cloudCredentials, cloudStorageBucket, cloudProvider, commonArgs.Extra) batchNum++ } } diff --git a/cmd/export_trades.go b/cmd/export_trades.go index 551263e7..94441860 100644 --- a/cmd/export_trades.go +++ b/cmd/export_trades.go @@ -19,13 +19,13 @@ var tradesCmd = &cobra.Command{ Long: `Exports trade data within the specified range to an output file`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - trades, err := input.GetTrades(startNum, endNum, limit, env, useCaptiveCore) + trades, err := input.GetTrades(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { cmdLogger.Fatal("could not read trades ", err) } @@ -43,7 +43,7 @@ var tradesCmd = &cobra.Command{ } for _, transformed := range trades { - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(err) numFailures += 1 diff --git a/cmd/export_transactions.go b/cmd/export_transactions.go index cd37e247..966cd0fa 100644 --- a/cmd/export_transactions.go +++ b/cmd/export_transactions.go @@ -16,13 +16,13 @@ var transactionsCmd = &cobra.Command{ Long: `Exports the transaction data over a specified range to an output file.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - transactions, err := input.GetTransactions(startNum, endNum, limit, env, useCaptiveCore) + transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { cmdLogger.Fatal("could not read transactions: ", err) } @@ -39,7 +39,7 @@ var transactionsCmd = &cobra.Command{ continue } - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export transaction: %v", err)) numFailures += 1 diff --git a/cmd/export_trustlines.go b/cmd/export_trustlines.go index 8b1315b5..01434be9 100644 --- a/cmd/export_trustlines.go +++ b/cmd/export_trustlines.go @@ -23,13 +23,13 @@ var trustlinesCmd = &cobra.Command{ the export_ledger_entry_changes command.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - trustlines, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeTrustline, env.ArchiveURLs) + trustlines, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeTrustline, env.ArchiveURLs) if err != nil { cmdLogger.Fatal("could not read trustlines: ", err) } @@ -46,7 +46,7 @@ var trustlinesCmd = &cobra.Command{ continue } - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export trustline %+v: %v", trust, err)) numFailures += 1 diff --git a/cmd/export_ttl.go b/cmd/export_ttl.go index ce689fda..89adba3b 100644 --- a/cmd/export_ttl.go +++ b/cmd/export_ttl.go @@ -22,13 +22,13 @@ var ttlCmd = &cobra.Command{ the export_ledger_entry_changes command.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport + env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - ttls, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeTtl, env.ArchiveURLs) + ttls, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeTtl, env.ArchiveURLs) if err != nil { cmdLogger.Fatal("Error getting ledger entries: ", err) } @@ -45,7 +45,7 @@ var ttlCmd = &cobra.Command{ continue } - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.LogError(fmt.Errorf("could not export ttl %+v: %v", ttl, err)) numFailures += 1 diff --git a/go.mod b/go.mod index 56f483d1..fc76db68 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.17.0 - github.com/stellar/go v0.0.0-20240423031611-e1c5206ad1ba + github.com/stellar/go v0.0.0-20240507142223-735600adb2d4 github.com/stretchr/testify v1.9.0 ) diff --git a/go.sum b/go.sum index 92a52cf7..568dfe39 100644 --- a/go.sum +++ b/go.sum @@ -296,8 +296,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.17.0 h1:I5txKw7MJasPL/BrfkbA0Jyo/oELqVmux4pR/UxOMfI= github.com/spf13/viper v1.17.0/go.mod h1:BmMMMLQXSbcHK6KAOiFLz0l5JHrU89OdIRHvsk0+yVI= -github.com/stellar/go v0.0.0-20240423031611-e1c5206ad1ba h1:2UPb78V6mL07B0nJ6/89nJ2cimVD3xPMCFxawwRvpJ0= -github.com/stellar/go v0.0.0-20240423031611-e1c5206ad1ba/go.mod h1:ckzsX0B0qfTMVZQJtPELJLs7cJ6xXMYHPVLyIsReGsU= +github.com/stellar/go v0.0.0-20240507142223-735600adb2d4 h1:4dmEOaVcttNCZTIXE8y5VwNvduqVwE+D7oFLAu2nn/k= +github.com/stellar/go v0.0.0-20240507142223-735600adb2d4/go.mod h1:kxiz7GJ94uVORlLZ/q7BrEQZAvBgkNXly7I19axD3EA= github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2 h1:OzCVd0SV5qE3ZcDeSFCmOWLZfEWZ3Oe8KtmSOYKEVWE= github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2/go.mod h1:yoxyU/M8nl9LKeWIoBrbDPQ7Cy+4jxRcWcOayZ4BMps= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/internal/input/operations.go b/internal/input/operations.go index 5290691c..0fca21ed 100644 --- a/internal/input/operations.go +++ b/internal/input/operations.go @@ -71,6 +71,7 @@ func GetOperations(start, end uint32, limit int64, env utils.EnvironmentDetails, } txReader.Close() + if int64(len(opSlice)) >= limit && limit >= 0 { break } diff --git a/internal/utils/main.go b/internal/utils/main.go index c1259a50..b68ca190 100644 --- a/internal/utils/main.go +++ b/internal/utils/main.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "math/big" - "net/url" "time" "github.com/spf13/pflag" @@ -17,6 +16,7 @@ import ( "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/keypair" "github.com/stellar/go/network" + "github.com/stellar/go/support/datastore" "github.com/stellar/go/support/storage" "github.com/stellar/go/txnbuild" "github.com/stellar/go/xdr" @@ -235,7 +235,10 @@ func AddCommonFlags(flags *pflag.FlagSet) { flags.Bool("futurenet", false, "If set, will connect to Futurenet instead of Mainnet.") flags.StringToStringP("extra-fields", "u", map[string]string{}, "Additional fields to append to output jsons. Used for appending metadata") flags.Bool("captive-core", false, "If set, run captive core to retrieve data. Otherwise use TxMeta file datastore.") - flags.String("datastore-url", "", "Datastore url to read txmeta files from.") + flags.String("datastore-path", "ledger-exporter/ledgers", "Datastore bucket path to read txmeta files from.") + flags.Uint32("num-workers", 5, "Number of workers to spawn that read txmeta files from the datastore.") + flags.Uint32("retry-limit", 3, "Datastore GetLedger retry limit.") + flags.Uint32("retry-wait", 5, "Time in seconds to wait for GetLedger retry.") } // AddArchiveFlags adds the history archive specific flags: start-ledger, output, and limit @@ -282,56 +285,84 @@ func AddExportTypeFlags(flags *pflag.FlagSet) { flags.BoolP("export-ttl", "", false, "set in order to export ttl changes") } +type CommonFlagValues struct { + EndNum uint32 + StrictExport bool + IsTest bool + IsFuture bool + Extra map[string]string + UseCaptiveCore bool + DatastorePath string + NumWorkers uint32 + RetryLimit uint32 + RetryWait uint32 +} + // MustCommonFlags gets the values of the the flags common to all commands: end-ledger and strict-export. // If any do not exist, it stops the program fatally using the logger -func MustCommonFlags( - flags *pflag.FlagSet, - logger *EtlLogger, -) ( - endNum uint32, - strictExport, - isTest bool, - isFuture bool, - extra map[string]string, - useCaptiveCore bool, - datastoreUrl string, -) { +func MustCommonFlags(flags *pflag.FlagSet, logger *EtlLogger) CommonFlagValues { endNum, err := flags.GetUint32("end-ledger") if err != nil { logger.Fatal("could not get end sequence number: ", err) } - strictExport, err = flags.GetBool("strict-export") + strictExport, err := flags.GetBool("strict-export") if err != nil { logger.Fatal("could not get strict-export boolean: ", err) } - isTest, err = flags.GetBool("testnet") + isTest, err := flags.GetBool("testnet") if err != nil { logger.Fatal("could not get testnet boolean: ", err) } - isFuture, err = flags.GetBool("futurenet") + isFuture, err := flags.GetBool("futurenet") if err != nil { logger.Fatal("could not get futurenet boolean: ", err) } - extra, err = flags.GetStringToString("extra-fields") + extra, err := flags.GetStringToString("extra-fields") if err != nil { logger.Fatal("could not get extra fields string: ", err) } - useCaptiveCore, err = flags.GetBool("captive-core") + useCaptiveCore, err := flags.GetBool("captive-core") if err != nil { logger.Fatal("could not get captive-core flag: ", err) } - datastoreUrl, err = flags.GetString("datastore-url") + datastorePath, err := flags.GetString("datastore-path") if err != nil { - logger.Fatal("could not get datastore-url string: ", err) + logger.Fatal("could not get datastore-bucket-path string: ", err) } - return + numWorkers, err := flags.GetUint32("num-workers") + if err != nil { + logger.Fatal("could not get num-workers uint32: ", err) + } + + retryLimit, err := flags.GetUint32("retry-limit") + if err != nil { + logger.Fatal("could not get retry-limit uint32: ", err) + } + + retryWait, err := flags.GetUint32("retry-wait") + if err != nil { + logger.Fatal("could not get retry-wait uint32: ", err) + } + + return CommonFlagValues{ + EndNum: endNum, + StrictExport: strictExport, + IsTest: isTest, + IsFuture: isFuture, + Extra: extra, + UseCaptiveCore: useCaptiveCore, + DatastorePath: datastorePath, + NumWorkers: numWorkers, + RetryLimit: retryLimit, + RetryWait: retryWait, + } } // MustArchiveFlags gets the values of the the history archive specific flags: start-ledger, output, and limit @@ -430,7 +461,7 @@ func MustExportTypeFlags(flags *pflag.FlagSet, logger *EtlLogger) map[string]boo "export-ttl": false, } - for export_name, _ := range exports { + for export_name := range exports { exports[export_name], err = flags.GetBool(export_name) if err != nil { logger.Fatalf("could not get %s flag: %v", export_name, err) @@ -649,6 +680,7 @@ type EnvironmentDetails struct { BinaryPath string CoreConfig string StorageURL string + Network string } // GetPassphrase returns the correct Network Passphrase based on env preference @@ -660,6 +692,7 @@ func GetEnvironmentDetails(isTest bool, isFuture bool, datastoreUrl string) (det details.BinaryPath = "/usr/bin/stellar-core" details.CoreConfig = "/etl/docker/stellar-core_testnet.cfg" details.StorageURL = datastoreUrl + details.Network = "testnet" return details } else if isFuture { // details.NetworkPassphrase = network.FutureNetworkPassphrase @@ -668,6 +701,7 @@ func GetEnvironmentDetails(isTest bool, isFuture bool, datastoreUrl string) (det details.BinaryPath = "/usr/bin/stellar-core" details.CoreConfig = "/etl/docker/stellar-core_futurenet.cfg" details.StorageURL = datastoreUrl + details.Network = "futurenet" return details } else { // default: mainnet @@ -676,6 +710,7 @@ func GetEnvironmentDetails(isTest bool, isFuture bool, datastoreUrl string) (det details.BinaryPath = "/usr/bin/stellar-core" details.CoreConfig = "/etl/docker/stellar-core.cfg" details.StorageURL = datastoreUrl + details.Network = "pubnet" return details } } @@ -714,6 +749,9 @@ func (e EnvironmentDetails) GetUnboundedLedgerCloseMeta(end uint32) (xdr.LedgerC ctx := context.Background() backend, err := e.CreateCaptiveCoreBackend() + if err != nil { + return xdr.LedgerCloseMeta{}, err + } ledgerRange := ledgerbackend.UnboundedRange(end) @@ -757,28 +795,43 @@ func CreateLedgerBackend(ctx context.Context, useCaptiveCore bool, env Environme } // Create ledger backend from datastore - fileConfig := ledgerbackend.LCMFileConfig{ - StorageURL: env.StorageURL, - FileSuffix: ".xdr.gz", - LedgersPerFile: 1, - FilesPerPartition: 64000, + params := make(map[string]string) + //params["destination_bucket_path"] = "ledger-exporter/ledgers" + params["destination_bucket_path"] = env.StorageURL + dataStoreConfig := datastore.DataStoreConfig{ + Type: "GCS", + Params: params, } - parsed, err := url.Parse(env.StorageURL) + dataStore, err := datastore.NewDataStore(ctx, dataStoreConfig, env.Network) if err != nil { return nil, err } - // Using the GCS datastore backend - if parsed.Scheme == "gcs" { - backend, err := ledgerbackend.NewGCSBackend(ctx, fileConfig) - if err != nil { - return nil, err - } - return backend, nil + // TODO: In the future these will come from a config file written by ledgerexporter + // Hard code ledger batch values for now + ledgerBatchConfig := datastore.LedgerBatchConfig{ + LedgersPerFile: 1, + FilesPerPartition: 64000, + FileSuffix: ".xdr.gz", + } + + // TODO: In the future CompressionType should be removed as it won't be configurable + BSBackendConfig := ledgerbackend.BufferedStorageBackendConfig{ + LedgerBatchConfig: ledgerBatchConfig, + CompressionType: "gzip", + DataStore: dataStore, + BufferSize: 1000, + NumWorkers: 5, + RetryLimit: 3, + RetryWait: 5, } - return nil, errors.New("no valid ledgerbackend selected") + backend, err := ledgerbackend.NewBufferedStorageBackend(ctx, BSBackendConfig) + if err != nil { + return nil, err + } + return backend, nil } func LedgerKeyToLedgerKeyHash(ledgerKey xdr.LedgerKey) string { From f527b68baa87d321fd4495710a89d16521bc89bf Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Tue, 7 May 2024 11:15:59 -0400 Subject: [PATCH 2/6] Rename storageurl --- internal/utils/main.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/utils/main.go b/internal/utils/main.go index b68ca190..1bf01ed0 100644 --- a/internal/utils/main.go +++ b/internal/utils/main.go @@ -679,19 +679,19 @@ type EnvironmentDetails struct { ArchiveURLs []string BinaryPath string CoreConfig string - StorageURL string + DatastorePath string Network string } // GetPassphrase returns the correct Network Passphrase based on env preference -func GetEnvironmentDetails(isTest bool, isFuture bool, datastoreUrl string) (details EnvironmentDetails) { +func GetEnvironmentDetails(isTest bool, isFuture bool, datastorePath string) (details EnvironmentDetails) { if isTest { // testnet passphrase to be used for testing details.NetworkPassphrase = network.TestNetworkPassphrase details.ArchiveURLs = testArchiveURLs details.BinaryPath = "/usr/bin/stellar-core" details.CoreConfig = "/etl/docker/stellar-core_testnet.cfg" - details.StorageURL = datastoreUrl + details.DatastorePath = datastorePath details.Network = "testnet" return details } else if isFuture { @@ -700,7 +700,7 @@ func GetEnvironmentDetails(isTest bool, isFuture bool, datastoreUrl string) (det details.ArchiveURLs = futureArchiveURLs details.BinaryPath = "/usr/bin/stellar-core" details.CoreConfig = "/etl/docker/stellar-core_futurenet.cfg" - details.StorageURL = datastoreUrl + details.DatastorePath = datastorePath details.Network = "futurenet" return details } else { @@ -709,7 +709,7 @@ func GetEnvironmentDetails(isTest bool, isFuture bool, datastoreUrl string) (det details.ArchiveURLs = mainArchiveURLs details.BinaryPath = "/usr/bin/stellar-core" details.CoreConfig = "/etl/docker/stellar-core.cfg" - details.StorageURL = datastoreUrl + details.DatastorePath = datastorePath details.Network = "pubnet" return details } @@ -797,7 +797,7 @@ func CreateLedgerBackend(ctx context.Context, useCaptiveCore bool, env Environme // Create ledger backend from datastore params := make(map[string]string) //params["destination_bucket_path"] = "ledger-exporter/ledgers" - params["destination_bucket_path"] = env.StorageURL + params["destination_bucket_path"] = env.DatastorePath dataStoreConfig := datastore.DataStoreConfig{ Type: "GCS", Params: params, From d74a721ecbc17e0e71695471784efda4ef69e6b3 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Tue, 7 May 2024 16:58:38 -0400 Subject: [PATCH 3/6] Remove unused export commands --- cmd/export_account_signers.go | 78 ---------- cmd/export_account_signers_test.go | 29 ---- cmd/export_accounts.go | 81 ---------- cmd/export_accounts_test.go | 26 ---- cmd/export_all_history.go | 207 -------------------------- cmd/export_claimable_balances.go | 82 ---------- cmd/export_claimable_balances_test.go | 20 --- cmd/export_config_setting.go | 81 ---------- cmd/export_config_setting_test.go | 22 --- cmd/export_contract_code.go | 81 ---------- cmd/export_contract_code_test.go | 22 --- cmd/export_contract_data.go | 86 ----------- cmd/export_contract_data_test.go | 22 --- cmd/export_liquidity_pools.go | 81 ---------- cmd/export_offers.go | 82 ---------- cmd/export_offers_test.go | 26 ---- cmd/export_orderbooks.go | 185 ----------------------- cmd/export_orderbooks_test.go | 41 ----- cmd/export_trustlines.go | 85 ----------- cmd/export_trustlines_test.go | 26 ---- cmd/export_ttl.go | 81 ---------- cmd/export_ttl_test.go | 22 --- 22 files changed, 1466 deletions(-) delete mode 100644 cmd/export_account_signers.go delete mode 100644 cmd/export_account_signers_test.go delete mode 100644 cmd/export_accounts.go delete mode 100644 cmd/export_accounts_test.go delete mode 100644 cmd/export_all_history.go delete mode 100644 cmd/export_claimable_balances.go delete mode 100644 cmd/export_claimable_balances_test.go delete mode 100644 cmd/export_config_setting.go delete mode 100644 cmd/export_config_setting_test.go delete mode 100644 cmd/export_contract_code.go delete mode 100644 cmd/export_contract_code_test.go delete mode 100644 cmd/export_contract_data.go delete mode 100644 cmd/export_contract_data_test.go delete mode 100644 cmd/export_liquidity_pools.go delete mode 100644 cmd/export_offers.go delete mode 100644 cmd/export_offers_test.go delete mode 100644 cmd/export_orderbooks.go delete mode 100644 cmd/export_orderbooks_test.go delete mode 100644 cmd/export_trustlines.go delete mode 100644 cmd/export_trustlines_test.go delete mode 100644 cmd/export_ttl.go delete mode 100644 cmd/export_ttl_test.go diff --git a/cmd/export_account_signers.go b/cmd/export_account_signers.go deleted file mode 100644 index f5d7eb47..00000000 --- a/cmd/export_account_signers.go +++ /dev/null @@ -1,78 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - - "github.com/stellar/stellar-etl/internal/input" - "github.com/stellar/stellar-etl/internal/transform" - "github.com/stellar/stellar-etl/internal/utils" - - "github.com/stellar/go/xdr" -) - -var accountSignersCmd = &cobra.Command{ - Use: "export_signers", - Short: "Exports the account signers data.", - Long: `Exports historical account signers data from the genesis ledger to the provided end-ledger to an output file. -The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it -should be used in an initial data dump. In order to get account information within a specified ledger range, see -the export_ledger_entry_changes command.`, - Run: func(cmd *cobra.Command, args []string) { - cmdLogger.SetLevel(logrus.InfoLevel) - commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = commonArgs.StrictExport - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - - accounts, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeAccount, env.ArchiveURLs) - if err != nil { - cmdLogger.Fatal("could not read accounts: ", err) - } - - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - numSigners := 0 - var header xdr.LedgerHeaderHistoryEntry - for _, acc := range accounts { - if utils.AccountSignersChanged(acc) { - transformed, err := transform.TransformSigners(acc, header) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not json transform account signer: %v", err)) - numFailures += 1 - continue - } - - for _, entry := range transformed { - numBytes, err := exportEntry(entry, outFile, commonArgs.Extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export entry: %v", err)) - numFailures += 1 - continue - } - numSigners += 1 - totalNumBytes += numBytes - } - } - } - - outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(numSigners, numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) - }, -} - -func init() { - rootCmd.AddCommand(accountSignersCmd) - utils.AddCommonFlags(accountSignersCmd.Flags()) - utils.AddBucketFlags("signers", accountSignersCmd.Flags()) - utils.AddCloudStorageFlags(accountSignersCmd.Flags()) - accountSignersCmd.MarkFlagRequired("end-ledger") -} diff --git a/cmd/export_account_signers_test.go b/cmd/export_account_signers_test.go deleted file mode 100644 index 33193444..00000000 --- a/cmd/export_account_signers_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package cmd - -import ( -"testing" -) - -func TestExportSigners(t *testing.T) { - tests := []cliTest{ - { - name: "signers: bucket list with exact checkpoint", - args: []string{"export_signers", "-e", "78975", "-o", gotTestDir(t, "bucket_read_exact.txt")}, - golden: "bucket_read_exact.golden", - wantErr: nil, - sortForComparison: true, - }, - { - name: "signers: bucket list with end not on checkpoint", - args: []string{"export_signers", "-e", "80210", "-o", gotTestDir(t, "bucket_read_off.txt")}, - golden: "bucket_read_off.golden", - wantErr: nil, - sortForComparison: true, - }, - } - - for _, test := range tests { - runCLITest(t, test, "testdata/signers/") - } -} - diff --git a/cmd/export_accounts.go b/cmd/export_accounts.go deleted file mode 100644 index 257a68f7..00000000 --- a/cmd/export_accounts.go +++ /dev/null @@ -1,81 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - - "github.com/stellar/stellar-etl/internal/input" - "github.com/stellar/stellar-etl/internal/transform" - "github.com/stellar/stellar-etl/internal/utils" - - "github.com/stellar/go/xdr" -) - -var accountsCmd = &cobra.Command{ - Use: "export_accounts", - Short: "Exports the account data.", - Long: `Exports historical account data from the genesis ledger to the provided end-ledger to an output file. -The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it -should be used in an initial data dump. In order to get account information within a specified ledger range, see -the export_ledger_entry_changes command.`, - Run: func(cmd *cobra.Command, args []string) { - cmdLogger.SetLevel(logrus.InfoLevel) - commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = commonArgs.StrictExport - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - - accounts, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeAccount, env.ArchiveURLs) - if err != nil { - cmdLogger.Fatal("could not read accounts: ", err) - } - - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - var header xdr.LedgerHeaderHistoryEntry - for _, acc := range accounts { - transformed, err := transform.TransformAccount(acc, header) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not json transform account: %v", err)) - numFailures += 1 - continue - } - - numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export entry: %v", err)) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - - outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(accounts), numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) - }, -} - -func init() { - rootCmd.AddCommand(accountsCmd) - utils.AddCommonFlags(accountsCmd.Flags()) - utils.AddBucketFlags("accounts", accountsCmd.Flags()) - utils.AddCloudStorageFlags(accountsCmd.Flags()) - accountsCmd.MarkFlagRequired("end-ledger") - /* - Current flags: - end-ledger: the ledger sequence number for the end of the export range (required) - output-file: filename of the output file - - TODO: implement extra flags if possible - serialize-method: the method for serialization of the output data (JSON, XDR, etc) - end time as a replacement for end sequence numbers - */ -} diff --git a/cmd/export_accounts_test.go b/cmd/export_accounts_test.go deleted file mode 100644 index c1981fbf..00000000 --- a/cmd/export_accounts_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package cmd - -import ( - "testing" -) - -func TestExportAccounts(t *testing.T) { - tests := []cliTest{ - { - name: "accounts: bucket list with exact checkpoint", - args: []string{"export_accounts", "-e", "78975", "-o", gotTestDir(t, "bucket_read_exact.txt")}, - golden: "bucket_read_exact.golden", - wantErr: nil, - }, - { - name: "accounts: bucket list with end not on checkpoint", - args: []string{"export_accounts", "-e", "80210", "-o", gotTestDir(t, "bucket_read_off.txt")}, - golden: "bucket_read_off.golden", - wantErr: nil, - }, - } - - for _, test := range tests { - runCLITest(t, test, "testdata/accounts/") - } -} diff --git a/cmd/export_all_history.go b/cmd/export_all_history.go deleted file mode 100644 index 0b197157..00000000 --- a/cmd/export_all_history.go +++ /dev/null @@ -1,207 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - - "github.com/stellar/stellar-etl/internal/input" - "github.com/stellar/stellar-etl/internal/toid" - "github.com/stellar/stellar-etl/internal/transform" - "github.com/stellar/stellar-etl/internal/utils" -) - -var allHistoryCmd = &cobra.Command{ - Use: "export_all_history", - Short: "Exports all stellar network history.", - Long: `Exports historical stellar network data between provided start-ledger/end-ledger to output files. -This is a temporary command used to reduce the amount of requests to history archives -in order to mitigate egress costs for the entity hosting history archives.`, - Run: func(cmd *cobra.Command, args []string) { - cmdLogger.SetLevel(logrus.InfoLevel) - commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = commonArgs.StrictExport - startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) - cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - - allHistory, err := input.GetAllHistory(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) - if err != nil { - cmdLogger.Fatal("could not read all history: ", err) - } - - cmdLogger.Info("start doing other exports") - getOperations(allHistory.Operations, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_operations.txt", env) - getTrades(allHistory.Trades, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_trades.txt") - getEffects(allHistory.Ledgers, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_effects.txt", env) - getTransactions(allHistory.Ledgers, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_transactions.txt") - getDiagnosticEvents(allHistory.Ledgers, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_diagnostic_events.txt") - cmdLogger.Info("done doing other exports") - }, -} - -func getOperations(operations []input.OperationTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string, env utils.EnvironmentDetails) { - outFileOperations := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - for _, transformInput := range operations { - transformed, err := transform.TransformOperation(transformInput.Operation, transformInput.OperationIndex, transformInput.Transaction, transformInput.LedgerSeqNum, transformInput.LedgerCloseMeta, env.NetworkPassphrase) - if err != nil { - txIndex := transformInput.Transaction.Index - cmdLogger.LogError(fmt.Errorf("could not transform operation %d in transaction %d in ledger %d: %v", transformInput.OperationIndex, txIndex, transformInput.LedgerSeqNum, err)) - numFailures += 1 - continue - } - - numBytes, err := exportEntry(transformed, outFileOperations, extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export operation: %v", err)) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - - outFileOperations.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(operations), numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) -} - -func getTrades(trades []input.TradeTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string) { - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - for _, tradeInput := range trades { - trades, err := transform.TransformTrade(tradeInput.OperationIndex, tradeInput.OperationHistoryID, tradeInput.Transaction, tradeInput.CloseTime) - if err != nil { - parsedID := toid.Parse(tradeInput.OperationHistoryID) - cmdLogger.LogError(fmt.Errorf("from ledger %d, transaction %d, operation %d: %v", parsedID.LedgerSequence, parsedID.TransactionOrder, parsedID.OperationOrder, err)) - numFailures += 1 - continue - } - - for _, transformed := range trades { - numBytes, err := exportEntry(transformed, outFile, extra) - if err != nil { - cmdLogger.LogError(err) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - } - - outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(trades), numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) -} - -func getEffects(transactions []input.LedgerTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string, env utils.EnvironmentDetails) { - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - for _, transformInput := range transactions { - LedgerSeq := uint32(transformInput.LedgerHistory.Header.LedgerSeq) - effects, err := transform.TransformEffect(transformInput.Transaction, LedgerSeq, transformInput.LedgerCloseMeta, env.NetworkPassphrase) - if err != nil { - txIndex := transformInput.Transaction.Index - cmdLogger.Errorf("could not transform transaction %d in ledger %d: %v", txIndex, LedgerSeq, err) - numFailures += 1 - continue - } - - for _, transformed := range effects { - numBytes, err := exportEntry(transformed, outFile, extra) - if err != nil { - cmdLogger.LogError(err) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - } - - outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(transactions), numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) -} - -func getTransactions(transactions []input.LedgerTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string) { - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - for _, transformInput := range transactions { - transformed, err := transform.TransformTransaction(transformInput.Transaction, transformInput.LedgerHistory) - if err != nil { - ledgerSeq := transformInput.LedgerHistory.Header.LedgerSeq - cmdLogger.LogError(fmt.Errorf("could not transform transaction %d in ledger %d: ", transformInput.Transaction.Index, ledgerSeq)) - numFailures += 1 - continue - } - - numBytes, err := exportEntry(transformed, outFile, extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export transaction: %v", err)) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - - outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(transactions), numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) -} - -func getDiagnosticEvents(transactions []input.LedgerTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string) { - outFile := mustOutFile(path) - numFailures := 0 - for _, transformInput := range transactions { - transformed, err, ok := transform.TransformDiagnosticEvent(transformInput.Transaction, transformInput.LedgerHistory) - if err != nil { - ledgerSeq := transformInput.LedgerHistory.Header.LedgerSeq - cmdLogger.LogError(fmt.Errorf("could not transform diagnostic events in transaction %d in ledger %d: ", transformInput.Transaction.Index, ledgerSeq)) - numFailures += 1 - continue - } - - if !ok { - continue - } - for _, diagnosticEvent := range transformed { - _, err := exportEntry(diagnosticEvent, outFile, extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export diagnostic event: %v", err)) - numFailures += 1 - continue - } - } - } - - outFile.Close() - - printTransformStats(len(transactions), numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) -} - -func init() { - rootCmd.AddCommand(allHistoryCmd) - utils.AddCommonFlags(allHistoryCmd.Flags()) - utils.AddArchiveFlags("", allHistoryCmd.Flags()) - utils.AddCloudStorageFlags(allHistoryCmd.Flags()) - allHistoryCmd.MarkFlagRequired("end-ledger") -} diff --git a/cmd/export_claimable_balances.go b/cmd/export_claimable_balances.go deleted file mode 100644 index 684c5113..00000000 --- a/cmd/export_claimable_balances.go +++ /dev/null @@ -1,82 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - - "github.com/stellar/stellar-etl/internal/input" - "github.com/stellar/stellar-etl/internal/transform" - "github.com/stellar/stellar-etl/internal/utils" - - "github.com/stellar/go/xdr" -) - -var claimableBalancesCmd = &cobra.Command{ - Use: "export_claimable_balances", - Short: "Exports the data on claimable balances made from the genesis ledger to a specified endpoint.", - Long: `Exports historical offer data from the genesis ledger to the provided end-ledger to an output file. - The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it - should be used in an initial data dump. In order to get offer information within a specified ledger range, see - the export_ledger_entry_changes command.`, - Run: func(cmd *cobra.Command, args []string) { - cmdLogger.SetLevel(logrus.InfoLevel) - commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = commonArgs.StrictExport - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - - balances, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeClaimableBalance, env.ArchiveURLs) - if err != nil { - cmdLogger.Fatal("could not read balances: ", err) - } - - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - var header xdr.LedgerHeaderHistoryEntry - for _, balance := range balances { - transformed, err := transform.TransformClaimableBalance(balance, header) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not transform balance %+v: %v", balance, err)) - numFailures += 1 - continue - } - - numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export balance %+v: %v", balance, err)) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - - outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(balances), numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) - }, -} - -func init() { - rootCmd.AddCommand(claimableBalancesCmd) - utils.AddCommonFlags(claimableBalancesCmd.Flags()) - utils.AddBucketFlags("claimable_balances", claimableBalancesCmd.Flags()) - utils.AddCloudStorageFlags(claimableBalancesCmd.Flags()) - claimableBalancesCmd.MarkFlagRequired("end-ledger") - - /* - Current flags: - end-ledger: the ledger sequence number for the end of the export range (required) - output-file: filename of the output file - - TODO: implement extra flags if possible - serialize-method: the method for serialization of the output data (JSON, XDR, etc) - end time as a replacement for end sequence numbers - */ -} diff --git a/cmd/export_claimable_balances_test.go b/cmd/export_claimable_balances_test.go deleted file mode 100644 index fafcaa64..00000000 --- a/cmd/export_claimable_balances_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package cmd - -import ( - "testing" -) - -func TestExportClaimableBalances(t *testing.T) { - tests := []cliTest{ - { - name: "claimable balances", - args: []string{"export_claimable_balances", "-e", "32878607", "-o", gotTestDir(t, "bucket_read.txt")}, - golden: "bucket_read.golden", - wantErr: nil, - }, - } - - for _, test := range tests { - runCLITest(t, test, "testdata/claimable_balances/") - } -} diff --git a/cmd/export_config_setting.go b/cmd/export_config_setting.go deleted file mode 100644 index 58c944c2..00000000 --- a/cmd/export_config_setting.go +++ /dev/null @@ -1,81 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - - "github.com/stellar/stellar-etl/internal/input" - "github.com/stellar/stellar-etl/internal/transform" - "github.com/stellar/stellar-etl/internal/utils" - - "github.com/stellar/go/xdr" -) - -var configSettingCmd = &cobra.Command{ - Use: "export_config_setting", - Short: "Exports the config setting information.", - Long: `Exports historical config settings data from the genesis ledger to the provided end-ledger to an output file. - The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it - should be used in an initial data dump. In order to get offer information within a specified ledger range, see - the export_ledger_entry_changes command.`, - Run: func(cmd *cobra.Command, args []string) { - cmdLogger.SetLevel(logrus.InfoLevel) - commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = commonArgs.StrictExport - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - - settings, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeConfigSetting, env.ArchiveURLs) - if err != nil { - cmdLogger.Fatal("Error getting ledger entries: ", err) - } - - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - var header xdr.LedgerHeaderHistoryEntry - for _, setting := range settings { - transformed, err := transform.TransformConfigSetting(setting, header) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not transform config setting %+v: %v", setting, err)) - numFailures += 1 - continue - } - - numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export config setting %+v: %v", setting, err)) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(settings), numFailures) - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) - - }, -} - -func init() { - rootCmd.AddCommand(configSettingCmd) - utils.AddCommonFlags(configSettingCmd.Flags()) - utils.AddBucketFlags("config_settings", configSettingCmd.Flags()) - utils.AddCloudStorageFlags(configSettingCmd.Flags()) - configSettingCmd.MarkFlagRequired("end-ledger") - /* - Current flags: - end-ledger: the ledger sequence number for the end of the export range (required) - output-file: filename of the output file - stdout: if set, output is printed to stdout - - TODO: implement extra flags if possible - serialize-method: the method for serialization of the output data (JSON, XDR, etc) - end time as a replacement for end sequence numbers - */ -} diff --git a/cmd/export_config_setting_test.go b/cmd/export_config_setting_test.go deleted file mode 100644 index a8114429..00000000 --- a/cmd/export_config_setting_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package cmd - -import ( - "testing" -) - -func TestExportConfigSetting(t *testing.T) { - t.Skip("Skipping due to unstable data in Futurenet") - // TODO: find ledger with data and create testdata - tests := []cliTest{ - { - name: "config setting", - args: []string{"export_config_setting", "-e", "78975", "-o", gotTestDir(t, "bucket_read.txt")}, - golden: "bucket_read.golden", - wantErr: nil, - }, - } - - for _, test := range tests { - runCLITest(t, test, "testdata/config_setting/") - } -} diff --git a/cmd/export_contract_code.go b/cmd/export_contract_code.go deleted file mode 100644 index c9b5978b..00000000 --- a/cmd/export_contract_code.go +++ /dev/null @@ -1,81 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - - "github.com/stellar/stellar-etl/internal/input" - "github.com/stellar/stellar-etl/internal/transform" - "github.com/stellar/stellar-etl/internal/utils" - - "github.com/stellar/go/xdr" -) - -var codeCmd = &cobra.Command{ - Use: "export_contract_code", - Short: "Exports the contract code information.", - Long: `Exports historical contract code data from the genesis ledger to the provided end-ledger to an output file. - The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it - should be used in an initial data dump. In order to get offer information within a specified ledger range, see - the export_ledger_entry_changes command.`, - Run: func(cmd *cobra.Command, args []string) { - cmdLogger.SetLevel(logrus.InfoLevel) - commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = commonArgs.StrictExport - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - - codes, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeContractCode, env.ArchiveURLs) - if err != nil { - cmdLogger.Fatal("Error getting ledger entries: ", err) - } - - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - var header xdr.LedgerHeaderHistoryEntry - for _, code := range codes { - transformed, err := transform.TransformContractCode(code, header) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not transform contract code %+v: %v", code, err)) - numFailures += 1 - continue - } - - numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export contract code %+v: %v", code, err)) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(codes), numFailures) - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) - - }, -} - -func init() { - rootCmd.AddCommand(codeCmd) - utils.AddCommonFlags(codeCmd.Flags()) - utils.AddBucketFlags("contract_code", codeCmd.Flags()) - utils.AddCloudStorageFlags(codeCmd.Flags()) - codeCmd.MarkFlagRequired("end-ledger") - /* - Current flags: - end-ledger: the ledger sequence number for the end of the export range (required) - output-file: filename of the output file - stdout: if set, output is printed to stdout - - TODO: implement extra flags if possible - serialize-method: the method for serialization of the output data (JSON, XDR, etc) - end time as a replacement for end sequence numbers - */ -} diff --git a/cmd/export_contract_code_test.go b/cmd/export_contract_code_test.go deleted file mode 100644 index 74f51285..00000000 --- a/cmd/export_contract_code_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package cmd - -import ( - "testing" -) - -func TestExportContractCode(t *testing.T) { - t.Skip("Skipping due to unstable data in Futurenet") - // TODO: find ledger with data and create testdata - tests := []cliTest{ - { - name: "contract code", - args: []string{"export_contract_code", "-e", "78975", "-o", gotTestDir(t, "bucket_read.txt")}, - golden: "bucket_read.golden", - wantErr: nil, - }, - } - - for _, test := range tests { - runCLITest(t, test, "testdata/contract_code/") - } -} diff --git a/cmd/export_contract_data.go b/cmd/export_contract_data.go deleted file mode 100644 index dc7f8c97..00000000 --- a/cmd/export_contract_data.go +++ /dev/null @@ -1,86 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - - "github.com/stellar/stellar-etl/internal/input" - "github.com/stellar/stellar-etl/internal/transform" - "github.com/stellar/stellar-etl/internal/utils" - - "github.com/stellar/go/xdr" -) - -var dataCmd = &cobra.Command{ - Use: "export_contract_data", - Short: "Exports the contract data information made from the genesis ledger to a specified endpoint.", - Long: `Exports historical contract data from the genesis ledger to the provided end-ledger to an output file. - The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it - should be used in an initial data dump. In order to get offer information within a specified ledger range, see - the export_ledger_entry_changes command.`, - Run: func(cmd *cobra.Command, args []string) { - cmdLogger.SetLevel(logrus.InfoLevel) - commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = commonArgs.StrictExport - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - - datas, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeContractData, env.ArchiveURLs) - if err != nil { - cmdLogger.Fatal("Error getting ledger entries: ", err) - } - - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - var header xdr.LedgerHeaderHistoryEntry - for _, data := range datas { - TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData) - transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase, header) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not transform contract data %+v: %v", data, err)) - numFailures += 1 - continue - } - - if !ok { - continue - } - - numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export contract data %+v: %v", data, err)) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(datas), numFailures) - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) - - }, -} - -func init() { - rootCmd.AddCommand(dataCmd) - utils.AddCommonFlags(dataCmd.Flags()) - utils.AddBucketFlags("contract_data", dataCmd.Flags()) - utils.AddCloudStorageFlags(dataCmd.Flags()) - dataCmd.MarkFlagRequired("end-ledger") - /* - Current flags: - end-ledger: the ledger sequence number for the end of the export range (required) - output-file: filename of the output file - stdout: if set, output is printed to stdout - - TODO: implement extra flags if possible - serialize-method: the method for serialization of the output data (JSON, XDR, etc) - end time as a replacement for end sequence numbers - */ -} diff --git a/cmd/export_contract_data_test.go b/cmd/export_contract_data_test.go deleted file mode 100644 index 2e511e8e..00000000 --- a/cmd/export_contract_data_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package cmd - -import ( - "testing" -) - -func TestExportContractData(t *testing.T) { - t.Skip("Skipping due to unstable data in Futurenet") - // TODO: find ledger with data and create testdata - tests := []cliTest{ - { - name: "contract data", - args: []string{"export_contract_data", "-e", "78975", "-o", gotTestDir(t, "bucket_read.txt")}, - golden: "bucket_read.golden", - wantErr: nil, - }, - } - - for _, test := range tests { - runCLITest(t, test, "testdata/contract_data/") - } -} diff --git a/cmd/export_liquidity_pools.go b/cmd/export_liquidity_pools.go deleted file mode 100644 index d5b3de65..00000000 --- a/cmd/export_liquidity_pools.go +++ /dev/null @@ -1,81 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - - "github.com/stellar/stellar-etl/internal/input" - "github.com/stellar/stellar-etl/internal/transform" - "github.com/stellar/stellar-etl/internal/utils" - - "github.com/stellar/go/xdr" -) - -var poolsCmd = &cobra.Command{ - Use: "export_pools", - Short: "Exports the liquidity pools data.", - Long: `Exports historical liquidity pools data from the genesis ledger to the provided end-ledger to an output file. -The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it -should be used in an initial data dump. In order to get liqudity pools information within a specified ledger range, see -the export_ledger_entry_changes command.`, - Run: func(cmd *cobra.Command, args []string) { - cmdLogger.SetLevel(logrus.InfoLevel) - commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = commonArgs.StrictExport - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - - pools, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeLiquidityPool, env.ArchiveURLs) - if err != nil { - cmdLogger.Fatal("could not read accounts: ", err) - } - - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - var header xdr.LedgerHeaderHistoryEntry - for _, pool := range pools { - transformed, err := transform.TransformPool(pool, header) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not transform pool %+v: %v", pool, err)) - numFailures += 1 - continue - } - - numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export pool %+v: %v", pool, err)) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(pools), numFailures) - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) - - }, -} - -func init() { - rootCmd.AddCommand(poolsCmd) - utils.AddCommonFlags(poolsCmd.Flags()) - utils.AddBucketFlags("pools", poolsCmd.Flags()) - utils.AddCloudStorageFlags(poolsCmd.Flags()) - poolsCmd.MarkFlagRequired("end-ledger") - /* - Current flags: - end-ledger: the ledger sequence number for the end of the export range (required) - output-file: filename of the output file - stdout: if set, output is printed to stdout - - TODO: implement extra flags if possible - serialize-method: the method for serialization of the output data (JSON, XDR, etc) - end time as a replacement for end sequence numbers - */ -} diff --git a/cmd/export_offers.go b/cmd/export_offers.go deleted file mode 100644 index cf7a425f..00000000 --- a/cmd/export_offers.go +++ /dev/null @@ -1,82 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - - "github.com/stellar/stellar-etl/internal/input" - "github.com/stellar/stellar-etl/internal/transform" - "github.com/stellar/stellar-etl/internal/utils" - - "github.com/stellar/go/xdr" -) - -// offersCmd represents the offers command -var offersCmd = &cobra.Command{ - Use: "export_offers", - Short: "Exports the data on offers made from the genesis ledger to a specified endpoint.", - Long: `Exports historical offer data from the genesis ledger to the provided end-ledger to an output file. - The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it - should be used in an initial data dump. In order to get offer information within a specified ledger range, see - the export_ledger_entry_changes command.`, - Run: func(cmd *cobra.Command, args []string) { - cmdLogger.SetLevel(logrus.InfoLevel) - commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = commonArgs.StrictExport - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - - offers, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeOffer, env.ArchiveURLs) - if err != nil { - cmdLogger.Fatal("could not read offers: ", err) - } - - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - var header xdr.LedgerHeaderHistoryEntry - for _, offer := range offers { - transformed, err := transform.TransformOffer(offer, header) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not transform offer %+v: %v", offer, err)) - numFailures += 1 - continue - } - - numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export offer %+v: %v", offer, err)) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - - outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(offers), numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) - }, -} - -func init() { - rootCmd.AddCommand(offersCmd) - utils.AddCommonFlags(offersCmd.Flags()) - utils.AddBucketFlags("offers", offersCmd.Flags()) - utils.AddCloudStorageFlags(offersCmd.Flags()) - offersCmd.MarkFlagRequired("end-ledger") - /* - Current flags: - end-ledger: the ledger sequence number for the end of the export range (required) - output-file: filename of the output file - - TODO: implement extra flags if possible - serialize-method: the method for serialization of the output data (JSON, XDR, etc) - end time as a replacement for end sequence numbers - */ -} diff --git a/cmd/export_offers_test.go b/cmd/export_offers_test.go deleted file mode 100644 index a2ab9c40..00000000 --- a/cmd/export_offers_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package cmd - -import ( - "testing" -) - -func TestExportOffers(t *testing.T) { - tests := []cliTest{ - { - name: "offers: bucket list with exact checkpoint", - args: []string{"export_offers", "-e", "78975", "-o", gotTestDir(t, "bucket_read_exact.txt")}, - golden: "bucket_read_exact.golden", - wantErr: nil, - }, - { - name: "offers: bucket list with end not on checkpoint", - args: []string{"export_offers", "-e", "80210", "-o", gotTestDir(t, "bucket_read_offset.txt")}, - golden: "bucket_read_offset.golden", - wantErr: nil, - }, - } - - for _, test := range tests { - runCLITest(t, test, "testdata/offers/") - } -} diff --git a/cmd/export_orderbooks.go b/cmd/export_orderbooks.go deleted file mode 100644 index 58816fae..00000000 --- a/cmd/export_orderbooks.go +++ /dev/null @@ -1,185 +0,0 @@ -package cmd - -import ( - "bytes" - "encoding/json" - "math" - "os" - "path/filepath" - - "github.com/spf13/cobra" - - "github.com/stellar/stellar-etl/internal/input" - "github.com/stellar/stellar-etl/internal/utils" - - "github.com/stellar/go/xdr" -) - -// exportOrderbooksCmd represents the exportOrderbooks command -var exportOrderbooksCmd = &cobra.Command{ - Use: "export_orderbooks", - Short: "This command exports the historical orderbooks", - Long: `This command instantiates a stellar-core instance and uses it to export normalized orderbooks. - The information is exported in batches determined by the batch-size flag. The normalized data is exported in multiple - different files within the exported data folder. These files are dimAccounts.txt, dimOffers.txt, dimMarkets.txt, and factEvents.txt. - These files contain normalized data that helps save storage space. - - If the end-ledger is omitted, then the stellar-core node will continue running and exporting information as new ledgers are - confirmed by the Stellar network. In this unbounded case, a stellar-core config path is required to utilize the Captive Core toml.`, - Run: func(cmd *cobra.Command, args []string) { - commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = commonArgs.StrictExport - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - - execPath, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger) - cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - - if batchSize <= 0 { - cmdLogger.Fatalf("batch-size (%d) must be greater than 0", batchSize) - } - - if configPath == "" && commonArgs.EndNum == 0 { - cmdLogger.Fatal("stellar-core needs a config file path when exporting ledgers continuously (endNum = 0)") - } - - var err error - execPath, err = filepath.Abs(execPath) - if err != nil { - cmdLogger.Fatal("could not get absolute filepath for stellar-core executable: ", err) - } - - configPath, err = filepath.Abs(configPath) - if err != nil { - cmdLogger.Fatal("could not get absolute filepath for the config file: ", err) - } - - checkpointSeq := utils.GetMostRecentCheckpoint(startNum) - core, err := input.PrepareCaptiveCore(execPath, configPath, checkpointSeq, commonArgs.EndNum, env) - if err != nil { - cmdLogger.Fatal("error creating a prepared captive core instance: ", err) - } - - orderbook, err := input.GetEntriesFromGenesis(checkpointSeq, xdr.LedgerEntryTypeOffer, env.ArchiveURLs) - if err != nil { - cmdLogger.Fatal("could not read initial orderbook: ", err) - } - - orderbookChannel := make(chan input.OrderbookBatch) - - go input.StreamOrderbooks(core, startNum, commonArgs.EndNum, batchSize, orderbookChannel, orderbook, env, cmdLogger) - - // If the end sequence number is defined, we work in a closed range and export a finite number of batches - if commonArgs.EndNum != 0 { - batchCount := uint32(math.Ceil(float64(commonArgs.EndNum-startNum+1) / float64(batchSize))) - for i := uint32(0); i < batchCount; i++ { - batchStart := startNum + i*batchSize - // Subtract 1 from the end batch number because batches do not include the last batch in the range - batchEnd := batchStart + batchSize - 1 - if batchEnd > commonArgs.EndNum { - batchEnd = commonArgs.EndNum - } - - parser := input.ReceiveParsedOrderbooks(orderbookChannel, cmdLogger) - exportOrderbook(batchStart, batchEnd, outputFolder, parser, cloudCredentials, cloudStorageBucket, cloudProvider, commonArgs.Extra) - } - } else { - // otherwise, we export in an unbounded manner where batches are constantly exported - var batchNum uint32 = 0 - for { - batchStart := startNum + batchNum*batchSize - batchEnd := batchStart + batchSize - 1 - parser := input.ReceiveParsedOrderbooks(orderbookChannel, cmdLogger) - exportOrderbook(batchStart, batchEnd, outputFolder, parser, cloudCredentials, cloudStorageBucket, cloudProvider, commonArgs.Extra) - batchNum++ - } - } - }, -} - -// writeSlice writes the slice either to a file. -func writeSlice(file *os.File, slice [][]byte, extra map[string]string) error { - - for _, data := range slice { - bytesToWrite := data - if len(extra) > 0 { - i := map[string]interface{}{} - decoder := json.NewDecoder(bytes.NewReader(data)) - decoder.UseNumber() - err := decoder.Decode(&i) - if err != nil { - return err - } - for k, v := range extra { - i[k] = v - } - bytesToWrite, err = json.Marshal(i) - if err != nil { - return err - } - } - file.WriteString(string(bytesToWrite) + "\n") - } - - file.Close() - return nil -} - -func exportOrderbook( - start, end uint32, - folderPath string, - parser *input.OrderbookParser, - cloudCredentials, cloudStorageBucket, cloudProvider string, - extra map[string]string) { - marketsFilePath := filepath.Join(folderPath, exportFilename(start, end, "dimMarkets")) - offersFilePath := filepath.Join(folderPath, exportFilename(start, end, "dimOffers")) - accountsFilePath := filepath.Join(folderPath, exportFilename(start, end, "dimAccounts")) - eventsFilePath := filepath.Join(folderPath, exportFilename(start, end, "factEvents")) - - marketsFile := mustOutFile(marketsFilePath) - offersFile := mustOutFile(offersFilePath) - accountsFile := mustOutFile(accountsFilePath) - eventsFile := mustOutFile(eventsFilePath) - - err := writeSlice(marketsFile, parser.Markets, extra) - if err != nil { - cmdLogger.LogError(err) - } - err = writeSlice(offersFile, parser.Offers, extra) - if err != nil { - cmdLogger.LogError(err) - } - err = writeSlice(accountsFile, parser.Accounts, extra) - if err != nil { - cmdLogger.LogError(err) - } - err = writeSlice(eventsFile, parser.Events, extra) - if err != nil { - cmdLogger.LogError(err) - } - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, marketsFilePath) - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, offersFilePath) - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, accountsFilePath) - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, eventsFilePath) -} - -func init() { - rootCmd.AddCommand(exportOrderbooksCmd) - utils.AddCommonFlags(exportOrderbooksCmd.Flags()) - utils.AddCoreFlags(exportOrderbooksCmd.Flags(), "orderbooks_output/") - utils.AddCloudStorageFlags(exportOrderbooksCmd.Flags()) - - exportOrderbooksCmd.MarkFlagRequired("start-ledger") - /* - Current flags: - start-ledger: the ledger sequence number for the beginning of the export period - end-ledger: the ledger sequence number for the end of the export range - - output-folder: folder that will contain the output files - limit: maximum number of changes to export in a given batch; if negative then everything gets exported - batch-size: size of the export batches - - core-executable: path to stellar-core executable - core-config: path to stellar-core config file - */ -} diff --git a/cmd/export_orderbooks_test.go b/cmd/export_orderbooks_test.go deleted file mode 100644 index bf1c043b..00000000 --- a/cmd/export_orderbooks_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package cmd - -import ( - "fmt" - "testing" -) - -func TestExportOrderbooks(t *testing.T) { - tests := []cliTest{ - { - name: "unbounded range with no config", - args: []string{"export_orderbooks", "-x", coreExecutablePath, "-s", "100000"}, - golden: "", - wantErr: fmt.Errorf("stellar-core needs a config file path when exporting ledgers continuously (endNum = 0)"), - }, - { - name: "0 batch size", - args: []string{"export_orderbooks", "-b", "0", "-x", coreExecutablePath, "-c", coreConfigPath, "-s", "100000", "-e", "164000"}, - golden: "", - wantErr: fmt.Errorf("batch-size (0) must be greater than 0"), - }, - { - name: "orderbook from single ledger", - args: []string{"export_orderbooks", "-x", coreExecutablePath, "-c", coreConfigPath, "-s", "5000000", "-e", "5000000", "-o", gotTestDir(t, "single/")}, - golden: "single_ledger.golden", - sortForComparison: true, - wantErr: nil, - }, - { - name: "orderbooks from large range", - args: []string{"export_orderbooks", "-x", coreExecutablePath, "-c", coreConfigPath, "-s", "6000000", "-e", "6001000", "-o", gotTestDir(t, "range/")}, - golden: "large_range_orderbooks.golden", - sortForComparison: true, - wantErr: nil, - }, - } - - for _, test := range tests { - runCLITest(t, test, "testdata/orderbooks/") - } -} diff --git a/cmd/export_trustlines.go b/cmd/export_trustlines.go deleted file mode 100644 index 01434be9..00000000 --- a/cmd/export_trustlines.go +++ /dev/null @@ -1,85 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - - "github.com/stellar/stellar-etl/internal/input" - "github.com/stellar/stellar-etl/internal/transform" - "github.com/stellar/stellar-etl/internal/utils" - - "github.com/stellar/go/xdr" -) - -// trustlinesCmd represents the trustlines command -var trustlinesCmd = &cobra.Command{ - Use: "export_trustlines", - Short: "Exports the trustline data over a specified range.", - Long: `Exports historical trustline data from the genesis ledger to the provided end-ledger to an output file. - The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it - should be used in an initial data dump. In order to get trustline information within a specified ledger range, see - the export_ledger_entry_changes command.`, - Run: func(cmd *cobra.Command, args []string) { - cmdLogger.SetLevel(logrus.InfoLevel) - commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = commonArgs.StrictExport - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - - trustlines, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeTrustline, env.ArchiveURLs) - if err != nil { - cmdLogger.Fatal("could not read trustlines: ", err) - } - - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - var header xdr.LedgerHeaderHistoryEntry - for _, trust := range trustlines { - transformed, err := transform.TransformTrustline(trust, header) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not json transform trustline %+v: %v", trust, err)) - numFailures += 1 - continue - } - - numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export trustline %+v: %v", trust, err)) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - - outFile.Close() - - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(trustlines), numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) - }, -} - -func init() { - rootCmd.AddCommand(trustlinesCmd) - utils.AddCommonFlags(trustlinesCmd.Flags()) - utils.AddBucketFlags("trustlines", trustlinesCmd.Flags()) - utils.AddCloudStorageFlags(trustlinesCmd.Flags()) - trustlinesCmd.MarkFlagRequired("end-ledger") - - /* - Current flags: - end-ledger: the ledger sequence number for the end of the export range (required) - output-file: filename of the output file - stdout: if set, output is printed to stdout - - TODO: implement extra flags if possible - serialize-method: the method for serialization of the output data (JSON, XDR, etc) - end time as a replacement for end sequence numbers - */ -} diff --git a/cmd/export_trustlines_test.go b/cmd/export_trustlines_test.go deleted file mode 100644 index 61a69281..00000000 --- a/cmd/export_trustlines_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package cmd - -import ( - "testing" -) - -func TestExportTrustlines(t *testing.T) { - tests := []cliTest{ - { - name: "trustlines: bucket list with exact checkpoint", - args: []string{"export_trustlines", "-e", "78975", "-o", gotTestDir(t, "bucket_read_exact.golden")}, - golden: "bucket_read_exact.golden", - wantErr: nil, - }, - { - name: "trustlines: bucket list with end not on checkpoint", - args: []string{"export_trustlines", "-e", "139672", "-o", gotTestDir(t, "bucket_read_off.golden")}, - golden: "bucket_read_off.golden", - wantErr: nil, - }, - } - - for _, test := range tests { - runCLITest(t, test, "testdata/trustlines/") - } -} diff --git a/cmd/export_ttl.go b/cmd/export_ttl.go deleted file mode 100644 index 89adba3b..00000000 --- a/cmd/export_ttl.go +++ /dev/null @@ -1,81 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - - "github.com/stellar/stellar-etl/internal/input" - "github.com/stellar/stellar-etl/internal/transform" - "github.com/stellar/stellar-etl/internal/utils" - - "github.com/stellar/go/xdr" -) - -var ttlCmd = &cobra.Command{ - Use: "export_ttl", - Short: "Exports the ttl information.", - Long: `Exports historical ttl data from the genesis ledger to the provided end-ledger to an output file. - The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it - should be used in an initial data dump. In order to get offer information within a specified ledger range, see - the export_ledger_entry_changes command.`, - Run: func(cmd *cobra.Command, args []string) { - cmdLogger.SetLevel(logrus.InfoLevel) - commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = commonArgs.StrictExport - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) - path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - - ttls, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeTtl, env.ArchiveURLs) - if err != nil { - cmdLogger.Fatal("Error getting ledger entries: ", err) - } - - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - var header xdr.LedgerHeaderHistoryEntry - for _, ttl := range ttls { - transformed, err := transform.TransformTtl(ttl, header) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not transform ttl %+v: %v", ttl, err)) - numFailures += 1 - continue - } - - numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export ttl %+v: %v", ttl, err)) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(ttls), numFailures) - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) - - }, -} - -func init() { - rootCmd.AddCommand(ttlCmd) - utils.AddCommonFlags(ttlCmd.Flags()) - utils.AddBucketFlags("ttl", ttlCmd.Flags()) - utils.AddCloudStorageFlags(ttlCmd.Flags()) - ttlCmd.MarkFlagRequired("end-ledger") - /* - Current flags: - end-ledger: the ledger sequence number for the end of the export range (required) - output-file: filename of the output file - stdout: if set, output is printed to stdout - - TODO: implement extra flags if possible - serialize-method: the method for serialization of the output data (JSON, XDR, etc) - end time as a replacement for end sequence numbers - */ -} diff --git a/cmd/export_ttl_test.go b/cmd/export_ttl_test.go deleted file mode 100644 index 3dc5d762..00000000 --- a/cmd/export_ttl_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package cmd - -import ( - "testing" -) - -func TestExportttl(t *testing.T) { - t.Skip("Skipping due to unstable data in Futurenet") - // TODO: find ledger with data and create testdata - tests := []cliTest{ - { - name: "ttl", - args: []string{"export_ttl", "-e", "78975", "-o", gotTestDir(t, "bucket_read.txt")}, - golden: "bucket_read.golden", - wantErr: nil, - }, - } - - for _, test := range tests { - runCLITest(t, test, "testdata/ttl/") - } -} From 86a7f88bd76b4597f8d7eb21d73a86235dfa7f0b Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Tue, 7 May 2024 17:00:19 -0400 Subject: [PATCH 4/6] Remove unused bucketlist entries --- internal/input/bucketlist_entries.go | 63 ---------------------------- 1 file changed, 63 deletions(-) delete mode 100644 internal/input/bucketlist_entries.go diff --git a/internal/input/bucketlist_entries.go b/internal/input/bucketlist_entries.go deleted file mode 100644 index 517641d5..00000000 --- a/internal/input/bucketlist_entries.go +++ /dev/null @@ -1,63 +0,0 @@ -package input - -import ( - "context" - "io" - - "github.com/stellar/go/historyarchive" - "github.com/stellar/go/ingest" - "github.com/stellar/go/xdr" - - "github.com/stellar/stellar-etl/internal/utils" -) - -// GetEntriesFromGenesis returns a slice of ledger entries of the specified type for the ledgers starting from the genesis ledger and ending at end (inclusive) -func GetEntriesFromGenesis(end uint32, entryType xdr.LedgerEntryType, archiveURLs []string) ([]ingest.Change, error) { - archive, err := utils.CreateHistoryArchiveClient(archiveURLs) - if err != nil { - return []ingest.Change{}, err - } - - latestNum, err := utils.GetLatestLedgerSequence(archiveURLs) - if err != nil { - return []ingest.Change{}, err - } - - if err = utils.ValidateLedgerRange(2, end, latestNum); err != nil { - return []ingest.Change{}, err - } - - checkpointSeq, err := utils.GetCheckpointNum(end, latestNum) - if err != nil { - return []ingest.Change{}, err - } - - return readBucketList(archive, checkpointSeq, entryType) -} - -// readBucketList reads the bucket list for the specified checkpoint sequence number and returns a slice of ledger entries of the specified type -func readBucketList(archive historyarchive.ArchiveInterface, checkpointSeq uint32, entryType xdr.LedgerEntryType) ([]ingest.Change, error) { - changeReader, err := ingest.NewCheckpointChangeReader(context.Background(), archive, checkpointSeq) - defer changeReader.Close() - if err != nil { - return []ingest.Change{}, err - } - - entrySlice := []ingest.Change{} - for { - change, err := changeReader.Read() - if err == io.EOF { - break - } - - if err != nil { - return []ingest.Change{}, err - } - - if change.Type == entryType { - entrySlice = append(entrySlice, change) - } - } - - return entrySlice, nil -} From ed5a07f552df0a0e8f1bc2d5cd0c5680b1588466 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Tue, 7 May 2024 17:03:09 -0400 Subject: [PATCH 5/6] Remove debug comment --- internal/utils/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/utils/main.go b/internal/utils/main.go index 1bf01ed0..069ac0f8 100644 --- a/internal/utils/main.go +++ b/internal/utils/main.go @@ -796,7 +796,6 @@ func CreateLedgerBackend(ctx context.Context, useCaptiveCore bool, env Environme // Create ledger backend from datastore params := make(map[string]string) - //params["destination_bucket_path"] = "ledger-exporter/ledgers" params["destination_bucket_path"] = env.DatastorePath dataStoreConfig := datastore.DataStoreConfig{ Type: "GCS", From f8250609dd16e1aeeabf33ff3328fb204eae59c0 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Tue, 7 May 2024 17:57:20 -0400 Subject: [PATCH 6/6] Pass new params correctly --- cmd/export_assets.go | 2 +- cmd/export_diagnostic_events.go | 2 +- cmd/export_effects.go | 2 +- cmd/export_ledger_entry_changes.go | 2 +- cmd/export_ledger_transaction.go | 2 +- cmd/export_ledgers.go | 2 +- cmd/export_operations.go | 2 +- cmd/export_trades.go | 2 +- cmd/export_transactions.go | 2 +- internal/input/ledger_range.go | 6 +++++- internal/utils/main.go | 32 +++++++++++++++++++----------- 11 files changed, 34 insertions(+), 22 deletions(-) diff --git a/cmd/export_assets.go b/cmd/export_assets.go index 7ea33a69..2160d6eb 100644 --- a/cmd/export_assets.go +++ b/cmd/export_assets.go @@ -20,7 +20,7 @@ var assetsCmd = &cobra.Command{ cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) outFile := mustOutFile(path) diff --git a/cmd/export_diagnostic_events.go b/cmd/export_diagnostic_events.go index 3a5e9ad2..66ed6438 100644 --- a/cmd/export_diagnostic_events.go +++ b/cmd/export_diagnostic_events.go @@ -20,7 +20,7 @@ var diagnosticEventsCmd = &cobra.Command{ cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { diff --git a/cmd/export_effects.go b/cmd/export_effects.go index 03e0f4d4..b93aaf1c 100644 --- a/cmd/export_effects.go +++ b/cmd/export_effects.go @@ -18,7 +18,7 @@ var effectsCmd = &cobra.Command{ cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { diff --git a/cmd/export_ledger_entry_changes.go b/cmd/export_ledger_entry_changes.go index aec74de1..b227defe 100644 --- a/cmd/export_ledger_entry_changes.go +++ b/cmd/export_ledger_entry_changes.go @@ -30,7 +30,7 @@ be exported.`, Run: func(cmd *cobra.Command, args []string) { commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) cmdLogger.StrictExport = commonArgs.StrictExport - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) _, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger) exports := utils.MustExportTypeFlags(cmd.Flags(), cmdLogger) diff --git a/cmd/export_ledger_transaction.go b/cmd/export_ledger_transaction.go index 4054b63f..7d07b9ec 100644 --- a/cmd/export_ledger_transaction.go +++ b/cmd/export_ledger_transaction.go @@ -20,7 +20,7 @@ var ledgerTransactionCmd = &cobra.Command{ cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) ledgerTransaction, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { diff --git a/cmd/export_ledgers.go b/cmd/export_ledgers.go index 501e649b..e1dce45b 100644 --- a/cmd/export_ledgers.go +++ b/cmd/export_ledgers.go @@ -20,7 +20,7 @@ var ledgersCmd = &cobra.Command{ cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) var ledgers []utils.HistoryArchiveLedgerAndLCM var err error diff --git a/cmd/export_operations.go b/cmd/export_operations.go index e8418e6d..cbfb8d84 100644 --- a/cmd/export_operations.go +++ b/cmd/export_operations.go @@ -20,7 +20,7 @@ var operationsCmd = &cobra.Command{ cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) operations, err := input.GetOperations(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { diff --git a/cmd/export_trades.go b/cmd/export_trades.go index 94441860..748cdb66 100644 --- a/cmd/export_trades.go +++ b/cmd/export_trades.go @@ -22,7 +22,7 @@ var tradesCmd = &cobra.Command{ commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) trades, err := input.GetTrades(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) diff --git a/cmd/export_transactions.go b/cmd/export_transactions.go index 966cd0fa..35f82bd2 100644 --- a/cmd/export_transactions.go +++ b/cmd/export_transactions.go @@ -20,7 +20,7 @@ var transactionsCmd = &cobra.Command{ cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath) + env := utils.GetEnvironmentDetails(commonArgs) transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) if err != nil { diff --git a/internal/input/ledger_range.go b/internal/input/ledger_range.go index f4ec07ee..2f778095 100644 --- a/internal/input/ledger_range.go +++ b/internal/input/ledger_range.go @@ -32,7 +32,11 @@ const avgCloseTime = time.Second * 5 // average time to close a stellar ledger func GetLedgerRange(startTime, endTime time.Time, isTest bool, isFuture bool) (int64, int64, error) { startTime = startTime.UTC() endTime = endTime.UTC() - env := utils.GetEnvironmentDetails(isTest, isFuture, "") + commonFlagValues := utils.CommonFlagValues{ + IsTest: isTest, + IsFuture: isFuture, + } + env := utils.GetEnvironmentDetails(commonFlagValues) if startTime.After(endTime) { return 0, 0, fmt.Errorf("start time must be less than or equal to the end time") diff --git a/internal/utils/main.go b/internal/utils/main.go index 069ac0f8..e54fe920 100644 --- a/internal/utils/main.go +++ b/internal/utils/main.go @@ -236,6 +236,7 @@ func AddCommonFlags(flags *pflag.FlagSet) { flags.StringToStringP("extra-fields", "u", map[string]string{}, "Additional fields to append to output jsons. Used for appending metadata") flags.Bool("captive-core", false, "If set, run captive core to retrieve data. Otherwise use TxMeta file datastore.") flags.String("datastore-path", "ledger-exporter/ledgers", "Datastore bucket path to read txmeta files from.") + flags.Uint32("buffer-size", 5, "Buffer size sets the max limit for the number of txmeta files that can be held in memory.") flags.Uint32("num-workers", 5, "Number of workers to spawn that read txmeta files from the datastore.") flags.Uint32("retry-limit", 3, "Datastore GetLedger retry limit.") flags.Uint32("retry-wait", 5, "Time in seconds to wait for GetLedger retry.") @@ -293,6 +294,7 @@ type CommonFlagValues struct { Extra map[string]string UseCaptiveCore bool DatastorePath string + BufferSize uint32 NumWorkers uint32 RetryLimit uint32 RetryWait uint32 @@ -336,6 +338,11 @@ func MustCommonFlags(flags *pflag.FlagSet, logger *EtlLogger) CommonFlagValues { logger.Fatal("could not get datastore-bucket-path string: ", err) } + bufferSize, err := flags.GetUint32("buffer-size") + if err != nil { + logger.Fatal("could not get buffer-size uint32: ", err) + } + numWorkers, err := flags.GetUint32("num-workers") if err != nil { logger.Fatal("could not get num-workers uint32: ", err) @@ -359,6 +366,7 @@ func MustCommonFlags(flags *pflag.FlagSet, logger *EtlLogger) CommonFlagValues { Extra: extra, UseCaptiveCore: useCaptiveCore, DatastorePath: datastorePath, + BufferSize: bufferSize, NumWorkers: numWorkers, RetryLimit: retryLimit, RetryWait: retryWait, @@ -679,29 +687,29 @@ type EnvironmentDetails struct { ArchiveURLs []string BinaryPath string CoreConfig string - DatastorePath string Network string + CommonFlagValues CommonFlagValues } // GetPassphrase returns the correct Network Passphrase based on env preference -func GetEnvironmentDetails(isTest bool, isFuture bool, datastorePath string) (details EnvironmentDetails) { - if isTest { +func GetEnvironmentDetails(commonFlags CommonFlagValues) (details EnvironmentDetails) { + if commonFlags.IsTest { // testnet passphrase to be used for testing details.NetworkPassphrase = network.TestNetworkPassphrase details.ArchiveURLs = testArchiveURLs details.BinaryPath = "/usr/bin/stellar-core" details.CoreConfig = "/etl/docker/stellar-core_testnet.cfg" - details.DatastorePath = datastorePath details.Network = "testnet" + details.CommonFlagValues = commonFlags return details - } else if isFuture { + } else if commonFlags.IsFuture { // details.NetworkPassphrase = network.FutureNetworkPassphrase details.NetworkPassphrase = "Test SDF Future Network ; October 2022" details.ArchiveURLs = futureArchiveURLs details.BinaryPath = "/usr/bin/stellar-core" details.CoreConfig = "/etl/docker/stellar-core_futurenet.cfg" - details.DatastorePath = datastorePath details.Network = "futurenet" + details.CommonFlagValues = commonFlags return details } else { // default: mainnet @@ -709,8 +717,8 @@ func GetEnvironmentDetails(isTest bool, isFuture bool, datastorePath string) (de details.ArchiveURLs = mainArchiveURLs details.BinaryPath = "/usr/bin/stellar-core" details.CoreConfig = "/etl/docker/stellar-core.cfg" - details.DatastorePath = datastorePath details.Network = "pubnet" + details.CommonFlagValues = commonFlags return details } } @@ -796,7 +804,7 @@ func CreateLedgerBackend(ctx context.Context, useCaptiveCore bool, env Environme // Create ledger backend from datastore params := make(map[string]string) - params["destination_bucket_path"] = env.DatastorePath + params["destination_bucket_path"] = env.CommonFlagValues.DatastorePath dataStoreConfig := datastore.DataStoreConfig{ Type: "GCS", Params: params, @@ -820,10 +828,10 @@ func CreateLedgerBackend(ctx context.Context, useCaptiveCore bool, env Environme LedgerBatchConfig: ledgerBatchConfig, CompressionType: "gzip", DataStore: dataStore, - BufferSize: 1000, - NumWorkers: 5, - RetryLimit: 3, - RetryWait: 5, + BufferSize: env.CommonFlagValues.BufferSize, + NumWorkers: env.CommonFlagValues.NumWorkers, + RetryLimit: env.CommonFlagValues.RetryLimit, + RetryWait: time.Duration(env.CommonFlagValues.RetryWait) * time.Second, } backend, err := ledgerbackend.NewBufferedStorageBackend(ctx, BSBackendConfig)