Skip to content

Commit

Permalink
Add metadata watcher and informer
Browse files Browse the repository at this point in the history
  • Loading branch information
swiatekm committed Oct 2, 2024
1 parent 3f3a65d commit 64b342a
Show file tree
Hide file tree
Showing 6 changed files with 394 additions and 5 deletions.
25 changes: 25 additions & 0 deletions kubernetes/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/metadata"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -307,3 +310,25 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio
}
return cache.NewSharedIndexInformer(listwatch, resource, opts.SyncTimeout, indexers), objType, nil
}

// NewMetadataInformer creates an informer for a given resource that only tracks the resource metadata.
func NewMetadataInformer(client metadata.Interface, gvr schema.GroupVersionResource, opts WatchOptions, indexers cache.Indexers) cache.SharedInformer {
ctx := context.Background()
if indexers == nil {
indexers = cache.Indexers{}
}
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.Resource(gvr).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.Resource(gvr).Watch(ctx, options)
},
},
&metav1.PartialObjectMetadata{},
opts.SyncTimeout,
indexers,
)
return informer
}
34 changes: 34 additions & 0 deletions kubernetes/metadata/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
package metadata

import (
"fmt"
"reflect"

appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

Expand Down Expand Up @@ -93,3 +98,32 @@ func (rs *replicaset) GenerateFromName(name string, opts ...FieldOptions) mapstr

return nil
}

// RemoveUnnecessaryReplicaSetData removes all data from a ReplicaSet resource, except what we need to compute
// Pod metadata. This function works for both ReplicaSet and PartialObjectMetadata.
func RemoveUnnecessaryReplicaSetData(obj interface{}) (interface{}, error) {
switch old := obj.(type) {
case *appsv1.ReplicaSet:
transformed := &appsv1.ReplicaSet{
ObjectMeta: kubernetes.ObjectMeta{
Name: old.GetName(),
Namespace: old.GetNamespace(),
OwnerReferences: old.GetOwnerReferences(),
ResourceVersion: old.GetResourceVersion(),
},
}
return transformed, nil
case *metav1.PartialObjectMetadata:
transformed := &metav1.PartialObjectMetadata{
ObjectMeta: kubernetes.ObjectMeta{
Name: old.GetName(),
Namespace: old.GetNamespace(),
OwnerReferences: old.GetOwnerReferences(),
ResourceVersion: old.GetResourceVersion(),
},
}
return transformed, nil
default:
return nil, fmt.Errorf("obj of type %v neither a ReplicaSet nor a PartialObjectMetadata", reflect.TypeOf(obj))
}
}
127 changes: 126 additions & 1 deletion kubernetes/metadata/replicaset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestReplicaset_Generate(t *testing.T) {
}
}

func TestReplicase_GenerateFromName(t *testing.T) {
func TestReplicast_GenerateFromName(t *testing.T) {
client := k8sfake.NewSimpleClientset()
boolean := true
tests := []struct {
Expand Down Expand Up @@ -198,3 +198,128 @@ func TestReplicase_GenerateFromName(t *testing.T) {
})
}
}

func TestReplicaset_RemoveUnnecessaryData(t *testing.T) {
boolean := true
tests := []struct {
input kubernetes.Resource
output kubernetes.Resource
name string
err error
}{
{
name: "test simple object with owner",
input: &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-rs",
Namespace: defaultNs,
UID: uid,
ResourceVersion: "688594",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps",
Kind: "Deployment",
Name: "nginx-deployment",
UID: "005f3b90-4b9d-12f8-acf0-31020a840144",
Controller: &boolean,
},
},
},
Spec: appsv1.ReplicaSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "demo",
},
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "demo",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "nginx",
Image: "nginx:1.12",
Ports: []v1.ContainerPort{
{
Name: "http",
Protocol: v1.ProtocolTCP,
ContainerPort: 80,
},
},
},
},
},
},
},
},
output: &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-rs",
Namespace: defaultNs,
ResourceVersion: "688594",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps",
Kind: "Deployment",
Name: "nginx-deployment",
UID: "005f3b90-4b9d-12f8-acf0-31020a840144",
Controller: &boolean,
},
},
},
},
},
{
name: "test simple object with owner",
input: &metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-rs",
Namespace: defaultNs,
UID: uid,
ResourceVersion: "688594",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps",
Kind: "Deployment",
Name: "nginx-deployment",
UID: "005f3b90-4b9d-12f8-acf0-31020a840144",
Controller: &boolean,
},
},
},
},
output: &metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-rs",
Namespace: defaultNs,
ResourceVersion: "688594",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps",
Kind: "Deployment",
Name: "nginx-deployment",
UID: "005f3b90-4b9d-12f8-acf0-31020a840144",
Controller: &boolean,
},
},
},
},
},
{
name: "wrong resource type",
input: &v1.Pod{},
err: fmt.Errorf("obj of type *v1.Pod neither a ReplicaSet nor a PartialObjectMetadata"),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
transformed, err := RemoveUnnecessaryReplicaSetData(test.input)
assert.Equal(t, test.err, err)
assert.Equal(t, test.output, transformed)
})
}
}
24 changes: 24 additions & 0 deletions kubernetes/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"os"
"strings"

