Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip #1492

Draft
wants to merge 1 commit into
base: codeboten/opamp
Choose a base branch
from
Draft

wip #1492

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 85 additions & 6 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/tls"
"errors"
"net/http"
"os"
"runtime"
Expand Down Expand Up @@ -39,11 +40,12 @@ type Agent struct {
clientPrivateKeyPEM []byte
lastHealth *protobufs.ComponentHealth

logger Logger
ctx context.Context
cancel context.CancelFunc
metrics metrics.Metrics
health health.Reporter
logger Logger
ctx context.Context
cancel context.CancelFunc
metrics metrics.Metrics
usageStore *usageStore
health health.Reporter
}

func NewAgent(refineryLogger Logger, agentVersion string, currentConfig config.Config, metrics metrics.Metrics, health health.Reporter) *Agent {
Expand All @@ -57,6 +59,7 @@ func NewAgent(refineryLogger Logger, agentVersion string, currentConfig config.C
effectiveConfig: currentConfig,
metrics: metrics,
health: health,
usageStore: newUsageStore(),
}
agent.createAgentIdentity()
agent.logger.Debugf(context.Background(), "starting opamp client, id=%v", agent.instanceId)
Expand Down Expand Up @@ -165,6 +168,7 @@ func (agent *Agent) connect() error {
agent.logger.Debugf(context.Background(), "started opamp client")

go agent.healthCheck()
go agent.usageReport()
return nil
}

Expand Down Expand Up @@ -196,12 +200,87 @@ func (agent *Agent) healthCheck() {
report := agent.calculateHealth()
if report != nil {
agent.lastHealth = report
agent.opampClient.SetHealth(report)
if err := agent.opampClient.SetHealth(report); err != nil {
agent.logger.Errorf(context.Background(), "Could not report health to OpAMP server: %v", err)
}
}

traceUsage, ok := agent.metrics.Get("bytes_received_trace")
if !ok {
panic("leaky wallet from trace")
}
logUsage, ok := agent.metrics.Get("bytes_received_log")
if !ok {
panic("leaky wallet from log")
}

agent.usageStore.Add(traceUsage, logUsage)
}
}
}

func (agent *Agent) usageReport() {
timer := time.NewTicker(15 * time.Second)
defer timer.Stop()

for {
select {
case <-agent.ctx.Done():
// TODO: drain the existing reports
return
case <-timer.C:
usageReport, err := agent.usageStore.NewReport()
if err != nil {
if errors.Is(err, errNoData) {
continue
}
agent.logger.Errorf(context.Background(), "Could not generate usage report: %v", err)
continue
}

if err := agent.sendUsageReport(usageReport); err != nil {
agent.logger.Errorf(context.Background(), "MONEY STEALING Could not send usage report: %v", err)
}
}
}
}

func (agent *Agent) sendUsageReport(usageReport []byte) error {
isSent, err := agent.opampClient.SendCustomMessage(&protobufs.CustomMessage{
Capability: sendAgentTelemetryCapability,
Data: usageReport,
})

if err != nil {
if errors.Is(err, types.ErrCustomMessagePending) {
agent.logger.Debugf(context.Background(), "Usage report is pending")
select {
case <-agent.ctx.Done():
// TODO: we probably need to drain the existing reports
return agent.ctx.Err()
case <-isSent:
// Retry sending the message once
isSent, err = agent.opampClient.SendCustomMessage(&protobufs.CustomMessage{
Capability: sendAgentTelemetryCapability,
Data: usageReport,
})
if err != nil {
return err
}
}
} else {
return err
}
}

select {
case <-agent.ctx.Done():
return agent.ctx.Err()
case <-isSent:
return nil
}
}

