Skip to content

Commit

Permalink
pkg/trace: make pkg/trace/agent control the lifecycle of pkg timing (#…
Browse files Browse the repository at this point in the history
…22183)

pkg/trace: make pkg/trace/agent control the lifecycle of pkg timing
  • Loading branch information
ahmed-mez authored Jan 25, 2024
1 parent 1f83d84 commit fe35c89
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 81 deletions.
2 changes: 0 additions & 2 deletions comp/trace/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
tracecfg "github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/info"
"github.com/DataDog/datadog-agent/pkg/trace/metrics"
"github.com/DataDog/datadog-agent/pkg/trace/metrics/timing"
"github.com/DataDog/datadog-agent/pkg/trace/telemetry"
"github.com/DataDog/datadog-agent/pkg/trace/watchdog"
"github.com/DataDog/datadog-agent/pkg/util"
Expand Down Expand Up @@ -142,7 +141,6 @@ func stopAgentSidekicks(cfg config.Component) {
log.Flush()
metrics.Flush()

timing.Stop()
tracecfg := cfg.Object()
if pcfg := profilingConfig(tracecfg); pcfg != nil {
profiling.Stop()
Expand Down
2 changes: 2 additions & 0 deletions pkg/trace/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ func NewAgent(ctx context.Context, conf *config.AgentConfig, telemetryCollector

// Run starts routers routines and individual pieces then stop them when the exit order is received.
func (a *Agent) Run() {
timing.Start(metrics.Client)
defer timing.Stop()
for _, starter := range []interface{ Start() }{
a.Receiver,
a.Concentrator,
Expand Down
110 changes: 67 additions & 43 deletions pkg/trace/metrics/timing/timing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,75 +15,99 @@ import (

"go.uber.org/atomic"

"github.com/DataDog/datadog-agent/pkg/trace/log"
"github.com/DataDog/datadog-agent/pkg/trace/metrics"
)

// AutoreportInterval specifies the interval at which the default set reports.
const AutoreportInterval = 10 * time.Second
// autoreportInterval specifies the interval at which the default set reports.
var autoreportInterval = 10 * time.Second

var (
defaultSet = NewSet()
stopReport = defaultSet.Autoreport(AutoreportInterval)
defaultSet *set
)

// Since records the duration for the given metric name as time passed since start.
// It uses the default set which is reported at 10 second intervals.
func Since(name string, start time.Time) { defaultSet.Since(name, start) }
func Since(name string, start time.Time) {
if defaultSet == nil {
log.Error("Timing hasn't been initialized, trace-agent metrics will be missing")
return
}
defaultSet.Since(name, start)
}

// Start initializes autoreporting of timing metrics.
func Start(statsd metrics.StatsClient) {
defaultSet = newSet(statsd)
defaultSet.autoreport(autoreportInterval)
}

// Stop permanently stops the default set from auto-reporting and flushes any remaining
// metrics. It can be useful to call when the program exits to ensure everything is
// submitted.
func Stop() { stopReport() }
func Stop() {
if defaultSet == nil {
return
}
defaultSet.stop()
}

// NewSet returns a new, ready to use Set.
func NewSet() *Set {
return &Set{c: make(map[string]*counter)}
// newSet returns a new, ready to use Set.
func newSet(statsd metrics.StatsClient) *set {
return &set{
c: make(map[string]*counter),
close: make(chan struct{}),
statsd: statsd,
}
}

// Set represents a set of metrics that can be used for timing. Use NewSet to initialize
// a new Set. Use Report (or Autoreport) to submit metrics. Set is safe for concurrent use.
type Set struct {
mu sync.RWMutex // guards c
c map[string]*counter // maps names to their aggregates
type set struct {
mu sync.RWMutex // guards c
c map[string]*counter // maps names to their aggregates
close chan struct{}
startOnce sync.Once
stopOnce sync.Once
statsd metrics.StatsClient
}

// Autoreport enables autoreporting of the Set at the given interval. It returns a
// autoreport enables autoreporting of the Set at the given interval. It returns a
// cancellation function.
func (s *Set) Autoreport(interval time.Duration) (cancelFunc func()) {
stop := make(chan struct{})
go func() {
defer close(stop)
tick := time.NewTicker(interval)
defer tick.Stop()
for {
select {
case <-tick.C:
s.Report()
case <-stop:
s.Report()
return
func (s *set) autoreport(interval time.Duration) {
s.startOnce.Do(func() {
go func() {
tick := time.NewTicker(interval)
defer tick.Stop()
for {
select {
case <-tick.C:
s.report()
case <-s.close:
s.report()
return
}
}
}
}()
var once sync.Once // avoid panics
return func() {
once.Do(func() {
stop <- struct{}{}
<-stop
})
}
}()
})
}

func (s *set) stop() {
s.stopOnce.Do(func() {
close(s.close)
})
}

// Since records the duration for the given metric name as *time passed since start*.
// If name does not exist, as defined by NewSet, it creates it.
func (s *Set) Since(name string, start time.Time) {
func (s *set) Since(name string, start time.Time) {
ms := time.Since(start) / time.Millisecond
s.getCounter(name).add(float64(ms))
}

// getCounter returns the counter with the given name, initializing any uninitialized
// fields of s.
func (s *Set) getCounter(name string) *counter {
func (s *set) getCounter(name string) *counter {
s.mu.RLock()
c, ok := s.c[name]
s.mu.RUnlock()
Expand All @@ -102,11 +126,11 @@ func (s *Set) getCounter(name string) *counter {
}

// Report reports all of the Set's metrics to the statsd client.
func (s *Set) Report() {
func (s *set) report() {
s.mu.RLock()
defer s.mu.RUnlock()
for _, c := range s.c {
c.flush()
c.flush(s.statsd)
}
}

Expand Down Expand Up @@ -140,13 +164,13 @@ func (c *counter) add(v float64) {
c.mu.RUnlock()
}

func (c *counter) flush() {
func (c *counter) flush(statsd metrics.StatsClient) {
c.mu.Lock()
count := c.count.Swap(0)
sum := c.sum.Swap(0)
max := c.max.Swap(0)
c.mu.Unlock()
metrics.Count(c.name+".count", int64(count), nil, 1)
metrics.Gauge(c.name+".max", max, nil, 1)
metrics.Gauge(c.name+".avg", sum/count, nil, 1)
statsd.Count(c.name+".count", int64(count), nil, 1)
statsd.Gauge(c.name+".max", max, nil, 1)
statsd.Gauge(c.name+".avg", sum/count, nil, 1)
}
55 changes: 19 additions & 36 deletions pkg/trace/metrics/timing/timing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,59 +15,52 @@ import (

"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/pkg/trace/metrics"
"github.com/DataDog/datadog-agent/pkg/trace/teststatsd"
)

func TestTiming(t *testing.T) {
assert := assert.New(t)
stats := &teststatsd.Client{}

Stop() // https://github.com/DataDog/datadog-agent/issues/13934
defer func(old metrics.StatsClient) { metrics.Client = old }(metrics.Client)
metrics.Client = stats
stopReport = defaultSet.Autoreport(AutoreportInterval)
stats := &teststatsd.Client{}

t.Run("report", func(t *testing.T) {
stats.Reset()
set := NewSet()
set := newSet(stats)
set.Since("counter1", time.Now().Add(-2*time.Second))
set.Since("counter1", time.Now().Add(-3*time.Second))
set.Report()
set.report()

calls := stats.CountCalls
assert.Equal(1, len(calls))
assert.Equal(2., findCall(assert, calls, "counter1.count").Value)
counts := stats.GetCountSummaries()
assert.Equal(1, len(counts))
assert.Contains(counts, "counter1.count")

calls = stats.GaugeCalls
assert.Equal(2, len(calls))
assert.Equal(2500., findCall(assert, calls, "counter1.avg").Value, "avg")
assert.Equal(3000., findCall(assert, calls, "counter1.max").Value, "max")
gauges := stats.GetGaugeSummaries()
assert.Equal(2, len(gauges))
assert.Contains(gauges, "counter1.avg")
assert.Contains(gauges, "counter1.max")
})

t.Run("autoreport", func(t *testing.T) {
stats.Reset()
set := NewSet()
set := newSet(stats)
set.Since("counter1", time.Now().Add(-1*time.Second))
stop := set.Autoreport(time.Millisecond)
set.autoreport(time.Millisecond)
if runtime.GOOS == "windows" {
time.Sleep(5 * time.Second)
}
time.Sleep(10 * time.Millisecond)
stop()
assert.True(len(stats.CountCalls) > 1)
Stop()
assert.Contains(stats.GetCountSummaries(), "counter1.count")
})

t.Run("panic", func(t *testing.T) {
set := NewSet()
stop := set.Autoreport(time.Millisecond)
stop()
stop()
Start(stats)
Stop()
Stop()
})

t.Run("race", func(t *testing.T) {
stats.Reset()
set := NewSet()
set := newSet(stats)
var wg sync.WaitGroup
for i := 0; i < 150; i++ {
wg.Add(1)
Expand All @@ -81,19 +74,9 @@ func TestTiming(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
set.Report()
set.report()
}()
}
wg.Wait()
})
}

func findCall(assert *assert.Assertions, calls []teststatsd.MetricsArgs, name string) teststatsd.MetricsArgs {
for _, c := range calls {
if c.Name == name {
return c
}
}
assert.Failf("call not found", "key %q missing", name)
return teststatsd.MetricsArgs{}
}

0 comments on commit fe35c89

Please sign in to comment.