Skip to content

Commit

Permalink
Merge pull request #198 from stellar/export-tx
Browse files Browse the repository at this point in the history
create export-tx
  • Loading branch information
cayod authored Oct 16, 2023
2 parents e9c803c + 11e148d commit eba446f
Show file tree
Hide file tree
Showing 4 changed files with 453 additions and 0 deletions.
83 changes: 83 additions & 0 deletions cmd/export_ledger_transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package cmd

import (
"fmt"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/stellar/stellar-etl/internal/input"
"github.com/stellar/stellar-etl/internal/transform"
"github.com/stellar/stellar-etl/internal/utils"
)

var ledgerTransactionCmd = &cobra.Command{
Use: "export_ledger_transaction",
Short: "Exports the ledger_transaction transaction data over a specified range.",
Long: `Exports the ledger_transaction transaction data over a specified range to an output file.`,
Run: func(cmd *cobra.Command, args []string) {
cmdLogger.SetLevel(logrus.InfoLevel)
endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(isTest, isFuture)

ledgerTransaction, err := input.GetTransactions(startNum, endNum, limit, env)
if err != nil {
cmdLogger.Fatal("could not read ledger_transaction: ", err)
}

outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
for _, transformInput := range ledgerTransaction {
transformed, err := transform.TransformLedgerTransaction(transformInput.Transaction, transformInput.LedgerHistory)
if err != nil {
ledgerSeq := transformInput.LedgerHistory.Header.LedgerSeq
cmdLogger.LogError(fmt.Errorf("could not transform ledger_transaction transaction %d in ledger %d: ", transformInput.Transaction.Index, ledgerSeq))
numFailures += 1
continue
}

numBytes, err := exportEntry(transformed, outFile, extra)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not export transaction: %v", err))
numFailures += 1
continue
}
totalNumBytes += numBytes
}

outFile.Close()
cmdLogger.Info("Number of bytes written: ", totalNumBytes)

printTransformStats(len(ledgerTransaction), numFailures)

maybeUpload(gcpCredentials, gcsBucket, path)
},
}

func init() {
rootCmd.AddCommand(ledgerTransactionCmd)
utils.AddCommonFlags(ledgerTransactionCmd.Flags())
utils.AddArchiveFlags("ledger_transaction", ledgerTransactionCmd.Flags())
utils.AddGcsFlags(ledgerTransactionCmd.Flags())
ledgerTransactionCmd.MarkFlagRequired("end-ledger")

/*
Current flags:
start-ledger: the ledger sequence number for the beginning of the export period
end-ledger: the ledger sequence number for the end of the export range (*required)
limit: maximum number of ledger_transaction to export
TODO: measure a good default value that ensures all ledger_transaction within a 5 minute period will be exported with a single call
The current max_ledger_transaction_set_size is 1000 and there are 60 new ledgers in a 5 minute period:
1000*60 = 60000
output-file: filename of the output file
TODO: implement extra flags if possible
serialize-method: the method for serialization of the output data (JSON, XDR, etc)
start and end time as a replacement for start and end sequence numbers
*/
}
58 changes: 58 additions & 0 deletions internal/transform/ledger_transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package transform

import (
"fmt"

"github.com/stellar/stellar-etl/internal/utils"

"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

// TransformTransaction converts a transaction from the history archive ingestion system into a form suitable for BigQuery
func TransformLedgerTransaction(transaction ingest.LedgerTransaction, lhe xdr.LedgerHeaderHistoryEntry) (LedgerTransactionOutput, error) {
ledgerHeader := lhe.Header
outputLedgerSequence := uint32(ledgerHeader.LedgerSeq)

outputTxEnvelope, err := xdr.MarshalBase64(transaction.Envelope)
if err != nil {
return LedgerTransactionOutput{}, err
}

outputTxResult, err := xdr.MarshalBase64(&transaction.Result)
if err != nil {
return LedgerTransactionOutput{}, err
}

outputTxMeta, err := xdr.MarshalBase64(transaction.UnsafeMeta)
if err != nil {
return LedgerTransactionOutput{}, err
}

outputTxFeeMeta, err := xdr.MarshalBase64(transaction.FeeChanges)
if err != nil {
return LedgerTransactionOutput{}, err
}

outputTxLedgerHistory, err := xdr.MarshalBase64(lhe)
if err != nil {
return LedgerTransactionOutput{}, err
}

outputCloseTime, err := utils.TimePointToUTCTimeStamp(ledgerHeader.ScpValue.CloseTime)
if err != nil {
return LedgerTransactionOutput{}, fmt.Errorf("could not convert close time to UTC timestamp: %v", err)
}

transformedLedgerTransaction := LedgerTransactionOutput{
LedgerSequence: outputLedgerSequence,
TxEnvelope: outputTxEnvelope,
TxResult: outputTxResult,
TxMeta: outputTxMeta,
TxFeeMeta: outputTxFeeMeta,
TxLedgerHistory: outputTxLedgerHistory,
ClosedAt: outputCloseTime,
}

return transformedLedgerTransaction, nil
}
Loading

0 comments on commit eba446f

Please sign in to comment.