Skip to content

Commit

Permalink
Rework test fixtures
Browse files Browse the repository at this point in the history
  • Loading branch information
karthikiyer56 committed Dec 3, 2024
1 parent 157c12e commit b82a9dd
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 79 deletions.
120 changes: 79 additions & 41 deletions services/horizon/internal/integration/change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package integration

import (
"context"
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/keypair"
Expand All @@ -11,26 +10,24 @@ import (
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"io"
"sort"
"testing"
"time"
)

func TestProtocolUpgradeChanges(t *testing.T) {
tt := assert.New(t)
itest := integration.NewTest(t, integration.Config{SkipHorizonStart: true, SkipProtocolUpgrade: true})
archive, err := integration.GetHistoryArchive()
tt.NoError(err)
itest := integration.NewTest(t, integration.Config{SkipHorizonStart: true})

// Manually invoke command to upgrade protocol
itest.UpgradeProtocol(itest.Config().ProtocolVersion)
upgradedLedgerSeq, _ := itest.GetUpgradeLedgerSeq()
upgradedLedgerAppx, _ := itest.GetUpgradedLedgerSeqAppx()
waitForLedgerInArchive(t, 15*time.Second, upgradedLedgerAppx)

publishedNextCheckpoint := publishedNextCheckpoint(archive, upgradedLedgerSeq, t)
// Ensure that a checkpoint has been created with the ledgerNumber you want in it
tt.Eventually(publishedNextCheckpoint, 15*time.Second, time.Second)
ledgerSeqToLedgers := getLedgers(itest, 2, upgradedLedgerAppx)

// It is important to find the "exact" ledger which is representative of protocol upgrade
// and the one before it, to check for upgrade related changes
upgradedLedgerSeq := getExactUpgradedLedgerSeq(ledgerSeqToLedgers, itest.Config().ProtocolVersion)
prevLedgerToUpgrade := upgradedLedgerSeq - 1
ledgerSeqToLedgers := getLedgersFromArchive(itest, prevLedgerToUpgrade, upgradedLedgerSeq)

prevLedgerChanges := getChangesFromLedger(itest, ledgerSeqToLedgers[prevLedgerToUpgrade])
prevLedgerChangeMap := changeReasonCountMap(prevLedgerChanges)
Expand All @@ -52,26 +49,21 @@ func TestOneTxOneOperationChanges(t *testing.T) {

master := itest.Master()
keys, _ := itest.CreateAccounts(2, "1000")
keyA, keyB := keys[0], keys[1]
srcAcc, destAcc := keys[0], keys[1]

operation := txnbuild.Payment{
SourceAccount: keyA.Address(),
Destination: keyB.Address(),
SourceAccount: srcAcc.Address(),
Destination: destAcc.Address(),
Asset: txnbuild.NativeAsset{},
Amount: "900",
}
txResp, err := itest.SubmitMultiSigOperations(itest.MasterAccount(), []*keypair.Full{master, keyA}, &operation)
tt.NoError(err)
ledgerSeq := uint32(txResp.Ledger)

archive, err := integration.GetHistoryArchive()
txResp, err := itest.SubmitMultiSigOperations(itest.MasterAccount(), []*keypair.Full{master, srcAcc}, &operation)
tt.NoError(err)

publishedNextCheckpoint := publishedNextCheckpoint(archive, ledgerSeq, t)
// Ensure that a checkpoint has been created with the ledgerNumber you want in it
tt.Eventually(publishedNextCheckpoint, 15*time.Second, time.Second)
ledgerSeq := uint32(txResp.Ledger)
waitForLedgerInArchive(t, 15*time.Second, ledgerSeq)

ledger := getLedgersFromArchive(itest, ledgerSeq, ledgerSeq)[ledgerSeq]
ledger := getLedgers(itest, ledgerSeq, ledgerSeq)[ledgerSeq]
changes := getChangesFromLedger(itest, ledger)

reasonCntMap := changeReasonCountMap(changes)
Expand All @@ -86,16 +78,40 @@ func TestOneTxOneOperationChanges(t *testing.T) {
tt.Equal(change.Transaction.Ledger.LedgerSequence(), ledgerSeq)
}

tt.Equal(
ledgerKey(reasonToChangeMap[ingest.LedgerEntryChangeReasonFee][0]).MustAccount().AccountId.Address(),
master.Address())
tt.Equal(
ledgerKey(reasonToChangeMap[ingest.LedgerEntryChangeReasonTransaction][0]).MustAccount().AccountId.Address(),
master.Address())
tt.True(containsAccount(reasonToChangeMap[ingest.LedgerEntryChangeReasonOperation], keyA.Address()))
tt.True(containsAccount(reasonToChangeMap[ingest.LedgerEntryChangeReasonOperation], keyB.Address()))
accountFromEntry := func(e *xdr.LedgerEntry) xdr.AccountEntry {
return e.Data.MustAccount()
}

changeForAccount := func(changes []ingest.Change, target string) ingest.Change {
for _, change := range changes {
acc := change.Pre.Data.MustAccount()
if acc.AccountId.Address() == target {
return change
}
}
return ingest.Change{}
}

feeRelatedChange := reasonToChangeMap[ingest.LedgerEntryChangeReasonFee][0]
txRelatedChange := reasonToChangeMap[ingest.LedgerEntryChangeReasonTransaction][0]
operationChanges := reasonToChangeMap[ingest.LedgerEntryChangeReasonOperation]

tt.Equal(accountFromEntry(feeRelatedChange.Pre).AccountId.Address(), master.Address())
tt.Equal(accountFromEntry(txRelatedChange.Pre).AccountId.Address(), master.Address())
tt.True(containsAccount(operationChanges, srcAcc.Address()))
tt.True(containsAccount(operationChanges, destAcc.Address()))
// MasterAccount shouldnt show up in operation level changes
tt.False(containsAccount(reasonToChangeMap[ingest.LedgerEntryChangeReasonOperation], master.Address()))
tt.False(containsAccount(operationChanges, master.Address()))

tt.True(accountFromEntry(feeRelatedChange.Pre).Balance > accountFromEntry(feeRelatedChange.Post).Balance)
tt.True(accountFromEntry(txRelatedChange.Pre).SeqNum < accountFromEntry(txRelatedChange.Post).SeqNum)

srcAccChange := changeForAccount(operationChanges, srcAcc.Address())
destAccChange := changeForAccount(operationChanges, destAcc.Address())

tt.True(accountFromEntry(srcAccChange.Pre).Balance < accountFromEntry(srcAccChange.Post).Balance)
tt.True(accountFromEntry(destAccChange.Pre).Balance > accountFromEntry(destAccChange.Post).Balance)

}

// Helper function to check if a specific XX exists in the list
Expand Down Expand Up @@ -139,7 +155,7 @@ func getChangesFromLedger(itest *integration.Test, ledger xdr.LedgerCloseMeta) [
return changes
}

func getLedgersFromArchive(itest *integration.Test, startingLedger uint32, endLedger uint32) map[uint32]xdr.LedgerCloseMeta {
func getLedgers(itest *integration.Test, startingLedger uint32, endLedger uint32) map[uint32]xdr.LedgerCloseMeta {
t := itest.CurrentTest()

ccConfig, cleanupFn, err := itest.CreateCaptiveCoreConfig()
Expand Down Expand Up @@ -180,9 +196,22 @@ func changeReasonCountMap(changes []ingest.Change) map[ingest.LedgerEntryChangeR
return changeMap
}

func publishedNextCheckpoint(archive *historyarchive.Archive, ledgerSeq uint32, t *testing.T) func() bool {
return func() bool {
var latestCheckpoint uint32
func changeReasonToChangeMap(changes []ingest.Change) map[ingest.LedgerEntryChangeReason][]ingest.Change {
changeMap := make(map[ingest.LedgerEntryChangeReason][]ingest.Change)
for _, change := range changes {
changeMap[change.Reason] = append(changeMap[change.Reason], change)
}
return changeMap
}

func waitForLedgerInArchive(t *testing.T, waitTime time.Duration, ledgerSeq uint32) {
archive, err := integration.GetHistoryArchive()
if err != nil {
t.Fatalf("could not get history archive: %v", err)
}

var latestCheckpoint uint32
var f = func() bool {
has, requestErr := archive.GetRootHAS()
if requestErr != nil {
t.Logf("Request to fetch checkpoint failed: %v", requestErr)
Expand All @@ -191,12 +220,21 @@ func publishedNextCheckpoint(archive *historyarchive.Archive, ledgerSeq uint32,
latestCheckpoint = has.CurrentLedger
return latestCheckpoint >= ledgerSeq
}

assert.Eventually(t, f, waitTime, 1*time.Second)
}

func changeReasonToChangeMap(changes []ingest.Change) map[ingest.LedgerEntryChangeReason][]ingest.Change {
changeMap := make(map[ingest.LedgerEntryChangeReason][]ingest.Change)
for _, change := range changes {
changeMap[change.Reason] = append(changeMap[change.Reason], change)
func getExactUpgradedLedgerSeq(ledgerMap map[uint32]xdr.LedgerCloseMeta, version uint32) uint32 {
keys := make([]int, 0, len(ledgerMap))
for key, _ := range ledgerMap {
keys = append(keys, int(key))
}
return changeMap
sort.Ints(keys)

for _, key := range keys {
if ledgerMap[uint32(key)].ProtocolVersion() == version {
return uint32(key)
}
}
return 0
}
69 changes: 31 additions & 38 deletions services/horizon/internal/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ type Test struct {
}

type CoreUpgradeState struct {
upgradeLedgerSeq uint32
maxUpgradeLedger uint32
}

// GetTestConfig returns the default test Config required to run NewTest.
Expand Down Expand Up @@ -647,55 +647,46 @@ const maxWaitForCoreStartup = 30 * time.Second
const maxWaitForCoreUpgrade = 5 * time.Second
const coreStartupPingInterval = time.Second

// Wait for protocol upgrade
func (i *Test) waitCoreForProtocolUpgrade(protocolVersion uint32) {
i.UpgradeProtocol(protocolVersion)

// Wait for core to be up and manually close the first ledger
func (i *Test) waitForCore() {
i.t.Log("Waiting for core to be up...")
startTime := time.Now()
for time.Since(startTime) < maxWaitForCoreUpgrade {
for time.Since(startTime) < maxWaitForCoreStartup {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
infoTime := time.Now()
info, err := i.coreClient.Info(ctx)
_, err := i.coreClient.Info(ctx)
cancel()
if err != nil || !info.IsSynced() {
i.t.Logf("Core is still not synced: %v %v", err, info)
if err != nil {
i.t.Logf("could not obtain info response: %v", err)
// sleep up to a second between consecutive calls.
if durationSince := time.Since(infoTime); durationSince < coreStartupPingInterval {
time.Sleep(coreStartupPingInterval - durationSince)
}
continue
}
i.t.Log("Core is up.")
return
break
}
i.t.Fatalf("Core could not sync after %v + %v", maxWaitForCoreStartup, maxWaitForCoreUpgrade)
}

// Wait for core to be up and manually close the first ledger
func (i *Test) waitForCore() {
i.t.Log("Waiting for core to be up...")
startTime := time.Now()
for time.Since(startTime) < maxWaitForCoreStartup {
i.UpgradeProtocol(i.config.ProtocolVersion)

startTime = time.Now()
for time.Since(startTime) < maxWaitForCoreUpgrade {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
infoTime := time.Now()
_, err := i.coreClient.Info(ctx)
info, err := i.coreClient.Info(ctx)
cancel()
if err != nil {
i.t.Logf("could not obtain info response: %v", err)
if err != nil || !info.IsSynced() {
i.t.Logf("Core is still not synced: %v %v", err, info)
// sleep up to a second between consecutive calls.
if durationSince := time.Since(infoTime); durationSince < coreStartupPingInterval {
time.Sleep(coreStartupPingInterval - durationSince)
}
continue
}
break
}

if !i.config.SkipProtocolUpgrade {
i.waitCoreForProtocolUpgrade(i.config.ProtocolVersion)
} else {
i.t.Log("Core is up. Protocol Upgrade skipped. Please manually upgrade protocol version, if needed...")
i.t.Log("Core is up.")
return
}
i.t.Fatalf("Core could not sync after %v + %v", maxWaitForCoreStartup, maxWaitForCoreUpgrade)
}

const sorobanRPCInitTime = 20 * time.Second
Expand Down Expand Up @@ -948,9 +939,10 @@ func (i *Test) UpgradeProtocol(version uint32) {
if info.Info.Ledger.Version == int(version) {
i.t.Logf("Protocol upgraded to: %d, in ledger sequence number: %v, hash: %v",
info.Info.Ledger.Version, ledgerSeq, info.Info.Ledger.Hash)
i.coreUpgradeState = &CoreUpgradeState{
upgradeLedgerSeq: uint32(ledgerSeq),
}
// Mark the fact that the core has been upgraded as of this ledger sequence
// It could have been earlier than this, but certainly no later.
// The core upgrade could have happened in any ledger since the coreClient.Upgrade was issued
i.coreUpgradeState = &CoreUpgradeState{maxUpgradeLedger: uint32(ledgerSeq)}
return
}
time.Sleep(time.Second)
Expand All @@ -959,13 +951,6 @@ func (i *Test) UpgradeProtocol(version uint32) {
i.t.Fatalf("could not upgrade protocol in 10s")
}

func (i *Test) GetUpgradeLedgerSeq() (uint32, error) {
if i.coreUpgradeState == nil {
return 0, errors.Errorf("Core has not been upgraded yet")
}
return i.coreUpgradeState.upgradeLedgerSeq, nil
}

func (i *Test) WaitForHorizonWeb() {
// wait until the web server is up before continuing to test requests
require.Eventually(i.t, func() bool {
Expand Down Expand Up @@ -1502,3 +1487,11 @@ func GetHistoryArchive() (*historyarchive.Archive, error) {
CheckpointFrequency: CheckpointFrequency,
})
}

// This is approximate becuase the upgrade could have happened at a leger before this as well.
func (i *Test) GetUpgradedLedgerSeqAppx() (uint32, error) {
if i.coreUpgradeState == nil {
return 0, errors.Errorf("Core has not been upgraded yet")
}
return i.coreUpgradeState.maxUpgradeLedger, nil
}

0 comments on commit b82a9dd

Please sign in to comment.