Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
VinozzZ committed Feb 21, 2025
1 parent aa3f440 commit 62b19f4
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 7 deletions.
95 changes: 89 additions & 6 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"bytes"
"context"
"crypto/tls"
"errors"
"net/http"
"os"
"reflect"
"runtime"
"time"

Expand Down Expand Up @@ -39,11 +41,12 @@ type Agent struct {
clientPrivateKeyPEM []byte
lastHealth *protobufs.ComponentHealth

logger Logger
ctx context.Context
cancel context.CancelFunc
metrics metrics.Metrics
health health.Reporter
logger Logger
ctx context.Context
cancel context.CancelFunc
metrics metrics.Metrics
usageStore *usageStore
health health.Reporter
}

func NewAgent(refineryLogger Logger, agentVersion string, currentConfig config.Config, metrics metrics.Metrics, health health.Reporter) *Agent {
Expand All @@ -57,7 +60,10 @@ func NewAgent(refineryLogger Logger, agentVersion string, currentConfig config.C
effectiveConfig: currentConfig,
metrics: metrics,
health: health,
usageStore: newUsageStore(),
}
metricsType := reflect.TypeOf(metrics)
agent.logger.Errorf(context.Background(), "metrics type: %v", metricsType)
agent.createAgentIdentity()
agent.logger.Debugf(context.Background(), "starting opamp client, id=%v", agent.instanceId)
if err := agent.connect(); err != nil {
Expand Down Expand Up @@ -165,6 +171,7 @@ func (agent *Agent) connect() error {
agent.logger.Debugf(context.Background(), "started opamp client")

go agent.healthCheck()
go agent.usageReport()
return nil
}

Expand Down Expand Up @@ -196,12 +203,88 @@ func (agent *Agent) healthCheck() {
report := agent.calculateHealth()
if report != nil {
agent.lastHealth = report
agent.opampClient.SetHealth(report)
if err := agent.opampClient.SetHealth(report); err != nil {
agent.logger.Errorf(context.Background(), "Could not report health to OpAMP server: %v", err)
}
}

traceUsage, ok := agent.metrics.Get("bytes_received_trace")
if !ok {
panic("leaky wallet from trace")
}
logUsage, ok := agent.metrics.Get("bytes_received_log")
if !ok {
panic("leaky wallet from log")
}

agent.usageStore.Add(traceUsage, logUsage)
}
}
}

// usageReport periodically drains the usage store and forwards the resulting
// report to the OpAMP server as a custom message. It ticks every 15 seconds
// and returns once agent.ctx is cancelled.
func (agent *Agent) usageReport() {
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-agent.ctx.Done():
			return
		case <-ticker.C:
			report, err := agent.usageStore.NewReport()
			if err != nil {
				// errNoData only means nothing was recorded since the last
				// report, so it is not worth logging.
				if !errors.Is(err, errNoData) {
					agent.logger.Errorf(context.Background(), "Could not generate usage report: %v", err)
				}
				continue
			}

			// The payload is routine diagnostic output, not a failure, so it is
			// logged at debug level. %s renders the bytes as text; %v on a
			// []byte would print a list of numeric byte values.
			agent.logger.Debugf(context.Background(), "Usage report: %s", report)

			// sendUsageReport retries once itself if another custom message is
			// still pending.
			if err := agent.sendUsageReport(report); err != nil {
				agent.logger.Errorf(context.Background(), "Could not send usage report: %v", err)
			}
		}
	}
}

// sendUsageReport hands usageReport to the OpAMP client as a custom message
// and blocks until the client signals the send on the returned channel or
// agent.ctx is cancelled.
//
// When the client answers with types.ErrCustomMessagePending, the call waits
// on the channel returned alongside that error and then tries exactly one
// more time. Any other error, or an error from the second attempt, is
// returned as-is. Cancellation yields agent.ctx.Err().
func (agent *Agent) sendUsageReport(usageReport []byte) error {
	// A fresh message value is built for every attempt.
	newMessage := func() *protobufs.CustomMessage {
		return &protobufs.CustomMessage{
			Capability: sendAgentTelemetryCapability,
			Data:       usageReport,
		}
	}

	done, err := agent.opampClient.SendCustomMessage(newMessage())
	if err != nil {
		if !errors.Is(err, types.ErrCustomMessagePending) {
			return err
		}
		agent.logger.Debugf(context.Background(), "Usage report is pending")

		// Wait for the in-flight message before the single retry.
		select {
		case <-agent.ctx.Done():
			return agent.ctx.Err()
		case <-done:
		}

		done, err = agent.opampClient.SendCustomMessage(newMessage())
		if err != nil {
			return err
		}
	}

	// Wait for our own message to go out.
	select {
	case <-agent.ctx.Done():
		return agent.ctx.Err()
	case <-done:
		return nil
	}
}

