diff --git a/kubernetes/informer.go b/kubernetes/informer.go index 76f059e9e..ed7e7d4bd 100644 --- a/kubernetes/informer.go +++ b/kubernetes/informer.go @@ -21,6 +21,9 @@ import ( "context" "fmt" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/metadata" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -307,3 +310,25 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio } return cache.NewSharedIndexInformer(listwatch, resource, opts.SyncTimeout, indexers), objType, nil } + +// NewMetadataInformer creates an informer for a given resource that only tracks the resource metadata. +func NewMetadataInformer(client metadata.Interface, gvr schema.GroupVersionResource, opts WatchOptions, indexers cache.Indexers) cache.SharedInformer { + ctx := context.Background() + if indexers == nil { + indexers = cache.Indexers{} + } + informer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return client.Resource(gvr).List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return client.Resource(gvr).Watch(ctx, options) + }, + }, + &metav1.PartialObjectMetadata{}, + opts.SyncTimeout, + indexers, + ) + return informer +} diff --git a/kubernetes/metadata/replicaset.go b/kubernetes/metadata/replicaset.go index 3fccc802d..18936cf44 100644 --- a/kubernetes/metadata/replicaset.go +++ b/kubernetes/metadata/replicaset.go @@ -18,6 +18,11 @@ package metadata import ( + "fmt" + "reflect" + + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8s "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -93,3 +98,32 @@ func (rs *replicaset) GenerateFromName(name string, opts ...FieldOptions) mapstr return nil } + +// RemoveUnnecessaryReplicaSetData removes all data from a ReplicaSet resource, except what we need to compute +// Pod metadata. This function works for both ReplicaSet and PartialObjectMetadata. +func RemoveUnnecessaryReplicaSetData(obj interface{}) (interface{}, error) { + switch old := obj.(type) { + case *appsv1.ReplicaSet: + transformed := &appsv1.ReplicaSet{ + ObjectMeta: kubernetes.ObjectMeta{ + Name: old.GetName(), + Namespace: old.GetNamespace(), + OwnerReferences: old.GetOwnerReferences(), + ResourceVersion: old.GetResourceVersion(), + }, + } + return transformed, nil + case *metav1.PartialObjectMetadata: + transformed := &metav1.PartialObjectMetadata{ + ObjectMeta: kubernetes.ObjectMeta{ + Name: old.GetName(), + Namespace: old.GetNamespace(), + OwnerReferences: old.GetOwnerReferences(), + ResourceVersion: old.GetResourceVersion(), + }, + } + return transformed, nil + default: + return nil, fmt.Errorf("obj of type %v neither a ReplicaSet nor a PartialObjectMetadata", reflect.TypeOf(obj)) + } +} diff --git a/kubernetes/metadata/replicaset_test.go b/kubernetes/metadata/replicaset_test.go index 1a19d1f8a..404fba920 100644 --- a/kubernetes/metadata/replicaset_test.go +++ b/kubernetes/metadata/replicaset_test.go @@ -115,7 +115,7 @@ func TestReplicaset_Generate(t *testing.T) { } } -func TestReplicase_GenerateFromName(t *testing.T) { +func TestReplicast_GenerateFromName(t *testing.T) { client := k8sfake.NewSimpleClientset() boolean := true tests := []struct { @@ -198,3 +198,128 @@ func TestReplicase_GenerateFromName(t *testing.T) { }) } } + +func TestReplicaset_RemoveUnnecessaryData(t *testing.T) { + boolean := true + tests := []struct { + input kubernetes.Resource + output kubernetes.Resource + name string + err error + }{ + { + name: "test simple object with owner", + input: &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-rs", + Namespace: defaultNs, + UID: uid, + ResourceVersion: "688594", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "Deployment", + Name: "nginx-deployment", + UID: "005f3b90-4b9d-12f8-acf0-31020a840144", + Controller: &boolean, + }, + }, + }, + Spec: appsv1.ReplicaSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "demo", + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "demo", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "nginx", + Image: "nginx:1.12", + Ports: []v1.ContainerPort{ + { + Name: "http", + Protocol: v1.ProtocolTCP, + ContainerPort: 80, + }, + }, + }, + }, + }, + }, + }, + }, + output: &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-rs", + Namespace: defaultNs, + ResourceVersion: "688594", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "Deployment", + Name: "nginx-deployment", + UID: "005f3b90-4b9d-12f8-acf0-31020a840144", + Controller: &boolean, + }, + }, + }, + }, + }, + { + name: "test simple object with owner", + input: &metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-rs", + Namespace: defaultNs, + UID: uid, + ResourceVersion: "688594", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "Deployment", + Name: "nginx-deployment", + UID: "005f3b90-4b9d-12f8-acf0-31020a840144", + Controller: &boolean, + }, + }, + }, + }, + output: &metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-rs", + Namespace: defaultNs, + ResourceVersion: "688594", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "Deployment", + Name: "nginx-deployment", + UID: "005f3b90-4b9d-12f8-acf0-31020a840144", + Controller: &boolean, + }, + }, + }, + }, + }, + { + name: "wrong resource type", + input: &v1.Pod{}, + err: fmt.Errorf("obj of type *v1.Pod neither a ReplicaSet nor a PartialObjectMetadata"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + transformed, err := RemoveUnnecessaryReplicaSetData(test.input) + assert.Equal(t, test.err, err) + assert.Equal(t, test.output, transformed) + }) + } +} diff --git a/kubernetes/util.go b/kubernetes/util.go index 040ce9348..d02ecbffa 100644 --- a/kubernetes/util.go +++ b/kubernetes/util.go @@ -24,6 +24,8 @@ import ( "os" "strings" + "k8s.io/client-go/metadata" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -82,6 +84,28 @@ func GetKubernetesClient(kubeconfig string, opt KubeClientOptions) (kubernetes.I return client, nil } +// GetKubernetesMetadataClient returns a kubernetes metadata-only client. If inCluster is true, it returns an +// in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed, +// it parses the config file to get the config required to build a client. +func GetKubernetesMetadataClient(kubeconfig string, opt KubeClientOptions) (metadata.Interface, error) { + if kubeconfig == "" { + kubeconfig = GetKubeConfigEnvironmentVariable() + } + + cfg, err := BuildConfig(kubeconfig) + if err != nil { + return nil, fmt.Errorf("unable to build kube config due to error: %w", err) + } + cfg.QPS = opt.QPS + cfg.Burst = opt.Burst + client, err := metadata.NewForConfig(cfg) + if err != nil { + return nil, fmt.Errorf("unable to build kubernetes clientset: %w", err) + } + + return client, nil +} + // BuildConfig is a helper function that builds configs from a kubeconfig filepath. // If kubeconfigPath is not passed in we fallback to inClusterConfig. // If inClusterConfig fails, we fallback to the default config. diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index 8090d97d8..c2ed277dc 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -22,6 +22,11 @@ import ( "fmt" "time" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/metadata" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -111,13 +116,28 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption // client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue // metrics, if it is empty, its metrics will not be logged by the k8s client. func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { - var store cache.Store - var queue workqueue.Interface - var cachedObject runtime.Object informer, _, err := NewInformer(client, resource, opts, indexers) if err != nil { return nil, err } + return NewNamedWatcherWithInformer(name, client, resource, informer, opts) +} + +// NewNamedWatcherWithInformer initializes the watcher client to provide an events handler for +// resource from the cluster (filtered to the given node) and also allows to name the k8s +// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue +// metrics, if it is empty, its metrics will not be logged by the k8s client. +// This function requires the underlying informer to be passed by the caller. +func NewNamedWatcherWithInformer( + name string, + client kubernetes.Interface, + resource Resource, + informer cache.SharedInformer, + opts WatchOptions, +) (Watcher, error) { + var store cache.Store + var queue workqueue.Interface + var cachedObject runtime.Object store = informer.GetStore() queue = workqueue.NewNamed(name) @@ -145,7 +165,7 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource handler: NoOpEventHandlerFuncs{}, } - _, err = w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err := w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { w.enqueue(o, add) }, @@ -182,6 +202,46 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource return w, nil } +// NewMetadataWatcher initializes a metadata-only watcher client to provide an events handler for +// resource from the cluster (filtered to the given node). +// Event handlers defined on this watcher receive PartialObjectMetadata resources. +func NewMetadataWatcher( + client kubernetes.Interface, + metadataClient metadata.Interface, + gvr schema.GroupVersionResource, + opts WatchOptions, + indexers cache.Indexers, + transformFunc cache.TransformFunc, +) (Watcher, error) { + return NewNamedMetadataWatcher("", client, metadataClient, gvr, opts, indexers, transformFunc) +} + +// NewNamedMetadataWatcher initializes a metadata-only watcher client to provide an events handler for +// resource from the cluster (filtered to the given node) and also allows to name the k8s +// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue +// metrics, if it is empty, its metrics will not be logged by the k8s client. +// Event handlers defined on this watcher receive PartialObjectMetadata resources. +func NewNamedMetadataWatcher( + name string, + client kubernetes.Interface, + metadataClient metadata.Interface, + gvr schema.GroupVersionResource, + opts WatchOptions, + indexers cache.Indexers, + transformFunc cache.TransformFunc, +) (Watcher, error) { + informer := NewMetadataInformer(metadataClient, gvr, opts, indexers) + + if transformFunc != nil { + err := informer.SetTransform(transformFunc) + if err != nil { + return nil, err + } + } + + return NewNamedWatcherWithInformer(name, client, &v1.PartialObjectMetadata{}, informer, opts) +} + // AddEventHandler adds a resource handler to process each request that is coming into the watcher func (w *watcher) AddEventHandler(h ResourceEventHandler) { w.handler = h diff --git a/kubernetes/watcher_test.go b/kubernetes/watcher_test.go new file mode 100644 index 000000000..2bb3143ef --- /dev/null +++ b/kubernetes/watcher_test.go @@ -0,0 +1,164 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package kubernetes + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + cachetest "k8s.io/client-go/tools/cache/testing" +) + +func TestWatcherStartAndStop(t *testing.T) { + client := fake.NewSimpleClientset() + listWatch := cachetest.NewFakeControllerSource() + resource := &Pod{} + informer := cache.NewSharedInformer(listWatch, resource, 0) + watcher, err := NewNamedWatcherWithInformer("test", client, resource, informer, WatchOptions{}) + require.NoError(t, err) + require.NoError(t, watcher.Start()) + watcher.Stop() +} + +func TestWatcherHandlers(t *testing.T) { + client := fake.NewSimpleClientset() + listWatch := cachetest.NewFakeControllerSource() + resource := &Pod{} + informer := cache.NewSharedInformer(listWatch, resource, 0) + watcher, err := NewNamedWatcherWithInformer("test", client, resource, informer, WatchOptions{}) + require.NoError(t, err) + + var added, updated, deleted bool + + watcher.AddEventHandler(ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + added = true + }, + UpdateFunc: func(obj interface{}) { + updated = true + }, + DeleteFunc: func(obj interface{}) { + deleted = true + }, + }) + + require.NoError(t, watcher.Start()) + defer watcher.Stop() + + pod := &Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: types.UID("poduid"), + Namespace: "test", + ResourceVersion: "1", + }, + } + // add a resource + listWatch.Add(pod) + assert.Eventually(t, func() bool { + return added + }, time.Second*5, time.Millisecond) + + // update the resource + modifiedPod := pod.DeepCopy() + modifiedPod.SetResourceVersion("2") + listWatch.Modify(modifiedPod) + assert.Eventually(t, func() bool { + return updated + }, time.Second*5, time.Millisecond) + + // delete the resource + listWatch.Delete(modifiedPod) + assert.Eventually(t, func() bool { + return deleted + }, time.Second*5, time.Millisecond) +} + +func TestWatcherIsUpdated(t *testing.T) { + client := fake.NewSimpleClientset() + listWatch := cachetest.NewFakeControllerSource() + resource := &Pod{} + informer := cache.NewSharedInformer(listWatch, resource, 0) + // set a custom IsUpdated that always returns true + watcher, err := NewNamedWatcherWithInformer("test", client, resource, informer, + WatchOptions{IsUpdated: func(old, new interface{}) bool { + return true + }}) + require.NoError(t, err) + + var updated bool + + watcher.AddEventHandler(ResourceEventHandlerFuncs{ + UpdateFunc: func(obj interface{}) { + updated = true + }, + }) + + require.NoError(t, watcher.Start()) + defer watcher.Stop() + + pod := &Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: types.UID("poduid"), + Namespace: "test", + }, + } + listWatch.Add(pod) + + // update the resource, but don't actually change it + // with the default IsUpdated, our handler wouldn't be called, but with our custom one, it will + modifiedPod := pod.DeepCopy() + listWatch.Modify(modifiedPod) + assert.Eventually(t, func() bool { + return updated + }, time.Second*5, time.Millisecond) + +} + +func TestCachedObject(t *testing.T) { + t.Skip("Currently bugged, and not used anywhere") + client := fake.NewSimpleClientset() + listWatch := cachetest.NewFakeControllerSource() + resource := &Namespace{} + informer := cache.NewSharedInformer(listWatch, resource, 0) + watcher, err := NewNamedWatcherWithInformer("test", client, resource, informer, WatchOptions{}) + require.NoError(t, err) + + require.NoError(t, watcher.Start()) + defer watcher.Stop() + + namespace := &Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: types.UID("poduid"), + Namespace: "test", + ResourceVersion: "1", + }, + } + listWatch.Add(namespace) + assert.EventuallyWithT(t, func(collectT *assert.CollectT) { + assert.Equal(collectT, namespace, watcher.CachedObject()) + }, time.Second*5, time.Millisecond) +}