Skip to content

Commit

Permalink
Add metadata watcher and informer (#111)
Browse files Browse the repository at this point in the history
Introduce metadata-only watchers to the kubernetes package. These are
useful if we only need to track metadata for a resource - a good example
are ReplicaSets, for which we usually only care about the
OwnerReferences. As a result, we only store the metadata, reducing
steady-state memory consumption, but also only get updates involving
metadata, reducing churn greatly in larger clusters.

The implementation introduces new constructors for the Watcher, allowing
an informer to be passed in. Existing constructors are implemented using
the new constructor, though none of the code actually changes. As a
result, it is now possible to unit test the watcher, and I've added some
basic unit tests for it.

We also add two helper functions:

- `GetKubernetesMetadataClient` creates a metadata-only kubernetes
client, and is very similar to the existing `GetKubernetesClient`
- `RemoveUnnecessaryReplicaSetData` is a transform function that can be
passed into an informer so it only stores the metadata we actually use

I tested these new functions in both beats and agent, in a kind cluster
as well as one of our staging clusters.

This is part of the solution to
elastic/elastic-agent#5580.

---------

Co-authored-by: Mauri de Souza Meneguzzo <[email protected]>
  • Loading branch information
swiatekm and mauri870 authored Oct 3, 2024
1 parent 70c0872 commit a698e0f
Show file tree
Hide file tree
Showing 6 changed files with 437 additions and 5 deletions.
25 changes: 25 additions & 0 deletions kubernetes/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/metadata"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -307,3 +310,25 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio
}
return cache.NewSharedIndexInformer(listwatch, resource, opts.SyncTimeout, indexers), objType, nil
}

// NewMetadataInformer creates an informer for a given resource that only tracks the resource metadata.
func NewMetadataInformer(client metadata.Interface, gvr schema.GroupVersionResource, opts WatchOptions, indexers cache.Indexers) cache.SharedInformer {
ctx := context.Background()
if indexers == nil {
indexers = cache.Indexers{}
}
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.Resource(gvr).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.Resource(gvr).Watch(ctx, options)
},
},
&metav1.PartialObjectMetadata{},
opts.SyncTimeout,
indexers,
)
return informer
}
32 changes: 32 additions & 0 deletions kubernetes/metadata/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package metadata

import (
"fmt"

appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -94,3 +97,32 @@ func (rs *replicaset) GenerateFromName(name string, opts ...FieldOptions) mapstr

return nil
}

// RemoveUnnecessaryReplicaSetData removes all data from a ReplicaSet resource, except what we need to compute
// Pod metadata. This function works for both ReplicaSet and PartialObjectMetadata.
func RemoveUnnecessaryReplicaSetData(obj interface{}) (interface{}, error) {
switch old := obj.(type) {
case *appsv1.ReplicaSet:
transformed := &appsv1.ReplicaSet{
ObjectMeta: kubernetes.ObjectMeta{
Name: old.GetName(),
Namespace: old.GetNamespace(),
OwnerReferences: old.GetOwnerReferences(),
ResourceVersion: old.GetResourceVersion(),
},
}
return transformed, nil
case *metav1.PartialObjectMetadata:
transformed := &metav1.PartialObjectMetadata{
ObjectMeta: kubernetes.ObjectMeta{
Name: old.GetName(),
Namespace: old.GetNamespace(),
OwnerReferences: old.GetOwnerReferences(),
ResourceVersion: old.GetResourceVersion(),
},
}
return transformed, nil
default:
return nil, fmt.Errorf("obj of type %T neither a ReplicaSet nor a PartialObjectMetadata", obj)
}
}
127 changes: 126 additions & 1 deletion kubernetes/metadata/replicaset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestReplicaset_Generate(t *testing.T) {
}
}

func TestReplicase_GenerateFromName(t *testing.T) {
func TestReplicaset_GenerateFromName(t *testing.T) {
client := k8sfake.NewSimpleClientset()
boolean := true
tests := []struct {
Expand Down Expand Up @@ -232,3 +232,128 @@ func TestReplicase_GenerateFromName(t *testing.T) {
})
}
}

func TestReplicaset_RemoveUnnecessaryData(t *testing.T) {
boolean := true
tests := []struct {
input kubernetes.Resource
output kubernetes.Resource
name string
err error
}{
{
name: "test simple object with owner",
input: &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-rs",
Namespace: defaultNs,
UID: uid,
ResourceVersion: "688594",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps",
Kind: "Deployment",
Name: "nginx-deployment",
UID: "005f3b90-4b9d-12f8-acf0-31020a840144",
Controller: &boolean,
},
},
},
Spec: appsv1.ReplicaSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "demo",
},
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "demo",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "nginx",
Image: "nginx:1.12",
Ports: []v1.ContainerPort{
{
Name: "http",
Protocol: v1.ProtocolTCP,
ContainerPort: 80,
},
},
},
},
},
},
},
},
output: &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-rs",
Namespace: defaultNs,
ResourceVersion: "688594",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps",
Kind: "Deployment",
Name: "nginx-deployment",
UID: "005f3b90-4b9d-12f8-acf0-31020a840144",
Controller: &boolean,
},
},
},
},
},
{
name: "test simple object with owner",
input: &metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-rs",
Namespace: defaultNs,
UID: uid,
ResourceVersion: "688594",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps",
Kind: "Deployment",
Name: "nginx-deployment",
UID: "005f3b90-4b9d-12f8-acf0-31020a840144",
Controller: &boolean,
},
},
},
},
output: &metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-rs",
Namespace: defaultNs,
ResourceVersion: "688594",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps",
Kind: "Deployment",
Name: "nginx-deployment",
UID: "005f3b90-4b9d-12f8-acf0-31020a840144",
Controller: &boolean,
},
},
},
},
},
{
name: "wrong resource type",
input: &v1.Pod{},
err: fmt.Errorf("obj of type *v1.Pod neither a ReplicaSet nor a PartialObjectMetadata"),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
transformed, err := RemoveUnnecessaryReplicaSetData(test.input)
assert.Equal(t, test.err, err)
assert.Equal(t, test.output, transformed)
})
}
}
24 changes: 24 additions & 0 deletions kubernetes/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"os"
"strings"

