diff --git a/NOTICE.txt b/NOTICE.txt index 2ac1bfb9926..3290e34d2e5 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -1799,11 +1799,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-transpo -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-elasticsearch/v8 -Version: v8.10.0 +Version: v8.10.1 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-elasticsearch/v8@v8.10.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/go-elasticsearch/v8@v8.10.1/LICENSE: Apache License Version 2.0, January 2004 diff --git a/dev-tools/cmd/buildpgp/build_pgp.go b/dev-tools/cmd/buildpgp/build_pgp.go index 8559ea04c32..659fc1acbd1 100644 --- a/dev-tools/cmd/buildpgp/build_pgp.go +++ b/dev-tools/cmd/buildpgp/build_pgp.go @@ -49,9 +49,9 @@ func init() { pgpBytes = packer.MustUnpack("{{ .Pack }}")["GPG-KEY-elasticsearch"] } -// PGP return pgpbytes and a flag describing whether or not no pgp is valid. -func PGP() (bool, []byte) { - return allowEmptyPgp == "true", pgpBytes +// PGP return pgpbytes. +func PGP() []byte { + return pgpBytes } `)) diff --git a/docs/pgp-sign-verify-artifact.md b/docs/pgp-sign-verify-artifact.md new file mode 100644 index 00000000000..9f8f1295a89 --- /dev/null +++ b/docs/pgp-sign-verify-artifact.md @@ -0,0 +1,176 @@ +# Signing Elastic Agent artifacts + +This doc covers generating a key, exporting the public key, signing a file and verifying it using GPG as well as pure Go. 
+ +Full GPG docs: https://www.gnupg.org/documentation/manuals/gnupg/OpenPGP-Key-Management.html + + +## Go + +```go +package main + +import ( + "bytes" + "fmt" + "os" + "path/filepath" + + "golang.org/x/crypto/openpgp" + "golang.org/x/crypto/openpgp/armor" +) + +func main() { + dir, err := os.MkdirTemp(os.TempDir(), "pgp-") + NoError(err, "could not create directory to save the files to") + + key := filepath.Join(dir, "key") + keyPub := filepath.Join(dir, "key.pub") + asc := filepath.Join(dir, "plaindata.asc") + + fmt.Printf("Writing files to %q\n", dir) + + data := []byte("some data") + plaindata := filepath.Join(dir, "plaindata") + err = os.WriteFile(plaindata, data, 0o600) + NoError(err, "could not write plain data file") + + fmt.Printf("wrote %q\n", plaindata) + + // Create files + fKeyPub, err := os.OpenFile( + keyPub, + os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) + NoError(err, "could not create %q file", keyPub) + defer func() { + if err := fKeyPub.Close(); err != nil { + fmt.Printf("failed closing %q\n", fKeyPub.Name()) + } + fmt.Printf("wrote %q\n", fKeyPub.Name()) + }() + + fKey, err := os.OpenFile( + key, + os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) + NoError(err, "could not create %q file", key) + defer func() { + if err := fKey.Close(); err != nil { + fmt.Printf("failed closing %q\n", fKey.Name()) + } + fmt.Printf("wrote %q\n", fKey.Name()) + }() + + fasc, err := os.OpenFile( + asc, + os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) + NoError(err, "could not create %q file", asc) + defer func() { + if err := fasc.Close(); err != nil { + fmt.Printf("failed closing %q\n", fasc.Name()) + } + fmt.Printf("wrote %q\n", fasc.Name()) + }() + + // Generate PGP key + entity, err := openpgp.NewEntity("someKeyName", "", "", nil) + + // Create an ASCII armored encoder to serialize the private key + wPubKey, err := armor.Encode(fKeyPub, openpgp.PublicKeyType, nil) + NoError(err, "could not create PGP ASCII Armor encoder for public key") + defer func() { + err := wPubKey.Close() + 
 if err != nil { + fmt.Println("failed closing public key writer") + } + }() + + // Writes the public key to the io.Writer passed to armor.Encode. + // Use entity.SerializePrivate if you need the private key. + err = entity.Serialize(wPubKey) + NoError(err, "could not serialize the public key") + + // Create an ASCII armored encoder to serialize the private key + wPrivKey, err := armor.Encode(fKey, openpgp.PrivateKeyType, nil) + NoError(err, "could not create PGP ASCII Armor encoder for private key") + defer func() { + err := wPrivKey.Close() + if err != nil { + fmt.Println("failed closing private key writer") + } + }() + + // Writes the private key to the io.Writer passed to armor.Encode. + // Use entity.SerializePrivate if you need the private key. + err = entity.SerializePrivate(wPrivKey, nil) + NoError(err, "could not serialize the private key") + + // Sign data and write the detached signature to fasc + err = openpgp.ArmoredDetachSign(fasc, entity, bytes.NewReader(data), nil) + NoError(err, "failed signing data") +} + +func NoError(err error, msg string, args ...any) { + if err != nil { + panic(fmt.Sprintf(msg+": %v", append(args, err)...)) + } +} +``` + +## GPG +### Generate a key + +```shell +gpg --no-default-keyring --keyring ./some-file-to-be-the-key-ring --quick-generate-key atest rsa2048 default none +``` +Where: + - `--no-default-keyring`: do not use your keyring + - `--keyring ./some-file-to-be-the-key-ring`: keyring to use, as the file does not exist, it'll create it + - `--quick-generate-key`: quick generate the key + - `atest`: user-id, a.k.a the key identifier + - `rsa2048`: algorithm to use + - `default`: "usage" for the key. 
