Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add export all history command #220

Merged
merged 4 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 207 additions & 0 deletions cmd/export_all_history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package cmd

import (
"fmt"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/stellar/stellar-etl/internal/input"
"github.com/stellar/stellar-etl/internal/toid"
"github.com/stellar/stellar-etl/internal/transform"
"github.com/stellar/stellar-etl/internal/utils"
)

// allHistoryCmd exports operations, trades, effects, transactions, and
// diagnostic events for a ledger range in a single captive-core pass.
var allHistoryCmd = &cobra.Command{
	Use:   "export_all_history",
	Short: "Exports all stellar network history.",
	Long: `Exports historical stellar network data between provided start-ledger/end-ledger to output files.
This is a temporary command used to reduce the amount of requests to history archives
in order to mitigate egress costs for the entity hosting history archives.`,
	Run: func(cmd *cobra.Command, args []string) {
		cmdLogger.SetLevel(logrus.InfoLevel)
		endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
		cmdLogger.StrictExport = strictExport
		startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
		gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger)
		env := utils.GetEnvironmentDetails(isTest, isFuture)

		// One backend pass collects inputs for every export below.
		allHistory, err := input.GetAllHistory(startNum, endNum, limit, env)
		if err != nil {
			cmdLogger.Fatal("could not read all history: ", err)
		}

		cmdLogger.Info("start doing other exports")
		getOperations(allHistory.Operations, extra, gcpCredentials, gcsBucket, path+"exported_operations.txt", env)
		getTrades(allHistory.Trades, extra, gcpCredentials, gcsBucket, path+"exported_trades.txt")
		getEffects(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, path+"exported_effects.txt", env)
		getTransactions(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, path+"exported_transactions.txt")
		getDiagnosticEvents(allHistory.Ledgers, extra, gcpCredentials, gcsBucket, path+"exported_diagnostic_events.txt")
		cmdLogger.Info("done doing other exports")
	},
}

func getOperations(operations []input.OperationTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string, env utils.EnvironmentDetails) {
outFileOperations := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
for _, transformInput := range operations {
transformed, err := transform.TransformOperation(transformInput.Operation, transformInput.OperationIndex, transformInput.Transaction, transformInput.LedgerSeqNum, transformInput.LedgerCloseMeta, env.NetworkPassphrase)
if err != nil {
txIndex := transformInput.Transaction.Index
cmdLogger.LogError(fmt.Errorf("could not transform operation %d in transaction %d in ledger %d: %v", transformInput.OperationIndex, txIndex, transformInput.LedgerSeqNum, err))
numFailures += 1
continue
}

numBytes, err := exportEntry(transformed, outFileOperations, extra)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not export operation: %v", err))
numFailures += 1
continue
}
totalNumBytes += numBytes
}

outFileOperations.Close()
cmdLogger.Info("Number of bytes written: ", totalNumBytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we get frequent transient errors, more logging that identifies which file/export process failed might be helpful. Since this is a temp fix, i'll leave it up to you whether that's worthwhile to add.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it's probably not that useful. Any issue would more easily be resolved by running the original pipeline instead of this hack


printTransformStats(len(operations), numFailures)

maybeUpload(gcpCredentials, gcsBucket, path)
}

// getTrades transforms and exports the given trades to the file at path,
// then optionally uploads the file to GCS (see maybeUpload). Failures are
// logged and counted rather than aborting the run.
func getTrades(trades []input.TradeTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string) {
	outFile := mustOutFile(path)

	failures := 0
	bytesWritten := 0
	for _, tr := range trades {
		// One operation can yield multiple trade entries.
		transformedTrades, err := transform.TransformTrade(tr.OperationIndex, tr.OperationHistoryID, tr.Transaction, tr.CloseTime)
		if err != nil {
			parsedID := toid.Parse(tr.OperationHistoryID)
			cmdLogger.LogError(fmt.Errorf("from ledger %d, transaction %d, operation %d: %v", parsedID.LedgerSequence, parsedID.TransactionOrder, parsedID.OperationOrder, err))
			failures++
			continue
		}

		for _, entry := range transformedTrades {
			n, exportErr := exportEntry(entry, outFile, extra)
			if exportErr != nil {
				cmdLogger.LogError(exportErr)
				failures++
				continue
			}
			bytesWritten += n
		}
	}

	outFile.Close()
	cmdLogger.Info("Number of bytes written: ", bytesWritten)

	printTransformStats(len(trades), failures)

	maybeUpload(gcpCredentials, gcsBucket, path)
}