"k8s.io/client-go/metadata"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
Expand Down Expand Up @@ -82,6 +84,28 @@ func GetKubernetesClient(kubeconfig string, opt KubeClientOptions) (kubernetes.I
return client, nil
}

// GetKubernetesMetadataClient returns a kubernetes metadata-only client. If inCluster is true, it returns an
// in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed,
// it parses the config file to get the config required to build a client.
func GetKubernetesMetadataClient(kubeconfig string, opt KubeClientOptions) (metadata.Interface, error) {
if kubeconfig == "" {
kubeconfig = GetKubeConfigEnvironmentVariable()
}

cfg, err := BuildConfig(kubeconfig)
if err != nil {
return nil, fmt.Errorf("unable to build kube config due to error: %w", err)
}
cfg.QPS = opt.QPS
cfg.Burst = opt.Burst
client, err := metadata.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("unable to build kubernetes clientset: %w", err)
}

return client, nil
}

// BuildConfig is a helper function that builds configs from a kubeconfig filepath.
// If kubeconfigPath is not passed in we fallback to inClusterConfig.
// If inClusterConfig fails, we fallback to the default config.
Expand Down
70 changes: 66 additions & 4 deletions kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
"fmt"
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/metadata"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -102,6 +107,7 @@ type watcher struct {

// NewWatcher initializes the watcher client to provide a events handler for
// resource from the cluster (filtered to the given node)
// Note: This watcher won't emit workqueue metrics. Use NewNamedWatcher to provide an explicit queue name.
func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
return NewNamedWatcher("", client, resource, opts, indexers)
}
Expand All @@ -111,13 +117,28 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue
// metrics, if it is empty, its metrics will not be logged by the k8s client.
func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
var store cache.Store
var queue workqueue.Interface
var cachedObject runtime.Object
informer, _, err := NewInformer(client, resource, opts, indexers)
if err != nil {
return nil, err
}
return NewNamedWatcherWithInformer(name, client, resource, informer, opts)
}

// NewNamedWatcherWithInformer initializes the watcher client to provide an events handler for
// resource from the cluster (filtered to the given node) and also allows to name the k8s
// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue
// metrics, if it is empty, its metrics will not be logged by the k8s client.
// This function requires the underlying informer to be passed by the caller.
func NewNamedWatcherWithInformer(
name string,
client kubernetes.Interface,
resource Resource,
informer cache.SharedInformer,
opts WatchOptions,
) (Watcher, error) {
var store cache.Store
var queue workqueue.Interface
var cachedObject runtime.Object

store = informer.GetStore()
queue = workqueue.NewNamed(name)
Expand Down Expand Up @@ -145,7 +166,7 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
handler: NoOpEventHandlerFuncs{},
}

_, err = w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
w.enqueue(o, add)
},
Expand Down Expand Up @@ -182,6 +203,47 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
return w, nil
}

// NewMetadataWatcher initializes a metadata-only watcher client to provide an events handler for
// resource from the cluster (filtered to the given node).
// Event handlers defined on this watcher receive PartialObjectMetadata resources.
// Note: This watcher won't emit workqueue metrics. Use NewNamedWatcher to provide an explicit queue name.
func NewMetadataWatcher(
client kubernetes.Interface,
metadataClient metadata.Interface,
gvr schema.GroupVersionResource,
opts WatchOptions,
indexers cache.Indexers,
transformFunc cache.TransformFunc,
) (Watcher, error) {
return NewNamedMetadataWatcher("", client, metadataClient, gvr, opts, indexers, transformFunc)
}

// NewNamedMetadataWatcher initializes a metadata-only watcher client to provide an events handler for
// resource from the cluster (filtered to the given node) and also allows to name the k8s
// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue
// metrics, if it is empty, its metrics will not be logged by the k8s client.
// Event handlers defined on this watcher receive PartialObjectMetadata resources.
func NewNamedMetadataWatcher(
name string,
client kubernetes.Interface,
metadataClient metadata.Interface,
gvr schema.GroupVersionResource,
opts WatchOptions,
indexers cache.Indexers,
transformFunc cache.TransformFunc,
) (Watcher, error) {
informer := NewMetadataInformer(metadataClient, gvr, opts, indexers)

if transformFunc != nil {
err := informer.SetTransform(transformFunc)
if err != nil {
return nil, err
}
}

return NewNamedWatcherWithInformer(name, client, &v1.PartialObjectMetadata{}, informer, opts)
}

// AddEventHandler adds a resource handler to process each request that is coming into the watcher
func (w *watcher) AddEventHandler(h ResourceEventHandler) {
w.handler = h
Expand Down
Loading

0 comments on commit a698e0f

Please sign in to comment.