Skip to content

Commit

Permalink
Merge pull request #208 from stellar/add-closed-at-for-ledger-entry-c…
Browse files Browse the repository at this point in the history
…hanges

Add closed_at to ledger entry changes tables
  • Loading branch information
chowbao authored Nov 2, 2023
2 parents 0ea7a30 + fe1dd24 commit 08fd8de
Show file tree
Hide file tree
Showing 35 changed files with 352 additions and 105 deletions.
3 changes: 2 additions & 1 deletion cmd/export_account_signers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ the export_ledger_entry_changes command.`,
numFailures := 0
totalNumBytes := 0
numSigners := 0
var header xdr.LedgerHeaderHistoryEntry
for _, acc := range accounts {
if acc.AccountSignersChanged() {
transformed, err := transform.TransformSigners(acc)
transformed, err := transform.TransformSigners(acc, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not json transform account signer: %v", err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ the export_ledger_entry_changes command.`,
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, acc := range accounts {
transformed, err := transform.TransformAccount(acc)
transformed, err := transform.TransformAccount(acc, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not json transform account: %v", err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ var claimableBalancesCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, balance := range balances {
transformed, err := transform.TransformClaimableBalance(balance)
transformed, err := transform.TransformClaimableBalance(balance, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform balance %+v: %v", balance, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_config_setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ var configSettingCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, setting := range settings {
transformed, err := transform.TransformConfigSetting(setting)
transformed, err := transform.TransformConfigSetting(setting, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform config setting %+v: %v", setting, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_contract_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ var codeCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, code := range codes {
transformed, err := transform.TransformContractCode(code)
transformed, err := transform.TransformContractCode(code, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform contract code %+v: %v", code, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_contract_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ var dataCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, data := range datas {
TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData)
transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase)
transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform contract data %+v: %v", data, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ var expirationCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, expiration := range expirations {
transformed, err := transform.TransformExpiration(expiration)
transformed, err := transform.TransformExpiration(expiration, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform expiration %+v: %v", expiration, err))
numFailures += 1
Expand Down
38 changes: 19 additions & 19 deletions cmd/export_ledger_entry_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ be exported.`,
if !exports["export-accounts"] {
continue
}
for _, change := range changes {
for i, change := range changes.Changes {
if changed, err := change.AccountChangedExceptSigners(); err != nil {
cmdLogger.LogError(fmt.Errorf("unable to identify changed accounts: %v", err))
continue
} else if changed {

acc, err := transform.TransformAccount(change)
acc, err := transform.TransformAccount(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming account entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -127,7 +127,7 @@ be exported.`,
transformedOutputs["accounts"] = append(transformedOutputs["accounts"], acc)
}
if change.AccountSignersChanged() {
signers, err := transform.TransformSigners(change)
signers, err := transform.TransformSigners(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming account signers from %d :%s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -142,8 +142,8 @@ be exported.`,
if !exports["export-balances"] {
continue
}
for _, change := range changes {
balance, err := transform.TransformClaimableBalance(change)
for i, change := range changes.Changes {
balance, err := transform.TransformClaimableBalance(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming balance entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -155,8 +155,8 @@ be exported.`,
if !exports["export-offers"] {
continue
}
for _, change := range changes {
offer, err := transform.TransformOffer(change)
for i, change := range changes.Changes {
offer, err := transform.TransformOffer(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming offer entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -168,8 +168,8 @@ be exported.`,
if !exports["export-trustlines"] {
continue
}
for _, change := range changes {
trust, err := transform.TransformTrustline(change)
for i, change := range changes.Changes {
trust, err := transform.TransformTrustline(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming trustline entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -181,8 +181,8 @@ be exported.`,
if !exports["export-pools"] {
continue
}
for _, change := range changes {
pool, err := transform.TransformPool(change)
for i, change := range changes.Changes {
pool, err := transform.TransformPool(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming liquidity pool entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -194,9 +194,9 @@ be exported.`,
if !exports["export-contract-data"] {
continue
}
for _, change := range changes {
for i, change := range changes.Changes {
TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData)
contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase)
contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming contract data entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -214,8 +214,8 @@ be exported.`,
if !exports["export-contract-code"] {
continue
}
for _, change := range changes {
contractCode, err := transform.TransformContractCode(change)
for i, change := range changes.Changes {
contractCode, err := transform.TransformContractCode(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming contract code entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -227,8 +227,8 @@ be exported.`,
if !exports["export-config-settings"] {
continue
}
for _, change := range changes {
configSettings, err := transform.TransformConfigSetting(change)
for i, change := range changes.Changes {
configSettings, err := transform.TransformConfigSetting(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming config settings entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -240,8 +240,8 @@ be exported.`,
if !exports["export-expiration"] {
continue
}
for _, change := range changes {
expiration, err := transform.TransformExpiration(change)
for i, change := range changes.Changes {
expiration, err := transform.TransformExpiration(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming expiration entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ the export_ledger_entry_changes command.`,
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, pool := range pools {
transformed, err := transform.TransformPool(pool)
transformed, err := transform.TransformPool(pool, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform pool %+v: %v", pool, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ var offersCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, offer := range offers {
transformed, err := transform.TransformOffer(offer)
transformed, err := transform.TransformOffer(offer, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform offer %+v: %v", offer, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_trustlines.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ var trustlinesCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, trust := range trustlines {
transformed, err := transform.TransformTrustline(trust)
transformed, err := transform.TransformTrustline(trust, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not json transform trustline %+v: %v", trust, err))
numFailures += 1
Expand Down
26 changes: 14 additions & 12 deletions internal/input/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ var (
ExtractBatch = extractBatch
)

type LedgerChanges struct {
Changes []ingest.Change
LedgerHeaders []xdr.LedgerHeaderHistoryEntry
}

// ChangeBatch represents the changes in a batch of ledgers represented by the range [BatchStart, BatchEnd)
type ChangeBatch struct {
Changes map[xdr.LedgerEntryType][]ingest.Change
Changes map[xdr.LedgerEntryType]LedgerChanges
BatchStart uint32
BatchEnd uint32
}
Expand Down Expand Up @@ -92,7 +97,7 @@ func extractBatch(
xdr.LedgerEntryTypeConfigSetting,
xdr.LedgerEntryTypeExpiration}

changes := map[xdr.LedgerEntryType][]ingest.Change{}
ledgerChanges := map[xdr.LedgerEntryType]LedgerChanges{}
ctx := context.Background()
for seq := batchStart; seq <= batchEnd; {
changeCompactors := map[xdr.LedgerEntryType]*ingest.ChangeCompactor{}
Expand All @@ -107,19 +112,13 @@ func extractBatch(

// if this ledger is available, we process its changes and move on to the next ledger by incrementing seq.
// Otherwise, nothing is incremented, and we try again on the next iteration of the loop
var header xdr.LedgerHeaderHistoryEntry
if seq <= latestLedger {
changeReader, err := ingest.NewLedgerChangeReader(ctx, core, env.NetworkPassphrase, seq)
if err != nil {
logger.Fatal(fmt.Sprintf("unable to create change reader for ledger %d: ", seq), err)
}
// TODO: Add in ledger_closed_at; Update changeCompactors to also save ledger close time.
// AddChange is from the go monorepo so it might be easier to just add a addledgerclose func after it
//txReader := changeReader.LedgerTransactionReader

//closeTime, err := utils.TimePointToUTCTimeStamp(txReader.GetHeader().Header.ScpValue.CloseTime)
//if err != nil {
// logger.Fatal(fmt.Sprintf("unable to read close time for ledger %d: ", seq), err)
//}
header = changeReader.LedgerTransactionReader.GetHeader()

for {
change, err := changeReader.Read()
Expand All @@ -145,14 +144,17 @@ func extractBatch(

for dataType, compactor := range changeCompactors {
for _, change := range compactor.GetChanges() {
changes[dataType] = append(changes[dataType], change)
dataTypeChanges := ledgerChanges[dataType]
dataTypeChanges.Changes = append(dataTypeChanges.Changes, change)
dataTypeChanges.LedgerHeaders = append(dataTypeChanges.LedgerHeaders, header)
ledgerChanges[dataType] = dataTypeChanges
}
}

}

return ChangeBatch{
Changes: changes,
Changes: ledgerChanges,
BatchStart: batchStart,
BatchEnd: batchEnd,
}
Expand Down
13 changes: 9 additions & 4 deletions internal/input/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package input
import (
"testing"

"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/log"
"github.com/stellar/stellar-etl/internal/utils"
"github.com/stretchr/testify/assert"

"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -114,8 +114,13 @@ func TestSendBatchToChannel(t *testing.T) {
}

func wrapLedgerEntry(entryType xdr.LedgerEntryType, entry xdr.LedgerEntry) ChangeBatch {
changes := map[xdr.LedgerEntryType][]ingest.Change{
entryType: {{Type: entry.Data.Type, Post: &entry}},
changes := map[xdr.LedgerEntryType]LedgerChanges{
entryType: {
Changes: []ingest.Change{
{Type: entry.Data.Type, Post: &entry},
},
LedgerHeaders: []xdr.LedgerHeaderHistoryEntry{},
},
}
return ChangeBatch{
Changes: changes,
Expand All @@ -128,7 +133,7 @@ func mockExtractBatch(
env utils.EnvironmentDetails, logger *utils.EtlLogger) ChangeBatch {
log.Errorf("mock called")
return ChangeBatch{
Changes: map[xdr.LedgerEntryType][]ingest.Change{},
Changes: map[xdr.LedgerEntryType]LedgerChanges{},
BatchStart: batchStart,
BatchEnd: batchEnd,
}
Expand Down
11 changes: 10 additions & 1 deletion internal/transform/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery
func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) {
func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (AccountOutput, error) {
ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange)
if err != nil {
return AccountOutput{}, err
Expand Down Expand Up @@ -76,6 +76,13 @@ func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) {

outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq)

closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime)
if err != nil {
return AccountOutput{}, err
}

ledgerSequence := header.Header.LedgerSeq

transformedAccount := AccountOutput{
AccountID: outputID,
Balance: utils.ConvertStroopValueToReal(outputBalance),
Expand All @@ -98,6 +105,8 @@ func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) {
NumSponsoring: uint32(accountEntry.NumSponsoring()),
LedgerEntryChange: uint32(changeType),
Deleted: outputDeleted,
ClosedAt: closedAt,
LedgerSequence: uint32(ledgerSequence),
}
return transformedAccount, nil
}
Loading

0 comments on commit 08fd8de

Please sign in to comment.