Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventHandling: Gateway/HTTPRoute/GRPCRoute resource changes enqueue events for attached IAMAuthPolicy/AccessLogPolicy resource #438

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions cmd/aws-application-networking-k8s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,10 @@ func addOptionalCRDs(scheme *runtime.Scheme) {
Version: "v1alpha1",
}
scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &anv1alpha1.TargetGroupPolicy{}, &anv1alpha1.TargetGroupPolicyList{})
metav1.AddToGroupVersion(scheme, awsGatewayControllerCRDGroupVersion)

scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &anv1alpha1.VpcAssociationPolicy{}, &anv1alpha1.VpcAssociationPolicyList{})
metav1.AddToGroupVersion(scheme, awsGatewayControllerCRDGroupVersion)

scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &anv1alpha1.AccessLogPolicy{}, &anv1alpha1.AccessLogPolicyList{})
scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &anv1alpha1.IAMAuthPolicy{}, &anv1alpha1.IAMAuthPolicyList{})

metav1.AddToGroupVersion(scheme, awsGatewayControllerCRDGroupVersion)
}

Expand Down Expand Up @@ -186,6 +184,11 @@ func main() {
setupLog.Fatalf("accesslogpolicy controller setup failed: %s", err)
}

err = controllers.RegisterIAMAuthPolicyController(ctrlLog.Named("iam-auth-policy"), cloud, latticeDataStore, finalizerManager, mgr)
if err != nil {
setupLog.Fatalf("iamauthpolicy controller setup failed: %s", err)
}

go latticestore.GetDefaultLatticeDataStore().ServeIntrospection()

//+kubebuilder:scaffold:builder
Expand Down
13 changes: 12 additions & 1 deletion controllers/accesslogpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@ import (
pkg_builder "sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

gwvv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/aws/aws-application-networking-k8s/controllers/eventhandlers"
anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/aws"
"github.com/aws/aws-application-networking-k8s/pkg/aws/services"
Expand Down Expand Up @@ -88,8 +92,15 @@ func RegisterAccessLogPolicyController(
stackMarshaller: stackMarshaller,
}

gatewayEventHandler := eventhandlers.NewGatewayEventHandler(log, mgrClient)
httpRouteEventHandler := eventhandlers.NewHTTPRouteEventHandler(log, mgrClient)
grpcRouteEventHandler := eventhandlers.NewGRPCRouteEventHandler(log, mgrClient)

builder := ctrl.NewControllerManagedBy(mgr).
For(&anv1alpha1.AccessLogPolicy{}, pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{}))
For(&anv1alpha1.AccessLogPolicy{}, pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwvv1beta1.Gateway{}}, gatewayEventHandler.MapToAccessLogPolicies(), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwvv1beta1.HTTPRoute{}}, httpRouteEventHandler.MapToAccessLogPolicies(), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwv1alpha2.GRPCRoute{}}, grpcRouteEventHandler.MapToAccessLogPolicies(), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{}))

return builder.Complete(r)
}
Expand Down
41 changes: 41 additions & 0 deletions controllers/eventhandlers/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"

Expand All @@ -21,6 +22,7 @@ import (
"github.com/aws/aws-application-networking-k8s/pkg/config"
)

// TODO: Remove `enqueueRequestsForGatewayEvent`, and use `gatewayEventHandler` only
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can we not do this now?

type enqueueRequestsForGatewayEvent struct {
log gwlog.Logger
client client.Client
Expand Down Expand Up @@ -119,3 +121,42 @@ func (h *enqueueRequestsForGatewayEvent) enqueueImpactedRoutes(queue workqueue.R
}
}
}

type gatewayEventHandler struct {
log gwlog.Logger
client client.Client
mapper *resourceMapper
}

func NewGatewayEventHandler(log gwlog.Logger, client client.Client) *gatewayEventHandler {
return &gatewayEventHandler{log: log, client: client,
mapper: &resourceMapper{log: log, client: client}}
}

