diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go index 9e966097e256..55c4f2bac3aa 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go @@ -36,7 +36,7 @@ type RawMetric struct { } type MetricExtractor interface { - HasValue(summary *RawMetric) bool - GetValue(summary *RawMetric, mInfo cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric + HasValue(summary RawMetric) bool + GetValue(summary RawMetric, mInfo cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric Shutdown() error } diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go index 8306937ceabd..d298546a4555 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go @@ -15,6 +15,7 @@ import ( cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors" + kubeletsummaryprovider "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" "go.opentelemetry.io/collector/pdata/pmetric" @@ -22,12 +23,12 @@ import ( ) type K8sWindows struct { - cancel context.CancelFunc - logger *zap.Logger - nodeName string `toml:"node_name"` - k8sDecorator stores.K8sDecorator - summaryProvider *kubeletSummaryProvider - hostInfo host.Info + cancel context.CancelFunc + logger *zap.Logger + nodeName string `toml:"node_name"` + k8sDecorator stores.K8sDecorator + kubeletSummaryProvider *kubeletsummaryprovider.SummaryProvider + hostInfo host.Info } var metricsExtractors = []extractors.MetricExtractor{} @@ -37,21 +38,23 @@ func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info) if nodeName == "" { return nil, errors.New("missing environment variable HOST_NAME. Please check your deployment YAML config") } - k8sSummaryProvider, err := new(logger, hostInfo) - if err != nil { - logger.Error("failed to initialize kubelet summary provider, ", zap.Error(err)) - return nil, err - } metricsExtractors = []extractors.MetricExtractor{} metricsExtractors = append(metricsExtractors, extractors.NewCPUMetricExtractor(logger)) metricsExtractors = append(metricsExtractors, extractors.NewMemMetricExtractor(logger)) + + ksp, err := kubeletsummaryprovider.New(logger, &hostInfo, metricsExtractors) + if err != nil { + logger.Error("failed to initialize kubelet SummaryProvider, ", zap.Error(err)) + return nil, err + } + return &K8sWindows{ - logger: logger, - nodeName: nodeName, - k8sDecorator: *decorator, - summaryProvider: k8sSummaryProvider, - hostInfo: hostInfo, + logger: logger, + nodeName: nodeName, + k8sDecorator: *decorator, + kubeletSummaryProvider: ksp, + hostInfo: hostInfo, }, nil } @@ -59,14 +62,15 @@ func (k *K8sWindows) GetMetrics() []pmetric.Metrics { k.logger.Debug("D! called K8sWindows GetMetrics") var result []pmetric.Metrics - metrics, err := k.summaryProvider.getMetrics() + metrics, err := k.kubeletSummaryProvider.GetMetrics() if err != nil { - k.logger.Error("error getting metrics from kubelet summary provider, ", zap.Error(err)) + k.logger.Error("failed to get metrics from kubelet SummaryProvider, ", zap.Error(err)) return result } + metrics = cExtractor.MergeMetrics(metrics) metrics = k.decorateMetrics(metrics) - for _, k8sSummaryMetric := range metrics { - md := ci.ConvertToOTLPMetrics(k8sSummaryMetric.GetFields(), k8sSummaryMetric.GetTags(), k.logger) + for _, ciMetric := range metrics { + md := ci.ConvertToOTLPMetrics(ciMetric.GetFields(), ciMetric.GetTags(), k.logger) result = append(result, md) } @@ -116,7 +120,3 @@ func (k *K8sWindows) Shutdown() error { k.logger.Debug("D! called K8sWindows Shutdown") return nil } - -func GetMetricsExtractors() []extractors.MetricExtractor { - return metricsExtractors -} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go deleted file mode 100644 index 6e255466e198..000000000000 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -//go:build windows -// +build windows - -package k8swindows // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows" - -import ( - "fmt" - "os" - "strconv" - - ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" - cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil" - "go.uber.org/zap" - stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" -) - -type kubeletSummaryProvider struct { - logger *zap.Logger - hostIP string - hostPort string - client *kubeletutil.KubeletClient - hostInfo host.Info -} - -func new(logger *zap.Logger, info host.Info) (*kubeletSummaryProvider, error) { - hostIP := os.Getenv("HOST_IP") - kclient, err := kubeletutil.NewKubeletClient(hostIP, ci.KubeSecurePort, logger) - if err != nil { - return nil, fmt.Errorf("failed to initialize kubelet client: %w", err) - } - - return &kubeletSummaryProvider{ - logger: logger, - client: kclient, - hostInfo: info, - }, nil -} - -func (k *kubeletSummaryProvider) getMetrics() ([]*cExtractor.CAdvisorMetric, error) { - summary, err := k.client.Summary(k.logger) - if err != nil { - k.logger.Error("kubelet summary API failed, ", zap.Error(err)) - return nil, err - } - - return k.getPodMetrics(summary) -} - -func (k *kubeletSummaryProvider) getContainerMetrics(summary *stats.Summary) ([]*cExtractor.CAdvisorMetric, error) { - var metrics []*cExtractor.CAdvisorMetric - // todo: implement CPU, memory metrics from containers - return metrics, nil -} - -func (k *kubeletSummaryProvider) getPodMetrics(summary *stats.Summary) ([]*cExtractor.CAdvisorMetric, error) { - // todo: This is not complete implementation of pod level metric collection since network level metrics are pending - // May need to add some more pod level labels for store decorators to work properly - - var metrics []*cExtractor.CAdvisorMetric - - for _, pod := range summary.Pods { - k.logger.Info(fmt.Sprintf("pod summary %v", pod.PodRef.Name)) - - tags := map[string]string{} - - tags[ci.PodIDKey] = pod.PodRef.UID - tags[ci.K8sPodNameKey] = pod.PodRef.Name - tags[ci.K8sNamespace] = pod.PodRef.Namespace - - rawMetric := extractors.ConvertPodToRaw(pod) - tags[ci.Timestamp] = strconv.FormatInt(rawMetric.Time.UnixNano(), 10) - for _, extractor := range GetMetricsExtractors() { - if extractor.HasValue(rawMetric) { - metrics = append(metrics, extractor.GetValue(rawMetric, &k.hostInfo, ci.TypePod)...) - } - } - for _, metric := range metrics { - metric.AddTags(tags) - } - } - return metrics, nil -} - -func (k *kubeletSummaryProvider) getNodeMetrics() ([]*cExtractor.CAdvisorMetric, error) { - var metrics []*cExtractor.CAdvisorMetric - //todo: Implement CPU, memory and network metrics at node - return metrics, nil -} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/client.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/client.go new file mode 100644 index 000000000000..b9c5a5d9d6e6 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/client.go @@ -0,0 +1,52 @@ +//go:build windows +// +build windows + +package kubelet + +import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil" + "go.uber.org/zap" + stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" +) + +// KubeletProvider Represents interface to kubelet. +type KubeletProvider interface { + GetSummary() (*stats.Summary, error) +} + +type kubeletProvider struct { + logger *zap.Logger + hostIP string + hostPort string + client *kubeletutil.KubeletClient +} + +// getClient Returns singleton kubelet client. +func (kp *kubeletProvider) getClient() (*kubeletutil.KubeletClient, error) { + if kp.client != nil { + return kp.client, nil + } + kclient, err := kubeletutil.NewKubeletClient(kp.hostIP, kp.hostPort, kp.logger) + if err != nil { + kp.logger.Error("failed to initialize new kubelet client, ", zap.Error(err)) + return nil, err + } + kp.client = kclient + return kclient, nil +} + +// GetSummary Get Summary from kubelet API. +func (kp *kubeletProvider) GetSummary() (*stats.Summary, error) { + kclient, err := kp.getClient() + if err != nil { + kp.logger.Error("failed to get kubelet client, ", zap.Error(err)) + return nil, err + } + + summary, err := kclient.Summary(kp.logger) + if err != nil { + kp.logger.Error("failure from kubelet on getting summary, ", zap.Error(err)) + return nil, err + } + return summary, nil +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/kubelet.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/kubelet.go new file mode 100644 index 000000000000..e11449e95270 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/kubelet.go @@ -0,0 +1,162 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows +// +build windows + +package kubelet // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows" + +import ( + "fmt" + "os" + "strconv" + + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors" + + "go.uber.org/zap" + stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" +) + +// SummaryProvider represents receiver to get container metric from Kubelet. +type SummaryProvider struct { + logger *zap.Logger + kubeletProvider KubeletProvider + hostInfo cExtractor.CPUMemInfoProvider + metricExtractors []extractors.MetricExtractor +} + +func createDefaultKubeletProvider(logger *zap.Logger) KubeletProvider { + return &kubeletProvider{logger: logger, hostIP: os.Getenv("HOST_IP"), hostPort: ci.KubeSecurePort} +} + +// Options decorates SummaryProvider struct. +type Options func(provider *SummaryProvider) + +func New(logger *zap.Logger, info cExtractor.CPUMemInfoProvider, mextractor []extractors.MetricExtractor, opts ...Options) (*SummaryProvider, error) { + sp := &SummaryProvider{ + logger: logger, + hostInfo: info, + kubeletProvider: createDefaultKubeletProvider(logger), + metricExtractors: mextractor, + } + + for _, opt := range opts { + opt(sp) + } + return sp, nil +} + +func (sp *SummaryProvider) GetMetrics() ([]*cExtractor.CAdvisorMetric, error) { + var metrics []*cExtractor.CAdvisorMetric + + summary, err := sp.kubeletProvider.GetSummary() + if err != nil { + sp.logger.Error("failed to get summary from kubeletProvider, ", zap.Error(err)) + return nil, err + } + outMetrics, err := sp.getPodMetrics(summary) + if err != nil { + sp.logger.Error("failed to get pod metrics using kubelet summary, ", zap.Error(err)) + return metrics, err + } + metrics = append(metrics, outMetrics...) + + nodeMetics, err := sp.getNodeMetrics(summary) + if err != nil { + sp.logger.Error("failed to get node metrics using kubelet summary, ", zap.Error(err)) + return nodeMetics, err + } + metrics = append(metrics, nodeMetics...) + + return metrics, nil +} + +// getContainerMetrics returns container level metrics from kubelet summary. +func (sp *SummaryProvider) getContainerMetrics(pod stats.PodStats) ([]*cExtractor.CAdvisorMetric, error) { + var metrics []*cExtractor.CAdvisorMetric + + for _, container := range pod.Containers { + tags := map[string]string{} + + tags[ci.PodIDKey] = pod.PodRef.UID + tags[ci.K8sPodNameKey] = pod.PodRef.Name + tags[ci.K8sNamespace] = pod.PodRef.Namespace + tags[ci.ContainerNamekey] = container.Name + containerID := fmt.Sprintf("%s-%s", pod.PodRef.UID, container.Name) + tags[ci.ContainerIDkey] = containerID + + rawMetric := extractors.ConvertContainerToRaw(container, pod) + tags[ci.Timestamp] = strconv.FormatInt(rawMetric.Time.UnixNano(), 10) + + for _, extractor := range sp.metricExtractors { + if extractor.HasValue(rawMetric) { + metrics = append(metrics, extractor.GetValue(rawMetric, sp.hostInfo, ci.TypeContainer)...) + } + } + for _, metric := range metrics { + metric.AddTags(tags) + } + } + return metrics, nil +} + +// getPodMetrics returns pod and container level metrics from kubelet summary. +func (sp *SummaryProvider) getPodMetrics(summary *stats.Summary) ([]*cExtractor.CAdvisorMetric, error) { + // todo: This is not complete implementation of pod level metric collection since network level metrics are pending + // May need to add some more pod level labels for store decorators to work properly + var metrics []*cExtractor.CAdvisorMetric + + if summary == nil { + return metrics, nil + } + + for _, pod := range summary.Pods { + var metricsPerPod []*cExtractor.CAdvisorMetric + + tags := map[string]string{} + + tags[ci.PodIDKey] = pod.PodRef.UID + tags[ci.K8sPodNameKey] = pod.PodRef.Name + tags[ci.K8sNamespace] = pod.PodRef.Namespace + + rawMetric := extractors.ConvertPodToRaw(pod) + tags[ci.Timestamp] = strconv.FormatInt(rawMetric.Time.UnixNano(), 10) + + for _, extractor := range sp.metricExtractors { + if extractor.HasValue(rawMetric) { + metricsPerPod = append(metricsPerPod, extractor.GetValue(rawMetric, sp.hostInfo, ci.TypePod)...) + } + } + for _, metric := range metricsPerPod { + metric.AddTags(tags) + } + metrics = append(metrics, metricsPerPod...) + + containerMetrics, err := sp.getContainerMetrics(pod) + if err != nil { + sp.logger.Error("failed to get container metrics, ", zap.Error(err)) + return containerMetrics, err + } + metrics = append(metrics, containerMetrics...) + } + return metrics, nil +} + +// getNodeMetrics returns Node level metrics from kubelet summary. +func (sp *SummaryProvider) getNodeMetrics(summary *stats.Summary) ([]*cExtractor.CAdvisorMetric, error) { + var metrics []*cExtractor.CAdvisorMetric + + if summary == nil { + return metrics, nil + } + + rawMetric := extractors.ConvertNodeToRaw(summary.Node) + for _, extractor := range sp.metricExtractors { + if extractor.HasValue(rawMetric) { + metrics = append(metrics, extractor.GetValue(rawMetric, sp.hostInfo, ci.TypeNode)...) + } + } + return metrics, nil +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/kubelet_test.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/kubelet_test.go new file mode 100644 index 000000000000..f5415573b115 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/kubelet_test.go @@ -0,0 +1,109 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows +// +build windows + +package kubelet + +import ( + stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" + "testing" + + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + cTestUtils "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/testutils" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +// MockKubeletProvider Mock provider implements KubeletProvider interface. +type MockKubeletProvider struct { + logger *zap.Logger + t *testing.T +} + +func (m *MockKubeletProvider) GetSummary() (*stats.Summary, error) { + return testutils.LoadKubeletSummary(m.t, "./../extractors/testdata/CurSingleKubeletSummary.json"), nil +} + +func createKubeletDecoratorWithMockKubeletProvider(t *testing.T, logger *zap.Logger) Options { + return func(provider *SummaryProvider) { + provider.kubeletProvider = &MockKubeletProvider{t: t, logger: logger} + } +} + +func mockInfoProvider() cTestUtils.MockHostInfo { + hostInfo := cTestUtils.MockHostInfo{ClusterName: "cluster"} + return hostInfo +} + +func mockMetricExtractors() []extractors.MetricExtractor { + metricsExtractors := []extractors.MetricExtractor{} + metricsExtractors = append(metricsExtractors, extractors.NewCPUMetricExtractor(&zap.Logger{})) + metricsExtractors = append(metricsExtractors, extractors.NewMemMetricExtractor(&zap.Logger{})) + return metricsExtractors +} + +// TestGetPodMetrics Verify tags on pod and container levels metrics. +func TestGetPodMetrics(t *testing.T) { + + k8sSummaryProvider, err := New(&zap.Logger{}, mockInfoProvider(), mockMetricExtractors(), createKubeletDecoratorWithMockKubeletProvider(t, &zap.Logger{})) + summary, err := k8sSummaryProvider.kubeletProvider.GetSummary() + metrics, err := k8sSummaryProvider.getPodMetrics(summary) + + assert.NoError(t, err) + assert.NotNil(t, metrics) + + podMetric := metrics[1] + assert.Equal(t, podMetric.GetMetricType(), ci.TypePod) + assert.NotNil(t, podMetric.GetTag(ci.PodIDKey)) + assert.NotNil(t, podMetric.GetTag(ci.K8sPodNameKey)) + assert.NotNil(t, podMetric.GetTag(ci.K8sNamespace)) + assert.NotNil(t, podMetric.GetTag(ci.Timestamp)) + + containerMetric := metrics[len(metrics)-1] + assert.Equal(t, containerMetric.GetMetricType(), ci.TypeContainer) + assert.NotNil(t, containerMetric.GetTag(ci.PodIDKey)) + assert.NotNil(t, containerMetric.GetTag(ci.K8sPodNameKey)) + assert.NotNil(t, containerMetric.GetTag(ci.K8sNamespace)) + assert.NotNil(t, containerMetric.GetTag(ci.Timestamp)) + assert.NotNil(t, containerMetric.GetTag(ci.ContainerNamekey)) + assert.NotNil(t, containerMetric.GetTag(ci.ContainerIDkey)) +} + +// TestGetContainerMetrics verify tags on container level metrics returned. +func TestGetContainerMetrics(t *testing.T) { + + k8sSummaryProvider, err := New(&zap.Logger{}, mockInfoProvider(), mockMetricExtractors(), createKubeletDecoratorWithMockKubeletProvider(t, &zap.Logger{})) + summary, err := k8sSummaryProvider.kubeletProvider.GetSummary() + + metrics, err := k8sSummaryProvider.getContainerMetrics(summary.Pods[0]) + assert.NoError(t, err) + assert.NotNil(t, metrics) + + containerMetric := metrics[1] + assert.Equal(t, containerMetric.GetMetricType(), ci.TypeContainer) + assert.NotNil(t, containerMetric.GetTag(ci.PodIDKey)) + assert.NotNil(t, containerMetric.GetTag(ci.K8sPodNameKey)) + assert.NotNil(t, containerMetric.GetTag(ci.K8sNamespace)) + assert.NotNil(t, containerMetric.GetTag(ci.Timestamp)) + assert.NotNil(t, containerMetric.GetTag(ci.ContainerNamekey)) + assert.NotNil(t, containerMetric.GetTag(ci.ContainerIDkey)) +} + +// TestGetNodeMetrics verify tags on node level metrics. +func TestGetNodeMetrics(t *testing.T) { + + k8sSummaryProvider, err := New(&zap.Logger{}, mockInfoProvider(), mockMetricExtractors(), createKubeletDecoratorWithMockKubeletProvider(t, &zap.Logger{})) + summary, err := k8sSummaryProvider.kubeletProvider.GetSummary() + + metrics, err := k8sSummaryProvider.getNodeMetrics(summary) + assert.NoError(t, err) + assert.NotNil(t, metrics) + + containerMetric := metrics[1] + assert.Equal(t, containerMetric.GetMetricType(), ci.TypeNode) +}