Just use default + - `none`: key expiration + + +### Export the public key +```shell +gpg --no-default-keyring --keyring ./some-file-to-be-the-key-ring --armor --output public-key.pgp --export atest +``` +Where: +- `--no-default-keyring`: do not use your keyring + - `--keyring ./some-file-to-be-the-key-ring`: the keyring to use, created in the previous step + - `--armor`: create ASCII armoured output. Otherwise, it's a binary format + - `--output public-key.pgp`: the output file + - `--export`: export the public key + - `atest`: the key identifier + +### Sign the file +```shell +gpg --no-default-keyring --keyring ./some-file-to-be-the-key-ring -a -o elastic-agent-8.0.0-darwin-x86_64.tar.gz.asc --detach-sign elastic-agent-8.0.0-darwin-x86_64.tar.gz +``` + +Where: + - `-a -o`: --armored, --output + - `elastic-agent-8.0.0-darwin-x86_64.tar.gz.asc`: the output file + - `--detach-sign`: generate a separate file for the signature + - `elastic-agent-8.0.0-darwin-x86_64.tar.gz`: the file to sign + + + +### Verify the file + +#### Import the public key +```shell +gpg --no-default-keyring --keyring ./new-keyring --import public-key.pgp +``` +Where: + - `--import`: import a key + - `public-key.pgp`: the key to import + +#### Verify the signature using the imported key +```shell +gpg --no-default-keyring --keyring ./new-keyring --verify elastic-agent-8.0.0-darwin-x86_64.tar.gz.asc +``` +Where: + - `--verify`: verify a signature + - `elastic-agent-8.0.0-darwin-x86_64.tar.gz.asc`: the detached signature file. 
It'll assume the file to be verified is `elastic-agent-8.0.0-darwin-x86_64.tar.gz` diff --git a/go.mod b/go.mod index e6e2a1cdd67..48c55e7e5a7 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/elastic/elastic-agent-libs v0.6.0 github.com/elastic/elastic-agent-system-metrics v0.7.0 github.com/elastic/elastic-transport-go/v8 v8.3.0 - github.com/elastic/go-elasticsearch/v8 v8.10.0 + github.com/elastic/go-elasticsearch/v8 v8.10.1 github.com/elastic/go-licenser v0.4.1 github.com/elastic/go-sysinfo v1.11.1 github.com/elastic/go-ucfg v0.8.6 diff --git a/go.sum b/go.sum index 41b12f0955f..d176fb612e9 100644 --- a/go.sum +++ b/go.sum @@ -787,13 +787,12 @@ github.com/elastic/elastic-agent-system-metrics v0.7.0 h1:qDLY30UDforSd/TfHfqUDi github.com/elastic/elastic-agent-system-metrics v0.7.0/go.mod h1:9C1UEfj0P687HAzZepHszN6zXA+2tN2Lx3Osvq1zby8= github.com/elastic/elastic-integration-corpus-generator-tool v0.5.0/go.mod h1:uf9N86y+UACGybdEhZLpwZ93XHWVhsYZAA4c2T2v6YM= github.com/elastic/elastic-package v0.77.0/go.mod h1:Xeqx0OOVnKBfFoSHsHmKI74RxgRGiDhU6yXEu8BkJJM= -github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo= github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= github.com/elastic/go-elasticsearch/v7 v7.17.7/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/elastic/go-elasticsearch/v8 v8.0.0-20210317102009-a9d74cec0186/go.mod h1:xe9a/L2aeOgFKKgrO3ibQTnMdpAeL0GC+5/HpGScSa4= -github.com/elastic/go-elasticsearch/v8 v8.10.0 h1:ALg3DMxSrx07YmeMNcfPf7cFh1Ep2+Qa19EOXTbwr2k= -github.com/elastic/go-elasticsearch/v8 v8.10.0/go.mod h1:NGmpvohKiRHXI0Sw4fuUGn6hYOmAXlyCphKpzVBiqDE= +github.com/elastic/go-elasticsearch/v8 v8.10.1 h1:JJ3i2DimYTsJcUoEGbg6tNB0eehTNdid9c5kTR1TGuI= +github.com/elastic/go-elasticsearch/v8 
v8.10.1/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg= github.com/elastic/go-licenser v0.3.1/go.mod h1:D8eNQk70FOCVBl3smCGQt/lv7meBeQno2eI1S5apiHQ= github.com/elastic/go-licenser v0.4.0/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU= github.com/elastic/go-licenser v0.4.1 h1:1xDURsc8pL5zYT9R29425J3vkHdt4RT5TNEMeRN48x4= diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go b/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go index 17de63af699..b917b00b900 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/fleetapi" @@ -35,7 +36,7 @@ func (u *mockUpgradeManager) Reload(rawConfig *config.Config) error { return nil } -func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { +func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { select { case <-time.After(2 * time.Second): u.msgChan <- "completed " + version diff --git a/internal/pkg/agent/application/coordinator/coordinator.go 
b/internal/pkg/agent/application/coordinator/coordinator.go index 856c624076f..778232b91d8 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -17,8 +17,10 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" "github.com/elastic/elastic-agent/internal/pkg/capabilities" @@ -59,7 +61,7 @@ type UpgradeManager interface { Reload(rawConfig *config.Config) error // Upgrade upgrades running agent. - Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) + Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) // Ack is used on startup to check if the agent has upgraded and needs to send an ack for the action Ack(ctx context.Context, acker acker.Acker) error @@ -108,7 +110,7 @@ type RuntimeManager interface { // it performs diagnostics for all current units. PerformDiagnostics(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic - //PerformComponentDiagnostics executes the diagnostic action for the provided components. If no components are provided, + // PerformComponentDiagnostics executes the diagnostic action for the provided components. 
If no components are provided, // then it performs the diagnostics for all current units. PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error) } @@ -198,8 +200,12 @@ type Coordinator struct { // state should never be directly read or written outside the Coordinator // goroutine. Callers who need to access or modify the state should use the // public accessors like State(), SetLogLevel(), etc. - state State - stateBroadcaster *broadcaster.Broadcaster[State] + state State + stateBroadcaster *broadcaster.Broadcaster[State] + + // If you get a race detector error while accessing this field, it probably + // means you're calling private Coordinator methods from outside the + // Coordinator goroutine. stateNeedsRefresh bool // overrideState is used during the update process to report the overall @@ -210,6 +216,10 @@ type Coordinator struct { // SetOverrideState helper to the Coordinator goroutine. overrideStateChan chan *coordinatorOverrideState + // upgradeDetailsChan forwards upgrade details from the publicly accessible + // SetUpgradeDetails helper to the Coordinator goroutine. + upgradeDetailsChan chan *details.Details + // loglevelCh forwards log level changes from the public API (SetLogLevel) // to the run loop in Coordinator's main goroutine. logLevelCh chan logp.Level @@ -332,8 +342,9 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp. // synchronization in the subscriber API, just set the input buffer to 0. stateBroadcaster: broadcaster.New(state, 64, 32), - logLevelCh: make(chan logp.Level), - overrideStateChan: make(chan *coordinatorOverrideState), + logLevelCh: make(chan logp.Level), + overrideStateChan: make(chan *coordinatorOverrideState), + upgradeDetailsChan: make(chan *details.Details), } // Setup communication channels for any non-nil components. 
This pattern // lets us transparently accept nil managers / simulated events during @@ -425,7 +436,7 @@ func (c *Coordinator) ReExec(callback reexec.ShutdownCallbackFn, argOverrides .. // Upgrade runs the upgrade process. // Called from external goroutines. func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) error { - // early check outside of upgrader before overridding the state + // early check outside of upgrader before overriding the state if !c.upgradeMgr.Upgradeable() { return ErrNotUpgradable } @@ -455,17 +466,33 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str // override the overall state to upgrading until the re-execution is complete c.SetOverrideState(agentclient.Upgrading, fmt.Sprintf("Upgrading to version %s", version)) - cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, skipVerifyOverride, skipDefaultPgp, pgpBytes...) + + // initialize upgrade details + actionID := "" + if action != nil { + actionID = action.ActionID + } + det := details.NewDetails(version, details.StateRequested, actionID) + det.RegisterObserver(c.SetUpgradeDetails) + det.RegisterObserver(c.logUpgradeDetails) + + cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...) if err != nil { c.ClearOverrideState() + det.Fail(err) return err } if cb != nil { + det.SetState(details.StateRestarting) c.ReExec(cb) } return nil } +func (c *Coordinator) logUpgradeDetails(details *details.Details) { + c.logger.Infow("updated upgrade details", "upgrade_details", details) +} + // AckUpgrade is the method used on startup to ack a previously successful upgrade action. // Called from external goroutines. 
func (c *Coordinator) AckUpgrade(ctx context.Context, acker acker.Acker) error { @@ -888,6 +915,9 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { case overrideState := <-c.overrideStateChan: c.setOverrideState(overrideState) + case upgradeDetails := <-c.upgradeDetailsChan: + c.setUpgradeDetails(upgradeDetails) + case componentState := <-c.managerChans.runtimeManagerUpdate: // New component change reported by the runtime manager via // Coordinator.watchRuntimeComponents(), merge it with the diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index f896f024733..6e645c3a06b 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -7,11 +7,13 @@ package coordinator import ( "fmt" - agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client" - "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-libs/logp" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/pkg/component/runtime" + agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client" ) // State provides the current state of the coordinator along with all the current states of components and units. @@ -30,6 +32,8 @@ type State struct { Components []runtime.ComponentComponentState `yaml:"components"` LogLevel logp.Level `yaml:"log_level"` + + UpgradeDetails *details.Details `yaml:"upgrade_details,omitempty"` } type coordinatorOverrideState struct { @@ -54,6 +58,11 @@ func (c *Coordinator) ClearOverrideState() { c.overrideStateChan <- nil } +// SetUpgradeDetails sets upgrade details. This is used during upgrades. 
+func (c *Coordinator) SetUpgradeDetails(upgradeDetails *details.Details) { + c.upgradeDetailsChan <- upgradeDetails +} + // setRuntimeManagerError updates the error state for the runtime manager. // Called on the main Coordinator goroutine. func (c *Coordinator) setRuntimeManagerError(err error) { @@ -114,6 +123,13 @@ func (c *Coordinator) setOverrideState(overrideState *coordinatorOverrideState) c.stateNeedsRefresh = true } +// setUpgradeDetails is the internal helper to set upgrade details and set stateNeedsRefresh. +// Must be called on the main Coordinator goroutine. +func (c *Coordinator) setUpgradeDetails(upgradeDetails *details.Details) { + c.state.UpgradeDetails = upgradeDetails + c.stateNeedsRefresh = true +} + // Forward the current state to the broadcaster and clear the stateNeedsRefresh // flag. Must be called on the main Coordinator goroutine. func (c *Coordinator) refreshState() { @@ -163,6 +179,7 @@ func (c *Coordinator) generateReportableState() (s State) { s.FleetState = c.state.FleetState s.FleetMessage = c.state.FleetMessage s.LogLevel = c.state.LogLevel + s.UpgradeDetails = c.state.UpgradeDetails s.Components = make([]runtime.ComponentComponentState, len(c.state.Components)) copy(s.Components, c.state.Components) diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index e84b43f182c..131b91b447a 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -15,6 +15,8 @@ import ( "testing" "time" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" + "github.com/stretchr/testify/assert" "github.com/elastic/elastic-agent-client/v7/pkg/client" @@ -471,8 +473,50 @@ func TestCoordinator_Upgrade(t *testing.T) { require.NoError(t, err) } +func TestCoordinator_UpgradeDetails(t *testing.T) { + coordCh := make(chan error) + ctx, cancel := 
context.WithCancel(context.Background()) + defer cancel() + + expectedErr := errors.New("some upgrade error") + upgradeManager := &fakeUpgradeManager{ + upgradeable: true, + upgradeErr: expectedErr, + } + coord, cfgMgr, varsMgr := createCoordinator(t, ctx, WithUpgradeManager(upgradeManager)) + require.Nil(t, coord.state.UpgradeDetails) + go func() { + err := coord.Run(ctx) + if errors.Is(err, context.Canceled) { + // allowed error + err = nil + } + coordCh <- err + }() + + // no vars used by the config + varsMgr.Vars(ctx, []*transpiler.Vars{{}}) + + // no need for anything to really run + cfg, err := config.NewConfigFrom(nil) + require.NoError(t, err) + cfgMgr.Config(ctx, cfg) + + err = coord.Upgrade(ctx, "9.0.0", "", nil, true, false) + require.ErrorIs(t, expectedErr, err) + cancel() + + err = <-coordCh + require.NoError(t, err) + + require.Equal(t, details.StateFailed, coord.state.UpgradeDetails.State) + require.Equal(t, details.StateRequested, coord.state.UpgradeDetails.Metadata.FailedState) + require.Equal(t, expectedErr.Error(), coord.state.UpgradeDetails.Metadata.ErrorMsg) +} + type createCoordinatorOpts struct { - managed bool + managed bool + upgradeManager UpgradeManager } type CoordinatorOpt func(o *createCoordinatorOpts) @@ -483,6 +527,12 @@ func ManagedCoordinator(managed bool) CoordinatorOpt { } } +func WithUpgradeManager(upgradeManager UpgradeManager) CoordinatorOpt { + return func(o *createCoordinatorOpts) { + o.upgradeManager = upgradeManager + } +} + // createCoordinator creates a coordinator that using a fake config manager and a fake vars manager. // // The runtime specifications is set up to use both the fake component and fake shipper. 
@@ -527,7 +577,12 @@ func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt cfgMgr := newFakeConfigManager() varsMgr := newFakeVarsManager() - coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, &fakeUpgradeManager{}, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed) + upgradeManager := o.upgradeManager + if upgradeManager == nil { + upgradeManager = &fakeUpgradeManager{} + } + + coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, upgradeManager, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed) return coord, cfgMgr, varsMgr } @@ -574,7 +629,7 @@ func (f *fakeUpgradeManager) Reload(cfg *config.Config) error { return nil } -func (f *fakeUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { +func (f *fakeUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { f.upgradeCalled = true if f.upgradeErr != nil { return nil, f.upgradeErr diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 7e6d2b224a5..fd4034f24c4 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" 
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" "github.com/elastic/elastic-agent/internal/pkg/config" monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" @@ -943,6 +944,9 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) { // since a successful upgrade sets the override state twice. overrideStateChan := make(chan *coordinatorOverrideState, 2) + // similarly, upgradeDetailsChan is a buffered channel as well. + upgradeDetailsChan := make(chan *details.Details, 2) + // Create a manager that will allow upgrade attempts but return a failure // from Upgrade itself (success requires testing ReExec and we aren't // quite ready to do that yet). @@ -952,9 +956,11 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) { } coord := &Coordinator{ - stateBroadcaster: broadcaster.New(State{}, 0, 0), - overrideStateChan: overrideStateChan, - upgradeMgr: upgradeMgr, + stateBroadcaster: broadcaster.New(State{}, 0, 0), + overrideStateChan: overrideStateChan, + upgradeDetailsChan: upgradeDetailsChan, + upgradeMgr: upgradeMgr, + logger: logp.NewLogger("testing"), } // Call upgrade and make sure the upgrade manager receives an Upgrade call diff --git a/internal/pkg/agent/application/dispatcher/dispatcher.go b/internal/pkg/agent/application/dispatcher/dispatcher.go index a4ec47a96fe..92ac050f9ab 100644 --- a/internal/pkg/agent/application/dispatcher/dispatcher.go +++ b/internal/pkg/agent/application/dispatcher/dispatcher.go @@ -248,7 +248,7 @@ func (ad *ActionDispatcher) scheduleRetry(ctx context.Context, action fleetapi.R attempt := action.RetryAttempt() d, err := ad.rt.GetWait(attempt) if err != nil { - ad.log.Errorf("No more reties for action id %s: %v", action.ID(), err) + ad.log.Errorf("No more retries for action id %s: %v", action.ID(), err) action.SetRetryAttempt(-1) if err := acker.Ack(ctx, action); err != nil { ad.log.Errorf("Unable to ack action failure (id %s) to fleet-server: %v", action.ID(), err) diff --git 
a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go index 31c81955a10..000ec534bf2 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go @@ -217,7 +217,7 @@ func (f *FleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee if f.checkinFailCounter > 0 { // Log at same level as error logs above so subsequent successes are visible when log level is set to 'error'. - f.log.Errorf("Checkin request to fleet-server succeeded after %d failures", f.checkinFailCounter) + f.log.Warnf("Checkin request to fleet-server succeeded after %d failures", f.checkinFailCounter) } f.checkinFailCounter = 0 diff --git a/internal/pkg/agent/application/monitoring/server.go b/internal/pkg/agent/application/monitoring/server.go index 7f43bf1866b..bd117bda241 100644 --- a/internal/pkg/agent/application/monitoring/server.go +++ b/internal/pkg/agent/application/monitoring/server.go @@ -39,7 +39,7 @@ func NewServer( ) (*reload.ServerReloader, error) { if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil { // log but ignore - log.Errorf("failed to create monitoring drop: %v", err) + log.Warnf("failed to create monitoring drop: %v", err) } cfg, err := config.NewConfigFrom(endpointConfig) diff --git a/internal/pkg/agent/application/upgrade/artifact/download/fs/downloader.go b/internal/pkg/agent/application/upgrade/artifact/download/fs/downloader.go index 42cc058c16b..6de72f0143e 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/fs/downloader.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/fs/downloader.go @@ -52,32 +52,34 @@ func (e *Downloader) Download(ctx context.Context, a artifact.Artifact, version }() // download from source to dest - path, err := e.download(e.config.OS(), a, version) + path, err := e.download(e.config.OS(), a, version, "") downloadedFiles = 
append(downloadedFiles, path) if err != nil { return "", err } - hashPath, err := e.downloadHash(e.config.OS(), a, version) + hashPath, err := e.download(e.config.OS(), a, version, ".sha512") downloadedFiles = append(downloadedFiles, hashPath) return path, err } -func (e *Downloader) download(operatingSystem string, a artifact.Artifact, version string) (string, error) { - filename, err := artifact.GetArtifactName(a, version, operatingSystem, e.config.Arch()) - if err != nil { - return "", errors.New(err, "generating package name failed") - } - - fullPath, err := artifact.GetArtifactPath(a, version, operatingSystem, e.config.Arch(), e.config.TargetDirectory) +// DownloadAsc downloads the package .asc file from configured source. +// It returns the absolute path to the downloaded file and a non-nil error if any occurs. +func (e *Downloader) DownloadAsc(_ context.Context, a artifact.Artifact, version string) (string, error) { + path, err := e.download(e.config.OS(), a, version, ".asc") if err != nil { - return "", errors.New(err, "generating package path failed") + os.Remove(path) + return "", err } - return e.downloadFile(filename, fullPath) + return path, nil } -func (e *Downloader) downloadHash(operatingSystem string, a artifact.Artifact, version string) (string, error) { +func (e *Downloader) download( + operatingSystem string, + a artifact.Artifact, + version, + extension string) (string, error) { filename, err := artifact.GetArtifactName(a, version, operatingSystem, e.config.Arch()) if err != nil { return "", errors.New(err, "generating package name failed") @@ -88,8 +90,10 @@ func (e *Downloader) downloadHash(operatingSystem string, a artifact.Artifact, v return "", errors.New(err, "generating package path failed") } - filename = filename + ".sha512" - fullPath = fullPath + ".sha512" + if extension != "" { + filename += extension + fullPath += extension + } return e.downloadFile(filename, fullPath) } diff --git 
a/internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/beat-8.0.0-darwin-x86_64.tar.gz b/internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/elastic-agent-8.0.0-darwin-x86_64.tar.gz similarity index 100% rename from internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/beat-8.0.0-darwin-x86_64.tar.gz rename to internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/elastic-agent-8.0.0-darwin-x86_64.tar.gz diff --git a/internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/elastic-agent-8.0.0-darwin-x86_64.tar.gz.asc b/internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/elastic-agent-8.0.0-darwin-x86_64.tar.gz.asc new file mode 100644 index 00000000000..dc0dc3745c9 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/elastic-agent-8.0.0-darwin-x86_64.tar.gz.asc @@ -0,0 +1,14 @@ +-----BEGIN PGP SIGNATURE----- + +iQGzBAABCgAdFiEE81a455Doc5DWexOcF4e6ez4rqzAFAmUn7TsACgkQF4e6ez4r +qzDcIgwArpuXDex9aisWFWkXjCfjhJdrTTXr3wv8W68NeFsAaazLlvsWPxdol1db +FeKFL+P/P/PhlTvdkZw9xMyXoVRWQXJ2p2jVjV0Wq2SCtbbjdrGjQ4OrchgE9FW7 +onWxqV8RjzPyaMwpDWWtHKgxhQeLP5yXhWm6RXHvBLZ5mqbTCuIq2Q4sijEd6IFD +9JoAA276tqyKGOsPZ1QzaPUFF69B9QLcWasEuNFf5ytMVFfTcMl6/HYDPO7ErhJx +E1hnKGIc5rrMghL0LzaVLGYZUtnQwru02ZA0omXzEv1uYgqmZl75g9qHk2Cu2V5W +0qbg9OtUKOkJ1sODvsVv8O40rVazdZTgL2ifNLi2wFwR3syMdHCih2aKMcPDPzt3 +Q4q0zvsxuR9PGsv5+8zze74iC3oZSvF8h36XGjJuyjEFORUpcWNGDmhsC6l7ql5W +rEbIPZ19j3r1M4yHG/ptBmrwRnQz9RKFnwTO9ME/5eBVumPLUD5kAcYXjvAFYQI5 +qEc7okL5 +=+nvi +-----END PGP SIGNATURE----- diff --git a/internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/beat-8.0.0-darwin-x86_64.tar.gz.sha512 b/internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/elastic-agent-8.0.0-darwin-x86_64.tar.gz.sha512 similarity index 74% rename from 
internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/beat-8.0.0-darwin-x86_64.tar.gz.sha512 rename to internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/elastic-agent-8.0.0-darwin-x86_64.tar.gz.sha512 index 5d0fc9e405d..599ae848893 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/beat-8.0.0-darwin-x86_64.tar.gz.sha512 +++ b/internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/elastic-agent-8.0.0-darwin-x86_64.tar.gz.sha512 @@ -1 +1 @@ -9af9aa016f3349aa248034629e4336ca2f4d31317bfb8c9a23a9d924c18969cf43ad93727e784da010a272690b2b5ce4c4ded3a5d2039e4408e93e1e18d113db beat-8.0.0-darwin-x86_64.tar.gz +9af9aa016f3349aa248034629e4336ca2f4d31317bfb8c9a23a9d924c18969cf43ad93727e784da010a272690b2b5ce4c4ded3a5d2039e4408e93e1e18d113db elastic-agent-8.0.0-darwin-x86_64.tar.gz diff --git a/internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/public-key.pgp b/internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/public-key.pgp new file mode 100644 index 00000000000..7d452cb033c --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/fs/testdata/drop/public-key.pgp @@ -0,0 +1,40 @@ +-----BEGIN PGP PUBLIC KEY BLOCK----- + +mQGNBGUn7JEBDADH0iBdohpZIQY7QyBz9Hl68b7fq0zoFcB4HTDDMQD1ouDQfPwg +Frpr/ViNNHsye1QfrUWXN8FQfKztqHtUHeM8ggdSvhYYGaDtVSuVakoNNz3Z3+kD +YhwH0byZrE2MiCKExtgQYWBIDd1TeCMSOgYcQPAXPqQBwX0G1xRAY3s+eazCjaSU +aNJtlNuAx36jEBa+X73sTh+Y/OtCSN9s75SVYu5xJ+kXkpcHNvsMJmDCZ0zsKrxT +TMvXSU9qcczj8+wAee/1E77eR01ttrf67IjVReuVZ0OhxucVxJHOp7x9jfeGsjjn +6uhFT0KV+VOaaRlI9wZ4AOMmAX5nroNYP/GC+SKiOvKV79+r3jyxbChqd5nWdSBN +mO9okB72nUpGmL1NosW926MMTauR9/nP1uWB66d/pHYRop7sAbAZ7u8COoRS1wd+ +V6dtb3QUwR9LsfKd1xQfrTFVKZ4i703MN1qkq/6TqLhpwlt0+K4WN7LtkkeFivyx +N0RLiVDzZP289ssAEQEAAbQFYXRlc3SJAc4EEwEKADgWIQTzVrjnkOhzkNZ7E5wX +h7p7PiurMAUCZSfskQIbAwULCQgHAgYVCgkICwIEFgIDAQIeAQIXgAAKCRAXh7p7 
+PiurMFkbDAC0QLqwq4dZGjOqcNjj02DOM1eQcC8XUSy0w8X6gX/69wFHGM34zl4+ +IO7H6ujvkBxMHmeEU3nNsLH+WsN6Hc8JBRQZSqjySgL2am+K6XYMcP7h7VGnFR0r +5IKbGn9zCR7xkVrkvW0T48U0fJ00X3v+GWcxcBQIu58sMmKrmzliPCDhmQ94yum8 +n8Yc1tB3DazAQEDGxtfP8/yc93sWKZ4qKPBMZUsjSSzC8a7zei/J9vJccRy/JJEl +/mNIQx7FxObrCSSa3wXc4AEbWdq4HNZkahLvnOs4EhNR9ihWg7TtMVyBesV/rdgj +5cgHU3erir1nSOHmrHqLydeWH4vHW4R6BYuJd6NXhsISMHO8Oerlceqmt7aex3wJ +09ULyareJ3QMc+HWcjxxYbSLU6j5ZgCqcPz17V88W7SkXnzbPaoVAxMCf+M3a0Ib +r+Yw6CrvWRj2+bmW8Ars6fND90nX4ZS82VnMc27kFqNYdkAE9kdlZ+L8OU70nWmT +Clh2FhjhHKe5AY0EZSfskQEMANT+4NWxDtyExZEIvwUdegcetF3hbdHlXMmMnuPU +vJwPhXhXJtzyX5VKRp3WCUO2TOGMKaHaNPi4XCS4QMzBEEft8C7X896QPGqanGdV +oZ9Oc/mXNZfuOk62hP6Ifn38VIyxAcpQ11ypKJ5wFSwSvkPIdaXm1125oGIFQg+W +51GSNz8PBuP5GavLs3L1Wp2VupJ9pOrolxGRP+t41u6rNewaktSO6eLY0o0j/FMY +Anujnj68sS92e7TnQcaAEUsplYLrZlZI1Ly0W2QakvOUIkDq5DSsNYKypTM1rZ7s +VYENPjHdhATsHoW1LxirBKHuoi8aANSjsofdggnxtu+sp8mk/+oZpyR78yA/+hIA +/t/wEVgVXETTB0Y8o6n8+/U/uBHEjYGa8JJEcMbNJesQAusBXYt90N8URKHRWEcR +L9IH3V4rmssDqgE7voHYvNKFru/socsI3WPmDnPKFWGRd7rqzlkBoqbrPiD/tRIC +cwDqz5hm3vKqOkHqvsGqzNVp4wARAQABiQG2BBgBCgAgFiEE81a455Doc5DWexOc +F4e6ez4rqzAFAmUn7JECGwwACgkQF4e6ez4rqzA23gv/UZTQw13urB8Hf6s5FJyz +z5dCWT1RMW1ig7MuCe/MzRCk29zDc16y5fOo0aLzYMWsQXBrBTAXj6hx2/MYHXg0 +mUXzxrnUqM5H/b1hbx52NdwD1eR1prQIX39ifPzw+FTirD98qx04479En/561PQW +lbWXtm1/JoaSpGIYP2gWNgb3HfHShEGPxFH39vxmP6XVz99BL+3zaHehcCUP8fbC +Kabo/qbtNC/nZEBUVVMxEj2O9eEq9otk8K8fBzoCOQ4K0Idn+BnQ0O67x4jemunD +JX6BGBo0WYxJNarK2sJw5+CVRK472va8U6Y+6yGyv5qu68eOZZXvkrCbDpysSIf7 +YjwhmaZuerd4oBvRKJHbbHoqgde8sviSjm6cdU+ZSHILvwEaBLwW3pTgBJAupQcV +4Ws7fo7/6R2YWws8c4sseGqLC+XxCXk+SvrvyA02ZBY+0L6IFD6Cb8BT0uMMrLIP +YcZ1xK3gfrp4PCg2OFj46WER5ufHP1r0zvufY7chA9tP +=Jwiw +-----END PGP PUBLIC KEY BLOCK----- diff --git a/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier.go b/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier.go index e42a35c76a4..8c7861e1c75 100644 --- 
a/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier.go @@ -6,7 +6,6 @@ package fs import ( "fmt" - "io/ioutil" "net/http" "os" "path/filepath" @@ -26,11 +25,10 @@ const ( // The signature is validated against Elastic's public GPG key that is // embedded into Elastic Agent. type Verifier struct { - config *artifact.Config - client http.Client - pgpBytes []byte - allowEmptyPgp bool - log *logger.Logger + config *artifact.Config + client http.Client + defaultKey []byte + log *logger.Logger } func (v *Verifier) Name() string { @@ -39,9 +37,9 @@ func (v *Verifier) Name() string { // NewVerifier creates a verifier checking downloaded package on preconfigured // location against a key stored on elastic.co website. -func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool, pgp []byte) (*Verifier, error) { - if len(pgp) == 0 && !allowEmptyPgp { - return nil, errors.New("expecting PGP but retrieved none", errors.TypeSecurity) +func NewVerifier(log *logger.Logger, config *artifact.Config, pgp []byte) (*Verifier, error) { + if len(pgp) == 0 { + return nil, errors.New("expecting PGP key but received none", errors.TypeSecurity) } client, err := config.HTTPTransportSettings.Client( @@ -55,11 +53,10 @@ func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool } v := &Verifier{ - config: config, - client: *client, - allowEmptyPgp: allowEmptyPgp, - pgpBytes: pgp, - log: log, + config: config, + client: *client, + defaultKey: pgp, + log: log, } return v, nil @@ -70,24 +67,22 @@ func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool func (v *Verifier) Verify(a artifact.Artifact, version string, skipDefaultPgp bool, pgpBytes ...string) error { filename, err := artifact.GetArtifactName(a, version, v.config.OS(), v.config.Arch()) if err != nil { - return errors.New(err, "retrieving package name") + return 
fmt.Errorf("could not get artifact name: %w", err) } - fullPath := filepath.Join(v.config.TargetDirectory, filename) + artifactPath := filepath.Join(v.config.TargetDirectory, filename) - if err = download.VerifySHA512Hash(fullPath); err != nil { - var checksumMismatchErr *download.ChecksumMismatchError - if errors.As(err, &checksumMismatchErr) { - os.Remove(fullPath) - os.Remove(fullPath + ".sha512") - } - return err + if err = download.VerifySHA512HashWithCleanup(v.log, artifactPath); err != nil { + return fmt.Errorf("failed to verify SHA512 hash: %w", err) } - if err = v.verifyAsc(fullPath, skipDefaultPgp, pgpBytes...); err != nil { + if err = v.verifyAsc(artifactPath, skipDefaultPgp, pgpBytes...); err != nil { var invalidSignatureErr *download.InvalidSignatureError if errors.As(err, &invalidSignatureErr) { - os.Remove(fullPath + ".asc") + if err := os.Remove(artifactPath + ".asc"); err != nil { + v.log.Warnf("failed clean up after signature verification: failed to remove %q: %v", + artifactPath+".asc", err) + } } return err } @@ -113,63 +108,25 @@ func (v *Verifier) Reload(c *artifact.Config) error { return nil } -func (v *Verifier) verifyAsc(fullPath string, skipDefaultPgp bool, pgpSources ...string) error { +func (v *Verifier) verifyAsc(fullPath string, skipDefaultKey bool, pgpSources ...string) error { var pgpBytes [][]byte - if len(v.pgpBytes) > 0 && !skipDefaultPgp { - v.log.Infof("Default PGP being appended") - pgpBytes = append(pgpBytes, v.pgpBytes) - } - - for _, check := range pgpSources { - if len(check) == 0 { - continue - } - raw, err := download.PgpBytesFromSource(v.log, check, &v.client) - if err != nil { - return err - } - - if len(raw) == 0 { - continue - } - - pgpBytes = append(pgpBytes, raw) - } - - if len(pgpBytes) == 0 { - // no pgp available skip verification process - v.log.Infof("No checks defined") - return nil + pgpBytes, err := download.FetchPGPKeys( + v.log, v.client, v.defaultKey, skipDefaultKey, pgpSources) + if err != nil { + return 
fmt.Errorf("could not fetch pgp keys: %w", err) } - v.log.Infof("Using %d PGP keys", len(pgpBytes)) ascBytes, err := v.getPublicAsc(fullPath) - if err != nil && v.allowEmptyPgp { - // asc not available but we allow empty for dev use-case - return nil - } else if err != nil { - return err - } - - for i, check := range pgpBytes { - err = download.VerifyGPGSignature(fullPath, ascBytes, check) - if err == nil { - // verify successful - v.log.Infof("Verification with PGP[%d] successful", i) - return nil - } - v.log.Warnf("Verification with PGP[%d] failed: %v", i, err) + if err != nil { + return fmt.Errorf("could not get .asc file: %w", err) } - v.log.Warnf("Verification failed") - - // return last error - return err + return download.VerifyPGPSignatureWithKeys(v.log, fullPath, ascBytes, pgpBytes) } func (v *Verifier) getPublicAsc(fullPath string) ([]byte, error) { fullPath = fmt.Sprintf("%s%s", fullPath, ascSuffix) - b, err := ioutil.ReadFile(fullPath) + b, err := os.ReadFile(fullPath) if err != nil { return nil, errors.New(err, fmt.Sprintf("fetching asc file from '%s'", fullPath), errors.TypeFilesystem, errors.M(errors.MetaKeyPath, fullPath)) } diff --git a/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier_test.go b/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier_test.go index 5012e8244dd..4bd605142f3 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier_test.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/fs/verifier_test.go @@ -5,10 +5,10 @@ package fs import ( + "bytes" "context" "crypto/sha512" "fmt" - "io/ioutil" "os" "path/filepath" "testing" @@ -18,11 +18,11 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent-libs/transport/httpcommon" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" 
"github.com/elastic/elastic-agent/internal/pkg/release" "github.com/elastic/elastic-agent/pkg/core/logger" + "github.com/elastic/elastic-agent/testing/pgptest" ) const ( @@ -30,21 +30,30 @@ const ( ) var ( - beatSpec = artifact.Artifact{Name: "Filebeat", Cmd: "filebeat", Artifact: "beat/filebeat"} + beatSpec = artifact.Artifact{ + Name: "Elastic Agent", + Cmd: "elastic-agent", + Artifact: "beat/elastic-agent"} ) func TestFetchVerify(t *testing.T) { + // See docs/pgp-sign-verify-artifact.md for how to generate a key, export + // the public key, sign a file and verify it. + log, _ := logger.New("", false) timeout := 15 * time.Second dropPath := filepath.Join("testdata", "drop") installPath := filepath.Join("testdata", "install") targetPath := filepath.Join("testdata", "download") ctx := context.Background() - s := artifact.Artifact{Name: "Beat", Cmd: "beat", Artifact: "beats/filebeat"} + a := artifact.Artifact{ + Name: "elastic-agent", Cmd: "elastic-agent", Artifact: "beats/elastic-agent"} version := "8.0.0" - targetFilePath := filepath.Join(targetPath, "beat-8.0.0-darwin-x86_64.tar.gz") - hashTargetFilePath := filepath.Join(targetPath, "beat-8.0.0-darwin-x86_64.tar.gz.sha512") + filename := "elastic-agent-8.0.0-darwin-x86_64.tar.gz" + targetFilePath := filepath.Join(targetPath, filename) + hashTargetFilePath := filepath.Join(targetPath, filename+".sha512") + ascTargetFilePath := filepath.Join(targetPath, filename+".asc") // cleanup defer os.RemoveAll(targetPath) @@ -60,48 +69,52 @@ func TestFetchVerify(t *testing.T) { }, } - err := prepareFetchVerifyTests(dropPath, targetPath, targetFilePath, hashTargetFilePath) - assert.NoError(t, err) + err := prepareFetchVerifyTests(dropPath, targetPath, filename, targetFilePath, hashTargetFilePath) + require.NoError(t, err) - downloader := NewDownloader(config) - verifier, err := NewVerifier(log, config, true, nil) - assert.NoError(t, err) + pgp, err := os.ReadFile(filepath.Join(dropPath, "public-key.pgp")) + require.NoError(t, 
err, "could not read public PGP key") + verifier, err := NewVerifier(log, config, pgp) + require.NoError(t, err, "could not create the verifier") // first download verify should fail: // download skipped, as invalid package is prepared upfront // verify fails and cleans download - err = verifier.Verify(s, version, false) + err = verifier.Verify(a, version, false) var checksumErr *download.ChecksumMismatchError - assert.ErrorAs(t, err, &checksumErr) + require.ErrorAs(t, err, &checksumErr) _, err = os.Stat(targetFilePath) - assert.True(t, os.IsNotExist(err)) + require.True(t, os.IsNotExist(err)) _, err = os.Stat(hashTargetFilePath) - assert.True(t, os.IsNotExist(err)) + require.True(t, os.IsNotExist(err)) // second one should pass // download not skipped: package missing // verify passes because hash is not correct - _, err = downloader.Download(ctx, s, version) - assert.NoError(t, err) + _, err = NewDownloader(config).Download(ctx, a, version) + require.NoError(t, err) + asc, err := os.ReadFile(filepath.Join(dropPath, filename+".asc")) + require.NoErrorf(t, err, "could not open .asc for copy") + err = os.WriteFile(ascTargetFilePath, asc, 0o600) + require.NoErrorf(t, err, "could not save .asc (%q) to target path (%q)", + filepath.Join(dropPath, filename+".asc"), ascTargetFilePath) // file downloaded ok _, err = os.Stat(targetFilePath) - assert.NoError(t, err) - + require.NoError(t, err) _, err = os.Stat(hashTargetFilePath) - assert.NoError(t, err) + require.NoError(t, err) + _, err = os.Stat(ascTargetFilePath) + require.NoError(t, err) - err = verifier.Verify(s, version, false) - assert.NoError(t, err) - - // Enable GPG signature validation. - verifier.allowEmptyPgp = false + err = verifier.Verify(a, version, false) + require.NoError(t, err) // Bad GPG public key. { - verifier.pgpBytes = []byte("garbage") + verifier.defaultKey = []byte("garbage") // Don't delete anything. 
assertFileExists(t, targetFilePath) @@ -109,11 +122,11 @@ func TestFetchVerify(t *testing.T) { } // Setup proper GPG public key. - _, verifier.pgpBytes = release.PGP() + verifier.defaultKey = release.PGP() // Missing .asc file. { - err = verifier.Verify(s, version, false) + err = verifier.Verify(a, version, false) require.Error(t, err) // Don't delete these files when GPG validation failure. @@ -123,10 +136,10 @@ func TestFetchVerify(t *testing.T) { // Invalid signature. { - err = ioutil.WriteFile(targetFilePath+".asc", []byte("bad sig"), 0o600) + err = os.WriteFile(targetFilePath+".asc", []byte("bad sig"), 0o600) require.NoError(t, err) - err = verifier.Verify(s, version, false) + err = verifier.Verify(a, version, false) var invalidSigErr *download.InvalidSignatureError assert.ErrorAs(t, err, &invalidSigErr) @@ -139,9 +152,14 @@ func TestFetchVerify(t *testing.T) { } } -func prepareFetchVerifyTests(dropPath, targetDir, targetFilePath, hashTargetFilePath string) error { - sourceFilePath := filepath.Join(dropPath, "beat-8.0.0-darwin-x86_64.tar.gz") - hashSourceFilePath := filepath.Join(dropPath, "beat-8.0.0-darwin-x86_64.tar.gz.sha512") +func prepareFetchVerifyTests( + dropPath, + targetDir, + filename, + targetFilePath, + hashTargetFilePath string) error { + sourceFilePath := filepath.Join(dropPath, filename) + hashSourceFilePath := filepath.Join(dropPath, filename+".sha512") // clean targets os.Remove(targetFilePath) @@ -163,13 +181,13 @@ func prepareFetchVerifyTests(dropPath, targetDir, targetFilePath, hashTargetFile } defer targretFile.Close() - hashContent, err := ioutil.ReadFile(hashSourceFilePath) + hashContent, err := os.ReadFile(hashSourceFilePath) if err != nil { return err } corruptedHash := append([]byte{1, 2, 3, 4, 5, 6}, hashContent[6:]...) 
- return ioutil.WriteFile(hashTargetFilePath, corruptedHash, 0666) + return os.WriteFile(hashTargetFilePath, corruptedHash, 0666) } func TestVerify(t *testing.T) { @@ -185,8 +203,7 @@ func TestVerify(t *testing.T) { for _, tc := range tt { t.Run(tc.Name, func(t *testing.T) { log, obs := logger.NewTesting("TestVerify") - targetDir, err := ioutil.TempDir(os.TempDir(), "") - require.NoError(t, err) + targetDir := t.TempDir() timeout := 30 * time.Second @@ -200,23 +217,18 @@ func TestVerify(t *testing.T) { }, } - err = prepareTestCase(beatSpec, version, config) - require.NoError(t, err) + pgpKey := prepareTestCase(t, beatSpec, version, config) testClient := NewDownloader(config) - artifact, err := testClient.Download(context.Background(), beatSpec, version) - require.NoError(t, err) + artifactPath, err := testClient.Download(context.Background(), beatSpec, version) + require.NoError(t, err, "fs.Downloader could not download artifacts") + _, err = testClient.DownloadAsc(context.Background(), beatSpec, version) + require.NoError(t, err, "fs.Downloader could not download artifacts .asc file") - t.Cleanup(func() { - os.Remove(artifact) - os.Remove(artifact + ".sha512") - os.RemoveAll(config.DropPath) - }) - - _, err = os.Stat(artifact) + _, err = os.Stat(artifactPath) require.NoError(t, err) - testVerifier, err := NewVerifier(log, config, true, nil) + testVerifier, err := NewVerifier(log, config, pgpKey) require.NoError(t, err) err = testVerifier.Verify(beatSpec, version, false, tc.RemotePGPUris...) @@ -229,25 +241,40 @@ func TestVerify(t *testing.T) { } } -func prepareTestCase(a artifact.Artifact, version string, cfg *artifact.Config) error { +// prepareTestCase creates an artifact file, defined by 'a' and 'version', +// its corresponding checksum (.sha512) and signature (.asc) files. +// It creates the necessary key to sing the artifact and returns the public key +// to verify the signature. 
+func prepareTestCase( + t *testing.T, + a artifact.Artifact, + version string, + cfg *artifact.Config) []byte { + filename, err := artifact.GetArtifactName(a, version, cfg.OperatingSystem, cfg.Architecture) - if err != nil { - return err - } + require.NoErrorf(t, err, "could not get artifact name") - if err := os.MkdirAll(cfg.DropPath, 0777); err != nil { - return err - } + err = os.MkdirAll(cfg.DropPath, 0777) + require.NoErrorf(t, err, "failed creating directory %q", cfg.DropPath) + + filePath := filepath.Join(cfg.DropPath, filename) + filePathSHA := filePath + ".sha512" + filePathASC := filePath + ".asc" content := []byte("sample content") + err = os.WriteFile(filePath, content, 0644) + require.NoErrorf(t, err, "could not write %q file", filePath) + hash := sha512.Sum512(content) hashContent := fmt.Sprintf("%x %s", hash, filename) + err = os.WriteFile(filePathSHA, []byte(hashContent), 0644) + require.NoErrorf(t, err, "could not write %q file", filePathSHA) - if err := ioutil.WriteFile(filepath.Join(cfg.DropPath, filename), content, 0644); err != nil { - return err - } + pub, sig := pgptest.Sing(t, bytes.NewReader(content)) + err = os.WriteFile(filePathASC, sig, 0644) + require.NoErrorf(t, err, "could not write %q file", filePathASC) - return ioutil.WriteFile(filepath.Join(cfg.DropPath, filename+".sha512"), []byte(hashContent), 0644) + return pub } func assertFileExists(t testing.TB, path string) { diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/common_test.go b/internal/pkg/agent/application/upgrade/artifact/download/http/common_test.go new file mode 100644 index 00000000000..cfc899420c2 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/common_test.go @@ -0,0 +1,121 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package http + +import ( + "bytes" + "context" + "crypto/sha512" + "fmt" + "net" + "net/http" + "net/http/httptest" + "path/filepath" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/testing/pgptest" +) + +const ( + version = "7.5.1" + sourcePattern = "/downloads/beats/filebeat/" + source = "http://artifacts.elastic.co/downloads/" +) + +var ( + beatSpec = artifact.Artifact{ + Name: "filebeat", + Cmd: "filebeat", + Artifact: "beats/filebeat", + } +) + +type testCase struct { + system string + arch string +} + +func getTestCases() []testCase { + // always test random package to save time + return []testCase{ + {"linux", "32"}, + {"linux", "64"}, + {"linux", "arm64"}, + {"darwin", "32"}, + {"darwin", "64"}, + {"windows", "32"}, + {"windows", "64"}, + } +} + +func getElasticCoServer(t *testing.T) (*httptest.Server, []byte) { + correctValues := map[string]struct{}{ + fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "i386.deb"): {}, + fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "amd64.deb"): {}, + fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "i686.rpm"): {}, + fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "x86_64.rpm"): {}, + fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "linux-x86.tar.gz"): {}, + fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "linux-arm64.tar.gz"): {}, + fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "linux-x86_64.tar.gz"): {}, + fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "windows-x86.zip"): {}, + fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "windows-x86_64.zip"): {}, + fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "darwin-x86_64.tar.gz"): {}, + } + var resp []byte + content := []byte("anything will do") + hash := sha512.Sum512(content) + pub, sig := pgptest.Sing(t, bytes.NewReader(content)) + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + packageName := 
r.URL.Path[len(sourcePattern):] + + ext := filepath.Ext(packageName) + if ext == ".gz" { + ext = ".tar.gz" + } + packageName = strings.TrimSuffix(packageName, ext) + + switch ext { + case ".sha512": + resp = []byte(fmt.Sprintf("%x %s", hash, packageName)) + case ".asc": + resp = sig + case ".tar.gz", ".zip", ".deb", ".rpm": + packageName += ext + resp = content + default: + w.WriteHeader(http.StatusNotFound) + t.Errorf("mock elastic.co server: unknown file extension: %q", ext) + return + } + + if _, ok := correctValues[packageName]; !ok { + t.Errorf("mock elastic.co server: invalid package name: %q", packageName) + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte{}) + return + } + + _, err := w.Write(resp) + assert.NoErrorf(t, err, "mock elastic.co server: failes writing response") + }) + + return httptest.NewServer(handler), pub +} + +func getElasticCoClient(server *httptest.Server) http.Client { + return http.Client{ + Transport: &http.Transport{ + DialContext: func(_ context.Context, network, s string) (net.Conn, error) { + _ = s + return net.Dial(network, server.Listener.Addr().String()) + }, + }, + } +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go index 7be3ae1066f..50fc6849f21 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader.go @@ -18,9 +18,9 @@ import ( "time" "github.com/elastic/elastic-agent-libs/transport/httpcommon" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -44,13 
+44,14 @@ const ( // Downloader is a downloader able to fetch artifacts from elastic.co web page. type Downloader struct { - log *logger.Logger - config *artifact.Config - client http.Client + log *logger.Logger + config *artifact.Config + client http.Client + upgradeDetails *details.Details } // NewDownloader creates and configures Elastic Downloader -func NewDownloader(log *logger.Logger, config *artifact.Config) (*Downloader, error) { +func NewDownloader(log *logger.Logger, config *artifact.Config, upgradeDetails *details.Details) (*Downloader, error) { client, err := config.HTTPTransportSettings.Client( httpcommon.WithAPMHTTPInstrumentation(), httpcommon.WithKeepaliveSettings{Disable: false, IdleConnTimeout: 30 * time.Second}, @@ -60,15 +61,16 @@ func NewDownloader(log *logger.Logger, config *artifact.Config) (*Downloader, er } client.Transport = download.WithHeaders(client.Transport, download.Headers) - return NewDownloaderWithClient(log, config, *client), nil + return NewDownloaderWithClient(log, config, *client, upgradeDetails), nil } // NewDownloaderWithClient creates Elastic Downloader with specific client used -func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client) *Downloader { +func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client, upgradeDetails *details.Details) *Downloader { return &Downloader{ - log: log, - config: config, - client: client, + log: log, + config: config, + client: client, + upgradeDetails: upgradeDetails, } } @@ -207,7 +209,8 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f } loggingObserver := newLoggingProgressObserver(e.log, e.config.HTTPTransportSettings.Timeout) - dp := newDownloadProgressReporter(sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver) + detailsObserver := newDetailsProgressObserver(e.upgradeDetails) + dp := newDownloadProgressReporter(sourceURI, 
e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver, detailsObserver) dp.Report(ctx) _, err = io.Copy(destinationFile, io.TeeReader(resp.Body, dp)) if err != nil { diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go index 119173e1344..94e3ce856e2 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/downloader_test.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap/zaptest/observer" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/docker/go-units" @@ -30,6 +31,49 @@ import ( "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) +func TestDownload(t *testing.T) { + targetDir, err := ioutil.TempDir(os.TempDir(), "") + if err != nil { + t.Fatal(err) + } + + log, _ := logger.New("", false) + timeout := 30 * time.Second + testCases := getTestCases() + server, _ := getElasticCoServer(t) + elasticClient := getElasticCoClient(server) + + config := &artifact.Config{ + SourceURI: source, + TargetDirectory: targetDir, + HTTPTransportSettings: httpcommon.HTTPTransportSettings{ + Timeout: timeout, + }, + } + + for _, testCase := range testCases { + testName := fmt.Sprintf("%s-binary-%s", testCase.system, testCase.arch) + t.Run(testName, func(t *testing.T) { + config.OperatingSystem = testCase.system + config.Architecture = testCase.arch + + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, elasticClient, upgradeDetails) + artifactPath, err := testClient.Download(context.Background(), beatSpec, version) + if err != nil { + t.Fatal(err) + } + + _, err = os.Stat(artifactPath) + 
if err != nil { + t.Fatal(err) + } + + os.Remove(artifactPath) + }) + } +} + func TestDownloadBodyError(t *testing.T) { // This tests the scenario where the download encounters a network error // part way through the download, while copying the response body. @@ -63,7 +107,8 @@ func TestDownloadBodyError(t *testing.T) { } log, obs := logger.NewTesting("downloader") - testClient := NewDownloaderWithClient(log, config, *client) + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, *client, upgradeDetails) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) if err == nil { @@ -119,7 +164,8 @@ func TestDownloadLogProgressWithLength(t *testing.T) { } log, obs := logger.NewTesting("downloader") - testClient := NewDownloaderWithClient(log, config, *client) + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, *client, upgradeDetails) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) require.NoError(t, err, "Download should not have errored") @@ -201,7 +247,8 @@ func TestDownloadLogProgressWithoutLength(t *testing.T) { } log, obs := logger.NewTesting("downloader") - testClient := NewDownloaderWithClient(log, config, *client) + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, *client, upgradeDetails) artifactPath, err := testClient.Download(context.Background(), beatSpec, version) os.Remove(artifactPath) require.NoError(t, err, "Download should not have errored") diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/elastic_test.go b/internal/pkg/agent/application/upgrade/artifact/download/http/elastic_test.go deleted file mode 100644 index bd1564cab2b..00000000000 --- 
a/internal/pkg/agent/application/upgrade/artifact/download/http/elastic_test.go +++ /dev/null @@ -1,211 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package http - -import ( - "context" - "crypto/sha512" - "fmt" - "io/ioutil" - "math/rand" - "net" - "net/http" - "net/http/httptest" - "os" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/elastic/elastic-agent-libs/transport/httpcommon" - - "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" - "github.com/elastic/elastic-agent/pkg/core/logger" -) - -const ( - version = "7.5.1" - sourcePattern = "/downloads/beats/filebeat/" - source = "http://artifacts.elastic.co/downloads/" -) - -var ( - beatSpec = artifact.Artifact{ - Name: "filebeat", - Cmd: "filebeat", - Artifact: "beats/filebeat", - } -) - -type testCase struct { - system string - arch string -} - -func TestDownload(t *testing.T) { - targetDir, err := ioutil.TempDir(os.TempDir(), "") - if err != nil { - t.Fatal(err) - } - - log, _ := logger.New("", false) - timeout := 30 * time.Second - testCases := getTestCases() - elasticClient := getElasticCoClient() - - config := &artifact.Config{ - SourceURI: source, - TargetDirectory: targetDir, - HTTPTransportSettings: httpcommon.HTTPTransportSettings{ - Timeout: timeout, - }, - } - - for _, testCase := range testCases { - testName := fmt.Sprintf("%s-binary-%s", testCase.system, testCase.arch) - t.Run(testName, func(t *testing.T) { - config.OperatingSystem = testCase.system - config.Architecture = testCase.arch - - testClient := NewDownloaderWithClient(log, config, elasticClient) - artifactPath, err := testClient.Download(context.Background(), beatSpec, version) - if err != nil { - t.Fatal(err) - } - - _, err = os.Stat(artifactPath) - if err != nil { 
- t.Fatal(err) - } - - os.Remove(artifactPath) - }) - } -} - -func TestVerify(t *testing.T) { - targetDir, err := ioutil.TempDir(os.TempDir(), "") - if err != nil { - t.Fatal(err) - } - - log, _ := logger.New("", false) - timeout := 30 * time.Second - testCases := getRandomTestCases() - elasticClient := getElasticCoClient() - - config := &artifact.Config{ - SourceURI: source, - TargetDirectory: targetDir, - HTTPTransportSettings: httpcommon.HTTPTransportSettings{ - Timeout: timeout, - }, - } - - for _, testCase := range testCases { - testName := fmt.Sprintf("%s-binary-%s", testCase.system, testCase.arch) - t.Run(testName, func(t *testing.T) { - config.OperatingSystem = testCase.system - config.Architecture = testCase.arch - - testClient := NewDownloaderWithClient(log, config, elasticClient) - artifact, err := testClient.Download(context.Background(), beatSpec, version) - if err != nil { - t.Fatal(err) - } - - _, err = os.Stat(artifact) - if err != nil { - t.Fatal(err) - } - - testVerifier, err := NewVerifier(log, config, true, nil) - if err != nil { - t.Fatal(err) - } - - err = testVerifier.Verify(beatSpec, version, false) - require.NoError(t, err) - - os.Remove(artifact) - os.Remove(artifact + ".sha512") - }) - } -} - -func getTestCases() []testCase { - // always test random package to save time - return []testCase{ - {"linux", "32"}, - {"linux", "64"}, - {"linux", "arm64"}, - {"darwin", "32"}, - {"darwin", "64"}, - {"windows", "32"}, - {"windows", "64"}, - } -} - -func getRandomTestCases() []testCase { - tt := getTestCases() - - rand.Seed(time.Now().UnixNano()) - first := rand.Intn(len(tt)) - second := rand.Intn(len(tt)) - - return []testCase{ - tt[first], - tt[second], - } -} - -func getElasticCoClient() http.Client { - correctValues := map[string]struct{}{ - fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "i386.deb"): {}, - fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "amd64.deb"): {}, - fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "i686.rpm"): {}, - 
fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "x86_64.rpm"): {}, - fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "linux-x86.tar.gz"): {}, - fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "linux-arm64.tar.gz"): {}, - fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "linux-x86_64.tar.gz"): {}, - fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "windows-x86.zip"): {}, - fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "windows-x86_64.zip"): {}, - fmt.Sprintf("%s-%s-%s", beatSpec.Cmd, version, "darwin-x86_64.tar.gz"): {}, - } - - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - packageName := r.URL.Path[len(sourcePattern):] - isShaReq := strings.HasSuffix(packageName, ".sha512") - packageName = strings.TrimSuffix(packageName, ".sha512") - - if _, ok := correctValues[packageName]; !ok { - w.WriteHeader(http.StatusInternalServerError) - } - - content := []byte(packageName) - if isShaReq { - hash := sha512.Sum512(content) - _, err := w.Write([]byte(fmt.Sprintf("%x %s", hash, packageName))) - if err != nil { - panic(err) - } - } else { - _, err := w.Write(content) - if err != nil { - panic(err) - } - } - }) - server := httptest.NewServer(handler) - - return http.Client{ - Transport: &http.Transport{ - DialContext: func(_ context.Context, network, _ string) (net.Conn, error) { - return net.Dial(network, server.Listener.Addr().String()) - }, - }, - } -} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go index 4eef0682a50..ca024c53c88 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer.go @@ -5,10 +5,12 @@ package http import ( + "sync" "time" "github.com/docker/go-units" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" 
"github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -95,3 +97,36 @@ func (lpObs *loggingProgressObserver) ReportFailed(sourceURI string, timePast ti lpObs.log.Warnf(msg, args...) } } + +type detailsProgressObserver struct { + upgradeDetails *details.Details + mu sync.RWMutex +} + +func newDetailsProgressObserver(upgradeDetails *details.Details) *detailsProgressObserver { + upgradeDetails.SetState(details.StateDownloading) + return &detailsProgressObserver{ + upgradeDetails: upgradeDetails, + } +} + +func (dpObs *detailsProgressObserver) Report(sourceURI string, timePast time.Duration, downloadedBytes, totalBytes, percentComplete, downloadRateBytesPerSecond float64) { + dpObs.mu.Lock() + defer dpObs.mu.Unlock() + + dpObs.upgradeDetails.SetDownloadProgress(percentComplete, downloadRateBytesPerSecond) +} + +func (dpObs *detailsProgressObserver) ReportCompleted(sourceURI string, timePast time.Duration, downloadRateBytesPerSecond float64) { + dpObs.mu.Lock() + defer dpObs.mu.Unlock() + + dpObs.upgradeDetails.SetDownloadProgress(1, downloadRateBytesPerSecond) +} + +func (dpObs *detailsProgressObserver) ReportFailed(sourceURI string, timePast time.Duration, downloadedBytes, totalBytes, percentComplete, downloadRateBytesPerSecond float64, err error) { + dpObs.mu.Lock() + defer dpObs.mu.Unlock() + + dpObs.upgradeDetails.Fail(err) +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer_test.go b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer_test.go new file mode 100644 index 00000000000..bb1d7ac1c87 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/progress_observer_test.go @@ -0,0 +1,36 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package http + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/docker/go-units" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" + "github.com/elastic/elastic-agent/internal/pkg/agent/errors" +) + +func TestDetailsProgressObserver(t *testing.T) { + upgradeDetails := details.NewDetails("8.11.0", details.StateRequested, "") + detailsObs := newDetailsProgressObserver(upgradeDetails) + + detailsObs.Report("http://some/uri", 20*time.Second, 400*units.MiB, 500*units.MiB, 0.8, 4455) + require.Equal(t, details.StateDownloading, upgradeDetails.State) + require.Equal(t, 0.8, upgradeDetails.Metadata.DownloadPercent) + + detailsObs.ReportCompleted("http://some/uri", 30*time.Second, 3333) + require.Equal(t, details.StateDownloading, upgradeDetails.State) + require.Equal(t, 1.0, upgradeDetails.Metadata.DownloadPercent) + + err := errors.New("some download error") + detailsObs.ReportFailed("http://some/uri", 30*time.Second, 450*units.MiB, 500*units.MiB, 0.9, 1122, err) + require.Equal(t, details.StateFailed, upgradeDetails.State) + require.Equal(t, details.StateDownloading, upgradeDetails.Metadata.FailedState) + require.Equal(t, err.Error(), upgradeDetails.Metadata.ErrorMsg) +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/verifier.go b/internal/pkg/agent/application/upgrade/artifact/download/http/verifier.go index 99fca9f65d4..50aa64fab1e 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/http/verifier.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/verifier.go @@ -30,11 +30,10 @@ const ( // Verifier verifies a downloaded package by comparing with public ASC // file from elastic.co website. 
type Verifier struct { - config *artifact.Config - client http.Client - pgpBytes []byte - allowEmptyPgp bool - log *logger.Logger + config *artifact.Config + client http.Client + defaultKey []byte + log *logger.Logger } func (v *Verifier) Name() string { @@ -43,9 +42,9 @@ func (v *Verifier) Name() string { // NewVerifier create a verifier checking downloaded package on preconfigured // location against a key stored on elastic.co website. -func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool, pgp []byte) (*Verifier, error) { - if len(pgp) == 0 && !allowEmptyPgp { - return nil, errors.New("expecting PGP but retrieved none", errors.TypeSecurity) +func NewVerifier(log *logger.Logger, config *artifact.Config, pgp []byte) (*Verifier, error) { + if len(pgp) == 0 { + return nil, errors.New("expecting PGP key received none", errors.TypeSecurity) } client, err := config.HTTPTransportSettings.Client( @@ -59,11 +58,10 @@ func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool } v := &Verifier{ - config: config, - client: *client, - allowEmptyPgp: allowEmptyPgp, - pgpBytes: pgp, - log: log, + config: config, + client: *client, + defaultKey: pgp, + log: log, } return v, nil @@ -90,24 +88,26 @@ func (v *Verifier) Reload(c *artifact.Config) error { // Verify checks downloaded package on preconfigured // location against a key stored on elastic.co website. 
func (v *Verifier) Verify(a artifact.Artifact, version string, skipDefaultPgp bool, pgpBytes ...string) error { - fullPath, err := artifact.GetArtifactPath(a, version, v.config.OS(), v.config.Arch(), v.config.TargetDirectory) + artifactPath, err := artifact.GetArtifactPath(a, version, v.config.OS(), v.config.Arch(), v.config.TargetDirectory) if err != nil { return errors.New(err, "retrieving package path") } - if err = download.VerifySHA512Hash(fullPath); err != nil { - var checksumMismatchErr *download.ChecksumMismatchError - if errors.As(err, &checksumMismatchErr) { - os.Remove(fullPath) - os.Remove(fullPath + ".sha512") - } - return err + if err = download.VerifySHA512HashWithCleanup(v.log, artifactPath); err != nil { + return fmt.Errorf("failed to verify SHA512 hash: %w", err) } if err = v.verifyAsc(a, version, skipDefaultPgp, pgpBytes...); err != nil { var invalidSignatureErr *download.InvalidSignatureError if errors.As(err, &invalidSignatureErr) { - os.Remove(fullPath + ".asc") + if err := os.Remove(artifactPath); err != nil { + v.log.Warnf("failed clean up after signature verification: failed to remove %q: %v", + artifactPath, err) + } + if err := os.Remove(artifactPath + ascSuffix); err != nil { + v.log.Warnf("failed clean up after sha512 check: failed to remove %q: %v", + artifactPath+ascSuffix, err) + } } return err } @@ -115,36 +115,7 @@ func (v *Verifier) Verify(a artifact.Artifact, version string, skipDefaultPgp bo return nil } -func (v *Verifier) verifyAsc(a artifact.Artifact, version string, skipDefaultPgp bool, pgpSources ...string) error { - var pgpBytes [][]byte - if len(v.pgpBytes) > 0 && !skipDefaultPgp { - v.log.Infof("Default PGP being appended") - pgpBytes = append(pgpBytes, v.pgpBytes) - } - - for _, check := range pgpSources { - if len(check) == 0 { - continue - } - raw, err := download.PgpBytesFromSource(v.log, check, &v.client) - if err != nil { - return err - } - - if len(raw) == 0 { - continue - } - - pgpBytes = append(pgpBytes, raw) - 
} - - if len(pgpBytes) == 0 { - // no pgp available skip verification process - v.log.Infof("No checks defined") - return nil - } - v.log.Infof("Using %d PGP keys", len(pgpBytes)) - +func (v *Verifier) verifyAsc(a artifact.Artifact, version string, skipDefaultKey bool, pgpSources ...string) error { filename, err := artifact.GetArtifactName(a, version, v.config.OS(), v.config.Arch()) if err != nil { return errors.New(err, "retrieving package name") @@ -161,27 +132,17 @@ func (v *Verifier) verifyAsc(a artifact.Artifact, version string, skipDefaultPgp } ascBytes, err := v.getPublicAsc(ascURI) - if err != nil && v.allowEmptyPgp { - // asc not available but we allow empty for dev use-case - return nil - } else if err != nil { + if err != nil { return errors.New(err, fmt.Sprintf("fetching asc file from %s", ascURI), errors.TypeNetwork, errors.M(errors.MetaKeyURI, ascURI)) } - for i, check := range pgpBytes { - err = download.VerifyGPGSignature(fullPath, ascBytes, check) - if err == nil { - // verify successful - v.log.Infof("Verification with PGP[%d] successful", i) - return nil - } - v.log.Warnf("Verification with PGP[%d] failed: %v", i, err) + pgpBytes, err := download.FetchPGPKeys( + v.log, v.client, v.defaultKey, skipDefaultKey, pgpSources) + if err != nil { + return fmt.Errorf("could not fetch pgp keys: %w", err) } - v.log.Warnf("Verification failed") - - // return last error - return err + return download.VerifyPGPSignatureWithKeys(v.log, fullPath, ascBytes, pgpBytes) } func (v *Verifier) composeURI(filename, artifactName string) (string, error) { @@ -204,11 +165,12 @@ func (v *Verifier) composeURI(filename, artifactName string) (string, error) { func (v *Verifier) getPublicAsc(sourceURI string) ([]byte, error) { ctx, cancelFn := context.WithTimeout(context.Background(), 30*time.Second) defer cancelFn() - // Change NewRequest to NewRequestWithContext and pass context it req, err := http.NewRequestWithContext(ctx, http.MethodGet, sourceURI, nil) if err != nil { 
return nil, errors.New(err, "failed create request for loading public key", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)) } + + // TODO: receive a http.Client resp, err := http.DefaultClient.Do(req) if err != nil { return nil, errors.New(err, "failed loading public key", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)) diff --git a/internal/pkg/agent/application/upgrade/artifact/download/http/verifier_test.go b/internal/pkg/agent/application/upgrade/artifact/download/http/verifier_test.go new file mode 100644 index 00000000000..66c8bd715e0 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/artifact/download/http/verifier_test.go @@ -0,0 +1,83 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http + +import ( + "context" + "fmt" + "math/rand" + "net/http" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-libs/transport/httpcommon" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +func TestVerify(t *testing.T) { + targetDir := t.TempDir() + + log, _ := logger.New("", false) + timeout := 30 * time.Second + testCases := getRandomTestCases() + server, pub := getElasticCoServer(t) + elasticClient := getElasticCoClient(server) + // artifact/download/http.Verifier uses http.DefaultClient, thus we need to + // change it. 
+ http.DefaultClient = &elasticClient + + config := &artifact.Config{ + SourceURI: source, + TargetDirectory: targetDir, + HTTPTransportSettings: httpcommon.HTTPTransportSettings{ + Timeout: timeout, + }, + } + + for _, testCase := range testCases { + testName := fmt.Sprintf("%s-binary-%s", testCase.system, testCase.arch) + t.Run(testName, func(t *testing.T) { + config.OperatingSystem = testCase.system + config.Architecture = testCase.arch + + upgradeDetails := details.NewDetails("8.12.0", details.StateRequested, "") + testClient := NewDownloaderWithClient(log, config, elasticClient, upgradeDetails) + artifact, err := testClient.Download(context.Background(), beatSpec, version) + if err != nil { + t.Fatal(err) + } + + _, err = os.Stat(artifact) + if err != nil { + t.Fatal(err) + } + + testVerifier, err := NewVerifier(log, config, pub) + if err != nil { + t.Fatal(err) + } + + err = testVerifier.Verify(beatSpec, version, false) + require.NoError(t, err) + }) + } +} + +func getRandomTestCases() []testCase { + tt := getTestCases() + + first := rand.Intn(len(tt)) + second := rand.Intn(len(tt)) + + return []testCase{ + tt[first], + tt[second], + } +} diff --git a/internal/pkg/agent/application/upgrade/artifact/download/localremote/downloader.go b/internal/pkg/agent/application/upgrade/artifact/download/localremote/downloader.go index 78cef03e578..023c15a5272 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/localremote/downloader.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/localremote/downloader.go @@ -11,13 +11,14 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/fs" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/http" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/snapshot" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" 
"github.com/elastic/elastic-agent/internal/pkg/release" "github.com/elastic/elastic-agent/pkg/core/logger" ) // NewDownloader creates a downloader which first checks local directory // and then fallbacks to remote if configured. -func NewDownloader(log *logger.Logger, config *artifact.Config) (download.Downloader, error) { +func NewDownloader(log *logger.Logger, config *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { downloaders := make([]download.Downloader, 0, 3) downloaders = append(downloaders, fs.NewDownloader(config)) @@ -26,7 +27,7 @@ func NewDownloader(log *logger.Logger, config *artifact.Config) (download.Downlo // a snapshot version of fleet, for example. // try snapshot repo before official if release.Snapshot() { - snapDownloader, err := snapshot.NewDownloader(log, config, nil) + snapDownloader, err := snapshot.NewDownloader(log, config, nil, upgradeDetails) if err != nil { log.Error(err) } else { @@ -34,7 +35,7 @@ func NewDownloader(log *logger.Logger, config *artifact.Config) (download.Downlo } } - httpDownloader, err := http.NewDownloader(log, config) + httpDownloader, err := http.NewDownloader(log, config, upgradeDetails) if err != nil { return nil, err } diff --git a/internal/pkg/agent/application/upgrade/artifact/download/localremote/verifier.go b/internal/pkg/agent/application/upgrade/artifact/download/localremote/verifier.go index fc2c3a806be..c92b01b207c 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/localremote/verifier.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/localremote/verifier.go @@ -17,10 +17,10 @@ import ( // NewVerifier creates a downloader which first checks local directory // and then fallbacks to remote if configured. 
-func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool, pgp []byte) (download.Verifier, error) { +func NewVerifier(log *logger.Logger, config *artifact.Config, pgp []byte) (download.Verifier, error) { verifiers := make([]download.Verifier, 0, 3) - fsVer, err := fs.NewVerifier(log, config, allowEmptyPgp, pgp) + fsVer, err := fs.NewVerifier(log, config, pgp) if err != nil { return nil, err } @@ -30,7 +30,7 @@ func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool // useful for testing with a snapshot version of fleet for example // try snapshot repo before official if release.Snapshot() { - snapshotVerifier, err := snapshot.NewVerifier(log, config, allowEmptyPgp, pgp, nil) + snapshotVerifier, err := snapshot.NewVerifier(log, config, pgp, nil) if err != nil { log.Error(err) } else { @@ -38,7 +38,7 @@ func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool } } - remoteVer, err := http.NewVerifier(log, config, allowEmptyPgp, pgp) + remoteVer, err := http.NewVerifier(log, config, pgp) if err != nil { return nil, err } diff --git a/internal/pkg/agent/application/upgrade/artifact/download/snapshot/downloader.go b/internal/pkg/agent/application/upgrade/artifact/download/snapshot/downloader.go index 51b16ee4372..ecf2497851c 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/snapshot/downloader.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/snapshot/downloader.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/http" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/release" "github.com/elastic/elastic-agent/pkg/core/logger" agtversion 
"github.com/elastic/elastic-agent/pkg/version" @@ -32,13 +33,13 @@ type Downloader struct { // We need to pass the versionOverride separately from the config as // artifact.Config struct is part of agent configuration and a version // override makes no sense there -func NewDownloader(log *logger.Logger, config *artifact.Config, versionOverride *agtversion.ParsedSemVer) (download.Downloader, error) { +func NewDownloader(log *logger.Logger, config *artifact.Config, versionOverride *agtversion.ParsedSemVer, upgradeDetails *details.Details) (download.Downloader, error) { cfg, err := snapshotConfig(config, versionOverride) if err != nil { return nil, fmt.Errorf("error creating snapshot config: %w", err) } - httpDownloader, err := http.NewDownloader(log, cfg) + httpDownloader, err := http.NewDownloader(log, cfg, upgradeDetails) if err != nil { return nil, fmt.Errorf("failed to create snapshot downloader: %w", err) } diff --git a/internal/pkg/agent/application/upgrade/artifact/download/snapshot/verifier.go b/internal/pkg/agent/application/upgrade/artifact/download/snapshot/verifier.go index 302aa93e766..060c5e9fa10 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/snapshot/verifier.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/snapshot/verifier.go @@ -24,12 +24,12 @@ func (v *Verifier) Name() string { // NewVerifier creates a downloader which first checks local directory // and then fallbacks to remote if configured. 
-func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool, pgp []byte, versionOverride *agtversion.ParsedSemVer) (download.Verifier, error) { +func NewVerifier(log *logger.Logger, config *artifact.Config, pgp []byte, versionOverride *agtversion.ParsedSemVer) (download.Verifier, error) { cfg, err := snapshotConfig(config, versionOverride) if err != nil { return nil, err } - v, err := http.NewVerifier(log, cfg, allowEmptyPgp, pgp) + v, err := http.NewVerifier(log, cfg, pgp) if err != nil { return nil, errors.New(err, "failed to create snapshot verifier") } diff --git a/internal/pkg/agent/application/upgrade/artifact/download/verifier.go b/internal/pkg/agent/application/upgrade/artifact/download/verifier.go index 662367f4909..79fc2348711 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/verifier.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/verifier.go @@ -21,7 +21,6 @@ import ( "time" "github.com/hashicorp/go-multierror" - "golang.org/x/crypto/openpgp" //nolint:staticcheck // crypto/openpgp is only receiving security updates. "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" @@ -39,12 +38,18 @@ var ( ErrUnknownPGPSource = errors.New("unknown pgp source") ) -// warnLogger is a logger that only needs to implement Warnf, as that is the only functions -// that the downloadProgressReporter uses. +// warnLogger is a logger that only needs to implement Warnf. type warnLogger interface { Warnf(format string, args ...interface{}) } +// loggerInfofWarnf is a logger that only needs to implement Infof and Warnf. +type infoWarnLogger interface { + warnLogger + + Infof(format string, args ...interface{}) +} + // ChecksumMismatchError indicates the expected checksum for a file does not // match the computed checksum. type ChecksumMismatchError struct { @@ -70,18 +75,40 @@ func (e *InvalidSignatureError) Error() string { // Unwrap returns the cause. 
func (e *InvalidSignatureError) Unwrap() error { return e.Err } -// Verifier is an interface verifying the SHA512 checksum and GPG signature and +// Verifier is an interface verifying the SHA512 checksum and PGP signature and // of a downloaded artifact. type Verifier interface { Name() string - // Verify should verify the artifact and return if succeed status (true|false) and an error if any checks fail. - // If the checksum does no match Verify returns a - // *download.ChecksumMismatchError. And if the GPG signature is invalid then - // Verify returns a *download.InvalidSignatureError. Use errors.As() to - // check error types. + // Verify should verify the artifact, returning an error if any checks fail. + // If the checksum does no match Verify returns a *download.ChecksumMismatchError. + // If the PGP signature check fails then Verify returns a + // *download.InvalidSignatureError. Verify(a artifact.Artifact, version string, skipDefaultPgp bool, pgpBytes ...string) error } +// VerifySHA512HashWithCleanup calls VerifySHA512Hash and, in case of a +// *ChecksumMismatchError, performs a cleanup by deleting both the filename and +// filename.sha512 files. If the cleanup fails, it logs a warning. +func VerifySHA512HashWithCleanup(log infoWarnLogger, filename string) error { + if err := VerifySHA512Hash(filename); err != nil { + var checksumMismatchErr *ChecksumMismatchError + if errors.As(err, &checksumMismatchErr) { + if err := os.Remove(filename); err != nil { + log.Warnf("failed clean up after sha512 verification: failed to remove %q: %v", + filename, err) + } + if err := os.Remove(filename + ".sha512"); err != nil { + log.Warnf("failed clean up after sha512 check: failed to remove %q: %v", + filename+".sha512", err) + } + } + + return err + } + + return nil +} + // VerifySHA512Hash checks that a sidecar file containing a sha512 checksum // exists and that the checksum in the sidecar file matches the checksum of // the file. 
It returns an error if validation fails. @@ -89,7 +116,7 @@ func VerifySHA512Hash(filename string) error { // Read expected checksum. expectedHash, err := readChecksumFile(filename+".sha512", filepath.Base(filename)) if err != nil { - return err + return fmt.Errorf("could not read checksum file: %w", err) } // Compute sha512 checksum. @@ -101,10 +128,10 @@ func VerifySHA512Hash(filename string) error { hash := sha512.New() if _, err := io.Copy(hash, f); err != nil { - return err + return fmt.Errorf("faled to read file to calculate hash") } - computedHash := hex.EncodeToString(hash.Sum(nil)) + computedHash := hex.EncodeToString(hash.Sum(nil)) if computedHash != expectedHash { return &ChecksumMismatchError{ Expected: expectedHash, @@ -157,11 +184,27 @@ func readChecksumFile(checksumFile, filename string) (string, error) { return checksum, nil } -// VerifyGPGSignature verifies the GPG signature of a file. It accepts the path +func VerifyPGPSignatureWithKeys( + log infoWarnLogger, file string, asciiArmorSignature []byte, publicKeys [][]byte) error { + var err error + for i, key := range publicKeys { + err = VerifyPGPSignature(file, asciiArmorSignature, key) + if err == nil { + log.Infof("Verification with PGP[%d] successful", i) + return nil + } + log.Warnf("Verification with PGP[%d] failed: %v", i, err) + } + + log.Warnf("Verification failed: %v", err) + return fmt.Errorf("could not verify PGP signature of %q: %w", file, err) +} + +// VerifyPGPSignature verifies the GPG signature of a file. It accepts the path // to the file to verify, the ASCII armored signature, and the public key to // check against. If there is a problem with the signature then a // *download.InvalidSignatureError is returned. 
-func VerifyGPGSignature(file string, asciiArmorSignature, publicKey []byte) error { +func VerifyPGPSignature(file string, asciiArmorSignature, publicKey []byte) error { keyring, err := openpgp.ReadArmoredKeyRing(bytes.NewReader(publicKey)) if err != nil { return errors.New(err, "read armored key ring", errors.TypeSecurity) @@ -181,6 +224,39 @@ func VerifyGPGSignature(file string, asciiArmorSignature, publicKey []byte) erro return nil } +func FetchPGPKeys(log infoWarnLogger, client http.Client, defaultPGPKey []byte, skipDefaultPGP bool, pgpSources []string) ([][]byte, error) { + var pgpKeys [][]byte + if len(defaultPGPKey) > 0 && !skipDefaultPGP { + pgpKeys = append(pgpKeys, defaultPGPKey) + log.Infof("Default PGP appended") + } + + for _, check := range pgpSources { + if len(check) == 0 { + continue + } + + raw, err := PgpBytesFromSource(log, check, &client) + if err != nil { + return nil, err + } + + if len(raw) == 0 { + continue + } + + pgpKeys = append(pgpKeys, raw) + } + + if len(pgpKeys) == 0 { + log.Infof("No PGP key available, skipping verification process") + return nil, nil + } + + log.Infof("Using %d PGP keys", len(pgpKeys)) + return pgpKeys, nil +} + func PgpBytesFromSource(log warnLogger, source string, client HTTPClient) ([]byte, error) { if strings.HasPrefix(source, PgpSourceRawPrefix) { return []byte(strings.TrimPrefix(source, PgpSourceRawPrefix)), nil @@ -191,7 +267,8 @@ func PgpBytesFromSource(log warnLogger, source string, client HTTPClient) ([]byt if errors.Is(err, ErrRemotePGPDownloadFailed) || errors.Is(err, ErrInvalidLocation) { log.Warnf("Skipped remote PGP located at %q because it's unavailable: %v", strings.TrimPrefix(source, PgpSourceURIPrefix), err) } else if err != nil { - log.Warnf("Failed to fetch remote PGP") + log.Warnf("Failed to fetch remote PGP key from %q: %v", + strings.TrimPrefix(source, PgpSourceURIPrefix), err) } return pgpBytes, nil diff --git a/internal/pkg/agent/application/upgrade/artifact/download/verifier_test.go 
b/internal/pkg/agent/application/upgrade/artifact/download/verifier_test.go index 05ad9a96b91..6e12a4b3d98 100644 --- a/internal/pkg/agent/application/upgrade/artifact/download/verifier_test.go +++ b/internal/pkg/agent/application/upgrade/artifact/download/verifier_test.go @@ -6,10 +6,16 @@ package download import ( "bytes" + "crypto/sha512" + "encoding/hex" + "fmt" "io" "net/http" + "os" + "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" @@ -118,6 +124,85 @@ func TestPgpBytesFromSource(t *testing.T) { } } +func TestVerifySHA512HashWithCleanup_success(t *testing.T) { + data := "I’m the Doctor. I’m a Time Lord. I’m from the planet " + + "Gallifrey in the constellation of Kasterborous. I’m 903 years old and " + + "I’m the man who’s going to save your lives and all 6 billion people on " + + "the planet below. You got a problem with that?" + dir := t.TempDir() + filename := "file" + path := filepath.Join(dir, filename) + + f, err := os.Create(path) + require.NoError(t, err, "could not create file") + fsha512, err := os.Create(path + ".sha512") + require.NoError(t, err, "could not create .sha512 file") + + _, err = fmt.Fprint(f, data) + require.NoError(t, err, "could not write to file") + hash := sha512.Sum512([]byte(data)) + _, err = fmt.Fprintf(fsha512, "%s %s", hex.EncodeToString(hash[:]), filename) + require.NoError(t, err, "could not write to file") + + err = f.Close() + require.NoError(t, err, "could not close file") + err = fsha512.Close() + require.NoError(t, err, "could not close .sha512 file") + + err = VerifySHA512HashWithCleanup(testlogger{t: t}, path) + assert.NoErrorf(t, err, "failed verifying sha512") +} + +func TestVerifySHA512HashWithCleanup_failure(t *testing.T) { + data := "I’m the Doctor. I’m a Time Lord. I’m from the planet " + + "Gallifrey in the constellation of Kasterborous. 
I’m 903 years old and " + + "I’m the man who’s going to save your lives and all 6 billion people on " + + "the planet below. You got a problem with that?" + dir := t.TempDir() + filename := "file" + path := filepath.Join(dir, filename) + + f, err := os.Create(path) + require.NoError(t, err, "could not create file") + fsha512, err := os.Create(path + ".sha512") + require.NoError(t, err, "could not create .sha512 file") + + _, err = fmt.Fprint(f, data) + require.NoError(t, err, "could not write to file") + _, err = fmt.Fprintf(fsha512, "%s %s", "wrong-sha512", filename) + require.NoError(t, err, "could not write to file") + + err = f.Close() + require.NoError(t, err, "could not close file") + err = fsha512.Close() + require.NoError(t, err, "could not close .sha512 file") + + err = VerifySHA512HashWithCleanup(testlogger{t: t}, path) + assert.Errorf(t, err, "checksum verification should have failed") + + dirEntries, err := os.ReadDir(dir) + require.NoError(t, err, "could not read %q to check it's empty", dir) + if len(dirEntries) != 0 { + var files []string + for _, e := range dirEntries { + files = append(files, e.Name()) + } + + t.Errorf("there should be no files on %q. Found %v", dir, files) + } +} + +type testlogger struct { + t *testing.T +} + +func (l testlogger) Infof(format string, args ...interface{}) { + l.t.Logf("[INFO] "+format, args) +} +func (l testlogger) Warnf(format string, args ...interface{}) { + l.t.Logf("[WARN] "+format, args) +} + type MockClient struct { DoFunc func(req *http.Request) (*http.Response, error) } diff --git a/internal/pkg/agent/application/upgrade/details/details.go b/internal/pkg/agent/application/upgrade/details/details.go new file mode 100644 index 00000000000..028990aafcd --- /dev/null +++ b/internal/pkg/agent/application/upgrade/details/details.go @@ -0,0 +1,166 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. 
Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package details + +import ( + "encoding/json" + "fmt" + "math" + "strings" + "sync" + "time" + + "github.com/docker/go-units" +) + +// downloadRate is a float64 that can be safely marshalled to JSON +// when the value is Infinity. The rate is always in bytes/second units. +type downloadRate float64 + +// Observer is a function that will be called with upgrade details +type Observer func(details *Details) + +// Details consists of details regarding an ongoing upgrade. +type Details struct { + TargetVersion string `json:"target_version"` + State State `json:"state"` + ActionID string `json:"action_id,omitempty"` + Metadata Metadata `json:"metadata"` + + observers []Observer + mu sync.Mutex +} + +// Metadata consists of metadata relating to a specific upgrade state +type Metadata struct { + ScheduledAt time.Time `json:"scheduled_at,omitempty"` + DownloadPercent float64 `json:"download_percent,omitempty"` + DownloadRate downloadRate `json:"download_rate,omitempty"` + + // FailedState is the state an upgrade was in if/when it failed. Use the + // Fail() method of UpgradeDetails to correctly record details when + // an upgrade fails. + FailedState State `json:"failed_state,omitempty"` + + // ErrorMsg is any error message encountered if/when an upgrade fails. Use + // the Fail() method of UpgradeDetails to correctly record details when + // an upgrade fails. + ErrorMsg string `json:"error_msg,omitempty"` +} + +func NewDetails(targetVersion string, initialState State, actionID string) *Details { + return &Details{ + TargetVersion: targetVersion, + State: initialState, + ActionID: actionID, + Metadata: Metadata{}, + observers: []Observer{}, + } +} + +// SetState is a convenience method to set the state of the upgrade and +// notify all observers. 
+func (d *Details) SetState(s State) { + d.mu.Lock() + defer d.mu.Unlock() + + d.State = s + d.notifyObservers() +} + +// SetDownloadProgress is a convenience method to set the download percent +// when the upgrade is in UPG_DOWNLOADING state. +func (d *Details) SetDownloadProgress(percent, rateBytesPerSecond float64) { + d.mu.Lock() + defer d.mu.Unlock() + + d.Metadata.DownloadPercent = percent + d.Metadata.DownloadRate = downloadRate(rateBytesPerSecond) + d.notifyObservers() +} + +// Fail is a convenience method to set the state of the upgrade +// to StateFailed, set metadata associated with the failure, and +// notify all observers. +func (d *Details) Fail(err error) { + d.mu.Lock() + defer d.mu.Unlock() + + // Record the state the upgrade process was in right before it + // failed, but only do this if we haven't already transitioned the + // state to the StateFailed state; otherwise we'll just end up recording + // the state we failed from as StateFailed which is not useful. + if d.State != StateFailed { + d.Metadata.FailedState = d.State + } + + d.Metadata.ErrorMsg = err.Error() + d.State = StateFailed + d.notifyObservers() +} + +// RegisterObserver allows an interested consumer of Details to register +// themselves as an Observer. The registered observer is immediately notified +// of the current upgrade details. 
+func (d *Details) RegisterObserver(observer Observer) { + d.mu.Lock() + defer d.mu.Unlock() + + d.observers = append(d.observers, observer) + d.notifyObserver(observer) +} + +func (d *Details) notifyObservers() { + for _, observer := range d.observers { + d.notifyObserver(observer) + } +} + +func (d *Details) notifyObserver(observer Observer) { + if d.State == StateCompleted { + observer(nil) + } else { + dCopy := Details{ + TargetVersion: d.TargetVersion, + State: d.State, + ActionID: d.ActionID, + Metadata: d.Metadata, + } + observer(&dCopy) + } +} + +func (dr *downloadRate) MarshalJSON() ([]byte, error) { + downloadRateBytesPerSecond := float64(*dr) + if math.IsInf(downloadRateBytesPerSecond, 0) { + return json.Marshal("+Inf bps") + } + + return json.Marshal( + fmt.Sprintf("%sps", units.HumanSizeWithPrecision(downloadRateBytesPerSecond, 2)), + ) +} + +func (dr *downloadRate) UnmarshalJSON(data []byte) error { + var downloadRateStr string + err := json.Unmarshal(data, &downloadRateStr) + if err != nil { + return err + } + + if downloadRateStr == "+Inf bps" { + *dr = downloadRate(math.Inf(1)) + return nil + } + + downloadRateStr = strings.TrimSuffix(downloadRateStr, "ps") + downloadRateBytesPerSecond, err := units.FromHumanSize(downloadRateStr) + if err != nil { + return err + } + + *dr = downloadRate(downloadRateBytesPerSecond) + return nil +} diff --git a/internal/pkg/agent/application/upgrade/details/details_test.go b/internal/pkg/agent/application/upgrade/details/details_test.go new file mode 100644 index 00000000000..88d239dfabf --- /dev/null +++ b/internal/pkg/agent/application/upgrade/details/details_test.go @@ -0,0 +1,95 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package details + +import ( + "encoding/json" + "errors" + "math" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDetailsNew(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + require.Equal(t, StateRequested, det.State) + require.Equal(t, "99.999.9999", det.TargetVersion) + require.Equal(t, "test_action_id", det.ActionID) + require.Equal(t, Metadata{}, det.Metadata) +} + +func TestDetailsSetState(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + require.Equal(t, StateRequested, det.State) + + det.SetState(StateDownloading) + require.Equal(t, StateDownloading, det.State) +} + +func TestDetailsFail(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + require.Equal(t, StateRequested, det.State) + + err := errors.New("test error") + det.Fail(err) + require.Equal(t, StateFailed, det.State) + require.Equal(t, StateRequested, det.Metadata.FailedState) + require.Equal(t, err.Error(), det.Metadata.ErrorMsg) +} + +func TestDetailsObserver(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + require.Equal(t, StateRequested, det.State) + + var observedDetails *Details + obs := func(updatedDetails *Details) { observedDetails = updatedDetails } + + det.RegisterObserver(obs) + require.Len(t, det.observers, 1) + require.NotNil(t, observedDetails) + require.Equal(t, StateRequested, observedDetails.State) + + det.SetState(StateDownloading) + require.Equal(t, StateDownloading, det.State) + require.Equal(t, StateDownloading, observedDetails.State) + + det.SetState(StateCompleted) + require.Equal(t, StateCompleted, det.State) + require.Nil(t, nil, observedDetails) +} + +func TestDetailsDownloadRateJSON(t *testing.T) { + det := NewDetails("99.999.9999", StateRequested, "test_action_id") + + // Normal (non-infinity) download rate + t.Run("non_infinity", func(t *testing.T) { + det.SetDownloadProgress(.8, 1794.7) + + data, err 
:= json.Marshal(det) + require.NoError(t, err) + + var unmarshalledDetails Details + err = json.Unmarshal(data, &unmarshalledDetails) + require.NoError(t, err) + require.Equal(t, float64(1800), float64(unmarshalledDetails.Metadata.DownloadRate)) + require.Equal(t, .8, unmarshalledDetails.Metadata.DownloadPercent) + }) + + // Infinity download rate + t.Run("infinity", func(t *testing.T) { + det.SetDownloadProgress(0.99, math.Inf(1)) + + data, err := json.Marshal(det) + require.NoError(t, err) + + var unmarshalledDetails Details + err = json.Unmarshal(data, &unmarshalledDetails) + require.NoError(t, err) + require.Equal(t, math.Inf(1), float64(unmarshalledDetails.Metadata.DownloadRate)) + require.Equal(t, 0.99, unmarshalledDetails.Metadata.DownloadPercent) + }) + +} diff --git a/internal/pkg/agent/application/upgrade/details/state.go b/internal/pkg/agent/application/upgrade/details/state.go new file mode 100644 index 00000000000..19aaaae8a25 --- /dev/null +++ b/internal/pkg/agent/application/upgrade/details/state.go @@ -0,0 +1,22 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package details + +type State string + +// The values of these State* constants should match those enumerated for +// upgrade_details.state in https://github.com/elastic/fleet-server/blob/main/model/openapi.yml +const ( + StateRequested State = "UPG_REQUESTED" + StateScheduled State = "UPG_SCHEDULED" + StateDownloading State = "UPG_DOWNLOADING" + StateExtracting State = "UPG_EXTRACTING" + StateReplacing State = "UPG_REPLACING" + StateRestarting State = "UPG_RESTARTING" + StateWatching State = "UPG_WATCHING" + StateRollback State = "UPG_ROLLBACK" + StateCompleted State = "UPG_COMPLETED" + StateFailed State = "UPG_FAILED" +) diff --git a/internal/pkg/agent/application/upgrade/step_download.go b/internal/pkg/agent/application/upgrade/step_download.go index a273460f337..d86a43a5a3b 100644 --- a/internal/pkg/agent/application/upgrade/step_download.go +++ b/internal/pkg/agent/application/upgrade/step_download.go @@ -12,6 +12,8 @@ import ( "strings" "time" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" + "github.com/cenkalti/backoff/v4" "go.elastic.co/apm" @@ -35,7 +37,7 @@ const ( fleetUpgradeFallbackPGPFormat = "/api/agents/upgrades/%d.%d.%d/pgp-public-key" ) -func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI string, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ string, err error) { +func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI string, upgradeDetails *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ string, err error) { span, ctx := apm.StartSpan(ctx, "downloadArtifact", "app.internal") defer func() { apm.CaptureError(ctx, err).Send() @@ -69,7 +71,7 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri return "", errors.New(err, fmt.Sprintf("failed to create download directory at %s", paths.Downloads())) } - path, err := u.downloadWithRetries(ctx, newDownloader, parsedVersion, 
&settings) + path, err := u.downloadWithRetries(ctx, newDownloader, parsedVersion, &settings, upgradeDetails) if err != nil { return "", errors.New(err, "failed download of agent binary") } @@ -121,20 +123,20 @@ func (u *Upgrader) appendFallbackPGP(targetVersion string, pgpBytes []string) [] return pgpBytes } -func newDownloader(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { +func newDownloader(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { if !version.IsSnapshot() { - return localremote.NewDownloader(log, settings) + return localremote.NewDownloader(log, settings, upgradeDetails) } // TODO since we know if it's a snapshot or not, shouldn't we add EITHER the snapshot downloader OR the release one ? // try snapshot repo before official - snapDownloader, err := snapshot.NewDownloader(log, settings, version) + snapDownloader, err := snapshot.NewDownloader(log, settings, version, upgradeDetails) if err != nil { return nil, err } - httpDownloader, err := http.NewDownloader(log, settings) + httpDownloader, err := http.NewDownloader(log, settings, upgradeDetails) if err != nil { return nil, err } @@ -143,23 +145,23 @@ func newDownloader(version *agtversion.ParsedSemVer, log *logger.Logger, setting } func newVerifier(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Verifier, error) { - allowEmptyPgp, pgp := release.PGP() + pgp := release.PGP() if !version.IsSnapshot() { - return localremote.NewVerifier(log, settings, allowEmptyPgp, pgp) + return localremote.NewVerifier(log, settings, pgp) } - fsVerifier, err := fs.NewVerifier(log, settings, allowEmptyPgp, pgp) + fsVerifier, err := fs.NewVerifier(log, settings, pgp) if err != nil { return nil, err } - snapshotVerifier, err := snapshot.NewVerifier(log, settings, allowEmptyPgp, pgp, version) + snapshotVerifier, err := 
snapshot.NewVerifier(log, settings, pgp, version) if err != nil { return nil, err } - remoteVerifier, err := http.NewVerifier(log, settings, allowEmptyPgp, pgp) + remoteVerifier, err := http.NewVerifier(log, settings, pgp) if err != nil { return nil, err } @@ -169,9 +171,10 @@ func newVerifier(version *agtversion.ParsedSemVer, log *logger.Logger, settings func (u *Upgrader) downloadWithRetries( ctx context.Context, - downloaderCtor func(*agtversion.ParsedSemVer, *logger.Logger, *artifact.Config) (download.Downloader, error), + downloaderCtor func(*agtversion.ParsedSemVer, *logger.Logger, *artifact.Config, *details.Details) (download.Downloader, error), version *agtversion.ParsedSemVer, settings *artifact.Config, + upgradeDetails *details.Details, ) (string, error) { cancelCtx, cancel := context.WithTimeout(ctx, settings.Timeout) defer cancel() @@ -187,7 +190,7 @@ func (u *Upgrader) downloadWithRetries( attempt++ u.log.Infof("download attempt %d", attempt) - downloader, err := downloaderCtor(version, u.log, settings) + downloader, err := downloaderCtor(version, u.log, settings, upgradeDetails) if err != nil { return fmt.Errorf("unable to create fetcher: %w", err) } diff --git a/internal/pkg/agent/application/upgrade/step_download_test.go b/internal/pkg/agent/application/upgrade/step_download_test.go index 330a60f5288..dcdc4da7de8 100644 --- a/internal/pkg/agent/application/upgrade/step_download_test.go +++ b/internal/pkg/agent/application/upgrade/step_download_test.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/pkg/core/logger" agtversion 
"github.com/elastic/elastic-agent/pkg/version" @@ -84,14 +85,15 @@ func TestDownloadWithRetries(t *testing.T) { // Successful immediately (no retries) t.Run("successful_immediately", func(t *testing.T) { - mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { + mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { return &mockDownloader{expectedDownloadPath, nil}, nil } u := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings) + upgradeDetails := details.NewDetails(parsedVersion.String(), details.StateRequested, "") + path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) require.NoError(t, err) require.Equal(t, expectedDownloadPath, path) @@ -103,7 +105,7 @@ func TestDownloadWithRetries(t *testing.T) { // Downloader constructor failing on first attempt, but succeeding on second attempt (= first retry) t.Run("constructor_failure_once", func(t *testing.T) { attemptIdx := 0 - mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { + mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { defer func() { attemptIdx++ }() @@ -125,7 +127,8 @@ func TestDownloadWithRetries(t *testing.T) { u := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings) + 
upgradeDetails := details.NewDetails(parsedVersion.String(), details.StateRequested, "") + path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) require.NoError(t, err) require.Equal(t, expectedDownloadPath, path) @@ -139,7 +142,7 @@ func TestDownloadWithRetries(t *testing.T) { // Download failing on first attempt, but succeeding on second attempt (= first retry) t.Run("download_failure_once", func(t *testing.T) { attemptIdx := 0 - mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { + mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { defer func() { attemptIdx++ }() @@ -161,7 +164,8 @@ func TestDownloadWithRetries(t *testing.T) { u := NewUpgrader(testLogger, &settings, &info.AgentInfo{}) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings) + upgradeDetails := details.NewDetails(parsedVersion.String(), details.StateRequested, "") + path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &settings, upgradeDetails) require.NoError(t, err) require.Equal(t, expectedDownloadPath, path) @@ -178,14 +182,15 @@ func TestDownloadWithRetries(t *testing.T) { testCaseSettings.Timeout = 200 * time.Millisecond testCaseSettings.RetrySleepInitDuration = 100 * time.Millisecond - mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { + mockDownloaderCtor := func(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { return &mockDownloader{"", errors.New("download failed")}, nil } u 
:= NewUpgrader(testLogger, &settings, &info.AgentInfo{}) parsedVersion, err := agtversion.ParseVersion("8.9.0") require.NoError(t, err) - path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &testCaseSettings) + upgradeDetails := details.NewDetails(parsedVersion.String(), details.StateRequested, "") + path, err := u.downloadWithRetries(context.Background(), mockDownloaderCtor, parsedVersion, &testCaseSettings, upgradeDetails) require.Equal(t, "context deadline exceeded", err.Error()) require.Equal(t, "", path) diff --git a/internal/pkg/agent/application/upgrade/upgrade.go b/internal/pkg/agent/application/upgrade/upgrade.go index eaf51ef7684..36276f239b6 100644 --- a/internal/pkg/agent/application/upgrade/upgrade.go +++ b/internal/pkg/agent/application/upgrade/upgrade.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/install" "github.com/elastic/elastic-agent/internal/pkg/config" @@ -28,7 +29,6 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker" fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client" "github.com/elastic/elastic-agent/internal/pkg/release" - "github.com/elastic/elastic-agent/pkg/control/v2/client" "github.com/elastic/elastic-agent/pkg/control/v2/cproto" "github.com/elastic/elastic-agent/pkg/core/logger" @@ -127,7 +127,7 @@ func (u *Upgrader) Upgradeable() bool { } // Upgrade upgrades running agent, function returns shutdown callback that must be called by reexec. 
-func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { +func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, det *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) { u.log.Infow("Upgrading agent", "version", version, "source_uri", sourceURI) span, ctx := apm.StartSpan(ctx, "upgrade", "app.internal") defer span.End() @@ -137,17 +137,22 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string u.log.Errorw("Unable to clean downloads before update", "error.message", err, "downloads.path", paths.Downloads()) } + det.SetState(details.StateDownloading) + sourceURI = u.sourceURI(sourceURI) - archivePath, err := u.downloadArtifact(ctx, version, sourceURI, skipVerifyOverride, skipDefaultPgp, pgpBytes...) + archivePath, err := u.downloadArtifact(ctx, version, sourceURI, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...) if err != nil { // Run the same pre-upgrade cleanup task to get rid of any newly downloaded files // This may have an issue if users are upgrading to the same version number. 
if dErr := cleanNonMatchingVersionsFromDownloads(u.log, u.agentInfo.Version()); dErr != nil { u.log.Errorw("Unable to remove file after verification failure", "error.message", dErr) } + return nil, err } + det.SetState(details.StateExtracting) + newHash, err := u.unpack(version, archivePath) if err != nil { return nil, err @@ -170,6 +175,8 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string return nil, errors.New(err, "failed to copy run directory") } + det.SetState(details.StateReplacing) + if err := ChangeSymlink(ctx, u.log, newHash); err != nil { u.log.Errorw("Rolling back: changing symlink failed", "error.message", err) rollbackInstall(ctx, u.log, newHash) @@ -182,6 +189,8 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string return nil, err } + det.SetState(details.StateWatching) + if err := InvokeWatcher(u.log); err != nil { u.log.Errorw("Rolling back: starting watcher failed", "error.message", err) rollbackInstall(ctx, u.log, newHash) diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index 353c9d1e7a7..4a12bdc8540 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -218,10 +218,6 @@ func run(override cfgOverrider, testingMode bool, fleetInitTimeout time.Duration l.Error(errors.New(err, "failed to invoke rollback watcher")) } - if allowEmptyPgp, _ := release.PGP(); allowEmptyPgp { - l.Info("Elastic Agent has been built with security disabled. 
Elastic Agent will not verify signatures of upgrade artifact.") - } - execPath, err := reexecPath() if err != nil { return err diff --git a/internal/pkg/agent/storage/store/action_store.go b/internal/pkg/agent/storage/store/action_store.go index ea0b2eb3c8b..4fc9df8b485 100644 --- a/internal/pkg/agent/storage/store/action_store.go +++ b/internal/pkg/agent/storage/store/action_store.go @@ -33,7 +33,7 @@ func newActionStore(log *logger.Logger, store storeLoad) (*actionStore, error) { // and return an empty store. reader, err := store.Load() if err != nil { - log.Errorf("failed to load action store, returning empty contents: %v", err.Error()) + log.Warnf("failed to load action store, returning empty contents: %v", err.Error()) return &actionStore{log: log, store: store}, nil } defer reader.Close() diff --git a/internal/pkg/agent/storage/store/state_store.go b/internal/pkg/agent/storage/store/state_store.go index 6f64f1184bf..3e794c3547b 100644 --- a/internal/pkg/agent/storage/store/state_store.go +++ b/internal/pkg/agent/storage/store/state_store.go @@ -95,7 +95,7 @@ func NewStateStore(log *logger.Logger, store storeLoad) (*StateStore, error) { // and return an empty store. 
reader, err := store.Load() if err != nil { - log.Errorf("failed to load state store, returning empty contents: %v", err.Error()) + log.Warnf("failed to load state store, returning empty contents: %v", err.Error()) return &StateStore{log: log, store: store}, nil } defer reader.Close() diff --git a/internal/pkg/capabilities/upgrade.go b/internal/pkg/capabilities/upgrade.go index 07866ec111e..0f7b19babf8 100644 --- a/internal/pkg/capabilities/upgrade.go +++ b/internal/pkg/capabilities/upgrade.go @@ -65,7 +65,7 @@ func allowUpgrade( for _, cap := range upgradeCaps { result, err := cap.condition.Eval(varStore, true) if err != nil { - log.Errorf("failed evaluating eql formula %q, skipping: %v", cap.conditionStr, err) + log.Warnf("failed evaluating eql formula %q, skipping: %v", cap.conditionStr, err) continue } if result { diff --git a/internal/pkg/fleetapi/acker/lazy/lazy_acker.go b/internal/pkg/fleetapi/acker/lazy/lazy_acker.go index 298b2b5bf7f..65f7bdc1cac 100644 --- a/internal/pkg/fleetapi/acker/lazy/lazy_acker.go +++ b/internal/pkg/fleetapi/acker/lazy/lazy_acker.go @@ -87,7 +87,7 @@ func (f *Acker) Commit(ctx context.Context) (err error) { // If request failed enqueue all actions with retrier if it is set if err != nil { if f.retrier != nil { - f.log.Errorf("lazy acker: failed ack batch, enqueue for retry: %s", actions) + f.log.Warnf("lazy acker: failed ack batch, enqueue for retry: %s", actions) f.retrier.Enqueue(actions) return nil } diff --git a/internal/pkg/release/pgp.go b/internal/pkg/release/pgp.go index 49d4cfc32a9..9644f6778f3 100644 --- a/internal/pkg/release/pgp.go +++ b/internal/pkg/release/pgp.go @@ -19,7 +19,7 @@ func init() { pgpBytes = 
packer.MustUnpack("eJyMlsuOtLoBhPd5jH9/JBua0U+ksxguhobGjI0v4B1gNDQ2NNPNTNNEeffoRMoqipR1Va1KVfr+8Sv5SP7I4+aPwbaP7do/hvbej7/+/utSge1SwZBynbFrQCh8/+RIhCy20TnekKyCkMV+VL3+8oEttMr2C1475/R2jvW3FkF6TpvXZXr/Lhj5zGNdisovWunBITR5OENKuRdSY44qxT/E4ICiMdZJVlazd2pssJMJOTT2AHHx3iYclLVKZI1bNtmMwfWQdlz6SI9Vst6wwTkxJCfVdqVAfWjX3pqZuE1NDixX2lod6AN9FA6eZY0vRMJkqLagn3BRxRi3sDk6uB59vAYE0kwB/NKOd29l8VOSNRJyX7nkRzHRXRv/KlhG+UIJjtWjSNe6cdT1AouTEPZNwLGuuILVgrA23GMSVZKhq4Yi1Mv600vksFi34Xw7OGh2DoOPHNIQC/Sqku3F+Rj2DmxysJqGKYORfejo80dHKtIGugqiskuzx2DsRyk0z6Et8bKy3MV7lZC8EPZycZDCNbp1YC/b9N2jL/88JOPEoYpasO8lwkwnt13a284P+6V5Rjo4ykKsNZuzEzVeqoF/uwBPUWc39P32ah0YqgkrBfCu+P5WJejWA4T6VAUdGzmZY5czWxcTispUBcSIkHCUFigTYsFKxqdnI3Q52LWmi1XFrJfQbOh/aYJlj27Oti4V7cCzQtrx3sc2HpKtlWidmwmHg0Xpf+dNLlm2aHk+qtkjnbOFDCAqYkQrjjIFRSTkOqnaLpXMAg1tQSBaqhi5FdcJlzoiEP2wOAslx4EGIqoSjchzTXCMAuLoO2fBdyHhlc32g9dZNcxroAB6CCRoIeHSHfi7OlRI4XhtJ9oQvjparDE9gnPF91XU5N6j4HuYehjO3qGmoBTOuPOJehSoR1cHsHHpQhzIabidB2uBmOzcR3YXL/+qoYKVwaWOt6Y98KHSVRGeqRzaRxFjO1x/5xgUL+qKlAD/0cynZ5eKiFnikWXMu/kTdLzw2tCfKmCOgYkvbW+udpCjHQ6Kmtqe9ztGzZ1KP+HO6WA2uHRk5UOCYxGrl5g064QiQ4zDBqI3xd6fDVhvfZpdhmSD0kDCkdr7OGM5WGeKEBisoDT0Cw7U3MbjVylFnMPRhtNosIvdHI7OEJ9fDT8B5Xi8Q9lba557Z2nVR8GtuPoZA/reCztXvPd6qzcO9rJLxxuZMpA7xZMv+EvFGuPwd65E9uBIe12srsLACdfBRtMg02l2FxB/DyabWwc2LMIOO7KmgNrRDmR0trSZt5AiVHHgnTDYAi51gPkquvf1rQGwxpU/6dq2fBrHDuGxAdveInzVs8/6Gl0HM14aKKic4WMwHq6A5oRnCT2Qx5dPr79ublP5s14Qwcbnoc3euKGOsMVP49pJu8VJik+XJhR2TJ/KVGQlE9eytrao/GZg4ruDAerYONFwu/XJ/t0jfudxRqW5QRnfXlX1yAv59NhsT0NEXA2DVCYi6uJ9bxz6XSTIU+Hm5DBz1VT8SCTyflFTA/qXFKtoRQxaq9vW8U89iJ/YHU8YZF/5cy3++iRqthS7+MyNcpQIEsGClh8qp8unR8G6txaHjRkrPm9jVz2AkrAuXFErqGa5jFA7WBDntDe1LRXMQGi2lS6N11r0UkfGK6GRXlY1RBYMtebEHaccqIQJHXRJFg2zuXeM1jTcqBC6VobSInr3ROXXjAX3/ggu2u1zWmvWvnxBU8E7d0w7s4lippSILOAcha1ASJksVCD7a8c5sSj/T+9tijHh6Kcyoqwm23b1qtp5uxTv61QAmBFuUSnHgE/nZ1ejXIB96t2x0GlW9YeCzDS7WLInuT5Ad/0NsUACO/6JMLQKk31gYxPN6K5cnBQsy0NLRz17OeawaLjgXaIXicx9MNSqyR6DoEQmPm1c+hxmPDKr4k4Ko53fO3dsUUQYMulfu9h3hBtgfvVVkdxyvGQbBnbpDawFp
w1FgdPBIG0NfBRLvLeC2oGPdyZWNAiRlwhfeged24h+icm6fWQvHS9O5PXwiKuxlluBn6vDmHDLBC/arD+k1kcf4aSbuEfqsdbJWBZwzKXE30wqppMt00IvfNKiWM73hp33AvWOnj0xIA46cHryBOJwxrBfsOKptWIKShn6Fy6FUNzDGmhOzdnhfGQXB33zGaVcCreV6723VnWQljpRjy4dM81sIyVciREVr37nvQjiLtZVF/uLsNSU9SiUGZPWXUPMsgkfAfogN79k2Y98v/2bbS5clDT8Pxjo888/f/3zb/8KAAD///dAGpU=")["GPG-KEY-elasticsearch"] } -// PGP return pgpbytes and a flag describing whether or not no pgp is valid. -func PGP() (bool, []byte) { - return allowEmptyPgp == "true", pgpBytes +// PGP return pgpbytes. +func PGP() []byte { + return pgpBytes } diff --git a/internal/pkg/release/version.go b/internal/pkg/release/version.go index 5ff0be1dc29..93f78e5f4be 100644 --- a/internal/pkg/release/version.go +++ b/internal/pkg/release/version.go @@ -23,10 +23,6 @@ var snapshot = "" // complete is an environment variable marking the image as complete. var complete = "ELASTIC_AGENT_COMPLETE" -// allowEmptyPgp is used as a debug flag and allows working -// without valid pgp -var allowEmptyPgp string - // allowUpgrade is used as a debug flag and allows working // with upgrade without requiring Agent to be installed correctly var allowUpgrade string diff --git a/pkg/testing/tools/estools/elasticsearch.go b/pkg/testing/tools/estools/elasticsearch.go index 8cd6e126597..ca6dad2dba4 100644 --- a/pkg/testing/tools/estools/elasticsearch.go +++ b/pkg/testing/tools/estools/elasticsearch.go @@ -201,6 +201,13 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor "log.level": "error", }, }, + { + "term": map[string]interface{}{ + "data_stream.namespace": map[string]interface{}{ + "value": namespace, + }, + }, + }, }, "must_not": excludeStatements, }, diff --git a/testing/integration/monitoring_logs_test.go b/testing/integration/monitoring_logs_test.go index 97836c7ff3f..c52b2150d47 100644 --- a/testing/integration/monitoring_logs_test.go +++ b/testing/integration/monitoring_logs_test.go @@ -38,7 +38,6 @@ func TestMonitoringLogsShipped(t *testing.T) { 
ctx := context.Background() t.Logf("got namespace: %s", info.Namespace) - t.Skip("Test is flaky; see https://github.com/elastic/elastic-agent/issues/3081") agentFixture, err := define.NewFixture(t, define.Version()) require.NoError(t, err) @@ -90,7 +89,7 @@ func TestMonitoringLogsShipped(t *testing.T) { require.NotZero(t, len(docs.Hits.Hits)) t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits)) - // Stage 4: make sure all components are health + // Stage 4: make sure all components are healthy t.Log("Making sure all components are healthy") status, err := agentFixture.ExecStatus(ctx) require.NoError(t, err, @@ -101,7 +100,26 @@ func TestMonitoringLogsShipped(t *testing.T) { c.Name, client.Healthy, client.State(c.State)) } - // Stage 5: Make sure we have message confirming central management is running + // Stage 5: Make sure there are no errors in logs + t.Log("Making sure there are no error logs") + docs = findESDocs(t, func() (estools.Documents, error) { + return estools.CheckForErrorsInLogs(info.ESClient, info.Namespace, []string{ + // acceptable error messages (include reason) + "Error dialing dial tcp 127.0.0.1:9200: connect: connection refused", // beat is running default config before its config gets updated + "Global configuration artifact is not available", // Endpoint: failed to load user artifact due to connectivity issues + "Failed to download artifact", + "Failed to initialize artifact", + "Failed to apply initial policy from on disk configuration", + "elastic-agent-client error: rpc error: code = Canceled desc = context canceled", // can happen on restart + }) + }) + t.Logf("errors: Got %d documents", len(docs.Hits.Hits)) + for _, doc := range docs.Hits.Hits { + t.Logf("%#v", doc.Source) + } + require.Empty(t, docs.Hits.Hits) + + // Stage 6: Make sure we have message confirming central management is running t.Log("Making sure we have message confirming central management is running") docs = findESDocs(t, func() (estools.Documents, error) { 
return estools.FindMatchingLogLines(info.ESClient, info.Namespace, @@ -109,7 +127,7 @@ func TestMonitoringLogsShipped(t *testing.T) { }) require.NotZero(t, len(docs.Hits.Hits)) - // Stage 6: verify logs from the monitoring components are not sent to the output + // Stage 7: verify logs from the monitoring components are not sent to the output t.Log("Check monitoring logs") hostname, err := os.Hostname() if err != nil { diff --git a/testing/integration/upgrade_gpg_test.go b/testing/integration/upgrade_gpg_test.go index 56d4378d147..e2001dc1eca 100644 --- a/testing/integration/upgrade_gpg_test.go +++ b/testing/integration/upgrade_gpg_test.go @@ -11,7 +11,6 @@ import ( "strings" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent/internal/pkg/release" @@ -55,7 +54,7 @@ func TestStandaloneUpgradeWithGPGFallback(t *testing.T) { t.Logf("Testing Elastic Agent upgrade from %s to %s...", define.Version(), upgradeToVersion) - _, defaultPGP := release.PGP() + defaultPGP := release.PGP() firstSeven := string(defaultPGP[:7]) newPgp := strings.Replace( string(defaultPGP), @@ -73,7 +72,7 @@ func TestStandaloneUpgradeWithGPGFallback(t *testing.T) { upgradetest.WithSourceURI(""), upgradetest.WithCustomPGP(customPGP), upgradetest.WithSkipVerify(false)) - assert.NoError(t, err) + require.NoError(t, err, "perform upgrade failed") } func TestStandaloneUpgradeWithGPGFallbackOneRemoteFailing(t *testing.T) { @@ -110,7 +109,7 @@ func TestStandaloneUpgradeWithGPGFallbackOneRemoteFailing(t *testing.T) { t.Logf("Testing Elastic Agent upgrade from %s to %s...", define.Version(), upgradeToVersion) - _, defaultPGP := release.PGP() + defaultPGP := release.PGP() firstSeven := string(defaultPGP[:7]) newPgp := strings.Replace( string(defaultPGP), @@ -129,4 +128,5 @@ func TestStandaloneUpgradeWithGPGFallbackOneRemoteFailing(t *testing.T) { upgradetest.WithSourceURI(""), upgradetest.WithCustomPGP(customPGP), 
 		upgradetest.WithSkipVerify(false))
+	require.NoError(t, err, "perform upgrade failed")
 }
diff --git a/testing/pgptest/pgp.go b/testing/pgptest/pgp.go
new file mode 100644
index 00000000000..c6c441536bf
--- /dev/null
+++ b/testing/pgptest/pgp.go
@@ -0,0 +1,42 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package pgptest
+
+import (
+	"bytes"
+	"io"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"golang.org/x/crypto/openpgp"       //nolint:staticcheck // It still receives security fixes and it's just test code
+	"golang.org/x/crypto/openpgp/armor" //nolint:staticcheck // It still receives security fixes and it's just test code
+)
+
+// Sing signs data using RSA. It creates the key, signs the data and returns
+// the ASCII armored public key and detached signature.
+func Sing(t *testing.T, data io.Reader) ([]byte, []byte) {
+	pub := &bytes.Buffer{}
+	asc := &bytes.Buffer{}
+
+	// Create a new key. The openpgp.Entity holds the private and public keys.
+	entity, err := openpgp.NewEntity("somekey", "", "", nil)
+	require.NoError(t, err, "could not create PGP key")
+
+	// Create an encoder to serialize the public key.
+	wPubKey, err := armor.Encode(pub, openpgp.PublicKeyType, nil)
+	require.NoError(t, err, "could not create PGP ASCII Armor encoder")
+
+	// Write the public key to the io.Writer passed to armor.Encode.
+	// Use entity.SerializePrivate if you need the private key.
+	err = entity.Serialize(wPubKey)
+	require.NoError(t, err, "could not serialize the public key")
+	// cannot use defer as it needs to be closed before pub.Bytes() is invoked.
+	wPubKey.Close()
+
+	err = openpgp.ArmoredDetachSign(asc, entity, data, nil)
+	require.NoError(t, err, "failed signing the data")
+
+	return pub.Bytes(), asc.Bytes()
+}