diff --git a/cmd/export_account_signers.go b/cmd/export_account_signers.go index 83025b35..4e53a26d 100644 --- a/cmd/export_account_signers.go +++ b/cmd/export_account_signers.go @@ -37,9 +37,10 @@ the export_ledger_entry_changes command.`, numFailures := 0 totalNumBytes := 0 numSigners := 0 + var header xdr.LedgerHeaderHistoryEntry for _, acc := range accounts { if acc.AccountSignersChanged() { - transformed, err := transform.TransformSigners(acc) + transformed, err := transform.TransformSigners(acc, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not json transform account signer: %v", err)) numFailures += 1 diff --git a/cmd/export_accounts.go b/cmd/export_accounts.go index 2069ff69..eee10af0 100644 --- a/cmd/export_accounts.go +++ b/cmd/export_accounts.go @@ -36,8 +36,9 @@ the export_ledger_entry_changes command.`, outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, acc := range accounts { - transformed, err := transform.TransformAccount(acc) + transformed, err := transform.TransformAccount(acc, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not json transform account: %v", err)) numFailures += 1 diff --git a/cmd/export_claimable_balances.go b/cmd/export_claimable_balances.go index 6973cb52..119693c4 100644 --- a/cmd/export_claimable_balances.go +++ b/cmd/export_claimable_balances.go @@ -36,8 +36,9 @@ var claimableBalancesCmd = &cobra.Command{ outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, balance := range balances { - transformed, err := transform.TransformClaimableBalance(balance) + transformed, err := transform.TransformClaimableBalance(balance, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform balance %+v: %v", balance, err)) numFailures += 1 diff --git a/cmd/export_config_setting.go b/cmd/export_config_setting.go index 3127422c..60630648 100644 --- a/cmd/export_config_setting.go +++ b/cmd/export_config_setting.go @@ -33,8 +33,9 @@ var configSettingCmd = &cobra.Command{ outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, setting := range settings { - transformed, err := transform.TransformConfigSetting(setting) + transformed, err := transform.TransformConfigSetting(setting, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform config setting %+v: %v", setting, err)) numFailures += 1 diff --git a/cmd/export_contract_code.go b/cmd/export_contract_code.go index 62f07f0b..2980286d 100644 --- a/cmd/export_contract_code.go +++ b/cmd/export_contract_code.go @@ -33,8 +33,9 @@ var codeCmd = &cobra.Command{ outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, code := range codes { - transformed, err := transform.TransformContractCode(code) + transformed, err := transform.TransformContractCode(code, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform contract code %+v: %v", code, err)) numFailures += 1 diff --git a/cmd/export_contract_data.go b/cmd/export_contract_data.go index 5dfbefb5..62c57a3f 100644 --- a/cmd/export_contract_data.go +++ b/cmd/export_contract_data.go @@ -33,9 +33,10 @@ var dataCmd = &cobra.Command{ outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, data := range datas { TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData) - transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase) + transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform contract data %+v: %v", data, err)) numFailures += 1 diff --git a/cmd/export_expiration.go b/cmd/export_expiration.go index c48bb519..54fdd104 100644 --- a/cmd/export_expiration.go +++ b/cmd/export_expiration.go @@ -33,8 +33,9 @@ var expirationCmd = &cobra.Command{ outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, expiration := range expirations { - transformed, err := transform.TransformExpiration(expiration) + transformed, err := transform.TransformExpiration(expiration, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform expiration %+v: %v", expiration, err)) numFailures += 1 diff --git a/cmd/export_ledger_entry_changes.go b/cmd/export_ledger_entry_changes.go index dd76f6c7..4c11ed71 100644 --- a/cmd/export_ledger_entry_changes.go +++ b/cmd/export_ledger_entry_changes.go @@ -112,13 +112,13 @@ be exported.`, if !exports["export-accounts"] { continue } - for _, change := range changes { + for i, change := range changes.Changes { if changed, err := change.AccountChangedExceptSigners(); err != nil { cmdLogger.LogError(fmt.Errorf("unable to identify changed accounts: %v", err)) continue } else if changed { - acc, err := transform.TransformAccount(change) + acc, err := transform.TransformAccount(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming account entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -127,7 +127,7 @@ be exported.`, transformedOutputs["accounts"] = append(transformedOutputs["accounts"], acc) } if change.AccountSignersChanged() { - signers, err := transform.TransformSigners(change) + signers, err := transform.TransformSigners(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming account signers from %d :%s", entry.LastModifiedLedgerSeq, err)) @@ -142,8 +142,8 @@ be exported.`, if !exports["export-balances"] { continue } - for _, change := range changes { - balance, err := transform.TransformClaimableBalance(change) + for i, change := range changes.Changes { + balance, err := transform.TransformClaimableBalance(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming balance entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -155,8 +155,8 @@ be exported.`, if !exports["export-offers"] { continue } - for _, change := range changes { - offer, err := transform.TransformOffer(change) + for i, change := range changes.Changes { + offer, err := transform.TransformOffer(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming offer entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -168,8 +168,8 @@ be exported.`, if !exports["export-trustlines"] { continue } - for _, change := range changes { - trust, err := transform.TransformTrustline(change) + for i, change := range changes.Changes { + trust, err := transform.TransformTrustline(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming trustline entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -181,8 +181,8 @@ be exported.`, if !exports["export-pools"] { continue } - for _, change := range changes { - pool, err := transform.TransformPool(change) + for i, change := range changes.Changes { + pool, err := transform.TransformPool(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming liquidity pool entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -194,9 +194,9 @@ be exported.`, if !exports["export-contract-data"] { continue } - for _, change := range changes { + for i, change := range changes.Changes { TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData) - contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase) + contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming contract data entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -214,8 +214,8 @@ be exported.`, if !exports["export-contract-code"] { continue } - for _, change := range changes { - contractCode, err := transform.TransformContractCode(change) + for i, change := range changes.Changes { + contractCode, err := transform.TransformContractCode(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming contract code entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -227,8 +227,8 @@ be exported.`, if !exports["export-config-settings"] { continue } - for _, change := range changes { - configSettings, err := transform.TransformConfigSetting(change) + for i, change := range changes.Changes { + configSettings, err := transform.TransformConfigSetting(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming config settings entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -240,8 +240,8 @@ be exported.`, if !exports["export-expiration"] { continue } - for _, change := range changes { - expiration, err := transform.TransformExpiration(change) + for i, change := range changes.Changes { + expiration, err := transform.TransformExpiration(change, changes.LedgerHeaders[i]) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming expiration entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) diff --git a/cmd/export_liquidity_pools.go b/cmd/export_liquidity_pools.go index 03aa2bea..f0f7f236 100644 --- a/cmd/export_liquidity_pools.go +++ b/cmd/export_liquidity_pools.go @@ -36,8 +36,9 @@ the export_ledger_entry_changes command.`, outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, pool := range pools { - transformed, err := transform.TransformPool(pool) + transformed, err := transform.TransformPool(pool, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform pool %+v: %v", pool, err)) numFailures += 1 diff --git a/cmd/export_offers.go b/cmd/export_offers.go index 237aa9a6..17898e13 100644 --- a/cmd/export_offers.go +++ b/cmd/export_offers.go @@ -37,8 +37,9 @@ var offersCmd = &cobra.Command{ outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, offer := range offers { - transformed, err := transform.TransformOffer(offer) + transformed, err := transform.TransformOffer(offer, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not transform offer %+v: %v", offer, err)) numFailures += 1 diff --git a/cmd/export_trustlines.go b/cmd/export_trustlines.go index 58414903..52e4075e 100644 --- a/cmd/export_trustlines.go +++ b/cmd/export_trustlines.go @@ -37,8 +37,9 @@ var trustlinesCmd = &cobra.Command{ outFile := mustOutFile(path) numFailures := 0 totalNumBytes := 0 + var header xdr.LedgerHeaderHistoryEntry for _, trust := range trustlines { - transformed, err := transform.TransformTrustline(trust) + transformed, err := transform.TransformTrustline(trust, header) if err != nil { cmdLogger.LogError(fmt.Errorf("could not json transform trustline %+v: %v", trust, err)) numFailures += 1 diff --git a/internal/input/changes.go b/internal/input/changes.go index 37a54d09..72387693 100644 --- a/internal/input/changes.go +++ b/internal/input/changes.go @@ -17,9 +17,14 @@ var ( ExtractBatch = extractBatch ) +type LedgerChanges struct { + Changes []ingest.Change + LedgerHeaders []xdr.LedgerHeaderHistoryEntry +} + // ChangeBatch represents the changes in a batch of ledgers represented by the range [BatchStart, BatchEnd) type ChangeBatch struct { - Changes map[xdr.LedgerEntryType][]ingest.Change + Changes map[xdr.LedgerEntryType]LedgerChanges BatchStart uint32 BatchEnd uint32 } @@ -92,7 +97,7 @@ func extractBatch( xdr.LedgerEntryTypeConfigSetting, xdr.LedgerEntryTypeExpiration} - changes := map[xdr.LedgerEntryType][]ingest.Change{} + ledgerChanges := map[xdr.LedgerEntryType]LedgerChanges{} ctx := context.Background() for seq := batchStart; seq <= batchEnd; { changeCompactors := map[xdr.LedgerEntryType]*ingest.ChangeCompactor{} @@ -107,19 +112,13 @@ func extractBatch( // if this ledger is available, we process its changes and move on to the next ledger by incrementing seq. // Otherwise, nothing is incremented, and we try again on the next iteration of the loop + var header xdr.LedgerHeaderHistoryEntry if seq <= latestLedger { changeReader, err := ingest.NewLedgerChangeReader(ctx, core, env.NetworkPassphrase, seq) if err != nil { logger.Fatal(fmt.Sprintf("unable to create change reader for ledger %d: ", seq), err) } - // TODO: Add in ledger_closed_at; Update changeCompactors to also save ledger close time. - // AddChange is from the go monorepo so it might be easier to just add a addledgerclose func after it - //txReader := changeReader.LedgerTransactionReader - - //closeTime, err := utils.TimePointToUTCTimeStamp(txReader.GetHeader().Header.ScpValue.CloseTime) - //if err != nil { - // logger.Fatal(fmt.Sprintf("unable to read close time for ledger %d: ", seq), err) - //} + header = changeReader.LedgerTransactionReader.GetHeader() for { change, err := changeReader.Read() @@ -145,14 +144,17 @@ func extractBatch( for dataType, compactor := range changeCompactors { for _, change := range compactor.GetChanges() { - changes[dataType] = append(changes[dataType], change) + dataTypeChanges := ledgerChanges[dataType] + dataTypeChanges.Changes = append(dataTypeChanges.Changes, change) + dataTypeChanges.LedgerHeaders = append(dataTypeChanges.LedgerHeaders, header) + ledgerChanges[dataType] = dataTypeChanges } } } return ChangeBatch{ - Changes: changes, + Changes: ledgerChanges, BatchStart: batchStart, BatchEnd: batchEnd, } diff --git a/internal/input/changes_test.go b/internal/input/changes_test.go index 0e739d53..cabed9ca 100644 --- a/internal/input/changes_test.go +++ b/internal/input/changes_test.go @@ -3,12 +3,12 @@ package input import ( "testing" + "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/support/log" "github.com/stellar/stellar-etl/internal/utils" "github.com/stretchr/testify/assert" - "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" ) @@ -114,8 +114,13 @@ func TestSendBatchToChannel(t *testing.T) { } func wrapLedgerEntry(entryType xdr.LedgerEntryType, entry xdr.LedgerEntry) ChangeBatch { - changes := map[xdr.LedgerEntryType][]ingest.Change{ - entryType: {{Type: entry.Data.Type, Post: &entry}}, + changes := map[xdr.LedgerEntryType]LedgerChanges{ + entryType: { + Changes: []ingest.Change{ + {Type: entry.Data.Type, Post: &entry}, + }, + LedgerHeaders: []xdr.LedgerHeaderHistoryEntry{}, + }, } return ChangeBatch{ Changes: changes, @@ -128,7 +133,7 @@ func mockExtractBatch( env utils.EnvironmentDetails, logger *utils.EtlLogger) ChangeBatch { log.Errorf("mock called") return ChangeBatch{ - Changes: map[xdr.LedgerEntryType][]ingest.Change{}, + Changes: map[xdr.LedgerEntryType]LedgerChanges{}, BatchStart: batchStart, BatchEnd: batchEnd, } diff --git a/internal/transform/account.go b/internal/transform/account.go index 59c55582..1a605ac6 100644 --- a/internal/transform/account.go +++ b/internal/transform/account.go @@ -10,7 +10,7 @@ import ( ) // TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery -func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) { +func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (AccountOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return AccountOutput{}, err @@ -76,6 +76,13 @@ func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) { outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return AccountOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedAccount := AccountOutput{ AccountID: outputID, Balance: utils.ConvertStroopValueToReal(outputBalance), @@ -98,6 +105,8 @@ func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) { NumSponsoring: uint32(accountEntry.NumSponsoring()), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedAccount, nil } diff --git a/internal/transform/account_signer.go b/internal/transform/account_signer.go index f8e3b238..25824d2f 100644 --- a/internal/transform/account_signer.go +++ b/internal/transform/account_signer.go @@ -6,11 +6,12 @@ import ( "github.com/guregu/null" "github.com/stellar/go/ingest" + "github.com/stellar/go/xdr" "github.com/stellar/stellar-etl/internal/utils" ) // TransformSigners converts account signers from the history archive ingestion system into a form suitable for BigQuery -func TransformSigners(ledgerChange ingest.Change) ([]AccountSignerOutput, error) { +func TransformSigners(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) ([]AccountSignerOutput, error) { var signers []AccountSignerOutput ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) @@ -23,6 +24,13 @@ func TransformSigners(ledgerChange ingest.Change) ([]AccountSignerOutput, error) return signers, fmt.Errorf("could not extract signer data from ledger entry of type: %+v", ledgerEntry.Data.Type) } + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return signers, err + } + + ledgerSequence := header.Header.LedgerSeq + sponsors := accountEntry.SponsorPerSigner() for signer, weight := range accountEntry.SignerSummary() { var sponsor null.String @@ -38,6 +46,8 @@ func TransformSigners(ledgerChange ingest.Change) ([]AccountSignerOutput, error) LastModifiedLedger: outputLastModifiedLedger, LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), }) } sort.Slice(signers, func(a, b int) bool { return signers[a].Weight < signers[b].Weight }) diff --git a/internal/transform/account_signer_test.go b/internal/transform/account_signer_test.go index ec3e0709..0a0306ca 100644 --- a/internal/transform/account_signer_test.go +++ b/internal/transform/account_signer_test.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "testing" + "time" "github.com/guregu/null" @@ -50,7 +51,15 @@ func TestTransformAccountSigner(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformSigners(test.input.injest) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformSigners(test.input.injest, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -129,6 +138,8 @@ func makeSignersTestOutput() []AccountSignerOutput { LastModifiedLedger: 30705278, LedgerEntryChange: 2, Deleted: true, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, { AccountID: testAccount1ID.Address(), Signer: "GACAKBQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB3BQ", @@ -137,6 +148,8 @@ func makeSignersTestOutput() []AccountSignerOutput { LastModifiedLedger: 30705278, LedgerEntryChange: 2, Deleted: true, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, { AccountID: testAccount1ID.Address(), Signer: "GAFAWDAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABNDC", @@ -145,6 +158,8 @@ func makeSignersTestOutput() []AccountSignerOutput { LastModifiedLedger: 30705278, LedgerEntryChange: 2, Deleted: true, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, } } diff --git a/internal/transform/account_test.go b/internal/transform/account_test.go index 53dd3ac2..c99b66fc 100644 --- a/internal/transform/account_test.go +++ b/internal/transform/account_test.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "testing" + "time" "github.com/guregu/null" "github.com/stretchr/testify/assert" @@ -94,7 +95,15 @@ func TestTransformAccount(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformAccount(test.input.ledgerChange) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformAccount(test.input.ledgerChange, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -181,5 +190,7 @@ func makeAccountTestOutput() AccountOutput { LastModifiedLedger: 30705278, LedgerEntryChange: 2, Deleted: true, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), } } diff --git a/internal/transform/claimable_balance.go b/internal/transform/claimable_balance.go index 7db3ef8f..73cfc0a0 100644 --- a/internal/transform/claimable_balance.go +++ b/internal/transform/claimable_balance.go @@ -21,7 +21,7 @@ func transformClaimants(claimants []xdr.Claimant) []Claimant { } // TransformClaimableBalance converts a claimable balance from the history archive ingestion system into a form suitable for BigQuery -func TransformClaimableBalance(ledgerChange ingest.Change) (ClaimableBalanceOutput, error) { +func TransformClaimableBalance(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ClaimableBalanceOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ClaimableBalanceOutput{}, err @@ -45,6 +45,13 @@ func TransformClaimableBalance(ledgerChange ingest.Change) (ClaimableBalanceOutp outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ClaimableBalanceOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformed := ClaimableBalanceOutput{ BalanceID: balanceID, AssetCode: outputAsset.AssetCode, @@ -58,6 +65,8 @@ func TransformClaimableBalance(ledgerChange ingest.Change) (ClaimableBalanceOutp LedgerEntryChange: uint32(changeType), Flags: outputFlags, Deleted: outputDeleted, + ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformed, nil } diff --git a/internal/transform/claimable_balance_test.go b/internal/transform/claimable_balance_test.go index 4886e5e5..90690fa1 100644 --- a/internal/transform/claimable_balance_test.go +++ b/internal/transform/claimable_balance_test.go @@ -2,6 +2,7 @@ package transform import ( "testing" + "time" "github.com/guregu/null" "github.com/stellar/go/ingest" @@ -34,7 +35,15 @@ func TestTransformClaimableBalance(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformClaimableBalance(test.input.ingest) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformClaimableBalance(test.input.ingest, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -115,5 +124,7 @@ func makeClaimableBalanceTestOutput() ClaimableBalanceOutput { LastModifiedLedger: 30705278, LedgerEntryChange: 2, Deleted: true, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), } } diff --git a/internal/transform/config_setting.go b/internal/transform/config_setting.go index 1e3e8fd2..280d138c 100644 --- a/internal/transform/config_setting.go +++ b/internal/transform/config_setting.go @@ -10,7 +10,7 @@ import ( ) // TransformConfigSetting converts an config setting ledger change entry into a form suitable for BigQuery -func TransformConfigSetting(ledgerChange ingest.Change) (ConfigSettingOutput, error) { +func TransformConfigSetting(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ConfigSettingOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ConfigSettingOutput{}, err @@ -90,6 +90,13 @@ func TransformConfigSetting(ledgerChange ingest.Change) (ConfigSettingOutput, er bucketListSizeWindow = append(bucketListSizeWindow, uint64(sizeWindow)) } + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ConfigSettingOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedConfigSetting := ConfigSettingOutput{ ConfigSettingId: int32(configSettingId), ContractMaxSizeBytes: uint32(contractMaxSizeBytes), @@ -136,6 +143,8 @@ func TransformConfigSetting(ledgerChange ingest.Change) (ConfigSettingOutput, er LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedConfigSetting, nil } diff --git a/internal/transform/config_setting_test.go b/internal/transform/config_setting_test.go index 1f696af5..5d94498c 100644 --- a/internal/transform/config_setting_test.go +++ b/internal/transform/config_setting_test.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "testing" + "time" "github.com/stretchr/testify/assert" @@ -43,7 +44,15 @@ func TestTransformConfigSetting(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformConfigSetting(test.input) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformConfigSetting(test.input, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -124,6 +133,8 @@ func makeConfigSettingTestOutput() []ConfigSettingOutput { LastModifiedLedger: 24229503, LedgerEntryChange: 1, Deleted: false, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, } } diff --git a/internal/transform/contract_code.go b/internal/transform/contract_code.go index 5ee11750..e23571fe 100644 --- a/internal/transform/contract_code.go +++ b/internal/transform/contract_code.go @@ -10,7 +10,7 @@ import ( ) // TransformContractCode converts a contract code ledger change entry into a form suitable for BigQuery -func TransformContractCode(ledgerChange ingest.Change) (ContractCodeOutput, error) { +func TransformContractCode(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ContractCodeOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ContractCodeOutput{}, err @@ -31,12 +31,21 @@ func TransformContractCode(ledgerChange ingest.Change) (ContractCodeOutput, erro contractCodeHashByte, _ := contractCode.Hash.MarshalBinary() contractCodeHash, _ := strkey.Encode(strkey.VersionByteContract, contractCodeHashByte) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ContractCodeOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedCode := ContractCodeOutput{ ContractCodeHash: contractCodeHash, ContractCodeExtV: int32(contractCodeExtV), LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedCode, nil } diff --git a/internal/transform/contract_code_test.go b/internal/transform/contract_code_test.go index 8a203b6d..9a47e4dc 100644 --- a/internal/transform/contract_code_test.go +++ b/internal/transform/contract_code_test.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "testing" + "time" "github.com/stretchr/testify/assert" @@ -43,7 +44,15 @@ func TestTransformContractCode(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformContractCode(test.input) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformContractCode(test.input, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -82,6 +91,8 @@ func makeContractCodeTestOutput() []ContractCodeOutput { LastModifiedLedger: 24229503, LedgerEntryChange: 1, Deleted: false, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, } } diff --git a/internal/transform/contract_data.go b/internal/transform/contract_data.go index b8c26fea..65cb983a 100644 --- a/internal/transform/contract_data.go +++ b/internal/transform/contract_data.go @@ -59,7 +59,7 @@ func NewTransformContractDataStruct(assetFrom AssetFromContractDataFunc, contrac } // TransformContractData converts a contract data ledger change entry into a form suitable for BigQuery -func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest.Change, passphrase string) (ContractDataOutput, error, bool) { +func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest.Change, passphrase string, header xdr.LedgerHeaderHistoryEntry) (ContractDataOutput, error, bool) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ContractDataOutput{}, err, false @@ -107,6 +107,13 @@ func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest. contractDataDurability := contractData.Durability.String() + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ContractDataOutput{}, err, false + } + + ledgerSequence := header.Header.LedgerSeq + transformedData := ContractDataOutput{ ContractId: outputContractDataContractId, ContractKeyType: contractDataKeyType, @@ -119,6 +126,8 @@ func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest. LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedData, nil, true } diff --git a/internal/transform/contract_data_test.go b/internal/transform/contract_data_test.go index e840ca2d..2c5234d5 100644 --- a/internal/transform/contract_data_test.go +++ b/internal/transform/contract_data_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math/big" "testing" + "time" "github.com/stretchr/testify/assert" @@ -47,8 +48,16 @@ func TestTransformContractData(t *testing.T) { } for _, test := range tests { + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } TransformContractData := NewTransformContractDataStruct(MockAssetFromContractData, MockContractBalanceFromContractData) - actualOutput, actualError, _ := TransformContractData.TransformContractData(test.input, test.passphrase) + actualOutput, actualError, _ := TransformContractData.TransformContractData(test.input, test.passphrase, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -127,6 +136,8 @@ func makeContractDataTestOutput() []ContractDataOutput { LastModifiedLedger: 24229503, LedgerEntryChange: 1, Deleted: false, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, } } diff --git a/internal/transform/expiration.go b/internal/transform/expiration.go index b0cfd98c..a7d68b9c 100644 --- a/internal/transform/expiration.go +++ b/internal/transform/expiration.go @@ -10,7 +10,7 @@ import ( ) // TransformConfigSetting converts an config setting ledger change entry into a form suitable for BigQuery -func TransformExpiration(ledgerChange ingest.Change) (ExpirationOutput, error) { +func TransformExpiration(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ExpirationOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ExpirationOutput{}, err @@ -30,12 +30,21 @@ func TransformExpiration(ledgerChange ingest.Change) (ExpirationOutput, error) { keyHash, _ := strkey.Encode(strkey.VersionByteContract, keyHashByte) expirationLedgerSeq := expiration.ExpirationLedgerSeq + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return ExpirationOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedPool := ExpirationOutput{ KeyHash: keyHash, ExpirationLedgerSeq: uint32(expirationLedgerSeq), LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedPool, nil diff --git a/internal/transform/expiration_test.go b/internal/transform/expiration_test.go index 721079f3..d28a6d46 100644 --- a/internal/transform/expiration_test.go +++ b/internal/transform/expiration_test.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "testing" + "time" "github.com/stretchr/testify/assert" @@ -43,7 +44,15 @@ func TestTransformExpiration(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformExpiration(test.input) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformExpiration(test.input, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -91,6 +100,8 @@ func makeExpirationTestOutput() []ExpirationOutput { LastModifiedLedger: 1, LedgerEntryChange: 1, Deleted: false, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, } } diff --git a/internal/transform/liquidity_pool.go b/internal/transform/liquidity_pool.go index 6ee41209..30cbaa24 100644 --- a/internal/transform/liquidity_pool.go +++ b/internal/transform/liquidity_pool.go @@ -9,7 +9,7 @@ import ( ) // TransformPool converts an liquidity pool ledger change entry into a form suitable for BigQuery -func TransformPool(ledgerChange ingest.Change) (PoolOutput, error) { +func TransformPool(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (PoolOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return PoolOutput{}, err @@ -46,6 +46,13 @@ func TransformPool(ledgerChange ingest.Change) (PoolOutput, error) { err = cp.Params.AssetB.Extract(&assetBType, &assetBCode, &assetBIssuer) assetBID := FarmHashAsset(assetBCode, assetBIssuer, assetBType) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return PoolOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedPool := PoolOutput{ PoolID: PoolIDToString(lp.LiquidityPoolId), PoolType: poolType, @@ -65,6 +72,8 @@ func TransformPool(ledgerChange ingest.Change) (PoolOutput, error) { LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedPool, nil } diff --git a/internal/transform/liquidity_pool_test.go b/internal/transform/liquidity_pool_test.go index 3664da3f..8a60f59d 100644 --- a/internal/transform/liquidity_pool_test.go +++ b/internal/transform/liquidity_pool_test.go @@ -2,6 +2,7 @@ package transform import ( "testing" + "time" "github.com/stretchr/testify/assert" @@ -46,7 +47,15 @@ func TestTransformPool(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformPool(test.input.ingest) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformPool(test.input.ingest, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -116,5 +125,7 @@ func makePoolTestOutput() PoolOutput { LastModifiedLedger: 30705278, LedgerEntryChange: 2, Deleted: true, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), } } diff --git a/internal/transform/offer.go b/internal/transform/offer.go index c0cc787e..21e1c4b4 100644 --- a/internal/transform/offer.go +++ b/internal/transform/offer.go @@ -6,10 +6,11 @@ import ( "github.com/stellar/stellar-etl/internal/utils" "github.com/stellar/go/ingest" + "github.com/stellar/go/xdr" ) // TransformOffer converts an account from the history archive ingestion system into a form suitable for BigQuery -func TransformOffer(ledgerChange ingest.Change) (OfferOutput, error) { +func TransformOffer(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (OfferOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return OfferOutput{}, err @@ -68,6 +69,13 @@ func TransformOffer(ledgerChange ingest.Change) (OfferOutput, error) { outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq) + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return OfferOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedOffer := OfferOutput{ SellerID: outputSellerID, OfferID: outputOfferID, @@ -88,6 +96,8 @@ func TransformOffer(ledgerChange ingest.Change) (OfferOutput, error) { LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, Sponsor: ledgerEntrySponsorToNullString(ledgerEntry), + ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedOffer, nil } diff --git a/internal/transform/offer_normalized.go b/internal/transform/offer_normalized.go index ddfdc53c..83eced04 100644 --- a/internal/transform/offer_normalized.go +++ b/internal/transform/offer_normalized.go @@ -9,12 +9,14 @@ import ( "github.com/stellar/stellar-etl/internal/utils" "github.com/stellar/go/ingest" + "github.com/stellar/go/xdr" ) // TransformOfferNormalized converts an offer into a normalized form, allowing it to be stored as part of the historical orderbook dataset func TransformOfferNormalized(ledgerChange ingest.Change, ledgerSeq uint32) (NormalizedOfferOutput, error) { - transformed, err := TransformOffer(ledgerChange) + var header xdr.LedgerHeaderHistoryEntry + transformed, err := TransformOffer(ledgerChange, header) if err != nil { return NormalizedOfferOutput{}, err } diff --git a/internal/transform/offer_test.go b/internal/transform/offer_test.go index 2cc548dc..303693eb 100644 --- a/internal/transform/offer_test.go +++ b/internal/transform/offer_test.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "testing" + "time" "github.com/guregu/null" "github.com/stretchr/testify/assert" @@ -96,7 +97,15 @@ func TestTransformOffer(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformOffer(test.input.ingest) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformOffer(test.input.ingest, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -169,5 +178,7 @@ func makeOfferTestOutput() OfferOutput { LedgerEntryChange: 2, Deleted: true, Sponsor: null.StringFrom(testAccount3Address), + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), } } diff --git a/internal/transform/schema.go b/internal/transform/schema.go index 0668da11..4e8182c5 100644 --- a/internal/transform/schema.go +++ b/internal/transform/schema.go @@ -99,6 +99,8 @@ type AccountOutput struct { LastModifiedLedger uint32 `json:"last_modified_ledger"` LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // AccountSignerOutput is a representation of an account signer that aligns with the BigQuery table account_signers @@ -110,6 +112,8 @@ type AccountSignerOutput struct { LastModifiedLedger uint32 `json:"last_modified_ledger"` LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // OperationOutput is a representation of an operation that aligns with the BigQuery table history_operations @@ -138,6 +142,8 @@ type ClaimableBalanceOutput struct { LastModifiedLedger uint32 `json:"last_modified_ledger"` LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // Claimants @@ -173,24 +179,26 @@ type LiquidityPoolAsset struct { // PoolOutput is a representation of a liquidity pool that aligns with the Bigquery table liquidity_pools type PoolOutput struct { - PoolID string `json:"liquidity_pool_id"` - PoolType string `json:"type"` - PoolFee uint32 `json:"fee"` - TrustlineCount uint64 `json:"trustline_count"` - PoolShareCount float64 `json:"pool_share_count"` - AssetAType string `json:"asset_a_type"` - AssetACode string `json:"asset_a_code"` - AssetAIssuer string `json:"asset_a_issuer"` - AssetAReserve float64 `json:"asset_a_amount"` - AssetAID int64 `json:"asset_a_id"` - AssetBType string `json:"asset_b_type"` - AssetBCode string `json:"asset_b_code"` - AssetBIssuer string `json:"asset_b_issuer"` - AssetBReserve float64 `json:"asset_b_amount"` - AssetBID int64 `json:"asset_b_id"` - LastModifiedLedger uint32 `json:"last_modified_ledger"` - LedgerEntryChange uint32 `json:"ledger_entry_change"` - Deleted bool `json:"deleted"` + PoolID string `json:"liquidity_pool_id"` + PoolType string `json:"type"` + PoolFee uint32 `json:"fee"` + TrustlineCount uint64 `json:"trustline_count"` + PoolShareCount float64 `json:"pool_share_count"` + AssetAType string `json:"asset_a_type"` + AssetACode string `json:"asset_a_code"` + AssetAIssuer string `json:"asset_a_issuer"` + AssetAReserve float64 `json:"asset_a_amount"` + AssetAID int64 `json:"asset_a_id"` + AssetBType string `json:"asset_b_type"` + AssetBCode string `json:"asset_b_code"` + AssetBIssuer string `json:"asset_b_issuer"` + AssetBReserve float64 `json:"asset_b_amount"` + AssetBID int64 `json:"asset_b_id"` + LastModifiedLedger uint32 `json:"last_modified_ledger"` + LedgerEntryChange uint32 `json:"ledger_entry_change"` + Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // AssetOutput is a representation of an asset that aligns with the BigQuery table history_assets @@ -220,6 +228,8 @@ type TrustlineOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Sponsor null.String `json:"sponsor"` Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // OfferOutput is a representation of an offer that aligns with the BigQuery table offers @@ -243,6 +253,8 @@ type OfferOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` Sponsor null.String `json:"sponsor"` + ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // TradeOutput is a representation of a trade that aligns with the BigQuery table history_trades @@ -473,26 +485,30 @@ type TestTransaction struct { // ContractDataOutput is a representation of contract data that aligns with the Bigquery table soroban_contract_data type ContractDataOutput struct { - ContractId string `json:"contract_id"` - ContractKeyType string `json:"contract_key_type"` - ContractDurability string `json:"contract_durability"` - ContractDataAssetCode string `json:"asset_code"` - ContractDataAssetIssuer string `json:"asset_issuer"` - ContractDataAssetType string `json:"asset_type"` - ContractDataBalanceHolder string `json:"balance_holder"` - ContractDataBalance string `json:"balance"` // balance is a string because it is go type big.Int - LastModifiedLedger uint32 `json:"last_modified_ledger"` - LedgerEntryChange uint32 `json:"ledger_entry_change"` - Deleted bool `json:"deleted"` + ContractId string `json:"contract_id"` + ContractKeyType string `json:"contract_key_type"` + ContractDurability string `json:"contract_durability"` + ContractDataAssetCode string `json:"asset_code"` + ContractDataAssetIssuer string `json:"asset_issuer"` + ContractDataAssetType string `json:"asset_type"` + ContractDataBalanceHolder string `json:"balance_holder"` + ContractDataBalance string `json:"balance"` // balance is a string because it is go type big.Int + LastModifiedLedger uint32 `json:"last_modified_ledger"` + LedgerEntryChange uint32 `json:"ledger_entry_change"` + Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // ContractCodeOutput is a representation of contract code that aligns with the Bigquery table soroban_contract_code type ContractCodeOutput struct { - ContractCodeHash string `json:"contract_code_hash"` - ContractCodeExtV int32 `json:"contract_code_ext_v"` - LastModifiedLedger uint32 `json:"last_modified_ledger"` - LedgerEntryChange uint32 `json:"ledger_entry_change"` - Deleted bool `json:"deleted"` + ContractCodeHash string `json:"contract_code_hash"` + ContractCodeExtV int32 `json:"contract_code_ext_v"` + LastModifiedLedger uint32 `json:"last_modified_ledger"` + LedgerEntryChange uint32 `json:"ledger_entry_change"` + Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` //ContractCodeCode string `json:"contract_code"` } @@ -544,15 +560,19 @@ type ConfigSettingOutput struct { LastModifiedLedger uint32 `json:"last_modified_ledger"` LedgerEntryChange uint32 `json:"ledger_entry_change"` Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // ExpirationOutput is a representation of soroban expiration that aligns with the Bigquery table expirations type ExpirationOutput struct { - KeyHash string `json:"key_hash"` // key_hash is contract_code_hash or contract_id - ExpirationLedgerSeq uint32 `json:"expiration_ledger_seq"` - LastModifiedLedger uint32 `json:"last_modified_ledger"` - LedgerEntryChange uint32 `json:"ledger_entry_change"` - Deleted bool `json:"deleted"` + KeyHash string `json:"key_hash"` // key_hash is contract_code_hash or contract_id + ExpirationLedgerSeq uint32 `json:"expiration_ledger_seq"` + LastModifiedLedger uint32 `json:"last_modified_ledger"` + LedgerEntryChange uint32 `json:"ledger_entry_change"` + Deleted bool `json:"deleted"` + ClosedAt time.Time `json:"closed_at"` + LedgerSequence uint32 `json:"ledger_sequence"` } // DiagnosticEventOutput is a representation of soroban expiration that aligns with the Bigquery table expirations diff --git a/internal/transform/trustline.go b/internal/transform/trustline.go index 6373050c..3099f306 100644 --- a/internal/transform/trustline.go +++ b/internal/transform/trustline.go @@ -14,7 +14,7 @@ import ( ) // TransformTrustline converts a trustline from the history archive ingestion system into a form suitable for BigQuery -func TransformTrustline(ledgerChange ingest.Change) (TrustlineOutput, error) { +func TransformTrustline(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (TrustlineOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return TrustlineOutput{}, err @@ -51,6 +51,13 @@ func TransformTrustline(ledgerChange ingest.Change) (TrustlineOutput, error) { liabilities := trustEntry.Liabilities() + closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) + if err != nil { + return TrustlineOutput{}, err + } + + ledgerSequence := header.Header.LedgerSeq + transformedTrustline := TrustlineOutput{ LedgerKey: outputLedgerKey, AccountID: outputAccountID, @@ -68,6 +75,8 @@ func TransformTrustline(ledgerChange ingest.Change) (TrustlineOutput, error) { LedgerEntryChange: uint32(changeType), Sponsor: ledgerEntrySponsorToNullString(ledgerEntry), Deleted: outputDeleted, + ClosedAt: closedAt, + LedgerSequence: uint32(ledgerSequence), } return transformedTrustline, nil diff --git a/internal/transform/trustline_test.go b/internal/transform/trustline_test.go index 3d6e80a6..24efa2f5 100644 --- a/internal/transform/trustline_test.go +++ b/internal/transform/trustline_test.go @@ -3,6 +3,7 @@ package transform import ( "fmt" "testing" + "time" "github.com/stretchr/testify/assert" @@ -49,7 +50,15 @@ func TestTransformTrustline(t *testing.T) { } for _, test := range tests { - actualOutput, actualError := TransformTrustline(test.input.ingest) + header := xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + } + actualOutput, actualError := TransformTrustline(test.input.ingest, header) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -131,6 +140,8 @@ func makeTrustlineTestOutput() []TrustlineOutput { LastModifiedLedger: 24229503, LedgerEntryChange: 1, Deleted: false, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, { LedgerKey: "AAAAAQAAAAAcR0GXGO76pFs4y38vJVAanjnLg4emNun7zAx0pHcDGAAAAAMBAwQFBwkAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==", @@ -146,6 +157,8 @@ func makeTrustlineTestOutput() []TrustlineOutput { LastModifiedLedger: 123456789, LedgerEntryChange: 1, Deleted: false, + LedgerSequence: 10, + ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), }, } }