Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

controller: reconciliation workflows #9

Merged
merged 9 commits into from
Jul 29, 2024
39 changes: 23 additions & 16 deletions controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"context"
"log"
"sync"
"time"
Expand All @@ -12,6 +13,13 @@ import (
"k8s.io/client-go/tools/cache"
)

type ResourceEvent struct {
Resource schema.GroupVersionResource
EventType EventType
OldObject RuntimeObject
NewObject RuntimeObject
}

type RuntimeLinkFunc func(objs Store) machinery.LinkFunc

type ControllerOptions struct {
Expand All @@ -24,7 +32,7 @@ type ControllerOptions struct {
}

type ControllerOptionFunc func(*ControllerOptions)
type CallbackFunc func(EventType, RuntimeObject, RuntimeObject, *machinery.Topology)
type CallbackFunc func(context.Context, ResourceEvent, *machinery.Topology)

func WithClient(client *dynamic.DynamicClient) ControllerOptionFunc {
return func(o *ControllerOptions) {
Expand Down Expand Up @@ -65,7 +73,8 @@ func WithObjectLinks(objectLinks ...RuntimeLinkFunc) ControllerOptionFunc {
func NewController(f ...ControllerOptionFunc) *Controller {
opts := &ControllerOptions{
informers: map[string]InformerBuilder{},
callback: func(EventType, RuntimeObject, RuntimeObject, *machinery.Topology) {},
callback: func(context.Context, ResourceEvent, *machinery.Topology) {
},
}

for _, fn := range f {
Expand All @@ -75,7 +84,7 @@ func NewController(f ...ControllerOptionFunc) *Controller {
controller := &Controller{
client: opts.client,
cache: newCacheStore(),
topology: NewGatewayAPITopology(opts.policyKinds, opts.objectKinds, opts.objectLinks),
topology: newGatewayAPITopologyBuilder(opts.policyKinds, opts.objectKinds, opts.objectLinks),
informers: map[string]cache.SharedInformer{},
callback: opts.callback,
}
Expand All @@ -88,10 +97,10 @@ func NewController(f ...ControllerOptionFunc) *Controller {
}

type Controller struct {
mu sync.Mutex
mu sync.RWMutex
client *dynamic.DynamicClient
cache *cacheStore
topology *GatewayAPITopology
topology *gatewayAPITopologyBuilder
informers map[string]cache.SharedInformer
callback CallbackFunc
}
Expand All @@ -117,16 +126,15 @@ func (c *Controller) Start() {
wait.Until(func() {}, time.Second, stopCh)
}

func (c *Controller) add(obj RuntimeObject) {
func (c *Controller) add(resource schema.GroupVersionResource, obj RuntimeObject) {
c.mu.Lock()
defer c.mu.Unlock()

c.cache.Add(obj)
c.topology.Refresh(c.cache.List())
c.propagate(CreateEvent, nil, obj)
c.propagate(ResourceEvent{resource, CreateEvent, nil, obj})
}

func (c *Controller) update(oldObj, newObj RuntimeObject) {
func (c *Controller) update(resource schema.GroupVersionResource, oldObj, newObj RuntimeObject) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -135,19 +143,18 @@ func (c *Controller) update(oldObj, newObj RuntimeObject) {
}

c.cache.Add(newObj)
c.topology.Refresh(c.cache.List())
c.propagate(UpdateEvent, oldObj, newObj)
c.propagate(ResourceEvent{resource, UpdateEvent, oldObj, newObj})
}

func (c *Controller) delete(obj RuntimeObject) {
func (c *Controller) delete(resource schema.GroupVersionResource, obj RuntimeObject) {
c.mu.Lock()
defer c.mu.Unlock()

c.cache.Delete(obj)
c.topology.Refresh(c.cache.List())
c.propagate(DeleteEvent, obj, nil)
c.propagate(ResourceEvent{resource, DeleteEvent, obj, nil})
}

func (c *Controller) propagate(eventType EventType, oldObj, newObj RuntimeObject) {
c.callback(eventType, oldObj, newObj, c.topology.Get())
func (c *Controller) propagate(resourceEvent ResourceEvent) {
topology := c.topology.Build(c.cache.List())
c.callback(context.TODO(), resourceEvent, topology)
}
6 changes: 3 additions & 3 deletions controller/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,16 @@ func For[T RuntimeObject](resource schema.GroupVersionResource, namespace string
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o any) {
obj := o.(T)
controller.add(obj)
controller.add(resource, obj)
},
UpdateFunc: func(o, newO any) {
oldObj := o.(T)
newObj := newO.(T)
controller.update(oldObj, newObj)
controller.update(resource, oldObj, newObj)
},
DeleteFunc: func(o any) {
obj := o.(T)
controller.delete(obj)
controller.delete(resource, obj)
},
})
informer.SetTransform(Restructure[T])
Expand Down
9 changes: 9 additions & 0 deletions controller/resources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package controller

import gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"

var (
GatewayClassesResource = gwapiv1.SchemeGroupVersion.WithResource("gatewayclasses")
GatewaysResource = gwapiv1.SchemeGroupVersion.WithResource("gateways")
HTTPRoutesResource = gwapiv1.SchemeGroupVersion.WithResource("httproutes")
)
44 changes: 44 additions & 0 deletions controller/subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package controller

import (
"context"

"github.com/samber/lo"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/kuadrant/policy-machinery/machinery"
)

type ResourceEventMatcher struct {
Resource *schema.GroupVersionResource
EventType *EventType
ObjectNamespace string
ObjectName string
}

type Subscription struct {
ReconcileFunc CallbackFunc
Events []ResourceEventMatcher
}

// Subscriber calls the reconciler function of the first subscription that matches the event
type Subscriber []Subscription

func (s Subscriber) Reconcile(ctx context.Context, resourceEvent ResourceEvent, topology *machinery.Topology) {
subscription, found := lo.Find(s, func(subscription Subscription) bool {
_, found := lo.Find(subscription.Events, func(m ResourceEventMatcher) bool {
obj := resourceEvent.OldObject
if obj == nil {
obj = resourceEvent.NewObject
}
return (m.Resource == nil || *m.Resource == resourceEvent.Resource) &&
(m.EventType == nil || *m.EventType == resourceEvent.EventType) &&
(m.ObjectNamespace == "" || m.ObjectNamespace == obj.GetNamespace()) &&
(m.ObjectName == "" || m.ObjectName == obj.GetName())
})
return found
})
if found && subscription.ReconcileFunc != nil {
subscription.ReconcileFunc(ctx, resourceEvent, topology)
}
}
28 changes: 5 additions & 23 deletions controller/topology.go → controller/topology_builder.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package controller

import (
"sync"

"github.com/samber/lo"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -11,27 +9,21 @@ import (
"github.com/kuadrant/policy-machinery/machinery"
)

func NewGatewayAPITopology(policyKinds, objectKinds []schema.GroupKind, objectLinks []RuntimeLinkFunc) *GatewayAPITopology {
return &GatewayAPITopology{
topology: machinery.NewTopology(),
func newGatewayAPITopologyBuilder(policyKinds, objectKinds []schema.GroupKind, objectLinks []RuntimeLinkFunc) *gatewayAPITopologyBuilder {
return &gatewayAPITopologyBuilder{
policyKinds: policyKinds,
objectKinds: objectKinds,
objectLinks: objectLinks,
}
}

type GatewayAPITopology struct {
mu sync.RWMutex
topology *machinery.Topology
type gatewayAPITopologyBuilder struct {
policyKinds []schema.GroupKind
objectKinds []schema.GroupKind
objectLinks []RuntimeLinkFunc
}

func (t *GatewayAPITopology) Refresh(objs Store) {
t.mu.Lock()
defer t.mu.Unlock()

func (t *gatewayAPITopologyBuilder) Build(objs Store) *machinery.Topology {
gatewayClasses := lo.FilterMap(lo.Values(objs[schema.GroupKind{Group: gwapiv1.GroupVersion.Group, Kind: "GatewayClass"}]), func(obj RuntimeObject, _ int) (*gwapiv1.GatewayClass, bool) {
gc, ok := obj.(*gwapiv1.GatewayClass)
if !ok {
Expand Down Expand Up @@ -102,17 +94,7 @@ func (t *GatewayAPITopology) Refresh(objs Store) {
opts = append(opts, machinery.WithGatewayAPITopologyObjects(objects...))
}

t.topology = machinery.NewGatewayAPITopology(opts...)
}

func (t *GatewayAPITopology) Get() *machinery.Topology {
t.mu.RLock()
defer t.mu.RUnlock()
if t.topology == nil {
return nil
}
topology := *t.topology
return &topology
return machinery.NewGatewayAPITopology(opts...)
}

type Object struct {
Expand Down
40 changes: 40 additions & 0 deletions controller/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package controller

import (
"context"
"sync"

"github.com/kuadrant/policy-machinery/machinery"
)

// Workflow runs an optional precondition reconciliation function, then dispatches the reconciliation event to
// a list of concurrent reconciliation tasks, and runs an optional postcondition reconciliation function.
type Workflow struct {
Precondition CallbackFunc
Tasks []CallbackFunc
Postcondition CallbackFunc
}

func (d *Workflow) Run(ctx context.Context, resourceEvent ResourceEvent, topology *machinery.Topology) {
// run precondition reconcile function
if d.Precondition != nil {
d.Precondition(ctx, resourceEvent, topology)
}

// dispatch the event to concurrent tasks
funcs := d.Tasks
waitGroup := &sync.WaitGroup{}
waitGroup.Add(len(funcs))
for _, f := range funcs {
go func() {
defer waitGroup.Done()
f(ctx, resourceEvent, topology)
}()
}
waitGroup.Wait()

// run precondition reconcile function
if d.Postcondition != nil {
d.Postcondition(ctx, resourceEvent, topology)
}
}
6 changes: 6 additions & 0 deletions examples/kuadrant/apis/v1alpha2/dnspolicy_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package v1alpha2

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
gwapiv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"

"github.com/kuadrant/policy-machinery/machinery"

kuadrantapis "github.com/kuadrant/policy-machinery/examples/kuadrant/apis"
)

var (
DNSPolicyKind = schema.GroupKind{Group: SchemeGroupVersion.Group, Kind: "DNSPolicy"}
DNSPoliciesResource = SchemeGroupVersion.WithResource("dnspolicies")
)

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:metadata:labels="gateway.networking.k8s.io/policy=inherited"
Expand Down
6 changes: 6 additions & 0 deletions examples/kuadrant/apis/v1alpha2/tlspolicy_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@ import (
certmanv1 "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
certmanmetav1 "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
gwapiv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"

"github.com/kuadrant/policy-machinery/machinery"

kuadrantapis "github.com/kuadrant/policy-machinery/examples/kuadrant/apis"
)

var (
TLSPolicyKind = schema.GroupKind{Group: SchemeGroupVersion.Group, Kind: "TLSPolicy"}
TLSPoliciesResource = SchemeGroupVersion.WithResource("tlspolicies")
)

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:metadata:labels="gateway.networking.k8s.io/policy=inherited"
Expand Down
6 changes: 6 additions & 0 deletions examples/kuadrant/apis/v1beta3/authpolicy_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ import (

authorinov1beta2 "github.com/kuadrant/authorino/api/v1beta2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
gwapiv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"

"github.com/kuadrant/policy-machinery/machinery"

kuadrantapis "github.com/kuadrant/policy-machinery/examples/kuadrant/apis"
)

var (
AuthPolicyKind = schema.GroupKind{Group: SchemeGroupVersion.Group, Kind: "AuthPolicy"}
AuthPoliciesResource = SchemeGroupVersion.WithResource("authpolicies")
)

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:metadata:labels="gateway.networking.k8s.io/policy=inherited"
Expand Down
6 changes: 6 additions & 0 deletions examples/kuadrant/apis/v1beta3/ratelimitpolicy_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@ import (
"encoding/json"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
gwapiv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"

"github.com/kuadrant/policy-machinery/machinery"

kuadrantapis "github.com/kuadrant/policy-machinery/examples/kuadrant/apis"
)

var (
RateLimitPolicyKind = schema.GroupKind{Group: SchemeGroupVersion.Group, Kind: "RateLimitPolicy"}
RateLimitPoliciesResource = SchemeGroupVersion.WithResource("ratelimitpolicies")
)

const (
EqualOperator WhenConditionOperator = "eq"
NotEqualOperator WhenConditionOperator = "neq"
Expand Down
Loading