Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Misc] Operator: Typed Workqueue used #143

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 14 additions & 19 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,19 @@ type Controller struct {
gardenerCertInformerFactory gardenerCertInformers.SharedInformerFactory
certManagerInformerFactory certManagerInformers.SharedInformerFactory
gardenerDNSInformerFactory gardenerDNSInformers.SharedInformerFactory
queues map[int]workqueue.RateLimitingInterface
queues map[int]workqueue.TypedRateLimitingInterface[QueueItem]
eventBroadcaster events.EventBroadcaster
eventRecorder events.EventRecorder
}

func NewController(client kubernetes.Interface, crdClient versioned.Interface, istioClient istio.Interface, gardenerCertificateClient gardenerCert.Interface, certManagerCertificateClient certManager.Interface, gardenerDNSClient gardenerDNS.Interface, apiExtClient apiext.Interface, promClient promop.Interface) *Controller {
queues := map[int]workqueue.RateLimitingInterface{
ResourceCAPApplication: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
ResourceCAPApplicationVersion: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
ResourceCAPTenant: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
ResourceCAPTenantOperation: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
ResourceOperatorDomains: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),

queues := map[int]workqueue.TypedRateLimitingInterface[QueueItem]{
ResourceCAPApplication: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPApplication]}),
ResourceCAPApplicationVersion: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPApplicationVersion]}),
ResourceCAPTenant: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPTenant]}),
ResourceCAPTenantOperation: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPTenantOperation]}),
ResourceOperatorDomains: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceOperatorDomains]}),
}

// Use 30mins as the default Resync interval for kube / proprietary resources
Expand Down Expand Up @@ -96,7 +97,7 @@ func NewController(client kubernetes.Interface, crdClient versioned.Interface, i
v1alpha1scheme.AddToScheme(scheme)
istioscheme.AddToScheme(scheme)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
eventBroadcaster.StartStructuredLogging(klog.Level(1))
eventBroadcaster.StartLogging(klog.Background())
recorder := eventBroadcaster.NewRecorder(scheme, "cap-controller.sme.sap.com")

c := &Controller{
Expand Down Expand Up @@ -222,24 +223,19 @@ func (c *Controller) processQueueItem(ctx context.Context, key int) error {

klog.V(2).InfoS("Processing queue item in work queue", "resource", getResourceKindFromKey(key), "queue length", q.Len())

i, shutdown := q.Get()
item, shutdown := q.Get()
if shutdown {
return fmt.Errorf("queue (%d) shutdown", key) // stop processing when the queue has been shutdown
}

// [IMPORTANT] always mark the item as done (after processing it)
defer q.Done(i)
defer q.Done(item)

var (
err error
skipItem bool
result *ReconcileResult
)
item, ok := i.(QueueItem)
if !ok {
klog.ErrorS(err, "unknown item found in queue", "resource", getResourceKindFromKey(key))
return nil // process next item
}

attempts := q.NumRequeues(item)

Expand Down Expand Up @@ -268,14 +264,14 @@ func (c *Controller) processQueueItem(ctx context.Context, key int) error {
klog.ErrorS(err, "queue processing error", "resource", getResourceKindFromKey(key))
if !skipItem {
// add back to queue for re-processing
q.AddRateLimited(i)
q.AddRateLimited(item)
return nil
}
}

// Forget the item after processing it
// This just clears the rate limiter from tracking the item
q.Forget(i)
q.Forget(item)

if result != nil {
// requeue resources specified in the reconciliation result
Expand All @@ -300,7 +296,7 @@ func (c *Controller) processReconcileResult(result *ReconcileResult) {
}
}

func (c *Controller) recoverFromPanic(ctx context.Context, item QueueItem, q workqueue.RateLimitingInterface) {
func (c *Controller) recoverFromPanic(ctx context.Context, item QueueItem, q workqueue.TypedRateLimitingInterface[QueueItem]) {
if r := recover(); r != nil {
// Log the Error / Stack Trace
err := fmt.Errorf("panic: %v", r)
Expand All @@ -320,5 +316,4 @@ func (c *Controller) recoverFromPanic(ctx context.Context, item QueueItem, q wor
// Add the item back to the queue to be processed again with a RateLimited delay
q.AddRateLimited(item)
}

}
6 changes: 2 additions & 4 deletions internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestController_processQueueItem(t *testing.T) {

c := getTestController(testResources{cas: []*v1alpha1.CAPApplication{ca}, cats: []*v1alpha1.CAPTenant{cat}, preventStart: true})
if tt.resource == 9 || tt.resource == 99 {
c.queues[tt.resource] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
c.queues[tt.resource] = workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{})
}

dummyKubeInformerFactory := &dummyInformerFactoryType{c.kubeInformerFactory, tt.resourceNamespace, nil}
Expand Down Expand Up @@ -263,10 +263,8 @@ func TestController_processQueueItem(t *testing.T) {
cancel()
expectedRes = testC.processQueueItem(ctx, tt.resource)
} else {
if tt.resource < 4 || tt.resource == 9 {
if tt.resource < 4 || tt.resource == 9 || tt.resource == 99 {
q.Add(item)
} else if tt.resource == 99 {
q.Add(tt.resource)
}
expectedRes = testC.processQueueItem(context.TODO(), tt.resource)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/controller/informers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (
var expectedResult = false

type dummyType struct {
workqueue.RateLimitingInterface
workqueue.TypedRateLimitingInterface[QueueItem]
}

func (q *dummyType) Add(item interface{}) {
func (q *dummyType) Add(item QueueItem) {
expectedResult = true
}

func (q *dummyType) AddAfter(item interface{}, duration time.Duration) {
func (q *dummyType) AddAfter(item QueueItem, duration time.Duration) {
expectedResult = true
}

Expand Down Expand Up @@ -87,7 +87,7 @@ func TestController_initializeInformers(t *testing.T) {
c := getTestController(testResources{})
expectedResult = false

queues := map[int]workqueue.RateLimitingInterface{
queues := map[int]workqueue.TypedRateLimitingInterface[QueueItem]{
ResourceCAPApplication: &dummyType{},
ResourceCAPApplicationVersion: &dummyType{},
ResourceCAPTenant: &dummyType{},
Expand Down