Add parquet file writing support #269

Merged · 6 commits · Jul 29, 2024
Changes from 4 commits
42 changes: 42 additions & 0 deletions cmd/command_utils.go
@@ -6,6 +6,10 @@ import (
"fmt"
"os"
"path/filepath"

"github.com/stellar/stellar-etl/internal/transform"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/writer"
)

type CloudStorage interface {
@@ -102,6 +106,10 @@ func exportFilename(start, end uint32, dataType string) string {
return fmt.Sprintf("%d-%d-%s.txt", start, end-1, dataType)
}

func exportParquetFilename(start, end uint32, dataType string) string {
return fmt.Sprintf("%d-%d-%s.parquet", start, end-1, dataType)
}

func deleteLocalFiles(path string) {
err := os.RemoveAll(path)
if err != nil {
@@ -135,3 +143,37 @@ func maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path string) {
cmdLogger.Error("Unknown cloud provider")
}
}

// writeParquet creates the parquet file and writes the exported data into it.
//
// Parameters:
//
// data []transform.SchemaParquet - The slice of data to be written to the Parquet file.
// SchemaParquet is an interface used to call ToParquet()
// which is defined for each schema/export.
// path string - The file path where the Parquet file will be created and written.
// For example, "some/file/path/export_output.parquet"
// schema interface{} - The schema that defines the structure of the Parquet file.
//
// Errors:
//
stellar-etl will log a Fatal error and stop if it cannot create or write to the parquet file.
func writeParquet(data []transform.SchemaParquet, path string, schema interface{}) {
parquetFile, err := local.NewLocalFileWriter(path)
if err != nil {
cmdLogger.Fatal("could not create parquet file: ", err)
}
defer parquetFile.Close()

writer, err := writer.NewParquetWriter(parquetFile, schema, 1)
Contributor:
What is the 1 for? Looks like, from the docs, it's a parallel writer? Would we ever want to multithread this?

Contributor Author:
"What is the 1 for? Looks like from docs it's a parallel writer?"
Yeah, it is for the parallel writer.

"Would we ever want to multithread this?"
I don't think so. I played around with it and it doesn't increase/decrease the write time by any notable margin. The records that we are writing are too small to actually benefit from multithreading.

if err != nil {
cmdLogger.Fatal("could not create parquet file writer: ", err)
}
defer writer.WriteStop()

for _, record := range data {
if err := writer.Write(record.ToParquet()); err != nil {
cmdLogger.Fatal("could not write record to parquet file: ", err)
}
}
}
Comment on lines +174 to +179
Contributor:
A batch conversion would have been so noice 😿

Contributor Author:
Yeah, it would have been, but I didn't see an example of it; the library only shows writing sequentially in this example:
https://github.com/xitongsys/parquet-go/blob/master/example/writer.go#L42-L53
Although I didn't spend any time digging through the code to see if it was easy to implement.

Contributor:
Agree. I took a look as well. Could not find anything.
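To make the pieces above concrete, here is a minimal, self-contained sketch of how the SchemaParquet interface, a parquet-go row struct, and the sequential write loop fit together. Everything except the xitongsys/parquet-go calls is an illustrative assumption (ExampleOutput, ExampleOutputParquet, their field names, and the tags); the real interface and schema structs live in internal/transform.

package main

import (
	"log"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/writer"
)

// SchemaParquet mirrors the interface referenced in the doc comment above:
// each exported record type knows how to produce its flat Parquet row.
type SchemaParquet interface {
	ToParquet() interface{}
}

// ExampleOutputParquet is a hypothetical flat row using parquet-go struct tags.
type ExampleOutputParquet struct {
	ID    int64  `parquet:"name=id, type=INT64"`
	Label string `parquet:"name=label, type=BYTE_ARRAY, convertedtype=UTF8"`
}

// ExampleOutput is a hypothetical transform output implementing SchemaParquet.
type ExampleOutput struct {
	ID    int64
	Label string
}

func (e ExampleOutput) ToParquet() interface{} {
	return ExampleOutputParquet{ID: e.ID, Label: e.Label}
}

func main() {
	data := []SchemaParquet{ExampleOutput{ID: 1, Label: "a"}, ExampleOutput{ID: 2, Label: "b"}}

	f, err := local.NewLocalFileWriter("example.parquet")
	if err != nil {
		log.Fatal("could not create parquet file: ", err)
	}
	defer f.Close()

	// The trailing 1 is the parallel-writer count discussed above; the PR keeps
	// it at 1 because the rows are too small to benefit from multithreading.
	pw, err := writer.NewParquetWriter(f, new(ExampleOutputParquet), 1)
	if err != nil {
		log.Fatal("could not create parquet writer: ", err)
	}
	defer pw.WriteStop()

	// Sequential write, matching the library example linked above.
	for _, record := range data {
		if err := pw.Write(record.ToParquet()); err != nil {
			log.Fatal("could not write record: ", err)
		}
	}
}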

10 changes: 9 additions & 1 deletion cmd/export_assets.go
@@ -18,7 +18,7 @@ var assetsCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
startNum, path, parquetPath, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

@@ -40,6 +40,7 @@
seenIDs := map[int64]bool{}
numFailures := 0
totalNumBytes := 0
var transformedAssets []transform.SchemaParquet
for _, transformInput := range paymentOps {
transformed, err := transform.TransformAsset(transformInput.Operation, transformInput.OperationIndex, transformInput.TransactionIndex, transformInput.LedgerSeqNum, transformInput.LedgerCloseMeta)
if err != nil {
Expand All @@ -62,6 +63,8 @@ var assetsCmd = &cobra.Command{
continue
}
totalNumBytes += numBytes

transformedAssets = append(transformedAssets, transformed)
Contributor:
Does this mean that we're creating a structure intended to be written to a parquet file even if the user does not pass the write parquet flag?
Do you know if this impacts runtime performance at all?

Contributor Author:
Yes, we are.
It does not impact performance by a noticeable amount, but I will add the feature flag to skip this for clarity.
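A minimal sketch of the follow-up mentioned above, gating the accumulation behind the same flag so the slice is only built when parquet output was requested. This is hypothetical, not part of this commit, and reuses the commonArgs, transformedAssets, and transformed variables from the surrounding diff:

// Hypothetical follow-up: only collect parquet rows when the flag is set.
if commonArgs.WriteParquet {
	transformedAssets = append(transformedAssets, transformed)
}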

}

outFile.Close()
@@ -70,6 +73,11 @@
printTransformStats(len(paymentOps), numFailures)

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

if commonArgs.WriteParquet {
writeParquet(transformedAssets, parquetPath, new(transform.AssetOutputParquet))
maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
Contributor:
Is the just path path for json files?

Contributor Author:
Sorry, can you clarify? I think you might be missing a word in your question.

Contributor:
So parquetPath is the file path where the parquet files will be written. Does the existing path directory contain json files? Or, what's actually contained in the path directory?

Contributor Author:
Sorry I missed this earlier. parquetPath is the fully formed filename, so it'll look like bucket/scheduled_run_<date>/exported_<data type like assets>.parquet
So this directory path should only contain .parquet files for the airflow scheduled run/batch_id.
}
},
}
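To make the discussion above concrete, here is a hypothetical illustration of a fully formed parquet path built with the exportParquetFilename helper added in cmd/command_utils.go. The bucket and scheduled-run prefix are stand-ins for whatever the Airflow run supplies, and the exact layout may differ from this sketch; filepath comes from that file's existing imports.

// Hypothetical illustration; only exportParquetFilename comes from this PR.
name := exportParquetFilename(100, 200, "assets") // "100-199-assets.parquet"
parquetPath := filepath.Join("bucket", "scheduled_run_2024-07-29", name)
// parquetPath == "bucket/scheduled_run_2024-07-29/100-199-assets.parquet"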

10 changes: 10 additions & 0 deletions cmd/export_contract_events.go
@@ -29,6 +29,7 @@ var contractEventsCmd = &cobra.Command{

outFile := mustOutFile(cmdArgs.Path)
numFailures := 0
var transformedEvents []transform.SchemaParquet
for _, transformInput := range transactions {
transformed, err := transform.TransformContractEvent(transformInput.Transaction, transformInput.LedgerHistory)
if err != nil {
@@ -45,14 +46,23 @@
numFailures += 1
continue
}

transformedEvents = append(transformedEvents, contractEvent)
}

}

outFile.Close()

printTransformStats(len(transactions), numFailures)

maybeUpload(cmdArgs.Credentials, cmdArgs.Bucket, cmdArgs.Provider, cmdArgs.Path)

if commonArgs.WriteParquet {
writeParquet(transformedEvents, cmdArgs.ParquetPath, new(transform.ContractEventOutputParquet))
maybeUpload(cmdArgs.Credentials, cmdArgs.Bucket, cmdArgs.Provider, cmdArgs.ParquetPath)
}

},
}

10 changes: 9 additions & 1 deletion cmd/export_effects.go
@@ -16,7 +16,7 @@ var effectsCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
startNum, path, parquetPath, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

@@ -28,6 +28,7 @@
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var transformedEffects []transform.SchemaParquet
for _, transformInput := range transactions {
LedgerSeq := uint32(transformInput.LedgerHistory.Header.LedgerSeq)
effects, err := transform.TransformEffect(transformInput.Transaction, LedgerSeq, transformInput.LedgerCloseMeta, env.NetworkPassphrase)
Expand All @@ -46,6 +47,8 @@ var effectsCmd = &cobra.Command{
continue
}
totalNumBytes += numBytes

transformedEffects = append(transformedEffects, transformed)
}
}

@@ -55,6 +58,11 @@
printTransformStats(len(transactions), numFailures)

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

if commonArgs.WriteParquet {
writeParquet(transformedEffects, parquetPath, new(transform.EffectOutputParquet))
maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
},
}

64 changes: 62 additions & 2 deletions cmd/export_ledger_entry_changes.go
@@ -32,7 +32,7 @@ be exported.`,
cmdLogger.StrictExport = commonArgs.StrictExport
env := utils.GetEnvironmentDetails(commonArgs)

_, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger)
_, configPath, startNum, batchSize, outputFolder, parquetOutputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger)
exports := utils.MustExportTypeFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)

@@ -43,6 +43,11 @@
cmdLogger.Fatalf("unable to mkdir %s: %v", outputFolder, err)
}

err = os.MkdirAll(parquetOutputFolder, os.ModePerm)
if err != nil {
cmdLogger.Fatalf("unable to mkdir %s: %v", parquetOutputFolder, err)
}

if batchSize <= 0 {
cmdLogger.Fatalf("batch-size (%d) must be greater than 0", batchSize)
}
@@ -256,11 +261,13 @@ be exported.`,
batch.BatchStart,
batch.BatchEnd,
outputFolder,
parquetOutputFolder,
transformedOutputs,
cloudCredentials,
cloudStorageBucket,
cloudProvider,
commonArgs.Extra,
commonArgs.WriteParquet,
)
if err != nil {
cmdLogger.LogError(err)
@@ -274,23 +281,76 @@
func exportTransformedData(
start, end uint32,
folderPath string,
parquetFolderPath string,
transformedOutput map[string][]interface{},
cloudCredentials, cloudStorageBucket, cloudProvider string,
extra map[string]string) error {
extra map[string]string,
WriteParquet bool) error {

for resource, output := range transformedOutput {
// Filenames are typically exclusive of end point. This processor
// is different and we have to increment by 1 since the end batch number
// is included in this filename.
path := filepath.Join(folderPath, exportFilename(start, end+1, resource))
parquetPath := filepath.Join(parquetFolderPath, exportParquetFilename(start, end+1, resource))
outFile := mustOutFile(path)
var transformedResource []transform.SchemaParquet
var parquetSchema interface{}
var skip bool
for _, o := range output {
_, err := exportEntry(o, outFile, extra)
if err != nil {
return err
}
switch v := o.(type) {
case transform.AccountOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.AccountOutputParquet)
skip = false
case transform.AccountSignerOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.AccountSignerOutputParquet)
skip = false
case transform.ClaimableBalanceOutput:
// Skip
Contributor:
Why does this get skipped?

Contributor:
Nvm, it was out of original scope for the Dune supported tables. I'm fine keeping it out of scope for MVP, but I think we should add holistic support at some point.

Contributor Author:
Yup, I agree.
I left it out because of the way the ClaimableBalanceOutput struct is defined. It creates multiple nested structs (like for claimants) which didn't nicely stringify.
(A hypothetical flattening approach is sketched below, after this file's diff.)
skip = true
case transform.ConfigSettingOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ConfigSettingOutputParquet)
skip = false
case transform.ContractCodeOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ContractCodeOutputParquet)
skip = false
case transform.ContractDataOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ContractDataOutputParquet)
skip = false
case transform.PoolOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.PoolOutputParquet)
skip = false
case transform.OfferOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.OfferOutputParquet)
skip = false
case transform.TrustlineOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.TrustlineOutputParquet)
skip = false
case transform.TtlOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.TtlOutputParquet)
skip = false
}
}

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

if !skip && WriteParquet {
writeParquet(transformedResource, parquetPath, parquetSchema)
maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
}

return nil
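Following up on the ClaimableBalanceOutput discussion above: one way the nested claimant structs could eventually be made Parquet-friendly is to serialize them into a single string column. This is only a sketch; the ClaimableBalanceOutputParquet type, the field names, and the encoding/json import are assumptions, and nothing here is part of this PR.

// Hypothetical flattened row; field names are assumptions, not the real schema.
type ClaimableBalanceOutputParquet struct {
	BalanceID string `parquet:"name=balance_id, type=BYTE_ARRAY, convertedtype=UTF8"`
	Claimants string `parquet:"name=claimants, type=BYTE_ARRAY, convertedtype=UTF8"` // JSON-encoded claimant list
}

func (c ClaimableBalanceOutput) ToParquet() interface{} {
	// A real implementation would handle the marshal error rather than discard it.
	claimantsJSON, _ := json.Marshal(c.Claimants)
	return ClaimableBalanceOutputParquet{
		BalanceID: c.BalanceID,
		Claimants: string(claimantsJSON),
	}
}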
2 changes: 1 addition & 1 deletion cmd/export_ledger_transaction.go
@@ -18,7 +18,7 @@ var ledgerTransactionCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
startNum, path, _, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

10 changes: 9 additions & 1 deletion cmd/export_ledgers.go
@@ -18,7 +18,7 @@ var ledgersCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
startNum, path, parquetPath, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

@@ -38,6 +38,7 @@

numFailures := 0
totalNumBytes := 0
var transformedLedgers []transform.SchemaParquet
for i, ledger := range ledgers {
transformed, err := transform.TransformLedger(ledger.Ledger, ledger.LCM)
if err != nil {
@@ -53,6 +54,8 @@
continue
}
totalNumBytes += numBytes

transformedLedgers = append(transformedLedgers, transformed)
}

outFile.Close()
@@ -61,6 +64,11 @@
printTransformStats(len(ledgers), numFailures)

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

if commonArgs.WriteParquet {
writeParquet(transformedLedgers, parquetPath, new(transform.LedgerOutputParquet))
maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
},
}

10 changes: 9 additions & 1 deletion cmd/export_operations.go
@@ -18,7 +18,7 @@ var operationsCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
startNum, path, parquetPath, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

@@ -30,6 +30,7 @@
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var transformedOps []transform.SchemaParquet
for _, transformInput := range operations {
transformed, err := transform.TransformOperation(transformInput.Operation, transformInput.OperationIndex, transformInput.Transaction, transformInput.LedgerSeqNum, transformInput.LedgerCloseMeta, env.NetworkPassphrase)
if err != nil {
@@ -46,6 +47,8 @@
continue
}
totalNumBytes += numBytes

transformedOps = append(transformedOps, transformed)
}

outFile.Close()
@@ -54,6 +57,11 @@
printTransformStats(len(operations), numFailures)

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

if commonArgs.WriteParquet {
writeParquet(transformedOps, parquetPath, new(transform.OperationOutputParquet))
maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
},
}
