From 324f813641dbde47266c14c6c9008f396722ae59 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 21 Sep 2023 12:12:41 -0400 Subject: [PATCH] Uninstall finds and kills any running `elastic-agent watch` process (#3384) * kill watcher on uninstall * Empty commit. * Fix killWatcher. * Empty commit. * Another fix for killWatcher. * Empty commit. * Catch ErrProcessDone. * Empty commit. * Empty commit * Add changelog fragment. * Make it work on Windows. * Change killWatcher to be in a loop. * Add loop to killWatcher. * Revert "Skip TestStandaloneUpgradeFailsStatus to fix failing integration tests again (#3391)" This reverts commit bf467e32de6c87c73d3f411c35fac40c9f3b8c75. * Revert "Fix integration tests by waiting for the watcher to finish during upgrade tests (#3370)" This reverts commit 94764be90ce4a207f2e380390cfb67558095eb9b. * Fix test. * Revert "Revert "Skip TestStandaloneUpgradeFailsStatus to fix failing integration tests again (#3391)"" This reverts commit 3b0f040460cba561d9503d716b33c241893c01db. * Add progress tracking for uninstall like install. * Log when no watchers are found. * Improve uninstall. * Fix data race. 
(cherry picked from commit 7e86d24cd336145b9641f621a3de10ed3af0c9fe) # Conflicts: # internal/pkg/agent/cmd/install.go # internal/pkg/agent/install/install.go # internal/pkg/agent/install/progress.go # internal/pkg/agent/install/progress_test.go --- ...and-kills-any-running-watcher-process.yaml | 32 +++ internal/pkg/agent/cmd/install.go | 62 +++++- internal/pkg/agent/cmd/uninstall.go | 8 +- internal/pkg/agent/install/install.go | 45 ++++ internal/pkg/agent/install/progress.go | 204 ++++++++++++++++++ internal/pkg/agent/install/progress_test.go | 138 ++++++++++++ internal/pkg/agent/install/uninstall.go | 119 ++++++++-- pkg/utils/watcher.go | 45 ++++ testing/integration/upgrade_test.go | 85 +++----- 9 files changed, 660 insertions(+), 78 deletions(-) create mode 100644 changelog/fragments/1694187216-Uninstall-finds-and-kills-any-running-watcher-process.yaml create mode 100644 internal/pkg/agent/install/progress.go create mode 100644 internal/pkg/agent/install/progress_test.go create mode 100644 pkg/utils/watcher.go diff --git a/changelog/fragments/1694187216-Uninstall-finds-and-kills-any-running-watcher-process.yaml b/changelog/fragments/1694187216-Uninstall-finds-and-kills-any-running-watcher-process.yaml new file mode 100644 index 00000000000..057cc06d33e --- /dev/null +++ b/changelog/fragments/1694187216-Uninstall-finds-and-kills-any-running-watcher-process.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. 
+# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Uninstall finds and kills any running watcher process + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/3384 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. 
+issue: https://github.com/elastic/elastic-agent/issues/3371 diff --git a/internal/pkg/agent/cmd/install.go b/internal/pkg/agent/cmd/install.go index 90fd9eba10d..87169ecedf1 100644 --- a/internal/pkg/agent/cmd/install.go +++ b/internal/pkg/agent/cmd/install.go @@ -179,20 +179,43 @@ func installCmd(streams *cli.IOStreams, cmd *cobra.Command) error { } } +<<<<<<< HEAD cfgFile := paths.ConfigFile() if status != install.PackageInstall { err = install.Install(cfgFile, topPath) +======= + pt := install.NewProgressTracker(streams.Out) + s := pt.Start() + defer func() { + if err != nil { + s.Failed() + } else { + s.Succeeded() + } + }() + + cfgFile := paths.ConfigFile() + if status != install.PackageInstall { + err = install.Install(cfgFile, topPath, s) +>>>>>>> 7e86d24cd3 (Uninstall finds and kills any running `elastic-agent watch` process (#3384)) if err != nil { return err } defer func() { if err != nil { - _ = install.Uninstall(cfgFile, topPath, "") + uninstallStep := s.StepStart("Uninstalling") + innerErr := install.Uninstall(cfgFile, topPath, "", uninstallStep) + if innerErr != nil { + uninstallStep.Failed() + } else { + uninstallStep.Succeeded() + } } }() if !delayEnroll { +<<<<<<< HEAD err = install.StartService(topPath) if err != nil { fmt.Fprintf(streams.Out, "Installation failed to start Elastic Agent service.\n") @@ -202,6 +225,26 @@ func installCmd(streams *cli.IOStreams, cmd *cobra.Command) error { defer func() { if err != nil { _ = install.StopService(topPath) +======= + startServiceStep := s.StepStart("Starting service") + err = install.StartService(topPath) + if err != nil { + startServiceStep.Failed() + fmt.Fprintf(streams.Out, "Installation failed to start Elastic Agent service.\n") + return err + } + startServiceStep.Succeeded() + + defer func() { + if err != nil { + stoppingServiceStep := s.StepStart("Stopping service") + innerErr := install.StopService(topPath) + if innerErr != nil { + stoppingServiceStep.Failed() + } else { + 
stoppingServiceStep.Succeeded() + } +>>>>>>> 7e86d24cd3 (Uninstall finds and kills any running `elastic-agent watch` process (#3384)) } }() } @@ -214,12 +257,21 @@ func installCmd(streams *cli.IOStreams, cmd *cobra.Command) error { enrollCmd.Stdin = os.Stdin enrollCmd.Stdout = os.Stdout enrollCmd.Stderr = os.Stderr +<<<<<<< HEAD err = enrollCmd.Start() if err != nil { +======= + + enrollStep := s.StepStart("Enrolling Elastic Agent with Fleet") + err = enrollCmd.Start() + if err != nil { + enrollStep.Failed() +>>>>>>> 7e86d24cd3 (Uninstall finds and kills any running `elastic-agent watch` process (#3384)) return fmt.Errorf("failed to execute enroll command: %w", err) } err = enrollCmd.Wait() if err != nil { +<<<<<<< HEAD if status != install.PackageInstall { var exitErr *exec.ExitError _ = install.Uninstall(cfgFile, topPath, "") @@ -229,6 +281,14 @@ func installCmd(streams *cli.IOStreams, cmd *cobra.Command) error { } return fmt.Errorf("enroll command failed for unknown reason: %w", err) } +======= + enrollStep.Failed() + // uninstall doesn't need to be performed here the defer above will + // catch the error and perform the uninstall + return fmt.Errorf("enroll command failed for unknown reason: %w", err) + } + enrollStep.Succeeded() +>>>>>>> 7e86d24cd3 (Uninstall finds and kills any running `elastic-agent watch` process (#3384)) } if err := info.CreateInstallMarker(topPath); err != nil { diff --git a/internal/pkg/agent/cmd/uninstall.go b/internal/pkg/agent/cmd/uninstall.go index 9a96e412302..e9446757c04 100644 --- a/internal/pkg/agent/cmd/uninstall.go +++ b/internal/pkg/agent/cmd/uninstall.go @@ -80,9 +80,15 @@ func uninstallCmd(streams *cli.IOStreams, cmd *cobra.Command) error { } } - err = install.Uninstall(paths.ConfigFile(), paths.Top(), uninstallToken) + pt := install.NewProgressTracker(streams.Out) + s := pt.Start() + + err = install.Uninstall(paths.ConfigFile(), paths.Top(), uninstallToken, s) if err != nil { + s.Failed() return err + } else { + 
s.Succeeded() } fmt.Fprintf(streams.Out, "Elastic Agent has been uninstalled.\n") diff --git a/internal/pkg/agent/install/install.go b/internal/pkg/agent/install/install.go index b91c931afd6..f9eaa6cbcdf 100644 --- a/internal/pkg/agent/install/install.go +++ b/internal/pkg/agent/install/install.go @@ -22,7 +22,11 @@ const ( ) // Install installs Elastic Agent persistently on the system including creating and starting its service. +<<<<<<< HEAD func Install(cfgFile, topPath string) error { +======= +func Install(cfgFile, topPath string, pt ProgressTrackerStep) error { +>>>>>>> 7e86d24cd3 (Uninstall finds and kills any running `elastic-agent watch` process (#3384)) dir, err := findDirectory() if err != nil { return errors.New(err, "failed to discover the source directory for installation", errors.TypeFilesystem) @@ -33,13 +37,24 @@ func Install(cfgFile, topPath string) error { // There is no uninstall token for "install" command. // Uninstall will fail on protected agent. // The protected Agent will need to be uninstalled first before it can be installed. 
+<<<<<<< HEAD err = Uninstall(cfgFile, topPath, "") if err != nil { +======= + s := pt.StepStart("Uninstalling current Elastic Agent") + err = Uninstall(cfgFile, topPath, "", s) + if err != nil { + s.Failed() +>>>>>>> 7e86d24cd3 (Uninstall finds and kills any running `elastic-agent watch` process (#3384)) return errors.New( err, fmt.Sprintf("failed to uninstall Agent at (%s)", filepath.Dir(topPath)), errors.M("directory", filepath.Dir(topPath))) } +<<<<<<< HEAD +======= + s.Succeeded() +>>>>>>> 7e86d24cd3 (Uninstall finds and kills any running `elastic-agent watch` process (#3384)) // ensure parent directory exists, copy source into install path err = os.MkdirAll(filepath.Dir(topPath), 0755) @@ -49,6 +64,12 @@ func Install(cfgFile, topPath string) error { fmt.Sprintf("failed to create installation parent directory (%s)", filepath.Dir(topPath)), errors.M("directory", filepath.Dir(topPath))) } +<<<<<<< HEAD +======= + + // copy source into install path + s = pt.StepStart("Copying files") +>>>>>>> 7e86d24cd3 (Uninstall finds and kills any running `elastic-agent watch` process (#3384)) err = copy.Copy(dir, topPath, copy.Options{ OnSymlink: func(_ string) copy.SymlinkAction { return copy.Shallow @@ -56,11 +77,19 @@ func Install(cfgFile, topPath string) error { Sync: true, }) if err != nil { +<<<<<<< HEAD +======= + s.Failed() +>>>>>>> 7e86d24cd3 (Uninstall finds and kills any running `elastic-agent watch` process (#3384)) return errors.New( err, fmt.Sprintf("failed to copy source directory (%s) to destination (%s)", dir, topPath), errors.M("source", dir), errors.M("destination", topPath)) } +<<<<<<< HEAD +======= + s.Succeeded() +>>>>>>> 7e86d24cd3 (Uninstall finds and kills any running `elastic-agent watch` process (#3384)) // place shell wrapper, if present on platform if paths.ShellWrapperPath != "" { @@ -124,17 +153,33 @@ func Install(cfgFile, topPath string) error { } // install service +<<<<<<< HEAD svc, err := newService(topPath) if err != nil { +======= + s = 
pt.StepStart("Installing service") + svc, err := newService(topPath) + if err != nil { + s.Failed() +>>>>>>> 7e86d24cd3 (Uninstall finds and kills any running `elastic-agent watch` process (#3384)) return err } err = svc.Install() if err != nil { +<<<<<<< HEAD +======= + s.Failed() +>>>>>>> 7e86d24cd3 (Uninstall finds and kills any running `elastic-agent watch` process (#3384)) return errors.New( err, fmt.Sprintf("failed to install service (%s)", paths.ServiceName), errors.M("service", paths.ServiceName)) } +<<<<<<< HEAD +======= + s.Succeeded() + +>>>>>>> 7e86d24cd3 (Uninstall finds and kills any running `elastic-agent watch` process (#3384)) return nil } diff --git a/internal/pkg/agent/install/progress.go b/internal/pkg/agent/install/progress.go new file mode 100644 index 00000000000..bc797a2c58d --- /dev/null +++ b/internal/pkg/agent/install/progress.go @@ -0,0 +1,204 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package install + +import ( + "fmt" + "io" + "math" + "strings" + "sync" + "time" + + "golang.org/x/exp/rand" +) + +// ProgressTrackerStep is a currently running step. +// +// A step can produce a sub-step that is a step that is part of another step. +type ProgressTrackerStep interface { + // Succeeded step is done and successful. + Succeeded() + // Failed step has failed. + Failed() + // StepStart creates a new step. 
+ StepStart(msg string) ProgressTrackerStep +} + +type progressTrackerStep struct { + tracker *ProgressTracker + prefix string + + finalizeFunc func() + + rootstep bool + substeps bool + mu sync.Mutex + step *progressTrackerStep +} + +func newProgressTrackerStep(tracker *ProgressTracker, prefix string, finalizeFunc func()) *progressTrackerStep { + return &progressTrackerStep{ + tracker: tracker, + prefix: prefix, + finalizeFunc: finalizeFunc, + } +} + +// Succeeded step is done and successful. +func (pts *progressTrackerStep) Succeeded() { + prefix := " " + if pts.substeps { + prefix = pts.prefix + " " + } + if !pts.rootstep { + pts.tracker.printf("%sDONE\n", prefix) + } + pts.finalizeFunc() +} + +// Failed step has failed. +func (pts *progressTrackerStep) Failed() { + prefix := " " + if pts.substeps { + prefix = pts.prefix + " " + } + if !pts.rootstep { + pts.tracker.printf("%sFAILED\n", prefix) + } + pts.finalizeFunc() +} + +// StepStart creates a new step. +func (pts *progressTrackerStep) StepStart(msg string) ProgressTrackerStep { + prefix := pts.prefix + if !pts.rootstep { + prefix += " " + if !pts.substeps { + prefix = "\n" + prefix + pts.substeps = true + } + } + pts.tracker.printf("%s%s...", prefix, strings.TrimSpace(msg)) + s := newProgressTrackerStep(pts.tracker, prefix, func() { + pts.setStep(nil) + }) + pts.setStep(s) + return s +} + +func (pts *progressTrackerStep) getStep() *progressTrackerStep { + pts.mu.Lock() + defer pts.mu.Unlock() + return pts.step +} + +func (pts *progressTrackerStep) setStep(step *progressTrackerStep) { + pts.mu.Lock() + defer pts.mu.Unlock() + pts.step = step +} + +func (pts *progressTrackerStep) tick() { + step := pts.getStep() + if step != nil { + step.tick() + return + } + if !pts.rootstep { + pts.tracker.printf(".") + } +} + +type ProgressTracker struct { + writer io.Writer + + tickInterval time.Duration + randomizeTickInterval bool + + step *progressTrackerStep + mu sync.Mutex + stop chan struct{} +} + +func 
NewProgressTracker(writer io.Writer) *ProgressTracker { + return &ProgressTracker{ + writer: writer, + tickInterval: 200 * time.Millisecond, + randomizeTickInterval: true, + stop: make(chan struct{}), + } +} + +func (pt *ProgressTracker) SetTickInterval(d time.Duration) { + pt.tickInterval = d +} + +func (pt *ProgressTracker) DisableRandomizedTickIntervals() { + pt.randomizeTickInterval = false +} + +func (pt *ProgressTracker) Start() ProgressTrackerStep { + timer := time.NewTimer(pt.calculateTickInterval()) + go func() { + defer timer.Stop() + for { + select { + case <-pt.stop: + return + case <-timer.C: + step := pt.getStep() + if step != nil { + step.tick() + } + timer = time.NewTimer(pt.calculateTickInterval()) + } + } + }() + + s := newProgressTrackerStep(pt, "", func() { + pt.setStep(nil) + pt.stop <- struct{}{} + }) + s.rootstep = true // is the root step + pt.setStep(s) + return s +} + +func (pt *ProgressTracker) printf(format string, a ...any) { + pt.mu.Lock() + defer pt.mu.Unlock() + _, _ = fmt.Fprintf(pt.writer, format, a...) +} + +func (pt *ProgressTracker) getStep() *progressTrackerStep { + pt.mu.Lock() + defer pt.mu.Unlock() + return pt.step +} + +func (pt *ProgressTracker) setStep(step *progressTrackerStep) { + pt.mu.Lock() + defer pt.mu.Unlock() + pt.step = step +} + +func (pt *ProgressTracker) calculateTickInterval() time.Duration { + if !pt.randomizeTickInterval { + return pt.tickInterval + } + + // Randomize interval between 65% and 250% of configured interval + // to make it look like the progress is non-linear. 
:) + floor := int64(math.Floor(float64(pt.tickInterval.Milliseconds()) * 0.65)) + ceiling := int64(math.Floor(float64(pt.tickInterval.Milliseconds()) * 2.5)) + + randomDuration := rand.Int63() % ceiling + if randomDuration < floor { + randomDuration = floor + } + + return time.Duration(randomDuration) * time.Millisecond +} diff --git a/internal/pkg/agent/install/progress_test.go b/internal/pkg/agent/install/progress_test.go new file mode 100644 index 00000000000..f9bbc303228 --- /dev/null +++ b/internal/pkg/agent/install/progress_test.go @@ -0,0 +1,138 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package install + +import ( + "regexp" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type testWriter struct { + buf []byte +} + +func newTestWriter() *testWriter { + return &testWriter{ + buf: []byte{}, + } +} + +func (tw *testWriter) Write(p []byte) (int, error) { + tw.buf = append(tw.buf, p...) + return len(p), nil +} + +func TestProgress(t *testing.T) { + t.Run("single_step_immediate_failure", func(t *testing.T) { + w := newTestWriter() + pt := NewProgressTracker(w) + + rs := pt.Start() + + s := rs.StepStart("step 1 starting") + s.Failed() + + rs.Failed() + + require.Equal(t, "step 1 starting... 
FAILED\n", string(w.buf)) + }) + + t.Run("single_step_delayed_failure", func(t *testing.T) { + w := newTestWriter() + pt := NewProgressTracker(w) + pt.SetTickInterval(10 * time.Millisecond) // to speed up testing + pt.DisableRandomizedTickIntervals() + + rs := pt.Start() + + s := rs.StepStart("step 1 starting") + time.Sleep(15 * time.Millisecond) // to simulate work being done + s.Failed() + + rs.Failed() + + require.Regexp(t, regexp.MustCompile(`step 1 starting\.{3}\.+ FAILED\n`), string(w.buf)) + }) + + t.Run("multi_step_immediate_success", func(t *testing.T) { + w := newTestWriter() + pt := NewProgressTracker(w) + pt.DisableRandomizedTickIntervals() + + rs := pt.Start() + + s := rs.StepStart("step 1 starting") + s.Succeeded() + s = rs.StepStart("step 2 starting") + s.Succeeded() + + rs.Succeeded() + + require.Equal(t, "step 1 starting... DONE\nstep 2 starting... DONE\n", string(w.buf)) + }) + + t.Run("multi_step_delayed_success", func(t *testing.T) { + w := newTestWriter() + pt := NewProgressTracker(w) + pt.SetTickInterval(10 * time.Millisecond) // to speed up testing + pt.DisableRandomizedTickIntervals() + + rs := pt.Start() + + s := rs.StepStart("step 1 starting") + time.Sleep(55 * time.Millisecond) // to simulate work being done + s.Succeeded() + s = rs.StepStart("step 2 starting") + time.Sleep(25 * time.Millisecond) // to simulate work being done + s.Succeeded() + + rs.Succeeded() + + require.Regexp(t, regexp.MustCompile(`step 1 starting\.{3}\.+ DONE\nstep 2 starting\.{3}\.+ DONE`), string(w.buf)) + }) + + t.Run("single_step_delay_after_failed", func(t *testing.T) { + w := newTestWriter() + pt := NewProgressTracker(w) + pt.SetTickInterval(10 * time.Millisecond) // to speed up testing + pt.DisableRandomizedTickIntervals() + + rs := pt.Start() + + s := rs.StepStart("step 1 starting") + s.Failed() + time.Sleep(15 * time.Millisecond) + + rs.Failed() + + require.Equal(t, "step 1 starting... 
FAILED\n", string(w.buf)) + + }) + + t.Run("nested_step_delayed_success", func(t *testing.T) { + w := newTestWriter() + pt := NewProgressTracker(w) + pt.SetTickInterval(10 * time.Millisecond) // to speed up testing + pt.DisableRandomizedTickIntervals() + + rs := pt.Start() + + s := rs.StepStart("step starting") + ss := s.StepStart("substep 1 starting") + time.Sleep(55 * time.Millisecond) // to simulate work being done + ss.Succeeded() + ss = s.StepStart("substep 2 starting") + time.Sleep(25 * time.Millisecond) // to simulate work being done + ss.Succeeded() + s.Succeeded() + + rs.Succeeded() + + require.Regexp(t, regexp.MustCompile(`step starting\.{3}\n substep 1 starting\.{3}\.+ DONE\n substep 2 starting\.{3}\.+ DONE\n DONE\n`), string(w.buf)) + }) +} diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index cc1f18380f3..0d34f990bf5 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -6,6 +6,7 @@ package install import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -17,7 +18,7 @@ import ( "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" - "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + aerrors "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" "github.com/elastic/elastic-agent/internal/pkg/agent/vars" "github.com/elastic/elastic-agent/internal/pkg/capabilities" @@ -27,10 +28,11 @@ import ( comprt "github.com/elastic/elastic-agent/pkg/component/runtime" "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/pkg/features" + "github.com/elastic/elastic-agent/pkg/utils" ) // Uninstall uninstalls persistently Elastic Agent on the system. 
-func Uninstall(cfgFile, topPath, uninstallToken string) error { +func Uninstall(cfgFile, topPath, uninstallToken string, pt ProgressTrackerStep) error { // uninstall the current service svc, err := newService(topPath) if err != nil { @@ -38,53 +40,70 @@ func Uninstall(cfgFile, topPath, uninstallToken string) error { } status, _ := svc.Status() + s := pt.StepStart("Stopping service") if status == service.StatusRunning { err := svc.Stop() if err != nil { - return errors.New( + s.Failed() + return aerrors.New( err, fmt.Sprintf("failed to stop service (%s)", paths.ServiceName), - errors.M("service", paths.ServiceName)) + aerrors.M("service", paths.ServiceName)) } } + s.Succeeded() + + // kill any running watcher + if err := killWatcher(pt); err != nil { + return fmt.Errorf("failed trying to kill any running watcher: %w", err) + } // Uninstall components first - if err := uninstallComponents(context.Background(), cfgFile, uninstallToken); err != nil { + if err := uninstallComponents(context.Background(), cfgFile, uninstallToken, pt); err != nil { // If service status was running it was stopped to uninstall the components. 
// If the components uninstall failed start the service again if status == service.StatusRunning { if startErr := svc.Start(); startErr != nil { - return errors.New( + return aerrors.New( err, fmt.Sprintf("failed to restart service (%s), after failed components uninstall: %v", paths.ServiceName, startErr), - errors.M("service", paths.ServiceName)) + aerrors.M("service", paths.ServiceName)) } } return err } // Uninstall service only after components were uninstalled successfully - _ = svc.Uninstall() + s = pt.StepStart("Removing service") + err = svc.Uninstall() + if err != nil { + s.Failed() + } else { + s.Succeeded() + } // remove, if present on platform if paths.ShellWrapperPath != "" { err = os.Remove(paths.ShellWrapperPath) if !os.IsNotExist(err) && err != nil { - return errors.New( + return aerrors.New( err, fmt.Sprintf("failed to remove shell wrapper (%s)", paths.ShellWrapperPath), - errors.M("destination", paths.ShellWrapperPath)) + aerrors.M("destination", paths.ShellWrapperPath)) } } // remove existing directory + s = pt.StepStart("Removing install directory") err = RemovePath(topPath) if err != nil { - return errors.New( + s.Failed() + return aerrors.New( err, fmt.Sprintf("failed to remove installation directory (%s)", paths.Top()), - errors.M("directory", paths.Top())) + aerrors.M("directory", paths.Top())) } + s.Succeeded() return nil } @@ -158,7 +177,7 @@ func containsString(str string, a []string, caseSensitive bool) bool { return false } -func uninstallComponents(ctx context.Context, cfgFile string, uninstallToken string) error { +func uninstallComponents(ctx context.Context, cfgFile string, uninstallToken string, pt ProgressTrackerStep) error { log, err := logger.NewWithLogpLevel("", logp.ErrorLevel, false) if err != nil { return err @@ -212,7 +231,7 @@ func uninstallComponents(ctx context.Context, cfgFile string, uninstallToken str // This component is not active continue } - if err := uninstallServiceComponent(ctx, log, comp, uninstallToken); err 
!= nil { + if err := uninstallServiceComponent(ctx, log, comp, uninstallToken, pt); err != nil { os.Stderr.WriteString(fmt.Sprintf("failed to uninstall component %q: %s\n", comp.ID, err)) // The decision was made to change the behaviour and leave the Agent installed if Endpoint uninstall fails // https://github.com/elastic/elastic-agent/pull/2708#issuecomment-1574251911 @@ -224,17 +243,24 @@ func uninstallComponents(ctx context.Context, cfgFile string, uninstallToken str return nil } -func uninstallServiceComponent(ctx context.Context, log *logp.Logger, comp component.Component, uninstallToken string) error { +func uninstallServiceComponent(ctx context.Context, log *logp.Logger, comp component.Component, uninstallToken string, pt ProgressTrackerStep) error { // Do not use infinite retries when uninstalling from the command line. If the uninstall needs to be // retried the entire uninstall command can be retried. Retries may complete asynchronously with the // execution of the uninstall command, leading to bugs like https://github.com/elastic/elastic-agent/issues/3060. 
- return comprt.UninstallService(ctx, log, comp, uninstallToken) + s := pt.StepStart(fmt.Sprintf("Uninstalling service component %s", comp.InputType)) + err := comprt.UninstallService(ctx, log, comp, uninstallToken) + if err != nil { + s.Failed() + return err + } + s.Succeeded() + return nil } func serviceComponentsFromConfig(specs component.RuntimeSpecs, cfg *config.Config) ([]component.Component, error) { mm, err := cfg.ToMapStr() if err != nil { - return nil, errors.New("failed to create a map from config", err) + return nil, aerrors.New("failed to create a map from config", err) } allComps, err := specs.ToComponents(mm, nil, logp.InfoLevel, nil) if err != nil { @@ -275,7 +301,7 @@ func applyDynamics(ctx context.Context, log *logger.Logger, cfg *config.Config) } err = transpiler.Insert(ast, renderedInputs, "inputs") if err != nil { - return nil, errors.New("inserting rendered inputs failed", err) + return nil, aerrors.New("inserting rendered inputs failed", err) } } @@ -286,3 +312,60 @@ func applyDynamics(ctx context.Context, log *logger.Logger, cfg *config.Config) return config.NewConfigFrom(finalConfig) } + +// killWatcher finds and kills any running Elastic Agent watcher. 
+func killWatcher(pt ProgressTrackerStep) error { + var s ProgressTrackerStep + for { + // finding and killing watchers is performed in a loop until no + // more watchers are existing, this ensures that during uninstall + // that no matter what the watchers are dead before going any further + pids, err := utils.GetWatcherPIDs() + if err != nil { + if s != nil { + s.Failed() + } + return err + } + if len(pids) == 0 { + if s != nil { + s.Succeeded() + } else { + // step was never started so no watcher was found on first loop + s = pt.StepStart("Stopping upgrade watcher; none found") + s.Succeeded() + } + return nil + } + + if s == nil { + var pidsStr []string + for _, pid := range pids { + pidsStr = append(pidsStr, fmt.Sprintf("%d", pid)) + } + s = pt.StepStart(fmt.Sprintf("Stopping upgrade watcher (%s)", strings.Join(pidsStr, ", "))) + } + + var errs error + for _, pid := range pids { + proc, err := os.FindProcess(pid) + if err != nil { + errs = errors.Join(errs, fmt.Errorf("failed to load watcher process with pid %d: %w", pid, err)) + continue + } + err = proc.Kill() + if err != nil && !errors.Is(err, os.ErrProcessDone) { + errs = errors.Join(errs, fmt.Errorf("failed to kill watcher process with pid %d: %w", pid, err)) + continue + } + } + if errs != nil { + if s != nil { + s.Failed() + } + return errs + } + // wait 1 second before performing the loop again + <-time.After(1 * time.Second) + } +} diff --git a/pkg/utils/watcher.go b/pkg/utils/watcher.go new file mode 100644 index 00000000000..fa018b704b3 --- /dev/null +++ b/pkg/utils/watcher.go @@ -0,0 +1,45 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package utils + +import ( + "fmt" + "path/filepath" + "strings" + + "github.com/elastic/elastic-agent-system-metrics/metric/system/process" +) + +// GetWatcherPIDs returns the PID's of any running `elastic-agent watch` process. +func GetWatcherPIDs() ([]int, error) { + procStats := process.Stats{ + // filtering with '.*elastic-agent' or '^.*elastic-agent$' doesn't + // seem to work as expected, filtering is done in the for loop below + Procs: []string{".*"}, + } + err := procStats.Init() + if err != nil { + return nil, fmt.Errorf("failed to initialize process.Stats: %w", err) + } + pidMap, _, err := procStats.FetchPids() + if err != nil { + return nil, fmt.Errorf("failed to fetch pids: %w", err) + } + var pids []int + for pid, state := range pidMap { + if len(state.Args) < 2 { + // must have at least 2 args "elastic-agent[.exe] watch" + continue + } + // instead of matching on Windows using the specific '.exe' suffix, this ensures + // that even if the watcher is spawned without the '.exe' suffix (which Windows will allow and supports) + // it always results in the watch process being killed + if strings.TrimSuffix(filepath.Base(state.Args[0]), ".exe") == "elastic-agent" && state.Args[1] == "watch" { + // it is a watch subprocess + pids = append(pids, pid) + } + } + return pids, nil +} diff --git a/testing/integration/upgrade_test.go b/testing/integration/upgrade_test.go index 4f0c6300912..fc526fcf99c 100644 --- a/testing/integration/upgrade_test.go +++ b/testing/integration/upgrade_test.go @@ -9,6 +9,7 @@ package integration import ( "context" "encoding/json" + "errors" "fmt" "io/fs" "net/http" @@ -44,19 +45,12 @@ import ( agtversion "github.com/elastic/elastic-agent/version" ) -// The watcher will need the default 10 minutes to complete for a Fleet managed agent, see https://github.com/elastic/elastic-agent/issues/2977. -const defaultWatcherDuration = 10 * time.Minute - -// Configure standalone agents to complete faster to speed up tests. 
-const standaloneWatcherDuration = time.Minute - -// Note: this configuration can't apply to Fleet managed upgrades until https://github.com/elastic/elastic-agent/issues/2977 is resolved -var fastWatcherCfg = fmt.Sprintf(` +const fastWatcherCfg = ` agent.upgrade.watcher: - grace_period: %s + grace_period: 1m error_check.interval: 15s crash_check.interval: 15s -`, standaloneWatcherDuration) +` // notable versions used in tests @@ -100,11 +94,8 @@ func TestFleetManagedUpgrade(t *testing.T) { err = agentFixture.Prepare(ctx) require.NoError(t, err, "error preparing agent fixture") - t.Cleanup(func() { - // The watcher needs to finish before the agent is uninstalled: https://github.com/elastic/elastic-agent/issues/3371 - waitForUpgradeWatcherToComplete(t, agentFixture, parsedVersion, defaultWatcherDuration) - }) - + err = agentFixture.Configure(ctx, []byte(fastWatcherCfg)) + require.NoError(t, err, "error configuring agent fixture") testUpgradeFleetManagedElasticAgent(t, ctx, info, agentFixture, parsedVersion, define.Version()) }) } @@ -214,11 +205,6 @@ func TestStandaloneUpgrade(t *testing.T) { err = agentFixture.Configure(ctx, []byte(fastWatcherCfg)) require.NoError(t, err, "error configuring agent fixture") - t.Cleanup(func() { - // The watcher needs to finish before the agent is uninstalled: https://github.com/elastic/elastic-agent/issues/3371 - waitForUpgradeWatcherToComplete(t, agentFixture, parsedVersion, standaloneWatcherDuration) - }) - parsedUpgradeVersion, err := version.ParseVersion(define.Version()) require.NoErrorf(t, err, "define.Version() %q cannot be parsed as agent version", define.Version()) skipVerify := version_8_7_0.Less(*parsedVersion) @@ -261,11 +247,6 @@ func TestStandaloneUpgradeWithGPGFallback(t *testing.T) { err = agentFixture.Configure(ctx, []byte(fastWatcherCfg)) require.NoError(t, err, "error configuring agent fixture") - t.Cleanup(func() { - // The watcher needs to finish before the agent is uninstalled: 
https://github.com/elastic/elastic-agent/issues/3371 - waitForUpgradeWatcherToComplete(t, agentFixture, fromVersion, standaloneWatcherDuration) - }) - _, defaultPGP := release.PGP() firstSeven := string(defaultPGP[:7]) newPgp := strings.Replace( @@ -316,11 +297,6 @@ func TestStandaloneUpgradeWithGPGFallbackOneRemoteFailing(t *testing.T) { err = agentFixture.Configure(ctx, []byte(fastWatcherCfg)) require.NoError(t, err, "error configuring agent fixture") - t.Cleanup(func() { - // The watcher needs to finish before the agent is uninstalled: https://github.com/elastic/elastic-agent/issues/3371 - waitForUpgradeWatcherToComplete(t, agentFixture, fromVersion, standaloneWatcherDuration) - }) - _, defaultPGP := release.PGP() firstSeven := string(defaultPGP[:7]) newPgp := strings.Replace( @@ -410,12 +386,6 @@ func TestStandaloneDowngradeToPreviousSnapshotBuild(t *testing.T) { t.Logf("Targeting upgrade to version %+v", upgradeInputVersion) parsedFromVersion, err := version.ParseVersion(define.Version()) - - t.Cleanup(func() { - // The watcher needs to finish before the agent is uninstalled: https://github.com/elastic/elastic-agent/issues/3371 - waitForUpgradeWatcherToComplete(t, agentFixture, parsedFromVersion, standaloneWatcherDuration) - }) - require.NoErrorf(t, err, "define.Version() %q cannot be parsed as agent version", define.Version()) testStandaloneUpgrade(ctx, t, agentFixture, parsedFromVersion, upgradeInputVersion, expectedAgentHashAfterUpgrade, false, true, false, CustomPGP{}) } @@ -564,6 +534,8 @@ func testStandaloneUpgrade( return checkAgentHealthAndVersion(t, ctx, f, parsedUpgradeVersion.CoreVersion(), parsedUpgradeVersion.IsSnapshot(), expectedAgentHashAfterUpgrade) }, 5*time.Minute, 1*time.Second, "agent never upgraded to expected version") + checkUpgradeWatcherRan(t, f, parsedFromVersion) + if expectedAgentHashAfterUpgrade != "" { aVersion, err := c.Version(ctx) assert.NoError(t, err, "error checking version after upgrade") @@ -641,17 +613,28 @@ func 
checkLegacyAgentHealthAndVersion(t *testing.T, ctx context.Context, f *ates } -// waitForUpgradeWatcherToComplete asserts that the Upgrade Watcher finished running. -func waitForUpgradeWatcherToComplete(t *testing.T, f *atesting.Fixture, fromVersion *version.ParsedSemVer, timeout time.Duration) { +// checkUpgradeWatcherRan asserts that the Upgrade Watcher finished running. We use the +// presence of the update marker file as evidence that the Upgrade Watcher is still running +// and the absence of that file as evidence that the Upgrade Watcher is no longer running. +func checkUpgradeWatcherRan(t *testing.T, agentFixture *atesting.Fixture, fromVersion *version.ParsedSemVer) { t.Helper() if fromVersion.Less(*version_8_9_0_SNAPSHOT) { - t.Logf("Version %q is too old for a quick update marker check", fromVersion) - timeout = defaultWatcherDuration + t.Logf("Version %q is too old for a quick update marker check, skipping...", fromVersion) + return } - t.Logf("Waiting %s for upgrade watcher to finish running", timeout) - time.Sleep(timeout) + t.Log("Waiting for upgrade watcher to finish running...") + + updateMarkerFile := filepath.Join(agentFixture.WorkDir(), "data", ".update-marker") + require.FileExists(t, updateMarkerFile) + + now := time.Now() + require.Eventuallyf(t, func() bool { + _, err := os.Stat(updateMarkerFile) + return errors.Is(err, fs.ErrNotExist) + }, 2*time.Minute, 15*time.Second, "agent never removed update marker") + t.Logf("Upgrade Watcher completed in %s", time.Now().Sub(now)) } func extractCommitHashFromArtifact(t *testing.T, ctx context.Context, artifactVersion *version.ParsedSemVer, agentProject tools.Project) string { @@ -732,11 +715,6 @@ func TestStandaloneUpgradeRetryDownload(t *testing.T) { err = agentFixture.Configure(ctx, []byte(fastWatcherCfg)) require.NoError(t, err, "error configuring agent fixture") - t.Cleanup(func() { - // The watcher needs to finish before the agent is uninstalled: https://github.com/elastic/elastic-agent/issues/3371 
- waitForUpgradeWatcherToComplete(t, agentFixture, upgradeFromVersion, standaloneWatcherDuration) - }) - t.Log("Install the built Agent") output, err := tools.InstallStandaloneAgent(agentFixture) t.Log(string(output)) @@ -823,6 +801,8 @@ func TestStandaloneUpgradeRetryDownload(t *testing.T) { t.Log("Waiting for upgrade to finish") wg.Wait() + checkUpgradeWatcherRan(t, agentFixture, upgradeFromVersion) + t.Log("Check Agent version to ensure upgrade is successful") currentVersion, err = getVersion(t, ctx, agentFixture) require.NoError(t, err) @@ -884,9 +864,6 @@ func TestUpgradeBrokenPackageVersion(t *testing.T) { f, err := define.NewFixture(t, define.Version()) require.NoError(t, err) - fromVersion, err := version.ParseVersion(define.Version()) - require.NoError(t, err) - // Prepare the Elastic Agent so the binary is extracted and ready to use. err = f.Prepare(context.Background()) require.NoError(t, err) @@ -897,11 +874,6 @@ func TestUpgradeBrokenPackageVersion(t *testing.T) { err = f.Configure(ctx, []byte(fastWatcherCfg)) require.NoError(t, err, "error configuring agent fixture") - t.Cleanup(func() { - // The watcher needs to finish before the agent is uninstalled: https://github.com/elastic/elastic-agent/issues/3371 - waitForUpgradeWatcherToComplete(t, f, fromVersion, standaloneWatcherDuration) - }) - output, err := tools.InstallStandaloneAgent(f) t.Logf("Agent installation output: %q", string(output)) require.NoError(t, err) @@ -1079,9 +1051,6 @@ inputs: return checkAgentHealthAndVersion(t, ctx, agentFixture, upgradeToVersion.CoreVersion(), upgradeToVersion.IsSnapshot(), "") }, 2*time.Minute, 250*time.Millisecond, "Upgraded Agent never became healthy") - // Wait for upgrade watcher to finish running - waitForUpgradeWatcherToComplete(t, agentFixture, upgradeFromVersion, standaloneWatcherDuration) - t.Log("Ensure the we have rolled back and the correct version is running") require.Eventually(t, func() bool { return checkAgentHealthAndVersion(t, ctx, agentFixture, 
upgradeFromVersion.CoreVersion(), upgradeFromVersion.IsSnapshot(), "")