diff --git a/ingest/change.go b/ingest/change.go index 0a2c063f1c..758a8a11f4 100644 --- a/ingest/change.go +++ b/ingest/change.go @@ -13,15 +13,90 @@ import ( // It also provides some helper functions to quickly check if a given // change has occurred in an entry. // -// If an entry is created: Pre is nil and Post is not nil. -// If an entry is updated: Pre is not nil and Post is not nil. -// If an entry is removed: Pre is not nil and Post is nil. +// Change represents a modification to a ledger entry, capturing both the before and after states +// of the entry along with the context that explains what caused the change. It is primarily used to +// track changes that occur during transactions and/or the operations within a transaction, +// and can help identify the specific cause of changes to the LedgerEntry state. (https://github.com/stellar/go/issues/5535) +// +// Behavior: +// +// - **Created entries**: Pre is nil, and Post is not nil. +// +// - **Updated entries**: Both Pre and Post are non-nil. +// +// - **Removed entries**: Pre is not nil, and Post is nil. +// +// A `Change` can be caused primarily by either a transaction or by an operation within a transaction: +// +// - **Operations**: +// Each successful operation can cause multiple ledger entry changes. +// For example, a path payment operation may affect the source and destination account entries, +// as well as potentially modify offers and/or liquidity pools. +// +// - **Transactions**: +// Some ledger changes, such as those involving fees or account balances, may be caused by +// the transaction itself and may not be tied to a specific operation within a transaction. +// For instance, fees for all operations in a transaction are debited from the source account, +// triggering ledger changes without operation-specific details. type Change struct { + // The type of the ledger entry being changed. Type xdr.LedgerEntryType - Pre *xdr.LedgerEntry + + // The state of the LedgerEntry before the change. This will be nil if the entry was created. Pre *xdr.LedgerEntry + + // The state of the LedgerEntry after the change. This will be nil if the entry was removed. Post *xdr.LedgerEntry + + // Specifies why the change occurred, represented as a LedgerEntryChangeReason. + Reason LedgerEntryChangeReason + + // The index of the operation within the transaction that caused the change. + // This field is relevant only when the Reason is LedgerEntryChangeReasonOperation. + // This field cannot be relied upon when the compactingChangeReader is used. + OperationIndex uint32 + + // The LedgerTransaction responsible for the change. + // It contains details such as transaction hash, envelope, result pair, and fees. + // This field is populated only when the Reason is one of: + // LedgerEntryChangeReasonTransaction, LedgerEntryChangeReasonOperation, or LedgerEntryChangeReasonFee. + Transaction *LedgerTransaction + + // The LedgerCloseMeta that precipitated the change. + // This is useful only when the Change is caused by an upgrade or by an eviction, i.e. outside a transaction. + // This field is populated only when the Reason is one of: + // LedgerEntryChangeReasonUpgrade or LedgerEntryChangeReasonEviction. + // For changes caused by a transaction or its operations, use the Transaction field instead. + Ledger *xdr.LedgerCloseMeta + + // Information about the upgrade, if the change occurred as part of an upgrade. + // This field is relevant only when the Reason is LedgerEntryChangeReasonUpgrade. + LedgerUpgrade *xdr.LedgerUpgrade } +// LedgerEntryChangeReason represents the reason for a ledger entry change. +type LedgerEntryChangeReason uint16 + +const ( + // LedgerEntryChangeReasonUnknown indicates an unknown or unsupported change reason. + LedgerEntryChangeReasonUnknown LedgerEntryChangeReason = iota + + // LedgerEntryChangeReasonOperation indicates a change caused by an operation in a transaction. + LedgerEntryChangeReasonOperation + + // LedgerEntryChangeReasonTransaction indicates a change caused by the transaction itself. + LedgerEntryChangeReasonTransaction + + // LedgerEntryChangeReasonFee indicates a change related to transaction fees. + LedgerEntryChangeReasonFee + + // LedgerEntryChangeReasonUpgrade indicates a change caused by a ledger upgrade. + LedgerEntryChangeReasonUpgrade + + // LedgerEntryChangeReasonEviction indicates a change caused by entry eviction. + LedgerEntryChangeReasonEviction +) +
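Editorial aside: a minimal sketch of how consumer code might use the new Reason-related fields once this change lands. It assumes an xdr.LedgerCloseMeta (lcm) and a network passphrase obtained elsewhere; the constructor and constants are the ones shown in this diff (see also the integration test later in this diff).

	reader, err := ingest.NewLedgerChangeReaderFromLedgerCloseMeta(passphrase, lcm)
	if err != nil {
		return err
	}
	defer reader.Close()
	for {
		change, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		switch change.Reason {
		case ingest.LedgerEntryChangeReasonFee, ingest.LedgerEntryChangeReasonTransaction:
			// Fee and transaction-level changes carry the originating LedgerTransaction.
			fmt.Println("tx-level change from", change.Transaction.Hash.HexString())
		case ingest.LedgerEntryChangeReasonOperation:
			// Operation-level changes additionally record which operation caused them.
			fmt.Println("op", change.OperationIndex, "of tx", change.Transaction.Hash.HexString())
		case ingest.LedgerEntryChangeReasonUpgrade, ingest.LedgerEntryChangeReasonEviction:
			// Upgrade and eviction changes happen outside any transaction; only Ledger
			// (and LedgerUpgrade, for upgrades) is set.
			fmt.Println("ledger-level change in sequence", change.Ledger.LedgerSequence())
		}
	}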
// String returns a best effort string representation of the change. // If the Pre or Post xdr is invalid, the field will be omitted from the string. func (c Change) String() string { diff --git a/ingest/ledger_change_reader.go b/ingest/ledger_change_reader.go index 496dc98b40..ff6a5c2792 100644 --- a/ingest/ledger_change_reader.go +++ b/ingest/ledger_change_reader.go @@ -175,6 +175,7 @@ func (r *LedgerChangeReader) Read() (Change, error) { r.pending = append(r.pending, metaChanges...) } return r.Read() + case evictionChangesState: entries, err := r.lcm.EvictedPersistentLedgerEntries() if err != nil { @@ -185,21 +186,30 @@ func (r *LedgerChangeReader) Read() (Change, error) { entry := entries[i] // when a ledger entry is evicted it is removed from the ledger changes[i] = Change{ - Type: entry.Data.Type, - Pre: &entry, - Post: nil, + Type: entry.Data.Type, + Pre: &entry, + Post: nil, + Reason: LedgerEntryChangeReasonEviction, + Ledger: &r.lcm, } } sortChanges(changes) r.pending = append(r.pending, changes...) r.state++ return r.Read() + case upgradeChangesState: // Get upgrade changes if r.upgradeIndex < len(r.LedgerTransactionReader.lcm.UpgradesProcessing()) { changes := GetChangesFromLedgerEntryChanges( r.LedgerTransactionReader.lcm.UpgradesProcessing()[r.upgradeIndex].Changes, ) + ledgerUpgrades := r.LedgerTransactionReader.lcm.UpgradesProcessing() + for i := range changes { + changes[i].Reason = LedgerEntryChangeReasonUpgrade + changes[i].Ledger = &r.lcm + changes[i].LedgerUpgrade = &ledgerUpgrades[r.upgradeIndex].Upgrade + } r.pending = append(r.pending, changes...) r.upgradeIndex++ return r.Read()
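One practical consequence of the eviction handling above: evicted entries still surface as removals (Pre set, Post nil), so consumers that previously treated every removal as an explicit deletion can now tell the two apart by Reason. A minimal sketch:

	// change is an ingest.Change produced by a LedgerChangeReader as above.
	if change.Pre != nil && change.Post == nil {
		switch change.Reason {
		case ingest.LedgerEntryChangeReasonEviction:
			// The entry expired and was evicted by the protocol, outside any transaction;
			// change.Ledger points at the LedgerCloseMeta in which it was evicted.
		default:
			// The entry was deleted by a transaction or one of its operations.
		}
	}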
diff --git a/ingest/ledger_transaction.go b/ingest/ledger_transaction.go index 77ca777206..046cff3782 100644 --- a/ingest/ledger_transaction.go +++ b/ingest/ledger_transaction.go @@ -17,6 +17,8 @@ type LedgerTransaction struct { FeeChanges xdr.LedgerEntryChanges UnsafeMeta xdr.TransactionMeta LedgerVersion uint32 + Ledger xdr.LedgerCloseMeta // This is read-only and not to be modified by downstream functions + Hash xdr.Hash } func (t *LedgerTransaction) txInternalError() bool { @@ -26,7 +28,21 @@ func (t *LedgerTransaction) txInternalError() bool { // GetFeeChanges returns a developer friendly representation of LedgerEntryChanges // connected to fees. func (t *LedgerTransaction) GetFeeChanges() []Change { - return GetChangesFromLedgerEntryChanges(t.FeeChanges) + changes := GetChangesFromLedgerEntryChanges(t.FeeChanges) + for i := range changes { + changes[i].Reason = LedgerEntryChangeReasonFee + changes[i].Transaction = t + } + return changes +} + +func (t *LedgerTransaction) getTransactionChanges(ledgerEntryChanges xdr.LedgerEntryChanges) []Change { + changes := GetChangesFromLedgerEntryChanges(ledgerEntryChanges) + for i := range changes { + changes[i].Reason = LedgerEntryChangeReasonTransaction + changes[i].Transaction = t + } + return changes } // GetChanges returns a developer friendly representation of LedgerEntryChanges. @@ -42,7 +58,8 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { return changes, errors.New("TransactionMeta.V=0 not supported") case 1: v1Meta := t.UnsafeMeta.MustV1() - txChanges := GetChangesFromLedgerEntryChanges(v1Meta.TxChanges) + // The var `txChanges` reflects the ledgerEntryChanges caused by the transaction as a whole + txChanges := t.getTransactionChanges(v1Meta.TxChanges) changes = append(changes, txChanges...) // Ignore operations meta if txInternalError https://github.com/stellar/go/issues/2111 @@ -50,34 +67,39 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { return changes, nil } - for _, operationMeta := range v1Meta.Operations { - opChanges := GetChangesFromLedgerEntryChanges( - operationMeta.Changes, - ) + // These changes reflect the ledgerEntry changes caused by the operations in the transaction. + // Populate the operation-related fields (Reason, Transaction, OperationIndex) in the `Change` struct. + + operationMeta := v1Meta.Operations + // operationMeta is a list of lists. + // Each element in operationMeta is the list of ledgerEntryChanges + // caused by the operation at that index. + for opIdx := range operationMeta { + opChanges := t.operationChanges(v1Meta.Operations, uint32(opIdx)) changes = append(changes, opChanges...) } case 2, 3: var ( - beforeChanges, afterChanges xdr.LedgerEntryChanges - operationMeta []xdr.OperationMeta + txBeforeChanges, txAfterChanges xdr.LedgerEntryChanges + operationMeta []xdr.OperationMeta ) switch t.UnsafeMeta.V { case 2: v2Meta := t.UnsafeMeta.MustV2() - beforeChanges = v2Meta.TxChangesBefore - afterChanges = v2Meta.TxChangesAfter + txBeforeChanges = v2Meta.TxChangesBefore + txAfterChanges = v2Meta.TxChangesAfter operationMeta = v2Meta.Operations case 3: v3Meta := t.UnsafeMeta.MustV3() - beforeChanges = v3Meta.TxChangesBefore - afterChanges = v3Meta.TxChangesAfter + txBeforeChanges = v3Meta.TxChangesBefore + txAfterChanges = v3Meta.TxChangesAfter operationMeta = v3Meta.Operations default: panic("Invalid meta version, expected 2 or 3") } - txChangesBefore := GetChangesFromLedgerEntryChanges(beforeChanges) + txChangesBefore := t.getTransactionChanges(txBeforeChanges) changes = append(changes, txChangesBefore...) // Ignore operations meta and txChangesAfter if txInternalError @@ -86,14 +108,15 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { return changes, nil } - for _, operationMeta := range operationMeta { - opChanges := GetChangesFromLedgerEntryChanges( - operationMeta.Changes, - ) + // operationMeta is a list of lists. + // Each element in operationMeta is the list of ledgerEntryChanges + // caused by the operation at that index. + for opIdx := range operationMeta { + opChanges := t.operationChanges(operationMeta, uint32(opIdx)) changes = append(changes, opChanges...) } - txChangesAfter := GetChangesFromLedgerEntryChanges(afterChanges) + txChangesAfter := t.getTransactionChanges(txAfterChanges) changes = append(changes, txChangesAfter...) default: return changes, errors.New("Unsupported TransactionMeta version")
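Editorial aside: taken together, the per-transaction accessors above now label every change they return. A minimal sketch of consuming them, assuming a LedgerTransactionReader (txReader) has already been constructed for the ledger:

	tx, err := txReader.Read()
	if err != nil {
		return err
	}
	// Fee changes are returned separately and are always tagged LedgerEntryChangeReasonFee.
	for _, c := range tx.GetFeeChanges() {
		_ = c.Transaction // the originating LedgerTransaction, including its Hash
	}
	// GetChanges returns the transaction- and operation-level changes from the transaction meta.
	changes, err := tx.GetChanges()
	if err != nil {
		return err
	}
	for _, c := range changes {
		if c.Reason == ingest.LedgerEntryChangeReasonOperation {
			if op, ok := c.Transaction.GetOperation(c.OperationIndex); ok {
				_ = op // the xdr.Operation that caused this change
			}
		}
	}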
@@ -114,15 +137,13 @@ func (t *LedgerTransaction) GetOperation(index uint32) (xdr.Operation, bool) { // GetOperationChanges returns a developer friendly representation of LedgerEntryChanges. // It contains only operation changes. 
func (t *LedgerTransaction) GetOperationChanges(operationIndex uint32) ([]Change, error) { - changes := []Change{} - if t.UnsafeMeta.V == 0 { - return changes, errors.New("TransactionMeta.V=0 not supported") + return []Change{}, errors.New("TransactionMeta.V=0 not supported") } // Ignore operations meta if txInternalError https://github.com/stellar/go/issues/2111 if t.txInternalError() && t.LedgerVersion <= 12 { - return changes, nil + return []Change{}, nil } var operationMeta []xdr.OperationMeta @@ -134,21 +155,26 @@ func (t *LedgerTransaction) GetOperationChanges(operationIndex uint32) ([]Change case 3: operationMeta = t.UnsafeMeta.MustV3().Operations default: - return changes, errors.New("Unsupported TransactionMeta version") + return []Change{}, errors.New("Unsupported TransactionMeta version") } - return operationChanges(operationMeta, operationIndex), nil + return t.operationChanges(operationMeta, operationIndex), nil } -func operationChanges(ops []xdr.OperationMeta, index uint32) []Change { +func (t *LedgerTransaction) operationChanges(ops []xdr.OperationMeta, index uint32) []Change { if int(index) >= len(ops) { return []Change{} } operationMeta := ops[index] - return GetChangesFromLedgerEntryChanges( - operationMeta.Changes, - ) + changes := GetChangesFromLedgerEntryChanges(operationMeta.Changes) + + for i := range changes { + changes[i].Reason = LedgerEntryChangeReasonOperation + changes[i].Transaction = t + changes[i].OperationIndex = index + } + return changes } // GetDiagnosticEvents returns all contract events emitted by a given operation. diff --git a/ingest/ledger_transaction_reader.go b/ingest/ledger_transaction_reader.go index 5d2ad1d237..bbe11a15b8 100644 --- a/ingest/ledger_transaction_reader.go +++ b/ingest/ledger_transaction_reader.go @@ -94,6 +94,8 @@ func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error) { UnsafeMeta: reader.lcm.TxApplyProcessing(i), FeeChanges: reader.lcm.FeeProcessing(i), LedgerVersion: uint32(reader.lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), + Ledger: reader.lcm, + Hash: hash, }, nil } diff --git a/services/horizon/internal/integration/change_test.go b/services/horizon/internal/integration/change_test.go new file mode 100644 index 0000000000..bf2df266eb --- /dev/null +++ b/services/horizon/internal/integration/change_test.go @@ -0,0 +1,231 @@ +package integration + +import ( + "context" + "github.com/stellar/go/ingest" + "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/keypair" + "github.com/stellar/go/services/horizon/internal/test/integration" + "github.com/stellar/go/txnbuild" + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/assert" + "io" + "sort" + "testing" + "time" +) + +func TestProtocolUpgradeChanges(t *testing.T) { + tt := assert.New(t) + itest := integration.NewTest(t, integration.Config{SkipHorizonStart: true}) + + upgradedLedgerAppx, _ := itest.GetUpgradedLedgerSeqAppx() + waitForLedgerInArchive(t, 15*time.Second, upgradedLedgerAppx) + + ledgerSeqToLedgers := getLedgers(itest, 2, upgradedLedgerAppx) + + // It is important to find the "exact" ledger which is representative of protocol upgrade + // and the one before it, to check for upgrade related changes + upgradedLedgerSeq := getExactUpgradedLedgerSeq(ledgerSeqToLedgers, itest.Config().ProtocolVersion) + prevLedgerToUpgrade := upgradedLedgerSeq - 1 + + prevLedgerChanges := getChangesFromLedger(itest, ledgerSeqToLedgers[prevLedgerToUpgrade]) + prevLedgerChangeMap := changeReasonCountMap(prevLedgerChanges) + 
upgradedLedgerChanges := getChangesFromLedger(itest, ledgerSeqToLedgers[upgradedLedgerSeq]) + upgradedLedgerChangeMap := changeReasonCountMap(upgradedLedgerChanges) + + tt.Zero(prevLedgerChangeMap[ingest.LedgerEntryChangeReasonUpgrade]) + tt.NotZero(upgradedLedgerChangeMap[ingest.LedgerEntryChangeReasonUpgrade]) + for _, change := range upgradedLedgerChanges { + tt.Equal(change.Ledger.LedgerSequence(), upgradedLedgerSeq) + tt.Empty(change.Transaction) + tt.NotEmpty(change.LedgerUpgrade) + } +} + +func TestOneTxOneOperationChanges(t *testing.T) { + tt := assert.New(t) + itest := integration.NewTest(t, integration.Config{}) + + master := itest.Master() + keys, _ := itest.CreateAccounts(2, "1000") + srcAcc, destAcc := keys[0], keys[1] + + operation := txnbuild.Payment{ + SourceAccount: srcAcc.Address(), + Destination: destAcc.Address(), + Asset: txnbuild.NativeAsset{}, + Amount: "900", + } + txResp, err := itest.SubmitMultiSigOperations(itest.MasterAccount(), []*keypair.Full{master, srcAcc}, &operation) + tt.NoError(err) + + ledgerSeq := uint32(txResp.Ledger) + waitForLedgerInArchive(t, 15*time.Second, ledgerSeq) + + ledger := getLedgers(itest, ledgerSeq, ledgerSeq)[ledgerSeq] + changes := getChangesFromLedger(itest, ledger) + + reasonCntMap := changeReasonCountMap(changes) + tt.Equal(2, reasonCntMap[ingest.LedgerEntryChangeReasonOperation]) + tt.Equal(1, reasonCntMap[ingest.LedgerEntryChangeReasonTransaction]) + tt.Equal(1, reasonCntMap[ingest.LedgerEntryChangeReasonFee]) + + reasonToChangeMap := changeReasonToChangeMap(changes) + // Assert Transaction Hash and Ledger Sequence within Transaction are accurate in all changes + for _, change := range changes { + tt.Equal(change.Transaction.Hash.HexString(), txResp.Hash) + tt.Equal(change.Transaction.Ledger.LedgerSequence(), ledgerSeq) + tt.Empty(change.Ledger) + tt.Empty(change.LedgerUpgrade) + } + + feeRelatedChange := reasonToChangeMap[ingest.LedgerEntryChangeReasonFee][0] + txRelatedChange := reasonToChangeMap[ingest.LedgerEntryChangeReasonTransaction][0] + operationChanges := reasonToChangeMap[ingest.LedgerEntryChangeReasonOperation] + + accountFromEntry := func(e *xdr.LedgerEntry) xdr.AccountEntry { + return e.Data.MustAccount() + } + + changeForAccount := func(changes []ingest.Change, target string) ingest.Change { + for _, change := range changes { + acc := change.Pre.Data.MustAccount() + if acc.AccountId.Address() == target { + return change + } + } + return ingest.Change{} + } + + containsAccount := func(changes []ingest.Change, target string) bool { + for _, change := range changes { + addr := change.Pre.Data.MustAccount().AccountId.Address() + if addr == target { + return true + } + } + return false + } + + tt.Equal(accountFromEntry(feeRelatedChange.Pre).AccountId.Address(), master.Address()) + tt.Equal(accountFromEntry(txRelatedChange.Pre).AccountId.Address(), master.Address()) + tt.True(containsAccount(operationChanges, srcAcc.Address())) + tt.True(containsAccount(operationChanges, destAcc.Address())) + // MasterAccount shouldn't show up in operation level changes + tt.False(containsAccount(operationChanges, master.Address())) + tt.True(accountFromEntry(feeRelatedChange.Pre).Balance > accountFromEntry(feeRelatedChange.Post).Balance) + tt.True(accountFromEntry(txRelatedChange.Post).SeqNum == accountFromEntry(txRelatedChange.Pre).SeqNum+1) + + srcAccChange := changeForAccount(operationChanges, srcAcc.Address()) + destAccChange := changeForAccount(operationChanges, destAcc.Address()) + 
tt.True(accountFromEntry(srcAccChange.Pre).Balance > accountFromEntry(srcAccChange.Post).Balance) + tt.True(accountFromEntry(destAccChange.Pre).Balance < accountFromEntry(destAccChange.Post).Balance) +} + +func getChangesFromLedger(itest *integration.Test, ledger xdr.LedgerCloseMeta) []ingest.Change { + t := itest.CurrentTest() + changeReader, err := ingest.NewLedgerChangeReaderFromLedgerCloseMeta(itest.GetPassPhrase(), ledger) + if err != nil { + t.Fatalf("unable to create ledger change reader: %v", err) + } + defer changeReader.Close() + changes := make([]ingest.Change, 0) + for { + change, err := changeReader.Read() + if err == io.EOF { + break + } + if err != nil { + t.Fatalf("unable to read ledger change: %v", err) + } + changes = append(changes, change) + } + return changes +} + +func getLedgers(itest *integration.Test, startingLedger uint32, endLedger uint32) map[uint32]xdr.LedgerCloseMeta { + t := itest.CurrentTest() + + ccConfig, err := itest.CreateCaptiveCoreConfig() + if err != nil { + t.Fatalf("unable to create captive core config: %v", err) + } + + captiveCore, err := ledgerbackend.NewCaptive(*ccConfig) + if err != nil { + t.Fatalf("unable to create captive core: %v", err) + } + defer captiveCore.Close() + + ctx := context.Background() + err = captiveCore.PrepareRange(ctx, ledgerbackend.BoundedRange(startingLedger, endLedger)) + if err != nil { + t.Fatalf("failed to prepare range: %v", err) + } + + var seqToLedgersMap = make(map[uint32]xdr.LedgerCloseMeta) + for ledgerSeq := startingLedger; ledgerSeq <= endLedger; ledgerSeq++ { + ledger, err := captiveCore.GetLedger(ctx, ledgerSeq) + if err != nil { + t.Fatalf("failed to get ledgerNum: %v, error: %v", ledgerSeq, err) + } + seqToLedgersMap[ledgerSeq] = ledger + } + + return seqToLedgersMap +} + +func changeReasonCountMap(changes []ingest.Change) map[ingest.LedgerEntryChangeReason]int { + changeMap := make(map[ingest.LedgerEntryChangeReason]int) + for _, change := range changes { + changeMap[change.Reason]++ + } + return changeMap +} + +func changeReasonToChangeMap(changes []ingest.Change) map[ingest.LedgerEntryChangeReason][]ingest.Change { + changeMap := make(map[ingest.LedgerEntryChangeReason][]ingest.Change) + for _, change := range changes { + changeMap[change.Reason] = append(changeMap[change.Reason], change) + } + return changeMap +} + +func waitForLedgerInArchive(t *testing.T, waitTime time.Duration, ledgerSeq uint32) { + archive, err := integration.GetHistoryArchive() + if err != nil { + t.Fatalf("could not get history archive: %v", err) + } + + var latestCheckpoint uint32 + + assert.Eventually(t, + func() bool { + has, requestErr := archive.GetRootHAS() + if requestErr != nil { + t.Logf("Request to fetch checkpoint failed: %v", requestErr) + return false + } + latestCheckpoint = has.CurrentLedger + return latestCheckpoint >= ledgerSeq + + }, + waitTime, + 1*time.Second) +} + +func getExactUpgradedLedgerSeq(ledgerMap map[uint32]xdr.LedgerCloseMeta, version uint32) uint32 { + keys := make([]int, 0, len(ledgerMap)) + for key := range ledgerMap { + keys = append(keys, int(key)) + } + sort.Ints(keys) + + for _, key := range keys { + if ledgerMap[uint32(key)].ProtocolVersion() == version { + return uint32(key) + } + } + return 0 +} diff --git a/services/horizon/internal/integration/db_test.go b/services/horizon/internal/integration/db_test.go index 5a2b03e48b..7d85289852 100644 --- a/services/horizon/internal/integration/db_test.go +++ b/services/horizon/internal/integration/db_test.go @@ -15,7 +15,6 @@ import ( 
"github.com/stretchr/testify/require" "github.com/stellar/go/clients/horizonclient" - "github.com/stellar/go/historyarchive" "github.com/stellar/go/keypair" hProtocol "github.com/stellar/go/protocols/horizon" horizoncmd "github.com/stellar/go/services/horizon/cmd" @@ -507,12 +506,7 @@ func TestReingestDB(t *testing.T) { // cannot ingest past the most recent checkpoint ledger when using captive // core toLedger := uint32(reachedLedger) - archive, err := historyarchive.Connect( - horizonConfig.HistoryArchiveURLs[0], - historyarchive.ArchiveOptions{ - NetworkPassphrase: horizonConfig.NetworkPassphrase, - CheckpointFrequency: horizonConfig.CheckpointFrequency, - }) + archive, err := integration.GetHistoryArchive() tt.NoError(err) // make sure a full checkpoint has elapsed otherwise there will be nothing to reingest @@ -635,12 +629,7 @@ func TestReingestDBWithFilterRules(t *testing.T) { itest, _ := initializeDBIntegrationTest(t) tt := assert.New(t) - archive, err := historyarchive.Connect( - itest.GetHorizonIngestConfig().HistoryArchiveURLs[0], - historyarchive.ArchiveOptions{ - NetworkPassphrase: itest.GetHorizonIngestConfig().NetworkPassphrase, - CheckpointFrequency: itest.GetHorizonIngestConfig().CheckpointFrequency, - }) + archive, err := integration.GetHistoryArchive() tt.NoError(err) // make sure one full checkpoint has elapsed before making ledger entries @@ -879,12 +868,7 @@ func TestFillGaps(t *testing.T) { // cap reachedLedger to the nearest checkpoint ledger because reingest range cannot ingest past the most // recent checkpoint ledger when using captive core toLedger := uint32(reachedLedger) - archive, err := historyarchive.Connect( - horizonConfig.HistoryArchiveURLs[0], - historyarchive.ArchiveOptions{ - NetworkPassphrase: horizonConfig.NetworkPassphrase, - CheckpointFrequency: horizonConfig.CheckpointFrequency, - }) + archive, err := integration.GetHistoryArchive() tt.NoError(err) t.Run("validate parallel range", func(t *testing.T) { diff --git a/services/horizon/internal/integration/parameters_test.go b/services/horizon/internal/integration/parameters_test.go index c7e0d0c75b..7ded1f9a88 100644 --- a/services/horizon/internal/integration/parameters_test.go +++ b/services/horizon/internal/integration/parameters_test.go @@ -41,24 +41,6 @@ var networkParamArgs = map[string]string{ horizon.NetworkPassphraseFlagName: "", } -const ( - SimpleCaptiveCoreToml = ` - PEER_PORT=11725 - ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING=true - - UNSAFE_QUORUM=true - FAILURE_SAFETY=0 - - [[VALIDATORS]] - NAME="local_core" - HOME_DOMAIN="core.local" - PUBLIC_KEY="GD5KD2KEZJIGTC63IGW6UMUSMVUVG5IHG64HUTFWCHVZH2N2IBOQN7PS" - ADDRESS="localhost" - QUALITY="MEDIUM"` - - StellarCoreURL = "http://localhost:11626" -) - var ( CaptiveCoreConfigErrMsg = "error generating captive core configuration: invalid config: " ) @@ -66,9 +48,9 @@ var ( // Ensures that BUCKET_DIR_PATH is not an allowed value for Captive Core. 
func TestBucketDirDisallowed(t *testing.T) { config := `BUCKET_DIR_PATH="/tmp" - ` + SimpleCaptiveCoreToml + ` + integration.SimpleCaptiveCoreToml - confName, _, cleanup := createCaptiveCoreConfig(config) + confName, _, cleanup := integration.CreateCaptiveCoreConfig(config) defer cleanup() testConfig := integration.GetTestConfig() testConfig.HorizonIngestParameters = map[string]string{ @@ -103,7 +85,7 @@ func TestEnvironmentPreserved(t *testing.T) { testConfig := integration.GetTestConfig() testConfig.HorizonEnvironment = map[string]string{ - "STELLAR_CORE_URL": StellarCoreURL, + "STELLAR_CORE_URL": integration.StellarCoreURL, } test := integration.NewTest(t, *testConfig) @@ -112,7 +94,7 @@ func TestEnvironmentPreserved(t *testing.T) { test.WaitForHorizonIngest() envValue := os.Getenv("STELLAR_CORE_URL") - assert.Equal(t, StellarCoreURL, envValue) + assert.Equal(t, integration.StellarCoreURL, envValue) test.Shutdown() @@ -252,7 +234,7 @@ func TestNetworkEnvironmentVariable(t *testing.T) { // Ensures that the filesystem ends up in the correct state with Captive Core. func TestCaptiveCoreConfigFilesystemState(t *testing.T) { - confName, storagePath, cleanup := createCaptiveCoreConfig(SimpleCaptiveCoreToml) + confName, storagePath, cleanup := integration.CreateCaptiveCoreConfig(integration.SimpleCaptiveCoreToml) defer cleanup() localParams := integration.MergeMaps(defaultCaptiveCoreParameters, map[string]string{ @@ -671,30 +653,3 @@ func validateCaptiveCoreDiskState(itest *integration.Test, rootDir string) { tt.DirExists(storageDir) tt.FileExists(coreConf) } - -// createCaptiveCoreConfig will create a temporary TOML config with the -// specified contents as well as a temporary storage directory. You should -// `defer` the returned function to clean these up when you're done. 
-func createCaptiveCoreConfig(contents string) (string, string, func()) { - tomlFile, err := ioutil.TempFile("", "captive-core-test-*.toml") - defer tomlFile.Close() - if err != nil { - panic(err) - } - - _, err = tomlFile.WriteString(contents) - if err != nil { - panic(err) - } - - storagePath, err := os.MkdirTemp("", "captive-core-test-*-storage") - if err != nil { - panic(err) - } - - filename := tomlFile.Name() - return filename, storagePath, func() { - os.Remove(filename) - os.RemoveAll(storagePath) - } -} diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go index 107cb33759..f90eece01f 100644 --- a/services/horizon/internal/test/integration/integration.go +++ b/services/horizon/internal/test/integration/integration.go @@ -4,6 +4,8 @@ package integration import ( "context" "fmt" + "github.com/stellar/go/historyarchive" + "github.com/stellar/go/ingest/ledgerbackend" "io/ioutil" "os" "os/exec" @@ -41,13 +43,38 @@ import ( const ( StandaloneNetworkPassphrase = "Standalone Network ; February 2017" - stellarCorePostgresPassword = "mysecretpassword" - horizonDefaultPort = "8000" - adminPort = 6060 - stellarCorePort = 11626 - stellarCorePostgresPort = 5641 - historyArchivePort = 1570 - sorobanRPCPort = 8080 + HorizonDefaultPort = "8000" + AdminPort = 6060 + StellarCorePort = 11626 + HistoryArchivePort = 1570 + SorobanRPCPort = 8080 + HistoryArchiveUrl = "http://localhost:1570" + CheckpointFrequency = 8 +) + +const ( + SimpleCaptiveCoreToml = ` + PEER_PORT=11725 + ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING=true + NETWORK_PASSPHRASE = "Standalone Network ; February 2017" + UNSAFE_QUORUM=true + FAILURE_SAFETY=0 + RUN_STANDALONE=false + + # Lower the TTL of persistent ledger entries + # so that ledger entry extension/restoring becomes testable + # These 2 settings need to be present in both places - stellar-core-integration-tests.cfg and here + TESTING_MINIMUM_PERSISTENT_ENTRY_LIFETIME=10 + TESTING_SOROBAN_HIGH_LIMIT_OVERRIDE=true + + [[VALIDATORS]] + NAME="local_core" + HOME_DOMAIN="core.local" + PUBLIC_KEY="GD5KD2KEZJIGTC63IGW6UMUSMVUVG5IHG64HUTFWCHVZH2N2IBOQN7PS" + ADDRESS="localhost" + QUALITY="MEDIUM" +` + StellarCoreURL = "http://localhost:11626" ) const HorizonInitErrStr = "cannot initialize Horizon" @@ -56,8 +83,10 @@ type Config struct { ProtocolVersion uint32 EnableSorobanRPC bool SkipCoreContainerCreation bool + SkipCoreContainerDeletion bool // This flag is helpful for debugging CoreDockerImage string SorobanRPCDockerImage string + SkipProtocolUpgrade bool // Weird naming here because bools default to false, but we want to start // Horizon by default. @@ -100,13 +129,18 @@ type Test struct { horizonAdminClient *sdk.AdminClient coreClient *stellarcore.Client - webNode *horizon.App - ingestNode *horizon.App - appStopped *sync.WaitGroup - shutdownOnce sync.Once - shutdownCalls []func() - masterKey *keypair.Full - passPhrase string + webNode *horizon.App + ingestNode *horizon.App + appStopped *sync.WaitGroup + shutdownOnce sync.Once + shutdownCalls []func() + masterKey *keypair.Full + passPhrase string + coreUpgradeState *CoreUpgradeState +} + +type CoreUpgradeState struct { + maxUpgradeLedger uint32 } // GetTestConfig returns the default test Config required to run NewTest. 
@@ -163,7 +197,7 @@ func NewTest(t *testing.T, config Config) *Test { } i.prepareShutdownHandlers() - i.coreClient = &stellarcore.Client{URL: "http://localhost:" + strconv.Itoa(stellarCorePort)} + i.coreClient = &stellarcore.Client{URL: "http://localhost:" + strconv.Itoa(StellarCorePort)} if !config.SkipCoreContainerCreation { i.waitForCore() if i.config.EnableSorobanRPC { @@ -277,8 +311,13 @@ func (i *Test) prepareShutdownHandlers() { i.ingestNode.Close() } if !i.config.SkipCoreContainerCreation { - i.runComposeCommand("rm", "-fvs", "core") - i.runComposeCommand("rm", "-fvs", "core-postgres") + if !i.config.SkipCoreContainerDeletion { + i.t.Log("Removing core docker containers...") + i.runComposeCommand("rm", "-fvs", "core") + i.runComposeCommand("rm", "-fvs", "core-postgres") + } else { + i.t.Log("Skip core docker container removal for debugging...") + } if i.config.EnableSorobanRPC { i.runComposeCommand("logs", "soroban-rpc") i.runComposeCommand("rm", "-fvs", "soroban-rpc") @@ -341,6 +380,7 @@ func (i *Test) Shutdown() { // StartHorizon initializes and starts the Horizon client-facing API server. // When startIngestProcess=true, start a second process for ingest server func (i *Test) StartHorizon(startIngestProcess bool) error { + i.t.Logf("Starting horizon.....") i.testDB = dbtest.Postgres(i.t) i.shutdownCalls = append(i.shutdownCalls, func() { if i.appStopped == nil { @@ -436,14 +476,14 @@ func (i *Test) getDefaultArgs() map[string]string { // Compose YAML file itself rather than hardcoding it. return map[string]string{ "ingest": "false", - "history-archive-urls": fmt.Sprintf("http://%s:%d", "localhost", historyArchivePort), + "history-archive-urls": HistoryArchiveUrl, "db-url": i.testDB.RO_DSN, "stellar-core-url": i.coreClient.URL, "network-passphrase": i.passPhrase, "apply-migrations": "true", - "port": horizonDefaultPort, + "port": HorizonDefaultPort, // due to ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING - "checkpoint-frequency": "8", + "checkpoint-frequency": strconv.Itoa(CheckpointFrequency), "per-hour-rate-limit": "0", // disable rate limiting "max-db-connections": "50", // the postgres container supports 100 connections, be conservative } @@ -537,7 +577,7 @@ func (i *Test) setupHorizonAdminClient(ingestArgs map[string]string) error { func (i *Test) setupHorizonClient(webArgs map[string]string) { hostname := "localhost" - horizonPort := horizonDefaultPort + horizonPort := HorizonDefaultPort if port, ok := webArgs["port"]; ok { horizonPort = port } @@ -547,6 +587,62 @@ func (i *Test) setupHorizonClient(webArgs map[string]string) { } } +// CreateCaptiveCoreConfig will create a temporary TOML config with the +// specified contents as well as a temporary storage directory. You should +// `defer` the returned function to clean these up when you're done. 
+func CreateCaptiveCoreConfig(contents string) (string, string, func()) { + tomlFile, err := ioutil.TempFile("", "captive-core-test-*.toml") + if err != nil { + panic(err) + } + defer tomlFile.Close() + + _, err = tomlFile.WriteString(contents) + if err != nil { + panic(err) + } + + storagePath, err := os.MkdirTemp("", "captive-core-test-*-storage") + if err != nil { + panic(err) + } + + filename := tomlFile.Name() + return filename, storagePath, func() { + os.Remove(filename) + os.RemoveAll(storagePath) + } +} + +func (i *Test) CreateCaptiveCoreConfig() (*ledgerbackend.CaptiveCoreConfig, error) { + confName, storagePath, cleanupFn := CreateCaptiveCoreConfig(SimpleCaptiveCoreToml) + i.t.Cleanup(cleanupFn) + i.t.Logf("Creating Captive Core config files, ConfName: %v, storagePath: %v", confName, storagePath) + + captiveCoreConfig := ledgerbackend.CaptiveCoreConfig{ + BinaryPath: i.coreConfig.binaryPath, + HistoryArchiveURLs: []string{HistoryArchiveUrl}, + NetworkPassphrase: StandaloneNetworkPassphrase, + CheckpointFrequency: CheckpointFrequency, // This is required for accelerated archive creation for integration test + UseDB: true, + StoragePath: storagePath, + } + + tomlParams := ledgerbackend.CaptiveCoreTomlParams{ + NetworkPassphrase: StandaloneNetworkPassphrase, + HistoryArchiveURLs: []string{HistoryArchiveUrl}, + UseDB: true, + } + + toml, err := ledgerbackend.NewCaptiveCoreTomlFromData([]byte(SimpleCaptiveCoreToml), tomlParams) + if err != nil { + return nil, err + } + + captiveCoreConfig.Toml = toml + return &captiveCoreConfig, nil +} + const maxWaitForCoreStartup = 30 * time.Second const maxWaitForCoreUpgrade = 5 * time.Second const coreStartupPingInterval = time.Second @@ -604,7 +700,7 @@ func (i *Test) waitForSorobanRPC() { for time.Since(start) < sorobanRPCInitTime { ctx, cancel := context.WithTimeout(context.Background(), sorobanRPCHealthCheckInterval) // TODO: soroban-tools should be exporting a proper Go client - ch := jhttp.NewChannel("http://localhost:"+strconv.Itoa(sorobanRPCPort), nil) + ch := jhttp.NewChannel("http://localhost:"+strconv.Itoa(SorobanRPCPort), nil) sorobanRPCClient := jrpc2.NewClient(ch, nil) callTime := time.Now() _, err := sorobanRPCClient.Call(ctx, "getHealth", nil) @@ -675,7 +771,7 @@ func (i *Test) simulateTransaction( i.syncWithSorobanRPC(uint32(root.HorizonSequence)) // TODO: soroban-tools should be exporting a proper Go client - ch := jhttp.NewChannel("http://localhost:"+strconv.Itoa(sorobanRPCPort), nil) + ch := jhttp.NewChannel("http://localhost:"+strconv.Itoa(SorobanRPCPort), nil) sorobanRPCClient := jrpc2.NewClient(ch, nil) txParams := GetBaseTransactionParamsWithFee(sourceAccount, txnbuild.MinBaseFee, op) txParams.IncrementSequenceNum = false @@ -702,7 +798,7 @@ func (i *Test) syncWithSorobanRPC(ledgerToWaitFor uint32) { result := struct { Sequence uint32 `json:"sequence"` }{} - ch := jhttp.NewChannel("http://localhost:"+strconv.Itoa(sorobanRPCPort), nil) + ch := jhttp.NewChannel("http://localhost:"+strconv.Itoa(SorobanRPCPort), nil) sorobanRPCClient := jrpc2.NewClient(ch, nil) err := sorobanRPCClient.CallResult(context.Background(), "getLatestLedger", nil, &result) assert.NoError(i.t, err) @@ -715,7 +811,7 @@ func (i *Test) syncWithSorobanRPC(ledgerToWaitFor uint32) { } func (i *Test) WaitUntilLedgerEntryTTL(ledgerKey xdr.LedgerKey) { - ch := jhttp.NewChannel("http://localhost:"+strconv.Itoa(sorobanRPCPort), nil) + ch := jhttp.NewChannel("http://localhost:"+strconv.Itoa(SorobanRPCPort), nil) client := jrpc2.NewClient(ch, nil) keyB64, err := 
xdr.MarshalBase64(ledgerKey) @@ -821,6 +917,7 @@ func (i *Test) RestoreFootprint( // UpgradeProtocol arms Core with upgrade and blocks until protocol is upgraded. func (i *Test) UpgradeProtocol(version uint32) { + i.t.Logf("Attempting Core Protocol upgrade to version: %v", version) ctx, cancel := context.WithTimeout(context.Background(), time.Second) err := i.coreClient.Upgrade(ctx, int(version)) cancel() @@ -838,8 +935,14 @@ func (i *Test) UpgradeProtocol(version uint32) { continue } + ledgerSeq := info.Info.Ledger.Num if info.Info.Ledger.Version == int(version) { - i.t.Logf("Protocol upgraded to: %d", info.Info.Ledger.Version) + i.t.Logf("Protocol upgraded to: %d, in ledger sequence number: %v, hash: %v", + info.Info.Ledger.Version, ledgerSeq, info.Info.Ledger.Hash) + // Mark the fact that the core has been upgraded as of this ledger sequence. + // It could have been earlier than this, but certainly no later. + // The core upgrade could have happened in any ledger since the coreClient.Upgrade was issued. + i.coreUpgradeState = &CoreUpgradeState{maxUpgradeLedger: uint32(ledgerSeq)} return } time.Sleep(time.Second) @@ -933,7 +1036,7 @@ func (i *Test) StopHorizon() { // AdminPort returns Horizon admin port. func (i *Test) AdminPort() int { - return adminPort + return AdminPort } // Metrics URL returns Horizon metrics URL. @@ -1375,3 +1478,20 @@ func GetCoreMaxSupportedProtocol() uint32 { func (i *Test) GetEffectiveProtocolVersion() uint32 { return i.config.ProtocolVersion } + +func GetHistoryArchive() (*historyarchive.Archive, error) { + return historyarchive.Connect( + HistoryArchiveUrl, + historyarchive.ArchiveOptions{ + NetworkPassphrase: StandaloneNetworkPassphrase, + CheckpointFrequency: CheckpointFrequency, + }) +} + +// This is approximate because the upgrade could have happened at a ledger before this as well. +func (i *Test) GetUpgradedLedgerSeqAppx() (uint32, error) { + if i.coreUpgradeState == nil { + return 0, errors.Errorf("Core has not been upgraded yet") + } + return i.coreUpgradeState.maxUpgradeLedger, nil +} diff --git a/support/http/httptest/client_expectation.go b/support/http/httptest/client_expectation.go index f056ae7ebf..e19bdc334f 100644 --- a/support/http/httptest/client_expectation.go +++ b/support/http/httptest/client_expectation.go @@ -1,6 +1,7 @@ package httptest import ( + "fmt" "net/http" "net/url" "strconv" @@ -85,6 +86,37 @@ func (ce *ClientExpectation) ReturnStringWithHeader( return ce.Return(httpmock.ResponderFromResponse(&cResp)) } +// ReturnMultipleResults registers multiple sequential responses for a given client expectation. +// Useful for testing retries. +func (ce *ClientExpectation) ReturnMultipleResults(responseSets []ResponseData) *ClientExpectation { + var allResponses []httpmock.Responder + for _, response := range responseSets { + resp := http.Response{ + Status: strconv.Itoa(response.Status), + StatusCode: response.Status, + Body: httpmock.NewRespBodyFromString(response.Body), + Header: response.Header, + } + allResponses = append(allResponses, httpmock.ResponderFromResponse(&resp)) + } + responseIndex := 0 + ce.Client.MockTransport.RegisterResponder( + ce.Method, + ce.URL, + func(req *http.Request) (*http.Response, error) { + if responseIndex >= len(allResponses) { + panic(fmt.Errorf("no responses available")) + } + + resp := allResponses[responseIndex] + responseIndex++ + return resp(req) + }, + ) + + return ce +}
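A hedged usage sketch for the new helper in a test. The NewClient/On expectation builders are the package's existing helpers and are assumed here (they are not part of this diff); ResponseData is the type added in main.go below.

	mock := httptest.NewClient()
	mock.On("GET", "https://example.org/ledgers").ReturnMultipleResults([]httptest.ResponseData{
		{Status: http.StatusTooManyRequests, Body: "rate limited"},
		{Status: http.StatusOK, Body: `{"sequence": 123}`},
	})
	// The first request sees a 429; a retrying client then observes the 200.
	// Any request beyond the registered responses panics.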
// ReturnJSONWithHeader causes this expectation to resolve to a json-based body with the provided // status code and response header. Panics when the provided body cannot be encoded to JSON. func (ce *ClientExpectation) ReturnJSONWithHeader( diff --git a/support/http/httptest/main.go b/support/http/httptest/main.go index 47a00b1991..18b986ba1b 100644 --- a/support/http/httptest/main.go +++ b/support/http/httptest/main.go @@ -67,3 +67,9 @@ func NewServer(t *testing.T, handler http.Handler) *Server { Expect: httpexpect.New(t, server.URL), } } + +type ResponseData struct { + Status int + Body string + Header http.Header +} diff --git a/utils/apiclient/client.go b/utils/apiclient/client.go new file mode 100644 index 0000000000..82501df3fc --- /dev/null +++ b/utils/apiclient/client.go @@ -0,0 +1,97 @@ +package apiclient + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "time" + + "github.com/stellar/go/support/log" +) + +const ( + defaultMaxRetries = 5 + defaultInitialBackoffTime = 1 * time.Second +) + +func isRetryableStatusCode(statusCode int) bool { + return statusCode == http.StatusTooManyRequests || statusCode == http.StatusServiceUnavailable +} + +func (c *APIClient) GetURL(endpoint string, queryParams url.Values) string { + return fmt.Sprintf("%s/%s?%s", c.BaseURL, endpoint, queryParams.Encode()) +} + +func (c *APIClient) CallAPI(reqParams RequestParams) (interface{}, error) { + if reqParams.QueryParams == nil { + reqParams.QueryParams = url.Values{} + } + + if reqParams.Headers == nil { + reqParams.Headers = map[string]interface{}{} + } + + if c.MaxRetries == 0 { + c.MaxRetries = defaultMaxRetries + } + + if c.InitialBackoffTime == 0 { + c.InitialBackoffTime = defaultInitialBackoffTime + } + + if reqParams.Endpoint == "" { + return nil, fmt.Errorf("endpoint must be set to query") + } + + url := c.GetURL(reqParams.Endpoint, reqParams.QueryParams) + reqBody, err := CreateRequestBody(reqParams.RequestType, url) + if err != nil { + return nil, fmt.Errorf("http request creation failed: %w", err) + } + + SetAuthHeaders(reqBody, c.AuthType, c.AuthHeaders) + SetHeaders(reqBody, reqParams.Headers) + client := c.HTTP + if client == nil { + client = &http.Client{} + } + + var result interface{} + retries := 0 + + for retries <= c.MaxRetries { + resp, err := client.Do(reqBody) + if err != nil { + return nil, fmt.Errorf("http request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + if err := json.Unmarshal(body, &result); err != nil { + return nil, fmt.Errorf("failed to unmarshal JSON: %w", err) + } + + 
return result, nil + } else if isRetryableStatusCode(resp.StatusCode) { + retries++ + backoffDuration := c.InitialBackoffTime * time.Duration(1<