diff --git a/.github/workflows/ledgerexporter-release.yml b/.github/workflows/ledgerexporter-release.yml index dd4e61415b..2e66847286 100644 --- a/.github/workflows/ledgerexporter-release.yml +++ b/.github/workflows/ledgerexporter-release.yml @@ -12,7 +12,10 @@ jobs: env: LEDGEREXPORTER_INTEGRATION_TESTS_ENABLED: "true" LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core - LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing + # this pins to a version of quickstart:testing that has the same version as STELLAR_CORE_VERSION + # this is the multi-arch index sha, get it by 'docker buildx imagetools inspect stellar/quickstart:testing' + LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:03c6679f838a92b1eda4cd3a9e2bdee4c3586e278a138a0acf36a9bc99a0041f + LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL: "false" STELLAR_CORE_VERSION: 21.1.0-1921.b3aeb14cc.focal VERSION: ${GITHUB_REF_NAME#ledgerexporter-v} steps: diff --git a/.github/workflows/ledgerexporter.yml b/.github/workflows/ledgerexporter.yml index 369241faf1..c80a367771 100644 --- a/.github/workflows/ledgerexporter.yml +++ b/.github/workflows/ledgerexporter.yml @@ -13,7 +13,10 @@ jobs: CAPTIVE_CORE_DEBIAN_PKG_VERSION: 21.1.0-1921.b3aeb14cc.focal LEDGEREXPORTER_INTEGRATION_TESTS_ENABLED: "true" LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core - LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing + # this pins to a version of quickstart:testing that has the same version as LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN + # this is the multi-arch index sha, get it by 'docker buildx imagetools inspect stellar/quickstart:testing' + LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:03c6679f838a92b1eda4cd3a9e2bdee4c3586e278a138a0acf36a9bc99a0041f + LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL: "false" steps: - name: Install captive core run: | diff --git a/exp/services/ledgerexporter/DEVELOPER_GUIDE.md b/exp/services/ledgerexporter/DEVELOPER_GUIDE.md index 4cef8419bb..28a16ec1b0 100644 --- a/exp/services/ledgerexporter/DEVELOPER_GUIDE.md +++ b/exp/services/ledgerexporter/DEVELOPER_GUIDE.md @@ -50,7 +50,9 @@ tests to run. Optional, tests will try to run `stellar-core` from o/s PATH for captive core, if not resolvable, then set `LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN=/path/to/stellar-core` -Optional, can override the version of quickstart used to run standalone stellar network, `LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE=docker.io/stellar/quickstart:`. Ideally don't need to change this, but is available. +Optional, can override the version of quickstart used to run standalone stellar network, `LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE=docker.io/stellar/quickstart:`. By default it will try to docker pull `stellar/quickstart:testing` image to local host's docker image store. Set `LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL=false` to skip the pull, if you know host has up to date image. + +Note, the version of stellar core in `LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE` and `LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN` needs to be on the same major rev or the captive core process may not be able to join or parse ledger meta from the `local` network created by `LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE` ``` $ LEDGEREXPORTER_INTEGRATION_TESTS_ENABLED=true go test -v -race -run TestLedgerExporterTestSuite ./exp/services/ledgerexporter/... diff --git a/exp/services/ledgerexporter/internal/app.go b/exp/services/ledgerexporter/internal/app.go index 40cdf90f0e..00382f00e4 100644 --- a/exp/services/ledgerexporter/internal/app.go +++ b/exp/services/ledgerexporter/internal/app.go @@ -186,7 +186,7 @@ func newAdminServer(adminPort int, prometheusRegistry *prometheus.Registry) *htt } func (a *App) Run(runtimeSettings RuntimeSettings) error { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(runtimeSettings.Ctx) defer cancel() if err := a.init(ctx, runtimeSettings); err != nil { diff --git a/exp/services/ledgerexporter/internal/config.go b/exp/services/ledgerexporter/internal/config.go index 754fe5512b..196327e229 100644 --- a/exp/services/ledgerexporter/internal/config.go +++ b/exp/services/ledgerexporter/internal/config.go @@ -48,6 +48,7 @@ type RuntimeSettings struct { EndLedger uint32 ConfigFilePath string Mode Mode + Ctx context.Context } type StellarCoreConfig struct { @@ -56,6 +57,8 @@ type StellarCoreConfig struct { HistoryArchiveUrls []string `toml:"history_archive_urls"` StellarCoreBinaryPath string `toml:"stellar_core_binary_path"` CaptiveCoreTomlPath string `toml:"captive_core_toml_path"` + CheckpointFrequency uint32 `toml:"checkpoint_frequency"` + StoragePath string `toml:"storage_path"` } type Config struct { @@ -185,15 +188,20 @@ func (config *Config) GenerateCaptiveCoreConfig(coreBinFromPath string) (ledgerb return ledgerbackend.CaptiveCoreConfig{}, errors.Wrap(err, "Failed to create captive-core toml") } + checkpointFrequency := historyarchive.DefaultCheckpointFrequency + if config.StellarCoreConfig.CheckpointFrequency > 0 { + checkpointFrequency = config.StellarCoreConfig.CheckpointFrequency + } return ledgerbackend.CaptiveCoreConfig{ BinaryPath: config.StellarCoreConfig.StellarCoreBinaryPath, NetworkPassphrase: params.NetworkPassphrase, HistoryArchiveURLs: params.HistoryArchiveURLs, - CheckpointFrequency: historyarchive.DefaultCheckpointFrequency, + CheckpointFrequency: checkpointFrequency, Log: logger.WithField("subservice", "stellar-core"), Toml: captiveCoreToml, UserAgent: "ledger-exporter", UseDB: true, + StoragePath: config.StellarCoreConfig.StoragePath, }, nil } diff --git a/exp/services/ledgerexporter/internal/integration_test.go b/exp/services/ledgerexporter/internal/integration_test.go index 9fc88c4e6b..a9af8b9550 100644 --- a/exp/services/ledgerexporter/internal/integration_test.go +++ b/exp/services/ledgerexporter/internal/integration_test.go @@ -13,10 +13,12 @@ import ( "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/image" "github.com/docker/docker/client" + "github.com/docker/docker/pkg/stdcopy" "github.com/docker/go-connections/nat" "github.com/pkg/errors" "github.com/pelletier/go-toml" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "github.com/fsouza/fake-gcs-server/fakestorage" @@ -26,17 +28,21 @@ import ( ) const ( - maxWaitForCoreStartup = (30 * time.Second) + maxWaitForCoreStartup = (180 * time.Second) coreStartupPingInterval = time.Second + // set the max ledger we want the standalone network to emit + // tests then refer to ledger sequences only up to this, therefore + // don't have to do complex waiting within test for a sequence to exist. + waitForCoreLedgerSequence = 16 ) func TestLedgerExporterTestSuite(t *testing.T) { + os.Setenv("LEDGEREXPORTER_INTEGRATION_TESTS_ENABLED", "true") + if os.Getenv("LEDGEREXPORTER_INTEGRATION_TESTS_ENABLED") != "true" { t.Skip("skipping integration test: LEDGEREXPORTER_INTEGRATION_TESTS_ENABLED not true") } - defineCommands() - ledgerExporterSuite := &LedgerExporterTestSuite{} suite.Run(t, ledgerExporterSuite) } @@ -46,25 +52,26 @@ type LedgerExporterTestSuite struct { tempConfigFile string ctx context.Context ctxStop context.CancelFunc - coreContainerId string - coreHttpPort int + coreContainerID string dockerCli *client.Client gcsServer *fakestorage.Server + finishedSetup bool } func (s *LedgerExporterTestSuite) TestScanAndFill() { require := s.Require() + rootCmd := defineCommands() rootCmd.SetArgs([]string{"scan-and-fill", "--start", "4", "--end", "5", "--config-file", s.tempConfigFile}) - var errWriter io.Writer = &bytes.Buffer{} - var outWriter io.Writer = &bytes.Buffer{} - rootCmd.SetErr(errWriter) - rootCmd.SetOut(outWriter) + var errWriter bytes.Buffer + var outWriter bytes.Buffer + rootCmd.SetErr(&errWriter) + rootCmd.SetOut(&outWriter) err := rootCmd.ExecuteContext(s.ctx) require.NoError(err) - output := outWriter.(*bytes.Buffer).String() - errOutput := errWriter.(*bytes.Buffer).String() + output := outWriter.String() + errOutput := errWriter.String() s.T().Log(output) s.T().Log(errOutput) @@ -79,29 +86,64 @@ func (s *LedgerExporterTestSuite) TestAppend() { require := s.Require() // first populate ledgers 4-5 - rootCmd.SetArgs([]string{"scan-and-fill", "--start", "4", "--end", "5", "--config-file", s.tempConfigFile}) + rootCmd := defineCommands() + rootCmd.SetArgs([]string{"scan-and-fill", "--start", "6", "--end", "7", "--config-file", s.tempConfigFile}) err := rootCmd.ExecuteContext(s.ctx) require.NoError(err) - // now run an append of overalapping range, it will resume past existing ledgers 4,5 - rootCmd.SetArgs([]string{"append", "--start", "4", "--end", "7", "--config-file", s.tempConfigFile}) - var errWriter io.Writer = &bytes.Buffer{} - var outWriter io.Writer = &bytes.Buffer{} - rootCmd.SetErr(errWriter) - rootCmd.SetOut(outWriter) + // now run an append of overalapping range, it will resume past existing ledgers + rootCmd.SetArgs([]string{"append", "--start", "6", "--end", "9", "--config-file", s.tempConfigFile}) + var errWriter bytes.Buffer + var outWriter bytes.Buffer + rootCmd.SetErr(&errWriter) + rootCmd.SetOut(&outWriter) err = rootCmd.ExecuteContext(s.ctx) require.NoError(err) - output := outWriter.(*bytes.Buffer).String() - errOutput := errWriter.(*bytes.Buffer).String() + output := outWriter.String() + errOutput := errWriter.String() s.T().Log(output) s.T().Log(errOutput) datastore, err := datastore.NewGCSDataStore(s.ctx, "integration-test/standalone") require.NoError(err) - _, err = datastore.GetFile(s.ctx, "FFFFFFFF--0-9/FFFFFFF8--7.xdr.zstd") + _, err = datastore.GetFile(s.ctx, "FFFFFFFF--0-9/FFFFFFF6--9.xdr.zstd") + require.NoError(err) +} + +func (s *LedgerExporterTestSuite) TestAppendUnbounded() { + require := s.Require() + + rootCmd := defineCommands() + rootCmd.SetArgs([]string{"append", "--start", "10", "--config-file", s.tempConfigFile}) + var errWriter bytes.Buffer + var outWriter bytes.Buffer + rootCmd.SetErr(&errWriter) + rootCmd.SetOut(&outWriter) + + appendCtx, cancel := context.WithCancel(s.ctx) + syn := make(chan struct{}) + defer func() { <-syn }() + defer cancel() + go func() { + defer close(syn) + require.NoError(rootCmd.ExecuteContext(appendCtx)) + output := outWriter.String() + errOutput := errWriter.String() + s.T().Log(output) + s.T().Log(errOutput) + }() + + datastore, err := datastore.NewGCSDataStore(s.ctx, "integration-test/standalone") require.NoError(err) + + require.EventuallyWithT(func(c *assert.CollectT) { + // this checks every 50ms up to 180s total + assert := assert.New(c) + _, err = datastore.GetFile(s.ctx, "FFFFFFF5--10-19/FFFFFFF0--15.xdr.zstd") + assert.NoError(err) + }, 180*time.Second, 50*time.Millisecond, "append unbounded did not work") } func (s *LedgerExporterTestSuite) SetupSuite() { @@ -110,21 +152,31 @@ func (s *LedgerExporterTestSuite) SetupSuite() { s.ctx, s.ctxStop = signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + defer func() { + if !s.finishedSetup { + s.TearDownSuite() + } + }() + ledgerExporterConfigTemplate, err := toml.LoadFile("test/integration_config_template.toml") if err != nil { t.Fatalf("unable to load config template file %v", err) } - // if LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN specified, + testTempDir := t.TempDir() + + // if LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN not specified, // ledgerexporter will attempt resolve core bin using 'stellar-core' from OS path ledgerExporterConfigTemplate.Set("stellar_core_config.stellar_core_binary_path", os.Getenv("LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN")) + ledgerExporterConfigTemplate.Set("stellar_core_config.storage_path", filepath.Join(testTempDir, "captive-core")) + tomlBytes, err := toml.Marshal(ledgerExporterConfigTemplate) if err != nil { t.Fatalf("unable to load config file %v", err) } - testTempDir := t.TempDir() + tempSeedDataPath := filepath.Join(testTempDir, "data") if err = os.MkdirAll(filepath.Join(tempSeedDataPath, "integration-test"), 0777); err != nil { t.Fatalf("unable to create seed data in temp path, %v", err) @@ -160,16 +212,34 @@ func (s *LedgerExporterTestSuite) SetupSuite() { if quickstartImage == "" { quickstartImage = "stellar/quickstart:testing" } - s.mustStartCore(t, quickstartImage) + pullQuickStartImage := true + if os.Getenv("LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL") == "false" { + pullQuickStartImage = false + } + + s.mustStartCore(t, quickstartImage, pullQuickStartImage) s.mustWaitForCore(t, ledgerExporterConfigTemplate.GetArray("stellar_core_config.history_archive_urls").([]string), ledgerExporterConfigTemplate.Get("stellar_core_config.network_passphrase").(string)) + s.finishedSetup = true } func (s *LedgerExporterTestSuite) TearDownSuite() { - if s.coreContainerId != "" { - if err := s.dockerCli.ContainerStop(context.Background(), s.coreContainerId, container.StopOptions{}); err != nil { - s.T().Logf("unable to stop core container, %v, %v", s.coreContainerId, err) + if s.coreContainerID != "" { + s.T().Logf("Stopping the quickstart container %v", s.coreContainerID) + containerLogs, err := s.dockerCli.ContainerLogs(s.ctx, s.coreContainerID, container.LogsOptions{ShowStdout: true, ShowStderr: true}) + + if err == nil { + var errWriter bytes.Buffer + var outWriter bytes.Buffer + stdcopy.StdCopy(&outWriter, &errWriter, containerLogs) + s.T().Log(outWriter.String()) + s.T().Log(errWriter.String()) + } + if err := s.dockerCli.ContainerStop(context.Background(), s.coreContainerID, container.StopOptions{}); err != nil { + s.T().Logf("unable to stop core container, %v, %v", s.coreContainerID, err) } + } + if s.dockerCli != nil { s.dockerCli.Close() } if s.gcsServer != nil { @@ -178,25 +248,35 @@ func (s *LedgerExporterTestSuite) TearDownSuite() { s.ctxStop() } -func (s *LedgerExporterTestSuite) mustStartCore(t *testing.T, quickstartImage string) { +func (s *LedgerExporterTestSuite) mustStartCore(t *testing.T, quickstartImage string, pullImage bool) { var err error s.dockerCli, err = client.NewClientWithOpts(client.WithAPIVersionNegotiation()) if err != nil { t.Fatalf("could not create docker client, %v", err) } - img, err := s.dockerCli.ImagePull(s.ctx, quickstartImage, image.PullOptions{All: true}) - if err != nil { - t.Fatalf("could not pull docker image, %v, %v", quickstartImage, err) + if pullImage { + imgReader, err := s.dockerCli.ImagePull(s.ctx, quickstartImage, image.PullOptions{}) + if err != nil { + t.Fatalf("could not pull docker image, %v, %v", quickstartImage, err) + } + // ImagePull is asynchronous. + // The reader needs to be read completely for the pull operation to complete. + _, err = io.Copy(io.Discard, imgReader) + if err != nil { + t.Fatalf("could not pull docker image, %v, %v", quickstartImage, err) + } + + err = imgReader.Close() + if err != nil { + t.Fatalf("could not download all of docker image bytes after pull, %v, %v", quickstartImage, err) + } } - img.Close() resp, err := s.dockerCli.ContainerCreate(s.ctx, &container.Config{ - Image: quickstartImage, - Cmd: []string{"--enable", "core", "--local"}, - AttachStdout: true, - AttachStderr: true, + Image: quickstartImage, + Cmd: []string{"--enable", "core", "--local"}, ExposedPorts: nat.PortSet{ nat.Port("1570/tcp"): {}, nat.Port("11625/tcp"): {}, @@ -215,16 +295,16 @@ func (s *LedgerExporterTestSuite) mustStartCore(t *testing.T, quickstartImage st if err != nil { t.Fatalf("could not create quickstart docker container, %v, error %v", quickstartImage, err) } - s.coreContainerId = resp.ID + s.coreContainerID = resp.ID if err := s.dockerCli.ContainerStart(s.ctx, resp.ID, container.StartOptions{}); err != nil { t.Fatalf("could not run quickstart docker container, %v, error %v", quickstartImage, err) } + t.Logf("Started quickstart container %v", s.coreContainerID) } func (s *LedgerExporterTestSuite) mustWaitForCore(t *testing.T, archiveUrls []string, passphrase string) { t.Log("Waiting for core to be up...") - //coreClient := &stellarcore.Client{URL: "http://localhost:" + strconv.Itoa(s.coreHttpPort)} startTime := time.Now() infoTime := startTime archive, err := historyarchive.NewArchivePool(archiveUrls, historyarchive.ArchiveOptions{ @@ -252,7 +332,7 @@ func (s *LedgerExporterTestSuite) mustWaitForCore(t *testing.T, archiveUrls []st continue } latestCheckpoint := has.CurrentLedger - if latestCheckpoint > 1 { + if latestCheckpoint >= waitForCoreLedgerSequence { return } } diff --git a/exp/services/ledgerexporter/internal/main.go b/exp/services/ledgerexporter/internal/main.go index d1409eb89c..f0ff076ae3 100644 --- a/exp/services/ledgerexporter/internal/main.go +++ b/exp/services/ledgerexporter/internal/main.go @@ -1,6 +1,7 @@ package ledgerexporter import ( + "context" "fmt" "github.com/spf13/cobra" @@ -14,16 +15,15 @@ var ( app := NewApp() return app.Run(runtimeSettings) } - rootCmd, scanAndFillCmd, appendCmd *cobra.Command ) func Execute() error { - defineCommands() + rootCmd := defineCommands() return rootCmd.Execute() } -func defineCommands() { - rootCmd = &cobra.Command{ +func defineCommands() *cobra.Command { + var rootCmd = &cobra.Command{ Use: "ledgerexporter", Short: "Export Stellar network ledger data to a remote data store", Long: "Converts ledger meta data from Stellar network into static data and exports it remote data storage.", @@ -32,7 +32,7 @@ func defineCommands() { return fmt.Errorf("please specify one of the availble sub-commands to initiate export") }, } - scanAndFillCmd = &cobra.Command{ + var scanAndFillCmd = &cobra.Command{ Use: "scan-and-fill", Short: "scans the entire bounded requested range between 'start' and 'end' flags and exports only the ledgers which are missing from the data lake.", Long: "scans the entire bounded requested range between 'start' and 'end' flags and exports only the ledgers which are missing from the data lake.", @@ -42,10 +42,14 @@ func defineCommands() { cmd.PersistentFlags().Lookup("config-file"), ) settings.Mode = ScanFill + settings.Ctx = cmd.Context() + if settings.Ctx == nil { + settings.Ctx = context.Background() + } return ledgerExporterCmdRunner(settings) }, } - appendCmd = &cobra.Command{ + var appendCmd = &cobra.Command{ Use: "append", Short: "export ledgers beginning with the first missing ledger after the specified 'start' ledger and resumes exporting from there", Long: "export ledgers beginning with the first missing ledger after the specified 'start' ledger and resumes exporting from there", @@ -55,6 +59,10 @@ func defineCommands() { cmd.PersistentFlags().Lookup("config-file"), ) settings.Mode = Append + settings.Ctx = cmd.Context() + if settings.Ctx == nil { + settings.Ctx = context.Background() + } return ledgerExporterCmdRunner(settings) }, } @@ -73,6 +81,8 @@ func defineCommands() { "If 'end' is absent or '0' means unbounded mode, exporter will continue to run indefintely and export the latest closed ledgers from network as they are generated in real time.") appendCmd.PersistentFlags().String("config-file", "config.toml", "Path to the TOML config file. Defaults to 'config.toml' on runtime working directory path.") viper.BindPFlags(appendCmd.PersistentFlags()) + + return rootCmd } func bindCliParameters(startFlag *pflag.Flag, endFlag *pflag.Flag, configFileFlag *pflag.Flag) RuntimeSettings { diff --git a/exp/services/ledgerexporter/internal/main_test.go b/exp/services/ledgerexporter/internal/main_test.go index 4c9e5412f3..fdc5e04deb 100644 --- a/exp/services/ledgerexporter/internal/main_test.go +++ b/exp/services/ledgerexporter/internal/main_test.go @@ -97,7 +97,7 @@ func TestFlagsOutput(t *testing.T) { t.Run(testCase.name, func(t *testing.T) { // mock the ledger exporter's cmd runner to be this test's mock routine instead of real app ledgerExporterCmdRunner = testCase.appRunner - defineCommands() + rootCmd := defineCommands() rootCmd.SetArgs(testCase.commandArgs) var errWriter io.Writer = &bytes.Buffer{} var outWriter io.Writer = &bytes.Buffer{} diff --git a/exp/services/ledgerexporter/internal/test/integration_config_template.toml b/exp/services/ledgerexporter/internal/test/integration_config_template.toml index 50042c9806..410aa0f7e5 100644 --- a/exp/services/ledgerexporter/internal/test/integration_config_template.toml +++ b/exp/services/ledgerexporter/internal/test/integration_config_template.toml @@ -11,4 +11,5 @@ files_per_partition = 10 [stellar_core_config] captive_core_toml_path = "test/integration_captive_core.cfg" history_archive_urls = ["http://localhost:1570"] -network_passphrase = "Standalone Network ; February 2017" \ No newline at end of file +network_passphrase = "Standalone Network ; February 2017" +checkpoint_frequency = 8 \ No newline at end of file