
Add parquet file writing support #269

Merged: 6 commits into master on Jul 29, 2024

Conversation

@chowbao (Contributor) commented Jul 24, 2024

PR Checklist

PR Structure

  • This PR has reasonably narrow scope (if not, break it down into smaller PRs).
  • This PR avoids mixing refactoring changes with feature changes (split into two PRs
    otherwise).
  • This PR's title starts with the Jira ticket associated with the PR.

Thoroughness

  • This PR adds tests for the most critical parts of the new functionality or fixes.
  • I've updated the README with the added features, breaking changes, and new instructions on how to use the repository. I've updated the description of the function with the changes that were made.

Release planning

  • I've decided if this PR requires a new major/minor/patch version according to
    semver, and I've changed the name of the BRANCH to release/*, feature/*, or patch/*.

What

Adds parquet file writing support to stellar-etl. This is an addition to the end of the current export_* commands.

Encodings for parquet files in schema_parquet.go come from xitongsys/parquet-go.
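
For illustration, a minimal sketch of what those tagged structs look like in parquet-go (the field set below is hypothetical; the actual per-table definitions live in schema_parquet.go):

// Hypothetical example of parquet-go struct tags; see schema_parquet.go
// for the real per-table definitions.
type ExampleOutputParquet struct {
	AccountID      string `parquet:"name=account_id, type=BYTE_ARRAY, convertedtype=UTF8"`
	Deleted        bool   `parquet:"name=deleted, type=BOOLEAN"`
	LedgerSequence int64  `parquet:"name=ledger_sequence, type=INT64"`
}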

Why

Originally requested by 3rd-party analytics platforms. Parquet is a useful, standardized data format that can be queried directly (for example with DuckDB) or used as a generic data lake layer that can be built upon.

Known limitations

  • The parquet reader required/was limited to only certain data types/conversions. This PR opts to duplicate the schemas/structs into a separate file.
  • TODO: test with airflow changes

@chowbao requested a review from a team as a code owner, July 24, 2024 15:08
}

// AddArchiveFlags adds the history archive specific flags: output, and limit
// TODO: https://stellarorg.atlassian.net/browse/HUBBLE-386 Rename AddArchiveFlags to something more relevant
func AddArchiveFlags(objectName string, flags *pflag.FlagSet) {
	flags.Uint32P("start-ledger", "s", 2, "The ledger sequence number for the beginning of the export period. Defaults to genesis ledger")
	flags.StringP("output", "o", "exported_"+objectName+".txt", "Filename of the output file")
	flags.String("parquet-output", "exported_"+objectName+".parquet", "Filename of the parquet output file")
chowbao (author):

Note: parquet-output needs to be defined twice because the refactor work from https://stellarorg.atlassian.net/browse/HUBBLE-386 has not been completed.

Comment on lines +174 to +179
	for _, record := range data {
		if err := writer.Write(record.ToParquet()); err != nil {
			cmdLogger.Fatal("could not write record to parquet file: ", err)
		}
	}
}
Contributor:

A batch conversion would have been so noice 😿

chowbao (author):

Yeah, it would have been, but I didn't see an example of it. I only saw sequential writing, as in this example:
https://github.com/xitongsys/parquet-go/blob/master/example/writer.go#L42-L53

That said, I didn't spend any time digging through the code to see if it was easy to implement.
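
For reference, a minimal sketch of that sequential pattern, adapted from the linked parquet-go example (the file name, records variable, and ExampleOutputParquet type are placeholders, not stellar-etl code):

package main

import (
	"log"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/writer"
)

// ExampleOutputParquet is a placeholder record type for this sketch.
type ExampleOutputParquet struct {
	LedgerSequence int64 `parquet:"name=ledger_sequence, type=INT64"`
}

var records []ExampleOutputParquet // populated by the export step

func main() {
	// Create the output file and a single-threaded parquet writer (np = 1).
	fw, err := local.NewLocalFileWriter("output.parquet")
	if err != nil {
		log.Fatal("could not create file: ", err)
	}
	defer fw.Close()

	pw, err := writer.NewParquetWriter(fw, new(ExampleOutputParquet), 1)
	if err != nil {
		log.Fatal("could not create writer: ", err)
	}

	// The writer only exposes Write for one record at a time; there is
	// no batch-write API, hence the sequential loop.
	for _, rec := range records {
		if err := pw.Write(rec); err != nil {
			log.Fatal("could not write record: ", err)
		}
	}

	// WriteStop flushes row groups and writes the parquet footer.
	if err := pw.WriteStop(); err != nil {
		log.Fatal("WriteStop failed: ", err)
	}
}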

Contributor:

Agree. I took a look as well. Could not find anything.


if commonArgs.WriteParquet {
	writeParquet(transformedAssets, parquetPath, new(transform.AssetOutputParquet))
	maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
Contributor:

Is the just path path for json files?

chowbao (author):

Sorry, can you clarify? I think you might be missing a word in your question.

Contributor:

So parquetPath is the file path where the parquet files will be written. Does the existing path directory contain JSON files? Or, what's actually contained in the path directory?

chowbao (author):

Sorry, I missed this earlier. parquetPath is the fully formed filename, so it'll look like bucket/scheduled_run_<date>/exported_<data type, like assets>.parquet

So this directory path should only contain .parquet files for the Airflow scheduled run/batch_id.

@amishas157 (Contributor) left a comment:

Generally looking good. Would love to see test results

@chowbao (author) commented Jul 24, 2024

Generally looking good. Would love to see test results

What kind of tests did you have in mind?

The parquet file itself is basically just binary. I can write unit tests for the ToParquet() functions, but I'm not sure that would be helpful because it is literally just passing the same values into *OutputParquet.

This is the output from reading the parquet file in with DuckDB, if that's what you were interested in:

select * from '52696484-52696485-trustlines.parquet';
┌──────────────────────┬──────────────────────┬─────────────┬──────────────────────┬───┬─────────┬─────────┬─────────────────────┬─────────────────┐
│      ledger_key      │      account_id      │ asset_code  │     asset_issuer     │ … │ sponsor │ deleted │      closed_at      │ ledger_sequence │
│       varchar        │       varchar        │   varchar   │       varchar        │   │ varchar │ boolean │      timestamp      │     uint64      │
├──────────────────────┼──────────────────────┼─────────────┼──────────────────────┼───┼─────────┼─────────┼─────────────────────┼─────────────────┤
│ AAAAAQAAAACpkg9bv2…  │ GCUZED23X5S2E2KCJT…  │ XXRP        │ GCYWSRUEEAB5KODQYN…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAABM0rs/rF…  │ GBGNFOZ7VRJMQ5YEHZ…  │ BCH         │ GAVQ5RFK4NX3JWHCBF…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAACNKOzGxu…  │ GCGSR3GGY3WDLZ4X2O…  │ ACH         │ GAM6I2U5PPMNWOLRGE…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAADnX9uFBi…  │ GDTV7W4FAYWUDCW3YG…  │ BLND        │ GDJEHTBE6ZHUXSWFI6…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAAB1Rk0XFM…  │ GB2UMTIXCTHPBS2ZGW…  │ SCENARIO    │ GDKVRHPFCS6UPRNG6P…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAACNKOzGxu…  │ GCGSR3GGY3WDLZ4X2O…  │ ATOM        │ GAHGJY3J2L3ZTYS2IB…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAABM7YDzYs…  │ GBGO3AHTMLEOZ6OMUX…  │ BLACKROCK   │ GD34CA5KSZERZQFYB5…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAACNKOzGxu…  │ GCGSR3GGY3WDLZ4X2O…  │ TRX         │ GC2D2GU3B2NMU2N3H7…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAAB1Rk0XFM…  │ GB2UMTIXCTHPBS2ZGW…  │ VET         │ GC24D4FIFNYV5VL5B2…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAABM0rs/rF…  │ GBGNFOZ7VRJMQ5YEHZ…  │ SHIB        │ GD5LN5KONK3CGDMUXE…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAABewxAwsx…  │ GBPMGEBQWMMPFQRXUQ…  │ NICE        │ GCLQNUO2P5AFUEOYZZ…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAADRcslZae…  │ GDIXFSKZNHUYEQUDMP…  │ DOGET       │ GDOEVDDBU6OBWKL7VH…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAABj8z9leD…  │ GBR7GP3FPA7NHCR7IQ…  │ KAVACHAIN   │ GCKPCGN4QPO2AECRFE…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAACNKOzGxu…  │ GCGSR3GGY3WDLZ4X2O…  │ LTC         │ GAEUHK4T66HDESLGYQ…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAAB1Rk0XFM…  │ GB2UMTIXCTHPBS2ZGW…  │ TRX         │ GC2D2GU3B2NMU2N3H7…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAAB1Rk0XFM…  │ GB2UMTIXCTHPBS2ZGW…  │ FTM         │ GBKHIUMQIYQ2YAI3R4…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAAD7hl/WcG…  │ GD5YMX6WOBS7QLRUO4…  │ QXRP        │ GBZEPCEX2GPQTOVJXZ…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAACPJZABNb…  │ GCHSLEABGW3OQS2DZM…  │ STELLARMAN  │ GACERHSMXTQPNEVNPX…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAAD8aubhZm…  │ GD6GVZXBMZWVPX46M2…  │ USDC        │ GA5ZSEJYB37JRC5AVC…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│ AAAAAQAAAABsQFh8Jc…  │ GBWEAWD4EXBKBENGPM…  │ XFLR        │ GDJA7HBH6HWPLQ3MSF…  │ … │         │ false   │ 2024-07-23 03:43:00 │        52696484 │
│          ·           │          ·           │  ·          │          ·           │ · │    ·    │   ·     │          ·          │            ·    │
│          ·           │          ·           │  ·          │          ·           │ · │    ·    │   ·     │          ·          │            ·    │
│          ·           │          ·           │  ·          │          ·           │ · │    ·    │   ·     │          ·          │            ·    │
│ AAAAAQAAAAB49eM/29…  │ GB4PLYZ73POGZ65CAG…  │ yXXA        │ GDWJU236MNVYQ5YBQV…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAAB49eM/29…  │ GB4PLYZ73POGZ65CAG…  │ CFTC        │ GC72MN2IUJFJXPBCPF…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAADb6L96XV…  │ GDN6RP32LVP32RAY7Z…  │ VFTNX       │ GBG7PYACRRTV42ORBW…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAADb6L96XV…  │ GDN6RP32LVP32RAY7Z…  │ NVIDIA      │ GBVEYQLGEXPE3L3376…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAADoTmJ/Ep…  │ GDUE4YT7CKJL55PCO2…  │ xlmPLAT     │ GBV7ORCOSGHUXIAD4T…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAAB49eM/29…  │ GB4PLYZ73POGZ65CAG…  │ SPB         │ GCHYBCYAKUE5CYZHBY…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAADoTmJ/Ep…  │ GDUE4YT7CKJL55PCO2…  │ SOROBTC     │ GDIP35LBWBRKQWFV56…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAADoTmJ/Ep…  │ GDUE4YT7CKJL55PCO2…  │ XRPBenq     │ GA2V2LO3ZS6RPMIBWU…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAABrvimxE8…  │ GBV34KNRCPF5WFEHR7…  │ XRP         │ GBXRPL45NPHCVMFFAY…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAACeS7nc96…  │ GCPEXOO466WPVLPUKW…  │ downvoteICE │ GAXSGZ2JM3LNWOO4WR…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAAB49eM/29…  │ GB4PLYZ73POGZ65CAG…  │ QNT         │ GA2YT3ITDNZ72LNQ7I…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAABeAovmrw…  │ GBPAFC7GV4DBU6NVD7…  │ USDC        │ GA5ZSEJYB37JRC5AVC…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAABECu3LYz…  │ GBCAV3OLMM266S2NT6…  │ QIOTA       │ GBA6XEA5UAPDXOIZ6B…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAADb6L96XV…  │ GDN6RP32LVP32RAY7Z…  │ LTC         │ GBB35TMN7RSSO263HZ…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAACJqboYRj…  │ GCE2TOQYIY2Z3HL3FY…  │ yUSDC       │ GDGTVWSM4MGS4T7Z6W…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAABEqgkHww…  │ GBCKUCIHYMCHPX3MQV…  │ governICE   │ GAXSGZ2JM3LNWOO4WR…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAAB49eM/29…  │ GB4PLYZ73POGZ65CAG…  │ XAQUA       │ GAM36CCY7XNJQINPC7…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAAC3ZFS68s…  │ GC3WIVF26LA7EUUKYZ…  │ BARD        │ GDMSCG4PFEU7ONMTPX…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAADb6L96XV…  │ GDN6RP32LVP32RAY7Z…  │ GOOGL       │ GD3WF6TNJEJCMZRVBO…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
│ AAAAAQAAAADoTmJ/Ep…  │ GDUE4YT7CKJL55PCO2…  │ cubist      │ GADLZAGWRN7FE6ZOFJ…  │ … │         │ false   │ 2024-07-23 03:43:06 │        52696485 │
├──────────────────────┴──────────────────────┴─────────────┴──────────────────────┴───┴─────────┴─────────┴─────────────────────┴─────────────────┤
│ 526 rows (40 shown)                                                                                                         18 columns (8 shown) │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

@sydneynotthecity (Contributor) left a comment:

One thing that was unexpected for me is that the ETL will output both a JSON formatted file in addition to a parquet file. I assumed that the write interface would let you select an output format and emit only the specified format.

Was this UX intended? Or do you have additional thoughts on a redesign that should be added to the scope of HUBBLE-386?

	}
	defer parquetFile.Close()

	writer, err := writer.NewParquetWriter(parquetFile, schema, 1)
Contributor:

What is the 1 for? Looks like from docs it's a parallel writer? Would we ever want to multithread this?

chowbao (author):

What is the 1 for? Looks like from docs it's a parallel writer?

Yeah, it is for the parallel writer.

Would we ever want to multithread this?

I don't think so. I played around with it, and it doesn't increase/decrease the write time by any notable margin. The records that we are writing are too small to actually benefit from multithreading.
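
For context, parquet-go's signature here is NewParquetWriter(pFile source.ParquetFile, obj interface{}, np int64), where np is the number of goroutines used to marshal rows; an annotated sketch:

// np = 1 keeps row marshalling single-threaded; larger values showed no
// measurable benefit here because individual records are small.
writer, err := writer.NewParquetWriter(parquetFile, schema, 1)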

@@ -62,6 +63,8 @@ var assetsCmd = &cobra.Command{
			continue
		}
		totalNumBytes += numBytes

		transformedAssets = append(transformedAssets, transformed)
Contributor:

Does this mean that we're creating a structure intended to be written to a parquet file even if the user does not pass the write parquet flag?

Do you know if this impacts runtime performance at all?

chowbao (author):

Yes, we are.

It does not impact performance by a noticeable amount, but I will add the feature flag to skip this for clarity.
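
A minimal sketch of that guard, applied to the append from the diff above (the exact placement is an assumption; commonArgs.WriteParquet is the flag used elsewhere in this PR):

		totalNumBytes += numBytes

		// Only accumulate parquet-bound records when --write-parquet was
		// passed, so default runs skip the extra slice growth entirely.
		if commonArgs.WriteParquet {
			transformedAssets = append(transformedAssets, transformed)
		}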

		parquetSchema = new(transform.AccountSignerOutputParquet)
		skip = false
	case transform.ClaimableBalanceOutput:
		// Skip
Contributor:

Why does this get skipped?

Contributor:

Nvm, it was out of the original scope for Dune-supported tables. I'm fine keeping it out of scope for the MVP, but I think we should add holistic support at some point.

chowbao (author):

Yup, I agree.

I left it out because of the way the ClaimableBalanceOutput struct is defined. It creates multiple nested structs (like for claimants) which didn't stringify nicely.

	return string(jsonData)
}

func (lo LedgerOutput) ToParquet() interface{} {
Contributor:

nit: would add a comment about converting uint32 to int64. That wasn't obvious to me at first

Contributor:

Also what was the decision between converting to int64 versus keeping as int32?

chowbao (author) commented Jul 29, 2024:

Yup, will add a comment.

I needed to convert uint32 -> int64; otherwise uint32 could lose data: uint32 max = 4,294,967,295 vs int32 max = 2,147,483,647 (one bit is needed for the sign in signed ints).

Note that I did convert uint64 to int64, and if that becomes a problem we would need to look for a resolution (a larger data type, or a custom bigint as a string).
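
A small, self-contained illustration of that overflow risk (plain Go conversions, not ETL code):

package main

import (
	"fmt"
	"math"
)

func main() {
	seq := uint32(math.MaxUint32) // 4,294,967,295: a value a uint32 ledger field can legally hold

	fmt.Println(int32(seq)) // -1: wraps around, since int32 max is 2,147,483,647
	fmt.Println(int64(seq)) // 4294967295: int64 holds every uint32 value safely
}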

LedgerSequence int64 `parquet:"name=ledger_sequence, type=INT64, convertedtype=INT64, convertedtype=UINT_64"`
}

//// Skipping ClaimableBalanceOutputParquet because it is not needed
Contributor:

👍

Contributor:

I will admit I only glanced at the schema briefly and am reliant on tests to confirm the output is correct. What do you think is the easiest way for us to validate that the parquet file schema matches our expectations?

chowbao (author):

Well, we don't really validate/check any of our schemas right now. IMO this check should be rolled into the integration tests that we will write in the future.

From my point of view, a unit test on the parquet schema would be to the effect of "does int64 = int64" for each column, whereas an integration test with a golden file and golden output would be much more user-friendly, in that you see the fully outputted schema instead of it being clumped up in unit tests.
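
For what it's worth, a hedged sketch of what such a golden-file check could look like using parquet-go's reader (the test name, testdata path, and loadGoldenRows helper are hypothetical; it assumes the test lives alongside the transform package's TrustlineOutputParquet struct):

package transform

import (
	"reflect"
	"testing"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/reader"
)

func TestTrustlineParquetGolden(t *testing.T) {
	fr, err := local.NewLocalFileReader("testdata/exported_trustlines.parquet")
	if err != nil {
		t.Fatal(err)
	}
	defer fr.Close()

	pr, err := reader.NewParquetReader(fr, new(TrustlineOutputParquet), 1)
	if err != nil {
		t.Fatal(err)
	}
	defer pr.ReadStop()

	// Read every row back through the same struct used for writing.
	rows := make([]TrustlineOutputParquet, pr.GetNumRows())
	if err := pr.Read(&rows); err != nil {
		t.Fatal(err)
	}

	// loadGoldenRows is a hypothetical helper returning the expected records.
	if want := loadGoldenRows(t); !reflect.DeepEqual(rows, want) {
		t.Errorf("parquet output does not match golden file")
	}
}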

@amishas157 (Contributor) commented:

What kind of tests did you have in mind?
The parquet file itself is basically just binary. I can write unit tests for the ToParquet() functions, but I'm not sure that would be helpful because it is literally just passing the same values into *OutputParquet.

Yes, I was interested in seeing test results from the Airflow run. True, unit tests for ToParquet may not be needed.

This is the output from reading the parquet file in with DuckDB if that's what you were interested in

Perfect, that is what I was looking for 👌

@chowbao (author) commented Jul 25, 2024

One thing that was unexpected for me is that the ETL will output both a JSON formatted file in addition to a parquet file. I assumed that the write interface would let you select an output format and emit only the specified format.

Was this UX intended? Or do you have additional thoughts on a redesign that should be added to the scope of HUBBLE-386?

Yeah that was intentional. I felt that the refactor of the parameters was out of scope for now and opted for a feature flag (write-parquet) instead of specifying the output format.

The main problem is that right now there is no real "write interface", and to add the output file format option cleanly we'd need to add one.

Yeah we can add it to the scope of the JIRA ticket.

@sydneynotthecity (Contributor) commented:

Makes sense. I'm fine with the feature flag for now.

The main problem is that right now there is no real "write interface", and to add the output file format option cleanly we'd need to add one.

  • +1, I really like the idea of a write interface. It makes the library much more extensible to other use cases.
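
To make that concrete, a rough sketch of what such a write interface might look like (entirely hypothetical; this would belong to the HUBBLE-386 refactor, not this PR):

// OutputWriter is a hypothetical abstraction sketched during review.
// Each format (JSON, parquet, ...) would provide an implementation,
// selected by an output-format option instead of per-format feature
// flags like --write-parquet.
type OutputWriter interface {
	// Write serializes one transformed record in the writer's format.
	Write(record interface{}) error
	// Close flushes any buffers and finalizes the output file.
	Close() error
}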

@chowbao merged commit e8aaa9c into master on Jul 29, 2024
7 checks passed