Skip to content

Commit

Permalink
Merge pull request #220 from stellar/add-export-all-command
Browse files Browse the repository at this point in the history
Add export all history command
  • Loading branch information
chowbao authored Jan 19, 2024
2 parents 156b072 + e62c333 commit 3761e4a
Show file tree
Hide file tree
Showing 4 changed files with 312 additions and 3 deletions.
207 changes: 207 additions & 0 deletions cmd/export_all_history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package cmd

import (
"fmt"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/stellar/stellar-etl/internal/input"
"github.com/stellar/stellar-etl/internal/toid"
"github.com/stellar/stellar-etl/internal/transform"
"github.com/stellar/stellar-etl/internal/utils"
)

// allHistoryCmd exports operations, trades, effects, transactions, and
// diagnostic events for a ledger range in a single pass over history,
// writing one output file per dataset under the configured output path.
var allHistoryCmd = &cobra.Command{
	Use:   "export_all_history",
	Short: "Exports all stellar network history.",
	Long: `Exports historical stellar network data between provided start-ledger/end-ledger to output files.
This is a temporary command used to reduce the amount of requests to history archives
in order to mitigate egress costs for the entity hosting history archives.`,
	Run: func(cmd *cobra.Command, args []string) {
		cmdLogger.SetLevel(logrus.InfoLevel)
		// Parse the shared command flags; strictExport makes LogError fatal.
		endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
		cmdLogger.StrictExport = strictExport
		startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
		gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger)
		env := utils.GetEnvironmentDetails(isTest, isFuture)

		// Read the whole ledger range once, then fan the results out to the
		// per-dataset export helpers below.
		allHistory, err := input.GetAllHistory(startNum, endNum, limit, env)
		if err != nil {
			cmdLogger.Fatal("could not read all history: ", err)
		}

		cmdLogger.Info("start doing other exports")
		getOperations(allHistory.Operations, extra, gcpCredentials, gcsBucket, path+"exported_operations.txt", env)
		getTrades(allHistory.Trades, extra, gcpCredentials, gcsBucket, path+"exported_trades.txt")
		getEffects(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, path+"exported_effects.txt", env)
		getTransactions(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, path+"exported_transactions.txt")
		getDiagnosticEvents(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, path+"exported_diagnostic_events.txt")
		cmdLogger.Info("done doing other exports")
	},
}

// getOperations transforms the given operation inputs and writes each
// transformed entry to the file at path, then optionally uploads the file to
// the given GCS bucket. Transform/export failures are logged and counted but
// do not abort the export.
func getOperations(operations []input.OperationTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string, env utils.EnvironmentDetails) {
	outFile := mustOutFile(path)
	failures := 0
	bytesWritten := 0

	for _, opInput := range operations {
		transformed, err := transform.TransformOperation(opInput.Operation, opInput.OperationIndex, opInput.Transaction, opInput.LedgerSeqNum, opInput.LedgerCloseMeta, env.NetworkPassphrase)
		if err != nil {
			cmdLogger.LogError(fmt.Errorf("could not transform operation %d in transaction %d in ledger %d: %v", opInput.OperationIndex, opInput.Transaction.Index, opInput.LedgerSeqNum, err))
			failures++
			continue
		}

		n, err := exportEntry(transformed, outFile, extra)
		if err != nil {
			cmdLogger.LogError(fmt.Errorf("could not export operation: %v", err))
			failures++
			continue
		}
		bytesWritten += n
	}

	outFile.Close()
	cmdLogger.Info("Number of bytes written: ", bytesWritten)

	printTransformStats(len(operations), failures)

	maybeUpload(gcpCredentials, gcsBucket, path)
}

