Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit pod status metrics for pending pods from api server #139

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions internal/aws/k8s/k8sclient/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
type PodClient interface {
// Get the mapping between the namespace and the number of belonging pods
NamespaceToRunningPodNum() map[string]int
PodInfos() []*PodInfo
}

type podClientOption func(*podClient)
Expand All @@ -39,6 +40,7 @@ type podClient struct {

mu sync.RWMutex
namespaceToRunningPodNumMap map[string]int
podInfos []*PodInfo
}

func (c *podClient) NamespaceToRunningPodNum() map[string]int {
Expand All @@ -50,22 +52,35 @@ func (c *podClient) NamespaceToRunningPodNum() map[string]int {
return c.namespaceToRunningPodNumMap
}

func (c *podClient) PodInfos() []*PodInfo {
if c.store.GetResetRefreshStatus() {
c.refresh()
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.podInfos
}

func (c *podClient) refresh() {
c.mu.Lock()
defer c.mu.Unlock()

objsList := c.store.List()
namespaceToRunningPodNumMapNew := make(map[string]int)
podInfos := make([]*PodInfo, 0)
for _, obj := range objsList {
pod := obj.(*podInfo)
if pod.phase == v1.PodRunning {
if podNum, ok := namespaceToRunningPodNumMapNew[pod.namespace]; !ok {
namespaceToRunningPodNumMapNew[pod.namespace] = 1
pod := obj.(*PodInfo)
podInfos = append(podInfos, pod)

if pod.Phase == v1.PodRunning {
if podNum, ok := namespaceToRunningPodNumMapNew[pod.Namespace]; !ok {
namespaceToRunningPodNumMapNew[pod.Namespace] = 1
} else {
namespaceToRunningPodNumMapNew[pod.namespace] = podNum + 1
namespaceToRunningPodNumMapNew[pod.Namespace] = podNum + 1
}
}
}
c.podInfos = podInfos
c.namespaceToRunningPodNumMap = namespaceToRunningPodNumMapNew
}

Expand Down Expand Up @@ -105,9 +120,14 @@ func transformFuncPod(obj interface{}) (interface{}, error) {
if !ok {
return nil, fmt.Errorf("input obj %v is not Pod type", obj)
}
info := new(podInfo)
info.namespace = pod.Namespace
info.phase = pod.Status.Phase
info := new(PodInfo)
info.Name = pod.Name
info.Namespace = pod.Namespace
info.Uid = string(pod.UID)
info.Labels = pod.Labels
info.OwnerReferences = pod.OwnerReferences
info.Phase = pod.Status.Phase
info.Conditions = pod.Status.Conditions
return info, nil
}

Expand Down
12 changes: 9 additions & 3 deletions internal/aws/k8s/k8sclient/pod_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-c

import (
v1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type podInfo struct {
namespace string
phase v1.PodPhase
type PodInfo struct {
Name string
Namespace string
Uid string
Labels map[string]string
OwnerReferences []metaV1.OwnerReference
Phase v1.PodPhase
Conditions []v1.PodCondition
}
39 changes: 39 additions & 0 deletions internal/aws/k8s/k8sclient/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,42 @@ func TestTransformFuncPod(t *testing.T) {
assert.Nil(t, info)
assert.NotNil(t, err)
}

func TestPodClient_PodNameToPodMap(t *testing.T) {
skip(t, "Flaky test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/11078")
mitali-salvi marked this conversation as resolved.
Show resolved Hide resolved
setOption := podSyncCheckerOption(&mockReflectorSyncChecker{})

samplePodArray := []interface{}{
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "bc5f5839-f62e-44b9-a79e-af250d92dcb1",
Name: "kube-proxy-csm88",
Namespace: "kube-system",
SelfLink: "/api/v1/namespaces/kube-system/pods/kube-proxy-csm88",
},
Status: v1.PodStatus{
Phase: "Running",
},
},
}

fakeClientSet := fake.NewSimpleClientset()
client := newPodClient(fakeClientSet, zap.NewNop(), setOption)
assert.NoError(t, client.store.Replace(samplePodArray, ""))
client.refresh()

expectedArray := []*PodInfo{
{
Name: "kube-proxy-csm88",
Namespace: "kube-system",
Uid: "bc5f5839-f62e-44b9-a79e-af250d92dcb1",
Labels: map[string]string{},
Phase: v1.PodRunning,
},
}

resultMap := client.PodInfos()
assert.Equal(t, expectedArray, resultMap)
client.shutdown()
assert.True(t, client.stopped)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,51 @@ package k8sapiserver // import "github.com/open-telemetry/opentelemetry-collecto

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
"strings"
"time"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil"
)

// K8sAPIServer is a struct that produces metrics from kubernetes api server
type K8sAPIServer struct {
nodeName string // get the value from downward API
logger *zap.Logger
clusterNameProvider clusterNameProvider
cancel context.CancelFunc
leaderElection *LeaderElection
nodeName string // get the value from downward API
logger *zap.Logger
clusterNameProvider clusterNameProvider
cancel context.CancelFunc
leaderElection *LeaderElection
addFullPodNameMetricLabel bool
includeEnhancedMetrics bool
}

type clusterNameProvider interface {
GetClusterName() string
GetInstanceID() string
GetInstanceType() string
}

type Option func(*K8sAPIServer)

// NewK8sAPIServer creates a k8sApiServer which can generate cluster-level metrics
func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, options ...Option) (*K8sAPIServer, error) {
func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, options ...Option) (*K8sAPIServer, error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we use the option pattern so the signature doesnt have to change everywhere.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will be taken care of as part of the code cleanup task


k := &K8sAPIServer{
logger: logger,
clusterNameProvider: cnp,
leaderElection: leaderElection,
logger: logger,
clusterNameProvider: cnp,
leaderElection: leaderElection,
addFullPodNameMetricLabel: addFullPodNameMetricLabel,
includeEnhancedMetrics: includeEnhancedMetrics,
}

for _, opt := range options {
Expand Down Expand Up @@ -85,6 +96,7 @@ func (k *K8sAPIServer) GetMetrics() []pmetric.Metrics {
result = append(result, k.getServiceMetrics(clusterName, timestampNs)...)
result = append(result, k.getStatefulSetMetrics(clusterName, timestampNs)...)
result = append(result, k.getReplicaSetMetrics(clusterName, timestampNs)...)
result = append(result, k.getPodMetrics(clusterName, timestampNs)...)
mitali-salvi marked this conversation as resolved.
Show resolved Hide resolved

return result
}
Expand Down Expand Up @@ -279,6 +291,137 @@ func (k *K8sAPIServer) getReplicaSetMetrics(clusterName, timestampNs string) []p
return metrics
}

func (k *K8sAPIServer) getPodMetrics(clusterName, timestampNs string) []pmetric.Metrics {
mitali-salvi marked this conversation as resolved.
Show resolved Hide resolved
var metrics []pmetric.Metrics
podsList := k.leaderElection.podClient.PodInfos()
podKeyToServiceNamesMap := k.leaderElection.epClient.PodKeyToServiceNames()

for _, podInfo := range podsList {
if podInfo.Phase == corev1.PodPending {
mitali-salvi marked this conversation as resolved.
Show resolved Hide resolved
fields := map[string]interface{}{}

if k.includeEnhancedMetrics {
mitali-salvi marked this conversation as resolved.
Show resolved Hide resolved
addPodStatusMetrics(fields, podInfo)
addPodConditionMetrics(fields, podInfo)
}

attributes := map[string]string{
ci.ClusterNameKey: clusterName,
ci.MetricType: ci.TypePod,
ci.Timestamp: timestampNs,
ci.PodNameKey: podInfo.Name,
mitali-salvi marked this conversation as resolved.
Show resolved Hide resolved
ci.K8sNamespace: podInfo.Namespace,
ci.Version: "0",
}

podKey := k8sutil.CreatePodKey(podInfo.Namespace, podInfo.Name)
if serviceList, ok := podKeyToServiceNamesMap[podKey]; ok {
if len(serviceList) > 0 {
attributes[ci.TypeService] = serviceList[0]
}
}

if k.nodeName != "" {
mitali-salvi marked this conversation as resolved.
Show resolved Hide resolved
attributes["NodeName"] = k.nodeName
}
// add instance id and type of the leader node
if instanceID := k.clusterNameProvider.GetInstanceID(); instanceID != "" {
attributes[ci.InstanceID] = instanceID
}
if instanceType := k.clusterNameProvider.GetInstanceType(); instanceType != "" {
attributes[ci.InstanceType] = instanceType
mitali-salvi marked this conversation as resolved.
Show resolved Hide resolved
}

attributes[ci.PodStatus] = "Pending"
mitali-salvi marked this conversation as resolved.
Show resolved Hide resolved

kubernetesBlob := map[string]interface{}{}
k.getKubernetesBlob(podInfo, kubernetesBlob, attributes)
if k.nodeName != "" {
kubernetesBlob["host"] = k.nodeName
}
if len(kubernetesBlob) > 0 {
kubernetesInfo, err := json.Marshal(kubernetesBlob)
if err != nil {
k.logger.Warn("Error parsing kubernetes blob for pod metrics")
} else {
attributes[ci.Kubernetes] = string(kubernetesInfo)
}
}
attributes[ci.SourcesKey] = "[\"apiserver\"]"
md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger)
metrics = append(metrics, md)
}
}
return metrics
}

func (k *K8sAPIServer) getKubernetesBlob(pod *k8sclient.PodInfo, kubernetesBlob map[string]interface{}, attributes map[string]string) {
mitali-salvi marked this conversation as resolved.
Show resolved Hide resolved
var owners []interface{}
podName := ""
for _, owner := range pod.OwnerReferences {
if owner.Kind != "" && owner.Name != "" {
kind := owner.Kind
name := owner.Name
if owner.Kind == ci.ReplicaSet {
rsToDeployment := k.leaderElection.replicaSetClient.ReplicaSetToDeployment()
if parent := rsToDeployment[owner.Name]; parent != "" {
kind = ci.Deployment
name = parent
} else if parent := parseDeploymentFromReplicaSet(owner.Name); parent != "" {
kind = ci.Deployment
name = parent
}
} else if owner.Kind == ci.Job {
if parent := parseCronJobFromJob(owner.Name); parent != "" {
kind = ci.CronJob
name = parent
} else if !k.addFullPodNameMetricLabel {
name = getJobNamePrefix(name)
}
}
owners = append(owners, map[string]string{"owner_kind": kind, "owner_name": name})

if podName == "" {
if owner.Kind == ci.StatefulSet {
podName = pod.Name
} else if owner.Kind == ci.DaemonSet || owner.Kind == ci.Job ||
owner.Kind == ci.ReplicaSet || owner.Kind == ci.ReplicationController {
podName = name
}
}
}
}

if len(owners) > 0 {
kubernetesBlob["pod_owners"] = owners
}

labels := make(map[string]string)
for k, v := range pod.Labels {
labels[k] = v
}
if len(labels) > 0 {
kubernetesBlob["labels"] = labels
}
kubernetesBlob["namespace_name"] = pod.Namespace
kubernetesBlob["pod_id"] = pod.Uid

// if podName is not set according to a well-known controllers, then set it to its own name
if podName == "" {
if strings.HasPrefix(pod.Name, KubeProxy) && !k.addFullPodNameMetricLabel {
podName = KubeProxy
} else {
podName = pod.Name
}
}

attributes[ci.PodNameKey] = podName
if k.addFullPodNameMetricLabel {
attributes[ci.FullPodNameKey] = pod.Name
kubernetesBlob["pod_name"] = pod.Name
}
}

// Shutdown stops the k8sApiServer
func (k *K8sAPIServer) Shutdown() error {
if k.cancel != nil {
Expand Down
Loading
Loading