Skip to content

Commit

Permalink
add gpu metric consumer that uses k8s decorator for attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
movence committed Feb 26, 2024
1 parent 52cd972 commit 1444acd
Show file tree
Hide file tree
Showing 13 changed files with 515 additions and 22 deletions.
4 changes: 4 additions & 0 deletions internal/aws/containerinsight/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ const (
// Special type for pause container
// because containerd does not set container name pause container name to POD like docker does.
TypeInfraContainer = "InfraContainer"
TypeGpuContainer = "ContainerGPU"
TypeGpuPod = "PodGPU"
TypeGpuNode = "NodeGPU"
TypeGpuCluster = "ClusterGPU"

// unit
UnitBytes = "Bytes"
Expand Down
33 changes: 33 additions & 0 deletions internal/aws/containerinsight/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,39 @@ func GetUnitForMetric(metric string) string {
return metricToUnitMap[metric]
}

// ConvertToFieldsAndTags converts OTLP metric to a field containing metric values and a tag containing for decoration
func ConvertToFieldsAndTags(m pmetric.Metric, logger *zap.Logger) (map[string]any, map[string]string) {
fields := make(map[string]any)
tags := make(map[string]string)
if m.Name() == "" {
return fields, tags
}

// value is not needed for label decoration
fields[m.Name()] = 0

var dps pmetric.NumberDataPointSlice
switch m.Type() {
case pmetric.MetricTypeGauge:
dps = m.Gauge().DataPoints()
case pmetric.MetricTypeSum:
dps = m.Sum().DataPoints()
default:
logger.Warn("Unsupported metric type", zap.String("metric", m.Name()), zap.String("type", m.Type().String()))
}

// should support metrics with more than 1 datapoints?
if dps.Len() > 1 {
logger.Warn("Metric with more than 1 datapoint is not supported", zap.String("metric", m.Name()), zap.Int("datapoints", dps.Len()))
}
attrs := dps.At(0).Attributes()
attrs.Range(func(k string, v pcommon.Value) bool {
tags[k] = v.Str()
return true
})
return fields, tags
}

// ConvertToOTLPMetrics converts a field containing metric values and a tag containing the relevant labels to OTLP metrics
func ConvertToOTLPMetrics(fields map[string]any, tags map[string]string, logger *zap.Logger) pmetric.Metrics {
md := pmetric.NewMetrics()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
cInfo "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/manager"
"github.com/google/cadvisor/utils/sysfs"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

Expand Down Expand Up @@ -109,7 +110,7 @@ type EcsInfo interface {
}

type Decorator interface {
Decorate(*extractors.CAdvisorMetric) *extractors.CAdvisorMetric
Decorate(stores.CIMetric) stores.CIMetric
Shutdown() error
}

Expand Down Expand Up @@ -307,7 +308,7 @@ func (c *Cadvisor) decorateMetrics(cadvisormetrics []*extractors.CAdvisorMetric)

out := c.k8sDecorator.Decorate(m)
if out != nil {
result = append(result, out)
result = append(result, out.(*extractors.CAdvisorMetric))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/manager"
"github.com/google/cadvisor/utils/sysfs"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils"
)

Expand Down Expand Up @@ -73,7 +73,7 @@ var mockCreateManagerWithError = func(memoryCache *memory.InMemoryCache, sysfs s
type MockK8sDecorator struct {
}

func (m *MockK8sDecorator) Decorate(metric *extractors.CAdvisorMetric) *extractors.CAdvisorMetric {
func (m *MockK8sDecorator) Decorate(metric stores.CIMetric) stores.CIMetric {
return metric
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (
"time"

cinfo "github.com/google/cadvisor/info/v1"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
"go.uber.org/zap"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
)

var _ stores.CIMetric = (*CAdvisorMetric)(nil)

func GetStats(info *cinfo.ContainerInfo) *cinfo.ContainerStats {
if len(info.Stats) == 0 {
return nil
Expand Down
29 changes: 21 additions & 8 deletions receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"time"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
Expand All @@ -35,6 +36,7 @@ type DcgmScraper struct {
host component.Host
hostInfoProvider hostInfoProvider
prometheusReceiver receiver.Metrics
k8sDecorator Decorator
running bool
}

Expand All @@ -44,6 +46,8 @@ type DcgmScraperOpts struct {
Consumer consumer.Metrics
Host component.Host
HostInfoProvider hostInfoProvider
K8sDecorator Decorator
Logger *zap.Logger
}

type hostInfoProvider interface {
Expand Down Expand Up @@ -137,13 +141,6 @@ func NewDcgmScraper(opts DcgmScraperOpts) (*DcgmScraper, error) {
Replacement: "${1}",
Action: relabel.Replace,
},
{
SourceLabels: model.LabelNames{"pod"},
TargetLabel: "PodName",
Regex: relabel.MustNewRegexp("(.+)-(.+)"),
Replacement: "${1}",
Action: relabel.Replace,
},
// additional k8s podname for service name decoration
{
SourceLabels: model.LabelNames{"pod"},
Expand Down Expand Up @@ -179,8 +176,15 @@ func NewDcgmScraper(opts DcgmScraperOpts) (*DcgmScraper, error) {
TelemetrySettings: opts.TelemetrySettings,
}

decoConsumer := decorateConsumer{
containerOrchestrator: ci.EKS,
nextConsumer: opts.Consumer,
k8sDecorator: opts.K8sDecorator,
logger: opts.Logger,
}

promFactory := prometheusreceiver.NewFactory()
promReceiver, err := promFactory.CreateMetricsReceiver(opts.Ctx, params, &promConfig, opts.Consumer)
promReceiver, err := promFactory.CreateMetricsReceiver(opts.Ctx, params, &promConfig, &decoConsumer)
if err != nil {
return nil, fmt.Errorf("failed to create prometheus receiver: %w", err)
}
Expand All @@ -191,6 +195,7 @@ func NewDcgmScraper(opts DcgmScraperOpts) (*DcgmScraper, error) {
host: opts.Host,
hostInfoProvider: opts.HostInfoProvider,
prometheusReceiver: promReceiver,
k8sDecorator: opts.K8sDecorator,
}, nil
}

Expand All @@ -205,6 +210,7 @@ func (ds *DcgmScraper) GetMetrics() []pmetric.Metrics {
}
ds.running = err == nil
}

return nil
}

Expand All @@ -216,4 +222,11 @@ func (ds *DcgmScraper) Shutdown() {
}
ds.running = false
}

if ds.k8sDecorator != nil {
err := ds.k8sDecorator.Shutdown()
if err != nil {
ds.settings.Logger.Error("Unable to shutdown K8sDecorator", zap.Error(err))
}
}
}
Loading

0 comments on commit 1444acd

Please sign in to comment.