From a698e0f05da29e85737a1a76abb0599738df834d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Thu, 3 Oct 2024 15:38:10 +0200 Subject: [PATCH] Add metadata watcher and informer (#111) Introduce metadata-only watchers to the kubernetes package. These are useful if we only need to track metadata for a resource - a good example is ReplicaSets, for which we usually only care about the OwnerReferences. As a result, we only store the metadata, reducing steady-state memory consumption, but also only get updates involving metadata, reducing churn greatly in larger clusters. The implementation introduces new constructors for the Watcher, allowing an informer to be passed in. Existing constructors are implemented using the new constructor, though none of the code actually changes. As a result, it is now possible to unit test the watcher, and I've added some basic unit tests for it. We also add two helper functions: - `GetKubernetesMetadataClient` creates a metadata-only kubernetes client, and is very similar to the existing `GetKubernetesClient` - `RemoveUnnecessaryReplicaSetData` is a transform function that can be passed into an informer so it only stores the metadata we actually use I tested these new functions in both beats and agent, in a kind cluster as well as one of our staging clusters. This is part of the solution to https://github.com/elastic/elastic-agent/pull/5580. 
--------- Co-authored-by: Mauri de Souza Meneguzzo --- kubernetes/informer.go | 25 ++++ kubernetes/metadata/replicaset.go | 32 +++++ kubernetes/metadata/replicaset_test.go | 127 ++++++++++++++++++- kubernetes/util.go | 24 ++++ kubernetes/watcher.go | 70 ++++++++++- kubernetes/watcher_test.go | 164 +++++++++++++++++++++++++ 6 files changed, 437 insertions(+), 5 deletions(-) create mode 100644 kubernetes/watcher_test.go diff --git a/kubernetes/informer.go b/kubernetes/informer.go index 76f059e9e..ed7e7d4bd 100644 --- a/kubernetes/informer.go +++ b/kubernetes/informer.go @@ -21,6 +21,9 @@ import ( "context" "fmt" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/metadata" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -307,3 +310,25 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio } return cache.NewSharedIndexInformer(listwatch, resource, opts.SyncTimeout, indexers), objType, nil } + +// NewMetadataInformer creates an informer for a given resource that only tracks the resource metadata. 
+func NewMetadataInformer(client metadata.Interface, gvr schema.GroupVersionResource, opts WatchOptions, indexers cache.Indexers) cache.SharedInformer { + ctx := context.Background() + if indexers == nil { + indexers = cache.Indexers{} + } + informer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return client.Resource(gvr).List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return client.Resource(gvr).Watch(ctx, options) + }, + }, + &metav1.PartialObjectMetadata{}, + opts.SyncTimeout, + indexers, + ) + return informer +} diff --git a/kubernetes/metadata/replicaset.go b/kubernetes/metadata/replicaset.go index 794a2260d..4279de6fd 100644 --- a/kubernetes/metadata/replicaset.go +++ b/kubernetes/metadata/replicaset.go @@ -18,6 +18,9 @@ package metadata import ( + "fmt" + + appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8s "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -94,3 +97,32 @@ func (rs *replicaset) GenerateFromName(name string, opts ...FieldOptions) mapstr return nil } + +// RemoveUnnecessaryReplicaSetData removes all data from a ReplicaSet resource, except what we need to compute +// Pod metadata. This function works for both ReplicaSet and PartialObjectMetadata. 
+func RemoveUnnecessaryReplicaSetData(obj interface{}) (interface{}, error) { + switch old := obj.(type) { + case *appsv1.ReplicaSet: + transformed := &appsv1.ReplicaSet{ + ObjectMeta: kubernetes.ObjectMeta{ + Name: old.GetName(), + Namespace: old.GetNamespace(), + OwnerReferences: old.GetOwnerReferences(), + ResourceVersion: old.GetResourceVersion(), + }, + } + return transformed, nil + case *metav1.PartialObjectMetadata: + transformed := &metav1.PartialObjectMetadata{ + ObjectMeta: kubernetes.ObjectMeta{ + Name: old.GetName(), + Namespace: old.GetNamespace(), + OwnerReferences: old.GetOwnerReferences(), + ResourceVersion: old.GetResourceVersion(), + }, + } + return transformed, nil + default: + return nil, fmt.Errorf("obj of type %T neither a ReplicaSet nor a PartialObjectMetadata", obj) + } +} diff --git a/kubernetes/metadata/replicaset_test.go b/kubernetes/metadata/replicaset_test.go index cb30fe8ad..826e3f005 100644 --- a/kubernetes/metadata/replicaset_test.go +++ b/kubernetes/metadata/replicaset_test.go @@ -120,7 +120,7 @@ func TestReplicaset_Generate(t *testing.T) { } } -func TestReplicase_GenerateFromName(t *testing.T) { +func TestReplicaset_GenerateFromName(t *testing.T) { client := k8sfake.NewSimpleClientset() boolean := true tests := []struct { @@ -232,3 +232,128 @@ func TestReplicase_GenerateFromName(t *testing.T) { }) } } + +func TestReplicaset_RemoveUnnecessaryData(t *testing.T) { + boolean := true + tests := []struct { + input kubernetes.Resource + output kubernetes.Resource + name string + err error + }{ + { + name: "test simple object with owner", + input: &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-rs", + Namespace: defaultNs, + UID: uid, + ResourceVersion: "688594", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "Deployment", + Name: "nginx-deployment", + UID: "005f3b90-4b9d-12f8-acf0-31020a840144", + Controller: &boolean, + }, + }, + }, + Spec: appsv1.ReplicaSetSpec{ + Selector: 
&metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "demo", + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "demo", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "nginx", + Image: "nginx:1.12", + Ports: []v1.ContainerPort{ + { + Name: "http", + Protocol: v1.ProtocolTCP, + ContainerPort: 80, + }, + }, + }, + }, + }, + }, + }, + }, + output: &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-rs", + Namespace: defaultNs, + ResourceVersion: "688594", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "Deployment", + Name: "nginx-deployment", + UID: "005f3b90-4b9d-12f8-acf0-31020a840144", + Controller: &boolean, + }, + }, + }, + }, + }, + { + name: "test simple object with owner", + input: &metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-rs", + Namespace: defaultNs, + UID: uid, + ResourceVersion: "688594", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "Deployment", + Name: "nginx-deployment", + UID: "005f3b90-4b9d-12f8-acf0-31020a840144", + Controller: &boolean, + }, + }, + }, + }, + output: &metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-rs", + Namespace: defaultNs, + ResourceVersion: "688594", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "Deployment", + Name: "nginx-deployment", + UID: "005f3b90-4b9d-12f8-acf0-31020a840144", + Controller: &boolean, + }, + }, + }, + }, + }, + { + name: "wrong resource type", + input: &v1.Pod{}, + err: fmt.Errorf("obj of type *v1.Pod neither a ReplicaSet nor a PartialObjectMetadata"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + transformed, err := RemoveUnnecessaryReplicaSetData(test.input) + assert.Equal(t, test.err, err) + assert.Equal(t, test.output, transformed) + }) + } +} diff --git a/kubernetes/util.go 
b/kubernetes/util.go index 040ce9348..d02ecbffa 100644 --- a/kubernetes/util.go +++ b/kubernetes/util.go @@ -24,6 +24,8 @@ import ( "os" "strings" + "k8s.io/client-go/metadata" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -82,6 +84,28 @@ func GetKubernetesClient(kubeconfig string, opt KubeClientOptions) (kubernetes.I return client, nil } +// GetKubernetesMetadataClient returns a kubernetes metadata-only client. If inCluster is true, it returns an +// in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed, +// it parses the config file to get the config required to build a client. +func GetKubernetesMetadataClient(kubeconfig string, opt KubeClientOptions) (metadata.Interface, error) { + if kubeconfig == "" { + kubeconfig = GetKubeConfigEnvironmentVariable() + } + + cfg, err := BuildConfig(kubeconfig) + if err != nil { + return nil, fmt.Errorf("unable to build kube config due to error: %w", err) + } + cfg.QPS = opt.QPS + cfg.Burst = opt.Burst + client, err := metadata.NewForConfig(cfg) + if err != nil { + return nil, fmt.Errorf("unable to build kubernetes clientset: %w", err) + } + + return client, nil +} + // BuildConfig is a helper function that builds configs from a kubeconfig filepath. // If kubeconfigPath is not passed in we fallback to inClusterConfig. // If inClusterConfig fails, we fallback to the default config. 
diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index 8090d97d8..1c90460a7 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -22,6 +22,11 @@ import ( "fmt" "time" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/metadata" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -102,6 +107,7 @@ type watcher struct { // NewWatcher initializes the watcher client to provide a events handler for // resource from the cluster (filtered to the given node) +// Note: This watcher won't emit workqueue metrics. Use NewNamedWatcher to provide an explicit queue name. func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { return NewNamedWatcher("", client, resource, opts, indexers) } @@ -111,13 +117,28 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption // client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue // metrics, if it is empty, its metrics will not be logged by the k8s client. func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { - var store cache.Store - var queue workqueue.Interface - var cachedObject runtime.Object informer, _, err := NewInformer(client, resource, opts, indexers) if err != nil { return nil, err } + return NewNamedWatcherWithInformer(name, client, resource, informer, opts) +} + +// NewNamedWatcherWithInformer initializes the watcher client to provide an events handler for +// resource from the cluster (filtered to the given node) and also allows to name the k8s +// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue +// metrics, if it is empty, its metrics will not be logged by the k8s client. 
+// This function requires the underlying informer to be passed by the caller. +func NewNamedWatcherWithInformer( + name string, + client kubernetes.Interface, + resource Resource, + informer cache.SharedInformer, + opts WatchOptions, +) (Watcher, error) { + var store cache.Store + var queue workqueue.Interface + var cachedObject runtime.Object store = informer.GetStore() queue = workqueue.NewNamed(name) @@ -145,7 +166,7 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource handler: NoOpEventHandlerFuncs{}, } - _, err = w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err := w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { w.enqueue(o, add) }, @@ -182,6 +203,47 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource return w, nil } +// NewMetadataWatcher initializes a metadata-only watcher client to provide an events handler for +// resource from the cluster (filtered to the given node). +// Event handlers defined on this watcher receive PartialObjectMetadata resources. +// Note: This watcher won't emit workqueue metrics. Use NewNamedMetadataWatcher to provide an explicit queue name. +func NewMetadataWatcher( + client kubernetes.Interface, + metadataClient metadata.Interface, + gvr schema.GroupVersionResource, + opts WatchOptions, + indexers cache.Indexers, + transformFunc cache.TransformFunc, +) (Watcher, error) { + return NewNamedMetadataWatcher("", client, metadataClient, gvr, opts, indexers, transformFunc) +} + +// NewNamedMetadataWatcher initializes a metadata-only watcher client to provide an events handler for +// resource from the cluster (filtered to the given node) and also allows to name the k8s +// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue +// metrics, if it is empty, its metrics will not be logged by the k8s client. 
+// Event handlers defined on this watcher receive PartialObjectMetadata resources. +func NewNamedMetadataWatcher( + name string, + client kubernetes.Interface, + metadataClient metadata.Interface, + gvr schema.GroupVersionResource, + opts WatchOptions, + indexers cache.Indexers, + transformFunc cache.TransformFunc, +) (Watcher, error) { + informer := NewMetadataInformer(metadataClient, gvr, opts, indexers) + + if transformFunc != nil { + err := informer.SetTransform(transformFunc) + if err != nil { + return nil, err + } + } + + return NewNamedWatcherWithInformer(name, client, &v1.PartialObjectMetadata{}, informer, opts) +} + // AddEventHandler adds a resource handler to process each request that is coming into the watcher func (w *watcher) AddEventHandler(h ResourceEventHandler) { w.handler = h diff --git a/kubernetes/watcher_test.go b/kubernetes/watcher_test.go new file mode 100644 index 000000000..2bb3143ef --- /dev/null +++ b/kubernetes/watcher_test.go @@ -0,0 +1,164 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package kubernetes + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + cachetest "k8s.io/client-go/tools/cache/testing" +) + +func TestWatcherStartAndStop(t *testing.T) { + client := fake.NewSimpleClientset() + listWatch := cachetest.NewFakeControllerSource() + resource := &Pod{} + informer := cache.NewSharedInformer(listWatch, resource, 0) + watcher, err := NewNamedWatcherWithInformer("test", client, resource, informer, WatchOptions{}) + require.NoError(t, err) + require.NoError(t, watcher.Start()) + watcher.Stop() +} + +func TestWatcherHandlers(t *testing.T) { + client := fake.NewSimpleClientset() + listWatch := cachetest.NewFakeControllerSource() + resource := &Pod{} + informer := cache.NewSharedInformer(listWatch, resource, 0) + watcher, err := NewNamedWatcherWithInformer("test", client, resource, informer, WatchOptions{}) + require.NoError(t, err) + + var added, updated, deleted bool + + watcher.AddEventHandler(ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + added = true + }, + UpdateFunc: func(obj interface{}) { + updated = true + }, + DeleteFunc: func(obj interface{}) { + deleted = true + }, + }) + + require.NoError(t, watcher.Start()) + defer watcher.Stop() + + pod := &Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: types.UID("poduid"), + Namespace: "test", + ResourceVersion: "1", + }, + } + // add a resource + listWatch.Add(pod) + assert.Eventually(t, func() bool { + return added + }, time.Second*5, time.Millisecond) + + // update the resource + modifiedPod := pod.DeepCopy() + modifiedPod.SetResourceVersion("2") + listWatch.Modify(modifiedPod) + assert.Eventually(t, func() bool { + return updated + }, time.Second*5, time.Millisecond) + + // delete the resource + listWatch.Delete(modifiedPod) + 
assert.Eventually(t, func() bool { + return deleted + }, time.Second*5, time.Millisecond) +} + +func TestWatcherIsUpdated(t *testing.T) { + client := fake.NewSimpleClientset() + listWatch := cachetest.NewFakeControllerSource() + resource := &Pod{} + informer := cache.NewSharedInformer(listWatch, resource, 0) + // set a custom IsUpdated that always returns true + watcher, err := NewNamedWatcherWithInformer("test", client, resource, informer, + WatchOptions{IsUpdated: func(old, new interface{}) bool { + return true + }}) + require.NoError(t, err) + + var updated bool + + watcher.AddEventHandler(ResourceEventHandlerFuncs{ + UpdateFunc: func(obj interface{}) { + updated = true + }, + }) + + require.NoError(t, watcher.Start()) + defer watcher.Stop() + + pod := &Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: types.UID("poduid"), + Namespace: "test", + }, + } + listWatch.Add(pod) + + // update the resource, but don't actually change it + // with the default IsUpdated, our handler wouldn't be called, but with our custom one, it will + modifiedPod := pod.DeepCopy() + listWatch.Modify(modifiedPod) + assert.Eventually(t, func() bool { + return updated + }, time.Second*5, time.Millisecond) + +} + +func TestCachedObject(t *testing.T) { + t.Skip("Currently bugged, and not used anywhere") + client := fake.NewSimpleClientset() + listWatch := cachetest.NewFakeControllerSource() + resource := &Namespace{} + informer := cache.NewSharedInformer(listWatch, resource, 0) + watcher, err := NewNamedWatcherWithInformer("test", client, resource, informer, WatchOptions{}) + require.NoError(t, err) + + require.NoError(t, watcher.Start()) + defer watcher.Stop() + + namespace := &Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: types.UID("poduid"), + Namespace: "test", + ResourceVersion: "1", + }, + } + listWatch.Add(namespace) + assert.EventuallyWithT(t, func(collectT *assert.CollectT) { + assert.Equal(collectT, namespace, watcher.CachedObject()) + }, 
time.Second*5, time.Millisecond) +}