From df85b2d0d868df2854ab3f3187d4aec1a81f1275 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 2 Nov 2023 15:58:11 -0400 Subject: [PATCH] Add ledger sequence --- cmd/export_account_signers.go | 5 ++--- cmd/export_accounts.go | 5 ++--- cmd/export_claimable_balances.go | 5 ++--- cmd/export_config_setting.go | 5 ++--- cmd/export_contract_code.go | 5 ++--- cmd/export_contract_data.go | 5 ++--- cmd/export_expiration.go | 5 ++--- cmd/export_ledger_entry_changes.go | 20 ++++++++++---------- cmd/export_liquidity_pools.go | 5 ++--- cmd/export_offers.go | 5 ++--- cmd/export_trustlines.go | 5 ++--- internal/input/changes.go | 20 ++++++++------------ internal/transform/account.go | 11 +++++++++-- internal/transform/account_signer.go | 12 ++++++++++-- internal/transform/claimable_balance.go | 11 +++++++++-- internal/transform/config_setting.go | 11 +++++++++-- internal/transform/contract_code.go | 11 +++++++++-- internal/transform/contract_data.go | 11 +++++++++-- internal/transform/expiration.go | 11 +++++++++-- internal/transform/liquidity_pool.go | 11 +++++++++-- internal/transform/offer.go | 12 ++++++++++-- internal/transform/offer_normalized.go | 6 +++--- internal/transform/schema.go | 10 ++++++++++ internal/transform/trustline.go | 11 +++++++++-- 24 files changed, 143 insertions(+), 75 deletions(-) diff --git a/cmd/export_account_signers.go b/cmd/export_account_signers.go index 0706477a..4e53a26d 100644 --- a/cmd/export_account_signers.go +++ b/cmd/export_account_signers.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -38,10 +37,10 @@ the export_ledger_entry_changes command.`, numFailures := 0 totalNumBytes := 0 numSigners := 0 - var closedAt time.Time + var header xdr.LedgerHeaderHistoryEntry for _, acc := range accounts { if acc.AccountSignersChanged() { - transformed, err := transform.TransformSigners(acc, closedAt) + transformed, err := transform.TransformSigners(acc, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not json transform account signer: %v", err)) numFailures += 1 diff --git a/cmd/export_accounts.go b/cmd/export_accounts.go index 02f2ed66..eee10af0 100644 --- a/cmd/export_accounts.go +++ b/cmd/export_accounts.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -37,9 +36,9 @@ the export_ledger_entry_changes command.`, outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 - var closedAt time.Time + var header xdr.LedgerHeaderHistoryEntry for _, acc := range accounts { - transformed, err := transform.TransformAccount(acc, closedAt) + transformed, err := transform.TransformAccount(acc, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not json transform account: %v", err)) numFailures += 1 diff --git a/cmd/export_claimable_balances.go b/cmd/export_claimable_balances.go index 7443707b..5db70976 100644 --- a/cmd/export_claimable_balances.go +++ b/cmd/export_claimable_balances.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -38,8 +37,8 @@ var claimableBalancesCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, balance := range balances { - var closedAt time.Time - transformed, err := transform.TransformClaimableBalance(balance, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := transform.TransformClaimableBalance(balance, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform balance %+v: %v", balance, err)) numFailures += 1 diff --git a/cmd/export_config_setting.go b/cmd/export_config_setting.go index 6a5d0742..903f3c64 100644 --- a/cmd/export_config_setting.go +++ b/cmd/export_config_setting.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -35,8 +34,8 @@ var configSettingCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, setting := range settings { - var closedAt time.Time - transformed, err := transform.TransformConfigSetting(setting, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := transform.TransformConfigSetting(setting, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform config setting %+v: %v", setting, err)) numFailures += 1 diff --git a/cmd/export_contract_code.go b/cmd/export_contract_code.go index a948f6da..6273c2b1 100644 --- a/cmd/export_contract_code.go +++ b/cmd/export_contract_code.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -35,8 +34,8 @@ var codeCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, code := range codes { - var closedAt time.Time - transformed, err := transform.TransformContractCode(code, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := transform.TransformContractCode(code, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform contract code %+v: %v", code, err)) numFailures += 1 diff --git a/cmd/export_contract_data.go b/cmd/export_contract_data.go index cfe79e67..d2195d79 100644 --- a/cmd/export_contract_data.go +++ b/cmd/export_contract_data.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -35,9 +34,9 @@ var dataCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, data := range datas { - var closedAt time.Time + var header xdr.LedgerHeaderHistoryEntry TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData) - transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase, closedAt) + transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform contract data %+v: %v", data, err)) numFailures += 1 diff --git a/cmd/export_expiration.go b/cmd/export_expiration.go index 48b271e1..a3a8a410 100644 --- a/cmd/export_expiration.go +++ b/cmd/export_expiration.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -35,8 +34,8 @@ var expirationCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, expiration := range expirations { - var closedAt time.Time - transformed, err := transform.TransformExpiration(expiration, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := transform.TransformExpiration(expiration, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform expiration %+v: %v", expiration, err)) numFailures += 1 diff --git a/cmd/export_ledger_entry_changes.go b/cmd/export_ledger_entry_changes.go index 9f9c40b0..4c11ed71 100644 --- a/cmd/export_ledger_entry_changes.go +++ b/cmd/export_ledger_entry_changes.go @@ -118,7 +118,7 @@ be exported.`, continue } else if changed { - acc, err := transform.TransformAccount(change, changes.ClosedAts[i]) + acc, err := transform.TransformAccount(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming account entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -127,7 +127,7 @@ be exported.`, transformedOutputs["accounts"] = append(transformedOutputs["accounts"], acc) } if change.AccountSignersChanged() { - signers, err := transform.TransformSigners(change, changes.ClosedAts[i]) + signers, err := transform.TransformSigners(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming account signers from %d :%s", entry.LastModifiedLedgerSeq, err)) @@ -143,7 +143,7 @@ be exported.`, continue } for i, change := range changes.Changes { - balance, err := transform.TransformClaimableBalance(change, changes.ClosedAts[i]) + balance, err := transform.TransformClaimableBalance(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming balance entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -156,7 +156,7 @@ be exported.`, continue } for i, change := range changes.Changes { - offer, err := transform.TransformOffer(change, changes.ClosedAts[i]) + offer, err := transform.TransformOffer(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming offer entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -169,7 +169,7 @@ be exported.`, continue } for i, change := range changes.Changes { - trust, err := transform.TransformTrustline(change, changes.ClosedAts[i]) + trust, err := transform.TransformTrustline(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming trustline entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -182,7 +182,7 @@ be exported.`, continue } for i, change := range changes.Changes { - pool, err := transform.TransformPool(change, changes.ClosedAts[i]) + pool, err := transform.TransformPool(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming liquidity pool entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -196,7 +196,7 @@ be exported.`, } for i, change := range changes.Changes { TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData) - contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase, changes.ClosedAts[i]) + contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming contract data entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -215,7 +215,7 @@ be exported.`, continue } for i, change := range changes.Changes { - contractCode, err := transform.TransformContractCode(change, changes.ClosedAts[i]) + contractCode, err := transform.TransformContractCode(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming contract code entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -228,7 +228,7 @@ be exported.`, continue } for i, change := range changes.Changes { - configSettings, err := transform.TransformConfigSetting(change, changes.ClosedAts[i]) + configSettings, err := transform.TransformConfigSetting(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming config settings entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -241,7 +241,7 @@ be exported.`, continue } for i, change := range changes.Changes { - expiration, err := transform.TransformExpiration(change, changes.ClosedAts[i]) + expiration, err := transform.TransformExpiration(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming expiration entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) diff --git a/cmd/export_liquidity_pools.go b/cmd/export_liquidity_pools.go index 8675e408..3393f930 100644 --- a/cmd/export_liquidity_pools.go +++ b/cmd/export_liquidity_pools.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -38,8 +37,8 @@ the export_ledger_entry_changes command.`, numFailures := 0 totalNumBytes := 0 for _, pool := range pools { - var closedAt time.Time - transformed, err := transform.TransformPool(pool, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := transform.TransformPool(pool, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform pool %+v: %v", pool, err)) numFailures += 1 diff --git a/cmd/export_offers.go b/cmd/export_offers.go index d7d25359..37b67cb8 100644 --- a/cmd/export_offers.go +++ b/cmd/export_offers.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -39,8 +38,8 @@ var offersCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, offer := range offers { - var closedAt time.Time - transformed, err := transform.TransformOffer(offer, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := transform.TransformOffer(offer, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform offer %+v: %v", offer, err)) numFailures += 1 diff --git a/cmd/export_trustlines.go b/cmd/export_trustlines.go index c26274b0..f13dbd2b 100644 --- a/cmd/export_trustlines.go +++ b/cmd/export_trustlines.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -39,8 +38,8 @@ var trustlinesCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, trust := range trustlines { - var closedAt time.Time - transformed, err := transform.TransformTrustline(trust, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := transform.TransformTrustline(trust, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not json transform trustline %+v: %v", trust, err)) numFailures += 1 diff --git a/internal/input/changes.go b/internal/input/changes.go index 53b929a9..a68947c3 100644 --- a/internal/input/changes.go +++ b/internal/input/changes.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "math" - "time" "github.com/stellar/stellar-etl/internal/utils" @@ -18,14 +17,14 @@ var ( ExtractBatch = extractBatch ) -type ChangesClosedAt struct { - Changes []ingest.Change - ClosedAts []time.Time +type LedgerChanges struct { + Changes []ingest.Change + LedgerHeaders []xdr.LedgerHeaderHistoryEntry } // ChangeBatch represents the changes in a batch of ledgers represented by the range [BatchStart, BatchEnd) type ChangeBatch struct { - Changes map[xdr.LedgerEntryType]ChangesClosedAt + Changes map[xdr.LedgerEntryType]LedgerChanges BatchStart uint32 BatchEnd uint32 } @@ -98,7 +97,7 @@ func extractBatch( xdr.LedgerEntryTypeConfigSetting, xdr.LedgerEntryTypeExpiration} - changesClosedAt := map[xdr.LedgerEntryType]ChangesClosedAt{} + changesClosedAt := map[xdr.LedgerEntryType]LedgerChanges{} ctx := context.Background() for seq := batchStart; seq <= batchEnd; { changeCompactors := map[xdr.LedgerEntryType]*ingest.ChangeCompactor{} @@ -113,16 +112,13 @@ func extractBatch( // if this ledger is available, we process its changes and move on to the next ledger by incrementing seq. // Otherwise, nothing is incremented, and we try again on the next iteration of the loop - var closedAt time.Time + var header xdr.LedgerHeaderHistoryEntry if seq <= latestLedger { changeReader, err := ingest.NewLedgerChangeReader(ctx, core, env.NetworkPassphrase, seq) if err != nil { logger.Fatal(fmt.Sprintf("unable to create change reader for ledger %d: ", seq), err) } - closedAt, err = utils.TimePointToUTCTimeStamp(changeReader.LedgerTransactionReader.GetHeader().Header.ScpValue.CloseTime) - if err != nil { - logger.Fatal(fmt.Sprintf("unable to read CloseTime for ledger %d: ", seq), err) - } + header = changeReader.LedgerTransactionReader.GetHeader() for { change, err := changeReader.Read() @@ -150,7 +146,7 @@ func extractBatch( for _, change := range compactor.GetChanges() { dataTypeChanges := changesClosedAt[dataType] dataTypeChanges.Changes = append(dataTypeChanges.Changes, change) - dataTypeChanges.ClosedAts = append(dataTypeChanges.ClosedAts, closedAt) + dataTypeChanges.LedgerHeaders = append(dataTypeChanges.LedgerHeaders, header) changesClosedAt[dataType] = dataTypeChanges } } diff --git a/internal/transform/account.go b/internal/transform/account.go index 1a8d90d5..1a605ac6 100644 --- a/internal/transform/account.go +++ b/internal/transform/account.go @@ -2,7 +2,6 @@ package transform import ( "fmt" - "time" "github.com/guregu/null/zero" "github.com/stellar/go/ingest" @@ -11,7 +10,7 @@ import ( ) // TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery -func TransformAccount(ledgerChange ingest.Change, closedAt time.Time) (AccountOutput, error) { +func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (AccountOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return AccountOutput{}, err @@ -77,6 +76,13 @@ func TransformAccount(ledgerChange ingest.Change, closedAt time.Time) (AccountOu outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return AccountOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedAccount := AccountOutput{ AccountID: outputID, Balance: utils.ConvertStroopValueToReal(outputBalance), @@ -100,6 +106,7 @@ func TransformAccount(ledgerChange ingest.Change, closedAt time.Time) (AccountOu LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedAccount, nil } diff --git a/internal/transform/account_signer.go b/internal/transform/account_signer.go index 0c22850a..25824d2f 100644 --- a/internal/transform/account_signer.go +++ b/internal/transform/account_signer.go @@ -3,15 +3,15 @@ package transform import ( "fmt" "sort" - "time" "github.com/guregu/null" "github.com/stellar/go/ingest" + "github.com/stellar/go/xdr" "github.com/stellar/stellar-etl/internal/utils" ) // TransformSigners converts account signers from the history archive ingestion system into a form suitable for BigQuery -func TransformSigners(ledgerChange ingest.Change, closedAt time.Time) ([]AccountSignerOutput, error) { +func TransformSigners(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) ([]AccountSignerOutput, error) { var signers []AccountSignerOutput ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) @@ -24,6 +24,13 @@ func TransformSigners(ledgerChange ingest.Change, closedAt time.Time) ([]Account return signers, fmt.Errorf("could not extract signer data from ledger entry of type: %+v", ledgerEntry.Data.Type) } + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return signers, err + } + + ledgerSequence := header.Header.LedgerSeq + sponsors := accountEntry.SponsorPerSigner() for signer, weight := range accountEntry.SignerSummary() { var sponsor null.String @@ -40,6 +47,7 @@ func TransformSigners(ledgerChange ingest.Change, closedAt time.Time) ([]Account LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), }) } sort.Slice(signers, func(a, b int) bool { return signers[a].Weight < signers[b].Weight }) diff --git a/internal/transform/claimable_balance.go b/internal/transform/claimable_balance.go index c1947d20..73cfc0a0 100644 --- a/internal/transform/claimable_balance.go +++ b/internal/transform/claimable_balance.go @@ -2,7 +2,6 @@ package transform import ( "fmt" - "time" "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" @@ -22,7 +21,7 @@ func transformClaimants(claimants []xdr.Claimant) []Claimant { } // TransformClaimableBalance converts a claimable balance from the history archive ingestion system into a form suitable for BigQuery -func TransformClaimableBalance(ledgerChange ingest.Change, closedAt time.Time) (ClaimableBalanceOutput, error) { +func TransformClaimableBalance(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ClaimableBalanceOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ClaimableBalanceOutput{}, err @@ -46,6 +45,13 @@ func TransformClaimableBalance(ledgerChange ingest.Change, closedAt time.Time) ( outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ClaimableBalanceOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformed := ClaimableBalanceOutput{ BalanceID: balanceID, AssetCode: outputAsset.AssetCode, @@ -60,6 +66,7 @@ func TransformClaimableBalance(ledgerChange ingest.Change, closedAt time.Time) ( Flags: outputFlags, Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformed, nil } diff --git a/internal/transform/config_setting.go b/internal/transform/config_setting.go index c05cf0d2..280d138c 100644 --- a/internal/transform/config_setting.go +++ b/internal/transform/config_setting.go @@ -3,7 +3,6 @@ package transform import ( "fmt" "strconv" - "time" "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" @@ -11,7 +10,7 @@ import ( ) // TransformConfigSetting converts an config setting ledger change entry into a form suitable for BigQuery -func TransformConfigSetting(ledgerChange ingest.Change, closedAt time.Time) (ConfigSettingOutput, error) { +func TransformConfigSetting(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ConfigSettingOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ConfigSettingOutput{}, err @@ -91,6 +90,13 @@ func TransformConfigSetting(ledgerChange ingest.Change, closedAt time.Time) (Con bucketListSizeWindow = append(bucketListSizeWindow, uint64(sizeWindow)) } + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ConfigSettingOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedConfigSetting := ConfigSettingOutput{ ConfigSettingId: int32(configSettingId), ContractMaxSizeBytes: uint32(contractMaxSizeBytes), @@ -138,6 +144,7 @@ func TransformConfigSetting(ledgerChange ingest.Change, closedAt time.Time) (Con LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedConfigSetting, nil } diff --git a/internal/transform/contract_code.go b/internal/transform/contract_code.go index b6e38a7f..e23571fe 100644 --- a/internal/transform/contract_code.go +++ b/internal/transform/contract_code.go @@ -2,7 +2,6 @@ package transform import ( "fmt" - "time" "github.com/stellar/go/ingest" "github.com/stellar/go/strkey" @@ -11,7 +10,7 @@ import ( ) // TransformContractCode converts a contract code ledger change entry into a form suitable for BigQuery -func TransformContractCode(ledgerChange ingest.Change, closedAt time.Time) (ContractCodeOutput, error) { +func TransformContractCode(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ContractCodeOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ContractCodeOutput{}, err @@ -32,6 +31,13 @@ func TransformContractCode(ledgerChange ingest.Change, closedAt time.Time) (Cont contractCodeHashByte, _ := contractCode.Hash.MarshalBinary() contractCodeHash, _ := strkey.Encode(strkey.VersionByteContract, contractCodeHashByte) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ContractCodeOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedCode := ContractCodeOutput{ ContractCodeHash: contractCodeHash, ContractCodeExtV: int32(contractCodeExtV), @@ -39,6 +45,7 @@ func TransformContractCode(ledgerChange ingest.Change, closedAt time.Time) (Cont LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedCode, nil } diff --git a/internal/transform/contract_data.go b/internal/transform/contract_data.go index 2fd32cd5..65cb983a 100644 --- a/internal/transform/contract_data.go +++ b/internal/transform/contract_data.go @@ -3,7 +3,6 @@ package transform import ( "fmt" "math/big" - "time" "github.com/stellar/go/ingest" "github.com/stellar/go/strkey" @@ -60,7 +59,7 @@ func NewTransformContractDataStruct(assetFrom AssetFromContractDataFunc, contrac } // TransformContractData converts a contract data ledger change entry into a form suitable for BigQuery -func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest.Change, passphrase string, closedAt time.Time) (ContractDataOutput, error, bool) { +func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest.Change, passphrase string, header xdr.LedgerHeaderHistoryEntry) (ContractDataOutput, error, bool) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ContractDataOutput{}, err, false @@ -108,6 +107,13 @@ func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest. contractDataDurability := contractData.Durability.String() + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ContractDataOutput{}, err, false + } + + ledgerSequence := header.Header.LedgerSeq + transformedData := ContractDataOutput{ ContractId: outputContractDataContractId, ContractKeyType: contractDataKeyType, @@ -121,6 +127,7 @@ func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest. LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedData, nil, true } diff --git a/internal/transform/expiration.go b/internal/transform/expiration.go index fb104049..a7d68b9c 100644 --- a/internal/transform/expiration.go +++ b/internal/transform/expiration.go @@ -2,7 +2,6 @@ package transform import ( "fmt" - "time" "github.com/stellar/go/ingest" "github.com/stellar/go/strkey" @@ -11,7 +10,7 @@ import ( ) // TransformConfigSetting converts an config setting ledger change entry into a form suitable for BigQuery -func TransformExpiration(ledgerChange ingest.Change, closedAt time.Time) (ExpirationOutput, error) { +func TransformExpiration(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ExpirationOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ExpirationOutput{}, err @@ -31,6 +30,13 @@ func TransformExpiration(ledgerChange ingest.Change, closedAt time.Time) (Expira keyHash, _ := strkey.Encode(strkey.VersionByteContract, keyHashByte) expirationLedgerSeq := expiration.ExpirationLedgerSeq + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ExpirationOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedPool := ExpirationOutput{ KeyHash: keyHash, ExpirationLedgerSeq: uint32(expirationLedgerSeq), @@ -38,6 +44,7 @@ func TransformExpiration(ledgerChange ingest.Change, closedAt time.Time) (Expira LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedPool, nil diff --git a/internal/transform/liquidity_pool.go b/internal/transform/liquidity_pool.go index 55baf1fc..30cbaa24 100644 --- a/internal/transform/liquidity_pool.go +++ b/internal/transform/liquidity_pool.go @@ -2,7 +2,6 @@ package transform import ( "fmt" - "time" "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" @@ -10,7 +9,7 @@ import ( ) // TransformPool converts an liquidity pool ledger change entry into a form suitable for BigQuery -func TransformPool(ledgerChange ingest.Change, closedAt time.Time) (PoolOutput, error) { +func TransformPool(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (PoolOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return PoolOutput{}, err @@ -47,6 +46,13 @@ func TransformPool(ledgerChange ingest.Change, closedAt time.Time) (PoolOutput, err = cp.Params.AssetB.Extract(&assetBType, &assetBCode, &assetBIssuer) assetBID := FarmHashAsset(assetBCode, assetBIssuer, assetBType) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return PoolOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedPool := PoolOutput{ PoolID: PoolIDToString(lp.LiquidityPoolId), PoolType: poolType, @@ -67,6 +73,7 @@ func TransformPool(ledgerChange ingest.Change, closedAt time.Time) (PoolOutput, LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedPool, nil } diff --git a/internal/transform/offer.go b/internal/transform/offer.go index bb4fc012..21e1c4b4 100644 --- a/internal/transform/offer.go +++ b/internal/transform/offer.go @@ -2,15 +2,15 @@ package transform import ( "fmt" - "time" "github.com/stellar/stellar-etl/internal/utils" "github.com/stellar/go/ingest" + "github.com/stellar/go/xdr" ) // TransformOffer converts an account from the history archive ingestion system into a form suitable for BigQuery -func TransformOffer(ledgerChange ingest.Change, closedAt time.Time) (OfferOutput, error) { +func TransformOffer(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (OfferOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return OfferOutput{}, err @@ -69,6 +69,13 @@ func TransformOffer(ledgerChange ingest.Change, closedAt time.Time) (OfferOutput outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return OfferOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedOffer := OfferOutput{ SellerID: outputSellerID, OfferID: outputOfferID, @@ -90,6 +97,7 @@ func TransformOffer(ledgerChange ingest.Change, closedAt time.Time) (OfferOutput Deleted: outputDeleted, Sponsor: ledgerEntrySponsorToNullString(ledgerEntry), ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedOffer, nil } diff --git a/internal/transform/offer_normalized.go b/internal/transform/offer_normalized.go index a7f8f8f4..83eced04 100644 --- a/internal/transform/offer_normalized.go +++ b/internal/transform/offer_normalized.go @@ -5,18 +5,18 @@ import ( "hash/fnv" "sort" "strings" - "time" "github.com/stellar/stellar-etl/internal/utils" "github.com/stellar/go/ingest" + "github.com/stellar/go/xdr" ) // TransformOfferNormalized converts an offer into a normalized form, allowing it to be stored as part of the historical orderbook dataset func TransformOfferNormalized(ledgerChange ingest.Change, ledgerSeq uint32) (NormalizedOfferOutput, error) { - var closedAt time.Time - transformed, err := TransformOffer(ledgerChange, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := TransformOffer(ledgerChange, header) if err != nil { return NormalizedOfferOutput{}, err } diff --git a/internal/transform/schema.go b/internal/transform/schema.go index ffa13d2b..4e8182c5 100644 --- a/internal/transform/schema.go +++ b/internal/transform/schema.go @@ -100,6 +100,7 @@ type AccountOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // AccountSignerOutput is a representation of an account signer that aligns with the BigQuery table account_signers @@ -112,6 +113,7 @@ type AccountSignerOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // OperationOutput is a representation of an operation that aligns with the BigQuery table history_operations @@ -141,6 +143,7 @@ type ClaimableBalanceOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // Claimants @@ -195,6 +198,7 @@ type PoolOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // AssetOutput is a representation of an asset that aligns with the BigQuery table history_assets @@ -225,6 +229,7 @@ type TrustlineOutput struct { Sponsor null.String `json:"sponsor"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // OfferOutput is a representation of an offer that aligns with the BigQuery table offers @@ -249,6 +254,7 @@ type OfferOutput struct { Deleted bool `json:"deleted"` Sponsor null.String `json:"sponsor"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // TradeOutput is a representation of a trade that aligns with the BigQuery table history_trades @@ -491,6 +497,7 @@ type ContractDataOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // ContractCodeOutput is a representation of contract code that aligns with the Bigquery table soroban_contract_code @@ -501,6 +508,7 @@ type ContractCodeOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` //ContractCodeCode string `json:"contract_code"` } @@ -553,6 +561,7 @@ type ConfigSettingOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // ExpirationOutput is a representation of soroban expiration that aligns with the Bigquery table expirations @@ -563,6 +572,7 @@ type ExpirationOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // DiagnosticEventOutput is a representation of soroban expiration that aligns with the Bigquery table expirations diff --git a/internal/transform/trustline.go b/internal/transform/trustline.go index ca825c62..3099f306 100644 --- a/internal/transform/trustline.go +++ b/internal/transform/trustline.go @@ -3,7 +3,6 @@ package transform import ( "encoding/base64" "fmt" - "time" "github.com/guregu/null" "github.com/pkg/errors" @@ -15,7 +14,7 @@ import ( ) // TransformTrustline converts a trustline from the history archive ingestion system into a form suitable for BigQuery -func TransformTrustline(ledgerChange ingest.Change, closedAt time.Time) (TrustlineOutput, error) { +func TransformTrustline(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (TrustlineOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return TrustlineOutput{}, err @@ -52,6 +51,13 @@ func TransformTrustline(ledgerChange ingest.Change, closedAt time.Time) (Trustli liabilities := trustEntry.Liabilities() + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return TrustlineOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedTrustline := TrustlineOutput{ LedgerKey: outputLedgerKey, AccountID: outputAccountID, @@ -70,6 +76,7 @@ func TransformTrustline(ledgerChange ingest.Change, closedAt time.Time) (Trustli Sponsor: ledgerEntrySponsorToNullString(ledgerEntry), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedTrustline, nil