Add parquet file writing support (#269)
chowbao authored Jul 29, 2024
1 parent 37b7123 commit e8aaa9c
Showing 15 changed files with 1,630 additions and 12 deletions.
42 changes: 42 additions & 0 deletions cmd/command_utils.go
@@ -6,6 +6,10 @@ import (
"fmt"
"os"
"path/filepath"

"github.com/stellar/stellar-etl/internal/transform"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/writer"
)

type CloudStorage interface {
@@ -102,6 +106,10 @@ func exportFilename(start, end uint32, dataType string) string
return fmt.Sprintf("%d-%d-%s.txt", start, end-1, dataType)
}

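// exportParquetFilename mirrors exportFilename for Parquet output; end is
// exclusive, so end-1 appears in the name, e.g. exportParquetFilename(10, 20, "ledgers")
// yields "10-19-ledgers.parquet".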
func exportParquetFilename(start, end uint32, dataType string) string {
return fmt.Sprintf("%d-%d-%s.parquet", start, end-1, dataType)
}

func deleteLocalFiles(path string) {
err := os.RemoveAll(path)
if err != nil {
@@ -135,3 +143,37 @@ func maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path string) {
cmdLogger.Error("Unknown cloud provider")
}
}

// writeParquet creates the parquet file and writes the exported data into it.
//
// Parameters:
//
// data []transform.SchemaParquet - The slice of data to be written to the Parquet file.
// SchemaParquet is an interface used to call ToParquet()
// which is defined for each schema/export.
// path string - The file path where the Parquet file will be created and written.
// For example, "some/file/path/export_output.parquet"
// schema interface{} - The schema that defines the structure of the Parquet file.
//
// Errors:
//
// stellar-etl will log a fatal error and stop if it cannot create or write to the parquet file
func writeParquet(data []transform.SchemaParquet, path string, schema interface{}) {
	parquetFile, err := local.NewLocalFileWriter(path)
	if err != nil {
		cmdLogger.Fatal("could not create parquet file: ", err)
	}
	defer parquetFile.Close()

	// The final argument is np, the number of parallel writer goroutines; 1 keeps writes single-threaded.
	parquetWriter, err := writer.NewParquetWriter(parquetFile, schema, 1)
	if err != nil {
		cmdLogger.Fatal("could not create parquet file writer: ", err)
	}

	for _, record := range data {
		if err := parquetWriter.Write(record.ToParquet()); err != nil {
			cmdLogger.Fatal("could not write record to parquet file: ", err)
		}
	}

	// WriteStop flushes row groups and writes the file footer; checking its
	// error (rather than deferring it) avoids silently leaving a corrupt file.
	if err := parquetWriter.WriteStop(); err != nil {
		cmdLogger.Fatal("could not finish writing parquet file: ", err)
	}
}
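
For reference, a minimal sketch of what a SchemaParquet implementation can look like. The interface shape is inferred from its use in writeParquet above, and the Example types are hypothetical stand-ins for the real structs in internal/transform:

package transform

// SchemaParquet, as inferred from writeParquet's usage above: each export
// row knows how to produce its parquet-tagged counterpart.
type SchemaParquet interface {
	ToParquet() interface{}
}

// ExampleOutput is a hypothetical export row (not part of this commit).
type ExampleOutput struct {
	Sequence uint32
	Hash     string
}

// ExampleOutputParquet mirrors ExampleOutput with parquet-go struct tags;
// a new instance of this type is what callers pass as the schema argument.
type ExampleOutputParquet struct {
	Sequence int64  `parquet:"name=sequence, type=INT64"`
	Hash     string `parquet:"name=hash, type=BYTE_ARRAY, convertedtype=UTF8"`
}

// ToParquet converts the row into its parquet-tagged form.
func (e ExampleOutput) ToParquet() interface{} {
	return ExampleOutputParquet{
		Sequence: int64(e.Sequence),
		Hash:     e.Hash,
	}
}

A caller then accumulates rows as []transform.SchemaParquet and finishes with writeParquet(rows, path, new(transform.ExampleOutputParquet)), which is the pattern the export commands below follow.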
12 changes: 11 additions & 1 deletion cmd/export_assets.go
@@ -18,7 +18,7 @@ var assetsCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
- startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
+ startNum, path, parquetPath, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

@@ -40,6 +40,7 @@ var assetsCmd = &cobra.Command{
seenIDs := map[int64]bool{}
numFailures := 0
totalNumBytes := 0
var transformedAssets []transform.SchemaParquet
for _, transformInput := range paymentOps {
transformed, err := transform.TransformAsset(transformInput.Operation, transformInput.OperationIndex, transformInput.TransactionIndex, transformInput.LedgerSeqNum, transformInput.LedgerCloseMeta)
if err != nil {
@@ -62,6 +63,10 @@ var assetsCmd = &cobra.Command{
continue
}
totalNumBytes += numBytes

if commonArgs.WriteParquet {
transformedAssets = append(transformedAssets, transformed)
}
}

outFile.Close()
@@ -70,6 +75,11 @@
printTransformStats(len(paymentOps), numFailures)

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

if commonArgs.WriteParquet {
writeParquet(transformedAssets, parquetPath, new(transform.AssetOutputParquet))
maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
},
}

12 changes: 12 additions & 0 deletions cmd/export_contract_events.go
@@ -29,6 +29,7 @@ var contractEventsCmd = &cobra.Command{

outFile := mustOutFile(cmdArgs.Path)
numFailures := 0
var transformedEvents []transform.SchemaParquet
for _, transformInput := range transactions {
transformed, err := transform.TransformContractEvent(transformInput.Transaction, transformInput.LedgerHistory)
if err != nil {
@@ -45,14 +46,25 @@ var contractEventsCmd = &cobra.Command{
numFailures += 1
continue
}

if commonArgs.WriteParquet {
transformedEvents = append(transformedEvents, contractEvent)
}
}

}

outFile.Close()

printTransformStats(len(transactions), numFailures)

maybeUpload(cmdArgs.Credentials, cmdArgs.Bucket, cmdArgs.Provider, cmdArgs.Path)

if commonArgs.WriteParquet {
writeParquet(transformedEvents, cmdArgs.ParquetPath, new(transform.ContractEventOutputParquet))
maybeUpload(cmdArgs.Credentials, cmdArgs.Bucket, cmdArgs.Provider, cmdArgs.ParquetPath)
}

},
}

12 changes: 11 additions & 1 deletion cmd/export_effects.go
@@ -16,7 +16,7 @@ var effectsCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
- startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
+ startNum, path, parquetPath, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

@@ -28,6 +28,7 @@
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var transformedEffects []transform.SchemaParquet
for _, transformInput := range transactions {
LedgerSeq := uint32(transformInput.LedgerHistory.Header.LedgerSeq)
effects, err := transform.TransformEffect(transformInput.Transaction, LedgerSeq, transformInput.LedgerCloseMeta, env.NetworkPassphrase)
Expand All @@ -46,6 +47,10 @@ var effectsCmd = &cobra.Command{
continue
}
totalNumBytes += numBytes

if commonArgs.WriteParquet {
transformedEffects = append(transformedEffects, transformed)
}
}
}

@@ -55,6 +60,11 @@
printTransformStats(len(transactions), numFailures)

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

if commonArgs.WriteParquet {
writeParquet(transformedEffects, parquetPath, new(transform.EffectOutputParquet))
maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
},
}

69 changes: 67 additions & 2 deletions cmd/export_ledger_entry_changes.go
@@ -32,7 +32,7 @@ be exported.`,
cmdLogger.StrictExport = commonArgs.StrictExport
env := utils.GetEnvironmentDetails(commonArgs)

- _, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger)
+ _, configPath, startNum, batchSize, outputFolder, parquetOutputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger)
exports := utils.MustExportTypeFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)

Expand All @@ -43,6 +43,11 @@ be exported.`,
cmdLogger.Fatalf("unable to mkdir %s: %v", outputFolder, err)
}

err = os.MkdirAll(parquetOutputFolder, os.ModePerm)
if err != nil {
cmdLogger.Fatalf("unable to mkdir %s: %v", parquetOutputFolder, err)
}

if batchSize <= 0 {
cmdLogger.Fatalf("batch-size (%d) must be greater than 0", batchSize)
}
@@ -256,11 +261,13 @@
batch.BatchStart,
batch.BatchEnd,
outputFolder,
parquetOutputFolder,
transformedOutputs,
cloudCredentials,
cloudStorageBucket,
cloudProvider,
commonArgs.Extra,
commonArgs.WriteParquet,
)
if err != nil {
cmdLogger.LogError(err)
@@ -274,23 +281,81 @@
func exportTransformedData(
start, end uint32,
folderPath string,
parquetFolderPath string,
transformedOutput map[string][]interface{},
cloudCredentials, cloudStorageBucket, cloudProvider string,
- extra map[string]string) error {
+ extra map[string]string,
+ writeParquetEnabled bool) error {

for resource, output := range transformedOutput {
// Filenames are typically exclusive of end point. This processor
// is different and we have to increment by 1 since the end batch number
// is included in this filename.
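// For example, a batch covering ledgers 0 through 63 inclusive is written as
// "0-63-<resource>.txt" and "0-63-<resource>.parquet".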
path := filepath.Join(folderPath, exportFilename(start, end+1, resource))
parquetPath := filepath.Join(parquetFolderPath, exportParquetFilename(start, end+1, resource))
outFile := mustOutFile(path)
var transformedResource []transform.SchemaParquet
var parquetSchema interface{}
var skip bool
for _, o := range output {
_, err := exportEntry(o, outFile, extra)
if err != nil {
return err
}

if writeParquetEnabled {
switch v := o.(type) {
case transform.AccountOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.AccountOutputParquet)
skip = false
case transform.AccountSignerOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.AccountSignerOutputParquet)
skip = false
case transform.ClaimableBalanceOutput:
// Skipping ClaimableBalanceOutputParquet because it is not needed in the current scope of work.
// Note that ClaimableBalanceOutputParquet uses nested structs that will need to be handled
// for parquet conversion; a sketch of one flattening approach follows this function.
skip = true
case transform.ConfigSettingOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ConfigSettingOutputParquet)
skip = false
case transform.ContractCodeOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ContractCodeOutputParquet)
skip = false
case transform.ContractDataOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ContractDataOutputParquet)
skip = false
case transform.PoolOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.PoolOutputParquet)
skip = false
case transform.OfferOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.OfferOutputParquet)
skip = false
case transform.TrustlineOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.TrustlineOutputParquet)
skip = false
case transform.TtlOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.TtlOutputParquet)
skip = false
}
}
}

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

// skip and parquetSchema reflect the most recent record's type; outputs for
// a given resource are expected to be homogeneous.
if writeParquetEnabled && !skip && parquetSchema != nil {
writeParquet(transformedResource, parquetPath, parquetSchema)
maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
}

return nil
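
As noted in the ClaimableBalanceOutput case above, the flat parquet-go struct tags used by the other schemas do not cover nested structs. A sketch of one possible flattening approach, using hypothetical types that are not part of this commit, is to serialize nested fields to JSON string columns in ToParquet():

package transform

import "encoding/json"

// Claimant and ClaimableBalanceExample are invented for this illustration.
type Claimant struct {
	Destination string
	Predicate   string
}

type ClaimableBalanceExample struct {
	BalanceID string
	Claimants []Claimant // nested slice: no flat parquet tag representation
}

type ClaimableBalanceExampleParquet struct {
	BalanceID string `parquet:"name=balance_id, type=BYTE_ARRAY, convertedtype=UTF8"`
	// The nested slice is stored as a JSON string column.
	Claimants string `parquet:"name=claimants, type=BYTE_ARRAY, convertedtype=UTF8"`
}

func (c ClaimableBalanceExample) ToParquet() interface{} {
	claimantsJSON, err := json.Marshal(c.Claimants)
	if err != nil {
		claimantsJSON = []byte("[]") // fall back to an empty list on marshal failure
	}
	return ClaimableBalanceExampleParquet{
		BalanceID: c.BalanceID,
		Claimants: string(claimantsJSON),
	}
}

parquet-go can also represent nested and repeated fields with its own tag syntax, but JSON-encoding keeps the writer path identical to the flat schemas handled above.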
2 changes: 1 addition & 1 deletion cmd/export_ledger_transaction.go
@@ -18,7 +18,7 @@ var ledgerTransactionCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
- startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
+ startNum, path, _, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

12 changes: 11 additions & 1 deletion cmd/export_ledgers.go
@@ -18,7 +18,7 @@ var ledgersCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
- startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
+ startNum, path, parquetPath, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

@@ -38,6 +38,7 @@

numFailures := 0
totalNumBytes := 0
var transformedLedgers []transform.SchemaParquet
for i, ledger := range ledgers {
transformed, err := transform.TransformLedger(ledger.Ledger, ledger.LCM)
if err != nil {
@@ -53,6 +54,10 @@ var ledgersCmd = &cobra.Command{
continue
}
totalNumBytes += numBytes

if commonArgs.WriteParquet {
transformedLedgers = append(transformedLedgers, transformed)
}
}

outFile.Close()
@@ -61,6 +66,11 @@
printTransformStats(len(ledgers), numFailures)

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

if commonArgs.WriteParquet {
writeParquet(transformedLedgers, parquetPath, new(transform.LedgerOutputParquet))
maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
},
}

12 changes: 11 additions & 1 deletion cmd/export_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var operationsCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
- startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
+ startNum, path, parquetPath, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

@@ -30,6 +30,7 @@ var operationsCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var transformedOps []transform.SchemaParquet
for _, transformInput := range operations {
transformed, err := transform.TransformOperation(transformInput.Operation, transformInput.OperationIndex, transformInput.Transaction, transformInput.LedgerSeqNum, transformInput.LedgerCloseMeta, env.NetworkPassphrase)
if err != nil {
@@ -46,6 +47,10 @@ var operationsCmd = &cobra.Command{
continue
}
totalNumBytes += numBytes

if commonArgs.WriteParquet {
transformedOps = append(transformedOps, transformed)
}
}

outFile.Close()
@@ -54,6 +59,11 @@ var operationsCmd = &cobra.Command{
printTransformStats(len(operations), numFailures)

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

if commonArgs.WriteParquet {
writeParquet(transformedOps, parquetPath, new(transform.OperationOutputParquet))
maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
},
}
