diff --git a/services/horizon/internal/integration/change_test.go b/services/horizon/internal/integration/change_test.go index 2d87fea8ff..cf90a5bba3 100644 --- a/services/horizon/internal/integration/change_test.go +++ b/services/horizon/internal/integration/change_test.go @@ -3,18 +3,19 @@ package integration import ( "context" "github.com/stellar/go/historyarchive" + "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/services/horizon/internal/test/integration" "github.com/stellar/go/xdr" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "io" "testing" "time" ) -func TestCoreDump(t *testing.T) { +func TestProtocolUpgradeChanges(t *testing.T) { tt := assert.New(t) - itest := integration.NewTest(t, integration.Config{SkipHorizonStart: true}) + itest := integration.NewTest(t, integration.Config{SkipHorizonStart: true, SkipProtocolUpgrade: true}) archive, err := historyarchive.Connect( integration.HistoryArchiveUrl, historyarchive.ArchiveOptions{ @@ -23,8 +24,11 @@ func TestCoreDump(t *testing.T) { }) tt.NoError(err) + // Manually invoke command to upgrade protocol + itest.UpgradeProtocol(itest.Config().ProtocolVersion) + upgradedLedgerSeq, _ := itest.GetUpgradeLedgerSeq() + var latestCheckpoint uint32 - startTime := time.Now() publishedNextCheckpoint := func() bool { has, requestErr := archive.GetRootHAS() if requestErr != nil { @@ -32,48 +36,62 @@ func TestCoreDump(t *testing.T) { return false } latestCheckpoint = has.CurrentLedger - t.Logf("Latest ledger so far: %d", latestCheckpoint) - return latestCheckpoint >= uint32(7) // ALLOW for atleast 3 checkpoints + return latestCheckpoint >= upgradedLedgerSeq } - //time.Sleep(15 * time.Second) - // Ensure that a checkpoint has been created with the ledgerNumber you want in it - tt.Eventually(publishedNextCheckpoint, 45*time.Second, time.Second) - endTime := time.Now() + tt.Eventually(publishedNextCheckpoint, 15*time.Second, time.Second) - t.Logf("waited %v seconds to start captive core...", endTime.Sub(startTime).Seconds()) - t.Log("---------- STARTING CAPTIVE CORE ---------") + prevLedgerToUpgrade := upgradedLedgerSeq - 1 + ledgerSeqToLedgers := getLedgersFromArchive(itest, prevLedgerToUpgrade, upgradedLedgerSeq) + prevLedgerChangeMap := changeMap(getChangesFromLedger(itest, ledgerSeqToLedgers[prevLedgerToUpgrade])) + upgradedLedgerChangeMap := changeMap(getChangesFromLedger(itest, ledgerSeqToLedgers[upgradedLedgerSeq])) + + tt.Zero(prevLedgerChangeMap[ingest.LedgerEntryChangeReasonUpgrade]) + tt.NotZero(upgradedLedgerChangeMap[ingest.LedgerEntryChangeReasonUpgrade]) +} - ledgerSeqToLedgers := getLedgersFromArchive(itest, 2, 7) - t.Logf("----- length of hashmap is %v", len(ledgerSeqToLedgers)) - time.Sleep(45 * time.Second) +func getChangesFromLedger(itest *integration.Test, ledger xdr.LedgerCloseMeta) []ingest.Change { + t := itest.CurrentTest() + changeReader, err := ingest.NewLedgerChangeReaderFromLedgerCloseMeta(itest.GetPassPhrase(), ledger) + changes := make([]ingest.Change, 0) + defer changeReader.Close() + if err != nil { + t.Fatalf("unable to create ledger change reader: %v", err) + } + for { + change, err := changeReader.Read() + if err == io.EOF { + break + } + if err != nil { + t.Fatalf("unable to read ledger change: %v", err) + } + changes = append(changes, change) + } + return changes } func getLedgersFromArchive(itest *integration.Test, startingLedger uint32, endLedger uint32) map[uint32]xdr.LedgerCloseMeta { t := itest.CurrentTest() - ccConfig, cleanpupFn, err := itest.CreateCaptiveCoreConfig() + ccConfig, cleanupFn, err := itest.CreateCaptiveCoreConfig() if err != nil { - panic(err) + t.Fatalf("unable to create captive core config: %v", err) } + defer cleanupFn() - defer cleanpupFn() captiveCore, err := ledgerbackend.NewCaptive(*ccConfig) if err != nil { - panic(err) + t.Fatalf("unable to create captive core: %v", err) } defer captiveCore.Close() ctx := context.Background() - require.NoError(t, err) - err = captiveCore.PrepareRange(ctx, ledgerbackend.BoundedRange(startingLedger, endLedger)) if err != nil { t.Fatalf("failed to prepare range: %v", err) } - t.Logf("Ledger Range ----- [%v, %v]", startingLedger, endLedger) - var seqToLedgersMap = make(map[uint32]xdr.LedgerCloseMeta) for ledgerSeq := startingLedger; ledgerSeq <= endLedger; ledgerSeq++ { ledger, err := captiveCore.GetLedger(ctx, ledgerSeq) @@ -81,8 +99,15 @@ func getLedgersFromArchive(itest *integration.Test, startingLedger uint32, endLe t.Fatalf("failed to get ledgerNum: %v, error: %v", ledgerSeq, err) } seqToLedgersMap[ledgerSeq] = ledger - itest.CurrentTest().Logf("processed ledgerNum: %v, hash: %v", ledgerSeq, ledger.LedgerHash().HexString()) } return seqToLedgersMap } + +func changeMap(changes []ingest.Change) map[ingest.LedgerEntryChangeReason]int { + changeMap := make(map[ingest.LedgerEntryChangeReason]int) + for _, change := range changes { + changeMap[change.Reason]++ + } + return changeMap +} diff --git a/services/horizon/internal/integration/core_dump_test.go b/services/horizon/internal/integration/core_dump_test.go deleted file mode 100644 index 76ab1b7282..0000000000 --- a/services/horizon/internal/integration/core_dump_test.go +++ /dev/null @@ -1 +0,0 @@ -package integration diff --git a/services/horizon/internal/integration/parameters_test.go b/services/horizon/internal/integration/parameters_test.go index c7e0d0c75b..7ded1f9a88 100644 --- a/services/horizon/internal/integration/parameters_test.go +++ b/services/horizon/internal/integration/parameters_test.go @@ -41,24 +41,6 @@ var networkParamArgs = map[string]string{ horizon.NetworkPassphraseFlagName: "", } -const ( - SimpleCaptiveCoreToml = ` - PEER_PORT=11725 - ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING=true - - UNSAFE_QUORUM=true - FAILURE_SAFETY=0 - - [[VALIDATORS]] - NAME="local_core" - HOME_DOMAIN="core.local" - PUBLIC_KEY="GD5KD2KEZJIGTC63IGW6UMUSMVUVG5IHG64HUTFWCHVZH2N2IBOQN7PS" - ADDRESS="localhost" - QUALITY="MEDIUM"` - - StellarCoreURL = "http://localhost:11626" -) - var ( CaptiveCoreConfigErrMsg = "error generating captive core configuration: invalid config: " ) @@ -66,9 +48,9 @@ var ( // Ensures that BUCKET_DIR_PATH is not an allowed value for Captive Core. func TestBucketDirDisallowed(t *testing.T) { config := `BUCKET_DIR_PATH="/tmp" - ` + SimpleCaptiveCoreToml + ` + integration.SimpleCaptiveCoreToml - confName, _, cleanup := createCaptiveCoreConfig(config) + confName, _, cleanup := integration.CreateCaptiveCoreConfig(config) defer cleanup() testConfig := integration.GetTestConfig() testConfig.HorizonIngestParameters = map[string]string{ @@ -103,7 +85,7 @@ func TestEnvironmentPreserved(t *testing.T) { testConfig := integration.GetTestConfig() testConfig.HorizonEnvironment = map[string]string{ - "STELLAR_CORE_URL": StellarCoreURL, + "STELLAR_CORE_URL": integration.StellarCoreURL, } test := integration.NewTest(t, *testConfig) @@ -112,7 +94,7 @@ func TestEnvironmentPreserved(t *testing.T) { test.WaitForHorizonIngest() envValue := os.Getenv("STELLAR_CORE_URL") - assert.Equal(t, StellarCoreURL, envValue) + assert.Equal(t, integration.StellarCoreURL, envValue) test.Shutdown() @@ -252,7 +234,7 @@ func TestNetworkEnvironmentVariable(t *testing.T) { // Ensures that the filesystem ends up in the correct state with Captive Core. func TestCaptiveCoreConfigFilesystemState(t *testing.T) { - confName, storagePath, cleanup := createCaptiveCoreConfig(SimpleCaptiveCoreToml) + confName, storagePath, cleanup := integration.CreateCaptiveCoreConfig(integration.SimpleCaptiveCoreToml) defer cleanup() localParams := integration.MergeMaps(defaultCaptiveCoreParameters, map[string]string{ @@ -671,30 +653,3 @@ func validateCaptiveCoreDiskState(itest *integration.Test, rootDir string) { tt.DirExists(storageDir) tt.FileExists(coreConf) } - -// createCaptiveCoreConfig will create a temporary TOML config with the -// specified contents as well as a temporary storage directory. You should -// `defer` the returned function to clean these up when you're done. -func createCaptiveCoreConfig(contents string) (string, string, func()) { - tomlFile, err := ioutil.TempFile("", "captive-core-test-*.toml") - defer tomlFile.Close() - if err != nil { - panic(err) - } - - _, err = tomlFile.WriteString(contents) - if err != nil { - panic(err) - } - - storagePath, err := os.MkdirTemp("", "captive-core-test-*-storage") - if err != nil { - panic(err) - } - - filename := tomlFile.Name() - return filename, storagePath, func() { - os.Remove(filename) - os.RemoveAll(storagePath) - } -} diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go index 4ea4be6bef..cf283d737c 100644 --- a/services/horizon/internal/test/integration/integration.go +++ b/services/horizon/internal/test/integration/integration.go @@ -48,22 +48,31 @@ const ( HistoryArchivePort = 1570 SorobanRPCPort = 8080 HistoryArchiveUrl = "http://localhost:1570" + CheckpointFrequency = 8 ) const ( SimpleCaptiveCoreToml = ` PEER_PORT=11725 ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING=true - + NETWORK_PASSPHRASE = "Standalone Network ; February 2017" UNSAFE_QUORUM=true FAILURE_SAFETY=0 + RUN_STANDALONE=false + + # Lower the TTL of persistent ledger entries + # so that ledger entry extension/restoring becomes testeable + # These 2 settings need to be present in both places - stellar-core-integration-tests.cfg and here + TESTING_MINIMUM_PERSISTENT_ENTRY_LIFETIME=10 + TESTING_SOROBAN_HIGH_LIMIT_OVERRIDE=true [[VALIDATORS]] NAME="local_core" HOME_DOMAIN="core.local" PUBLIC_KEY="GD5KD2KEZJIGTC63IGW6UMUSMVUVG5IHG64HUTFWCHVZH2N2IBOQN7PS" ADDRESS="localhost" - QUALITY="MEDIUM"` + QUALITY="MEDIUM" +` StellarCoreURL = "http://localhost:11626" ) @@ -73,8 +82,10 @@ type Config struct { ProtocolVersion uint32 EnableSorobanRPC bool SkipCoreContainerCreation bool + SkipCoreContainerDeletion bool // This flag is helpful to debug CoreDockerImage string SorobanRPCDockerImage string + SkipProtocolUpgrade bool // Weird naming here because bools default to false, but we want to start // Horizon by default. @@ -117,13 +128,18 @@ type Test struct { horizonAdminClient *sdk.AdminClient coreClient *stellarcore.Client - webNode *horizon.App - ingestNode *horizon.App - appStopped *sync.WaitGroup - shutdownOnce sync.Once - shutdownCalls []func() - masterKey *keypair.Full - passPhrase string + webNode *horizon.App + ingestNode *horizon.App + appStopped *sync.WaitGroup + shutdownOnce sync.Once + shutdownCalls []func() + masterKey *keypair.Full + passPhrase string + coreUpgradeState *CoreUpgradeState +} + +type CoreUpgradeState struct { + upgradeLedgerSeq uint32 } // GetTestConfig returns the default test Config required to run NewTest. @@ -294,8 +310,13 @@ func (i *Test) prepareShutdownHandlers() { i.ingestNode.Close() } if !i.config.SkipCoreContainerCreation { - i.runComposeCommand("rm", "-fvs", "core") - i.runComposeCommand("rm", "-fvs", "core-postgres") + if !i.config.SkipCoreContainerDeletion { + i.t.Log("Removing core docker containers...") + i.runComposeCommand("rm", "-fvs", "core") + i.runComposeCommand("rm", "-fvs", "core-postgres") + } else { + i.t.Log("Skip core docker container removal for debugging...") + } if i.config.EnableSorobanRPC { i.runComposeCommand("logs", "soroban-rpc") i.runComposeCommand("rm", "-fvs", "soroban-rpc") @@ -358,6 +379,7 @@ func (i *Test) Shutdown() { // StartHorizon initializes and starts the Horizon client-facing API server. // When startIngestProcess=true, start a second process for ingest server func (i *Test) StartHorizon(startIngestProcess bool) error { + i.t.Logf("Starting horizon.....") i.testDB = dbtest.Postgres(i.t) i.shutdownCalls = append(i.shutdownCalls, func() { if i.appStopped == nil { @@ -460,7 +482,7 @@ func (i *Test) getDefaultArgs() map[string]string { "apply-migrations": "true", "port": HorizonDefaultPort, // due to ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING - "checkpoint-frequency": "8", + "checkpoint-frequency": strconv.Itoa(CheckpointFrequency), "per-hour-rate-limit": "0", // disable rate limiting "max-db-connections": "50", // the postgres container supports 100 connections, be conservative } @@ -564,80 +586,115 @@ func (i *Test) setupHorizonClient(webArgs map[string]string) { } } -func createDefaultCaptiveCoreConfig() (*ledgerbackend.CaptiveCoreConfig, error) { +// CreateCaptiveCoreConfig will create a temporary TOML config with the +// specified contents as well as a temporary storage directory. You should +// `defer` the returned function to clean these up when you're done. +func CreateCaptiveCoreConfig(contents string) (string, string, func()) { + tomlFile, err := ioutil.TempFile("", "captive-core-test-*.toml") + defer tomlFile.Close() + if err != nil { + panic(err) + } + + _, err = tomlFile.WriteString(contents) + if err != nil { + panic(err) + } + + storagePath, err := os.MkdirTemp("", "captive-core-test-*-storage") + if err != nil { + panic(err) + } + + filename := tomlFile.Name() + return filename, storagePath, func() { + os.Remove(filename) + os.RemoveAll(storagePath) + } +} + +func (i *Test) CreateCaptiveCoreConfig() (*ledgerbackend.CaptiveCoreConfig, func(), error) { + + confName, storagePath, cleanupFn := CreateCaptiveCoreConfig(SimpleCaptiveCoreToml) + i.t.Logf("Creating Captive Core config files, ConfName: %v, storagePath: %v", confName, storagePath) + captiveCoreConfig := ledgerbackend.CaptiveCoreConfig{ BinaryPath: os.Getenv("HORIZON_INTEGRATION_TESTS_CAPTIVE_CORE_BIN"), HistoryArchiveURLs: []string{HistoryArchiveUrl}, NetworkPassphrase: StandaloneNetworkPassphrase, - CheckpointFrequency: 8, // This is required for accelerated archive creation for integration test + CheckpointFrequency: CheckpointFrequency, // This is required for accelerated archive creation for integration test + UseDB: true, + StoragePath: storagePath, } tomlParams := ledgerbackend.CaptiveCoreTomlParams{ NetworkPassphrase: StandaloneNetworkPassphrase, HistoryArchiveURLs: []string{HistoryArchiveUrl}, + UseDB: true, } + toml, err := ledgerbackend.NewCaptiveCoreTomlFromData([]byte(SimpleCaptiveCoreToml), tomlParams) if err != nil { - return nil, err + return nil, func() {}, err } captiveCoreConfig.Toml = toml - return &captiveCoreConfig, nil -} - -func (i *Test) GetDefaultCaptiveCoreInstance() (*ledgerbackend.CaptiveStellarCore, error) { - ccConfig, err := createDefaultCaptiveCoreConfig() - if err != nil { - return nil, err - } - - return ledgerbackend.NewCaptive(*ccConfig) + return &captiveCoreConfig, cleanupFn, nil } const maxWaitForCoreStartup = 30 * time.Second const maxWaitForCoreUpgrade = 5 * time.Second const coreStartupPingInterval = time.Second -// Wait for core to be up and manually close the first ledger -func (i *Test) waitForCore() { - i.t.Log("Waiting for core to be up...") +// Wait for protocol upgrade +func (i *Test) waitCoreForProtocolUpgrade(protocolVersion uint32) { + i.UpgradeProtocol(protocolVersion) + startTime := time.Now() - for time.Since(startTime) < maxWaitForCoreStartup { + for time.Since(startTime) < maxWaitForCoreUpgrade { ctx, cancel := context.WithTimeout(context.Background(), time.Second) infoTime := time.Now() - _, err := i.coreClient.Info(ctx) + info, err := i.coreClient.Info(ctx) cancel() - if err != nil { - i.t.Logf("could not obtain info response: %v", err) + if err != nil || !info.IsSynced() { + i.t.Logf("Core is still not synced: %v %v", err, info) // sleep up to a second between consecutive calls. if durationSince := time.Since(infoTime); durationSince < coreStartupPingInterval { time.Sleep(coreStartupPingInterval - durationSince) } continue } - break + i.t.Log("Core is up.") + return } + i.t.Fatalf("Core could not sync after %v + %v", maxWaitForCoreStartup, maxWaitForCoreUpgrade) +} - i.UpgradeProtocol(i.config.ProtocolVersion) - - startTime = time.Now() - for time.Since(startTime) < maxWaitForCoreUpgrade { +// Wait for core to be up and manually close the first ledger +func (i *Test) waitForCore() { + i.t.Log("Waiting for core to be up...") + startTime := time.Now() + for time.Since(startTime) < maxWaitForCoreStartup { ctx, cancel := context.WithTimeout(context.Background(), time.Second) infoTime := time.Now() - info, err := i.coreClient.Info(ctx) + _, err := i.coreClient.Info(ctx) cancel() - if err != nil || !info.IsSynced() { - i.t.Logf("Core is still not synced: %v %v", err, info) + if err != nil { + i.t.Logf("could not obtain info response: %v", err) // sleep up to a second between consecutive calls. if durationSince := time.Since(infoTime); durationSince < coreStartupPingInterval { time.Sleep(coreStartupPingInterval - durationSince) } continue } - i.t.Log("Core is up.") - return + break + } + + if !i.config.SkipProtocolUpgrade { + i.waitCoreForProtocolUpgrade(i.config.ProtocolVersion) + } else { + i.t.Log("Core is up. Protocol Upgrade skipped. Please manually upgrade protocol version, if needed...") } - i.t.Fatalf("Core could not sync after %v + %v", maxWaitForCoreStartup, maxWaitForCoreUpgrade) } const sorobanRPCInitTime = 20 * time.Second @@ -867,7 +924,8 @@ func (i *Test) RestoreFootprint( } // UpgradeProtocol arms Core with upgrade and blocks until protocol is upgraded. -func (i *Test) UpgradeProtocol(version uint32) int { +func (i *Test) UpgradeProtocol(version uint32) { + i.t.Logf("Attempting Core Protocol upgade to version: %v", version) ctx, cancel := context.WithTimeout(context.Background(), time.Second) err := i.coreClient.Upgrade(ctx, int(version)) cancel() @@ -889,13 +947,22 @@ func (i *Test) UpgradeProtocol(version uint32) int { if info.Info.Ledger.Version == int(version) { i.t.Logf("Protocol upgraded to: %d, in ledger sequence number: %v, hash: %v", info.Info.Ledger.Version, ledgerSeq, info.Info.Ledger.Hash) - return ledgerSeq + i.coreUpgradeState = &CoreUpgradeState{ + upgradeLedgerSeq: uint32(ledgerSeq), + } + return } time.Sleep(time.Second) } i.t.Fatalf("could not upgrade protocol in 10s") - return -1 +} + +func (i *Test) GetUpgradeLedgerSeq() (uint32, error) { + if i.coreUpgradeState == nil { + return 0, errors.Errorf("Core has not been upgraded yet") + } + return i.coreUpgradeState.upgradeLedgerSeq, nil } func (i *Test) WaitForHorizonWeb() {