From 4ecb6434e657c73d6603c003158c439978610c83 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 6 Oct 2023 18:31:18 +0100 Subject: [PATCH] ingest: Extract ledger entry changes from Tx Meta in a deterministic order (#5070) --- ingest/change.go | 56 +++++ ingest/change_test.go | 215 ++++++++++++++++++ ingest/ledger_change_reader.go | 1 + services/horizon/CHANGELOG.md | 4 + .../ingest/processors/effects_processor.go | 20 +- .../transaction_operation_wrapper_test.go | 10 +- .../integration/liquidity_pool_test.go | 34 ++- xdr/ledger_close_meta.go | 4 +- 8 files changed, 318 insertions(+), 26 deletions(-) create mode 100644 ingest/change_test.go diff --git a/ingest/change.go b/ingest/change.go index 7d9b761db2..027b37b861 100644 --- a/ingest/change.go +++ b/ingest/change.go @@ -2,6 +2,7 @@ package ingest import ( "bytes" + "sort" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" @@ -20,6 +21,13 @@ type Change struct { Post *xdr.LedgerEntry } +func (c *Change) ledgerKey() (xdr.LedgerKey, error) { + if c.Pre != nil { + return c.Pre.LedgerKey() + } + return c.Post.LedgerKey() +} + // GetChangesFromLedgerEntryChanges transforms LedgerEntryChanges to []Change. // Each `update` and `removed` is preceded with `state` and `create` changes // are alone, without `state`. The transformation we're doing is to move each @@ -64,9 +72,57 @@ func GetChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges) } } + sortChanges(changes) return changes } +type sortableChanges struct { + changes []Change + ledgerKeys [][]byte +} + +func newSortableChanges(changes []Change) sortableChanges { + ledgerKeys := make([][]byte, len(changes)) + for i, c := range changes { + lk, err := c.ledgerKey() + if err != nil { + panic(err) + } + lkBytes, err := lk.MarshalBinary() + if err != nil { + panic(err) + } + ledgerKeys[i] = lkBytes + } + return sortableChanges{ + changes: changes, + ledgerKeys: ledgerKeys, + } +} + +func (s sortableChanges) Len() int { + return len(s.changes) +} + +func (s sortableChanges) Less(i, j int) bool { + return bytes.Compare(s.ledgerKeys[i], s.ledgerKeys[j]) < 0 +} + +func (s sortableChanges) Swap(i, j int) { + s.changes[i], s.changes[j] = s.changes[j], s.changes[i] + s.ledgerKeys[i], s.ledgerKeys[j] = s.ledgerKeys[j], s.ledgerKeys[i] +} + +// sortChanges is applied on a list of changes to ensure that LedgerEntryChanges +// from Tx Meta are ingested in a deterministic order. +// The changes are sorted by ledger key. It is unexpected for there to be +// multiple changes with the same ledger key in a LedgerEntryChanges group, +// but if that is the case, we fall back to the original ordering of the changes +// by using a stable sorting algorithm. +func sortChanges(changes []Change) { + sort.Stable(newSortableChanges(changes)) +} + // LedgerEntryChangeType returns type in terms of LedgerEntryChangeType. func (c *Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType { switch { diff --git a/ingest/change_test.go b/ingest/change_test.go new file mode 100644 index 0000000000..d8ae9492dc --- /dev/null +++ b/ingest/change_test.go @@ -0,0 +1,215 @@ +package ingest + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/stellar/go/xdr" +) + +func assertChangesAreEqual(t *testing.T, a, b Change) { + assert.Equal(t, a.Type, b.Type) + if a.Pre == nil { + assert.Nil(t, b.Pre) + } else { + aBytes, err := a.Pre.MarshalBinary() + assert.NoError(t, err) + bBytes, err := b.Pre.MarshalBinary() + assert.NoError(t, err) + assert.Equal(t, aBytes, bBytes) + } + if a.Post == nil { + assert.Nil(t, b.Post) + } else { + aBytes, err := a.Post.MarshalBinary() + assert.NoError(t, err) + bBytes, err := b.Post.MarshalBinary() + assert.NoError(t, err) + assert.Equal(t, aBytes, bBytes) + } +} + +func TestSortChanges(t *testing.T) { + for _, testCase := range []struct { + input []Change + expected []Change + }{ + {[]Change{}, []Change{}}, + { + []Change{ + { + Type: xdr.LedgerEntryTypeAccount, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + }, + }, + []Change{ + { + Type: xdr.LedgerEntryTypeAccount, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + }, + }, + }, + { + []Change{ + { + Type: xdr.LedgerEntryTypeAccount, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Balance: 25, + }, + }, + }, + }, + { + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GCMNSW2UZMSH3ZFRLWP6TW2TG4UX4HLSYO5HNIKUSFMLN2KFSF26JKWF"), + Balance: 20, + }, + }, + }, + Post: nil, + }, + { + Type: xdr.LedgerEntryTypeExpiration, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeExpiration, + Expiration: &xdr.ExpirationEntry{ + KeyHash: xdr.Hash{1}, + ExpirationLedgerSeq: 50, + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeExpiration, + Expiration: &xdr.ExpirationEntry{ + KeyHash: xdr.Hash{1}, + ExpirationLedgerSeq: 100, + }, + }, + }, + }, + { + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Balance: 25, + }, + }, + }, + Post: nil, + }, + }, + []Change{ + { + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GCMNSW2UZMSH3ZFRLWP6TW2TG4UX4HLSYO5HNIKUSFMLN2KFSF26JKWF"), + Balance: 20, + }, + }, + }, + Post: nil, + }, + { + Type: xdr.LedgerEntryTypeAccount, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Balance: 25, + }, + }, + }, + }, + { + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Balance: 25, + }, + }, + }, + Post: nil, + }, + + { + Type: xdr.LedgerEntryTypeExpiration, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeExpiration, + Expiration: &xdr.ExpirationEntry{ + KeyHash: xdr.Hash{1}, + ExpirationLedgerSeq: 50, + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeExpiration, + Expiration: &xdr.ExpirationEntry{ + KeyHash: xdr.Hash{1}, + ExpirationLedgerSeq: 100, + }, + }, + }, + }, + }, + }, + } { + sortChanges(testCase.input) + assert.Equal(t, len(testCase.input), len(testCase.expected)) + for i := range testCase.input { + assertChangesAreEqual(t, testCase.input[i], testCase.expected[i]) + } + } +} diff --git a/ingest/ledger_change_reader.go b/ingest/ledger_change_reader.go index a539e057ea..d09c579dbd 100644 --- a/ingest/ledger_change_reader.go +++ b/ingest/ledger_change_reader.go @@ -139,6 +139,7 @@ func (r *LedgerChangeReader) Read() (Change, error) { Post: nil, } } + sortChanges(changes) r.pending = append(r.pending, changes...) r.state++ return r.Read() diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 62700067f8..4e82f15246 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## 2.27.0 + +### Fixed +- Ordering of effects are now deterministic. Previously the order of some Horizon effects could vary upon reingestion but this issue has now been fixed ([5070](https://github.com/stellar/go/pull/5070)). ## 2.27.0-rc2 ### Fixed diff --git a/services/horizon/internal/ingest/processors/effects_processor.go b/services/horizon/internal/ingest/processors/effects_processor.go index 4cfd703bd2..d6c0d781d6 100644 --- a/services/horizon/internal/ingest/processors/effects_processor.go +++ b/services/horizon/internal/ingest/processors/effects_processor.go @@ -10,6 +10,7 @@ import ( "strconv" "github.com/guregu/null" + "github.com/stellar/go/amount" "github.com/stellar/go/ingest" "github.com/stellar/go/keypair" @@ -1241,12 +1242,6 @@ func setTrustLineFlagDetails(flagDetails map[string]interface{}, flags xdr.Trust } } -type sortableClaimableBalanceEntries []*xdr.ClaimableBalanceEntry - -func (s sortableClaimableBalanceEntries) Len() int { return len(s) } -func (s sortableClaimableBalanceEntries) Less(i, j int) bool { return s[i].Asset.LessThan(s[j].Asset) } -func (s sortableClaimableBalanceEntries) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - func (e *effectsWrapper) addLiquidityPoolRevokedEffect() error { source := e.operation.SourceAccount() lp, delta, err := e.operation.getLiquidityPoolAndProductDelta(nil) @@ -1262,7 +1257,6 @@ func (e *effectsWrapper) addLiquidityPoolRevokedEffect() error { return err } assetToCBID := map[string]string{} - var cbs sortableClaimableBalanceEntries for _, change := range changes { if change.Type == xdr.LedgerEntryTypeClaimableBalance && change.Pre == nil && change.Post != nil { cb := change.Post.Data.ClaimableBalance @@ -1271,21 +1265,15 @@ func (e *effectsWrapper) addLiquidityPoolRevokedEffect() error { return err } assetToCBID[cb.Asset.StringCanonical()] = id - cbs = append(cbs, cb) + if err := e.addClaimableBalanceEntryCreatedEffects(source, cb); err != nil { + return err + } } } if len(assetToCBID) == 0 { // no claimable balances were created, and thus, no revocation happened return nil } - // Core's claimable balance metadata isn't ordered, so we order it ourselves - // so that effects are ordered consistently - sort.Sort(cbs) - for _, cb := range cbs { - if err := e.addClaimableBalanceEntryCreatedEffects(source, cb); err != nil { - return err - } - } reservesRevoked := make([]map[string]string, 0, 2) for _, aa := range []base.AssetAmount{ diff --git a/services/horizon/internal/ingest/processors/transaction_operation_wrapper_test.go b/services/horizon/internal/ingest/processors/transaction_operation_wrapper_test.go index a96d4efef9..97b83f8f1b 100644 --- a/services/horizon/internal/ingest/processors/transaction_operation_wrapper_test.go +++ b/services/horizon/internal/ingest/processors/transaction_operation_wrapper_test.go @@ -5,10 +5,11 @@ package processors import ( "testing" - "github.com/stellar/go/protocols/horizon/base" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "github.com/stellar/go/protocols/horizon/base" + "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" . "github.com/stellar/go/services/horizon/internal/test/transactions" @@ -1521,6 +1522,13 @@ func getSponsoredSandwichWrappers() []*transactionOperationWrapper { { Type: xdr.LedgerEntryChangeTypeLedgerEntryCreated, Created: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAUJETIZVEP2NRYLUESJ3LS66NVCEGMON4UDCBCSBEVPIID773P2W6AY"), + Balance: 100, + }, + }, LastModifiedLedgerSeq: xdr.Uint32(ledgerSeq), Ext: xdr.LedgerEntryExt{ V: 1, diff --git a/services/horizon/internal/integration/liquidity_pool_test.go b/services/horizon/internal/integration/liquidity_pool_test.go index 3f792486ad..9106003179 100644 --- a/services/horizon/internal/integration/liquidity_pool_test.go +++ b/services/horizon/internal/integration/liquidity_pool_test.go @@ -606,27 +606,45 @@ func TestLiquidityPoolRevoke(t *testing.T) { tt.Equal(master.Address(), ef4.Asset.Issuer) tt.Equal(shareAccount.GetAccountID(), ef4.Trustor) + // the ordering of the claimable_balance_created effects depends on + // the ids of the claimable balances which can vary between test runs. + // we assert that there will be two claimable balances created, + // one holding 777 usd and another holding 400 xlm but + // we don't know the ordering since it depends on the claimable + // balance ids which we don't know ahead of time. + usdAsset := fmt.Sprintf("USD:%s", master.Address()) + expectedAmount := map[string]string{ + usdAsset: "777.0000000", + "native": "400.0000000", + } ef5 := (effs.Embedded.Records[4]).(effects.ClaimableBalanceCreated) tt.Equal("claimable_balance_created", ef5.Type) - tt.Equal("native", ef5.Asset) - tt.Equal("400.0000000", ef5.Amount) + var expectedNextAsset string + if ef5.Asset == usdAsset { + expectedNextAsset = "native" + } else if ef5.Asset == "native" { + expectedNextAsset = usdAsset + } else { + tt.Failf("unexpected asset %v", ef5.Asset) + } + tt.Equal(expectedAmount[ef5.Asset], ef5.Amount) ef6 := (effs.Embedded.Records[5]).(effects.ClaimableBalanceClaimantCreated) tt.Equal("claimable_balance_claimant_created", ef6.Type) - tt.Equal("native", ef6.Asset) - tt.Equal("400.0000000", ef6.Amount) + tt.Equal(ef5.Asset, ef6.Asset) + tt.Equal(ef5.Amount, ef6.Amount) tt.Equal(shareKeys.Address(), ef6.Account) tt.Equal(xdr.ClaimPredicateTypeClaimPredicateUnconditional, ef6.Predicate.Type) ef7 := (effs.Embedded.Records[6]).(effects.ClaimableBalanceCreated) tt.Equal("claimable_balance_created", ef7.Type) - tt.Equal(fmt.Sprintf("USD:%s", master.Address()), ef7.Asset) - tt.Equal("777.0000000", ef7.Amount) + tt.Equal(expectedNextAsset, ef7.Asset) + tt.Equal(expectedAmount[ef7.Asset], ef7.Amount) ef8 := (effs.Embedded.Records[7]).(effects.ClaimableBalanceClaimantCreated) tt.Equal("claimable_balance_claimant_created", ef8.Type) - tt.Equal(fmt.Sprintf("USD:%s", master.Address()), ef8.Asset) - tt.Equal("777.0000000", ef8.Amount) + tt.Equal(ef7.Asset, ef8.Asset) + tt.Equal(ef7.Amount, ef8.Amount) tt.Equal(shareKeys.Address(), ef8.Account) tt.Equal(xdr.ClaimPredicateTypeClaimPredicateUnconditional, ef8.Predicate.Type) diff --git a/xdr/ledger_close_meta.go b/xdr/ledger_close_meta.go index c8b85bae97..c2b352b613 100644 --- a/xdr/ledger_close_meta.go +++ b/xdr/ledger_close_meta.go @@ -1,6 +1,8 @@ package xdr -import "fmt" +import ( + "fmt" +) func (l LedgerCloseMeta) LedgerHeaderHistoryEntry() LedgerHeaderHistoryEntry { switch l.V {