From ef553e5136a9485858d564fcba3e8b65beac7dee Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 18 Jan 2024 12:32:33 -0500 Subject: [PATCH 1/4] Add export all history command --- cmd/export_all_history.go | 205 ++++++++++++++++++++++++++++++++++ internal/input/all_history.go | 101 +++++++++++++++++ 2 files changed, 306 insertions(+) create mode 100644 cmd/export_all_history.go create mode 100644 internal/input/all_history.go diff --git a/cmd/export_all_history.go b/cmd/export_all_history.go new file mode 100644 index 00000000..b0e6d856 --- /dev/null +++ b/cmd/export_all_history.go @@ -0,0 +1,205 @@ +package cmd + +import ( + "fmt" + + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + + "github.com/stellar/stellar-etl/internal/input" + "github.com/stellar/stellar-etl/internal/toid" + "github.com/stellar/stellar-etl/internal/transform" + "github.com/stellar/stellar-etl/internal/utils" +) + +var allHistoryCmd = &cobra.Command{ + Use: "export_all_history", + Short: "Exports all stellar network history.", + Long: `Exports historical stellar network data between provided start-ledger/end-ledger to output files. +This is a termporary command used to reduce the amount of requests to history archives +in order to mitigate egress costs for the entity hosting history archives.`, + Run: func(cmd *cobra.Command, args []string) { + cmdLogger.SetLevel(logrus.InfoLevel) + endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger) + cmdLogger.StrictExport = strictExport + startNum, _, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) + gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + env := utils.GetEnvironmentDetails(isTest, isFuture) + + allHistory, err := input.GetAllHistory(startNum, endNum, limit, env) + if err != nil { + cmdLogger.Fatal("could not read all history: ", err) + } + + getOperations(allHistory.Operations, extra, gcpCredentials, gcsBucket, "exported_operations.txt", env) + getTrades(allHistory.Trades, extra, gcpCredentials, gcsBucket, "exported_trades.txt") + getEffects(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, "exported_effects.txt", env) + getTransactions(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, "exported_transactions.txt") + getDiagnosticEvents(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, "exported_diagnostic_events.txt") + }, +} + +func getOperations(operations []input.OperationTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string, env utils.EnvironmentDetails) { + outFileOperations := mustOutFile(path) + numFailures := 0 + totalNumBytes := 0 + for _, transformInput := range operations { + transformed, err := transform.TransformOperation(transformInput.Operation, transformInput.OperationIndex, transformInput.Transaction, transformInput.LedgerSeqNum, transformInput.LedgerCloseMeta, env.NetworkPassphrase) + if err != nil { + txIndex := transformInput.Transaction.Index + cmdLogger.LogError(fmt.Errorf("could not transform operation %d in transaction %d in ledger %d: %v", transformInput.OperationIndex, txIndex, transformInput.LedgerSeqNum, err)) + numFailures += 1 + continue + } + + numBytes, err := exportEntry(transformed, outFileOperations, extra) + if err != nil { + cmdLogger.LogError(fmt.Errorf("could not export operation: %v", err)) + numFailures += 1 + continue + } + totalNumBytes += numBytes + } + + outFileOperations.Close() + cmdLogger.Info("Number of bytes written: ", totalNumBytes) + + printTransformStats(len(operations), numFailures) + + maybeUpload(gcpCredentials, gcsBucket, path) +} + +func getTrades(trades []input.TradeTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string) { + outFile := mustOutFile(path) + numFailures := 0 + totalNumBytes := 0 + for _, tradeInput := range trades { + trades, err := transform.TransformTrade(tradeInput.OperationIndex, tradeInput.OperationHistoryID, tradeInput.Transaction, tradeInput.CloseTime) + if err != nil { + parsedID := toid.Parse(tradeInput.OperationHistoryID) + cmdLogger.LogError(fmt.Errorf("from ledger %d, transaction %d, operation %d: %v", parsedID.LedgerSequence, parsedID.TransactionOrder, parsedID.OperationOrder, err)) + numFailures += 1 + continue + } + + for _, transformed := range trades { + numBytes, err := exportEntry(transformed, outFile, extra) + if err != nil { + cmdLogger.LogError(err) + numFailures += 1 + continue + } + totalNumBytes += numBytes + } + } + + outFile.Close() + cmdLogger.Info("Number of bytes written: ", totalNumBytes) + + printTransformStats(len(trades), numFailures) + + maybeUpload(gcpCredentials, gcsBucket, path) +} + +func getEffects(transactions []input.LedgerTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string, env utils.EnvironmentDetails) { + outFile := mustOutFile(path) + numFailures := 0 + totalNumBytes := 0 + for _, transformInput := range transactions { + LedgerSeq := uint32(transformInput.LedgerHistory.Header.LedgerSeq) + effects, err := transform.TransformEffect(transformInput.Transaction, LedgerSeq, transformInput.LedgerCloseMeta, env.NetworkPassphrase) + if err != nil { + txIndex := transformInput.Transaction.Index + cmdLogger.Errorf("could not transform transaction %d in ledger %d: %v", txIndex, LedgerSeq, err) + numFailures += 1 + continue + } + + for _, transformed := range effects { + numBytes, err := exportEntry(transformed, outFile, extra) + if err != nil { + cmdLogger.LogError(err) + numFailures += 1 + continue + } + totalNumBytes += numBytes + } + } + + outFile.Close() + cmdLogger.Info("Number of bytes written: ", totalNumBytes) + + printTransformStats(len(transactions), numFailures) + + maybeUpload(gcpCredentials, gcsBucket, path) +} + +func getTransactions(transactions []input.LedgerTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string) { + outFile := mustOutFile(path) + numFailures := 0 + totalNumBytes := 0 + for _, transformInput := range transactions { + transformed, err := transform.TransformTransaction(transformInput.Transaction, transformInput.LedgerHistory) + if err != nil { + ledgerSeq := transformInput.LedgerHistory.Header.LedgerSeq + cmdLogger.LogError(fmt.Errorf("could not transform transaction %d in ledger %d: ", transformInput.Transaction.Index, ledgerSeq)) + numFailures += 1 + continue + } + + numBytes, err := exportEntry(transformed, outFile, extra) + if err != nil { + cmdLogger.LogError(fmt.Errorf("could not export transaction: %v", err)) + numFailures += 1 + continue + } + totalNumBytes += numBytes + } + + outFile.Close() + cmdLogger.Info("Number of bytes written: ", totalNumBytes) + + printTransformStats(len(transactions), numFailures) + + maybeUpload(gcpCredentials, gcsBucket, path) +} + +func getDiagnosticEvents(transactions []input.LedgerTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string) { + outFile := mustOutFile(path) + numFailures := 0 + for _, transformInput := range transactions { + transformed, err, ok := transform.TransformDiagnosticEvent(transformInput.Transaction, transformInput.LedgerHistory) + if err != nil { + ledgerSeq := transformInput.LedgerHistory.Header.LedgerSeq + cmdLogger.LogError(fmt.Errorf("could not transform diagnostic events in transaction %d in ledger %d: ", transformInput.Transaction.Index, ledgerSeq)) + numFailures += 1 + continue + } + + if !ok { + continue + } + for _, diagnosticEvent := range transformed { + _, err := exportEntry(diagnosticEvent, outFile, extra) + if err != nil { + cmdLogger.LogError(fmt.Errorf("could not export diagnostic event: %v", err)) + numFailures += 1 + continue + } + } + } + + outFile.Close() + + printTransformStats(len(transactions), numFailures) + + maybeUpload(gcpCredentials, gcsBucket, path) +} + +func init() { + rootCmd.AddCommand(allHistoryCmd) + utils.AddCommonFlags(allHistoryCmd.Flags()) + utils.AddArchiveFlags("all_history", allHistoryCmd.Flags()) + utils.AddGcsFlags(allHistoryCmd.Flags()) + allHistoryCmd.MarkFlagRequired("end-ledger") +} diff --git a/internal/input/all_history.go b/internal/input/all_history.go new file mode 100644 index 00000000..35fab98f --- /dev/null +++ b/internal/input/all_history.go @@ -0,0 +1,101 @@ +package input + +import ( + "context" + "fmt" + "io" + + "github.com/stellar/go/ingest" + "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/stellar-etl/internal/toid" + "github.com/stellar/stellar-etl/internal/utils" +) + +// AllHistoryTransformInput is a representation of the input for the TransformOperation function +type AllHistoryTransformInput struct { + Operations []OperationTransformInput + Trades []TradeTransformInput + Ledgers []LedgerTransformInput +} + +// GetAllHistory returns a slice of operations, trades, effects, transactions, diagnostic events +// for the ledgers in the provided range (inclusive on both ends) +func GetAllHistory(start, end uint32, limit int64, env utils.EnvironmentDetails) (AllHistoryTransformInput, error) { + ctx := context.Background() + + backend, err := env.CreateCaptiveCoreBackend() + + if err != nil { + return AllHistoryTransformInput{}, err + } + + opSlice := []OperationTransformInput{} + tradeSlice := []TradeTransformInput{} + txSlice := []LedgerTransformInput{} + err = backend.PrepareRange(ctx, ledgerbackend.BoundedRange(start, end)) + panicIf(err) + for seq := start; seq <= end; seq++ { + changeReader, err := ingest.NewLedgerChangeReader(ctx, backend, env.NetworkPassphrase, seq) + if err != nil { + return AllHistoryTransformInput{}, err + } + txReader := changeReader.LedgerTransactionReader + + ledgerCloseMeta, err := backend.GetLedger(ctx, seq) + if err != nil { + return AllHistoryTransformInput{}, fmt.Errorf("error getting ledger seq %d from the backend: %v", seq, err) + } + + closeTime, err := utils.TimePointToUTCTimeStamp(txReader.GetHeader().Header.ScpValue.CloseTime) + if err != nil { + return AllHistoryTransformInput{}, err + } + + lhe := txReader.GetHeader() + + for limit < 0 { + tx, err := txReader.Read() + if err == io.EOF { + break + } + + for index, op := range tx.Envelope.Operations() { + // Operations + opSlice = append(opSlice, OperationTransformInput{ + Operation: op, + OperationIndex: int32(index), + Transaction: tx, + LedgerSeqNum: int32(seq), + LedgerCloseMeta: ledgerCloseMeta, + }) + + // Trades + if operationResultsInTrade(op) && tx.Result.Successful() { + tradeSlice = append(tradeSlice, TradeTransformInput{ + OperationIndex: int32(index), + Transaction: tx, + CloseTime: closeTime, + OperationHistoryID: toid.New(int32(seq), int32(tx.Index), int32(index)).ToInt64(), + }) + } + } + // Transactions + txSlice = append(txSlice, LedgerTransformInput{ + Transaction: tx, + LedgerHistory: lhe, + LedgerCloseMeta: ledgerCloseMeta, + }) + + } + + txReader.Close() + } + + allHistoryTransformInput := AllHistoryTransformInput{ + Operations: opSlice, + Trades: tradeSlice, + Ledgers: txSlice, + } + + return allHistoryTransformInput, nil +} From 4a20e6371d075a5fb36c8b310f9579d7687e47ab Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 18 Jan 2024 18:09:20 -0500 Subject: [PATCH 2/4] Add useragent changes --- docker/stellar-core.cfg | 6 +++--- internal/input/changes.go | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/docker/stellar-core.cfg b/docker/stellar-core.cfg index 31a037cb..449e0806 100644 --- a/docker/stellar-core.cfg +++ b/docker/stellar-core.cfg @@ -40,10 +40,10 @@ VALIDATORS=[ # Stellar.org history store [HISTORY.sdf1] -get="curl -sf http://history.stellar.org/prd/core-live/core_live_001/{0} -o {1}" +get="curl -sf -A stellar-etl/1.0.0 http://history.stellar.org/prd/core-live/core_live_001/{0} -o {1}" [HISTORY.sdf2] -get="curl -sf http://history.stellar.org/prd/core-live/core_live_002/{0} -o {1}" +get="curl -sf -A stellar-etl/1.0.0 http://history.stellar.org/prd/core-live/core_live_002/{0} -o {1}" [HISTORY.sdf3] -get="curl -sf http://history.stellar.org/prd/core-live/core_live_003/{0} -o {1}" +get="curl -sf -A stellar-etl/1.0.0 http://history.stellar.org/prd/core-live/core_live_003/{0} -o {1}" diff --git a/internal/input/changes.go b/internal/input/changes.go index 36853654..7dc7dfaa 100644 --- a/internal/input/changes.go +++ b/internal/input/changes.go @@ -51,6 +51,7 @@ func PrepareCaptiveCore(execPath string, tomlPath string, start, end uint32, env NetworkPassphrase: env.NetworkPassphrase, HistoryArchiveURLs: env.ArchiveURLs, UseDB: false, + UserAgent: "stellar-etl/1.0.0", }, ) if err != nil { From b77271bab3841aee0677db6bb18de577588f1bf3 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 18 Jan 2024 19:29:03 -0500 Subject: [PATCH 3/4] add path --- cmd/export_all_history.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/cmd/export_all_history.go b/cmd/export_all_history.go index b0e6d856..42b20ad3 100644 --- a/cmd/export_all_history.go +++ b/cmd/export_all_history.go @@ -22,7 +22,7 @@ in order to mitigate egress costs for the entity hosting history archives.`, cmdLogger.SetLevel(logrus.InfoLevel) endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger) cmdLogger.StrictExport = strictExport - startNum, _, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) + startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger) gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) env := utils.GetEnvironmentDetails(isTest, isFuture) @@ -31,11 +31,13 @@ in order to mitigate egress costs for the entity hosting history archives.`, cmdLogger.Fatal("could not read all history: ", err) } - getOperations(allHistory.Operations, extra, gcpCredentials, gcsBucket, "exported_operations.txt", env) - getTrades(allHistory.Trades, extra, gcpCredentials, gcsBucket, "exported_trades.txt") - getEffects(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, "exported_effects.txt", env) - getTransactions(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, "exported_transactions.txt") - getDiagnosticEvents(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, "exported_diagnostic_events.txt") + cmdLogger.Info("start doing other exports") + getOperations(allHistory.Operations, extra, gcpCredentials, gcsBucket, path+"exported_operations.txt", env) + getTrades(allHistory.Trades, extra, gcpCredentials, gcsBucket, path+"exported_trades.txt") + getEffects(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, path+"exported_effects.txt", env) + getTransactions(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, path+"exported_transactions.txt") + getDiagnosticEvents(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, path+"exported_diagnostic_events.txt") + cmdLogger.Info("done doing other exports") }, } @@ -199,7 +201,7 @@ func getDiagnosticEvents(transactions []input.LedgerTransformInput, extra map[st func init() { rootCmd.AddCommand(allHistoryCmd) utils.AddCommonFlags(allHistoryCmd.Flags()) - utils.AddArchiveFlags("all_history", allHistoryCmd.Flags()) + utils.AddArchiveFlags("", allHistoryCmd.Flags()) utils.AddGcsFlags(allHistoryCmd.Flags()) allHistoryCmd.MarkFlagRequired("end-ledger") } From e62c333b81a588d05a56ef0567a2b439c08e6b46 Mon Sep 17 00:00:00 2001 From: chowbao Date: Fri, 19 Jan 2024 13:16:19 -0500 Subject: [PATCH 4/4] Update cmd/export_all_history.go Co-authored-by: sydneynotthecity --- cmd/export_all_history.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/export_all_history.go b/cmd/export_all_history.go index 42b20ad3..c4706947 100644 --- a/cmd/export_all_history.go +++ b/cmd/export_all_history.go @@ -16,7 +16,7 @@ var allHistoryCmd = &cobra.Command{ Use: "export_all_history", Short: "Exports all stellar network history.", Long: `Exports historical stellar network data between provided start-ledger/end-ledger to output files. -This is a termporary command used to reduce the amount of requests to history archives +This is a temporary command used to reduce the amount of requests to history archives in order to mitigate egress costs for the entity hosting history archives.`, Run: func(cmd *cobra.Command, args []string) { cmdLogger.SetLevel(logrus.InfoLevel)