From de8994c8ffbab778f64ab9fe26f3e82f0bdfa6da Mon Sep 17 00:00:00 2001 From: xWink <45508693+xWink@users.noreply.github.com> Date: Tue, 24 Oct 2023 17:36:17 -0700 Subject: [PATCH] Added Support for Updating Access Log Policies With Gateway and Route Events (#450) * Added support for updating Access Log Policy * Added access log policy handling for gateway and route events --------- Co-authored-by: Shawn Kaplan --- controllers/accesslogpolicy_controller.go | 45 +++++- .../v1alpha1/accesslogpolicy_types.go | 1 - .../access_log_subscription_manager.go | 4 +- .../access_log_subscription_manager_test.go | 20 ++- .../integration/access_log_policy_test.go | 138 +++++++++++++++++- .../vpc_association_policy_test.go | 4 +- 6 files changed, 199 insertions(+), 13 deletions(-) diff --git a/controllers/accesslogpolicy_controller.go b/controllers/accesslogpolicy_controller.go index 8d5f3669..ef996b7e 100644 --- a/controllers/accesslogpolicy_controller.go +++ b/controllers/accesslogpolicy_controller.go @@ -19,6 +19,7 @@ package controllers import ( "context" "fmt" + "reflect" "golang.org/x/exp/slices" corev1 "k8s.io/api/core/v1" @@ -30,7 +31,10 @@ import ( ctrl "sigs.k8s.io/controller-runtime" pkg_builder "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" @@ -91,7 +95,10 @@ func RegisterAccessLogPolicyController( } builder := ctrl.NewControllerManagedBy(mgr). - For(&anv1alpha1.AccessLogPolicy{}, pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})) + For(&anv1alpha1.AccessLogPolicy{}, pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Watches(&source.Kind{Type: &gwv1beta1.Gateway{}}, handler.EnqueueRequestsFromMapFunc(r.findImpactedAccessLogPolicies), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Watches(&source.Kind{Type: &gwv1beta1.HTTPRoute{}}, handler.EnqueueRequestsFromMapFunc(r.findImpactedAccessLogPolicies), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Watches(&source.Kind{Type: &gwv1alpha2.GRPCRoute{}}, handler.EnqueueRequestsFromMapFunc(r.findImpactedAccessLogPolicies), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})) return builder.Complete(r) } @@ -315,3 +322,39 @@ func (r *accessLogPolicyReconciler) updateAccessLogPolicyStatus( return nil } + +func (r *accessLogPolicyReconciler) findImpactedAccessLogPolicies(eventObj client.Object) []reconcile.Request { + listOptions := &client.ListOptions{ + Namespace: eventObj.GetNamespace(), + } + + alps := &anv1alpha1.AccessLogPolicyList{} + err := r.client.List(context.TODO(), alps, listOptions) + if err != nil { + r.log.Errorf("Failed to list all Access Log Policies, %s", err) + return []reconcile.Request{} + } + + requests := make([]reconcile.Request, 0, len(alps.Items)) + for _, alp := range alps.Items { + if string(alp.Spec.TargetRef.Name) != eventObj.GetName() { + continue + } + + targetRefKind := string(alp.Spec.TargetRef.Kind) + eventObjKind := reflect.TypeOf(eventObj).Elem().Name() + if targetRefKind != eventObjKind { + continue + } + + r.log.Debugf("Adding Access Log Policy %s/%s to queue due to %s event", alp.Namespace, alp.Name, targetRefKind) + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: alp.Namespace, + Name: alp.Name, + }, + }) + } + + return requests +} diff --git a/pkg/apis/applicationnetworking/v1alpha1/accesslogpolicy_types.go b/pkg/apis/applicationnetworking/v1alpha1/accesslogpolicy_types.go index 52c18b6d..1ea9c0a2 100644 --- a/pkg/apis/applicationnetworking/v1alpha1/accesslogpolicy_types.go +++ b/pkg/apis/applicationnetworking/v1alpha1/accesslogpolicy_types.go @@ -10,7 +10,6 @@ import ( ) const ( - AccessLogPolicyKind = "AccessLogPolicy" AccessLogSubscriptionAnnotationKey = "VpcLatticeAccessLogSubscription" ) diff --git a/pkg/deploy/lattice/access_log_subscription_manager.go b/pkg/deploy/lattice/access_log_subscription_manager.go index efb644c2..d0a713d2 100644 --- a/pkg/deploy/lattice/access_log_subscription_manager.go +++ b/pkg/deploy/lattice/access_log_subscription_manager.go @@ -129,7 +129,7 @@ func (m *defaultAccessLogSubscriptionManager) Update( case *vpclattice.AccessDeniedException: return nil, services.NewInvalidError(e.Message()) case *vpclattice.ResourceNotFoundException: - return nil, services.NewInvalidError(e.Message()) + return m.Create(ctx, accessLogSubscription) default: return nil, err } @@ -161,7 +161,7 @@ func (m *defaultAccessLogSubscriptionManager) Update( if *e.ResourceType == "SERVICE_NETWORK" || *e.ResourceType == "SERVICE" { return nil, services.NewNotFoundError(string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName) } - return nil, services.NewInvalidError(e.Message()) + return m.Create(ctx, accessLogSubscription) case *vpclattice.ConflictException: /* * A conflict can happen when the destination type of the new ALS is different from the original. diff --git a/pkg/deploy/lattice/access_log_subscription_manager_test.go b/pkg/deploy/lattice/access_log_subscription_manager_test.go index 8241f8c9..89aa4b0b 100644 --- a/pkg/deploy/lattice/access_log_subscription_manager_test.go +++ b/pkg/deploy/lattice/access_log_subscription_manager_test.go @@ -397,7 +397,8 @@ func TestAccessLogSubscriptionManager(t *testing.T) { assert.Equal(t, newAccessLogSubscriptionArn, resp.Arn) }) - t.Run("Update_ALSDoesNotExistOnGet_ReturnsInvalidError", func(t *testing.T) { + t.Run("Update_ALSDoesNotExistOnGet_CreatesNewALS", func(t *testing.T) { + newAccessLogSubscriptionArn := accessLogSubscriptionArn + "new" accessLogSubscription := simpleAccessLogSubscription(core.UpdateEvent) accessLogSubscription.Status = &lattice.AccessLogSubscriptionStatus{ Arn: accessLogSubscriptionArn, @@ -405,16 +406,20 @@ func TestAccessLogSubscriptionManager(t *testing.T) { getALSError := &vpclattice.ResourceNotFoundException{ ResourceType: aws.String("ACCESS_LOG_SUBSCRIPTION"), } + createALSOutput.Arn = aws.String(newAccessLogSubscriptionArn) mockLattice.EXPECT().GetAccessLogSubscriptionWithContext(ctx, getALSInput).Return(nil, getALSError) + mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSForSNInput).Return(createALSOutput, nil) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) resp, err := mgr.Update(ctx, accessLogSubscription) - assert.Nil(t, resp) - assert.True(t, services.IsInvalidError(err)) + assert.Nil(t, err) + assert.Equal(t, newAccessLogSubscriptionArn, resp.Arn) }) - t.Run("Update_ALSDoesNotExistOnUpdate_ReturnsInvalidError", func(t *testing.T) { + t.Run("Update_ALSDoesNotExistOnUpdate_CreatesNewALS", func(t *testing.T) { + newAccessLogSubscriptionArn := accessLogSubscriptionArn + "new" accessLogSubscription := simpleAccessLogSubscription(core.UpdateEvent) accessLogSubscription.Status = &lattice.AccessLogSubscriptionStatus{ Arn: accessLogSubscriptionArn, @@ -422,15 +427,18 @@ func TestAccessLogSubscriptionManager(t *testing.T) { updateALSError := &vpclattice.ResourceNotFoundException{ ResourceType: aws.String("ACCESS_LOG_SUBSCRIPTION"), } + createALSOutput.Arn = aws.String(newAccessLogSubscriptionArn) mockLattice.EXPECT().GetAccessLogSubscriptionWithContext(ctx, getALSInput).Return(getALSOutput, nil) mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(nil, updateALSError) + mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSForSNInput).Return(createALSOutput, nil) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) resp, err := mgr.Update(ctx, accessLogSubscription) - assert.Nil(t, resp) - assert.True(t, services.IsInvalidError(err)) + assert.Nil(t, err) + assert.Equal(t, newAccessLogSubscriptionArn, resp.Arn) }) t.Run("Update_AccessDeniedExceptionReceivedOnGet_ReturnsInvalidError", func(t *testing.T) { diff --git a/test/suites/integration/access_log_policy_test.go b/test/suites/integration/access_log_policy_test.go index 5707874b..ab569211 100644 --- a/test/suites/integration/access_log_policy_test.go +++ b/test/suites/integration/access_log_policy_test.go @@ -33,7 +33,7 @@ import ( var _ = Describe("Access Log Policy", Ordered, func() { const ( k8sResourceName = "test-access-log-policy" - k8sResource2Name = "test-access-log-policy-secondary" + k8sResourceName2 = "test-access-log-policy-secondary" bucketName = "k8s-test-lattice-bucket" logGroupName = "k8s-test-lattice-log-group" logGroup2Name = "k8s-test-lattice-log-group-secondary" @@ -912,7 +912,7 @@ var _ = Describe("Access Log Policy", Ordered, func() { // Create second Access Log Policy for original destination accessLogPolicy2 := &anv1alpha1.AccessLogPolicy{ ObjectMeta: metav1.ObjectMeta{ - Name: k8sResource2Name, + Name: k8sResourceName2, Namespace: k8snamespace, }, Spec: anv1alpha1.AccessLogPolicySpec{ @@ -1113,6 +1113,140 @@ var _ = Describe("Access Log Policy", Ordered, func() { }).Should(Succeed()) }) + It("status is updated when targetRef is deleted and recreated", func() { + // Create HTTPRoute, Service, and Deployment + deployment, k8sService := testFramework.NewNginxApp(test.ElasticSearchOptions{ + Name: k8sResourceName2, + Namespace: k8snamespace, + }) + route := testFramework.NewHttpRoute(testGateway, k8sService, "Service") + route.Name = "test-access-log-policies" + testFramework.ExpectCreated(ctx, route, deployment, k8sService) + + // Delete HTTPRoute, Service, and Deployment + defer func() { + testFramework.ExpectDeleted(ctx, route) + testFramework.SleepForRouteDeletion() + testFramework.ExpectDeletedThenNotFound(ctx, route, k8sService, deployment) + }() + + accessLogPolicy := &anv1alpha1.AccessLogPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: k8sResourceName, + Namespace: k8snamespace, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(bucketArn), + TargetRef: &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "HTTPRoute", + Name: gwv1alpha2.ObjectName(route.Name), + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + }, + }, + } + testFramework.ExpectCreated(ctx, accessLogPolicy) + expectedGeneration := 1 + alpNamespacedName := types.NamespacedName{ + Name: accessLogPolicy.Name, + Namespace: accessLogPolicy.Namespace, + } + + latticeService := testFramework.GetVpcLatticeService(ctx, core.NewHTTPRoute(*route)) + + Eventually(func(g Gomega) { + // VPC Lattice Service should have an Access Log Subscription + output, err := testFramework.LatticeClient.ListAccessLogSubscriptions(&vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: latticeService.Arn, + }) + g.Expect(err).To(BeNil()) + g.Expect(len(output.Items)).To(BeEquivalentTo(1)) + }).Should(Succeed()) + + // Delete HTTPRoute + testFramework.ExpectDeleted(ctx, route) + testFramework.SleepForRouteDeletion() + testFramework.ExpectDeletedThenNotFound(ctx, route) + + Eventually(func(g Gomega) { + // Policy status should be TargetNotFound + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionFalse)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonTargetNotFound))) + }).Should(Succeed()) + + // Recreate HTTPRoute + route = testFramework.NewHttpRoute(testGateway, k8sService, "Service") + route.Name = "test-access-log-policies" + testFramework.ExpectCreated(ctx, route) + + var originalALSArn string + Eventually(func(g Gomega) { + // Policy status should be Accepted + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionTrue)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonAccepted))) + originalALSArn = alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey] + }).Should(Succeed()) + + // Delete HTTPRoute + testFramework.ExpectDeleted(ctx, route) + testFramework.SleepForRouteDeletion() + testFramework.ExpectDeletedThenNotFound(ctx, route) + + // Change ALP destination type + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + Expect(err).To(BeNil()) + alp.Spec.DestinationArn = aws.String(logGroupArn) + testFramework.ExpectUpdated(ctx, alp) + expectedGeneration = expectedGeneration + 1 + + Eventually(func(g Gomega) { + // Policy status should be TargetNotFound + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionFalse)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonTargetNotFound))) + }).Should(Succeed()) + + // Recreate HTTPRoute + route = testFramework.NewHttpRoute(testGateway, k8sService, "Service") + route.Name = "test-access-log-policies" + testFramework.ExpectCreated(ctx, route) + + var newALSArn string + Eventually(func(g Gomega) { + // Policy status should be Accepted + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionTrue)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonAccepted))) + + // Changing destination type should have resulted in ALS replacement + newALSArn = alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey] + g.Expect(newALSArn).ToNot(BeEquivalentTo(originalALSArn)) + }).Should(Succeed()) + }) + AfterEach(func() { // Delete Access Log Policies in test namespace alps := &anv1alpha1.AccessLogPolicyList{} diff --git a/test/suites/integration/vpc_association_policy_test.go b/test/suites/integration/vpc_association_policy_test.go index 1b3f6fa9..f5e0b214 100644 --- a/test/suites/integration/vpc_association_policy_test.go +++ b/test/suites/integration/vpc_association_policy_test.go @@ -1,6 +1,8 @@ package integration import ( + "time" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/vpclattice" @@ -130,7 +132,7 @@ var _ = Describe("Test vpc association policy", Ordered, func() { associated, _, err := testFramework.IsVpcAssociatedWithServiceNetwork(ctx, test.CurrentClusterVpcId, testServiceNetwork) g.Expect(err).To(BeNil()) g.Expect(associated).To(BeTrue()) - }).Should(Succeed()) + }).WithTimeout(5 * time.Minute).Should(Succeed()) // Clean up the vpc association policy testFramework.ExpectDeletedThenNotFound(ctx, vpcAssociationPolicy)