Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sumconnector] implement summing logic #34797

Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
c38069d
add connector and sum logic
greatestusername Aug 21, 2024
36d9c7e
add connector testing for traces, spans, metrics, logs
greatestusername Aug 21, 2024
0854f85
changelog and make generate
greatestusername Aug 21, 2024
ed8e9fc
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Aug 22, 2024
9ad7238
fix collector subversion
greatestusername Aug 22, 2024
d3ad724
gotidy golint etc
greatestusername Aug 22, 2024
0ad1e45
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Aug 22, 2024
b010a2d
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 3, 2024
37a22a0
slim down to just trace and span for testing
greatestusername Sep 4, 2024
da219e9
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 4, 2024
f4ddb91
Merge branch '32669-sumconnector-sum-logic' of github.com:greatestuse…
greatestusername Sep 4, 2024
789dc68
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 4, 2024
7182f7a
run make genotelcontribcol
greatestusername Sep 4, 2024
0103e34
Merge branch '32669-sumconnector-sum-logic' of github.com:greatestuse…
greatestusername Sep 4, 2024
72e967d
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 5, 2024
0250a1f
Update cmd/otelcontribcol/go.mod
greatestusername Sep 5, 2024
8a2f85c
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 5, 2024
ad46210
clarify logic and update readme
greatestusername Sep 5, 2024
086fc36
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 5, 2024
73f03a7
generate and lint
greatestusername Sep 5, 2024
1109c52
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 5, 2024
ec9549a
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 6, 2024
f2b0598
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 6, 2024
dd5a3e2
test
greatestusername Sep 9, 2024
be2baa3
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 11, 2024
5708fa4
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 11, 2024
8df9c27
Merge branch '32669-sumconnector-sum-logic' of github.com:greatestuse…
greatestusername Sep 11, 2024
2994121
use new CompareMetrics option to ignore float precision
greatestusername Sep 11, 2024
4fd2b1f
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 11, 2024
9e610f3
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 12, 2024
a2c4513
update README
greatestusername Sep 12, 2024
645f31b
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 13, 2024
0b35466
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 13, 2024
68d4efd
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 17, 2024
247c68d
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 18, 2024
0339d44
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 20, 2024
8d83c2a
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 20, 2024
8c8c6be
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 24, 2024
972debf
gotidy
greatestusername Sep 24, 2024
6327057
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 24, 2024
2ff1fbc
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 24, 2024
900637c
gotidy again
greatestusername Sep 24, 2024
f2f9110
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 25, 2024
482b432
Merge branch 'main' into 32669-sumconnector-sum-logic
greatestusername Sep 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/32669-sumconnector-sum-logic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'breaking'
greatestusername marked this conversation as resolved.
Show resolved Hide resolved

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: 'sumconnector'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "adds connector and summing logic along with tests"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32669]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user, api]
greatestusername marked this conversation as resolved.
Show resolved Hide resolved
142 changes: 142 additions & 0 deletions connector/sumconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ package sumconnector // import "github.com/open-telemetry/opentelemetry-collecto

import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand All @@ -19,6 +22,8 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent"
)

const scopeName = "otelcol/sumconnector"
greatestusername marked this conversation as resolved.
Show resolved Hide resolved

// sum can sum attribute values from spans, span event, metrics, data points, or log records
// and emit the sums onto a metrics pipeline.
type sum struct {
Expand All @@ -38,22 +43,159 @@ func (c *sum) Capabilities() consumer.Capabilities {
}

func (c *sum) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
var multiError error
sumMetrics := pmetric.NewMetrics()
sumMetrics.ResourceMetrics().EnsureCapacity(td.ResourceSpans().Len())
for i := 0; i < td.ResourceSpans().Len(); i++ {
resourceSpan := td.ResourceSpans().At(i)
spansSummer := newSummer[ottlspan.TransformContext](c.spansMetricDefs)
spanEventsSummer := newSummer[ottlspanevent.TransformContext](c.spanEventsMetricDefs)

for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ {
scopeSpan := resourceSpan.ScopeSpans().At(j)

for k := 0; k < scopeSpan.Spans().Len(); k++ {
span := scopeSpan.Spans().At(k)
sCtx := ottlspan.NewTransformContext(span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan)
multiError = errors.Join(multiError, spansSummer.update(ctx, span.Attributes(), sCtx))

for l := 0; l < span.Events().Len(); l++ {
event := span.Events().At(l)
eCtx := ottlspanevent.NewTransformContext(event, span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan)
multiError = errors.Join(multiError, spanEventsSummer.update(ctx, event.Attributes(), eCtx))
}
}
}

if len(spansSummer.sums)+len(spanEventsSummer.sums) == 0 {
continue // don't add an empty resource
}

sumResource := sumMetrics.ResourceMetrics().AppendEmpty()
resourceSpan.Resource().Attributes().CopyTo(sumResource.Resource().Attributes())

sumResource.ScopeMetrics().EnsureCapacity(resourceSpan.ScopeSpans().Len())
sumScope := sumResource.ScopeMetrics().AppendEmpty()
sumScope.Scope().SetName(scopeName)

spansSummer.appendMetricsTo(sumScope.Metrics())
spanEventsSummer.appendMetricsTo(sumScope.Metrics())
}
if multiError != nil {
return multiError
}
return c.metricsConsumer.ConsumeMetrics(ctx, sumMetrics)
}