func (h *gatewayEventHandler) MapToIAMAuthPolicies() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
var requests []reconcile.Request
if gw, ok := obj.(*gateway_api.Gateway); ok {
policies := h.mapper.GatewayToIAMAuthPolicies(context.Background(), gw)
for _, p := range policies {
h.log.Infof("Gateway [%s/%s] resource change triggers IAMAuthPolicy [%s/%s] resource change", gw.Namespace, gw.Name, p.Namespace, p.Name)
requests = append(requests, reconcile.Request{NamespacedName: k8s.NamespacedName(p)})
}
}
return requests
})
}

func (h *gatewayEventHandler) MapToAccessLogPolicies() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
var requests []reconcile.Request
if gw, ok := obj.(*gateway_api.Gateway); ok {
policies := h.mapper.GatewayToAccessLogPolicies(context.Background(), gw)
for _, p := range policies {
h.log.Infof("Gateway [%s/%s] resource change triggers AccessLogPolicy [%s/%s] resource change", gw.Namespace, gw.Name, p.Namespace, p.Name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use square brackets around the Gateway and Policy namespaced names? We don't do this elsewhere in the codebase

requests = append(requests, reconcile.Request{NamespacedName: k8s.NamespacedName(p)})
}
}
return requests
})
}
53 changes: 53 additions & 0 deletions controllers/eventhandlers/grpcroute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package eventhandlers

import (
"context"

"sigs.k8s.io/gateway-api/apis/v1alpha2"

"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type grpcRouteEventHandler struct {
log gwlog.Logger
client client.Client
mapper *resourceMapper
}

func NewGRPCRouteEventHandler(log gwlog.Logger, client client.Client) *grpcRouteEventHandler {
return &grpcRouteEventHandler{log: log, client: client,
mapper: &resourceMapper{log: log, client: client}}
}

func (h *grpcRouteEventHandler) MapToIAMAuthPolicies() handler.EventHandler {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to have a test code for this, easier way to test is extract logic into method level so that you can refer to it outside

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have unit tests for GetAttachedPolicies() does it enough?

return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
var requests []reconcile.Request
if route, ok := obj.(*v1alpha2.GRPCRoute); ok {
policies := h.mapper.GRPCRouteToIAMAuthPolicies(context.Background(), route)
for _, p := range policies {
h.log.Infof("GRPCRoute [%s/%s] resource change triggers IAMAuthPolicy [%s/%s] resource change", route.Namespace, route.Name, p.Namespace, p.Name)
requests = append(requests, reconcile.Request{NamespacedName: k8s.NamespacedName(p)})
}
}
return requests
})
}

func (h *grpcRouteEventHandler) MapToAccessLogPolicies() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
var requests []reconcile.Request
if route, ok := obj.(*v1alpha2.GRPCRoute); ok {
policies := h.mapper.GRPCRouteToAccessLogPolicies(context.Background(), route)
for _, p := range policies {
h.log.Infof("GRPCRoute [%s/%s] resource change triggers AccessLogPolicy [%s/%s] resource change", route.Namespace, route.Name, p.Namespace, p.Name)
requests = append(requests, reconcile.Request{NamespacedName: k8s.NamespacedName(p)})
}
}
return requests
})
}
53 changes: 53 additions & 0 deletions controllers/eventhandlers/httproute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package eventhandlers

import (
"context"

"sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type httpRouteEventHandler struct {
log gwlog.Logger
client client.Client
mapper *resourceMapper
}

func NewHTTPRouteEventHandler(log gwlog.Logger, client client.Client) *httpRouteEventHandler {
return &httpRouteEventHandler{log: log, client: client,
mapper: &resourceMapper{log: log, client: client}}
}