// getTrades transforms the given trade inputs and writes each transformed
// trade entry to the file at path, then optionally uploads the file to the
// given GCS bucket. Transform/export failures are logged and counted but do
// not abort the export.
func getTrades(trades []input.TradeTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string) {
	outFile := mustOutFile(path)
	numFailures := 0
	totalNumBytes := 0
	for _, tradeInput := range trades {
		// One operation can produce multiple trade entries; the result was
		// previously named "trades", shadowing the parameter of the same name.
		tradeEntries, err := transform.TransformTrade(tradeInput.OperationIndex, tradeInput.OperationHistoryID, tradeInput.Transaction, tradeInput.CloseTime)
		if err != nil {
			parsedID := toid.Parse(tradeInput.OperationHistoryID)
			cmdLogger.LogError(fmt.Errorf("from ledger %d, transaction %d, operation %d: %v", parsedID.LedgerSequence, parsedID.TransactionOrder, parsedID.OperationOrder, err))
			numFailures += 1
			continue
		}

		for _, transformed := range tradeEntries {
			numBytes, err := exportEntry(transformed, outFile, extra)
			if err != nil {
				cmdLogger.LogError(err)
				numFailures += 1
				continue
			}
			totalNumBytes += numBytes
		}
	}

	outFile.Close()
	cmdLogger.Info("Number of bytes written: ", totalNumBytes)

	printTransformStats(len(trades), numFailures)

	maybeUpload(gcpCredentials, gcsBucket, path)
}

// getEffects transforms the effects of the given ledger transaction inputs
// and writes each transformed effect to the file at path, then optionally
// uploads the file to the given GCS bucket. Transform/export failures are
// logged and counted but do not abort the export.
func getEffects(transactions []input.LedgerTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string, env utils.EnvironmentDetails) {
	outFile := mustOutFile(path)
	numFailures := 0
	totalNumBytes := 0
	for _, transformInput := range transactions {
		// Renamed from LedgerSeq: locals use lowerCamelCase in Go.
		ledgerSeq := uint32(transformInput.LedgerHistory.Header.LedgerSeq)
		effects, err := transform.TransformEffect(transformInput.Transaction, ledgerSeq, transformInput.LedgerCloseMeta, env.NetworkPassphrase)
		if err != nil {
			txIndex := transformInput.Transaction.Index
			// Use LogError (not Errorf) so the --strict-export flag is
			// honored here the same way as in the sibling export helpers.
			cmdLogger.LogError(fmt.Errorf("could not transform transaction %d in ledger %d: %v", txIndex, ledgerSeq, err))
			numFailures += 1
			continue
		}

		for _, transformed := range effects {
			numBytes, err := exportEntry(transformed, outFile, extra)
			if err != nil {
				cmdLogger.LogError(err)
				numFailures += 1
				continue
			}
			totalNumBytes += numBytes
		}
	}

	outFile.Close()
	cmdLogger.Info("Number of bytes written: ", totalNumBytes)

	printTransformStats(len(transactions), numFailures)

	maybeUpload(gcpCredentials, gcsBucket, path)
}

// getTransactions transforms the given ledger transaction inputs and writes
// each transformed transaction to the file at path, then optionally uploads
// the file to the given GCS bucket. Transform/export failures are logged and
// counted but do not abort the export.
func getTransactions(transactions []input.LedgerTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string) {
	outFile := mustOutFile(path)
	numFailures := 0
	totalNumBytes := 0
	for _, transformInput := range transactions {
		transformed, err := transform.TransformTransaction(transformInput.Transaction, transformInput.LedgerHistory)
		if err != nil {
			ledgerSeq := transformInput.LedgerHistory.Header.LedgerSeq
			// Bug fix: the message previously ended with ": " but never
			// included err; add %v so the underlying cause is logged.
			cmdLogger.LogError(fmt.Errorf("could not transform transaction %d in ledger %d: %v", transformInput.Transaction.Index, ledgerSeq, err))
			numFailures += 1
			continue
		}

		numBytes, err := exportEntry(transformed, outFile, extra)
		if err != nil {
			cmdLogger.LogError(fmt.Errorf("could not export transaction: %v", err))
			numFailures += 1
			continue
		}
		totalNumBytes += numBytes
	}

	outFile.Close()
	cmdLogger.Info("Number of bytes written: ", totalNumBytes)

	printTransformStats(len(transactions), numFailures)

	maybeUpload(gcpCredentials, gcsBucket, path)
}

