Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add closed_at to ledger entry changes tables #208

Merged
merged 5 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/export_account_signers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ the export_ledger_entry_changes command.`,
numFailures := 0
totalNumBytes := 0
numSigners := 0
var header xdr.LedgerHeaderHistoryEntry
for _, acc := range accounts {
if acc.AccountSignersChanged() {
transformed, err := transform.TransformSigners(acc)
transformed, err := transform.TransformSigners(acc, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not json transform account signer: %v", err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ the export_ledger_entry_changes command.`,
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, acc := range accounts {
transformed, err := transform.TransformAccount(acc)
transformed, err := transform.TransformAccount(acc, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not json transform account: %v", err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ var claimableBalancesCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, balance := range balances {
transformed, err := transform.TransformClaimableBalance(balance)
transformed, err := transform.TransformClaimableBalance(balance, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform balance %+v: %v", balance, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_config_setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ var configSettingCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, setting := range settings {
transformed, err := transform.TransformConfigSetting(setting)
transformed, err := transform.TransformConfigSetting(setting, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform config setting %+v: %v", setting, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_contract_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ var codeCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, code := range codes {
transformed, err := transform.TransformContractCode(code)
transformed, err := transform.TransformContractCode(code, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform contract code %+v: %v", code, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_contract_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ var dataCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, data := range datas {
TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData)
transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase)
transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform contract data %+v: %v", data, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ var expirationCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, expiration := range expirations {
transformed, err := transform.TransformExpiration(expiration)
transformed, err := transform.TransformExpiration(expiration, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform expiration %+v: %v", expiration, err))
numFailures += 1
Expand Down
38 changes: 19 additions & 19 deletions cmd/export_ledger_entry_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ be exported.`,
if !exports["export-accounts"] {
continue
}
for _, change := range changes {
for i, change := range changes.Changes {
if changed, err := change.AccountChangedExceptSigners(); err != nil {
cmdLogger.LogError(fmt.Errorf("unable to identify changed accounts: %v", err))
continue
} else if changed {

acc, err := transform.TransformAccount(change)
acc, err := transform.TransformAccount(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming account entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -127,7 +127,7 @@ be exported.`,
transformedOutputs["accounts"] = append(transformedOutputs["accounts"], acc)
}
if change.AccountSignersChanged() {
signers, err := transform.TransformSigners(change)
signers, err := transform.TransformSigners(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming account signers from %d :%s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -142,8 +142,8 @@ be exported.`,
if !exports["export-balances"] {
continue
}
for _, change := range changes {
balance, err := transform.TransformClaimableBalance(change)
for i, change := range changes.Changes {
balance, err := transform.TransformClaimableBalance(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming balance entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -155,8 +155,8 @@ be exported.`,
if !exports["export-offers"] {
continue
}
for _, change := range changes {
offer, err := transform.TransformOffer(change)
for i, change := range changes.Changes {
offer, err := transform.TransformOffer(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming offer entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -168,8 +168,8 @@ be exported.`,
if !exports["export-trustlines"] {
continue
}
for _, change := range changes {
trust, err := transform.TransformTrustline(change)
for i, change := range changes.Changes {
trust, err := transform.TransformTrustline(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming trustline entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -181,8 +181,8 @@ be exported.`,
if !exports["export-pools"] {
continue
}
for _, change := range changes {
pool, err := transform.TransformPool(change)
for i, change := range changes.Changes {
pool, err := transform.TransformPool(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming liquidity pool entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -194,9 +194,9 @@ be exported.`,
if !exports["export-contract-data"] {
continue
}
for _, change := range changes {
for i, change := range changes.Changes {
TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData)
contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase)
contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming contract data entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -214,8 +214,8 @@ be exported.`,
if !exports["export-contract-code"] {
continue
}
for _, change := range changes {
contractCode, err := transform.TransformContractCode(change)
for i, change := range changes.Changes {
contractCode, err := transform.TransformContractCode(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming contract code entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -227,8 +227,8 @@ be exported.`,
if !exports["export-config-settings"] {
continue
}
for _, change := range changes {
configSettings, err := transform.TransformConfigSetting(change)
for i, change := range changes.Changes {
configSettings, err := transform.TransformConfigSetting(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming config settings entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -240,8 +240,8 @@ be exported.`,
if !exports["export-expiration"] {
continue
}
for _, change := range changes {
expiration, err := transform.TransformExpiration(change)
for i, change := range changes.Changes {
expiration, err := transform.TransformExpiration(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming expiration entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ the export_ledger_entry_changes command.`,
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, pool := range pools {
transformed, err := transform.TransformPool(pool)
transformed, err := transform.TransformPool(pool, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform pool %+v: %v", pool, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ var offersCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, offer := range offers {
transformed, err := transform.TransformOffer(offer)
transformed, err := transform.TransformOffer(offer, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform offer %+v: %v", offer, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_trustlines.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ var trustlinesCmd = &cobra.Command{
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, trust := range trustlines {
transformed, err := transform.TransformTrustline(trust)
transformed, err := transform.TransformTrustline(trust, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not json transform trustline %+v: %v", trust, err))
numFailures += 1
Expand Down
26 changes: 14 additions & 12 deletions internal/input/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ var (
ExtractBatch = extractBatch
)

type LedgerChanges struct {
Changes []ingest.Change
LedgerHeaders []xdr.LedgerHeaderHistoryEntry
}

// ChangeBatch represents the changes in a batch of ledgers represented by the range [BatchStart, BatchEnd)
type ChangeBatch struct {
Changes map[xdr.LedgerEntryType][]ingest.Change
Changes map[xdr.LedgerEntryType]LedgerChanges
BatchStart uint32
BatchEnd uint32
}
Expand Down Expand Up @@ -92,7 +97,7 @@ func extractBatch(
xdr.LedgerEntryTypeConfigSetting,
xdr.LedgerEntryTypeExpiration}

changes := map[xdr.LedgerEntryType][]ingest.Change{}
ledgerChanges := map[xdr.LedgerEntryType]LedgerChanges{}
ctx := context.Background()
for seq := batchStart; seq <= batchEnd; {
changeCompactors := map[xdr.LedgerEntryType]*ingest.ChangeCompactor{}
Expand All @@ -107,19 +112,13 @@ func extractBatch(

// if this ledger is available, we process its changes and move on to the next ledger by incrementing seq.
// Otherwise, nothing is incremented, and we try again on the next iteration of the loop
var header xdr.LedgerHeaderHistoryEntry
if seq <= latestLedger {
changeReader, err := ingest.NewLedgerChangeReader(ctx, core, env.NetworkPassphrase, seq)
if err != nil {
logger.Fatal(fmt.Sprintf("unable to create change reader for ledger %d: ", seq), err)
}
// TODO: Add in ledger_closed_at; Update changeCompactors to also save ledger close time.
// AddChange is from the go monorepo so it might be easier to just add a addledgerclose func after it
//txReader := changeReader.LedgerTransactionReader

//closeTime, err := utils.TimePointToUTCTimeStamp(txReader.GetHeader().Header.ScpValue.CloseTime)
//if err != nil {
// logger.Fatal(fmt.Sprintf("unable to read close time for ledger %d: ", seq), err)
//}
header = changeReader.LedgerTransactionReader.GetHeader()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice find :) was this method easy to find? Or would it be helpful to add to the ingest documentation for ease of use in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't necessarily obvious because of all the different reader nesting. But when you know it exists it is easy. Probably would be useful in the ingest documentation. It would be nice to get a reader/backend to xdr.* mapping like {Reader: [xdr.TxMeta, xdr.CloseMeta, xdr.LedgerHeader, etc...]} without having to look through the xdr

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea. Let's discuss with the rest of platform whether it makes sense to change the backend xdr mapping to be clearer.


for {
change, err := changeReader.Read()
Expand All @@ -145,14 +144,17 @@ func extractBatch(

for dataType, compactor := range changeCompactors {
for _, change := range compactor.GetChanges() {
changes[dataType] = append(changes[dataType], change)
dataTypeChanges := ledgerChanges[dataType]
dataTypeChanges.Changes = append(dataTypeChanges.Changes, change)
dataTypeChanges.LedgerHeaders = append(dataTypeChanges.LedgerHeaders, header)
ledgerChanges[dataType] = dataTypeChanges
}
}

}

return ChangeBatch{
Changes: changes,
Changes: ledgerChanges,
BatchStart: batchStart,
BatchEnd: batchEnd,
}
Expand Down
13 changes: 9 additions & 4 deletions internal/input/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package input
import (
"testing"

"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/log"
"github.com/stellar/stellar-etl/internal/utils"
"github.com/stretchr/testify/assert"

"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -114,8 +114,13 @@ func TestSendBatchToChannel(t *testing.T) {
}

func wrapLedgerEntry(entryType xdr.LedgerEntryType, entry xdr.LedgerEntry) ChangeBatch {
changes := map[xdr.LedgerEntryType][]ingest.Change{
entryType: {{Type: entry.Data.Type, Post: &entry}},
changes := map[xdr.LedgerEntryType]LedgerChanges{
entryType: {
Changes: []ingest.Change{
{Type: entry.Data.Type, Post: &entry},
},
LedgerHeaders: []xdr.LedgerHeaderHistoryEntry{},
},
}
return ChangeBatch{
Changes: changes,
Expand All @@ -128,7 +133,7 @@ func mockExtractBatch(
env utils.EnvironmentDetails, logger *utils.EtlLogger) ChangeBatch {
log.Errorf("mock called")
return ChangeBatch{
Changes: map[xdr.LedgerEntryType][]ingest.Change{},
Changes: map[xdr.LedgerEntryType]LedgerChanges{},
BatchStart: batchStart,
BatchEnd: batchEnd,
}
Expand Down
11 changes: 10 additions & 1 deletion internal/transform/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery
func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) {
func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (AccountOutput, error) {
ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange)
if err != nil {
return AccountOutput{}, err
Expand Down Expand Up @@ -76,6 +76,13 @@ func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) {

outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq)

closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime)
if err != nil {
return AccountOutput{}, err
}

ledgerSequence := header.Header.LedgerSeq

transformedAccount := AccountOutput{
AccountID: outputID,
Balance: utils.ConvertStroopValueToReal(outputBalance),
Expand All @@ -98,6 +105,8 @@ func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) {
NumSponsoring: uint32(accountEntry.NumSponsoring()),
LedgerEntryChange: uint32(changeType),
Deleted: outputDeleted,
ClosedAt: closedAt,
LedgerSequence: uint32(ledgerSequence),
}
return transformedAccount, nil
}
Loading
Loading