diff --git a/.ci/bump-golang.yml b/.ci/bump-golang.yml index 086b72edf7f..470c6f4c8d5 100644 --- a/.ci/bump-golang.yml +++ b/.ci/bump-golang.yml @@ -103,15 +103,6 @@ targets: content: '{{ source "latestGoVersion" }}' file: .golangci.yml matchpattern: '\d+.\d+.\d+' - update-govulncheck: - name: "Update govulncheck.yml" - sourceid: latestGoVersion - scmid: githubConfig - kind: file - spec: - content: '{{ source "latestGoVersion" }}' - file: .github/workflows/govulncheck.yml - matchpattern: '\d+.\d+.\d+' update-version.asciidoc: name: "Update version.asciidoc" sourceid: latestGoVersion diff --git a/.github/workflows/govulncheck.yml b/.github/workflows/govulncheck.yml deleted file mode 100644 index 4d8a0ba4ec1..00000000000 --- a/.github/workflows/govulncheck.yml +++ /dev/null @@ -1,17 +0,0 @@ -name: govulncheck -on: - pull_request: - -jobs: - govulncheck: - strategy: - matrix: - os: [ ubuntu-latest, macos-latest, windows-latest ] - name: vulncheck - runs-on: ${{ matrix.os }} - steps: - - id: govulncheck - uses: golang/govulncheck-action@v1 - with: - go-version-input: 1.20.8 - go-package: ./... diff --git a/.go-version b/.go-version index 95393fc7d4d..20538953a5b 100644 --- a/.go-version +++ b/.go-version @@ -1 +1 @@ -1.20.8 +1.20.9 diff --git a/.golangci.yml b/.golangci.yml index a6f9f5de236..89203ee7a09 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -116,7 +116,7 @@ linters-settings: gosimple: # Select the Go version to target. The default is '1.13'. - go: "1.20.8" + go: "1.20.9" nakedret: # make an issue if func has more lines of code than this setting and it has naked returns; default is 30 @@ -136,17 +136,17 @@ linters-settings: staticcheck: # Select the Go version to target. The default is '1.13'. - go: "1.20.8" + go: "1.20.9" checks: ["all"] stylecheck: # Select the Go version to target. The default is '1.13'. - go: "1.20.8" + go: "1.20.9" checks: ["all"] unused: # Select the Go version to target. The default is '1.13'. - go: "1.20.8" + go: "1.20.9" gosec: excludes: diff --git a/Dockerfile b/Dockerfile index d0c2edc9f94..10882552a46 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -ARG GO_VERSION=1.20.8 +ARG GO_VERSION=1.20.9 FROM circleci/golang:${GO_VERSION} diff --git a/Dockerfile.skaffold b/Dockerfile.skaffold index ca9300eb4de..24a4df32626 100644 --- a/Dockerfile.skaffold +++ b/Dockerfile.skaffold @@ -1,4 +1,4 @@ -ARG GO_VERSION=1.20.8 +ARG GO_VERSION=1.20.9 ARG crossbuild_image="docker.elastic.co/beats-dev/golang-crossbuild" ARG AGENT_VERSION=8.9.0-SNAPSHOT ARG AGENT_IMAGE="docker.elastic.co/beats/elastic-agent" diff --git a/changelog/8.10.3.asciidoc b/changelog/8.10.3.asciidoc new file mode 100644 index 00000000000..ba6be27f7fc --- /dev/null +++ b/changelog/8.10.3.asciidoc @@ -0,0 +1,41 @@ +// begin 8.10.3 relnotes + +[[release-notes-8.10.3]] +== 8.10.3 + +Review important information about the 8.10.3 release. + + + + + + + + + +[discrete] +[[new-features-8.10.3]] +=== New features + +The 8.10.3 release adds the following new and notable features. + + +elastic-agent:: + +* Improve Agent Uninstall On Windows By Adding Delay Between Retries When File Removal Is Blocked By Busy Files. {elastic-agent-pull}https://github.com/elastic/elastic-agent/pull/3431[#https://github.com/elastic/elastic-agent/pull/3431] {elastic-agent-issue}https://github.com/elastic/elastic-agent/issues/3221[#https://github.com/elastic/elastic-agent/issues/3221] + + + + + + +[discrete] +[[bug-fixes-8.10.3]] +=== Bug fixes + + +elastic-agent:: + +* Resilient Handling Of Air Gapped Pgp Checks. {elastic-agent-pull}https://github.com/elastic/elastic-agent/pull/3427[#https://github.com/elastic/elastic-agent/pull/3427] {elastic-agent-issue}https://github.com/elastic/elastic-agent/issues/3368[#https://github.com/elastic/elastic-agent/issues/3368] + +// end 8.10.3 relnotes diff --git a/changelog/8.10.3.yaml b/changelog/8.10.3.yaml new file mode 100644 index 00000000000..6273a300a63 --- /dev/null +++ b/changelog/8.10.3.yaml @@ -0,0 +1,26 @@ +version: 8.10.3 +entries: + - kind: bug-fix + summary: Resilient handling of air gapped PGP checks + description: Elastic Agent should not fail when remote PGP is specified (or official Elastic fallback PGP used) and remote is not available + component: elastic-agent + pr: + - https://github.com/elastic/elastic-agent/pull/3427 + issue: + - https://github.com/elastic/elastic-agent/issues/3368 + timestamp: 1695035111 + file: + name: 1695035111-Resilient-handling-of-air-gapped-PGP-checks.yaml + checksum: 8741bfa9475a09d5901dc3fab0fed3a06b55d5bb + - kind: feature + summary: Improve Agent uninstall on Windows by adding delay between retries when file removal is blocked by busy files + description: "" + component: elastic-agent + pr: + - https://github.com/elastic/elastic-agent/pull/3431 + issue: + - https://github.com/elastic/elastic-agent/issues/3221 + timestamp: 1695050880 + file: + name: 1695050880-Improve-retry-strategy-when-uninstalling-agent.yaml + checksum: 45eab228dfd89392a0f3685a628f73ccce05d081 diff --git a/changelog/fragments/1695035111-Resilient-handling-of-air-gapped-PGP-checks.yaml b/changelog/fragments/1695035111-Resilient-handling-of-air-gapped-PGP-checks.yaml deleted file mode 100644 index caaa8a2f53a..00000000000 --- a/changelog/fragments/1695035111-Resilient-handling-of-air-gapped-PGP-checks.yaml +++ /dev/null @@ -1,31 +0,0 @@ -# Kind can be one of: -# - breaking-change: a change to previously-documented behavior -# - deprecation: functionality that is being removed in a later release -# - bug-fix: fixes a problem in a previous version -# - enhancement: extends functionality but does not break or fix existing behavior -# - feature: new functionality -# - known-issue: problems that we are aware of in a given version -# - security: impacts on the security of a product or a user’s deployment. -# - upgrade: important information for someone upgrading from a prior version -# - other: does not fit into any of the other categories -kind: bug-fix - -# Change summary; a 80ish characters long description of the change. -summary: Resilient handling of air gapped PGP checks - -# Long description; in case the summary is not enough to describe the change -# this field accommodate a description without length limits. -description: Elastic Agent should not fail when remote PGP is specified (or official Elastic fallback PGP used) and remote is not available - -# Affected component; a word indicating the component this changeset affects. -component: elastic-agent - -# PR number; optional; the PR number that added the changeset. -# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. -# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. -# Please provide it if you are adding a fragment for a different PR. -pr: 3427 - -# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of). -# If not present is automatically filled by the tooling with the issue linked to the PR number. -issue: 3368 diff --git a/changelog/fragments/1695050880-Improve-retry-strategy-when-uninstalling-agent.yaml b/changelog/fragments/1696955150-Slow-down-agent-monitoring-metrics-interval-to-60s.yaml similarity index 92% rename from changelog/fragments/1695050880-Improve-retry-strategy-when-uninstalling-agent.yaml rename to changelog/fragments/1696955150-Slow-down-agent-monitoring-metrics-interval-to-60s.yaml index b3c1e7ac5e7..bf86933d97e 100644 --- a/changelog/fragments/1695050880-Improve-retry-strategy-when-uninstalling-agent.yaml +++ b/changelog/fragments/1696955150-Slow-down-agent-monitoring-metrics-interval-to-60s.yaml @@ -8,10 +8,10 @@ # - security: impacts on the security of a product or a user’s deployment. # - upgrade: important information for someone upgrading from a prior version # - other: does not fit into any of the other categories -kind: feature +kind: enhancement # Change summary; a 80ish characters long description of the change. -summary: Improve uninstall by adding some pause between retries when removal is blocked by busy files +summary: Increase agent monitoring metrics interval from 10s to 60s to reduce load # Long description; in case the summary is not enough to describe the change # this field accommodate a description without length limits. @@ -19,7 +19,7 @@ summary: Improve uninstall by adding some pause between retries when removal is #description: # Affected component; a word indicating the component this changeset affects. -component: elastic-agent +component: monitoring # PR URL; optional; the PR number that added the changeset. # If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. diff --git a/changelog/fragments/1694700201-gpg-unreachable-url-fix.yaml b/changelog/fragments/1697111928-upgrade-to-go-1.20.9.yaml similarity index 88% rename from changelog/fragments/1694700201-gpg-unreachable-url-fix.yaml rename to changelog/fragments/1697111928-upgrade-to-go-1.20.9.yaml index 42c8945cb91..0e2bba7ec53 100644 --- a/changelog/fragments/1694700201-gpg-unreachable-url-fix.yaml +++ b/changelog/fragments/1697111928-upgrade-to-go-1.20.9.yaml @@ -8,10 +8,10 @@ # - security: impacts on the security of a product or a user’s deployment. # - upgrade: important information for someone upgrading from a prior version # - other: does not fit into any of the other categories -kind: bug-fix +kind: security # Change summary; a 80ish characters long description of the change. -summary: Fix gpg verification, if one is successful upgrade should continue. +summary: Upgrade to Go 1.20.9 # Long description; in case the summary is not enough to describe the change # this field accommodate a description without length limits. @@ -25,8 +25,8 @@ component: elastic-agent # If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. # NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. # Please provide it if you are adding a fragment for a different PR. -pr: https://github.com/elastic/elastic-agent/pull/3426 +pr: https://github.com/elastic/elastic-agent/pull/3393 # Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). # If not present is automatically filled by the tooling with the issue linked to the PR number. -issue: https://github.com/elastic/elastic-agent/issues/3368 +#issue: https://github.com/owner/repo/1234 diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 5c3f94e3e6a..79522096b8c 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -12,6 +12,7 @@ import ( "path/filepath" "runtime" "strings" + "time" "unicode" "github.com/elastic/elastic-agent/pkg/component" @@ -51,6 +52,10 @@ const ( agentName = "elastic-agent" windowsOS = "windows" + + // metricset execution period used for the monitoring metrics inputs + // we set this to 60s to reduce the load/data volume on the monitoring cluster + metricsCollectionInterval = 60 * time.Second ) var ( @@ -517,6 +522,8 @@ func (b *BeatsMonitor) monitoringNamespace() string { } func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutputName string, componentList []component.Component) error { + + metricsCollectionIntervalString := metricsCollectionInterval.String() monitoringNamespace := b.monitoringNamespace() fixedAgentName := strings.ReplaceAll(agentName, "-", "_") beatsStreams := make([]interface{}, 0, len(componentIDToBinary)) @@ -532,7 +539,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "path": "/stats", "hosts": []interface{}{HttpPlusAgentMonitoringEndpoint(b.operatingSystem, b.config.C)}, "namespace": "agent", - "period": "10s", + "period": metricsCollectionIntervalString, "index": fmt.Sprintf("metrics-elastic_agent.%s-%s", fixedAgentName, monitoringNamespace), "processors": []interface{}{ map[string]interface{}{ @@ -608,7 +615,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI }, "metricsets": []interface{}{"stats", "state"}, "hosts": endpoints, - "period": "10s", + "period": metricsCollectionIntervalString, "index": fmt.Sprintf("metrics-elastic_agent.%s-%s", name, monitoringNamespace), "processors": []interface{}{ map[string]interface{}{ @@ -663,7 +670,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "hosts": endpoints, "path": "/stats", "namespace": "agent", - "period": "10s", + "period": metricsCollectionIntervalString, "index": fmt.Sprintf("metrics-elastic_agent.%s-%s", fixedAgentName, monitoringNamespace), "processors": []interface{}{ map[string]interface{}{ @@ -725,7 +732,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "path": "/inputs/", "namespace": fbDataStreamName, "json.is_array": true, - "period": "10s", + "period": metricsCollectionIntervalString, "index": fmt.Sprintf("metrics-elastic_agent.%s-%s", fbDataStreamName, monitoringNamespace), "processors": []interface{}{ map[string]interface{}{ @@ -799,7 +806,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "path": "/shipper", "hosts": endpoints, "namespace": "application", - "period": "10s", + "period": metricsCollectionIntervalString, "processors": createProcessorsForJSONInput(name, monitoringNamespace, b.agentInfo), }, map[string]interface{}{ @@ -813,7 +820,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "path": "/stats", "hosts": endpoints, "namespace": "agent", - "period": "10s", + "period": metricsCollectionIntervalString, "processors": createProcessorsForJSONInput(name, monitoringNamespace, b.agentInfo), }) } diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go new file mode 100644 index 00000000000..9c7ea042d7f --- /dev/null +++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go @@ -0,0 +1,96 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package monitoring + +import ( + "context" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" + monitoringcfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" +) + +func TestMonitoringConfigMetricsInterval(t *testing.T) { + + agentInfo, err := info.NewAgentInfo(context.Background(), false) + require.NoError(t, err, "Error creating agent info") + + mCfg := &monitoringConfig{ + C: &monitoringcfg.MonitoringConfig{ + Enabled: true, + MonitorMetrics: true, + HTTP: &monitoringcfg.MonitoringHTTPConfig{ + Enabled: false, + }, + }, + } + + policy := map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "metrics": true, + "http": map[string]any{ + "enabled": false, + }, + }, + }, + "outputs": map[string]any{ + "default": map[string]any{}, + }, + } + b := &BeatsMonitor{ + enabled: true, + config: mCfg, + operatingSystem: runtime.GOOS, + agentInfo: agentInfo, + } + got, err := b.MonitoringConfig(policy, nil, map[string]string{"foobeat": "filebeat"}) // put a componentID/binary mapping to have something in the beats monitoring input + assert.NoError(t, err) + + rawInputs, ok := got["inputs"] + require.True(t, ok, "monitoring config contains no input") + inputs, ok := rawInputs.([]any) + require.True(t, ok, "monitoring inputs are not a list") + marshaledInputs, err := yaml.Marshal(inputs) + if assert.NoError(t, err, "error marshaling monitoring inputs") { + t.Logf("marshaled monitoring inputs:\n%s\n", marshaledInputs) + } + + // loop over the created inputs + for _, i := range inputs { + input, ok := i.(map[string]any) + if assert.Truef(t, ok, "input is not represented as a map: %v", i) { + inputID := input["id"] + t.Logf("input %q", inputID) + // check the streams created for the input, should be a list of objects + if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) && + assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) { + // loop over streams and cast to map[string]any to access keys + for _, rawStream := range input["streams"].([]any) { + if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) { + stream := rawStream.(map[string]any) + // check period and assert its value + streamID := stream["id"] + if assert.Containsf(t, stream, "period", "stream %q for input %q does not contain a period", streamID, inputID) && + assert.IsType(t, "", stream["period"], "period for stream %q of input %q is not represented as a string", streamID, inputID) { + periodString := stream["period"].(string) + duration, err := time.ParseDuration(periodString) + if assert.NoErrorf(t, err, "Unparseable period duration %s for stream %q of input %q", periodString, streamID, inputID) { + assert.Equalf(t, duration, 60*time.Second, "unexpected duration for stream %q of input %q", streamID, inputID) + } + } + } + } + } + } + + } +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier.go b/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier.go index 263f6ea8bf5..e42a35c76a4 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier.go @@ -158,7 +158,7 @@ func (v *Verifier) verifyAsc(fullPath string, skipDefaultPgp bool, pgpSources .. v.log.Infof("Verification with PGP[%d] successful", i) return nil } - v.log.Warnf("Verification with PGP[%d] succfailed: %v", i, err) + v.log.Warnf("Verification with PGP[%d] failed: %v", i, err) } v.log.Warnf("Verification failed") diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go index 17945ddccb9..7be3ae1066f 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go @@ -17,14 +17,12 @@ import ( "strings" "time" - "github.com/docker/go-units" - - "github.com/elastic/elastic-agent-libs/atomic" "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + "github.com/elastic/elastic-agent/pkg/core/logger" ) const ( @@ -46,13 +44,13 @@ const ( // Downloader is a downloader able to fetch artifacts from elastic.co web page. type Downloader struct { - log progressLogger + log *logger.Logger config *artifact.Config client http.Client } // NewDownloader creates and configures Elastic Downloader -func NewDownloader(log progressLogger, config *artifact.Config) (*Downloader, error) { +func NewDownloader(log *logger.Logger, config *artifact.Config) (*Downloader, error) { client, err := config.HTTPTransportSettings.Client( httpcommon.WithAPMHTTPInstrumentation(), httpcommon.WithKeepaliveSettings{Disable: false, IdleConnTimeout: 30 * time.Second}, @@ -66,7 +64,7 @@ func NewDownloader(log progressLogger, config *artifact.Config) (*Downloader, er } // NewDownloaderWithClient creates Elastic Downloader with specific client used -func NewDownloaderWithClient(log progressLogger, config *artifact.Config, client http.Client) *Downloader { +func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client) *Downloader { return &Downloader{ log: log, config: config, @@ -208,152 +206,16 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f } } - reportCtx, reportCancel := context.WithCancel(ctx) - dp := newDownloadProgressReporter(e.log, sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize) - dp.Report(reportCtx) + loggingObserver := newLoggingProgressObserver(e.log, e.config.HTTPTransportSettings.Timeout) + dp := newDownloadProgressReporter(sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver) + dp.Report(ctx) _, err = io.Copy(destinationFile, io.TeeReader(resp.Body, dp)) if err != nil { - reportCancel() dp.ReportFailed(err) // return path, file already exists and needs to be cleaned up return fullPath, errors.New(err, "copying fetched package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)) } - reportCancel() dp.ReportComplete() return fullPath, nil } - -type downloadProgressReporter struct { - log progressLogger - sourceURI string - interval time.Duration - warnTimeout time.Duration - length float64 - - downloaded atomic.Int - started time.Time -} - -func newDownloadProgressReporter(log progressLogger, sourceURI string, timeout time.Duration, length int) *downloadProgressReporter { - interval := time.Duration(float64(timeout) * downloadProgressIntervalPercentage) - if interval == 0 { - interval = downloadProgressMinInterval - } - - return &downloadProgressReporter{ - log: log, - sourceURI: sourceURI, - interval: interval, - warnTimeout: time.Duration(float64(timeout) * warningProgressIntervalPercentage), - length: float64(length), - } -} - -func (dp *downloadProgressReporter) Write(b []byte) (int, error) { - n := len(b) - dp.downloaded.Add(n) - return n, nil -} - -func (dp *downloadProgressReporter) Report(ctx context.Context) { - started := time.Now() - dp.started = started - sourceURI := dp.sourceURI - log := dp.log - length := dp.length - warnTimeout := dp.warnTimeout - interval := dp.interval - - go func() { - t := time.NewTicker(interval) - defer t.Stop() - for { - select { - case <-ctx.Done(): - return - case <-t.C: - now := time.Now() - timePast := now.Sub(started) - downloaded := float64(dp.downloaded.Load()) - bytesPerSecond := downloaded / float64(timePast/time.Second) - - var msg string - var args []interface{} - if length > 0 { - // length of the download is known, so more detail can be provided - percentComplete := downloaded / length * 100.0 - msg = "download progress from %s is %s/%s (%.2f%% complete) @ %sps" - args = []interface{}{ - sourceURI, units.HumanSize(downloaded), units.HumanSize(length), percentComplete, units.HumanSize(bytesPerSecond), - } - } else { - // length unknown so provide the amount downloaded and the speed - msg = "download progress from %s has fetched %s @ %sps" - args = []interface{}{ - sourceURI, units.HumanSize(downloaded), units.HumanSize(bytesPerSecond), - } - } - - log.Infof(msg, args...) - if timePast >= warnTimeout { - // duplicate to warn when over the warnTimeout; this still has it logging to info that way if - // they are filtering the logs to info they still see the messages when over the warnTimeout, but - // when filtering only by warn they see these messages only - log.Warnf(msg, args...) - } - } - } - }() -} - -func (dp *downloadProgressReporter) ReportComplete() { - now := time.Now() - timePast := now.Sub(dp.started) - downloaded := float64(dp.downloaded.Load()) - bytesPerSecond := downloaded / float64(timePast/time.Second) - msg := "download from %s completed in %s @ %sps" - args := []interface{}{ - dp.sourceURI, units.HumanDuration(timePast), units.HumanSize(bytesPerSecond), - } - dp.log.Infof(msg, args...) - if timePast >= dp.warnTimeout { - // see reason in `Report` - dp.log.Warnf(msg, args...) - } -} - -func (dp *downloadProgressReporter) ReportFailed(err error) { - now := time.Now() - timePast := now.Sub(dp.started) - downloaded := float64(dp.downloaded.Load()) - bytesPerSecond := downloaded / float64(timePast/time.Second) - var msg string - var args []interface{} - if dp.length > 0 { - // length of the download is known, so more detail can be provided - percentComplete := downloaded / dp.length * 100.0 - msg = "download from %s failed at %s/%s (%.2f%% complete) @ %sps: %s" - args = []interface{}{ - dp.sourceURI, units.HumanSize(downloaded), units.HumanSize(dp.length), percentComplete, units.HumanSize(bytesPerSecond), err, - } - } else { - // length unknown so provide the amount downloaded and the speed - msg = "download from %s failed at %s @ %sps: %s" - args = []interface{}{ - dp.sourceURI, units.HumanSize(downloaded), units.HumanSize(bytesPerSecond), err, - } - } - dp.log.Infof(msg, args...) - if timePast >= dp.warnTimeout { - // see reason in `Report` - dp.log.Warnf(msg, args...) - } -} - -// progressLogger is a logger that only needs to implement Infof and Warnf, as those are the only functions -// that the downloadProgressReporter uses. -type progressLogger interface { - Infof(format string, args ...interface{}) - Warnf(format string, args ...interface{}) -} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go index 4d6087f479b..119173e1344 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go @@ -6,17 +6,22 @@ package http import ( "context" + "fmt" "io/ioutil" "net" "net/http" "net/http/httptest" "os" + "regexp" "strconv" - "sync" "testing" "time" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/docker/go-units" "github.com/stretchr/testify/assert" @@ -57,7 +62,7 @@ func TestDownloadBodyError(t *testing.T) { Architecture: "64", } - log := newRecordLogger() + log, obs := logger.NewTesting("downloader") testClient := NewDownloaderWithClient(log, config, *client) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) @@ -65,13 +70,15 @@ func TestDownloadBodyError(t *testing.T) { t.Fatal("expected Download to return an error") } - log.lock.RLock() - defer log.lock.RUnlock() + infoLogs := obs.FilterLevelExact(zapcore.InfoLevel).TakeAll() + warnLogs := obs.FilterLevelExact(zapcore.WarnLevel).TakeAll() - require.GreaterOrEqual(t, len(log.info), 1, "download error not logged at info level") - assert.True(t, containsMessage(log.info, "download from %s failed at %s @ %sps: %s")) - require.GreaterOrEqual(t, len(log.warn), 1, "download error not logged at warn level") - assert.True(t, containsMessage(log.warn, "download from %s failed at %s @ %sps: %s")) + expectedURL := fmt.Sprintf("%s/%s-%s-%s", srv.URL, "beats/filebeat/filebeat", version, "linux-x86_64.tar.gz") + expectedMsg := fmt.Sprintf("download from %s failed at 0B @ NaNBps: unexpected EOF", expectedURL) + require.GreaterOrEqual(t, len(infoLogs), 1, "download error not logged at info level") + assert.True(t, containsMessage(infoLogs, expectedMsg)) + require.GreaterOrEqual(t, len(warnLogs), 1, "download error not logged at warn level") + assert.True(t, containsMessage(warnLogs, expectedMsg)) } func TestDownloadLogProgressWithLength(t *testing.T) { @@ -111,16 +118,25 @@ func TestDownloadLogProgressWithLength(t *testing.T) { }, } - log := newRecordLogger() + log, obs := logger.NewTesting("downloader") testClient := NewDownloaderWithClient(log, config, *client) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) require.NoError(t, err, "Download should not have errored") - log.lock.RLock() - defer log.lock.RUnlock() + expectedURL := fmt.Sprintf("%s/%s-%s-%s", srv.URL, "beats/filebeat/filebeat", version, "linux-x86_64.tar.gz") + expectedProgressRegexp := regexp.MustCompile( + `^download progress from ` + expectedURL + `(.sha512)? is \S+/\S+ \(\d+\.\d{2}% complete\) @ \S+$`, + ) + expectedCompletedRegexp := regexp.MustCompile( + `^download from ` + expectedURL + `(.sha512)? completed in \d+ \S+ @ \S+$`, + ) - expectedProgressMsg := "download progress from %s is %s/%s (%.2f%% complete) @ %sps" + // Consider only progress logs + obs = obs.Filter(func(entry observer.LoggedEntry) bool { + return expectedProgressRegexp.MatchString(entry.Message) || + expectedCompletedRegexp.MatchString(entry.Message) + }) // Two files are downloaded. Each file is being downloaded in 100 chunks with a delay of 10ms between chunks. The // expected time to download is, therefore, 100 * 10ms = 1000ms. In reality, the actual download time will be a bit @@ -129,7 +145,11 @@ func TestDownloadLogProgressWithLength(t *testing.T) { // the actual total download time / 50ms, for each file. That works out to at least 1000ms / 50ms = 20 INFO log // messages, for each file, about its download progress. Additionally, we should expect 1 INFO log message, for // each file, about the download completing. - assertLogs(t, log.info, 20, expectedProgressMsg) + logs := obs.FilterLevelExact(zapcore.InfoLevel).TakeAll() + failed := assertLogs(t, logs, 20, expectedProgressRegexp, expectedCompletedRegexp) + if failed { + printLogs(t, logs) + } // By similar math as above, since the download of each file is expected to take 1000ms, and the progress logger // starts issuing WARN messages once the download has taken more than 75% of the expected time, @@ -137,7 +157,11 @@ func TestDownloadLogProgressWithLength(t *testing.T) { // reporting happens every 50 seconds, we should see at least 250s / 50s = 5 WARN log messages, for each file, // about its download progress. Additionally, we should expect 1 WARN message, for each file, about the download // completing. - assertLogs(t, log.warn, 5, expectedProgressMsg) + logs = obs.FilterLevelExact(zapcore.WarnLevel).TakeAll() + failed = assertLogs(t, logs, 5, expectedProgressRegexp, expectedCompletedRegexp) + if failed { + printLogs(t, logs) + } } func TestDownloadLogProgressWithoutLength(t *testing.T) { @@ -176,16 +200,25 @@ func TestDownloadLogProgressWithoutLength(t *testing.T) { }, } - log := newRecordLogger() + log, obs := logger.NewTesting("downloader") testClient := NewDownloaderWithClient(log, config, *client) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) require.NoError(t, err, "Download should not have errored") - log.lock.RLock() - defer log.lock.RUnlock() + expectedURL := fmt.Sprintf("%s/%s-%s-%s", srv.URL, "beats/filebeat/filebeat", version, "linux-x86_64.tar.gz") + expectedProgressRegexp := regexp.MustCompile( + `^download progress from ` + expectedURL + `(.sha512)? has fetched \S+ @ \S+$`, + ) + expectedCompletedRegexp := regexp.MustCompile( + `^download from ` + expectedURL + `(.sha512)? completed in \d+ \S+ @ \S+$`, + ) - expectedProgressMsg := "download progress from %s has fetched %s @ %sps" + // Consider only progress logs + obs = obs.Filter(func(entry observer.LoggedEntry) bool { + return expectedProgressRegexp.MatchString(entry.Message) || + expectedCompletedRegexp.MatchString(entry.Message) + }) // Two files are downloaded. Each file is being downloaded in 100 chunks with a delay of 10ms between chunks. The // expected time to download is, therefore, 100 * 10ms = 1000ms. In reality, the actual download time will be a bit @@ -194,7 +227,11 @@ func TestDownloadLogProgressWithoutLength(t *testing.T) { // the actual total download time / 50ms, for each file. That works out to at least 1000ms / 50ms = 20 INFO log // messages, for each file, about its download progress. Additionally, we should expect 1 INFO log message, for // each file, about the download completing. - assertLogs(t, log.info, 20, expectedProgressMsg) + logs := obs.FilterLevelExact(zapcore.InfoLevel).TakeAll() + failed := assertLogs(t, logs, 20, expectedProgressRegexp, expectedCompletedRegexp) + if failed { + printLogs(t, logs) + } // By similar math as above, since the download of each file is expected to take 1000ms, and the progress logger // starts issuing WARN messages once the download has taken more than 75% of the expected time, @@ -202,48 +239,23 @@ func TestDownloadLogProgressWithoutLength(t *testing.T) { // reporting happens every 50 seconds, we should see at least 250s / 50s = 5 WARN log messages, for each file, // about its download progress. Additionally, we should expect 1 WARN message, for each file, about the download // completing. - assertLogs(t, log.warn, 5, expectedProgressMsg) -} - -type logMessage struct { - record string - args []interface{} -} - -type recordLogger struct { - lock sync.RWMutex - info []logMessage - warn []logMessage -} - -func newRecordLogger() *recordLogger { - return &recordLogger{ - info: make([]logMessage, 0, 10), - warn: make([]logMessage, 0, 10), + logs = obs.FilterLevelExact(zapcore.WarnLevel).TakeAll() + failed = assertLogs(t, logs, 5, expectedProgressRegexp, expectedCompletedRegexp) + if failed { + printLogs(t, logs) } } -func (f *recordLogger) Infof(record string, args ...interface{}) { - f.lock.Lock() - defer f.lock.Unlock() - f.info = append(f.info, logMessage{record, args}) -} - -func (f *recordLogger) Warnf(record string, args ...interface{}) { - f.lock.Lock() - defer f.lock.Unlock() - f.warn = append(f.warn, logMessage{record, args}) -} - -func containsMessage(logs []logMessage, msg string) bool { +func containsMessage(logs []observer.LoggedEntry, msg string) bool { for _, item := range logs { - if item.record == msg { + if item.Message == msg { return true } } return false } -func assertLogs(t *testing.T, logs []logMessage, minExpectedProgressLogs int, expectedProgressMsg string) { + +func assertLogs(t *testing.T, logs []observer.LoggedEntry, minExpectedProgressLogs int, expectedProgressRegexp, expectedCompletedRegexp *regexp.Regexp) bool { t.Helper() // Verify that we've logged at least minExpectedProgressLogs (about download progress) + 1 log @@ -252,22 +264,33 @@ func assertLogs(t *testing.T, logs []logMessage, minExpectedProgressLogs int, ex // Verify that the first minExpectedProgressLogs messages are about the download progress (for the first file). i := 0 + failed := false for ; i < minExpectedProgressLogs; i++ { - assert.Equal(t, logs[i].record, expectedProgressMsg) + failed = failed || assert.Regexp(t, expectedProgressRegexp, logs[i].Message) } // Find the next message that's about the download being completed (for the first file). found := false for ; i < len(logs) && !found; i++ { - found = logs[i].record == "download from %s completed in %s @ %sps" + found = expectedCompletedRegexp.MatchString(logs[i].Message) } - assert.True(t, found) + failed = failed || assert.True(t, found) // Verify that the next minExpectedProgressLogs messages are about the download progress (for the second file). for j := 0; j < minExpectedProgressLogs; j++ { - assert.Equal(t, logs[i+j].record, expectedProgressMsg) + failed = failed || assert.Regexp(t, expectedProgressRegexp, logs[i+j].Message) } // Verify that the last message is about the download being completed (for the second file). - assert.Equal(t, logs[len(logs)-1].record, "download from %s completed in %s @ %sps") + failed = failed || assert.Regexp(t, expectedCompletedRegexp, logs[len(logs)-1].Message) + + return failed +} + +// printLogs is called in case one of the assertions fails; it's useful for debugging +func printLogs(t *testing.T, logs []observer.LoggedEntry) { + t.Helper() + for _, entry := range logs { + t.Logf("[%s] %s", entry.Level, entry.Message) + } } diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go new file mode 100644 index 00000000000..4eef0682a50 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go @@ -0,0 +1,97 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http + +import ( + "time" + + "github.com/docker/go-units" + + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +type progressObserver interface { + // Report is called on a periodic basis with information about the download's progress so far. + Report(sourceURI string, timePast time.Duration, downloadedBytes, totalBytes, percentComplete, downloadRate float64) + + // ReportCompleted is called when the download completes successfully. + ReportCompleted(sourceURI string, timePast time.Duration, downloadRate float64) + + // ReportFailed is called if the download does not complete successfully. + ReportFailed(sourceURI string, timePast time.Duration, downloadedBytes, totalBytes, percentComplete, downloadRate float64, err error) +} + +type loggingProgressObserver struct { + log *logger.Logger + warnTimeout time.Duration +} + +func newLoggingProgressObserver(log *logger.Logger, downloadTimeout time.Duration) *loggingProgressObserver { + return &loggingProgressObserver{ + log: log, + warnTimeout: time.Duration(float64(downloadTimeout) * warningProgressIntervalPercentage), + } +} + +func (lpObs *loggingProgressObserver) Report(sourceURI string, timePast time.Duration, downloadedBytes, totalBytes, percentComplete, downloadRate float64) { + var msg string + var args []interface{} + if totalBytes > 0 { + // length of the download is known, so more detail can be provided + msg = "download progress from %s is %s/%s (%.2f%% complete) @ %sps" + args = []interface{}{ + sourceURI, units.HumanSize(downloadedBytes), units.HumanSize(totalBytes), percentComplete, units.HumanSize(downloadRate), + } + } else { + // length unknown so provide the amount downloaded and the speed + msg = "download progress from %s has fetched %s @ %sps" + args = []interface{}{ + sourceURI, units.HumanSize(downloadedBytes), units.HumanSize(downloadRate), + } + } + + lpObs.log.Infof(msg, args...) + if timePast >= lpObs.warnTimeout { + // duplicate to warn when over the warnTimeout; this still has it logging to info that way if + // they are filtering the logs to info they still see the messages when over the warnTimeout, but + // when filtering only by warn they see these messages only + lpObs.log.Warnf(msg, args...) + } +} + +func (lpObs *loggingProgressObserver) ReportCompleted(sourceURI string, timePast time.Duration, downloadRate float64) { + msg := "download from %s completed in %s @ %sps" + args := []interface{}{ + sourceURI, units.HumanDuration(timePast), units.HumanSize(downloadRate), + } + lpObs.log.Infof(msg, args...) + if timePast >= lpObs.warnTimeout { + // see reason in `Report` + lpObs.log.Warnf(msg, args...) + } +} + +func (lpObs *loggingProgressObserver) ReportFailed(sourceURI string, timePast time.Duration, downloadedBytes, totalBytes, percentComplete, downloadRate float64, err error) { + var msg string + var args []interface{} + if totalBytes > 0 { + // length of the download is known, so more detail can be provided + msg = "download from %s failed at %s/%s (%.2f%% complete) @ %sps: %s" + args = []interface{}{ + sourceURI, units.HumanSize(downloadedBytes), units.HumanSize(totalBytes), percentComplete, units.HumanSize(downloadRate), err, + } + } else { + // length unknown so provide the amount downloaded and the speed + msg = "download from %s failed at %s @ %sps: %s" + args = []interface{}{ + sourceURI, units.HumanSize(downloadedBytes), units.HumanSize(downloadRate), err, + } + } + lpObs.log.Infof(msg, args...) + if timePast >= lpObs.warnTimeout { + // see reason in `Report` + lpObs.log.Warnf(msg, args...) + } +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/progress_reporter.go b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_reporter.go new file mode 100644 index 00000000000..491646b3ab5 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_reporter.go @@ -0,0 +1,135 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http + +import ( + "context" + "time" + + "github.com/elastic/elastic-agent-libs/atomic" +) + +type downloadProgressReporter struct { + sourceURI string + interval time.Duration + warnTimeout time.Duration + length float64 + + downloaded atomic.Int + started time.Time + + progressObservers []progressObserver + done chan struct{} +} + +func newDownloadProgressReporter(sourceURI string, timeout time.Duration, length int, progressObservers ...progressObserver) *downloadProgressReporter { + interval := time.Duration(float64(timeout) * downloadProgressIntervalPercentage) + if interval == 0 { + interval = downloadProgressMinInterval + } + + return &downloadProgressReporter{ + sourceURI: sourceURI, + interval: interval, + warnTimeout: time.Duration(float64(timeout) * warningProgressIntervalPercentage), + length: float64(length), + progressObservers: progressObservers, + done: make(chan struct{}), + } +} + +func (dp *downloadProgressReporter) Write(b []byte) (int, error) { + n := len(b) + dp.downloaded.Add(n) + return n, nil +} + +// Report periodically reports download progress to registered observers. Callers MUST either +// cancel the context provided to this method OR call either ReportComplete or ReportFailed when +// they no longer need the downloadProgressReporter to avoid resource leaks. +func (dp *downloadProgressReporter) Report(ctx context.Context) { + started := time.Now() + dp.started = started + sourceURI := dp.sourceURI + length := dp.length + interval := dp.interval + + // If there are no observers to report progress to, there is nothing to do! + if len(dp.progressObservers) == 0 { + return + } + + go func() { + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-dp.done: + return + case <-t.C: + now := time.Now() + timePast := now.Sub(started) + downloaded := float64(dp.downloaded.Load()) + bytesPerSecond := downloaded / float64(timePast/time.Second) + var percentComplete float64 + if length > 0 { + percentComplete = downloaded / length * 100.0 + } + + for _, obs := range dp.progressObservers { + obs.Report(sourceURI, timePast, downloaded, length, percentComplete, bytesPerSecond) + } + } + } + }() +} + +// ReportComplete reports the completion of a download to registered observers. Callers MUST call +// either ReportComplete or ReportFailed when they no longer need the downloadProgressReporter +// to avoid resource leaks. +func (dp *downloadProgressReporter) ReportComplete() { + defer close(dp.done) + + // If there are no observers to report progress to, there is nothing to do! + if len(dp.progressObservers) == 0 { + return + } + + now := time.Now() + timePast := now.Sub(dp.started) + downloaded := float64(dp.downloaded.Load()) + bytesPerSecond := downloaded / float64(timePast/time.Second) + + for _, obs := range dp.progressObservers { + obs.ReportCompleted(dp.sourceURI, timePast, bytesPerSecond) + } +} + +// ReportFailed reports the failure of a download to registered observers. Callers MUST call +// either ReportFailed or ReportComplete when they no longer need the downloadProgressReporter +// to avoid resource leaks. +func (dp *downloadProgressReporter) ReportFailed(err error) { + defer close(dp.done) + + // If there are no observers to report progress to, there is nothing to do! + if len(dp.progressObservers) == 0 { + return + } + + now := time.Now() + timePast := now.Sub(dp.started) + downloaded := float64(dp.downloaded.Load()) + bytesPerSecond := downloaded / float64(timePast/time.Second) + var percentComplete float64 + if dp.length > 0 { + percentComplete = downloaded / dp.length * 100.0 + } + + for _, obs := range dp.progressObservers { + obs.ReportFailed(dp.sourceURI, timePast, downloaded, dp.length, percentComplete, bytesPerSecond, err) + } +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/verifier.go b/internal/pkg/agent/application/upgrade/artifact/download/http/verifier.go index 84f3f6045f0..99fca9f65d4 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/verifier.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/verifier.go @@ -20,6 +20,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + "github.com/elastic/elastic-agent/pkg/core/logger" ) const ( @@ -33,7 +34,7 @@ type Verifier struct { client http.Client pgpBytes []byte allowEmptyPgp bool - log progressLogger + log *logger.Logger } func (v *Verifier) Name() string { @@ -42,7 +43,7 @@ func (v *Verifier) Name() string { // NewVerifier create a verifier checking downloaded package on preconfigured // location against a key stored on elastic.co website. -func NewVerifier(log progressLogger, config *artifact.Config, allowEmptyPgp bool, pgp []byte) (*Verifier, error) { +func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool, pgp []byte) (*Verifier, error) { if len(pgp) == 0 && !allowEmptyPgp { return nil, errors.New("expecting PGP but retrieved none", errors.TypeSecurity) } diff --git a/magefile.go b/magefile.go index 183889b3e69..bf7b335cbe8 100644 --- a/magefile.go +++ b/magefile.go @@ -979,10 +979,10 @@ func packageAgent(platforms []string, packagingFn func()) { panic(err) } - packagesMissing := false packagesCopied := 0 if !requiredPackagesPresent(pwd, b, packageVersion, requiredPackages) { + fmt.Printf("--- Package %s\n", pwd) cmd := exec.Command("mage", "package") cmd.Dir = pwd cmd.Stdout = os.Stdout @@ -1008,6 +1008,15 @@ func packageAgent(platforms []string, packagingFn func()) { targetPath := filepath.Join(archivePath, rp) os.MkdirAll(targetPath, 0755) for _, f := range files { + // safety check; if the user has an older version of the beats repo, + // for example right after a release where you've `git pulled` from on repo and not the other, + // they might end up with a mishmash of packages from different versions. + // check to see if we have mismatched versions. + if !strings.Contains(f, packageVersion) { + // if this panic hits weird edge cases where we don't want actual failures, revert to a printf statement. + panic(fmt.Sprintf("the file %s doesn't match agent version %s, beats repo might be out of date", f, packageVersion)) + } + targetFile := filepath.Join(targetPath, filepath.Base(f)) packagesCopied += 1 if err := sh.Copy(targetFile, f); err != nil { @@ -1017,8 +1026,8 @@ func packageAgent(platforms []string, packagingFn func()) { } // a very basic footcannon protector; if packages are missing and we need to rebuild them, check to see if those files were copied // if we needed to repackage beats but still somehow copied nothing, could indicate an issue. Usually due to beats and agent being at different versions. - if packagesMissing && packagesCopied == 0 { - fmt.Printf(">>> WARNING: no packages were copied, but we repackaged beats anyway. Check binary to see if intended beats are there.") + if packagesCopied == 0 { + fmt.Println(">>> WARNING: no packages were copied, but we repackaged beats anyway. Check binary to see if intended beats are there.") } } } @@ -1153,6 +1162,7 @@ func copyComponentSpecs(componentName, versionedDropPath string) (string, error) targetPath := filepath.Join(versionedDropPath, specFileName) if _, err := os.Stat(targetPath); err != nil { + fmt.Printf(">> File %s does not exist, reverting to local specfile\n", targetPath) // spec not present copy from local sourceSpecFile := filepath.Join("specs", specFileName) if mg.Verbose() { diff --git a/pkg/testing/ess/create_deployment_csp_configuration.yaml b/pkg/testing/ess/create_deployment_csp_configuration.yaml new file mode 100644 index 00000000000..199f664a65a --- /dev/null +++ b/pkg/testing/ess/create_deployment_csp_configuration.yaml @@ -0,0 +1,15 @@ +gcp: + integrations_server_conf_id: "gcp.integrationsserver.n2.68x32x45.2" + elasticsearch_conf_id: "gcp.es.datahot.n2.68x10x45" + elasticsearch_deployment_template_id: "gcp-storage-optimized-v5" + kibana_instance_configuration_id: "gcp.kibana.n2.68x32x45" +azure: + integrations_server_conf_id: "azure.integrationsserver.fsv2.2" + elasticsearch_conf_id: "azure.es.datahot.edsv4" + elasticsearch_deployment_template_id: "azure-storage-optimized-v2" + kibana_instance_configuration_id: "azure.kibana.fsv2" +aws: + integrations_server_conf_id: "aws.integrationsserver.c5d.2.1" + elasticsearch_conf_id: "aws.es.datahot.i3.1.1" + elasticsearch_deployment_template_id: "aws-storage-optimized-v5" + kibana_instance_configuration_id: "aws.kibana.c5d.1.1" \ No newline at end of file diff --git a/pkg/testing/ess/create_deployment_request.tmpl.json b/pkg/testing/ess/create_deployment_request.tmpl.json new file mode 100644 index 00000000000..3ef93868708 --- /dev/null +++ b/pkg/testing/ess/create_deployment_request.tmpl.json @@ -0,0 +1,102 @@ +{ + "resources": { + "integrations_server": [ + { + "elasticsearch_cluster_ref_id": "main-elasticsearch", + "region": "{{ .request.Region }}", + "plan": { + "cluster_topology": [ + { + "instance_configuration_id": "{{ .integrations_server_conf_id }}", + "zone_count": 1, + "size": { + "resource": "memory", + "value": 1024 + } + } + ], + "integrations_server": { + "version": "{{ .request.Version }}" + } + }, + "ref_id": "main-integrations_server" + } + ], + "elasticsearch": [ + { + "region": "{{ .request.Region }}", + "settings": { + "dedicated_masters_threshold": 6 + }, + "plan": { + "cluster_topology": [ + { + "zone_count": 1, + "elasticsearch": { + "node_attributes": { + "data": "hot" + } + }, + "instance_configuration_id": "{{.elasticsearch_conf_id}}", + "node_roles": [ + "master", + "ingest", + "transform", + "data_hot", + "remote_cluster_client", + "data_content" + ], + "id": "hot_content", + "size": { + "resource": "memory", + "value": 8192 + } + } + ], + "elasticsearch": { + "version": "{{ .request.Version }}", + "enabled_built_in_plugins": [] + }, + "deployment_template": { + "id": "{{ .elasticsearch_deployment_template_id }}" + } + }, + "ref_id": "main-elasticsearch" + } + ], + "enterprise_search": [], + "kibana": [ + { + "elasticsearch_cluster_ref_id": "main-elasticsearch", + "region": "{{ .request.Region }}", + "plan": { + "cluster_topology": [ + { + "instance_configuration_id": "{{.kibana_instance_configuration_id}}", + "zone_count": 1, + "size": { + "resource": "memory", + "value": 1024 + } + } + ], + "kibana": { + "version": "{{ .request.Version }}", + "user_settings_json": { + "xpack.fleet.enableExperimental": ["agentTamperProtectionEnabled"] + } + } + }, + "ref_id": "main-kibana" + } + ] + }, + "settings": { + "autoscaling_enabled": false + }, + "name": "{{ .request.Name }}", + "metadata": { + "system_owned": false, + "tags": {{ json .request.Tags }} + } +} \ No newline at end of file diff --git a/pkg/testing/ess/deployment.go b/pkg/testing/ess/deployment.go index 9d9469036b0..a79f8cb58cb 100644 --- a/pkg/testing/ess/deployment.go +++ b/pkg/testing/ess/deployment.go @@ -7,19 +7,28 @@ package ess import ( "bytes" "context" + _ "embed" "encoding/json" "fmt" - "html/template" "net/http" "net/url" "strings" + "text/template" "time" + + "gopkg.in/yaml.v2" ) +type Tag struct { + Key string `json:"key"` + Value string `json:"value"` +} + type CreateDeploymentRequest struct { Name string `json:"name"` Region string `json:"region"` Version string `json:"version"` + Tags []Tag `json:"tags"` } type CreateDeploymentResponse struct { @@ -85,21 +94,16 @@ type DeploymentStatusResponse struct { // CreateDeployment creates the deployment with the specified configuration. func (c *Client) CreateDeployment(ctx context.Context, req CreateDeploymentRequest) (*CreateDeploymentResponse, error) { - tpl, err := deploymentTemplateFactory(req) + reqBodyBytes, err := generateCreateDeploymentRequestBody(req) if err != nil { return nil, err } - var buf bytes.Buffer - if err := tpl.Execute(&buf, req); err != nil { - return nil, fmt.Errorf("unable to create deployment creation request body: %w", err) - } - createResp, err := c.doPost( ctx, "deployments", "application/json", - &buf, + bytes.NewReader(reqBodyBytes), ) if err != nil { return nil, fmt.Errorf("error calling deployment creation API: %w", err) @@ -308,233 +312,70 @@ func overallStatus(statuses ...DeploymentStatus) DeploymentStatus { return overallStatus } -func deploymentTemplateFactory(req CreateDeploymentRequest) (*template.Template, error) { +//go:embed create_deployment_request.tmpl.json +var createDeploymentRequestTemplate string + +//go:embed create_deployment_csp_configuration.yaml +var cloudProviderSpecificValues []byte + +func generateCreateDeploymentRequestBody(req CreateDeploymentRequest) ([]byte, error) { regionParts := strings.Split(req.Region, "-") if len(regionParts) < 2 { return nil, fmt.Errorf("unable to parse CSP out of region [%s]", req.Region) } csp := regionParts[0] - var tplStr string - switch csp { - case "gcp": - tplStr = createDeploymentRequestTemplateGCP - case "azure": - tplStr = createDeploymentRequestTemplateAzure - default: - return nil, fmt.Errorf("unsupported CSP [%s]", csp) + templateContext, err := createDeploymentTemplateContext(csp, req) + if err != nil { + return nil, fmt.Errorf("creating request template context: %w", err) } - tpl, err := template.New("create_deployment_request").Parse(tplStr) + tpl, err := template.New("create_deployment_request"). + Funcs(template.FuncMap{"json": jsonMarshal}). + Parse(createDeploymentRequestTemplate) if err != nil { return nil, fmt.Errorf("unable to parse deployment creation template: %w", err) } - return tpl, nil + var bBuf bytes.Buffer + err = tpl.Execute(&bBuf, templateContext) + if err != nil { + return nil, fmt.Errorf("rendering create deployment request template with context %v : %w", templateContext, err) + } + return bBuf.Bytes(), nil } -const createDeploymentRequestTemplateGCP = ` -{ - "resources": { - "integrations_server": [ - { - "elasticsearch_cluster_ref_id": "main-elasticsearch", - "region": "{{ .Region }}", - "plan": { - "cluster_topology": [ - { - "instance_configuration_id": "gcp.integrationsserver.n2.68x32x45.2", - "zone_count": 1, - "size": { - "resource": "memory", - "value": 1024 - } - } - ], - "integrations_server": { - "version": "{{ .Version }}" - } - }, - "ref_id": "main-integrations_server" - } - ], - "elasticsearch": [ - { - "region": "{{ .Region }}", - "settings": { - "dedicated_masters_threshold": 6 - }, - "plan": { - "cluster_topology": [ - { - "zone_count": 1, - "elasticsearch": { - "node_attributes": { - "data": "hot" - } - }, - "instance_configuration_id": "gcp.es.datahot.n2.68x10x45", - "node_roles": [ - "master", - "ingest", - "transform", - "data_hot", - "remote_cluster_client", - "data_content" - ], - "id": "hot_content", - "size": { - "resource": "memory", - "value": 8192 - } - } - ], - "elasticsearch": { - "version": "{{ .Version }}", - "enabled_built_in_plugins": [] - }, - "deployment_template": { - "id": "gcp-storage-optimized-v5" - } - }, - "ref_id": "main-elasticsearch" - } - ], - "enterprise_search": [], - "kibana": [ - { - "elasticsearch_cluster_ref_id": "main-elasticsearch", - "region": "{{ .Region }}", - "plan": { - "cluster_topology": [ - { - "instance_configuration_id": "gcp.kibana.n2.68x32x45", - "zone_count": 1, - "size": { - "resource": "memory", - "value": 1024 - } - } - ], - "kibana": { - "version": "{{ .Version }}", - "user_settings_json": { - "xpack.fleet.enableExperimental": ["agentTamperProtectionEnabled"] - } - } - }, - "ref_id": "main-kibana" - } - ] - }, - "settings": { - "autoscaling_enabled": false - }, - "name": "{{ .Name }}", - "metadata": { - "system_owned": false - } -}` - -const createDeploymentRequestTemplateAzure = ` -{ - "resources": { - "integrations_server": [ - { - "elasticsearch_cluster_ref_id": "main-elasticsearch", - "region": "{{ .Region }}", - "plan": { - "cluster_topology": [ - { - "instance_configuration_id": "azure.integrationsserver.fsv2.2", - "zone_count": 1, - "size": { - "resource": "memory", - "value": 1024 - } - } - ], - "integrations_server": { - "version": "{{ .Version }}" - } - }, - "ref_id": "main-integrations_server" - } - ], - "elasticsearch": [ - { - "region": "{{ .Region }}", - "settings": { - "dedicated_masters_threshold": 6 - }, - "plan": { - "cluster_topology": [ - { - "zone_count": 1, - "elasticsearch": { - "node_attributes": { - "data": "hot" - } - }, - "instance_configuration_id": "azure.es.datahot.edsv4", - "node_roles": [ - "master", - "ingest", - "transform", - "data_hot", - "remote_cluster_client", - "data_content" - ], - "id": "hot_content", - "size": { - "resource": "memory", - "value": 8192 - } - } - ], - "elasticsearch": { - "version": "{{ .Version }}", - "enabled_built_in_plugins": [] - }, - "deployment_template": { - "id": "azure-storage-optimized-v2" - } - }, - "ref_id": "main-elasticsearch" - } - ], - "enterprise_search": [], - "kibana": [ - { - "elasticsearch_cluster_ref_id": "main-elasticsearch", - "region": "{{ .Region }}", - "plan": { - "cluster_topology": [ - { - "instance_configuration_id": "azure.kibana.fsv2", - "zone_count": 1, - "size": { - "resource": "memory", - "value": 1024 - } - } - ], - "kibana": { - "version": "{{ .Version }}", - "user_settings_json": { - "xpack.fleet.enableExperimental": ["agentTamperProtectionEnabled"] - } - } - }, - "ref_id": "main-kibana" - } - ] - }, - "settings": { - "autoscaling_enabled": false - }, - "name": "{{ .Name }}", - "metadata": { - "system_owned": false - } -}` +func jsonMarshal(in any) (string, error) { + jsonBytes, err := json.Marshal(in) + if err != nil { + return "", err + } + + return string(jsonBytes), nil +} + +func createDeploymentTemplateContext(csp string, req CreateDeploymentRequest) (map[string]any, error) { + cspSpecificContext, err := loadCspValues(csp) + if err != nil { + return nil, fmt.Errorf("loading csp-specific values for %q: %w", csp, err) + } + + cspSpecificContext["request"] = req + + return cspSpecificContext, nil +} + +func loadCspValues(csp string) (map[string]any, error) { + var cspValues map[string]map[string]any + + err := yaml.Unmarshal(cloudProviderSpecificValues, &cspValues) + if err != nil { + return nil, fmt.Errorf("unmarshalling error: %w", err) + } + values, supportedCSP := cspValues[csp] + if !supportedCSP { + return nil, fmt.Errorf("csp %s not supported", csp) + } + + return values, nil +} diff --git a/pkg/testing/ess/provisioner.go b/pkg/testing/ess/provisioner.go index 941cf5bcaf7..081b4100869 100644 --- a/pkg/testing/ess/provisioner.go +++ b/pkg/testing/ess/provisioner.go @@ -67,7 +67,14 @@ func (p *provisioner) Provision(ctx context.Context, requests []runner.StackRequ for _, r := range requests { // allow up to 2 minutes for each create request createCtx, createCancel := context.WithTimeout(ctx, 2*time.Minute) - resp, err := p.createDeployment(createCtx, r) + resp, err := p.createDeployment(createCtx, r, + map[string]string{ + "division": "engineering", + "org": "ingest", + "team": "elastic-agent", + "project": "elastic-agent", + "integration-tests": "true", + }) createCancel() if err != nil { return nil, err @@ -131,17 +138,30 @@ func (p *provisioner) Clean(ctx context.Context, stacks []runner.Stack) error { return nil } -func (p *provisioner) createDeployment(ctx context.Context, r runner.StackRequest) (*CreateDeploymentResponse, error) { +func (p *provisioner) createDeployment(ctx context.Context, r runner.StackRequest, tags map[string]string) (*CreateDeploymentResponse, error) { ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) defer cancel() p.logger.Logf("Creating stack %s (%s)", r.Version, r.ID) name := fmt.Sprintf("%s-%s", strings.Replace(p.cfg.Identifier, ".", "-", -1), r.ID) - resp, err := p.client.CreateDeployment(ctx, CreateDeploymentRequest{ + + // prepare tags + tagArray := make([]Tag, 0, len(tags)) + for k, v := range tags { + tagArray = append(tagArray, Tag{ + Key: k, + Value: v, + }) + } + + createDeploymentRequest := CreateDeploymentRequest{ Name: name, Region: p.cfg.Region, Version: r.Version, - }) + Tags: tagArray, + } + + resp, err := p.client.CreateDeployment(ctx, createDeploymentRequest) if err != nil { p.logger.Logf("Failed to create ESS cloud %s: %s", r.Version, err) return nil, fmt.Errorf("failed to create ESS cloud for version %s: %w", r.Version, err) diff --git a/version/docs/version.asciidoc b/version/docs/version.asciidoc index fd0e915eae0..153c58d4edf 100644 --- a/version/docs/version.asciidoc +++ b/version/docs/version.asciidoc @@ -3,7 +3,7 @@ // FIXME: once elastic.co docs have been switched over to use `main`, remove // the `doc-site-branch` line below as well as any references to it in the code. :doc-site-branch: master -:go-version: 1.20.8 +:go-version: 1.20.9 :release-state: unreleased :python: 3.7 :docker: 1.12