Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
chowbao committed Jul 29, 2024
1 parent 29210be commit a72c743
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 49 deletions.
4 changes: 3 additions & 1 deletion cmd/export_assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ var assetsCmd = &cobra.Command{
}
totalNumBytes += numBytes

transformedAssets = append(transformedAssets, transformed)
if commonArgs.WriteParquet {
transformedAssets = append(transformedAssets, transformed)
}
}

outFile.Close()
Expand Down
4 changes: 3 additions & 1 deletion cmd/export_contract_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ var contractEventsCmd = &cobra.Command{
continue
}

transformedEvents = append(transformedEvents, contractEvent)
if commonArgs.WriteParquet {
transformedEvents = append(transformedEvents, contractEvent)
}
}

}
Expand Down
4 changes: 3 additions & 1 deletion cmd/export_effects.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ var effectsCmd = &cobra.Command{
}
totalNumBytes += numBytes

transformedEffects = append(transformedEffects, transformed)
if commonArgs.WriteParquet {
transformedEffects = append(transformedEffects, transformed)
}
}
}

Expand Down
85 changes: 45 additions & 40 deletions cmd/export_ledger_entry_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,46 +302,51 @@ func exportTransformedData(
if err != nil {
return err
}
switch v := o.(type) {
case transform.AccountOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.AccountOutputParquet)
skip = false
case transform.AccountSignerOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.AccountSignerOutputParquet)
skip = false
case transform.ClaimableBalanceOutput:
// Skip
skip = true
case transform.ConfigSettingOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ConfigSettingOutputParquet)
skip = false
case transform.ContractCodeOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ContractCodeOutputParquet)
skip = false
case transform.ContractDataOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ContractDataOutputParquet)
skip = false
case transform.PoolOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.PoolOutputParquet)
skip = false
case transform.OfferOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.OfferOutputParquet)
skip = false
case transform.TrustlineOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.TrustlineOutputParquet)
skip = false
case transform.TtlOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.TtlOutputParquet)
skip = false

if WriteParquet {
switch v := o.(type) {
case transform.AccountOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.AccountOutputParquet)
skip = false
case transform.AccountSignerOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.AccountSignerOutputParquet)
skip = false
case transform.ClaimableBalanceOutput:
// Skipping ClaimableBalanceOutputParquet because it is not needed in the current scope of work
// Note that ClaimableBalanceOutputParquet uses nested structs that will need to be handled
// for parquet conversio
skip = true
case transform.ConfigSettingOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ConfigSettingOutputParquet)
skip = false
case transform.ContractCodeOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ContractCodeOutputParquet)
skip = false
case transform.ContractDataOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ContractDataOutputParquet)
skip = false
case transform.PoolOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.PoolOutputParquet)
skip = false
case transform.OfferOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.OfferOutputParquet)
skip = false
case transform.TrustlineOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.TrustlineOutputParquet)
skip = false
case transform.TtlOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.TtlOutputParquet)
skip = false
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/export_ledgers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ var ledgersCmd = &cobra.Command{
}
totalNumBytes += numBytes

transformedLedgers = append(transformedLedgers, transformed)
if commonArgs.WriteParquet {
transformedLedgers = append(transformedLedgers, transformed)
}
}

outFile.Close()
Expand Down
4 changes: 3 additions & 1 deletion cmd/export_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ var operationsCmd = &cobra.Command{
}
totalNumBytes += numBytes

transformedOps = append(transformedOps, transformed)
if commonArgs.WriteParquet {
transformedOps = append(transformedOps, transformed)
}
}

outFile.Close()
Expand Down
4 changes: 3 additions & 1 deletion cmd/export_trades.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ var tradesCmd = &cobra.Command{
}
totalNumBytes += numBytes

transformedTrades = append(transformedTrades, transformed)
if commonArgs.WriteParquet {
transformedTrades = append(transformedTrades, transformed)
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/export_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ var transactionsCmd = &cobra.Command{
}
totalNumBytes += numBytes

transformedTransaction = append(transformedTransaction, transformed)
if commonArgs.WriteParquet {
transformedTransaction = append(transformedTransaction, transformed)
}
}

outFile.Close()
Expand Down
8 changes: 8 additions & 0 deletions internal/transform/parquet_converter.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
// This file includes interfaces and functions for converting data structures/schemas
// to the appropriate parquet data structure/schema.
//
// Note that uint32 data types need to be converted to int64 due to restrictions
// from the parquet-go package. Conversion is to int64 due to the possible loss of
// data in the conversion from uint32 -> int32.
// This applies to all the ToParquet() functions in this file.

package transform

Expand All @@ -21,6 +26,9 @@ func toJSONString(v interface{}) string {

func (lo LedgerOutput) ToParquet() interface{} {
return LedgerOutputParquet{
// Note that uint32 data types need to be converted to int64 due to restrictions
// from the parquet-go package. Conversion is to int64 due to the possible loss of
// data in the conversion from uint32 -> int32.
Sequence: int64(lo.Sequence),
LedgerHash: lo.LedgerHash,
PreviousLedgerHash: lo.PreviousLedgerHash,
Expand Down
4 changes: 3 additions & 1 deletion internal/transform/schema_parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ type OperationOutputParquet struct {
LedgerSequence int64 `parquet:"name=ledger_sequence, type=INT64, convertedtype=INT64, convertedtype=UINT_64"`
}

//// Skipping ClaimableBalanceOutputParquet because it is not needed
//// Skipping ClaimableBalanceOutputParquet because it is not needed in the current scope of work
//// Note that ClaimableBalanceOutputParquet uses nested structs that will need to be handled
//// for parquet conversion
//type ClaimableBalanceOutputParquet struct {
//}

Expand Down
2 changes: 1 addition & 1 deletion internal/utils/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func AddCommonFlags(flags *pflag.FlagSet) {
flags.Uint32("num-workers", 10, "Number of workers to spawn that read txmeta files from the datastore.")
flags.Uint32("retry-limit", 3, "Datastore GetLedger retry limit.")
flags.Uint32("retry-wait", 5, "Time in seconds to wait for GetLedger retry.")
flags.Bool("write-parquet", true, "If set, write output as parquet files.")
flags.Bool("write-parquet", false, "If set, write output as parquet files.")
}

// AddArchiveFlags adds the history archive specific flags: output, and limit
Expand Down

0 comments on commit a72c743

Please sign in to comment.