Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kubelet availability check for systemd CI. #216

Merged
merged 4 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/aws/containerinsight/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (

// Environment variables
HostName = "HOST_NAME"
HostIP = "HOST_IP"
RunInContainer = "RUN_IN_CONTAINER"
RunAsHostProcessContainer = "RUN_AS_HOST_PROCESS_CONTAINER"

Expand Down
22 changes: 14 additions & 8 deletions internal/kubelet/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,25 @@ type tlsClientProvider struct {
}

func (p *tlsClientProvider) BuildClient() (Client, error) {
rootCAs, err := systemCertPoolPlusPath(p.cfg.CAFile)
if err != nil {
return nil, err
}
clientCert, err := tls.LoadX509KeyPair(p.cfg.CertFile, p.cfg.KeyFile)
if err != nil {
return nil, err
var rootCAs *x509.CertPool
var certificates []tls.Certificate
if !p.cfg.InsecureSkipVerify {
var err error
rootCAs, err = systemCertPoolPlusPath(p.cfg.CAFile)
if err != nil {
return nil, err
}
clientCert, err := tls.LoadX509KeyPair(p.cfg.CertFile, p.cfg.KeyFile)
if err != nil {
return nil, err
}
certificates = []tls.Certificate{clientCert}
}
return defaultTLSClient(
p.endpoint,
p.cfg.InsecureSkipVerify,
rootCAs,
[]tls.Certificate{clientCert},
certificates,
nil,
p.logger,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (kp *kubeletProvider) getClient() (*kubeletutil.KubeletClient, error) {
if kp.client != nil {
return kp.client, nil
}
kclient, err := kubeletutil.NewKubeletClient(kp.hostIP, kp.hostPort, "", kp.logger)
kclient, err := kubeletutil.NewKubeletClient(kp.hostIP, kp.hostPort, nil, kp.logger)
if err != nil {
kp.logger.Error("failed to initialize new kubelet client, ", zap.Error(err))
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type KubeletClient struct {
restClient kubelet.Client
}

func NewKubeletClient(kubeIP string, port string, kubeConfigPath string, logger *zap.Logger) (*KubeletClient, error) {
func NewKubeletClient(kubeIP string, port string, clientConfig *kubelet.ClientConfig, logger *zap.Logger) (*KubeletClient, error) {
kubeClient := &KubeletClient{
Port: port,
KubeIP: kubeIP,
Expand All @@ -37,17 +37,10 @@ func NewKubeletClient(kubeIP string, port string, kubeConfigPath string, logger
endpoint = endpoint + ":" + port

// use service account for authentication by default
clientConfig := &kubelet.ClientConfig{
APIConfig: k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
},
}
if kubeConfigPath != "" {
// use kube-config for authentication
if clientConfig == nil {
clientConfig = &kubelet.ClientConfig{
APIConfig: k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeKubeConfig,
KubeConfigPath: kubeConfigPath,
AuthType: k8sconfig.AuthTypeServiceAccount,
},
}
}
Expand Down Expand Up @@ -98,3 +91,29 @@ func (k *KubeletClient) Summary(logger *zap.Logger) (*stats.Summary, error) {
logger.Debug("/stats/summary API response unmarshalled successfully")
return &out, nil
}

func ClientConfig(kubeConfigPath string, isSystemd bool) *kubelet.ClientConfig {
if !isSystemd {
return &kubelet.ClientConfig{
APIConfig: k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
},
}
}
if kubeConfigPath != "" {
// use kube-config for authentication
return &kubelet.ClientConfig{
APIConfig: k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeKubeConfig,
KubeConfigPath: kubeConfigPath,
},
}
}
// insecure TLS if not provided
return &kubelet.ClientConfig{
APIConfig: k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeTLS,
},
InsecureSkipVerify: true,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestNewKubeletClient(t *testing.T) {
},
}
for _, tt := range tests {
client, err := NewKubeletClient(tt.kubeIP, tt.port, "", zap.NewNop())
client, err := NewKubeletClient(tt.kubeIP, tt.port, nil, zap.NewNop())
require.NoError(t, err)
assert.Equal(t, client.KubeIP, tt.kubeIP)
fc := (client.restClient).(*fakeClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package stores // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"fmt"
"os"

"go.uber.org/zap"

Expand Down Expand Up @@ -42,18 +41,14 @@ type Decorator interface {
}

func NewLocalNodeDecorator(logger *zap.Logger, containerOrchestrator string, hostInfo hostInfo, hostName string, options ...Option) (*LocalNodeDecorator, error) {
nodeName := os.Getenv(ci.HostName)
if nodeName == "" && containerOrchestrator == ci.EKS {
nodeName = hostName
if nodeName == "" {
return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or agent config", ci.HostName)
}
if hostName == "" && containerOrchestrator == ci.EKS {
jefchien marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or agent config", ci.HostName)
}

d := &LocalNodeDecorator{
hostInfo: hostInfo,
version: "0",
nodeName: nodeName,
nodeName: hostName,
containerOrchestrator: containerOrchestrator,
logger: logger,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,11 @@ func TestNewLocalNodeDecorator(t *testing.T) {
assert.NotNil(t, d)
assert.NoError(t, err)
assert.Equal(t, d.nodeName, "test-hostname")

t.Setenv(ci.HostName, "host")
d, err = NewLocalNodeDecorator(logger, "eks", nil, "")
assert.NotNil(t, d)
assert.NoError(t, err)
assert.Equal(t, d.nodeName, "host")
}

func TestEbsVolumeInfo(t *testing.T) {
t.Setenv(ci.HostName, "host")
hostInfo := testutils.MockHostInfo{}
d, err := NewLocalNodeDecorator(logger, "eks", hostInfo, "")
d, err := NewLocalNodeDecorator(logger, "eks", hostInfo, "host")
assert.NotNil(t, d)
assert.NoError(t, err)

Expand Down Expand Up @@ -83,7 +76,6 @@ func (d mockK8sDecorator) Shutdown() error {
}

func TestExpectedTags(t *testing.T) {
t.Setenv(ci.HostName, "host")
hostInfo := testutils.MockHostInfo{ClusterName: "my-cluster"}
k8sDecorator := mockK8sDecorator{}
ecsInfo := testutils.MockECSInfo{}
Expand Down Expand Up @@ -136,7 +128,7 @@ func TestExpectedTags(t *testing.T) {

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
d, err := NewLocalNodeDecorator(logger, testCase.containerOrchestrator, hostInfo, "", WithK8sDecorator(k8sDecorator), WithECSInfo(&ecsInfo))
d, err := NewLocalNodeDecorator(logger, testCase.containerOrchestrator, hostInfo, "host", WithK8sDecorator(k8sDecorator), WithECSInfo(&ecsInfo))
assert.NotNil(t, d)
assert.NoError(t, err)

Expand Down
34 changes: 9 additions & 25 deletions receiver/awscontainerinsightreceiver/internal/stores/podstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"log"
"os"
"regexp"
"strings"
"sync"
Expand All @@ -21,7 +20,6 @@ import (
ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil"
)

const (
Expand Down Expand Up @@ -122,48 +120,34 @@ type PodStore struct {
includeEnhancedMetrics bool
}

func NewPodStore(hostIP string, prefFullPodName bool, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, kubeConfigPath string, hostName string, isSystemdEnabled bool, logger *zap.Logger) (*PodStore, error) {
podClient, err := kubeletutil.NewKubeletClient(hostIP, ci.KubeSecurePort, kubeConfigPath, logger)
if err != nil {
return nil, err
func NewPodStore(client podClient, prefFullPodName bool, addFullPodNameMetricLabel bool,
includeEnhancedMetrics bool, hostName string, isSystemdEnabled bool, logger *zap.Logger) (*PodStore, error) {
if hostName == "" {
return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or passed as part of the agent config", ci.HostName)
}

// Try to detect kubelet permission issue here
if _, err := podClient.ListPods(); err != nil {
return nil, fmt.Errorf("cannot get pod from kubelet, err: %w", err)
}

nodeName := os.Getenv(ci.HostName)
if nodeName == "" {
nodeName = hostName
if nodeName == "" {
return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or passed as part of the agent config", ci.HostName)
}
}

k8sClient := &k8sclient.K8sClient{}
var k8sClient *k8sclient.K8sClient
nodeInfo := &nodeInfo{
nodeName: nodeName,
nodeName: hostName,
provider: nil,
logger: logger,
}
if !isSystemdEnabled {
k8sClient = k8sclient.Get(logger,
k8sclient.NodeSelector(fields.OneTermEqualSelector("metadata.name", nodeName)),
k8sclient.NodeSelector(fields.OneTermEqualSelector("metadata.name", hostName)),
k8sclient.CaptureNodeLevelInfo(true),
)

if k8sClient == nil {
return nil, errors.New("failed to start pod store because k8sclient is nil")
}
nodeInfo = newNodeInfo(nodeName, k8sClient.GetNodeClient(), logger)
nodeInfo = newNodeInfo(hostName, k8sClient.GetNodeClient(), logger)
}

podStore := &PodStore{
cache: newMapWithExpiry(podsExpiry),
prevMeasurements: sync.Map{},
//prevMeasurements: make(map[string]*mapWithExpiry),
podClient: podClient,
podClient: client,
nodeInfo: nodeInfo,
prefFullPodName: prefFullPodName,
includeEnhancedMetrics: includeEnhancedMetrics,
Expand Down
18 changes: 6 additions & 12 deletions receiver/awscontainerinsightreceiver/internal/stores/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ package stores // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"context"
"errors"
"os"
"time"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil"
)

// CIMetric represents the raw metric interface for container insights
Expand Down Expand Up @@ -43,21 +43,15 @@ type K8sDecorator struct {
podStore *PodStore
}

func NewK8sDecorator(ctx context.Context, tagService bool, prefFullPodName bool, addFullPodNameMetricLabel bool, addContainerNameMetricLabel bool, includeEnhancedMetrics bool, kubeConfigPath string, customHostIP string, customHostName string, isSystemd bool, logger *zap.Logger) (*K8sDecorator, error) {
hostIP := os.Getenv("HOST_IP")
if hostIP == "" {
hostIP = customHostIP
if hostIP == "" {
return nil, errors.New("environment variable HOST_IP is not set in k8s deployment config or passed as part of the agent config")
}
}

func NewK8sDecorator(ctx context.Context, kubeletClient *kubeletutil.KubeletClient, tagService bool, prefFullPodName bool, addFullPodNameMetricLabel bool,
addContainerNameMetricLabel bool, includeEnhancedMetrics bool, kubeConfigPath string,
hostName string, isSystemd bool, logger *zap.Logger) (*K8sDecorator, error) {
k := &K8sDecorator{
ctx: ctx,
addContainerNameMetricLabel: addContainerNameMetricLabel,
}

podstore, err := NewPodStore(hostIP, prefFullPodName, addFullPodNameMetricLabel, includeEnhancedMetrics, kubeConfigPath, customHostName, isSystemd, logger)
podstore, err := NewPodStore(kubeletClient, prefFullPodName, addFullPodNameMetricLabel, includeEnhancedMetrics, hostName, isSystemd, logger)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading