Skip to content

Commit

Permalink
Fix: Don't apply informer if it doesn't have corresponding RBAC rule (#…
Browse files Browse the repository at this point in the history
…129)

Fix: Don't apply informer if it doesn't have corresponding RBAC rule

---------

Co-authored-by: tautvydasliekis <[email protected]>
  • Loading branch information
tautvydasLiekis and tautvydasliekis authored Jul 13, 2023
1 parent 72a1578 commit de1e725
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 85 deletions.
179 changes: 119 additions & 60 deletions internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"sync"
"time"

"github.com/samber/lo"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
authorizationv1 "k8s.io/api/authorization/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -23,6 +25,7 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
authorizationtypev1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
Expand Down Expand Up @@ -61,16 +64,23 @@ type Controller struct {
agentVersion *config.AgentVersion
healthzProvider *HealthzProvider

conditionalInformers []conditionalInformer
conditionalInformers []conditionalInformer
selfSubjectAccessReview authorizationtypev1.SelfSubjectAccessReviewInterface
}

type conditionalInformer struct {
// name is the name of the API resource, ex.: "poddisruptionbudgets"
name string
// groupVersion is the group and version of the API type, ex.: "policy/v1"
groupVersion string
// apiType is the type of the API object, ex.: "*v1.PodDisruptionBudget"
apiType reflect.Type
// informer is the informer for the API type
informer cache.SharedInformer
// informerFactory is the informer for the API type
informerFactory func() cache.SharedIndexInformer
// permissionVerbs is list of verbs which represent the permissions
permissionVerbs []string
// isApplied is true if the permission is applied
isApplied bool
}

