Skip to content

Commit

Permalink
ingest: Extract ledger entry changes from Tx Meta in a deterministic …
Browse files Browse the repository at this point in the history
…order (#5070)
  • Loading branch information
tamirms authored Oct 6, 2023
1 parent 9477bf5 commit 4ecb643
Show file tree
Hide file tree
Showing 8 changed files with 318 additions and 26 deletions.
56 changes: 56 additions & 0 deletions ingest/change.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingest

import (
"bytes"
"sort"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
Expand All @@ -20,6 +21,13 @@ type Change struct {
Post *xdr.LedgerEntry
}

func (c *Change) ledgerKey() (xdr.LedgerKey, error) {
if c.Pre != nil {
return c.Pre.LedgerKey()
}
return c.Post.LedgerKey()
}

// GetChangesFromLedgerEntryChanges transforms LedgerEntryChanges to []Change.
// Each `update` and `removed` is preceded with `state` and `create` changes
// are alone, without `state`. The transformation we're doing is to move each
Expand Down Expand Up @@ -64,9 +72,57 @@ func GetChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges)
}
}

sortChanges(changes)
return changes
}

type sortableChanges struct {
changes []Change
ledgerKeys [][]byte
}

func newSortableChanges(changes []Change) sortableChanges {
ledgerKeys := make([][]byte, len(changes))
for i, c := range changes {
lk, err := c.ledgerKey()
if err != nil {
panic(err)
}
lkBytes, err := lk.MarshalBinary()
if err != nil {
panic(err)
}
ledgerKeys[i] = lkBytes
}
return sortableChanges{
changes: changes,
ledgerKeys: ledgerKeys,
}
}

func (s sortableChanges) Len() int {
return len(s.changes)
}

func (s sortableChanges) Less(i, j int) bool {
return bytes.Compare(s.ledgerKeys[i], s.ledgerKeys[j]) < 0
}

func (s sortableChanges) Swap(i, j int) {
s.changes[i], s.changes[j] = s.changes[j], s.changes[i]
s.ledgerKeys[i], s.ledgerKeys[j] = s.ledgerKeys[j], s.ledgerKeys[i]
}

// sortChanges is applied on a list of changes to ensure that LedgerEntryChanges
// from Tx Meta are ingested in a deterministic order.
// The changes are sorted by ledger key. It is unexpected for there to be
// multiple changes with the same ledger key in a LedgerEntryChanges group,
// but if that is the case, we fall back to the original ordering of the changes
// by using a stable sorting algorithm.
func sortChanges(changes []Change) {
sort.Stable(newSortableChanges(changes))
}

// LedgerEntryChangeType returns type in terms of LedgerEntryChangeType.
func (c *Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType {
switch {
Expand Down
215 changes: 215 additions & 0 deletions ingest/change_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package ingest

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/stellar/go/xdr"
)

func assertChangesAreEqual(t *testing.T, a, b Change) {
assert.Equal(t, a.Type, b.Type)
if a.Pre == nil {
assert.Nil(t, b.Pre)
} else {
aBytes, err := a.Pre.MarshalBinary()
assert.NoError(t, err)
bBytes, err := b.Pre.MarshalBinary()
assert.NoError(t, err)
assert.Equal(t, aBytes, bBytes)
}
if a.Post == nil {
assert.Nil(t, b.Post)
} else {
aBytes, err := a.Post.MarshalBinary()
assert.NoError(t, err)
bBytes, err := b.Post.MarshalBinary()
assert.NoError(t, err)
assert.Equal(t, aBytes, bBytes)
}
}

func TestSortChanges(t *testing.T) {
for _, testCase := range []struct {
input []Change
expected []Change
}{
{[]Change{}, []Change{}},
{
[]Change{
{
Type: xdr.LedgerEntryTypeAccount,
Pre: nil,
Post: &xdr.LedgerEntry{
LastModifiedLedgerSeq: 11,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
},
},
},
},
},
[]Change{
{
Type: xdr.LedgerEntryTypeAccount,
Pre: nil,
Post: &xdr.LedgerEntry{
LastModifiedLedgerSeq: 11,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
},
},
},
},
},
},
{
[]Change{
{
Type: xdr.LedgerEntryTypeAccount,
Pre: nil,
Post: &xdr.LedgerEntry{
LastModifiedLedgerSeq: 11,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
Balance: 25,
},
},
},
},
{
Type: xdr.LedgerEntryTypeAccount,
Pre: &xdr.LedgerEntry{
LastModifiedLedgerSeq: 11,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GCMNSW2UZMSH3ZFRLWP6TW2TG4UX4HLSYO5HNIKUSFMLN2KFSF26JKWF"),
Balance: 20,
},
},
},
Post: nil,
},
{
Type: xdr.LedgerEntryTypeExpiration,
Pre: &xdr.LedgerEntry{
LastModifiedLedgerSeq: 11,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeExpiration,
Expiration: &xdr.ExpirationEntry{
KeyHash: xdr.Hash{1},
ExpirationLedgerSeq: 50,
},
},
},
Post: &xdr.LedgerEntry{
LastModifiedLedgerSeq: 11,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeExpiration,
Expiration: &xdr.ExpirationEntry{
KeyHash: xdr.Hash{1},
ExpirationLedgerSeq: 100,
},
},
},
},
{
Type: xdr.LedgerEntryTypeAccount,
Pre: &xdr.LedgerEntry{
LastModifiedLedgerSeq: 11,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
Balance: 25,
},
},
},
Post: nil,
},
},
[]Change{
{
Type: xdr.LedgerEntryTypeAccount,
Pre: &xdr.LedgerEntry{
LastModifiedLedgerSeq: 11,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GCMNSW2UZMSH3ZFRLWP6TW2TG4UX4HLSYO5HNIKUSFMLN2KFSF26JKWF"),
Balance: 20,
},
},
},
Post: nil,
},
{
Type: xdr.LedgerEntryTypeAccount,
Pre: nil,
Post: &xdr.LedgerEntry{
LastModifiedLedgerSeq: 11,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
Balance: 25,
},
},
},
},
{
Type: xdr.LedgerEntryTypeAccount,
Pre: &xdr.LedgerEntry{
LastModifiedLedgerSeq: 11,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
Balance: 25,
},
},
},
Post: nil,
},