// getEffects transforms and exports effects derived from the given ledger
// transactions to the file at path, then optionally uploads the file to GCS
// (see maybeUpload). Failures are logged and counted rather than aborting
// the run.
func getEffects(transactions []input.LedgerTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string, env utils.EnvironmentDetails) {
	outFile := mustOutFile(path)
	numFailures := 0
	totalNumBytes := 0
	for _, transformInput := range transactions {
		// Renamed from LedgerSeq: locals should not look exported.
		ledgerSeq := uint32(transformInput.LedgerHistory.Header.LedgerSeq)
		effects, err := transform.TransformEffect(transformInput.Transaction, ledgerSeq, transformInput.LedgerCloseMeta, env.NetworkPassphrase)
		if err != nil {
			txIndex := transformInput.Transaction.Index
			// Use LogError (was Errorf) so the strict-export setting is
			// honored consistently with the other export helpers here.
			cmdLogger.LogError(fmt.Errorf("could not transform transaction %d in ledger %d: %v", txIndex, ledgerSeq, err))
			numFailures += 1
			continue
		}

		for _, transformed := range effects {
			numBytes, err := exportEntry(transformed, outFile, extra)
			if err != nil {
				cmdLogger.LogError(err)
				numFailures += 1
				continue
			}
			totalNumBytes += numBytes
		}
	}

	outFile.Close()
	cmdLogger.Info("Number of bytes written: ", totalNumBytes)

	printTransformStats(len(transactions), numFailures)

	maybeUpload(gcpCredentials, gcsBucket, path)
}

// getTransactions transforms and exports the given ledger transactions to
// the file at path, then optionally uploads the file to GCS (see
// maybeUpload). Failures are logged and counted rather than aborting the run.
func getTransactions(transactions []input.LedgerTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string) {
	outFile := mustOutFile(path)
	numFailures := 0
	totalNumBytes := 0
	for _, transformInput := range transactions {
		transformed, err := transform.TransformTransaction(transformInput.Transaction, transformInput.LedgerHistory)
		if err != nil {
			ledgerSeq := transformInput.LedgerHistory.Header.LedgerSeq
			// Bug fix: the message previously ended with ": " and dropped
			// the underlying error; include it via %v.
			cmdLogger.LogError(fmt.Errorf("could not transform transaction %d in ledger %d: %v", transformInput.Transaction.Index, ledgerSeq, err))
			numFailures += 1
			continue
		}

		numBytes, err := exportEntry(transformed, outFile, extra)
		if err != nil {
			cmdLogger.LogError(fmt.Errorf("could not export transaction: %v", err))
			numFailures += 1
			continue
		}
		totalNumBytes += numBytes
	}

	outFile.Close()
	cmdLogger.Info("Number of bytes written: ", totalNumBytes)

	printTransformStats(len(transactions), numFailures)

	maybeUpload(gcpCredentials, gcsBucket, path)
}

// getDiagnosticEvents transforms and exports diagnostic events from the
// given ledger transactions to the file at path, then optionally uploads the
// file to GCS (see maybeUpload). Transactions without diagnostic events
// (ok == false) are skipped silently; failures are logged and counted.
func getDiagnosticEvents(transactions []input.LedgerTransformInput, extra map[string]string, gcpCredentials string, gcsBucket string, path string) {
	outFile := mustOutFile(path)
	numFailures := 0
	for _, transformInput := range transactions {
		transformed, err, ok := transform.TransformDiagnosticEvent(transformInput.Transaction, transformInput.LedgerHistory)
		if err != nil {
			ledgerSeq := transformInput.LedgerHistory.Header.LedgerSeq
			// Bug fix: the message previously ended with ": " and dropped
			// the underlying error; include it via %v.
			cmdLogger.LogError(fmt.Errorf("could not transform diagnostic events in transaction %d in ledger %d: %v", transformInput.Transaction.Index, ledgerSeq, err))
			numFailures += 1
			continue
		}

		if !ok {
			// Transaction has no diagnostic events; nothing to export.
			continue
		}
		for _, diagnosticEvent := range transformed {
			_, err := exportEntry(diagnosticEvent, outFile, extra)
			if err != nil {
				cmdLogger.LogError(fmt.Errorf("could not export diagnostic event: %v", err))
				numFailures += 1
				continue
			}
		}
	}

	outFile.Close()

	printTransformStats(len(transactions), numFailures)

	maybeUpload(gcpCredentials, gcsBucket, path)
}