func New(
Expand All @@ -85,6 +95,7 @@ func New(
v version.Interface,
agentVersion *config.AgentVersion,
healthzProvider *HealthzProvider,
selfSubjectAccessReview authorizationtypev1.SelfSubjectAccessReviewInterface,
) *Controller {
healthzProvider.Initializing()

Expand All @@ -108,25 +119,37 @@ func New(

conditionalInformers := []conditionalInformer{
{
groupVersion: policyv1.SchemeGroupVersion.String(),
apiType: reflect.TypeOf(&policyv1.PodDisruptionBudget{}),
informer: f.Policy().V1().PodDisruptionBudgets().Informer(),
name: "poddisruptionbudgets",
groupVersion: policyv1.SchemeGroupVersion.String(),
apiType: reflect.TypeOf(&policyv1.PodDisruptionBudget{}),
permissionVerbs: []string{"get", "list", "watch"},
isApplied: false,
informerFactory: func() cache.SharedIndexInformer {
return f.Policy().V1().PodDisruptionBudgets().Informer()
},
},
{
groupVersion: storagev1.SchemeGroupVersion.String(),
apiType: reflect.TypeOf(&storagev1.CSINode{}),
informer: f.Storage().V1().CSINodes().Informer(),
name: "csinodes",
groupVersion: storagev1.SchemeGroupVersion.String(),
apiType: reflect.TypeOf(&storagev1.CSINode{}),
permissionVerbs: []string{"get", "list", "watch"},
isApplied: false,
informerFactory: func() cache.SharedIndexInformer {
return f.Storage().V1().CSINodes().Informer()
},
},
{
groupVersion: autoscalingv1.SchemeGroupVersion.String(),
apiType: reflect.TypeOf(&autoscalingv1.HorizontalPodAutoscaler{}),
informer: f.Autoscaling().V1().HorizontalPodAutoscalers().Informer(),
name: "horizontalpodautoscalers",
groupVersion: autoscalingv1.SchemeGroupVersion.String(),
apiType: reflect.TypeOf(&autoscalingv1.HorizontalPodAutoscaler{}),
permissionVerbs: []string{"get", "list", "watch"},
isApplied: false,
informerFactory: func() cache.SharedIndexInformer {
return f.Autoscaling().V1().HorizontalPodAutoscalers().Informer()
},
},
}

//Applies conditional informers if the API type is available in the cluster
defaultInformers = applyConditionalInformers(discovery, log, defaultInformers, conditionalInformers)

handledInformers := map[reflect.Type]*custominformers.HandledInformer{}
for typ, informer := range defaultInformers {
handledInformers[typ] = custominformers.NewHandledInformer(log, queue, informer, typ, nil)
Expand All @@ -149,20 +172,21 @@ func New(
)

return &Controller{
log: log,
clusterID: clusterID,
castaiclient: castaiclient,
provider: provider,
cfg: cfg,
delta: delta.New(log, clusterID, v.Full()),
queue: queue,
informers: handledInformers,
agentVersion: agentVersion,
healthzProvider: healthzProvider,
metricsClient: metricsClient,
discovery: discovery,
informerFactory: f,
conditionalInformers: conditionalInformers,
log: log,
clusterID: clusterID,
castaiclient: castaiclient,
provider: provider,
cfg: cfg,
delta: delta.New(log, clusterID, v.Full()),
queue: queue,
informers: handledInformers,
agentVersion: agentVersion,
healthzProvider: healthzProvider,
metricsClient: metricsClient,
discovery: discovery,
informerFactory: f,
conditionalInformers: conditionalInformers,
selfSubjectAccessReview: selfSubjectAccessReview,
}
}

Expand Down Expand Up @@ -241,9 +265,7 @@ func (c *Controller) Run(ctx context.Context) error {
}, c.cfg.Interval, ctx.Done())
}()

if missingInformers := c.getMissingConditionalInformers(); len(missingInformers) > 0 {
go c.startConditionalInformersWithWatcher(ctx, missingInformers)
}
go c.startConditionalInformersWithWatcher(ctx, c.conditionalInformers)

podMetricsType := reflect.TypeOf(&v1beta1.PodMetrics{})
go c.startPodMetricsInformersWithWatcher(ctx, podMetricsType)
Expand All @@ -259,18 +281,45 @@ func (c *Controller) Run(ctx context.Context) error {
}

func (c *Controller) startConditionalInformersWithWatcher(ctx context.Context, conditionalInformers []conditionalInformer) {
tryConditionalInformers := conditionalInformers
if err := wait.PollImmediateInfiniteWithContext(ctx, time.Minute*2, func(ctx context.Context) (done bool, err error) {
apiResourceLists := fetchApiResourceLists(c.discovery, c.log)
apiResourceLists := fetchAPIResourceLists(c.discovery, c.log)
if apiResourceLists == nil {
return false, nil
}
c.log.Infof("Cluster API server is available, trying to start conditional informers")

c.log.Infof("Cluster API server is available, starting conditional informers")
for _, informer := range conditionalInformers {
apiResourceListForGroupVersion := getApiResourceListByGroupVersion(informer.groupVersion, apiResourceLists)
if isResourceAvailable(informer.apiType, apiResourceListForGroupVersion) {
custominformers.NewHandledInformer(c.log, c.queue, informer.informer, informer.apiType, nil)
for i, informer := range tryConditionalInformers {
if informer.isApplied {
continue
}
apiResourceListForGroupVersion := getAPIResourceListByGroupVersion(informer.groupVersion, apiResourceLists)
if !isResourceAvailable(informer.apiType, apiResourceListForGroupVersion) {
c.log.Warnf("Skipping conditional informer name: %v, because API resource is not available",
informer.name,
)
continue
}

if !c.informerHasAccess(ctx, informer) {
c.log.Warnf("Skipping conditional informer name: %v, because required access is not available",
informer.name,
)
continue
}

c.log.Infof("Starting conditional informer for %v", informer.name)
tryConditionalInformers[i].isApplied = true

handledInformer := custominformers.NewHandledInformer(c.log, c.queue, informer.informerFactory(), informer.apiType, nil)
go handledInformer.Run(ctx.Done())
}

filterNotAppliedConditionInformers := lo.Filter(tryConditionalInformers, func(informer conditionalInformer, _ int) bool {
return !informer.isApplied
})
if len(filterNotAppliedConditionInformers) > 0 {
return false, nil
}
return true, nil
}); err != nil {
Expand All @@ -280,12 +329,12 @@ func (c *Controller) startConditionalInformersWithWatcher(ctx context.Context, c

func (c *Controller) startPodMetricsInformersWithWatcher(ctx context.Context, podMetricsType reflect.Type) {
if err := wait.PollImmediateInfiniteWithContext(ctx, time.Minute*2, func(ctx context.Context) (done bool, err error) {
apiResourceLists := fetchApiResourceLists(c.discovery, c.log)
apiResourceLists := fetchAPIResourceLists(c.discovery, c.log)
if apiResourceLists == nil {
return false, nil
}

apiResourceListForGroupVersion := getApiResourceListByGroupVersion(v1beta1.SchemeGroupVersion.String(), apiResourceLists)
apiResourceListForGroupVersion := getAPIResourceListByGroupVersion(v1beta1.SchemeGroupVersion.String(), apiResourceLists)
if !isResourceAvailable(podMetricsType, apiResourceListForGroupVersion) {
return false, nil
}
Expand Down Expand Up @@ -432,33 +481,43 @@ func (c *Controller) debugQueueContent(maxItems int) string {
return content
}

func (c *Controller) getMissingConditionalInformers() []conditionalInformer {
var missingInformers []conditionalInformer
for _, informer := range c.conditionalInformers {
if _, ok := c.informers[informer.apiType]; !ok {
missingInformers = append(missingInformers, informer)
func (c *Controller) informerHasAccess(ctx context.Context, informer conditionalInformer) bool {
// Cut the groupName from the groupVersion from "/", example: "policy/v1beta1" -> "policy" or "policy/v1" -> "policy"
groupName := strings.Split(informer.groupVersion, "/")[0]

// Check if allowed to access all resources with the wildcard "*" verb
if access := c.informerIsAllowedToAccessResource(ctx, "*", informer, groupName); access.Status.Allowed {
return true
}

for _, verb := range informer.permissionVerbs {
access := c.informerIsAllowedToAccessResource(ctx, verb, informer, groupName)
if !access.Status.Allowed {
return false
}
}
return missingInformers
return true
}

func applyConditionalInformers(client discovery.DiscoveryInterface, log logrus.FieldLogger, defaultInformers map[reflect.Type]cache.SharedInformer, conditionalInformers []conditionalInformer) map[reflect.Type]cache.SharedInformer {
apiResourceLists := fetchApiResourceLists(client, log)
if apiResourceLists == nil {
return defaultInformers
}
func (c *Controller) informerIsAllowedToAccessResource(ctx context.Context, verb string, informer conditionalInformer, groupName string) *authorizationv1.SelfSubjectAccessReview {
access, err := c.selfSubjectAccessReview.Create(ctx, &authorizationv1.SelfSubjectAccessReview{
Spec: authorizationv1.SelfSubjectAccessReviewSpec{
ResourceAttributes: &authorizationv1.ResourceAttributes{
Verb: verb,
Group: groupName,
Resource: informer.name,
},
},
}, metav1.CreateOptions{})

informersMap := defaultInformers
for _, informer := range conditionalInformers {
apiResourceList := getApiResourceListByGroupVersion(informer.groupVersion, apiResourceLists)
if isResourceAvailable(informer.apiType, apiResourceList) {
informersMap[informer.apiType] = informer.informer
}
if err != nil {
c.log.Warnf("Error when getting server resources: %v", err.Error())
return &authorizationv1.SelfSubjectAccessReview{}
}
return informersMap
return access
}

func fetchApiResourceLists(client discovery.DiscoveryInterface, log logrus.FieldLogger) []*metav1.APIResourceList {
func fetchAPIResourceLists(client discovery.DiscoveryInterface, log logrus.FieldLogger) []*metav1.APIResourceList {
_, apiResourceLists, err := client.ServerGroupsAndResources()
if err != nil {
log.Warnf("Error when getting server resources: %v", err.Error())
Expand All @@ -467,7 +526,7 @@ func fetchApiResourceLists(client discovery.DiscoveryInterface, log logrus.Field
return apiResourceLists
}

func getApiResourceListByGroupVersion(groupVersion string, apiResourceLists []*metav1.APIResourceList) *metav1.APIResourceList {
func getAPIResourceListByGroupVersion(groupVersion string, apiResourceLists []*metav1.APIResourceList) *metav1.APIResourceList {
for _, apiResourceList := range apiResourceLists {
if apiResourceList.GroupVersion == groupVersion {
return apiResourceList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestController_ShouldKeepDeltaAfterDelete(t *testing.T) {
Interval: 2 * time.Second,
PrepTimeout: 2 * time.Second,
InitialSleepDuration: 10 * time.Millisecond,
}, version, agentVersion, NewHealthzProvider(defaultHealthzCfg, log))
}, version, agentVersion, NewHealthzProvider(defaultHealthzCfg, log), clientset.AuthorizationV1().SelfSubjectAccessReviews())

f.Start(ctx.Done())

Expand Down
Loading

0 comments on commit de1e725

Please sign in to comment.