From d575b52b619f89809dba7724d27f6667a6341421 Mon Sep 17 00:00:00 2001 From: Kulwant Singh Date: Sat, 16 Dec 2023 11:00:04 -0800 Subject: [PATCH] Add pod level metric collection for Windows This PR defines the code structure for a metric provider that works on Windows. ### Changelog 1. Changed receiver.go in awscontainerinsightreceiver to run for Windows with metric provider. 2. Added summary API in kubeletclient 3. Added kubeletProvider to return metrics at different levels i.e. pod, container, node. 4. Updated hostInfo providers to run for Windows. 5. Updated ebsVolume Info provider to run for Windows. ## Todos: 1. Define correct ebsVolume Info provider for Windows 2. Change logic around k8s leader election to run for Windows --- .../cadvisor/extractors/cpu_extractor.go | 2 +- .../cadvisor/extractors/diskio_extractor.go | 2 +- .../internal/cadvisor/extractors/extractor.go | 2 +- .../cadvisor/extractors/fs_extractor.go | 2 +- .../cadvisor/extractors/mem_extractor.go | 2 +- .../cadvisor/extractors/net_extractor.go | 4 +- .../internal/host/ebsvolume.go | 7 ++ .../internal/host/nodeCapacity.go | 12 +- .../internal/host/utilconst_windows.go | 14 +++ .../internal/host/utils.go | 6 - .../internal/host/utilsconst.go | 13 ++ .../internal/k8swindows/k8swindows.go | 111 ++++++++++++++++++ .../k8swindows/k8swindows_nowindows.go | 32 +++++ .../internal/k8swindows/kubelet.go | 93 +++++++++++++++ .../stores/kubeletutil/kubeletclient.go | 20 ++++ .../awscontainerinsightreceiver/receiver.go | 71 ++++++----- 16 files changed, 346 insertions(+), 47 deletions(-) create mode 100644 receiver/awscontainerinsightreceiver/internal/host/utilconst_windows.go create mode 100644 receiver/awscontainerinsightreceiver/internal/host/utilsconst.go create mode 100644 receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go create mode 100644 receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows_nowindows.go create mode 100644 
receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go index e2b8851d6740..08369aa2be90 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go @@ -33,7 +33,7 @@ func (c *CPUMetricExtractor) GetValue(info *cInfo.ContainerInfo, mInfo CPUMemInf // When there is more than one stats point, always use the last one curStats := GetStats(info) - metric := newCadvisorMetric(containerType, c.logger) + metric := NewCadvisorMetric(containerType, c.logger) metric.cgroupPath = info.Name multiplier := float64(decimalToMillicores) assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUTotal), info.Name, float64(curStats.Cpu.Usage.Total), curStats.Timestamp, multiplier) diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go index 384acf0e5e4f..f3f3d5888c4c 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go @@ -41,7 +41,7 @@ func (d *DiskIOMetricExtractor) extractIoMetrics(curStatsSet []cInfo.PerDiskStat expectedKey := []string{ci.DiskIOAsync, ci.DiskIOSync, ci.DiskIORead, ci.DiskIOWrite, ci.DiskIOTotal} for _, cur := range curStatsSet { curDevName := devName(cur) - metric := newCadvisorMetric(getDiskIOMetricType(containerType, d.logger), d.logger) + metric := NewCadvisorMetric(getDiskIOMetricType(containerType, d.logger), d.logger) metric.tags[ci.DiskDev] = curDevName for _, key := range expectedKey { if curVal, curOk := cur.Stats[key]; curOk { diff --git 
a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go index 398ad4805a59..457d5a3dbf46 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go @@ -44,7 +44,7 @@ type CAdvisorMetric struct { logger *zap.Logger } -func newCadvisorMetric(mType string, logger *zap.Logger) *CAdvisorMetric { +func NewCadvisorMetric(mType string, logger *zap.Logger) *CAdvisorMetric { metric := &CAdvisorMetric{ fields: make(map[string]any), tags: make(map[string]string), diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/fs_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/fs_extractor.go index 6e2f888b461b..093ff0d4b1cf 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/fs_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/fs_extractor.go @@ -33,7 +33,7 @@ func (f *FileSystemMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMem metrics := make([]*CAdvisorMetric, 0, len(stats.Filesystem)) for _, v := range stats.Filesystem { - metric := newCadvisorMetric(containerType, f.logger) + metric := NewCadvisorMetric(containerType, f.logger) if v.Device == "" { continue } diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go index fe7750c39093..310061282750 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go @@ -28,7 +28,7 @@ func (m *MemMetricExtractor) GetValue(info *cinfo.ContainerInfo, mInfo CPUMemInf return metrics } - metric := newCadvisorMetric(containerType, m.logger) + metric := 
NewCadvisorMetric(containerType, m.logger) metric.cgroupPath = info.Name curStats := GetStats(info) diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go index 3affa24971fd..d8bdb23fcfe9 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go @@ -70,7 +70,7 @@ func (n *NetMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMemInfoPro netIfceMetrics[i] = netIfceMetric - metric := newCadvisorMetric(mType, n.logger) + metric := NewCadvisorMetric(mType, n.logger) metric.tags[ci.NetIfce] = cur.Name for k, v := range netIfceMetric { metric.fields[ci.MetricName(mType, k)] = v @@ -81,7 +81,7 @@ func (n *NetMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMemInfoPro aggregatedFields := ci.SumFields(netIfceMetrics) if len(aggregatedFields) > 0 { - metric := newCadvisorMetric(containerType, n.logger) + metric := NewCadvisorMetric(containerType, n.logger) for k, v := range aggregatedFields { metric.fields[ci.MetricName(containerType, k)] = v } diff --git a/receiver/awscontainerinsightreceiver/internal/host/ebsvolume.go b/receiver/awscontainerinsightreceiver/internal/host/ebsvolume.go index 23690675c287..e6eeefdb4c89 100644 --- a/receiver/awscontainerinsightreceiver/internal/host/ebsvolume.go +++ b/receiver/awscontainerinsightreceiver/internal/host/ebsvolume.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "regexp" + "runtime" "strings" "sync" "time" @@ -141,6 +142,12 @@ func (e *ebsVolume) addEBSVolumeMapping(zone *string, attachement *ec2.VolumeAtt func (e *ebsVolume) findNvmeBlockNameIfPresent(devName string) string { // for nvme(ssd), there is a symlink from devName to nvme block name, i.e. 
/dev/xvda -> /dev/nvme0n1 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/nvme-ebs-volumes.html + + // Windows does not support file system eg: /rootfs to get Nvme Block name. + // todo: Implement logic to identify Nvme devices on Windows. Refer https://docs.aws.amazon.com/AWSEC2/latest/WindowsGuide/nvme-ebs-volumes.html#identify-nvme-ebs-device + if runtime.GOOS == "windows" { + return "" + } hasRootFs := true if _, err := e.osLstat(hostProc); os.IsNotExist(err) { hasRootFs = false diff --git a/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go b/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go index 8c77d4634831..522591e95c82 100644 --- a/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go +++ b/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go @@ -6,6 +6,7 @@ package host // import "github.com/open-telemetry/opentelemetry-collector-contri import ( "context" "os" + "runtime" "github.com/shirou/gopsutil/v3/common" "github.com/shirou/gopsutil/v3/cpu" @@ -46,11 +47,14 @@ func newNodeCapacity(logger *zap.Logger, options ...nodeCapacityOption) (nodeCap opt(nc) } - if _, err := nc.osLstat(hostProc); os.IsNotExist(err) { - return nil, err + ctx := context.Background() + if runtime.GOOS != "windows" { + if _, err := nc.osLstat(hostProc); os.IsNotExist(err) { + return nil, err + } + envMap := common.EnvMap{common.HostProcEnvKey: hostProc} + ctx = context.WithValue(ctx, common.EnvKey, envMap) } - envMap := common.EnvMap{common.HostProcEnvKey: hostProc} - ctx := context.WithValue(context.Background(), common.EnvKey, envMap) nc.parseCPU(ctx) nc.parseMemory(ctx) diff --git a/receiver/awscontainerinsightreceiver/internal/host/utilconst_windows.go b/receiver/awscontainerinsightreceiver/internal/host/utilconst_windows.go new file mode 100644 index 000000000000..d6a53dcb0df8 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/utilconst_windows.go @@ -0,0 +1,14 @@ +// Copyright The 
OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build !linux +// +build !linux + +package host + +// These variables are invalid for Windows +const ( + rootfs = "" + hostProc = rootfs + "" + hostMounts = hostProc + "" +) diff --git a/receiver/awscontainerinsightreceiver/internal/host/utils.go b/receiver/awscontainerinsightreceiver/internal/host/utils.go index edd4d00833cb..b9e2fed4d854 100644 --- a/receiver/awscontainerinsightreceiver/internal/host/utils.go +++ b/receiver/awscontainerinsightreceiver/internal/host/utils.go @@ -10,12 +10,6 @@ import ( "time" ) -const ( - rootfs = "/rootfs" // the root directory "/" is mounted as "/rootfs" in container - hostProc = rootfs + "/proc" // "/rootfs/proc" in container refers to the host proc directory "/proc" - hostMounts = hostProc + "/mounts" // "/rootfs/proc/mounts" in container refers to "/proc/mounts" in the host -) - func hostJitter(max time.Duration) time.Duration { hostName, err := os.Hostname() if err != nil { diff --git a/receiver/awscontainerinsightreceiver/internal/host/utilsconst.go b/receiver/awscontainerinsightreceiver/internal/host/utilsconst.go new file mode 100644 index 000000000000..ab6bbada96ba --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/utilsconst.go @@ -0,0 +1,13 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build !windows +// +build !windows + +package host + +const ( + rootfs = "/rootfs" // the root directory "/" is mounted as "/rootfs" in container + hostProc = rootfs + "/proc" // "/rootfs/proc" in container refers to the host proc directory "/proc" + hostMounts = hostProc + "/mounts" // "/rootfs/proc/mounts" in container refers to "/proc/mounts" in the host +) diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go new file mode 100644 index 000000000000..d0c806347f93 --- /dev/null +++ 
b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go @@ -0,0 +1,111 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows +// +build windows + +package k8swindows + +import ( + "context" + "errors" + "os" + + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" + + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +type K8sWindows struct { + cancel context.CancelFunc + logger *zap.Logger + nodeName string `toml:"node_name"` + k8sDecorator stores.K8sDecorator + summaryProvider *kubeletSummaryProvider + hostInfo host.Info +} + +func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info) (*K8sWindows, error) { + nodeName := os.Getenv("HOST_NAME") + if nodeName == "" { + return nil, errors.New("missing environment variable HOST_NAME. Please check your deployment YAML config") + } + k8sSummaryProvider, err := new(logger, hostInfo) + if err != nil { + logger.Error("failed to initialize kubelet summary provider, ", zap.Error(err)) + return nil, err + } + return &K8sWindows{ + logger: logger, + nodeName: nodeName, + k8sDecorator: *decorator, + summaryProvider: k8sSummaryProvider, + hostInfo: hostInfo, + }, nil +} + +func (k *K8sWindows) GetMetrics() []pmetric.Metrics { + k.logger.Debug("D! 
called K8sWindows GetMetrics") + var result []pmetric.Metrics + + metrics, err := k.summaryProvider.getMetrics() + if err != nil { + k.logger.Error("error getting metrics from kubelet summary provider, ", zap.Error(err)) + return result + } + metrics = k.decorateMetrics(metrics) + for _, k8sSummaryMetric := range metrics { + md := ci.ConvertToOTLPMetrics(k8sSummaryMetric.GetFields(), k8sSummaryMetric.GetTags(), k.logger) + result = append(result, md) + } + + return result +} + +func (c *K8sWindows) decorateMetrics(cadvisormetrics []*extractors.CAdvisorMetric) []*extractors.CAdvisorMetric { + //ebsVolumeIdsUsedAsPV := c.hostInfo.ExtractEbsIDsUsedByKubernetes() + var result []*extractors.CAdvisorMetric + for _, m := range cadvisormetrics { + tags := m.GetTags() + //c.addEbsVolumeInfo(tags, ebsVolumeIdsUsedAsPV) + + // add version + //tags[ci.Version] = c.version + + // add nodeName for node, pod and container + metricType := tags[ci.MetricType] + if c.nodeName != "" && (ci.IsNode(metricType) || ci.IsInstance(metricType) || + ci.IsPod(metricType) || ci.IsContainer(metricType)) { + tags[ci.NodeNameKey] = c.nodeName + } + + // add instance id and type + if instanceID := c.hostInfo.GetInstanceID(); instanceID != "" { + tags[ci.InstanceID] = instanceID + } + if instanceType := c.hostInfo.GetInstanceType(); instanceType != "" { + tags[ci.InstanceType] = instanceType + } + + // add scaling group name + tags[ci.AutoScalingGroupNameKey] = c.hostInfo.GetAutoScalingGroupName() + + // add tags for EKS + tags[ci.ClusterNameKey] = c.hostInfo.GetClusterName() + + out := c.k8sDecorator.Decorate(m) + if out != nil { + result = append(result, out) + } + } + return result +} + +func (k *K8sWindows) Shutdown() error { + k.logger.Debug("D! 
called K8sWindows Shutdown") + return nil +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows_nowindows.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows_nowindows.go new file mode 100644 index 000000000000..5f3d237afddb --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows_nowindows.go @@ -0,0 +1,32 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build !windows +// +build !windows + +package k8swindows + +import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" + + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +type K8sWindows struct { +} + +// New is a dummy function to construct a dummy K8sWindows struct for linux +func New(_ *zap.Logger, _ *stores.K8sDecorator, _ host.Info) (*K8sWindows, error) { + return &K8sWindows{}, nil +} + +// GetMetrics is a dummy function that always returns empty metrics for linux +func (k *K8sWindows) GetMetrics() []pmetric.Metrics { + return []pmetric.Metrics{} +} + +func (k *K8sWindows) Shutdown() error { + return nil +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go new file mode 100644 index 000000000000..a27d8e242850 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go @@ -0,0 +1,93 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows +// +build windows + +package k8swindows + +import ( + "fmt" + "os" + + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + 
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil" + + "go.uber.org/zap" + stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" +) + +type kubeletSummaryProvider struct { + logger *zap.Logger + hostIP string + hostPort string + client *kubeletutil.KubeletClient + hostInfo host.Info +} + +func new(logger *zap.Logger, info host.Info) (*kubeletSummaryProvider, error) { + hostIP := os.Getenv("HOST_IP") + kclient, err := kubeletutil.NewKubeletClient(hostIP, ci.KubeSecurePort, logger) + if err != nil { + return nil, fmt.Errorf("failed to initialize kubelet client: %w", err) + } + return &kubeletSummaryProvider{ + logger: logger, + client: kclient, + hostInfo: info, + }, nil +} + +func (k *kubeletSummaryProvider) getMetrics() ([]*extractors.CAdvisorMetric, error) { + summary, err := k.client.Summary(k.logger) + if err != nil { + k.logger.Error("kubelet summary API failed, ", zap.Error(err)) + return nil, err + } + + return k.getPodMetrics(summary) +} + +func (k *kubeletSummaryProvider) getContainerMetrics(summary *stats.Summary) ([]*extractors.CAdvisorMetric, error) { + var metrics []*extractors.CAdvisorMetric + // todo: implement CPU, memory metrics from containers + return metrics, nil +} + +func (k *kubeletSummaryProvider) getPodMetrics(summary *stats.Summary) ([]*extractors.CAdvisorMetric, error) { + // todo: This is not complete implementation of pod level metric collection since network level metrics are pending + // May need to add some more pod level labels for store decorators to work properly + + var metrics []*extractors.CAdvisorMetric + + nodeCPUCores := k.hostInfo.GetNumCores() + for _, pod := range summary.Pods { + k.logger.Info(fmt.Sprintf("pod summary 
%v", pod.PodRef.Name)) + metric := extractors.NewCadvisorMetric(ci.TypePod, k.logger) + + metric.AddField(ci.PodIDKey, pod.PodRef.UID) + metric.AddField(ci.K8sPodNameKey, pod.PodRef.Name) + metric.AddField(ci.K8sNamespace, pod.PodRef.Namespace) + + // CPU metric + metric.AddField(ci.MetricName(ci.TypePod, ci.CPUTotal), *pod.CPU.UsageCoreNanoSeconds) + metric.AddField(ci.MetricName(ci.TypePod, ci.CPUUtilization), float64(*pod.CPU.UsageCoreNanoSeconds)/float64(nodeCPUCores)) + + // Memory metrics + metric.AddField(ci.MetricName(ci.TypePod, ci.MemUsage), *pod.Memory.UsageBytes) + metric.AddField(ci.MetricName(ci.TypePod, ci.MemRss), *pod.Memory.RSSBytes) + metric.AddField(ci.MetricName(ci.TypePod, ci.MemWorkingset), *pod.Memory.WorkingSetBytes) + metric.AddField(ci.MetricName(ci.TypePod, ci.MemReservedCapacity), k.hostInfo.GetMemoryCapacity()) + metric.AddField(ci.MetricName(ci.TypePod, ci.MemUtilization), float64(*pod.Memory.WorkingSetBytes)/float64(k.hostInfo.GetMemoryCapacity())*100) + metrics = append(metrics, metric) + } + return metrics, nil +} + +func (k *kubeletSummaryProvider) getNodeMetrics() ([]*extractors.CAdvisorMetric, error) { + var metrics []*extractors.CAdvisorMetric + //todo: Implement CPU, memory and network metrics at node + return metrics, nil +} diff --git a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go index 85ec8cb8b8be..d838a9b64066 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go @@ -9,6 +9,7 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet" @@ -62,3 +63,22 @@ func (k *KubeletClient) 
ListPods() ([]corev1.Pod, error) { return pods.Items, nil } + +// Summary hits kubelet summary API using service account authentication. +// Summary API returns metrics at container, pod and node level for CPU, memory, networking and file system resources. +func (k *KubeletClient) Summary(logger *zap.Logger) (*stats.Summary, error) { + logger.Debug("Calling kubelet /stats/summary API") + + b, err := k.restClient.Get("/stats/summary") + if err != nil { + return nil, fmt.Errorf("call to kubelet /stats/summary API failed %w", err) + } + var out stats.Summary + err = json.Unmarshal(b, &out) + if err != nil { + return nil, fmt.Errorf("kubelet summary unmarshalling failed %w", err) + } + + logger.Debug("/stats/summary API response unmarshalled successfully") + return &out, nil +} diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index 9972acd01718..0db9a3b02820 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -6,6 +6,7 @@ package awscontainerinsightreceiver // import "github.com/open-telemetry/opentel import ( "context" "errors" + "runtime" "time" "go.opentelemetry.io/collector/component" @@ -21,6 +22,7 @@ import ( ecsinfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/ecsInfo" hostInfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8sapiserver" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" ) @@ -33,13 +35,13 @@ type metricsProvider interface { // awsContainerInsightReceiver implements the receiver.Metrics type 
awsContainerInsightReceiver struct { - settings component.TelemetrySettings - nextConsumer consumer.Metrics - config *Config - cancel context.CancelFunc - cadvisor metricsProvider - k8sapiserver metricsProvider - prometheusScraper *k8sapiserver.PrometheusScraper + settings component.TelemetrySettings + nextConsumer consumer.Metrics + config *Config + cancel context.CancelFunc + containerMetricsProvider metricsProvider + k8sapiserver metricsProvider + prometheusScraper *k8sapiserver.PrometheusScraper } // newAWSContainerInsightReceiver creates the aws container insight receiver with the given parameters. @@ -74,26 +76,34 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone return err } - decoratorOption := cadvisor.WithDecorator(k8sDecorator) - acir.cadvisor, err = cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, acir.settings.Logger, decoratorOption) - if err != nil { - return err - } + // todo: will need to use k8sAPIServer to add stats about enhanced CI. 
+ if runtime.GOOS == "windows" { + acir.containerMetricsProvider, err = k8swindows.New(acir.settings.Logger, k8sDecorator, *hostinfo) + if err != nil { + return err + } + } else { + decoratorOption := cadvisor.WithDecorator(k8sDecorator) + acir.containerMetricsProvider, err = cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, acir.settings.Logger, decoratorOption) + if err != nil { + return err + } - leaderElection, err := k8sapiserver.NewLeaderElection(acir.settings.Logger, k8sapiserver.WithLeaderLockName(acir.config.LeaderLockName), - k8sapiserver.WithLeaderLockUsingConfigMapOnly(acir.config.LeaderLockUsingConfigMapOnly)) - if err != nil { - return err - } + leaderElection, err := k8sapiserver.NewLeaderElection(acir.settings.Logger, k8sapiserver.WithLeaderLockName(acir.config.LeaderLockName), + k8sapiserver.WithLeaderLockUsingConfigMapOnly(acir.config.LeaderLockUsingConfigMapOnly)) + if err != nil { + return err + } - acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostinfo, acir.settings.Logger, leaderElection, acir.config.AddFullPodNameMetricLabel, acir.config.EnableControlPlaneMetrics) - if err != nil { - return err - } + acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostinfo, acir.settings.Logger, leaderElection, acir.config.AddFullPodNameMetricLabel, acir.config.EnableControlPlaneMetrics) + if err != nil { + return err + } - err = acir.startPrometheusScraper(ctx, host, hostinfo, leaderElection) - if err != nil { - acir.settings.Logger.Debug("Unable to start kube apiserver prometheus scraper", zap.Error(err)) + err = acir.startPrometheusScraper(ctx, host, hostinfo, leaderElection) + if err != nil { + acir.settings.Logger.Debug("Unable to start kube apiserver prometheus scraper", zap.Error(err)) + } } } if acir.config.ContainerOrchestrator == ci.ECS { @@ -105,7 +115,7 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone ecsOption := cadvisor.WithECSInfoCreator(ecsInfo) - acir.cadvisor, err = 
cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, acir.settings.Logger, ecsOption) + acir.containerMetricsProvider, err = cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, acir.settings.Logger, ecsOption) if err != nil { return err } @@ -186,8 +196,8 @@ func (acir *awsContainerInsightReceiver) Shutdown(context.Context) error { if acir.k8sapiserver != nil { errs = errors.Join(errs, acir.k8sapiserver.Shutdown()) } - if acir.cadvisor != nil { - errs = errors.Join(errs, acir.cadvisor.Shutdown()) + if acir.containerMetricsProvider != nil { + errs = errors.Join(errs, acir.containerMetricsProvider.Shutdown()) } return errs @@ -197,14 +207,15 @@ func (acir *awsContainerInsightReceiver) Shutdown(context.Context) error { // collectData collects container stats from Amazon ECS Task Metadata Endpoint func (acir *awsContainerInsightReceiver) collectData(ctx context.Context) error { var mds []pmetric.Metrics - if acir.cadvisor == nil && acir.k8sapiserver == nil { + + if acir.containerMetricsProvider == nil && acir.k8sapiserver == nil { err := errors.New("both cadvisor and k8sapiserver failed to start") acir.settings.Logger.Error("Failed to collect stats", zap.Error(err)) return err } - if acir.cadvisor != nil { - mds = append(mds, acir.cadvisor.GetMetrics()...) + if acir.containerMetricsProvider != nil { + mds = append(mds, acir.containerMetricsProvider.GetMetrics()...) } if acir.k8sapiserver != nil {