func (agent *Agent) calculateHealth() *protobufs.ComponentHealth {
lastHealth := agent.lastHealth
report := healthMessage(agent.health.IsAlive())
Expand Down
55 changes: 55 additions & 0 deletions agent/otel_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package agent

import (
"fmt"
"math"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

type otlpMetrics struct {
metrics pmetric.Metrics
ms pmetric.Sum
}

func newOTLPMetrics() *otlpMetrics {
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
resourceAttrs := rm.Resource().Attributes()
resourceAttrs.PutStr("service.name", "-service")
resourceAttrs.PutStr("service.version", "1.0.0")
resourceAttrs.PutStr("host.name", "example-host")
sm := rm.ScopeMetrics().AppendEmpty()
ms := sm.Metrics().AppendEmpty()
ms.SetName("bytes_received")
sum := ms.SetEmptySum()
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
return &otlpMetrics{
metrics: metrics,
ms: sum,
}
}

func (om *otlpMetrics) addOTLPSum(timestamp time.Time, value float64, signal usageSignal) error {
intVal, err := convertFloat64ToInt64(value)
if err != nil {
return err
}
d := om.ms.DataPoints().AppendEmpty()
d.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
d.SetIntValue(intVal)
d.Attributes().PutStr("signal", string(signal))
return nil
}

func convertFloat64ToInt64(value float64) (int64, error) {
if value > math.MaxInt64 {
return 0, fmt.Errorf("value %f is too large to convert to int64", value)
}
if value < math.MinInt64 {
return 0, fmt.Errorf("value %f is too small to convert to int64", value)
Copy link
Contributor

@kentquirk kentquirk Feb 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"too small" is probably the wrong phrase here -- MinInt64 is a very large negative number.

I'm also wondering why you bothered to introduce this function at all. It's not actually wrong, but if you had just used int64(value) it would only error if a single report is more than about 9000 terabytes. Then addOTLPSum() wouldn't have to return errors either.

}
return int64(value), nil
}
79 changes: 79 additions & 0 deletions agent/usage_report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package agent

import (
"errors"
"sync"
"time"

"go.opentelemetry.io/collector/pdata/pmetric"
)

var errNoData = errors.New("no data to report")

type usageStore struct {
lastUsageData totalUsage

mut sync.Mutex
datapoints []usage
}

func newUsageStore() *usageStore {
return &usageStore{
lastUsageData: make(totalUsage),
datapoints: make([]usage, 0),
}
}

func (ur *usageStore) Add(traceUsage, logUsage float64) {
ur.mut.Lock()
defer ur.mut.Unlock()

if traceUsage != 0 {
deltaTraceUsage := traceUsage - ur.lastUsageData[signal_trace].val
ur.datapoints = append(ur.datapoints, usage{signal: signal_trace, val: deltaTraceUsage, timestamp: time.Now()})
ur.lastUsageData[signal_trace] = usage{signal: signal_trace, val: traceUsage}
}
if logUsage != 0 {
deltaLogUsage := logUsage - ur.lastUsageData[signal_log].val
ur.datapoints = append(ur.datapoints, usage{signal: signal_log, val: deltaLogUsage, timestamp: time.Now()})
ur.lastUsageData[signal_log] = usage{signal: signal_log, val: logUsage}
}

}

func (ur *usageStore) NewReport() ([]byte, error) {
ur.mut.Lock()
defer ur.mut.Unlock()

if len(ur.datapoints) == 0 {
return nil, errNoData
}

otlpMetrics := newOTLPMetrics()
for _, usage := range ur.datapoints {
otlpMetrics.addOTLPSum(usage.timestamp, usage.val, usage.signal)
}

jsonMarshaler := &pmetric.JSONMarshaler{}
data, err := jsonMarshaler.MarshalMetrics(otlpMetrics.metrics)
if err != nil {
return nil, err
}
ur.datapoints = ur.datapoints[:0]
return data, nil
}

type usageSignal string

var (
signal_trace usageSignal = "trace"
signal_log usageSignal = "log"
)

type totalUsage map[usageSignal]usage

type usage struct {
signal usageSignal
val float64
timestamp time.Time
}
6 changes: 6 additions & 0 deletions metrics/multi_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func (m *MultiMetrics) Increment(name string) { // for counters
for _, ch := range m.children {
ch.Increment(name)
}
m.lock.Lock()
defer m.lock.Unlock()
m.values[name]++
}

func (m *MultiMetrics) Gauge(name string, val interface{}) { // for gauges
Expand All @@ -91,6 +94,9 @@ func (m *MultiMetrics) Count(name string, n interface{}) { // for counters
for _, ch := range m.children {
ch.Count(name, n)
}
m.lock.Lock()
defer m.lock.Unlock()
m.values[name] += ConvertNumeric(n)
}

func (m *MultiMetrics) Histogram(name string, obs interface{}) { // for histogram
Expand Down
16 changes: 15 additions & 1 deletion route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ var routerMetrics = []metrics.Metadata{
{Name: "_router_peer", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of spans proxied to a peer"},
{Name: "_router_batch", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of batches of events received"},
{Name: "_router_otlp", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of batches of otlp requests received"},
{Name: "bytes_received_trace", Type: metrics.Counter, Unit: metrics.Bytes, Description: "the number of bytes received in trace events"},
{Name: "bytes_received_log", Type: metrics.Counter, Unit: metrics.Bytes, Description: "the number of bytes received in log events"},
}

// LnS spins up the Listen and Serve portion of the router. A router is
Expand Down Expand Up @@ -161,7 +163,9 @@ func (r *Router) LnS(incomingOrPeer string) {
}

for _, metric := range routerMetrics {
metric.Name = r.incomingOrPeer + metric.Name
if strings.HasPrefix(metric.Name, "_") {
metric.Name = r.incomingOrPeer + metric.Name
}
r.Metrics.Register(metric)
}

Expand Down Expand Up @@ -392,6 +396,8 @@ func (r *Router) event(w http.ResponseWriter, req *http.Request) {
return
}

r.Metrics.Count("bytes_received_trace", len(reqBod))

ev, err := r.requestToEvent(ctx, req, reqBod)
if err != nil {
r.handlerReturnWithError(w, ErrReqToEvent, err)
Expand Down Expand Up @@ -602,6 +608,14 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error {
IsRoot: isRootSpan(ev, r.Config),
}

if r.incomingOrPeer == "incoming" {
if span.Data["meta.signal_type"] == "log" {
r.Metrics.Count("bytes_received_log", span.GetDataSize())
} else {
r.Metrics.Count("bytes_received_trace", span.GetDataSize())
}
}

// we know we're a span, but we need to check if we're in Stress Relief mode;
// if we are, then we want to make an immediate, deterministic trace decision
// and either drop or send the trace without even trying to cache or forward it.
Expand Down
Loading