From de378390d5daf83c38adfa6c86b881d14c272949 Mon Sep 17 00:00:00 2001 From: Shawn Kaplan Date: Thu, 19 Oct 2023 09:28:53 -0700 Subject: [PATCH 1/2] Added support for updating Access Log Policy --- controllers/accesslogpolicy_controller.go | 2 +- .../access_log_subscription_manager.go | 138 +++++--- .../access_log_subscription_manager_mock.go | 15 + .../access_log_subscription_manager_test.go | 217 ++++++++++++- .../access_log_subscription_synthesizer.go | 7 +- ...ccess_log_subscription_synthesizer_test.go | 56 +++- .../integration/access_log_policy_test.go | 301 +++++++++++++++++- 7 files changed, 670 insertions(+), 66 deletions(-) diff --git a/controllers/accesslogpolicy_controller.go b/controllers/accesslogpolicy_controller.go index 3bac0f10..32e9d3ca 100644 --- a/controllers/accesslogpolicy_controller.go +++ b/controllers/accesslogpolicy_controller.go @@ -272,7 +272,7 @@ func (r *accessLogPolicyReconciler) updateAccessLogPolicyAnnotations( } for _, als := range accessLogSubscriptions { - if als.Spec.EventType == core.CreateEvent { + if als.Spec.EventType != core.DeleteEvent { oldAlp := alp.DeepCopy() if alp.ObjectMeta.Annotations == nil { alp.ObjectMeta.Annotations = make(map[string]string) diff --git a/pkg/deploy/lattice/access_log_subscription_manager.go b/pkg/deploy/lattice/access_log_subscription_manager.go index 465b095c..c6d2c76f 100644 --- a/pkg/deploy/lattice/access_log_subscription_manager.go +++ b/pkg/deploy/lattice/access_log_subscription_manager.go @@ -18,6 +18,7 @@ import ( type AccessLogSubscriptionManager interface { Create(ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription) (*lattice.AccessLogSubscriptionStatus, error) + Update(ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription) (*lattice.AccessLogSubscriptionStatus, error) Delete(ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription) error } @@ -72,58 +73,101 @@ func (m *defaultAccessLogSubscriptionManager) Create( } createALSOutput, err := vpcLatticeSess.CreateAccessLogSubscriptionWithContext(ctx, createALSInput) - if err != nil { - switch e := err.(type) { - case *vpclattice.AccessDeniedException: - return nil, services.NewInvalidError(e.Message()) - case *vpclattice.ResourceNotFoundException: - if *e.ResourceType == "SERVICE_NETWORK" || *e.ResourceType == "SERVICE" { - return nil, services.NewNotFoundError(string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName) - } - return nil, services.NewInvalidError(e.Message()) - case *vpclattice.ConflictException: - /* - * Conflict may arise if we retry creation due to a failure elsewhere in the controller, - * so we check if the conflicting ALS was created for the same ALP via its tags. - * If it is the same ALP, return success. Else, return ConflictError. - */ - listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{ - ResourceIdentifier: &resourceIdentifier, - } - listALSOutput, err := vpcLatticeSess.ListAccessLogSubscriptionsWithContext(ctx, listALSInput) - if err != nil { - return nil, err - } - for _, als := range listALSOutput.Items { - if *als.DestinationArn == accessLogSubscription.Spec.DestinationArn { - listTagsInput := &vpclattice.ListTagsForResourceInput{ - ResourceArn: als.Arn, - } - listTagsOutput, err := vpcLatticeSess.ListTagsForResourceWithContext(ctx, listTagsInput) - if err != nil { - return nil, err - } - value, exists := listTagsOutput.Tags[lattice.AccessLogPolicyTagKey] - if exists && *value == accessLogSubscription.Spec.ALPNamespacedName.String() { - return &lattice.AccessLogSubscriptionStatus{ - Arn: *als.Arn, - }, nil - } + if err == nil { + return &lattice.AccessLogSubscriptionStatus{ + Arn: *createALSOutput.Arn, + }, nil + } + + switch e := err.(type) { + case *vpclattice.AccessDeniedException: + return nil, services.NewInvalidError(e.Message()) + case *vpclattice.ResourceNotFoundException: + if *e.ResourceType == "SERVICE_NETWORK" || *e.ResourceType == "SERVICE" { + return nil, services.NewNotFoundError(string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName) + } + return nil, services.NewInvalidError(e.Message()) + case *vpclattice.ConflictException: + /* + * Conflict may arise if we retry creation due to a failure elsewhere in the controller, + * so we check if the conflicting ALS was created for the same ALP via its tags. + * If it is the same ALP, return success. Else, return ConflictError. + */ + listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: &resourceIdentifier, + } + listALSOutput, err := vpcLatticeSess.ListAccessLogSubscriptionsWithContext(ctx, listALSInput) + if err != nil { + return nil, err + } + for _, als := range listALSOutput.Items { + if *als.DestinationArn == accessLogSubscription.Spec.DestinationArn { + listTagsInput := &vpclattice.ListTagsForResourceInput{ + ResourceArn: als.Arn, + } + listTagsOutput, err := vpcLatticeSess.ListTagsForResourceWithContext(ctx, listTagsInput) + if err != nil { + return nil, err + } + value, exists := listTagsOutput.Tags[lattice.AccessLogPolicyTagKey] + if exists && *value == accessLogSubscription.Spec.ALPNamespacedName.String() { + return &lattice.AccessLogSubscriptionStatus{ + Arn: *als.Arn, + }, nil } } - return nil, services.NewConflictError( - string(accessLogSubscription.Spec.SourceType), - accessLogSubscription.Spec.SourceName, - e.Message(), - ) - default: - return nil, err } + return nil, services.NewConflictError( + string(accessLogSubscription.Spec.SourceType), + accessLogSubscription.Spec.SourceName, + e.Message(), + ) + default: + return nil, err } +} - return &lattice.AccessLogSubscriptionStatus{ - Arn: *createALSOutput.Arn, - }, nil +func (m *defaultAccessLogSubscriptionManager) Update( + ctx context.Context, + accessLogSubscription *lattice.AccessLogSubscription, +) (*lattice.AccessLogSubscriptionStatus, error) { + vpcLatticeSess := m.cloud.Lattice() + updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ + AccessLogSubscriptionIdentifier: aws.String(accessLogSubscription.Status.Arn), + DestinationArn: aws.String(accessLogSubscription.Spec.DestinationArn), + } + updateALSOutput, err := vpcLatticeSess.UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput) + if err == nil { + return &lattice.AccessLogSubscriptionStatus{ + Arn: *updateALSOutput.Arn, + }, nil + } + + switch e := err.(type) { + case *vpclattice.AccessDeniedException: + return nil, services.NewInvalidError(e.Message()) + case *vpclattice.ResourceNotFoundException: + if *e.ResourceType == "SERVICE_NETWORK" || *e.ResourceType == "SERVICE" { + return nil, services.NewNotFoundError(string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName) + } + return nil, services.NewInvalidError(e.Message()) + case *vpclattice.ConflictException: + /* + * A conflict can happen when the destination type of the new ALS is different from the original. + * To gracefully handle this, we create a new ALS with the new destination, then delete the old one. + */ + alsStatus, err := m.Create(ctx, accessLogSubscription) + if err != nil { + return nil, err + } + err = m.Delete(ctx, accessLogSubscription) + if err != nil { + return nil, err + } + return alsStatus, nil + default: + return nil, err + } } func (m *defaultAccessLogSubscriptionManager) Delete( diff --git a/pkg/deploy/lattice/access_log_subscription_manager_mock.go b/pkg/deploy/lattice/access_log_subscription_manager_mock.go index bec48823..2aa7675b 100644 --- a/pkg/deploy/lattice/access_log_subscription_manager_mock.go +++ b/pkg/deploy/lattice/access_log_subscription_manager_mock.go @@ -63,3 +63,18 @@ func (mr *MockAccessLogSubscriptionManagerMockRecorder) Delete(arg0, arg1 interf mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockAccessLogSubscriptionManager)(nil).Delete), arg0, arg1) } + +// Update mocks base method. +func (m *MockAccessLogSubscriptionManager) Update(arg0 context.Context, arg1 *lattice.AccessLogSubscription) (*lattice.AccessLogSubscriptionStatus, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Update", arg0, arg1) + ret0, _ := ret[0].(*lattice.AccessLogSubscriptionStatus) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Update indicates an expected call of Update. +func (mr *MockAccessLogSubscriptionManagerMockRecorder) Update(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockAccessLogSubscriptionManager)(nil).Update), arg0, arg1) +} diff --git a/pkg/deploy/lattice/access_log_subscription_manager_test.go b/pkg/deploy/lattice/access_log_subscription_manager_test.go index caa2c1c3..e1c9ca1e 100644 --- a/pkg/deploy/lattice/access_log_subscription_manager_test.go +++ b/pkg/deploy/lattice/access_log_subscription_manager_test.go @@ -26,7 +26,6 @@ const ( cloudWatchDestinationArn = "arn:aws:logs:us-west-2:123456789012:log-group:test:*" firehoseDestinationArn = "arn:aws:firehose:us-west-2:123456789012:deliverystream/test" accessLogSubscriptionArn = "arn:aws:vpc-lattice:us-west-2:123456789012:accesslogsubscription/als-12345678901234567" - accessLogSubscriptionId = "als-12345678901234567" ) var accessLogPolicyNamespacedName = types.NamespacedName{ @@ -44,7 +43,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { lattice.AccessLogPolicyTagKey: aws.String(accessLogPolicyNamespacedName.String()), }) - t.Run("Create_NewAccessLogSubscriptionForServiceNetwork_ReturnsSuccess", func(t *testing.T) { + t.Run("Create_NewALSForServiceNetwork_ReturnsNewALSStatus", func(t *testing.T) { accessLogSubscription := &lattice.AccessLogSubscription{ Spec: lattice.AccessLogSubscriptionSpec{ SourceType: lattice.ServiceNetworkSourceType, @@ -67,7 +66,6 @@ func TestAccessLogSubscriptionManager(t *testing.T) { } createALSOutput := &vpclattice.CreateAccessLogSubscriptionOutput{ Arn: aws.String(accessLogSubscriptionArn), - Id: aws.String(accessLogSubscriptionId), } mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) @@ -79,7 +77,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { assert.Equal(t, accessLogSubscriptionArn, resp.Arn) }) - t.Run("Create_NewAccessLogSubscriptionForService_ReturnsSuccess", func(t *testing.T) { + t.Run("Create_NewALSForService_ReturnsNewALSStatus", func(t *testing.T) { accessLogSubscription := &lattice.AccessLogSubscription{ Spec: lattice.AccessLogSubscriptionSpec{ SourceType: lattice.ServiceSourceType, @@ -101,7 +99,6 @@ func TestAccessLogSubscriptionManager(t *testing.T) { } createALSOutput := &vpclattice.CreateAccessLogSubscriptionOutput{ Arn: aws.String(accessLogSubscriptionArn), - Id: aws.String(accessLogSubscriptionId), } mockLattice.EXPECT().FindService(ctx, serviceNameProvider).Return(findServiceOutput, nil) @@ -113,7 +110,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { assert.Equal(t, accessLogSubscriptionArn, resp.Arn) }) - t.Run("Create_NewAccessLogSubscriptionForDeletedServiceNetwork_ReturnsNotFoundError", func(t *testing.T) { + t.Run("Create_NewALSForDeletedServiceNetwork_ReturnsNotFoundError", func(t *testing.T) { accessLogSubscription := &lattice.AccessLogSubscription{ Spec: lattice.AccessLogSubscriptionSpec{ SourceType: lattice.ServiceNetworkSourceType, @@ -148,7 +145,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { assert.True(t, services.IsNotFoundError(err)) }) - t.Run("Create_NewAccessLogSubscriptionForDeletedService_ReturnsNotFoundError", func(t *testing.T) { + t.Run("Create_NewALSForDeletedService_ReturnsNotFoundError", func(t *testing.T) { accessLogSubscription := &lattice.AccessLogSubscription{ Spec: lattice.AccessLogSubscriptionSpec{ SourceType: lattice.ServiceSourceType, @@ -182,7 +179,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { assert.True(t, services.IsNotFoundError(err)) }) - t.Run("Create_NewAccessLogSubscriptionForMissingS3Destination_ReturnsInvalidError", func(t *testing.T) { + t.Run("Create_NewALSForMissingS3Destination_ReturnsInvalidError", func(t *testing.T) { accessLogSubscription := &lattice.AccessLogSubscription{ Spec: lattice.AccessLogSubscriptionSpec{ SourceType: lattice.ServiceNetworkSourceType, @@ -217,7 +214,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { assert.True(t, services.IsInvalidError(err)) }) - t.Run("Create_NewAccessLogSubscriptionForMissingCloudWatchDestination_ReturnsInvalidError", func(t *testing.T) { + t.Run("Create_NewALSForMissingCloudWatchDestination_ReturnsInvalidError", func(t *testing.T) { accessLogSubscription := &lattice.AccessLogSubscription{ Spec: lattice.AccessLogSubscriptionSpec{ SourceType: lattice.ServiceNetworkSourceType, @@ -252,7 +249,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { assert.True(t, services.IsInvalidError(err)) }) - t.Run("Create_NewAccessLogSubscriptionForMissingFirehoseDestination_ReturnsInvalidError", func(t *testing.T) { + t.Run("Create_NewALSForMissingFirehoseDestination_ReturnsInvalidError", func(t *testing.T) { accessLogSubscription := &lattice.AccessLogSubscription{ Spec: lattice.AccessLogSubscriptionSpec{ SourceType: lattice.ServiceNetworkSourceType, @@ -287,7 +284,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { assert.True(t, services.IsInvalidError(err)) }) - t.Run("Create_ConflictingAccessLogSubscriptionForSameResourceFromDifferentPolicy_ReturnsConflictError", func(t *testing.T) { + t.Run("Create_ConflictingALSForSameResourceFromDifferentPolicy_ReturnsConflictError", func(t *testing.T) { accessLogSubscription := &lattice.AccessLogSubscription{ Spec: lattice.AccessLogSubscriptionSpec{ SourceType: lattice.ServiceNetworkSourceType, @@ -342,7 +339,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { assert.True(t, services.IsConflictError(err)) }) - t.Run("Create_ConflictingAccessLogSubscriptionForSameResourceFromSamePolicy_ReturnsSuccess", func(t *testing.T) { + t.Run("Create_ConflictingALSForSameResourceFromSamePolicy_ReturnsNewALSStatus", func(t *testing.T) { accessLogSubscription := &lattice.AccessLogSubscription{ Spec: lattice.AccessLogSubscriptionSpec{ SourceType: lattice.ServiceNetworkSourceType, @@ -438,6 +435,200 @@ func TestAccessLogSubscriptionManager(t *testing.T) { assert.True(t, services.IsNotFoundError(err)) }) + t.Run("Update_ALSWithSameDestinationType_UpdatesALSAndReturnsSuccess", func(t *testing.T) { + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + ALPNamespacedName: accessLogPolicyNamespacedName, + EventType: core.UpdateEvent, + }, + Status: &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, + }, + } + updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ + AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), + DestinationArn: aws.String(s3DestinationArn), + } + updateALSOutput := &vpclattice.UpdateAccessLogSubscriptionOutput{ + Arn: aws.String(accessLogSubscriptionArn), + } + + mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(updateALSOutput, nil) + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) + resp, err := mgr.Update(ctx, accessLogSubscription) + assert.Nil(t, err) + assert.Equal(t, accessLogSubscriptionArn, resp.Arn) + }) + + t.Run("Update_ALSWithDifferentDestinationType_CreatesNewALSThenDeletesOldALSAndReturnsNewALSStatus", func(t *testing.T) { + newAccessLogSubscriptionArn := accessLogSubscriptionArn + "new" + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + ALPNamespacedName: accessLogPolicyNamespacedName, + EventType: core.UpdateEvent, + }, + Status: &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, + }, + } + updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ + AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), + DestinationArn: aws.String(s3DestinationArn), + } + updateALSErr := &vpclattice.ConflictException{ + ResourceType: aws.String("ACCESS_LOG_SUBSCRIPTION"), + } + serviceNetworkInfo := &services.ServiceNetworkInfo{ + SvcNetwork: vpclattice.ServiceNetworkSummary{ + Arn: aws.String(serviceNetworkArn), + Name: aws.String(sourceName), + }, + } + createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ + ResourceIdentifier: aws.String(serviceNetworkArn), + DestinationArn: aws.String(s3DestinationArn), + Tags: expectedTags, + } + createALSOutput := &vpclattice.CreateAccessLogSubscriptionOutput{ + Arn: aws.String(newAccessLogSubscriptionArn), + } + deleteALSInput := &vpclattice.DeleteAccessLogSubscriptionInput{ + AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), + } + deleteALSOutput := &vpclattice.DeleteAccessLogSubscriptionOutput{} + + mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(nil, updateALSErr) + mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(createALSOutput, nil) + mockLattice.EXPECT().DeleteAccessLogSubscriptionWithContext(ctx, deleteALSInput).Return(deleteALSOutput, nil) + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) + resp, err := mgr.Update(ctx, accessLogSubscription) + assert.Nil(t, err) + assert.Equal(t, newAccessLogSubscriptionArn, resp.Arn) + }) + + t.Run("Update_ALSDoesNotExist_ReturnsInvalidError", func(t *testing.T) { + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + ALPNamespacedName: accessLogPolicyNamespacedName, + EventType: core.UpdateEvent, + }, + Status: &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, + }, + } + updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ + AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), + DestinationArn: aws.String(s3DestinationArn), + } + updateALSError := &vpclattice.ResourceNotFoundException{ + ResourceType: aws.String("ACCESS_LOG_SUBSCRIPTION"), + } + + mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(nil, updateALSError) + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) + resp, err := mgr.Update(ctx, accessLogSubscription) + assert.Nil(t, resp) + assert.True(t, services.IsInvalidError(err)) + }) + + t.Run("Update_AccessDeniedExceptionReceived_ReturnsInvalidError", func(t *testing.T) { + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + ALPNamespacedName: accessLogPolicyNamespacedName, + EventType: core.UpdateEvent, + }, + Status: &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, + }, + } + updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ + AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), + DestinationArn: aws.String(s3DestinationArn), + } + updateALSError := &vpclattice.AccessDeniedException{} + + mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(nil, updateALSError) + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) + resp, err := mgr.Update(ctx, accessLogSubscription) + assert.Nil(t, resp) + assert.True(t, services.IsInvalidError(err)) + }) + + t.Run("Update_ServiceNetworkDoesNotExist_ReturnsNotFoundError", func(t *testing.T) { + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + ALPNamespacedName: accessLogPolicyNamespacedName, + EventType: core.UpdateEvent, + }, + Status: &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, + }, + } + updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ + AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), + DestinationArn: aws.String(s3DestinationArn), + } + updateALSError := &vpclattice.ResourceNotFoundException{ + ResourceType: aws.String("SERVICE_NETWORK"), + } + + mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(nil, updateALSError) + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) + resp, err := mgr.Update(ctx, accessLogSubscription) + assert.Nil(t, resp) + assert.True(t, services.IsNotFoundError(err)) + }) + + t.Run("Update_ServiceDoesNotExist_ReturnsNotFoundError", func(t *testing.T) { + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + ALPNamespacedName: accessLogPolicyNamespacedName, + EventType: core.UpdateEvent, + }, + Status: &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, + }, + } + updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ + AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), + DestinationArn: aws.String(s3DestinationArn), + } + updateALSError := &vpclattice.ResourceNotFoundException{ + ResourceType: aws.String("SERVICE"), + } + + mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(nil, updateALSError) + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) + resp, err := mgr.Update(ctx, accessLogSubscription) + assert.Nil(t, resp) + assert.True(t, services.IsNotFoundError(err)) + }) + t.Run("Test_Delete_AccessLogSubscriptionExists_ReturnsSuccess", func(t *testing.T) { accessLogSubscription := &lattice.AccessLogSubscription{ Spec: lattice.AccessLogSubscriptionSpec{ @@ -463,7 +654,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { assert.Nil(t, err) }) - t.Run("Test_Delete_AccessLogSubscriptionDoesNotExist_ReturnsSuccess", func(t *testing.T) { + t.Run("Delete_ALSDoesNotExist_ReturnsSuccess", func(t *testing.T) { accessLogSubscription := &lattice.AccessLogSubscription{ Spec: lattice.AccessLogSubscriptionSpec{ SourceType: lattice.ServiceNetworkSourceType, diff --git a/pkg/deploy/lattice/access_log_subscription_synthesizer.go b/pkg/deploy/lattice/access_log_subscription_synthesizer.go index c49e9fcd..d9e3d316 100644 --- a/pkg/deploy/lattice/access_log_subscription_synthesizer.go +++ b/pkg/deploy/lattice/access_log_subscription_synthesizer.go @@ -49,8 +49,11 @@ func (s *accessLogSubscriptionSynthesizer) Synthesize(ctx context.Context) error als.Status = alsStatus case core.UpdateEvent: s.log.Debugf("Started updating Access Log Subscription %s", als.ID()) - // TODO - return nil + alsStatus, err := s.accessLogSubscriptionManager.Update(ctx, als) + if err != nil { + return err + } + als.Status = alsStatus case core.DeleteEvent: s.log.Debugf("Started deleting Access Log Subscription %s", als.ID()) if als.Status == nil { diff --git a/pkg/deploy/lattice/access_log_subscription_synthesizer_test.go b/pkg/deploy/lattice/access_log_subscription_synthesizer_test.go index a9d91ec3..f44880c2 100644 --- a/pkg/deploy/lattice/access_log_subscription_synthesizer_test.go +++ b/pkg/deploy/lattice/access_log_subscription_synthesizer_test.go @@ -26,7 +26,7 @@ func TestSynthesizeAccessLogSubscription(t *testing.T) { k8sClient := mockclient.NewMockClient(c) builder := gateway.NewAccessLogSubscriptionModelBuilder(gwlog.FallbackLogger, k8sClient) - t.Run("SpecIsNotDeleted_CreatesAccessLogSubscription", func(t *testing.T) { + t.Run("SpecIsCreated_CreatesAccessLogSubscription", func(t *testing.T) { input := &anv1alpha1.AccessLogPolicy{ Spec: anv1alpha1.AccessLogPolicySpec{ DestinationArn: aws.String(s3DestinationArn), @@ -46,7 +46,7 @@ func TestSynthesizeAccessLogSubscription(t *testing.T) { assert.Nil(t, err) }) - t.Run("SpecIsNotDeletedButErrorOccurs_ReturnsError", func(t *testing.T) { + t.Run("SpecIsCreatedButErrorOccurs_ReturnsError", func(t *testing.T) { input := &anv1alpha1.AccessLogPolicy{ Spec: anv1alpha1.AccessLogPolicySpec{ DestinationArn: aws.String(s3DestinationArn), @@ -66,6 +66,58 @@ func TestSynthesizeAccessLogSubscription(t *testing.T) { assert.NotNil(t, err) }) + t.Run("SpecIsUpdated_UpdatesAccessLogSubscription", func(t *testing.T) { + input := &anv1alpha1.AccessLogPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + anv1alpha1.AccessLogSubscriptionAnnotationKey: "arn:aws:vpc-lattice:us-west-2:123456789012:accesslogsubscription/als-12345678901234567", + }, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(s3DestinationArn), + TargetRef: &v1alpha2.PolicyTargetReference{ + Kind: "Gateway", + Name: "TestName", + }, + }, + } + + stack, accessLogSubscription, _ := builder.Build(context.Background(), input) + + k8sClient.EXPECT().List(context.Background(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + mockManager.EXPECT().Update(ctx, accessLogSubscription).Return(&lattice.AccessLogSubscriptionStatus{}, nil).AnyTimes() + + synthesizer := NewAccessLogSubscriptionSynthesizer(gwlog.FallbackLogger, k8sClient, mockManager, stack) + err := synthesizer.Synthesize(ctx) + assert.Nil(t, err) + }) + + t.Run("SpecIsUpdatedButErrorOccurs_ReturnsError", func(t *testing.T) { + input := &anv1alpha1.AccessLogPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + anv1alpha1.AccessLogSubscriptionAnnotationKey: "arn:aws:vpc-lattice:us-west-2:123456789012:accesslogsubscription/als-12345678901234567", + }, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(s3DestinationArn), + TargetRef: &v1alpha2.PolicyTargetReference{ + Kind: "Gateway", + Name: "TestName", + }, + }, + } + + stack, accessLogSubscription, _ := builder.Build(context.Background(), input) + + k8sClient.EXPECT().List(context.Background(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + mockManager.EXPECT().Update(ctx, accessLogSubscription).Return(nil, errors.New("mock error")).AnyTimes() + + synthesizer := NewAccessLogSubscriptionSynthesizer(gwlog.FallbackLogger, k8sClient, mockManager, stack) + err := synthesizer.Synthesize(ctx) + assert.NotNil(t, err) + }) + t.Run("SpecIsDeleted_DeletesAccessLogSubscription", func(t *testing.T) { input := &anv1alpha1.AccessLogPolicy{ ObjectMeta: metav1.ObjectMeta{ diff --git a/test/suites/integration/access_log_policy_test.go b/test/suites/integration/access_log_policy_test.go index a61e024c..07072839 100644 --- a/test/suites/integration/access_log_policy_test.go +++ b/test/suites/integration/access_log_policy_test.go @@ -33,8 +33,10 @@ import ( var _ = Describe("Access Log Policy", Ordered, func() { const ( k8sResourceName = "test-access-log-policy" + k8sResource2Name = "test-access-log-policy-secondary" bucketName = "k8s-test-lattice-bucket" logGroupName = "k8s-test-lattice-log-group" + logGroup2Name = "k8s-test-lattice-log-group-secondary" deliveryStreamName = "k8s-test-lattice-delivery-stream" deliveryStreamRoleName = "k8s-test-lattice-delivery-stream-role" deliveryStreamRolePolicy = `{ @@ -74,6 +76,7 @@ var _ = Describe("Access Log Policy", Ordered, func() { grpcRoute *gwv1alpha2.GRPCRoute bucketArn string logGroupArn string + logGroup2Arn string deliveryStreamArn string roleArn string ) @@ -95,6 +98,13 @@ var _ = Describe("Access Log Policy", Ordered, func() { Expect(err).To(BeNil()) logGroupArn = fmt.Sprintf("arn:aws:logs:%s:%s:log-group:%s:*", config.Region, config.AccountID, logGroupName) + // Create secondary CloudWatch Log Group + _, err = logsClient.CreateLogGroupWithContext(ctx, &cloudwatchlogs.CreateLogGroupInput{ + LogGroupName: aws.String(logGroup2Name), + }) + Expect(err).To(BeNil()) + logGroup2Arn = fmt.Sprintf("arn:aws:logs:%s:%s:log-group:%s:*", config.Region, config.AccountID, logGroup2Name) + // Create IAM Role for Firehose Delivery Stream iamClient = iam.New(session.Must(session.NewSession(&aws.Config{Region: aws.String(config.Region)}))) createRoleOutput, err := iamClient.CreateRoleWithContext(ctx, &iam.CreateRoleInput{ @@ -587,6 +597,291 @@ var _ = Describe("Access Log Policy", Ordered, func() { }).Should(Succeed()) }) + It("update properly changes or replaces Access Log Subscription and sets Access Log Policy status", func() { + originalAlsArn := "" + currentAlsArn := "" + accessLogPolicy := &anv1alpha1.AccessLogPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: k8sResourceName, + Namespace: k8snamespace, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(logGroupArn), + TargetRef: &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "Gateway", + Name: gwv1alpha2.ObjectName(testGateway.Name), + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + }, + }, + } + alpNamespacedName := types.NamespacedName{ + Name: accessLogPolicy.Name, + Namespace: accessLogPolicy.Namespace, + } + testFramework.ExpectCreated(ctx, accessLogPolicy) + + Eventually(func(g Gomega) { + // Policy status should be Accepted + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + + // Service Network should have 1 Access Log Subscription with CloudWatch Log Group destination + listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: testServiceNetwork.Arn, + } + listALSOutput, err := testFramework.LatticeClient.ListAccessLogSubscriptionsWithContext(ctx, listALSInput) + g.Expect(err).To(BeNil()) + g.Expect(len(listALSOutput.Items)).To(BeEquivalentTo(1)) + g.Expect(listALSOutput.Items[0].ResourceId).To(BeEquivalentTo(testServiceNetwork.Id)) + g.Expect(*listALSOutput.Items[0].DestinationArn).To(BeEquivalentTo(logGroupArn)) + + // Access Log Subscription ARN should be in the Access Log Policy's annotations + g.Expect(alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]).To(BeEquivalentTo(*listALSOutput.Items[0].Arn)) + + currentAlsArn = alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey] + originalAlsArn = alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey] + }).Should(Succeed()) + + // Update to different destination of same type + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + Expect(err).To(BeNil()) + alp.Spec.DestinationArn = aws.String(logGroup2Arn) + testFramework.ExpectUpdated(ctx, alp) + + Eventually(func(g Gomega) { + // Policy status should be Accepted + alpNamespacedName := types.NamespacedName{ + Name: accessLogPolicy.Name, + Namespace: accessLogPolicy.Namespace, + } + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionTrue)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(2)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonAccepted))) + + // Service Network should have 1 Access Log Subscription with updated CloudWatch Log Group destination + listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: testServiceNetwork.Arn, + } + listALSOutput, err := testFramework.LatticeClient.ListAccessLogSubscriptionsWithContext(ctx, listALSInput) + g.Expect(err).To(BeNil()) + g.Expect(len(listALSOutput.Items)).To(BeEquivalentTo(1)) + g.Expect(listALSOutput.Items[0].ResourceId).To(BeEquivalentTo(testServiceNetwork.Id)) + g.Expect(*listALSOutput.Items[0].DestinationArn).To(BeEquivalentTo(logGroup2Arn)) + + // Access Log Subscription ARN should be unchanged + g.Expect(alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]).To(BeEquivalentTo(*listALSOutput.Items[0].Arn)) + g.Expect(alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]).To(BeEquivalentTo(originalAlsArn)) + + // Access Log Subscription should have default tags and Access Log Policy tag applied + expectedTags := testFramework.Cloud.DefaultTagsMergedWith(services.Tags{ + lattice.AccessLogPolicyTagKey: aws.String(alpNamespacedName.String()), + }) + listTagsInput := &vpclattice.ListTagsForResourceInput{ + ResourceArn: listALSOutput.Items[0].Arn, + } + listTagsOutput, err := testFramework.LatticeClient.ListTagsForResourceWithContext(ctx, listTagsInput) + g.Expect(err).To(BeNil()) + g.Expect(listTagsOutput.Tags).To(BeEquivalentTo(expectedTags)) + }).Should(Succeed()) + + // Update to different destination of different type + alp = &anv1alpha1.AccessLogPolicy{} + err = testFramework.Client.Get(ctx, alpNamespacedName, alp) + Expect(err).To(BeNil()) + alp.Spec.DestinationArn = aws.String(bucketArn) + testFramework.ExpectUpdated(ctx, alp) + + Eventually(func(g Gomega) { + // Policy status should be Accepted + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionTrue)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(3)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonAccepted))) + + // Service Network should only have 1 Access Log Subscription, with S3 Bucket destination + listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: testServiceNetwork.Arn, + } + listALSOutput, err := testFramework.LatticeClient.ListAccessLogSubscriptionsWithContext(ctx, listALSInput) + g.Expect(err).To(BeNil()) + g.Expect(len(listALSOutput.Items)).To(BeEquivalentTo(1)) + g.Expect(listALSOutput.Items[0].ResourceId).To(BeEquivalentTo(testServiceNetwork.Id)) + g.Expect(*listALSOutput.Items[0].DestinationArn).To(BeEquivalentTo(bucketArn)) + + // New Access Log Subscription ARN should be in the Access Log Policy's annotations + g.Expect(alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]).To(BeEquivalentTo(*listALSOutput.Items[0].Arn)) + g.Expect(alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]).ToNot(BeEquivalentTo(originalAlsArn)) + currentAlsArn = alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey] + + // New Access Log Subscription should have default tags and Access Log Policy tag applied + expectedTags := testFramework.Cloud.DefaultTagsMergedWith(services.Tags{ + lattice.AccessLogPolicyTagKey: aws.String(alpNamespacedName.String()), + }) + listTagsInput := &vpclattice.ListTagsForResourceInput{ + ResourceArn: listALSOutput.Items[0].Arn, + } + listTagsOutput, err := testFramework.LatticeClient.ListTagsForResourceWithContext(ctx, listTagsInput) + g.Expect(err).To(BeNil()) + g.Expect(listTagsOutput.Tags).To(BeEquivalentTo(expectedTags)) + }).Should(Succeed()) + + // Update to destination that does not exist + alp = &anv1alpha1.AccessLogPolicy{} + err = testFramework.Client.Get(ctx, alpNamespacedName, alp) + Expect(err).To(BeNil()) + alp.Spec.DestinationArn = aws.String(bucketArn + "doesnotexist") + testFramework.ExpectUpdated(ctx, alp) + + Eventually(func(g Gomega) { + // Policy status should be Invalid + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionFalse)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(4)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonInvalid))) + + // Service Network should still have previous Access Log Subscription + listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: testServiceNetwork.Arn, + } + listALSOutput, err := testFramework.LatticeClient.ListAccessLogSubscriptionsWithContext(ctx, listALSInput) + g.Expect(err).To(BeNil()) + g.Expect(len(listALSOutput.Items)).To(BeEquivalentTo(1)) + g.Expect(listALSOutput.Items[0].ResourceId).To(BeEquivalentTo(testServiceNetwork.Id)) + g.Expect(*listALSOutput.Items[0].DestinationArn).To(BeEquivalentTo(bucketArn)) + + // Same Access Log Subscription ARN should be in the Access Log Policy's annotations + g.Expect(alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]).To(BeEquivalentTo(*listALSOutput.Items[0].Arn)) + g.Expect(alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]).To(BeEquivalentTo(currentAlsArn)) + }).Should(Succeed()) + + // Update to targetRef that does not exist + alp = &anv1alpha1.AccessLogPolicy{} + err = testFramework.Client.Get(ctx, alpNamespacedName, alp) + Expect(err).To(BeNil()) + alp.Spec.DestinationArn = aws.String(bucketArn) + alp.Spec.TargetRef = &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "Gateway", + Name: "doesnotexist", + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + } + testFramework.ExpectUpdated(ctx, alp) + + Eventually(func(g Gomega) { + // Policy status should be TargetNotFound + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionFalse)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(5)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonTargetNotFound))) + + // Service Network should still have previous Access Log Subscription + listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: testServiceNetwork.Arn, + } + listALSOutput, err := testFramework.LatticeClient.ListAccessLogSubscriptionsWithContext(ctx, listALSInput) + g.Expect(err).To(BeNil()) + g.Expect(len(listALSOutput.Items)).To(BeEquivalentTo(1)) + g.Expect(listALSOutput.Items[0].ResourceId).To(BeEquivalentTo(testServiceNetwork.Id)) + g.Expect(*listALSOutput.Items[0].DestinationArn).To(BeEquivalentTo(bucketArn)) + + // Same Access Log Subscription ARN should be in the Access Log Policy's annotations + g.Expect(alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]).To(BeEquivalentTo(*listALSOutput.Items[0].Arn)) + g.Expect(alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]).To(BeEquivalentTo(currentAlsArn)) + }).Should(Succeed()) + + // Create second Access Log Policy for original destination + accessLogPolicy2 := &anv1alpha1.AccessLogPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: k8sResource2Name, + Namespace: k8snamespace, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(logGroupArn), + TargetRef: &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "Gateway", + Name: gwv1alpha2.ObjectName(testGateway.Name), + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + }, + }, + } + testFramework.ExpectCreated(ctx, accessLogPolicy2) + + Eventually(func(g Gomega) { + // Policy status should be Accepted + alpNamespacedName := types.NamespacedName{ + Name: accessLogPolicy2.Name, + Namespace: accessLogPolicy2.Namespace, + } + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionTrue)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonAccepted))) + }).Should(Succeed()) + + // Attempt to update first Access Log Policy to use the original destination + alp = &anv1alpha1.AccessLogPolicy{} + err = testFramework.Client.Get(ctx, alpNamespacedName, alp) + Expect(err).To(BeNil()) + alp.Spec = anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(logGroupArn), + TargetRef: &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "Gateway", + Name: gwv1alpha2.ObjectName(testGateway.Name), + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + }, + } + testFramework.ExpectUpdated(ctx, alp) + + Eventually(func(g Gomega) { + // Policy status should be Conflicted + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionFalse)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(6)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonConflicted))) + + // Service Network should now have the old and new Access Log Subscriptions + listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: testServiceNetwork.Arn, + } + listALSOutput, err := testFramework.LatticeClient.ListAccessLogSubscriptionsWithContext(ctx, listALSInput) + g.Expect(err).To(BeNil()) + g.Expect(len(listALSOutput.Items)).To(BeEquivalentTo(2)) + + // Same Access Log Subscription ARN should be in the first Access Log Policy's annotations + g.Expect(alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]).To(BeEquivalentTo(currentAlsArn)) + }).Should(Succeed()) + }) + It("deletion removes the Access Log Subscription for the corresponding Service Network when the targetRef's Kind is Gateway", func() { accessLogPolicy := &anv1alpha1.AccessLogPolicy{ ObjectMeta: metav1.ObjectMeta{ @@ -759,11 +1054,15 @@ var _ = Describe("Access Log Policy", Ordered, func() { }) Expect(err).To(BeNil()) - // Delete CloudWatch Log Group + // Delete CloudWatch Log Groups _, err = logsClient.DeleteLogGroupWithContext(ctx, &cloudwatchlogs.DeleteLogGroupInput{ LogGroupName: aws.String(logGroupName), }) Expect(err).To(BeNil()) + _, err = logsClient.DeleteLogGroupWithContext(ctx, &cloudwatchlogs.DeleteLogGroupInput{ + LogGroupName: aws.String(logGroup2Name), + }) + Expect(err).To(BeNil()) // Delete Firehose Delivery Stream Eventually(func() (string, error) { From c67790d409ee1df0f290b99c9bdb511fcf20f7a1 Mon Sep 17 00:00:00 2001 From: Shawn Kaplan Date: Fri, 20 Oct 2023 15:57:51 -0700 Subject: [PATCH 2/2] Addressed PR comments, added update functionality for Access Log Policy targetRef --- controllers/accesslogpolicy_controller.go | 11 +- .../access_log_subscription_manager.go | 106 ++-- .../access_log_subscription_manager_mock.go | 2 +- .../access_log_subscription_manager_test.go | 492 ++++++------------ .../access_log_subscription_synthesizer.go | 2 +- ...ccess_log_subscription_synthesizer_test.go | 4 +- .../integration/access_log_policy_test.go | 135 ++++- 7 files changed, 370 insertions(+), 382 deletions(-) diff --git a/controllers/accesslogpolicy_controller.go b/controllers/accesslogpolicy_controller.go index 32e9d3ca..8d5f3669 100644 --- a/controllers/accesslogpolicy_controller.go +++ b/controllers/accesslogpolicy_controller.go @@ -162,17 +162,18 @@ func (r *accessLogPolicyReconciler) reconcileUpsert(ctx context.Context, alp *an return err } + if alp.Spec.TargetRef.Namespace != nil && string(*alp.Spec.TargetRef.Namespace) != alp.Namespace { + message := "The targetRef's namespace does not match the access log policy's namespace" + return r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonInvalid, message) + } + targetRefExists, err := r.targetRefExists(ctx, alp) if err != nil { return err } if !targetRefExists { message := "The targetRef could not be found" - err := r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonTargetNotFound, message) - if err != nil { - return err - } - return nil + return r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonTargetNotFound, message) } stack, err := r.buildAndDeployModel(ctx, alp) diff --git a/pkg/deploy/lattice/access_log_subscription_manager.go b/pkg/deploy/lattice/access_log_subscription_manager.go index c6d2c76f..efb644c2 100644 --- a/pkg/deploy/lattice/access_log_subscription_manager.go +++ b/pkg/deploy/lattice/access_log_subscription_manager.go @@ -19,7 +19,7 @@ import ( type AccessLogSubscriptionManager interface { Create(ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription) (*lattice.AccessLogSubscriptionStatus, error) Update(ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription) (*lattice.AccessLogSubscriptionStatus, error) - Delete(ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription) error + Delete(ctx context.Context, accessLogSubscriptionArn string) error } type defaultAccessLogSubscriptionManager struct { @@ -43,23 +43,9 @@ func (m *defaultAccessLogSubscriptionManager) Create( ) (*lattice.AccessLogSubscriptionStatus, error) { vpcLatticeSess := m.cloud.Lattice() - var resourceIdentifier string - switch accessLogSubscription.Spec.SourceType { - case lattice.ServiceNetworkSourceType: - serviceNetwork, err := vpcLatticeSess.FindServiceNetwork(ctx, accessLogSubscription.Spec.SourceName, config.AccountID) - if err != nil { - return nil, err - } - resourceIdentifier = *serviceNetwork.SvcNetwork.Arn - case lattice.ServiceSourceType: - serviceNameProvider := services.NewDefaultLatticeServiceNameProvider(accessLogSubscription.Spec.SourceName) - service, err := vpcLatticeSess.FindService(ctx, serviceNameProvider) - if err != nil { - return nil, err - } - resourceIdentifier = *service.Arn - default: - return nil, fmt.Errorf("unsupported source type: %s", accessLogSubscription.Spec.SourceType) + sourceArn, err := m.getSourceArn(ctx, accessLogSubscription.Spec.SourceType, accessLogSubscription.Spec.SourceName) + if err != nil { + return nil, err } tags := m.cloud.DefaultTagsMergedWith(services.Tags{ @@ -67,7 +53,7 @@ func (m *defaultAccessLogSubscriptionManager) Create( }) createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ - ResourceIdentifier: &resourceIdentifier, + ResourceIdentifier: sourceArn, DestinationArn: &accessLogSubscription.Spec.DestinationArn, Tags: tags, } @@ -94,7 +80,7 @@ func (m *defaultAccessLogSubscriptionManager) Create( * If it is the same ALP, return success. Else, return ConflictError. */ listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{ - ResourceIdentifier: &resourceIdentifier, + ResourceIdentifier: sourceArn, } listALSOutput, err := vpcLatticeSess.ListAccessLogSubscriptionsWithContext(ctx, listALSInput) if err != nil { @@ -132,6 +118,31 @@ func (m *defaultAccessLogSubscriptionManager) Update( accessLogSubscription *lattice.AccessLogSubscription, ) (*lattice.AccessLogSubscriptionStatus, error) { vpcLatticeSess := m.cloud.Lattice() + + // If the source is modified, we need to replace the ALS + getALSInput := &vpclattice.GetAccessLogSubscriptionInput{ + AccessLogSubscriptionIdentifier: aws.String(accessLogSubscription.Status.Arn), + } + getALSOutput, err := vpcLatticeSess.GetAccessLogSubscriptionWithContext(ctx, getALSInput) + if err != nil { + switch e := err.(type) { + case *vpclattice.AccessDeniedException: + return nil, services.NewInvalidError(e.Message()) + case *vpclattice.ResourceNotFoundException: + return nil, services.NewInvalidError(e.Message()) + default: + return nil, err + } + } + sourceArn, err := m.getSourceArn(ctx, accessLogSubscription.Spec.SourceType, accessLogSubscription.Spec.SourceName) + if err != nil { + return nil, err + } + if *getALSOutput.ResourceArn != *sourceArn { + return m.replaceAccessLogSubscription(ctx, accessLogSubscription) + } + + // Source is not modified, try to update destinationArn in the existing ALS updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ AccessLogSubscriptionIdentifier: aws.String(accessLogSubscription.Status.Arn), DestinationArn: aws.String(accessLogSubscription.Spec.DestinationArn), @@ -156,15 +167,7 @@ func (m *defaultAccessLogSubscriptionManager) Update( * A conflict can happen when the destination type of the new ALS is different from the original. * To gracefully handle this, we create a new ALS with the new destination, then delete the old one. */ - alsStatus, err := m.Create(ctx, accessLogSubscription) - if err != nil { - return nil, err - } - err = m.Delete(ctx, accessLogSubscription) - if err != nil { - return nil, err - } - return alsStatus, nil + return m.replaceAccessLogSubscription(ctx, accessLogSubscription) default: return nil, err } @@ -172,11 +175,11 @@ func (m *defaultAccessLogSubscriptionManager) Update( func (m *defaultAccessLogSubscriptionManager) Delete( ctx context.Context, - accessLogSubscription *lattice.AccessLogSubscription, + accessLogSubscriptionArn string, ) error { vpcLatticeSess := m.cloud.Lattice() deleteALSInput := &vpclattice.DeleteAccessLogSubscriptionInput{ - AccessLogSubscriptionIdentifier: aws.String(accessLogSubscription.Status.Arn), + AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), } _, err := vpcLatticeSess.DeleteAccessLogSubscriptionWithContext(ctx, deleteALSInput) if err != nil { @@ -186,3 +189,44 @@ func (m *defaultAccessLogSubscriptionManager) Delete( } return nil } + +func (m *defaultAccessLogSubscriptionManager) getSourceArn( + ctx context.Context, + sourceType lattice.SourceType, + sourceName string, +) (*string, error) { + vpcLatticeSess := m.cloud.Lattice() + + switch sourceType { + case lattice.ServiceNetworkSourceType: + serviceNetwork, err := vpcLatticeSess.FindServiceNetwork(ctx, sourceName, config.AccountID) + if err != nil { + return nil, err + } + return serviceNetwork.SvcNetwork.Arn, nil + case lattice.ServiceSourceType: + serviceNameProvider := services.NewDefaultLatticeServiceNameProvider(sourceName) + service, err := vpcLatticeSess.FindService(ctx, serviceNameProvider) + if err != nil { + return nil, err + } + return service.Arn, nil + default: + return nil, fmt.Errorf("unsupported source type: %s", sourceType) + } +} + +func (m *defaultAccessLogSubscriptionManager) replaceAccessLogSubscription( + ctx context.Context, + accessLogSubscription *lattice.AccessLogSubscription, +) (*lattice.AccessLogSubscriptionStatus, error) { + newAlsStatus, err := m.Create(ctx, accessLogSubscription) + if err != nil { + return nil, err + } + err = m.Delete(ctx, accessLogSubscription.Status.Arn) + if err != nil { + return nil, err + } + return newAlsStatus, nil +} diff --git a/pkg/deploy/lattice/access_log_subscription_manager_mock.go b/pkg/deploy/lattice/access_log_subscription_manager_mock.go index 2aa7675b..99ea39d2 100644 --- a/pkg/deploy/lattice/access_log_subscription_manager_mock.go +++ b/pkg/deploy/lattice/access_log_subscription_manager_mock.go @@ -51,7 +51,7 @@ func (mr *MockAccessLogSubscriptionManagerMockRecorder) Create(arg0, arg1 interf } // Delete mocks base method. -func (m *MockAccessLogSubscriptionManager) Delete(arg0 context.Context, arg1 *lattice.AccessLogSubscription) error { +func (m *MockAccessLogSubscriptionManager) Delete(arg0 context.Context, arg1 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Delete", arg0, arg1) ret0, _ := ret[0].(error) diff --git a/pkg/deploy/lattice/access_log_subscription_manager_test.go b/pkg/deploy/lattice/access_log_subscription_manager_test.go index e1c9ca1e..8241f8c9 100644 --- a/pkg/deploy/lattice/access_log_subscription_manager_test.go +++ b/pkg/deploy/lattice/access_log_subscription_manager_test.go @@ -33,6 +33,18 @@ var accessLogPolicyNamespacedName = types.NamespacedName{ Name: "test-name", } +func simpleAccessLogSubscription(eventType core.EventType) *lattice.AccessLogSubscription { + return &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + ALPNamespacedName: accessLogPolicyNamespacedName, + EventType: eventType, + }, + } +} + func TestAccessLogSubscriptionManager(t *testing.T) { c := gomock.NewController(t) defer c.Finish() @@ -42,34 +54,50 @@ func TestAccessLogSubscriptionManager(t *testing.T) { expectedTags := cloud.DefaultTagsMergedWith(services.Tags{ lattice.AccessLogPolicyTagKey: aws.String(accessLogPolicyNamespacedName.String()), }) + serviceNetworkInfo := &services.ServiceNetworkInfo{ + SvcNetwork: vpclattice.ServiceNetworkSummary{ + Arn: aws.String(serviceNetworkArn), + Name: aws.String(sourceName), + }, + } + createALSForSNInput := &vpclattice.CreateAccessLogSubscriptionInput{ + ResourceIdentifier: aws.String(serviceNetworkArn), + DestinationArn: aws.String(s3DestinationArn), + Tags: expectedTags, + } + createALSForSvcInput := &vpclattice.CreateAccessLogSubscriptionInput{ + ResourceIdentifier: aws.String(serviceArn), + DestinationArn: aws.String(s3DestinationArn), + Tags: expectedTags, + } + createALSOutput := &vpclattice.CreateAccessLogSubscriptionOutput{ + Arn: aws.String(accessLogSubscriptionArn), + } + getALSInput := &vpclattice.GetAccessLogSubscriptionInput{ + AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), + } + getALSOutput := &vpclattice.GetAccessLogSubscriptionOutput{ + Arn: aws.String(accessLogSubscriptionArn), + ResourceArn: aws.String(serviceNetworkArn), + DestinationArn: aws.String(s3DestinationArn), + } + updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ + AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), + DestinationArn: aws.String(s3DestinationArn), + } + updateALSOutput := &vpclattice.UpdateAccessLogSubscriptionOutput{ + Arn: aws.String(accessLogSubscriptionArn), + } + deleteALSInput := &vpclattice.DeleteAccessLogSubscriptionInput{ + AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), + } + deleteALSOutput := &vpclattice.DeleteAccessLogSubscriptionOutput{} t.Run("Create_NewALSForServiceNetwork_ReturnsNewALSStatus", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.CreateEvent, - }, - } - serviceNetworkInfo := &services.ServiceNetworkInfo{ - SvcNetwork: vpclattice.ServiceNetworkSummary{ - Arn: aws.String(serviceNetworkArn), - Name: aws.String(sourceName), - }, - } - createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ - ResourceIdentifier: aws.String(serviceNetworkArn), - DestinationArn: aws.String(s3DestinationArn), - Tags: expectedTags, - } - createALSOutput := &vpclattice.CreateAccessLogSubscriptionOutput{ - Arn: aws.String(accessLogSubscriptionArn), - } + accessLogSubscription := simpleAccessLogSubscription(core.CreateEvent) mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) - mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(createALSOutput, nil) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSForSNInput).Return(createALSOutput, nil) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) resp, err := mgr.Create(ctx, accessLogSubscription) @@ -78,31 +106,16 @@ func TestAccessLogSubscriptionManager(t *testing.T) { }) t.Run("Create_NewALSForService_ReturnsNewALSStatus", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.CreateEvent, - }, - } + accessLogSubscription := simpleAccessLogSubscription(core.CreateEvent) + accessLogSubscription.Spec.SourceType = lattice.ServiceSourceType serviceNameProvider := services.NewDefaultLatticeServiceNameProvider(sourceName) findServiceOutput := &vpclattice.ServiceSummary{ Arn: aws.String(serviceArn), Name: aws.String(sourceName), } - createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ - ResourceIdentifier: aws.String(serviceArn), - DestinationArn: aws.String(s3DestinationArn), - Tags: expectedTags, - } - createALSOutput := &vpclattice.CreateAccessLogSubscriptionOutput{ - Arn: aws.String(accessLogSubscriptionArn), - } mockLattice.EXPECT().FindService(ctx, serviceNameProvider).Return(findServiceOutput, nil) - mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(createALSOutput, nil) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSForSvcInput).Return(createALSOutput, nil) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) resp, err := mgr.Create(ctx, accessLogSubscription) @@ -111,33 +124,14 @@ func TestAccessLogSubscriptionManager(t *testing.T) { }) t.Run("Create_NewALSForDeletedServiceNetwork_ReturnsNotFoundError", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.CreateEvent, - }, - } - serviceNetworkInfo := &services.ServiceNetworkInfo{ - SvcNetwork: vpclattice.ServiceNetworkSummary{ - Arn: aws.String(serviceNetworkArn), - Name: aws.String(sourceName), - }, - } - createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ - ResourceIdentifier: aws.String(serviceNetworkArn), - DestinationArn: aws.String(s3DestinationArn), - Tags: expectedTags, - } + accessLogSubscription := simpleAccessLogSubscription(core.CreateEvent) createALSErr := &vpclattice.ResourceNotFoundException{ ResourceType: aws.String("SERVICE_NETWORK"), ResourceId: aws.String(serviceNetworkArn), } mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) - mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(nil, createALSErr) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSForSNInput).Return(nil, createALSErr) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) resp, err := mgr.Create(ctx, accessLogSubscription) @@ -146,32 +140,20 @@ func TestAccessLogSubscriptionManager(t *testing.T) { }) t.Run("Create_NewALSForDeletedService_ReturnsNotFoundError", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.CreateEvent, - }, - } + accessLogSubscription := simpleAccessLogSubscription(core.CreateEvent) + accessLogSubscription.Spec.SourceType = lattice.ServiceSourceType serviceNameProvider := services.NewDefaultLatticeServiceNameProvider(sourceName) findServiceOutput := &vpclattice.ServiceSummary{ Arn: aws.String(serviceArn), Name: aws.String(sourceName), } - createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ - ResourceIdentifier: aws.String(serviceArn), - DestinationArn: aws.String(s3DestinationArn), - Tags: expectedTags, - } createALSErr := &vpclattice.ResourceNotFoundException{ ResourceType: aws.String("SERVICE"), ResourceId: aws.String(serviceArn), } mockLattice.EXPECT().FindService(ctx, serviceNameProvider).Return(findServiceOutput, nil) - mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(nil, createALSErr) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSForSvcInput).Return(nil, createALSErr) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) resp, err := mgr.Create(ctx, accessLogSubscription) @@ -180,33 +162,14 @@ func TestAccessLogSubscriptionManager(t *testing.T) { }) t.Run("Create_NewALSForMissingS3Destination_ReturnsInvalidError", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.CreateEvent, - }, - } - serviceNetworkInfo := &services.ServiceNetworkInfo{ - SvcNetwork: vpclattice.ServiceNetworkSummary{ - Arn: aws.String(serviceNetworkArn), - Name: aws.String(sourceName), - }, - } - createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ - ResourceIdentifier: aws.String(serviceNetworkArn), - DestinationArn: aws.String(s3DestinationArn), - Tags: expectedTags, - } + accessLogSubscription := simpleAccessLogSubscription(core.CreateEvent) createALSErr := &vpclattice.ResourceNotFoundException{ ResourceType: aws.String("BUCKET"), ResourceId: aws.String(s3DestinationArn), } mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) - mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(nil, createALSErr) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSForSNInput).Return(nil, createALSErr) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) resp, err := mgr.Create(ctx, accessLogSubscription) @@ -215,21 +178,8 @@ func TestAccessLogSubscriptionManager(t *testing.T) { }) t.Run("Create_NewALSForMissingCloudWatchDestination_ReturnsInvalidError", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: cloudWatchDestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.CreateEvent, - }, - } - serviceNetworkInfo := &services.ServiceNetworkInfo{ - SvcNetwork: vpclattice.ServiceNetworkSummary{ - Arn: aws.String(serviceNetworkArn), - Name: aws.String(sourceName), - }, - } + accessLogSubscription := simpleAccessLogSubscription(core.CreateEvent) + accessLogSubscription.Spec.DestinationArn = cloudWatchDestinationArn createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ ResourceIdentifier: aws.String(serviceNetworkArn), DestinationArn: aws.String(cloudWatchDestinationArn), @@ -250,21 +200,8 @@ func TestAccessLogSubscriptionManager(t *testing.T) { }) t.Run("Create_NewALSForMissingFirehoseDestination_ReturnsInvalidError", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: firehoseDestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.CreateEvent, - }, - } - serviceNetworkInfo := &services.ServiceNetworkInfo{ - SvcNetwork: vpclattice.ServiceNetworkSummary{ - Arn: aws.String(serviceNetworkArn), - Name: aws.String(sourceName), - }, - } + accessLogSubscription := simpleAccessLogSubscription(core.CreateEvent) + accessLogSubscription.Spec.DestinationArn = firehoseDestinationArn createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ ResourceIdentifier: aws.String(serviceNetworkArn), DestinationArn: aws.String(firehoseDestinationArn), @@ -285,26 +222,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { }) t.Run("Create_ConflictingALSForSameResourceFromDifferentPolicy_ReturnsConflictError", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.CreateEvent, - }, - } - serviceNetworkInfo := &services.ServiceNetworkInfo{ - SvcNetwork: vpclattice.ServiceNetworkSummary{ - Arn: aws.String(serviceNetworkArn), - Name: aws.String(sourceName), - }, - } - createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ - ResourceIdentifier: aws.String(serviceNetworkArn), - DestinationArn: aws.String(s3DestinationArn), - Tags: expectedTags, - } + accessLogSubscription := simpleAccessLogSubscription(core.CreateEvent) createALSErr := &vpclattice.ConflictException{ ResourceType: aws.String("ACCESS_LOG_SUBSCRIPTION"), } @@ -329,7 +247,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { } mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) - mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(nil, createALSErr) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSForSNInput).Return(nil, createALSErr) mockLattice.EXPECT().ListAccessLogSubscriptionsWithContext(ctx, listALSInput).Return(listALSOutput, nil) mockLattice.EXPECT().ListTagsForResourceWithContext(ctx, listTagsInput).Return(listTagsOutput, nil) @@ -340,26 +258,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { }) t.Run("Create_ConflictingALSForSameResourceFromSamePolicy_ReturnsNewALSStatus", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.CreateEvent, - }, - } - serviceNetworkInfo := &services.ServiceNetworkInfo{ - SvcNetwork: vpclattice.ServiceNetworkSummary{ - Arn: aws.String(serviceNetworkArn), - Name: aws.String(sourceName), - }, - } - createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ - ResourceIdentifier: aws.String(serviceNetworkArn), - DestinationArn: aws.String(s3DestinationArn), - Tags: expectedTags, - } + accessLogSubscription := simpleAccessLogSubscription(core.CreateEvent) createALSErr := &vpclattice.ConflictException{ ResourceType: aws.String("ACCESS_LOG_SUBSCRIPTION"), } @@ -384,7 +283,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { } mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) - mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(nil, createALSErr) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSForSNInput).Return(nil, createALSErr) mockLattice.EXPECT().ListAccessLogSubscriptionsWithContext(ctx, listALSInput).Return(listALSOutput, nil) mockLattice.EXPECT().ListTagsForResourceWithContext(ctx, listTagsInput).Return(listTagsOutput, nil) @@ -395,15 +294,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { }) t.Run("Create_NewAccessLogSubscriptionForMissingServiceNetwork_ReturnsNotFoundError", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.CreateEvent, - }, - } + accessLogSubscription := simpleAccessLogSubscription(core.CreateEvent) notFoundErr := services.NewNotFoundError("", "") mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(nil, notFoundErr) @@ -415,15 +306,8 @@ func TestAccessLogSubscriptionManager(t *testing.T) { }) t.Run("Create_NewAccessLogSubscriptionForMissingService_ReturnsNotFoundError", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.CreateEvent, - }, - } + accessLogSubscription := simpleAccessLogSubscription(core.CreateEvent) + accessLogSubscription.Spec.SourceType = lattice.ServiceSourceType notFoundErr := services.NewNotFoundError("", "") serviceNameProvider := services.NewDefaultLatticeServiceNameProvider(sourceName) @@ -436,26 +320,13 @@ func TestAccessLogSubscriptionManager(t *testing.T) { }) t.Run("Update_ALSWithSameDestinationType_UpdatesALSAndReturnsSuccess", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.UpdateEvent, - }, - Status: &lattice.AccessLogSubscriptionStatus{ - Arn: accessLogSubscriptionArn, - }, - } - updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ - AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), - DestinationArn: aws.String(s3DestinationArn), - } - updateALSOutput := &vpclattice.UpdateAccessLogSubscriptionOutput{ - Arn: aws.String(accessLogSubscriptionArn), + accessLogSubscription := simpleAccessLogSubscription(core.UpdateEvent) + accessLogSubscription.Status = &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, } + mockLattice.EXPECT().GetAccessLogSubscriptionWithContext(ctx, getALSInput).Return(getALSOutput, nil) + mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(updateALSOutput, nil) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) @@ -466,46 +337,57 @@ func TestAccessLogSubscriptionManager(t *testing.T) { t.Run("Update_ALSWithDifferentDestinationType_CreatesNewALSThenDeletesOldALSAndReturnsNewALSStatus", func(t *testing.T) { newAccessLogSubscriptionArn := accessLogSubscriptionArn + "new" - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.UpdateEvent, - }, - Status: &lattice.AccessLogSubscriptionStatus{ - Arn: accessLogSubscriptionArn, - }, - } - updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ - AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), - DestinationArn: aws.String(s3DestinationArn), + accessLogSubscription := simpleAccessLogSubscription(core.UpdateEvent) + accessLogSubscription.Status = &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, } updateALSErr := &vpclattice.ConflictException{ ResourceType: aws.String("ACCESS_LOG_SUBSCRIPTION"), } + createALSOutput := &vpclattice.CreateAccessLogSubscriptionOutput{ + Arn: aws.String(newAccessLogSubscriptionArn), + } + + mockLattice.EXPECT().GetAccessLogSubscriptionWithContext(ctx, getALSInput).Return(getALSOutput, nil) + mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) + mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(nil, updateALSErr) + mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSForSNInput).Return(createALSOutput, nil) + mockLattice.EXPECT().DeleteAccessLogSubscriptionWithContext(ctx, deleteALSInput).Return(deleteALSOutput, nil) + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) + resp, err := mgr.Update(ctx, accessLogSubscription) + assert.Nil(t, err) + assert.Equal(t, newAccessLogSubscriptionArn, resp.Arn) + }) + + t.Run("Update_ALSWithDifferentSource_CreatesNewALSThenDeletesOldALSAndReturnsNewALSStatus", func(t *testing.T) { + newAccessLogSubscriptionArn := accessLogSubscriptionArn + "new" + newSourceArn := serviceNetworkArn + "new" + newSourceName := sourceName + "new" + accessLogSubscription := simpleAccessLogSubscription(core.UpdateEvent) + accessLogSubscription.Spec.SourceName = newSourceName + accessLogSubscription.Status = &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, + } serviceNetworkInfo := &services.ServiceNetworkInfo{ SvcNetwork: vpclattice.ServiceNetworkSummary{ - Arn: aws.String(serviceNetworkArn), - Name: aws.String(sourceName), + Arn: aws.String(newSourceArn), + Name: aws.String(newSourceName), }, } createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ - ResourceIdentifier: aws.String(serviceNetworkArn), + ResourceIdentifier: aws.String(newSourceArn), DestinationArn: aws.String(s3DestinationArn), Tags: expectedTags, } createALSOutput := &vpclattice.CreateAccessLogSubscriptionOutput{ Arn: aws.String(newAccessLogSubscriptionArn), } - deleteALSInput := &vpclattice.DeleteAccessLogSubscriptionInput{ - AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), - } - deleteALSOutput := &vpclattice.DeleteAccessLogSubscriptionOutput{} - mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(nil, updateALSErr) - mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) + mockLattice.EXPECT().GetAccessLogSubscriptionWithContext(ctx, getALSInput).Return(getALSOutput, nil) + mockLattice.EXPECT().FindServiceNetwork(ctx, newSourceName, config.AccountID).Return(serviceNetworkInfo, nil) + mockLattice.EXPECT().FindServiceNetwork(ctx, newSourceName, config.AccountID).Return(serviceNetworkInfo, nil) mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(createALSOutput, nil) mockLattice.EXPECT().DeleteAccessLogSubscriptionWithContext(ctx, deleteALSInput).Return(deleteALSOutput, nil) @@ -515,27 +397,34 @@ func TestAccessLogSubscriptionManager(t *testing.T) { assert.Equal(t, newAccessLogSubscriptionArn, resp.Arn) }) - t.Run("Update_ALSDoesNotExist_ReturnsInvalidError", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.UpdateEvent, - }, - Status: &lattice.AccessLogSubscriptionStatus{ - Arn: accessLogSubscriptionArn, - }, + t.Run("Update_ALSDoesNotExistOnGet_ReturnsInvalidError", func(t *testing.T) { + accessLogSubscription := simpleAccessLogSubscription(core.UpdateEvent) + accessLogSubscription.Status = &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, } - updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ - AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), - DestinationArn: aws.String(s3DestinationArn), + getALSError := &vpclattice.ResourceNotFoundException{ + ResourceType: aws.String("ACCESS_LOG_SUBSCRIPTION"), + } + + mockLattice.EXPECT().GetAccessLogSubscriptionWithContext(ctx, getALSInput).Return(nil, getALSError) + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) + resp, err := mgr.Update(ctx, accessLogSubscription) + assert.Nil(t, resp) + assert.True(t, services.IsInvalidError(err)) + }) + + t.Run("Update_ALSDoesNotExistOnUpdate_ReturnsInvalidError", func(t *testing.T) { + accessLogSubscription := simpleAccessLogSubscription(core.UpdateEvent) + accessLogSubscription.Status = &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, } updateALSError := &vpclattice.ResourceNotFoundException{ ResourceType: aws.String("ACCESS_LOG_SUBSCRIPTION"), } + mockLattice.EXPECT().GetAccessLogSubscriptionWithContext(ctx, getALSInput).Return(getALSOutput, nil) + mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(nil, updateALSError) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) @@ -544,25 +433,30 @@ func TestAccessLogSubscriptionManager(t *testing.T) { assert.True(t, services.IsInvalidError(err)) }) - t.Run("Update_AccessDeniedExceptionReceived_ReturnsInvalidError", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.UpdateEvent, - }, - Status: &lattice.AccessLogSubscriptionStatus{ - Arn: accessLogSubscriptionArn, - }, + t.Run("Update_AccessDeniedExceptionReceivedOnGet_ReturnsInvalidError", func(t *testing.T) { + accessLogSubscription := simpleAccessLogSubscription(core.UpdateEvent) + accessLogSubscription.Status = &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, } - updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ - AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), - DestinationArn: aws.String(s3DestinationArn), + getALSError := &vpclattice.AccessDeniedException{} + + mockLattice.EXPECT().GetAccessLogSubscriptionWithContext(ctx, getALSInput).Return(nil, getALSError) + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) + resp, err := mgr.Update(ctx, accessLogSubscription) + assert.Nil(t, resp) + assert.True(t, services.IsInvalidError(err)) + }) + + t.Run("Update_AccessDeniedExceptionReceivedOnUpdate_ReturnsInvalidError", func(t *testing.T) { + accessLogSubscription := simpleAccessLogSubscription(core.UpdateEvent) + accessLogSubscription.Status = &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, } updateALSError := &vpclattice.AccessDeniedException{} + mockLattice.EXPECT().GetAccessLogSubscriptionWithContext(ctx, getALSInput).Return(getALSOutput, nil) + mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(nil, updateALSError) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) @@ -572,26 +466,16 @@ func TestAccessLogSubscriptionManager(t *testing.T) { }) t.Run("Update_ServiceNetworkDoesNotExist_ReturnsNotFoundError", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.UpdateEvent, - }, - Status: &lattice.AccessLogSubscriptionStatus{ - Arn: accessLogSubscriptionArn, - }, - } - updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ - AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), - DestinationArn: aws.String(s3DestinationArn), + accessLogSubscription := simpleAccessLogSubscription(core.UpdateEvent) + accessLogSubscription.Status = &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, } updateALSError := &vpclattice.ResourceNotFoundException{ ResourceType: aws.String("SERVICE_NETWORK"), } + mockLattice.EXPECT().GetAccessLogSubscriptionWithContext(ctx, getALSInput).Return(getALSOutput, nil) + mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(nil, updateALSError) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) @@ -601,26 +485,16 @@ func TestAccessLogSubscriptionManager(t *testing.T) { }) t.Run("Update_ServiceDoesNotExist_ReturnsNotFoundError", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.UpdateEvent, - }, - Status: &lattice.AccessLogSubscriptionStatus{ - Arn: accessLogSubscriptionArn, - }, - } - updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ - AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), - DestinationArn: aws.String(s3DestinationArn), + accessLogSubscription := simpleAccessLogSubscription(core.UpdateEvent) + accessLogSubscription.Status = &lattice.AccessLogSubscriptionStatus{ + Arn: accessLogSubscriptionArn, } updateALSError := &vpclattice.ResourceNotFoundException{ ResourceType: aws.String("SERVICE"), } + mockLattice.EXPECT().GetAccessLogSubscriptionWithContext(ctx, getALSInput).Return(getALSOutput, nil) + mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName, config.AccountID).Return(serviceNetworkInfo, nil) mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(nil, updateALSError) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) @@ -630,46 +504,14 @@ func TestAccessLogSubscriptionManager(t *testing.T) { }) t.Run("Test_Delete_AccessLogSubscriptionExists_ReturnsSuccess", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.DeleteEvent, - }, - Status: &lattice.AccessLogSubscriptionStatus{ - Arn: accessLogSubscriptionArn, - }, - } - deleteALSInput := &vpclattice.DeleteAccessLogSubscriptionInput{ - AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), - } - deleteALSOutput := &vpclattice.DeleteAccessLogSubscriptionOutput{} - mockLattice.EXPECT().DeleteAccessLogSubscriptionWithContext(ctx, deleteALSInput).Return(deleteALSOutput, nil) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) - err := mgr.Delete(ctx, accessLogSubscription) + err := mgr.Delete(ctx, accessLogSubscriptionArn) assert.Nil(t, err) }) t.Run("Delete_ALSDoesNotExist_ReturnsSuccess", func(t *testing.T) { - accessLogSubscription := &lattice.AccessLogSubscription{ - Spec: lattice.AccessLogSubscriptionSpec{ - SourceType: lattice.ServiceNetworkSourceType, - SourceName: sourceName, - DestinationArn: s3DestinationArn, - ALPNamespacedName: accessLogPolicyNamespacedName, - EventType: core.DeleteEvent, - }, - Status: &lattice.AccessLogSubscriptionStatus{ - Arn: accessLogSubscriptionArn, - }, - } - deleteALSInput := &vpclattice.DeleteAccessLogSubscriptionInput{ - AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), - } deleteALSErr := &vpclattice.ResourceNotFoundException{ ResourceType: aws.String("ACCESS_LOG_SUBSCRIPTION"), } @@ -677,7 +519,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { mockLattice.EXPECT().DeleteAccessLogSubscriptionWithContext(ctx, deleteALSInput).Return(nil, deleteALSErr) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) - err := mgr.Delete(ctx, accessLogSubscription) + err := mgr.Delete(ctx, accessLogSubscriptionArn) assert.Nil(t, err) }) } diff --git a/pkg/deploy/lattice/access_log_subscription_synthesizer.go b/pkg/deploy/lattice/access_log_subscription_synthesizer.go index d9e3d316..97e036d5 100644 --- a/pkg/deploy/lattice/access_log_subscription_synthesizer.go +++ b/pkg/deploy/lattice/access_log_subscription_synthesizer.go @@ -60,7 +60,7 @@ func (s *accessLogSubscriptionSynthesizer) Synthesize(ctx context.Context) error s.log.Debugf("Ignoring deletion of Access Log Subscription because als %s has no ARN", als.ID()) return nil } - err := s.accessLogSubscriptionManager.Delete(ctx, als) + err := s.accessLogSubscriptionManager.Delete(ctx, als.Status.Arn) if err != nil { return err } diff --git a/pkg/deploy/lattice/access_log_subscription_synthesizer_test.go b/pkg/deploy/lattice/access_log_subscription_synthesizer_test.go index f44880c2..54d0bcc3 100644 --- a/pkg/deploy/lattice/access_log_subscription_synthesizer_test.go +++ b/pkg/deploy/lattice/access_log_subscription_synthesizer_test.go @@ -137,7 +137,7 @@ func TestSynthesizeAccessLogSubscription(t *testing.T) { stack, accessLogSubscription, _ := builder.Build(context.Background(), input) - mockManager.EXPECT().Delete(ctx, accessLogSubscription).Return(nil).Times(1) + mockManager.EXPECT().Delete(ctx, accessLogSubscription.Status.Arn).Return(nil).Times(1) synthesizer := NewAccessLogSubscriptionSynthesizer(gwlog.FallbackLogger, k8sClient, mockManager, stack) err := synthesizer.Synthesize(ctx) @@ -187,7 +187,7 @@ func TestSynthesizeAccessLogSubscription(t *testing.T) { stack, accessLogSubscription, _ := builder.Build(context.Background(), input) - mockManager.EXPECT().Delete(ctx, accessLogSubscription).Return(errors.New("mock error")).Times(1) + mockManager.EXPECT().Delete(ctx, accessLogSubscription.Status.Arn).Return(errors.New("mock error")).Times(1) synthesizer := NewAccessLogSubscriptionSynthesizer(gwlog.FallbackLogger, k8sClient, mockManager, stack) err := synthesizer.Synthesize(ctx) diff --git a/test/suites/integration/access_log_policy_test.go b/test/suites/integration/access_log_policy_test.go index 07072839..5707874b 100644 --- a/test/suites/integration/access_log_policy_test.go +++ b/test/suites/integration/access_log_policy_test.go @@ -600,6 +600,8 @@ var _ = Describe("Access Log Policy", Ordered, func() { It("update properly changes or replaces Access Log Subscription and sets Access Log Policy status", func() { originalAlsArn := "" currentAlsArn := "" + expectedGeneration := 1 + latticeService := testFramework.GetVpcLatticeService(ctx, core.NewHTTPRoute(*httpRoute)) accessLogPolicy := &anv1alpha1.AccessLogPolicy{ ObjectMeta: metav1.ObjectMeta{ Name: k8sResourceName, @@ -650,6 +652,7 @@ var _ = Describe("Access Log Policy", Ordered, func() { Expect(err).To(BeNil()) alp.Spec.DestinationArn = aws.String(logGroup2Arn) testFramework.ExpectUpdated(ctx, alp) + expectedGeneration = expectedGeneration + 1 Eventually(func(g Gomega) { // Policy status should be Accepted @@ -663,7 +666,7 @@ var _ = Describe("Access Log Policy", Ordered, func() { g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionTrue)) - g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(2)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration)) g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonAccepted))) // Service Network should have 1 Access Log Subscription with updated CloudWatch Log Group destination @@ -698,6 +701,7 @@ var _ = Describe("Access Log Policy", Ordered, func() { Expect(err).To(BeNil()) alp.Spec.DestinationArn = aws.String(bucketArn) testFramework.ExpectUpdated(ctx, alp) + expectedGeneration = expectedGeneration + 1 Eventually(func(g Gomega) { // Policy status should be Accepted @@ -707,7 +711,7 @@ var _ = Describe("Access Log Policy", Ordered, func() { g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionTrue)) - g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(3)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration)) g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonAccepted))) // Service Network should only have 1 Access Log Subscription, with S3 Bucket destination @@ -737,12 +741,72 @@ var _ = Describe("Access Log Policy", Ordered, func() { g.Expect(listTagsOutput.Tags).To(BeEquivalentTo(expectedTags)) }).Should(Succeed()) + // Update to different targetRef + alp = &anv1alpha1.AccessLogPolicy{} + err = testFramework.Client.Get(ctx, alpNamespacedName, alp) + Expect(err).To(BeNil()) + alp.Spec.TargetRef = &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "HTTPRoute", + Name: gwv1alpha2.ObjectName(httpRoute.Name), + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + } + testFramework.ExpectUpdated(ctx, alp) + expectedGeneration = expectedGeneration + 1 + + Eventually(func(g Gomega) { + // Policy status should be Accepted + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionTrue)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonAccepted))) + + // Service Network should have 0 Access Log Subscriptions + listALSForSNInput := &vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: testServiceNetwork.Arn, + } + listALSForSNOutput, err := testFramework.LatticeClient.ListAccessLogSubscriptionsWithContext(ctx, listALSForSNInput) + g.Expect(err).To(BeNil()) + g.Expect(len(listALSForSNOutput.Items)).To(BeEquivalentTo(0)) + + // VPC Lattice Service should have 1 Access Log Subscription, with S3 Bucket destination + listALSForSvcInput := &vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: latticeService.Arn, + } + listALSForSvcOutput, err := testFramework.LatticeClient.ListAccessLogSubscriptionsWithContext(ctx, listALSForSvcInput) + g.Expect(err).To(BeNil()) + g.Expect(len(listALSForSvcOutput.Items)).To(BeEquivalentTo(1)) + g.Expect(*listALSForSvcOutput.Items[0].DestinationArn).To(BeEquivalentTo(bucketArn)) + g.Expect(listALSForSvcOutput.Items[0].ResourceId).To(BeEquivalentTo(latticeService.Id)) + + // New Access Log Subscription ARN should be in the Access Log Policy's annotations + g.Expect(alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]).To(BeEquivalentTo(*listALSForSvcOutput.Items[0].Arn)) + g.Expect(alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]).ToNot(BeEquivalentTo(originalAlsArn)) + currentAlsArn = alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey] + + // New Access Log Subscription should have default tags and Access Log Policy tag applied + expectedTags := testFramework.Cloud.DefaultTagsMergedWith(services.Tags{ + lattice.AccessLogPolicyTagKey: aws.String(alpNamespacedName.String()), + }) + listTagsInput := &vpclattice.ListTagsForResourceInput{ + ResourceArn: listALSForSvcOutput.Items[0].Arn, + } + listTagsOutput, err := testFramework.LatticeClient.ListTagsForResourceWithContext(ctx, listTagsInput) + g.Expect(err).To(BeNil()) + g.Expect(listTagsOutput.Tags).To(BeEquivalentTo(expectedTags)) + }).Should(Succeed()) + // Update to destination that does not exist alp = &anv1alpha1.AccessLogPolicy{} err = testFramework.Client.Get(ctx, alpNamespacedName, alp) Expect(err).To(BeNil()) alp.Spec.DestinationArn = aws.String(bucketArn + "doesnotexist") testFramework.ExpectUpdated(ctx, alp) + expectedGeneration = expectedGeneration + 1 Eventually(func(g Gomega) { // Policy status should be Invalid @@ -752,17 +816,17 @@ var _ = Describe("Access Log Policy", Ordered, func() { g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionFalse)) - g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(4)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration)) g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonInvalid))) - // Service Network should still have previous Access Log Subscription + // VPC Lattice Service should still have previous Access Log Subscription listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{ - ResourceIdentifier: testServiceNetwork.Arn, + ResourceIdentifier: latticeService.Arn, } listALSOutput, err := testFramework.LatticeClient.ListAccessLogSubscriptionsWithContext(ctx, listALSInput) g.Expect(err).To(BeNil()) g.Expect(len(listALSOutput.Items)).To(BeEquivalentTo(1)) - g.Expect(listALSOutput.Items[0].ResourceId).To(BeEquivalentTo(testServiceNetwork.Id)) + g.Expect(listALSOutput.Items[0].ResourceId).To(BeEquivalentTo(latticeService.Id)) g.Expect(*listALSOutput.Items[0].DestinationArn).To(BeEquivalentTo(bucketArn)) // Same Access Log Subscription ARN should be in the Access Log Policy's annotations @@ -782,6 +846,7 @@ var _ = Describe("Access Log Policy", Ordered, func() { Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), } testFramework.ExpectUpdated(ctx, alp) + expectedGeneration = expectedGeneration + 1 Eventually(func(g Gomega) { // Policy status should be TargetNotFound @@ -791,17 +856,52 @@ var _ = Describe("Access Log Policy", Ordered, func() { g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionFalse)) - g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(5)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration)) g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonTargetNotFound))) - // Service Network should still have previous Access Log Subscription + // VPC Lattice Service should still have previous Access Log Subscription listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{ - ResourceIdentifier: testServiceNetwork.Arn, + ResourceIdentifier: latticeService.Arn, } listALSOutput, err := testFramework.LatticeClient.ListAccessLogSubscriptionsWithContext(ctx, listALSInput) g.Expect(err).To(BeNil()) g.Expect(len(listALSOutput.Items)).To(BeEquivalentTo(1)) - g.Expect(listALSOutput.Items[0].ResourceId).To(BeEquivalentTo(testServiceNetwork.Id)) + g.Expect(listALSOutput.Items[0].ResourceId).To(BeEquivalentTo(latticeService.Id)) + g.Expect(*listALSOutput.Items[0].DestinationArn).To(BeEquivalentTo(bucketArn)) + + // Same Access Log Subscription ARN should be in the Access Log Policy's annotations + g.Expect(alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]).To(BeEquivalentTo(*listALSOutput.Items[0].Arn)) + g.Expect(alp.Annotations[anv1alpha1.AccessLogSubscriptionAnnotationKey]).To(BeEquivalentTo(currentAlsArn)) + }).Should(Succeed()) + + // Update to targetRef with wrong namespace + alp = &anv1alpha1.AccessLogPolicy{} + err = testFramework.Client.Get(ctx, alpNamespacedName, alp) + Expect(err).To(BeNil()) + alp.Spec.DestinationArn = aws.String(bucketArn) + alp.Spec.TargetRef.Namespace = (*gwv1alpha2.Namespace)(aws.String("invalid")) + testFramework.ExpectUpdated(ctx, alp) + expectedGeneration = expectedGeneration + 1 + + Eventually(func(g Gomega) { + // Policy status should be Invalid + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionFalse)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonInvalid))) + + // VPC Lattice Service should still have previous Access Log Subscription + listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: latticeService.Arn, + } + listALSOutput, err := testFramework.LatticeClient.ListAccessLogSubscriptionsWithContext(ctx, listALSInput) + g.Expect(err).To(BeNil()) + g.Expect(len(listALSOutput.Items)).To(BeEquivalentTo(1)) + g.Expect(listALSOutput.Items[0].ResourceId).To(BeEquivalentTo(latticeService.Id)) g.Expect(*listALSOutput.Items[0].DestinationArn).To(BeEquivalentTo(bucketArn)) // Same Access Log Subscription ARN should be in the Access Log Policy's annotations @@ -819,8 +919,8 @@ var _ = Describe("Access Log Policy", Ordered, func() { DestinationArn: aws.String(logGroupArn), TargetRef: &gwv1alpha2.PolicyTargetReference{ Group: gwv1beta1.GroupName, - Kind: "Gateway", - Name: gwv1alpha2.ObjectName(testGateway.Name), + Kind: "HTTPRoute", + Name: gwv1alpha2.ObjectName(httpRoute.Name), Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), }, }, @@ -851,12 +951,13 @@ var _ = Describe("Access Log Policy", Ordered, func() { DestinationArn: aws.String(logGroupArn), TargetRef: &gwv1alpha2.PolicyTargetReference{ Group: gwv1beta1.GroupName, - Kind: "Gateway", - Name: gwv1alpha2.ObjectName(testGateway.Name), + Kind: "HTTPRoute", + Name: gwv1alpha2.ObjectName(httpRoute.Name), Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), }, } testFramework.ExpectUpdated(ctx, alp) + expectedGeneration = expectedGeneration + 1 Eventually(func(g Gomega) { // Policy status should be Conflicted @@ -866,12 +967,12 @@ var _ = Describe("Access Log Policy", Ordered, func() { g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionFalse)) - g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(6)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(expectedGeneration)) g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonConflicted))) - // Service Network should now have the old and new Access Log Subscriptions + // VPC Lattice Service should now have the old and new Access Log Subscriptions listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{ - ResourceIdentifier: testServiceNetwork.Arn, + ResourceIdentifier: latticeService.Arn, } listALSOutput, err := testFramework.LatticeClient.ListAccessLogSubscriptionsWithContext(ctx, listALSInput) g.Expect(err).To(BeNil())