diff --git a/.ci/bump-golang.yml b/.ci/bump-golang.yml index 086b72edf7f..470c6f4c8d5 100644 --- a/.ci/bump-golang.yml +++ b/.ci/bump-golang.yml @@ -103,15 +103,6 @@ targets: content: '{{ source "latestGoVersion" }}' file: .golangci.yml matchpattern: '\d+.\d+.\d+' - update-govulncheck: - name: "Update govulncheck.yml" - sourceid: latestGoVersion - scmid: githubConfig - kind: file - spec: - content: '{{ source "latestGoVersion" }}' - file: .github/workflows/govulncheck.yml - matchpattern: '\d+.\d+.\d+' update-version.asciidoc: name: "Update version.asciidoc" sourceid: latestGoVersion diff --git a/.github/workflows/govulncheck.yml b/.github/workflows/govulncheck.yml deleted file mode 100644 index 4d8a0ba4ec1..00000000000 --- a/.github/workflows/govulncheck.yml +++ /dev/null @@ -1,17 +0,0 @@ -name: govulncheck -on: - pull_request: - -jobs: - govulncheck: - strategy: - matrix: - os: [ ubuntu-latest, macos-latest, windows-latest ] - name: vulncheck - runs-on: ${{ matrix.os }} - steps: - - id: govulncheck - uses: golang/govulncheck-action@v1 - with: - go-version-input: 1.20.8 - go-package: ./... diff --git a/.go-version b/.go-version index 95393fc7d4d..20538953a5b 100644 --- a/.go-version +++ b/.go-version @@ -1 +1 @@ -1.20.8 +1.20.9 diff --git a/.golangci.yml b/.golangci.yml index a6f9f5de236..89203ee7a09 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -116,7 +116,7 @@ linters-settings: gosimple: # Select the Go version to target. The default is '1.13'. - go: "1.20.8" + go: "1.20.9" nakedret: # make an issue if func has more lines of code than this setting and it has naked returns; default is 30 @@ -136,17 +136,17 @@ linters-settings: staticcheck: # Select the Go version to target. The default is '1.13'. - go: "1.20.8" + go: "1.20.9" checks: ["all"] stylecheck: # Select the Go version to target. The default is '1.13'. - go: "1.20.8" + go: "1.20.9" checks: ["all"] unused: # Select the Go version to target. The default is '1.13'. 
- go: "1.20.8" + go: "1.20.9" gosec: excludes: diff --git a/Dockerfile b/Dockerfile index d0c2edc9f94..10882552a46 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -ARG GO_VERSION=1.20.8 +ARG GO_VERSION=1.20.9 FROM circleci/golang:${GO_VERSION} diff --git a/Dockerfile.skaffold b/Dockerfile.skaffold index ca9300eb4de..24a4df32626 100644 --- a/Dockerfile.skaffold +++ b/Dockerfile.skaffold @@ -1,4 +1,4 @@ -ARG GO_VERSION=1.20.8 +ARG GO_VERSION=1.20.9 ARG crossbuild_image="docker.elastic.co/beats-dev/golang-crossbuild" ARG AGENT_VERSION=8.9.0-SNAPSHOT ARG AGENT_IMAGE="docker.elastic.co/beats/elastic-agent" diff --git a/NOTICE.txt b/NOTICE.txt index 5f88f27987f..38ce2d28f66 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -923,11 +923,11 @@ these terms. -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-autodiscover -Version: v0.6.2 +Version: v0.6.4 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.6.2/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.6.4/LICENSE: Apache License Version 2.0, January 2004 diff --git a/changelog/8.10.3.asciidoc b/changelog/8.10.3.asciidoc new file mode 100644 index 00000000000..ba6be27f7fc --- /dev/null +++ b/changelog/8.10.3.asciidoc @@ -0,0 +1,41 @@ +// begin 8.10.3 relnotes + +[[release-notes-8.10.3]] +== 8.10.3 + +Review important information about the 8.10.3 release. + + + + + + + + + +[discrete] +[[new-features-8.10.3]] +=== New features + +The 8.10.3 release adds the following new and notable features. + + +elastic-agent:: + +* Improve Agent Uninstall On Windows By Adding Delay Between Retries When File Removal Is Blocked By Busy Files. 
{elastic-agent-pull}https://github.com/elastic/elastic-agent/pull/3431[#https://github.com/elastic/elastic-agent/pull/3431] {elastic-agent-issue}https://github.com/elastic/elastic-agent/issues/3221[#https://github.com/elastic/elastic-agent/issues/3221] + + + + + + +[discrete] +[[bug-fixes-8.10.3]] +=== Bug fixes + + +elastic-agent:: + +* Resilient Handling Of Air Gapped Pgp Checks. {elastic-agent-pull}https://github.com/elastic/elastic-agent/pull/3427[#https://github.com/elastic/elastic-agent/pull/3427] {elastic-agent-issue}https://github.com/elastic/elastic-agent/issues/3368[#https://github.com/elastic/elastic-agent/issues/3368] + +// end 8.10.3 relnotes diff --git a/changelog/8.10.3.yaml b/changelog/8.10.3.yaml new file mode 100644 index 00000000000..6273a300a63 --- /dev/null +++ b/changelog/8.10.3.yaml @@ -0,0 +1,26 @@ +version: 8.10.3 +entries: + - kind: bug-fix + summary: Resilient handling of air gapped PGP checks + description: Elastic Agent should not fail when remote PGP is specified (or official Elastic fallback PGP used) and remote is not available + component: elastic-agent + pr: + - https://github.com/elastic/elastic-agent/pull/3427 + issue: + - https://github.com/elastic/elastic-agent/issues/3368 + timestamp: 1695035111 + file: + name: 1695035111-Resilient-handling-of-air-gapped-PGP-checks.yaml + checksum: 8741bfa9475a09d5901dc3fab0fed3a06b55d5bb + - kind: feature + summary: Improve Agent uninstall on Windows by adding delay between retries when file removal is blocked by busy files + description: "" + component: elastic-agent + pr: + - https://github.com/elastic/elastic-agent/pull/3431 + issue: + - https://github.com/elastic/elastic-agent/issues/3221 + timestamp: 1695050880 + file: + name: 1695050880-Improve-retry-strategy-when-uninstalling-agent.yaml + checksum: 45eab228dfd89392a0f3685a628f73ccce05d081 diff --git a/changelog/fragments/1695035111-Resilient-handling-of-air-gapped-PGP-checks.yaml 
b/changelog/fragments/1695035111-Resilient-handling-of-air-gapped-PGP-checks.yaml deleted file mode 100644 index caaa8a2f53a..00000000000 --- a/changelog/fragments/1695035111-Resilient-handling-of-air-gapped-PGP-checks.yaml +++ /dev/null @@ -1,31 +0,0 @@ -# Kind can be one of: -# - breaking-change: a change to previously-documented behavior -# - deprecation: functionality that is being removed in a later release -# - bug-fix: fixes a problem in a previous version -# - enhancement: extends functionality but does not break or fix existing behavior -# - feature: new functionality -# - known-issue: problems that we are aware of in a given version -# - security: impacts on the security of a product or a user’s deployment. -# - upgrade: important information for someone upgrading from a prior version -# - other: does not fit into any of the other categories -kind: bug-fix - -# Change summary; a 80ish characters long description of the change. -summary: Resilient handling of air gapped PGP checks - -# Long description; in case the summary is not enough to describe the change -# this field accommodate a description without length limits. -description: Elastic Agent should not fail when remote PGP is specified (or official Elastic fallback PGP used) and remote is not available - -# Affected component; a word indicating the component this changeset affects. -component: elastic-agent - -# PR number; optional; the PR number that added the changeset. -# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. -# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. -# Please provide it if you are adding a fragment for a different PR. -pr: 3427 - -# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of). -# If not present is automatically filled by the tooling with the issue linked to the PR number. 
-issue: 3368 diff --git a/changelog/fragments/1694700201-gpg-unreachable-url-fix.yaml b/changelog/fragments/1696530758-bugfix-upgrade-progress-reporter.yaml similarity index 89% rename from changelog/fragments/1694700201-gpg-unreachable-url-fix.yaml rename to changelog/fragments/1696530758-bugfix-upgrade-progress-reporter.yaml index 42c8945cb91..d39f4fa2f41 100644 --- a/changelog/fragments/1694700201-gpg-unreachable-url-fix.yaml +++ b/changelog/fragments/1696530758-bugfix-upgrade-progress-reporter.yaml @@ -11,7 +11,7 @@ kind: bug-fix # Change summary; a 80ish characters long description of the change. -summary: Fix gpg verification, if one is successful upgrade should continue. +summary: Periodically report progress of Elastic Agent artifact download during upgrade # Long description; in case the summary is not enough to describe the change # this field accommodate a description without length limits. @@ -25,8 +25,8 @@ component: elastic-agent # If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. # NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. # Please provide it if you are adding a fragment for a different PR. -pr: https://github.com/elastic/elastic-agent/pull/3426 +pr: https://github.com/elastic/elastic-agent/pull/3548 # Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). # If not present is automatically filled by the tooling with the issue linked to the PR number. 
-issue: https://github.com/elastic/elastic-agent/issues/3368 +#issue: https://github.com/owner/repo/1234 diff --git a/changelog/fragments/1695050880-Improve-retry-strategy-when-uninstalling-agent.yaml b/changelog/fragments/1696955150-Slow-down-agent-monitoring-metrics-interval-to-60s.yaml similarity index 92% rename from changelog/fragments/1695050880-Improve-retry-strategy-when-uninstalling-agent.yaml rename to changelog/fragments/1696955150-Slow-down-agent-monitoring-metrics-interval-to-60s.yaml index b3c1e7ac5e7..bf86933d97e 100644 --- a/changelog/fragments/1695050880-Improve-retry-strategy-when-uninstalling-agent.yaml +++ b/changelog/fragments/1696955150-Slow-down-agent-monitoring-metrics-interval-to-60s.yaml @@ -8,10 +8,10 @@ # - security: impacts on the security of a product or a user’s deployment. # - upgrade: important information for someone upgrading from a prior version # - other: does not fit into any of the other categories -kind: feature +kind: enhancement # Change summary; a 80ish characters long description of the change. -summary: Improve uninstall by adding some pause between retries when removal is blocked by busy files +summary: Increase agent monitoring metrics interval from 10s to 60s to reduce load # Long description; in case the summary is not enough to describe the change # this field accommodate a description without length limits. @@ -19,7 +19,7 @@ summary: Improve uninstall by adding some pause between retries when removal is #description: # Affected component; a word indicating the component this changeset affects. -component: elastic-agent +component: monitoring # PR URL; optional; the PR number that added the changeset. # If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. 
diff --git a/changelog/fragments/1696249276-Start-stop-monitoring-server-based-on-monitoring-config.yaml b/changelog/fragments/1697111928-upgrade-to-go-1.20.9.yaml similarity index 70% rename from changelog/fragments/1696249276-Start-stop-monitoring-server-based-on-monitoring-config.yaml rename to changelog/fragments/1697111928-upgrade-to-go-1.20.9.yaml index 141c18715bf..0e2bba7ec53 100644 --- a/changelog/fragments/1696249276-Start-stop-monitoring-server-based-on-monitoring-config.yaml +++ b/changelog/fragments/1697111928-upgrade-to-go-1.20.9.yaml @@ -8,24 +8,25 @@ # - security: impacts on the security of a product or a user’s deployment. # - upgrade: important information for someone upgrading from a prior version # - other: does not fit into any of the other categories -kind: enhancement +kind: security # Change summary; a 80ish characters long description of the change. -summary: Start/stop monitoring server based on monitoring config +summary: Upgrade to Go 1.20.9 # Long description; in case the summary is not enough to describe the change # this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. #description: -# Affected component; a word indicating the component this changeset affects. +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. component: elastic-agent -# PR number; optional; the PR number that added the changeset. +# PR URL; optional; the PR number that added the changeset. # If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. # NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. # Please provide it if you are adding a fragment for a different PR. 
-pr: 3492 +pr: https://github.com/elastic/elastic-agent/pull/3393 -# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of). +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). # If not present is automatically filled by the tooling with the issue linked to the PR number. -issue: 2735 +#issue: https://github.com/owner/repo/1234 diff --git a/go.mod b/go.mod index eb32cabe4d4..f4d4d171fdc 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/docker/go-units v0.5.0 github.com/dolmen-go/contextio v0.0.0-20200217195037-68fc5150bcd5 github.com/elastic/e2e-testing v1.1.0 - github.com/elastic/elastic-agent-autodiscover v0.6.2 + github.com/elastic/elastic-agent-autodiscover v0.6.4 github.com/elastic/elastic-agent-client/v7 v7.4.0 github.com/elastic/elastic-agent-libs v0.5.0 github.com/elastic/elastic-agent-system-metrics v0.7.0 diff --git a/go.sum b/go.sum index 9994a9d174f..f27fce36f03 100644 --- a/go.sum +++ b/go.sum @@ -778,8 +778,8 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elastic/e2e-testing v1.1.0 h1:Y+K215EWkf3ojAWmBK2JrxH/rITjkKM1zR8mnwIpvLw= github.com/elastic/e2e-testing v1.1.0/go.mod h1:8q2d8dmwavJXISowwaoreHFBnbR/uK4qanfRGhC/W9A= -github.com/elastic/elastic-agent-autodiscover v0.6.2 h1:7P3cbMBWXjbzA80rxitQjc+PiWyZ4I4F4LqrCYgYlNc= -github.com/elastic/elastic-agent-autodiscover v0.6.2/go.mod h1:yXYKFAG+Py+TcE4CCR8EAbJiYb+6Dz9sCDoWgOveqtU= +github.com/elastic/elastic-agent-autodiscover v0.6.4 h1:K+xC7OGgcy4fLXVuGgOGLs+eXCqRnRg2SQQinxP+KsA= +github.com/elastic/elastic-agent-autodiscover v0.6.4/go.mod h1:5+7NIBAILc0GkgxYW3ckXncu5wRZfltZhTY4aZAYP4M= github.com/elastic/elastic-agent-client/v7 v7.4.0 h1:h75oTkkvIjgiKVm61NpvTZP4cy6QbQ3zrIpXKGigyjo= github.com/elastic/elastic-agent-client/v7 v7.4.0/go.mod 
h1:9/amG2K2y2oqx39zURcc+hnqcX+nyJ1cZrLgzsgo5c0= github.com/elastic/elastic-agent-libs v0.5.0 h1:8LbxSuMiGy8xhHX5NrE/dmTLsLMEuA+2AODUsiBfEcE= diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 856c624076f..e6a2dbc182f 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -158,10 +158,6 @@ type ComponentsModifier func(comps []component.Component, cfg map[string]interfa // CoordinatorShutdownTimeout is how long the coordinator will wait during shutdown to receive a "clean" shutdown from other components var CoordinatorShutdownTimeout = time.Second * 5 -type configReloader interface { - Reload(*config.Config) error -} - // Coordinator manages the entire state of the Elastic Agent. // // All configuration changes, update variables, and upgrade actions are managed and controlled by the coordinator. @@ -177,8 +173,6 @@ type Coordinator struct { upgradeMgr UpgradeManager monitorMgr MonitorManager - monitoringServerReloader configReloader - runtimeMgr RuntimeManager configMgr ConfigManager varsMgr VarsManager @@ -371,10 +365,6 @@ func (c *Coordinator) State() State { return c.stateBroadcaster.Get() } -func (c *Coordinator) RegisterMonitoringServer(s configReloader) { - c.monitoringServerReloader = s -} - // StateSubscribe returns a channel that reports changes in Coordinator state. 
// // bufferLen specifies how many state changes should be queued in addition to @@ -1018,12 +1008,6 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) { } } - if c.monitoringServerReloader != nil { - if err := c.monitoringServerReloader.Reload(cfg); err != nil { - return fmt.Errorf("failed to reload monitor manager configuration: %w", err) - } - } - c.ast = rawAst return nil } diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index dbcaa8b616e..805139f26e8 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -24,12 +24,10 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" "github.com/elastic/elastic-agent/internal/pkg/config" - monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/component/runtime" agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client" @@ -522,137 +520,6 @@ inputs: } } -func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) { - // Send a test policy to the Coordinator as a Config Manager update, - // verify it generates the right component model and sends it to the - // runtime manager, then send an empty policy and verify it calls - // another runtime manager update with an empty component model. 
- - // Set a one-second timeout -- nothing here should block, but if it - // does let's report a failure instead of timing out the test runner. - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - logger := logp.NewLogger("testing") - - configChan := make(chan ConfigChange, 1) - - // Create a mocked runtime manager that will report the update call - runtimeManager := &fakeRuntimeManager{ - updateCallback: func(comp []component.Component) error { - return nil - }, - } - - monitoringServer := &fakeMonitoringServer{} - newServerFn := func() (reload.ServerController, error) { - return monitoringServer, nil - } - monitoringReloader := reload.NewServerReloader(newServerFn, logger, monitoringCfg.DefaultConfig()) - - coord := &Coordinator{ - logger: logger, - agentInfo: &info.AgentInfo{}, - stateBroadcaster: broadcaster.New(State{}, 0, 0), - managerChans: managerChans{ - configManagerUpdate: configChan, - }, - runtimeMgr: runtimeManager, - vars: emptyVars(t), - } - coord.RegisterMonitoringServer(monitoringReloader) - - // Create a policy with one input and one output - cfg := config.MustNewConfigFrom(` -outputs: - default: - type: elasticsearch -inputs: - - id: test-input - type: filestream - use_output: default -`) - - // Send the policy change and make sure it was acknowledged. - cfgChange := &configChange{cfg: cfg} - configChan <- cfgChange - coord.runLoopIteration(ctx) - assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") - - // server is started by default - assert.True(t, monitoringServer.startTriggered) - assert.True(t, monitoringServer.isRunning) - - // disable monitoring - cfgDisableMonitoring := config.MustNewConfigFrom(` -agent.monitoring.enabled: false -outputs: - default: - type: elasticsearch -inputs: - - id: test-input - type: filestream - use_output: default -`) - - // Send the policy change and make sure it was acknowledged. 
- monitoringServer.Reset() - cfgChange = &configChange{cfg: cfgDisableMonitoring} - configChan <- cfgChange - coord.runLoopIteration(ctx) - assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") - - // server is stopped: monitoring is disabled - assert.True(t, monitoringServer.stopTriggered) - assert.False(t, monitoringServer.isRunning) - - // enable monitoring - cfgEnabledMonitoring := config.MustNewConfigFrom(` -agent.monitoring.enabled: true -outputs: - default: - type: elasticsearch -inputs: - - id: test-input - type: filestream - use_output: default -`) - - // Send the policy change and make sure it was acknowledged. - monitoringServer.Reset() - cfgChange = &configChange{cfg: cfgEnabledMonitoring} - configChan <- cfgChange - coord.runLoopIteration(ctx) - assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") - - // server is started again - assert.True(t, monitoringServer.startTriggered) - assert.True(t, monitoringServer.isRunning) - - // enable monitoring and disable metrics - cfgEnabledMonitoringNoMetrics := config.MustNewConfigFrom(` -agent.monitoring.enabled: true -agent.monitoring.metrics: false -outputs: - default: - type: elasticsearch -inputs: - - id: test-input - type: filestream - use_output: default -`) - - // Send the policy change and make sure it was acknowledged. 
- monitoringServer.Reset() - cfgChange = &configChange{cfg: cfgEnabledMonitoringNoMetrics} - configChan <- cfgChange - coord.runLoopIteration(ctx) - assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") - - // server is stopped: monitoring.metrics is disabled - assert.True(t, monitoringServer.stopTriggered) - assert.False(t, monitoringServer.isRunning) -} - func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) { // Send a test policy to the Coordinator as a Config Manager update, // verify it generates the right component model and sends it to the @@ -994,25 +861,3 @@ func emptyAST(t *testing.T) *transpiler.AST { require.NoError(t, err, "AST creation must succeed") return ast } - -type fakeMonitoringServer struct { - startTriggered bool - stopTriggered bool - isRunning bool -} - -func (fs *fakeMonitoringServer) Start() { - fs.startTriggered = true - fs.isRunning = true -} - -func (fs *fakeMonitoringServer) Stop() error { - fs.stopTriggered = true - fs.isRunning = false - return nil -} - -func (fs *fakeMonitoringServer) Reset() { - fs.stopTriggered = false - fs.startTriggered = false -} diff --git a/internal/pkg/agent/application/monitoring/reload/reload.go b/internal/pkg/agent/application/monitoring/reload/reload.go deleted file mode 100644 index b39d46474f9..00000000000 --- a/internal/pkg/agent/application/monitoring/reload/reload.go +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
- -package reload - -import ( - "sync" - - "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" - "github.com/elastic/elastic-agent/internal/pkg/agent/errors" - aConfig "github.com/elastic/elastic-agent/internal/pkg/config" - monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" - "github.com/elastic/elastic-agent/pkg/core/logger" -) - -// ServerController controls the server runtime -type ServerController interface { - Start() - Stop() error -} -type serverConstructor func() (ServerController, error) - -type ServerReloader struct { - s ServerController - log *logger.Logger - newServerFn serverConstructor - - config *monitoringCfg.MonitoringConfig - isServerRunning bool - isServerRunningLock sync.Mutex -} - -func NewServerReloader(newServerFn serverConstructor, log *logger.Logger, mcfg *monitoringCfg.MonitoringConfig) *ServerReloader { - sr := &ServerReloader{ - log: log, - config: mcfg, - newServerFn: newServerFn, - } - - return sr -} - -func (sr *ServerReloader) Start() { - sr.isServerRunningLock.Lock() - defer sr.isServerRunningLock.Unlock() - - sr.start() -} - -func (sr *ServerReloader) start() { - if sr.s != nil && sr.isServerRunning { - // server is already running - return - } - - sr.log.Info("Starting server") - var err error - sr.s, err = sr.newServerFn() - if err != nil { - sr.log.Errorf("Failed creating a server: %v", err) - return - } - - sr.s.Start() - sr.log.Debugf("Server started") - sr.isServerRunning = true -} - -func (sr *ServerReloader) Stop() error { - sr.isServerRunningLock.Lock() - defer sr.isServerRunningLock.Unlock() - - return sr.stop() -} - -func (sr *ServerReloader) stop() error { - if sr.s == nil { - // stopping not started server - sr.isServerRunning = false - return nil - } - sr.log.Info("Stopping server") - - sr.isServerRunning = false - if err := sr.s.Stop(); err != nil { - return err - } - - sr.log.Debugf("Server stopped") - sr.s = nil - return nil -} - -func (sr *ServerReloader) 
Reload(rawConfig *aConfig.Config) error { - sr.isServerRunningLock.Lock() - defer sr.isServerRunningLock.Unlock() - - newConfig := configuration.DefaultConfiguration() - if err := rawConfig.Unpack(&newConfig); err != nil { - return errors.New(err, "failed to unpack monitoring config during reload") - } - - sr.config = newConfig.Settings.MonitoringConfig - - shouldRunMetrics := sr.config.Enabled && sr.config.MonitorMetrics - if shouldRunMetrics && !sr.isServerRunning { - sr.start() - - sr.isServerRunning = true - return nil - } - - if !shouldRunMetrics && sr.isServerRunning { - sr.isServerRunning = false - return sr.stop() - } - - return nil -} diff --git a/internal/pkg/agent/application/monitoring/reload/reload_test.go b/internal/pkg/agent/application/monitoring/reload/reload_test.go deleted file mode 100644 index b686c5346a8..00000000000 --- a/internal/pkg/agent/application/monitoring/reload/reload_test.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
- -package reload - -import ( - "testing" - - "github.com/stretchr/testify/require" - - aConfig "github.com/elastic/elastic-agent/internal/pkg/config" - monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" - "github.com/elastic/elastic-agent/pkg/core/logger" -) - -func TestReload(t *testing.T) { - tcs := []struct { - name string - currEnabled bool - currMetrics bool - currRunning bool - - newConfig string - expectedRunning bool - expectedStart bool - expectedStop bool - }{ - { - "start with default config", - false, false, false, - ``, - true, true, false, - }, - { - "start when not running, monitoring enabled", - false, false, false, - ` -agent.monitoring.enabled: true -`, - true, true, false, - }, - { - "do not start when not running, only metrics enabled", - false, false, false, - ` -agent.monitoring.enabled: false -agent.monitoring.metrics: true -`, - false, false, false, - }, - - { - "stop when running, monitoring disabled", - true, true, true, - ` -agent.monitoring.enabled: false -`, - false, false, true, - }, - { - "stop when running, monitoring.metrics disabled", - true, true, true, - ` -agent.monitoring.metrics: false -`, - false, false, true, - }, - { - "stop stopped server", - false, false, false, - ` -agent.monitoring.metrics: false -`, - false, false, false, - }, - { - "start started server", - true, true, true, - ` -agent.monitoring.enabled: true -`, - true, false, false, - }, - } - - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - fsc := &fakeServerController{} - log, _ := logger.NewTesting(tc.name) - cfg := &monitoringCfg.MonitoringConfig{ - Enabled: tc.currEnabled, - MonitorMetrics: tc.currMetrics, - } - r := NewServerReloader( - func() (ServerController, error) { - return fsc, nil - }, - log, - cfg, - ) - r.isServerRunning = tc.currRunning - if tc.currRunning { - r.s = fsc - } - - newCfg := aConfig.MustNewConfigFrom(tc.newConfig) - require.NoError(t, r.Reload(newCfg)) - - require.Equal(t, 
tc.expectedRunning, r.isServerRunning) - require.Equal(t, tc.expectedStart, fsc.startTriggered) - require.Equal(t, tc.expectedStop, fsc.stopTriggered) - }) - } -} - -type fakeServerController struct { - startTriggered bool - stopTriggered bool -} - -func (fsc *fakeServerController) Start() { fsc.startTriggered = true } -func (fsc *fakeServerController) Stop() error { - fsc.stopTriggered = true - return nil -} -func (fsc *fakeServerController) Reset() { - fsc.startTriggered = false - fsc.stopTriggered = false -} diff --git a/internal/pkg/agent/application/monitoring/server.go b/internal/pkg/agent/application/monitoring/server.go index 7f43bf1866b..390a472d5ed 100644 --- a/internal/pkg/agent/application/monitoring/server.go +++ b/internal/pkg/agent/application/monitoring/server.go @@ -20,9 +20,6 @@ import ( "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload" - "github.com/elastic/elastic-agent/internal/pkg/agent/errors" - monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -35,8 +32,7 @@ func NewServer( coord *coordinator.Coordinator, enableProcessStats bool, operatingSystem string, - mcfg *monitoringCfg.MonitoringConfig, -) (*reload.ServerReloader, error) { +) (*api.Server, error) { if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil { // log but ignore log.Errorf("failed to create monitoring drop: %v", err) @@ -47,7 +43,7 @@ func NewServer( return nil, err } - return exposeMetricsEndpoint(log, cfg, ns, tracer, coord, enableProcessStats, operatingSystem, mcfg) + return exposeMetricsEndpoint(log, cfg, ns, tracer, coord, enableProcessStats, operatingSystem) } func exposeMetricsEndpoint( @@ -58,8 +54,7 @@ func exposeMetricsEndpoint( coord 
*coordinator.Coordinator, enableProcessStats bool, operatingSystem string, - mcfg *monitoringCfg.MonitoringConfig, -) (*reload.ServerReloader, error) { +) (*api.Server, error) { r := mux.NewRouter() if tracer != nil { r.Use(apmgorilla.Middleware(apmgorilla.WithTracer(tracer))) @@ -77,15 +72,7 @@ func exposeMetricsEndpoint( mux := http.NewServeMux() mux.Handle("/", r) - newServerFn := func() (reload.ServerController, error) { - apiServer, err := api.New(log, mux, config) - if err != nil { - return nil, errors.New(err, "failed to create api server") - } - return apiServer, nil - } - - return reload.NewServerReloader(newServerFn, log, mcfg), nil + return api.New(log, mux, config) } func createAgentMonitoringDrop(drop string) error { diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 5c3f94e3e6a..79522096b8c 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -12,6 +12,7 @@ import ( "path/filepath" "runtime" "strings" + "time" "unicode" "github.com/elastic/elastic-agent/pkg/component" @@ -51,6 +52,10 @@ const ( agentName = "elastic-agent" windowsOS = "windows" + + // metricset execution period used for the monitoring metrics inputs + // we set this to 60s to reduce the load/data volume on the monitoring cluster + metricsCollectionInterval = 60 * time.Second ) var ( @@ -517,6 +522,8 @@ func (b *BeatsMonitor) monitoringNamespace() string { } func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutputName string, componentList []component.Component) error { + + metricsCollectionIntervalString := metricsCollectionInterval.String() monitoringNamespace := b.monitoringNamespace() fixedAgentName := strings.ReplaceAll(agentName, "-", "_") beatsStreams := make([]interface{}, 0, len(componentIDToBinary)) @@ -532,7 +539,7 @@ func (b *BeatsMonitor) 
injectMetricsInput(cfg map[string]interface{}, componentI "path": "/stats", "hosts": []interface{}{HttpPlusAgentMonitoringEndpoint(b.operatingSystem, b.config.C)}, "namespace": "agent", - "period": "10s", + "period": metricsCollectionIntervalString, "index": fmt.Sprintf("metrics-elastic_agent.%s-%s", fixedAgentName, monitoringNamespace), "processors": []interface{}{ map[string]interface{}{ @@ -608,7 +615,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI }, "metricsets": []interface{}{"stats", "state"}, "hosts": endpoints, - "period": "10s", + "period": metricsCollectionIntervalString, "index": fmt.Sprintf("metrics-elastic_agent.%s-%s", name, monitoringNamespace), "processors": []interface{}{ map[string]interface{}{ @@ -663,7 +670,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "hosts": endpoints, "path": "/stats", "namespace": "agent", - "period": "10s", + "period": metricsCollectionIntervalString, "index": fmt.Sprintf("metrics-elastic_agent.%s-%s", fixedAgentName, monitoringNamespace), "processors": []interface{}{ map[string]interface{}{ @@ -725,7 +732,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "path": "/inputs/", "namespace": fbDataStreamName, "json.is_array": true, - "period": "10s", + "period": metricsCollectionIntervalString, "index": fmt.Sprintf("metrics-elastic_agent.%s-%s", fbDataStreamName, monitoringNamespace), "processors": []interface{}{ map[string]interface{}{ @@ -799,7 +806,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "path": "/shipper", "hosts": endpoints, "namespace": "application", - "period": "10s", + "period": metricsCollectionIntervalString, "processors": createProcessorsForJSONInput(name, monitoringNamespace, b.agentInfo), }, map[string]interface{}{ @@ -813,7 +820,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI "path": "/stats", "hosts": endpoints, 
"namespace": "agent", - "period": "10s", + "period": metricsCollectionIntervalString, "processors": createProcessorsForJSONInput(name, monitoringNamespace, b.agentInfo), }) } diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go new file mode 100644 index 00000000000..9c7ea042d7f --- /dev/null +++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go @@ -0,0 +1,96 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package monitoring + +import ( + "context" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" + monitoringcfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" +) + +func TestMonitoringConfigMetricsInterval(t *testing.T) { + + agentInfo, err := info.NewAgentInfo(context.Background(), false) + require.NoError(t, err, "Error creating agent info") + + mCfg := &monitoringConfig{ + C: &monitoringcfg.MonitoringConfig{ + Enabled: true, + MonitorMetrics: true, + HTTP: &monitoringcfg.MonitoringHTTPConfig{ + Enabled: false, + }, + }, + } + + policy := map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "metrics": true, + "http": map[string]any{ + "enabled": false, + }, + }, + }, + "outputs": map[string]any{ + "default": map[string]any{}, + }, + } + b := &BeatsMonitor{ + enabled: true, + config: mCfg, + operatingSystem: runtime.GOOS, + agentInfo: agentInfo, + } + got, err := b.MonitoringConfig(policy, nil, map[string]string{"foobeat": "filebeat"}) // put a componentID/binary mapping to have something in the beats monitoring input + assert.NoError(t, err) + + rawInputs, ok := 
got["inputs"] + require.True(t, ok, "monitoring config contains no input") + inputs, ok := rawInputs.([]any) + require.True(t, ok, "monitoring inputs are not a list") + marshaledInputs, err := yaml.Marshal(inputs) + if assert.NoError(t, err, "error marshaling monitoring inputs") { + t.Logf("marshaled monitoring inputs:\n%s\n", marshaledInputs) + } + + // loop over the created inputs + for _, i := range inputs { + input, ok := i.(map[string]any) + if assert.Truef(t, ok, "input is not represented as a map: %v", i) { + inputID := input["id"] + t.Logf("input %q", inputID) + // check the streams created for the input, should be a list of objects + if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) && + assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) { + // loop over streams and cast to map[string]any to access keys + for _, rawStream := range input["streams"].([]any) { + if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) { + stream := rawStream.(map[string]any) + // check period and assert its value + streamID := stream["id"] + if assert.Containsf(t, stream, "period", "stream %q for input %q does not contain a period", streamID, inputID) && + assert.IsType(t, "", stream["period"], "period for stream %q of input %q is not represented as a string", streamID, inputID) { + periodString := stream["period"].(string) + duration, err := time.ParseDuration(periodString) + if assert.NoErrorf(t, err, "Unparseable period duration %s for stream %q of input %q", periodString, streamID, inputID) { + assert.Equalf(t, duration, 60*time.Second, "unexpected duration for stream %q of input %q", streamID, inputID) + } + } + } + } + } + } + + } +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier.go b/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier.go index 263f6ea8bf5..e42a35c76a4 100644 
--- a/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier.go @@ -158,7 +158,7 @@ func (v *Verifier) verifyAsc(fullPath string, skipDefaultPgp bool, pgpSources .. v.log.Infof("Verification with PGP[%d] successful", i) return nil } - v.log.Warnf("Verification with PGP[%d] succfailed: %v", i, err) + v.log.Warnf("Verification with PGP[%d] failed: %v", i, err) } v.log.Warnf("Verification failed") diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go index 4336c4153d8..7be3ae1066f 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go @@ -17,14 +17,12 @@ import ( "strings" "time" - "github.com/docker/go-units" - - "github.com/elastic/elastic-agent-libs/atomic" "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + "github.com/elastic/elastic-agent/pkg/core/logger" ) const ( @@ -35,6 +33,10 @@ const ( // the default timeout is 10 minutes and this will have it log every 30 seconds. downloadProgressIntervalPercentage = 0.05 + // downloadProgressMinInterval defines the minimum interval at which the current download progress will be reported. + // This value is used if the timeout is not specified (and therefore equal to 0). + downloadProgressMinInterval = 10 * time.Second + // warningProgressIntervalPercentage defines how often to log messages as a warning once the amount of time // passed is this percentage or more of the total allotted time to download. 
warningProgressIntervalPercentage = 0.75 @@ -42,13 +44,13 @@ const ( // Downloader is a downloader able to fetch artifacts from elastic.co web page. type Downloader struct { - log progressLogger + log *logger.Logger config *artifact.Config client http.Client } // NewDownloader creates and configures Elastic Downloader -func NewDownloader(log progressLogger, config *artifact.Config) (*Downloader, error) { +func NewDownloader(log *logger.Logger, config *artifact.Config) (*Downloader, error) { client, err := config.HTTPTransportSettings.Client( httpcommon.WithAPMHTTPInstrumentation(), httpcommon.WithKeepaliveSettings{Disable: false, IdleConnTimeout: 30 * time.Second}, @@ -62,7 +64,7 @@ func NewDownloader(log progressLogger, config *artifact.Config) (*Downloader, er } // NewDownloaderWithClient creates Elastic Downloader with specific client used -func NewDownloaderWithClient(log progressLogger, config *artifact.Config, client http.Client) *Downloader { +func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client) *Downloader { return &Downloader{ log: log, config: config, @@ -204,149 +206,16 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f } } - reportCtx, reportCancel := context.WithCancel(ctx) - dp := newDownloadProgressReporter(e.log, sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize) - dp.Report(reportCtx) + loggingObserver := newLoggingProgressObserver(e.log, e.config.HTTPTransportSettings.Timeout) + dp := newDownloadProgressReporter(sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver) + dp.Report(ctx) _, err = io.Copy(destinationFile, io.TeeReader(resp.Body, dp)) if err != nil { - reportCancel() dp.ReportFailed(err) // return path, file already exists and needs to be cleaned up return fullPath, errors.New(err, "copying fetched package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)) } - reportCancel() dp.ReportComplete() return fullPath, nil 
} - -type downloadProgressReporter struct { - log progressLogger - sourceURI string - timeout time.Duration - interval time.Duration - warnTimeout time.Duration - length float64 - - downloaded atomic.Int - started time.Time -} - -func newDownloadProgressReporter(log progressLogger, sourceURI string, timeout time.Duration, length int) *downloadProgressReporter { - return &downloadProgressReporter{ - log: log, - sourceURI: sourceURI, - timeout: timeout, - interval: time.Duration(float64(timeout) * downloadProgressIntervalPercentage), - warnTimeout: time.Duration(float64(timeout) * warningProgressIntervalPercentage), - length: float64(length), - } -} - -func (dp *downloadProgressReporter) Write(b []byte) (int, error) { - n := len(b) - dp.downloaded.Add(n) - return n, nil -} - -func (dp *downloadProgressReporter) Report(ctx context.Context) { - started := time.Now() - dp.started = started - sourceURI := dp.sourceURI - log := dp.log - length := dp.length - warnTimeout := dp.warnTimeout - interval := dp.interval - - go func() { - t := time.NewTimer(interval) - defer t.Stop() - for { - select { - case <-ctx.Done(): - return - case <-t.C: - now := time.Now() - timePast := now.Sub(started) - downloaded := float64(dp.downloaded.Load()) - bytesPerSecond := downloaded / float64(timePast/time.Second) - - var msg string - var args []interface{} - if length > 0 { - // length of the download is known, so more detail can be provided - percentComplete := downloaded / length * 100.0 - msg = "download progress from %s is %s/%s (%.2f%% complete) @ %sps" - args = []interface{}{ - sourceURI, units.HumanSize(downloaded), units.HumanSize(length), percentComplete, units.HumanSize(bytesPerSecond), - } - } else { - // length unknown so provide the amount downloaded and the speed - msg = "download progress from %s has fetched %s @ %sps" - args = []interface{}{ - sourceURI, units.HumanSize(downloaded), units.HumanSize(bytesPerSecond), - } - } - - log.Infof(msg, args...) 
- if timePast >= warnTimeout { - // duplicate to warn when over the warnTimeout; this still has it logging to info that way if - // they are filtering the logs to info they still see the messages when over the warnTimeout, but - // when filtering only by warn they see these messages only - log.Warnf(msg, args...) - } - } - } - }() -} - -func (dp *downloadProgressReporter) ReportComplete() { - now := time.Now() - timePast := now.Sub(dp.started) - downloaded := float64(dp.downloaded.Load()) - bytesPerSecond := downloaded / float64(timePast/time.Second) - msg := "download from %s completed in %s @ %sps" - args := []interface{}{ - dp.sourceURI, units.HumanDuration(timePast), units.HumanSize(bytesPerSecond), - } - dp.log.Infof(msg, args...) - if timePast >= dp.warnTimeout { - // see reason in `Report` - dp.log.Warnf(msg, args...) - } -} - -func (dp *downloadProgressReporter) ReportFailed(err error) { - now := time.Now() - timePast := now.Sub(dp.started) - downloaded := float64(dp.downloaded.Load()) - bytesPerSecond := downloaded / float64(timePast/time.Second) - var msg string - var args []interface{} - if dp.length > 0 { - // length of the download is known, so more detail can be provided - percentComplete := downloaded / dp.length * 100.0 - msg = "download from %s failed at %s/%s (%.2f%% complete) @ %sps: %s" - args = []interface{}{ - dp.sourceURI, units.HumanSize(downloaded), units.HumanSize(dp.length), percentComplete, units.HumanSize(bytesPerSecond), err, - } - } else { - // length unknown so provide the amount downloaded and the speed - msg = "download from %s failed at %s @ %sps: %s" - args = []interface{}{ - dp.sourceURI, units.HumanSize(downloaded), units.HumanSize(bytesPerSecond), err, - } - } - dp.log.Infof(msg, args...) - if timePast >= dp.warnTimeout { - // see reason in `Report` - dp.log.Warnf(msg, args...) 
- } -} - -// progressLogger is a logger that only needs to implement Infof and Warnf, as those are the only functions -// that the downloadProgressReporter uses. -type progressLogger interface { - Infof(format string, args ...interface{}) - Warnf(format string, args ...interface{}) -} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go index a49c9b6d154..119173e1344 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go @@ -6,17 +6,22 @@ package http import ( "context" + "fmt" "io/ioutil" "net" "net/http" "net/http/httptest" "os" + "regexp" "strconv" - "sync" "testing" "time" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/docker/go-units" "github.com/stretchr/testify/assert" @@ -57,7 +62,7 @@ func TestDownloadBodyError(t *testing.T) { Architecture: "64", } - log := newRecordLogger() + log, obs := logger.NewTesting("downloader") testClient := NewDownloaderWithClient(log, config, *client) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) @@ -65,21 +70,23 @@ func TestDownloadBodyError(t *testing.T) { t.Fatal("expected Download to return an error") } - log.lock.RLock() - defer log.lock.RUnlock() + infoLogs := obs.FilterLevelExact(zapcore.InfoLevel).TakeAll() + warnLogs := obs.FilterLevelExact(zapcore.WarnLevel).TakeAll() - require.GreaterOrEqual(t, len(log.info), 1, "download error not logged at info level") - assert.True(t, containsMessage(log.info, "download from %s failed at %s @ %sps: %s")) - require.GreaterOrEqual(t, len(log.warn), 1, "download error not logged at warn level") - assert.True(t, 
containsMessage(log.warn, "download from %s failed at %s @ %sps: %s")) + expectedURL := fmt.Sprintf("%s/%s-%s-%s", srv.URL, "beats/filebeat/filebeat", version, "linux-x86_64.tar.gz") + expectedMsg := fmt.Sprintf("download from %s failed at 0B @ NaNBps: unexpected EOF", expectedURL) + require.GreaterOrEqual(t, len(infoLogs), 1, "download error not logged at info level") + assert.True(t, containsMessage(infoLogs, expectedMsg)) + require.GreaterOrEqual(t, len(warnLogs), 1, "download error not logged at warn level") + assert.True(t, containsMessage(warnLogs, expectedMsg)) } func TestDownloadLogProgressWithLength(t *testing.T) { - fileSize := 100 * units.MiB + fileSize := 100 * units.MB chunks := 100 chunk := make([]byte, fileSize/chunks) delayBetweenChunks := 10 * time.Millisecond - totalTime := time.Duration(chunks) * (delayBetweenChunks + 1*time.Millisecond) + totalTime := time.Duration(chunks) * delayBetweenChunks srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Length", strconv.Itoa(fileSize)) @@ -111,25 +118,50 @@ func TestDownloadLogProgressWithLength(t *testing.T) { }, } - log := newRecordLogger() + log, obs := logger.NewTesting("downloader") testClient := NewDownloaderWithClient(log, config, *client) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) require.NoError(t, err, "Download should not have errored") - log.lock.RLock() - defer log.lock.RUnlock() - - // 2 files are downloaded so 4 log messages are expected in the info level and only the complete is over the warn - // window as 2 log messages for warn. 
- require.Len(t, log.info, 4) - assert.Equal(t, log.info[0].record, "download progress from %s is %s/%s (%.2f%% complete) @ %sps") - assert.Equal(t, log.info[1].record, "download from %s completed in %s @ %sps") - assert.Equal(t, log.info[2].record, "download progress from %s is %s/%s (%.2f%% complete) @ %sps") - assert.Equal(t, log.info[3].record, "download from %s completed in %s @ %sps") - require.Len(t, log.warn, 2) - assert.Equal(t, log.warn[0].record, "download from %s completed in %s @ %sps") - assert.Equal(t, log.warn[1].record, "download from %s completed in %s @ %sps") + expectedURL := fmt.Sprintf("%s/%s-%s-%s", srv.URL, "beats/filebeat/filebeat", version, "linux-x86_64.tar.gz") + expectedProgressRegexp := regexp.MustCompile( + `^download progress from ` + expectedURL + `(.sha512)? is \S+/\S+ \(\d+\.\d{2}% complete\) @ \S+$`, + ) + expectedCompletedRegexp := regexp.MustCompile( + `^download from ` + expectedURL + `(.sha512)? completed in \d+ \S+ @ \S+$`, + ) + + // Consider only progress logs + obs = obs.Filter(func(entry observer.LoggedEntry) bool { + return expectedProgressRegexp.MatchString(entry.Message) || + expectedCompletedRegexp.MatchString(entry.Message) + }) + + // Two files are downloaded. Each file is being downloaded in 100 chunks with a delay of 10ms between chunks. The + // expected time to download is, therefore, 100 * 10ms = 1000ms. In reality, the actual download time will be a bit + // more than 1000ms because some time is spent downloading the chunk, in between inter-chunk delays. + // Reporting happens every 0.05 * 1000ms = 50ms. We expect there to be as many log messages at that INFO level as + // the actual total download time / 50ms, for each file. That works out to at least 1000ms / 50ms = 20 INFO log + // messages, for each file, about its download progress. Additionally, we should expect 1 INFO log message, for + // each file, about the download completing. 
+ logs := obs.FilterLevelExact(zapcore.InfoLevel).TakeAll() + failed := assertLogs(t, logs, 20, expectedProgressRegexp, expectedCompletedRegexp) + if failed { + printLogs(t, logs) + } + + // By similar math as above, since the download of each file is expected to take 1000ms, and the progress logger + // starts issuing WARN messages once the download has taken more than 75% of the expected time, + // we should see warning messages for at least the last 250ms of the download. Given that + // reporting happens every 50ms, we should see at least 250ms / 50ms = 5 WARN log messages, for each file, + // about its download progress. Additionally, we should expect 1 WARN message, for each file, about the download + // completing. + logs = obs.FilterLevelExact(zapcore.WarnLevel).TakeAll() + failed = assertLogs(t, logs, 5, expectedProgressRegexp, expectedCompletedRegexp) + if failed { + printLogs(t, logs) + } } func TestDownloadLogProgressWithoutLength(t *testing.T) { @@ -137,7 +169,7 @@ func TestDownloadLogProgressWithoutLength(t *testing.T) { chunks := 100 chunk := make([]byte, fileSize/chunks) delayBetweenChunks := 10 * time.Millisecond - totalTime := time.Duration(chunks) * (delayBetweenChunks + 1*time.Millisecond) + totalTime := time.Duration(chunks) * delayBetweenChunks srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) @@ -168,62 +200,97 @@ func TestDownloadLogProgressWithoutLength(t *testing.T) { }, } - log := newRecordLogger() + log, obs := logger.NewTesting("downloader") testClient := NewDownloaderWithClient(log, config, *client) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) require.NoError(t, err, "Download should not have errored") - log.lock.RLock() - defer log.lock.RUnlock() - - // 2 files are downloaded so 4 log messages are expected in the info level and only the complete is over the warn - // window as 2 log messages for 
warn. - require.Len(t, log.info, 4) - assert.Equal(t, log.info[0].record, "download progress from %s has fetched %s @ %sps") - assert.Equal(t, log.info[1].record, "download from %s completed in %s @ %sps") - assert.Equal(t, log.info[2].record, "download progress from %s has fetched %s @ %sps") - assert.Equal(t, log.info[3].record, "download from %s completed in %s @ %sps") - require.Len(t, log.warn, 2) - assert.Equal(t, log.warn[0].record, "download from %s completed in %s @ %sps") - assert.Equal(t, log.warn[1].record, "download from %s completed in %s @ %sps") -} + expectedURL := fmt.Sprintf("%s/%s-%s-%s", srv.URL, "beats/filebeat/filebeat", version, "linux-x86_64.tar.gz") + expectedProgressRegexp := regexp.MustCompile( + `^download progress from ` + expectedURL + `(.sha512)? has fetched \S+ @ \S+$`, + ) + expectedCompletedRegexp := regexp.MustCompile( + `^download from ` + expectedURL + `(.sha512)? completed in \d+ \S+ @ \S+$`, + ) + + // Consider only progress logs + obs = obs.Filter(func(entry observer.LoggedEntry) bool { + return expectedProgressRegexp.MatchString(entry.Message) || + expectedCompletedRegexp.MatchString(entry.Message) + }) + + // Two files are downloaded. Each file is being downloaded in 100 chunks with a delay of 10ms between chunks. The + // expected time to download is, therefore, 100 * 10ms = 1000ms. In reality, the actual download time will be a bit + // more than 1000ms because some time is spent downloading the chunk, in between inter-chunk delays. + // Reporting happens every 0.05 * 1000ms = 50ms. We expect there to be as many log messages at that INFO level as + // the actual total download time / 50ms, for each file. That works out to at least 1000ms / 50ms = 20 INFO log + // messages, for each file, about its download progress. Additionally, we should expect 1 INFO log message, for + // each file, about the download completing. 
+ logs := obs.FilterLevelExact(zapcore.InfoLevel).TakeAll() + failed := assertLogs(t, logs, 20, expectedProgressRegexp, expectedCompletedRegexp) + if failed { + printLogs(t, logs) + } -type logMessage struct { - record string - args []interface{} + // By similar math as above, since the download of each file is expected to take 1000ms, and the progress logger + // starts issuing WARN messages once the download has taken more than 75% of the expected time, + // we should see warning messages for at least the last 250ms of the download. Given that + // reporting happens every 50ms, we should see at least 250ms / 50ms = 5 WARN log messages, for each file, + // about its download progress. Additionally, we should expect 1 WARN message, for each file, about the download + // completing. + logs = obs.FilterLevelExact(zapcore.WarnLevel).TakeAll() + failed = assertLogs(t, logs, 5, expectedProgressRegexp, expectedCompletedRegexp) + if failed { + printLogs(t, logs) + } } -type recordLogger struct { - lock sync.RWMutex - info []logMessage - warn []logMessage +func containsMessage(logs []observer.LoggedEntry, msg string) bool { + for _, item := range logs { + if item.Message == msg { + return true + } + } + return false } -func newRecordLogger() *recordLogger { - return &recordLogger{ - info: make([]logMessage, 0, 10), - warn: make([]logMessage, 0, 10), +func assertLogs(t *testing.T, logs []observer.LoggedEntry, minExpectedProgressLogs int, expectedProgressRegexp, expectedCompletedRegexp *regexp.Regexp) bool { + t.Helper() + + // Verify that we've logged at least minExpectedProgressLogs (about download progress) + 1 log + // message (about download completion), for each of the two files being downloaded. + require.GreaterOrEqual(t, len(logs), (minExpectedProgressLogs+1)*2) + + // Verify that the first minExpectedProgressLogs messages are about the download progress (for the first file). 
+ i := 0 + failed := false + for ; i < minExpectedProgressLogs; i++ { + failed = failed || assert.Regexp(t, expectedProgressRegexp, logs[i].Message) } -} -func (f *recordLogger) Infof(record string, args ...interface{}) { - f.lock.Lock() - defer f.lock.Unlock() - f.info = append(f.info, logMessage{record, args}) -} + // Find the next message that's about the download being completed (for the first file). + found := false + for ; i < len(logs) && !found; i++ { + found = expectedCompletedRegexp.MatchString(logs[i].Message) + } + failed = failed || assert.True(t, found) + + // Verify that the next minExpectedProgressLogs messages are about the download progress (for the second file). + for j := 0; j < minExpectedProgressLogs; j++ { + failed = failed || assert.Regexp(t, expectedProgressRegexp, logs[i+j].Message) + } + + // Verify that the last message is about the download being completed (for the second file). + failed = failed || assert.Regexp(t, expectedCompletedRegexp, logs[len(logs)-1].Message) -func (f *recordLogger) Warnf(record string, args ...interface{}) { - f.lock.Lock() - defer f.lock.Unlock() - f.warn = append(f.warn, logMessage{record, args}) + return failed } -func containsMessage(logs []logMessage, msg string) bool { - for _, item := range logs { - if item.record == msg { - return true - } +// printLogs is called in case one of the assertions fails; it's useful for debugging +func printLogs(t *testing.T, logs []observer.LoggedEntry) { + t.Helper() + for _, entry := range logs { + t.Logf("[%s] %s", entry.Level, entry.Message) } - return false } diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go new file mode 100644 index 00000000000..4eef0682a50 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go @@ -0,0 +1,97 @@ +// Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http + +import ( + "time" + + "github.com/docker/go-units" + + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +type progressObserver interface { + // Report is called on a periodic basis with information about the download's progress so far. + Report(sourceURI string, timePast time.Duration, downloadedBytes, totalBytes, percentComplete, downloadRate float64) + + // ReportCompleted is called when the download completes successfully. + ReportCompleted(sourceURI string, timePast time.Duration, downloadRate float64) + + // ReportFailed is called if the download does not complete successfully. + ReportFailed(sourceURI string, timePast time.Duration, downloadedBytes, totalBytes, percentComplete, downloadRate float64, err error) +} + +type loggingProgressObserver struct { + log *logger.Logger + warnTimeout time.Duration +} + +func newLoggingProgressObserver(log *logger.Logger, downloadTimeout time.Duration) *loggingProgressObserver { + return &loggingProgressObserver{ + log: log, + warnTimeout: time.Duration(float64(downloadTimeout) * warningProgressIntervalPercentage), + } +} + +func (lpObs *loggingProgressObserver) Report(sourceURI string, timePast time.Duration, downloadedBytes, totalBytes, percentComplete, downloadRate float64) { + var msg string + var args []interface{} + if totalBytes > 0 { + // length of the download is known, so more detail can be provided + msg = "download progress from %s is %s/%s (%.2f%% complete) @ %sps" + args = []interface{}{ + sourceURI, units.HumanSize(downloadedBytes), units.HumanSize(totalBytes), percentComplete, units.HumanSize(downloadRate), + } + } else { + // length unknown so provide the amount downloaded and the speed + msg = "download progress from %s has fetched %s @ %sps" + args = []interface{}{ + sourceURI, 
units.HumanSize(downloadedBytes), units.HumanSize(downloadRate), + } + } + + lpObs.log.Infof(msg, args...) + if timePast >= lpObs.warnTimeout { + // duplicate to warn when over the warnTimeout; this still has it logging to info that way if + // they are filtering the logs to info they still see the messages when over the warnTimeout, but + // when filtering only by warn they see these messages only + lpObs.log.Warnf(msg, args...) + } +} + +func (lpObs *loggingProgressObserver) ReportCompleted(sourceURI string, timePast time.Duration, downloadRate float64) { + msg := "download from %s completed in %s @ %sps" + args := []interface{}{ + sourceURI, units.HumanDuration(timePast), units.HumanSize(downloadRate), + } + lpObs.log.Infof(msg, args...) + if timePast >= lpObs.warnTimeout { + // see reason in `Report` + lpObs.log.Warnf(msg, args...) + } +} + +func (lpObs *loggingProgressObserver) ReportFailed(sourceURI string, timePast time.Duration, downloadedBytes, totalBytes, percentComplete, downloadRate float64, err error) { + var msg string + var args []interface{} + if totalBytes > 0 { + // length of the download is known, so more detail can be provided + msg = "download from %s failed at %s/%s (%.2f%% complete) @ %sps: %s" + args = []interface{}{ + sourceURI, units.HumanSize(downloadedBytes), units.HumanSize(totalBytes), percentComplete, units.HumanSize(downloadRate), err, + } + } else { + // length unknown so provide the amount downloaded and the speed + msg = "download from %s failed at %s @ %sps: %s" + args = []interface{}{ + sourceURI, units.HumanSize(downloadedBytes), units.HumanSize(downloadRate), err, + } + } + lpObs.log.Infof(msg, args...) + if timePast >= lpObs.warnTimeout { + // see reason in `Report` + lpObs.log.Warnf(msg, args...) 
+ } +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/progress_reporter.go b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_reporter.go new file mode 100644 index 00000000000..491646b3ab5 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_reporter.go @@ -0,0 +1,135 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http + +import ( + "context" + "time" + + "github.com/elastic/elastic-agent-libs/atomic" +) + +type downloadProgressReporter struct { + sourceURI string + interval time.Duration + warnTimeout time.Duration + length float64 + + downloaded atomic.Int + started time.Time + + progressObservers []progressObserver + done chan struct{} +} + +func newDownloadProgressReporter(sourceURI string, timeout time.Duration, length int, progressObservers ...progressObserver) *downloadProgressReporter { + interval := time.Duration(float64(timeout) * downloadProgressIntervalPercentage) + if interval == 0 { + interval = downloadProgressMinInterval + } + + return &downloadProgressReporter{ + sourceURI: sourceURI, + interval: interval, + warnTimeout: time.Duration(float64(timeout) * warningProgressIntervalPercentage), + length: float64(length), + progressObservers: progressObservers, + done: make(chan struct{}), + } +} + +func (dp *downloadProgressReporter) Write(b []byte) (int, error) { + n := len(b) + dp.downloaded.Add(n) + return n, nil +} + +// Report periodically reports download progress to registered observers. Callers MUST either +// cancel the context provided to this method OR call either ReportComplete or ReportFailed when +// they no longer need the downloadProgressReporter to avoid resource leaks. 
+func (dp *downloadProgressReporter) Report(ctx context.Context) { + started := time.Now() + dp.started = started + sourceURI := dp.sourceURI + length := dp.length + interval := dp.interval + + // If there are no observers to report progress to, there is nothing to do! + if len(dp.progressObservers) == 0 { + return + } + + go func() { + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-dp.done: + return + case <-t.C: + now := time.Now() + timePast := now.Sub(started) + downloaded := float64(dp.downloaded.Load()) + bytesPerSecond := downloaded / float64(timePast/time.Second) + var percentComplete float64 + if length > 0 { + percentComplete = downloaded / length * 100.0 + } + + for _, obs := range dp.progressObservers { + obs.Report(sourceURI, timePast, downloaded, length, percentComplete, bytesPerSecond) + } + } + } + }() +} + +// ReportComplete reports the completion of a download to registered observers. Callers MUST call +// either ReportComplete or ReportFailed when they no longer need the downloadProgressReporter +// to avoid resource leaks. +func (dp *downloadProgressReporter) ReportComplete() { + defer close(dp.done) + + // If there are no observers to report progress to, there is nothing to do! + if len(dp.progressObservers) == 0 { + return + } + + now := time.Now() + timePast := now.Sub(dp.started) + downloaded := float64(dp.downloaded.Load()) + bytesPerSecond := downloaded / float64(timePast/time.Second) + + for _, obs := range dp.progressObservers { + obs.ReportCompleted(dp.sourceURI, timePast, bytesPerSecond) + } +} + +// ReportFailed reports the failure of a download to registered observers. Callers MUST call +// either ReportFailed or ReportComplete when they no longer need the downloadProgressReporter +// to avoid resource leaks. +func (dp *downloadProgressReporter) ReportFailed(err error) { + defer close(dp.done) + + // If there are no observers to report progress to, there is nothing to do! 
+ if len(dp.progressObservers) == 0 { + return + } + + now := time.Now() + timePast := now.Sub(dp.started) + downloaded := float64(dp.downloaded.Load()) + bytesPerSecond := downloaded / float64(timePast/time.Second) + var percentComplete float64 + if dp.length > 0 { + percentComplete = downloaded / dp.length * 100.0 + } + + for _, obs := range dp.progressObservers { + obs.ReportFailed(dp.sourceURI, timePast, downloaded, dp.length, percentComplete, bytesPerSecond, err) + } +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/verifier.go b/internal/pkg/agent/application/upgrade/artifact/download/http/verifier.go index 84f3f6045f0..99fca9f65d4 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/verifier.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/verifier.go @@ -20,6 +20,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + "github.com/elastic/elastic-agent/pkg/core/logger" ) const ( @@ -33,7 +34,7 @@ type Verifier struct { client http.Client pgpBytes []byte allowEmptyPgp bool - log progressLogger + log *logger.Logger } func (v *Verifier) Name() string { @@ -42,7 +43,7 @@ func (v *Verifier) Name() string { // NewVerifier create a verifier checking downloaded package on preconfigured // location against a key stored on elastic.co website. 
-func NewVerifier(log progressLogger, config *artifact.Config, allowEmptyPgp bool, pgp []byte) (*Verifier, error) { +func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool, pgp []byte) (*Verifier, error) { if len(pgp) == 0 && !allowEmptyPgp { return nil, errors.New("expecting PGP but retrieved none", errors.TypeSecurity) } diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index 353c9d1e7a7..91d470ac00b 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -32,7 +32,6 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/filelock" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" "github.com/elastic/elastic-agent/internal/pkg/agent/application/secret" @@ -249,15 +248,12 @@ func run(override cfgOverrider, testingMode bool, fleetInitTimeout time.Duration } defer composable.Close() - monitoringServer, err := setupMetrics(l, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer, coord) + serverStopFn, err := setupMetrics(l, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer, coord) if err != nil { return err } - coord.RegisterMonitoringServer(monitoringServer) defer func() { - if monitoringServer != nil { - _ = monitoringServer.Stop() - } + _ = serverStopFn() }() diagHooks := diagnostics.GlobalHooks() @@ -551,7 +547,7 @@ func setupMetrics( cfg *monitoringCfg.MonitoringConfig, tracer *apm.Tracer, coord *coordinator.Coordinator, -) (*reload.ServerReloader, error) { +) (func() error, error) { if err := report.SetupMetrics(logger, agentName, version.GetDefaultVersion()); err != nil { return nil, err } @@ -562,12 
+558,14 @@ func setupMetrics(
 		Host: monitoring.AgentMonitoringEndpoint(operatingSystem, cfg),
 	}
 
-	s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer, coord, isProcessStatsEnabled(cfg), operatingSystem, cfg)
+	s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer, coord, isProcessStatsEnabled(cfg), operatingSystem)
 	if err != nil {
 		return nil, errors.New(err, "could not start the HTTP server for the API")
 	}
+	s.Start()
 
-	return s, nil
+	// return server stopper
+	return s.Stop, nil
 }
 
 func isProcessStatsEnabled(cfg *monitoringCfg.MonitoringConfig) bool {
diff --git a/magefile.go b/magefile.go
index 183889b3e69..bf7b335cbe8 100644
--- a/magefile.go
+++ b/magefile.go
@@ -979,10 +979,10 @@ func packageAgent(platforms []string, packagingFn func()) {
 		panic(err)
 	}
 
-	packagesMissing := false
 	packagesCopied := 0
 
 	if !requiredPackagesPresent(pwd, b, packageVersion, requiredPackages) {
+		fmt.Printf("--- Package %s\n", pwd)
 		cmd := exec.Command("mage", "package")
 		cmd.Dir = pwd
 		cmd.Stdout = os.Stdout
@@ -1008,6 +1008,15 @@ func packageAgent(platforms []string, packagingFn func()) {
 			targetPath := filepath.Join(archivePath, rp)
 			os.MkdirAll(targetPath, 0755)
 			for _, f := range files {
+				// safety check; if the user has an older version of the beats repo,
+				// for example right after a release where you've `git pulled` from one repo and not the other,
+				// they might end up with a mishmash of packages from different versions.
+				// check to see if we have mismatched versions.
+				if !strings.Contains(f, packageVersion) {
+					// if this panic hits weird edge cases where we don't want actual failures, revert to a printf statement.
+ panic(fmt.Sprintf("the file %s doesn't match agent version %s, beats repo might be out of date", f, packageVersion)) + } + targetFile := filepath.Join(targetPath, filepath.Base(f)) packagesCopied += 1 if err := sh.Copy(targetFile, f); err != nil { @@ -1017,8 +1026,8 @@ func packageAgent(platforms []string, packagingFn func()) { } // a very basic footcannon protector; if packages are missing and we need to rebuild them, check to see if those files were copied // if we needed to repackage beats but still somehow copied nothing, could indicate an issue. Usually due to beats and agent being at different versions. - if packagesMissing && packagesCopied == 0 { - fmt.Printf(">>> WARNING: no packages were copied, but we repackaged beats anyway. Check binary to see if intended beats are there.") + if packagesCopied == 0 { + fmt.Println(">>> WARNING: no packages were copied, but we repackaged beats anyway. Check binary to see if intended beats are there.") } } } @@ -1153,6 +1162,7 @@ func copyComponentSpecs(componentName, versionedDropPath string) (string, error) targetPath := filepath.Join(versionedDropPath, specFileName) if _, err := os.Stat(targetPath); err != nil { + fmt.Printf(">> File %s does not exist, reverting to local specfile\n", targetPath) // spec not present copy from local sourceSpecFile := filepath.Join("specs", specFileName) if mg.Verbose() { diff --git a/pkg/component/fake/component/comp/apm.go b/pkg/component/fake/component/comp/apm.go index 56882088b12..12d7a264794 100644 --- a/pkg/component/fake/component/comp/apm.go +++ b/pkg/component/fake/component/comp/apm.go @@ -128,6 +128,11 @@ func (ats *apmTracesSender) createNewTracer(cfg *proto.APMConfig) (*apm.Tracer, defer os.Unsetenv(envCACert) } + if cfg.Elastic.GetGlobalLabels() != "" { + os.Setenv(envGlobalLabels, cfg.Elastic.GetGlobalLabels()) + defer os.Unsetenv(envGlobalLabels) + } + ts, err := apmtransport.NewHTTPTransport() if err != nil { return nil, err @@ -197,6 +202,10 @@ func (fai 
*fakeAPMInput) Update(u *client.Unit, triggers client.Trigger) error { func newFakeAPMInput(logger zerolog.Logger, logLevel client.UnitLogLevel, unit *client.Unit) (*fakeAPMInput, error) { logger = logger.Level(toZerologLevel(logLevel)) + + // close the default tracer to avoid unnecessary logs + apm.DefaultTracer.Close() + apmInput := &fakeAPMInput{ logger: logger, unit: unit, @@ -206,6 +215,7 @@ func newFakeAPMInput(logger zerolog.Logger, logLevel client.UnitLogLevel, unit * if err != nil { return apmInput, fmt.Errorf("error while setting starting state: %w", err) } + logger.Info().Msgf("Starting fake APM traces sender with config %v", unit.Expected().APMConfig) err = apmInput.sender.Start(context.Background(), unit.Expected().APMConfig) if err != nil { return apmInput, fmt.Errorf("error starting apm tracer sender: %w", err) @@ -231,6 +241,7 @@ func senderErrorLogger(ctx context.Context, logger zerolog.Logger, errCh <-chan if err != nil { logger.Err(err).Msg("sender error") _ = unit.UpdateState(client.UnitStateDegraded, fmt.Sprintf("sender error: %s", err), nil) + continue } _ = unit.UpdateState(client.UnitStateHealthy, fmt.Sprintf("sender error: %s", err), nil) } diff --git a/pkg/testing/ess/create_deployment_csp_configuration.yaml b/pkg/testing/ess/create_deployment_csp_configuration.yaml new file mode 100644 index 00000000000..199f664a65a --- /dev/null +++ b/pkg/testing/ess/create_deployment_csp_configuration.yaml @@ -0,0 +1,15 @@ +gcp: + integrations_server_conf_id: "gcp.integrationsserver.n2.68x32x45.2" + elasticsearch_conf_id: "gcp.es.datahot.n2.68x10x45" + elasticsearch_deployment_template_id: "gcp-storage-optimized-v5" + kibana_instance_configuration_id: "gcp.kibana.n2.68x32x45" +azure: + integrations_server_conf_id: "azure.integrationsserver.fsv2.2" + elasticsearch_conf_id: "azure.es.datahot.edsv4" + elasticsearch_deployment_template_id: "azure-storage-optimized-v2" + kibana_instance_configuration_id: "azure.kibana.fsv2" +aws: + 
integrations_server_conf_id: "aws.integrationsserver.c5d.2.1" + elasticsearch_conf_id: "aws.es.datahot.i3.1.1" + elasticsearch_deployment_template_id: "aws-storage-optimized-v5" + kibana_instance_configuration_id: "aws.kibana.c5d.1.1" \ No newline at end of file diff --git a/pkg/testing/ess/create_deployment_request.tmpl.json b/pkg/testing/ess/create_deployment_request.tmpl.json new file mode 100644 index 00000000000..3ef93868708 --- /dev/null +++ b/pkg/testing/ess/create_deployment_request.tmpl.json @@ -0,0 +1,102 @@ +{ + "resources": { + "integrations_server": [ + { + "elasticsearch_cluster_ref_id": "main-elasticsearch", + "region": "{{ .request.Region }}", + "plan": { + "cluster_topology": [ + { + "instance_configuration_id": "{{ .integrations_server_conf_id }}", + "zone_count": 1, + "size": { + "resource": "memory", + "value": 1024 + } + } + ], + "integrations_server": { + "version": "{{ .request.Version }}" + } + }, + "ref_id": "main-integrations_server" + } + ], + "elasticsearch": [ + { + "region": "{{ .request.Region }}", + "settings": { + "dedicated_masters_threshold": 6 + }, + "plan": { + "cluster_topology": [ + { + "zone_count": 1, + "elasticsearch": { + "node_attributes": { + "data": "hot" + } + }, + "instance_configuration_id": "{{.elasticsearch_conf_id}}", + "node_roles": [ + "master", + "ingest", + "transform", + "data_hot", + "remote_cluster_client", + "data_content" + ], + "id": "hot_content", + "size": { + "resource": "memory", + "value": 8192 + } + } + ], + "elasticsearch": { + "version": "{{ .request.Version }}", + "enabled_built_in_plugins": [] + }, + "deployment_template": { + "id": "{{ .elasticsearch_deployment_template_id }}" + } + }, + "ref_id": "main-elasticsearch" + } + ], + "enterprise_search": [], + "kibana": [ + { + "elasticsearch_cluster_ref_id": "main-elasticsearch", + "region": "{{ .request.Region }}", + "plan": { + "cluster_topology": [ + { + "instance_configuration_id": "{{.kibana_instance_configuration_id}}", + "zone_count": 1, + 
"size": { + "resource": "memory", + "value": 1024 + } + } + ], + "kibana": { + "version": "{{ .request.Version }}", + "user_settings_json": { + "xpack.fleet.enableExperimental": ["agentTamperProtectionEnabled"] + } + } + }, + "ref_id": "main-kibana" + } + ] + }, + "settings": { + "autoscaling_enabled": false + }, + "name": "{{ .request.Name }}", + "metadata": { + "system_owned": false, + "tags": {{ json .request.Tags }} + } +} \ No newline at end of file diff --git a/pkg/testing/ess/deployment.go b/pkg/testing/ess/deployment.go index 9d9469036b0..a79f8cb58cb 100644 --- a/pkg/testing/ess/deployment.go +++ b/pkg/testing/ess/deployment.go @@ -7,19 +7,28 @@ package ess import ( "bytes" "context" + _ "embed" "encoding/json" "fmt" - "html/template" "net/http" "net/url" "strings" + "text/template" "time" + + "gopkg.in/yaml.v2" ) +type Tag struct { + Key string `json:"key"` + Value string `json:"value"` +} + type CreateDeploymentRequest struct { Name string `json:"name"` Region string `json:"region"` Version string `json:"version"` + Tags []Tag `json:"tags"` } type CreateDeploymentResponse struct { @@ -85,21 +94,16 @@ type DeploymentStatusResponse struct { // CreateDeployment creates the deployment with the specified configuration. 
func (c *Client) CreateDeployment(ctx context.Context, req CreateDeploymentRequest) (*CreateDeploymentResponse, error) { - tpl, err := deploymentTemplateFactory(req) + reqBodyBytes, err := generateCreateDeploymentRequestBody(req) if err != nil { return nil, err } - var buf bytes.Buffer - if err := tpl.Execute(&buf, req); err != nil { - return nil, fmt.Errorf("unable to create deployment creation request body: %w", err) - } - createResp, err := c.doPost( ctx, "deployments", "application/json", - &buf, + bytes.NewReader(reqBodyBytes), ) if err != nil { return nil, fmt.Errorf("error calling deployment creation API: %w", err) @@ -308,233 +312,70 @@ func overallStatus(statuses ...DeploymentStatus) DeploymentStatus { return overallStatus } -func deploymentTemplateFactory(req CreateDeploymentRequest) (*template.Template, error) { +//go:embed create_deployment_request.tmpl.json +var createDeploymentRequestTemplate string + +//go:embed create_deployment_csp_configuration.yaml +var cloudProviderSpecificValues []byte + +func generateCreateDeploymentRequestBody(req CreateDeploymentRequest) ([]byte, error) { regionParts := strings.Split(req.Region, "-") if len(regionParts) < 2 { return nil, fmt.Errorf("unable to parse CSP out of region [%s]", req.Region) } csp := regionParts[0] - var tplStr string - switch csp { - case "gcp": - tplStr = createDeploymentRequestTemplateGCP - case "azure": - tplStr = createDeploymentRequestTemplateAzure - default: - return nil, fmt.Errorf("unsupported CSP [%s]", csp) + templateContext, err := createDeploymentTemplateContext(csp, req) + if err != nil { + return nil, fmt.Errorf("creating request template context: %w", err) } - tpl, err := template.New("create_deployment_request").Parse(tplStr) + tpl, err := template.New("create_deployment_request"). + Funcs(template.FuncMap{"json": jsonMarshal}). 
+ Parse(createDeploymentRequestTemplate) if err != nil { return nil, fmt.Errorf("unable to parse deployment creation template: %w", err) } - return tpl, nil + var bBuf bytes.Buffer + err = tpl.Execute(&bBuf, templateContext) + if err != nil { + return nil, fmt.Errorf("rendering create deployment request template with context %v : %w", templateContext, err) + } + return bBuf.Bytes(), nil } -const createDeploymentRequestTemplateGCP = ` -{ - "resources": { - "integrations_server": [ - { - "elasticsearch_cluster_ref_id": "main-elasticsearch", - "region": "{{ .Region }}", - "plan": { - "cluster_topology": [ - { - "instance_configuration_id": "gcp.integrationsserver.n2.68x32x45.2", - "zone_count": 1, - "size": { - "resource": "memory", - "value": 1024 - } - } - ], - "integrations_server": { - "version": "{{ .Version }}" - } - }, - "ref_id": "main-integrations_server" - } - ], - "elasticsearch": [ - { - "region": "{{ .Region }}", - "settings": { - "dedicated_masters_threshold": 6 - }, - "plan": { - "cluster_topology": [ - { - "zone_count": 1, - "elasticsearch": { - "node_attributes": { - "data": "hot" - } - }, - "instance_configuration_id": "gcp.es.datahot.n2.68x10x45", - "node_roles": [ - "master", - "ingest", - "transform", - "data_hot", - "remote_cluster_client", - "data_content" - ], - "id": "hot_content", - "size": { - "resource": "memory", - "value": 8192 - } - } - ], - "elasticsearch": { - "version": "{{ .Version }}", - "enabled_built_in_plugins": [] - }, - "deployment_template": { - "id": "gcp-storage-optimized-v5" - } - }, - "ref_id": "main-elasticsearch" - } - ], - "enterprise_search": [], - "kibana": [ - { - "elasticsearch_cluster_ref_id": "main-elasticsearch", - "region": "{{ .Region }}", - "plan": { - "cluster_topology": [ - { - "instance_configuration_id": "gcp.kibana.n2.68x32x45", - "zone_count": 1, - "size": { - "resource": "memory", - "value": 1024 - } - } - ], - "kibana": { - "version": "{{ .Version }}", - "user_settings_json": { - 
"xpack.fleet.enableExperimental": ["agentTamperProtectionEnabled"] - } - } - }, - "ref_id": "main-kibana" - } - ] - }, - "settings": { - "autoscaling_enabled": false - }, - "name": "{{ .Name }}", - "metadata": { - "system_owned": false - } -}` - -const createDeploymentRequestTemplateAzure = ` -{ - "resources": { - "integrations_server": [ - { - "elasticsearch_cluster_ref_id": "main-elasticsearch", - "region": "{{ .Region }}", - "plan": { - "cluster_topology": [ - { - "instance_configuration_id": "azure.integrationsserver.fsv2.2", - "zone_count": 1, - "size": { - "resource": "memory", - "value": 1024 - } - } - ], - "integrations_server": { - "version": "{{ .Version }}" - } - }, - "ref_id": "main-integrations_server" - } - ], - "elasticsearch": [ - { - "region": "{{ .Region }}", - "settings": { - "dedicated_masters_threshold": 6 - }, - "plan": { - "cluster_topology": [ - { - "zone_count": 1, - "elasticsearch": { - "node_attributes": { - "data": "hot" - } - }, - "instance_configuration_id": "azure.es.datahot.edsv4", - "node_roles": [ - "master", - "ingest", - "transform", - "data_hot", - "remote_cluster_client", - "data_content" - ], - "id": "hot_content", - "size": { - "resource": "memory", - "value": 8192 - } - } - ], - "elasticsearch": { - "version": "{{ .Version }}", - "enabled_built_in_plugins": [] - }, - "deployment_template": { - "id": "azure-storage-optimized-v2" - } - }, - "ref_id": "main-elasticsearch" - } - ], - "enterprise_search": [], - "kibana": [ - { - "elasticsearch_cluster_ref_id": "main-elasticsearch", - "region": "{{ .Region }}", - "plan": { - "cluster_topology": [ - { - "instance_configuration_id": "azure.kibana.fsv2", - "zone_count": 1, - "size": { - "resource": "memory", - "value": 1024 - } - } - ], - "kibana": { - "version": "{{ .Version }}", - "user_settings_json": { - "xpack.fleet.enableExperimental": ["agentTamperProtectionEnabled"] - } - } - }, - "ref_id": "main-kibana" - } - ] - }, - "settings": { - "autoscaling_enabled": false - }, - 
"name": "{{ .Name }}", - "metadata": { - "system_owned": false - } -}` +func jsonMarshal(in any) (string, error) { + jsonBytes, err := json.Marshal(in) + if err != nil { + return "", err + } + + return string(jsonBytes), nil +} + +func createDeploymentTemplateContext(csp string, req CreateDeploymentRequest) (map[string]any, error) { + cspSpecificContext, err := loadCspValues(csp) + if err != nil { + return nil, fmt.Errorf("loading csp-specific values for %q: %w", csp, err) + } + + cspSpecificContext["request"] = req + + return cspSpecificContext, nil +} + +func loadCspValues(csp string) (map[string]any, error) { + var cspValues map[string]map[string]any + + err := yaml.Unmarshal(cloudProviderSpecificValues, &cspValues) + if err != nil { + return nil, fmt.Errorf("unmarshalling error: %w", err) + } + values, supportedCSP := cspValues[csp] + if !supportedCSP { + return nil, fmt.Errorf("csp %s not supported", csp) + } + + return values, nil +} diff --git a/pkg/testing/ess/provisioner.go b/pkg/testing/ess/provisioner.go index 941cf5bcaf7..081b4100869 100644 --- a/pkg/testing/ess/provisioner.go +++ b/pkg/testing/ess/provisioner.go @@ -67,7 +67,14 @@ func (p *provisioner) Provision(ctx context.Context, requests []runner.StackRequ for _, r := range requests { // allow up to 2 minutes for each create request createCtx, createCancel := context.WithTimeout(ctx, 2*time.Minute) - resp, err := p.createDeployment(createCtx, r) + resp, err := p.createDeployment(createCtx, r, + map[string]string{ + "division": "engineering", + "org": "ingest", + "team": "elastic-agent", + "project": "elastic-agent", + "integration-tests": "true", + }) createCancel() if err != nil { return nil, err @@ -131,17 +138,30 @@ func (p *provisioner) Clean(ctx context.Context, stacks []runner.Stack) error { return nil } -func (p *provisioner) createDeployment(ctx context.Context, r runner.StackRequest) (*CreateDeploymentResponse, error) { +func (p *provisioner) createDeployment(ctx context.Context, r 
runner.StackRequest, tags map[string]string) (*CreateDeploymentResponse, error) { ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) defer cancel() p.logger.Logf("Creating stack %s (%s)", r.Version, r.ID) name := fmt.Sprintf("%s-%s", strings.Replace(p.cfg.Identifier, ".", "-", -1), r.ID) - resp, err := p.client.CreateDeployment(ctx, CreateDeploymentRequest{ + + // prepare tags + tagArray := make([]Tag, 0, len(tags)) + for k, v := range tags { + tagArray = append(tagArray, Tag{ + Key: k, + Value: v, + }) + } + + createDeploymentRequest := CreateDeploymentRequest{ Name: name, Region: p.cfg.Region, Version: r.Version, - }) + Tags: tagArray, + } + + resp, err := p.client.CreateDeployment(ctx, createDeploymentRequest) if err != nil { p.logger.Logf("Failed to create ESS cloud %s: %s", r.Version, err) return nil, fmt.Errorf("failed to create ESS cloud for version %s: %w", r.Version, err) diff --git a/version/docs/version.asciidoc b/version/docs/version.asciidoc index fd0e915eae0..153c58d4edf 100644 --- a/version/docs/version.asciidoc +++ b/version/docs/version.asciidoc @@ -3,7 +3,7 @@ // FIXME: once elastic.co docs have been switched over to use `main`, remove // the `doc-site-branch` line below as well as any references to it in the code. :doc-site-branch: master -:go-version: 1.20.8 +:go-version: 1.20.9 :release-state: unreleased :python: 3.7 :docker: 1.12