func (c *sum) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
var multiError error
sumMetrics := pmetric.NewMetrics()
sumMetrics.ResourceMetrics().EnsureCapacity(md.ResourceMetrics().Len())
for i := 0; i < md.ResourceMetrics().Len(); i++ {
resourceMetric := md.ResourceMetrics().At(i)
metricsSummer := newSummer[ottlmetric.TransformContext](c.metricsMetricDefs)
dataPointsSummer := newSummer[ottldatapoint.TransformContext](c.dataPointsMetricDefs)

for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ {
scopeMetrics := resourceMetric.ScopeMetrics().At(j)

for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
metric := scopeMetrics.Metrics().At(k)
mCtx := ottlmetric.NewTransformContext(metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, metricsSummer.update(ctx, pcommon.NewMap(), mCtx))

//exhaustive:enforce
greatestusername marked this conversation as resolved.
Show resolved Hide resolved
switch metric.Type() {
case pmetric.MetricTypeGauge:
dps := metric.Gauge().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
}
case pmetric.MetricTypeSum:
dps := metric.Sum().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
}
case pmetric.MetricTypeSummary:
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
}
case pmetric.MetricTypeHistogram:
dps := metric.Histogram().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
}
case pmetric.MetricTypeExponentialHistogram:
dps := metric.ExponentialHistogram().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
}
case pmetric.MetricTypeEmpty:
multiError = errors.Join(multiError, fmt.Errorf("metric %q: invalid metric type: %v", metric.Name(), metric.Type()))
}
}
}

if len(metricsSummer.sums)+len(dataPointsSummer.sums) == 0 {
continue // don't add an empty resource
}

sumResource := sumMetrics.ResourceMetrics().AppendEmpty()
resourceMetric.Resource().Attributes().CopyTo(sumResource.Resource().Attributes())

sumResource.ScopeMetrics().EnsureCapacity(resourceMetric.ScopeMetrics().Len())
sumScope := sumResource.ScopeMetrics().AppendEmpty()
sumScope.Scope().SetName(scopeName)

metricsSummer.appendMetricsTo(sumScope.Metrics())
dataPointsSummer.appendMetricsTo(sumScope.Metrics())
}
if multiError != nil {
return multiError
}
return c.metricsConsumer.ConsumeMetrics(ctx, sumMetrics)
}

func (c *sum) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
var multiError error
sumMetrics := pmetric.NewMetrics()
sumMetrics.ResourceMetrics().EnsureCapacity(ld.ResourceLogs().Len())
for i := 0; i < ld.ResourceLogs().Len(); i++ {
resourceLog := ld.ResourceLogs().At(i)
summer := newSummer[ottllog.TransformContext](c.logsMetricDefs)

for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
scopeLogs := resourceLog.ScopeLogs().At(j)

for k := 0; k < scopeLogs.LogRecords().Len(); k++ {
logRecord := scopeLogs.LogRecords().At(k)

lCtx := ottllog.NewTransformContext(logRecord, scopeLogs.Scope(), resourceLog.Resource(), scopeLogs, resourceLog)
multiError = errors.Join(multiError, summer.update(ctx, logRecord.Attributes(), lCtx))
}
}

if len(summer.sums) == 0 {
continue // don't add an empty resource
}

sumResource := sumMetrics.ResourceMetrics().AppendEmpty()
resourceLog.Resource().Attributes().CopyTo(sumResource.Resource().Attributes())

sumResource.ScopeMetrics().EnsureCapacity(resourceLog.ScopeLogs().Len())
sumScope := sumResource.ScopeMetrics().AppendEmpty()
sumScope.Scope().SetName(scopeName)

summer.appendMetricsTo(sumScope.Metrics())
}
if multiError != nil {
return multiError
}
return c.metricsConsumer.ConsumeMetrics(ctx, sumMetrics)
}
Loading
Loading