Add parquet file writing support #269
Changes from 4 commits
```diff
@@ -6,6 +6,10 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+
+	"github.com/stellar/stellar-etl/internal/transform"
+	"github.com/xitongsys/parquet-go-source/local"
+	"github.com/xitongsys/parquet-go/writer"
 )
 
 type CloudStorage interface {
```
```diff
@@ -102,6 +106,10 @@ func exportFilename(start, end uint32, dataType string) string {
 	return fmt.Sprintf("%d-%d-%s.txt", start, end-1, dataType)
 }
 
+func exportParquetFilename(start, end uint32, dataType string) string {
+	return fmt.Sprintf("%d-%d-%s.parquet", start, end-1, dataType)
+}
+
 func deleteLocalFiles(path string) {
 	err := os.RemoveAll(path)
 	if err != nil {
```
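A quick worked illustration of the naming convention (the argument values here are hypothetical): the range in the filename is inclusive, so the formatter prints end-1, mirroring the existing .txt naming in exportFilename.

```go
// Hypothetical call: the filename range is inclusive, so end-1 is printed.
name := exportParquetFilename(100, 200, "assets")
fmt.Println(name) // 100-199-assets.parquet
```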
```diff
@@ -135,3 +143,37 @@ func maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path string) {
 		cmdLogger.Error("Unknown cloud provider")
 	}
 }
+
+// writeParquet creates the parquet file and writes the exported data into it.
+//
+// Parameters:
+//
+//	data []transform.SchemaParquet - The slice of data to be written to the Parquet file.
+//	SchemaParquet is an interface used to call ToParquet(),
+//	which is defined for each schema/export.
+//	path string - The file path where the Parquet file will be created and written.
+//	For example, "some/file/path/export_output.parquet"
+//	schema interface{} - The schema that defines the structure of the Parquet file.
+//
+// Errors:
+//
+//	stellar-etl will log a Fatal error and stop if it cannot create or write to the parquet file.
+func writeParquet(data []transform.SchemaParquet, path string, schema interface{}) {
+	parquetFile, err := local.NewLocalFileWriter(path)
+	if err != nil {
+		cmdLogger.Fatal("could not create parquet file: ", err)
+	}
+	defer parquetFile.Close()
+
+	writer, err := writer.NewParquetWriter(parquetFile, schema, 1)
+	if err != nil {
+		cmdLogger.Fatal("could not create parquet file writer: ", err)
+	}
+	defer writer.WriteStop()
+
+	for _, record := range data {
+		if err := writer.Write(record.ToParquet()); err != nil {
+			cmdLogger.Fatal("could not write record to parquet file: ", err)
+		}
+	}
+}
```
Comment on lines +174 to +179

Reviewer: A batch conversion would have been so noice 😿

Author: Yeah, it would have been, but I didn't see an example of it; the example I found only writes sequentially. I didn't spend any time digging through the code to see if it was easy to implement, though.

Reviewer: Agree. I took a look as well. Could not find anything.
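For context on the data writeParquet consumes, here is a minimal sketch of a type satisfying transform.SchemaParquet. The interface shape is inferred from the doc comment above ("an interface used to call ToParquet()"), and the Dummy types are hypothetical illustrations, not part of this PR:

```go
package transform

// SchemaParquet is the interface writeParquet iterates over (assumed shape).
type SchemaParquet interface {
	ToParquet() interface{}
}

// DummyOutput is a hypothetical transform output type.
type DummyOutput struct {
	ID   int64
	Name string
}

// DummyOutputParquet carries parquet-go struct tags describing the file schema.
type DummyOutputParquet struct {
	ID   int64  `parquet:"name=id, type=INT64"`
	Name string `parquet:"name=name, type=BYTE_ARRAY, convertedtype=UTF8"`
}

// ToParquet converts the transform output into its parquet row representation.
func (d DummyOutput) ToParquet() interface{} {
	return DummyOutputParquet{ID: d.ID, Name: d.Name}
}
```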
```diff
@@ -18,7 +18,7 @@ var assetsCmd = &cobra.Command{
 	cmdLogger.SetLevel(logrus.InfoLevel)
 	commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
 	cmdLogger.StrictExport = commonArgs.StrictExport
-	startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
+	startNum, path, parquetPath, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
 	cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
 	env := utils.GetEnvironmentDetails(commonArgs)
```
```diff
@@ -40,6 +40,7 @@ var assetsCmd = &cobra.Command{
 	seenIDs := map[int64]bool{}
 	numFailures := 0
 	totalNumBytes := 0
+	var transformedAssets []transform.SchemaParquet
 	for _, transformInput := range paymentOps {
 		transformed, err := transform.TransformAsset(transformInput.Operation, transformInput.OperationIndex, transformInput.TransactionIndex, transformInput.LedgerSeqNum, transformInput.LedgerCloseMeta)
 		if err != nil {
```
```diff
@@ -62,6 +63,8 @@ var assetsCmd = &cobra.Command{
 			continue
 		}
 		totalNumBytes += numBytes
+
+		transformedAssets = append(transformedAssets, transformed)
 	}
 
 	outFile.Close()
```

Comment on the appended line:

Reviewer: Does this mean that we're creating a structure intended to be written to a parquet file even if the user does not pass the write-parquet flag? Do you know if this impacts runtime performance at all?

Author: Yes, we are. It does not impact performance by a noticeable amount, but I will add the feature flag to skip this for clarity.
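A minimal sketch of the gating the author describes, assuming the accumulation is simply wrapped in the WriteParquet flag check (not confirmed against a follow-up commit):

```go
// Skip the parquet-only accumulation when the flag is off, so the
// plain text export path keeps its original memory profile.
if commonArgs.WriteParquet {
	transformedAssets = append(transformedAssets, transformed)
}
```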
```diff
@@ -70,6 +73,11 @@ var assetsCmd = &cobra.Command{
 	printTransformStats(len(paymentOps), numFailures)
 
 	maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)
+
+	if commonArgs.WriteParquet {
+		writeParquet(transformedAssets, parquetPath, new(transform.AssetOutputParquet))
+		maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
+	}
 	},
 }
```

Comment on the parquetPath upload:

Reviewer: Is the [...] just [...]

Author: Sorry, can you clarify? I think you might be missing a word in your question.

Reviewer: So [...]

Author: Sorry I missed this earlier. This directory path should only contain .parquet files for the Airflow scheduled run/batch_id.
```diff
@@ -32,7 +32,7 @@ be exported.`,
 	cmdLogger.StrictExport = commonArgs.StrictExport
 	env := utils.GetEnvironmentDetails(commonArgs)
 
-	_, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger)
+	_, configPath, startNum, batchSize, outputFolder, parquetOutputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger)
 	exports := utils.MustExportTypeFlags(cmd.Flags(), cmdLogger)
 	cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
```
```diff
@@ -43,6 +43,11 @@ be exported.`,
 		cmdLogger.Fatalf("unable to mkdir %s: %v", outputFolder, err)
 	}
 
+	err = os.MkdirAll(parquetOutputFolder, os.ModePerm)
+	if err != nil {
+		cmdLogger.Fatalf("unable to mkdir %s: %v", parquetOutputFolder, err)
+	}
+
 	if batchSize <= 0 {
 		cmdLogger.Fatalf("batch-size (%d) must be greater than 0", batchSize)
 	}
```
```diff
@@ -256,11 +261,13 @@ be exported.`,
 			batch.BatchStart,
 			batch.BatchEnd,
 			outputFolder,
+			parquetOutputFolder,
 			transformedOutputs,
 			cloudCredentials,
 			cloudStorageBucket,
 			cloudProvider,
 			commonArgs.Extra,
+			commonArgs.WriteParquet,
 		)
 		if err != nil {
 			cmdLogger.LogError(err)
```
```diff
@@ -274,23 +281,76 @@ be exported.`,
 func exportTransformedData(
 	start, end uint32,
 	folderPath string,
+	parquetFolderPath string,
 	transformedOutput map[string][]interface{},
 	cloudCredentials, cloudStorageBucket, cloudProvider string,
-	extra map[string]string) error {
+	extra map[string]string,
+	WriteParquet bool) error {
 
 	for resource, output := range transformedOutput {
 		// Filenames are typically exclusive of end point. This processor
 		// is different and we have to increment by 1 since the end batch number
 		// is included in this filename.
 		path := filepath.Join(folderPath, exportFilename(start, end+1, resource))
+		parquetPath := filepath.Join(parquetFolderPath, exportParquetFilename(start, end+1, resource))
 		outFile := mustOutFile(path)
+		var transformedResource []transform.SchemaParquet
+		var parquetSchema interface{}
+		var skip bool
 		for _, o := range output {
 			_, err := exportEntry(o, outFile, extra)
 			if err != nil {
 				return err
 			}
+			switch v := o.(type) {
+			case transform.AccountOutput:
+				transformedResource = append(transformedResource, v)
+				parquetSchema = new(transform.AccountOutputParquet)
+				skip = false
+			case transform.AccountSignerOutput:
+				transformedResource = append(transformedResource, v)
+				parquetSchema = new(transform.AccountSignerOutputParquet)
+				skip = false
+			case transform.ClaimableBalanceOutput:
+				// Skip
+				skip = true
+			case transform.ConfigSettingOutput:
+				transformedResource = append(transformedResource, v)
+				parquetSchema = new(transform.ConfigSettingOutputParquet)
+				skip = false
+			case transform.ContractCodeOutput:
+				transformedResource = append(transformedResource, v)
+				parquetSchema = new(transform.ContractCodeOutputParquet)
+				skip = false
+			case transform.ContractDataOutput:
+				transformedResource = append(transformedResource, v)
+				parquetSchema = new(transform.ContractDataOutputParquet)
+				skip = false
+			case transform.PoolOutput:
+				transformedResource = append(transformedResource, v)
+				parquetSchema = new(transform.PoolOutputParquet)
+				skip = false
+			case transform.OfferOutput:
+				transformedResource = append(transformedResource, v)
+				parquetSchema = new(transform.OfferOutputParquet)
+				skip = false
+			case transform.TrustlineOutput:
+				transformedResource = append(transformedResource, v)
+				parquetSchema = new(transform.TrustlineOutputParquet)
+				skip = false
+			case transform.TtlOutput:
+				transformedResource = append(transformedResource, v)
+				parquetSchema = new(transform.TtlOutputParquet)
+				skip = false
+			}
 		}
 
 		maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)
+
+		if !skip && WriteParquet {
+			writeParquet(transformedResource, parquetPath, parquetSchema)
+			maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
+		}
 	}
 
 	return nil
```

Comment on the ClaimableBalanceOutput case:

Reviewer: Why does this get skipped?

Reviewer: Nvm, it was out of the original scope for Dune-supported tables. I'm fine keeping it out of scope for the MVP, but I think we should add holistic support at some point.

Author: Yup, I agree. I left it out because of the way [...]
Comment on the NewParquetWriter call:

Reviewer: What is the 1 for? Looks like from the docs it's a parallel writer? Would we ever want to multithread this?

Author: Yeah, it is for the parallel writer.
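For reference, the third argument to writer.NewParquetWriter in parquet-go is np, the number of goroutines the writer uses for marshalling rows. A sketch of what a multithreaded writer would look like; the value 4 is an arbitrary illustration, not something this PR does:

```go
// np=4 asks parquet-go to marshal rows on four goroutines instead of one.
pw, err := writer.NewParquetWriter(parquetFile, schema, 4)
if err != nil {
	cmdLogger.Fatal("could not create parquet file writer: ", err)
}
defer pw.WriteStop()
```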