Skip to content

Commit

Permalink
Implement new receiver to add prometheus scraper for Kueue metrics (#253
Browse files Browse the repository at this point in the history
)

Implement new receiver to add prometheus scraper for Kueue metrics (#253)
  • Loading branch information
rvasahu-amazon authored Nov 14, 2024
1 parent 2091941 commit cb251fe
Show file tree
Hide file tree
Showing 15 changed files with 2,104 additions and 4 deletions.
24 changes: 24 additions & 0 deletions .chloggen-aws/kueue-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'new_component'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awscontainerinsightskueuereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Adds receiver that sets up Prometheus scraper to collect select Kueue metrics."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [1] # TODO: PR number in staging is 1. Maybe this needs to be changed to something else.

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Implements a Prometheus scraper to collect Kueue metrics from the Kueue controller manager |
service if Kueue is installed.

# e.g. '[aws]'
# Include 'aws' if the change is done done by cwa
# Default: '[user]'
change_logs: [rvasahu-amazon]
3 changes: 2 additions & 1 deletion exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri
if serviceName, ok := rm.Resource().Attributes().Get("service.name"); ok {
if strings.HasPrefix(serviceName.Str(), "containerInsightsKubeAPIServerScraper") ||
strings.HasPrefix(serviceName.Str(), "containerInsightsDCGMExporterScraper") ||
strings.HasPrefix(serviceName.Str(), "containerInsightsNeuronMonitorScraper") {
strings.HasPrefix(serviceName.Str(), "containerInsightsNeuronMonitorScraper") ||
strings.HasPrefix(serviceName.Str(), "containerInsightsKueueMetricsScraper") {
// the prometheus metrics that come from the container insight receiver need to be clearly tagged as coming from container insights
metricReceiver = containerInsightsReceiver
}
Expand Down
15 changes: 15 additions & 0 deletions exporter/awsemfexporter/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ func TestTranslateOtToGroupedMetric(t *testing.T) {
gpuMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsDCGMExporterScraper")
neuronMetric := createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1)
neuronMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsNeuronMonitorScraper")
kueueMetric := createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1)
kueueMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsKueueMetricsScraper")

counterSumMetrics := map[string]*metricInfo{
"spanCounter": {
Expand Down Expand Up @@ -387,6 +389,19 @@ func TestTranslateOtToGroupedMetric(t *testing.T) {
"myServiceNS/containerInsightsNeuronMonitorScraper",
containerInsightsReceiver,
},
{
"kueue receiver",
kueueMetric,
map[string]string{
"isItAnError": "false",
"spanName": "testSpan",
},
map[string]string{
"spanName": "testSpan",
},
"myServiceNS/containerInsightsKueueMetricsScraper",
containerInsightsReceiver,
},
}

for _, tc := range testCases {
Expand Down
22 changes: 22 additions & 0 deletions internal/aws/containerinsight/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ const (
HyperPodSchedulable = "schedulable"
HyperPodUnschedulable = "unschedulable"

// kueue metrics

KueuePendingWorkloads = "kueue_pending_workloads"
KueueEvictedWorkloadsTotal = "kueue_evicted_workloads_total"
KueueAdmittedActiveWorkloads = "kueue_admitted_active_workloads"
KueueClusterQueueResourceUsage = "kueue_cluster_queue_resource_usage"
KueueClusterQueueNominalQuota = "kueue_cluster_queue_nominal_quota"

// Define the metric types
TypeCluster = "Cluster"
TypeClusterService = "ClusterService"
Expand All @@ -173,6 +181,10 @@ const (
TypeContainer = "Container"
TypeContainerFS = "ContainerFS"
TypeContainerDiskIO = "ContainerDiskIO"

// kueue metric types
TypeClusterQueue = "ClusterQueue"

// Special type for pause container
// because containerd does not set container name pause container name to POD like docker does.
TypeInfraContainer = "InfraContainer"
Expand All @@ -189,6 +201,7 @@ const (
// unit
UnitBytes = "Bytes"
UnitMegaBytes = "Megabytes"
UnitSecond = "Second"
UnitNanoSecond = "Nanoseconds"
UnitBytesPerSec = "Bytes/Second"
UnitCount = "Count"
Expand Down Expand Up @@ -325,6 +338,15 @@ func init() {
NodeCount: UnitCount,
FailedNodeCount: UnitCount,

// kueue metrics
KueuePendingWorkloads: UnitCount,
KueueEvictedWorkloadsTotal: UnitCount,
KueueAdmittedActiveWorkloads: UnitCount,
KueueClusterQueueResourceUsage: UnitCount,
KueueClusterQueueNominalQuota: UnitCount,
// unit for KueueClusterQueue resource metrics depend on resource type. UnitCount is appropriate
// for CPU and CPU cores, but UnitBytes would be more appropriate for resource type memory.

// others
RunningPodCount: UnitCount,
RunningContainerCount: UnitCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ type PodStore struct {
enableAcceleratedComputeMetrics bool
}

func NewPodStore(client podClient, prefFullPodName bool, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, enableAcceleratedComputeMetrics bool, hostName string, isSystemdEnabled bool, logger *zap.Logger) (*PodStore, error) {
func NewPodStore(client podClient, prefFullPodName bool, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool,
enableAcceleratedComputeMetrics bool, hostName string, isSystemdEnabled bool, logger *zap.Logger) (*PodStore, error) {
if hostName == "" {
return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or passed as part of the agent config", ci.HostName)
}
Expand Down
5 changes: 3 additions & 2 deletions receiver/awscontainerinsightreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ func (acir *awsContainerInsightReceiver) initEKS(ctx context.Context, host compo
hostName string, kubeletClient *kubeletutil.KubeletClient) error {
k8sDecorator, err := stores.NewK8sDecorator(ctx, kubeletClient, acir.config.TagService, acir.config.PrefFullPodName,
acir.config.AddFullPodNameMetricLabel, acir.config.AddContainerNameMetricLabel,
acir.config.EnableControlPlaneMetrics, acir.config.EnableAcceleratedComputeMetrics, acir.config.KubeConfigPath, hostName,
acir.config.RunOnSystemd, acir.settings.Logger)
acir.config.EnableControlPlaneMetrics, acir.config.EnableAcceleratedComputeMetrics,
acir.config.KubeConfigPath, hostName, acir.config.RunOnSystemd, acir.settings.Logger)
if err != nil {
acir.settings.Logger.Warn("Unable to start K8s decorator", zap.Error(err))
} else {
Expand Down Expand Up @@ -286,6 +286,7 @@ func (acir *awsContainerInsightReceiver) initPrometheusScraper(ctx context.Conte
})
return err
}

func (acir *awsContainerInsightReceiver) initDcgmScraper(ctx context.Context, host component.Host, hostInfo *hostinfo.Info, localNodeDecorator stores.Decorator) error {
if !acir.config.EnableAcceleratedComputeMetrics {
return nil
Expand Down
1 change: 1 addition & 0 deletions receiver/awscontainerinsightskueuereceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
18 changes: 18 additions & 0 deletions receiver/awscontainerinsightskueuereceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package awscontainerinsightskueuereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightskueuereceiver"

import (
"time"
)

// Config defines configuration for aws ecs container metrics receiver.
type Config struct {
// CollectionInterval is the interval at which metrics should be collected. The default is 60 second.
CollectionInterval time.Duration `mapstructure:"collection_interval"`

// ClusterName can be used to explicitly provide the Cluster's Name for scenarios where it's not
// possible to auto-detect it using EC2 tags.
ClusterName string `mapstructure:"cluster_name"`
}
58 changes: 58 additions & 0 deletions receiver/awscontainerinsightskueuereceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package awscontainerinsightskueuereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightskueuereceiver"

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

const (
kueueMetricsStability = component.StabilityLevelDevelopment
)

var (
receiverType component.Type = component.MustNewType("awscontainerinsightskueuereceiver")
)

// Factory for awscontainerinsightreceiver
const (
// Default collection interval. Every 60s the receiver will collect metrics
defaultCollectionInterval = 60 * time.Second

// Rely on EC2 tags to auto-detect cluster name by default
defaultClusterName = ""
)

// NewFactory creates a factory for AWS container insight receiver
func NewFactory() receiver.Factory {
return receiver.NewFactory(
receiverType,
createDefaultConfig,
receiver.WithMetrics(createMetricsReceiver, kueueMetricsStability))
}

// createDefaultConfig returns a default config for the receiver.
func createDefaultConfig() component.Config {
return &Config{
CollectionInterval: defaultCollectionInterval,
ClusterName: defaultClusterName,
}
}

// CreateMetricsReceiver creates an AWS Container Insight receiver.
func createMetricsReceiver(
_ context.Context,
params receiver.Settings,
baseCfg component.Config,
consumer consumer.Metrics,
) (receiver.Metrics, error) {

rCfg := baseCfg.(*Config)
return newAWSContainerInsightReceiver(params.TelemetrySettings, rCfg, consumer)
}
Loading

0 comments on commit cb251fe

Please sign in to comment.