// init registers the export_all_history command and its flag sets on the
// root command.
func init() {
	rootCmd.AddCommand(allHistoryCmd)
	utils.AddCommonFlags(allHistoryCmd.Flags())
	// Empty prefix: archive flags use their default (unprefixed) names.
	utils.AddArchiveFlags("", allHistoryCmd.Flags())
	utils.AddGcsFlags(allHistoryCmd.Flags())
	// end-ledger must be provided; start-ledger presumably has a usable
	// default from AddArchiveFlags — TODO confirm.
	allHistoryCmd.MarkFlagRequired("end-ledger")
}
6 changes: 3 additions & 3 deletions docker/stellar-core.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ VALIDATORS=[

# Stellar.org history store
[HISTORY.sdf1]
get="curl -sf http://history.stellar.org/prd/core-live/core_live_001/{0} -o {1}"
get="curl -sf -A stellar-etl/1.0.0 http://history.stellar.org/prd/core-live/core_live_001/{0} -o {1}"

[HISTORY.sdf2]
get="curl -sf http://history.stellar.org/prd/core-live/core_live_002/{0} -o {1}"
get="curl -sf -A stellar-etl/1.0.0 http://history.stellar.org/prd/core-live/core_live_002/{0} -o {1}"

[HISTORY.sdf3]
get="curl -sf http://history.stellar.org/prd/core-live/core_live_003/{0} -o {1}"
get="curl -sf -A stellar-etl/1.0.0 http://history.stellar.org/prd/core-live/core_live_003/{0} -o {1}"
101 changes: 101 additions & 0 deletions internal/input/all_history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package input

import (
"context"
"fmt"
"io"

"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/stellar-etl/internal/toid"
"github.com/stellar/stellar-etl/internal/utils"
)

// AllHistoryTransformInput bundles the transform inputs collected by
// GetAllHistory for a ledger range: per-operation inputs, per-trade inputs,
// and per-transaction ledger inputs (the Ledgers slice also feeds the
// effects and diagnostic-event exports).
type AllHistoryTransformInput struct {
	Operations []OperationTransformInput
	Trades     []TradeTransformInput
	Ledgers    []LedgerTransformInput
}

// GetAllHistory returns a slice of operations, trades, effects, transactions, diagnostic events
// for the ledgers in the provided range (inclusive on both ends)
// GetAllHistory reads ledgers start through end (inclusive) from a captive
// core backend and returns the collected transform inputs for operations,
// trades, and transactions.
//
// NOTE(review): transactions are only read when limit < 0 (the unlimited
// case); a non-negative limit yields empty results — confirm callers always
// pass a negative limit. This quirk is preserved for backward compatibility.
func GetAllHistory(start, end uint32, limit int64, env utils.EnvironmentDetails) (AllHistoryTransformInput, error) {
	ctx := context.Background()

	backend, err := env.CreateCaptiveCoreBackend()
	if err != nil {
		return AllHistoryTransformInput{}, err
	}

	opSlice := []OperationTransformInput{}
	tradeSlice := []TradeTransformInput{}
	txSlice := []LedgerTransformInput{}
	// Bug fix: was panicIf(err); return the error like every other failure
	// path in this function instead of panicking.
	if err = backend.PrepareRange(ctx, ledgerbackend.BoundedRange(start, end)); err != nil {
		return AllHistoryTransformInput{}, fmt.Errorf("error preparing ledger range [%d, %d]: %v", start, end, err)
	}
	for seq := start; seq <= end; seq++ {
		changeReader, err := ingest.NewLedgerChangeReader(ctx, backend, env.NetworkPassphrase, seq)
		if err != nil {
			return AllHistoryTransformInput{}, err
		}
		txReader := changeReader.LedgerTransactionReader

		ledgerCloseMeta, err := backend.GetLedger(ctx, seq)
		if err != nil {
			return AllHistoryTransformInput{}, fmt.Errorf("error getting ledger seq %d from the backend: %v", seq, err)
		}

		closeTime, err := utils.TimePointToUTCTimeStamp(txReader.GetHeader().Header.ScpValue.CloseTime)
		if err != nil {
			return AllHistoryTransformInput{}, err
		}

		lhe := txReader.GetHeader()

		for limit < 0 {
			tx, err := txReader.Read()
			if err == io.EOF {
				break
			}
			if err != nil {
				// Bug fix: non-EOF read errors were previously ignored,
				// which could loop forever on a persistent failure.
				txReader.Close()
				return AllHistoryTransformInput{}, fmt.Errorf("error reading transaction in ledger seq %d: %v", seq, err)
			}

			for index, op := range tx.Envelope.Operations() {
				// Operations
				opSlice = append(opSlice, OperationTransformInput{
					Operation:       op,
					OperationIndex:  int32(index),
					Transaction:     tx,
					LedgerSeqNum:    int32(seq),
					LedgerCloseMeta: ledgerCloseMeta,
				})

				// Trades: only successful transactions with trade-producing
				// operations contribute entries.
				if operationResultsInTrade(op) && tx.Result.Successful() {
					tradeSlice = append(tradeSlice, TradeTransformInput{
						OperationIndex:     int32(index),
						Transaction:        tx,
						CloseTime:          closeTime,
						OperationHistoryID: toid.New(int32(seq), int32(tx.Index), int32(index)).ToInt64(),
					})
				}
			}
			// Transactions (also used for effects and diagnostic events)
			txSlice = append(txSlice, LedgerTransformInput{
				Transaction:     tx,
				LedgerHistory:   lhe,
				LedgerCloseMeta: ledgerCloseMeta,
			})

		}

		txReader.Close()
	}

	allHistoryTransformInput := AllHistoryTransformInput{
		Operations: opSlice,
		Trades:     tradeSlice,
		Ledgers:    txSlice,
	}

	return allHistoryTransformInput, nil
}
1 change: 1 addition & 0 deletions internal/input/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func PrepareCaptiveCore(execPath string, tomlPath string, start, end uint32, env
NetworkPassphrase: env.NetworkPassphrase,
HistoryArchiveURLs: env.ArchiveURLs,
UseDB: false,
UserAgent: "stellar-etl/1.0.0",
},
)
if err != nil {
Expand Down
Loading