diff --git a/internal/services/controller/controller.go b/internal/services/controller/controller.go index c97fb111..1bfa9299 100644 --- a/internal/services/controller/controller.go +++ b/internal/services/controller/controller.go @@ -11,8 +11,10 @@ import ( "sync" "time" + "github.com/samber/lo" "github.com/sirupsen/logrus" appsv1 "k8s.io/api/apps/v1" + authorizationv1 "k8s.io/api/authorization/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -23,6 +25,7 @@ import ( "k8s.io/client-go/discovery" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + authorizationtypev1 "k8s.io/client-go/kubernetes/typed/authorization/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/metrics/pkg/apis/metrics/v1beta1" @@ -61,16 +64,23 @@ type Controller struct { agentVersion *config.AgentVersion healthzProvider *HealthzProvider - conditionalInformers []conditionalInformer + conditionalInformers []conditionalInformer + selfSubjectAccessReview authorizationtypev1.SelfSubjectAccessReviewInterface } type conditionalInformer struct { + // name is the name of the API resource, ex.: "poddisruptionbudgets" + name string // groupVersion is the group and version of the API type, ex.: "policy/v1" groupVersion string // apiType is the type of the API object, ex.: "*v1.PodDisruptionBudget" apiType reflect.Type - // informer is the informer for the API type - informer cache.SharedInformer + // informerFactory is the informer for the API type + informerFactory func() cache.SharedIndexInformer + // permissionVerbs is list of verbs which represent the permissions + permissionVerbs []string + // isApplied is true if the permission is applied + isApplied bool } func New( @@ -85,6 +95,7 @@ func New( v version.Interface, agentVersion *config.AgentVersion, healthzProvider *HealthzProvider, + selfSubjectAccessReview authorizationtypev1.SelfSubjectAccessReviewInterface, ) *Controller { healthzProvider.Initializing() @@ -108,25 +119,37 @@ func New( conditionalInformers := []conditionalInformer{ { - groupVersion: policyv1.SchemeGroupVersion.String(), - apiType: reflect.TypeOf(&policyv1.PodDisruptionBudget{}), - informer: f.Policy().V1().PodDisruptionBudgets().Informer(), + name: "poddisruptionbudgets", + groupVersion: policyv1.SchemeGroupVersion.String(), + apiType: reflect.TypeOf(&policyv1.PodDisruptionBudget{}), + permissionVerbs: []string{"get", "list", "watch"}, + isApplied: false, + informerFactory: func() cache.SharedIndexInformer { + return f.Policy().V1().PodDisruptionBudgets().Informer() + }, }, { - groupVersion: storagev1.SchemeGroupVersion.String(), - apiType: reflect.TypeOf(&storagev1.CSINode{}), - informer: f.Storage().V1().CSINodes().Informer(), + name: "csinodes", + groupVersion: storagev1.SchemeGroupVersion.String(), + apiType: reflect.TypeOf(&storagev1.CSINode{}), + permissionVerbs: []string{"get", "list", "watch"}, + isApplied: false, + informerFactory: func() cache.SharedIndexInformer { + return f.Storage().V1().CSINodes().Informer() + }, }, { - groupVersion: autoscalingv1.SchemeGroupVersion.String(), - apiType: reflect.TypeOf(&autoscalingv1.HorizontalPodAutoscaler{}), - informer: f.Autoscaling().V1().HorizontalPodAutoscalers().Informer(), + name: "horizontalpodautoscalers", + groupVersion: autoscalingv1.SchemeGroupVersion.String(), + apiType: reflect.TypeOf(&autoscalingv1.HorizontalPodAutoscaler{}), + permissionVerbs: []string{"get", "list", "watch"}, + isApplied: false, + informerFactory: func() cache.SharedIndexInformer { + return f.Autoscaling().V1().HorizontalPodAutoscalers().Informer() + }, }, } - //Applies conditional informers if the API type is available in the cluster - defaultInformers = applyConditionalInformers(discovery, log, defaultInformers, conditionalInformers) - handledInformers := map[reflect.Type]*custominformers.HandledInformer{} for typ, informer := range defaultInformers { handledInformers[typ] = custominformers.NewHandledInformer(log, queue, informer, typ, nil) @@ -149,20 +172,21 @@ func New( ) return &Controller{ - log: log, - clusterID: clusterID, - castaiclient: castaiclient, - provider: provider, - cfg: cfg, - delta: delta.New(log, clusterID, v.Full()), - queue: queue, - informers: handledInformers, - agentVersion: agentVersion, - healthzProvider: healthzProvider, - metricsClient: metricsClient, - discovery: discovery, - informerFactory: f, - conditionalInformers: conditionalInformers, + log: log, + clusterID: clusterID, + castaiclient: castaiclient, + provider: provider, + cfg: cfg, + delta: delta.New(log, clusterID, v.Full()), + queue: queue, + informers: handledInformers, + agentVersion: agentVersion, + healthzProvider: healthzProvider, + metricsClient: metricsClient, + discovery: discovery, + informerFactory: f, + conditionalInformers: conditionalInformers, + selfSubjectAccessReview: selfSubjectAccessReview, } } @@ -241,9 +265,7 @@ func (c *Controller) Run(ctx context.Context) error { }, c.cfg.Interval, ctx.Done()) }() - if missingInformers := c.getMissingConditionalInformers(); len(missingInformers) > 0 { - go c.startConditionalInformersWithWatcher(ctx, missingInformers) - } + go c.startConditionalInformersWithWatcher(ctx, c.conditionalInformers) podMetricsType := reflect.TypeOf(&v1beta1.PodMetrics{}) go c.startPodMetricsInformersWithWatcher(ctx, podMetricsType) @@ -259,18 +281,45 @@ func (c *Controller) Run(ctx context.Context) error { } func (c *Controller) startConditionalInformersWithWatcher(ctx context.Context, conditionalInformers []conditionalInformer) { + tryConditionalInformers := conditionalInformers if err := wait.PollImmediateInfiniteWithContext(ctx, time.Minute*2, func(ctx context.Context) (done bool, err error) { - apiResourceLists := fetchApiResourceLists(c.discovery, c.log) + apiResourceLists := fetchAPIResourceLists(c.discovery, c.log) if apiResourceLists == nil { return false, nil } + c.log.Infof("Cluster API server is available, trying to start conditional informers") - c.log.Infof("Cluster API server is available, starting conditional informers") - for _, informer := range conditionalInformers { - apiResourceListForGroupVersion := getApiResourceListByGroupVersion(informer.groupVersion, apiResourceLists) - if isResourceAvailable(informer.apiType, apiResourceListForGroupVersion) { - custominformers.NewHandledInformer(c.log, c.queue, informer.informer, informer.apiType, nil) + for i, informer := range tryConditionalInformers { + if informer.isApplied { + continue + } + apiResourceListForGroupVersion := getAPIResourceListByGroupVersion(informer.groupVersion, apiResourceLists) + if !isResourceAvailable(informer.apiType, apiResourceListForGroupVersion) { + c.log.Warnf("Skipping conditional informer name: %v, because API resource is not available", + informer.name, + ) + continue } + + if !c.informerHasAccess(ctx, informer) { + c.log.Warnf("Skipping conditional informer name: %v, because required access is not available", + informer.name, + ) + continue + } + + c.log.Infof("Starting conditional informer for %v", informer.name) + tryConditionalInformers[i].isApplied = true + + handledInformer := custominformers.NewHandledInformer(c.log, c.queue, informer.informerFactory(), informer.apiType, nil) + go handledInformer.Run(ctx.Done()) + } + + filterNotAppliedConditionInformers := lo.Filter(tryConditionalInformers, func(informer conditionalInformer, _ int) bool { + return !informer.isApplied + }) + if len(filterNotAppliedConditionInformers) > 0 { + return false, nil } return true, nil }); err != nil { @@ -280,12 +329,12 @@ func (c *Controller) startConditionalInformersWithWatcher(ctx context.Context, c func (c *Controller) startPodMetricsInformersWithWatcher(ctx context.Context, podMetricsType reflect.Type) { if err := wait.PollImmediateInfiniteWithContext(ctx, time.Minute*2, func(ctx context.Context) (done bool, err error) { - apiResourceLists := fetchApiResourceLists(c.discovery, c.log) + apiResourceLists := fetchAPIResourceLists(c.discovery, c.log) if apiResourceLists == nil { return false, nil } - apiResourceListForGroupVersion := getApiResourceListByGroupVersion(v1beta1.SchemeGroupVersion.String(), apiResourceLists) + apiResourceListForGroupVersion := getAPIResourceListByGroupVersion(v1beta1.SchemeGroupVersion.String(), apiResourceLists) if !isResourceAvailable(podMetricsType, apiResourceListForGroupVersion) { return false, nil } @@ -432,33 +481,43 @@ func (c *Controller) debugQueueContent(maxItems int) string { return content } -func (c *Controller) getMissingConditionalInformers() []conditionalInformer { - var missingInformers []conditionalInformer - for _, informer := range c.conditionalInformers { - if _, ok := c.informers[informer.apiType]; !ok { - missingInformers = append(missingInformers, informer) +func (c *Controller) informerHasAccess(ctx context.Context, informer conditionalInformer) bool { + // Cut the groupName from the groupVersion from "/", example: "policy/v1beta1" -> "policy" or "policy/v1" -> "policy" + groupName := strings.Split(informer.groupVersion, "/")[0] + + // Check if allowed to access all resources with the wildcard "*" verb + if access := c.informerIsAllowedToAccessResource(ctx, "*", informer, groupName); access.Status.Allowed { + return true + } + + for _, verb := range informer.permissionVerbs { + access := c.informerIsAllowedToAccessResource(ctx, verb, informer, groupName) + if !access.Status.Allowed { + return false } } - return missingInformers + return true } -func applyConditionalInformers(client discovery.DiscoveryInterface, log logrus.FieldLogger, defaultInformers map[reflect.Type]cache.SharedInformer, conditionalInformers []conditionalInformer) map[reflect.Type]cache.SharedInformer { - apiResourceLists := fetchApiResourceLists(client, log) - if apiResourceLists == nil { - return defaultInformers - } +func (c *Controller) informerIsAllowedToAccessResource(ctx context.Context, verb string, informer conditionalInformer, groupName string) *authorizationv1.SelfSubjectAccessReview { + access, err := c.selfSubjectAccessReview.Create(ctx, &authorizationv1.SelfSubjectAccessReview{ + Spec: authorizationv1.SelfSubjectAccessReviewSpec{ + ResourceAttributes: &authorizationv1.ResourceAttributes{ + Verb: verb, + Group: groupName, + Resource: informer.name, + }, + }, + }, metav1.CreateOptions{}) - informersMap := defaultInformers - for _, informer := range conditionalInformers { - apiResourceList := getApiResourceListByGroupVersion(informer.groupVersion, apiResourceLists) - if isResourceAvailable(informer.apiType, apiResourceList) { - informersMap[informer.apiType] = informer.informer - } + if err != nil { + c.log.Warnf("Error when getting server resources: %v", err.Error()) + return &authorizationv1.SelfSubjectAccessReview{} } - return informersMap + return access } -func fetchApiResourceLists(client discovery.DiscoveryInterface, log logrus.FieldLogger) []*metav1.APIResourceList { +func fetchAPIResourceLists(client discovery.DiscoveryInterface, log logrus.FieldLogger) []*metav1.APIResourceList { _, apiResourceLists, err := client.ServerGroupsAndResources() if err != nil { log.Warnf("Error when getting server resources: %v", err.Error()) @@ -467,7 +526,7 @@ func fetchApiResourceLists(client discovery.DiscoveryInterface, log logrus.Field return apiResourceLists } -func getApiResourceListByGroupVersion(groupVersion string, apiResourceLists []*metav1.APIResourceList) *metav1.APIResourceList { +func getAPIResourceListByGroupVersion(groupVersion string, apiResourceLists []*metav1.APIResourceList) *metav1.APIResourceList { for _, apiResourceList := range apiResourceLists { if apiResourceList.GroupVersion == groupVersion { return apiResourceList diff --git a/internal/services/controller/controller_exclude_race_test.go b/internal/services/controller/controller_exclude_race_test.go index ec830b84..d73e35ad 100644 --- a/internal/services/controller/controller_exclude_race_test.go +++ b/internal/services/controller/controller_exclude_race_test.go @@ -127,7 +127,7 @@ func TestController_ShouldKeepDeltaAfterDelete(t *testing.T) { Interval: 2 * time.Second, PrepTimeout: 2 * time.Second, InitialSleepDuration: 10 * time.Millisecond, - }, version, agentVersion, NewHealthzProvider(defaultHealthzCfg, log)) + }, version, agentVersion, NewHealthzProvider(defaultHealthzCfg, log), clientset.AuthorizationV1().SelfSubjectAccessReviews()) f.Start(ctx.Done()) diff --git a/internal/services/controller/controller_test.go b/internal/services/controller/controller_test.go index c8dc7acf..63866cc8 100644 --- a/internal/services/controller/controller_test.go +++ b/internal/services/controller/controller_test.go @@ -14,6 +14,7 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "go.uber.org/goleak" + authorizationv1 "k8s.io/api/authorization/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" v1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" @@ -24,6 +25,7 @@ import ( fakediscovery "k8s.io/client-go/discovery/fake" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + authfakev1 "k8s.io/client-go/kubernetes/typed/authorization/v1/fake" k8stesting "k8s.io/client-go/testing" "k8s.io/metrics/pkg/apis/metrics/v1beta1" metrics_fake "k8s.io/metrics/pkg/client/clientset/versioned/fake" @@ -99,14 +101,35 @@ func TestController_HappyPath(t *testing.T) { csiData, err := delta.Encode(csi) require.NoError(t, err) + fakeSelfSubjectAccessReviewsClient := &authfakev1.FakeSelfSubjectAccessReviews{ + Fake: &authfakev1.FakeAuthorizationV1{ + Fake: &k8stesting.Fake{}, + }, + } + + // returns true for all requests to fakeSelfSubjectAccessReviewsClient + fakeSelfSubjectAccessReviewsClient.Fake.PrependReactor("create", "selfsubjectaccessreviews", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, &authorizationv1.SelfSubjectAccessReview{ + Status: authorizationv1.SubjectAccessReviewStatus{ + Allowed: true, + }, + }, nil + }) + clientset := fake.NewSimpleClientset(node, pod, pdb, hpa, csi) clientset.Fake.Resources = []*metav1.APIResourceList{ { GroupVersion: autoscalingv1.SchemeGroupVersion.String(), APIResources: []metav1.APIResource{ { - Name: "horizontalpodautoscalers", - Kind: "HorizontalPodAutoscaler", + Group: "autoscaling", + Name: "horizontalpodautoscalers", + Kind: "HorizontalPodAutoscaler", + Verbs: []string{ + "list", + "watch", + "get", + }, }, }, }, @@ -114,8 +137,14 @@ func TestController_HappyPath(t *testing.T) { GroupVersion: storagev1.SchemeGroupVersion.String(), APIResources: []metav1.APIResource{ { - Name: "csinodes", - Kind: "CSINode", + Group: "storage.k8s.io", + Name: "csinodes", + Kind: "CSINode", + Verbs: []string{ + "list", + "watch", + "get", + }, }, }, }, @@ -169,7 +198,12 @@ func TestController_HappyPath(t *testing.T) { Interval: 15 * time.Second, PrepTimeout: 2 * time.Second, InitialSleepDuration: 10 * time.Millisecond, - }, version, agentVersion, NewHealthzProvider(defaultHealthzCfg, log)) + }, + version, + agentVersion, + NewHealthzProvider(defaultHealthzCfg, log), + fakeSelfSubjectAccessReviewsClient, + ) f.Start(ctx.Done()) go func() { @@ -181,8 +215,10 @@ func TestController_HappyPath(t *testing.T) { GroupVersion: policyv1.SchemeGroupVersion.String(), APIResources: []metav1.APIResource{ { - Name: "poddisruptionbudgets", - Kind: "PodDisruptionBudget", + Group: "policy", + Name: "poddisruptionbudgets", + Kind: "PodDisruptionBudget", + Verbs: []string{"get", "list", "watch"}, }, }, }) @@ -218,23 +254,11 @@ func TestNew(t *testing.T) { f := informers.NewSharedInformerFactory(clientset, 0) log := logrus.New() log.SetLevel(logrus.DebugLevel) - ctrl := New( - log, - f, - clientset.Discovery(), - castaiclient, - metricsClient, - provider, - clusterID.String(), - &config.Controller{ - Interval: 15 * time.Second, - PrepTimeout: 2 * time.Second, - InitialSleepDuration: 10 * time.Millisecond, - }, - version, - agentVersion, - NewHealthzProvider(defaultHealthzCfg, log), - ) + ctrl := New(log, f, clientset.Discovery(), castaiclient, metricsClient, provider, clusterID.String(), &config.Controller{ + Interval: 15 * time.Second, + PrepTimeout: 2 * time.Second, + InitialSleepDuration: 10 * time.Millisecond, + }, version, agentVersion, NewHealthzProvider(defaultHealthzCfg, log), clientset.AuthorizationV1().SelfSubjectAccessReviews()) r.NotNil(ctrl) diff --git a/internal/services/controller/worker.go b/internal/services/controller/worker.go index 1f43d58c..6dfc257b 100644 --- a/internal/services/controller/worker.go +++ b/internal/services/controller/worker.go @@ -60,6 +60,7 @@ func Loop( v, agentVersion, healthzProvider, + clientset.AuthorizationV1().SelfSubjectAccessReviews(), ) f.Start(ctrlCtx.Done())