Skip to content

Commit

Permalink
Add kubelet summary API for Windows (amazon-contributing#142)
Browse files Browse the repository at this point in the history
* Add pod level metric collection for Windows

This PR defines code structure for metric provider which works on Windows.

1. Changed receiver.go in awscontainerinsights to run for Windows with metric provider.
2. Added summary API in kubeletclient
3. Add kubeletProvider to return metrics at different levels, i.e. pod, container, node.
4. Updated hostInfo providers to run for Windows.
5. Updated ebsVolume Info provider to run for Windows.

1. Define correct ebsVolume Info provider for Windows
2. Change logic around k8s leader election to run for Windows

# Conflicts:
#	receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go
#	receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go
#	receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go
#	receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/fs_extractor.go
#	receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go
#	receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go
#	receiver/awscontainerinsightreceiver/receiver.go
  • Loading branch information
KlwntSingh committed Mar 1, 2024
1 parent 9cb314e commit 8995fb9
Show file tree
Hide file tree
Showing 17 changed files with 353 additions and 52 deletions.
2 changes: 1 addition & 1 deletion cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ require (
k8s.io/klog v1.0.0 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/kubelet v0.28.3 // indirect
k8s.io/kubelet v0.28.4 // indirect
k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect
sigs.k8s.io/controller-runtime v0.16.3 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ require (
k8s.io/klog v1.0.0 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/kubelet v0.28.3 // indirect
k8s.io/kubelet v0.28.4 // indirect
k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect
sigs.k8s.io/controller-runtime v0.16.3 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions receiver/awscontainerinsightreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
k8s.io/apimachinery v0.28.3
k8s.io/client-go v0.28.3
k8s.io/klog v1.0.0
k8s.io/kubelet v0.28.3
k8s.io/utils v0.0.0-20230711102312-30195339c3c7
)

Expand Down
2 changes: 2 additions & 0 deletions receiver/awscontainerinsightreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"regexp"
"runtime"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -141,6 +142,12 @@ func (e *ebsVolume) addEBSVolumeMapping(zone *string, attachement *ec2.VolumeAtt
func (e *ebsVolume) findNvmeBlockNameIfPresent(devName string) string {
// for nvme(ssd), there is a symlink from devName to nvme block name, i.e. /dev/xvda -> /dev/nvme0n1
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/nvme-ebs-volumes.html

// Windows does not support file system eg: /rootfs to get Nvme Block name.
// todo: Implement logic to identify Nvme devices on Windows. Refer https://docs.aws.amazon.com/AWSEC2/latest/WindowsGuide/nvme-ebs-volumes.html#identify-nvme-ebs-device
if runtime.GOOS == "windows" {
return ""
}
hasRootFs := true
if _, err := e.osLstat(hostProc); os.IsNotExist(err) {
hasRootFs = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package host // import "github.com/open-telemetry/opentelemetry-collector-contri
import (
"context"
"os"
"runtime"

"github.com/shirou/gopsutil/v3/common"
"github.com/shirou/gopsutil/v3/cpu"
Expand Down Expand Up @@ -46,11 +47,14 @@ func newNodeCapacity(logger *zap.Logger, options ...nodeCapacityOption) (nodeCap
opt(nc)
}

if _, err := nc.osLstat(hostProc); os.IsNotExist(err) {
return nil, err
ctx := context.Background()
if runtime.GOOS != "windows" {
if _, err := nc.osLstat(hostProc); os.IsNotExist(err) {
return nil, err
}
envMap := common.EnvMap{common.HostProcEnvKey: hostProc}
ctx = context.WithValue(ctx, common.EnvKey, envMap)
}
envMap := common.EnvMap{common.HostProcEnvKey: hostProc}
ctx := context.WithValue(context.Background(), common.EnvKey, envMap)

nc.parseCPU(ctx)
nc.parseMemory(ctx)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !linux
// +build !linux

package host // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"

// These variables are invalid for Windows
const (
rootfs = ""
hostProc = rootfs + ""
hostMounts = hostProc + ""
)
6 changes: 0 additions & 6 deletions receiver/awscontainerinsightreceiver/internal/host/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ import (
"time"
)

const (
rootfs = "/rootfs" // the root directory "/" is mounted as "/rootfs" in container
hostProc = rootfs + "/proc" // "/rootfs/proc" in container refers to the host proc directory "/proc"
hostMounts = hostProc + "/mounts" // "/rootfs/proc/mounts" in container refers to "/proc/mounts" in the host
)

func hostJitter(max time.Duration) time.Duration {
hostName, err := os.Hostname()
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/host/utilsconst.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !windows
// +build !windows

package host // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"

// Locations at which the host's filesystem is visible from inside the container.
const (
	// rootfs is where the host root directory "/" is mounted in the container.
	rootfs = "/rootfs"
	// hostProc is the host's "/proc" directory as seen from the container,
	// i.e. "/rootfs/proc".
	hostProc = rootfs + "/proc"
	// hostMounts is the host's "/proc/mounts" as seen from the container,
	// i.e. "/rootfs/proc/mounts".
	hostMounts = hostProc + "/mounts"
)
110 changes: 110 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build windows
// +build windows

package k8swindows // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows"

import (
"context"
"errors"
"os"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

// K8sWindows produces Kubernetes metrics on Windows nodes: it pulls raw
// metrics from the kubelet summary provider, tags them with host metadata and
// runs them through the k8s decorator before converting them to OTLP.
type K8sWindows struct {
	logger *zap.Logger
	// cancel is neither assigned nor invoked by the code in this file.
	cancel context.CancelFunc

	// nodeName holds the value of the HOST_NAME environment variable read by New.
	nodeName string `toml:"node_name"`
	// hostInfo supplies the instance ID, instance type, auto scaling group
	// name and cluster name that are attached to each metric as tags.
	hostInfo host.Info

	// summaryProvider is the source of raw metrics.
	summaryProvider *kubeletSummaryProvider
	// k8sDecorator is a by-value copy of the decorator handed to New.
	k8sDecorator stores.K8sDecorator
}

// New builds a K8sWindows metric source for the node named by the HOST_NAME
// environment variable.
//
// It returns an error when HOST_NAME is unset or empty, when decorator is nil,
// or when the kubelet summary provider cannot be initialized. The decorator is
// copied by value into the returned struct.
func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info) (*K8sWindows, error) {
	nodeName := os.Getenv("HOST_NAME")
	if nodeName == "" {
		return nil, errors.New("missing environment variable HOST_NAME. Please check your deployment YAML config")
	}
	// The decorator is dereferenced below; reject nil here instead of panicking.
	if decorator == nil {
		return nil, errors.New("k8s decorator must not be nil")
	}

	// NOTE: new is this package's kubeletSummaryProvider constructor, which
	// shadows the builtin of the same name.
	k8sSummaryProvider, err := new(logger, hostInfo)
	if err != nil {
		logger.Error("failed to initialize kubelet summary provider, ", zap.Error(err))
		return nil, err
	}

	return &K8sWindows{
		logger:          logger,
		nodeName:        nodeName,
		k8sDecorator:    *decorator,
		summaryProvider: k8sSummaryProvider,
		hostInfo:        hostInfo,
	}, nil
}

// GetMetrics fetches raw metrics from the kubelet summary provider, decorates
// them and converts each one to OTLP metrics.
//
// When the summary provider fails, the error is logged and a nil slice is
// returned.
func (k *K8sWindows) GetMetrics() []pmetric.Metrics {
	k.logger.Debug("D! called K8sWindows GetMetrics")

	rawMetrics, err := k.summaryProvider.getMetrics()
	if err != nil {
		k.logger.Error("error getting metrics from kubelet summary provider, ", zap.Error(err))
		return nil
	}

	var converted []pmetric.Metrics
	for _, decorated := range k.decorateMetrics(rawMetrics) {
		otlp := ci.ConvertToOTLPMetrics(decorated.GetFields(), decorated.GetTags(), k.logger)
		converted = append(converted, otlp)
	}
	return converted
}

// decorateMetrics tags every metric with node, instance, auto scaling group
// and cluster information and then passes it through the k8s decorator.
//
// Metrics for which the decorator returns nil are dropped. A decorated value
// that is not a *stores.RawContainerInsightsMetric is logged and dropped
// instead of triggering a type-assertion panic.
func (k *K8sWindows) decorateMetrics(windowsmetrics []*stores.RawContainerInsightsMetric) []*stores.RawContainerInsightsMetric {
	// todo: EBS volume info is not attached yet:
	//   ebsVolumeIdsUsedAsPV := k.hostInfo.ExtractEbsIDsUsedByKubernetes()
	var result []*stores.RawContainerInsightsMetric
	for _, m := range windowsmetrics {
		tags := m.GetTags()
		// todo: k.addEbsVolumeInfo(tags, ebsVolumeIdsUsedAsPV)

		// todo: add version
		//   tags[ci.Version] = k.version

		// add nodeName for node, pod and container
		metricType := tags[ci.MetricType]
		if k.nodeName != "" && (ci.IsNode(metricType) || ci.IsInstance(metricType) ||
			ci.IsPod(metricType) || ci.IsContainer(metricType)) {
			tags[ci.NodeNameKey] = k.nodeName
		}

		// add instance id and type
		if instanceID := k.hostInfo.GetInstanceID(); instanceID != "" {
			tags[ci.InstanceID] = instanceID
		}
		if instanceType := k.hostInfo.GetInstanceType(); instanceType != "" {
			tags[ci.InstanceType] = instanceType
		}

		// add scaling group name
		tags[ci.AutoScalingGroupNameKey] = k.hostInfo.GetAutoScalingGroupName()

		// add tags for EKS
		tags[ci.ClusterNameKey] = k.hostInfo.GetClusterName()

		out := k.k8sDecorator.Decorate(m)
		if out == nil {
			continue
		}
		decorated, ok := out.(*stores.RawContainerInsightsMetric)
		if !ok {
			k.logger.Warn("dropping decorated metric of unexpected type", zap.String("metricType", metricType))
			continue
		}
		result = append(result, decorated)
	}
	return result
}

// Shutdown logs that it was called and always returns nil; it releases no
// resources.
func (k *K8sWindows) Shutdown() error {
	const msg = "D! called K8sWindows Shutdown"
	k.logger.Debug(msg)
	return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !windows
// +build !windows

package k8swindows // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows"

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

// K8sWindows is an empty placeholder used on non-Windows builds so that the
// package exposes the same type on every platform.
type K8sWindows struct{}

// New is the non-Windows stub constructor: it ignores all of its arguments and
// returns an empty K8sWindows with a nil error.
func New(_ *zap.Logger, _ *stores.K8sDecorator, _ host.Info) (*K8sWindows, error) {
	var stub K8sWindows
	return &stub, nil
}

// GetMetrics is the non-Windows stub: it always returns an empty, non-nil
// slice of metrics.
func (k *K8sWindows) GetMetrics() []pmetric.Metrics {
	none := make([]pmetric.Metrics, 0)
	return none
}

// Shutdown is the non-Windows stub: there is nothing to release, so it always
// returns nil.
func (*K8sWindows) Shutdown() error {
	return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build windows
// +build windows

package k8swindows // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows"

import (
"fmt"
"os"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil"

"go.uber.org/zap"
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
)

// kubeletSummaryProvider turns the kubelet summary API response into raw
// Container Insights metrics.
type kubeletSummaryProvider struct {
	logger *zap.Logger

	// client is used to call the kubelet summary API.
	client *kubeletutil.KubeletClient
	// hostInfo supplies the node's CPU core count and memory capacity used
	// when computing pod metrics.
	hostInfo host.Info

	// hostIP and hostPort presumably identify the kubelet endpoint; no code
	// in this file reads them — TODO confirm they are still needed.
	hostIP   string
	hostPort string
}

// new builds a kubeletSummaryProvider whose kubelet client targets the address
// in the HOST_IP environment variable on ci.KubeSecurePort.
//
// It returns an error when HOST_IP is unset or empty (mirroring the HOST_NAME
// check performed by New) or when the kubelet client cannot be created.
//
// NOTE(review): this function shadows the builtin new throughout the package;
// renaming it requires updating its caller in New as well.
func new(logger *zap.Logger, info host.Info) (*kubeletSummaryProvider, error) {
	hostIP := os.Getenv("HOST_IP")
	if hostIP == "" {
		return nil, fmt.Errorf("missing environment variable HOST_IP. Please check your deployment YAML config")
	}

	kclient, err := kubeletutil.NewKubeletClient(hostIP, ci.KubeSecurePort, logger)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize kubelet client: %w", err)
	}

	return &kubeletSummaryProvider{
		logger:   logger,
		hostIP:   hostIP,
		client:   kclient,
		hostInfo: info,
	}, nil
}

// getMetrics calls the kubelet summary API and returns the pod-level metrics
// derived from the response. A failed API call is logged and its error is
// returned with a nil slice.
func (k *kubeletSummaryProvider) getMetrics() ([]*stores.RawContainerInsightsMetric, error) {
	summary, err := k.client.Summary(k.logger)
	if err == nil {
		return k.getPodMetrics(summary)
	}

	k.logger.Error("kubelet summary API failed, ", zap.Error(err))
	return nil, err
}

// getContainerMetrics is a placeholder for container-level metrics; it
// currently ignores summary and returns a nil slice with a nil error.
func (k *kubeletSummaryProvider) getContainerMetrics(summary *stats.Summary) ([]*stores.RawContainerInsightsMetric, error) {
	// todo: implement CPU, memory metrics from containers
	return nil, nil
}

// getPodMetrics builds one pod-type metric per pod in summary, carrying the
// pod's identity plus its CPU and memory statistics.
//
// The kubelet reports CPU and memory statistics through optional pointers, so
// any statistic that is absent is omitted from the metric rather than
// dereferenced. Ratios are likewise omitted when the node's core count or
// memory capacity is not positive, to avoid emitting Inf/NaN values.
//
// It returns an error when summary is nil.
func (k *kubeletSummaryProvider) getPodMetrics(summary *stats.Summary) ([]*stores.RawContainerInsightsMetric, error) {
	// todo: This is not complete implementation of pod level metric collection since network level metrics are pending
	// May need to add some more pod level labels for store decorators to work properly
	if summary == nil {
		return nil, fmt.Errorf("kubelet summary is nil")
	}

	var metrics []*stores.RawContainerInsightsMetric

	// Node capacity does not depend on the pod, so read it once.
	nodeCPUCores := k.hostInfo.GetNumCores()
	memCapacity := k.hostInfo.GetMemoryCapacity()

	for i := range summary.Pods {
		pod := &summary.Pods[i]
		// Logged at debug level: this runs for every pod on every collection.
		k.logger.Debug("pod summary", zap.String("pod", pod.PodRef.Name))
		metric := stores.NewRawContainerInsightsMetric(ci.TypePod, k.logger)

		metric.AddField(ci.AttributePodID, pod.PodRef.UID)
		metric.AddField(ci.AttributeK8sPodName, pod.PodRef.Name)
		metric.AddField(ci.AttributeK8sNamespace, pod.PodRef.Namespace)

		// CPU metric
		if pod.CPU != nil && pod.CPU.UsageCoreNanoSeconds != nil {
			usage := *pod.CPU.UsageCoreNanoSeconds
			metric.AddField(ci.MetricName(ci.TypePod, ci.CPUTotal), usage)
			// NOTE(review): this divides a cumulative counter by the core
			// count; confirm it is the intended definition of utilization.
			if nodeCPUCores > 0 {
				metric.AddField(ci.MetricName(ci.TypePod, ci.CPUUtilization), float64(usage)/float64(nodeCPUCores))
			}
		}

		// Memory metrics
		if mem := pod.Memory; mem != nil {
			if mem.UsageBytes != nil {
				metric.AddField(ci.MetricName(ci.TypePod, ci.MemUsage), *mem.UsageBytes)
			}
			if mem.RSSBytes != nil {
				metric.AddField(ci.MetricName(ci.TypePod, ci.MemRss), *mem.RSSBytes)
			}
			if mem.WorkingSetBytes != nil {
				metric.AddField(ci.MetricName(ci.TypePod, ci.MemWorkingset), *mem.WorkingSetBytes)
				if memCapacity > 0 {
					metric.AddField(ci.MetricName(ci.TypePod, ci.MemUtilization), float64(*mem.WorkingSetBytes)/float64(memCapacity)*100)
				}
			}
		}
		// The reserved-capacity field is the node's memory capacity and does
		// not depend on the pod's own statistics.
		metric.AddField(ci.MetricName(ci.TypePod, ci.MemReservedCapacity), memCapacity)

		metrics = append(metrics, metric)
	}
	return metrics, nil
}

// getNodeMetrics is a placeholder for node-level metrics; it currently returns
// a nil slice with a nil error.
func (k *kubeletSummaryProvider) getNodeMetrics() ([]*stores.RawContainerInsightsMetric, error) {
	//todo: Implement CPU, memory and network metrics at node
	return nil, nil
}
Loading

0 comments on commit 8995fb9

Please sign in to comment.