From eb3fdcf3013094e0d99f01dc8b15224af9592a6f Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 2 Nov 2023 10:20:14 -0400 Subject: [PATCH 1/5] Add closed_at to ledger entry changes tables --- cmd/export_account_signers.go | 4 +- cmd/export_accounts.go | 4 +- cmd/export_claimable_balances.go | 4 +- cmd/export_config_setting.go | 4 +- cmd/export_contract_code.go | 4 +- cmd/export_contract_data.go | 4 +- cmd/export_expiration.go | 4 +- cmd/export_ledger_entry_changes.go | 38 ++++----- cmd/export_liquidity_pools.go | 4 +- cmd/export_offers.go | 4 +- cmd/export_trustlines.go | 4 +- internal/input/changes.go | 30 ++++--- internal/input/changes_test.go | 14 +++- internal/transform/account.go | 4 +- internal/transform/account_signer.go | 4 +- internal/transform/account_signer_test.go | 4 +- internal/transform/account_test.go | 4 +- internal/transform/claimable_balance.go | 4 +- internal/transform/claimable_balance_test.go | 4 +- internal/transform/config_setting.go | 4 +- internal/transform/config_setting_test.go | 4 +- internal/transform/contract_code.go | 4 +- internal/transform/contract_code_test.go | 4 +- internal/transform/contract_data.go | 4 +- internal/transform/contract_data_test.go | 4 +- internal/transform/expiration.go | 4 +- internal/transform/expiration_test.go | 4 +- internal/transform/liquidity_pool.go | 4 +- internal/transform/liquidity_pool_test.go | 4 +- internal/transform/offer.go | 4 +- internal/transform/offer_normalized.go | 4 +- internal/transform/offer_test.go | 4 +- internal/transform/schema.go | 88 +++++++++++--------- internal/transform/trustline.go | 4 +- internal/transform/trustline_test.go | 4 +- 35 files changed, 189 insertions(+), 105 deletions(-) diff --git a/cmd/export_account_signers.go b/cmd/export_account_signers.go index 83025b35..0706477a 100644 --- a/cmd/export_account_signers.go +++ b/cmd/export_account_signers.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -37,9 +38,10 @@ the export_ledger_entry_changes command.`, numFailures := 0 totalNumBytes := 0 numSigners := 0 + var closedAt time.Time for _, acc := range accounts { if acc.AccountSignersChanged() { - transformed, err := transform.TransformSigners(acc) + transformed, err := transform.TransformSigners(acc, closedAt) if err != nil { cmdLogger.LogError(fmt.Errorf("could not json transform account signer: %v", err)) numFailures += 1 diff --git a/cmd/export_accounts.go b/cmd/export_accounts.go index 2069ff69..02f2ed66 100644 --- a/cmd/export_accounts.go +++ b/cmd/export_accounts.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -36,8 +37,9 @@ the export_ledger_entry_changes command.`, outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var closedAt time.Time for _, acc := range accounts { - transformed, err := transform.TransformAccount(acc) + transformed, err := transform.TransformAccount(acc, closedAt) if err != nil { cmdLogger.LogError(fmt.Errorf("could not json transform account: %v", err)) numFailures += 1 diff --git a/cmd/export_claimable_balances.go b/cmd/export_claimable_balances.go index 6973cb52..7443707b 100644 --- a/cmd/export_claimable_balances.go +++ b/cmd/export_claimable_balances.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -37,7 +38,8 @@ var claimableBalancesCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, balance := range balances { - transformed, err := transform.TransformClaimableBalance(balance) + var closedAt time.Time + transformed, err := transform.TransformClaimableBalance(balance, closedAt) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform balance %+v: %v", balance, err)) numFailures += 1 diff --git a/cmd/export_config_setting.go b/cmd/export_config_setting.go index 3127422c..6a5d0742 100644 --- a/cmd/export_config_setting.go +++ b/cmd/export_config_setting.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -34,7 +35,8 @@ var configSettingCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, setting := range settings { - transformed, err := transform.TransformConfigSetting(setting) + var closedAt time.Time + transformed, err := transform.TransformConfigSetting(setting, closedAt) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform config setting %+v: %v", setting, err)) numFailures += 1 diff --git a/cmd/export_contract_code.go b/cmd/export_contract_code.go index 62f07f0b..a948f6da 100644 --- a/cmd/export_contract_code.go +++ b/cmd/export_contract_code.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -34,7 +35,8 @@ var codeCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, code := range codes { - transformed, err := transform.TransformContractCode(code) + var closedAt time.Time + transformed, err := transform.TransformContractCode(code, closedAt) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform contract code %+v: %v", code, err)) numFailures += 1 diff --git a/cmd/export_contract_data.go b/cmd/export_contract_data.go index 5dfbefb5..cfe79e67 100644 --- a/cmd/export_contract_data.go +++ b/cmd/export_contract_data.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -34,8 +35,9 @@ var dataCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, data := range datas { + var closedAt time.Time TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData) - transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase) + transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase, closedAt) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform contract data %+v: %v", data, err)) numFailures += 1 diff --git a/cmd/export_expiration.go b/cmd/export_expiration.go index c48bb519..48b271e1 100644 --- a/cmd/export_expiration.go +++ b/cmd/export_expiration.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -34,7 +35,8 @@ var expirationCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, expiration := range expirations { - transformed, err := transform.TransformExpiration(expiration) + var closedAt time.Time + transformed, err := transform.TransformExpiration(expiration, closedAt) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform expiration %+v: %v", expiration, err)) numFailures += 1 diff --git a/cmd/export_ledger_entry_changes.go b/cmd/export_ledger_entry_changes.go index dd76f6c7..9f9c40b0 100644 --- a/cmd/export_ledger_entry_changes.go +++ b/cmd/export_ledger_entry_changes.go @@ -112,13 +112,13 @@ be exported.`, if !exports["export-accounts"] { continue } - for _, change := range changes { + for i, change := range changes.Changes { if changed, err := change.AccountChangedExceptSigners(); err != nil { cmdLogger.LogError(fmt.Errorf("unable to identify changed accounts: %v", err)) continue } else if changed { - acc, err := transform.TransformAccount(change) + acc, err := transform.TransformAccount(change, changes.ClosedAts[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming account entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -127,7 +127,7 @@ be exported.`, transformedOutputs["accounts"] = append(transformedOutputs["accounts"], acc) } if change.AccountSignersChanged() { - signers, err := transform.TransformSigners(change) + signers, err := transform.TransformSigners(change, changes.ClosedAts[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming account signers from %d :%s", entry.LastModifiedLedgerSeq, err)) @@ -142,8 +142,8 @@ be exported.`, if !exports["export-balances"] { continue } - for _, change := range changes { - balance, err := transform.TransformClaimableBalance(change) + for i, change := range changes.Changes { + balance, err := transform.TransformClaimableBalance(change, changes.ClosedAts[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming balance entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -155,8 +155,8 @@ be exported.`, if !exports["export-offers"] { continue } - for _, change := range changes { - offer, err := transform.TransformOffer(change) + for i, change := range changes.Changes { + offer, err := transform.TransformOffer(change, changes.ClosedAts[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming offer entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -168,8 +168,8 @@ be exported.`, if !exports["export-trustlines"] { continue } - for _, change := range changes { - trust, err := transform.TransformTrustline(change) + for i, change := range changes.Changes { + trust, err := transform.TransformTrustline(change, changes.ClosedAts[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming trustline entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -181,8 +181,8 @@ be exported.`, if !exports["export-pools"] { continue } - for _, change := range changes { - pool, err := transform.TransformPool(change) + for i, change := range changes.Changes { + pool, err := transform.TransformPool(change, changes.ClosedAts[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming liquidity pool entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -194,9 +194,9 @@ be exported.`, if !exports["export-contract-data"] { continue } - for _, change := range changes { + for i, change := range changes.Changes { TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData) - contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase) + contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase, changes.ClosedAts[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming contract data entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -214,8 +214,8 @@ be exported.`, if !exports["export-contract-code"] { continue } - for _, change := range changes { - contractCode, err := transform.TransformContractCode(change) + for i, change := range changes.Changes { + contractCode, err := transform.TransformContractCode(change, changes.ClosedAts[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming contract code entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -227,8 +227,8 @@ be exported.`, if !exports["export-config-settings"] { continue } - for _, change := range changes { - configSettings, err := transform.TransformConfigSetting(change) + for i, change := range changes.Changes { + configSettings, err := transform.TransformConfigSetting(change, changes.ClosedAts[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming config settings entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -240,8 +240,8 @@ be exported.`, if !exports["export-expiration"] { continue } - for _, change := range changes { - expiration, err := transform.TransformExpiration(change) + for i, change := range changes.Changes { + expiration, err := transform.TransformExpiration(change, changes.ClosedAts[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming expiration entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) diff --git a/cmd/export_liquidity_pools.go b/cmd/export_liquidity_pools.go index 03aa2bea..8675e408 100644 --- a/cmd/export_liquidity_pools.go +++ b/cmd/export_liquidity_pools.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -37,7 +38,8 @@ the export_ledger_entry_changes command.`, numFailures := 0 totalNumBytes := 0 for _, pool := range pools { - transformed, err := transform.TransformPool(pool) + var closedAt time.Time + transformed, err := transform.TransformPool(pool, closedAt) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform pool %+v: %v", pool, err)) numFailures += 1 diff --git a/cmd/export_offers.go b/cmd/export_offers.go index 237aa9a6..d7d25359 100644 --- a/cmd/export_offers.go +++ b/cmd/export_offers.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -38,7 +39,8 @@ var offersCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, offer := range offers { - transformed, err := transform.TransformOffer(offer) + var closedAt time.Time + transformed, err := transform.TransformOffer(offer, closedAt) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform offer %+v: %v", offer, err)) numFailures += 1 diff --git a/cmd/export_trustlines.go b/cmd/export_trustlines.go index 58414903..c26274b0 100644 --- a/cmd/export_trustlines.go +++ b/cmd/export_trustlines.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -38,7 +39,8 @@ var trustlinesCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, trust := range trustlines { - transformed, err := transform.TransformTrustline(trust) + var closedAt time.Time + transformed, err := transform.TransformTrustline(trust, closedAt) if err != nil { cmdLogger.LogError(fmt.Errorf("could not json transform trustline %+v: %v", trust, err)) numFailures += 1 diff --git a/internal/input/changes.go b/internal/input/changes.go index 37a54d09..53b929a9 100644 --- a/internal/input/changes.go +++ b/internal/input/changes.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "math" + "time" "github.com/stellar/stellar-etl/internal/utils" @@ -17,9 +18,14 @@ var ( ExtractBatch = extractBatch ) +type ChangesClosedAt struct { + Changes []ingest.Change + ClosedAts []time.Time +} + // ChangeBatch represents the changes in a batch of ledgers represented by the range [BatchStart, BatchEnd) type ChangeBatch struct { - Changes map[xdr.LedgerEntryType][]ingest.Change + Changes map[xdr.LedgerEntryType]ChangesClosedAt BatchStart uint32 BatchEnd uint32 } @@ -92,7 +98,7 @@ func extractBatch( xdr.LedgerEntryTypeConfigSetting, xdr.LedgerEntryTypeExpiration} - changes := map[xdr.LedgerEntryType][]ingest.Change{} + changesClosedAt := map[xdr.LedgerEntryType]ChangesClosedAt{} ctx := context.Background() for seq := batchStart; seq <= batchEnd; { changeCompactors := map[xdr.LedgerEntryType]*ingest.ChangeCompactor{} @@ -107,19 +113,16 @@ func extractBatch( // if this ledger is available, we process its changes and move on to the next ledger by incrementing seq. // Otherwise, nothing is incremented, and we try again on the next iteration of the loop + var closedAt time.Time if seq <= latestLedger { changeReader, err := ingest.NewLedgerChangeReader(ctx, core, env.NetworkPassphrase, seq) if err != nil { logger.Fatal(fmt.Sprintf("unable to create change reader for ledger %d: ", seq), err) } - // TODO: Add in ledger_closed_at; Update changeCompactors to also save ledger close time. - // AddChange is from the go monorepo so it might be easier to just add a addledgerclose func after it - //txReader := changeReader.LedgerTransactionReader - - //closeTime, err := utils.TimePointToUTCTimeStamp(txReader.GetHeader().Header.ScpValue.CloseTime) - //if err != nil { - // logger.Fatal(fmt.Sprintf("unable to read close time for ledger %d: ", seq), err) - //} + closedAt, err = utils.TimePointToUTCTimeStamp(changeReader.LedgerTransactionReader.GetHeader().Header.ScpValue.CloseTime) + if err != nil { + logger.Fatal(fmt.Sprintf("unable to read CloseTime for ledger %d: ", seq), err) + } for { change, err := changeReader.Read() @@ -145,14 +148,17 @@ func extractBatch( for dataType, compactor := range changeCompactors { for _, change := range compactor.GetChanges() { - changes[dataType] = append(changes[dataType], change) + dataTypeChanges := changesClosedAt[dataType] + dataTypeChanges.Changes = append(dataTypeChanges.Changes, change) + dataTypeChanges.ClosedAts = append(dataTypeChanges.ClosedAts, closedAt) + changesClosedAt[dataType] = dataTypeChanges } } } return ChangeBatch{ - Changes: changes, + Changes: changesClosedAt, BatchStart: batchStart, BatchEnd: batchEnd, } diff --git a/internal/input/changes_test.go b/internal/input/changes_test.go index 0e739d53..79c4cb4f 100644 --- a/internal/input/changes_test.go +++ b/internal/input/changes_test.go @@ -2,13 +2,14 @@ package input import ( "testing" + "time" + "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/support/log" "github.com/stellar/stellar-etl/internal/utils" "github.com/stretchr/testify/assert" - "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" ) @@ -114,8 +115,13 @@ func TestSendBatchToChannel(t *testing.T) { } func wrapLedgerEntry(entryType xdr.LedgerEntryType, entry xdr.LedgerEntry) ChangeBatch { - changes := map[xdr.LedgerEntryType][]ingest.Change{ - entryType: {{Type: entry.Data.Type, Post: &entry}}, + changes := map[xdr.LedgerEntryType]ChangesClosedAt{ + entryType: { + Changes: []ingest.Change{ + {Type: entry.Data.Type, Post: &entry}, + }, + ClosedAts: []time.Time{}, + }, } return ChangeBatch{ Changes: changes, @@ -128,7 +134,7 @@ func mockExtractBatch( env utils.EnvironmentDetails, logger *utils.EtlLogger) ChangeBatch { log.Errorf("mock called") return ChangeBatch{ - Changes: map[xdr.LedgerEntryType][]ingest.Change{}, + Changes: map[xdr.LedgerEntryType]ChangesClosedAt{}, BatchStart: batchStart, BatchEnd: batchEnd, } diff --git a/internal/transform/account.go b/internal/transform/account.go index 59c55582..1a8d90d5 100644 --- a/internal/transform/account.go +++ b/internal/transform/account.go @@ -2,6 +2,7 @@ package transform import ( "fmt" + "time" "github.com/guregu/null/zero" "github.com/stellar/go/ingest" @@ -10,7 +11,7 @@ import ( ) // TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery -func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) { +func TransformAccount(ledgerChange ingest.Change, closedAt time.Time) (AccountOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return AccountOutput{}, err @@ -98,6 +99,7 @@ func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) { NumSponsoring: uint32(accountEntry.NumSponsoring()), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + ClosedAt: closedAt, } return transformedAccount, nil } diff --git a/internal/transform/account_signer.go b/internal/transform/account_signer.go index f8e3b238..0c22850a 100644 --- a/internal/transform/account_signer.go +++ b/internal/transform/account_signer.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "sort" + "time" "github.com/guregu/null" "github.com/stellar/go/ingest" @@ -10,7 +11,7 @@ import ( ) // TransformSigners converts account signers from the history archive ingestion system into a form suitable for BigQuery -func TransformSigners(ledgerChange ingest.Change) ([]AccountSignerOutput, error) { +func TransformSigners(ledgerChange ingest.Change, closedAt time.Time) ([]AccountSignerOutput, error) { var signers []AccountSignerOutput ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) @@ -38,6 +39,7 @@ func TransformSigners(ledgerChange ingest.Change) ([]AccountSignerOutput, error) LastModifiedLedger: outputLastModifiedLedger, LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + ClosedAt: closedAt, }) } sort.Slice(signers, func(a, b int) bool { return signers[a].Weight < signers[b].Weight }) diff --git a/internal/transform/account_signer_test.go b/internal/transform/account_signer_test.go index ec3e0709..7de456ab 100644 --- a/internal/transform/account_signer_test.go +++ b/internal/transform/account_signer_test.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "testing" + "time" "github.com/guregu/null" @@ -50,7 +51,8 @@ func TestTransformAccountSigner(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformSigners(test.input.injest) + var closedAt time.Time + actualOutput, actualError := TransformSigners(test.input.injest, closedAt) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } diff --git a/internal/transform/account_test.go b/internal/transform/account_test.go index 53dd3ac2..42befbb1 100644 --- a/internal/transform/account_test.go +++ b/internal/transform/account_test.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "testing" + "time" "github.com/guregu/null" "github.com/stretchr/testify/assert" @@ -94,7 +95,8 @@ func TestTransformAccount(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformAccount(test.input.ledgerChange) + var closedAt time.Time + actualOutput, actualError := TransformAccount(test.input.ledgerChange, closedAt) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } diff --git a/internal/transform/claimable_balance.go b/internal/transform/claimable_balance.go index 7db3ef8f..c1947d20 100644 --- a/internal/transform/claimable_balance.go +++ b/internal/transform/claimable_balance.go @@ -2,6 +2,7 @@ package transform import ( "fmt" + "time" "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" @@ -21,7 +22,7 @@ func transformClaimants(claimants []xdr.Claimant) []Claimant { } // TransformClaimableBalance converts a claimable balance from the history archive ingestion system into a form suitable for BigQuery -func TransformClaimableBalance(ledgerChange ingest.Change) (ClaimableBalanceOutput, error) { +func TransformClaimableBalance(ledgerChange ingest.Change, closedAt time.Time) (ClaimableBalanceOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ClaimableBalanceOutput{}, err @@ -58,6 +59,7 @@ func TransformClaimableBalance(ledgerChange ingest.Change) (ClaimableBalanceOutp LedgerEntryChange: uint32(changeType), Flags: outputFlags, Deleted: outputDeleted, + ClosedAt: closedAt, } return transformed, nil } diff --git a/internal/transform/claimable_balance_test.go b/internal/transform/claimable_balance_test.go index 4886e5e5..24a2d374 100644 --- a/internal/transform/claimable_balance_test.go +++ b/internal/transform/claimable_balance_test.go @@ -2,6 +2,7 @@ package transform import ( "testing" + "time" "github.com/guregu/null" "github.com/stellar/go/ingest" @@ -34,7 +35,8 @@ func TestTransformClaimableBalance(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformClaimableBalance(test.input.ingest) + var closedAt time.Time + actualOutput, actualError := TransformClaimableBalance(test.input.ingest, closedAt) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } diff --git a/internal/transform/config_setting.go b/internal/transform/config_setting.go index 1e3e8fd2..c05cf0d2 100644 --- a/internal/transform/config_setting.go +++ b/internal/transform/config_setting.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "strconv" + "time" "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" @@ -10,7 +11,7 @@ import ( ) // TransformConfigSetting converts an config setting ledger change entry into a form suitable for BigQuery -func TransformConfigSetting(ledgerChange ingest.Change) (ConfigSettingOutput, error) { +func TransformConfigSetting(ledgerChange ingest.Change, closedAt time.Time) (ConfigSettingOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ConfigSettingOutput{}, err @@ -136,6 +137,7 @@ func TransformConfigSetting(ledgerChange ingest.Change) (ConfigSettingOutput, er LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + ClosedAt: closedAt, } return transformedConfigSetting, nil } diff --git a/internal/transform/config_setting_test.go b/internal/transform/config_setting_test.go index 1f696af5..169789c0 100644 --- a/internal/transform/config_setting_test.go +++ b/internal/transform/config_setting_test.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "testing" + "time" "github.com/stretchr/testify/assert" @@ -43,7 +44,8 @@ func TestTransformConfigSetting(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformConfigSetting(test.input) + var closedAt time.Time + actualOutput, actualError := TransformConfigSetting(test.input, closedAt) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } diff --git a/internal/transform/contract_code.go b/internal/transform/contract_code.go index 5ee11750..b6e38a7f 100644 --- a/internal/transform/contract_code.go +++ b/internal/transform/contract_code.go @@ -2,6 +2,7 @@ package transform import ( "fmt" + "time" "github.com/stellar/go/ingest" "github.com/stellar/go/strkey" @@ -10,7 +11,7 @@ import ( ) // TransformContractCode converts a contract code ledger change entry into a form suitable for BigQuery -func TransformContractCode(ledgerChange ingest.Change) (ContractCodeOutput, error) { +func TransformContractCode(ledgerChange ingest.Change, closedAt time.Time) (ContractCodeOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ContractCodeOutput{}, err @@ -37,6 +38,7 @@ func TransformContractCode(ledgerChange ingest.Change) (ContractCodeOutput, erro LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + ClosedAt: closedAt, } return transformedCode, nil } diff --git a/internal/transform/contract_code_test.go b/internal/transform/contract_code_test.go index 8a203b6d..e4691015 100644 --- a/internal/transform/contract_code_test.go +++ b/internal/transform/contract_code_test.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "testing" + "time" "github.com/stretchr/testify/assert" @@ -43,7 +44,8 @@ func TestTransformContractCode(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformContractCode(test.input) + var closedAt time.Time + actualOutput, actualError := TransformContractCode(test.input, closedAt) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } diff --git a/internal/transform/contract_data.go b/internal/transform/contract_data.go index b8c26fea..2fd32cd5 100644 --- a/internal/transform/contract_data.go +++ b/internal/transform/contract_data.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "math/big" + "time" "github.com/stellar/go/ingest" "github.com/stellar/go/strkey" @@ -59,7 +60,7 @@ func NewTransformContractDataStruct(assetFrom AssetFromContractDataFunc, contrac } // TransformContractData converts a contract data ledger change entry into a form suitable for BigQuery -func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest.Change, passphrase string) (ContractDataOutput, error, bool) { +func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest.Change, passphrase string, closedAt time.Time) (ContractDataOutput, error, bool) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ContractDataOutput{}, err, false @@ -119,6 +120,7 @@ func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest. LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + ClosedAt: closedAt, } return transformedData, nil, true } diff --git a/internal/transform/contract_data_test.go b/internal/transform/contract_data_test.go index e840ca2d..578d0f3d 100644 --- a/internal/transform/contract_data_test.go +++ b/internal/transform/contract_data_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math/big" "testing" + "time" "github.com/stretchr/testify/assert" @@ -47,8 +48,9 @@ func TestTransformContractData(t *testing.T) { } for _, test := range tests { + var closedAt time.Time TransformContractData := NewTransformContractDataStruct(MockAssetFromContractData, MockContractBalanceFromContractData) - actualOutput, actualError, _ := TransformContractData.TransformContractData(test.input, test.passphrase) + actualOutput, actualError, _ := TransformContractData.TransformContractData(test.input, test.passphrase, closedAt) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } diff --git a/internal/transform/expiration.go b/internal/transform/expiration.go index b0cfd98c..fb104049 100644 --- a/internal/transform/expiration.go +++ b/internal/transform/expiration.go @@ -2,6 +2,7 @@ package transform import ( "fmt" + "time" "github.com/stellar/go/ingest" "github.com/stellar/go/strkey" @@ -10,7 +11,7 @@ import ( ) // TransformConfigSetting converts an config setting ledger change entry into a form suitable for BigQuery -func TransformExpiration(ledgerChange ingest.Change) (ExpirationOutput, error) { +func TransformExpiration(ledgerChange ingest.Change, closedAt time.Time) (ExpirationOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ExpirationOutput{}, err @@ -36,6 +37,7 @@ func TransformExpiration(ledgerChange ingest.Change) (ExpirationOutput, error) { LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + ClosedAt: closedAt, } return transformedPool, nil diff --git a/internal/transform/expiration_test.go b/internal/transform/expiration_test.go index 721079f3..ac1cef9d 100644 --- a/internal/transform/expiration_test.go +++ b/internal/transform/expiration_test.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "testing" + "time" "github.com/stretchr/testify/assert" @@ -43,7 +44,8 @@ func TestTransformExpiration(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformExpiration(test.input) + var closedAt time.Time + actualOutput, actualError := TransformExpiration(test.input, closedAt) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } diff --git a/internal/transform/liquidity_pool.go b/internal/transform/liquidity_pool.go index 6ee41209..55baf1fc 100644 --- a/internal/transform/liquidity_pool.go +++ b/internal/transform/liquidity_pool.go @@ -2,6 +2,7 @@ package transform import ( "fmt" + "time" "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" @@ -9,7 +10,7 @@ import ( ) // TransformPool converts an liquidity pool ledger change entry into a form suitable for BigQuery -func TransformPool(ledgerChange ingest.Change) (PoolOutput, error) { +func TransformPool(ledgerChange ingest.Change, closedAt time.Time) (PoolOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return PoolOutput{}, err @@ -65,6 +66,7 @@ func TransformPool(ledgerChange ingest.Change) (PoolOutput, error) { LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + ClosedAt: closedAt, } return transformedPool, nil } diff --git a/internal/transform/liquidity_pool_test.go b/internal/transform/liquidity_pool_test.go index 3664da3f..07dc25e2 100644 --- a/internal/transform/liquidity_pool_test.go +++ b/internal/transform/liquidity_pool_test.go @@ -2,6 +2,7 @@ package transform import ( "testing" + "time" "github.com/stretchr/testify/assert" @@ -46,7 +47,8 @@ func TestTransformPool(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformPool(test.input.ingest) + var closedAt time.Time + actualOutput, actualError := TransformPool(test.input.ingest, closedAt) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } diff --git a/internal/transform/offer.go b/internal/transform/offer.go index c0cc787e..bb4fc012 100644 --- a/internal/transform/offer.go +++ b/internal/transform/offer.go @@ -2,6 +2,7 @@ package transform import ( "fmt" + "time" "github.com/stellar/stellar-etl/internal/utils" @@ -9,7 +10,7 @@ import ( ) // TransformOffer converts an account from the history archive ingestion system into a form suitable for BigQuery -func TransformOffer(ledgerChange ingest.Change) (OfferOutput, error) { +func TransformOffer(ledgerChange ingest.Change, closedAt time.Time) (OfferOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return OfferOutput{}, err @@ -88,6 +89,7 @@ func TransformOffer(ledgerChange ingest.Change) (OfferOutput, error) { LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, Sponsor: ledgerEntrySponsorToNullString(ledgerEntry), + ClosedAt: closedAt, } return transformedOffer, nil } diff --git a/internal/transform/offer_normalized.go b/internal/transform/offer_normalized.go index ddfdc53c..a7f8f8f4 100644 --- a/internal/transform/offer_normalized.go +++ b/internal/transform/offer_normalized.go @@ -5,6 +5,7 @@ import ( "hash/fnv" "sort" "strings" + "time" "github.com/stellar/stellar-etl/internal/utils" @@ -14,7 +15,8 @@ import ( // TransformOfferNormalized converts an offer into a normalized form, allowing it to be stored as part of the historical orderbook dataset func TransformOfferNormalized(ledgerChange ingest.Change, ledgerSeq uint32) (NormalizedOfferOutput, error) { - transformed, err := TransformOffer(ledgerChange) + var closedAt time.Time + transformed, err := TransformOffer(ledgerChange, closedAt) if err != nil { return NormalizedOfferOutput{}, err } diff --git a/internal/transform/offer_test.go b/internal/transform/offer_test.go index 2cc548dc..268acc9c 100644 --- a/internal/transform/offer_test.go +++ b/internal/transform/offer_test.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "testing" + "time" "github.com/guregu/null" "github.com/stretchr/testify/assert" @@ -96,7 +97,8 @@ func TestTransformOffer(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformOffer(test.input.ingest) + var closedAt time.Time + actualOutput, actualError := TransformOffer(test.input.ingest, closedAt) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } diff --git a/internal/transform/schema.go b/internal/transform/schema.go index 0668da11..ffa13d2b 100644 --- a/internal/transform/schema.go +++ b/internal/transform/schema.go @@ -99,6 +99,7 @@ type AccountOutput struct { LastModifiedLedger uint32 `json:"last_modified_ledger"` LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` } // AccountSignerOutput is a representation of an account signer that aligns with the BigQuery table account_signers @@ -110,6 +111,7 @@ type AccountSignerOutput struct { LastModifiedLedger uint32 `json:"last_modified_ledger"` LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` } // OperationOutput is a representation of an operation that aligns with the BigQuery table history_operations @@ -138,6 +140,7 @@ type ClaimableBalanceOutput struct { LastModifiedLedger uint32 `json:"last_modified_ledger"` LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` } // Claimants @@ -173,24 +176,25 @@ type LiquidityPoolAsset struct { // PoolOutput is a representation of a liquidity pool that aligns with the Bigquery table liquidity_pools type PoolOutput struct { - PoolID string `json:"liquidity_pool_id"` - PoolType string `json:"type"` - PoolFee uint32 `json:"fee"` - TrustlineCount uint64 `json:"trustline_count"` - PoolShareCount float64 `json:"pool_share_count"` - AssetAType string `json:"asset_a_type"` - AssetACode string `json:"asset_a_code"` - AssetAIssuer string `json:"asset_a_issuer"` - AssetAReserve float64 `json:"asset_a_amount"` - AssetAID int64 `json:"asset_a_id"` - AssetBType string `json:"asset_b_type"` - AssetBCode string `json:"asset_b_code"` - AssetBIssuer string `json:"asset_b_issuer"` - AssetBReserve float64 `json:"asset_b_amount"` - AssetBID int64 `json:"asset_b_id"` - LastModifiedLedger uint32 `json:"last_modified_ledger"` - LedgerEntryChange uint32 `json:"ledger_entry_change"` - Deleted bool `json:"deleted"` + PoolID string `json:"liquidity_pool_id"` + PoolType string `json:"type"` + PoolFee uint32 `json:"fee"` + TrustlineCount uint64 `json:"trustline_count"` + PoolShareCount float64 `json:"pool_share_count"` + AssetAType string `json:"asset_a_type"` + AssetACode string `json:"asset_a_code"` + AssetAIssuer string `json:"asset_a_issuer"` + AssetAReserve float64 `json:"asset_a_amount"` + AssetAID int64 `json:"asset_a_id"` + AssetBType string `json:"asset_b_type"` + AssetBCode string `json:"asset_b_code"` + AssetBIssuer string `json:"asset_b_issuer"` + AssetBReserve float64 `json:"asset_b_amount"` + AssetBID int64 `json:"asset_b_id"` + LastModifiedLedger uint32 `json:"last_modified_ledger"` + LedgerEntryChange uint32 `json:"ledger_entry_change"` + Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` } // AssetOutput is a representation of an asset that aligns with the BigQuery table history_assets @@ -220,6 +224,7 @@ type TrustlineOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Sponsor null.String `json:"sponsor"` Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` } // OfferOutput is a representation of an offer that aligns with the BigQuery table offers @@ -243,6 +248,7 @@ type OfferOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` Sponsor null.String `json:"sponsor"` + ClosedAt time.Time `json:"closed_at"` } // TradeOutput is a representation of a trade that aligns with the BigQuery table history_trades @@ -473,26 +479,28 @@ type TestTransaction struct { // ContractDataOutput is a representation of contract data that aligns with the Bigquery table soroban_contract_data type ContractDataOutput struct { - ContractId string `json:"contract_id"` - ContractKeyType string `json:"contract_key_type"` - ContractDurability string `json:"contract_durability"` - ContractDataAssetCode string `json:"asset_code"` - ContractDataAssetIssuer string `json:"asset_issuer"` - ContractDataAssetType string `json:"asset_type"` - ContractDataBalanceHolder string `json:"balance_holder"` - ContractDataBalance string `json:"balance"` // balance is a string because it is go type big.Int - LastModifiedLedger uint32 `json:"last_modified_ledger"` - LedgerEntryChange uint32 `json:"ledger_entry_change"` - Deleted bool `json:"deleted"` + ContractId string `json:"contract_id"` + ContractKeyType string `json:"contract_key_type"` + ContractDurability string `json:"contract_durability"` + ContractDataAssetCode string `json:"asset_code"` + ContractDataAssetIssuer string `json:"asset_issuer"` + ContractDataAssetType string `json:"asset_type"` + ContractDataBalanceHolder string `json:"balance_holder"` + ContractDataBalance string `json:"balance"` // balance is a string because it is go type big.Int + LastModifiedLedger uint32 `json:"last_modified_ledger"` + LedgerEntryChange uint32 `json:"ledger_entry_change"` + Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` } // ContractCodeOutput is a representation of contract code that aligns with the Bigquery table soroban_contract_code type ContractCodeOutput struct { - ContractCodeHash string `json:"contract_code_hash"` - ContractCodeExtV int32 `json:"contract_code_ext_v"` - LastModifiedLedger uint32 `json:"last_modified_ledger"` - LedgerEntryChange uint32 `json:"ledger_entry_change"` - Deleted bool `json:"deleted"` + ContractCodeHash string `json:"contract_code_hash"` + ContractCodeExtV int32 `json:"contract_code_ext_v"` + LastModifiedLedger uint32 `json:"last_modified_ledger"` + LedgerEntryChange uint32 `json:"ledger_entry_change"` + Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` //ContractCodeCode string `json:"contract_code"` } @@ -544,15 +552,17 @@ type ConfigSettingOutput struct { LastModifiedLedger uint32 `json:"last_modified_ledger"` LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` } // ExpirationOutput is a representation of soroban expiration that aligns with the Bigquery table expirations type ExpirationOutput struct { - KeyHash string `json:"key_hash"` // key_hash is contract_code_hash or contract_id - ExpirationLedgerSeq uint32 `json:"expiration_ledger_seq"` - LastModifiedLedger uint32 `json:"last_modified_ledger"` - LedgerEntryChange uint32 `json:"ledger_entry_change"` - Deleted bool `json:"deleted"` + KeyHash string `json:"key_hash"` // key_hash is contract_code_hash or contract_id + ExpirationLedgerSeq uint32 `json:"expiration_ledger_seq"` + LastModifiedLedger uint32 `json:"last_modified_ledger"` + LedgerEntryChange uint32 `json:"ledger_entry_change"` + Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` } // DiagnosticEventOutput is a representation of soroban expiration that aligns with the Bigquery table expirations diff --git a/internal/transform/trustline.go b/internal/transform/trustline.go index 6373050c..ca825c62 100644 --- a/internal/transform/trustline.go +++ b/internal/transform/trustline.go @@ -3,6 +3,7 @@ package transform import ( "encoding/base64" "fmt" + "time" "github.com/guregu/null" "github.com/pkg/errors" @@ -14,7 +15,7 @@ import ( ) // TransformTrustline converts a trustline from the history archive ingestion system into a form suitable for BigQuery -func TransformTrustline(ledgerChange ingest.Change) (TrustlineOutput, error) { +func TransformTrustline(ledgerChange ingest.Change, closedAt time.Time) (TrustlineOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return TrustlineOutput{}, err @@ -68,6 +69,7 @@ func TransformTrustline(ledgerChange ingest.Change) (TrustlineOutput, error) { LedgerEntryChange: uint32(changeType), Sponsor: ledgerEntrySponsorToNullString(ledgerEntry), Deleted: outputDeleted, + ClosedAt: closedAt, } return transformedTrustline, nil diff --git a/internal/transform/trustline_test.go b/internal/transform/trustline_test.go index 3d6e80a6..87846b74 100644 --- a/internal/transform/trustline_test.go +++ b/internal/transform/trustline_test.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "testing" + "time" "github.com/stretchr/testify/assert" @@ -49,7 +50,8 @@ func TestTransformTrustline(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformTrustline(test.input.ingest) + var closedAt time.Time + actualOutput, actualError := TransformTrustline(test.input.ingest, closedAt) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } From df85b2d0d868df2854ab3f3187d4aec1a81f1275 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 2 Nov 2023 15:58:11 -0400 Subject: [PATCH 2/5] Add ledger sequence --- cmd/export_account_signers.go | 5 ++--- cmd/export_accounts.go | 5 ++--- cmd/export_claimable_balances.go | 5 ++--- cmd/export_config_setting.go | 5 ++--- cmd/export_contract_code.go | 5 ++--- cmd/export_contract_data.go | 5 ++--- cmd/export_expiration.go | 5 ++--- cmd/export_ledger_entry_changes.go | 20 ++++++++++---------- cmd/export_liquidity_pools.go | 5 ++--- cmd/export_offers.go | 5 ++--- cmd/export_trustlines.go | 5 ++--- internal/input/changes.go | 20 ++++++++------------ internal/transform/account.go | 11 +++++++++-- internal/transform/account_signer.go | 12 ++++++++++-- internal/transform/claimable_balance.go | 11 +++++++++-- internal/transform/config_setting.go | 11 +++++++++-- internal/transform/contract_code.go | 11 +++++++++-- internal/transform/contract_data.go | 11 +++++++++-- internal/transform/expiration.go | 11 +++++++++-- internal/transform/liquidity_pool.go | 11 +++++++++-- internal/transform/offer.go | 12 ++++++++++-- internal/transform/offer_normalized.go | 6 +++--- internal/transform/schema.go | 10 ++++++++++ internal/transform/trustline.go | 11 +++++++++-- 24 files changed, 143 insertions(+), 75 deletions(-) diff --git a/cmd/export_account_signers.go b/cmd/export_account_signers.go index 0706477a..4e53a26d 100644 --- a/cmd/export_account_signers.go +++ b/cmd/export_account_signers.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -38,10 +37,10 @@ the export_ledger_entry_changes command.`, numFailures := 0 totalNumBytes := 0 numSigners := 0 - var closedAt time.Time + var header xdr.LedgerHeaderHistoryEntry for _, acc := range accounts { if acc.AccountSignersChanged() { - transformed, err := transform.TransformSigners(acc, closedAt) + transformed, err := transform.TransformSigners(acc, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not json transform account signer: %v", err)) numFailures += 1 diff --git a/cmd/export_accounts.go b/cmd/export_accounts.go index 02f2ed66..eee10af0 100644 --- a/cmd/export_accounts.go +++ b/cmd/export_accounts.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -37,9 +36,9 @@ the export_ledger_entry_changes command.`, outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 - var closedAt time.Time + var header xdr.LedgerHeaderHistoryEntry for _, acc := range accounts { - transformed, err := transform.TransformAccount(acc, closedAt) + transformed, err := transform.TransformAccount(acc, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not json transform account: %v", err)) numFailures += 1 diff --git a/cmd/export_claimable_balances.go b/cmd/export_claimable_balances.go index 7443707b..5db70976 100644 --- a/cmd/export_claimable_balances.go +++ b/cmd/export_claimable_balances.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -38,8 +37,8 @@ var claimableBalancesCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, balance := range balances { - var closedAt time.Time - transformed, err := transform.TransformClaimableBalance(balance, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := transform.TransformClaimableBalance(balance, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform balance %+v: %v", balance, err)) numFailures += 1 diff --git a/cmd/export_config_setting.go b/cmd/export_config_setting.go index 6a5d0742..903f3c64 100644 --- a/cmd/export_config_setting.go +++ b/cmd/export_config_setting.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -35,8 +34,8 @@ var configSettingCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, setting := range settings { - var closedAt time.Time - transformed, err := transform.TransformConfigSetting(setting, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := transform.TransformConfigSetting(setting, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform config setting %+v: %v", setting, err)) numFailures += 1 diff --git a/cmd/export_contract_code.go b/cmd/export_contract_code.go index a948f6da..6273c2b1 100644 --- a/cmd/export_contract_code.go +++ b/cmd/export_contract_code.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -35,8 +34,8 @@ var codeCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, code := range codes { - var closedAt time.Time - transformed, err := transform.TransformContractCode(code, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := transform.TransformContractCode(code, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform contract code %+v: %v", code, err)) numFailures += 1 diff --git a/cmd/export_contract_data.go b/cmd/export_contract_data.go index cfe79e67..d2195d79 100644 --- a/cmd/export_contract_data.go +++ b/cmd/export_contract_data.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -35,9 +34,9 @@ var dataCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, data := range datas { - var closedAt time.Time + var header xdr.LedgerHeaderHistoryEntry TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData) - transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase, closedAt) + transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform contract data %+v: %v", data, err)) numFailures += 1 diff --git a/cmd/export_expiration.go b/cmd/export_expiration.go index 48b271e1..a3a8a410 100644 --- a/cmd/export_expiration.go +++ b/cmd/export_expiration.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -35,8 +34,8 @@ var expirationCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, expiration := range expirations { - var closedAt time.Time - transformed, err := transform.TransformExpiration(expiration, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := transform.TransformExpiration(expiration, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform expiration %+v: %v", expiration, err)) numFailures += 1 diff --git a/cmd/export_ledger_entry_changes.go b/cmd/export_ledger_entry_changes.go index 9f9c40b0..4c11ed71 100644 --- a/cmd/export_ledger_entry_changes.go +++ b/cmd/export_ledger_entry_changes.go @@ -118,7 +118,7 @@ be exported.`, continue } else if changed { - acc, err := transform.TransformAccount(change, changes.ClosedAts[i]) + acc, err := transform.TransformAccount(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming account entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -127,7 +127,7 @@ be exported.`, transformedOutputs["accounts"] = append(transformedOutputs["accounts"], acc) } if change.AccountSignersChanged() { - signers, err := transform.TransformSigners(change, changes.ClosedAts[i]) + signers, err := transform.TransformSigners(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming account signers from %d :%s", entry.LastModifiedLedgerSeq, err)) @@ -143,7 +143,7 @@ be exported.`, continue } for i, change := range changes.Changes { - balance, err := transform.TransformClaimableBalance(change, changes.ClosedAts[i]) + balance, err := transform.TransformClaimableBalance(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming balance entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -156,7 +156,7 @@ be exported.`, continue } for i, change := range changes.Changes { - offer, err := transform.TransformOffer(change, changes.ClosedAts[i]) + offer, err := transform.TransformOffer(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming offer entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -169,7 +169,7 @@ be exported.`, continue } for i, change := range changes.Changes { - trust, err := transform.TransformTrustline(change, changes.ClosedAts[i]) + trust, err := transform.TransformTrustline(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming trustline entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -182,7 +182,7 @@ be exported.`, continue } for i, change := range changes.Changes { - pool, err := transform.TransformPool(change, changes.ClosedAts[i]) + pool, err := transform.TransformPool(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming liquidity pool entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -196,7 +196,7 @@ be exported.`, } for i, change := range changes.Changes { TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData) - contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase, changes.ClosedAts[i]) + contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming contract data entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -215,7 +215,7 @@ be exported.`, continue } for i, change := range changes.Changes { - contractCode, err := transform.TransformContractCode(change, changes.ClosedAts[i]) + contractCode, err := transform.TransformContractCode(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming contract code entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -228,7 +228,7 @@ be exported.`, continue } for i, change := range changes.Changes { - configSettings, err := transform.TransformConfigSetting(change, changes.ClosedAts[i]) + configSettings, err := transform.TransformConfigSetting(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming config settings entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -241,7 +241,7 @@ be exported.`, continue } for i, change := range changes.Changes { - expiration, err := transform.TransformExpiration(change, changes.ClosedAts[i]) + expiration, err := transform.TransformExpiration(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming expiration entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) diff --git a/cmd/export_liquidity_pools.go b/cmd/export_liquidity_pools.go index 8675e408..3393f930 100644 --- a/cmd/export_liquidity_pools.go +++ b/cmd/export_liquidity_pools.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -38,8 +37,8 @@ the export_ledger_entry_changes command.`, numFailures := 0 totalNumBytes := 0 for _, pool := range pools { - var closedAt time.Time - transformed, err := transform.TransformPool(pool, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := transform.TransformPool(pool, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform pool %+v: %v", pool, err)) numFailures += 1 diff --git a/cmd/export_offers.go b/cmd/export_offers.go index d7d25359..37b67cb8 100644 --- a/cmd/export_offers.go +++ b/cmd/export_offers.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -39,8 +38,8 @@ var offersCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, offer := range offers { - var closedAt time.Time - transformed, err := transform.TransformOffer(offer, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := transform.TransformOffer(offer, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform offer %+v: %v", offer, err)) numFailures += 1 diff --git a/cmd/export_trustlines.go b/cmd/export_trustlines.go index c26274b0..f13dbd2b 100644 --- a/cmd/export_trustlines.go +++ b/cmd/export_trustlines.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -39,8 +38,8 @@ var trustlinesCmd = &cobra.Command{ numFailures := 0 totalNumBytes := 0 for _, trust := range trustlines { - var closedAt time.Time - transformed, err := transform.TransformTrustline(trust, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := transform.TransformTrustline(trust, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not json transform trustline %+v: %v", trust, err)) numFailures += 1 diff --git a/internal/input/changes.go b/internal/input/changes.go index 53b929a9..a68947c3 100644 --- a/internal/input/changes.go +++ b/internal/input/changes.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "math" - "time" "github.com/stellar/stellar-etl/internal/utils" @@ -18,14 +17,14 @@ var ( ExtractBatch = extractBatch ) -type ChangesClosedAt struct { - Changes []ingest.Change - ClosedAts []time.Time +type LedgerChanges struct { + Changes []ingest.Change + LedgerHeaders []xdr.LedgerHeaderHistoryEntry } // ChangeBatch represents the changes in a batch of ledgers represented by the range [BatchStart, BatchEnd) type ChangeBatch struct { - Changes map[xdr.LedgerEntryType]ChangesClosedAt + Changes map[xdr.LedgerEntryType]LedgerChanges BatchStart uint32 BatchEnd uint32 } @@ -98,7 +97,7 @@ func extractBatch( xdr.LedgerEntryTypeConfigSetting, xdr.LedgerEntryTypeExpiration} - changesClosedAt := map[xdr.LedgerEntryType]ChangesClosedAt{} + changesClosedAt := map[xdr.LedgerEntryType]LedgerChanges{} ctx := context.Background() for seq := batchStart; seq <= batchEnd; { changeCompactors := map[xdr.LedgerEntryType]*ingest.ChangeCompactor{} @@ -113,16 +112,13 @@ func extractBatch( // if this ledger is available, we process its changes and move on to the next ledger by incrementing seq. // Otherwise, nothing is incremented, and we try again on the next iteration of the loop - var closedAt time.Time + var header xdr.LedgerHeaderHistoryEntry if seq <= latestLedger { changeReader, err := ingest.NewLedgerChangeReader(ctx, core, env.NetworkPassphrase, seq) if err != nil { logger.Fatal(fmt.Sprintf("unable to create change reader for ledger %d: ", seq), err) } - closedAt, err = utils.TimePointToUTCTimeStamp(changeReader.LedgerTransactionReader.GetHeader().Header.ScpValue.CloseTime) - if err != nil { - logger.Fatal(fmt.Sprintf("unable to read CloseTime for ledger %d: ", seq), err) - } + header = changeReader.LedgerTransactionReader.GetHeader() for { change, err := changeReader.Read() @@ -150,7 +146,7 @@ func extractBatch( for _, change := range compactor.GetChanges() { dataTypeChanges := changesClosedAt[dataType] dataTypeChanges.Changes = append(dataTypeChanges.Changes, change) - dataTypeChanges.ClosedAts = append(dataTypeChanges.ClosedAts, closedAt) + dataTypeChanges.LedgerHeaders = append(dataTypeChanges.LedgerHeaders, header) changesClosedAt[dataType] = dataTypeChanges } } diff --git a/internal/transform/account.go b/internal/transform/account.go index 1a8d90d5..1a605ac6 100644 --- a/internal/transform/account.go +++ b/internal/transform/account.go @@ -2,7 +2,6 @@ package transform import ( "fmt" - "time" "github.com/guregu/null/zero" "github.com/stellar/go/ingest" @@ -11,7 +10,7 @@ import ( ) // TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery -func TransformAccount(ledgerChange ingest.Change, closedAt time.Time) (AccountOutput, error) { +func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (AccountOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return AccountOutput{}, err @@ -77,6 +76,13 @@ func TransformAccount(ledgerChange ingest.Change, closedAt time.Time) (AccountOu outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return AccountOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedAccount := AccountOutput{ AccountID: outputID, Balance: utils.ConvertStroopValueToReal(outputBalance), @@ -100,6 +106,7 @@ func TransformAccount(ledgerChange ingest.Change, closedAt time.Time) (AccountOu LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedAccount, nil } diff --git a/internal/transform/account_signer.go b/internal/transform/account_signer.go index 0c22850a..25824d2f 100644 --- a/internal/transform/account_signer.go +++ b/internal/transform/account_signer.go @@ -3,15 +3,15 @@ package transform import ( "fmt" "sort" - "time" "github.com/guregu/null" "github.com/stellar/go/ingest" + "github.com/stellar/go/xdr" "github.com/stellar/stellar-etl/internal/utils" ) // TransformSigners converts account signers from the history archive ingestion system into a form suitable for BigQuery -func TransformSigners(ledgerChange ingest.Change, closedAt time.Time) ([]AccountSignerOutput, error) { +func TransformSigners(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) ([]AccountSignerOutput, error) { var signers []AccountSignerOutput ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) @@ -24,6 +24,13 @@ func TransformSigners(ledgerChange ingest.Change, closedAt time.Time) ([]Account return signers, fmt.Errorf("could not extract signer data from ledger entry of type: %+v", ledgerEntry.Data.Type) } + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return signers, err + } + + ledgerSequence := header.Header.LedgerSeq + sponsors := accountEntry.SponsorPerSigner() for signer, weight := range accountEntry.SignerSummary() { var sponsor null.String @@ -40,6 +47,7 @@ func TransformSigners(ledgerChange ingest.Change, closedAt time.Time) ([]Account LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), }) } sort.Slice(signers, func(a, b int) bool { return signers[a].Weight < signers[b].Weight }) diff --git a/internal/transform/claimable_balance.go b/internal/transform/claimable_balance.go index c1947d20..73cfc0a0 100644 --- a/internal/transform/claimable_balance.go +++ b/internal/transform/claimable_balance.go @@ -2,7 +2,6 @@ package transform import ( "fmt" - "time" "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" @@ -22,7 +21,7 @@ func transformClaimants(claimants []xdr.Claimant) []Claimant { } // TransformClaimableBalance converts a claimable balance from the history archive ingestion system into a form suitable for BigQuery -func TransformClaimableBalance(ledgerChange ingest.Change, closedAt time.Time) (ClaimableBalanceOutput, error) { +func TransformClaimableBalance(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ClaimableBalanceOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ClaimableBalanceOutput{}, err @@ -46,6 +45,13 @@ func TransformClaimableBalance(ledgerChange ingest.Change, closedAt time.Time) ( outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ClaimableBalanceOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformed := ClaimableBalanceOutput{ BalanceID: balanceID, AssetCode: outputAsset.AssetCode, @@ -60,6 +66,7 @@ func TransformClaimableBalance(ledgerChange ingest.Change, closedAt time.Time) ( Flags: outputFlags, Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformed, nil } diff --git a/internal/transform/config_setting.go b/internal/transform/config_setting.go index c05cf0d2..280d138c 100644 --- a/internal/transform/config_setting.go +++ b/internal/transform/config_setting.go @@ -3,7 +3,6 @@ package transform import ( "fmt" "strconv" - "time" "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" @@ -11,7 +10,7 @@ import ( ) // TransformConfigSetting converts an config setting ledger change entry into a form suitable for BigQuery -func TransformConfigSetting(ledgerChange ingest.Change, closedAt time.Time) (ConfigSettingOutput, error) { +func TransformConfigSetting(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ConfigSettingOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ConfigSettingOutput{}, err @@ -91,6 +90,13 @@ func TransformConfigSetting(ledgerChange ingest.Change, closedAt time.Time) (Con bucketListSizeWindow = append(bucketListSizeWindow, uint64(sizeWindow)) } + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ConfigSettingOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedConfigSetting := ConfigSettingOutput{ ConfigSettingId: int32(configSettingId), ContractMaxSizeBytes: uint32(contractMaxSizeBytes), @@ -138,6 +144,7 @@ func TransformConfigSetting(ledgerChange ingest.Change, closedAt time.Time) (Con LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedConfigSetting, nil } diff --git a/internal/transform/contract_code.go b/internal/transform/contract_code.go index b6e38a7f..e23571fe 100644 --- a/internal/transform/contract_code.go +++ b/internal/transform/contract_code.go @@ -2,7 +2,6 @@ package transform import ( "fmt" - "time" "github.com/stellar/go/ingest" "github.com/stellar/go/strkey" @@ -11,7 +10,7 @@ import ( ) // TransformContractCode converts a contract code ledger change entry into a form suitable for BigQuery -func TransformContractCode(ledgerChange ingest.Change, closedAt time.Time) (ContractCodeOutput, error) { +func TransformContractCode(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ContractCodeOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ContractCodeOutput{}, err @@ -32,6 +31,13 @@ func TransformContractCode(ledgerChange ingest.Change, closedAt time.Time) (Cont contractCodeHashByte, _ := contractCode.Hash.MarshalBinary() contractCodeHash, _ := strkey.Encode(strkey.VersionByteContract, contractCodeHashByte) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ContractCodeOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedCode := ContractCodeOutput{ ContractCodeHash: contractCodeHash, ContractCodeExtV: int32(contractCodeExtV), @@ -39,6 +45,7 @@ func TransformContractCode(ledgerChange ingest.Change, closedAt time.Time) (Cont LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedCode, nil } diff --git a/internal/transform/contract_data.go b/internal/transform/contract_data.go index 2fd32cd5..65cb983a 100644 --- a/internal/transform/contract_data.go +++ b/internal/transform/contract_data.go @@ -3,7 +3,6 @@ package transform import ( "fmt" "math/big" - "time" "github.com/stellar/go/ingest" "github.com/stellar/go/strkey" @@ -60,7 +59,7 @@ func NewTransformContractDataStruct(assetFrom AssetFromContractDataFunc, contrac } // TransformContractData converts a contract data ledger change entry into a form suitable for BigQuery -func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest.Change, passphrase string, closedAt time.Time) (ContractDataOutput, error, bool) { +func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest.Change, passphrase string, header xdr.LedgerHeaderHistoryEntry) (ContractDataOutput, error, bool) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ContractDataOutput{}, err, false @@ -108,6 +107,13 @@ func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest. contractDataDurability := contractData.Durability.String() + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ContractDataOutput{}, err, false + } + + ledgerSequence := header.Header.LedgerSeq + transformedData := ContractDataOutput{ ContractId: outputContractDataContractId, ContractKeyType: contractDataKeyType, @@ -121,6 +127,7 @@ func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest. LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedData, nil, true } diff --git a/internal/transform/expiration.go b/internal/transform/expiration.go index fb104049..a7d68b9c 100644 --- a/internal/transform/expiration.go +++ b/internal/transform/expiration.go @@ -2,7 +2,6 @@ package transform import ( "fmt" - "time" "github.com/stellar/go/ingest" "github.com/stellar/go/strkey" @@ -11,7 +10,7 @@ import ( ) // TransformConfigSetting converts an config setting ledger change entry into a form suitable for BigQuery -func TransformExpiration(ledgerChange ingest.Change, closedAt time.Time) (ExpirationOutput, error) { +func TransformExpiration(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ExpirationOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ExpirationOutput{}, err @@ -31,6 +30,13 @@ func TransformExpiration(ledgerChange ingest.Change, closedAt time.Time) (Expira keyHash, _ := strkey.Encode(strkey.VersionByteContract, keyHashByte) expirationLedgerSeq := expiration.ExpirationLedgerSeq + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ExpirationOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedPool := ExpirationOutput{ KeyHash: keyHash, ExpirationLedgerSeq: uint32(expirationLedgerSeq), @@ -38,6 +44,7 @@ func TransformExpiration(ledgerChange ingest.Change, closedAt time.Time) (Expira LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedPool, nil diff --git a/internal/transform/liquidity_pool.go b/internal/transform/liquidity_pool.go index 55baf1fc..30cbaa24 100644 --- a/internal/transform/liquidity_pool.go +++ b/internal/transform/liquidity_pool.go @@ -2,7 +2,6 @@ package transform import ( "fmt" - "time" "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" @@ -10,7 +9,7 @@ import ( ) // TransformPool converts an liquidity pool ledger change entry into a form suitable for BigQuery -func TransformPool(ledgerChange ingest.Change, closedAt time.Time) (PoolOutput, error) { +func TransformPool(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (PoolOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return PoolOutput{}, err @@ -47,6 +46,13 @@ func TransformPool(ledgerChange ingest.Change, closedAt time.Time) (PoolOutput, err = cp.Params.AssetB.Extract(&assetBType, &assetBCode, &assetBIssuer) assetBID := FarmHashAsset(assetBCode, assetBIssuer, assetBType) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return PoolOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedPool := PoolOutput{ PoolID: PoolIDToString(lp.LiquidityPoolId), PoolType: poolType, @@ -67,6 +73,7 @@ func TransformPool(ledgerChange ingest.Change, closedAt time.Time) (PoolOutput, LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedPool, nil } diff --git a/internal/transform/offer.go b/internal/transform/offer.go index bb4fc012..21e1c4b4 100644 --- a/internal/transform/offer.go +++ b/internal/transform/offer.go @@ -2,15 +2,15 @@ package transform import ( "fmt" - "time" "github.com/stellar/stellar-etl/internal/utils" "github.com/stellar/go/ingest" + "github.com/stellar/go/xdr" ) // TransformOffer converts an account from the history archive ingestion system into a form suitable for BigQuery -func TransformOffer(ledgerChange ingest.Change, closedAt time.Time) (OfferOutput, error) { +func TransformOffer(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (OfferOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return OfferOutput{}, err @@ -69,6 +69,13 @@ func TransformOffer(ledgerChange ingest.Change, closedAt time.Time) (OfferOutput outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return OfferOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedOffer := OfferOutput{ SellerID: outputSellerID, OfferID: outputOfferID, @@ -90,6 +97,7 @@ func TransformOffer(ledgerChange ingest.Change, closedAt time.Time) (OfferOutput Deleted: outputDeleted, Sponsor: ledgerEntrySponsorToNullString(ledgerEntry), ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedOffer, nil } diff --git a/internal/transform/offer_normalized.go b/internal/transform/offer_normalized.go index a7f8f8f4..83eced04 100644 --- a/internal/transform/offer_normalized.go +++ b/internal/transform/offer_normalized.go @@ -5,18 +5,18 @@ import ( "hash/fnv" "sort" "strings" - "time" "github.com/stellar/stellar-etl/internal/utils" "github.com/stellar/go/ingest" + "github.com/stellar/go/xdr" ) // TransformOfferNormalized converts an offer into a normalized form, allowing it to be stored as part of the historical orderbook dataset func TransformOfferNormalized(ledgerChange ingest.Change, ledgerSeq uint32) (NormalizedOfferOutput, error) { - var closedAt time.Time - transformed, err := TransformOffer(ledgerChange, closedAt) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := TransformOffer(ledgerChange, header) if err != nil { return NormalizedOfferOutput{}, err } diff --git a/internal/transform/schema.go b/internal/transform/schema.go index ffa13d2b..4e8182c5 100644 --- a/internal/transform/schema.go +++ b/internal/transform/schema.go @@ -100,6 +100,7 @@ type AccountOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // AccountSignerOutput is a representation of an account signer that aligns with the BigQuery table account_signers @@ -112,6 +113,7 @@ type AccountSignerOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // OperationOutput is a representation of an operation that aligns with the BigQuery table history_operations @@ -141,6 +143,7 @@ type ClaimableBalanceOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // Claimants @@ -195,6 +198,7 @@ type PoolOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // AssetOutput is a representation of an asset that aligns with the BigQuery table history_assets @@ -225,6 +229,7 @@ type TrustlineOutput struct { Sponsor null.String `json:"sponsor"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // OfferOutput is a representation of an offer that aligns with the BigQuery table offers @@ -249,6 +254,7 @@ type OfferOutput struct { Deleted bool `json:"deleted"` Sponsor null.String `json:"sponsor"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // TradeOutput is a representation of a trade that aligns with the BigQuery table history_trades @@ -491,6 +497,7 @@ type ContractDataOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // ContractCodeOutput is a representation of contract code that aligns with the Bigquery table soroban_contract_code @@ -501,6 +508,7 @@ type ContractCodeOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` //ContractCodeCode string `json:"contract_code"` } @@ -553,6 +561,7 @@ type ConfigSettingOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // ExpirationOutput is a representation of soroban expiration that aligns with the Bigquery table expirations @@ -563,6 +572,7 @@ type ExpirationOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // DiagnosticEventOutput is a representation of soroban expiration that aligns with the Bigquery table expirations diff --git a/internal/transform/trustline.go b/internal/transform/trustline.go index ca825c62..3099f306 100644 --- a/internal/transform/trustline.go +++ b/internal/transform/trustline.go @@ -3,7 +3,6 @@ package transform import ( "encoding/base64" "fmt" - "time" "github.com/guregu/null" "github.com/pkg/errors" @@ -15,7 +14,7 @@ import ( ) // TransformTrustline converts a trustline from the history archive ingestion system into a form suitable for BigQuery -func TransformTrustline(ledgerChange ingest.Change, closedAt time.Time) (TrustlineOutput, error) { +func TransformTrustline(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (TrustlineOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return TrustlineOutput{}, err @@ -52,6 +51,13 @@ func TransformTrustline(ledgerChange ingest.Change, closedAt time.Time) (Trustli liabilities := trustEntry.Liabilities() + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return TrustlineOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedTrustline := TrustlineOutput{ LedgerKey: outputLedgerKey, AccountID: outputAccountID, @@ -70,6 +76,7 @@ func TransformTrustline(ledgerChange ingest.Change, closedAt time.Time) (Trustli Sponsor: ledgerEntrySponsorToNullString(ledgerEntry), Deleted: outputDeleted, ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedTrustline, nil From f3ff0667645006a5a0f7b75106a9a0ae5916a44e Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 2 Nov 2023 16:20:17 -0400 Subject: [PATCH 3/5] Update tests --- internal/input/changes_test.go | 7 +++---- internal/transform/account_signer_test.go | 17 +++++++++++++++-- internal/transform/account_test.go | 13 +++++++++++-- internal/transform/claimable_balance_test.go | 13 +++++++++++-- internal/transform/config_setting_test.go | 13 +++++++++++-- internal/transform/contract_code_test.go | 13 +++++++++++-- internal/transform/contract_data_test.go | 13 +++++++++++-- internal/transform/expiration_test.go | 13 +++++++++++-- internal/transform/liquidity_pool_test.go | 13 +++++++++++-- internal/transform/offer_test.go | 13 +++++++++++-- internal/transform/trustline_test.go | 15 +++++++++++++-- 11 files changed, 119 insertions(+), 24 deletions(-) diff --git a/internal/input/changes_test.go b/internal/input/changes_test.go index 79c4cb4f..cabed9ca 100644 --- a/internal/input/changes_test.go +++ b/internal/input/changes_test.go @@ -2,7 +2,6 @@ package input import ( "testing" - "time" "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" @@ -115,12 +114,12 @@ func TestSendBatchToChannel(t *testing.T) { } func wrapLedgerEntry(entryType xdr.LedgerEntryType, entry xdr.LedgerEntry) ChangeBatch { - changes := map[xdr.LedgerEntryType]ChangesClosedAt{ + changes := map[xdr.LedgerEntryType]LedgerChanges{ entryType: { Changes: []ingest.Change{ {Type: entry.Data.Type, Post: &entry}, }, - ClosedAts: []time.Time{}, + LedgerHeaders: []xdr.LedgerHeaderHistoryEntry{}, }, } return ChangeBatch{ @@ -134,7 +133,7 @@ func mockExtractBatch( env utils.EnvironmentDetails, logger *utils.EtlLogger) ChangeBatch { log.Errorf("mock called") return ChangeBatch{ - Changes: map[xdr.LedgerEntryType]ChangesClosedAt{}, + Changes: map[xdr.LedgerEntryType]LedgerChanges{}, BatchStart: batchStart, BatchEnd: batchEnd, } diff --git a/internal/transform/account_signer_test.go b/internal/transform/account_signer_test.go index 7de456ab..0a0306ca 100644 --- a/internal/transform/account_signer_test.go +++ b/internal/transform/account_signer_test.go @@ -51,8 +51,15 @@ func TestTransformAccountSigner(t *testing.T) { } for _, test := range tests { - var closedAt time.Time - actualOutput, actualError := TransformSigners(test.input.injest, closedAt) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformSigners(test.input.injest, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -131,6 +138,8 @@ func makeSignersTestOutput() []AccountSignerOutput { LastModifiedLedger: 30705278, LedgerEntryChange: 2, Deleted: true, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, { AccountID: testAccount1ID.Address(), Signer: "GACAKBQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB3BQ", @@ -139,6 +148,8 @@ func makeSignersTestOutput() []AccountSignerOutput { LastModifiedLedger: 30705278, LedgerEntryChange: 2, Deleted: true, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, { AccountID: testAccount1ID.Address(), Signer: "GAFAWDAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABNDC", @@ -147,6 +158,8 @@ func makeSignersTestOutput() []AccountSignerOutput { LastModifiedLedger: 30705278, LedgerEntryChange: 2, Deleted: true, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, } } diff --git a/internal/transform/account_test.go b/internal/transform/account_test.go index 42befbb1..c99b66fc 100644 --- a/internal/transform/account_test.go +++ b/internal/transform/account_test.go @@ -95,8 +95,15 @@ func TestTransformAccount(t *testing.T) { } for _, test := range tests { - var closedAt time.Time - actualOutput, actualError := TransformAccount(test.input.ledgerChange, closedAt) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformAccount(test.input.ledgerChange, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -183,5 +190,7 @@ func makeAccountTestOutput() AccountOutput { LastModifiedLedger: 30705278, LedgerEntryChange: 2, Deleted: true, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), } } diff --git a/internal/transform/claimable_balance_test.go b/internal/transform/claimable_balance_test.go index 24a2d374..90690fa1 100644 --- a/internal/transform/claimable_balance_test.go +++ b/internal/transform/claimable_balance_test.go @@ -35,8 +35,15 @@ func TestTransformClaimableBalance(t *testing.T) { } for _, test := range tests { - var closedAt time.Time - actualOutput, actualError := TransformClaimableBalance(test.input.ingest, closedAt) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformClaimableBalance(test.input.ingest, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -117,5 +124,7 @@ func makeClaimableBalanceTestOutput() ClaimableBalanceOutput { LastModifiedLedger: 30705278, LedgerEntryChange: 2, Deleted: true, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), } } diff --git a/internal/transform/config_setting_test.go b/internal/transform/config_setting_test.go index 169789c0..5d94498c 100644 --- a/internal/transform/config_setting_test.go +++ b/internal/transform/config_setting_test.go @@ -44,8 +44,15 @@ func TestTransformConfigSetting(t *testing.T) { } for _, test := range tests { - var closedAt time.Time - actualOutput, actualError := TransformConfigSetting(test.input, closedAt) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformConfigSetting(test.input, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -126,6 +133,8 @@ func makeConfigSettingTestOutput() []ConfigSettingOutput { LastModifiedLedger: 24229503, LedgerEntryChange: 1, Deleted: false, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, } } diff --git a/internal/transform/contract_code_test.go b/internal/transform/contract_code_test.go index e4691015..9a47e4dc 100644 --- a/internal/transform/contract_code_test.go +++ b/internal/transform/contract_code_test.go @@ -44,8 +44,15 @@ func TestTransformContractCode(t *testing.T) { } for _, test := range tests { - var closedAt time.Time - actualOutput, actualError := TransformContractCode(test.input, closedAt) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformContractCode(test.input, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -84,6 +91,8 @@ func makeContractCodeTestOutput() []ContractCodeOutput { LastModifiedLedger: 24229503, LedgerEntryChange: 1, Deleted: false, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, } } diff --git a/internal/transform/contract_data_test.go b/internal/transform/contract_data_test.go index 578d0f3d..2c5234d5 100644 --- a/internal/transform/contract_data_test.go +++ b/internal/transform/contract_data_test.go @@ -48,9 +48,16 @@ func TestTransformContractData(t *testing.T) { } for _, test := range tests { - var closedAt time.Time + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } TransformContractData := NewTransformContractDataStruct(MockAssetFromContractData, MockContractBalanceFromContractData) - actualOutput, actualError, _ := TransformContractData.TransformContractData(test.input, test.passphrase, closedAt) + actualOutput, actualError, _ := TransformContractData.TransformContractData(test.input, test.passphrase, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -129,6 +136,8 @@ func makeContractDataTestOutput() []ContractDataOutput { LastModifiedLedger: 24229503, LedgerEntryChange: 1, Deleted: false, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, } } diff --git a/internal/transform/expiration_test.go b/internal/transform/expiration_test.go index ac1cef9d..d28a6d46 100644 --- a/internal/transform/expiration_test.go +++ b/internal/transform/expiration_test.go @@ -44,8 +44,15 @@ func TestTransformExpiration(t *testing.T) { } for _, test := range tests { - var closedAt time.Time - actualOutput, actualError := TransformExpiration(test.input, closedAt) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformExpiration(test.input, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -93,6 +100,8 @@ func makeExpirationTestOutput() []ExpirationOutput { LastModifiedLedger: 1, LedgerEntryChange: 1, Deleted: false, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, } } diff --git a/internal/transform/liquidity_pool_test.go b/internal/transform/liquidity_pool_test.go index 07dc25e2..8a60f59d 100644 --- a/internal/transform/liquidity_pool_test.go +++ b/internal/transform/liquidity_pool_test.go @@ -47,8 +47,15 @@ func TestTransformPool(t *testing.T) { } for _, test := range tests { - var closedAt time.Time - actualOutput, actualError := TransformPool(test.input.ingest, closedAt) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformPool(test.input.ingest, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -118,5 +125,7 @@ func makePoolTestOutput() PoolOutput { LastModifiedLedger: 30705278, LedgerEntryChange: 2, Deleted: true, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), } } diff --git a/internal/transform/offer_test.go b/internal/transform/offer_test.go index 268acc9c..303693eb 100644 --- a/internal/transform/offer_test.go +++ b/internal/transform/offer_test.go @@ -97,8 +97,15 @@ func TestTransformOffer(t *testing.T) { } for _, test := range tests { - var closedAt time.Time - actualOutput, actualError := TransformOffer(test.input.ingest, closedAt) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformOffer(test.input.ingest, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -171,5 +178,7 @@ func makeOfferTestOutput() OfferOutput { LedgerEntryChange: 2, Deleted: true, Sponsor: null.StringFrom(testAccount3Address), + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), } } diff --git a/internal/transform/trustline_test.go b/internal/transform/trustline_test.go index 87846b74..24efa2f5 100644 --- a/internal/transform/trustline_test.go +++ b/internal/transform/trustline_test.go @@ -50,8 +50,15 @@ func TestTransformTrustline(t *testing.T) { } for _, test := range tests { - var closedAt time.Time - actualOutput, actualError := TransformTrustline(test.input.ingest, closedAt) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformTrustline(test.input.ingest, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -133,6 +140,8 @@ func makeTrustlineTestOutput() []TrustlineOutput { LastModifiedLedger: 24229503, LedgerEntryChange: 1, Deleted: false, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, { LedgerKey: "AAAAAQAAAAAcR0GXGO76pFs4y38vJVAanjnLg4emNun7zAx0pHcDGAAAAAMBAwQFBwkAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==", @@ -148,6 +157,8 @@ func makeTrustlineTestOutput() []TrustlineOutput { LastModifiedLedger: 123456789, LedgerEntryChange: 1, Deleted: false, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, } } From f51cdb3d691fa8c394272ff0d3018ae68259a1b0 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 2 Nov 2023 16:37:10 -0400 Subject: [PATCH 4/5] Rename variable --- internal/input/changes.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/input/changes.go b/internal/input/changes.go index a68947c3..72387693 100644 --- a/internal/input/changes.go +++ b/internal/input/changes.go @@ -97,7 +97,7 @@ func extractBatch( xdr.LedgerEntryTypeConfigSetting, xdr.LedgerEntryTypeExpiration} - changesClosedAt := map[xdr.LedgerEntryType]LedgerChanges{} + ledgerChanges := map[xdr.LedgerEntryType]LedgerChanges{} ctx := context.Background() for seq := batchStart; seq <= batchEnd; { changeCompactors := map[xdr.LedgerEntryType]*ingest.ChangeCompactor{} @@ -144,17 +144,17 @@ func extractBatch( for dataType, compactor := range changeCompactors { for _, change := range compactor.GetChanges() { - dataTypeChanges := changesClosedAt[dataType] + dataTypeChanges := ledgerChanges[dataType] dataTypeChanges.Changes = append(dataTypeChanges.Changes, change) dataTypeChanges.LedgerHeaders = append(dataTypeChanges.LedgerHeaders, header) - changesClosedAt[dataType] = dataTypeChanges + ledgerChanges[dataType] = dataTypeChanges } } } return ChangeBatch{ - Changes: changesClosedAt, + Changes: ledgerChanges, BatchStart: batchStart, BatchEnd: batchEnd, } From fe1dd24147d1618fee7bf131b4bc14b36d5daaf4 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 2 Nov 2023 16:54:01 -0400 Subject: [PATCH 5/5] Move header var --- cmd/export_claimable_balances.go | 2 +- cmd/export_config_setting.go | 2 +- cmd/export_contract_code.go | 2 +- cmd/export_contract_data.go | 2 +- cmd/export_expiration.go | 2 +- cmd/export_liquidity_pools.go | 2 +- cmd/export_offers.go | 2 +- cmd/export_trustlines.go | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/export_claimable_balances.go b/cmd/export_claimable_balances.go index 5db70976..119693c4 100644 --- a/cmd/export_claimable_balances.go +++ b/cmd/export_claimable_balances.go @@ -36,8 +36,8 @@ var claimableBalancesCmd = &cobra.Command{ outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, balance := range balances { - var header xdr.LedgerHeaderHistoryEntry transformed, err := transform.TransformClaimableBalance(balance, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform balance %+v: %v", balance, err)) diff --git a/cmd/export_config_setting.go b/cmd/export_config_setting.go index 903f3c64..60630648 100644 --- a/cmd/export_config_setting.go +++ b/cmd/export_config_setting.go @@ -33,8 +33,8 @@ var configSettingCmd = &cobra.Command{ outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, setting := range settings { - var header xdr.LedgerHeaderHistoryEntry transformed, err := transform.TransformConfigSetting(setting, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform config setting %+v: %v", setting, err)) diff --git a/cmd/export_contract_code.go b/cmd/export_contract_code.go index 6273c2b1..2980286d 100644 --- a/cmd/export_contract_code.go +++ b/cmd/export_contract_code.go @@ -33,8 +33,8 @@ var codeCmd = &cobra.Command{ outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, code := range codes { - var header xdr.LedgerHeaderHistoryEntry transformed, err := transform.TransformContractCode(code, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform contract code %+v: %v", code, err)) diff --git a/cmd/export_contract_data.go b/cmd/export_contract_data.go index d2195d79..62c57a3f 100644 --- a/cmd/export_contract_data.go +++ b/cmd/export_contract_data.go @@ -33,8 +33,8 @@ var dataCmd = &cobra.Command{ outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, data := range datas { - var header xdr.LedgerHeaderHistoryEntry TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData) transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase, header) if err != nil { diff --git a/cmd/export_expiration.go b/cmd/export_expiration.go index a3a8a410..54fdd104 100644 --- a/cmd/export_expiration.go +++ b/cmd/export_expiration.go @@ -33,8 +33,8 @@ var expirationCmd = &cobra.Command{ outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, expiration := range expirations { - var header xdr.LedgerHeaderHistoryEntry transformed, err := transform.TransformExpiration(expiration, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform expiration %+v: %v", expiration, err)) diff --git a/cmd/export_liquidity_pools.go b/cmd/export_liquidity_pools.go index 3393f930..f0f7f236 100644 --- a/cmd/export_liquidity_pools.go +++ b/cmd/export_liquidity_pools.go @@ -36,8 +36,8 @@ the export_ledger_entry_changes command.`, outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, pool := range pools { - var header xdr.LedgerHeaderHistoryEntry transformed, err := transform.TransformPool(pool, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform pool %+v: %v", pool, err)) diff --git a/cmd/export_offers.go b/cmd/export_offers.go index 37b67cb8..17898e13 100644 --- a/cmd/export_offers.go +++ b/cmd/export_offers.go @@ -37,8 +37,8 @@ var offersCmd = &cobra.Command{ outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, offer := range offers { - var header xdr.LedgerHeaderHistoryEntry transformed, err := transform.TransformOffer(offer, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform offer %+v: %v", offer, err)) diff --git a/cmd/export_trustlines.go b/cmd/export_trustlines.go index f13dbd2b..52e4075e 100644 --- a/cmd/export_trustlines.go +++ b/cmd/export_trustlines.go @@ -37,8 +37,8 @@ var trustlinesCmd = &cobra.Command{ outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, trust := range trustlines { - var header xdr.LedgerHeaderHistoryEntry transformed, err := transform.TransformTrustline(trust, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not json transform trustline %+v: %v", trust, err))