// getDiagnosticEvents transforms the diagnostic events of the given ledger
// transaction inputs and writes each event to the file at path, then
// optionally uploads the file to the given GCS bucket. Transactions without
// diagnostic events (ok == false) are skipped. Transform/export failures are
// logged and counted but do not abort the export.
func getDiagnosticEvents(transactions []input.LedgerTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string) {
	outFile := mustOutFile(path)
	numFailures := 0
	for _, transformInput := range transactions {
		transformed, err, ok := transform.TransformDiagnosticEvent(transformInput.Transaction, transformInput.LedgerHistory)
		if err != nil {
			ledgerSeq := transformInput.LedgerHistory.Header.LedgerSeq
			// Bug fix: the message previously ended with ": " but never
			// included err; add %v so the underlying cause is logged.
			cmdLogger.LogError(fmt.Errorf("could not transform diagnostic events in transaction %d in ledger %d: %v", transformInput.Transaction.Index, ledgerSeq, err))
			numFailures += 1
			continue
		}

		// ok == false means the transaction carries no diagnostic events.
		if !ok {
			continue
		}
		for _, diagnosticEvent := range transformed {
			_, err := exportEntry(diagnosticEvent, outFile, extra)
			if err != nil {
				cmdLogger.LogError(fmt.Errorf("could not export diagnostic event: %v", err))
				numFailures += 1
				continue
			}
		}
	}

	outFile.Close()

	printTransformStats(len(transactions), numFailures)

	maybeUpload(gcpCredentials, gcsBucket, path)
}

