Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State table verification #169

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a8d105f
add verify library
lucaszanotelli Nov 17, 2022
62ba8a5
add function to find if ledger is a checkpoint
lucaszanotelli Nov 17, 2022
c5f68eb
add captive core folder path to gitignore
lucaszanotelli Nov 17, 2022
554d18f
add raw fields to trustline output
lucaszanotelli Nov 17, 2022
1d5cd44
add raw fields to offer output
lucaszanotelli Nov 17, 2022
1885219
add raw fields to account output
lucaszanotelli Nov 17, 2022
df333a5
add raw fields to claimable balance output
lucaszanotelli Nov 17, 2022
62091d4
add raw fields to liquidity pool output
lucaszanotelli Nov 17, 2022
8b2e3b5
create type for transformed outputs
lucaszanotelli Nov 17, 2022
28ce0ac
testing verify implementation
lucaszanotelli Nov 17, 2022
745b668
update tests
lucaszanotelli Nov 18, 2022
cbbc778
clear extra code from horizon
lucaszanotelli Nov 18, 2022
76e2dfd
add header disclaimer
lucaszanotelli Nov 18, 2022
20398ce
implement checkpoint ledgers verification
lucaszanotelli Nov 18, 2022
314711c
revert transformed outputs to map
lucaszanotelli Nov 21, 2022
955d127
remove comments
lucaszanotelli Nov 21, 2022
0ea7ba6
change transformed output name
lucaszanotelli Nov 21, 2022
e4578e5
uncomment actual code
lucaszanotelli Nov 21, 2022
b34f1b7
go mod tidy
lucaszanotelli Dec 12, 2022
cf981ed
remove copies from horizon
lucaszanotelli Dec 14, 2022
4c11faa
import verify module from monorepo
lucaszanotelli Dec 14, 2022
39b6342
remove boolean output
lucaszanotelli Dec 14, 2022
26adc61
add error verify to CreateHistoryArchiveClient
lucaszanotelli Dec 14, 2022
58cf796
reduce verifyBatchSize 24000
lucaszanotelli Jan 12, 2023
f56645c
check if signer entry is deleted
lucaszanotelli Jan 12, 2023
7360909
remove extra element check
lucaszanotelli Jan 12, 2023
57e871f
verify if entry exists before comparison
lucaszanotelli Jan 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
/.vscode/*.sql
/.vscode/settings.json
/.vscode/launch.json
/captive-core*
.idea
debug
.bundle
*.swp
*.crt
*.csr
*.key
*.txt
stellar-etl
63 changes: 56 additions & 7 deletions cmd/export_ledger_entry_changes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"fmt"
"math"
"os"
Expand All @@ -11,8 +12,11 @@ import (
"github.com/stellar/stellar-etl/internal/input"
"github.com/stellar/stellar-etl/internal/transform"
"github.com/stellar/stellar-etl/internal/utils"
"github.com/stellar/stellar-etl/internal/utils/verify"
)

const verifyBatchSize = 50000

var exportLedgerEntryChangesCmd = &cobra.Command{
Use: "export_ledger_entry_changes",
Short: "This command exports the changes in accounts, offers, trustlines and liquidity pools.",
Expand All @@ -29,10 +33,11 @@ be exported.`,
endNum, strictExport, isTest, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
env := utils.GetEnvironmentDetails(isTest)

archive, _ := utils.CreateHistoryArchiveClient(env.ArchiveURLs)
execPath, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger)
exportAccounts, exportOffers, exportTrustlines, exportPools, exportBalances := utils.MustExportTypeFlags(cmd.Flags(), cmdLogger)
gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger)
ctx := context.Background()

err := os.MkdirAll(outputFolder, os.ModePerm)
if err != nil {
Expand Down Expand Up @@ -71,6 +76,7 @@ be exported.`,
endNum = math.MaxInt32
}

verifyOutputs := make(map[uint32]transform.TransformedOutput, endNum)
changeChan := make(chan input.ChangeBatch)
closeChan := make(chan int)
go input.StreamChanges(core, startNum, endNum, batchSize, changeChan, closeChan, env, cmdLogger)
Expand All @@ -91,77 +97,120 @@ be exported.`,
"trustlines": {},
"liquidity_pools": {},
}

for entryType, changes := range batch.Changes {
switch entryType {
case xdr.LedgerEntryTypeAccount:
for _, change := range changes {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
if changed, err := change.AccountChangedExceptSigners(); err != nil {
cmdLogger.LogError(fmt.Errorf("unable to identify changed accounts: %v", err))
continue
} else if changed {
acc, err := transform.TransformAccount(change)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming account entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
continue
}
transformedOutputs["accounts"] = append(transformedOutputs["accounts"], acc)

if ok, actualLedger := utils.LedgerIsCheckpoint(entry.LastModifiedLedgerSeq); ok {
x := verifyOutputs[actualLedger]
x.Accounts = append(x.Accounts, acc)
verifyOutputs[actualLedger] = x
}
}
if change.AccountSignersChanged() {
signers, err := transform.TransformSigners(change)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming account signers from %d :%s", entry.LastModifiedLedgerSeq, err))
continue
}
for _, s := range signers {
transformedOutputs["signers"] = append(transformedOutputs["signers"], s)

if ok, actualLedger := utils.LedgerIsCheckpoint(entry.LastModifiedLedgerSeq); ok {
x := verifyOutputs[actualLedger]
x.Signers = append(x.Signers, s)
verifyOutputs[actualLedger] = x
}
}
}
}
case xdr.LedgerEntryTypeClaimableBalance:
for _, change := range changes {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
balance, err := transform.TransformClaimableBalance(change)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming balance entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
continue
}
transformedOutputs["claimable_balances"] = append(transformedOutputs["claimable_balances"], balance)

if ok, actualLedger := utils.LedgerIsCheckpoint(entry.LastModifiedLedgerSeq); ok {
x := verifyOutputs[actualLedger]
x.Claimable_balances = append(x.Claimable_balances, balance)
verifyOutputs[actualLedger] = x
}
}
case xdr.LedgerEntryTypeOffer:
for _, change := range changes {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
offer, err := transform.TransformOffer(change)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming offer entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
continue
}
transformedOutputs["offers"] = append(transformedOutputs["offers"], offer)

if ok, actualLedger := utils.LedgerIsCheckpoint(entry.LastModifiedLedgerSeq); ok {
x := verifyOutputs[actualLedger]
x.Offers = append(x.Offers, offer)
verifyOutputs[actualLedger] = x
}
}
case xdr.LedgerEntryTypeTrustline:
for _, change := range changes {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
trust, err := transform.TransformTrustline(change)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming trustline entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
continue
}
transformedOutputs["trustlines"] = append(transformedOutputs["trustlines"], trust)

if ok, actualLedger := utils.LedgerIsCheckpoint(entry.LastModifiedLedgerSeq); ok {
x := verifyOutputs[actualLedger]
x.Trustlines = append(x.Trustlines, trust)
verifyOutputs[actualLedger] = x
}
}
case xdr.LedgerEntryTypeLiquidityPool:
for _, change := range changes {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
pool, err := transform.TransformPool(change)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming liquidity pool entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
continue
}
transformedOutputs["liquidity_pools"] = append(transformedOutputs["liquidity_pools"], pool)

if ok, actualLedger := utils.LedgerIsCheckpoint(entry.LastModifiedLedgerSeq); ok {
x := verifyOutputs[actualLedger]
x.Liquidity_pools = append(x.Liquidity_pools, pool)
verifyOutputs[actualLedger] = x
}
}
}
}

for checkpointLedgers := range verifyOutputs {
_, err := verify.VerifyState(ctx, verifyOutputs[checkpointLedgers], archive, checkpointLedgers, verifyBatchSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know where the verifyBatchSize comes from? Do we know that 50000 is big enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

verifyBatchSize value came from here. About the second question, I don't know the answer. Maybe @bartekn could you help us with that?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This value can be changed. It's basically as big as possible but small enough to not cause OOM crash.

if err != nil {
panic(err)
}
}

err := exportTransformedData(batch.BatchStart, batch.BatchEnd, outputFolder, transformedOutputs, gcpCredentials, gcsBucket, extra)
if err != nil {
cmdLogger.LogError(err)
Expand Down
51 changes: 29 additions & 22 deletions internal/transform/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/stellar/stellar-etl/internal/utils"
)

//TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery
// TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery
func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) {
ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange)
if err != nil {
Expand Down Expand Up @@ -77,27 +77,34 @@ func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) {
outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq)

transformedAccount := AccountOutput{
AccountID: outputID,
Balance: utils.ConvertStroopValueToReal(outputBalance),
BuyingLiabilities: utils.ConvertStroopValueToReal(outputBuyingLiabilities),
SellingLiabilities: utils.ConvertStroopValueToReal(outputSellingLiabilities),
SequenceNumber: outputSequenceNumber,
SequenceLedger: zero.IntFrom(int64(outputSequenceLedger)),
SequenceTime: zero.IntFrom(int64(outputSequenceTime)),
NumSubentries: outputNumSubentries,
InflationDestination: outputInflationDest,
Flags: outputFlags,
HomeDomain: outputHomeDomain,
MasterWeight: outputMasterWeight,
ThresholdLow: outputThreshLow,
ThresholdMedium: outputThreshMed,
ThresholdHigh: outputThreshHigh,
LastModifiedLedger: outputLastModifiedLedger,
Sponsor: ledgerEntrySponsorToNullString(ledgerEntry),
NumSponsored: uint32(accountEntry.NumSponsored()),
NumSponsoring: uint32(accountEntry.NumSponsoring()),
LedgerEntryChange: uint32(changeType),
Deleted: outputDeleted,
AccountID: outputID,
RawBalance: outputBalance,
RawBuyingLiabilities: outputBuyingLiabilities,
RawSellingLiabilities: outputSellingLiabilities,
sydneynotthecity marked this conversation as resolved.
Show resolved Hide resolved
Balance: utils.ConvertStroopValueToReal(outputBalance),
BuyingLiabilities: utils.ConvertStroopValueToReal(outputBuyingLiabilities),
SellingLiabilities: utils.ConvertStroopValueToReal(outputSellingLiabilities),
SequenceNumber: outputSequenceNumber,
SequenceLedger: zero.IntFrom(int64(outputSequenceLedger)),
SequenceTime: zero.IntFrom(int64(outputSequenceTime)),
NumSubentries: outputNumSubentries,
InflationDestination: outputInflationDest,
Flags: outputFlags,
HomeDomain: outputHomeDomain,
MasterWeight: outputMasterWeight,
ThresholdLow: outputThreshLow,
ThresholdMedium: outputThreshMed,
ThresholdHigh: outputThreshHigh,
LastModifiedLedger: outputLastModifiedLedger,
Sponsor: ledgerEntrySponsorToNullString(ledgerEntry),
NumSponsored: uint32(accountEntry.NumSponsored()),
NumSponsoring: uint32(accountEntry.NumSponsoring()),
LedgerEntryChange: uint32(changeType),
Deleted: outputDeleted,
RawMasterWeight: accountEntry.MasterKeyWeight(),
RawThresholdLow: accountEntry.ThresholdLow(),
RawThresholdMedium: accountEntry.ThresholdMedium(),
RawThresholdHigh: accountEntry.ThresholdHigh(),
}
return transformedAccount, nil
}
45 changes: 26 additions & 19 deletions internal/transform/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,24 +151,31 @@ func makeAccountTestInput() ingest.Change {

func makeAccountTestOutput() AccountOutput {
return AccountOutput{
AccountID: testAccount1Address,
Balance: 1.0959979,
BuyingLiabilities: 0.0001,
SellingLiabilities: 0.00015,
SequenceNumber: 117801117454198833,
NumSubentries: 141,
InflationDestination: testAccount2Address,
Flags: 4,
HomeDomain: "examplehome.com",
MasterWeight: 2,
ThresholdLow: 1,
ThresholdMedium: 3,
ThresholdHigh: 5,
Sponsor: null.StringFrom(testAccount3Address),
NumSponsored: 3,
NumSponsoring: 1,
LastModifiedLedger: 30705278,
LedgerEntryChange: 2,
Deleted: true,
AccountID: testAccount1Address,
RawBalance: 10959979,
RawBuyingLiabilities: 1000,
RawSellingLiabilities: 1500,
Balance: 1.0959979,
BuyingLiabilities: 0.0001,
SellingLiabilities: 0.00015,
SequenceNumber: 117801117454198833,
NumSubentries: 141,
InflationDestination: testAccount2Address,
Flags: 4,
HomeDomain: "examplehome.com",
RawMasterWeight: 2,
RawThresholdLow: 1,
RawThresholdMedium: 3,
RawThresholdHigh: 5,
MasterWeight: 2,
ThresholdLow: 1,
ThresholdMedium: 3,
ThresholdHigh: 5,
Sponsor: null.StringFrom(testAccount3Address),
NumSponsored: 3,
NumSponsoring: 1,
LastModifiedLedger: 30705278,
LedgerEntryChange: 2,
Deleted: true,
}
}
4 changes: 3 additions & 1 deletion internal/transform/claimable_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func transformClaimants(claimants []xdr.Claimant) []Claimant {
return transformed
}

//TransformClaimableBalance converts a claimable balance from the history archive ingestion system into a form suitable for BigQuery
// TransformClaimableBalance converts a claimable balance from the history archive ingestion system into a form suitable for BigQuery
func TransformClaimableBalance(ledgerChange ingest.Change) (ClaimableBalanceOutput, error) {
ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange)
if err != nil {
Expand All @@ -47,11 +47,13 @@ func TransformClaimableBalance(ledgerChange ingest.Change) (ClaimableBalanceOutp

transformed := ClaimableBalanceOutput{
BalanceID: balanceID,
Asset: balanceEntry.Asset,
sydneynotthecity marked this conversation as resolved.
Show resolved Hide resolved
AssetCode: outputAsset.AssetCode,
AssetIssuer: outputAsset.AssetIssuer,
AssetType: outputAsset.AssetType,
Claimants: outputClaimants,
AssetAmount: float64(outputAmount) / 1.0e7,
RawAssetAmount: outputAmount,
Sponsor: ledgerEntrySponsorToNullString(ledgerEntry),
LastModifiedLedger: outputLastModifiedLedger,
LedgerEntryChange: uint32(changeType),
Expand Down
2 changes: 2 additions & 0 deletions internal/transform/claimable_balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,12 @@ func makeClaimableBalanceTestOutput() ClaimableBalanceOutput {
},
},
},
Asset: xdr.MustNewCreditAsset("\x01\x02\x03\x04\x05\x06\a\b\t", "GBT4YAEGJQ5YSFUMNKX6BPBUOCPNAIOFAVZOF6MIME2CECBMEIUXFZZN"),
AssetIssuer: "GBT4YAEGJQ5YSFUMNKX6BPBUOCPNAIOFAVZOF6MIME2CECBMEIUXFZZN",
AssetType: "credit_alphanum12",
AssetCode: "\x01\x02\x03\x04\x05\x06\a\b\t",
AssetAmount: 999,
RawAssetAmount: 9990000000,
Sponsor: null.StringFrom("GAAQEAYEAUDAOCAJAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABO3W"),
Flags: 10,
LastModifiedLedger: 30705278,
Expand Down
8 changes: 7 additions & 1 deletion internal/transform/liquidity_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/stellar/stellar-etl/internal/utils"
)

//TransformPool converts an liquidity pool ledger change entry into a form suitable for BigQuery
// TransformPool converts an liquidity pool ledger change entry into a form suitable for BigQuery
func TransformPool(ledgerChange ingest.Change) (PoolOutput, error) {
ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange)
if err != nil {
Expand Down Expand Up @@ -46,21 +46,27 @@ func TransformPool(ledgerChange ingest.Change) (PoolOutput, error) {

transformedPool := PoolOutput{
PoolID: PoolIDToString(lp.LiquidityPoolId),
RawPoolType: lp.Body.Type,
PoolType: poolType,
PoolFee: uint32(cp.Params.Fee),
TrustlineCount: uint64(cp.PoolSharesTrustLineCount),
PoolShareCount: utils.ConvertStroopValueToReal(cp.TotalPoolShares),
AssetA: cp.Params.AssetA,
AssetAType: assetAType,
AssetACode: assetACode,
AssetAIssuer: assetAIssuer,
AssetAReserve: utils.ConvertStroopValueToReal(cp.ReserveA),
AssetB: cp.Params.AssetB,
AssetBType: assetBType,
AssetBCode: assetBCode,
AssetBIssuer: assetBIssuer,
AssetBReserve: utils.ConvertStroopValueToReal(cp.ReserveB),
LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq),
LedgerEntryChange: uint32(changeType),
Deleted: outputDeleted,
RawAssetAReserve: cp.ReserveA,
RawAssetBReserve: cp.ReserveB,
RawPoolShareCount: cp.TotalPoolShares,
}
return transformedPool, nil
}
6 changes: 6 additions & 0 deletions internal/transform/liquidity_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,11 @@ func makePoolTestOutput() PoolOutput {
LastModifiedLedger: 30705278,
LedgerEntryChange: 2,
Deleted: true,
AssetA: xdr.MustNewNativeAsset(),
AssetB: xdr.MustNewCreditAsset("USSD", "GBVVRXLMNCJQW3IDDXC3X6XCH35B5Q7QXNMMFPENSOGUPQO7WO7HGZPA"),
RawAssetAReserve: 105,
RawAssetBReserve: 10,
RawPoolShareCount: 35,
RawPoolType: 0,
}
}
3 changes: 3 additions & 0 deletions internal/transform/offer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,16 @@ func TransformOffer(ledgerChange ingest.Change) (OfferOutput, error) {
transformedOffer := OfferOutput{
SellerID: outputSellerID,
OfferID: outputOfferID,
SellingAsset: offerEntry.Selling,
BuyingAsset: offerEntry.Buying,
SellingAssetType: outputSellingAsset.AssetType,
SellingAssetCode: outputSellingAsset.AssetCode,
SellingAssetIssuer: outputSellingAsset.AssetIssuer,
BuyingAssetType: outputBuyingAsset.AssetType,
BuyingAssetCode: outputBuyingAsset.AssetCode,
BuyingAssetIssuer: outputBuyingAsset.AssetIssuer,
Amount: utils.ConvertStroopValueToReal(outputAmount),
RawAmount: outputAmount,
PriceN: outputPriceN,
PriceD: outputPriceD,
Price: outputPrice,
Expand Down
Loading