diff --git a/cmd/export_account_signers.go b/cmd/export_account_signers.go
deleted file mode 100644
index 9e4b9876..00000000
--- a/cmd/export_account_signers.go
+++ /dev/null
@@ -1,78 +0,0 @@
-package cmd
-
-import (
-	"fmt"
-
-	"github.com/sirupsen/logrus"
-	"github.com/spf13/cobra"
-
-	"github.com/stellar/stellar-etl/internal/input"
-	"github.com/stellar/stellar-etl/internal/transform"
-	"github.com/stellar/stellar-etl/internal/utils"
-
-	"github.com/stellar/go/xdr"
-)
-
-var accountSignersCmd = &cobra.Command{
-	Use:   "export_signers",
-	Short: "Exports the account signers data.",
-	Long: `Exports historical account signers data from the genesis ledger to the provided end-ledger to an output file.
-The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it
-should be used in an initial data dump. In order to get account information within a specified ledger range, see
-the export_ledger_entry_changes command.`,
-	Run: func(cmd *cobra.Command, args []string) {
-		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
-		path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
-		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-
-		accounts, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeAccount, env.ArchiveURLs)
-		if err != nil {
-			cmdLogger.Fatal("could not read accounts: ", err)
-		}
-
-		outFile := mustOutFile(path)
-		numFailures := 0
-		totalNumBytes := 0
-		numSigners := 0
-		var header xdr.LedgerHeaderHistoryEntry
-		for _, acc := range accounts {
-			if utils.AccountSignersChanged(acc) {
-				transformed, err := transform.TransformSigners(acc, header)
-				if err != nil {
-					cmdLogger.LogError(fmt.Errorf("could not json transform account signer: %v", err))
-					numFailures += 1
-					continue
-				}
-
-				for _, entry := range transformed {
-					numBytes, err := exportEntry(entry, outFile, extra)
-					if err != nil {
-						cmdLogger.LogError(fmt.Errorf("could not export entry: %v", err))
-						numFailures += 1
-						continue
-					}
-					numSigners += 1
-					totalNumBytes += numBytes
-				}
-			}
-		}
-
-		outFile.Close()
-		cmdLogger.Info("Number of bytes written: ", totalNumBytes)
-
-		printTransformStats(numSigners, numFailures)
-
-		maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)
-	},
-}
-
-func init() {
-	rootCmd.AddCommand(accountSignersCmd)
-	utils.AddCommonFlags(accountSignersCmd.Flags())
-	utils.AddBucketFlags("signers", accountSignersCmd.Flags())
-	utils.AddCloudStorageFlags(accountSignersCmd.Flags())
-	accountSignersCmd.MarkFlagRequired("end-ledger")
-}
diff --git a/cmd/export_account_signers_test.go b/cmd/export_account_signers_test.go
deleted file mode 100644
index 33193444..00000000
--- a/cmd/export_account_signers_test.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package cmd
-
-import (
-	"testing"
-)
-
-func TestExportSigners(t *testing.T) {
-	tests := []cliTest{
-		{
-			name:              "signers: bucket list with exact checkpoint",
-			args:              []string{"export_signers", "-e", "78975", "-o", gotTestDir(t, "bucket_read_exact.txt")},
-			golden:            "bucket_read_exact.golden",
-			wantErr:           nil,
-			sortForComparison: true,
-		},
-		{
-			name:              "signers: bucket list with end not on checkpoint",
-			args:              []string{"export_signers", "-e", "80210", "-o", gotTestDir(t, "bucket_read_off.txt")},
-			golden:            "bucket_read_off.golden",
-			wantErr:           nil,
-			sortForComparison: true,
-		},
-	}
-
-	for _, test := range tests {
-		runCLITest(t, test, "testdata/signers/")
-	}
-}
-
diff --git a/cmd/export_accounts.go b/cmd/export_accounts.go
deleted file mode 100644
index 85d9bcc1..00000000
--- a/cmd/export_accounts.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package cmd
-
-import (
-	"fmt"
-
-	"github.com/sirupsen/logrus"
-	"github.com/spf13/cobra"
-
-	"github.com/stellar/stellar-etl/internal/input"
-	"github.com/stellar/stellar-etl/internal/transform"
-	"github.com/stellar/stellar-etl/internal/utils"
-
-	"github.com/stellar/go/xdr"
-)
-
-var accountsCmd = &cobra.Command{
-	Use:   "export_accounts",
-	Short: "Exports the account data.",
-	Long: `Exports historical account data from the genesis ledger to the provided end-ledger to an output file.
-The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it
-should be used in an initial data dump. In order to get account information within a specified ledger range, see
-the export_ledger_entry_changes command.`,
-	Run: func(cmd *cobra.Command, args []string) {
-		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
-		path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
-		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-
-		accounts, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeAccount, env.ArchiveURLs)
-		if err != nil {
-			cmdLogger.Fatal("could not read accounts: ", err)
-		}
-
-		outFile := mustOutFile(path)
-		numFailures := 0
-		totalNumBytes := 0
-		var header xdr.LedgerHeaderHistoryEntry
-		for _, acc := range accounts {
-			transformed, err := transform.TransformAccount(acc, header)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not json transform account: %v", err))
-				numFailures += 1
-				continue
-			}
-
-			numBytes, err := exportEntry(transformed, outFile, extra)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not export entry: %v", err))
-				numFailures += 1
-				continue
-			}
-			totalNumBytes += numBytes
-		}
-
-		outFile.Close()
-		cmdLogger.Info("Number of bytes written: ", totalNumBytes)
-
-		printTransformStats(len(accounts), numFailures)
-
-		maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)
-	},
-}
-
-func init() {
-	rootCmd.AddCommand(accountsCmd)
-	utils.AddCommonFlags(accountsCmd.Flags())
-	utils.AddBucketFlags("accounts", accountsCmd.Flags())
-	utils.AddCloudStorageFlags(accountsCmd.Flags())
-	accountsCmd.MarkFlagRequired("end-ledger")
-	/*
-		Current flags:
-			end-ledger: the ledger sequence number for the end of the export range (required)
-			output-file: filename of the output file
-
-		TODO: implement extra flags if possible
-			serialize-method: the method for serialization of the output data (JSON, XDR, etc)
-			end time as a replacement for end sequence numbers
-	*/
-}
diff --git a/cmd/export_accounts_test.go b/cmd/export_accounts_test.go
deleted file mode 100644
index c1981fbf..00000000
--- a/cmd/export_accounts_test.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package cmd
-
-import (
-	"testing"
-)
-
-func TestExportAccounts(t *testing.T) {
-	tests := []cliTest{
-		{
-			name:    "accounts: bucket list with exact checkpoint",
-			args:    []string{"export_accounts", "-e", "78975", "-o", gotTestDir(t, "bucket_read_exact.txt")},
-			golden:  "bucket_read_exact.golden",
-			wantErr: nil,
-		},
-		{
-			name:    "accounts: bucket list with end not on checkpoint",
-			args:    []string{"export_accounts", "-e", "80210", "-o", gotTestDir(t, "bucket_read_off.txt")},
-			golden:  "bucket_read_off.golden",
-			wantErr: nil,
-		},
-	}
-
-	for _, test := range tests {
-		runCLITest(t, test, "testdata/accounts/")
-	}
-}
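The bucket-list test cases above deliberately pick end ledgers both on and off a checkpoint boundary (78975 and 80210). A rough sketch of the checkpoint arithmetic involved, under the assumption that it matches utils.GetMostRecentCheckpoint (whose implementation is not part of this diff): Stellar history archives publish state every 64 ledgers, at sequences where (seq+1) % 64 == 0.

package main

import "fmt"

// mostRecentCheckpoint rounds seq down to the nearest checkpoint ledger,
// ignoring the genesis edge case (seq < 63). Assumption: mirrors
// utils.GetMostRecentCheckpoint, which is not shown in this diff.
func mostRecentCheckpoint(seq uint32) uint32 {
	return (seq+1)/64*64 - 1
}

func main() {
	fmt.Println(mostRecentCheckpoint(78975)) // 78975 — already on a checkpoint
	fmt.Println(mostRecentCheckpoint(80210)) // 80191 — the "end not on checkpoint" case
}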
"bucket_read_exact.golden", - wantErr: nil, - }, - { - name: "accounts: bucket list with end not on checkpoint", - args: []string{"export_accounts", "-e", "80210", "-o", gotTestDir(t, "bucket_read_off.txt")}, - golden: "bucket_read_off.golden", - wantErr: nil, - }, - } - - for _, test := range tests { - runCLITest(t, test, "testdata/accounts/") - } -} diff --git a/cmd/export_all_history.go b/cmd/export_all_history.go deleted file mode 100644 index d2d86299..00000000 --- a/cmd/export_all_history.go +++ /dev/null @@ -1,207 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - - "github.com/stellar/stellar-etl/internal/input" - "github.com/stellar/stellar-etl/internal/toid" - "github.com/stellar/stellar-etl/internal/transform" - "github.com/stellar/stellar-etl/internal/utils" -) - -var allHistoryCmd = &cobra.Command{ - Use: "export_all_history", - Short: "Exports all stellar network history.", - Long: `Exports historical stellar network data between provided start-ledger/end-ledger to output files. -This is a temporary command used to reduce the amount of requests to history archives -in order to mitigate egress costs for the entity hosting history archives.`, - Run: func(cmd *cobra.Command, args []string) { - cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport - startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) - cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) - - allHistory, err := input.GetAllHistory(startNum, endNum, limit, env, useCaptiveCore) - if err != nil { - cmdLogger.Fatal("could not read all history: ", err) - } - - cmdLogger.Info("start doing other exports") - getOperations(allHistory.Operations, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_operations.txt", env) - getTrades(allHistory.Trades, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_trades.txt") - getEffects(allHistory.Ledgers, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_effects.txt", env) - getTransactions(allHistory.Ledgers, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_transactions.txt") - getDiagnosticEvents(allHistory.Ledgers, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_diagnostic_events.txt") - cmdLogger.Info("done doing other exports") - }, -} - -func getOperations(operations []input.OperationTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string, env utils.EnvironmentDetails) { - outFileOperations := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - for _, transformInput := range operations { - transformed, err := transform.TransformOperation(transformInput.Operation, transformInput.OperationIndex, transformInput.Transaction, transformInput.LedgerSeqNum, transformInput.LedgerCloseMeta, env.NetworkPassphrase) - if err != nil { - txIndex := transformInput.Transaction.Index - cmdLogger.LogError(fmt.Errorf("could not transform operation %d in transaction %d in ledger %d: %v", transformInput.OperationIndex, txIndex, transformInput.LedgerSeqNum, err)) - numFailures += 1 - continue - } - - numBytes, err := exportEntry(transformed, outFileOperations, 
extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export operation: %v", err)) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - - outFileOperations.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(operations), numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) -} - -func getTrades(trades []input.TradeTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string) { - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - for _, tradeInput := range trades { - trades, err := transform.TransformTrade(tradeInput.OperationIndex, tradeInput.OperationHistoryID, tradeInput.Transaction, tradeInput.CloseTime) - if err != nil { - parsedID := toid.Parse(tradeInput.OperationHistoryID) - cmdLogger.LogError(fmt.Errorf("from ledger %d, transaction %d, operation %d: %v", parsedID.LedgerSequence, parsedID.TransactionOrder, parsedID.OperationOrder, err)) - numFailures += 1 - continue - } - - for _, transformed := range trades { - numBytes, err := exportEntry(transformed, outFile, extra) - if err != nil { - cmdLogger.LogError(err) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - } - - outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(trades), numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) -} - -func getEffects(transactions []input.LedgerTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string, env utils.EnvironmentDetails) { - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - for _, transformInput := range transactions { - LedgerSeq := uint32(transformInput.LedgerHistory.Header.LedgerSeq) - effects, err := transform.TransformEffect(transformInput.Transaction, LedgerSeq, transformInput.LedgerCloseMeta, env.NetworkPassphrase) - if err != nil { - txIndex := transformInput.Transaction.Index - cmdLogger.Errorf("could not transform transaction %d in ledger %d: %v", txIndex, LedgerSeq, err) - numFailures += 1 - continue - } - - for _, transformed := range effects { - numBytes, err := exportEntry(transformed, outFile, extra) - if err != nil { - cmdLogger.LogError(err) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - } - - outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(transactions), numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) -} - -func getTransactions(transactions []input.LedgerTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string) { - outFile := mustOutFile(path) - numFailures := 0 - totalNumBytes := 0 - for _, transformInput := range transactions { - transformed, err := transform.TransformTransaction(transformInput.Transaction, transformInput.LedgerHistory) - if err != nil { - ledgerSeq := transformInput.LedgerHistory.Header.LedgerSeq - cmdLogger.LogError(fmt.Errorf("could not transform transaction %d in ledger %d: ", transformInput.Transaction.Index, ledgerSeq)) - numFailures += 1 - continue - } - - numBytes, err := exportEntry(transformed, outFile, extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export transaction: %v", err)) - numFailures += 1 - continue - } - totalNumBytes += numBytes - } - - 
outFile.Close() - cmdLogger.Info("Number of bytes written: ", totalNumBytes) - - printTransformStats(len(transactions), numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) -} - -func getDiagnosticEvents(transactions []input.LedgerTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string) { - outFile := mustOutFile(path) - numFailures := 0 - for _, transformInput := range transactions { - transformed, err, ok := transform.TransformDiagnosticEvent(transformInput.Transaction, transformInput.LedgerHistory) - if err != nil { - ledgerSeq := transformInput.LedgerHistory.Header.LedgerSeq - cmdLogger.LogError(fmt.Errorf("could not transform diagnostic events in transaction %d in ledger %d: ", transformInput.Transaction.Index, ledgerSeq)) - numFailures += 1 - continue - } - - if !ok { - continue - } - for _, diagnosticEvent := range transformed { - _, err := exportEntry(diagnosticEvent, outFile, extra) - if err != nil { - cmdLogger.LogError(fmt.Errorf("could not export diagnostic event: %v", err)) - numFailures += 1 - continue - } - } - } - - outFile.Close() - - printTransformStats(len(transactions), numFailures) - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) -} - -func init() { - rootCmd.AddCommand(allHistoryCmd) - utils.AddCommonFlags(allHistoryCmd.Flags()) - utils.AddArchiveFlags("", allHistoryCmd.Flags()) - utils.AddCloudStorageFlags(allHistoryCmd.Flags()) - allHistoryCmd.MarkFlagRequired("end-ledger") -} diff --git a/cmd/export_assets.go b/cmd/export_assets.go index 5de3aa4d..2160d6eb 100644 --- a/cmd/export_assets.go +++ b/cmd/export_assets.go @@ -16,21 +16,21 @@ var assetsCmd = &cobra.Command{ Long: `Exports the assets that are created from payment operations over a specified ledger range`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel) - endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport + commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = commonArgs.StrictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) + env := utils.GetEnvironmentDetails(commonArgs) outFile := mustOutFile(path) var paymentOps []input.AssetTransformInput var err error - if useCaptiveCore { - paymentOps, err = input.GetPaymentOperationsHistoryArchive(startNum, endNum, limit, env, useCaptiveCore) + if commonArgs.UseCaptiveCore { + paymentOps, err = input.GetPaymentOperationsHistoryArchive(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) } else { - paymentOps, err = input.GetPaymentOperations(startNum, endNum, limit, env, useCaptiveCore) + paymentOps, err = input.GetPaymentOperations(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore) } if err != nil { cmdLogger.Fatal("could not read asset: ", err) @@ -55,7 +55,7 @@ var assetsCmd = &cobra.Command{ } seenIDs[transformed.AssetID] = true - numBytes, err := exportEntry(transformed, outFile, extra) + numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra) if err != nil { cmdLogger.Error(err) numFailures += 1 diff --git a/cmd/export_claimable_balances.go b/cmd/export_claimable_balances.go deleted file mode 100644 index 
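The export_assets.go hunks above show the refactor this PR applies to every surviving command: the seven positional returns of utils.MustCommonFlags collapse into a single struct. A sketch of what that struct presumably looks like — EndNum, StrictExport, Extra, and UseCaptiveCore appear in the hunks, while the remaining field names are assumptions inferred from the old positional returns (isTest, isFuture, datastoreUrl); the authoritative definition lives in internal/utils and is not shown in this diff:

package utils

// CommonFlagValues is a sketch of the struct assumed to be returned by
// MustCommonFlags after this change. Field names marked (assumed) are
// inferred, not taken from the diff.
type CommonFlagValues struct {
	EndNum         uint32
	StrictExport   bool
	IsTest         bool // (assumed) was the positional isTest return
	IsFuture       bool // (assumed) was the positional isFuture return
	Extra          map[string]string
	UseCaptiveCore bool
	DatastoreUrl   string // (assumed) was the positional datastoreUrl return
}

Call sites then shrink to one value that also doubles as the argument to GetEnvironmentDetails, which is why its signature changes in the same hunks.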
diff --git a/cmd/export_claimable_balances.go b/cmd/export_claimable_balances.go
deleted file mode 100644
index f552c913..00000000
--- a/cmd/export_claimable_balances.go
+++ /dev/null
@@ -1,82 +0,0 @@
-package cmd
-
-import (
-	"fmt"
-
-	"github.com/sirupsen/logrus"
-	"github.com/spf13/cobra"
-
-	"github.com/stellar/stellar-etl/internal/input"
-	"github.com/stellar/stellar-etl/internal/transform"
-	"github.com/stellar/stellar-etl/internal/utils"
-
-	"github.com/stellar/go/xdr"
-)
-
-var claimableBalancesCmd = &cobra.Command{
-	Use:   "export_claimable_balances",
-	Short: "Exports the data on claimable balances made from the genesis ledger to a specified endpoint.",
-	Long: `Exports historical offer data from the genesis ledger to the provided end-ledger to an output file.
-	The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it
-	should be used in an initial data dump. In order to get offer information within a specified ledger range, see
-	the export_ledger_entry_changes command.`,
-	Run: func(cmd *cobra.Command, args []string) {
-		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
-		path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
-		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-
-		balances, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeClaimableBalance, env.ArchiveURLs)
-		if err != nil {
-			cmdLogger.Fatal("could not read balances: ", err)
-		}
-
-		outFile := mustOutFile(path)
-		numFailures := 0
-		totalNumBytes := 0
-		var header xdr.LedgerHeaderHistoryEntry
-		for _, balance := range balances {
-			transformed, err := transform.TransformClaimableBalance(balance, header)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not transform balance %+v: %v", balance, err))
-				numFailures += 1
-				continue
-			}
-
-			numBytes, err := exportEntry(transformed, outFile, extra)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not export balance %+v: %v", balance, err))
-				numFailures += 1
-				continue
-			}
-			totalNumBytes += numBytes
-		}
-
-		outFile.Close()
-		cmdLogger.Info("Number of bytes written: ", totalNumBytes)
-
-		printTransformStats(len(balances), numFailures)
-
-		maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)
-	},
-}
-
-func init() {
-	rootCmd.AddCommand(claimableBalancesCmd)
-	utils.AddCommonFlags(claimableBalancesCmd.Flags())
-	utils.AddBucketFlags("claimable_balances", claimableBalancesCmd.Flags())
-	utils.AddCloudStorageFlags(claimableBalancesCmd.Flags())
-	claimableBalancesCmd.MarkFlagRequired("end-ledger")
-
-	/*
-		Current flags:
-			end-ledger: the ledger sequence number for the end of the export range (required)
-			output-file: filename of the output file
-
-		TODO: implement extra flags if possible
-			serialize-method: the method for serialization of the output data (JSON, XDR, etc)
-			end time as a replacement for end sequence numbers
-	*/
-}
diff --git a/cmd/export_claimable_balances_test.go b/cmd/export_claimable_balances_test.go
deleted file mode 100644
index fafcaa64..00000000
--- a/cmd/export_claimable_balances_test.go
+++ /dev/null
@@ -1,20 +0,0 @@
-package cmd
-
-import (
-	"testing"
-)
-
-func TestExportClaimableBalances(t *testing.T) {
-	tests := []cliTest{
-		{
-			name:    "claimable balances",
-			args:    []string{"export_claimable_balances", "-e", "32878607", "-o", gotTestDir(t, "bucket_read.txt")},
-			golden:  "bucket_read.golden",
-			wantErr: nil,
-		},
-	}
-
-	for _, test := range tests {
-		runCLITest(t, test, "testdata/claimable_balances/")
-	}
-}
diff --git a/cmd/export_config_setting.go b/cmd/export_config_setting.go
deleted file mode 100644
index 892d96eb..00000000
--- a/cmd/export_config_setting.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package cmd
-
-import (
-	"fmt"
-
-	"github.com/sirupsen/logrus"
-	"github.com/spf13/cobra"
-
-	"github.com/stellar/stellar-etl/internal/input"
-	"github.com/stellar/stellar-etl/internal/transform"
-	"github.com/stellar/stellar-etl/internal/utils"
-
-	"github.com/stellar/go/xdr"
-)
-
-var configSettingCmd = &cobra.Command{
-	Use:   "export_config_setting",
-	Short: "Exports the config setting information.",
-	Long: `Exports historical config settings data from the genesis ledger to the provided end-ledger to an output file.
-	The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it
-	should be used in an initial data dump. In order to get offer information within a specified ledger range, see
-	the export_ledger_entry_changes command.`,
-	Run: func(cmd *cobra.Command, args []string) {
-		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
-		path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
-		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-
-		settings, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeConfigSetting, env.ArchiveURLs)
-		if err != nil {
-			cmdLogger.Fatal("Error getting ledger entries: ", err)
-		}
-
-		outFile := mustOutFile(path)
-		numFailures := 0
-		totalNumBytes := 0
-		var header xdr.LedgerHeaderHistoryEntry
-		for _, setting := range settings {
-			transformed, err := transform.TransformConfigSetting(setting, header)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not transform config setting %+v: %v", setting, err))
-				numFailures += 1
-				continue
-			}
-
-			numBytes, err := exportEntry(transformed, outFile, extra)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not export config setting %+v: %v", setting, err))
-				numFailures += 1
-				continue
-			}
-			totalNumBytes += numBytes
-		}
-		outFile.Close()
-		cmdLogger.Info("Number of bytes written: ", totalNumBytes)
-
-		printTransformStats(len(settings), numFailures)
-		maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)
-
-	},
-}
-
-func init() {
-	rootCmd.AddCommand(configSettingCmd)
-	utils.AddCommonFlags(configSettingCmd.Flags())
-	utils.AddBucketFlags("config_settings", configSettingCmd.Flags())
-	utils.AddCloudStorageFlags(configSettingCmd.Flags())
-	configSettingCmd.MarkFlagRequired("end-ledger")
-	/*
-		Current flags:
-			end-ledger: the ledger sequence number for the end of the export range (required)
-			output-file: filename of the output file
-			stdout: if set, output is printed to stdout
-
-		TODO: implement extra flags if possible
-			serialize-method: the method for serialization of the output data (JSON, XDR, etc)
-			end time as a replacement for end sequence numbers
-	*/
-}
diff --git a/cmd/export_config_setting_test.go b/cmd/export_config_setting_test.go
deleted file mode 100644
index a8114429..00000000
--- a/cmd/export_config_setting_test.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package cmd
-
-import (
-	"testing"
-)
-
-func TestExportConfigSetting(t *testing.T) {
-	t.Skip("Skipping due to unstable data in Futurenet")
-	// TODO: find ledger with data and create testdata
-	tests := []cliTest{
-		{
-			name:    "config setting",
-			args:    []string{"export_config_setting", "-e", "78975", "-o", gotTestDir(t, "bucket_read.txt")},
-			golden:  "bucket_read.golden",
-			wantErr: nil,
-		},
-	}
-
-	for _, test := range tests {
-		runCLITest(t, test, "testdata/config_setting/")
-	}
-}
diff --git a/cmd/export_contract_code.go b/cmd/export_contract_code.go
deleted file mode 100644
index 4ff1d037..00000000
--- a/cmd/export_contract_code.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package cmd
-
-import (
-	"fmt"
-
-	"github.com/sirupsen/logrus"
-	"github.com/spf13/cobra"
-
-	"github.com/stellar/stellar-etl/internal/input"
-	"github.com/stellar/stellar-etl/internal/transform"
-	"github.com/stellar/stellar-etl/internal/utils"
-
-	"github.com/stellar/go/xdr"
-)
-
-var codeCmd = &cobra.Command{
-	Use:   "export_contract_code",
-	Short: "Exports the contract code information.",
-	Long: `Exports historical contract code data from the genesis ledger to the provided end-ledger to an output file.
-	The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it
-	should be used in an initial data dump. In order to get offer information within a specified ledger range, see
-	the export_ledger_entry_changes command.`,
-	Run: func(cmd *cobra.Command, args []string) {
-		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
-		path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
-		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-
-		codes, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeContractCode, env.ArchiveURLs)
-		if err != nil {
-			cmdLogger.Fatal("Error getting ledger entries: ", err)
-		}
-
-		outFile := mustOutFile(path)
-		numFailures := 0
-		totalNumBytes := 0
-		var header xdr.LedgerHeaderHistoryEntry
-		for _, code := range codes {
-			transformed, err := transform.TransformContractCode(code, header)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not transform contract code %+v: %v", code, err))
-				numFailures += 1
-				continue
-			}
-
-			numBytes, err := exportEntry(transformed, outFile, extra)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not export contract code %+v: %v", code, err))
-				numFailures += 1
-				continue
-			}
-			totalNumBytes += numBytes
-		}
-		outFile.Close()
-		cmdLogger.Info("Number of bytes written: ", totalNumBytes)
-
-		printTransformStats(len(codes), numFailures)
-		maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)
-
-	},
-}
-
-func init() {
-	rootCmd.AddCommand(codeCmd)
-	utils.AddCommonFlags(codeCmd.Flags())
-	utils.AddBucketFlags("contract_code", codeCmd.Flags())
-	utils.AddCloudStorageFlags(codeCmd.Flags())
-	codeCmd.MarkFlagRequired("end-ledger")
-	/*
-		Current flags:
-			end-ledger: the ledger sequence number for the end of the export range (required)
-			output-file: filename of the output file
-			stdout: if set, output is printed to stdout
-
-		TODO: implement extra flags if possible
-			serialize-method: the method for serialization of the output data (JSON, XDR, etc)
-			end time as a replacement for end sequence numbers
-	*/
-}
diff --git a/cmd/export_contract_code_test.go b/cmd/export_contract_code_test.go
deleted file mode 100644
index 74f51285..00000000
--- a/cmd/export_contract_code_test.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package cmd
-
-import (
-	"testing"
-)
-
-func TestExportContractCode(t *testing.T) {
-	t.Skip("Skipping due to unstable data in Futurenet")
-	// TODO: find ledger with data and create testdata
-	tests := []cliTest{
-		{
-			name:    "contract code",
-			args:    []string{"export_contract_code", "-e", "78975", "-o", gotTestDir(t, "bucket_read.txt")},
-			golden:  "bucket_read.golden",
-			wantErr: nil,
-		},
-	}
-
-	for _, test := range tests {
-		runCLITest(t, test, "testdata/contract_code/")
-	}
-}
diff --git a/cmd/export_contract_data.go b/cmd/export_contract_data.go
deleted file mode 100644
index 46a427fb..00000000
--- a/cmd/export_contract_data.go
+++ /dev/null
@@ -1,86 +0,0 @@
-package cmd
-
-import (
-	"fmt"
-
-	"github.com/sirupsen/logrus"
-	"github.com/spf13/cobra"
-
-	"github.com/stellar/stellar-etl/internal/input"
-	"github.com/stellar/stellar-etl/internal/transform"
-	"github.com/stellar/stellar-etl/internal/utils"
-
-	"github.com/stellar/go/xdr"
-)
-
-var dataCmd = &cobra.Command{
-	Use:   "export_contract_data",
-	Short: "Exports the contract data information made from the genesis ledger to a specified endpoint.",
-	Long: `Exports historical contract data from the genesis ledger to the provided end-ledger to an output file.
-	The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it
-	should be used in an initial data dump. In order to get offer information within a specified ledger range, see
-	the export_ledger_entry_changes command.`,
-	Run: func(cmd *cobra.Command, args []string) {
-		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
-		path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
-		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-
-		datas, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeContractData, env.ArchiveURLs)
-		if err != nil {
-			cmdLogger.Fatal("Error getting ledger entries: ", err)
-		}
-
-		outFile := mustOutFile(path)
-		numFailures := 0
-		totalNumBytes := 0
-		var header xdr.LedgerHeaderHistoryEntry
-		for _, data := range datas {
-			TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData)
-			transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase, header)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not transform contract data %+v: %v", data, err))
-				numFailures += 1
-				continue
-			}
-
-			if !ok {
-				continue
-			}
-
-			numBytes, err := exportEntry(transformed, outFile, extra)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not export contract data %+v: %v", data, err))
-				numFailures += 1
-				continue
-			}
-			totalNumBytes += numBytes
-		}
-		outFile.Close()
-		cmdLogger.Info("Number of bytes written: ", totalNumBytes)
-
-		printTransformStats(len(datas), numFailures)
-		maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)
-
-	},
-}
-
-func init() {
-	rootCmd.AddCommand(dataCmd)
-	utils.AddCommonFlags(dataCmd.Flags())
-	utils.AddBucketFlags("contract_data", dataCmd.Flags())
-	utils.AddCloudStorageFlags(dataCmd.Flags())
-	dataCmd.MarkFlagRequired("end-ledger")
-	/*
-		Current flags:
-			end-ledger: the ledger sequence number for the end of the export range (required)
-			output-file: filename of the output file
-			stdout: if set, output is printed to stdout
-
-		TODO: implement extra flags if possible
-			serialize-method: the method for serialization of the output data (JSON, XDR, etc)
-			end time as a replacement for end sequence numbers
-	*/
-}
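The contract-data exporter above (like TransformDiagnosticEvent earlier in this diff) returns three values — (transformed, err, ok) — so callers can distinguish failures from entries that simply have nothing to export. A minimal sketch of consuming that shape; the transform callback here is hypothetical, only the skip semantics are taken from the diff:

package main

// exportAll counts successes and failures for a transform with the
// (value, error, ok) shape used above: err counts as a failure, while
// !ok means "nothing to export" (e.g. a contract-data entry that is
// neither an asset nor a balance) and is skipped silently.
func exportAll[T any](inputs []T, transformFn func(T) (string, error, bool)) (exported, failed int) {
	for _, in := range inputs {
		out, err, ok := transformFn(in)
		if err != nil {
			failed++
			continue
		}
		if !ok {
			continue // not an error: entry simply has nothing to export
		}
		_ = out // a real caller would write the entry here
		exported++
	}
	return exported, failed
}

Note the unconventional (value, error, bool) ordering — idiomatic Go would put the error last — which the deleted code destructures as transformed, err, ok.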
diff --git a/cmd/export_contract_data_test.go b/cmd/export_contract_data_test.go
deleted file mode 100644
index 2e511e8e..00000000
--- a/cmd/export_contract_data_test.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package cmd
-
-import (
-	"testing"
-)
-
-func TestExportContractData(t *testing.T) {
-	t.Skip("Skipping due to unstable data in Futurenet")
-	// TODO: find ledger with data and create testdata
-	tests := []cliTest{
-		{
-			name:    "contract data",
-			args:    []string{"export_contract_data", "-e", "78975", "-o", gotTestDir(t, "bucket_read.txt")},
-			golden:  "bucket_read.golden",
-			wantErr: nil,
-		},
-	}
-
-	for _, test := range tests {
-		runCLITest(t, test, "testdata/contract_data/")
-	}
-}
diff --git a/cmd/export_diagnostic_events.go b/cmd/export_diagnostic_events.go
index 1a888ff9..66ed6438 100644
--- a/cmd/export_diagnostic_events.go
+++ b/cmd/export_diagnostic_events.go
@@ -16,13 +16,13 @@ var diagnosticEventsCmd = &cobra.Command{
 	Long:  `Exports the diagnostic events over a specified range to an output file.`,
 	Run: func(cmd *cobra.Command, args []string) {
 		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
+		commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
+		cmdLogger.StrictExport = commonArgs.StrictExport
 		startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
 		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
+		env := utils.GetEnvironmentDetails(commonArgs)
 
-		transactions, err := input.GetTransactions(startNum, endNum, limit, env, useCaptiveCore)
+		transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
 		if err != nil {
 			cmdLogger.Fatal("could not read transactions: ", err)
 		}
@@ -42,7 +42,7 @@ var diagnosticEventsCmd = &cobra.Command{
 			continue
 		}
 		for _, diagnosticEvent := range transformed {
-			_, err := exportEntry(diagnosticEvent, outFile, extra)
+			_, err := exportEntry(diagnosticEvent, outFile, commonArgs.Extra)
 			if err != nil {
 				cmdLogger.LogError(fmt.Errorf("could not export diagnostic event: %v", err))
 				numFailures += 1
diff --git a/cmd/export_effects.go b/cmd/export_effects.go
index 3ed1df75..b93aaf1c 100644
--- a/cmd/export_effects.go
+++ b/cmd/export_effects.go
@@ -14,15 +14,15 @@ var effectsCmd = &cobra.Command{
 	Long:  "Exports the effects data over a specified range to an output file.",
 	Run: func(cmd *cobra.Command, args []string) {
 		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
+		commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
+		cmdLogger.StrictExport = commonArgs.StrictExport
 		startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
 		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
+		env := utils.GetEnvironmentDetails(commonArgs)
 
-		transactions, err := input.GetTransactions(startNum, endNum, limit, env, useCaptiveCore)
+		transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
 		if err != nil {
-			cmdLogger.Fatalf("could not read transactions in [%d, %d] (limit=%d): %v", startNum, endNum, limit, err)
+			cmdLogger.Fatalf("could not read transactions in [%d, %d] (limit=%d): %v", startNum, commonArgs.EndNum, limit, err)
 		}
 
 		outFile := mustOutFile(path)
@@ -39,7 +39,7 @@ var effectsCmd = &cobra.Command{
 		}
 
 		for _, transformed := range effects {
-			numBytes, err := exportEntry(transformed, outFile, extra)
+			numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra)
 			if err != nil {
 				cmdLogger.LogError(err)
 				numFailures += 1
diff --git a/cmd/export_ledger_entry_changes.go b/cmd/export_ledger_entry_changes.go
index 7b727680..b227defe 100644
--- a/cmd/export_ledger_entry_changes.go
+++ b/cmd/export_ledger_entry_changes.go
@@ -28,9 +28,9 @@ confirmed by the Stellar network.
 If no data type flags are set, then by default all of them are exported. If any are set, it is assumed that the others should not
 be exported.`,
 	Run: func(cmd *cobra.Command, args []string) {
-		endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
+		commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
+		cmdLogger.StrictExport = commonArgs.StrictExport
+		env := utils.GetEnvironmentDetails(commonArgs)
 		_, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger)
 		exports := utils.MustExportTypeFlags(cmd.Flags(), cmdLogger)
@@ -62,28 +62,28 @@ be exported.`,
 			}
 		}
 
-		if configPath == "" && endNum == 0 {
+		if configPath == "" && commonArgs.EndNum == 0 {
 			cmdLogger.Fatal("stellar-core needs a config file path when exporting ledgers continuously (endNum = 0)")
 		}
 
 		ctx := context.Background()
-		backend, err := utils.CreateLedgerBackend(ctx, useCaptiveCore, env)
+		backend, err := utils.CreateLedgerBackend(ctx, commonArgs.UseCaptiveCore, env)
 		if err != nil {
 			cmdLogger.Fatal("error creating a cloud storage backend: ", err)
 		}
 
-		err = backend.PrepareRange(ctx, ledgerbackend.BoundedRange(startNum, endNum))
+		err = backend.PrepareRange(ctx, ledgerbackend.BoundedRange(startNum, commonArgs.EndNum))
 		if err != nil {
 			cmdLogger.Fatal("error preparing ledger range for cloud storage backend: ", err)
 		}
 
-		if endNum == 0 {
-			endNum = math.MaxInt32
+		if commonArgs.EndNum == 0 {
+			commonArgs.EndNum = math.MaxInt32
 		}
 
 		changeChan := make(chan input.ChangeBatch)
 		closeChan := make(chan int)
-		go input.StreamChanges(&backend, startNum, endNum, batchSize, changeChan, closeChan, env, cmdLogger)
+		go input.StreamChanges(&backend, startNum, commonArgs.EndNum, batchSize, changeChan, closeChan, env, cmdLogger)
 
 		for {
 			select {
@@ -252,7 +252,16 @@ be exported.`,
 				}
 			}
 
-			err := exportTransformedData(batch.BatchStart, batch.BatchEnd, outputFolder, transformedOutputs, cloudCredentials, cloudStorageBucket, cloudProvider, extra)
+			err := exportTransformedData(
+				batch.BatchStart,
+				batch.BatchEnd,
+				outputFolder,
+				transformedOutputs,
+				cloudCredentials,
+				cloudStorageBucket,
+				cloudProvider,
+				commonArgs.Extra,
+			)
 			if err != nil {
 				cmdLogger.LogError(err)
 				continue
diff --git a/cmd/export_ledger_transaction.go b/cmd/export_ledger_transaction.go
index c08a2017..7d07b9ec 100644
--- a/cmd/export_ledger_transaction.go
+++ b/cmd/export_ledger_transaction.go
@@ -16,13 +16,13 @@ var ledgerTransactionCmd = &cobra.Command{
 	Long:  `Exports the ledger_transaction transaction data over a specified range to an output file.`,
 	Run: func(cmd *cobra.Command, args []string) {
 		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
+		commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
+		cmdLogger.StrictExport = commonArgs.StrictExport
 		startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
 		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
+		env := utils.GetEnvironmentDetails(commonArgs)
 
-		ledgerTransaction, err := input.GetTransactions(startNum, endNum, limit, env, useCaptiveCore)
+		ledgerTransaction, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
 		if err != nil {
 			cmdLogger.Fatal("could not read ledger_transaction: ", err)
 		}
@@ -39,7 +39,7 @@ var ledgerTransactionCmd = &cobra.Command{
 			continue
 		}
 
-		numBytes, err := exportEntry(transformed, outFile, extra)
+		numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra)
 		if err != nil {
 			cmdLogger.LogError(fmt.Errorf("could not export transaction: %v", err))
 			numFailures += 1
diff --git a/cmd/export_ledgers.go b/cmd/export_ledgers.go
index f5000289..e1dce45b 100644
--- a/cmd/export_ledgers.go
+++ b/cmd/export_ledgers.go
@@ -16,19 +16,19 @@ var ledgersCmd = &cobra.Command{
 	Long:  `Exports ledger data within the specified range to an output file. Encodes ledgers as JSON objects and exports them to the output file.`,
 	Run: func(cmd *cobra.Command, args []string) {
 		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
+		commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
+		cmdLogger.StrictExport = commonArgs.StrictExport
 		startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
 		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
+		env := utils.GetEnvironmentDetails(commonArgs)
 
 		var ledgers []utils.HistoryArchiveLedgerAndLCM
 		var err error
-		if useCaptiveCore {
-			ledgers, err = input.GetLedgersHistoryArchive(startNum, endNum, limit, env, useCaptiveCore)
+		if commonArgs.UseCaptiveCore {
+			ledgers, err = input.GetLedgersHistoryArchive(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
 		} else {
-			ledgers, err = input.GetLedgers(startNum, endNum, limit, env, useCaptiveCore)
+			ledgers, err = input.GetLedgers(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
 		}
 		if err != nil {
 			cmdLogger.Fatal("could not read ledgers: ", err)
@@ -46,7 +46,7 @@ var ledgersCmd = &cobra.Command{
 			continue
 		}
 
-		numBytes, err := exportEntry(transformed, outFile, extra)
+		numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra)
 		if err != nil {
 			cmdLogger.LogError(fmt.Errorf("could not export ledger %d: %s", startNum+uint32(i), err))
 			numFailures += 1
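The export_ledger_entry_changes.go hunks above preserve a convention worth calling out: an end ledger of 0 means "stream forever", which is only legal with a captive-core config, and is then widened to math.MaxInt32 so the bounded streaming loop effectively never terminates. A self-contained sketch of that validation, with names mirroring the diff but the surrounding harness being hypothetical:

package main

import (
	"fmt"
	"math"
)

// resolveEndLedger mirrors the convention in the hunks above: endNum == 0
// requires a stellar-core config path, and is then widened to math.MaxInt32
// so the streaming range is effectively unbounded.
func resolveEndLedger(configPath string, endNum uint32) (uint32, error) {
	if configPath == "" && endNum == 0 {
		return 0, fmt.Errorf("stellar-core needs a config file path when exporting ledgers continuously (endNum = 0)")
	}
	if endNum == 0 {
		return math.MaxInt32, nil
	}
	return endNum, nil
}

func main() {
	end, err := resolveEndLedger("stellar-core.cfg", 0)
	fmt.Println(end, err) // 2147483647 <nil>
}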
diff --git a/cmd/export_liquidity_pools.go b/cmd/export_liquidity_pools.go
deleted file mode 100644
index f05cab03..00000000
--- a/cmd/export_liquidity_pools.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package cmd
-
-import (
-	"fmt"
-
-	"github.com/sirupsen/logrus"
-	"github.com/spf13/cobra"
-
-	"github.com/stellar/stellar-etl/internal/input"
-	"github.com/stellar/stellar-etl/internal/transform"
-	"github.com/stellar/stellar-etl/internal/utils"
-
-	"github.com/stellar/go/xdr"
-)
-
-var poolsCmd = &cobra.Command{
-	Use:   "export_pools",
-	Short: "Exports the liquidity pools data.",
-	Long: `Exports historical liquidity pools data from the genesis ledger to the provided end-ledger to an output file.
-The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it
-should be used in an initial data dump. In order to get liquidity pools information within a specified ledger range, see
-the export_ledger_entry_changes command.`,
-	Run: func(cmd *cobra.Command, args []string) {
-		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
-		path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
-		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-
-		pools, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeLiquidityPool, env.ArchiveURLs)
-		if err != nil {
-			cmdLogger.Fatal("could not read accounts: ", err)
-		}
-
-		outFile := mustOutFile(path)
-		numFailures := 0
-		totalNumBytes := 0
-		var header xdr.LedgerHeaderHistoryEntry
-		for _, pool := range pools {
-			transformed, err := transform.TransformPool(pool, header)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not transform pool %+v: %v", pool, err))
-				numFailures += 1
-				continue
-			}
-
-			numBytes, err := exportEntry(transformed, outFile, extra)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not export pool %+v: %v", pool, err))
-				numFailures += 1
-				continue
-			}
-			totalNumBytes += numBytes
-		}
-		outFile.Close()
-		cmdLogger.Info("Number of bytes written: ", totalNumBytes)
-
-		printTransformStats(len(pools), numFailures)
-		maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)
-
-	},
-}
-
-func init() {
-	rootCmd.AddCommand(poolsCmd)
-	utils.AddCommonFlags(poolsCmd.Flags())
-	utils.AddBucketFlags("pools", poolsCmd.Flags())
-	utils.AddCloudStorageFlags(poolsCmd.Flags())
-	poolsCmd.MarkFlagRequired("end-ledger")
-	/*
-		Current flags:
-			end-ledger: the ledger sequence number for the end of the export range (required)
-			output-file: filename of the output file
-			stdout: if set, output is printed to stdout
-
-		TODO: implement extra flags if possible
-			serialize-method: the method for serialization of the output data (JSON, XDR, etc)
-			end time as a replacement for end sequence numbers
-	*/
-}
diff --git a/cmd/export_offers.go b/cmd/export_offers.go
deleted file mode 100644
index 8f0ea5c6..00000000
--- a/cmd/export_offers.go
+++ /dev/null
@@ -1,82 +0,0 @@
-package cmd
-
-import (
-	"fmt"
-
-	"github.com/sirupsen/logrus"
-	"github.com/spf13/cobra"
-
-	"github.com/stellar/stellar-etl/internal/input"
-	"github.com/stellar/stellar-etl/internal/transform"
-	"github.com/stellar/stellar-etl/internal/utils"
-
-	"github.com/stellar/go/xdr"
-)
-
-// offersCmd represents the offers command
-var offersCmd = &cobra.Command{
-	Use:   "export_offers",
-	Short: "Exports the data on offers made from the genesis ledger to a specified endpoint.",
-	Long: `Exports historical offer data from the genesis ledger to the provided end-ledger to an output file.
-	The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it
-	should be used in an initial data dump. In order to get offer information within a specified ledger range, see
-	the export_ledger_entry_changes command.`,
-	Run: func(cmd *cobra.Command, args []string) {
-		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
-		path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
-		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-
-		offers, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeOffer, env.ArchiveURLs)
-		if err != nil {
-			cmdLogger.Fatal("could not read offers: ", err)
-		}
-
-		outFile := mustOutFile(path)
-		numFailures := 0
-		totalNumBytes := 0
-		var header xdr.LedgerHeaderHistoryEntry
-		for _, offer := range offers {
-			transformed, err := transform.TransformOffer(offer, header)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not transform offer %+v: %v", offer, err))
-				numFailures += 1
-				continue
-			}
-
-			numBytes, err := exportEntry(transformed, outFile, extra)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not export offer %+v: %v", offer, err))
-				numFailures += 1
-				continue
-			}
-			totalNumBytes += numBytes
-		}
-
-		outFile.Close()
-		cmdLogger.Info("Number of bytes written: ", totalNumBytes)
-
-		printTransformStats(len(offers), numFailures)
-
-		maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)
-	},
-}
-
-func init() {
-	rootCmd.AddCommand(offersCmd)
-	utils.AddCommonFlags(offersCmd.Flags())
-	utils.AddBucketFlags("offers", offersCmd.Flags())
-	utils.AddCloudStorageFlags(offersCmd.Flags())
-	offersCmd.MarkFlagRequired("end-ledger")
-	/*
-		Current flags:
-			end-ledger: the ledger sequence number for the end of the export range (required)
-			output-file: filename of the output file
-
-		TODO: implement extra flags if possible
-			serialize-method: the method for serialization of the output data (JSON, XDR, etc)
-			end time as a replacement for end sequence numbers
-	*/
-}
diff --git a/cmd/export_offers_test.go b/cmd/export_offers_test.go
deleted file mode 100644
index a2ab9c40..00000000
--- a/cmd/export_offers_test.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package cmd
-
-import (
-	"testing"
-)
-
-func TestExportOffers(t *testing.T) {
-	tests := []cliTest{
-		{
-			name:    "offers: bucket list with exact checkpoint",
-			args:    []string{"export_offers", "-e", "78975", "-o", gotTestDir(t, "bucket_read_exact.txt")},
-			golden:  "bucket_read_exact.golden",
-			wantErr: nil,
-		},
-		{
-			name:    "offers: bucket list with end not on checkpoint",
-			args:    []string{"export_offers", "-e", "80210", "-o", gotTestDir(t, "bucket_read_offset.txt")},
-			golden:  "bucket_read_offset.golden",
-			wantErr: nil,
-		},
-	}
-
-	for _, test := range tests {
-		runCLITest(t, test, "testdata/offers/")
-	}
-}
diff --git a/cmd/export_operations.go b/cmd/export_operations.go
index 9b84265a..cbfb8d84 100644
--- a/cmd/export_operations.go
+++ b/cmd/export_operations.go
@@ -16,13 +16,13 @@ var operationsCmd = &cobra.Command{
 	Long: `Exports the operations data over a specified range. Each operation is an individual command that mutates the Stellar ledger.`,
 	Run: func(cmd *cobra.Command, args []string) {
 		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
+		commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
+		cmdLogger.StrictExport = commonArgs.StrictExport
 		startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
 		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
+		env := utils.GetEnvironmentDetails(commonArgs)
 
-		operations, err := input.GetOperations(startNum, endNum, limit, env, useCaptiveCore)
+		operations, err := input.GetOperations(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
 		if err != nil {
 			cmdLogger.Fatal("could not read operations: ", err)
 		}
@@ -39,7 +39,7 @@ var operationsCmd = &cobra.Command{
 			continue
 		}
 
-		numBytes, err := exportEntry(transformed, outFile, extra)
+		numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra)
 		if err != nil {
 			cmdLogger.LogError(fmt.Errorf("could not export operation: %v", err))
 			numFailures += 1
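Every exporter threads commonArgs.Extra into exportEntry. exportEntry itself is not shown in this section, but judging by the writeSlice helper in the export_orderbooks file removed below, the extra map is overlaid as additional top-level keys on each exported JSON line. A sketch under that assumption:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// decorate overlays the extra key/value pairs on a JSON-encoded entry,
// modeled on writeSlice later in this diff: decode, merge, re-encode.
// Treat this as an assumption about exportEntry's behavior.
func decorate(line []byte, extra map[string]string) ([]byte, error) {
	if len(extra) == 0 {
		return line, nil
	}
	entry := map[string]interface{}{}
	dec := json.NewDecoder(bytes.NewReader(line))
	dec.UseNumber() // preserve numeric precision, as writeSlice does
	if err := dec.Decode(&entry); err != nil {
		return nil, err
	}
	for k, v := range extra {
		entry[k] = v
	}
	return json.Marshal(entry)
}

func main() {
	out, _ := decorate([]byte(`{"ledger_sequence":80210}`), map[string]string{"job": "backfill"})
	fmt.Println(string(out)) // {"job":"backfill","ledger_sequence":80210}
}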
In this unbounded case, a stellar-core config path is required to utilize the Captive Core toml.`, - Run: func(cmd *cobra.Command, args []string) { - endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger) - cmdLogger.StrictExport = strictExport - env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl) - - execPath, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger) - cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) - - if batchSize <= 0 { - cmdLogger.Fatalf("batch-size (%d) must be greater than 0", batchSize) - } - - if configPath == "" && endNum == 0 { - cmdLogger.Fatal("stellar-core needs a config file path when exporting ledgers continuously (endNum = 0)") - } - - var err error - execPath, err = filepath.Abs(execPath) - if err != nil { - cmdLogger.Fatal("could not get absolute filepath for stellar-core executable: ", err) - } - - configPath, err = filepath.Abs(configPath) - if err != nil { - cmdLogger.Fatal("could not get absolute filepath for the config file: ", err) - } - - checkpointSeq := utils.GetMostRecentCheckpoint(startNum) - core, err := input.PrepareCaptiveCore(execPath, configPath, checkpointSeq, endNum, env) - if err != nil { - cmdLogger.Fatal("error creating a prepared captive core instance: ", err) - } - - orderbook, err := input.GetEntriesFromGenesis(checkpointSeq, xdr.LedgerEntryTypeOffer, env.ArchiveURLs) - if err != nil { - cmdLogger.Fatal("could not read initial orderbook: ", err) - } - - orderbookChannel := make(chan input.OrderbookBatch) - - go input.StreamOrderbooks(core, startNum, endNum, batchSize, orderbookChannel, orderbook, env, cmdLogger) - - // If the end sequence number is defined, we work in a closed range and export a finite number of batches - if endNum != 0 { - batchCount := uint32(math.Ceil(float64(endNum-startNum+1) / float64(batchSize))) - for i := uint32(0); i < batchCount; i++ { - batchStart := startNum + i*batchSize - // Subtract 1 from the end batch number because batches do not include the last batch in the range - batchEnd := batchStart + batchSize - 1 - if batchEnd > endNum { - batchEnd = endNum - } - - parser := input.ReceiveParsedOrderbooks(orderbookChannel, cmdLogger) - exportOrderbook(batchStart, batchEnd, outputFolder, parser, cloudCredentials, cloudStorageBucket, cloudProvider, extra) - } - } else { - // otherwise, we export in an unbounded manner where batches are constantly exported - var batchNum uint32 = 0 - for { - batchStart := startNum + batchNum*batchSize - batchEnd := batchStart + batchSize - 1 - parser := input.ReceiveParsedOrderbooks(orderbookChannel, cmdLogger) - exportOrderbook(batchStart, batchEnd, outputFolder, parser, cloudCredentials, cloudStorageBucket, cloudProvider, extra) - batchNum++ - } - } - }, -} - -// writeSlice writes the slice either to a file. 
-func writeSlice(file *os.File, slice [][]byte, extra map[string]string) error { - - for _, data := range slice { - bytesToWrite := data - if len(extra) > 0 { - i := map[string]interface{}{} - decoder := json.NewDecoder(bytes.NewReader(data)) - decoder.UseNumber() - err := decoder.Decode(&i) - if err != nil { - return err - } - for k, v := range extra { - i[k] = v - } - bytesToWrite, err = json.Marshal(i) - if err != nil { - return err - } - } - file.WriteString(string(bytesToWrite) + "\n") - } - - file.Close() - return nil -} - -func exportOrderbook( - start, end uint32, - folderPath string, - parser *input.OrderbookParser, - cloudCredentials, cloudStorageBucket, cloudProvider string, - extra map[string]string) { - marketsFilePath := filepath.Join(folderPath, exportFilename(start, end, "dimMarkets")) - offersFilePath := filepath.Join(folderPath, exportFilename(start, end, "dimOffers")) - accountsFilePath := filepath.Join(folderPath, exportFilename(start, end, "dimAccounts")) - eventsFilePath := filepath.Join(folderPath, exportFilename(start, end, "factEvents")) - - marketsFile := mustOutFile(marketsFilePath) - offersFile := mustOutFile(offersFilePath) - accountsFile := mustOutFile(accountsFilePath) - eventsFile := mustOutFile(eventsFilePath) - - err := writeSlice(marketsFile, parser.Markets, extra) - if err != nil { - cmdLogger.LogError(err) - } - err = writeSlice(offersFile, parser.Offers, extra) - if err != nil { - cmdLogger.LogError(err) - } - err = writeSlice(accountsFile, parser.Accounts, extra) - if err != nil { - cmdLogger.LogError(err) - } - err = writeSlice(eventsFile, parser.Events, extra) - if err != nil { - cmdLogger.LogError(err) - } - - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, marketsFilePath) - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, offersFilePath) - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, accountsFilePath) - maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, eventsFilePath) -} - -func init() { - rootCmd.AddCommand(exportOrderbooksCmd) - utils.AddCommonFlags(exportOrderbooksCmd.Flags()) - utils.AddCoreFlags(exportOrderbooksCmd.Flags(), "orderbooks_output/") - utils.AddCloudStorageFlags(exportOrderbooksCmd.Flags()) - - exportOrderbooksCmd.MarkFlagRequired("start-ledger") - /* - Current flags: - start-ledger: the ledger sequence number for the beginning of the export period - end-ledger: the ledger sequence number for the end of the export range - - output-folder: folder that will contain the output files - limit: maximum number of changes to export in a given batch; if negative then everything gets exported - batch-size: size of the export batches - - core-executable: path to stellar-core executable - core-config: path to stellar-core config file - */ -} diff --git a/cmd/export_orderbooks_test.go b/cmd/export_orderbooks_test.go deleted file mode 100644 index bf1c043b..00000000 --- a/cmd/export_orderbooks_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package cmd - -import ( - "fmt" - "testing" -) - -func TestExportOrderbooks(t *testing.T) { - tests := []cliTest{ - { - name: "unbounded range with no config", - args: []string{"export_orderbooks", "-x", coreExecutablePath, "-s", "100000"}, - golden: "", - wantErr: fmt.Errorf("stellar-core needs a config file path when exporting ledgers continuously (endNum = 0)"), - }, - { - name: "0 batch size", - args: []string{"export_orderbooks", "-b", "0", "-x", coreExecutablePath, "-c", coreConfigPath, "-s", "100000", "-e", "164000"}, - golden: "", - 
diff --git a/cmd/export_orderbooks_test.go b/cmd/export_orderbooks_test.go
deleted file mode 100644
index bf1c043b..00000000
--- a/cmd/export_orderbooks_test.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package cmd
-
-import (
-	"fmt"
-	"testing"
-)
-
-func TestExportOrderbooks(t *testing.T) {
-	tests := []cliTest{
-		{
-			name:    "unbounded range with no config",
-			args:    []string{"export_orderbooks", "-x", coreExecutablePath, "-s", "100000"},
-			golden:  "",
-			wantErr: fmt.Errorf("stellar-core needs a config file path when exporting ledgers continuously (endNum = 0)"),
-		},
-		{
-			name:    "0 batch size",
-			args:    []string{"export_orderbooks", "-b", "0", "-x", coreExecutablePath, "-c", coreConfigPath, "-s", "100000", "-e", "164000"},
-			golden:  "",
-			wantErr: fmt.Errorf("batch-size (0) must be greater than 0"),
-		},
-		{
-			name:              "orderbook from single ledger",
-			args:              []string{"export_orderbooks", "-x", coreExecutablePath, "-c", coreConfigPath, "-s", "5000000", "-e", "5000000", "-o", gotTestDir(t, "single/")},
-			golden:            "single_ledger.golden",
-			sortForComparison: true,
-			wantErr:           nil,
-		},
-		{
-			name:              "orderbooks from large range",
-			args:              []string{"export_orderbooks", "-x", coreExecutablePath, "-c", coreConfigPath, "-s", "6000000", "-e", "6001000", "-o", gotTestDir(t, "range/")},
-			golden:            "large_range_orderbooks.golden",
-			sortForComparison: true,
-			wantErr:           nil,
-		},
-	}
-
-	for _, test := range tests {
-		runCLITest(t, test, "testdata/orderbooks/")
-	}
-}
diff --git a/cmd/export_trades.go b/cmd/export_trades.go
index 551263e7..748cdb66 100644
--- a/cmd/export_trades.go
+++ b/cmd/export_trades.go
@@ -19,13 +19,13 @@ var tradesCmd = &cobra.Command{
 	Long:  `Exports trade data within the specified range to an output file`,
 	Run: func(cmd *cobra.Command, args []string) {
 		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
+		commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
+		cmdLogger.StrictExport = commonArgs.StrictExport
 		startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
+		env := utils.GetEnvironmentDetails(commonArgs)
 		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
 
-		trades, err := input.GetTrades(startNum, endNum, limit, env, useCaptiveCore)
+		trades, err := input.GetTrades(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
 		if err != nil {
 			cmdLogger.Fatal("could not read trades ", err)
 		}
@@ -43,7 +43,7 @@ var tradesCmd = &cobra.Command{
 		}
 
 		for _, transformed := range trades {
-			numBytes, err := exportEntry(transformed, outFile, extra)
+			numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra)
 			if err != nil {
 				cmdLogger.LogError(err)
 				numFailures += 1
diff --git a/cmd/export_transactions.go b/cmd/export_transactions.go
index cd37e247..35f82bd2 100644
--- a/cmd/export_transactions.go
+++ b/cmd/export_transactions.go
@@ -16,13 +16,13 @@ var transactionsCmd = &cobra.Command{
 	Long:  `Exports the transaction data over a specified range to an output file.`,
 	Run: func(cmd *cobra.Command, args []string) {
 		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
+		commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
+		cmdLogger.StrictExport = commonArgs.StrictExport
 		startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
 		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
+		env := utils.GetEnvironmentDetails(commonArgs)
 
-		transactions, err := input.GetTransactions(startNum, endNum, limit, env, useCaptiveCore)
+		transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
 		if err != nil {
 			cmdLogger.Fatal("could not read transactions: ", err)
 		}
@@ -39,7 +39,7 @@ var transactionsCmd = &cobra.Command{
 			continue
 		}
 
-		numBytes, err := exportEntry(transformed, outFile, extra)
+		numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra)
 		if err != nil {
 			cmdLogger.LogError(fmt.Errorf("could not export transaction: %v", err))
 			numFailures += 1
diff --git a/cmd/export_trustlines.go b/cmd/export_trustlines.go
deleted file mode 100644
index 8b1315b5..00000000
--- a/cmd/export_trustlines.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package cmd
-
-import (
-	"fmt"
-
-	"github.com/sirupsen/logrus"
-	"github.com/spf13/cobra"
-
-	"github.com/stellar/stellar-etl/internal/input"
-	"github.com/stellar/stellar-etl/internal/transform"
-	"github.com/stellar/stellar-etl/internal/utils"
-
-	"github.com/stellar/go/xdr"
-)
-
-// trustlinesCmd represents the trustlines command
-var trustlinesCmd = &cobra.Command{
-	Use:   "export_trustlines",
-	Short: "Exports the trustline data over a specified range.",
-	Long: `Exports historical trustline data from the genesis ledger to the provided end-ledger to an output file.
-	The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it
-	should be used in an initial data dump. In order to get trustline information within a specified ledger range, see
-	the export_ledger_entry_changes command.`,
-	Run: func(cmd *cobra.Command, args []string) {
-		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
-		path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
-		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-
-		trustlines, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeTrustline, env.ArchiveURLs)
-		if err != nil {
-			cmdLogger.Fatal("could not read trustlines: ", err)
-		}
-
-		outFile := mustOutFile(path)
-		numFailures := 0
-		totalNumBytes := 0
-		var header xdr.LedgerHeaderHistoryEntry
-		for _, trust := range trustlines {
-			transformed, err := transform.TransformTrustline(trust, header)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not json transform trustline %+v: %v", trust, err))
-				numFailures += 1
-				continue
-			}
-
-			numBytes, err := exportEntry(transformed, outFile, extra)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not export trustline %+v: %v", trust, err))
-				numFailures += 1
-				continue
-			}
-			totalNumBytes += numBytes
-		}
-
-		outFile.Close()
-
-		cmdLogger.Info("Number of bytes written: ", totalNumBytes)
-
-		printTransformStats(len(trustlines), numFailures)
-
-		maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)
-	},
-}
-
-func init() {
-	rootCmd.AddCommand(trustlinesCmd)
-	utils.AddCommonFlags(trustlinesCmd.Flags())
-	utils.AddBucketFlags("trustlines", trustlinesCmd.Flags())
-	utils.AddCloudStorageFlags(trustlinesCmd.Flags())
-	trustlinesCmd.MarkFlagRequired("end-ledger")
-
-	/*
-		Current flags:
-			end-ledger: the ledger sequence number for the end of the export range (required)
-			output-file: filename of the output file
-			stdout: if set, output is printed to stdout
-
-		TODO: implement extra flags if possible
-			serialize-method: the method for serialization of the output data (JSON, XDR, etc)
-			end time as a replacement for end sequence numbers
-	*/
-}
diff --git a/cmd/export_trustlines_test.go b/cmd/export_trustlines_test.go
deleted file mode 100644
index 61a69281..00000000
--- a/cmd/export_trustlines_test.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package cmd
-
-import (
-	"testing"
-)
-
-func TestExportTrustlines(t *testing.T) {
-	tests := []cliTest{
-		{
-			name:    "trustlines: bucket list with exact checkpoint",
-			args:    []string{"export_trustlines", "-e", "78975", "-o", gotTestDir(t, "bucket_read_exact.golden")},
-			golden:  "bucket_read_exact.golden",
-			wantErr: nil,
-		},
-		{
-			name:    "trustlines: bucket list with end not on checkpoint",
-			args:    []string{"export_trustlines", "-e", "139672", "-o", gotTestDir(t, "bucket_read_off.golden")},
-			golden:  "bucket_read_off.golden",
-			wantErr: nil,
-		},
-	}
-
-	for _, test := range tests {
-		runCLITest(t, test, "testdata/trustlines/")
-	}
-}
diff --git a/cmd/export_ttl.go b/cmd/export_ttl.go
deleted file mode 100644
index ce689fda..00000000
--- a/cmd/export_ttl.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package cmd
-
-import (
-	"fmt"
-
-	"github.com/sirupsen/logrus"
-	"github.com/spf13/cobra"
-
-	"github.com/stellar/stellar-etl/internal/input"
-	"github.com/stellar/stellar-etl/internal/transform"
-	"github.com/stellar/stellar-etl/internal/utils"
-
-	"github.com/stellar/go/xdr"
-)
-
-var ttlCmd = &cobra.Command{
-	Use:   "export_ttl",
-	Short: "Exports the ttl information.",
-	Long: `Exports historical ttl data from the genesis ledger to the provided end-ledger to an output file.
-	The command reads from the bucket list, which includes the full history of the Stellar ledger. As a result, it
-	should be used in an initial data dump. In order to get ttl information within a specified ledger range, see
-	the export_ledger_entry_changes command.`,
-	Run: func(cmd *cobra.Command, args []string) {
-		cmdLogger.SetLevel(logrus.InfoLevel)
-		endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
-		cmdLogger.StrictExport = strictExport
-		env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
-		path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
-		cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
-
-		ttls, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeTtl, env.ArchiveURLs)
-		if err != nil {
-			cmdLogger.Fatal("Error getting ledger entries: ", err)
-		}
-
-		outFile := mustOutFile(path)
-		numFailures := 0
-		totalNumBytes := 0
-		var header xdr.LedgerHeaderHistoryEntry
-		for _, ttl := range ttls {
-			transformed, err := transform.TransformTtl(ttl, header)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not transform ttl %+v: %v", ttl, err))
-				numFailures += 1
-				continue
-			}
-
-			numBytes, err := exportEntry(transformed, outFile, extra)
-			if err != nil {
-				cmdLogger.LogError(fmt.Errorf("could not export ttl %+v: %v", ttl, err))
-				numFailures += 1
-				continue
-			}
-			totalNumBytes += numBytes
-		}
-		outFile.Close()
-		cmdLogger.Info("Number of bytes written: ", totalNumBytes)
-
-		printTransformStats(len(ttls), numFailures)
-		maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)
-
-	},
-}
-
-func init() {
-	rootCmd.AddCommand(ttlCmd)
-	utils.AddCommonFlags(ttlCmd.Flags())
-	utils.AddBucketFlags("ttl", ttlCmd.Flags())
-	utils.AddCloudStorageFlags(ttlCmd.Flags())
-	ttlCmd.MarkFlagRequired("end-ledger")
-	/*
-		Current flags:
-			end-ledger: the ledger sequence number for the end of the export range (required)
-			output-file: filename of the output file
-			stdout: if set, output is printed to stdout
-
-		TODO: implement extra flags if possible
-			serialize-method: the method for serialization of the output data (JSON, XDR, etc)
-			end time as a replacement for end sequence numbers
-	*/
-}
diff --git a/cmd/export_ttl_test.go b/cmd/export_ttl_test.go
deleted file mode 100644
index 3dc5d762..00000000
--- a/cmd/export_ttl_test.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package cmd
-
-import (
-	"testing"
-)
-
-func TestExportttl(t *testing.T) {
-	t.Skip("Skipping due to unstable data in Futurenet")
-	// TODO: find ledger with data and create testdata
-	tests := []cliTest{
-		{
-			name:    "ttl",
-			args:    []string{"export_ttl", "-e", "78975", "-o", gotTestDir(t, "bucket_read.txt")},
-			golden:  "bucket_read.golden",
-			wantErr: nil,
-		},
-	}
-
-	for _, test := range tests {
-		runCLITest(t, test, "testdata/ttl/")
-	}
-}
diff --git a/go.mod b/go.mod
index 56f483d1..fc76db68 100644
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,7 @@ require (
 	github.com/spf13/cobra v1.7.0
 	github.com/spf13/pflag v1.0.5
 	github.com/spf13/viper v1.17.0
-	github.com/stellar/go v0.0.0-20240423031611-e1c5206ad1ba
+	github.com/stellar/go v0.0.0-20240507142223-735600adb2d4
 	github.com/stretchr/testify v1.9.0
 )
diff --git a/go.sum b/go.sum
index 92a52cf7..568dfe39 100644
--- a/go.sum
+++ b/go.sum
@@ -296,8 +296,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
 github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
 github.com/spf13/viper v1.17.0 h1:I5txKw7MJasPL/BrfkbA0Jyo/oELqVmux4pR/UxOMfI=
 github.com/spf13/viper v1.17.0/go.mod h1:BmMMMLQXSbcHK6KAOiFLz0l5JHrU89OdIRHvsk0+yVI=
-github.com/stellar/go v0.0.0-20240423031611-e1c5206ad1ba h1:2UPb78V6mL07B0nJ6/89nJ2cimVD3xPMCFxawwRvpJ0=
-github.com/stellar/go v0.0.0-20240423031611-e1c5206ad1ba/go.mod h1:ckzsX0B0qfTMVZQJtPELJLs7cJ6xXMYHPVLyIsReGsU=
+github.com/stellar/go v0.0.0-20240507142223-735600adb2d4 h1:4dmEOaVcttNCZTIXE8y5VwNvduqVwE+D7oFLAu2nn/k=
+github.com/stellar/go v0.0.0-20240507142223-735600adb2d4/go.mod h1:kxiz7GJ94uVORlLZ/q7BrEQZAvBgkNXly7I19axD3EA=
 github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2 h1:OzCVd0SV5qE3ZcDeSFCmOWLZfEWZ3Oe8KtmSOYKEVWE=
 github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2/go.mod h1:yoxyU/M8nl9LKeWIoBrbDPQ7Cy+4jxRcWcOayZ4BMps=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
diff --git a/internal/input/bucketlist_entries.go b/internal/input/bucketlist_entries.go
deleted file mode 100644
index 517641d5..00000000
--- a/internal/input/bucketlist_entries.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package input
-
-import (
-	"context"
-	"io"
-
-	"github.com/stellar/go/historyarchive"
-	"github.com/stellar/go/ingest"
-	"github.com/stellar/go/xdr"
-
-	"github.com/stellar/stellar-etl/internal/utils"
-)
-
-// GetEntriesFromGenesis returns a slice of ledger entries of the specified type for the ledgers starting from the genesis ledger and ending at end (inclusive)
-func GetEntriesFromGenesis(end uint32, entryType xdr.LedgerEntryType, archiveURLs []string) ([]ingest.Change, error) {
-	archive, err := utils.CreateHistoryArchiveClient(archiveURLs)
-	if err != nil {
-		return []ingest.Change{}, err
-	}
-
-	latestNum, err := utils.GetLatestLedgerSequence(archiveURLs)
-	if err != nil {
-		return []ingest.Change{}, err
-	}
-
-	if err = utils.ValidateLedgerRange(2, end, latestNum); err != nil {
-		return []ingest.Change{}, err
-	}
-
-	checkpointSeq, err := utils.GetCheckpointNum(end, latestNum)
-	if err != nil {
-		return []ingest.Change{}, err
-	}
-
-	return readBucketList(archive, checkpointSeq, entryType)
-}
-
-// readBucketList reads the bucket list for the specified checkpoint sequence number and returns a slice of ledger entries of the specified type
-func readBucketList(archive historyarchive.ArchiveInterface, checkpointSeq uint32, entryType xdr.LedgerEntryType) ([]ingest.Change, error) {
-	changeReader, err := ingest.NewCheckpointChangeReader(context.Background(), archive, checkpointSeq)
-	if err != nil {
-		return []ingest.Change{}, err
-	}
-	defer changeReader.Close()
-
-	entrySlice := []ingest.Change{}
-	for {
-		change, err := changeReader.Read()
-		if err == io.EOF {
-			break
-		}
-
-		if err != nil {
-			return []ingest.Change{}, err
-		}
-
-		if change.Type == entryType {
-			entrySlice = append(entrySlice, change)
-		}
-	}
-
-	return entrySlice, nil
-}
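The deleted readBucketList wraps stellar/go's checkpoint change reader. A minimal standalone sketch of the same pattern, assuming SDF's public history archive and the checkpoint ledger 78975 used by the bucket-list tests above (78975 + 1 is a multiple of 64, so it is a checkpoint):

package main

import (
	"context"
	"fmt"
	"io"
	"log"

	"github.com/stellar/go/ingest"
	"github.com/stellar/go/xdr"

	"github.com/stellar/stellar-etl/internal/utils"
)

func main() {
	// SDF's public mainnet history archive (illustrative; real callers use
	// env.ArchiveURLs from utils.GetEnvironmentDetails).
	archive, err := utils.CreateHistoryArchiveClient([]string{
		"https://history.stellar.org/prd/core-live/core_live_001",
	})
	if err != nil {
		log.Fatal(err)
	}

	reader, err := ingest.NewCheckpointChangeReader(context.Background(), archive, 78975)
	if err != nil {
		log.Fatal(err)
	}
	defer reader.Close() // safe now that err has been checked

	// Count entries of one type, as readBucketList filters by entryType.
	count := 0
	for {
		change, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		if change.Type == xdr.LedgerEntryTypeTrustline {
			count++
		}
	}
	fmt.Println("trustline entries at checkpoint 78975:", count)
}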
diff --git a/internal/input/ledger_range.go b/internal/input/ledger_range.go
index f4ec07ee..2f778095 100644
--- a/internal/input/ledger_range.go
+++ b/internal/input/ledger_range.go
@@ -32,7 +32,11 @@ const avgCloseTime = time.Second * 5 // average time to close a stellar ledger
 func GetLedgerRange(startTime, endTime time.Time, isTest bool, isFuture bool) (int64, int64, error) {
 	startTime = startTime.UTC()
 	endTime = endTime.UTC()
-	env := utils.GetEnvironmentDetails(isTest, isFuture, "")
+	commonFlagValues := utils.CommonFlagValues{
+		IsTest:   isTest,
+		IsFuture: isFuture,
+	}
+	env := utils.GetEnvironmentDetails(commonFlagValues)
 
 	if startTime.After(endTime) {
 		return 0, 0, fmt.Errorf("start time must be less than or equal to the end time")
diff --git a/internal/input/operations.go b/internal/input/operations.go
index 5290691c..0fca21ed 100644
--- a/internal/input/operations.go
+++ b/internal/input/operations.go
@@ -71,6 +71,7 @@ func GetOperations(start, end uint32, limit int64, env utils.EnvironmentDetails,
 		}
 		txReader.Close()
+
 		if int64(len(opSlice)) >= limit && limit >= 0 {
 			break
 		}
diff --git a/internal/utils/main.go b/internal/utils/main.go
index c1259a50..e54fe920 100644
--- a/internal/utils/main.go
+++ b/internal/utils/main.go
@@ -6,7 +6,6 @@ import (
 	"errors"
 	"fmt"
 	"math/big"
-	"net/url"
 	"time"
 
 	"github.com/spf13/pflag"
@@ -17,6 +16,7 @@ import (
 	"github.com/stellar/go/ingest/ledgerbackend"
 	"github.com/stellar/go/keypair"
 	"github.com/stellar/go/network"
+	"github.com/stellar/go/support/datastore"
 	"github.com/stellar/go/support/storage"
 	"github.com/stellar/go/txnbuild"
 	"github.com/stellar/go/xdr"
@@ -235,7 +235,11 @@ func AddCommonFlags(flags *pflag.FlagSet) {
 	flags.Bool("futurenet", false, "If set, will connect to Futurenet instead of Mainnet.")
 	flags.StringToStringP("extra-fields", "u", map[string]string{}, "Additional fields to append to output jsons. Used for appending metadata")
 	flags.Bool("captive-core", false, "If set, run captive core to retrieve data. Otherwise use TxMeta file datastore.")
-	flags.String("datastore-url", "", "Datastore url to read txmeta files from.")
+	flags.String("datastore-path", "ledger-exporter/ledgers", "Datastore bucket path to read txmeta files from.")
+	flags.Uint32("buffer-size", 5, "Maximum number of txmeta files that can be held in memory.")
+	flags.Uint32("num-workers", 5, "Number of workers to spawn that read txmeta files from the datastore.")
+	flags.Uint32("retry-limit", 3, "Datastore GetLedger retry limit.")
+	flags.Uint32("retry-wait", 5, "Time in seconds to wait before retrying GetLedger.")
 }
 
 // AddArchiveFlags adds the history archive specific flags: start-ledger, output, and limit
@@ -282,56 +286,91 @@ func AddExportTypeFlags(flags *pflag.FlagSet) {
 	flags.BoolP("export-ttl", "", false, "set in order to export ttl changes")
 }
 
+type CommonFlagValues struct {
+	EndNum         uint32
+	StrictExport   bool
+	IsTest         bool
+	IsFuture       bool
+	Extra          map[string]string
+	UseCaptiveCore bool
+	DatastorePath  string
+	BufferSize     uint32
+	NumWorkers     uint32
+	RetryLimit     uint32
+	RetryWait      uint32
+}
+
 // MustCommonFlags gets the values of the flags common to all commands, such as end-ledger and strict-export.
 // If any do not exist, it stops the program fatally using the logger
-func MustCommonFlags(
-	flags *pflag.FlagSet,
-	logger *EtlLogger,
-) (
-	endNum uint32,
-	strictExport,
-	isTest bool,
-	isFuture bool,
-	extra map[string]string,
-	useCaptiveCore bool,
-	datastoreUrl string,
-) {
+func MustCommonFlags(flags *pflag.FlagSet, logger *EtlLogger) CommonFlagValues {
 	endNum, err := flags.GetUint32("end-ledger")
 	if err != nil {
 		logger.Fatal("could not get end sequence number: ", err)
 	}
 
-	strictExport, err = flags.GetBool("strict-export")
+	strictExport, err := flags.GetBool("strict-export")
 	if err != nil {
 		logger.Fatal("could not get strict-export boolean: ", err)
 	}
 
-	isTest, err = flags.GetBool("testnet")
+	isTest, err := flags.GetBool("testnet")
 	if err != nil {
 		logger.Fatal("could not get testnet boolean: ", err)
 	}
 
-	isFuture, err = flags.GetBool("futurenet")
+	isFuture, err := flags.GetBool("futurenet")
 	if err != nil {
 		logger.Fatal("could not get futurenet boolean: ", err)
 	}
 
-	extra, err = flags.GetStringToString("extra-fields")
+	extra, err := flags.GetStringToString("extra-fields")
 	if err != nil {
 		logger.Fatal("could not get extra fields string: ", err)
 	}
 
-	useCaptiveCore, err = flags.GetBool("captive-core")
+	useCaptiveCore, err := flags.GetBool("captive-core")
 	if err != nil {
 		logger.Fatal("could not get captive-core flag: ", err)
 	}
 
-	datastoreUrl, err = flags.GetString("datastore-url")
+	datastorePath, err := flags.GetString("datastore-path")
 	if err != nil {
-		logger.Fatal("could not get datastore-url string: ", err)
+		logger.Fatal("could not get datastore-path string: ", err)
 	}
 
-	return
+	bufferSize, err := flags.GetUint32("buffer-size")
+	if err != nil {
+		logger.Fatal("could not get buffer-size uint32: ", err)
+	}
+
+	numWorkers, err := flags.GetUint32("num-workers")
+	if err != nil {
+		logger.Fatal("could not get num-workers uint32: ", err)
+	}
+
+	retryLimit, err := flags.GetUint32("retry-limit")
+	if err != nil {
+		logger.Fatal("could not get retry-limit uint32: ", err)
+	}
+
+	retryWait, err := flags.GetUint32("retry-wait")
+	if err != nil {
+		logger.Fatal("could not get retry-wait uint32: ", err)
+	}
+
+	return CommonFlagValues{
+		EndNum:         endNum,
+		StrictExport:   strictExport,
+		IsTest:         isTest,
+		IsFuture:       isFuture,
+		Extra:          extra,
+		UseCaptiveCore: useCaptiveCore,
+		DatastorePath:  datastorePath,
+		BufferSize:     bufferSize,
+		NumWorkers:     numWorkers,
+		RetryLimit:     retryLimit,
+		RetryWait:      retryWait,
+	}
 }
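For reference, the new datastore flags and their defaults can be exercised in isolation with pflag; a minimal sketch (the flag set name, shortened usage strings, and parsed arguments are illustrative):

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	// Register the new datastore flags with the same names and defaults as
	// AddCommonFlags above (not wired to cobra here).
	flags := pflag.NewFlagSet("demo", pflag.ExitOnError)
	flags.String("datastore-path", "ledger-exporter/ledgers", "Datastore bucket path to read txmeta files from.")
	flags.Uint32("buffer-size", 5, "Max txmeta files held in memory.")
	flags.Uint32("num-workers", 5, "Datastore reader workers.")
	flags.Uint32("retry-limit", 3, "Datastore GetLedger retry limit.")
	flags.Uint32("retry-wait", 5, "Seconds to wait before retrying GetLedger.")

	// Simulate a command line that overrides one default.
	if err := flags.Parse([]string{"--buffer-size", "10"}); err != nil {
		panic(err)
	}

	path, _ := flags.GetString("datastore-path")
	size, _ := flags.GetUint32("buffer-size")
	fmt.Println(path, size) // ledger-exporter/ledgers 10
}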
 
 // MustArchiveFlags gets the values of the history archive specific flags: start-ledger, output, and limit
@@ -430,7 +469,7 @@ func MustExportTypeFlags(flags *pflag.FlagSet, logger *EtlLogger) map[string]boo
 		"export-ttl":         false,
 	}
 
-	for export_name, _ := range exports {
+	for export_name := range exports {
 		exports[export_name], err = flags.GetBool(export_name)
 		if err != nil {
 			logger.Fatalf("could not get %s flag: %v", export_name, err)
@@ -648,26 +687,29 @@ type EnvironmentDetails struct {
 	ArchiveURLs []string
 	BinaryPath  string
 	CoreConfig  string
-	StorageURL  string
+	Network     string
+	CommonFlagValues CommonFlagValues
 }
 
 // GetEnvironmentDetails returns the correct environment details (network passphrase, archive URLs, core config) based on env preference
-func GetEnvironmentDetails(isTest bool, isFuture bool, datastoreUrl string) (details EnvironmentDetails) {
-	if isTest {
+func GetEnvironmentDetails(commonFlags CommonFlagValues) (details EnvironmentDetails) {
+	if commonFlags.IsTest {
 		// testnet passphrase to be used for testing
 		details.NetworkPassphrase = network.TestNetworkPassphrase
 		details.ArchiveURLs = testArchiveURLs
 		details.BinaryPath = "/usr/bin/stellar-core"
 		details.CoreConfig = "/etl/docker/stellar-core_testnet.cfg"
-		details.StorageURL = datastoreUrl
+		details.Network = "testnet"
+		details.CommonFlagValues = commonFlags
 		return details
-	} else if isFuture {
+	} else if commonFlags.IsFuture {
 		// details.NetworkPassphrase = network.FutureNetworkPassphrase
 		details.NetworkPassphrase = "Test SDF Future Network ; October 2022"
 		details.ArchiveURLs = futureArchiveURLs
 		details.BinaryPath = "/usr/bin/stellar-core"
 		details.CoreConfig = "/etl/docker/stellar-core_futurenet.cfg"
-		details.StorageURL = datastoreUrl
+		details.Network = "futurenet"
+		details.CommonFlagValues = commonFlags
 		return details
 	} else {
 		// default: mainnet
@@ -675,7 +717,8 @@ func GetEnvironmentDetails(isTest bool, isFuture bool, datastoreUrl string) (det
 		details.ArchiveURLs = mainArchiveURLs
 		details.BinaryPath = "/usr/bin/stellar-core"
 		details.CoreConfig = "/etl/docker/stellar-core.cfg"
-		details.StorageURL = datastoreUrl
+		details.Network = "pubnet"
+		details.CommonFlagValues = commonFlags
 		return details
 	}
 }
@@ -714,6 +757,9 @@ func (e EnvironmentDetails) GetUnboundedLedgerCloseMeta(end uint32) (xdr.LedgerC
 	ctx := context.Background()
 
 	backend, err := e.CreateCaptiveCoreBackend()
+	if err != nil {
+		return xdr.LedgerCloseMeta{}, err
+	}
 
 	ledgerRange := ledgerbackend.UnboundedRange(end)
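The hunk below replaces the GCS-specific backend with the datastore-agnostic BufferedStorageBackend. A hedged usage sketch of the resulting code path (assumes GCS credentials are available, e.g. via GOOGLE_APPLICATION_CREDENTIALS, and that the requested ledger exists in the bucket; the ledger sequence is illustrative):

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/stellar/go/ingest/ledgerbackend"

	"github.com/stellar/stellar-etl/internal/utils"
)

func main() {
	ctx := context.Background()

	// Testnet environment assembled from common flag values; the datastore
	// path is this change's default GCS layout.
	env := utils.GetEnvironmentDetails(utils.CommonFlagValues{
		IsTest:        true,
		DatastorePath: "ledger-exporter/ledgers",
		BufferSize:    5,
		NumWorkers:    5,
		RetryLimit:    3,
		RetryWait:     5,
	})

	// false selects the datastore-backed path in CreateLedgerBackend below.
	backend, err := utils.CreateLedgerBackend(ctx, false, env)
	if err != nil {
		log.Fatal(err)
	}
	defer backend.Close()

	// Illustrative ledger sequence; it must exist in the bucket.
	const seq uint32 = 1000000
	if err := backend.PrepareRange(ctx, ledgerbackend.BoundedRange(seq, seq)); err != nil {
		log.Fatal(err)
	}
	lcm, err := backend.GetLedger(ctx, seq)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("read ledger", lcm.LedgerSequence())
}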
@@ -757,28 +803,42 @@
 	}
 
 	// Create ledger backend from datastore
-	fileConfig := ledgerbackend.LCMFileConfig{
-		StorageURL:        env.StorageURL,
-		FileSuffix:        ".xdr.gz",
-		LedgersPerFile:    1,
-		FilesPerPartition: 64000,
+	params := make(map[string]string)
+	params["destination_bucket_path"] = env.CommonFlagValues.DatastorePath
+	dataStoreConfig := datastore.DataStoreConfig{
+		Type:   "GCS",
+		Params: params,
 	}
-	parsed, err := url.Parse(env.StorageURL)
+	dataStore, err := datastore.NewDataStore(ctx, dataStoreConfig, env.Network)
 	if err != nil {
 		return nil, err
 	}
-	// Using the GCS datastore backend
-	if parsed.Scheme == "gcs" {
-		backend, err := ledgerbackend.NewGCSBackend(ctx, fileConfig)
-		if err != nil {
-			return nil, err
-		}
-		return backend, nil
+	// TODO: In the future these will come from a config file written by ledgerexporter
+	// Hard code ledger batch values for now
+	ledgerBatchConfig := datastore.LedgerBatchConfig{
+		LedgersPerFile:    1,
+		FilesPerPartition: 64000,
+		FileSuffix:        ".xdr.gz",
+	}
+
+	// TODO: In the future CompressionType should be removed as it won't be configurable
+	BSBackendConfig := ledgerbackend.BufferedStorageBackendConfig{
+		LedgerBatchConfig: ledgerBatchConfig,
+		CompressionType:   "gzip",
+		DataStore:         dataStore,
+		BufferSize:        env.CommonFlagValues.BufferSize,
+		NumWorkers:        env.CommonFlagValues.NumWorkers,
+		RetryLimit:        env.CommonFlagValues.RetryLimit,
+		RetryWait:         time.Duration(env.CommonFlagValues.RetryWait) * time.Second,
 	}
 
-	return nil, errors.New("no valid ledgerbackend selected")
+	backend, err := ledgerbackend.NewBufferedStorageBackend(ctx, BSBackendConfig)
+	if err != nil {
+		return nil, err
+	}
+	return backend, nil
 }
 
 func LedgerKeyToLedgerKeyHash(ledgerKey xdr.LedgerKey) string {