// init registers the export_all_history command and its flag sets on the
// root command.
func init() {
	rootCmd.AddCommand(allHistoryCmd)
	utils.AddCommonFlags(allHistoryCmd.Flags())
	utils.AddArchiveFlags("", allHistoryCmd.Flags())
	utils.AddGcsFlags(allHistoryCmd.Flags())
	// MarkFlagRequired fails only when the named flag does not exist, i.e. a
	// programmer error; surface it instead of silently ignoring the return.
	if err := allHistoryCmd.MarkFlagRequired("end-ledger"); err != nil {
		cmdLogger.Fatal("could not mark end-ledger flag as required: ", err)
	}
}
6 changes: 3 additions & 3 deletions docker/stellar-core.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ VALIDATORS=[

# Stellar.org history store
[HISTORY.sdf1]
get="curl -sf http://history.stellar.org/prd/core-live/core_live_001/{0} -o {1}"
get="curl -sf -A stellar-etl/1.0.0 http://history.stellar.org/prd/core-live/core_live_001/{0} -o {1}"

[HISTORY.sdf2]
get="curl -sf http://history.stellar.org/prd/core-live/core_live_002/{0} -o {1}"
get="curl -sf -A stellar-etl/1.0.0 http://history.stellar.org/prd/core-live/core_live_002/{0} -o {1}"

[HISTORY.sdf3]
get="curl -sf http://history.stellar.org/prd/core-live/core_live_003/{0} -o {1}"
get="curl -sf -A stellar-etl/1.0.0 http://history.stellar.org/prd/core-live/core_live_003/{0} -o {1}"
101 changes: 101 additions & 0 deletions internal/input/all_history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package input

import (
"context"
"fmt"
"io"

"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/stellar-etl/internal/toid"
"github.com/stellar/stellar-etl/internal/utils"
)

// AllHistoryTransformInput bundles the per-dataset transform inputs gathered
// from a single pass over a ledger range: operation inputs, trade inputs, and
// per-transaction ledger inputs (used for effects, transactions, and
// diagnostic events).
type AllHistoryTransformInput struct {
	Operations []OperationTransformInput
	Trades     []TradeTransformInput
	Ledgers    []LedgerTransformInput
}

// GetAllHistory returns the operations, trades, and per-transaction ledger
// inputs (used downstream for effects, transactions, and diagnostic events)
// for the ledgers in the provided range (inclusive on both ends), read from a
// captive-core backend configured by env.
func GetAllHistory(start, end uint32, limit int64, env utils.EnvironmentDetails) (AllHistoryTransformInput, error) {
	ctx := context.Background()

	backend, err := env.CreateCaptiveCoreBackend()
	if err != nil {
		return AllHistoryTransformInput{}, err
	}

	opSlice := []OperationTransformInput{}
	tradeSlice := []TradeTransformInput{}
	txSlice := []LedgerTransformInput{}

	// Return the error instead of panicking (was panicIf): this is library
	// code and the calling command already reports fatal errors.
	if err = backend.PrepareRange(ctx, ledgerbackend.BoundedRange(start, end)); err != nil {
		return AllHistoryTransformInput{}, err
	}

	for seq := start; seq <= end; seq++ {
		changeReader, err := ingest.NewLedgerChangeReader(ctx, backend, env.NetworkPassphrase, seq)
		if err != nil {
			return AllHistoryTransformInput{}, err
		}
		txReader := changeReader.LedgerTransactionReader

		ledgerCloseMeta, err := backend.GetLedger(ctx, seq)
		if err != nil {
			return AllHistoryTransformInput{}, fmt.Errorf("error getting ledger seq %d from the backend: %v", seq, err)
		}

		closeTime, err := utils.TimePointToUTCTimeStamp(txReader.GetHeader().Header.ScpValue.CloseTime)
		if err != nil {
			return AllHistoryTransformInput{}, err
		}

		lhe := txReader.GetHeader()

		// NOTE(review): this loop only runs when limit < 0 (i.e. "no limit");
		// any non-negative limit skips every transaction in the ledger.
		// Confirm this matches the limit semantics of the other export
		// commands before relying on a positive --limit here.
		for limit < 0 {
			tx, err := txReader.Read()
			if err == io.EOF {
				break
			}
			if err != nil {
				// Bug fix: non-EOF read errors were previously ignored,
				// which let a zero-value transaction be appended below.
				return AllHistoryTransformInput{}, fmt.Errorf("error reading transaction in ledger seq %d: %v", seq, err)
			}

			for index, op := range tx.Envelope.Operations() {
				// Operations
				opSlice = append(opSlice, OperationTransformInput{
					Operation:       op,
					OperationIndex:  int32(index),
					Transaction:     tx,
					LedgerSeqNum:    int32(seq),
					LedgerCloseMeta: ledgerCloseMeta,
				})

				// Trades: only successful operations that result in a trade.
				if operationResultsInTrade(op) && tx.Result.Successful() {
					tradeSlice = append(tradeSlice, TradeTransformInput{
						OperationIndex:     int32(index),
						Transaction:        tx,
						CloseTime:          closeTime,
						OperationHistoryID: toid.New(int32(seq), int32(tx.Index), int32(index)).ToInt64(),
					})
				}
			}
			// Transactions
			txSlice = append(txSlice, LedgerTransformInput{
				Transaction:     tx,
				LedgerHistory:   lhe,
				LedgerCloseMeta: ledgerCloseMeta,
			})
		}

		txReader.Close()
	}

	return AllHistoryTransformInput{
		Operations: opSlice,
		Trades:     tradeSlice,
		Ledgers:    txSlice,
	}, nil
}
1 change: 1 addition & 0 deletions internal/input/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func PrepareCaptiveCore(execPath string, tomlPath string, start, end uint32, env
NetworkPassphrase: env.NetworkPassphrase,
HistoryArchiveURLs: env.ArchiveURLs,
UseDB: false,
UserAgent: "stellar-etl/1.0.0",
},
)
if err != nil {
Expand Down

0 comments on commit 3761e4a

Please sign in to comment.