Skip to content

Commit

Permalink
[receiver/prometheus]: group metrics by job/instance label; clean up …
Browse files Browse the repository at this point in the history
…and add tests

Signed-off-by: Florian Bacher <[email protected]>
  • Loading branch information
bacherfl committed Jul 31, 2024
1 parent 63556dc commit ede2035
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 90 deletions.
151 changes: 61 additions & 90 deletions receiver/prometheusreceiver/internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@ type transaction struct {
trimSuffixes bool
enableNativeHistograms bool
ctx context.Context
families map[scopeID]map[string]*metricFamily
resourceFamilies map[resourceKey]map[scopeID]map[string]*metricFamily
families map[resourceKey]map[scopeID]map[string]*metricFamily
mc scrape.MetricMetadataStore
sink consumer.Metrics
externalLabels labels.Labels
nodeResources map[resourceKey]pcommon.Resource
nodeResource pcommon.Resource
scopeAttributes map[scopeID]pcommon.Map
scopeAttributes map[resourceKey]map[scopeID]pcommon.Map
logger *zap.Logger
buildInfo component.BuildInfo
metricAdjuster MetricsAdjuster
Expand All @@ -73,8 +71,7 @@ func newTransaction(
enableNativeHistograms bool) *transaction {
return &transaction{
ctx: ctx,
families: make(map[scopeID]map[string]*metricFamily),
resourceFamilies: make(map[resourceKey]map[scopeID]map[string]*metricFamily),
families: make(map[resourceKey]map[scopeID]map[string]*metricFamily),
isNew: true,
trimSuffixes: trimSuffixes,
enableNativeHistograms: enableNativeHistograms,
Expand All @@ -85,7 +82,7 @@ func newTransaction(
buildInfo: settings.BuildInfo,
obsrecv: obsrecv,
bufBytes: make([]byte, 0, 1024),
scopeAttributes: make(map[scopeID]pcommon.Map),
scopeAttributes: make(map[resourceKey]map[scopeID]pcommon.Map),
nodeResources: map[resourceKey]pcommon.Resource{},
}
}
Expand All @@ -106,8 +103,7 @@ func (t *transaction) Append(_ storage.SeriesRef, ls labels.Labels, atMs int64,
ls = b.Labels()
}

// TODO t.isNew can not be used as-is with the check for job/instance pair already being present - look into how the calls to limitTransaction can be reduced
resourceKey, err := t.initTransaction(ls)
rKey, err := t.initTransaction(ls)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -143,17 +139,17 @@ func (t *transaction) Append(_ storage.SeriesRef, ls labels.Labels, atMs int64,

// For the `target_info` metric we need to convert it to resource attributes.
if metricName == prometheus.TargetInfoMetricName {
t.AddTargetInfo(ls)
t.AddTargetInfo(*rKey, ls)
return 0, nil
}

// For the `otel_scope_info` metric we need to convert it to scope attributes.
if metricName == prometheus.ScopeInfoMetricName {
t.addScopeInfo(ls)
t.addScopeInfo(*rKey, ls)
return 0, nil
}

curMF, existing := t.getOrCreateMetricFamily(*resourceKey, getScopeID(ls), metricName)
curMF, existing := t.getOrCreateMetricFamily(*rKey, getScopeID(ls), metricName)

if t.enableNativeHistograms && curMF.mtype == pmetric.MetricTypeExponentialHistogram {
// If a histogram has both classic and native version, the native histogram is scraped
Expand Down Expand Up @@ -189,41 +185,24 @@ func (t *transaction) Append(_ storage.SeriesRef, ls labels.Labels, atMs int64,
// getOrCreateMetricFamily returns the metric family for the given metric name and scope,
// and true if an existing family was found.
func (t *transaction) getOrCreateMetricFamily(key resourceKey, scope scopeID, mn string) (*metricFamily, bool) {
_, ok := t.families[scope]
if !ok {
t.families[scope] = make(map[string]*metricFamily)
}
if _, ok := t.resourceFamilies[key]; !ok {
t.resourceFamilies[key] = make(map[scopeID]map[string]*metricFamily)
}
if _, ok := t.resourceFamilies[key][scope]; !ok {
t.resourceFamilies[key][scope] = make(map[string]*metricFamily)
}
//curMf, ok := t.families[scope][mn]
//if !ok {
// fn := mn
// if _, ok := t.mc.GetMetadata(mn); !ok {
// fn = normalizeMetricName(mn)
// }
// if mf, ok := t.families[scope][fn]; ok && mf.includesMetric(mn) {
// curMf = mf
// } else {
// curMf = newMetricFamily(mn, t.mc, t.logger)
// t.families[scope][curMf.name] = curMf
// return curMf, false
// }
//}
curMf, ok := t.resourceFamilies[key][scope][mn]
if _, ok := t.families[key]; !ok {
t.families[key] = make(map[scopeID]map[string]*metricFamily)
}
if _, ok := t.families[key][scope]; !ok {
t.families[key][scope] = make(map[string]*metricFamily)
}

curMf, ok := t.families[key][scope][mn]
if !ok {
fn := mn
if _, ok := t.mc.GetMetadata(mn); !ok {
fn = normalizeMetricName(mn)
}
if mf, ok := t.resourceFamilies[key][scope][fn]; ok && mf.includesMetric(mn) {
if mf, ok := t.families[key][scope][fn]; ok && mf.includesMetric(mn) {
curMf = mf
} else {
curMf = newMetricFamily(mn, t.mc, t.logger)
t.resourceFamilies[key][scope][curMf.name] = curMf
t.families[key][scope][curMf.name] = curMf
return curMf, false
}
}
Expand Down Expand Up @@ -332,22 +311,22 @@ func (t *transaction) getSeriesRef(ls labels.Labels, mtype pmetric.MetricType) u

// getMetrics returns all collected metrics, grouped by resource and scope, as a single pmetric.Metrics.
// The only error returned by this function is errNoDataToBuild.
func (t *transaction) getMetrics(resource pcommon.Resource) (pmetric.Metrics, error) {
if len(t.resourceFamilies) == 0 {
func (t *transaction) getMetrics() (pmetric.Metrics, error) {
if len(t.families) == 0 {
return pmetric.Metrics{}, errNoDataToBuild
}

md := pmetric.NewMetrics()

for rKey, families := range t.resourceFamilies {
if len(t.resourceFamilies[rKey]) == 0 {
for rKey, families := range t.families {
if len(families) == 0 {
continue
}
rms := md.ResourceMetrics().AppendEmpty()
resource, ok := t.nodeResources[rKey]
if !ok {
continue
}
rms := md.ResourceMetrics().AppendEmpty()
resource.CopyTo(rms.Resource())

for scope, mfs := range families {
Expand All @@ -363,9 +342,10 @@ func (t *transaction) getMetrics(resource pcommon.Resource) (pmetric.Metrics, er
ils.Scope().SetVersion(scope.version)
// If we got an otel_scope_info metric for that scope, get scope
// attributes from it.
attributes, ok := t.scopeAttributes[scope]
if ok {
attributes.CopyTo(ils.Scope().Attributes())
if scopeAttributes, ok := t.scopeAttributes[rKey]; ok {
if attributes, ok := scopeAttributes[scope]; ok {
attributes.CopyTo(ils.Scope().Attributes())
}
}
}
metrics := ils.Metrics()
Expand All @@ -374,33 +354,20 @@ func (t *transaction) getMetrics(resource pcommon.Resource) (pmetric.Metrics, er
}
}
}

//rms := md.ResourceMetrics().AppendEmpty()
//resource.CopyTo(rms.Resource())
//
//for scope, mfs := range t.families {
// ils := rms.ScopeMetrics().AppendEmpty()
// // If metrics don't include otel_scope_name or otel_scope_version
// // labels, use the receiver name and version.
// if scope == emptyScopeID {
// ils.Scope().SetName(receiverName)
// ils.Scope().SetVersion(t.buildInfo.Version)
// } else {
// // Otherwise, use the scope that was provided with the metrics.
// ils.Scope().SetName(scope.name)
// ils.Scope().SetVersion(scope.version)
// // If we got an otel_scope_info metric for that scope, get scope
// // attributes from it.
// attributes, ok := t.scopeAttributes[scope]
// if ok {
// attributes.CopyTo(ils.Scope().Attributes())
// }
// }
// metrics := ils.Metrics()
// for _, mf := range mfs {
// mf.appendMetric(metrics, t.trimSuffixes)
// }
//}
// remove the resource if no metrics were added to avoid returning resources with empty data points
md.ResourceMetrics().RemoveIf(func(metrics pmetric.ResourceMetrics) bool {
if metrics.ScopeMetrics().Len() == 0 {
return true
}
remove := true
for i := 0; i < metrics.ScopeMetrics().Len(); i++ {
if metrics.ScopeMetrics().At(i).Metrics().Len() > 0 {
remove = false
break
}
}
return remove
})

return md, nil
}
Expand Down Expand Up @@ -428,16 +395,16 @@ func (t *transaction) initTransaction(labels labels.Labels) (*resourceKey, error
return nil, errors.New("unable to find MetricMetadataStore in context")
}

resourceKey, err := t.getJobAndInstance(labels)
rKey, err := t.getJobAndInstance(labels)
if err != nil {
return nil, err
}
if _, ok := t.nodeResources[*resourceKey]; !ok {
t.nodeResources[*resourceKey] = CreateResource(resourceKey.job, resourceKey.instance, target.DiscoveredLabels())
if _, ok := t.nodeResources[*rKey]; !ok {
t.nodeResources[*rKey] = CreateResource(rKey.job, rKey.instance, target.DiscoveredLabels())
}
t.nodeResource = CreateResource(resourceKey.job, resourceKey.instance, target.DiscoveredLabels())

t.isNew = false
return resourceKey, nil
return rKey, nil
}

func (t *transaction) getJobAndInstance(labels labels.Labels) (*resourceKey, error) {
Expand Down Expand Up @@ -478,7 +445,7 @@ func (t *transaction) Commit() error {
}

ctx := t.obsrecv.StartMetricsOp(t.ctx)
md, err := t.getMetrics(t.nodeResource)
md, err := t.getMetrics()
if err != nil {
t.obsrecv.EndMetricsOp(ctx, dataformat, 0, err)
return err
Expand Down Expand Up @@ -508,17 +475,19 @@ func (t *transaction) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ met
return 0, nil
}

func (t *transaction) AddTargetInfo(ls labels.Labels) {
attrs := t.nodeResource.Attributes()
ls.Range(func(lbl labels.Label) {
if lbl.Name == model.JobLabel || lbl.Name == model.InstanceLabel || lbl.Name == model.MetricNameLabel {
return
}
attrs.PutStr(lbl.Name, lbl.Value)
})
// AddTargetInfo writes the labels of a target_info sample as string attributes
// on the resource stored in t.nodeResources for key. The job, instance and
// metric-name labels are skipped. If no resource is stored for key, the call
// does nothing.
func (t *transaction) AddTargetInfo(key resourceKey, ls labels.Labels) {
	resource, found := t.nodeResources[key]
	if !found {
		return
	}

	resourceAttrs := resource.Attributes()
	ls.Range(func(lbl labels.Label) {
		switch lbl.Name {
		case model.JobLabel, model.InstanceLabel, model.MetricNameLabel:
			// These labels identify the series itself and are not copied.
		default:
			resourceAttrs.PutStr(lbl.Name, lbl.Value)
		}
	})
}

func (t *transaction) addScopeInfo(ls labels.Labels) {
func (t *transaction) addScopeInfo(key resourceKey, ls labels.Labels) {
attrs := pcommon.NewMap()
scope := scopeID{}
ls.Range(func(lbl labels.Label) {
Expand All @@ -535,7 +504,9 @@ func (t *transaction) addScopeInfo(ls labels.Labels) {
}
attrs.PutStr(lbl.Name, lbl.Value)
})
t.scopeAttributes[scope] = attrs
if scopeAttributes, ok := t.scopeAttributes[key]; ok {
scopeAttributes[scope] = attrs
}
}

func getSeriesRef(bytes []byte, ls labels.Labels, mtype pmetric.MetricType) (uint64, []byte) {
Expand Down
52 changes: 52 additions & 0 deletions receiver/prometheusreceiver/internal/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
conventions "go.opentelemetry.io/collector/semconv/v1.25.0"
"testing"
"time"

Expand Down Expand Up @@ -178,6 +179,57 @@ func testTransactionAppendResource(t *testing.T, enableNativeHistograms bool) {
require.Equal(t, expectedResource, gotResource)
}

// TestTransactionAppendMultipleResources runs the multi-resource append
// scenario as two subtests: one with native histograms enabled and one with
// them disabled.
func TestTransactionAppendMultipleResources(t *testing.T) {
	for _, nativeHistograms := range []bool{true, false} {
		subtestName := fmt.Sprintf("enableNativeHistograms=%v", nativeHistograms)
		t.Run(subtestName, func(t *testing.T) {
			testTransactionAppendMultipleResources(t, nativeHistograms)
		})
	}
}

// testTransactionAppendMultipleResources appends two samples that share an
// instance label but carry different job labels, commits the transaction, and
// checks that the sink received a single pmetric.Metrics holding two
// ResourceMetrics entries whose resources equal the ones CreateResource builds
// for each job/instance pair. Resources are matched by their service name
// attribute because the order of the entries is not asserted.
func testTransactionAppendMultipleResources(t *testing.T, enableNativeHistograms bool) {
	metricsSink := new(consumertest.MetricsSink)
	txn := newTransaction(
		scrapeCtx,
		&startTimeAdjuster{startTime: startTimestamp},
		metricsSink,
		labels.EmptyLabels(),
		receivertest.NewNopSettings(),
		nopObsRecv(t),
		false,
		enableNativeHistograms,
	)

	// First series: job "test-1".
	firstSeries := labels.FromMap(map[string]string{
		model.InstanceLabel:   "localhost:8080",
		model.JobLabel:        "test-1",
		model.MetricNameLabel: "counter_test",
	})
	_, err := txn.Append(0, firstSeries, time.Now().Unix()*1000, 1.0)
	assert.NoError(t, err)

	// Second series: same instance, job "test-2".
	secondSeries := labels.FromMap(map[string]string{
		model.InstanceLabel:   "localhost:8080",
		model.JobLabel:        "test-2",
		model.MetricNameLabel: startTimeMetricName,
	})
	_, err = txn.Append(0, secondSeries, time.Now().UnixMilli(), 1.0)
	assert.NoError(t, err)

	assert.NoError(t, txn.Commit())

	wantResources := []pcommon.Resource{
		CreateResource("test-1", "localhost:8080", labels.FromStrings(model.SchemeLabel, "http")),
		CreateResource("test-2", "localhost:8080", labels.FromStrings(model.SchemeLabel, "http")),
	}

	batches := metricsSink.AllMetrics()
	require.Len(t, batches, 1)
	gotResourceMetrics := batches[0].ResourceMetrics()
	require.Equal(t, 2, gotResourceMetrics.Len())

	for _, want := range wantResources {
		wantServiceName, _ := want.Attributes().Get(conventions.AttributeServiceName)

		matched := false
		for i := 0; i < gotResourceMetrics.Len(); i++ {
			got := gotResourceMetrics.At(i).Resource()
			gotServiceName, ok := got.Attributes().Get(conventions.AttributeServiceName)
			if !ok || gotServiceName.AsString() != wantServiceName.AsString() {
				continue
			}
			matched = true
			require.Equal(t, want, got)
			break
		}
		require.True(t, matched)
	}
}

func TestReceiverVersionAndNameAreAttached(t *testing.T) {
for _, enableNativeHistograms := range []bool{true, false} {
t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) {
Expand Down

0 comments on commit ede2035

Please sign in to comment.