{
Type: xdr.LedgerEntryTypeExpiration,
Pre: &xdr.LedgerEntry{
LastModifiedLedgerSeq: 11,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeExpiration,
Expiration: &xdr.ExpirationEntry{
KeyHash: xdr.Hash{1},
ExpirationLedgerSeq: 50,
},
},
},
Post: &xdr.LedgerEntry{
LastModifiedLedgerSeq: 11,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeExpiration,
Expiration: &xdr.ExpirationEntry{
KeyHash: xdr.Hash{1},
ExpirationLedgerSeq: 100,
},
},
},
},
},
},
} {
sortChanges(testCase.input)
assert.Equal(t, len(testCase.input), len(testCase.expected))
for i := range testCase.input {
assertChangesAreEqual(t, testCase.input[i], testCase.expected[i])
}
}
}
1 change: 1 addition & 0 deletions ingest/ledger_change_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (r *LedgerChangeReader) Read() (Change, error) {
Post: nil,
}
}
sortChanges(changes)
r.pending = append(r.pending, changes...)
r.state++
return r.Read()
Expand Down
4 changes: 4 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).

## 2.27.0

### Fixed
- Ordering of effects are now deterministic. Previously the order of some Horizon effects could vary upon reingestion but this issue has now been fixed ([5070](https://github.com/stellar/go/pull/5070)).

## 2.27.0-rc2
### Fixed
Expand Down
20 changes: 4 additions & 16 deletions services/horizon/internal/ingest/processors/effects_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"

"github.com/guregu/null"

"github.com/stellar/go/amount"
"github.com/stellar/go/ingest"
"github.com/stellar/go/keypair"
Expand Down Expand Up @@ -1241,12 +1242,6 @@ func setTrustLineFlagDetails(flagDetails map[string]interface{}, flags xdr.Trust
}
}

type sortableClaimableBalanceEntries []*xdr.ClaimableBalanceEntry

func (s sortableClaimableBalanceEntries) Len() int { return len(s) }
func (s sortableClaimableBalanceEntries) Less(i, j int) bool { return s[i].Asset.LessThan(s[j].Asset) }
func (s sortableClaimableBalanceEntries) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

func (e *effectsWrapper) addLiquidityPoolRevokedEffect() error {
source := e.operation.SourceAccount()
lp, delta, err := e.operation.getLiquidityPoolAndProductDelta(nil)
Expand All @@ -1262,7 +1257,6 @@ func (e *effectsWrapper) addLiquidityPoolRevokedEffect() error {
return err
}
assetToCBID := map[string]string{}
var cbs sortableClaimableBalanceEntries
for _, change := range changes {
if change.Type == xdr.LedgerEntryTypeClaimableBalance && change.Pre == nil && change.Post != nil {
cb := change.Post.Data.ClaimableBalance
Expand All @@ -1271,21 +1265,15 @@ func (e *effectsWrapper) addLiquidityPoolRevokedEffect() error {
return err
}
assetToCBID[cb.Asset.StringCanonical()] = id
cbs = append(cbs, cb)
if err := e.addClaimableBalanceEntryCreatedEffects(source, cb); err != nil {
return err
}
}
}
if len(assetToCBID) == 0 {
// no claimable balances were created, and thus, no revocation happened
return nil
}
// Core's claimable balance metadata isn't ordered, so we order it ourselves
// so that effects are ordered consistently
sort.Sort(cbs)
for _, cb := range cbs {
if err := e.addClaimableBalanceEntryCreatedEffects(source, cb); err != nil {
return err
}
}

reservesRevoked := make([]map[string]string, 0, 2)
for _, aa := range []base.AssetAmount{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ package processors
import (
"testing"

"github.com/stellar/go/protocols/horizon/base"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

"github.com/stellar/go/protocols/horizon/base"

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
. "github.com/stellar/go/services/horizon/internal/test/transactions"
Expand Down Expand Up @@ -1521,6 +1522,13 @@ func getSponsoredSandwichWrappers() []*transactionOperationWrapper {
{
Type: xdr.LedgerEntryChangeTypeLedgerEntryCreated,
Created: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAUJETIZVEP2NRYLUESJ3LS66NVCEGMON4UDCBCSBEVPIID773P2W6AY"),
Balance: 100,
},
},
LastModifiedLedgerSeq: xdr.Uint32(ledgerSeq),
Ext: xdr.LedgerEntryExt{
V: 1,
Expand Down
Loading

0 comments on commit 4ecb643

Please sign in to comment.