func (h *httpRouteEventHandler) MapToIAMAuthPolicies() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
var requests []reconcile.Request
if route, ok := obj.(*v1beta1.HTTPRoute); ok {
policies := h.mapper.HTTPRouteToIAMAuthPolicies(context.Background(), route)
for _, p := range policies {
h.log.Infof("HTTPRoute [%s/%s] resource change triggers IAMAuthPolicy [%s/%s] resource change", route.Namespace, route.Name, p.Namespace, p.Name)
requests = append(requests, reconcile.Request{NamespacedName: k8s.NamespacedName(p)})
}
}
return requests
})
}

func (h *httpRouteEventHandler) MapToAccessLogPolicies() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
var requests []reconcile.Request
if route, ok := obj.(*v1beta1.HTTPRoute); ok {
policies := h.mapper.HTTPRouteToAccessLogPolicies(context.Background(), route)
for _, p := range policies {
h.log.Infof("HTTPRoute [%s/%s] resource change triggers AccessLogPolicy [%s/%s] resource change", route.Namespace, route.Name, p.Namespace, p.Name)
requests = append(requests, reconcile.Request{NamespacedName: k8s.NamespacedName(p)})
}
}
return requests
})
}
33 changes: 33 additions & 0 deletions controllers/eventhandlers/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
gateway_api "sigs.k8s.io/gateway-api/apis/v1beta1"
mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/gateway"

"github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
Expand Down Expand Up @@ -73,6 +76,36 @@ func (r *resourceMapper) VpcAssociationPolicyToGateway(ctx context.Context, vap
return policyToTargetRefObj(r, ctx, vap, &gateway_api.Gateway{})
}