func (agent *Agent) calculateHealth() *protobufs.ComponentHealth {
lastHealth := agent.lastHealth
report := healthMessage(agent.health.IsAlive())
Expand Down
81 changes: 81 additions & 0 deletions agent/otel_metrics_transformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package agent

import (
"fmt"
"math"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

// otlpMetrics pairs an OTLP metrics payload with the Sum metric inside it
// that datapoints are appended to.
type otlpMetrics struct {
// metrics is the complete payload; NewReport marshals it to JSON.
metrics pmetric.Metrics
// ms is the Sum created inside metrics by newOTLPMetrics; addOTLPSum
// appends one datapoint to it per call.
ms pmetric.Sum
}

// newOTLPMetrics builds an OTLP metrics payload holding one resource, one
// scope and a single delta-temporality Sum metric named "bytes_received".
// The returned wrapper exposes both the payload and that Sum so datapoints
// can be appended later.
//
// NOTE(review): the resource attribute values look like placeholders —
// confirm the intended service name, version and host.
func newOTLPMetrics() *otlpMetrics {
	root := pmetric.NewMetrics()
	resource := root.ResourceMetrics().AppendEmpty()

	attrs := resource.Resource().Attributes()
	for _, kv := range [][2]string{
		{"service.name", "-service"},
		{"service.version", "1.0.0"},
		{"host.name", "example-host"},
	} {
		attrs.PutStr(kv[0], kv[1])
	}

	metric := resource.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
	metric.SetName("bytes_received")

	deltaSum := metric.SetEmptySum()
	deltaSum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)

	return &otlpMetrics{metrics: root, ms: deltaSum}
}

// addOTLPSum appends one integer datapoint to the wrapped Sum metric. The
// datapoint carries timestamp, value converted to int64, and a "signal"
// attribute set to signal.
//
// It returns the conversion error, without appending anything, when value
// cannot be converted by convertFloat64ToInt64.
func (om *otlpMetrics) addOTLPSum(timestamp time.Time, value float64, signal usageSignal) error {
	converted, convErr := convertFloat64ToInt64(value)
	if convErr != nil {
		return convErr
	}

	point := om.ms.DataPoints().AppendEmpty()
	point.Attributes().PutStr("signal", string(signal))
	point.SetIntValue(converted)
	point.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
	return nil
}

// OTLPMetrics builds a ResourceMetrics holding a single delta-temporality Sum
// metric with one integer datapoint. The datapoint is stamped with the
// current time, carries value converted to int64, and has a "signal"
// attribute set to signal.
//
// The metric is named name. An empty name falls back to "bytes_received",
// the name that was previously hard-coded regardless of the argument.
//
// It returns an empty ResourceMetrics and the conversion error when value
// cannot be converted by convertFloat64ToInt64.
//
// NOTE(review): the resource attribute values look like placeholders —
// confirm the intended service name, version and host.
func OTLPMetrics(name string, signal usageSignal, value float64) (pmetric.ResourceMetrics, error) {
	intVal, err := convertFloat64ToInt64(value)
	if err != nil {
		return pmetric.ResourceMetrics{}, err
	}
	if name == "" {
		name = "bytes_received"
	}

	metrics := pmetric.NewResourceMetrics()

	// Set resource attributes
	resourceAttrs := metrics.Resource().Attributes()
	resourceAttrs.PutStr("service.name", "-service")
	resourceAttrs.PutStr("service.version", "1.0.0")
	resourceAttrs.PutStr("host.name", "example-host")

	sm := metrics.ScopeMetrics().AppendEmpty()
	m := sm.Metrics().AppendEmpty()
	m.SetName(name)
	s := m.SetEmptySum()
	s.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)

	d := s.DataPoints().AppendEmpty()
	d.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
	d.SetIntValue(intVal)
	d.Attributes().PutStr("signal", string(signal))

	return metrics, nil
}

