Skip to content

Commit

Permalink
Added Support for Updating Access Log Policies With Gateway and Route…
Browse files Browse the repository at this point in the history
… Events (#450)

* Added support for updating Access Log Policy

* Added access log policy handling for gateway and route events

---------

Co-authored-by: Shawn Kaplan <[email protected]>
  • Loading branch information
xWink and Shawn Kaplan authored Oct 25, 2023
1 parent 9ca9939 commit de8994c
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 13 deletions.
45 changes: 44 additions & 1 deletion controllers/accesslogpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"
"reflect"

"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
Expand All @@ -30,7 +31,10 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
pkg_builder "sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

Expand Down Expand Up @@ -91,7 +95,10 @@ func RegisterAccessLogPolicyController(
}

builder := ctrl.NewControllerManagedBy(mgr).
For(&anv1alpha1.AccessLogPolicy{}, pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{}))
For(&anv1alpha1.AccessLogPolicy{}, pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwv1beta1.Gateway{}}, handler.EnqueueRequestsFromMapFunc(r.findImpactedAccessLogPolicies), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwv1beta1.HTTPRoute{}}, handler.EnqueueRequestsFromMapFunc(r.findImpactedAccessLogPolicies), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwv1alpha2.GRPCRoute{}}, handler.EnqueueRequestsFromMapFunc(r.findImpactedAccessLogPolicies), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{}))

return builder.Complete(r)
}
Expand Down Expand Up @@ -315,3 +322,39 @@ func (r *accessLogPolicyReconciler) updateAccessLogPolicyStatus(

return nil
}

func (r *accessLogPolicyReconciler) findImpactedAccessLogPolicies(eventObj client.Object) []reconcile.Request {
listOptions := &client.ListOptions{
Namespace: eventObj.GetNamespace(),
}

alps := &anv1alpha1.AccessLogPolicyList{}
err := r.client.List(context.TODO(), alps, listOptions)
if err != nil {
r.log.Errorf("Failed to list all Access Log Policies, %s", err)
return []reconcile.Request{}
}

requests := make([]reconcile.Request, 0, len(alps.Items))
for _, alp := range alps.Items {
if string(alp.Spec.TargetRef.Name) != eventObj.GetName() {
continue
}

targetRefKind := string(alp.Spec.TargetRef.Kind)
eventObjKind := reflect.TypeOf(eventObj).Elem().Name()
if targetRefKind != eventObjKind {
continue
}

r.log.Debugf("Adding Access Log Policy %s/%s to queue due to %s event", alp.Namespace, alp.Name, targetRefKind)
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: alp.Namespace,
Name: alp.Name,
},
})
}

return requests
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
)

const (
AccessLogPolicyKind = "AccessLogPolicy"
AccessLogSubscriptionAnnotationKey = "VpcLatticeAccessLogSubscription"
)

