Skip to content

Commit

Permalink
- Add resourceMapper.VpcAssociationPolicyToGateway() method
Browse files Browse the repository at this point in the history
- Add vpcAssociationPolicyEventHandler.MapToGateway() method
- Make gateway_controller optionally watches the `VpcAssociationPolicy`
  • Loading branch information
Zijun Wang committed Sep 18, 2023
1 parent 2385943 commit aaffb2a
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 59 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ go.work*
**/envFile

# IDE files/directories
.idea/
.idea/

# gomock generated prog.go
pkg/aws/services/gomock_reflect_*
28 changes: 17 additions & 11 deletions cmd/aws-application-networking-k8s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"flag"
"os"

"github.com/go-logr/zapr"

"github.com/aws/aws-application-networking-k8s/pkg/aws"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
"github.com/go-logr/zapr"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
Expand All @@ -34,18 +35,20 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"

"github.com/aws/aws-application-networking-k8s/controllers"
//+kubebuilder:scaffold:imports
"github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/config"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/latticestore"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/external-dns/endpoint"
gateway_api_v1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gateway_api_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

"github.com/aws/aws-application-networking-k8s/controllers"

//+kubebuilder:scaffold:imports
"github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/config"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/latticestore"
)

var (
Expand All @@ -70,12 +73,15 @@ func addOptionalCRDs(scheme *runtime.Scheme) {
scheme.AddKnownTypes(dnsEndpoint, &endpoint.DNSEndpoint{}, &endpoint.DNSEndpointList{})
metav1.AddToGroupVersion(scheme, dnsEndpoint)

targetGroupPolicy := schema.GroupVersion{
Group: "application-networking.k8s.aws",
awsGatewayControllerCRDGroupVersion := schema.GroupVersion{
Group: v1alpha1.GroupName,
Version: "v1alpha1",
}
scheme.AddKnownTypes(targetGroupPolicy, &v1alpha1.TargetGroupPolicy{}, &v1alpha1.TargetGroupPolicyList{})
metav1.AddToGroupVersion(scheme, targetGroupPolicy)
scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &v1alpha1.TargetGroupPolicy{}, &v1alpha1.TargetGroupPolicyList{})
metav1.AddToGroupVersion(scheme, awsGatewayControllerCRDGroupVersion)

scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &v1alpha1.VpcAssociationPolicy{}, &v1alpha1.VpcAssociationPolicyList{})
metav1.AddToGroupVersion(scheme, awsGatewayControllerCRDGroupVersion)
}

