diff --git a/cmd/command_utils.go b/cmd/command_utils.go new file mode 100644 index 00000000..009c8cc0 --- /dev/null +++ b/cmd/command_utils.go @@ -0,0 +1,137 @@ +package cmd + +import ( + "bytes" + "encoding/json" + "fmt" + "os" + "path/filepath" +) + +type CloudStorage interface { + UploadTo(credentialsPath, bucket, path string) error +} + +func createOutputFile(filepath string) error { + var _, err = os.Stat(filepath) + if os.IsNotExist(err) { + var _, err = os.Create(filepath) + if err != nil { + return err + } + } + + return nil +} + +func mustOutFile(path string) *os.File { + absolutePath, err := filepath.Abs(path) + if err != nil { + cmdLogger.Fatal("could not get absolute filepath: ", err) + } + + err = os.MkdirAll(filepath.Dir(path), os.ModePerm) + if err != nil { + cmdLogger.Fatalf("could not create directory %s: %s", path, err) + } + + err = createOutputFile(absolutePath) + if err != nil { + cmdLogger.Fatal("could not create output file: ", err) + } + + outFile, err := os.OpenFile(absolutePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + cmdLogger.Fatal("error in opening output file: ", err) + } + + return outFile +} + +func exportEntry(entry interface{}, outFile *os.File, extra map[string]string) (int, error) { + // This extra marshalling/unmarshalling is silly, but it's required to properly handle the null.[String|Int*] types, and add the extra fields. + m, err := json.Marshal(entry) + if err != nil { + cmdLogger.Errorf("Error marshalling %+v: %v ", entry, err) + } + i := map[string]interface{}{} + // Use a decoder here so that 'UseNumber' ensures large ints are properly decoded + decoder := json.NewDecoder(bytes.NewReader(m)) + decoder.UseNumber() + err = decoder.Decode(&i) + if err != nil { + cmdLogger.Errorf("Error unmarshalling %+v: %v ", i, err) + } + for k, v := range extra { + i[k] = v + } + + marshalled, err := json.Marshal(i) + if err != nil { + return 0, fmt.Errorf("could not json encode %+v: %s", entry, err) + } + cmdLogger.Debugf("Writing entry to %s", outFile.Name()) + numBytes, err := outFile.Write(marshalled) + if err != nil { + cmdLogger.Errorf("Error writing %+v to file: %s", entry, err) + } + newLineNumBytes, err := outFile.WriteString("\n") + if err != nil { + cmdLogger.Errorf("Error writing new line to file %s: %s", outFile.Name(), err) + } + return numBytes + newLineNumBytes, nil +} + +// Prints the number of attempted, failed, and successful transformations as a JSON object +func printTransformStats(attempts, failures int) { + resultsMap := map[string]int{ + "attempted_transforms": attempts, + "failed_transforms": failures, + "successful_transforms": attempts - failures, + } + + results, err := json.Marshal(resultsMap) + if err != nil { + cmdLogger.Fatal("Could not marshal results: ", err) + } + + cmdLogger.Info(string(results)) +} + +func exportFilename(start, end uint32, dataType string) string { + return fmt.Sprintf("%d-%d-%s.txt", start, end-1, dataType) +} + +func deleteLocalFiles(path string) { + err := os.RemoveAll(path) + if err != nil { + cmdLogger.Errorf("Unable to remove %s: %s", path, err) + return + } + cmdLogger.Infof("Successfully deleted %s", path) +} + +func maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path string) { + if cloudProvider == "" { + cmdLogger.Info("No cloud provider specified for upload. Skipping upload.") + return + } + + if len(cloudStorageBucket) == 0 { + cmdLogger.Error("No bucket specified") + return + } + + var cloudStorage CloudStorage + switch cloudProvider { + case "gcp": + cloudStorage = newGCS(cloudCredentials, cloudStorageBucket) + err := cloudStorage.UploadTo(cloudCredentials, cloudStorageBucket, path) + if err != nil { + cmdLogger.Errorf("Unable to upload output to GCS: %s", err) + return + } + default: + cmdLogger.Error("Unknown cloud provider") + } +} diff --git a/cmd/export_account_signers.go b/cmd/export_account_signers.go index 4e53a26d..73d01517 100644 --- a/cmd/export_account_signers.go +++ b/cmd/export_account_signers.go @@ -26,7 +26,7 @@ the export_ledger_entry_changes command.`, cmdLogger.StrictExport = strictExport env := utils.GetEnvironmentDetails(isTest, isFuture) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) accounts, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeAccount, env.ArchiveURLs) if err != nil { @@ -65,7 +65,7 @@ the export_ledger_entry_changes command.`, printTransformStats(numSigners, numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -73,6 +73,6 @@ func init() { rootCmd.AddCommand(accountSignersCmd) utils.AddCommonFlags(accountSignersCmd.Flags()) utils.AddBucketFlags("signers", accountSignersCmd.Flags()) - utils.AddGcsFlags(accountSignersCmd.Flags()) + utils.AddCloudStorageFlags(accountSignersCmd.Flags()) accountSignersCmd.MarkFlagRequired("end-ledger") } diff --git a/cmd/export_accounts.go b/cmd/export_accounts.go index eee10af0..9c6d7f76 100644 --- a/cmd/export_accounts.go +++ b/cmd/export_accounts.go @@ -26,7 +26,7 @@ the export_ledger_entry_changes command.`, cmdLogger.StrictExport = strictExport env := utils.GetEnvironmentDetails(isTest, isFuture) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) accounts, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeAccount, env.ArchiveURLs) if err != nil { @@ -59,7 +59,7 @@ the export_ledger_entry_changes command.`, printTransformStats(len(accounts), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -67,7 +67,7 @@ func init() { rootCmd.AddCommand(accountsCmd) utils.AddCommonFlags(accountsCmd.Flags()) utils.AddBucketFlags("accounts", accountsCmd.Flags()) - utils.AddGcsFlags(accountsCmd.Flags()) + utils.AddCloudStorageFlags(accountsCmd.Flags()) accountsCmd.MarkFlagRequired("end-ledger") /* Current flags: diff --git a/cmd/export_all_history.go b/cmd/export_all_history.go index c4706947..2d4f12f7 100644 --- a/cmd/export_all_history.go +++ b/cmd/export_all_history.go @@ -23,7 +23,7 @@ in order to mitigate egress costs for the entity hosting history archives.`, endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger) cmdLogger.StrictExport = strictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) env := utils.GetEnvironmentDetails(isTest, isFuture) allHistory, err := input.GetAllHistory(startNum, endNum, limit, env) @@ -32,16 +32,16 @@ in order to mitigate egress costs for the entity hosting history archives.`, } cmdLogger.Info("start doing other exports") - getOperations(allHistory.Operations, extra, gcpCredentials, gcsBucket, path+"exported_operations.txt", env) - getTrades(allHistory.Trades, extra, gcpCredentials, gcsBucket, path+"exported_trades.txt") - getEffects(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, path+"exported_effects.txt", env) - getTransactions(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, path+"exported_transactions.txt") - getDiagnosticEvents(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, path+"exported_diagnostic_events.txt") + getOperations(allHistory.Operations, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_operations.txt", env) + getTrades(allHistory.Trades, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_trades.txt") + getEffects(allHistory.Ledgers, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_effects.txt", env) + getTransactions(allHistory.Ledgers, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_transactions.txt") + getDiagnosticEvents(allHistory.Ledgers, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_diagnostic_events.txt") cmdLogger.Info("done doing other exports") }, } -func getOperations(operations []input.OperationTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string, env utils.EnvironmentDetails) { +func getOperations(operations []input.OperationTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string, env utils.EnvironmentDetails) { outFileOperations := mustOutFile(path) numFailures := 0 totalNumBytes := 0 @@ -68,10 +68,10 @@ func getOperations(operations []input.OperationTransformInput, extra map[string] printTransformStats(len(operations), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) } -func getTrades(trades []input.TradeTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string) { +func getTrades(trades []input.TradeTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string) { outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 @@ -100,10 +100,10 @@ func getTrades(trades []input.TradeTransformInput, extra map[string]string, gcpC printTransformStats(len(trades), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) } -func getEffects(transactions []input.LedgerTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string, env utils.EnvironmentDetails) { +func getEffects(transactions []input.LedgerTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string, env utils.EnvironmentDetails) { outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 @@ -133,10 +133,10 @@ func getEffects(transactions []input.LedgerTransformInput, extra map[string]stri printTransformStats(len(transactions), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) } -func getTransactions(transactions []input.LedgerTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string) { +func getTransactions(transactions []input.LedgerTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string) { outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 @@ -163,10 +163,10 @@ func getTransactions(transactions []input.LedgerTransformInput, extra map[string printTransformStats(len(transactions), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) } -func getDiagnosticEvents(transactions []input.LedgerTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string) { +func getDiagnosticEvents(transactions []input.LedgerTransformInput, extra map[string]string, cloudStorageBucket string, cloudCredentials string, cloudProvider string, path string) { outFile := mustOutFile(path) numFailures := 0 for _, transformInput := range transactions { @@ -195,13 +195,13 @@ func getDiagnosticEvents(transactions []input.LedgerTransformInput, extra map[st printTransformStats(len(transactions), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) } func init() { rootCmd.AddCommand(allHistoryCmd) utils.AddCommonFlags(allHistoryCmd.Flags()) utils.AddArchiveFlags("", allHistoryCmd.Flags()) - utils.AddGcsFlags(allHistoryCmd.Flags()) + utils.AddCloudStorageFlags(allHistoryCmd.Flags()) allHistoryCmd.MarkFlagRequired("end-ledger") } diff --git a/cmd/export_assets.go b/cmd/export_assets.go index 7ff3ff4a..55647dab 100644 --- a/cmd/export_assets.go +++ b/cmd/export_assets.go @@ -19,7 +19,7 @@ var assetsCmd = &cobra.Command{ endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger) cmdLogger.StrictExport = strictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) outFile := mustOutFile(path) @@ -61,7 +61,7 @@ var assetsCmd = &cobra.Command{ printTransformStats(len(paymentOps), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -69,7 +69,7 @@ func init() { rootCmd.AddCommand(assetsCmd) utils.AddCommonFlags(assetsCmd.Flags()) utils.AddArchiveFlags("assets", assetsCmd.Flags()) - utils.AddGcsFlags(assetsCmd.Flags()) + utils.AddCloudStorageFlags(assetsCmd.Flags()) assetsCmd.MarkFlagRequired("end-ledger") /* diff --git a/cmd/export_claimable_balances.go b/cmd/export_claimable_balances.go index 119693c4..89af046c 100644 --- a/cmd/export_claimable_balances.go +++ b/cmd/export_claimable_balances.go @@ -26,7 +26,7 @@ var claimableBalancesCmd = &cobra.Command{ cmdLogger.StrictExport = strictExport env := utils.GetEnvironmentDetails(isTest, isFuture) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) balances, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeClaimableBalance, env.ArchiveURLs) if err != nil { @@ -59,7 +59,7 @@ var claimableBalancesCmd = &cobra.Command{ printTransformStats(len(balances), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -67,7 +67,7 @@ func init() { rootCmd.AddCommand(claimableBalancesCmd) utils.AddCommonFlags(claimableBalancesCmd.Flags()) utils.AddBucketFlags("claimable_balances", claimableBalancesCmd.Flags()) - utils.AddGcsFlags(claimableBalancesCmd.Flags()) + utils.AddCloudStorageFlags(claimableBalancesCmd.Flags()) claimableBalancesCmd.MarkFlagRequired("end-ledger") /* diff --git a/cmd/export_config_setting.go b/cmd/export_config_setting.go index 4a8e4454..4de5d087 100644 --- a/cmd/export_config_setting.go +++ b/cmd/export_config_setting.go @@ -26,7 +26,7 @@ var configSettingCmd = &cobra.Command{ cmdLogger.StrictExport = strictExport env := utils.GetEnvironmentDetails(isTest, isFuture) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) settings, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeConfigSetting, env.ArchiveURLs) if err != nil { @@ -57,7 +57,7 @@ var configSettingCmd = &cobra.Command{ cmdLogger.Info("Number of bytes written: ", totalNumBytes) printTransformStats(len(settings), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -66,7 +66,7 @@ func init() { rootCmd.AddCommand(configSettingCmd) utils.AddCommonFlags(configSettingCmd.Flags()) utils.AddBucketFlags("config_settings", configSettingCmd.Flags()) - utils.AddGcsFlags(configSettingCmd.Flags()) + utils.AddCloudStorageFlags(configSettingCmd.Flags()) configSettingCmd.MarkFlagRequired("end-ledger") /* Current flags: diff --git a/cmd/export_contract_code.go b/cmd/export_contract_code.go index 1a0e4795..07e2ffee 100644 --- a/cmd/export_contract_code.go +++ b/cmd/export_contract_code.go @@ -26,7 +26,7 @@ var codeCmd = &cobra.Command{ cmdLogger.StrictExport = strictExport env := utils.GetEnvironmentDetails(isTest, isFuture) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) codes, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeContractCode, env.ArchiveURLs) if err != nil { @@ -57,7 +57,7 @@ var codeCmd = &cobra.Command{ cmdLogger.Info("Number of bytes written: ", totalNumBytes) printTransformStats(len(codes), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -66,7 +66,7 @@ func init() { rootCmd.AddCommand(codeCmd) utils.AddCommonFlags(codeCmd.Flags()) utils.AddBucketFlags("contract_code", codeCmd.Flags()) - utils.AddGcsFlags(codeCmd.Flags()) + utils.AddCloudStorageFlags(codeCmd.Flags()) codeCmd.MarkFlagRequired("end-ledger") /* Current flags: diff --git a/cmd/export_contract_data.go b/cmd/export_contract_data.go index d573d2e3..fcd759d9 100644 --- a/cmd/export_contract_data.go +++ b/cmd/export_contract_data.go @@ -26,7 +26,7 @@ var dataCmd = &cobra.Command{ cmdLogger.StrictExport = strictExport env := utils.GetEnvironmentDetails(isTest, isFuture) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) datas, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeContractData, env.ArchiveURLs) if err != nil { @@ -62,7 +62,7 @@ var dataCmd = &cobra.Command{ cmdLogger.Info("Number of bytes written: ", totalNumBytes) printTransformStats(len(datas), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -71,7 +71,7 @@ func init() { rootCmd.AddCommand(dataCmd) utils.AddCommonFlags(dataCmd.Flags()) utils.AddBucketFlags("contract_data", dataCmd.Flags()) - utils.AddGcsFlags(dataCmd.Flags()) + utils.AddCloudStorageFlags(dataCmd.Flags()) dataCmd.MarkFlagRequired("end-ledger") /* Current flags: diff --git a/cmd/export_diagnostic_events.go b/cmd/export_diagnostic_events.go index 64326a80..17655bd9 100644 --- a/cmd/export_diagnostic_events.go +++ b/cmd/export_diagnostic_events.go @@ -19,7 +19,7 @@ var diagnosticEventsCmd = &cobra.Command{ endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger) cmdLogger.StrictExport = strictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) env := utils.GetEnvironmentDetails(isTest, isFuture) transactions, err := input.GetTransactions(startNum, endNum, limit, env) @@ -55,7 +55,7 @@ var diagnosticEventsCmd = &cobra.Command{ printTransformStats(len(transactions), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -63,7 +63,7 @@ func init() { rootCmd.AddCommand(diagnosticEventsCmd) utils.AddCommonFlags(diagnosticEventsCmd.Flags()) utils.AddArchiveFlags("diagnostic_events", diagnosticEventsCmd.Flags()) - utils.AddGcsFlags(diagnosticEventsCmd.Flags()) + utils.AddCloudStorageFlags(diagnosticEventsCmd.Flags()) diagnosticEventsCmd.MarkFlagRequired("end-ledger") /* diff --git a/cmd/export_effects.go b/cmd/export_effects.go index 401cbfa6..3fff5009 100644 --- a/cmd/export_effects.go +++ b/cmd/export_effects.go @@ -17,7 +17,7 @@ var effectsCmd = &cobra.Command{ endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger) cmdLogger.StrictExport = strictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) env := utils.GetEnvironmentDetails(isTest, isFuture) transactions, err := input.GetTransactions(startNum, endNum, limit, env) @@ -54,7 +54,7 @@ var effectsCmd = &cobra.Command{ printTransformStats(len(transactions), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -62,7 +62,7 @@ func init() { rootCmd.AddCommand(effectsCmd) utils.AddCommonFlags(effectsCmd.Flags()) utils.AddArchiveFlags("effects", effectsCmd.Flags()) - utils.AddGcsFlags(effectsCmd.Flags()) + utils.AddCloudStorageFlags(effectsCmd.Flags()) effectsCmd.MarkFlagRequired("end-ledger") /* diff --git a/cmd/export_ledger_entry_changes.go b/cmd/export_ledger_entry_changes.go index 1118f071..21489aff 100644 --- a/cmd/export_ledger_entry_changes.go +++ b/cmd/export_ledger_entry_changes.go @@ -32,7 +32,7 @@ be exported.`, execPath, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger) exports := utils.MustExportTypeFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) err := os.MkdirAll(outputFolder, os.ModePerm) if err != nil { @@ -252,7 +252,7 @@ be exported.`, } } - err := exportTransformedData(batch.BatchStart, batch.BatchEnd, outputFolder, transformedOutputs, gcpCredentials, gcsBucket, extra) + err := exportTransformedData(batch.BatchStart, batch.BatchEnd, outputFolder, transformedOutputs, cloudCredentials, cloudStorageBucket, cloudProvider, extra) if err != nil { cmdLogger.LogError(err) continue @@ -266,7 +266,7 @@ func exportTransformedData( start, end uint32, folderPath string, transformedOutput map[string][]interface{}, - gcpCredentials, gcsBucket string, + cloudCredentials, cloudStorageBucket, cloudProvider string, extra map[string]string) error { for resource, output := range transformedOutput { @@ -281,7 +281,7 @@ func exportTransformedData( return err } } - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) } return nil @@ -292,7 +292,7 @@ func init() { utils.AddCommonFlags(exportLedgerEntryChangesCmd.Flags()) utils.AddCoreFlags(exportLedgerEntryChangesCmd.Flags(), "changes_output/") utils.AddExportTypeFlags(exportLedgerEntryChangesCmd.Flags()) - utils.AddGcsFlags(exportLedgerEntryChangesCmd.Flags()) + utils.AddCloudStorageFlags(exportLedgerEntryChangesCmd.Flags()) exportLedgerEntryChangesCmd.MarkFlagRequired("start-ledger") exportLedgerEntryChangesCmd.MarkFlagRequired("core-executable") diff --git a/cmd/export_ledger_transaction.go b/cmd/export_ledger_transaction.go index 8e623604..b8dca36e 100644 --- a/cmd/export_ledger_transaction.go +++ b/cmd/export_ledger_transaction.go @@ -19,7 +19,7 @@ var ledgerTransactionCmd = &cobra.Command{ endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger) cmdLogger.StrictExport = strictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) env := utils.GetEnvironmentDetails(isTest, isFuture) ledgerTransaction, err := input.GetTransactions(startNum, endNum, limit, env) @@ -53,7 +53,7 @@ var ledgerTransactionCmd = &cobra.Command{ printTransformStats(len(ledgerTransaction), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -61,7 +61,7 @@ func init() { rootCmd.AddCommand(ledgerTransactionCmd) utils.AddCommonFlags(ledgerTransactionCmd.Flags()) utils.AddArchiveFlags("ledger_transaction", ledgerTransactionCmd.Flags()) - utils.AddGcsFlags(ledgerTransactionCmd.Flags()) + utils.AddCloudStorageFlags(ledgerTransactionCmd.Flags()) ledgerTransactionCmd.MarkFlagRequired("end-ledger") /* diff --git a/cmd/export_ledgers.go b/cmd/export_ledgers.go index 292e6e3e..0bec34d2 100644 --- a/cmd/export_ledgers.go +++ b/cmd/export_ledgers.go @@ -1,16 +1,8 @@ package cmd import ( - "bytes" - "context" - "encoding/json" "fmt" - "io" - "os" - "path/filepath" - "time" - "cloud.google.com/go/storage" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/stellar/stellar-etl/internal/input" @@ -18,152 +10,6 @@ import ( "github.com/stellar/stellar-etl/internal/utils" ) -func createOutputFile(filepath string) error { - var _, err = os.Stat(filepath) - if os.IsNotExist(err) { - var _, err = os.Create(filepath) - if err != nil { - return err - } - } - - return nil -} - -func mustOutFile(path string) *os.File { - absolutePath, err := filepath.Abs(path) - if err != nil { - cmdLogger.Fatal("could not get absolute filepath: ", err) - } - - err = os.MkdirAll(filepath.Dir(path), os.ModePerm) - if err != nil { - cmdLogger.Fatalf("could not create directory %s: %s", path, err) - } - - err = createOutputFile(absolutePath) - if err != nil { - cmdLogger.Fatal("could not create output file: ", err) - } - - outFile, err := os.OpenFile(absolutePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - cmdLogger.Fatal("error in opening output file: ", err) - } - - return outFile -} - -func exportEntry(entry interface{}, outFile *os.File, extra map[string]string) (int, error) { - // This extra marshalling/unmarshalling is silly, but it's required to properly handle the null.[String|Int*] types, and add the extra fields. - m, err := json.Marshal(entry) - if err != nil { - cmdLogger.Errorf("Error marshalling %+v: %v ", entry, err) - } - i := map[string]interface{}{} - // Use a decoder here so that 'UseNumber' ensures large ints are properly decoded - decoder := json.NewDecoder(bytes.NewReader(m)) - decoder.UseNumber() - err = decoder.Decode(&i) - if err != nil { - cmdLogger.Errorf("Error unmarshalling %+v: %v ", i, err) - } - for k, v := range extra { - i[k] = v - } - - marshalled, err := json.Marshal(i) - if err != nil { - return 0, fmt.Errorf("could not json encode %+v: %s", entry, err) - } - cmdLogger.Debugf("Writing entry to %s", outFile.Name) - numBytes, err := outFile.Write(marshalled) - if err != nil { - cmdLogger.Errorf("Error writing %+v to file: ", entry, err) - } - newLineNumBytes, err := outFile.WriteString("\n") - if err != nil { - cmdLogger.Error("Error writing new line to file %s: ", outFile.Name, err) - } - return numBytes + newLineNumBytes, nil -} - -// Prints the number of attempted, failed, and successful transformations as a JSON object -func printTransformStats(attempts, failures int) { - resultsMap := map[string]int{ - "attempted_transforms": attempts, - "failed_transforms": failures, - "successful_transforms": attempts - failures, - } - - results, err := json.Marshal(resultsMap) - if err != nil { - cmdLogger.Fatal("Could not marshal results: ", err) - } - - cmdLogger.Info(string(results)) -} - -func exportFilename(start, end uint32, dataType string) string { - return fmt.Sprintf("%d-%d-%s.txt", start, end-1, dataType) -} - -func uploadToGcs(credentialsPath, bucket, path string) error { - // Use credentials file in dev/local runs. Otherwise, derive credentials from the service account. - if len(credentialsPath) > 0 { - os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", credentialsPath) - cmdLogger.Infof("Using credentials found at: %s", credentialsPath) - } - - reader, err := os.Open(path) - if err != nil { - return fmt.Errorf("failed to open file %s: %v", path, err) - } - - ctx := context.Background() - client, err := storage.NewClient(ctx) - if err != nil { - return fmt.Errorf("failed to create client: %v", err) - } - defer client.Close() - - ctx, cancel := context.WithTimeout(ctx, time.Hour) - defer cancel() - - wc := client.Bucket(bucket).Object(path).NewWriter(ctx) - - uploadLocation := fmt.Sprintf("gs://%s/%s", bucket, path) - cmdLogger.Infof("Uploading %s to %s", path, uploadLocation) - - var written int64 - if written, err = io.Copy(wc, reader); err != nil { - return fmt.Errorf("unable to copy: %v", err) - } - err = wc.Close() - if err != nil { - return err - } - - cmdLogger.Infof("Successfully uploaded %d bytes to gs://%s/%s", written, bucket, path) - return nil -} - -func maybeUpload(gcpCredentials, gcsBucket, path string) { - if len(gcsBucket) > 0 { - err := uploadToGcs(gcpCredentials, gcsBucket, path) - if err != nil { - cmdLogger.Errorf("Unable to upload output to GCS: %s", err) - return - } - err = os.RemoveAll(path) - if err != nil { - cmdLogger.Errorf("Unable to remove %s: %s", path, err) - return - } - cmdLogger.Infof("Successfully deleted %s", path) - } -} - var ledgersCmd = &cobra.Command{ Use: "export_ledgers", Short: "Exports the ledger data.", @@ -173,7 +19,7 @@ var ledgersCmd = &cobra.Command{ endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger) cmdLogger.StrictExport = strictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) ledgers, err := input.GetLedgers(startNum, endNum, limit, isTest, isFuture) if err != nil { @@ -206,7 +52,7 @@ var ledgersCmd = &cobra.Command{ printTransformStats(len(ledgers), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -214,7 +60,7 @@ func init() { rootCmd.AddCommand(ledgersCmd) utils.AddCommonFlags(ledgersCmd.Flags()) utils.AddArchiveFlags("ledgers", ledgersCmd.Flags()) - utils.AddGcsFlags(ledgersCmd.Flags()) + utils.AddCloudStorageFlags(ledgersCmd.Flags()) ledgersCmd.MarkFlagRequired("end-ledger") /* Current flags: diff --git a/cmd/export_liquidity_pools.go b/cmd/export_liquidity_pools.go index f0f7f236..7130b3e1 100644 --- a/cmd/export_liquidity_pools.go +++ b/cmd/export_liquidity_pools.go @@ -26,7 +26,7 @@ the export_ledger_entry_changes command.`, cmdLogger.StrictExport = strictExport env := utils.GetEnvironmentDetails(isTest, isFuture) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) pools, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeLiquidityPool, env.ArchiveURLs) if err != nil { @@ -57,7 +57,7 @@ the export_ledger_entry_changes command.`, cmdLogger.Info("Number of bytes written: ", totalNumBytes) printTransformStats(len(pools), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -66,7 +66,7 @@ func init() { rootCmd.AddCommand(poolsCmd) utils.AddCommonFlags(poolsCmd.Flags()) utils.AddBucketFlags("pools", poolsCmd.Flags()) - utils.AddGcsFlags(poolsCmd.Flags()) + utils.AddCloudStorageFlags(poolsCmd.Flags()) poolsCmd.MarkFlagRequired("end-ledger") /* Current flags: diff --git a/cmd/export_offers.go b/cmd/export_offers.go index 17898e13..c1827c96 100644 --- a/cmd/export_offers.go +++ b/cmd/export_offers.go @@ -27,7 +27,7 @@ var offersCmd = &cobra.Command{ cmdLogger.StrictExport = strictExport env := utils.GetEnvironmentDetails(isTest, isFuture) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) offers, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeOffer, env.ArchiveURLs) if err != nil { @@ -60,7 +60,7 @@ var offersCmd = &cobra.Command{ printTransformStats(len(offers), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -68,7 +68,7 @@ func init() { rootCmd.AddCommand(offersCmd) utils.AddCommonFlags(offersCmd.Flags()) utils.AddBucketFlags("offers", offersCmd.Flags()) - utils.AddGcsFlags(offersCmd.Flags()) + utils.AddCloudStorageFlags(offersCmd.Flags()) offersCmd.MarkFlagRequired("end-ledger") /* Current flags: diff --git a/cmd/export_operations.go b/cmd/export_operations.go index becf1095..e82a2942 100644 --- a/cmd/export_operations.go +++ b/cmd/export_operations.go @@ -19,7 +19,7 @@ var operationsCmd = &cobra.Command{ endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger) cmdLogger.StrictExport = strictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) env := utils.GetEnvironmentDetails(isTest, isFuture) operations, err := input.GetOperations(startNum, endNum, limit, env) @@ -53,7 +53,7 @@ var operationsCmd = &cobra.Command{ printTransformStats(len(operations), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -61,7 +61,7 @@ func init() { rootCmd.AddCommand(operationsCmd) utils.AddCommonFlags(operationsCmd.Flags()) utils.AddArchiveFlags("operations", operationsCmd.Flags()) - utils.AddGcsFlags(operationsCmd.Flags()) + utils.AddCloudStorageFlags(operationsCmd.Flags()) operationsCmd.MarkFlagRequired("end-ledger") /* diff --git a/cmd/export_orderbooks.go b/cmd/export_orderbooks.go index f6c3dc1e..2c39e8d6 100644 --- a/cmd/export_orderbooks.go +++ b/cmd/export_orderbooks.go @@ -32,7 +32,7 @@ var exportOrderbooksCmd = &cobra.Command{ env := utils.GetEnvironmentDetails(isTest, isFuture) execPath, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) if batchSize <= 0 { cmdLogger.Fatalf("batch-size (%d) must be greater than 0", batchSize) @@ -80,7 +80,7 @@ var exportOrderbooksCmd = &cobra.Command{ } parser := input.ReceiveParsedOrderbooks(orderbookChannel, cmdLogger) - exportOrderbook(batchStart, batchEnd, outputFolder, parser, gcpCredentials, gcsBucket, extra) + exportOrderbook(batchStart, batchEnd, outputFolder, parser, cloudCredentials, cloudStorageBucket, cloudProvider, extra) } } else { // otherwise, we export in an unbounded manner where batches are constantly exported @@ -89,7 +89,7 @@ var exportOrderbooksCmd = &cobra.Command{ batchStart := startNum + batchNum*batchSize batchEnd := batchStart + batchSize - 1 parser := input.ReceiveParsedOrderbooks(orderbookChannel, cmdLogger) - exportOrderbook(batchStart, batchEnd, outputFolder, parser, gcpCredentials, gcsBucket, extra) + exportOrderbook(batchStart, batchEnd, outputFolder, parser, cloudCredentials, cloudStorageBucket, cloudProvider, extra) batchNum++ } } @@ -128,7 +128,7 @@ func exportOrderbook( start, end uint32, folderPath string, parser *input.OrderbookParser, - gcpCredentials, gcsBucket string, + cloudCredentials, cloudStorageBucket, cloudProvider string, extra map[string]string) { marketsFilePath := filepath.Join(folderPath, exportFilename(start, end, "dimMarkets")) offersFilePath := filepath.Join(folderPath, exportFilename(start, end, "dimOffers")) @@ -157,17 +157,17 @@ func exportOrderbook( cmdLogger.LogError(err) } - maybeUpload(gcpCredentials, gcsBucket, marketsFilePath) - maybeUpload(gcpCredentials, gcsBucket, offersFilePath) - maybeUpload(gcpCredentials, gcsBucket, accountsFilePath) - maybeUpload(gcpCredentials, gcsBucket, eventsFilePath) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, marketsFilePath) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, offersFilePath) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, accountsFilePath) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, eventsFilePath) } func init() { rootCmd.AddCommand(exportOrderbooksCmd) utils.AddCommonFlags(exportOrderbooksCmd.Flags()) utils.AddCoreFlags(exportOrderbooksCmd.Flags(), "orderbooks_output/") - utils.AddGcsFlags(exportOrderbooksCmd.Flags()) + utils.AddCloudStorageFlags(exportOrderbooksCmd.Flags()) exportOrderbooksCmd.MarkFlagRequired("start-ledger") /* diff --git a/cmd/export_trades.go b/cmd/export_trades.go index e86c6b77..99ff880e 100644 --- a/cmd/export_trades.go +++ b/cmd/export_trades.go @@ -23,7 +23,7 @@ var tradesCmd = &cobra.Command{ cmdLogger.StrictExport = strictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) env := utils.GetEnvironmentDetails(isTest, isFuture) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) trades, err := input.GetTrades(startNum, endNum, limit, env) if err != nil { @@ -58,7 +58,7 @@ var tradesCmd = &cobra.Command{ printTransformStats(len(trades), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -66,7 +66,7 @@ func init() { rootCmd.AddCommand(tradesCmd) utils.AddCommonFlags(tradesCmd.Flags()) utils.AddArchiveFlags("trades", tradesCmd.Flags()) - utils.AddGcsFlags(tradesCmd.Flags()) + utils.AddCloudStorageFlags(tradesCmd.Flags()) tradesCmd.MarkFlagRequired("end-ledger") /* diff --git a/cmd/export_transactions.go b/cmd/export_transactions.go index ddfdfdc0..18717bd0 100644 --- a/cmd/export_transactions.go +++ b/cmd/export_transactions.go @@ -19,7 +19,7 @@ var transactionsCmd = &cobra.Command{ endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger) cmdLogger.StrictExport = strictExport startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) env := utils.GetEnvironmentDetails(isTest, isFuture) transactions, err := input.GetTransactions(startNum, endNum, limit, env) @@ -53,7 +53,7 @@ var transactionsCmd = &cobra.Command{ printTransformStats(len(transactions), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -61,7 +61,7 @@ func init() { rootCmd.AddCommand(transactionsCmd) utils.AddCommonFlags(transactionsCmd.Flags()) utils.AddArchiveFlags("transactions", transactionsCmd.Flags()) - utils.AddGcsFlags(transactionsCmd.Flags()) + utils.AddCloudStorageFlags(transactionsCmd.Flags()) transactionsCmd.MarkFlagRequired("end-ledger") /* diff --git a/cmd/export_trustlines.go b/cmd/export_trustlines.go index 52e4075e..68ab838d 100644 --- a/cmd/export_trustlines.go +++ b/cmd/export_trustlines.go @@ -27,7 +27,7 @@ var trustlinesCmd = &cobra.Command{ cmdLogger.StrictExport = strictExport env := utils.GetEnvironmentDetails(isTest, isFuture) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) trustlines, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeTrustline, env.ArchiveURLs) if err != nil { @@ -61,7 +61,7 @@ var trustlinesCmd = &cobra.Command{ printTransformStats(len(trustlines), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -69,7 +69,7 @@ func init() { rootCmd.AddCommand(trustlinesCmd) utils.AddCommonFlags(trustlinesCmd.Flags()) utils.AddBucketFlags("trustlines", trustlinesCmd.Flags()) - utils.AddGcsFlags(trustlinesCmd.Flags()) + utils.AddCloudStorageFlags(trustlinesCmd.Flags()) trustlinesCmd.MarkFlagRequired("end-ledger") /* diff --git a/cmd/export_ttl.go b/cmd/export_ttl.go index 4ad373fc..c55ef571 100644 --- a/cmd/export_ttl.go +++ b/cmd/export_ttl.go @@ -26,7 +26,7 @@ var ttlCmd = &cobra.Command{ cmdLogger.StrictExport = strictExport env := utils.GetEnvironmentDetails(isTest, isFuture) path := utils.MustBucketFlags(cmd.Flags(), cmdLogger) - gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger) ttls, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeTtl, env.ArchiveURLs) if err != nil { @@ -57,7 +57,7 @@ var ttlCmd = &cobra.Command{ cmdLogger.Info("Number of bytes written: ", totalNumBytes) printTransformStats(len(ttls), numFailures) - maybeUpload(gcpCredentials, gcsBucket, path) + maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path) }, } @@ -66,7 +66,7 @@ func init() { rootCmd.AddCommand(ttlCmd) utils.AddCommonFlags(ttlCmd.Flags()) utils.AddBucketFlags("ttl", ttlCmd.Flags()) - utils.AddGcsFlags(ttlCmd.Flags()) + utils.AddCloudStorageFlags(ttlCmd.Flags()) ttlCmd.MarkFlagRequired("end-ledger") /* Current flags: diff --git a/cmd/upload_to_gcs.go b/cmd/upload_to_gcs.go new file mode 100644 index 00000000..3116de2d --- /dev/null +++ b/cmd/upload_to_gcs.go @@ -0,0 +1,66 @@ +package cmd + +import ( + "context" + "fmt" + "io" + "os" + "time" + + "cloud.google.com/go/storage" +) + +type GCS struct { + gcsCredentialsPath string + gcsBucket string +} + +func newGCS(gcsCredentialsPath, gcsBucket string) CloudStorage { + return &GCS{ + gcsCredentialsPath: gcsCredentialsPath, + gcsBucket: gcsBucket, + } +} + +func (g *GCS) UploadTo(credentialsPath, bucket, path string) error { + // Use credentials file in dev/local runs. Otherwise, derive credentials from the service account. + if len(credentialsPath) > 0 { + os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", credentialsPath) + cmdLogger.Infof("Using credentials found at: %s", credentialsPath) + } + + reader, err := os.Open(path) + if err != nil { + return fmt.Errorf("failed to open file %s: %v", path, err) + } + + ctx := context.Background() + client, err := storage.NewClient(ctx) + if err != nil { + return fmt.Errorf("failed to create client: %v", err) + } + defer client.Close() + + ctx, cancel := context.WithTimeout(ctx, time.Hour) + defer cancel() + + wc := client.Bucket(bucket).Object(path).NewWriter(ctx) + + uploadLocation := fmt.Sprintf("gs://%s/%s", bucket, path) + cmdLogger.Infof("Uploading %s to %s", path, uploadLocation) + + var written int64 + if written, err = io.Copy(wc, reader); err != nil { + return fmt.Errorf("unable to copy: %v", err) + } + err = wc.Close() + if err != nil { + return err + } + + cmdLogger.Infof("Successfully uploaded %d bytes to gs://%s/%s", written, bucket, path) + + deleteLocalFiles(path) + + return nil +} diff --git a/internal/input/transactions.go b/internal/input/transactions.go index 6f2174c1..8bc79fcd 100644 --- a/internal/input/transactions.go +++ b/internal/input/transactions.go @@ -33,17 +33,18 @@ func GetTransactions(start, end uint32, limit int64, env utils.EnvironmentDetail err = backend.PrepareRange(ctx, ledgerbackend.BoundedRange(start, end)) panicIf(err) for seq := start; seq <= end; seq++ { - txReader, err := ingest.NewLedgerTransactionReader(ctx, backend, env.NetworkPassphrase, seq) + ledgerCloseMeta, err := backend.GetLedger(ctx, seq) + if err != nil { + return nil, errors.Wrap(err, "error getting ledger from the backend") + } + + txReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(env.NetworkPassphrase, ledgerCloseMeta) if err != nil { return []LedgerTransformInput{}, err } lhe := txReader.GetHeader() - ledgerCloseMeta, err := backend.GetLedger(ctx, seq) - if err != nil { - return nil, errors.Wrap(err, "error getting ledger from the backend") - } // A negative limit value means that all input should be processed for int64(len(txSlice)) < limit || limit < 0 { tx, err := txReader.Read() diff --git a/internal/transform/contract_code.go b/internal/transform/contract_code.go index a243038a..8bfcf574 100644 --- a/internal/transform/contract_code.go +++ b/internal/transform/contract_code.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/stellar/go/ingest" - "github.com/stellar/go/strkey" "github.com/stellar/go/xdr" "github.com/stellar/stellar-etl/internal/utils" ) @@ -30,8 +29,7 @@ func TransformContractCode(ledgerChange ingest.Change, header xdr.LedgerHeaderHi contractCodeExtV := contractCode.Ext.V - contractCodeHashByte, _ := contractCode.Hash.MarshalBinary() - contractCodeHash, _ := strkey.Encode(strkey.VersionByteContract, contractCodeHashByte) + contractCodeHash := contractCode.Hash.HexString() closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) if err != nil { diff --git a/internal/transform/contract_code_test.go b/internal/transform/contract_code_test.go index 72ba3408..812bd446 100644 --- a/internal/transform/contract_code_test.go +++ b/internal/transform/contract_code_test.go @@ -86,14 +86,14 @@ func makeContractCodeTestInput() []ingest.Change { func makeContractCodeTestOutput() []ContractCodeOutput { return []ContractCodeOutput{ { - ContractCodeHash: "CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABSC4", + ContractCodeHash: "0000000000000000000000000000000000000000000000000000000000000000", ContractCodeExtV: 1, LastModifiedLedger: 24229503, LedgerEntryChange: 1, Deleted: false, LedgerSequence: 10, ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), - LedgerKeyHash: "CDP62BQ5XZDE4D7TEB2E7TLAJLAIWOO2U5H2EQIQSNTFJS6LSFOMZRZJ", + LedgerKeyHash: "dfed061dbe464e0ff320744fcd604ac08b39daa74fa24110936654cbcb915ccc", }, } } diff --git a/internal/transform/contract_data_test.go b/internal/transform/contract_data_test.go index c215a4bd..d09ab588 100644 --- a/internal/transform/contract_data_test.go +++ b/internal/transform/contract_data_test.go @@ -138,7 +138,7 @@ func makeContractDataTestOutput() []ContractDataOutput { Deleted: false, LedgerSequence: 10, ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), - LedgerKeyHash: "CCV7YMZHECK2TX2MGEGP6GEQIAMSVCXON5VCHNVUMKEJCFGYA4UMUUCA", + LedgerKeyHash: "abfc33272095a9df4c310cff189040192a8aee6f6a23b6b462889114d80728ca", }, } } diff --git a/internal/transform/operation.go b/internal/transform/operation.go index 36463ab8..9229419b 100644 --- a/internal/transform/operation.go +++ b/internal/transform/operation.go @@ -1707,8 +1707,7 @@ func contractCodeFromContractData(ledgerKey xdr.LedgerKey) string { return "" } - contractCodeHashByte, _ := contractCode.Hash.MarshalBinary() - contractCodeHash, _ := strkey.Encode(strkey.VersionByteContract, contractCodeHashByte) + contractCodeHash := contractCode.Hash.HexString() return contractCodeHash } diff --git a/internal/transform/ttl.go b/internal/transform/ttl.go index 4a81a701..cb9218e1 100644 --- a/internal/transform/ttl.go +++ b/internal/transform/ttl.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/stellar/go/ingest" - "github.com/stellar/go/strkey" "github.com/stellar/go/xdr" "github.com/stellar/stellar-etl/internal/utils" ) @@ -26,8 +25,7 @@ func TransformTtl(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntr return TtlOutput{}, nil } - keyHashByte, _ := ttl.KeyHash.MarshalBinary() - keyHash, _ := strkey.Encode(strkey.VersionByteContract, keyHashByte) + keyHash := ttl.KeyHash.HexString() liveUntilLedgerSeq := ttl.LiveUntilLedgerSeq closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) diff --git a/internal/transform/ttl_test.go b/internal/transform/ttl_test.go index 2307fe0c..4d49a54b 100644 --- a/internal/transform/ttl_test.go +++ b/internal/transform/ttl_test.go @@ -95,7 +95,7 @@ func makeTtlTestInput() []ingest.Change { func makeTtlTestOutput() []TtlOutput { return []TtlOutput{ { - KeyHash: "CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABSC4", + KeyHash: "0000000000000000000000000000000000000000000000000000000000000000", LiveUntilLedgerSeq: 123, LastModifiedLedger: 1, LedgerEntryChange: 1, diff --git a/internal/utils/main.go b/internal/utils/main.go index 8bb3c35d..1a36eabb 100644 --- a/internal/utils/main.go +++ b/internal/utils/main.go @@ -16,7 +16,6 @@ import ( "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/keypair" "github.com/stellar/go/network" - "github.com/stellar/go/strkey" "github.com/stellar/go/support/storage" "github.com/stellar/go/txnbuild" "github.com/stellar/go/xdr" @@ -248,11 +247,12 @@ func AddBucketFlags(objectName string, flags *pflag.FlagSet) { flags.StringP("output", "o", "exported_"+objectName+".txt", "Filename of the output file") } -// AddGcsFlags adds the gcs-related flags: gcs-bucket, gcp-credentials -func AddGcsFlags(flags *pflag.FlagSet) { - flags.String("gcs-bucket", "stellar-etl-cli", "GCS bucket to export to.") - flags.StringP("gcp-credentials", "g", "", "Path to GOOGLE_APPLICATION_CREDENTIALS, service account json. Only used for local/dev purposes. "+ - "When run on GCP, credentials should be inferred by service account.") +// AddCloudStorageFlags adds the cloud storage releated flags: cloud-storage-bucket, cloud-credentials +func AddCloudStorageFlags(flags *pflag.FlagSet) { + flags.String("cloud-storage-bucket", "stellar-etl-cli", "Cloud storage bucket to export to.") + flags.String("cloud-credentials", "", "Path to cloud provider service account credentials. Only used for local/dev purposes. "+ + "When run on GCP, credentials should be inferred by service account json.") + flags.String("cloud-provider", "", "Cloud provider for storage services.") } // AddCoreFlags adds the captive core specific flags: core-executable, core-config, batch-size, and output flags @@ -338,16 +338,23 @@ func MustBucketFlags(flags *pflag.FlagSet, logger *EtlLogger) (path string) { return } -// MustGcsFlags gets the values of the bucket list specific flags: gcp-project and gcs-bucket -func MustGcsFlags(flags *pflag.FlagSet, logger *EtlLogger) (bucket, credentials string) { - bucket, err := flags.GetString("gcs-bucket") +// MustCloudStorageFlags gets the values of the bucket list specific flags: cloud-storage-bucket, cloud-credentials +func MustCloudStorageFlags(flags *pflag.FlagSet, logger *EtlLogger) (bucket, credentials, provider string) { + bucket, err := flags.GetString("cloud-storage-bucket") if err != nil { - logger.Fatal("could not get gcs bucket: ", err) + logger.Fatal("could not get cloud storage bucket: ", err) } - credentials, err = flags.GetString("gcp-credentials") + + credentials, err = flags.GetString("cloud-credentials") if err != nil { - logger.Fatal("could not get GOOGLE_APPLICATION_CREDENTIALS file: ", err) + logger.Fatal("could not get cloud credentials file: ", err) } + + provider, err = flags.GetString("cloud-provider") + if err != nil { + logger.Fatal("could not get cloud provider: ", err) + } + return } @@ -702,7 +709,7 @@ func LedgerEntryToLedgerKeyHash(ledgerEntry xdr.LedgerEntry) string { ledgerKey, _ := ledgerEntry.LedgerKey() ledgerKeyByte, _ := ledgerKey.MarshalBinary() hashedLedgerKeyByte := hash.Hash(ledgerKeyByte) - ledgerKeyHash, _ := strkey.Encode(strkey.VersionByteContract, hashedLedgerKeyByte[:]) + ledgerKeyHash := hex.EncodeToString(hashedLedgerKeyByte[:]) return ledgerKeyHash }