// convertFloat64ToInt64 converts value to int64, truncating any fractional
// part toward zero. It returns an error instead of an implementation-defined
// result when value is NaN or lies outside the int64 range.
func convertFloat64ToInt64(value float64) (int64, error) {
	// NaN compares false against every bound, so it needs its own check.
	if math.IsNaN(value) {
		return 0, fmt.Errorf("value %f is not a number and cannot be converted to int64", value)
	}
	// As a float64, math.MaxInt64 rounds up to 2^63, which is one past the
	// largest int64. The upper bound must therefore be exclusive.
	if value >= math.MaxInt64 {
		return 0, fmt.Errorf("value %f is too large to convert to int64", value)
	}
	// math.MinInt64 (-2^63) is exactly representable, so it stays inclusive.
	if value < math.MinInt64 {
		return 0, fmt.Errorf("value %f is too small to convert to int64", value)
	}
	return int64(value), nil
}
79 changes: 79 additions & 0 deletions agent/usage_report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package agent

import (
"errors"
"sync"
"time"

"go.opentelemetry.io/collector/pdata/pmetric"
)

// errNoData is returned by usageStore.NewReport when no datapoints are
// buffered. usageReport checks for it with errors.Is and skips that tick
// without logging.
var errNoData = errors.New("no data to report")

// usageStore buffers per-signal usage deltas between reports.
type usageStore struct {
// lastUsageData holds, per signal, the last non-zero value passed to Add;
// Add subtracts it from the next value to obtain a delta.
lastUsageData totalUsage

// mut is held by Add and NewReport for their whole duration.
mut sync.Mutex
// datapoints are the deltas recorded by Add that have not yet been emitted;
// NewReport truncates the slice after a successful marshal.
datapoints []usage
}

// newUsageStore returns an empty usageStore whose map and slice are already
// allocated, so Add can be called on it immediately.
func newUsageStore() *usageStore {
	store := new(usageStore)
	store.lastUsageData = totalUsage{}
	store.datapoints = []usage{}
	return store
}

// Add records the latest trace and log usage values. For each signal whose
// value is non-zero it appends a datapoint holding the difference from the
// previously stored value, stamped with the current time, and then stores the
// new value as the baseline. A zero value leaves that signal untouched.
func (ur *usageStore) Add(traceUsage, logUsage float64) {
	ur.mut.Lock()
	defer ur.mut.Unlock()

	// Trace is handled before log, so datapoints keep that order.
	readings := [...]struct {
		signal usageSignal
		total  float64
	}{
		{signal_trace, traceUsage},
		{signal_log, logUsage},
	}

	for _, r := range readings {
		if r.total == 0 {
			continue
		}
		previous := ur.lastUsageData[r.signal].val
		ur.datapoints = append(ur.datapoints, usage{
			signal:    r.signal,
			val:       r.total - previous,
			timestamp: time.Now(),
		})
		ur.lastUsageData[r.signal] = usage{signal: r.signal, val: r.total}
	}
}

// NewReport converts the buffered datapoints into an OTLP metrics payload and
// returns it marshaled as JSON. On success the buffer is emptied.
//
// It returns errNoData when nothing is buffered. If any datapoint cannot be
// converted, the first conversion error is returned and the offending
// datapoints are dropped from the buffer; the convertible ones stay buffered
// for the next call, so one bad value cannot block every later report. If
// marshaling fails, the error is returned and the buffer is left untouched.
func (ur *usageStore) NewReport() ([]byte, error) {
	ur.mut.Lock()
	defer ur.mut.Unlock()

	if len(ur.datapoints) == 0 {
		return nil, errNoData
	}

	report := newOTLPMetrics()
	// Filter in place: convertible reuses the backing array of ur.datapoints
	// and never gets ahead of the range index.
	convertible := ur.datapoints[:0]
	var firstErr error
	for _, dp := range ur.datapoints {
		if err := report.addOTLPSum(dp.timestamp, dp.val, dp.signal); err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		convertible = append(convertible, dp)
	}
	if firstErr != nil {
		ur.datapoints = convertible
		return nil, firstErr
	}

	data, err := (&pmetric.JSONMarshaler{}).MarshalMetrics(report.metrics)
	if err != nil {
		return nil, err
	}
	ur.datapoints = ur.datapoints[:0]
	return data, nil
}