func (r *resourceMapper) GatewayToIAMAuthPolicies(ctx context.Context, gw *gateway_api.Gateway) []*anv1alpha1.IAMAuthPolicy {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really like a dependency relation from eventhandler to gateway package, I think this means something is wrong. GetAttachedPolicies() will be more suitable for a separate package.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, will change

policies, _ := gateway.GetAttachedPolicies(ctx, r.client, k8s.NamespacedName(gw), &anv1alpha1.IAMAuthPolicy{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if there's an error here?

return policies
}

func (r *resourceMapper) GatewayToAccessLogPolicies(ctx context.Context, gw *gateway_api.Gateway) []*anv1alpha1.AccessLogPolicy {
policies, _ := gateway.GetAttachedPolicies(ctx, r.client, k8s.NamespacedName(gw), &anv1alpha1.AccessLogPolicy{})
return policies
}

func (r *resourceMapper) HTTPRouteToIAMAuthPolicies(ctx context.Context, route *gateway_api.HTTPRoute) []*anv1alpha1.IAMAuthPolicy {
policy, _ := gateway.GetAttachedPolicies(ctx, r.client, k8s.NamespacedName(route), &anv1alpha1.IAMAuthPolicy{})
return policy
}

func (r *resourceMapper) HTTPRouteToAccessLogPolicies(ctx context.Context, route *gateway_api.HTTPRoute) []*anv1alpha1.AccessLogPolicy {
policies, _ := gateway.GetAttachedPolicies(ctx, r.client, k8s.NamespacedName(route), &anv1alpha1.AccessLogPolicy{})
return policies
}

func (r *resourceMapper) GRPCRouteToIAMAuthPolicies(ctx context.Context, route *gateway_api_v1alpha2.GRPCRoute) []*anv1alpha1.IAMAuthPolicy {
policies, _ := gateway.GetAttachedPolicies(ctx, r.client, k8s.NamespacedName(route), &anv1alpha1.IAMAuthPolicy{})
return policies
}

func (r *resourceMapper) GRPCRouteToAccessLogPolicies(ctx context.Context, route *gateway_api_v1alpha2.GRPCRoute) []*anv1alpha1.AccessLogPolicy {
policies, _ := gateway.GetAttachedPolicies(ctx, r.client, k8s.NamespacedName(route), &anv1alpha1.AccessLogPolicy{})
return policies
}

func policyToTargetRefObj[T client.Object](r *resourceMapper, ctx context.Context, policy core.Policy, retObj T) T {
null := *new(T)
if policy == nil {
Expand Down
110 changes: 110 additions & 0 deletions controllers/iamauthpolicy_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
Copyright 2021.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"context"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
pkg_builder "sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwvv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/aws/aws-application-networking-k8s/controllers/eventhandlers"
anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/aws"
"github.com/aws/aws-application-networking-k8s/pkg/deploy"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/latticestore"
lattice_runtime "github.com/aws/aws-application-networking-k8s/pkg/runtime"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
)

const (
authPolicyFinalizer = "iamauthpolicy.k8s.aws/resources"
)

type authPolicyReconciler struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iamAuthPolicyReconciler

log gwlog.Logger
client client.Client
scheme *runtime.Scheme
finalizerManager k8s.FinalizerManager
eventRecorder record.EventRecorder
cloud aws.Cloud
dataStore *latticestore.LatticeDataStore
stackMarshaller deploy.StackMarshaller
}

func RegisterIAMAuthPolicyController(
log gwlog.Logger,
cloud aws.Cloud,
dataStore *latticestore.LatticeDataStore,
finalizerManager k8s.FinalizerManager,
mgr ctrl.Manager,
) error {
k8sClient := mgr.GetClient()
scheme := mgr.GetScheme()
evtRec := mgr.GetEventRecorderFor("iamauthpolicy")

stackMarshaller := deploy.NewDefaultStackMarshaller()

r := &authPolicyReconciler{
log: log,
client: k8sClient,
scheme: scheme,
finalizerManager: finalizerManager,
eventRecorder: evtRec,
cloud: cloud,
stackMarshaller: stackMarshaller,
dataStore: dataStore,
}

gatewayEventHandler := eventhandlers.NewGatewayEventHandler(log, k8sClient)
httpRouteEventHandler := eventhandlers.NewHTTPRouteEventHandler(log, k8sClient)
grpcRouteEventHandler := eventhandlers.NewGRPCRouteEventHandler(log, k8sClient)

builder := ctrl.NewControllerManagedBy(mgr).
For(&anv1alpha1.IAMAuthPolicy{}, pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwvv1beta1.Gateway{}}, gatewayEventHandler.MapToIAMAuthPolicies(), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwvv1beta1.HTTPRoute{}}, httpRouteEventHandler.MapToIAMAuthPolicies(), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwv1alpha2.GRPCRoute{}}, grpcRouteEventHandler.MapToIAMAuthPolicies(), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{}))
return builder.Complete(r)
}

func (r *authPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.log.Infow("reconcile", "name", req.Name)
recErr := r.reconcile(ctx, req)
res, retryErr := lattice_runtime.HandleReconcileError(recErr)
if res.RequeueAfter != 0 {
r.log.Infow("requeue request", "name", req.Name, "requeueAfter", res.RequeueAfter)
} else if res.Requeue {
r.log.Infow("requeue request", "name", req.Name)
} else if retryErr == nil {
r.log.Infow("reconciled", "name", req.Name)
}
return res, retryErr
}

func (r *authPolicyReconciler) reconcile(ctx context.Context, req ctrl.Request) error {
//TODO: implement reconcile
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func (p *AccessLogPolicy) GetNamespacedName() types.NamespacedName {

func (pl *AccessLogPolicyList) GetItems() []core.Policy {
items := make([]core.Policy, len(pl.Items))
for i, item := range pl.Items {
items[i] = &item
for i := range pl.Items {
items[i] = &pl.Items[i]
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous code items[i] = &item has a bug that mess up the item pointers. it must use items[i] = &pl.Items[i]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this. What was the bug doing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous way &item could cause this scenario:

if pl.Items actually have {httproute-1, httproute-2}
the GetItems() could return {&httproute-2, &httproute-2}

return items
}
Loading