Skip to content

Commit

Permalink
Merge branch 'ci-neuron' into ci-neuron
Browse files Browse the repository at this point in the history
  • Loading branch information
sam6134 authored Mar 6, 2024
2 parents 3168bb2 + 9cb314e commit 0c8eac2
Show file tree
Hide file tree
Showing 8 changed files with 576 additions and 11 deletions.
2 changes: 1 addition & 1 deletion internal/aws/proxy/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestCanCreateTransport(t *testing.T) {

_, err := NewServer(cfg, logger)
assert.Error(t, err, "NewServer should fail")
assert.Contains(t, err.Error(), "failed to parse proxy URL")
assert.Contains(t, err.Error(), "invalid control character in URL")
}

func TestGetServiceEndpointInvalidAWSConfig(t *testing.T) {
Expand Down
235 changes: 235 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package gpu

import (
"context"
"errors"
"fmt"
"time"

configutil "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/kubernetes"
"github.com/prometheus/prometheus/model/relabel"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"
)

const (
caFile = "/etc/amazon-cloudwatch-observability-agent-cert/tls-ca.crt"
collectionInterval = 60 * time.Second
jobName = "containerInsightsDCGMExporterScraper"
scraperMetricsPath = "/metrics"
scraperK8sServiceSelector = "k8s-app=dcgm-exporter-service"
)

type DcgmScraper struct {
ctx context.Context
settings component.TelemetrySettings
host component.Host
hostInfoProvider hostInfoProvider
prometheusReceiver receiver.Metrics
k8sDecorator Decorator
running bool
}

type DcgmScraperOpts struct {
Ctx context.Context
TelemetrySettings component.TelemetrySettings
Consumer consumer.Metrics
Host component.Host
HostInfoProvider hostInfoProvider
K8sDecorator Decorator
Logger *zap.Logger
}

type hostInfoProvider interface {
GetClusterName() string
GetInstanceID() string
}

func NewDcgmScraper(opts DcgmScraperOpts) (*DcgmScraper, error) {
if opts.Consumer == nil {
return nil, errors.New("consumer cannot be nil")
}
if opts.Host == nil {
return nil, errors.New("host cannot be nil")
}
if opts.HostInfoProvider == nil {
return nil, errors.New("cluster name provider cannot be nil")
}

promConfig := prometheusreceiver.Config{
PrometheusConfig: &config.Config{
ScrapeConfigs: []*config.ScrapeConfig{getScraperConfig(opts.HostInfoProvider)},
},
}

params := receiver.CreateSettings{
TelemetrySettings: opts.TelemetrySettings,
}

decoConsumer := decorateConsumer{
containerOrchestrator: ci.EKS,
nextConsumer: opts.Consumer,
k8sDecorator: opts.K8sDecorator,
logger: opts.Logger,
}

promFactory := prometheusreceiver.NewFactory()
promReceiver, err := promFactory.CreateMetricsReceiver(opts.Ctx, params, &promConfig, &decoConsumer)
if err != nil {
return nil, fmt.Errorf("failed to create prometheus receiver: %w", err)
}

return &DcgmScraper{
ctx: opts.Ctx,
settings: opts.TelemetrySettings,
host: opts.Host,
hostInfoProvider: opts.HostInfoProvider,
prometheusReceiver: promReceiver,
k8sDecorator: opts.K8sDecorator,
}, nil
}

func getScraperConfig(hostInfoProvider hostInfoProvider) *config.ScrapeConfig {
return &config.ScrapeConfig{
HTTPClientConfig: configutil.HTTPClientConfig{
TLSConfig: configutil.TLSConfig{
CAFile: caFile,
InsecureSkipVerify: false,
},
},
ScrapeInterval: model.Duration(collectionInterval),
ScrapeTimeout: model.Duration(collectionInterval),
JobName: jobName,
Scheme: "https",
MetricsPath: scraperMetricsPath,
ServiceDiscoveryConfigs: discovery.Configs{
&kubernetes.SDConfig{
Role: kubernetes.RoleService,
NamespaceDiscovery: kubernetes.NamespaceDiscovery{
IncludeOwnNamespace: true,
},
Selectors: []kubernetes.SelectorConfig{
{
Role: kubernetes.RoleService,
Label: scraperK8sServiceSelector,
},
},
},
},
MetricRelabelConfigs: getMetricRelabelConfig(hostInfoProvider),
}
}

func getMetricRelabelConfig(hostInfoProvider hostInfoProvider) []*relabel.Config {
return []*relabel.Config{
{
SourceLabels: model.LabelNames{"__name__"},
Regex: relabel.MustNewRegexp("DCGM_.*"),
Action: relabel.Keep,
},
{
SourceLabels: model.LabelNames{"Hostname"},
TargetLabel: ci.NodeNameKey,
Regex: relabel.MustNewRegexp("(.*)"),
Replacement: "${1}",
Action: relabel.Replace,
},
{
SourceLabels: model.LabelNames{"namespace"},
TargetLabel: ci.AttributeK8sNamespace,
Regex: relabel.MustNewRegexp("(.*)"),
Replacement: "${1}",
Action: relabel.Replace,
},
// hacky way to inject static values (clusterName & instanceId) to label set without additional processor
// relabel looks up an existing label then creates another label with given key (TargetLabel) and value (static)
{
SourceLabels: model.LabelNames{"namespace"},
TargetLabel: ci.ClusterNameKey,
Regex: relabel.MustNewRegexp(".*"),
Replacement: hostInfoProvider.GetClusterName(),
Action: relabel.Replace,
},
{
SourceLabels: model.LabelNames{"namespace"},
TargetLabel: ci.InstanceID,
Regex: relabel.MustNewRegexp(".*"),
Replacement: hostInfoProvider.GetInstanceID(),
Action: relabel.Replace,
},
{
SourceLabels: model.LabelNames{"pod"},
TargetLabel: ci.AttributeFullPodName,
Regex: relabel.MustNewRegexp("(.*)"),
Replacement: "${1}",
Action: relabel.Replace,
},
// additional k8s podname for service name and k8s blob decoration
{
SourceLabels: model.LabelNames{"pod"},
TargetLabel: ci.AttributeK8sPodName,
Regex: relabel.MustNewRegexp("(.*)"),
Replacement: "${1}",
Action: relabel.Replace,
},
{
SourceLabels: model.LabelNames{"container"},
TargetLabel: ci.AttributeContainerName,
Regex: relabel.MustNewRegexp("(.*)"),
Replacement: "${1}",
Action: relabel.Replace,
},
{
SourceLabels: model.LabelNames{"device"},
TargetLabel: ci.AttributeGpuDevice,
Regex: relabel.MustNewRegexp("(.*)"),
Replacement: "${1}",
Action: relabel.Replace,
},
}
}

func (ds *DcgmScraper) GetMetrics() []pmetric.Metrics {
// This method will never return metrics because the metrics are collected by the scraper.
// This method will ensure the scraper is running
if !ds.running {
ds.settings.Logger.Info("The scraper is not running, starting up the scraper")
err := ds.prometheusReceiver.Start(ds.ctx, ds.host)
if err != nil {
ds.settings.Logger.Error("Unable to start PrometheusReceiver", zap.Error(err))
}
ds.running = err == nil
}

return nil
}

func (ds *DcgmScraper) Shutdown() {
if ds.running {
err := ds.prometheusReceiver.Shutdown(ds.ctx)
if err != nil {
ds.settings.Logger.Error("Unable to shutdown PrometheusReceiver", zap.Error(err))
}
ds.running = false
}

if ds.k8sDecorator != nil {
err := ds.k8sDecorator.Shutdown()
if err != nil {
ds.settings.Logger.Error("Unable to shutdown K8sDecorator", zap.Error(err))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func TestNewDcgmScraperEndToEnd(t *testing.T) {
TelemetrySettings: scraper.Settings,
}
scraper.PrometheusReceiver, err = promFactory.CreateMetricsReceiver(scraper.Ctx, params, &promConfig, consumer)

assert.NoError(t, err)
assert.NotNil(t, mp)
defer mp.Close()
Expand Down
140 changes: 140 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/gpu/decorator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package gpu

import (
"context"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
)

const (
gpuUtil = "DCGM_FI_DEV_GPU_UTIL"
gpuMemUtil = "DCGM_FI_DEV_FB_USED_PERCENT"
gpuMemUsed = "DCGM_FI_DEV_FB_USED"
gpuMemTotal = "DCGM_FI_DEV_FB_TOTAL"
gpuTemperature = "DCGM_FI_DEV_GPU_TEMP"
gpuPowerDraw = "DCGM_FI_DEV_POWER_USAGE"
)

var metricToUnit = map[string]string{
gpuUtil: "Percent",
gpuMemUtil: "Percent",
gpuMemUsed: "Bytes",
gpuMemTotal: "Bytes",
gpuTemperature: "None",
gpuPowerDraw: "None",
}

// GPU decorator acts as an interceptor of metrics before the scraper sends them to the next designated consumer
type decorateConsumer struct {
containerOrchestrator string
nextConsumer consumer.Metrics
k8sDecorator Decorator
logger *zap.Logger
}

func (dc *decorateConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{
MutatesData: true,
}
}

func (dc *decorateConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
resourceTags := make(map[string]string)
rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
// get resource attributes
ras := rms.At(i).Resource().Attributes()
ras.Range(func(k string, v pcommon.Value) bool {
resourceTags[k] = v.AsString()
return true
})
ilms := rms.At(i).ScopeMetrics()
for j := 0; j < ilms.Len(); j++ {
ms := ilms.At(j).Metrics()
for k := 0; k < ms.Len(); k++ {
m := ms.At(k)
converted := ci.ConvertToFieldsAndTags(m, dc.logger)
var rcis []*stores.RawContainerInsightsMetric
for _, pair := range converted {
rcis = append(rcis, stores.NewRawContainerInsightsMetricWithData(ci.TypeGpuContainer, pair.Fields, pair.Tags, dc.logger))
}

decorated := dc.decorateMetrics(rcis)
dc.updateAttributes(m, decorated)
if unit, ok := metricToUnit[m.Name()]; ok {
m.SetUnit(unit)
}
}
}
}
return dc.nextConsumer.ConsumeMetrics(ctx, md)
}

type Decorator interface {
Decorate(stores.CIMetric) stores.CIMetric
Shutdown() error
}

func (dc *decorateConsumer) decorateMetrics(rcis []*stores.RawContainerInsightsMetric) []*stores.RawContainerInsightsMetric {
var result []*stores.RawContainerInsightsMetric
if dc.containerOrchestrator != ci.EKS {
return result
}
for _, rci := range rcis {
// add tags for EKS
out := dc.k8sDecorator.Decorate(rci)
if out != nil {
result = append(result, out.(*stores.RawContainerInsightsMetric))
}
}
return result
}

func (dc *decorateConsumer) updateAttributes(m pmetric.Metric, rcis []*stores.RawContainerInsightsMetric) {
if len(rcis) == 0 {
return
}
var dps pmetric.NumberDataPointSlice
switch m.Type() {
case pmetric.MetricTypeGauge:
dps = m.Gauge().DataPoints()
case pmetric.MetricTypeSum:
dps = m.Sum().DataPoints()
default:
dc.logger.Warn("Unsupported metric type", zap.String("metric", m.Name()), zap.String("type", m.Type().String()))
}
if dps.Len() == 0 {
return
}
for i := 0; i < dps.Len(); i++ {
if i >= len(rcis) {
// this shouldn't be the case, but it helps to avoid panic
continue
}
attrs := dps.At(i).Attributes()
tags := rcis[i].Tags
for tk, tv := range tags {
// type gets set with metrictransformer while duplicating metrics at different resource levels
if tk == ci.MetricType {
continue
}
attrs.PutStr(tk, tv)
}
}
}

func (dc *decorateConsumer) Shutdown() error {
if dc.k8sDecorator != nil {
return dc.k8sDecorator.Shutdown()
}
return nil
}
Loading

0 comments on commit 0c8eac2

Please sign in to comment.