// usageSignal names the telemetry signal a usage measurement belongs to.
type usageSignal string

// Known signals. They are constants so they cannot be reassigned at runtime.
// The identifiers keep their existing spelling so that references elsewhere
// in the package continue to compile.
const (
	signal_trace usageSignal = "trace"
	signal_log   usageSignal = "log"
)

// totalUsage maps each signal to the most recent usage value stored for it.
type totalUsage map[usageSignal]usage

// usage is one usage measurement for one signal.
type usage struct {
// signal is the signal the measurement belongs to.
signal usageSignal
// val is a delta when the value sits in usageStore.datapoints, and the last
// value passed to Add when it sits in usageStore.lastUsageData.
val float64
// timestamp is set by Add on buffered datapoints; entries stored in
// lastUsageData leave it at the zero value.
timestamp time.Time
}
6 changes: 6 additions & 0 deletions metrics/multi_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func (m *MultiMetrics) Increment(name string) { // for counters
for _, ch := range m.children {
ch.Increment(name)
}
m.lock.Lock()
defer m.lock.Unlock()
m.values[name]++
}

func (m *MultiMetrics) Gauge(name string, val interface{}) { // for gauges
Expand All @@ -91,6 +94,9 @@ func (m *MultiMetrics) Count(name string, n interface{}) { // for counters
for _, ch := range m.children {
ch.Count(name, n)
}
m.lock.Lock()
defer m.lock.Unlock()
m.values[name] += ConvertNumeric(n)
}

func (m *MultiMetrics) Histogram(name string, obs interface{}) { // for histogram
Expand Down
16 changes: 15 additions & 1 deletion route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ var routerMetrics = []metrics.Metadata{
{Name: "_router_peer", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of spans proxied to a peer"},
{Name: "_router_batch", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of batches of events received"},
{Name: "_router_otlp", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of batches of otlp requests received"},
{Name: "bytes_received_trace", Type: metrics.Counter, Unit: metrics.Bytes, Description: "the number of bytes received in trace events"},
{Name: "bytes_received_log", Type: metrics.Counter, Unit: metrics.Bytes, Description: "the number of bytes received in log events"},
}

// LnS spins up the Listen and Serve portion of the router. A router is
Expand Down Expand Up @@ -161,7 +163,9 @@ func (r *Router) LnS(incomingOrPeer string) {
}

for _, metric := range routerMetrics {
metric.Name = r.incomingOrPeer + metric.Name
if strings.HasPrefix(metric.Name, "_") {
metric.Name = r.incomingOrPeer + metric.Name
}
r.Metrics.Register(metric)
}

Expand Down Expand Up @@ -392,6 +396,8 @@ func (r *Router) event(w http.ResponseWriter, req *http.Request) {
return
}

r.Metrics.Count("bytes_received_trace", len(reqBod))

ev, err := r.requestToEvent(ctx, req, reqBod)
if err != nil {
r.handlerReturnWithError(w, ErrReqToEvent, err)
Expand Down Expand Up @@ -602,6 +608,14 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error {
IsRoot: isRootSpan(ev, r.Config),
}

if r.incomingOrPeer == "incoming" {
if span.Data["meta.signal_type"] == "log" {
r.Metrics.Count("bytes_received_log", span.GetDataSize())
} else {
r.Metrics.Count("bytes_received_trace", span.GetDataSize())
}
}

// we know we're a span, but we need to check if we're in Stress Relief mode;
// if we are, then we want to make an immediate, deterministic trace decision
// and either drop or send the trace without even trying to cache or forward it.
Expand Down

0 comments on commit 62b19f4

Please sign in to comment.