Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add: caching for Discovery Client #682

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions cmd/kar-controllers/app/discovery/discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package discovery

import (
"k8s.io/klog/v2"
"time"

"k8s.io/client-go/rest"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
)

var GlobalCachedDiscoveryClient discovery.CachedDiscoveryInterface

func InitializeGlobalDiscoveryClient(config *rest.Config) error {
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return err
}
GlobalCachedDiscoveryClient = memory.NewMemCacheClient(discoveryClient)

go startDiscoveryRefreshTicker()

return nil
}

func RefreshDiscoveryCache() {
klog.Infof("Invalidating discovery cache")
GlobalCachedDiscoveryClient.Invalidate()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should have some error handling or logging in case any errors arise when attempting to invalidate the discovery cache? WDYT

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Invalidate doesn't return anything so I don't think this is possible

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, you're right! Thanks Kevin.

}

func startDiscoveryRefreshTicker() {
ticker := time.NewTicker(5 * time.Hour)
for {
select {
case <-ticker.C:
RefreshDiscoveryCache()
}
}
}
8 changes: 8 additions & 0 deletions cmd/kar-controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package app

import (
"net/http"
"k8s.io/klog/v2"
"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/discovery"
"strings"

_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
Expand Down Expand Up @@ -62,6 +64,12 @@ func Run(opt *options.ServerOption) error {
AgentConfigs: strings.Split(opt.AgentConfigs, ","),
}

if err = discovery.InitializeGlobalDiscoveryClient(restConfig); err != nil {
klog.Errorf("Error initializing global discovery client: %s", err)
} else {
klog.Infof("Initializing global discovery client")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think adding the log level here makes sense as this is informational, so can be filtered out at other log levels.

Suggested change
klog.Infof("Initializing global discovery client")
klog.V(4).Infof("Initializing global discovery client")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey Fiona,
Thanks for the review. I'll update the log :)

The refresh timeframe set to 5 hours is an arbitrary number, I am aiming for a timeframe which both promotes the efficiency of the cache, whist not holding cache for too long, therefor I chose to half that of the controller-runtime resync period of 10 hours.

Although I am open for discussion if you have any suggestions towards the timeframe :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking about it, and how it would work. From what I understand the client only initializes once, and then refreshes after 5 hours. If resources are created soon after a refresh, will they then only be available for manipulation after the next refresh?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you query for an object type which doesn't exist in the cache it would cause a cache miss and invalidation triggering queries to the k8 discovery client to repopulate the cache.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Kevin. So if a new object of an object type that does exist is created, this will be discoverable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but the discovery client cache doesn't cache individual k8 objects. As I understand it, the Discovery Client is the piece of k8 which resolves names such as pod or po into a fully resolved kubernetes API endpoint. Without a cache, every time one tries to get a pod from kubernetes it will iterate through the discovery API until it finds an entry with that name. This means that in the worst case that every resource will be checked using a GET request which can cause client throttling. Without a cache this would happen every time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explaining!

}

jobctrl := queuejob.NewJobController(restConfig, mcadConfig, extConfig)
if jobctrl == nil {
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
"runtime/debug"
"strings"
"time"


discoveryCache "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/discovery"
arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
v1 "k8s.io/api/core/v1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
Expand Down Expand Up @@ -86,8 +87,8 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG
name := ""

namespaced := true
// todo:DELETEME dd := common.KubeClient.Discovery()
dd := gr.clients.Discovery()

dd := discoveryCache.GlobalCachedDiscoveryClient
apigroups, err := restmapper.GetAPIGroupResources(dd)
if err != nil {
klog.Errorf("[Cleanup] Error getting API resources, err=%#v", err)
Expand Down Expand Up @@ -206,8 +207,7 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra
}()

namespaced := true
// todo:DELETEME dd := common.KubeClient.Discovery()
dd := gr.clients.Discovery()
dd := discoveryCache.GlobalCachedDiscoveryClient
apigroups, err := restmapper.GetAPIGroupResources(dd)
if err != nil {
klog.Errorf("Error getting API resources, err=%#v", err)
Expand Down Expand Up @@ -624,7 +624,7 @@ func getContainerResources(container v1.Container, replicas float64) *clustersta

// returns status of an item present in etcd
func (gr *GenericResources) IsItemCompleted(awgr *arbv1.AppWrapperGenericResource, namespace string, appwrapperName string, genericItemName string) (completed bool) {
dd := gr.clients.Discovery()
dd := discoveryCache.GlobalCachedDiscoveryClient
apigroups, err := restmapper.GetAPIGroupResources(dd)
if err != nil {
klog.Errorf("[IsItemCompleted] Error getting API resources, err=%#v", err)
Expand Down