From 1444acd11751f69e3a8f85be787ddfbe37371d7d Mon Sep 17 00:00:00 2001 From: Hyunsoo Kim Date: Mon, 26 Feb 2024 15:54:24 -0500 Subject: [PATCH] add gpu metric consumer that uses k8s decorator for attributes --- internal/aws/containerinsight/const.go | 4 + internal/aws/containerinsight/utils.go | 33 +++ .../internal/cadvisor/cadvisor_linux.go | 5 +- .../internal/cadvisor/cadvisor_linux_test.go | 4 +- .../internal/cadvisor/extractors/extractor.go | 3 + .../internal/gpu/dcgmscraper.go | 29 ++- .../internal/gpu/decorator.go | 204 +++++++++++++++ .../internal/gpu/decorator_test.go | 233 ++++++++++++++++++ .../internal/stores/podstore.go | 4 +- .../internal/stores/store.go | 7 +- .../internal/stores/utils.go | 3 + .../internal/stores/utils_test.go | 2 + .../awscontainerinsightreceiver/receiver.go | 6 +- 13 files changed, 515 insertions(+), 22 deletions(-) create mode 100644 receiver/awscontainerinsightreceiver/internal/gpu/decorator.go create mode 100644 receiver/awscontainerinsightreceiver/internal/gpu/decorator_test.go diff --git a/internal/aws/containerinsight/const.go b/internal/aws/containerinsight/const.go index 38a94360bd8b..6fb5d762cb6f 100644 --- a/internal/aws/containerinsight/const.go +++ b/internal/aws/containerinsight/const.go @@ -151,6 +151,10 @@ const ( // Special type for pause container // because containerd does not set container name pause container name to POD like docker does. TypeInfraContainer = "InfraContainer" + TypeGpuContainer = "ContainerGPU" + TypeGpuPod = "PodGPU" + TypeGpuNode = "NodeGPU" + TypeGpuCluster = "ClusterGPU" // unit UnitBytes = "Bytes" diff --git a/internal/aws/containerinsight/utils.go b/internal/aws/containerinsight/utils.go index 5f734597bfb1..18cf0662559b 100644 --- a/internal/aws/containerinsight/utils.go +++ b/internal/aws/containerinsight/utils.go @@ -166,6 +166,39 @@ func GetUnitForMetric(metric string) string { return metricToUnitMap[metric] } +// ConvertToFieldsAndTags converts OTLP metric to a field containing metric values and a tag containing for decoration +func ConvertToFieldsAndTags(m pmetric.Metric, logger *zap.Logger) (map[string]any, map[string]string) { + fields := make(map[string]any) + tags := make(map[string]string) + if m.Name() == "" { + return fields, tags + } + + // value is not needed for label decoration + fields[m.Name()] = 0 + + var dps pmetric.NumberDataPointSlice + switch m.Type() { + case pmetric.MetricTypeGauge: + dps = m.Gauge().DataPoints() + case pmetric.MetricTypeSum: + dps = m.Sum().DataPoints() + default: + logger.Warn("Unsupported metric type", zap.String("metric", m.Name()), zap.String("type", m.Type().String())) + } + + // should support metrics with more than 1 datapoints? + if dps.Len() > 1 { + logger.Warn("Metric with more than 1 datapoint is not supported", zap.String("metric", m.Name()), zap.Int("datapoints", dps.Len())) + } + attrs := dps.At(0).Attributes() + attrs.Range(func(k string, v pcommon.Value) bool { + tags[k] = v.Str() + return true + }) + return fields, tags +} + // ConvertToOTLPMetrics converts a field containing metric values and a tag containing the relevant labels to OTLP metrics func ConvertToOTLPMetrics(fields map[string]any, tags map[string]string, logger *zap.Logger) pmetric.Metrics { md := pmetric.NewMetrics() diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go index 971217b9c05e..4a4521479faa 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go @@ -22,6 +22,7 @@ import ( cInfo "github.com/google/cadvisor/info/v1" "github.com/google/cadvisor/manager" "github.com/google/cadvisor/utils/sysfs" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" @@ -109,7 +110,7 @@ type EcsInfo interface { } type Decorator interface { - Decorate(*extractors.CAdvisorMetric) *extractors.CAdvisorMetric + Decorate(stores.CIMetric) stores.CIMetric Shutdown() error } @@ -307,7 +308,7 @@ func (c *Cadvisor) decorateMetrics(cadvisormetrics []*extractors.CAdvisorMetric) out := c.k8sDecorator.Decorate(m) if out != nil { - result = append(result, out) + result = append(result, out.(*extractors.CAdvisorMetric)) } } diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go index 85451e580ea2..9e38af49c936 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go @@ -16,10 +16,10 @@ import ( info "github.com/google/cadvisor/info/v1" "github.com/google/cadvisor/manager" "github.com/google/cadvisor/utils/sysfs" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" "github.com/stretchr/testify/assert" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils" ) @@ -73,7 +73,7 @@ var mockCreateManagerWithError = func(memoryCache *memory.InMemoryCache, sysfs s type MockK8sDecorator struct { } -func (m *MockK8sDecorator) Decorate(metric *extractors.CAdvisorMetric) *extractors.CAdvisorMetric { +func (m *MockK8sDecorator) Decorate(metric stores.CIMetric) stores.CIMetric { return metric } diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go index 398ad4805a59..a10c891117e4 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go @@ -8,12 +8,15 @@ import ( "time" cinfo "github.com/google/cadvisor/info/v1" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" "go.uber.org/zap" ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" ) +var _ stores.CIMetric = (*CAdvisorMetric)(nil) + func GetStats(info *cinfo.ContainerInfo) *cinfo.ContainerStats { if len(info.Stats) == 0 { return nil diff --git a/receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper.go b/receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper.go index 8aa650750c6a..6f737d978b57 100644 --- a/receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper.go +++ b/receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper.go @@ -9,6 +9,7 @@ import ( "fmt" "time" + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" @@ -35,6 +36,7 @@ type DcgmScraper struct { host component.Host hostInfoProvider hostInfoProvider prometheusReceiver receiver.Metrics + k8sDecorator Decorator running bool } @@ -44,6 +46,8 @@ type DcgmScraperOpts struct { Consumer consumer.Metrics Host component.Host HostInfoProvider hostInfoProvider + K8sDecorator Decorator + Logger *zap.Logger } type hostInfoProvider interface { @@ -137,13 +141,6 @@ func NewDcgmScraper(opts DcgmScraperOpts) (*DcgmScraper, error) { Replacement: "${1}", Action: relabel.Replace, }, - { - SourceLabels: model.LabelNames{"pod"}, - TargetLabel: "PodName", - Regex: relabel.MustNewRegexp("(.+)-(.+)"), - Replacement: "${1}", - Action: relabel.Replace, - }, // additional k8s podname for service name decoration { SourceLabels: model.LabelNames{"pod"}, @@ -179,8 +176,15 @@ func NewDcgmScraper(opts DcgmScraperOpts) (*DcgmScraper, error) { TelemetrySettings: opts.TelemetrySettings, } + decoConsumer := decorateConsumer{ + containerOrchestrator: ci.EKS, + nextConsumer: opts.Consumer, + k8sDecorator: opts.K8sDecorator, + logger: opts.Logger, + } + promFactory := prometheusreceiver.NewFactory() - promReceiver, err := promFactory.CreateMetricsReceiver(opts.Ctx, params, &promConfig, opts.Consumer) + promReceiver, err := promFactory.CreateMetricsReceiver(opts.Ctx, params, &promConfig, &decoConsumer) if err != nil { return nil, fmt.Errorf("failed to create prometheus receiver: %w", err) } @@ -191,6 +195,7 @@ func NewDcgmScraper(opts DcgmScraperOpts) (*DcgmScraper, error) { host: opts.Host, hostInfoProvider: opts.HostInfoProvider, prometheusReceiver: promReceiver, + k8sDecorator: opts.K8sDecorator, }, nil } @@ -205,6 +210,7 @@ func (ds *DcgmScraper) GetMetrics() []pmetric.Metrics { } ds.running = err == nil } + return nil } @@ -216,4 +222,11 @@ func (ds *DcgmScraper) Shutdown() { } ds.running = false } + + if ds.k8sDecorator != nil { + err := ds.k8sDecorator.Shutdown() + if err != nil { + ds.settings.Logger.Error("Unable to shutdown K8sDecorator", zap.Error(err)) + } + } } diff --git a/receiver/awscontainerinsightreceiver/internal/gpu/decorator.go b/receiver/awscontainerinsightreceiver/internal/gpu/decorator.go new file mode 100644 index 000000000000..4c106db52730 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/gpu/decorator.go @@ -0,0 +1,204 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package gpu + +import ( + "context" + "errors" + + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" + "golang.org/x/exp/maps" +) + +const ( + gpuUtil = "DCGM_FI_DEV_GPU_UTIL" + gpuMemUtil = "DCGM_FI_DEV_FB_USED_PERCENT" + gpuMemUsed = "DCGM_FI_DEV_FB_USED" + gpuMemTotal = "DCGM_FI_DEV_FB_TOTAL" + gpuTemperature = "DCGM_FI_DEV_GPU_TEMP" + gpuPowerDraw = "DCGM_FI_DEV_POWER_USAGE" +) + +var _ stores.CIMetric = (*gpuMetric)(nil) + +var metricToUnit = map[string]string{ + gpuUtil: "Percent", + gpuMemUtil: "Percent", + gpuMemUsed: "Bytes", + gpuMemTotal: "Bytes", + gpuTemperature: "None", + gpuPowerDraw: "None", +} + +type gpuMetric struct { + // key/value pairs that are typed and contain the metric (numerical) data + fields map[string]any + // key/value string pairs that are used to identify the metrics + tags map[string]string +} + +func newResourceMetric(mType string, logger *zap.Logger) *gpuMetric { + metric := &gpuMetric{ + fields: make(map[string]any), + tags: make(map[string]string), + } + metric.tags[ci.MetricType] = mType + return metric +} + +func (gr *gpuMetric) GetTags() map[string]string { + return gr.tags +} + +func (gr *gpuMetric) GetFields() map[string]any { + return gr.fields +} + +func (gr *gpuMetric) GetMetricType() string { + return gr.tags[ci.MetricType] +} + +func (gr *gpuMetric) AddTags(tags map[string]string) { + for k, v := range tags { + gr.tags[k] = v + } +} + +func (gr *gpuMetric) HasField(key string) bool { + return gr.fields[key] != nil +} + +func (gr *gpuMetric) AddField(key string, val any) { + gr.fields[key] = val +} + +func (gr *gpuMetric) GetField(key string) any { + return gr.fields[key] +} + +func (gr *gpuMetric) HasTag(key string) bool { + return gr.tags[key] != "" +} + +func (gr *gpuMetric) AddTag(key, val string) { + gr.tags[key] = val +} + +func (gr *gpuMetric) GetTag(key string) string { + return gr.tags[key] +} + +func (gr *gpuMetric) RemoveTag(key string) { + delete(gr.tags, key) +} + +// GPU decorator acts as an interceptor of metrics before the scraper sends them to the next designated consumer +type decorateConsumer struct { + containerOrchestrator string + nextConsumer consumer.Metrics + k8sDecorator Decorator + logger *zap.Logger +} + +func (dc *decorateConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{ + MutatesData: true, + } +} + +func (dc *decorateConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + resourceTags := make(map[string]string) + rms := md.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + // get resource attributes + ras := rms.At(i).Resource().Attributes() + ras.Range(func(k string, v pcommon.Value) bool { + resourceTags[k] = v.AsString() + return true + }) + ilms := rms.At(i).ScopeMetrics() + for j := 0; j < ilms.Len(); j++ { + ms := ilms.At(j).Metrics() + for k := 0; k < ms.Len(); k++ { + m := ms.At(k) + fields, tags := ci.ConvertToFieldsAndTags(m, dc.logger) + maps.Copy(tags, resourceTags) + rm := gpuMetric{ + fields: fields, + tags: tags, + } + if !rm.HasTag(ci.MetricType) { + // force type to be Container to decorate with container level labels + rm.AddTag(ci.MetricType, ci.TypeGpuContainer) + } + dc.decorateMetrics([]*gpuMetric{&rm}) + dc.updateAttributes(m, rm) + if unit, ok := metricToUnit[m.Name()]; ok { + m.SetUnit(unit) + } + } + } + } + return dc.nextConsumer.ConsumeMetrics(ctx, md) +} + +type Decorator interface { + Decorate(stores.CIMetric) stores.CIMetric + Shutdown() error +} + +func (dc *decorateConsumer) decorateMetrics(metrics []*gpuMetric) []*gpuMetric { + var result []*gpuMetric + for _, m := range metrics { + // add tags for EKS + if dc.containerOrchestrator == ci.EKS { + out := dc.k8sDecorator.Decorate(m) + if out != nil { + result = append(result, out.(*gpuMetric)) + } + } + } + return result +} + +func (dc *decorateConsumer) updateAttributes(m pmetric.Metric, gm gpuMetric) { + if len(gm.tags) < 1 { + return + } + var dps pmetric.NumberDataPointSlice + switch m.Type() { + case pmetric.MetricTypeGauge: + dps = m.Gauge().DataPoints() + case pmetric.MetricTypeSum: + dps = m.Sum().DataPoints() + default: + dc.logger.Warn("Unsupported metric type", zap.String("metric", m.Name()), zap.String("type", m.Type().String())) + } + + if dps.Len() < 1 { + return + } + attrs := dps.At(0).Attributes() + for tk, tv := range gm.tags { + // type gets set with metrictransformer while duplicating metrics at different resource levels + if tk == ci.MetricType { + continue + } + attrs.PutStr(tk, tv) + } +} + +func (dc *decorateConsumer) Shutdown() error { + var errs error + + if dc.k8sDecorator != nil { + errs = errors.Join(errs, dc.k8sDecorator.Shutdown()) + } + return errs +} diff --git a/receiver/awscontainerinsightreceiver/internal/gpu/decorator_test.go b/receiver/awscontainerinsightreceiver/internal/gpu/decorator_test.go new file mode 100644 index 000000000000..4cbcb44700b5 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/gpu/decorator_test.go @@ -0,0 +1,233 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package gpu + +import ( + "context" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +var _ Decorator = (*MockK8sDecorator)(nil) + +type mockGpuMetric struct { + tags map[string]string + fields map[string]any +} + +func (m *mockGpuMetric) HasField(key string) bool { + return m.fields[key] != nil +} + +func (m *mockGpuMetric) AddField(key string, val any) { + m.fields[key] = val +} + +func (m *mockGpuMetric) GetField(key string) any { + return m.fields[key] +} + +func (m *mockGpuMetric) HasTag(key string) bool { + return m.tags[key] != "" +} + +func (m *mockGpuMetric) AddTag(key, val string) { + m.tags[key] = val +} + +func (m *mockGpuMetric) GetTag(key string) string { + return m.tags[key] +} + +func (m *mockGpuMetric) RemoveTag(key string) { + delete(m.tags, key) +} + +type MockK8sDecorator struct { +} + +func (m *MockK8sDecorator) Decorate(metric stores.CIMetric) stores.CIMetric { + return metric +} + +func (m *MockK8sDecorator) Shutdown() error { + return nil +} + +type mockNextConsumer struct { +} + +func (mc *mockNextConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{ + MutatesData: true, + } +} + +func (mc *mockNextConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error { + return nil +} + +func TestConsumeMetrics(t *testing.T) { + logger, _ := zap.NewDevelopment() + dc := &decorateConsumer{ + containerOrchestrator: "EKS", + nextConsumer: consumertest.NewNop(), + k8sDecorator: &MockK8sDecorator{}, + logger: logger, + } + ctx := context.Background() + + testcases := map[string]struct { + metrics pmetric.Metrics + want pmetric.Metrics + shouldError bool + }{ + "empty": { + metrics: pmetric.NewMetrics(), + want: pmetric.NewMetrics(), + shouldError: false, + }, + "unit": { + metrics: generateMetrics(map[string]map[string]string{ + gpuUtil: { + "device": "test0", + }, + gpuMemUtil: { + "device": "test0", + }, + gpuMemTotal: { + "device": "test0", + }, + gpuMemUsed: { + "device": "test0", + }, + gpuPowerDraw: { + "device": "test0", + }, + gpuTemperature: { + "device": "test0", + }, + }), + want: generateMetrics(map[string]map[string]string{ + gpuUtil: { + "device": "test0", + "Unit": "Percent", + }, + gpuMemUtil: { + "device": "test0", + "Unit": "Percent", + }, + gpuMemTotal: { + "device": "test0", + "Unit": "Bytes", + }, + gpuMemUsed: { + "device": "test0", + "Unit": "Bytes", + }, + gpuPowerDraw: { + "device": "test0", + "Unit": "None", + }, + gpuTemperature: { + "device": "test0", + "Unit": "None", + }, + }), + shouldError: false, + }, + "noUnit": { + metrics: generateMetrics(map[string]map[string]string{ + "test": { + "device": "test0", + }, + }), + want: generateMetrics(map[string]map[string]string{ + "test": { + "device": "test0", + }, + }), + shouldError: false, + }, + "typeUnchanged": { + metrics: generateMetrics(map[string]map[string]string{ + gpuUtil: { + "device": "test0", + "Type": "TestType", + }, + }), + want: generateMetrics(map[string]map[string]string{ + gpuUtil: { + "device": "test0", + "Type": "TestType", + "Unit": "Percent", + }, + }), + shouldError: false, + }, + } + + for _, tc := range testcases { + err := dc.ConsumeMetrics(ctx, tc.metrics) + if tc.shouldError { + assert.Error(t, err) + return + } + require.NoError(t, err) + assert.Equal(t, tc.metrics.MetricCount(), tc.want.MetricCount()) + if tc.want.MetricCount() == 0 { + continue + } + actuals := tc.metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + actuals.Sort(func(a, b pmetric.Metric) bool { + return a.Name() < b.Name() + }) + wants := tc.want.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + wants.Sort(func(a, b pmetric.Metric) bool { + return a.Name() < b.Name() + }) + for i := 0; i < wants.Len(); i++ { + actual := actuals.At(i) + want := wants.At(i) + assert.Equal(t, want.Name(), actual.Name()) + assert.Equal(t, want.Unit(), actual.Unit()) + actualAttrs := actual.Gauge().DataPoints().At(0).Attributes() + wantAttrs := want.Gauge().DataPoints().At(0).Attributes() + assert.Equal(t, wantAttrs.Len(), actualAttrs.Len()) + wantAttrs.Range(func(k string, v pcommon.Value) bool { + av, ok := actualAttrs.Get(k) + assert.True(t, ok) + assert.Equal(t, v, av) + return true + }) + } + } +} + +func generateMetrics(nameToDims map[string]map[string]string) pmetric.Metrics { + md := pmetric.NewMetrics() + ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + for name, dims := range nameToDims { + m := ms.AppendEmpty() + m.SetName(name) + gauge := m.SetEmptyGauge().DataPoints().AppendEmpty() + gauge.SetIntValue(10) + for k, v := range dims { + if k == "Unit" { + m.SetUnit(v) + continue + } + gauge.Attributes().PutStr(k, v) + } + } + return md +} diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go index 41b182f181be..46296fc6de5b 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go @@ -483,7 +483,7 @@ func (p *PodStore) decorateMem(metric CIMetric, pod *corev1.Pod) { } func (p *PodStore) addStatus(metric CIMetric, pod *corev1.Pod) { - if metric.GetTag(ci.MetricType) == ci.TypePod { + if metric.GetTag(ci.MetricType) == ci.TypePod || metric.GetTag(ci.MetricType) == ci.TypeGpuPod { metric.AddTag(ci.PodStatus, string(pod.Status.Phase)) if p.includeEnhancedMetrics { @@ -510,7 +510,7 @@ func (p *PodStore) addStatus(metric CIMetric, pod *corev1.Pod) { } p.setPrevMeasurement(ci.TypePod, podKey, prevPodMeasurement{containersRestarts: curContainerRestarts}) } - } else if metric.GetTag(ci.MetricType) == ci.TypeContainer { + } else if metric.GetTag(ci.MetricType) == ci.TypeContainer || metric.GetTag(ci.MetricType) == ci.TypeGpuContainer { if containerName := metric.GetTag(ci.ContainerNamekey); containerName != "" { for _, containerStatus := range pod.Status.ContainerStatuses { if containerStatus.Name == containerName { diff --git a/receiver/awscontainerinsightreceiver/internal/stores/store.go b/receiver/awscontainerinsightreceiver/internal/stores/store.go index 266d80b7a723..6dd0a0a0ff04 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/store.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/store.go @@ -10,13 +10,8 @@ import ( "time" "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" ) -var _ cadvisor.Decorator = &K8sDecorator{} - // CIMetric represents the raw metric interface for container insights type CIMetric interface { HasField(key string) bool @@ -89,7 +84,7 @@ func NewK8sDecorator(ctx context.Context, tagService bool, prefFullPodName bool, return k, nil } -func (k *K8sDecorator) Decorate(metric *extractors.CAdvisorMetric) *extractors.CAdvisorMetric { +func (k *K8sDecorator) Decorate(metric CIMetric) CIMetric { kubernetesBlob := map[string]any{} for _, store := range k.stores { ok := store.Decorate(k.ctx, metric, kubernetesBlob) diff --git a/receiver/awscontainerinsightreceiver/internal/stores/utils.go b/receiver/awscontainerinsightreceiver/internal/stores/utils.go index c1db9f3f9884..8c66c07a8fbc 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/utils.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/utils.go @@ -20,6 +20,7 @@ const ( // https://github.com/kubernetes/apimachinery/blob/master/pkg/util/rand/rand.go#L83 kubeAllowedStringAlphaNums = "bcdfghjklmnpqrstvwxz2456789" cronJobAllowedString = "0123456789" + gpuNvidiaKey = "nvidia.com/gpu" ) func createPodKeyFromMetaData(pod *corev1.Pod) string { @@ -121,6 +122,8 @@ func TagMetricSource(metric CIMetric) { sources = append(sources, []string{"cadvisor", "calculated"}...) case ci.TypeContainerDiskIO: sources = append(sources, []string{"cadvisor"}...) + case ci.TypeGpuContainer: + sources = append(sources, []string{"pod", "calculated"}...) } if len(sources) > 0 { diff --git a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go index 397d325cc773..57d2c2134cec 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go @@ -175,6 +175,7 @@ func TestUtils_TagMetricSource(t *testing.T) { ci.TypeContainer, ci.TypeContainerFS, ci.TypeContainerDiskIO, + ci.TypeGpuContainer, } expectedSources := []string{ @@ -188,6 +189,7 @@ func TestUtils_TagMetricSource(t *testing.T) { "[\"cadvisor\",\"pod\",\"calculated\"]", "[\"cadvisor\",\"calculated\"]", "[\"cadvisor\"]", + "[\"pod\",\"calculated\"]", } for i, mtype := range types { tags := map[string]string{ diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index 0d659fa1c7b6..13a94e8c9cd5 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -98,7 +98,7 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone acir.settings.Logger.Debug("Unable to start kube apiserver prometheus scraper", zap.Error(err)) } - err = acir.initDcgmScraper(ctx, host, hostinfo) + err = acir.initDcgmScraper(ctx, host, hostinfo, k8sDecorator) if err != nil { acir.settings.Logger.Debug("Unable to start dcgm scraper", zap.Error(err)) } @@ -176,7 +176,7 @@ func (acir *awsContainerInsightReceiver) initPrometheusScraper(ctx context.Conte }) return err } -func (acir *awsContainerInsightReceiver) initDcgmScraper(ctx context.Context, host component.Host, hostinfo *hostInfo.Info) error { +func (acir *awsContainerInsightReceiver) initDcgmScraper(ctx context.Context, host component.Host, hostinfo *hostInfo.Info, decorator *stores.K8sDecorator) error { if !acir.config.EnableGpuMetric { return nil } @@ -188,6 +188,8 @@ func (acir *awsContainerInsightReceiver) initDcgmScraper(ctx context.Context, ho Consumer: acir.nextConsumer, Host: host, HostInfoProvider: hostinfo, + K8sDecorator: decorator, + Logger: acir.settings.Logger, }) return err }