Expand Down
4 changes: 2 additions & 2 deletions pkg/deploy/lattice/access_log_subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (m *defaultAccessLogSubscriptionManager) Update(
case *vpclattice.AccessDeniedException:
return nil, services.NewInvalidError(e.Message())
case *vpclattice.ResourceNotFoundException:
return nil, services.NewInvalidError(e.Message())
return m.Create(ctx, accessLogSubscription)
default:
return nil, err
}
Expand Down Expand Up @@ -161,7 +161,7 @@ func (m *defaultAccessLogSubscriptionManager) Update(
if *e.ResourceType == "SERVICE_NETWORK" || *e.ResourceType == "SERVICE" {
return nil, services.NewNotFoundError(string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName)
}
return nil, services.NewInvalidError(e.Message())
return m.Create(ctx, accessLogSubscription)
case *vpclattice.ConflictException:
/*
* A conflict can happen when the destination type of the new ALS is different from the original.
Expand Down
20 changes: 14 additions & 6 deletions pkg/deploy/lattice/access_log_subscription_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,40 +397,48 @@ func TestAccessLogSubscriptionManager(t *testing.T) {
assert.Equal(t, newAccessLogSubscriptionArn, resp.Arn)
})

t.Run("Update_ALSDoesNotExistOnGet_ReturnsInvalidError", func(t *testing.T) {
t.Run("Update_ALSDoesNotExistOnGet_CreatesNewALS", func(t *testing.T) {
newAccessLogSubscriptionArn := accessLogSubscriptionArn + "new"
accessLogSubscription := simpleAccessLogSubscription(core.UpdateEvent)
accessLogSubscription.Status = &lattice.AccessLogSubscriptionStatus{
Arn: accessLogSubscriptionArn,
}
getALSError := &vpclattice.ResourceNotFoundException{
ResourceType: aws.String("ACCESS_LOG_SUBSCRIPTION"),
}
createALSOutput.Arn = aws.String(newAccessLogSubscriptionArn)

mockLattice.EXPECT().GetAccessLogSubscriptionWithContext(ctx, getALSInput).Return(nil, getALSError)
mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil)
mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSForSNInput).Return(createALSOutput, nil)

mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud)
resp, err := mgr.Update(ctx, accessLogSubscription)
assert.Nil(t, resp)
assert.True(t, services.IsInvalidError(err))
assert.Nil(t, err)
assert.Equal(t, newAccessLogSubscriptionArn, resp.Arn)
})

t.Run("Update_ALSDoesNotExistOnUpdate_ReturnsInvalidError", func(t *testing.T) {
t.Run("Update_ALSDoesNotExistOnUpdate_CreatesNewALS", func(t *testing.T) {
newAccessLogSubscriptionArn := accessLogSubscriptionArn + "new"
accessLogSubscription := simpleAccessLogSubscription(core.UpdateEvent)
accessLogSubscription.Status = &lattice.AccessLogSubscriptionStatus{
Arn: accessLogSubscriptionArn,
}
updateALSError := &vpclattice.ResourceNotFoundException{
ResourceType: aws.String("ACCESS_LOG_SUBSCRIPTION"),
}
createALSOutput.Arn = aws.String(newAccessLogSubscriptionArn)

mockLattice.EXPECT().GetAccessLogSubscriptionWithContext(ctx, getALSInput).Return(getALSOutput, nil)
mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil)
mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(nil, updateALSError)
mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil)
mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSForSNInput).Return(createALSOutput, nil)

mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud)
resp, err := mgr.Update(ctx, accessLogSubscription)
assert.Nil(t, resp)
assert.True(t, services.IsInvalidError(err))
assert.Nil(t, err)
assert.Equal(t, newAccessLogSubscriptionArn, resp.Arn)
})

t.Run("Update_AccessDeniedExceptionReceivedOnGet_ReturnsInvalidError", func(t *testing.T) {
Expand Down
138 changes: 136 additions & 2 deletions test/suites/integration/access_log_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
var _ = Describe("Access Log Policy", Ordered, func() {
const (
k8sResourceName = "test-access-log-policy"
k8sResource2Name = "test-access-log-policy-secondary"
k8sResourceName2 = "test-access-log-policy-secondary"
bucketName = "k8s-test-lattice-bucket"
logGroupName = "k8s-test-lattice-log-group"
logGroup2Name = "k8s-test-lattice-log-group-secondary"
Expand Down Expand Up @@ -912,7 +912,7 @@ var _ = Describe("Access Log Policy", Ordered, func() {
// Create second Access Log Policy for original destination
accessLogPolicy2 := &anv1alpha1.AccessLogPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: k8sResource2Name,
Name: k8sResourceName2,
Namespace: k8snamespace,
},
Spec: anv1alpha1.AccessLogPolicySpec{
Expand Down Expand Up @@ -1113,6 +1113,140 @@ var _ = Describe("Access Log Policy", Ordered, func() {
}).Should(Succeed())
})

It("status is updated when targetRef is deleted and recreated", func() {
// Create HTTPRoute, Service, and Deployment
deployment, k8sService := testFramework.NewNginxApp(test.ElasticSearchOptions{
Name: k8sResourceName2,
Namespace: k8snamespace,
})
route := testFramework.NewHttpRoute(testGateway, k8sService, "Service")
route.Name = "test-access-log-policies"
testFramework.ExpectCreated(ctx, route, deployment, k8sService)

// Delete HTTPRoute, Service, and Deployment
defer func() {
testFramework.ExpectDeleted(ctx, route)
testFramework.SleepForRouteDeletion()
testFramework.ExpectDeletedThenNotFound(ctx, route, k8sService, deployment)
}()

accessLogPolicy := &anv1alpha1.AccessLogPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: k8sResourceName,
Namespace: k8snamespace,
},
Spec: anv1alpha1.AccessLogPolicySpec{
DestinationArn: aws.String(bucketArn),
TargetRef: &gwv1alpha2.PolicyTargetReference{
Group: gwv1beta1.GroupName,
Kind: "HTTPRoute",
Name: gwv1alpha2.ObjectName(route.Name),
Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)),
},
},
}
testFramework.ExpectCreated(ctx, accessLogPolicy)
expectedGeneration := 1
alpNamespacedName := types.NamespacedName{
Name: accessLogPolicy.Name,
Namespace: accessLogPolicy.Namespace,
}

latticeService := testFramework.GetVpcLatticeService(ctx, core.NewHTTPRoute(*route))

Eventually(func(g Gomega) {
// VPC Lattice Service should have an Access Log Subscription
output, err := testFramework.LatticeClient.ListAccessLogSubscriptions(&vpclattice.ListAccessLogSubscriptionsInput{
ResourceIdentifier: latticeService.Arn,
})
g.Expect(err).To(BeNil())
g.Expect(len(output.Items)).To(BeEquivalentTo(1))
}).Should(Succeed())

// Delete HTTPRoute
testFramework.ExpectDeleted(ctx, route)
testFramework.SleepForRouteDeletion()
testFramework.ExpectDeletedThenNotFound(ctx, route)

Eventually(func(g Gomega) {
// Policy status should be TargetNotFound
alp := &anv1alpha1.AccessLogPolicy{}
err := testFramework.Client.Get(ctx, alpNamespacedName, alp)
g.Expect(err).To(BeNil())
g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1))
g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted)))
g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionFalse))
g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration))
g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonTargetNotFound)))
}).Should(Succeed())

// Recreate HTTPRoute
route = testFramework.NewHttpRoute(testGateway, k8sService, "Service")
route.Name = "test-access-log-policies"
testFramework.ExpectCreated(ctx, route)

var originalALSArn string
Eventually(func(g Gomega) {
// Policy status should be Accepted
alp := &anv1alpha1.AccessLogPolicy{}
err := testFramework.Client.Get(ctx, alpNamespacedName, alp)
g.Expect(err).To(BeNil())
g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1))
g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted)))
g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionTrue))
g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration))
g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonAccepted)))
originalALSArn = alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]
}).Should(Succeed())

// Delete HTTPRoute
testFramework.ExpectDeleted(ctx, route)
testFramework.SleepForRouteDeletion()
testFramework.ExpectDeletedThenNotFound(ctx, route)

// Change ALP destination type
alp := &anv1alpha1.AccessLogPolicy{}
err := testFramework.Client.Get(ctx, alpNamespacedName, alp)
Expect(err).To(BeNil())
alp.Spec.DestinationArn = aws.String(logGroupArn)
testFramework.ExpectUpdated(ctx, alp)
expectedGeneration = expectedGeneration + 1

Eventually(func(g Gomega) {
// Policy status should be TargetNotFound
alp := &anv1alpha1.AccessLogPolicy{}
err := testFramework.Client.Get(ctx, alpNamespacedName, alp)
g.Expect(err).To(BeNil())
g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1))
g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted)))
g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionFalse))
g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration))
g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonTargetNotFound)))
}).Should(Succeed())

// Recreate HTTPRoute
route = testFramework.NewHttpRoute(testGateway, k8sService, "Service")
route.Name = "test-access-log-policies"
testFramework.ExpectCreated(ctx, route)

var newALSArn string
Eventually(func(g Gomega) {
// Policy status should be Accepted
alp := &anv1alpha1.AccessLogPolicy{}
err := testFramework.Client.Get(ctx, alpNamespacedName, alp)
g.Expect(err).To(BeNil())
g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1))
g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted)))
g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionTrue))
g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration))
g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonAccepted)))

// Changing destination type should have resulted in ALS replacement
newALSArn = alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]
g.Expect(newALSArn).ToNot(BeEquivalentTo(originalALSArn))
}).Should(Succeed())
})

AfterEach(func() {
// Delete Access Log Policies in test namespace
alps := &anv1alpha1.AccessLogPolicyList{}
Expand Down
4 changes: 3 additions & 1 deletion test/suites/integration/vpc_association_policy_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package integration

import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/vpclattice"
Expand Down Expand Up @@ -130,7 +132,7 @@ var _ = Describe("Test vpc association policy", Ordered, func() {
associated, _, err := testFramework.IsVpcAssociatedWithServiceNetwork(ctx, test.CurrentClusterVpcId, testServiceNetwork)
g.Expect(err).To(BeNil())
g.Expect(associated).To(BeTrue())
}).Should(Succeed())
}).WithTimeout(5 * time.Minute).Should(Succeed())

// Clean up the vpc association policy
testFramework.ExpectDeletedThenNotFound(ctx, vpcAssociationPolicy)
Expand Down

0 comments on commit de8994c

Please sign in to comment.