Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metadata watcher and informer #111

Merged
merged 3 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions kubernetes/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/metadata"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -307,3 +310,25 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio
}
return cache.NewSharedIndexInformer(listwatch, resource, opts.SyncTimeout, indexers), objType, nil
}

// NewMetadataInformer creates an informer for a given resource that only tracks the resource metadata.
func NewMetadataInformer(client metadata.Interface, gvr schema.GroupVersionResource, opts WatchOptions, indexers cache.Indexers) cache.SharedInformer {
ctx := context.Background()
if indexers == nil {
indexers = cache.Indexers{}
}
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.Resource(gvr).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.Resource(gvr).Watch(ctx, options)
},
},
&metav1.PartialObjectMetadata{},
opts.SyncTimeout,
indexers,
)
return informer
}
33 changes: 33 additions & 0 deletions kubernetes/metadata/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
package metadata

import (
"fmt"
"reflect"

appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -94,3 +98,32 @@ func (rs *replicaset) GenerateFromName(name string, opts ...FieldOptions) mapstr

return nil
}

// RemoveUnnecessaryReplicaSetData removes all data from a ReplicaSet resource, except what we need to compute
// Pod metadata. This function works for both ReplicaSet and PartialObjectMetadata.
func RemoveUnnecessaryReplicaSetData(obj interface{}) (interface{}, error) {
switch old := obj.(type) {
case *appsv1.ReplicaSet:
transformed := &appsv1.ReplicaSet{
ObjectMeta: kubernetes.ObjectMeta{
Name: old.GetName(),
Namespace: old.GetNamespace(),
OwnerReferences: old.GetOwnerReferences(),
ResourceVersion: old.GetResourceVersion(),
},
}
return transformed, nil
case *metav1.PartialObjectMetadata:
transformed := &metav1.PartialObjectMetadata{
ObjectMeta: kubernetes.ObjectMeta{
Name: old.GetName(),
Namespace: old.GetNamespace(),
OwnerReferences: old.GetOwnerReferences(),
ResourceVersion: old.GetResourceVersion(),
},
}
return transformed, nil
default:
return nil, fmt.Errorf("obj of type %v neither a ReplicaSet nor a PartialObjectMetadata", reflect.TypeOf(obj))
swiatekm marked this conversation as resolved.
Show resolved Hide resolved
}
}
127 changes: 126 additions & 1 deletion kubernetes/metadata/replicaset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestReplicaset_Generate(t *testing.T) {
}
}

func TestReplicase_GenerateFromName(t *testing.T) {
func TestReplicast_GenerateFromName(t *testing.T) {
swiatekm marked this conversation as resolved.
Show resolved Hide resolved
client := k8sfake.NewSimpleClientset()
boolean := true
tests := []struct {
Expand Down Expand Up @@ -232,3 +232,128 @@ func TestReplicase_GenerateFromName(t *testing.T) {
})
}
}

func TestReplicaset_RemoveUnnecessaryData(t *testing.T) {
boolean := true
tests := []struct {
input kubernetes.Resource
output kubernetes.Resource
name string
err error
}{
{
name: "test simple object with owner",
input: &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-rs",
Namespace: defaultNs,
UID: uid,
ResourceVersion: "688594",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps",
Kind: "Deployment",
Name: "nginx-deployment",
UID: "005f3b90-4b9d-12f8-acf0-31020a840144",
Controller: &boolean,
},
},
},
Spec: appsv1.ReplicaSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "demo",
},
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "demo",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "nginx",
Image: "nginx:1.12",
Ports: []v1.ContainerPort{
{
Name: "http",
Protocol: v1.ProtocolTCP,
ContainerPort: 80,
},
},
},
},
},
},
},
},
output: &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-rs",
Namespace: defaultNs,
ResourceVersion: "688594",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps",
Kind: "Deployment",
Name: "nginx-deployment",
UID: "005f3b90-4b9d-12f8-acf0-31020a840144",
Controller: &boolean,
},
},
},
},
},
{
name: "test simple object with owner",
input: &metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-rs",
Namespace: defaultNs,
UID: uid,
ResourceVersion: "688594",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps",
Kind: "Deployment",
Name: "nginx-deployment",
UID: "005f3b90-4b9d-12f8-acf0-31020a840144",
Controller: &boolean,
},
},
},
},
output: &metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-rs",
Namespace: defaultNs,
ResourceVersion: "688594",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps",
Kind: "Deployment",
Name: "nginx-deployment",
UID: "005f3b90-4b9d-12f8-acf0-31020a840144",
Controller: &boolean,
},
},
},
},
},
{
name: "wrong resource type",
input: &v1.Pod{},
err: fmt.Errorf("obj of type *v1.Pod neither a ReplicaSet nor a PartialObjectMetadata"),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
transformed, err := RemoveUnnecessaryReplicaSetData(test.input)
assert.Equal(t, test.err, err)
assert.Equal(t, test.output, transformed)
})
}
}
24 changes: 24 additions & 0 deletions kubernetes/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"os"
"strings"