func main() {
Expand Down
104 changes: 68 additions & 36 deletions controllers/eventhandlers/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ package eventhandlers

import (
"context"
"github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
gateway_api_v1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gateway_api "sigs.k8s.io/gateway-api/apis/v1beta1"
mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

"github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
)

type resourceMapper struct {
Expand All @@ -21,16 +24,16 @@ type resourceMapper struct {
}

const (
coreGroupName = "" // empty means core by definition
serviceKind = "Service"
serviceImportKind = "ServiceImport"
gatewayKind = "Gateway"
)

func (r *resourceMapper) ServiceToRoutes(ctx context.Context, svc *corev1.Service, routeType core.RouteType) []core.Route {
if svc == nil {
return nil
}
return r.backendRefToRoutes(ctx, svc, coreGroupName, serviceKind, routeType)
return r.backendRefToRoutes(ctx, svc, corev1.GroupName, serviceKind, routeType)
}

func (r *resourceMapper) ServiceImportToRoutes(ctx context.Context, svc *mcs_api.ServiceImport, routeType core.RouteType) []core.Route {
Expand Down Expand Up @@ -63,49 +66,78 @@ func (r *resourceMapper) EndpointsToService(ctx context.Context, ep *corev1.Endp
}

func (r *resourceMapper) TargetGroupPolicyToService(ctx context.Context, tgp *v1alpha1.TargetGroupPolicy) *corev1.Service {
if tgp == nil {
return nil
}
policyName := k8s.NamespacedName(tgp).String()
return policyToTargetRefObj(r, ctx, tgp, &corev1.Service{})
}

targetRef := tgp.Spec.TargetRef
if targetRef == nil {
r.log.Infow("TargetGroupPolicy does not have targetRef, skipping",
"policyName", policyName)
return nil
}
if targetRef.Group != coreGroupName || targetRef.Kind != serviceKind {
r.log.Infow("Detected non-Service TargetGroupPolicy attachment, skipping",
"policyName", policyName, "targetRef", targetRef)
return nil
}
namespace := tgp.Namespace
if targetRef.Namespace != nil && namespace != string(*targetRef.Namespace) {
r.log.Infow("Detected cross namespace TargetGroupPolicy attachment, skipping",
"policyName", policyName, "targetRef", targetRef)
return nil
func (r *resourceMapper) VpcAssociationPolicyToGateway(ctx context.Context, vap *v1alpha1.VpcAssociationPolicy) *gateway_api.Gateway {
return policyToTargetRefObj(r, ctx, vap, &gateway_api.Gateway{})
}

func policyToTargetRefObj[T client.Object](r *resourceMapper, ctx context.Context, policy core.Policy, retObj T) T {
null := *new(T)
if policy == nil {
return null
}
policyNamespacedName := policy.GetNamespacedName()

svcName := types.NamespacedName{
Namespace: namespace,
targetRef := policy.GetTargetRef()
if targetRef == nil {
r.log.Infow("Policy does not have targetRef, skipping",
"policyName", policyNamespacedName)
return null
}
expectedGroup, expectedKind, err := k8sResourceTypeToGroupNameAndKindName(retObj)
if err != nil {
r.log.Errorw("Failed to get expected GroupVersion of targetRefObj",
"policyName", policyNamespacedName, "reason", err.Error())
return null
}

if targetRef.Group != gateway_api.Group(expectedGroup) || targetRef.Kind != gateway_api.Kind(expectedKind) {
r.log.Infow("Detected targetRef GroupVersion and expected retObj GroupVersion are different, skipping",
"policyName", policyNamespacedName,
"targetRef", targetRef,
"expectedGroup", expectedGroup,
"expectedKind", expectedKind)
return null
}
if targetRef.Namespace != nil && policyNamespacedName.Namespace != string(*targetRef.Namespace) {
r.log.Infow("Detected Policy and TargetRef namespace are different, skipping",
"policyName", policyNamespacedName, "targetRef", targetRef,
"policyNamespace", policyNamespacedName.Namespace)
return null
}

key := types.NamespacedName{
Namespace: policyNamespacedName.Namespace,
Name: string(targetRef.Name),
}
svc := &corev1.Service{}
if err := r.client.Get(ctx, svcName, svc); err != nil {
if err := r.client.Get(ctx, key, retObj); err != nil {
if errors.IsNotFound(err) {
r.log.Debugw("TargetGroupPolicy is referring to non-existent service, skipping",
"policyName", policyName, "serviceName", svcName.String())
r.log.Debugw("Policy is referring to a non-existent targetRefObj, skipping",
"policyName", policyNamespacedName, "targetRef", targetRef)
} else {
// Still gracefully skipping the event but errors other than NotFound are bad sign.
r.log.Errorw("Failed to query targetRef of TargetGroupPolicy",
"policyName", policyName, "serviceName", svcName.String(), "reason", err.Error())
"policyName", policyNamespacedName, "targetRef", targetRef, "reason", err.Error())
}
return nil
return null
}
r.log.Debugw("TargetGroupPolicy change on Service detected",
"policyName", policyName, "serviceName", svcName.String())
"policyName", policyNamespacedName, "targetRef", targetRef)

return svc
return retObj
}

func k8sResourceTypeToGroupNameAndKindName(obj client.Object) (string, string, error) {
switch obj.(type) {
case *corev1.Service:
return corev1.GroupName, serviceKind, nil
case *gateway_api.Gateway:
return gateway_api.GroupName, gatewayKind, nil
default:
return "", "", fmt.Errorf("un-registered obj type: %T", obj)
}
}

func (r *resourceMapper) backendRefToRoutes(ctx context.Context, obj client.Object, group, kind string, routeType core.RouteType) []core.Route {
Expand Down
112 changes: 107 additions & 5 deletions controllers/eventhandlers/mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package eventhandlers
import (
"context"
"errors"
mock_client "github.com/aws/aws-application-networking-k8s/mocks/controller-runtime/client"
"github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
gateway_api_v1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gateway_api "sigs.k8s.io/gateway-api/apis/v1beta1"
"testing"

mock_client "github.com/aws/aws-application-networking-k8s/mocks/controller-runtime/client"
"github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
)

func createHTTPRoute(name, namespace string, backendRef gateway_api.BackendObjectReference) gateway_api.HTTPRoute {
Expand Down Expand Up @@ -194,3 +196,103 @@ func TestTargetGroupPolicyToService(t *testing.T) {
}
}
}

func TestVpcAssociationPolicyToGateway(t *testing.T) {
c := gomock.NewController(t)
defer c.Finish()

ns1 := "default"
ns2 := "non-default"

testCases := []struct {
testCaseName string
namespace string
targetKind gateway_api.Kind
targetNamespace *gateway_api.Namespace
gatewayFound bool
expectSuccess bool
}{
{
testCaseName: "namespace not match",
namespace: ns1,
targetKind: "Gateway",
targetNamespace: (*gateway_api.Namespace)(&ns2),
expectSuccess: false,
},
{
testCaseName: "targetKind not match scenario 1",
namespace: ns1,
targetKind: "NotGateway",
targetNamespace: (*gateway_api.Namespace)(&ns1),
expectSuccess: false,
},
{
testCaseName: "targetKind not match scenario 2",
namespace: ns1,
targetKind: "Service",
targetNamespace: (*gateway_api.Namespace)(&ns1),
expectSuccess: false,
},
{
testCaseName: "gateway not found",
namespace: ns1,
targetKind: "Gateway",
targetNamespace: (*gateway_api.Namespace)(&ns1),
gatewayFound: false,
expectSuccess: false,
},
{
testCaseName: "gateway found, targetRef namespace match",
namespace: ns1,
targetKind: "Gateway",
targetNamespace: (*gateway_api.Namespace)(&ns1),
gatewayFound: true,
expectSuccess: true,
},
{
testCaseName: "gateway found, targetRef namespace not defined",
namespace: ns1,
targetKind: "Gateway",
targetNamespace: nil,
gatewayFound: true,
expectSuccess: true,
},
}

for _, tt := range testCases {
mockClient := mock_client.NewMockClient(c)
mapper := &resourceMapper{log: gwlog.FallbackLogger, client: mockClient}
if tt.gatewayFound {
mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
} else {
mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("fail")).AnyTimes()
}
var targetRefGroupName string
if tt.targetKind == "Gateway" {
targetRefGroupName = gateway_api.GroupName
} else if tt.targetKind == "Service" {
targetRefGroupName = corev1.GroupName
}

gw := mapper.VpcAssociationPolicyToGateway(context.Background(), &v1alpha1.VpcAssociationPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "test--vpc-association-policy",
Namespace: tt.namespace,
},

Spec: v1alpha1.VpcAssociationPolicySpec{
TargetRef: &gateway_api_v1alpha2.PolicyTargetReference{
Group: gateway_api.Group(targetRefGroupName),
Kind: tt.targetKind,
Name: "test-gw",
Namespace: tt.targetNamespace,
},
},
})
if tt.expectSuccess {
assert.NotNil(t, gw)
} else {
assert.Nil(t, gw)
}
}
}
35 changes: 35 additions & 0 deletions controllers/eventhandlers/vpcAssociationPolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package eventhandlers

import (
"context"

"github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type vpcAssociationPolicyEventHandler struct {
log gwlog.Logger
client client.Client
mapper *resourceMapper
}

func NewVpcAssociationPolicyEventHandler(log gwlog.Logger, client client.Client) *vpcAssociationPolicyEventHandler {
return &vpcAssociationPolicyEventHandler{log: log, client: client,
mapper: &resourceMapper{log: log, client: client}}
}

func (h *vpcAssociationPolicyEventHandler) MapToGateway() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
if vap, ok := obj.(*v1alpha1.VpcAssociationPolicy); ok {
if gw := h.mapper.VpcAssociationPolicyToGateway(context.Background(), vap); gw != nil {
return []reconcile.Request{{NamespacedName: k8s.NamespacedName(gw)}}
}
}
return nil
})
}
Loading

0 comments on commit aaffb2a

Please sign in to comment.