Skip to content

Commit

Permalink
usm: Move USM telemetry repoter to the USM monitor
Browse files Browse the repository at this point in the history
The telemetry go-routine, was incorrectly placed in the network-tracer. We move the logic into
the USM monitor, before further changing it
  • Loading branch information
guyarb committed Feb 10, 2025
1 parent 5649b10 commit 950cb7a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 31 deletions.
27 changes: 1 addition & 26 deletions cmd/system-probe/modules/network_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/DataDog/datadog-agent/pkg/network/tracer"
usmconsts "github.com/DataDog/datadog-agent/pkg/network/usm/consts"
usm "github.com/DataDog/datadog-agent/pkg/network/usm/utils"
"github.com/DataDog/datadog-agent/pkg/process/statsd"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

Expand Down Expand Up @@ -67,19 +66,13 @@ func createNetworkTracerModule(_ *sysconfigtypes.Config, deps module.FactoryDepe

t, err := tracer.NewTracer(ncfg, deps.Telemetry)

done := make(chan struct{})
if err == nil {
startTelemetryReporter(done)
}

return &networkTracer{tracer: t, done: done}, err
return &networkTracer{tracer: t}, err
}

var _ module.Module = &networkTracer{}

type networkTracer struct {
tracer *tracer.Tracer
done chan struct{}
restartTimer *time.Timer
}

Expand Down Expand Up @@ -322,7 +315,6 @@ func (nt *networkTracer) Register(httpMux *module.Router) error {

// Close will stop all system probe activities
func (nt *networkTracer) Close() {
close(nt.done)
nt.tracer.Stop()
}

Expand Down Expand Up @@ -363,23 +355,6 @@ func writeConnections(w http.ResponseWriter, marshaler marshal.Marshaler, cs *ne
log.Tracef("/connections: %d connections", len(cs.Conns))
}

func startTelemetryReporter(done <-chan struct{}) {
telemetry.SetStatsdClient(statsd.Client)
ticker := time.NewTicker(30 * time.Second)
go func() {
defer ticker.Stop()

for {
select {
case <-ticker.C:
telemetry.ReportStatsd()
case <-done:
return
}
}
}()
}

func writeDisabledProtocolMessage(protocolName string, w http.ResponseWriter) {
log.Warnf("%s monitoring is disabled", protocolName)
w.WriteHeader(404)
Expand Down
38 changes: 33 additions & 5 deletions pkg/network/usm/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
usmstate "github.com/DataDog/datadog-agent/pkg/network/usm/state"
"github.com/DataDog/datadog-agent/pkg/network/usm/utils"
"github.com/DataDog/datadog-agent/pkg/process/monitor"
"github.com/DataDog/datadog-agent/pkg/process/statsd"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

Expand All @@ -51,6 +52,8 @@ type Monitor struct {
closeFilterFn func()

lastUpdateTime *atomic.Int64

telemetryStopChannel chan struct{}
}

// NewMonitor returns a new Monitor instance
Expand Down Expand Up @@ -95,10 +98,11 @@ func NewMonitor(c *config.Config, connectionProtocolMap *ebpf.Map) (m *Monitor,
usmstate.Set(usmstate.Running)

usmMonitor := &Monitor{
cfg: c,
ebpfProgram: mgr,
closeFilterFn: closeFilterFn,
processMonitor: processMonitor,
cfg: c,
ebpfProgram: mgr,
closeFilterFn: closeFilterFn,
processMonitor: processMonitor,
telemetryStopChannel: make(chan struct{}),
}

usmMonitor.lastUpdateTime = atomic.NewInt64(time.Now().Unix())
Expand Down Expand Up @@ -138,7 +142,11 @@ func (m *Monitor) Start() error {
err = m.processMonitor.Initialize(m.cfg.EnableUSMEventStream)
}

return err
if err != nil {
return err
}
m.startTelemetryReporter()
return nil
}

// Pause bypasses the eBPF programs in the monitor
Expand Down Expand Up @@ -202,6 +210,9 @@ func (m *Monitor) Stop() {
return
}

if m.telemetryStopChannel != nil {
close(m.telemetryStopChannel)
}
m.processMonitor.Stop()

ddebpf.RemoveNameMappings(m.ebpfProgram.Manager.Manager)
Expand All @@ -215,3 +226,20 @@ func (m *Monitor) Stop() {
func (m *Monitor) DumpMaps(w io.Writer, maps ...string) error {
return m.ebpfProgram.DumpMaps(w, maps...)
}

func (m *Monitor) startTelemetryReporter() {
telemetry.SetStatsdClient(statsd.Client)
ticker := time.NewTicker(30 * time.Second)
go func() {
defer ticker.Stop()

for {
select {
case <-ticker.C:
telemetry.ReportStatsd()
case <-m.telemetryStopChannel:
return
}
}
}()
}

0 comments on commit 950cb7a

Please sign in to comment.