diff --git a/internal/aws/k8s/k8sclient/pod.go b/internal/aws/k8s/k8sclient/pod.go index 2414437ad863..bf185edf34e9 100644 --- a/internal/aws/k8s/k8sclient/pod.go +++ b/internal/aws/k8s/k8sclient/pod.go @@ -20,6 +20,7 @@ import ( type PodClient interface { // Get the mapping between the namespace and the number of belonging pods NamespaceToRunningPodNum() map[string]int + PodInfos() []*PodInfo } type podClientOption func(*podClient) @@ -39,6 +40,7 @@ type podClient struct { mu sync.RWMutex namespaceToRunningPodNumMap map[string]int + podInfos []*PodInfo } func (c *podClient) NamespaceToRunningPodNum() map[string]int { @@ -50,22 +52,35 @@ func (c *podClient) NamespaceToRunningPodNum() map[string]int { return c.namespaceToRunningPodNumMap } +func (c *podClient) PodInfos() []*PodInfo { + if c.store.GetResetRefreshStatus() { + c.refresh() + } + c.mu.RLock() + defer c.mu.RUnlock() + return c.podInfos +} + func (c *podClient) refresh() { c.mu.Lock() defer c.mu.Unlock() objsList := c.store.List() namespaceToRunningPodNumMapNew := make(map[string]int) + podInfos := make([]*PodInfo, 0) for _, obj := range objsList { - pod := obj.(*podInfo) - if pod.phase == v1.PodRunning { - if podNum, ok := namespaceToRunningPodNumMapNew[pod.namespace]; !ok { - namespaceToRunningPodNumMapNew[pod.namespace] = 1 + pod := obj.(*PodInfo) + podInfos = append(podInfos, pod) + + if pod.Phase == v1.PodRunning { + if podNum, ok := namespaceToRunningPodNumMapNew[pod.Namespace]; !ok { + namespaceToRunningPodNumMapNew[pod.Namespace] = 1 } else { - namespaceToRunningPodNumMapNew[pod.namespace] = podNum + 1 + namespaceToRunningPodNumMapNew[pod.Namespace] = podNum + 1 } } } + c.podInfos = podInfos c.namespaceToRunningPodNumMap = namespaceToRunningPodNumMapNew } @@ -105,9 +120,14 @@ func transformFuncPod(obj interface{}) (interface{}, error) { if !ok { return nil, fmt.Errorf("input obj %v is not Pod type", obj) } - info := new(podInfo) - info.namespace = pod.Namespace - info.phase = pod.Status.Phase + 
info := new(PodInfo) + info.Name = pod.Name + info.Namespace = pod.Namespace + info.Uid = string(pod.UID) + info.Labels = pod.Labels + info.OwnerReferences = pod.OwnerReferences + info.Phase = pod.Status.Phase + info.Conditions = pod.Status.Conditions return info, nil } diff --git a/internal/aws/k8s/k8sclient/pod_info.go b/internal/aws/k8s/k8sclient/pod_info.go index 333111b39ad3..cdf19bdba8e3 100644 --- a/internal/aws/k8s/k8sclient/pod_info.go +++ b/internal/aws/k8s/k8sclient/pod_info.go @@ -5,9 +5,15 @@ package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-c import ( v1 "k8s.io/api/core/v1" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -type podInfo struct { - namespace string - phase v1.PodPhase +type PodInfo struct { + Name string + Namespace string + Uid string + Labels map[string]string + OwnerReferences []metaV1.OwnerReference + Phase v1.PodPhase + Conditions []v1.PodCondition } diff --git a/internal/aws/k8s/k8sclient/pod_test.go b/internal/aws/k8s/k8sclient/pod_test.go index 899eff2776d6..95cd83733a96 100644 --- a/internal/aws/k8s/k8sclient/pod_test.go +++ b/internal/aws/k8s/k8sclient/pod_test.go @@ -191,3 +191,42 @@ func TestTransformFuncPod(t *testing.T) { assert.Nil(t, info) assert.NotNil(t, err) } + +func TestPodClient_PodNameToPodMap(t *testing.T) { + skip(t, "Flaky test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/11078") + setOption := podSyncCheckerOption(&mockReflectorSyncChecker{}) + + samplePodArray := []interface{}{ + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "bc5f5839-f62e-44b9-a79e-af250d92dcb1", + Name: "kube-proxy-csm88", + Namespace: "kube-system", + SelfLink: "/api/v1/namespaces/kube-system/pods/kube-proxy-csm88", + }, + Status: v1.PodStatus{ + Phase: "Running", + }, + }, + } + + fakeClientSet := fake.NewSimpleClientset() + client := newPodClient(fakeClientSet, zap.NewNop(), setOption) + assert.NoError(t, client.store.Replace(samplePodArray, "")) + client.refresh() + + 
expectedArray := []*PodInfo{ + { + Name: "kube-proxy-csm88", + Namespace: "kube-system", + Uid: "bc5f5839-f62e-44b9-a79e-af250d92dcb1", + Labels: map[string]string{}, + Phase: v1.PodRunning, + }, + } + + resultMap := client.PodInfos() + assert.Equal(t, expectedArray, resultMap) + client.shutdown() + assert.True(t, client.stopped) +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index af79de6801e5..5f2e4117a53c 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -5,25 +5,32 @@ package k8sapiserver // import "github.com/open-telemetry/opentelemetry-collecto import ( "context" + "encoding/json" "errors" "fmt" "os" "strconv" + "strings" "time" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil" ) // K8sAPIServer is a struct that produces metrics from kubernetes api server type K8sAPIServer struct { - nodeName string // get the value from downward API - logger *zap.Logger - clusterNameProvider clusterNameProvider - cancel context.CancelFunc - leaderElection *LeaderElection + nodeName string // get the value from downward API + logger *zap.Logger + clusterNameProvider clusterNameProvider + cancel context.CancelFunc + leaderElection *LeaderElection + addFullPodNameMetricLabel bool + includeEnhancedMetrics bool } type clusterNameProvider interface { @@ -33,12 +40,14 @@ type clusterNameProvider interface { type Option func(*K8sAPIServer) // NewK8sAPIServer creates a k8sApiServer which can generate cluster-level metrics -func 
NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, options ...Option) (*K8sAPIServer, error) { +func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, options ...Option) (*K8sAPIServer, error) { k := &K8sAPIServer{ - logger: logger, - clusterNameProvider: cnp, - leaderElection: leaderElection, + logger: logger, + clusterNameProvider: cnp, + leaderElection: leaderElection, + addFullPodNameMetricLabel: addFullPodNameMetricLabel, + includeEnhancedMetrics: includeEnhancedMetrics, } for _, opt := range options { @@ -85,6 +94,7 @@ func (k *K8sAPIServer) GetMetrics() []pmetric.Metrics { result = append(result, k.getServiceMetrics(clusterName, timestampNs)...) result = append(result, k.getStatefulSetMetrics(clusterName, timestampNs)...) result = append(result, k.getReplicaSetMetrics(clusterName, timestampNs)...) + result = append(result, k.getPendingPodStatusMetrics(clusterName, timestampNs)...) return result } @@ -279,6 +289,129 @@ func (k *K8sAPIServer) getReplicaSetMetrics(clusterName, timestampNs string) []p return metrics } +// Statuses and conditions for all pods assigned to a node are determined in podstore.go. Given Pending pods do not have a node allocated to them, we need to fetch their details from the K8s API Server here. 
+func (k *K8sAPIServer) getPendingPodStatusMetrics(clusterName, timestampNs string) []pmetric.Metrics { + var metrics []pmetric.Metrics + podsList := k.leaderElection.podClient.PodInfos() + podKeyToServiceNamesMap := k.leaderElection.epClient.PodKeyToServiceNames() + + for _, podInfo := range podsList { + if podInfo.Phase == corev1.PodPending { + fields := map[string]interface{}{} + + if k.includeEnhancedMetrics { + addPodStatusMetrics(fields, podInfo) + addPodConditionMetrics(fields, podInfo) + } + + attributes := map[string]string{ + ci.ClusterNameKey: clusterName, + ci.MetricType: ci.TypePod, + ci.Timestamp: timestampNs, + ci.PodNameKey: podInfo.Name, + ci.K8sNamespace: podInfo.Namespace, + ci.Version: "0", + } + + podKey := k8sutil.CreatePodKey(podInfo.Namespace, podInfo.Name) + if serviceList, ok := podKeyToServiceNamesMap[podKey]; ok { + if len(serviceList) > 0 { + attributes[ci.TypeService] = serviceList[0] + } + } + + attributes[ci.PodStatus] = string(corev1.PodPending) + attributes["k8s.node.name"] = "pending" + + kubernetesBlob := map[string]interface{}{} + k.getKubernetesBlob(podInfo, kubernetesBlob, attributes) + if k.nodeName != "" { + kubernetesBlob["host"] = k.nodeName + } + if len(kubernetesBlob) > 0 { + kubernetesInfo, err := json.Marshal(kubernetesBlob) + if err != nil { + k.logger.Warn("Error parsing kubernetes blob for pod metrics") + } else { + attributes[ci.Kubernetes] = string(kubernetesInfo) + } + } + attributes[ci.SourcesKey] = "[\"apiserver\"]" + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + metrics = append(metrics, md) + } + } + return metrics +} + +// TODO this is duplicated code from podstore.go, move this to a common package to re-use +func (k *K8sAPIServer) getKubernetesBlob(pod *k8sclient.PodInfo, kubernetesBlob map[string]interface{}, attributes map[string]string) { + var owners []interface{} + podName := "" + for _, owner := range pod.OwnerReferences { + if owner.Kind != "" && owner.Name != "" { + kind := 
owner.Kind + name := owner.Name + if owner.Kind == ci.ReplicaSet { + rsToDeployment := k.leaderElection.replicaSetClient.ReplicaSetToDeployment() + if parent := rsToDeployment[owner.Name]; parent != "" { + kind = ci.Deployment + name = parent + } else if parent := parseDeploymentFromReplicaSet(owner.Name); parent != "" { + kind = ci.Deployment + name = parent + } + } else if owner.Kind == ci.Job { + if parent := parseCronJobFromJob(owner.Name); parent != "" { + kind = ci.CronJob + name = parent + } else if !k.addFullPodNameMetricLabel { + name = getJobNamePrefix(name) + } + } + owners = append(owners, map[string]string{"owner_kind": kind, "owner_name": name}) + + if podName == "" { + if owner.Kind == ci.StatefulSet { + podName = pod.Name + } else if owner.Kind == ci.DaemonSet || owner.Kind == ci.Job || + owner.Kind == ci.ReplicaSet || owner.Kind == ci.ReplicationController { + podName = name + } + } + } + } + + if len(owners) > 0 { + kubernetesBlob["pod_owners"] = owners + } + + labels := make(map[string]string) + for k, v := range pod.Labels { + labels[k] = v + } + if len(labels) > 0 { + kubernetesBlob["labels"] = labels + } + kubernetesBlob["namespace_name"] = pod.Namespace + kubernetesBlob["pod_id"] = pod.Uid + + // if podName is not set according to a well-known controllers, then set it to its own name + if podName == "" { + if strings.HasPrefix(pod.Name, KubeProxy) && !k.addFullPodNameMetricLabel { + podName = KubeProxy + } else { + podName = pod.Name + } + } + + attributes[ci.PodNameKey] = podName + if k.addFullPodNameMetricLabel { + attributes[ci.FullPodNameKey] = pod.Name + kubernetesBlob["pod_name"] = pod.Name + } +} + // Shutdown stops the k8sApiServer func (k *K8sAPIServer) Shutdown() error { if k.cancel != nil { diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go index 1324cbe6a4eb..592ad36f2ef5 100644 --- 
a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go @@ -132,6 +132,12 @@ func (client *MockClient) NamespaceToRunningPodNum() map[string]int { return args.Get(0).(map[string]int) } +// k8sclient.PodClient +func (client *MockClient) PodInfos() []*k8sclient.PodInfo { + args := client.Called() + return args.Get(0).([]*k8sclient.PodInfo) +} + // k8sclient.NodeClient func (client *MockClient) ClusterFailedNodeCount() int { args := client.Called() @@ -149,6 +155,12 @@ func (client *MockClient) ServiceToPodNum() map[k8sclient.Service]int { return args.Get(0).(map[k8sclient.Service]int) } +// k8sclient.EpClient +func (client *MockClient) PodKeyToServiceNames() map[string][]string { + args := client.Called() + return args.Get(0).(map[string][]string) +} + type mockEventBroadcaster struct { } @@ -212,7 +224,7 @@ func (m mockClusterNameProvider) GetClusterName() string { } func TestK8sAPIServer_New(t *testing.T) { - k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), nil) + k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), nil, false, false) assert.Nil(t, k8sAPIServer) assert.NotNil(t, err) } @@ -285,6 +297,18 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { }, }, }) + mockClient.On("PodInfos").Return([]*k8sclient.PodInfo{ + { + Name: "kube-proxy-csm88", + Namespace: "kube-system", + Uid: "bc5f5839-f62e-44b9-a79e-af250d92dcb1", + Phase: v1.PodPending, + }, + }) + mockClient.On("PodKeyToServiceNames").Return(map[string][]string{ + "namespace:kube-system,podName:coredns-7554568866-26jdf": {"kube-dns"}, + "namespace:kube-system,podName:coredns-7554568866-shwn6": {"kube-dns"}, + }) leaderElection := &LeaderElection{ k8sClient: &mockK8sClient{}, @@ -302,7 +326,7 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { t.Setenv("HOST_NAME", hostName) t.Setenv("K8S_NAMESPACE", "namespace") - k8sAPIServer, err := 
NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), leaderElection) + k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), leaderElection, true, true) assert.NotNil(t, k8sAPIServer) assert.Nil(t, err) @@ -366,6 +390,17 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { assert.Equal(t, "kube-system", getStringAttrVal(metric, ci.K8sNamespace)) assert.Equal(t, "statefulset1", getStringAttrVal(metric, ci.PodNameKey)) assert.Equal(t, "ClusterStatefulSet", getStringAttrVal(metric, ci.MetricType)) + case ci.TypePod: + assertMetricValueEqual(t, metric, "pod_status_pending", int64(1)) + assertMetricValueEqual(t, metric, "pod_status_running", int64(0)) + assertMetricValueEqual(t, metric, "pod_status_failed", int64(0)) + assertMetricValueEqual(t, metric, "pod_status_ready", int64(0)) + assertMetricValueEqual(t, metric, "pod_status_scheduled", int64(0)) + assertMetricValueEqual(t, metric, "pod_status_succeeded", int64(0)) + assertMetricValueEqual(t, metric, "pod_status_unknown", int64(0)) + assert.Equal(t, "kube-system", getStringAttrVal(metric, ci.K8sNamespace)) + assert.Equal(t, "Pending", getStringAttrVal(metric, "pod_status")) + assert.Equal(t, "Pod", getStringAttrVal(metric, ci.MetricType)) default: assert.Fail(t, "Unexpected metric type: "+metricType) } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go new file mode 100644 index 000000000000..779f68f8c647 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go @@ -0,0 +1,129 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package k8sapiserver + +import ( + "regexp" + "strings" + + corev1 "k8s.io/api/core/v1" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" +) + +const ( + // kubeAllowedStringAlphaNums holds the characters allowed in the suffix of replicaset names generated from a parent deployment + // 
https://github.com/kubernetes/apimachinery/blob/master/pkg/util/rand/rand.go#L83 + kubeAllowedStringAlphaNums = "bcdfghjklmnpqrstvwxz2456789" + splitRegexStr = "\\.|-" + KubeProxy = "kube-proxy" + cronJobAllowedString = "0123456789" +) + +var ( + re = regexp.MustCompile(splitRegexStr) + + podPhaseMetricNames = map[corev1.PodPhase]string{ + corev1.PodPending: "pod_status_pending", + corev1.PodRunning: "pod_status_running", + corev1.PodSucceeded: "pod_status_succeeded", + corev1.PodFailed: "pod_status_failed", + } + + podConditionMetricNames = map[corev1.PodConditionType]string{ + corev1.PodReady: "pod_status_ready", + corev1.PodScheduled: "pod_status_scheduled", + } + + podConditionUnknownMetric = "pod_status_unknown" +) + +func addPodStatusMetrics(field map[string]interface{}, pod *k8sclient.PodInfo) { + for _, metricName := range podPhaseMetricNames { + field[metricName] = 0 + } + + statusMetricName, validStatus := podPhaseMetricNames[pod.Phase] + if validStatus { + field[statusMetricName] = 1 + } +} + +func addPodConditionMetrics(field map[string]interface{}, pod *k8sclient.PodInfo) { + for _, metricName := range podConditionMetricNames { + field[metricName] = 0 + } + + field[podConditionUnknownMetric] = 0 + + for _, condition := range pod.Conditions { + switch condition.Status { + case corev1.ConditionTrue: + if statusMetricName, ok := podConditionMetricNames[condition.Type]; ok { + field[statusMetricName] = 1 + } + case corev1.ConditionUnknown: + if _, ok := podConditionMetricNames[condition.Type]; ok { + field[podConditionUnknownMetric] = 1 + } + } + } +} + +// get the cronJob name by stripping the last dash following some rules +// return empty if it is not following the rule +func parseCronJobFromJob(name string) string { + lastDash := strings.LastIndexAny(name, "-") + if lastDash == -1 { + // No dash + return "" + } + suffix := name[lastDash+1:] + if len(suffix) != 10 { + // Invalid suffix if it is not 10 rune + return "" + } + + if !stringInRuneset(suffix, 
cronJobAllowedString) { + // Invalid suffix + return "" + } + + return name[:lastDash] +} + +func getJobNamePrefix(podName string) string { + return re.Split(podName, 2)[0] +} + +func stringInRuneset(name, subset string) bool { + for _, r := range name { + if !strings.ContainsRune(subset, r) { + // Found an unexpected rune in suffix + return false + } + } + return true +} + +// get the deployment name by stripping the last dash following some rules +// return empty if it is not following the rule +func parseDeploymentFromReplicaSet(name string) string { + lastDash := strings.LastIndexAny(name, "-") + if lastDash == -1 { + // No dash + return "" + } + suffix := name[lastDash+1:] + if len(suffix) < 3 { + // Invalid suffix if it is less than 3 + return "" + } + + if !stringInRuneset(suffix, kubeAllowedStringAlphaNums) { + // Invalid suffix + return "" + } + return name[:lastDash] +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go new file mode 100644 index 000000000000..509b1828168f --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package k8sapiserver + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" +) + +func TestUtils_parseDeploymentFromReplicaSet(t *testing.T) { + assert.Equal(t, "", parseDeploymentFromReplicaSet("cloudwatch-agent")) + assert.Equal(t, "cloudwatch-agent", parseDeploymentFromReplicaSet("cloudwatch-agent-42kcz")) +} + +func TestUtils_parseCronJobFromJob(t *testing.T) { + assert.Equal(t, "", parseCronJobFromJob("hello-123")) + assert.Equal(t, "hello", parseCronJobFromJob("hello-1234567890")) + assert.Equal(t, "", parseCronJobFromJob("hello-123456789a")) +} + +func 
TestPodStore_addPodStatusMetrics(t *testing.T) { + fields := map[string]interface{}{} + testPodInfo := k8sclient.PodInfo{ + Name: "kube-proxy-csm88", + Namespace: "kube-system", + Uid: "bc5f5839-f62e-44b9-a79e-af250d92dcb1", + Labels: map[string]string{}, + Phase: v1.PodRunning, + } + addPodStatusMetrics(fields, &testPodInfo) + + expectedFieldsArray := map[string]interface{}{ + "pod_status_pending": 0, + "pod_status_running": 1, + "pod_status_succeeded": 0, + "pod_status_failed": 0, + } + assert.Equal(t, expectedFieldsArray, fields) +} + +func TestPodStore_addPodConditionMetrics(t *testing.T) { + fields := map[string]interface{}{} + testPodInfo := k8sclient.PodInfo{ + Name: "kube-proxy-csm88", + Namespace: "kube-system", + Uid: "bc5f5839-f62e-44b9-a79e-af250d92dcb1", + Labels: map[string]string{}, + Phase: v1.PodRunning, + } + addPodConditionMetrics(fields, &testPodInfo) + + expectedFieldsArray := map[string]interface{}{ + "pod_status_ready": 0, + "pod_status_scheduled": 0, + "pod_status_unknown": 0, + } + assert.Equal(t, expectedFieldsArray, fields) +} diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index 8a14ba3c2b26..9972acd01718 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -86,7 +86,7 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone return err } - acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostinfo, acir.settings.Logger, leaderElection) + acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostinfo, acir.settings.Logger, leaderElection, acir.config.AddFullPodNameMetricLabel, acir.config.EnableControlPlaneMetrics) if err != nil { return err }