From fe35c89e20c1737197dda17c436c91d6e0e1f2b7 Mon Sep 17 00:00:00 2001 From: Ahmed Mezghani <38987709+ahmed-mez@users.noreply.github.com> Date: Thu, 25 Jan 2024 18:26:46 +0100 Subject: [PATCH] pkg/trace: make pkg/trace/agent control the lifecycle of pkg timing (#22183) pkg/trace: make pkg/trace/agent control the lifecycle of pkg timing --- comp/trace/agent/run.go | 2 - pkg/trace/agent/agent.go | 2 + pkg/trace/metrics/timing/timing.go | 110 +++++++++++++++--------- pkg/trace/metrics/timing/timing_test.go | 55 ++++-------- 4 files changed, 88 insertions(+), 81 deletions(-) diff --git a/comp/trace/agent/run.go b/comp/trace/agent/run.go index 74a00663d89be..9cfafac5ce13d 100644 --- a/comp/trace/agent/run.go +++ b/comp/trace/agent/run.go @@ -26,7 +26,6 @@ import ( tracecfg "github.com/DataDog/datadog-agent/pkg/trace/config" "github.com/DataDog/datadog-agent/pkg/trace/info" "github.com/DataDog/datadog-agent/pkg/trace/metrics" - "github.com/DataDog/datadog-agent/pkg/trace/metrics/timing" "github.com/DataDog/datadog-agent/pkg/trace/telemetry" "github.com/DataDog/datadog-agent/pkg/trace/watchdog" "github.com/DataDog/datadog-agent/pkg/util" @@ -142,7 +141,6 @@ func stopAgentSidekicks(cfg config.Component) { log.Flush() metrics.Flush() - timing.Stop() tracecfg := cfg.Object() if pcfg := profilingConfig(tracecfg); pcfg != nil { profiling.Stop() diff --git a/pkg/trace/agent/agent.go b/pkg/trace/agent/agent.go index f3d715e3a593a..4a8510e97d3c7 100644 --- a/pkg/trace/agent/agent.go +++ b/pkg/trace/agent/agent.go @@ -131,6 +131,8 @@ func NewAgent(ctx context.Context, conf *config.AgentConfig, telemetryCollector // Run starts routers routines and individual pieces then stop them when the exit order is received. func (a *Agent) Run() { + timing.Start(metrics.Client) + defer timing.Stop() for _, starter := range []interface{ Start() }{ a.Receiver, a.Concentrator, diff --git a/pkg/trace/metrics/timing/timing.go b/pkg/trace/metrics/timing/timing.go index 44c557694f170..c4dc418e772a9 100644 --- a/pkg/trace/metrics/timing/timing.go +++ b/pkg/trace/metrics/timing/timing.go @@ -15,75 +15,99 @@ import ( "go.uber.org/atomic" + "github.com/DataDog/datadog-agent/pkg/trace/log" "github.com/DataDog/datadog-agent/pkg/trace/metrics" ) -// AutoreportInterval specifies the interval at which the default set reports. -const AutoreportInterval = 10 * time.Second +// autoreportInterval specifies the interval at which the default set reports. +var autoreportInterval = 10 * time.Second var ( - defaultSet = NewSet() - stopReport = defaultSet.Autoreport(AutoreportInterval) + defaultSet *set ) // Since records the duration for the given metric name as time passed since start. // It uses the default set which is reported at 10 second intervals. -func Since(name string, start time.Time) { defaultSet.Since(name, start) } +func Since(name string, start time.Time) { + if defaultSet == nil { + log.Error("Timing hasn't been initialized, trace-agent metrics will be missing") + return + } + defaultSet.Since(name, start) +} + +// Start initializes autoreporting of timing metrics. +func Start(statsd metrics.StatsClient) { + defaultSet = newSet(statsd) + defaultSet.autoreport(autoreportInterval) +} // Stop permanently stops the default set from auto-reporting and flushes any remaining // metrics. It can be useful to call when the program exits to ensure everything is // submitted. -func Stop() { stopReport() } +func Stop() { + if defaultSet == nil { + return + } + defaultSet.stop() +} -// NewSet returns a new, ready to use Set. -func NewSet() *Set { - return &Set{c: make(map[string]*counter)} +// newSet returns a new, ready to use Set. +func newSet(statsd metrics.StatsClient) *set { + return &set{ + c: make(map[string]*counter), + close: make(chan struct{}), + statsd: statsd, + } } // Set represents a set of metrics that can be used for timing. Use NewSet to initialize // a new Set. Use Report (or Autoreport) to submit metrics. Set is safe for concurrent use. -type Set struct { - mu sync.RWMutex // guards c - c map[string]*counter // maps names to their aggregates +type set struct { + mu sync.RWMutex // guards c + c map[string]*counter // maps names to their aggregates + close chan struct{} + startOnce sync.Once + stopOnce sync.Once + statsd metrics.StatsClient } -// Autoreport enables autoreporting of the Set at the given interval. It returns a +// autoreport enables autoreporting of the Set at the given interval. It returns a // cancellation function. -func (s *Set) Autoreport(interval time.Duration) (cancelFunc func()) { - stop := make(chan struct{}) - go func() { - defer close(stop) - tick := time.NewTicker(interval) - defer tick.Stop() - for { - select { - case <-tick.C: - s.Report() - case <-stop: - s.Report() - return +func (s *set) autoreport(interval time.Duration) { + s.startOnce.Do(func() { + go func() { + tick := time.NewTicker(interval) + defer tick.Stop() + for { + select { + case <-tick.C: + s.report() + case <-s.close: + s.report() + return + } } - } - }() - var once sync.Once // avoid panics - return func() { - once.Do(func() { - stop <- struct{}{} - <-stop - }) - } + }() + }) +} + +func (s *set) stop() { + s.stopOnce.Do(func() { + close(s.close) + }) } // Since records the duration for the given metric name as *time passed since start*. // If name does not exist, as defined by NewSet, it creates it. -func (s *Set) Since(name string, start time.Time) { +func (s *set) Since(name string, start time.Time) { ms := time.Since(start) / time.Millisecond s.getCounter(name).add(float64(ms)) } // getCounter returns the counter with the given name, initializing any uninitialized // fields of s. -func (s *Set) getCounter(name string) *counter { +func (s *set) getCounter(name string) *counter { s.mu.RLock() c, ok := s.c[name] s.mu.RUnlock() @@ -102,11 +126,11 @@ func (s *Set) getCounter(name string) *counter { } // Report reports all of the Set's metrics to the statsd client. -func (s *Set) Report() { +func (s *set) report() { s.mu.RLock() defer s.mu.RUnlock() for _, c := range s.c { - c.flush() + c.flush(s.statsd) } } @@ -140,13 +164,13 @@ func (c *counter) add(v float64) { c.mu.RUnlock() } -func (c *counter) flush() { +func (c *counter) flush(statsd metrics.StatsClient) { c.mu.Lock() count := c.count.Swap(0) sum := c.sum.Swap(0) max := c.max.Swap(0) c.mu.Unlock() - metrics.Count(c.name+".count", int64(count), nil, 1) - metrics.Gauge(c.name+".max", max, nil, 1) - metrics.Gauge(c.name+".avg", sum/count, nil, 1) + statsd.Count(c.name+".count", int64(count), nil, 1) + statsd.Gauge(c.name+".max", max, nil, 1) + statsd.Gauge(c.name+".avg", sum/count, nil, 1) } diff --git a/pkg/trace/metrics/timing/timing_test.go b/pkg/trace/metrics/timing/timing_test.go index 38a4d9f1aa243..60005e45b676f 100644 --- a/pkg/trace/metrics/timing/timing_test.go +++ b/pkg/trace/metrics/timing/timing_test.go @@ -15,59 +15,52 @@ import ( "github.com/stretchr/testify/assert" - "github.com/DataDog/datadog-agent/pkg/trace/metrics" "github.com/DataDog/datadog-agent/pkg/trace/teststatsd" ) func TestTiming(t *testing.T) { assert := assert.New(t) - stats := &teststatsd.Client{} - Stop() // https://github.com/DataDog/datadog-agent/issues/13934 - defer func(old metrics.StatsClient) { metrics.Client = old }(metrics.Client) - metrics.Client = stats - stopReport = defaultSet.Autoreport(AutoreportInterval) + stats := &teststatsd.Client{} t.Run("report", func(t *testing.T) { - stats.Reset() - set := NewSet() + set := newSet(stats) set.Since("counter1", time.Now().Add(-2*time.Second)) set.Since("counter1", time.Now().Add(-3*time.Second)) - set.Report() + set.report() - calls := stats.CountCalls - assert.Equal(1, len(calls)) - assert.Equal(2., findCall(assert, calls, "counter1.count").Value) + counts := stats.GetCountSummaries() + assert.Equal(1, len(counts)) + assert.Contains(counts, "counter1.count") - calls = stats.GaugeCalls - assert.Equal(2, len(calls)) - assert.Equal(2500., findCall(assert, calls, "counter1.avg").Value, "avg") - assert.Equal(3000., findCall(assert, calls, "counter1.max").Value, "max") + gauges := stats.GetGaugeSummaries() + assert.Equal(2, len(gauges)) + assert.Contains(gauges, "counter1.avg") + assert.Contains(gauges, "counter1.max") }) t.Run("autoreport", func(t *testing.T) { stats.Reset() - set := NewSet() + set := newSet(stats) set.Since("counter1", time.Now().Add(-1*time.Second)) - stop := set.Autoreport(time.Millisecond) + set.autoreport(time.Millisecond) if runtime.GOOS == "windows" { time.Sleep(5 * time.Second) } time.Sleep(10 * time.Millisecond) - stop() - assert.True(len(stats.CountCalls) > 1) + Stop() + assert.Contains(stats.GetCountSummaries(), "counter1.count") }) t.Run("panic", func(t *testing.T) { - set := NewSet() - stop := set.Autoreport(time.Millisecond) - stop() - stop() + Start(stats) + Stop() + Stop() }) t.Run("race", func(t *testing.T) { stats.Reset() - set := NewSet() + set := newSet(stats) var wg sync.WaitGroup for i := 0; i < 150; i++ { wg.Add(1) @@ -81,19 +74,9 @@ func TestTiming(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - set.Report() + set.report() }() } wg.Wait() }) } - -func findCall(assert *assert.Assertions, calls []teststatsd.MetricsArgs, name string) teststatsd.MetricsArgs { - for _, c := range calls { - if c.Name == name { - return c - } - } - assert.Failf("call not found", "key %q missing", name) - return teststatsd.MetricsArgs{} -}