From 23c8e928712a1be995f5b9125ac78aa807661bb5 Mon Sep 17 00:00:00 2001 From: Sergio Franco Date: Tue, 10 Oct 2023 11:17:23 +0100 Subject: [PATCH] feat: Watch policies configured by Gateway params --- ...eway-controller.clusterserviceversion.yaml | 11 +- cmd/controller/main.go | 29 ++++- .../cluster-config/configmap.yaml | 7 +- config/rbac/role.yaml | 9 ++ pkg/_internal/slice/predicates.go | 7 ++ pkg/_internal/slice/slice.go | 10 ++ pkg/controllers/gateway/gateway_controller.go | 67 ++++++++++- pkg/controllers/gateway/params.go | 18 +++ pkg/policysync/eventhandler.go | 82 +++++++++++++ pkg/policysync/policy.go | 52 ++++++++ pkg/policysync/policy_test.go | 108 +++++++++++++++++ pkg/policysync/reflect.go | 100 ++++++++++++++++ pkg/policysync/runnable.go | 51 ++++++++ pkg/policysync/syncer.go | 27 +++++ pkg/policysync/unstructured.go | 113 ++++++++++++++++++ 15 files changed, 685 insertions(+), 6 deletions(-) create mode 100644 pkg/_internal/slice/predicates.go create mode 100644 pkg/policysync/eventhandler.go create mode 100644 pkg/policysync/policy.go create mode 100644 pkg/policysync/policy_test.go create mode 100644 pkg/policysync/reflect.go create mode 100644 pkg/policysync/runnable.go create mode 100644 pkg/policysync/syncer.go create mode 100644 pkg/policysync/unstructured.go diff --git a/bundle/manifests/multicluster-gateway-controller.clusterserviceversion.yaml b/bundle/manifests/multicluster-gateway-controller.clusterserviceversion.yaml index dbbed6b45..ef6b0b06d 100644 --- a/bundle/manifests/multicluster-gateway-controller.clusterserviceversion.yaml +++ b/bundle/manifests/multicluster-gateway-controller.clusterserviceversion.yaml @@ -4,7 +4,7 @@ metadata: annotations: alm-examples: '[]' capabilities: Basic Install - createdAt: "2023-09-21T16:23:26Z" + createdAt: "2023-10-10T15:03:19Z" operators.operatorframework.io/builder: operator-sdk-v1.28.0 operators.operatorframework.io/project_layout: go.kubebuilder.io/v3 name: multicluster-gateway-controller.v0.0.0 @@ -292,6 +292,15 @@ spec: - get - patch - update + - apiGroups: + - kuadrant.io + resources: + - authpolicies + - ratelimitpolicies + verbs: + - get + - list + - watch - apiGroups: - kuadrant.io resources: diff --git a/cmd/controller/main.go b/cmd/controller/main.go index b40166131..62be13a10 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -26,10 +26,15 @@ import ( clusterv1beta2 "open-cluster-management.io/api/cluster/v1beta1" workv1 "open-cluster-management.io/api/work/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/kubernetes/scheme" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth" + "k8s.io/client-go/tools/cache" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log" @@ -48,6 +53,7 @@ import ( "github.com/Kuadrant/multicluster-gateway-controller/pkg/dns/dnsprovider" "github.com/Kuadrant/multicluster-gateway-controller/pkg/health" "github.com/Kuadrant/multicluster-gateway-controller/pkg/placement" + "github.com/Kuadrant/multicluster-gateway-controller/pkg/policysync" //+kubebuilder:scaffold:imports ) @@ -176,10 +182,27 @@ func main() { os.Exit(1) } + dynamicClient := dynamic.NewForConfigOrDie(mgr.GetConfig()) + dynamicInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory( + dynamicClient, + 0, + corev1.NamespaceAll, + nil, + ) + + policyInformersManager := policysync.NewPolicyInformersManager(dynamicInformerFactory) + if err := policyInformersManager.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to start policy informers manager") + os.Exit(1) + } + if err = (&gateway.GatewayReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Placement: placer, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Placement: placer, + PolicyInformersManager: policyInformersManager, + DynamicClient: dynamicClient, + WatchedPolicies: map[schema.GroupVersionResource]cache.ResourceEventHandlerRegistration{}, }).SetupWithManager(mgr, ctx); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Gateway") os.Exit(1) diff --git a/config/quick-start/control-cluster/cluster-config/configmap.yaml b/config/quick-start/control-cluster/cluster-config/configmap.yaml index e38aed51b..abc21e947 100644 --- a/config/quick-start/control-cluster/cluster-config/configmap.yaml +++ b/config/quick-start/control-cluster/cluster-config/configmap.yaml @@ -4,4 +4,9 @@ metadata: name: gateway-params namespace: multi-cluster-gateways data: - downstreamClass: "istio" \ No newline at end of file + params: | + { + "policiesToSync": [ + { "group": "kuadrant.io", "version": "v1beta1", "resource": "authpolicies" } + ] + } \ No newline at end of file diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index a8ed95f9a..ac80d56c7 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -124,6 +124,15 @@ rules: - get - patch - update +- apiGroups: + - kuadrant.io + resources: + - authpolicies + - ratelimitpolicies + verbs: + - get + - list + - watch - apiGroups: - kuadrant.io resources: diff --git a/pkg/_internal/slice/predicates.go b/pkg/_internal/slice/predicates.go new file mode 100644 index 000000000..7e8c0d228 --- /dev/null +++ b/pkg/_internal/slice/predicates.go @@ -0,0 +1,7 @@ +package slice + +func EqualsTo[T comparable](x T) func(T) bool { + return func(y T) bool { + return x == y + } +} diff --git a/pkg/_internal/slice/slice.go b/pkg/_internal/slice/slice.go index 26c12e40d..4e10ba0c4 100644 --- a/pkg/_internal/slice/slice.go +++ b/pkg/_internal/slice/slice.go @@ -58,6 +58,16 @@ func Filter[T any](slice []T, predicate func(T) bool) []T { return result } +func Map[T, R any](slice []T, f func(T) R) []R { + result := make([]R, len(slice)) + + for i, elem := range slice { + result[i] = f(elem) + } + + return result +} + func MapErr[T, R any](slice []T, f func(T) (R, error)) ([]R, error) { result := make([]R, len(slice)) diff --git a/pkg/controllers/gateway/gateway_controller.go b/pkg/controllers/gateway/gateway_controller.go index c5c3085c1..2ff2abe7e 100644 --- a/pkg/controllers/gateway/gateway_controller.go +++ b/pkg/controllers/gateway/gateway_controller.go @@ -31,8 +31,10 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/cache" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -51,6 +53,7 @@ import ( "github.com/Kuadrant/multicluster-gateway-controller/pkg/_internal/slice" "github.com/Kuadrant/multicluster-gateway-controller/pkg/apis/v1alpha1" "github.com/Kuadrant/multicluster-gateway-controller/pkg/dns" + "github.com/Kuadrant/multicluster-gateway-controller/pkg/policysync" ) const ( @@ -86,11 +89,17 @@ type GatewayPlacer interface { // +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;delete // +kubebuilder:rbac:groups="cert-manager.io",resources=certificates,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="kuadrant.io",resources=authpolicies;ratelimitpolicies,verbs=get;list;watch + // GatewayReconciler reconciles a Gateway object type GatewayReconciler struct { client.Client Scheme *runtime.Scheme Placement GatewayPlacer + // SharedInformerFactory dynamicinformer.DynamicSharedInformerFactory + PolicyInformersManager *policysync.PolicyInformersManager + DynamicClient dynamic.Interface + WatchedPolicies map[schema.GroupVersionResource]cache.ResourceEventHandlerRegistration } func isDeleting(g *gatewayv1beta1.Gateway) bool { @@ -367,7 +376,8 @@ func (r *GatewayReconciler) getTLSSecrets(ctx context.Context, upstreamGateway * return tlsSecrets, listenerTLSErr } -func (r *GatewayReconciler) reconcileParams(_ context.Context, gateway *gatewayv1beta1.Gateway, params *Params) error { +func (r *GatewayReconciler) reconcileParams(ctx context.Context, gateway *gatewayv1beta1.Gateway, params *Params) error { + log := crlog.FromContext(ctx) downstreamClass := params.GetDownstreamClass() @@ -375,6 +385,61 @@ func (r *GatewayReconciler) reconcileParams(_ context.Context, gateway *gatewayv gateway.Spec.GatewayClassName = gatewayv1beta1.ObjectName(downstreamClass) + policiesToSync := slice.Map(params.PoliciesToSync, ParamsGroupVersionResource.ToGroupVersionResource) + + for _, gvr := range policiesToSync { + // If it's already watched skip it + _, ok := r.WatchedPolicies[gvr] + if ok { + continue + } + + log.Info("Creating event handler for policy", "gvr", gvr) + + // Add the event handler for the policy + eventHandler := &policysync.ResourceEventHandler{ + Log: log, + GVR: gvr, + Client: r.Client, + DynamicClient: r.DynamicClient, + Gateway: gateway, + Syncer: &policysync.FakeSyncer{}, + } + informer := r.PolicyInformersManager.InformerFactory.ForResource(gvr).Informer() + reg, err := informer.AddEventHandler(eventHandler) + if err != nil { + return err + } + + // Start the informer + if err := r.PolicyInformersManager.AddInformer(informer); err != nil { + return err + } + + // Keep track of the watched policy + r.WatchedPolicies[gvr] = reg + } + + // Stop watching policies if they're removed from the params + policiesToUnwatch := []schema.GroupVersionResource{} + for gvr, reg := range r.WatchedPolicies { + if slice.Contains(policiesToSync, slice.EqualsTo(gvr)) { + continue + } + + log.Info("Stopping watch for policy", "gvr", gvr) + + if err := r.PolicyInformersManager.InformerFactory.ForResource(gvr).Informer().RemoveEventHandler(reg); err != nil { + return err + } + + policiesToUnwatch = append(policiesToUnwatch, gvr) + } + + for _, gvr := range policiesToUnwatch { + delete(r.WatchedPolicies, gvr) + } + return nil } diff --git a/pkg/controllers/gateway/params.go b/pkg/controllers/gateway/params.go index e54904e86..c69f26b99 100644 --- a/pkg/controllers/gateway/params.go +++ b/pkg/controllers/gateway/params.go @@ -17,6 +17,24 @@ type Params struct { // DownstreamClass specifies what GatewayClassName to set in the // downstream clusters. For example: DownstreamClass string `json:"downstreamClass,omitempty"` + + // PoliciesToSync specifies a listof Policy GVRs that will be watched + // in the hub and synced to the spokes + PoliciesToSync []ParamsGroupVersionResource `json:"policiesToSync,omitempty"` +} + +type ParamsGroupVersionResource struct { + Group string `json:"group"` + Version string `json:"version"` + Resource string `json:"resource"` +} + +func (gvr ParamsGroupVersionResource) ToGroupVersionResource() schema.GroupVersionResource { + return schema.GroupVersionResource{ + Group: gvr.Group, + Version: gvr.Version, + Resource: gvr.Resource, + } } func (p *Params) GetDownstreamClass() string { diff --git a/pkg/policysync/eventhandler.go b/pkg/policysync/eventhandler.go new file mode 100644 index 000000000..ecb626738 --- /dev/null +++ b/pkg/policysync/eventhandler.go @@ -0,0 +1,82 @@ +package policysync + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" +) + +type ResourceEventHandler struct { + Log logr.Logger + GVR schema.GroupVersionResource + Client client.Client + DynamicClient dynamic.Interface + Gateway *gatewayv1beta1.Gateway + + Syncer Syncer +} + +var _ cache.ResourceEventHandler = &ResourceEventHandler{} + +func (h *ResourceEventHandler) OnAdd(reqObj interface{}) { + h.Log.Info("Got watch event for policy", "obj", reqObj) + + ctx := context.Background() + + obj, ok := reqObj.(client.Object) + if !ok { + h.Log.Error(fmt.Errorf("object %v does not inplement client.Object", reqObj), "") + return + } + + if err := h.Client.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil { + h.Log.Error(err, "failed to get object", "object", obj) + } + + policy, err := NewPolicyFor(obj) + if err != nil { + h.Log.Error(err, "failed to build policy from watched object", "object", obj) + return + } + + if err := h.Syncer.SyncPolicy(context.Background(), h.Client, policy); err != nil { + h.Log.Error(err, "failed to sync policy", "policy", policy) + } +} + +func (h *ResourceEventHandler) OnDelete(obj interface{}) { + h.Log.Info("Got watch event for policy", "obj", obj) +} + +func (h *ResourceEventHandler) OnUpdate(_ interface{}, reqObj interface{}) { + h.Log.Info("Got watch event for policy", "obj", reqObj) + + ctx := context.Background() + + obj, ok := reqObj.(client.Object) + if !ok { + h.Log.Error(fmt.Errorf("object %v does not inplement client.Object", reqObj), "") + return + } + + if err := h.Client.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil { + h.Log.Error(err, "failed to get object", "object", obj) + } + + policy, err := NewPolicyFor(obj) + if err != nil { + h.Log.Error(err, "failed to build policy from watched object", "object", obj) + return + } + + if err := h.Syncer.SyncPolicy(context.Background(), h.Client, policy); err != nil { + h.Log.Error(err, "failed to sync policy", "policy", policy) + } +} diff --git a/pkg/policysync/policy.go b/pkg/policysync/policy.go new file mode 100644 index 000000000..2964bc73e --- /dev/null +++ b/pkg/policysync/policy.go @@ -0,0 +1,52 @@ +package policysync + +import ( + "errors" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + gatewayapiv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" +) + +type Policy interface { + metav1.Object + + // GetTargetRef returns a copy of the TargetRef field of the policy. + // + // Mutating the return value of this function doesn't change the original + // policy. Use SetTargetRef or UpdateTargetRef for that + GetTargetRef() *gatewayapiv1alpha2.PolicyTargetReference + + // SetTargetRef replaces the TargetRef field of the policy with targetRef + SetTargetRef(targetRef *gatewayapiv1alpha2.PolicyTargetReference) + + // UpdateTargetRef mutates the TargetRef field of the policy by applying + // update() to it + UpdateTargetRef(update func(*gatewayapiv1alpha2.PolicyTargetReference)) + + // IsValidPolicy validates that the object is a valid Gateway policy + IsValidPolicy() error +} + +// NewPolicyFor attempts to create a Policy instance for obj, or returns an +// error if the object is not a valid Gateway Policy +func NewPolicyFor(obj interface{}) (Policy, error) { + if _, ok := obj.(metav1.Object); !ok { + return nil, errors.New("object doesn't implement metav1.Object interface") + } + + var policy Policy + + switch typedObj := obj.(type) { + case *unstructured.Unstructured: + policy = &UnstructuredPolicy{Unstructured: typedObj} + default: + policy = &ReflectPolicy{Object: obj.(metav1.Object)} + } + + if err := policy.IsValidPolicy(); err != nil { + return nil, err + } + + return policy, nil +} diff --git a/pkg/policysync/policy_test.go b/pkg/policysync/policy_test.go new file mode 100644 index 000000000..ec7082cec --- /dev/null +++ b/pkg/policysync/policy_test.go @@ -0,0 +1,108 @@ +package policysync + +import ( + "testing" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + gatewayapiv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + gatewayapiv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/Kuadrant/multicluster-gateway-controller/pkg/apis/v1alpha1" +) + +func TestReflectPolicy(t *testing.T) { + policy := &v1alpha1.DNSPolicy{ + Spec: v1alpha1.DNSPolicySpec{ + TargetRef: gatewayapiv1alpha2.PolicyTargetReference{ + Group: gatewayapiv1beta1.Group("test.io"), + Kind: gatewayapiv1beta1.Kind("Test"), + Name: gatewayapiv1beta1.ObjectName("test"), + }, + }, + } + + reflectPolicy := &ReflectPolicy{ + Object: policy, + } + + if err := reflectPolicy.IsValidPolicy(); err != nil { + t.Fatalf("expectd policy to be valid, but failed with %v", err) + } + + targetRef := reflectPolicy.GetTargetRef() + if string(targetRef.Group) != "test.io" { + t.Fatalf("expected targetRef.Group to be test.io, got %s", targetRef.Group) + } + if string(targetRef.Kind) != "Test" { + t.Fatalf("expected targetRef.Kind to be Test, got %s", targetRef.Kind) + } + if string(targetRef.Name) != "test" { + t.Fatalf("expected targetRef.Kind to be test, got %s", targetRef.Name) + } + + reflectPolicy.UpdateTargetRef(func(targetRef *gatewayapiv1alpha2.PolicyTargetReference) { + namespace := gatewayapiv1beta1.Namespace("default") + name := "changed-name" + + targetRef.Name = gatewayapiv1beta1.ObjectName(name) + targetRef.Namespace = &namespace + }) + + if string(policy.Spec.TargetRef.Name) != "changed-name" { + t.Errorf("expected targetRef.Name to be changed-name, got %s", policy.Spec.TargetRef.Name) + } + if string(*policy.Spec.TargetRef.Namespace) != "default" { + t.Errorf("expected targetRef.Namespace to be default, got %s", *policy.Spec.TargetRef.Namespace) + } +} + +func TestUnstructuredPolicy(t *testing.T) { + policy := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "spec": map[string]interface{}{ + "targetRef": map[string]interface{}{ + "name": "test", + "kind": "Test", + "group": "test.io", + }, + }, + }, + } + + unstructuredPolicy := &UnstructuredPolicy{ + Unstructured: policy, + } + + if err := unstructuredPolicy.IsValidPolicy(); err != nil { + t.Fatalf("expectd policy to be valid, but failed with %v", err) + } + + targetRef := unstructuredPolicy.GetTargetRef() + if string(targetRef.Group) != "test.io" { + t.Fatalf("expected targetRef.Group to be test.io, got %s", targetRef.Group) + } + if string(targetRef.Kind) != "Test" { + t.Fatalf("expected targetRef.Kind to be Test, got %s", targetRef.Kind) + } + if string(targetRef.Name) != "test" { + t.Fatalf("expected targetRef.Kind to be test, got %s", targetRef.Name) + } + + unstructuredPolicy.UpdateTargetRef(func(targetRef *gatewayapiv1alpha2.PolicyTargetReference) { + namespace := gatewayapiv1beta1.Namespace("default") + name := "changed-name" + + targetRef.Name = gatewayapiv1beta1.ObjectName(name) + targetRef.Namespace = &namespace + }) + + actualName := policy.Object["spec"].(map[string]interface{})["targetRef"].(map[string]interface{})["name"].(string) + if actualName != "changed-name" { + t.Errorf("expected targetRef.Name to be changed-name, got %s", actualName) + } + + actualNamespace := policy.Object["spec"].(map[string]interface{})["targetRef"].(map[string]interface{})["namespace"].(*string) + if *actualNamespace != "default" { + t.Errorf("expected targetRef.Namespace to be default, got %s", *actualNamespace) + } +} diff --git a/pkg/policysync/reflect.go b/pkg/policysync/reflect.go new file mode 100644 index 000000000..41526a5a6 --- /dev/null +++ b/pkg/policysync/reflect.go @@ -0,0 +1,100 @@ +package policysync + +import ( + "errors" + "fmt" + "reflect" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + gatewayapiv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" +) + +const ( + PolicyTargetReferencePath = "sigs.k8s.io/gateway-api/apis/v1alpha2/PolicyTargetReference" +) + +type ReflectPolicy struct { + metav1.Object + + policy *GenericPolicy +} + +var _ Policy = &ReflectPolicy{} + +type GenericPolicy struct { + TargetRef *gatewayapiv1alpha2.PolicyTargetReference +} + +func (p *ReflectPolicy) GetTargetRef() *gatewayapiv1alpha2.PolicyTargetReference { + if p.policy == nil { + policy := p.buildPolicy() + p.policy = &policy + } + + return p.policy.TargetRef +} + +func (p *ReflectPolicy) SetTargetRef(targetRef *gatewayapiv1alpha2.PolicyTargetReference) { + obj := reflect.ValueOf(p.Object).Elem() + + specValue := obj.FieldByName("Spec") + targetRefValue := specValue.FieldByName("TargetRef") + + var valueToSet reflect.Value + if targetRefValue.Kind() == reflect.Struct { + valueToSet = reflect.ValueOf(*targetRef) + } else if targetRefValue.Kind() == reflect.Pointer { + valueToSet = reflect.ValueOf(targetRef) + } + + targetRefValue.Set(valueToSet) + + p.policy.TargetRef = targetRef +} + +func (p *ReflectPolicy) UpdateTargetRef(update func(*gatewayapiv1alpha2.PolicyTargetReference)) { + targetRef := p.GetTargetRef() + if targetRef == nil { + return + } + + update(targetRef) + + p.SetTargetRef(targetRef) +} + +func (p *ReflectPolicy) buildPolicy() GenericPolicy { + obj := reflect.ValueOf(p.Object).Elem() + + specValue := obj.FieldByName("Spec") + targetRefValue := specValue.FieldByName("TargetRef") + + if targetRefValue.Kind() == reflect.Struct { + targetRefValue = targetRefValue.Addr() + } + + return GenericPolicy{ + TargetRef: targetRefValue.Interface().(*gatewayapiv1alpha2.PolicyTargetReference), + } +} + +func (p *ReflectPolicy) IsValidPolicy() error { + objType := reflect.TypeOf(p.Object) + specType, ok := objType.Elem().FieldByName("Spec") + if !ok { + return errors.New("field .Spec missing from object") + } + + targetRefType, ok := specType.Type.FieldByName("TargetRef") + if !ok { + return errors.New("field .Spec.TargetRef missing from object") + } + + typeAndPkg := fmt.Sprintf("%s/%s", targetRefType.Type.PkgPath(), targetRefType.Type.Name()) + + if typeAndPkg != PolicyTargetReferencePath { + return fmt.Errorf("type of .Spec.TargetRef %s not valid. Expected %s", typeAndPkg, PolicyTargetReferencePath) + } + + return nil +} diff --git a/pkg/policysync/runnable.go b/pkg/policysync/runnable.go new file mode 100644 index 000000000..afb13f74f --- /dev/null +++ b/pkg/policysync/runnable.go @@ -0,0 +1,51 @@ +package policysync + +import ( + "context" + + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +type PolicyInformersManager struct { + manager manager.Manager + + InformerFactory dynamicinformer.DynamicSharedInformerFactory +} + +func NewPolicyInformersManager(informerFactory dynamicinformer.DynamicSharedInformerFactory) *PolicyInformersManager { + return &PolicyInformersManager{ + InformerFactory: informerFactory, + } +} + +func (p *PolicyInformersManager) SetupWithManager(mgr manager.Manager) error { + p.manager = mgr + return p.manager.Add(p) +} + +func (p *PolicyInformersManager) Start(ctx context.Context) error { + done := make(chan struct{}) + + p.InformerFactory.Start(done) + p.InformerFactory.WaitForCacheSync(done) + + err := <-ctx.Done() + done <- err + + return nil +} + +func (p *PolicyInformersManager) AddInformer(informer cache.SharedIndexInformer) error { + return p.manager.Add(&InformerRunnable{Informer: informer}) +} + +type InformerRunnable struct { + Informer cache.SharedIndexInformer +} + +func (r *InformerRunnable) Start(ctx context.Context) error { + r.Informer.Run(ctx.Done()) + return nil +} diff --git a/pkg/policysync/syncer.go b/pkg/policysync/syncer.go new file mode 100644 index 000000000..d6c6747de --- /dev/null +++ b/pkg/policysync/syncer.go @@ -0,0 +1,27 @@ +package policysync + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/client" + crlog "sigs.k8s.io/controller-runtime/pkg/log" +) + +type Syncer interface { + SyncPolicy(ctx context.Context, apiclient client.Client, policy Policy) error +} + +type FakeSyncer struct { +} + +var _ Syncer = &FakeSyncer{} + +// SyncPolicy implements Syncer. +func (*FakeSyncer) SyncPolicy(ctx context.Context, apiclient client.Client, policy Policy) error { + log := crlog.FromContext(ctx) + + targetRef := policy.GetTargetRef() + log.Info("Syncing policy", "policy", policy, "targetRef", targetRef) + + return nil +} diff --git a/pkg/policysync/unstructured.go b/pkg/policysync/unstructured.go new file mode 100644 index 000000000..bb8648a62 --- /dev/null +++ b/pkg/policysync/unstructured.go @@ -0,0 +1,113 @@ +package policysync + +import ( + "errors" + "fmt" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + gatewayapiv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + gatewayapiv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" +) + +type UnstructuredPolicy struct { + *unstructured.Unstructured +} + +var _ Policy = &UnstructuredPolicy{} + +func (p *UnstructuredPolicy) GetTargetRef() *gatewayapiv1alpha2.PolicyTargetReference { + targetRef := p.Object["spec"].(map[string]interface{})["targetRef"].(map[string]interface{}) + if targetRef == nil { + return nil + } + + var namespace *gatewayapiv1beta1.Namespace + if targetRef["namespace"] != nil { + ns := gatewayapiv1beta1.Namespace(targetRef["namespace"].(string)) + namespace = &ns + } + + return &gatewayapiv1alpha2.PolicyTargetReference{ + Group: gatewayapiv1beta1.Group(targetRef["group"].(string)), + Kind: gatewayapiv1beta1.Kind(targetRef["kind"].(string)), + Name: gatewayapiv1beta1.ObjectName(targetRef["name"].(string)), + Namespace: namespace, + } +} + +func (p *UnstructuredPolicy) SetTargetRef(targetRef *gatewayapiv1alpha2.PolicyTargetReference) { + var namespace *string + if targetRef.Namespace != nil { + ns := string(*targetRef.Namespace) + namespace = &ns + } + + asObject := map[string]interface{}{ + "group": string(targetRef.Group), + "kind": string(targetRef.Kind), + "name": string(targetRef.Name), + "namespace": namespace, + } + + spec := p.Object["spec"].(map[string]interface{}) + spec["targetRef"] = asObject +} + +func (p *UnstructuredPolicy) UpdateTargetRef(update func(*gatewayapiv1alpha2.PolicyTargetReference)) { + targetRef := p.GetTargetRef() + if targetRef == nil { + return + } + + update(targetRef) + + p.SetTargetRef(targetRef) +} + +func (p *UnstructuredPolicy) IsValidPolicy() error { + spec, ok := p.Object["spec"] + if !ok { + return errors.New("object missing .spec field") + } + + specMap, ok := spec.(map[string]interface{}) + if !ok { + return fmt.Errorf("expected .spec to be map[string]interface{} but got %v", spec) + } + + targetRef, ok := specMap["targetRef"] + if !ok { + return errors.New("object missing .spec.targetRef field") + } + + targetRefMap, ok := targetRef.(map[string]interface{}) + if !ok { + return fmt.Errorf("expected .spec.targetRef to be map[string]interface{} but got %v", targetRef) + } + + if err := validateMapContains[string]("name", targetRefMap); err != nil { + return err + } + if err := validateMapContains[string]("group", targetRefMap); err != nil { + return err + } + if err := validateMapContains[string]("kind", targetRefMap); err != nil { + return err + } + + return nil +} + +func validateMapContains[T any](k string, m map[string]interface{}) error { + value, ok := m[k] + if !ok { + return fmt.Errorf("field %s missing", k) + } + + _, ok = value.(T) + if !ok { + return fmt.Errorf("invalid type of field %s %v", k, value) + } + + return nil +}