Add parquet file writing support #269
Conversation
// AddArchiveFlags adds the history archive specific flags: output, and limit
// TODO: https://stellarorg.atlassian.net/browse/HUBBLE-386 Rename AddArchiveFlags to something more relevant
func AddArchiveFlags(objectName string, flags *pflag.FlagSet) {
    flags.Uint32P("start-ledger", "s", 2, "The ledger sequence number for the beginning of the export period. Defaults to genesis ledger")
    flags.StringP("output", "o", "exported_"+objectName+".txt", "Filename of the output file")
    flags.String("parquet-output", "exported_"+objectName+".parquet", "Filename of the parquet output file")
Note parquet-output needs to be defined twice because the refactor work from https://stellarorg.atlassian.net/browse/HUBBLE-386 has not been completed
    for _, record := range data {
        if err := writer.Write(record.ToParquet()); err != nil {
            cmdLogger.Fatal("could not write record to parquet file: ", err)
        }
    }
}
A batch conversion would have been so noice 😿
Yeah, it would have been, but I didn't see an example of it. The library only shows writing records sequentially, as in this example:
https://github.com/xitongsys/parquet-go/blob/master/example/writer.go#L42-L53
Although I didn't spend any time digging through the code to see if it would be easy to implement.
Agree. I took a look as well. Could not find anything.
    if commonArgs.WriteParquet {
        writeParquet(transformedAssets, parquetPath, new(transform.AssetOutputParquet))
        maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
Is the just path path for json files?
Sorry, can you clarify? I think you might be missing a word in your question.
So parquetPath is the file path where the parquet files will be written. Does the existing path directory contain json files? Or, what's actually contained in the path directory?
Sorry I missed this earlier. parquetPath is the fully formed filename, so it'll look like bucket/scheduled_run_<date>/exported_<data type, like assets>.parquet. This directory path should only contain .parquet files for the airflow scheduled run/batch_id.
Generally looking good. Would love to see test results
What kind of tests did you have in mind? The parquet file itself is basically just binary. I can write unit tests for the ToParquet functions if needed.
This is the output from reading the parquet file in with DuckDB, if that's what you were interested in.
One thing that was unexpected for me is that the ETL will output a JSON formatted file in addition to a parquet file. I assumed that the write interface would let you select an output format and emit only the specified format.
Was this UX intended? Or do you have additional thoughts on a redesign that should be added to the scope of HUBBLE-386?
    }
    defer parquetFile.Close()

    writer, err := writer.NewParquetWriter(parquetFile, schema, 1)
What is the 1 for? From the docs it looks like it's for a parallel writer? Would we ever want to multithread this?
> What is the 1 for? Looks like from docs it's a parallel writer?

Yeah, it is for the parallel writer.

> Would we ever want to multithread this?

I don't think so. I played around with it and it doesn't increase/decrease the write time by any notable margin. The records that we are writing are too small to actually benefit from multithreading.
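For reference, here is a minimal, self-contained sketch of the parquet-go write path being discussed; the Row type, its fields, and the file name are illustrative assumptions rather than the stellar-etl code. The final argument to NewParquetWriter is the np value, i.e. the number of write goroutines, which this PR leaves at 1.

```go
package main

import (
	"log"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/writer"
)

// Row is a hypothetical record type; the real ones live in
// internal/transform/schema_parquet.go.
type Row struct {
	Name     string `parquet:"name=name, type=BYTE_ARRAY, convertedtype=UTF8"`
	Sequence int64  `parquet:"name=sequence, type=INT64"`
}

func main() {
	fw, err := local.NewLocalFileWriter("exported_example.parquet")
	if err != nil {
		log.Fatal("could not create parquet file: ", err)
	}
	defer fw.Close()

	// The trailing 1 is np: the number of goroutines parquet-go uses to
	// marshal and write row groups. 1 keeps the write single-threaded.
	pw, err := writer.NewParquetWriter(fw, new(Row), 1)
	if err != nil {
		log.Fatal("could not create parquet writer: ", err)
	}

	for i := int64(0); i < 3; i++ {
		if err := pw.Write(Row{Name: "example", Sequence: i}); err != nil {
			log.Fatal("could not write record to parquet file: ", err)
		}
	}

	// WriteStop flushes the remaining row group and writes the file footer.
	if err := pw.WriteStop(); err != nil {
		log.Fatal("could not finalize parquet file: ", err)
	}
}
```

As noted above, bumping np only pays off when row groups are large; the exported records here are small, so 1 is a reasonable default.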
cmd/export_assets.go (Outdated)
@@ -62,6 +63,8 @@ var assetsCmd = &cobra.Command{
            continue
        }
        totalNumBytes += numBytes

        transformedAssets = append(transformedAssets, transformed)
Does this mean that we're creating a structure intended to be written to a parquet file even if the user does not pass the write parquet flag?
Do you know if this impacts runtime performance at all?
Yes, we are.
It does not impact performance by a noticeable amount, but I will add the feature flag to skip this for clarity.
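As a rough, hypothetical sketch of that feature-flag guard (the variable names and record type below are stand-ins, not claimed to match the final code):

```go
package main

import "fmt"

// commonFlags stands in for the command's parsed flags; WriteParquet mirrors
// the write-parquet flag discussed in this PR.
type commonFlags struct {
	WriteParquet bool
}

func main() {
	commonArgs := commonFlags{WriteParquet: false}

	transformed := []string{"a", "b", "c"} // stand-in for transformed records
	var transformedAssets []string         // slice destined for the parquet writer

	for _, t := range transformed {
		// ... existing txt/JSON export work would happen here ...

		// Only accumulate the parquet-bound slice when the flag is set,
		// so runs without write-parquet skip the extra work.
		if commonArgs.WriteParquet {
			transformedAssets = append(transformedAssets, t)
		}
	}
	fmt.Println("records kept for parquet:", len(transformedAssets))
}
```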
cmd/export_ledger_entry_changes.go (Outdated)
        parquetSchema = new(transform.AccountSignerOutputParquet)
        skip = false
    case transform.ClaimableBalanceOutput:
        // Skip
Why does this get skipped?
Nvm, it was out of the original scope for Dune-supported tables. I'm fine keeping it out of scope for the MVP, but I think we should add holistic support at some point.
Yup, I agree.
I left it out because of the way the ClaimableBalanceOutput struct is defined. It creates multiple nested structs (like for claimants) which didn't nicely stringify.
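To illustrate the difference, here is a simplified sketch; these types and fields are assumptions for illustration, not the real stellar-etl definitions.

```go
package main

import "fmt"

// Flat structs like the other *OutputParquet types map one scalar field to
// one parquet column with a single tag.
type FlatOutputParquet struct {
	BalanceID string `parquet:"name=balance_id, type=BYTE_ARRAY, convertedtype=UTF8"`
}

// A claimable balance, by contrast, would carry a repeated nested group for
// its claimants (and their predicates). The parquet tag is omitted on the
// slice here because a real schema would need nested/repeated configuration
// that goes beyond the flat one-tag-per-field pattern used elsewhere, which
// is why it was left out of this PR.
type Claimant struct {
	Destination string `parquet:"name=destination, type=BYTE_ARRAY, convertedtype=UTF8"`
}

type NestedOutputSketch struct {
	BalanceID string `parquet:"name=balance_id, type=BYTE_ARRAY, convertedtype=UTF8"`
	Claimants []Claimant
}

func main() {
	fmt.Println("sketch only: nested claimants need a nested parquet schema")
}
```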
    return string(jsonData)
}

func (lo LedgerOutput) ToParquet() interface{} {
nit: would add a comment about converting uint32 to int64. That wasn't obvious to me at first
Also what was the decision between converting to int64 versus keeping as int32?
Yup, will add a comment.
I needed to convert uint32 -> int64, otherwise data could be lost: uint32 max = 4,294,967,295 vs int32 max = 2,147,483,647 (one bit is needed for the sign in signed ints).
Note I also converted uint64 to int64; if that ever becomes a problem we would need to look for a resolution (a larger data type, or a custom bigint stored as a string).
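As a sketch of what that conversion and its comment could look like (the field set here is a simplified assumption; the real structs are LedgerOutput and its parquet counterpart in internal/transform):

```go
package main

import "fmt"

// Simplified stand-ins for the real schema structs.
type LedgerOutput struct {
	Sequence      uint32 // ledger sequences are uint32
	TotalByteSize uint64
}

type LedgerOutputParquet struct {
	Sequence      int64 `parquet:"name=sequence, type=INT64"`
	TotalByteSize int64 `parquet:"name=total_byte_size, type=INT64"`
}

// ToParquet widens unsigned fields so values are not truncated: uint32 max
// (4,294,967,295) does not fit in int32 (max 2,147,483,647, since one bit is
// spent on the sign) but always fits in int64. uint64 -> int64 can still
// overflow above math.MaxInt64; per the note above, that would need a larger
// type or a string-encoded bigint if it ever becomes a problem.
func (lo LedgerOutput) ToParquet() interface{} {
	return LedgerOutputParquet{
		Sequence:      int64(lo.Sequence),
		TotalByteSize: int64(lo.TotalByteSize),
	}
}

func main() {
	lo := LedgerOutput{Sequence: 4294967295, TotalByteSize: 123456}
	fmt.Printf("%+v\n", lo.ToParquet())
}
```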
internal/transform/schema_parquet.go (Outdated)
    LedgerSequence int64 `parquet:"name=ledger_sequence, type=INT64, convertedtype=INT64, convertedtype=UINT_64"`
}

// Skipping ClaimableBalanceOutputParquet because it is not needed
👍
I will admit I only glanced at the schema briefly and am reliant on tests to confirm the output is correct. What do you think is the easiest way for us to validate that the parquet file schema matches our expectations?
Well, we don't really validate/check any of our schemas right now. IMO this check should be rolled up into the integration tests that we will write in the future.
From my point of view, a unit test on the parquet schema would amount to checking that int64 = int64 for each column. An integration test with a golden file and golden output would be much more user friendly, since you see the fully outputted schema instead of it being clumped up in unit tests.
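For what a future check could look like, here is a minimal round-trip sketch using parquet-go's reader (the Row type and file name are assumptions); a golden-file integration test would instead compare the full exported schema and contents.

```go
package main

import (
	"log"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/reader"
	"github.com/xitongsys/parquet-go/writer"
)

// Row is a stand-in for one of the *OutputParquet structs.
type Row struct {
	Sequence int64 `parquet:"name=sequence, type=INT64"`
}

func main() {
	const path = "roundtrip_example.parquet"

	// Write a couple of records...
	fw, err := local.NewLocalFileWriter(path)
	if err != nil {
		log.Fatal(err)
	}
	pw, err := writer.NewParquetWriter(fw, new(Row), 1)
	if err != nil {
		log.Fatal(err)
	}
	for i := int64(0); i < 2; i++ {
		if err := pw.Write(Row{Sequence: i}); err != nil {
			log.Fatal(err)
		}
	}
	if err := pw.WriteStop(); err != nil {
		log.Fatal(err)
	}
	fw.Close()

	// ...then read them back and check the row count and values.
	fr, err := local.NewLocalFileReader(path)
	if err != nil {
		log.Fatal(err)
	}
	defer fr.Close()
	pr, err := reader.NewParquetReader(fr, new(Row), 1)
	if err != nil {
		log.Fatal(err)
	}
	defer pr.ReadStop()

	rows := make([]Row, pr.GetNumRows())
	if err := pr.Read(&rows); err != nil {
		log.Fatal(err)
	}
	if len(rows) != 2 || rows[0].Sequence != 0 || rows[1].Sequence != 1 {
		log.Fatalf("unexpected contents: %+v", rows)
	}
	log.Printf("round trip OK: %+v", rows)
}
```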
Yes, I was interested in seeing test results from an airflow run. True, unit tests for the ToParquet functions may not be needed.
Perfect, that is what I was looking for 👌
Yeah, that was intentional. I felt that the refactor of the parameters was out of scope for now and opted for a feature flag (write-parquet) instead of specifying the output format. The main problem is that right now there is no real "write interface", and to add an output file format option cleanly we'd need to add one. Yeah, we can add it to the scope of the JIRA ticket.
Makes sense. I'm fine with the feature flag for now.
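For the HUBBLE-386 scope, a hypothetical shape for such a write interface might look like the sketch below; none of these names exist in stellar-etl today, it only illustrates the direction discussed.

```go
package main

import "fmt"

// exportWriter is a hypothetical interface: each output format implements it,
// and the export commands pick one from an output-format option instead of
// stacking per-format feature flags.
type exportWriter interface {
	WriteRecord(record interface{}) error
	Close() error
}

// jsonWriter would wrap the existing txt/JSON output.
type jsonWriter struct{}

func (w *jsonWriter) WriteRecord(record interface{}) error {
	fmt.Printf("json: %+v\n", record)
	return nil
}
func (w *jsonWriter) Close() error { return nil }

// parquetWriter would wrap xitongsys/parquet-go.
type parquetWriter struct{}

func (w *parquetWriter) WriteRecord(record interface{}) error {
	fmt.Printf("parquet: %+v\n", record)
	return nil
}
func (w *parquetWriter) Close() error { return nil }

// newWriter would be driven by a format option on the export commands.
func newWriter(format string) exportWriter {
	if format == "parquet" {
		return &parquetWriter{}
	}
	return &jsonWriter{}
}

func main() {
	w := newWriter("parquet")
	defer w.Close()
	if err := w.WriteRecord(map[string]int64{"sequence": 1}); err != nil {
		fmt.Println("write failed:", err)
	}
}
```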
What
Adding parquet file writing support to stellar-etl. This is an addition to the end of current export_* commands.
Encodings for parquet files in schema_parquet.go come from xitongsys/parquet-go.
Why
Originally requested by 3rd party analytics platforms. Parquet is a useful, standardized data format that can be queried directly (for example with DuckDB) or used as a generic data lake layer to build upon.
Known limitations