"k8s.io/client-go/metadata"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
Expand Down Expand Up @@ -82,6 +84,28 @@ func GetKubernetesClient(kubeconfig string, opt KubeClientOptions) (kubernetes.I
return client, nil
}

// GetKubernetesMetadataClient returns a kubernetes metadata-only client. If inCluster is true, it returns an
// in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed,
// it parses the config file to get the config required to build a client.
func GetKubernetesMetadataClient(kubeconfig string, opt KubeClientOptions) (metadata.Interface, error) {
if kubeconfig == "" {
kubeconfig = GetKubeConfigEnvironmentVariable()
}

cfg, err := BuildConfig(kubeconfig)
if err != nil {
return nil, fmt.Errorf("unable to build kube config due to error: %w", err)
}
cfg.QPS = opt.QPS
cfg.Burst = opt.Burst
client, err := metadata.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("unable to build kubernetes clientset: %w", err)
}

return client, nil
}

// BuildConfig is a helper function that builds configs from a kubeconfig filepath.
// If kubeconfigPath is not passed in we fallback to inClusterConfig.
// If inClusterConfig fails, we fallback to the default config.
Expand Down
68 changes: 64 additions & 4 deletions kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
"fmt"
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/metadata"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -111,13 +116,28 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue
// metrics, if it is empty, its metrics will not be logged by the k8s client.
func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
var store cache.Store
var queue workqueue.Interface
var cachedObject runtime.Object
informer, _, err := NewInformer(client, resource, opts, indexers)
if err != nil {
return nil, err
}
return NewNamedWatcherWithInformer(name, client, resource, informer, opts)
}

// NewNamedWatcherWithInformer initializes the watcher client to provide an events handler for
// resource from the cluster (filtered to the given node) and also allows to name the k8s
// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue
// metrics, if it is empty, its metrics will not be logged by the k8s client.
// This function requires the underlying informer to be passed by the caller.
func NewNamedWatcherWithInformer(
name string,
client kubernetes.Interface,
resource Resource,
informer cache.SharedInformer,
opts WatchOptions,
) (Watcher, error) {
var store cache.Store
var queue workqueue.Interface
var cachedObject runtime.Object

store = informer.GetStore()
queue = workqueue.NewNamed(name)
Expand Down Expand Up @@ -145,7 +165,7 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
handler: NoOpEventHandlerFuncs{},
}

_, err = w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
w.enqueue(o, add)
},
Expand Down Expand Up @@ -182,6 +202,46 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
return w, nil
}

// NewMetadataWatcher initializes a metadata-only watcher client to provide an events handler for
// resource from the cluster (filtered to the given node).
// Event handlers defined on this watcher receive PartialObjectMetadata resources.
func NewMetadataWatcher(
client kubernetes.Interface,
metadataClient metadata.Interface,
gvr schema.GroupVersionResource,
opts WatchOptions,
indexers cache.Indexers,
transformFunc cache.TransformFunc,
) (Watcher, error) {
return NewNamedMetadataWatcher("", client, metadataClient, gvr, opts, indexers, transformFunc)
}

// NewNamedMetadataWatcher initializes a metadata-only watcher client to provide an events handler for
// resource from the cluster (filtered to the given node) and also allows to name the k8s
// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue
// metrics, if it is empty, its metrics will not be logged by the k8s client.
// Event handlers defined on this watcher receive PartialObjectMetadata resources.
func NewNamedMetadataWatcher(
name string,
client kubernetes.Interface,
metadataClient metadata.Interface,
gvr schema.GroupVersionResource,
opts WatchOptions,
indexers cache.Indexers,
transformFunc cache.TransformFunc,
) (Watcher, error) {
informer := NewMetadataInformer(metadataClient, gvr, opts, indexers)

if transformFunc != nil {
err := informer.SetTransform(transformFunc)
if err != nil {
return nil, err
}
}

return NewNamedWatcherWithInformer(name, client, &v1.PartialObjectMetadata{}, informer, opts)
}

// AddEventHandler adds a resource handler to process each request that is coming into the watcher
func (w *watcher) AddEventHandler(h ResourceEventHandler) {
w.handler = h
Expand Down
Loading

0 comments on commit 64b342a

Please sign in to comment.