Skip to content

Commit

Permalink
Add ledger sequence
Browse files Browse the repository at this point in the history
  • Loading branch information
chowbao committed Nov 2, 2023
1 parent eb3fdcf commit df85b2d
Show file tree
Hide file tree
Showing 24 changed files with 143 additions and 75 deletions.
5 changes: 2 additions & 3 deletions cmd/export_account_signers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"fmt"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -38,10 +37,10 @@ the export_ledger_entry_changes command.`,
numFailures := 0
totalNumBytes := 0
numSigners := 0
var closedAt time.Time
var header xdr.LedgerHeaderHistoryEntry
for _, acc := range accounts {
if acc.AccountSignersChanged() {
transformed, err := transform.TransformSigners(acc, closedAt)
transformed, err := transform.TransformSigners(acc, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not json transform account signer: %v", err))
numFailures += 1
Expand Down
5 changes: 2 additions & 3 deletions cmd/export_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"fmt"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -37,9 +36,9 @@ the export_ledger_entry_changes command.`,
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var closedAt time.Time
var header xdr.LedgerHeaderHistoryEntry
for _, acc := range accounts {
transformed, err := transform.TransformAccount(acc, closedAt)
transformed, err := transform.TransformAccount(acc, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not json transform account: %v", err))
numFailures += 1
Expand Down
5 changes: 2 additions & 3 deletions cmd/export_claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"fmt"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -38,8 +37,8 @@ var claimableBalancesCmd = &cobra.Command{
numFailures := 0
totalNumBytes := 0
for _, balance := range balances {
var closedAt time.Time
transformed, err := transform.TransformClaimableBalance(balance, closedAt)
var header xdr.LedgerHeaderHistoryEntry
transformed, err := transform.TransformClaimableBalance(balance, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform balance %+v: %v", balance, err))
numFailures += 1
Expand Down
5 changes: 2 additions & 3 deletions cmd/export_config_setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"fmt"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -35,8 +34,8 @@ var configSettingCmd = &cobra.Command{
numFailures := 0
totalNumBytes := 0
for _, setting := range settings {
var closedAt time.Time
transformed, err := transform.TransformConfigSetting(setting, closedAt)
var header xdr.LedgerHeaderHistoryEntry
transformed, err := transform.TransformConfigSetting(setting, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform config setting %+v: %v", setting, err))
numFailures += 1
Expand Down
5 changes: 2 additions & 3 deletions cmd/export_contract_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"fmt"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -35,8 +34,8 @@ var codeCmd = &cobra.Command{
numFailures := 0
totalNumBytes := 0
for _, code := range codes {
var closedAt time.Time
transformed, err := transform.TransformContractCode(code, closedAt)
var header xdr.LedgerHeaderHistoryEntry
transformed, err := transform.TransformContractCode(code, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform contract code %+v: %v", code, err))
numFailures += 1
Expand Down
5 changes: 2 additions & 3 deletions cmd/export_contract_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"fmt"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -35,9 +34,9 @@ var dataCmd = &cobra.Command{
numFailures := 0
totalNumBytes := 0
for _, data := range datas {
var closedAt time.Time
var header xdr.LedgerHeaderHistoryEntry
TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData)
transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase, closedAt)
transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform contract data %+v: %v", data, err))
numFailures += 1
Expand Down
5 changes: 2 additions & 3 deletions cmd/export_expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"fmt"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -35,8 +34,8 @@ var expirationCmd = &cobra.Command{
numFailures := 0
totalNumBytes := 0
for _, expiration := range expirations {
var closedAt time.Time
transformed, err := transform.TransformExpiration(expiration, closedAt)
var header xdr.LedgerHeaderHistoryEntry
transformed, err := transform.TransformExpiration(expiration, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform expiration %+v: %v", expiration, err))
numFailures += 1
Expand Down
20 changes: 10 additions & 10 deletions cmd/export_ledger_entry_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ be exported.`,
continue
} else if changed {

acc, err := transform.TransformAccount(change, changes.ClosedAts[i])
acc, err := transform.TransformAccount(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming account entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -127,7 +127,7 @@ be exported.`,
transformedOutputs["accounts"] = append(transformedOutputs["accounts"], acc)
}
if change.AccountSignersChanged() {
signers, err := transform.TransformSigners(change, changes.ClosedAts[i])
signers, err := transform.TransformSigners(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming account signers from %d :%s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -143,7 +143,7 @@ be exported.`,
continue
}
for i, change := range changes.Changes {
balance, err := transform.TransformClaimableBalance(change, changes.ClosedAts[i])
balance, err := transform.TransformClaimableBalance(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming balance entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -156,7 +156,7 @@ be exported.`,
continue
}
for i, change := range changes.Changes {
offer, err := transform.TransformOffer(change, changes.ClosedAts[i])
offer, err := transform.TransformOffer(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming offer entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -169,7 +169,7 @@ be exported.`,
continue
}
for i, change := range changes.Changes {
trust, err := transform.TransformTrustline(change, changes.ClosedAts[i])
trust, err := transform.TransformTrustline(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming trustline entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -182,7 +182,7 @@ be exported.`,
continue
}
for i, change := range changes.Changes {
pool, err := transform.TransformPool(change, changes.ClosedAts[i])
pool, err := transform.TransformPool(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming liquidity pool entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -196,7 +196,7 @@ be exported.`,
}
for i, change := range changes.Changes {
TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData)
contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase, changes.ClosedAts[i])
contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming contract data entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -215,7 +215,7 @@ be exported.`,
continue
}
for i, change := range changes.Changes {
contractCode, err := transform.TransformContractCode(change, changes.ClosedAts[i])
contractCode, err := transform.TransformContractCode(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming contract code entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -228,7 +228,7 @@ be exported.`,
continue
}
for i, change := range changes.Changes {
configSettings, err := transform.TransformConfigSetting(change, changes.ClosedAts[i])
configSettings, err := transform.TransformConfigSetting(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming config settings entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -241,7 +241,7 @@ be exported.`,
continue
}
for i, change := range changes.Changes {
expiration, err := transform.TransformExpiration(change, changes.ClosedAts[i])
expiration, err := transform.TransformExpiration(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming expiration entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand Down
5 changes: 2 additions & 3 deletions cmd/export_liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"fmt"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -38,8 +37,8 @@ the export_ledger_entry_changes command.`,
numFailures := 0
totalNumBytes := 0
for _, pool := range pools {
var closedAt time.Time
transformed, err := transform.TransformPool(pool, closedAt)
var header xdr.LedgerHeaderHistoryEntry
transformed, err := transform.TransformPool(pool, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform pool %+v: %v", pool, err))
numFailures += 1
Expand Down
5 changes: 2 additions & 3 deletions cmd/export_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"fmt"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -39,8 +38,8 @@ var offersCmd = &cobra.Command{
numFailures := 0
totalNumBytes := 0
for _, offer := range offers {
var closedAt time.Time
transformed, err := transform.TransformOffer(offer, closedAt)
var header xdr.LedgerHeaderHistoryEntry
transformed, err := transform.TransformOffer(offer, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform offer %+v: %v", offer, err))
numFailures += 1
Expand Down
5 changes: 2 additions & 3 deletions cmd/export_trustlines.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"fmt"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -39,8 +38,8 @@ var trustlinesCmd = &cobra.Command{
numFailures := 0
totalNumBytes := 0
for _, trust := range trustlines {
var closedAt time.Time
transformed, err := transform.TransformTrustline(trust, closedAt)
var header xdr.LedgerHeaderHistoryEntry
transformed, err := transform.TransformTrustline(trust, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not json transform trustline %+v: %v", trust, err))
numFailures += 1
Expand Down
20 changes: 8 additions & 12 deletions internal/input/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"io"
"math"
"time"

"github.com/stellar/stellar-etl/internal/utils"

Expand All @@ -18,14 +17,14 @@ var (
ExtractBatch = extractBatch
)

type ChangesClosedAt struct {
Changes []ingest.Change
ClosedAts []time.Time
type LedgerChanges struct {
Changes []ingest.Change
LedgerHeaders []xdr.LedgerHeaderHistoryEntry
}

// ChangeBatch represents the changes in a batch of ledgers represented by the range [BatchStart, BatchEnd)
type ChangeBatch struct {
Changes map[xdr.LedgerEntryType]ChangesClosedAt
Changes map[xdr.LedgerEntryType]LedgerChanges
BatchStart uint32
BatchEnd uint32
}
Expand Down Expand Up @@ -98,7 +97,7 @@ func extractBatch(
xdr.LedgerEntryTypeConfigSetting,
xdr.LedgerEntryTypeExpiration}

changesClosedAt := map[xdr.LedgerEntryType]ChangesClosedAt{}
changesClosedAt := map[xdr.LedgerEntryType]LedgerChanges{}
ctx := context.Background()
for seq := batchStart; seq <= batchEnd; {
changeCompactors := map[xdr.LedgerEntryType]*ingest.ChangeCompactor{}
Expand All @@ -113,16 +112,13 @@ func extractBatch(

// if this ledger is available, we process its changes and move on to the next ledger by incrementing seq.
// Otherwise, nothing is incremented, and we try again on the next iteration of the loop
var closedAt time.Time
var header xdr.LedgerHeaderHistoryEntry
if seq <= latestLedger {
changeReader, err := ingest.NewLedgerChangeReader(ctx, core, env.NetworkPassphrase, seq)
if err != nil {
logger.Fatal(fmt.Sprintf("unable to create change reader for ledger %d: ", seq), err)
}
closedAt, err = utils.TimePointToUTCTimeStamp(changeReader.LedgerTransactionReader.GetHeader().Header.ScpValue.CloseTime)
if err != nil {
logger.Fatal(fmt.Sprintf("unable to read CloseTime for ledger %d: ", seq), err)
}
header = changeReader.LedgerTransactionReader.GetHeader()

for {
change, err := changeReader.Read()
Expand Down Expand Up @@ -150,7 +146,7 @@ func extractBatch(
for _, change := range compactor.GetChanges() {
dataTypeChanges := changesClosedAt[dataType]
dataTypeChanges.Changes = append(dataTypeChanges.Changes, change)
dataTypeChanges.ClosedAts = append(dataTypeChanges.ClosedAts, closedAt)
dataTypeChanges.LedgerHeaders = append(dataTypeChanges.LedgerHeaders, header)
changesClosedAt[dataType] = dataTypeChanges
}
}
Expand Down
11 changes: 9 additions & 2 deletions internal/transform/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package transform

import (
"fmt"
"time"

"github.com/guregu/null/zero"
"github.com/stellar/go/ingest"
Expand All @@ -11,7 +10,7 @@ import (
)

// TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery
func TransformAccount(ledgerChange ingest.Change, closedAt time.Time) (AccountOutput, error) {
func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (AccountOutput, error) {
ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange)
if err != nil {
return AccountOutput{}, err
Expand Down Expand Up @@ -77,6 +76,13 @@ func TransformAccount(ledgerChange ingest.Change, closedAt time.Time) (AccountOu

outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq)

closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime)
if err != nil {
return AccountOutput{}, err
}

ledgerSequence := header.Header.LedgerSeq

transformedAccount := AccountOutput{
AccountID: outputID,
Balance: utils.ConvertStroopValueToReal(outputBalance),
Expand All @@ -100,6 +106,7 @@ func TransformAccount(ledgerChange ingest.Change, closedAt time.Time) (AccountOu
LedgerEntryChange: uint32(changeType),
Deleted: outputDeleted,
ClosedAt: closedAt,
LedgerSequence: uint32(ledgerSequence),
}
return transformedAccount, nil
}
Loading

0 comments on commit df85b2d

Please sign in to comment.