"k8s.io/client-go/metadata"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
Expand Down Expand Up @@ -82,6 +84,28 @@ func GetKubernetesClient(kubeconfig string, opt KubeClientOptions) (kubernetes.I
return client, nil
}

// GetKubernetesMetadataClient returns a kubernetes metadata-only client. If inCluster is true, it returns an
// in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed,
// it parses the config file to get the config required to build a client.
func GetKubernetesMetadataClient(kubeconfig string, opt KubeClientOptions) (metadata.Interface, error) {
if kubeconfig == "" {
kubeconfig = GetKubeConfigEnvironmentVariable()
}

cfg, err := BuildConfig(kubeconfig)
if err != nil {
return nil, fmt.Errorf("unable to build kube config due to error: %w", err)
}
cfg.QPS = opt.QPS
cfg.Burst = opt.Burst
client, err := metadata.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("unable to build kubernetes clientset: %w", err)
}

return client, nil
}

// BuildConfig is a helper function that builds configs from a kubeconfig filepath.
// If kubeconfigPath is not passed in we fallback to inClusterConfig.
// If inClusterConfig fails, we fallback to the default config.
Expand Down
68 changes: 64 additions & 4 deletions kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
"fmt"
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/metadata"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -111,13 +116,28 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue
// metrics, if it is empty, its metrics will not be logged by the k8s client.
func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
var store cache.Store
var queue workqueue.Interface
var cachedObject runtime.Object
informer, _, err := NewInformer(client, resource, opts, indexers)
if err != nil {
return nil, err
}
return NewNamedWatcherWithInformer(name, client, resource, informer, opts)
}

// NewNamedWatcherWithInformer initializes the watcher client to provide an events handler for
// resource from the cluster (filtered to the given node) and also allows to name the k8s
// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue
// metrics, if it is empty, its metrics will not be logged by the k8s client.
// This function requires the underlying informer to be passed by the caller.
func NewNamedWatcherWithInformer(
name string,
client kubernetes.Interface,
resource Resource,
informer cache.SharedInformer,
opts WatchOptions,
) (Watcher, error) {
var store cache.Store
var queue workqueue.Interface
var cachedObject runtime.Object

store = informer.GetStore()
queue = workqueue.NewNamed(name)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
queue = workqueue.NewNamed(name)
queue = workqueue.NewWithConfig(QueueConfig{
Name: name,
})

nit: since you touch this code maybe you wanna deal with deprecated calls as well

Copy link
Contributor Author

@swiatekm swiatekm Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually want to avoid touching it as much as possible. I think the recommended way here is to use typed queues, and I don't want to make that kind of change in this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@swiatekm could you make an issue or PR (if it's quicker but I doubt that) to address the deprecated calls? That way we don't lose track of this work. Thanks.

Expand Down Expand Up @@ -145,7 +165,7 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
handler: NoOpEventHandlerFuncs{},
}

_, err = w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
w.enqueue(o, add)
},
Expand Down Expand Up @@ -182,6 +202,46 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
return w, nil
}

// NewMetadataWatcher initializes a metadata-only watcher client to provide an events handler for
// resource from the cluster (filtered to the given node).
// Event handlers defined on this watcher receive PartialObjectMetadata resources.
func NewMetadataWatcher(
client kubernetes.Interface,
metadataClient metadata.Interface,
gvr schema.GroupVersionResource,
opts WatchOptions,
indexers cache.Indexers,
transformFunc cache.TransformFunc,
) (Watcher, error) {
return NewNamedMetadataWatcher("", client, metadataClient, gvr, opts, indexers, transformFunc)
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved
}

// NewNamedMetadataWatcher initializes a metadata-only watcher client to provide an events handler for
// resource from the cluster (filtered to the given node) and also allows to name the k8s
// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue
// metrics, if it is empty, its metrics will not be logged by the k8s client.
// Event handlers defined on this watcher receive PartialObjectMetadata resources.
func NewNamedMetadataWatcher(
name string,
client kubernetes.Interface,
metadataClient metadata.Interface,
gvr schema.GroupVersionResource,
opts WatchOptions,
indexers cache.Indexers,
transformFunc cache.TransformFunc,
) (Watcher, error) {
informer := NewMetadataInformer(metadataClient, gvr, opts, indexers)

if transformFunc != nil {
err := informer.SetTransform(transformFunc)
if err != nil {
return nil, err
}
}

return NewNamedWatcherWithInformer(name, client, &v1.PartialObjectMetadata{}, informer, opts)
}

// AddEventHandler adds a resource handler to process each request that is coming into the watcher
func (w *watcher) AddEventHandler(h ResourceEventHandler) {
w.handler = h
Expand Down
Loading
Loading