Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/trace: make pkg/trace/agent control the lifecycle of pkg timing #22183

Merged
merged 3 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions comp/trace/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
tracecfg "github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/info"
"github.com/DataDog/datadog-agent/pkg/trace/metrics"
"github.com/DataDog/datadog-agent/pkg/trace/metrics/timing"
"github.com/DataDog/datadog-agent/pkg/trace/telemetry"
"github.com/DataDog/datadog-agent/pkg/trace/watchdog"
"github.com/DataDog/datadog-agent/pkg/util"
Expand Down Expand Up @@ -142,7 +141,6 @@ func stopAgentSidekicks(cfg config.Component) {
log.Flush()
metrics.Flush()

timing.Stop()
tracecfg := cfg.Object()
if pcfg := profilingConfig(tracecfg); pcfg != nil {
profiling.Stop()
Expand Down
2 changes: 2 additions & 0 deletions pkg/trace/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ func NewAgent(ctx context.Context, conf *config.AgentConfig, telemetryCollector

// Run starts routers routines and individual pieces then stop them when the exit order is received.
func (a *Agent) Run() {
timing.Start(metrics.Client)
defer timing.Stop()
for _, starter := range []interface{ Start() }{
a.Receiver,
a.Concentrator,
Expand Down
110 changes: 67 additions & 43 deletions pkg/trace/metrics/timing/timing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,75 +15,99 @@ import (

"go.uber.org/atomic"

"github.com/DataDog/datadog-agent/pkg/trace/log"
"github.com/DataDog/datadog-agent/pkg/trace/metrics"
)

// AutoreportInterval specifies the interval at which the default set reports.
const AutoreportInterval = 10 * time.Second
// autoreportInterval specifies the interval at which the default set reports.
var autoreportInterval = 10 * time.Second

var (
defaultSet = NewSet()
stopReport = defaultSet.Autoreport(AutoreportInterval)
defaultSet *set
)

// Since records the duration for the given metric name as time passed since start.
// It uses the default set which is reported at 10 second intervals.
func Since(name string, start time.Time) { defaultSet.Since(name, start) }
// Since records, under the given metric name, the time elapsed since start. It
// delegates to the default set created by Start; when Start has not been called
// yet, the measurement is dropped and an error is logged instead.
func Since(name string, start time.Time) {
	if s := defaultSet; s != nil {
		s.Since(name, start)
		return
	}
	log.Error("Timing hasn't been initialized, trace-agent metrics will be missing")
}

// Start initializes autoreporting of timing metrics. It installs a new default
// set that submits to statsd and begins reporting it every autoreportInterval.
//
// If a default set from an earlier Start call is still installed, it is stopped
// first: once defaultSet is overwritten nothing can reach the old set anymore,
// so its reporting goroutine would otherwise keep ticking forever.
func Start(statsd metrics.StatsClient) {
	if prev := defaultSet; prev != nil {
		prev.stop() // safe even if prev was already stopped
	}
	defaultSet = newSet(statsd)
	defaultSet.autoreport(autoreportInterval)
}

// Stop permanently stops the default set from auto-reporting and flushes any remaining
// metrics. It can be useful to call when the program exits to ensure everything is
// submitted.
func Stop() { stopReport() }
// Stop asks the default set to stop auto-reporting. It is a no-op when Start has
// not been called, and it may be called more than once.
func Stop() {
	if s := defaultSet; s != nil {
		s.stop()
	}
}

// NewSet returns a new, ready to use Set.
func NewSet() *Set {
return &Set{c: make(map[string]*counter)}
// newSet returns a new, ready to use set that submits its metrics to statsd.
func newSet(statsd metrics.StatsClient) *set {
	s := new(set)
	s.statsd = statsd
	s.c = make(map[string]*counter)
	s.close = make(chan struct{}) // closed by stop to end autoreporting
	return s
}

// set represents a set of metrics that can be used for timing. Use newSet to initialize
// a new set. Use report (or autoreport) to submit metrics. set is safe for concurrent use.
type Set struct {
mu sync.RWMutex // guards c
c map[string]*counter // maps names to their aggregates
type set struct {
mu sync.RWMutex // guards c
c map[string]*counter // maps names to their aggregates
close chan struct{}
startOnce sync.Once
stopOnce sync.Once
statsd metrics.StatsClient
}

// Autoreport enables autoreporting of the Set at the given interval. It returns a
// autoreport enables autoreporting of the set at the given interval. Reporting continues
// until stop is called; only the first call to autoreport has an effect.
func (s *Set) Autoreport(interval time.Duration) (cancelFunc func()) {
stop := make(chan struct{})
go func() {
defer close(stop)
tick := time.NewTicker(interval)
defer tick.Stop()
for {
select {
case <-tick.C:
s.Report()
case <-stop:
s.Report()
return
func (s *set) autoreport(interval time.Duration) {
s.startOnce.Do(func() {
go func() {
tick := time.NewTicker(interval)
defer tick.Stop()
for {
select {
case <-tick.C:
s.report()
case <-s.close:
s.report()
return
}
}
}
}()
var once sync.Once // avoid panics
return func() {
once.Do(func() {
stop <- struct{}{}
<-stop
})
}
}()
})
}

// stop signals the autoreport goroutine to do one last report and exit. Only the
// first call has an effect; it does not wait for that final report to finish.
func (s *set) stop() {
	shutdown := func() { close(s.close) }
	s.stopOnce.Do(shutdown)
}

// Since records the duration for the given metric name as *time passed since start*.
// If no counter exists yet for name, one is created.
func (s *Set) Since(name string, start time.Time) {
func (s *set) Since(name string, start time.Time) {
ms := time.Since(start) / time.Millisecond
s.getCounter(name).add(float64(ms))
}

// getCounter returns the counter with the given name, initializing any uninitialized
// fields of s.
func (s *Set) getCounter(name string) *counter {
func (s *set) getCounter(name string) *counter {
s.mu.RLock()
c, ok := s.c[name]
s.mu.RUnlock()
Expand All @@ -102,11 +126,11 @@ func (s *Set) getCounter(name string) *counter {
}

// report reports all of the set's metrics to the statsd client.
func (s *Set) Report() {
func (s *set) report() {
s.mu.RLock()
defer s.mu.RUnlock()
for _, c := range s.c {
c.flush()
c.flush(s.statsd)
}
}

Expand Down Expand Up @@ -140,13 +164,13 @@ func (c *counter) add(v float64) {
c.mu.RUnlock()
}

func (c *counter) flush() {
func (c *counter) flush(statsd metrics.StatsClient) {
c.mu.Lock()
count := c.count.Swap(0)
sum := c.sum.Swap(0)
max := c.max.Swap(0)
c.mu.Unlock()
metrics.Count(c.name+".count", int64(count), nil, 1)
metrics.Gauge(c.name+".max", max, nil, 1)
metrics.Gauge(c.name+".avg", sum/count, nil, 1)
statsd.Count(c.name+".count", int64(count), nil, 1)
statsd.Gauge(c.name+".max", max, nil, 1)
statsd.Gauge(c.name+".avg", sum/count, nil, 1)
}
55 changes: 19 additions & 36 deletions pkg/trace/metrics/timing/timing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,59 +15,52 @@ import (

"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/pkg/trace/metrics"
"github.com/DataDog/datadog-agent/pkg/trace/teststatsd"
)

func TestTiming(t *testing.T) {
assert := assert.New(t)
stats := &teststatsd.Client{}

Stop() // https://github.com/DataDog/datadog-agent/issues/13934
defer func(old metrics.StatsClient) { metrics.Client = old }(metrics.Client)
metrics.Client = stats
stopReport = defaultSet.Autoreport(AutoreportInterval)
stats := &teststatsd.Client{}

t.Run("report", func(t *testing.T) {
stats.Reset()
set := NewSet()
set := newSet(stats)
set.Since("counter1", time.Now().Add(-2*time.Second))
set.Since("counter1", time.Now().Add(-3*time.Second))
set.Report()
set.report()

calls := stats.CountCalls
assert.Equal(1, len(calls))
assert.Equal(2., findCall(assert, calls, "counter1.count").Value)
counts := stats.GetCountSummaries()
assert.Equal(1, len(counts))
assert.Contains(counts, "counter1.count")

calls = stats.GaugeCalls
assert.Equal(2, len(calls))
assert.Equal(2500., findCall(assert, calls, "counter1.avg").Value, "avg")
assert.Equal(3000., findCall(assert, calls, "counter1.max").Value, "max")
gauges := stats.GetGaugeSummaries()
assert.Equal(2, len(gauges))
assert.Contains(gauges, "counter1.avg")
assert.Contains(gauges, "counter1.max")
})

t.Run("autoreport", func(t *testing.T) {
stats.Reset()
set := NewSet()
set := newSet(stats)
set.Since("counter1", time.Now().Add(-1*time.Second))
stop := set.Autoreport(time.Millisecond)
set.autoreport(time.Millisecond)
if runtime.GOOS == "windows" {
time.Sleep(5 * time.Second)
}
time.Sleep(10 * time.Millisecond)
stop()
assert.True(len(stats.CountCalls) > 1)
Stop()
assert.Contains(stats.GetCountSummaries(), "counter1.count")
})

t.Run("panic", func(t *testing.T) {
set := NewSet()
stop := set.Autoreport(time.Millisecond)
stop()
stop()
Start(stats)
Stop()
Stop()
})

t.Run("race", func(t *testing.T) {
stats.Reset()
set := NewSet()
set := newSet(stats)
var wg sync.WaitGroup
for i := 0; i < 150; i++ {
wg.Add(1)
Expand All @@ -81,19 +74,9 @@ func TestTiming(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
set.Report()
set.report()
}()
}
wg.Wait()
})
}

func findCall(assert *assert.Assertions, calls []teststatsd.MetricsArgs, name string) teststatsd.MetricsArgs {
for _, c := range calls {
if c.Name == name {
return c
}
}
assert.Failf("call not found", "key %q missing", name)
return teststatsd.MetricsArgs{}
}
Loading