diff --git a/.chloggen/add-pod-resources-store.yaml b/.chloggen/add-pod-resources-store.yaml
new file mode 100644
index 000000000000..67c5d7777528
--- /dev/null
+++ b/.chloggen/add-pod-resources-store.yaml
@@ -0,0 +1,20 @@
+# Use this changelog template to create an entry for release notes.
+# If your change doesn't affect end users, such as a test fix or a tooling change,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: awscontainerinsightreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add a new PodResourcesStore that maps device resources to containers and vice versa
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [167]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: This change adds a new store to awscontainerinsightreceiver which, once started, maps resources to containers and vice versa using the kubelet pod resources API.
\ No newline at end of file diff --git a/receiver/awscontainerinsightreceiver/go.mod b/receiver/awscontainerinsightreceiver/go.mod index f5f164ca91aa..76345567b4b3 100644 --- a/receiver/awscontainerinsightreceiver/go.mod +++ b/receiver/awscontainerinsightreceiver/go.mod @@ -23,10 +23,12 @@ require ( go.opentelemetry.io/otel/trace v1.23.1 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.26.0 + google.golang.org/grpc v1.61.0 k8s.io/api v0.29.2 k8s.io/apimachinery v0.29.2 k8s.io/client-go v0.29.2 k8s.io/klog v1.0.0 + k8s.io/kubelet v0.27.3 ) require ( @@ -140,7 +142,6 @@ require ( golang.org/x/time v0.4.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect - google.golang.org/grpc v1.61.0 // indirect google.golang.org/protobuf v1.32.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/receiver/awscontainerinsightreceiver/go.sum b/receiver/awscontainerinsightreceiver/go.sum index 25aff89d014f..72eed1243421 100644 --- a/receiver/awscontainerinsightreceiver/go.sum +++ b/receiver/awscontainerinsightreceiver/go.sum @@ -741,6 +741,8 @@ k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= +k8s.io/kubelet v0.27.3 h1:5WhTV1iiBu9q/rr+gvy65LQ+K/e7dmgcaYjys5ipLqY= +k8s.io/kubelet v0.27.3/go.mod h1:Mz42qgZZgWgPmOJEYaR5evmh+EoSwFzEvPBozA2y9mg= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod 
h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
diff --git a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/podresourcesclient.go b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/podresourcesclient.go
new file mode 100644
index 000000000000..d9ec7c769522
--- /dev/null
+++ b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/podresourcesclient.go
@@ -0,0 +1,108 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package kubeletutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil"
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+	"os"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
+)
+
+const (
+	// socketPath is the unix socket dialed to reach the kubelet pod resources API.
+	socketPath = "/var/lib/kubelet/pod-resources/kubelet.sock"
+	// connectionTimeout bounds the initial dial and every List call.
+	connectionTimeout = 10 * time.Second
+)
+
+// PodResourcesClient wraps a gRPC connection to the kubelet pod resources
+// socket together with the lister client bound to that connection.
+type PodResourcesClient struct {
+	delegateClient podresourcesapi.PodResourcesListerClient
+	conn           *grpc.ClientConn
+}
+
+// NewPodResourcesClient dials socketPath and returns a client ready to list
+// pod resources. On failure it returns a nil client and the wrapped error.
+func NewPodResourcesClient() (*PodResourcesClient, error) {
+	podResourcesClient := &PodResourcesClient{}
+
+	conn, err := podResourcesClient.connectToServer(socketPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to connect to server: %w", err)
+	}
+
+	podResourcesClient.conn = conn
+	podResourcesClient.delegateClient = podresourcesapi.NewPodResourcesListerClient(conn)
+
+	return podResourcesClient, nil
+}
+
+// connectToServer verifies that socket exists and then opens a blocking,
+// plaintext gRPC connection to it over a unix domain socket. The dial is
+// given up after connectionTimeout.
+func (p *PodResourcesClient) connectToServer(socket string) (*grpc.ClientConn, error) {
+	_, err := os.Stat(socket)
+	if os.IsNotExist(err) {
+		return nil, fmt.Errorf("socket path does not exist: %s", socket)
+	} else if err != nil {
+		return nil, fmt.Errorf("failed to check socket path: %w", err)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
+	defer cancel()
+
+	conn, err := grpc.DialContext(ctx,
+		socket,
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		// WithBlock makes DialContext wait for the connection, so the
+		// deadline on ctx applies to the dial itself.
+		grpc.WithBlock(),
+		grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
+			d := net.Dialer{}
+			return d.DialContext(ctx, "unix", addr)
+		}),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failure connecting to '%s': %w", socket, err)
+	}
+
+	return conn, nil
+}
+
+// ListPods issues a List request to the pod resources API and returns the
+// response. The call is bounded by connectionTimeout. A client that was never
+// connected yields an error instead of a nil pointer dereference.
+func (p *PodResourcesClient) ListPods() (*podresourcesapi.ListPodResourcesResponse, error) {
+	if p == nil || p.delegateClient == nil {
+		return nil, errors.New("pod resources client is not connected")
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
+	defer cancel()
+
+	resp, err := p.delegateClient.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
+	if err != nil {
+		return nil, fmt.Errorf("failure getting pod resources: %w", err)
+	}
+
+	return resp, nil
+}
+
+// Shutdown closes the underlying gRPC connection, if there is one.
+func (p *PodResourcesClient) Shutdown() {
+	if p == nil || p.conn == nil {
+		return
+	}
+	// Best effort: Shutdown has no error return, so a failed Close is
+	// deliberately dropped.
+	_ = p.conn.Close()
+}
diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podresourcesstore.go b/receiver/awscontainerinsightreceiver/internal/stores/podresourcesstore.go
new file mode 100644
index 000000000000..d2266ffd1cbb
--- /dev/null
+++ b/receiver/awscontainerinsightreceiver/internal/stores/podresourcesstore.go
@@ -0,0 +1,245 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package stores // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/kubelet/pkg/apis/podresources/v1"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil"
+)
+
+const (
+	// taskTimeout is the minimum interval between two refreshes and also the
+	// timeout handed to refreshWithTimeout for a single refresh.
+	taskTimeout = 10 * time.Second
+)
+
+var (
+	instance *PodResourcesStore
+	once     sync.Once
+)
+
+// ContainerInfo identifies a container by pod name, container name and namespace.
+type ContainerInfo struct {
+	PodName       string
+	ContainerName string
+	Namespace     string
+}
+
+// ResourceInfo identifies a single device of a named resource.
+type ResourceInfo struct {
+	ResourceName string
+	DeviceID     string
+}
+
+// PodResourcesClientInterface is the subset of the pod resources client the
+// store depends on.
+type PodResourcesClientInterface interface {
+	ListPods() (*v1.ListPodResourcesResponse, error)
+	Shutdown()
+}
+
+// PodResourcesStore keeps a two-way mapping between containers and the
+// allowlisted device resources assigned to them. The mapping is rebuilt from
+// the pod resources client by a background goroutine.
+type PodResourcesStore struct {
+	containerInfoToResourcesMap map[ContainerInfo][]ResourceInfo
+	resourceToPodContainerMap   map[ResourceInfo]ContainerInfo
+	resourceNameSet             map[string]struct{}
+	lastRefreshed               time.Time
+	ctx                         context.Context
+	cancel                      context.CancelFunc
+	logger                      *zap.Logger
+	podResourcesClient          PodResourcesClientInterface
+
+	// mu guards the three maps above: the refresh goroutine replaces them
+	// while callers read and extend them through the exported methods.
+	mu sync.RWMutex
+}
+
+// NewPodResourcesStore returns the process-wide PodResourcesStore. The first
+// call creates it and starts the goroutine that refreshes it; later calls
+// return that same instance and ignore their logger argument.
+func NewPodResourcesStore(logger *zap.Logger) *PodResourcesStore {
+	once.Do(func() {
+		if logger == nil {
+			logger = zap.NewNop()
+		}
+
+		ctx, cancel := context.WithCancel(context.Background())
+		store := &PodResourcesStore{
+			containerInfoToResourcesMap: make(map[ContainerInfo][]ResourceInfo),
+			resourceToPodContainerMap:   make(map[ResourceInfo]ContainerInfo),
+			resourceNameSet:             make(map[string]struct{}),
+			lastRefreshed:               time.Now(),
+			ctx:                         ctx,
+			cancel:                      cancel,
+			logger:                      logger,
+		}
+
+		// Store the client only on success. Assigning the nil
+		// *kubeletutil.PodResourcesClient returned on failure would make the
+		// interface field non-nil and defeat the nil checks in updateMaps
+		// and Shutdown.
+		client, err := kubeletutil.NewPodResourcesClient()
+		if err != nil {
+			logger.Warn("Failed to create pod resources client", zap.Error(err))
+		} else {
+			store.podResourcesClient = client
+		}
+		instance = store
+
+		go func() {
+			refreshTicker := time.NewTicker(time.Second)
+			defer refreshTicker.Stop()
+			for {
+				select {
+				case <-refreshTicker.C:
+					store.refreshTick()
+				case <-ctx.Done():
+					return
+				}
+			}
+		}()
+	})
+	return instance
+}
+
+// refreshTick triggers a refresh once at least taskTimeout has passed since
+// the previous one. The background goroutine calls it on every ticker tick.
+func (p *PodResourcesStore) refreshTick() {
+	now := time.Now()
+	if now.Sub(p.lastRefreshed) >= taskTimeout {
+		p.refresh()
+		p.lastRefreshed = now
+	}
+}
+
+// refresh runs updateMaps through refreshWithTimeout with taskTimeout.
+func (p *PodResourcesStore) refresh() {
+	doRefresh := func() {
+		p.updateMaps()
+	}
+
+	refreshWithTimeout(p.ctx, doRefresh, taskTimeout)
+}
+
+// updateMaps rebuilds both lookup maps from the current pod resources,
+// keeping only devices whose resource name is allowlisted. The new maps are
+// built aside and swapped in under the lock, so readers never observe a
+// half-filled map. When nothing is allowlisted, no client is available, or
+// listing fails, the store ends up with empty maps.
+func (p *PodResourcesStore) updateMaps() {
+	containerInfoToResources := make(map[ContainerInfo][]ResourceInfo)
+	resourceToPodContainer := make(map[ResourceInfo]ContainerInfo)
+	defer func() {
+		p.mu.Lock()
+		p.containerInfoToResourcesMap = containerInfoToResources
+		p.resourceToPodContainerMap = resourceToPodContainer
+		p.mu.Unlock()
+	}()
+
+	// Snapshot the allowlist so the kubelet call below runs without the lock.
+	p.mu.RLock()
+	allowlisted := make(map[string]struct{}, len(p.resourceNameSet))
+	for name := range p.resourceNameSet {
+		allowlisted[name] = struct{}{}
+	}
+	p.mu.RUnlock()
+
+	if len(allowlisted) == 0 {
+		p.logger.Warn("No resource names allowlisted thus skipping updating of maps.")
+		return
+	}
+
+	if p.podResourcesClient == nil {
+		p.logger.Warn("Pod resources client is unavailable thus skipping updating of maps.")
+		return
+	}
+
+	devicePods, err := p.podResourcesClient.ListPods()
+	if err != nil {
+		p.logger.Error(fmt.Sprintf("Error getting pod resources: %v", err))
+		return
+	}
+
+	for _, pod := range devicePods.GetPodResources() {
+		for _, container := range pod.GetContainers() {
+			containerInfo := ContainerInfo{
+				PodName:       pod.GetName(),
+				Namespace:     pod.GetNamespace(),
+				ContainerName: container.GetName(),
+			}
+
+			for _, device := range container.GetDevices() {
+				resourceName := device.GetResourceName()
+				if _, found := allowlisted[resourceName]; !found {
+					continue
+				}
+
+				for _, deviceID := range device.GetDeviceIds() {
+					resourceInfo := ResourceInfo{
+						ResourceName: resourceName,
+						DeviceID:     deviceID,
+					}
+					containerInfoToResources[containerInfo] = append(containerInfoToResources[containerInfo], resourceInfo)
+					resourceToPodContainer[resourceInfo] = containerInfo
+				}
+			}
+		}
+	}
+}
+
+// GetContainerInfo returns the container holding the given device of the
+// given resource, or nil when that pair is not in the store.
+func (p *PodResourcesStore) GetContainerInfo(deviceID string, resourceName string) *ContainerInfo {
+	key := ResourceInfo{DeviceID: deviceID, ResourceName: resourceName}
+
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+	if containerInfo, ok := p.resourceToPodContainerMap[key]; ok {
+		return &containerInfo
+	}
+	return nil
+}
+
+// GetResourcesInfo returns the allowlisted resources recorded for the given
+// container, or nil when the container is not in the store. The returned
+// slice is never modified by later refreshes.
+func (p *PodResourcesStore) GetResourcesInfo(podName string, containerName string, namespace string) *[]ResourceInfo {
+	key := ContainerInfo{PodName: podName, ContainerName: containerName, Namespace: namespace}
+
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+	if resourceInfo, ok := p.containerInfoToResourcesMap[key]; ok {
+		return &resourceInfo
+	}
+	return nil
+}
+
+// AddResourceName allowlists resourceName so that its devices are included
+// the next time the maps are rebuilt.
+func (p *PodResourcesStore) AddResourceName(resourceName string) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if p.resourceNameSet == nil {
+		p.resourceNameSet = make(map[string]struct{})
+	}
+	p.resourceNameSet[resourceName] = struct{}{}
+}
+
+// Shutdown stops the background refresh goroutine and closes the pod
+// resources client when one was created.
+func (p *PodResourcesStore) Shutdown() {
+	p.cancel()
+	if p.podResourcesClient != nil {
+		p.podResourcesClient.Shutdown()
+	}
+}
diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podresourcesstore_test.go b/receiver/awscontainerinsightreceiver/internal/stores/podresourcesstore_test.go
new file mode 100644
index 000000000000..02fbac32f44f
--- /dev/null
+++ b/receiver/awscontainerinsightreceiver/internal/stores/podresourcesstore_test.go
@@ -0,0 +1,267 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package stores // import 
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"go.uber.org/zap"
+	podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
+)
+
+const (
+	defaultResourceName          = "Resource-1"
+	defaultPodName               = "Pod-1"
+	defaultNamespace             = "Namespace-1"
+	defaultContainerName         = "Container-1"
+	defaultDeviceID1             = "Device-1"
+	defaultDeviceID2             = "Device-2"
+	defaultDeviceID3             = "Device-3"
+	defaultDeviceID4             = "Device-4"
+	defaultResourceNameSkipped   = "Resource-Skipped"
+	defaultContainerNameNoDevice = "Container-NoDevice"
+	defaultNamespaceNoDevice     = "Namespace-NoDevice"
+	defaultPodNameNoDevice       = "Pod-NoDevice"
+)
+
+var (
+	expectedContainerInfoToResourcesMap = map[ContainerInfo][]ResourceInfo{
+		{
+			PodName:       defaultPodName,
+			ContainerName: defaultContainerName,
+			Namespace:     defaultNamespace,
+		}: {
+			{
+				ResourceName: defaultResourceName,
+				DeviceID:     defaultDeviceID1,
+			},
+			{
+				ResourceName: defaultResourceName,
+				DeviceID:     defaultDeviceID2,
+			},
+		},
+	}
+
+	expectedResourceToPodContainerMap = map[ResourceInfo]ContainerInfo{
+		{
+			ResourceName: defaultResourceName,
+			DeviceID:     defaultDeviceID1,
+		}: {
+			PodName:       defaultPodName,
+			ContainerName: defaultContainerName,
+			Namespace:     defaultNamespace,
+		},
+		{
+			ResourceName: defaultResourceName,
+			DeviceID:     defaultDeviceID2,
+		}: {
+			PodName:       defaultPodName,
+			ContainerName: defaultContainerName,
+			Namespace:     defaultNamespace,
+		},
+	}
+
+	expectedContainerInfo = ContainerInfo{
+		PodName:       defaultPodName,
+		ContainerName: defaultContainerName,
+		Namespace:     defaultNamespace,
+	}
+
+	expectedResourceInfo = []ResourceInfo{
+		{
+			ResourceName: defaultResourceName,
+			DeviceID:     defaultDeviceID1,
+		},
+		{
+			ResourceName: defaultResourceName,
+			DeviceID:     defaultDeviceID2,
+		},
+	}
+
+	listPodResourcesResponse = &podresourcesv1.ListPodResourcesResponse{
+		PodResources: []*podresourcesv1.PodResources{
+			{
+				Name:      defaultPodName,
+				Namespace: defaultNamespace,
+				Containers: []*podresourcesv1.ContainerResources{
+					{
+						Name: defaultContainerName,
+						Devices: []*podresourcesv1.ContainerDevices{
+							{
+								ResourceName: defaultResourceName,
+								DeviceIds:    []string{defaultDeviceID1, defaultDeviceID2},
+							},
+							{
+								ResourceName: defaultResourceNameSkipped,
+								DeviceIds:    []string{defaultDeviceID3, defaultDeviceID4},
+							},
+						},
+					},
+				},
+			},
+			{
+				Name:      defaultPodNameNoDevice,
+				Namespace: defaultNamespaceNoDevice,
+				Containers: []*podresourcesv1.ContainerResources{
+					{
+						Name:    defaultContainerNameNoDevice,
+						Devices: []*podresourcesv1.ContainerDevices{},
+					},
+				},
+			},
+		},
+	}
+
+	listPodResourcesResponseWithEmptyPodResources = &podresourcesv1.ListPodResourcesResponse{
+		PodResources: []*podresourcesv1.PodResources{},
+	}
+
+	listPodResourcesResponseWithEmptyResponse = &podresourcesv1.ListPodResourcesResponse{}
+
+	resourceNameSet = map[string]struct{}{
+		defaultResourceName: {},
+	}
+)
+
+// MockPodResourcesClient returns a canned response/error and records whether Shutdown was called.
+type MockPodResourcesClient struct {
+	response       *podresourcesv1.ListPodResourcesResponse
+	err            error
+	shutdownCalled bool
+}
+
+func (m *MockPodResourcesClient) ListPods() (*podresourcesv1.ListPodResourcesResponse, error) {
+	return m.response, m.err
+}
+
+func (m *MockPodResourcesClient) Shutdown() {
+	m.shutdownCalled = true
+}
+
+func TestNewPodResourcesStore(t *testing.T) {
+	logger := zap.NewNop()
+	store := NewPodResourcesStore(logger)
+	assert.NotNil(t, store, "PodResourcesStore should not be nil")
+	// Stop the refresh goroutine started by the constructor so it does not outlive the test.
+	t.Cleanup(store.cancel)
+	assert.NotNil(t, store.ctx, "Context should not be nil")
+	assert.NotNil(t, store.cancel, "Cancel function should not be nil")
+}
+
+func TestRefreshTick(t *testing.T) {
+	store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponse, nil)
+
+	store.lastRefreshed = time.Now().Add(-time.Hour)
+
+	store.refreshTick()
+
+	assert.True(t, store.lastRefreshed.After(time.Now().Add(-time.Hour)), "lastRefreshed should have been updated")
+}
+
+func TestShutdown(t *testing.T) {
+	store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponse, nil)
+
+	mockClient := &MockPodResourcesClient{listPodResourcesResponse, nil, false}
+	store.podResourcesClient = mockClient
+
+	store.Shutdown()
+
+	assert.True(t, mockClient.shutdownCalled, "Shutdown method of the client should have been called")
+}
+
+func TestUpdateMaps(t *testing.T) {
+	store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponse, nil)
+	store.updateMaps()
+
+	assert.NotNil(t, store.containerInfoToResourcesMap)
+	assert.NotNil(t, store.resourceToPodContainerMap)
+	assert.Equal(t, len(expectedContainerInfoToResourcesMap), len(store.containerInfoToResourcesMap))
+	assert.Equal(t, len(expectedResourceToPodContainerMap), len(store.resourceToPodContainerMap))
+	assert.Equal(t, expectedContainerInfoToResourcesMap, store.containerInfoToResourcesMap)
+	assert.Equal(t, expectedResourceToPodContainerMap, store.resourceToPodContainerMap)
+}
+
+func TestGets(t *testing.T) {
+	store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponse, nil)
+	store.updateMaps()
+
+	assertMapsContainData(t, store)
+}
+
+func TestGetsWhenThereAreNoPods(t *testing.T) {
+	store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponseWithEmptyPodResources, nil)
+	store.updateMaps()
+
+	assertMapsDontContainData(t, store)
+}
+
+func TestGetsWhenPodResourcesResponseIsEmpty(t *testing.T) {
+	store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponseWithEmptyResponse, nil)
+	store.updateMaps()
+
+	assertMapsDontContainData(t, store)
+}
+
+func TestGetsWhenPodResourcesThrowsError(t *testing.T) {
+	store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponseWithEmptyResponse, fmt.Errorf("mocked behavior"))
+	store.updateMaps()
+
+	assertMapsDontContainData(t, store)
+}
+
+func TestAddResourceName(t *testing.T) {
+	store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponse, nil)
+
+	store.resourceNameSet = make(map[string]struct{})
+	store.updateMaps()
+	assertMapsDontContainData(t, store)
+
+	// After adding resource to map
+	store.AddResourceName(defaultResourceName)
+	store.updateMaps()
+	assertMapsContainData(t, store)
+}
+
+// constructPodResourcesStore builds a store around a mock client, without the singleton or its goroutine.
+func constructPodResourcesStore(containerToDevices map[ContainerInfo][]ResourceInfo, deviceToContainer map[ResourceInfo]ContainerInfo, podResourcesResponse *podresourcesv1.ListPodResourcesResponse, podResourcesError error) *PodResourcesStore {
+	logger, _ := zap.NewDevelopment()
+	return &PodResourcesStore{
+		containerInfoToResourcesMap: containerToDevices,
+		resourceToPodContainerMap:   deviceToContainer,
+		resourceNameSet:             resourceNameSet,
+		lastRefreshed:               time.Now(),
+		ctx:                         context.Background(),
+		cancel:                      func() {},
+		logger:                      logger,
+		podResourcesClient:          &MockPodResourcesClient{podResourcesResponse, podResourcesError, false},
+	}
+}
+
+// assertMapsContainData checks that the store holds exactly the expected allowlisted entries.
+func assertMapsContainData(t *testing.T, store *PodResourcesStore) {
+	assert.Equal(t, len(expectedContainerInfoToResourcesMap), len(store.containerInfoToResourcesMap))
+	assert.Equal(t, len(expectedResourceToPodContainerMap), len(store.resourceToPodContainerMap))
+
+	assert.Equal(t, expectedContainerInfo, *store.GetContainerInfo(defaultDeviceID1, defaultResourceName))
+	assert.Equal(t, expectedResourceInfo, *store.GetResourcesInfo(defaultPodName, defaultContainerName, defaultNamespace))
+
+	actualResourceInfo := store.GetResourcesInfo(defaultPodNameNoDevice, defaultContainerNameNoDevice, defaultNamespaceNoDevice)
+	assert.Nil(t, actualResourceInfo, "Expected GetResourcesInfo to return nil for an unexpected key")
+}
+
+// assertMapsDontContainData checks that both maps are empty and that lookups return nil.
+func assertMapsDontContainData(t *testing.T, store *PodResourcesStore) {
+	assert.Equal(t, 0, len(store.containerInfoToResourcesMap))
+	assert.Equal(t, 0, len(store.resourceToPodContainerMap))
+
+	actualContainerInfo := store.GetContainerInfo(defaultDeviceID1, defaultResourceName)
+	assert.Nil(t, actualContainerInfo, "Expected GetContainerInfo to return nil for an unexpected key")
+
+	actualResourceInfo := store.GetResourcesInfo(defaultPodName, defaultContainerName, defaultNamespace)
+	assert.Nil(t, actualResourceInfo, "Expected GetResourcesInfo to return nil for an unexpected key")
+}