From a79c36c5fa58f3036ea3ea101e1ae69ea80718cd Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Tue, 17 Oct 2023 11:09:49 -0700 Subject: [PATCH] add changelog, flag variable --- .../1697565877-progress-race-fixes.yaml | 32 ++++++++++ internal/pkg/agent/install/progress.go | 63 ++++++++++++++----- internal/pkg/agent/install/progress_test.go | 21 +++++++ 3 files changed, 99 insertions(+), 17 deletions(-) create mode 100644 changelog/fragments/1697565877-progress-race-fixes.yaml diff --git a/changelog/fragments/1697565877-progress-race-fixes.yaml b/changelog/fragments/1697565877-progress-race-fixes.yaml new file mode 100644 index 00000000000..56517dc8c89 --- /dev/null +++ b/changelog/fragments/1697565877-progress-race-fixes.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Add mutexes and flags to prevent flaky tests in progress tracker + +# Long description; in case the summary is not enough to describe the change +# this field accommodates a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: progress tracker + +# PR URL; optional; the PR number that added the changeset. 
+# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/3607 diff --git a/internal/pkg/agent/install/progress.go b/internal/pkg/agent/install/progress.go index 1ec3cfb3faf..1ef36798a92 100644 --- a/internal/pkg/agent/install/progress.go +++ b/internal/pkg/agent/install/progress.go @@ -37,6 +37,11 @@ type progressTrackerStep struct { substeps bool mu sync.Mutex step *progressTrackerStep + // marks that either Succeeded() or Failed() has been called, + // and the tracker is completed. This is here so we can sync state despite tick() being called arbitrarily from a timer. + // Should only be called from inside a mutex lock, + // so the state can properly be synced between tick() and other methods + completed bool } func newProgressTrackerStep(tracker *ProgressTracker, prefix string, finalizeFunc func()) *progressTrackerStep { @@ -49,7 +54,14 @@ func newProgressTrackerStep(tracker *ProgressTracker, prefix string, finalizeFun // Succeeded step is done and successful. func (pts *progressTrackerStep) Succeeded() { + // calling finalizeFunc outside a mutex prevents + // this from being a truly atomic method, but + // it's too easy for a mutex to get passed to the callback func and create a deadlock. 
+ pts.finalizeFunc() pts.mu.Lock() + defer pts.mu.Unlock() + pts.step = nil + prefix := " " if pts.substeps { prefix = pts.prefix + " " @@ -57,13 +69,17 @@ func (pts *progressTrackerStep) Succeeded() { if !pts.rootstep { pts.tracker.printf("%sDONE\n", prefix) } - pts.mu.Unlock() - pts.finalizeFunc() + // mark as done before unlocking + pts.completed = true } // Failed step has failed. func (pts *progressTrackerStep) Failed() { + pts.finalizeFunc() pts.mu.Lock() + defer pts.mu.Unlock() + pts.step = nil + prefix := " " if pts.substeps { prefix = pts.prefix + " " @@ -71,8 +87,8 @@ func (pts *progressTrackerStep) Failed() { if !pts.rootstep { pts.tracker.printf("%sFAILED\n", prefix) } - pts.mu.Unlock() - pts.finalizeFunc() + // mark as done before unlocking + pts.completed = true } // StepStart creates a new step. @@ -86,19 +102,19 @@ func (pts *progressTrackerStep) StepStart(msg string) ProgressTrackerStep { } } pts.tracker.printf("%s%s...", prefix, strings.TrimSpace(msg)) - s := newProgressTrackerStep(pts.tracker, prefix, func() { - pts.setStep(nil) - }) + s := newProgressTrackerStep(pts.tracker, prefix, func() {}) pts.setStep(s) return s } +// setStep sets the current sub-step for the tracker func (pts *progressTrackerStep) setStep(step *progressTrackerStep) { pts.mu.Lock() defer pts.mu.Unlock() pts.step = step } +// tick iterates the tracker with a ".", traveling down to the last sub-tracker to do so. 
func (pts *progressTrackerStep) tick() { pts.mu.Lock() defer pts.mu.Unlock() @@ -108,7 +124,10 @@ func (pts *progressTrackerStep) tick() { return } if !pts.rootstep { - pts.tracker.printf(".") + // check completed state while we have the mutex + if !pts.completed { + pts.tracker.printf(".") + } } } @@ -118,11 +137,13 @@ type ProgressTracker struct { tickInterval time.Duration randomizeTickInterval bool - step *progressTrackerStep - mu sync.Mutex - stop chan struct{} + step *progressTrackerStep + writerMutex sync.Mutex + stepMutex sync.Mutex + stop chan struct{} } +// NewProgressTracker returns a new root tracker with the given writer func NewProgressTracker(writer io.Writer) *ProgressTracker { return &ProgressTracker{ writer: writer, @@ -132,14 +153,17 @@ func NewProgressTracker(writer io.Writer) *ProgressTracker { } } +// SetTickInterval sets the tracker tick interval func (pt *ProgressTracker) SetTickInterval(d time.Duration) { pt.tickInterval = d } +// DisableRandomizedTickIntervals disables randomizing the tick interval func (pt *ProgressTracker) DisableRandomizedTickIntervals() { pt.randomizeTickInterval = false } +// Start the root tracker func (pt *ProgressTracker) Start() ProgressTrackerStep { timer := time.NewTimer(pt.calculateTickInterval()) go func() { @@ -159,6 +183,7 @@ func (pt *ProgressTracker) Start() ProgressTrackerStep { }() s := newProgressTrackerStep(pt, "", func() { + // callback here is what actually does the stopping pt.setStep(nil) pt.stop <- struct{}{} }) @@ -167,24 +192,28 @@ func (pt *ProgressTracker) Start() ProgressTrackerStep { return s } +// printf the given statement to the tracker writer func (pt *ProgressTracker) printf(format string, a ...any) { - pt.mu.Lock() - defer pt.mu.Unlock() + pt.writerMutex.Lock() + defer pt.writerMutex.Unlock() _, _ = fmt.Fprintf(pt.writer, format, a...) 
} +// getStep returns the substep func (pt *ProgressTracker) getStep() *progressTrackerStep { - pt.mu.Lock() - defer pt.mu.Unlock() + pt.stepMutex.Lock() + defer pt.stepMutex.Unlock() return pt.step } +// setStep sets the substep func (pt *ProgressTracker) setStep(step *progressTrackerStep) { - pt.mu.Lock() - defer pt.mu.Unlock() + pt.stepMutex.Lock() + defer pt.stepMutex.Unlock() pt.step = step } +// calculateTickInterval returns the tick interval func (pt *ProgressTracker) calculateTickInterval() time.Duration { if !pt.randomizeTickInterval { return pt.tickInterval diff --git a/internal/pkg/agent/install/progress_test.go b/internal/pkg/agent/install/progress_test.go index 2bb9bb6b1d3..bc202c7c8f3 100644 --- a/internal/pkg/agent/install/progress_test.go +++ b/internal/pkg/agent/install/progress_test.go @@ -135,4 +135,25 @@ func TestProgress(t *testing.T) { require.Regexp(t, regexp.MustCompile(`step starting\.{3,}\n substep 1 starting\.{3,}\.+ DONE\n substep 2 starting\.{3,}\.+ DONE\n DONE\n`), string(w.buf)) }) + + t.Run("nested_step_out_of_order", func(t *testing.T) { + w := newTestWriter() + pt := NewProgressTracker(w) + pt.SetTickInterval(10 * time.Millisecond) // to speed up testing + pt.DisableRandomizedTickIntervals() + + rs := pt.Start() + + s := rs.StepStart("step starting") + ss := s.StepStart("substep 1 starting") + time.Sleep(55 * time.Millisecond) // to simulate work being done + ss.Succeeded() + ss = s.StepStart("substep 2 starting") + time.Sleep(25 * time.Millisecond) // to simulate work being done + rs.Succeeded() + s.Succeeded() + ss.Succeeded() + + require.Regexp(t, regexp.MustCompile(`step starting\.{3,}\n substep 1 starting\.{3,}\.+ DONE\n substep 2 starting\.{3,}\.+ DONE\n DONE\n`), string(w.buf)) + }) }