diff --git a/controllers/accesslogpolicy_controller.go b/controllers/accesslogpolicy_controller.go index 6304eaca..6f0e237e 100644 --- a/controllers/accesslogpolicy_controller.go +++ b/controllers/accesslogpolicy_controller.go @@ -26,14 +26,18 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" + builder2 "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" "github.com/aws/aws-application-networking-k8s/pkg/aws" + "github.com/aws/aws-application-networking-k8s/pkg/aws/services" "github.com/aws/aws-application-networking-k8s/pkg/config" "github.com/aws/aws-application-networking-k8s/pkg/deploy" + "github.com/aws/aws-application-networking-k8s/pkg/gateway" "github.com/aws/aws-application-networking-k8s/pkg/k8s" lattice_runtime "github.com/aws/aws-application-networking-k8s/pkg/runtime" "github.com/aws/aws-application-networking-k8s/pkg/utils" @@ -50,6 +54,7 @@ type accessLogPolicyReconciler struct { scheme *runtime.Scheme finalizerManager k8s.FinalizerManager eventRecorder record.EventRecorder + modelBuilder gateway.AccessLogSubscriptionModelBuilder stackDeployer deploy.StackDeployer cloud aws.Cloud stackMarshaller deploy.StackMarshaller @@ -65,7 +70,8 @@ func RegisterAccessLogPolicyController( scheme := mgr.GetScheme() evtRec := mgr.GetEventRecorderFor("accesslogpolicy") - stackDeployer := deploy.NewServiceNetworkStackDeployer(log, cloud, mgrClient) + modelBuilder := gateway.NewAccessLogSubscriptionModelBuilder(log, mgrClient) + stackDeployer := deploy.NewAccessLogSubscriptionStackDeployer(log, cloud, mgrClient) stackMarshaller := deploy.NewDefaultStackMarshaller() r := &accessLogPolicyReconciler{ @@ -74,13 +80,14 @@ func RegisterAccessLogPolicyController( scheme: scheme, finalizerManager: finalizerManager, eventRecorder: evtRec, + modelBuilder: modelBuilder, stackDeployer: stackDeployer, cloud: cloud, stackMarshaller: stackMarshaller, } builder := ctrl.NewControllerManagedBy(mgr). - For(&anv1alpha1.AccessLogPolicy{}) + For(&anv1alpha1.AccessLogPolicy{}, builder2.WithPredicates(predicate.GenerationChangedPredicate{})) return builder.Complete(r) } @@ -133,16 +140,24 @@ func (r *accessLogPolicyReconciler) reconcileUpsert(ctx context.Context, alp *an alp.Spec.TargetRef.Kind, alp.Spec.TargetRef.Name) err := r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonTargetNotFound) if err != nil { - return fmt.Errorf("failed to update Access Log Policy status, %w", err) + return err } return nil } - // TODO: Create VPC Lattice Access Log Subscription + err := r.buildAndDeployModel(ctx, alp) + if err != nil { + if services.IsConflictError(err) { + return r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonConflicted) + } else if services.IsInvalidError(err) { + return r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonInvalid) + } + return err + } - err := r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonAccepted) + err = r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonAccepted) if err != nil { - return fmt.Errorf("failed to update Access Log Policy status, %w", err) + return err } return nil @@ -163,8 +178,8 @@ func (r *accessLogPolicyReconciler) targetRefExists(ctx context.Context, alp *an switch alp.Spec.TargetRef.Kind { case "Gateway": - gateway := &gwv1beta1.Gateway{} - err = r.client.Get(ctx, targetRefNamespacedName, gateway) + gw := &gwv1beta1.Gateway{} + err = r.client.Get(ctx, targetRefNamespacedName, gw) case "HTTPRoute": httpRoute := &gwv1beta1.HTTPRoute{} err = r.client.Get(ctx, targetRefNamespacedName, httpRoute) @@ -179,6 +194,33 @@ func (r *accessLogPolicyReconciler) targetRefExists(ctx context.Context, alp *an return err == nil } +func (r *accessLogPolicyReconciler) buildAndDeployModel( + ctx context.Context, + alp *anv1alpha1.AccessLogPolicy, +) error { + stack, _, err := r.modelBuilder.Build(ctx, alp) + if err != nil { + r.eventRecorder.Event(alp, corev1.EventTypeWarning, k8s.AccessLogPolicyEventReasonFailedBuildModel, + fmt.Sprintf("Failed to build model due to %s", err)) + return err + } + + jsonStack, err := r.stackMarshaller.Marshal(stack) + if err != nil { + return err + } + r.log.Debugw("Successfully built model", "stack", jsonStack) + + if err := r.stackDeployer.Deploy(ctx, stack); err != nil { + return err + } + r.log.Debugw("successfully deployed model", + "stack", stack.StackID().Name+":"+stack.StackID().Namespace, + ) + + return nil +} + func (r *accessLogPolicyReconciler) updateAccessLogPolicyStatus( ctx context.Context, alp *anv1alpha1.AccessLogPolicy, diff --git a/pkg/aws/services/vpclattice.go b/pkg/aws/services/vpclattice.go index a68508c3..85eb939d 100644 --- a/pkg/aws/services/vpclattice.go +++ b/pkg/aws/services/vpclattice.go @@ -44,6 +44,42 @@ func IsNotFoundError(err error) bool { return errors.As(err, &nfErr) } +type ConflictError struct { + ResourceType string + Name string + Message string +} + +func (e *ConflictError) Error() string { + return fmt.Sprintf("%s %s had a conflict: %s", e.ResourceType, e.Name, e.Message) +} + +func NewConflictError(resourceType string, name string, message string) error { + return &ConflictError{resourceType, name, message} +} + +func IsConflictError(err error) bool { + conflictErr := &ConflictError{} + return errors.As(err, &conflictErr) +} + +type InvalidError struct { + Message string +} + +func (e *InvalidError) Error() string { + return fmt.Sprintf("Invalid input: %s", e.Message) +} + +func NewInvalidError(message string) error { + return &InvalidError{message} +} + +func IsInvalidError(err error) bool { + invalidErr := &InvalidError{} + return errors.As(err, &invalidErr) +} + type Lattice interface { vpclatticeiface.VPCLatticeAPI ListServiceNetworksAsList(ctx context.Context, input *vpclattice.ListServiceNetworksInput) ([]*vpclattice.ServiceNetworkSummary, error) diff --git a/pkg/deploy/lattice/access_log_subscription_manager.go b/pkg/deploy/lattice/access_log_subscription_manager.go new file mode 100644 index 00000000..e641c0cd --- /dev/null +++ b/pkg/deploy/lattice/access_log_subscription_manager.go @@ -0,0 +1,106 @@ +package lattice + +import ( + "context" + + "github.com/aws/aws-sdk-go/service/vpclattice" + + "github.com/aws/aws-application-networking-k8s/pkg/aws" + "github.com/aws/aws-application-networking-k8s/pkg/aws/services" + "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" + "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" +) + +//go:generate mockgen -destination access_log_subscription_manager_mock.go -package lattice github.com/aws/aws-application-networking-k8s/pkg/deploy/lattice AccessLogSubscriptionManager + +type AccessLogSubscriptionManager interface { + Create(ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription) (*lattice.AccessLogSubscriptionStatus, error) +} + +type defaultAccessLogSubscriptionManager struct { + log gwlog.Logger + cloud aws.Cloud +} + +func NewAccessLogSubscriptionManager( + log gwlog.Logger, + cloud aws.Cloud, +) *defaultAccessLogSubscriptionManager { + return &defaultAccessLogSubscriptionManager{ + log: log, + cloud: cloud, + } +} + +func (m *defaultAccessLogSubscriptionManager) Create( + ctx context.Context, + accessLogSubscription *lattice.AccessLogSubscription, +) (*lattice.AccessLogSubscriptionStatus, error) { + vpcLatticeSess := m.cloud.Lattice() + + var resourceIdentifier string + if accessLogSubscription.Spec.SourceType == lattice.ServiceNetworkSourceType { + serviceNetwork, err := getServiceNetworkWithName(ctx, vpcLatticeSess, accessLogSubscription.Spec.SourceName) + if err != nil { + return nil, err + } + resourceIdentifier = *serviceNetwork.Arn + } else { + service, err := getServiceWithName(ctx, vpcLatticeSess, accessLogSubscription.Spec.SourceName) + if err != nil { + return nil, err + } + resourceIdentifier = *service.Arn + } + + createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ + ResourceIdentifier: &resourceIdentifier, + DestinationArn: &accessLogSubscription.Spec.DestinationArn, + } + + createALSOutput, err := vpcLatticeSess.CreateAccessLogSubscriptionWithContext(ctx, createALSInput) + if err != nil { + if e, ok := err.(*vpclattice.ConflictException); ok { + return nil, services.NewConflictError(string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName, e.Message()) + } else if e, ok := err.(*vpclattice.AccessDeniedException); ok { + return nil, services.NewInvalidError(e.Message()) + } else if e, ok := err.(*vpclattice.ResourceNotFoundException); ok { + if *e.ResourceType == "SERVICE_NETWORK" || *e.ResourceType == "SERVICE" { + return nil, services.NewNotFoundError(string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName) + } + return nil, services.NewInvalidError(e.Message()) + } + return nil, err + } + + return &lattice.AccessLogSubscriptionStatus{ + Arn: *createALSOutput.Arn, + Id: *createALSOutput.Id, + }, nil +} + +func getServiceNetworkWithName(ctx context.Context, vpcLatticeSess services.Lattice, name string) (*vpclattice.ServiceNetworkSummary, error) { + serviceNetworkSummaries, err := vpcLatticeSess.ListServiceNetworksAsList(ctx, &vpclattice.ListServiceNetworksInput{}) + if err != nil { + return nil, err + } + for _, serviceNetworkSummary := range serviceNetworkSummaries { + if *serviceNetworkSummary.Name == name { + return serviceNetworkSummary, nil + } + } + return nil, services.NewNotFoundError("ServiceNetwork", name) +} + +func getServiceWithName(ctx context.Context, vpcLatticeSess services.Lattice, name string) (*vpclattice.ServiceSummary, error) { + serviceSummaries, err := vpcLatticeSess.ListServicesAsList(ctx, &vpclattice.ListServicesInput{}) + if err != nil { + return nil, err + } + for _, serviceSummary := range serviceSummaries { + if *serviceSummary.Name == name { + return serviceSummary, nil + } + } + return nil, services.NewNotFoundError("Service", name) +} diff --git a/pkg/deploy/lattice/access_log_subscription_manager_mock.go b/pkg/deploy/lattice/access_log_subscription_manager_mock.go new file mode 100644 index 00000000..5e8ee567 --- /dev/null +++ b/pkg/deploy/lattice/access_log_subscription_manager_mock.go @@ -0,0 +1,51 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/aws/aws-application-networking-k8s/pkg/deploy/lattice (interfaces: AccessLogSubscriptionManager) + +// Package lattice is a generated GoMock package. +package lattice + +import ( + context "context" + reflect "reflect" + + lattice "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" + gomock "github.com/golang/mock/gomock" +) + +// MockAccessLogSubscriptionManager is a mock of AccessLogSubscriptionManager interface. +type MockAccessLogSubscriptionManager struct { + ctrl *gomock.Controller + recorder *MockAccessLogSubscriptionManagerMockRecorder +} + +// MockAccessLogSubscriptionManagerMockRecorder is the mock recorder for MockAccessLogSubscriptionManager. +type MockAccessLogSubscriptionManagerMockRecorder struct { + mock *MockAccessLogSubscriptionManager +} + +// NewMockAccessLogSubscriptionManager creates a new mock instance. +func NewMockAccessLogSubscriptionManager(ctrl *gomock.Controller) *MockAccessLogSubscriptionManager { + mock := &MockAccessLogSubscriptionManager{ctrl: ctrl} + mock.recorder = &MockAccessLogSubscriptionManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockAccessLogSubscriptionManager) EXPECT() *MockAccessLogSubscriptionManagerMockRecorder { + return m.recorder +} + +// Create mocks base method. +func (m *MockAccessLogSubscriptionManager) Create(arg0 context.Context, arg1 *lattice.AccessLogSubscription) (*lattice.AccessLogSubscriptionStatus, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Create", arg0, arg1) + ret0, _ := ret[0].(*lattice.AccessLogSubscriptionStatus) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Create indicates an expected call of Create. +func (mr *MockAccessLogSubscriptionManagerMockRecorder) Create(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockAccessLogSubscriptionManager)(nil).Create), arg0, arg1) +} diff --git a/pkg/deploy/lattice/access_log_subscription_manager_test.go b/pkg/deploy/lattice/access_log_subscription_manager_test.go new file mode 100644 index 00000000..e7b1da52 --- /dev/null +++ b/pkg/deploy/lattice/access_log_subscription_manager_test.go @@ -0,0 +1,403 @@ +package lattice + +import ( + "context" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/vpclattice" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + + mockaws "github.com/aws/aws-application-networking-k8s/pkg/aws" + "github.com/aws/aws-application-networking-k8s/pkg/aws/services" + "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" + "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" +) + +const ( + sourceName = "test" + serviceNetworkArn = "arn:aws:vpc-lattice:us-west-2:123456789012:servicenetwork/sn-12345678901234567" + serviceArn = "arn:aws:vpc-lattice:us-west-2:123456789012:service/svc-12345678901234567" + s3DestinationArn = "arn:aws:s3:::test" + cloudWatchDestinationArn = "arn:aws:logs:us-west-2:123456789012:log-group:test:*" + firehoseDestinationArn = "arn:aws:firehose:us-west-2:123456789012:deliverystream/test" + accessLogSubscriptionArn = "arn:aws:vpc-lattice:us-west-2:123456789012:accesslogsubscription/als-12345678901234567" + accessLogSubscriptionId = "als-12345678901234567" +) + +func Test_Create_NewAccessLogSubscriptionForServiceNetwork_ReturnsSuccess(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + ctx := context.TODO() + mockCloud := mockaws.NewMockCloud(c) + mockLattice := services.NewMockLattice(c) + + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + IsDeleted: false, + }, + } + listServiceNetworksInput := &vpclattice.ListServiceNetworksInput{} + listServiceNetworksOutput := []*vpclattice.ServiceNetworkSummary{ + { + Arn: aws.String(serviceNetworkArn), + Name: aws.String(sourceName), + }, + } + createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ + ResourceIdentifier: aws.String(serviceNetworkArn), + DestinationArn: aws.String(s3DestinationArn), + } + createALSOutput := &vpclattice.CreateAccessLogSubscriptionOutput{ + Arn: aws.String(accessLogSubscriptionArn), + Id: aws.String(accessLogSubscriptionId), + } + + mockLattice.EXPECT().ListServiceNetworksAsList(ctx, listServiceNetworksInput).Return(listServiceNetworksOutput, nil) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(createALSOutput, nil) + mockCloud.EXPECT().Lattice().Return(mockLattice).AnyTimes() + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, mockCloud) + resp, err := mgr.Create(ctx, accessLogSubscription) + assert.Nil(t, err) + assert.Equal(t, resp.Arn, accessLogSubscriptionArn) + assert.Equal(t, resp.Id, accessLogSubscriptionId) +} + +func Test_Create_NewAccessLogSubscriptionForService_ReturnsSuccess(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + ctx := context.TODO() + mockCloud := mockaws.NewMockCloud(c) + mockLattice := services.NewMockLattice(c) + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + IsDeleted: false, + }, + } + listServicesInput := &vpclattice.ListServicesInput{} + listServicesOutput := []*vpclattice.ServiceSummary{ + { + Arn: aws.String(serviceArn), + Name: aws.String(sourceName), + }, + } + createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ + ResourceIdentifier: aws.String(serviceArn), + DestinationArn: aws.String(s3DestinationArn), + } + createALSOutput := &vpclattice.CreateAccessLogSubscriptionOutput{ + Arn: aws.String(accessLogSubscriptionArn), + Id: aws.String(accessLogSubscriptionId), + } + + mockLattice.EXPECT().ListServicesAsList(ctx, listServicesInput).Return(listServicesOutput, nil) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(createALSOutput, nil) + mockCloud.EXPECT().Lattice().Return(mockLattice).AnyTimes() + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, mockCloud) + resp, err := mgr.Create(ctx, accessLogSubscription) + assert.Nil(t, err) + assert.Equal(t, resp.Arn, accessLogSubscriptionArn) + assert.Equal(t, resp.Id, accessLogSubscriptionId) +} + +func Test_Create_NewAccessLogSubscriptionForDeletedServiceNetwork_ReturnsNotFoundError(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + ctx := context.TODO() + mockCloud := mockaws.NewMockCloud(c) + mockLattice := services.NewMockLattice(c) + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + IsDeleted: false, + }, + } + listServiceNetworksInput := &vpclattice.ListServiceNetworksInput{} + listServiceNetworksOutput := []*vpclattice.ServiceNetworkSummary{ + { + Arn: aws.String(serviceNetworkArn), + Name: aws.String(sourceName), + }, + } + createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ + ResourceIdentifier: aws.String(serviceNetworkArn), + DestinationArn: aws.String(s3DestinationArn), + } + createALSErr := &vpclattice.ResourceNotFoundException{ + ResourceType: aws.String("SERVICE_NETWORK"), + ResourceId: aws.String(serviceNetworkArn), + } + + mockLattice.EXPECT().ListServiceNetworksAsList(ctx, listServiceNetworksInput).Return(listServiceNetworksOutput, nil) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(nil, createALSErr) + mockCloud.EXPECT().Lattice().Return(mockLattice).AnyTimes() + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, mockCloud) + resp, err := mgr.Create(ctx, accessLogSubscription) + assert.Nil(t, resp) + assert.True(t, services.IsNotFoundError(err)) +} + +func Test_Create_NewAccessLogSubscriptionForDeletedService_ReturnsNotFoundError(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + ctx := context.TODO() + mockCloud := mockaws.NewMockCloud(c) + mockLattice := services.NewMockLattice(c) + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + IsDeleted: false, + }, + } + listServicesInput := &vpclattice.ListServicesInput{} + listServicesOutput := []*vpclattice.ServiceSummary{ + { + Arn: aws.String(serviceArn), + Name: aws.String(sourceName), + }, + } + createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ + ResourceIdentifier: aws.String(serviceArn), + DestinationArn: aws.String(s3DestinationArn), + } + createALSErr := &vpclattice.ResourceNotFoundException{ + ResourceType: aws.String("SERVICE"), + ResourceId: aws.String(serviceArn), + } + + mockLattice.EXPECT().ListServicesAsList(ctx, listServicesInput).Return(listServicesOutput, nil) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(nil, createALSErr) + mockCloud.EXPECT().Lattice().Return(mockLattice).AnyTimes() + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, mockCloud) + resp, err := mgr.Create(ctx, accessLogSubscription) + assert.Nil(t, resp) + assert.True(t, services.IsNotFoundError(err)) +} + +func Test_Create_NewAccessLogSubscriptionForMissingS3Destination_ReturnsInvalidError(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + ctx := context.TODO() + mockCloud := mockaws.NewMockCloud(c) + mockLattice := services.NewMockLattice(c) + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + IsDeleted: false, + }, + } + listServiceNetworksInput := &vpclattice.ListServiceNetworksInput{} + listServiceNetworksOutput := []*vpclattice.ServiceNetworkSummary{ + { + Arn: aws.String(serviceNetworkArn), + Name: aws.String(sourceName), + }, + } + createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ + ResourceIdentifier: aws.String(serviceNetworkArn), + DestinationArn: aws.String(s3DestinationArn), + } + createALSErr := &vpclattice.ResourceNotFoundException{ + ResourceType: aws.String("BUCKET"), + ResourceId: aws.String(s3DestinationArn), + } + + mockLattice.EXPECT().ListServiceNetworksAsList(ctx, listServiceNetworksInput).Return(listServiceNetworksOutput, nil) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(nil, createALSErr) + mockCloud.EXPECT().Lattice().Return(mockLattice).AnyTimes() + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, mockCloud) + resp, err := mgr.Create(ctx, accessLogSubscription) + assert.Nil(t, resp) + assert.True(t, services.IsInvalidError(err)) +} + +func Test_Create_NewAccessLogSubscriptionForMissingCloudWatchDestination_ReturnsInvalidError(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + ctx := context.TODO() + mockCloud := mockaws.NewMockCloud(c) + mockLattice := services.NewMockLattice(c) + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: sourceName, + DestinationArn: cloudWatchDestinationArn, + IsDeleted: false, + }, + } + listServiceNetworksInput := &vpclattice.ListServiceNetworksInput{} + listServiceNetworksOutput := []*vpclattice.ServiceNetworkSummary{ + { + Arn: aws.String(serviceNetworkArn), + Name: aws.String(sourceName), + }, + } + createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ + ResourceIdentifier: aws.String(serviceNetworkArn), + DestinationArn: aws.String(cloudWatchDestinationArn), + } + createALSErr := &vpclattice.ResourceNotFoundException{ + ResourceType: aws.String("LOG_GROUP"), + ResourceId: aws.String(cloudWatchDestinationArn), + } + + mockLattice.EXPECT().ListServiceNetworksAsList(ctx, listServiceNetworksInput).Return(listServiceNetworksOutput, nil) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(nil, createALSErr) + mockCloud.EXPECT().Lattice().Return(mockLattice).AnyTimes() + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, mockCloud) + resp, err := mgr.Create(ctx, accessLogSubscription) + assert.Nil(t, resp) + assert.True(t, services.IsInvalidError(err)) +} + +func Test_Create_NewAccessLogSubscriptionForMissingFirehoseDestination_ReturnsInvalidError(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + ctx := context.TODO() + mockCloud := mockaws.NewMockCloud(c) + mockLattice := services.NewMockLattice(c) + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: sourceName, + DestinationArn: firehoseDestinationArn, + IsDeleted: false, + }, + } + listServiceNetworksInput := &vpclattice.ListServiceNetworksInput{} + listServiceNetworksOutput := []*vpclattice.ServiceNetworkSummary{ + { + Arn: aws.String(serviceNetworkArn), + Name: aws.String(sourceName), + }, + } + createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ + ResourceIdentifier: aws.String(serviceNetworkArn), + DestinationArn: aws.String(firehoseDestinationArn), + } + createALSErr := &vpclattice.ResourceNotFoundException{ + ResourceType: aws.String("DELIVERY_STREAM"), + ResourceId: aws.String(firehoseDestinationArn), + } + + mockLattice.EXPECT().ListServiceNetworksAsList(ctx, listServiceNetworksInput).Return(listServiceNetworksOutput, nil) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(nil, createALSErr) + mockCloud.EXPECT().Lattice().Return(mockLattice).AnyTimes() + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, mockCloud) + resp, err := mgr.Create(ctx, accessLogSubscription) + assert.Nil(t, resp) + assert.True(t, services.IsInvalidError(err)) +} + +func Test_Create_ConflictingAccessLogSubscriptionForSameResource_ReturnsConflictError(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + ctx := context.TODO() + mockCloud := mockaws.NewMockCloud(c) + mockLattice := services.NewMockLattice(c) + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + IsDeleted: false, + }, + } + listServiceNetworksInput := &vpclattice.ListServiceNetworksInput{} + listServiceNetworksOutput := []*vpclattice.ServiceNetworkSummary{ + { + Arn: aws.String(serviceNetworkArn), + Name: aws.String(sourceName), + }, + } + createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ + ResourceIdentifier: aws.String(serviceNetworkArn), + DestinationArn: aws.String(s3DestinationArn), + } + createALSErr := &vpclattice.ConflictException{ + ResourceType: aws.String("ACCESS_LOG_SUBSCRIPTION"), + } + + mockLattice.EXPECT().ListServiceNetworksAsList(ctx, listServiceNetworksInput).Return(listServiceNetworksOutput, nil) + mockLattice.EXPECT().CreateAccessLogSubscriptionWithContext(ctx, createALSInput).Return(nil, createALSErr) + mockCloud.EXPECT().Lattice().Return(mockLattice).AnyTimes() + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, mockCloud) + resp, err := mgr.Create(ctx, accessLogSubscription) + assert.Nil(t, resp) + assert.True(t, services.IsConflictError(err)) +} + +func Test_Create_NewAccessLogSubscriptionForMissingServiceNetwork_ReturnsNotFoundError(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + ctx := context.TODO() + mockCloud := mockaws.NewMockCloud(c) + mockLattice := services.NewMockLattice(c) + + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + IsDeleted: false, + }, + } + listServiceNetworksInput := &vpclattice.ListServiceNetworksInput{} + var listServiceNetworksOutput []*vpclattice.ServiceNetworkSummary + + mockLattice.EXPECT().ListServiceNetworksAsList(ctx, listServiceNetworksInput).Return(listServiceNetworksOutput, nil) + mockCloud.EXPECT().Lattice().Return(mockLattice).AnyTimes() + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, mockCloud) + resp, err := mgr.Create(ctx, accessLogSubscription) + assert.Nil(t, resp) + assert.True(t, services.IsNotFoundError(err)) +} + +func Test_Create_NewAccessLogSubscriptionForMissingService_ReturnsNotFoundError(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + ctx := context.TODO() + mockCloud := mockaws.NewMockCloud(c) + mockLattice := services.NewMockLattice(c) + + accessLogSubscription := &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceSourceType, + SourceName: sourceName, + DestinationArn: s3DestinationArn, + IsDeleted: false, + }, + } + listServicesInput := &vpclattice.ListServicesInput{} + var listServicesOutput []*vpclattice.ServiceSummary + + mockLattice.EXPECT().ListServicesAsList(ctx, listServicesInput).Return(listServicesOutput, nil) + mockCloud.EXPECT().Lattice().Return(mockLattice).AnyTimes() + + mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, mockCloud) + resp, err := mgr.Create(ctx, accessLogSubscription) + assert.Nil(t, resp) + assert.True(t, services.IsNotFoundError(err)) +} diff --git a/pkg/deploy/lattice/access_log_subscription_synthesizer.go b/pkg/deploy/lattice/access_log_subscription_synthesizer.go new file mode 100644 index 00000000..d49d3502 --- /dev/null +++ b/pkg/deploy/lattice/access_log_subscription_synthesizer.go @@ -0,0 +1,62 @@ +package lattice + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/aws/aws-application-networking-k8s/pkg/model/core" + model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" + "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" +) + +type accessLogSubscriptionSynthesizer struct { + log gwlog.Logger + client client.Client + accessLogSubscriptionManager AccessLogSubscriptionManager + stack core.Stack +} + +func NewAccessLogSubscriptionSynthesizer( + log gwlog.Logger, + client client.Client, + accessLogSubscriptionManager AccessLogSubscriptionManager, + stack core.Stack, +) *accessLogSubscriptionSynthesizer { + return &accessLogSubscriptionSynthesizer{ + log: log, + client: client, + accessLogSubscriptionManager: accessLogSubscriptionManager, + stack: stack, + } +} + +func (s *accessLogSubscriptionSynthesizer) Synthesize(ctx context.Context) error { + var accessLogSubscriptions []*model.AccessLogSubscription + err := s.stack.ListResources(&accessLogSubscriptions) + if err != nil { + return err + } + + var ret error = nil + for _, als := range accessLogSubscriptions { + if !als.Spec.IsDeleted { + s.log.Debugf("Started creating or updating access log subscription %s", als.ID()) + _, err := s.accessLogSubscriptionManager.Create(ctx, als) + if err != nil { + s.log.Debugf("Synthesizing access log subscription %s failed due to %s", als.ID(), err) + ret = err + } + } else { + s.log.Debugf("Started deleting access log subscription %s", als.ID()) + // TODO + } + } + + return ret +} + +func (s *accessLogSubscriptionSynthesizer) PostSynthesize(ctx context.Context) error { + // nothing to do here + return nil +} diff --git a/pkg/deploy/lattice/access_log_subscription_synthesizer_test.go b/pkg/deploy/lattice/access_log_subscription_synthesizer_test.go new file mode 100644 index 00000000..02fa3afb --- /dev/null +++ b/pkg/deploy/lattice/access_log_subscription_synthesizer_test.go @@ -0,0 +1,73 @@ +package lattice + +import ( + "context" + "errors" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "sigs.k8s.io/gateway-api/apis/v1alpha2" + + mockclient "github.com/aws/aws-application-networking-k8s/mocks/controller-runtime/client" + anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" + "github.com/aws/aws-application-networking-k8s/pkg/gateway" + "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" + "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" +) + +func Test_Synthesize_AccessLogSubscriptionSpecIsNotDeleted_CreatesAccessLogSubscription(t *testing.T) { + ctx := context.TODO() + c := gomock.NewController(t) + defer c.Finish() + mockManager := NewMockAccessLogSubscriptionManager(c) + k8sClient := mockclient.NewMockClient(c) + builder := gateway.NewAccessLogSubscriptionModelBuilder(gwlog.FallbackLogger, k8sClient) + input := &anv1alpha1.AccessLogPolicy{ + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(s3DestinationArn), + TargetRef: &v1alpha2.PolicyTargetReference{ + Kind: "Gateway", + Name: "TestName", + }, + }, + } + + stack, accessLogSubscription, _ := builder.Build(context.Background(), input) + + k8sClient.EXPECT().List(context.Background(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + mockManager.EXPECT().Create(ctx, accessLogSubscription).Return(&lattice.AccessLogSubscriptionStatus{}, nil).AnyTimes() + + synthesizer := NewAccessLogSubscriptionSynthesizer(gwlog.FallbackLogger, k8sClient, mockManager, stack) + err := synthesizer.Synthesize(ctx) + assert.Nil(t, err) +} + +func Test_Synthesize_AccessLogSubscriptionSpecIsNotDeletedButErrorOccurs_ReturnsError(t *testing.T) { + ctx := context.TODO() + c := gomock.NewController(t) + defer c.Finish() + mockManager := NewMockAccessLogSubscriptionManager(c) + k8sClient := mockclient.NewMockClient(c) + builder := gateway.NewAccessLogSubscriptionModelBuilder(gwlog.FallbackLogger, k8sClient) + input := &anv1alpha1.AccessLogPolicy{ + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(s3DestinationArn), + TargetRef: &v1alpha2.PolicyTargetReference{ + Kind: "Gateway", + Name: "TestName", + }, + }, + } + expectedError := errors.New("") + + stack, accessLogSubscription, _ := builder.Build(context.Background(), input) + + k8sClient.EXPECT().List(context.Background(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + mockManager.EXPECT().Create(ctx, accessLogSubscription).Return(nil, expectedError).AnyTimes() + + synthesizer := NewAccessLogSubscriptionSynthesizer(gwlog.FallbackLogger, k8sClient, mockManager, stack) + err := synthesizer.Synthesize(ctx) + assert.Equal(t, expectedError, err) +} diff --git a/pkg/deploy/stack_deployer.go b/pkg/deploy/stack_deployer.go index 71910c9d..16a3903c 100644 --- a/pkg/deploy/stack_deployer.go +++ b/pkg/deploy/stack_deployer.go @@ -49,7 +49,6 @@ func NewServiceNetworkStackDeployer(log gwlog.Logger, cloud pkg_aws.Cloud, k8sCl // Deploy a resource stack func deploy(ctx context.Context, stack core.Stack, synthesizers []ResourceSynthesizer) error { - for _, synthesizer := range synthesizers { if err := synthesizer.Synthesize(ctx); err != nil { return err @@ -71,7 +70,7 @@ func (d *serviceNetworkStackDeployer) Deploy(ctx context.Context, stack core.Sta return deploy(ctx, stack, synthesizers) } -type LatticeServiceStackDeployer struct { +type latticeServiceStackDeployer struct { log gwlog.Logger cloud pkg_aws.Cloud k8sClient client.Client @@ -89,8 +88,8 @@ func NewLatticeServiceStackDeploy( cloud pkg_aws.Cloud, k8sClient client.Client, latticeDataStore *latticestore.LatticeDataStore, -) *LatticeServiceStackDeployer { - return &LatticeServiceStackDeployer{ +) *latticeServiceStackDeployer { + return &latticeServiceStackDeployer{ log: log, cloud: cloud, k8sClient: k8sClient, @@ -104,7 +103,7 @@ func NewLatticeServiceStackDeploy( } } -func (d *LatticeServiceStackDeployer) Deploy(ctx context.Context, stack core.Stack) error { +func (d *latticeServiceStackDeployer) Deploy(ctx context.Context, stack core.Stack) error { targetGroupSynthesizer := lattice.NewTargetGroupSynthesizer(d.log, d.cloud, d.k8sClient, d.targetGroupManager, stack, d.latticeDataStore) targetsSynthesizer := lattice.NewTargetsSynthesizer(d.log, d.cloud, d.targetsManager, stack, d.latticeDataStore) serviceSynthesizer := lattice.NewServiceSynthesizer(d.log, d.latticeServiceManager, d.dnsEndpointManager, stack, d.latticeDataStore) @@ -150,7 +149,7 @@ func (d *LatticeServiceStackDeployer) Deploy(ctx context.Context, stack core.Sta return nil } -type LatticeTargetGroupStackDeployer struct { +type latticeTargetGroupStackDeployer struct { log gwlog.Logger cloud pkg_aws.Cloud k8sclient client.Client @@ -164,8 +163,8 @@ func NewTargetGroupStackDeploy( cloud pkg_aws.Cloud, k8sClient client.Client, latticeDataStore *latticestore.LatticeDataStore, -) *LatticeTargetGroupStackDeployer { - return &LatticeTargetGroupStackDeployer{ +) *latticeTargetGroupStackDeployer { + return &latticeTargetGroupStackDeployer{ log: log, cloud: cloud, k8sclient: k8sClient, @@ -174,7 +173,7 @@ func NewTargetGroupStackDeploy( } } -func (d *LatticeTargetGroupStackDeployer) Deploy(ctx context.Context, stack core.Stack) error { +func (d *latticeTargetGroupStackDeployer) Deploy(ctx context.Context, stack core.Stack) error { synthesizers := []ResourceSynthesizer{ lattice.NewTargetGroupSynthesizer(d.log, d.cloud, d.k8sclient, d.targetGroupManager, stack, d.latticeDatastore), lattice.NewTargetsSynthesizer(d.log, d.cloud, lattice.NewTargetsManager(d.log, d.cloud, d.latticeDatastore), stack, d.latticeDatastore), @@ -235,3 +234,29 @@ func (d *latticeTargetsStackDeployer) Deploy(ctx context.Context, stack core.Sta } return nil } + +type accessLogSubscriptionStackDeployer struct { + log gwlog.Logger + k8sClient client.Client + stack core.Stack + accessLogSubscriptionManager lattice.AccessLogSubscriptionManager +} + +func NewAccessLogSubscriptionStackDeployer( + log gwlog.Logger, + cloud pkg_aws.Cloud, + k8sClient client.Client, +) *accessLogSubscriptionStackDeployer { + return &accessLogSubscriptionStackDeployer{ + log: log, + k8sClient: k8sClient, + accessLogSubscriptionManager: lattice.NewAccessLogSubscriptionManager(log, cloud), + } +} + +func (d *accessLogSubscriptionStackDeployer) Deploy(ctx context.Context, stack core.Stack) error { + synthesizers := []ResourceSynthesizer{ + lattice.NewAccessLogSubscriptionSynthesizer(d.log, d.k8sClient, d.accessLogSubscriptionManager, stack), + } + return deploy(ctx, stack, synthesizers) +} diff --git a/pkg/deploy/stack_deployer_test.go b/pkg/deploy/stack_deployer_test.go index 89d819d6..3c3393f1 100644 --- a/pkg/deploy/stack_deployer_test.go +++ b/pkg/deploy/stack_deployer_test.go @@ -69,7 +69,7 @@ func Test_latticeServiceStackDeployer_createAllResources(t *testing.T) { mockRuleManager.EXPECT().Create(gomock.Any(), gomock.Any()) mockDnsManager.EXPECT().Create(gomock.Any(), gomock.Any()) - deployer := &LatticeServiceStackDeployer{ + deployer := &latticeServiceStackDeployer{ log: gwlog.FallbackLogger, cloud: mockCloud, k8sClient: mockClient, @@ -124,7 +124,7 @@ func Test_latticeServiceStackDeployer_CreateJustService(t *testing.T) { mockRuleManager.EXPECT().Cloud().Return(mockCloud).AnyTimes() mockCloud.EXPECT().Lattice().Return(mockLattice).AnyTimes() - deployer := &LatticeServiceStackDeployer{ + deployer := &latticeServiceStackDeployer{ log: gwlog.FallbackLogger, cloud: mockCloud, k8sClient: mockClient, @@ -179,7 +179,7 @@ func Test_latticeServiceStackDeployer_DeleteService(t *testing.T) { mockServiceManager.EXPECT().Delete(gomock.Any(), gomock.Any()) - deployer := &LatticeServiceStackDeployer{ + deployer := &latticeServiceStackDeployer{ log: gwlog.FallbackLogger, cloud: mockCloud, k8sClient: mockClient, @@ -237,7 +237,7 @@ func Test_latticeServiceStackDeployer_DeleteAllResources(t *testing.T) { mockServiceManager.EXPECT().Delete(gomock.Any(), gomock.Any()) mockTargetGroupManager.EXPECT().Delete(gomock.Any(), gomock.Any()) - deployer := &LatticeServiceStackDeployer{ + deployer := &latticeServiceStackDeployer{ log: gwlog.FallbackLogger, cloud: mockCloud, k8sClient: mockClient, diff --git a/pkg/gateway/model_build_access_log_subscription.go b/pkg/gateway/model_build_access_log_subscription.go new file mode 100644 index 00000000..069c7888 --- /dev/null +++ b/pkg/gateway/model_build_access_log_subscription.go @@ -0,0 +1,86 @@ +package gateway + +import ( + "context" + "fmt" + + "sigs.k8s.io/controller-runtime/pkg/client" + + anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" + "github.com/aws/aws-application-networking-k8s/pkg/k8s" + "github.com/aws/aws-application-networking-k8s/pkg/model/core" + model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" + "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" +) + +type AccessLogSubscriptionModelBuilder interface { + Build(ctx context.Context, alp *anv1alpha1.AccessLogPolicy) (core.Stack, *model.AccessLogSubscription, error) +} + +type accessLogSubscriptionModelBuilder struct { + log gwlog.Logger + client client.Client +} + +func NewAccessLogSubscriptionModelBuilder(log gwlog.Logger, client client.Client) *accessLogSubscriptionModelBuilder { + return &accessLogSubscriptionModelBuilder{ + client: client, + log: log, + } +} + +func (b *accessLogSubscriptionModelBuilder) Build( + ctx context.Context, + accessLogPolicy *anv1alpha1.AccessLogPolicy, +) (core.Stack, *model.AccessLogSubscription, error) { + stack := core.NewDefaultStack(core.StackID(k8s.NamespacedName(accessLogPolicy))) + + task := accessLogSubscriptionModelBuildTask{ + stack: stack, + accessLogPolicy: accessLogPolicy, + } + + if err := task.run(ctx); err != nil { + return nil, nil, err + } + + return task.stack, task.accessLogSubscription, nil +} + +type accessLogSubscriptionModelBuildTask struct { + stack core.Stack + accessLogPolicy *anv1alpha1.AccessLogPolicy + accessLogSubscription *model.AccessLogSubscription +} + +func (t *accessLogSubscriptionModelBuildTask) run(ctx context.Context) error { + sourceType := model.ServiceSourceType + if t.accessLogPolicy.Spec.TargetRef.Kind == "Gateway" { + sourceType = model.ServiceNetworkSourceType + } + + /* + * For Service Network, the name is just the Gateway's name. + * For Service, the name is Route's name, followed by hyphen (-), then the Route's namespace. + */ + sourceName := string(t.accessLogPolicy.Spec.TargetRef.Name) + if sourceType == model.ServiceSourceType { + namespace := t.accessLogPolicy.Spec.TargetRef.Namespace + if namespace != nil { + sourceName = fmt.Sprintf("%s-%s", sourceName, string(*namespace)) + } else { + sourceName = fmt.Sprintf("%s-default", sourceName) + } + } + + destinationArn := t.accessLogPolicy.Spec.DestinationArn + if destinationArn == nil { + return fmt.Errorf("access log policy's destinationArn cannot be nil") + } + + isDeleted := t.accessLogPolicy.DeletionTimestamp != nil + + t.accessLogSubscription = model.NewAccessLogSubscription(t.stack, sourceType, sourceName, *destinationArn, isDeleted) + + return nil +} diff --git a/pkg/gateway/model_build_access_log_subscription_test.go b/pkg/gateway/model_build_access_log_subscription_test.go new file mode 100644 index 00000000..9239c73d --- /dev/null +++ b/pkg/gateway/model_build_access_log_subscription_test.go @@ -0,0 +1,233 @@ +package gateway + +import ( + "context" + "fmt" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + testclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/gateway-api/apis/v1alpha2" + + anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" + "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" + "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" +) + +const ( + s3DestinationArn = "arn:aws:s3:::test" + gatewayKind = "Gateway" + httpRouteKind = "HTTPRoute" + grpcRouteKind = "GRPCRoute" + name = "TestName" + namespace = "TestNamespace" +) + +func Test_BuildAccessLogSubscription(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + ctx := context.TODO() + scheme := runtime.NewScheme() + clientgoscheme.AddToScheme(scheme) + client := testclient.NewClientBuilder().WithScheme(scheme).Build() + modelBuilder := NewAccessLogSubscriptionModelBuilder(gwlog.FallbackLogger, client) + + tests := []struct { + description string + input *anv1alpha1.AccessLogPolicy + expectedOutput *lattice.AccessLogSubscription + onlyCompareSpecs bool + expectedError error + }{ + { + description: "Policy on Gateway without namespace maps to ALS on Service Network with Gateway name", + input: &anv1alpha1.AccessLogPolicy{ + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(s3DestinationArn), + TargetRef: &v1alpha2.PolicyTargetReference{ + Kind: gatewayKind, + Name: name, + }, + }, + }, + expectedOutput: &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: name, + DestinationArn: s3DestinationArn, + IsDeleted: false, + }, + }, + onlyCompareSpecs: true, + expectedError: nil, + }, + { + description: "Policy on Gateway with namespace maps to ALS on Service Network with Gateway name", + input: &anv1alpha1.AccessLogPolicy{ + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(s3DestinationArn), + TargetRef: &v1alpha2.PolicyTargetReference{ + Kind: gatewayKind, + Name: name, + Namespace: (*v1alpha2.Namespace)(aws.String(namespace)), + }, + }, + }, + expectedOutput: &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: name, + DestinationArn: s3DestinationArn, + IsDeleted: false, + }, + }, + onlyCompareSpecs: true, + expectedError: nil, + }, + { + description: "Policy on HTTPRoute without namespace maps to ALS on Service with HTTPRoute name + default namespace", + input: &anv1alpha1.AccessLogPolicy{ + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(s3DestinationArn), + TargetRef: &v1alpha2.PolicyTargetReference{ + Kind: httpRouteKind, + Name: name, + }, + }, + }, + expectedOutput: &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceSourceType, + SourceName: fmt.Sprintf("%s-default", name), + DestinationArn: s3DestinationArn, + IsDeleted: false, + }, + }, + onlyCompareSpecs: true, + expectedError: nil, + }, + { + description: "Policy on HTTPRoute with namespace maps to ALS on Service Network with HTTPRoute name + namespace", + input: &anv1alpha1.AccessLogPolicy{ + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(s3DestinationArn), + TargetRef: &v1alpha2.PolicyTargetReference{ + Kind: httpRouteKind, + Name: name, + Namespace: (*v1alpha2.Namespace)(aws.String(namespace)), + }, + }, + }, + expectedOutput: &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceSourceType, + SourceName: fmt.Sprintf("%s-%s", name, namespace), + DestinationArn: s3DestinationArn, + IsDeleted: false, + }, + }, + onlyCompareSpecs: true, + expectedError: nil, + }, + { + description: "Policy on GRPCRoute without namespace maps to ALS on Service with GRPCRoute name + default namespace", + input: &anv1alpha1.AccessLogPolicy{ + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(s3DestinationArn), + TargetRef: &v1alpha2.PolicyTargetReference{ + Kind: grpcRouteKind, + Name: name, + }, + }, + }, + expectedOutput: &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceSourceType, + SourceName: fmt.Sprintf("%s-default", name), + DestinationArn: s3DestinationArn, + IsDeleted: false, + }, + }, + onlyCompareSpecs: true, + expectedError: nil, + }, + { + description: "Policy on GRPCRoute with namespace maps to ALS on Service Network with GRPCRoute name + namespace", + input: &anv1alpha1.AccessLogPolicy{ + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(s3DestinationArn), + TargetRef: &v1alpha2.PolicyTargetReference{ + Kind: grpcRouteKind, + Name: name, + Namespace: (*v1alpha2.Namespace)(aws.String(namespace)), + }, + }, + }, + expectedOutput: &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceSourceType, + SourceName: fmt.Sprintf("%s-%s", name, namespace), + DestinationArn: s3DestinationArn, + IsDeleted: false, + }, + }, + onlyCompareSpecs: true, + expectedError: nil, + }, + { + description: "Policy on Gateway with deletion timestamp is marked as deleted", + input: &anv1alpha1.AccessLogPolicy{ + ObjectMeta: v1.ObjectMeta{ + DeletionTimestamp: &v1.Time{}, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(s3DestinationArn), + TargetRef: &v1alpha2.PolicyTargetReference{ + Kind: gatewayKind, + Name: name, + }, + }, + }, + expectedOutput: &lattice.AccessLogSubscription{ + Spec: lattice.AccessLogSubscriptionSpec{ + SourceType: lattice.ServiceNetworkSourceType, + SourceName: name, + DestinationArn: s3DestinationArn, + IsDeleted: true, + }, + }, + onlyCompareSpecs: true, + expectedError: nil, + }, + { + description: "Policy missing destinationArn results in error", + input: &anv1alpha1.AccessLogPolicy{ + Spec: anv1alpha1.AccessLogPolicySpec{ + TargetRef: &v1alpha2.PolicyTargetReference{ + Kind: grpcRouteKind, + Name: name, + Namespace: (*v1alpha2.Namespace)(aws.String(namespace)), + }, + }, + }, + expectedOutput: nil, + onlyCompareSpecs: false, + expectedError: fmt.Errorf("access log policy's destinationArn cannot be nil"), + }, + } + + for _, tt := range tests { + _, als, err := modelBuilder.Build(ctx, tt.input) + if tt.onlyCompareSpecs { + assert.Equal(t, tt.expectedOutput.Spec, als.Spec, tt.description) + } else { + assert.Equal(t, tt.expectedOutput, als, tt.description) + } + assert.Equal(t, tt.expectedError, err, tt.description) + } +} diff --git a/pkg/gateway/model_build_servicenetwork.go b/pkg/gateway/model_build_servicenetwork.go index cc18ed3e..c60615ac 100644 --- a/pkg/gateway/model_build_servicenetwork.go +++ b/pkg/gateway/model_build_servicenetwork.go @@ -51,7 +51,6 @@ func (b *serviceNetworkModelBuilder) Build(ctx context.Context, gw *gateway_api. } func (t *serviceNetworkModelBuildTask) run(ctx context.Context) error { - err := t.buildModel(ctx) return err } @@ -107,8 +106,7 @@ type serviceNetworkModelBuildTask struct { gateway *gateway_api.Gateway vpcAssociationPolicy *anv1alpha1.VpcAssociationPolicy serviceNetwork *model.ServiceNetwork - - stack core.Stack + stack core.Stack } func securityGroupIdsToStringPointersSlice(sgIds []anv1alpha1.SecurityGroupId) []*string { diff --git a/pkg/k8s/events.go b/pkg/k8s/events.go index 36d9d02f..7e728897 100644 --- a/pkg/k8s/events.go +++ b/pkg/k8s/events.go @@ -31,4 +31,5 @@ const ( // AccessLogPolicy events AccessLogPolicyEventReasonFailedAddFinalizer = "FailedAddFinalizer" + AccessLogPolicyEventReasonFailedBuildModel = "FailedBuildModel" ) diff --git a/pkg/model/lattice/accesslogsubscription.go b/pkg/model/lattice/accesslogsubscription.go new file mode 100644 index 00000000..251b885f --- /dev/null +++ b/pkg/model/lattice/accesslogsubscription.go @@ -0,0 +1,56 @@ +package lattice + +import ( + "fmt" + + "github.com/aws/aws-application-networking-k8s/pkg/model/core" +) + +type SourceType string + +const ( + ServiceNetworkSourceType SourceType = "ServiceNetwork" + ServiceSourceType SourceType = "Service" +) + +type AccessLogSubscription struct { + core.ResourceMeta `json:"-"` + Spec AccessLogSubscriptionSpec `json:"spec"` + Status *AccessLogSubscriptionStatus `json:"status,omitempty"` +} + +type AccessLogSubscriptionSpec struct { + SourceType SourceType + SourceName string + DestinationArn string + IsDeleted bool +} + +type AccessLogSubscriptionStatus struct { + Arn string `json:"arn"` + Id string `json:"id"` +} + +func NewAccessLogSubscription( + stack core.Stack, + sourceType SourceType, + sourceName string, + destinationArn string, + isDeleted bool, +) *AccessLogSubscription { + id := fmt.Sprintf("%s-%s-%s", sourceType, sourceName, destinationArn) + als := &AccessLogSubscription{ + ResourceMeta: core.NewResourceMeta(stack, "AWS::VPCServiceNetwork::AccessLogSubscription", id), + Spec: AccessLogSubscriptionSpec{ + SourceType: sourceType, + SourceName: sourceName, + DestinationArn: destinationArn, + IsDeleted: isDeleted, + }, + Status: nil, + } + + stack.AddResource(als) + + return als +} diff --git a/test/pkg/test/framework.go b/test/pkg/test/framework.go index 6b3a5bd1..f72f6436 100644 --- a/test/pkg/test/framework.go +++ b/test/pkg/test/framework.go @@ -94,6 +94,9 @@ func addOptionalCRDs(scheme *runtime.Scheme) { scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &v1alpha1.VpcAssociationPolicy{}, &v1alpha1.VpcAssociationPolicyList{}) metav1.AddToGroupVersion(scheme, awsGatewayControllerCRDGroupVersion) + + scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &v1alpha1.AccessLogPolicy{}, &v1alpha1.AccessLogPolicyList{}) + metav1.AddToGroupVersion(scheme, awsGatewayControllerCRDGroupVersion) } type Framework struct { @@ -109,7 +112,6 @@ type Framework struct { } func NewFramework(ctx context.Context, log gwlog.Logger, testNamespace string) *Framework { - addOptionalCRDs(testScheme) config.ConfigInit() controllerRuntimeConfig := controllerruntime.GetConfigOrDie() diff --git a/test/suites/integration/access_log_policy_test.go b/test/suites/integration/access_log_policy_test.go new file mode 100644 index 00000000..645b2149 --- /dev/null +++ b/test/suites/integration/access_log_policy_test.go @@ -0,0 +1,587 @@ +package integration + +import ( + "fmt" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + "github.com/aws/aws-sdk-go/service/firehose" + "github.com/aws/aws-sdk-go/service/iam" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/vpclattice" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/samber/lo" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" + + anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" + "github.com/aws/aws-application-networking-k8s/pkg/config" + "github.com/aws/aws-application-networking-k8s/pkg/model/core" + "github.com/aws/aws-application-networking-k8s/test/pkg/test" +) + +var _ = Describe("Creating Access Log Policy", Ordered, func() { + const ( + k8sResourceName = "test-access-log-policy" + bucketName = "k8s-test-lattice-bucket" + logGroupName = "k8s-test-lattice-log-group" + deliveryStreamName = "k8s-test-lattice-delivery-stream" + deliveryStreamRoleName = "k8s-test-lattice-delivery-stream-role" + deliveryStreamRolePolicy = `{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": ["s3:PutObject", "s3:GetBucketLocation"], + "Resource": ["arn:aws:s3:::k8s-test-lattice-bucket/*"] + } + ] + }` + deliveryStreamAssumeRolePolicy = `{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "firehose.amazonaws.com" + }, + "Action": "sts:AssumeRole" + } + ] + }` + ) + + var ( + s3Client *s3.S3 + logsClient *cloudwatchlogs.CloudWatchLogs + firehoseClient *firehose.Firehose + iamClient *iam.IAM + httpDeployment *appsv1.Deployment + grpcDeployment *appsv1.Deployment + httpK8sService *corev1.Service + grpcK8sService *corev1.Service + httpRoute *gwv1beta1.HTTPRoute + grpcRoute *gwv1alpha2.GRPCRoute + bucketArn string + logGroupArn string + deliveryStreamArn string + roleArn string + ) + + BeforeAll(func() { + // Create S3 Bucket + s3Client = s3.New(session.Must(session.NewSession(&aws.Config{Region: aws.String(config.Region)}))) + _, err := s3Client.CreateBucketWithContext(ctx, &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + Expect(err).To(BeNil()) + bucketArn = "arn:aws:s3:::" + bucketName + + // Create CloudWatch Log Group + logsClient = cloudwatchlogs.New(session.Must(session.NewSession(&aws.Config{Region: aws.String(config.Region)}))) + _, err = logsClient.CreateLogGroupWithContext(ctx, &cloudwatchlogs.CreateLogGroupInput{ + LogGroupName: aws.String(logGroupName), + }) + Expect(err).To(BeNil()) + logGroupArn = fmt.Sprintf("arn:aws:logs:%s:%s:log-group:%s:*", config.Region, config.AccountID, logGroupName) + + // Create IAM Role for Firehose Delivery Stream + iamClient = iam.New(session.Must(session.NewSession(&aws.Config{Region: aws.String(config.Region)}))) + createRoleOutput, err := iamClient.CreateRoleWithContext(ctx, &iam.CreateRoleInput{ + RoleName: aws.String(deliveryStreamRoleName), + AssumeRolePolicyDocument: aws.String(deliveryStreamAssumeRolePolicy), + }) + Expect(err).To(BeNil()) + roleArn = *createRoleOutput.Role.Arn + + // Attach S3 permissions to IAM Role + _, err = iamClient.PutRolePolicyWithContext(ctx, &iam.PutRolePolicyInput{ + RoleName: aws.String(deliveryStreamRoleName), + PolicyName: aws.String("FirehoseS3Permissions"), + PolicyDocument: aws.String(deliveryStreamRolePolicy), + }) + Expect(err).To(BeNil()) + + // Wait for permissions to propagate + time.Sleep(30 * time.Second) + + // Create Firehose Delivery Stream + firehoseClient = firehose.New(session.Must(session.NewSession(&aws.Config{Region: aws.String(config.Region)}))) + _, err = firehoseClient.CreateDeliveryStreamWithContext(ctx, &firehose.CreateDeliveryStreamInput{ + DeliveryStreamName: aws.String(deliveryStreamName), + DeliveryStreamType: aws.String(firehose.DeliveryStreamTypeDirectPut), + ExtendedS3DestinationConfiguration: &firehose.ExtendedS3DestinationConfiguration{ + BucketARN: aws.String(bucketArn), + RoleARN: aws.String(roleArn), + }, + }) + Expect(err).To(BeNil()) + describeDeliveryStreamOutput, err := firehoseClient.DescribeDeliveryStreamWithContext(ctx, &firehose.DescribeDeliveryStreamInput{ + DeliveryStreamName: aws.String(deliveryStreamName), + }) + deliveryStreamArn = *describeDeliveryStreamOutput.DeliveryStreamDescription.DeliveryStreamARN + + // Create HTTP Route, Service, and Deployment + httpDeployment, httpK8sService = testFramework.NewNginxApp(test.ElasticSearchOptions{ + Name: k8sResourceName, + Namespace: k8snamespace, + }) + httpRoute = testFramework.NewHttpRoute(testGateway, httpK8sService, "Service") + testFramework.ExpectCreated(ctx, httpRoute, httpDeployment, httpK8sService) + + // Create GRPC Route, Service, and Deployment + grpcAppOptions := test.GrpcAppOptions{AppName: k8sResourceName, Namespace: k8snamespace} + grpcDeployment, grpcK8sService = testFramework.NewGrpcBin(grpcAppOptions) + grpcRouteRules := []gwv1alpha2.GRPCRouteRule{ + { + BackendRefs: []gwv1alpha2.GRPCBackendRef{ + { + BackendRef: gwv1alpha2.BackendRef{ + BackendObjectReference: gwv1beta1.BackendObjectReference{ + Name: gwv1alpha2.ObjectName(grpcK8sService.Name), + Namespace: lo.ToPtr(gwv1beta1.Namespace(grpcK8sService.Namespace)), + Kind: (*gwv1beta1.Kind)(lo.ToPtr("Service")), + Port: lo.ToPtr(gwv1beta1.PortNumber(19000)), + }, + }, + }, + }, + }, + } + grpcRoute = testFramework.NewGRPCRoute(k8snamespace, testGateway, grpcRouteRules) + testFramework.ExpectCreated(ctx, grpcRoute, grpcDeployment, grpcK8sService) + }) + + It("creates an access log subscription for Service Network when targetRef is Gateway", func() { + accessLogPolicy := &anv1alpha1.AccessLogPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: k8sResourceName, + Namespace: k8snamespace, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(bucketArn), + TargetRef: &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "Gateway", + Name: gwv1alpha2.ObjectName(testGateway.Name), + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + }, + }, + } + testFramework.ExpectCreated(ctx, accessLogPolicy) + + Eventually(func(g Gomega) { + // Policy status should be Accepted + alpNamespacedName := types.NamespacedName{ + Name: accessLogPolicy.Name, + Namespace: accessLogPolicy.Namespace, + } + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionTrue)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonAccepted))) + g.Expect(alp.Status.Conditions[0].Message).To(BeEquivalentTo(config.LatticeGatewayControllerName)) + + // Service Network should have Access Log Subscription with S3 Bucket destination + output, err := testFramework.LatticeClient.ListAccessLogSubscriptions(&vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: testServiceNetwork.Arn, + }) + g.Expect(err).To(BeNil()) + g.Expect(len(output.Items)).To(BeEquivalentTo(1)) + g.Expect(output.Items[0].ResourceId).To(BeEquivalentTo(testServiceNetwork.Id)) + g.Expect(*output.Items[0].DestinationArn).To(BeEquivalentTo(bucketArn)) + }).Should(Succeed()) + }) + + It("creates an access log subscription for VPC Lattice Service when targetRef is HTTPRoute", func() { + accessLogPolicy := &anv1alpha1.AccessLogPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: k8sResourceName, + Namespace: k8snamespace, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(bucketArn), + TargetRef: &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "HTTPRoute", + Name: gwv1alpha2.ObjectName(httpRoute.Name), + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + }, + }, + } + testFramework.ExpectCreated(ctx, accessLogPolicy) + + Eventually(func(g Gomega) { + // Policy status should be Accepted + alpNamespacedName := types.NamespacedName{ + Name: accessLogPolicy.Name, + Namespace: accessLogPolicy.Namespace, + } + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionTrue)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonAccepted))) + g.Expect(alp.Status.Conditions[0].Message).To(BeEquivalentTo(config.LatticeGatewayControllerName)) + + // VPC Lattice Service should have Access Log Subscription with S3 Bucket destination + latticeService := testFramework.GetVpcLatticeService(ctx, core.NewHTTPRoute(*httpRoute)) + output, err := testFramework.LatticeClient.ListAccessLogSubscriptions(&vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: latticeService.Arn, + }) + g.Expect(err).To(BeNil()) + g.Expect(len(output.Items)).To(BeEquivalentTo(1)) + g.Expect(output.Items[0].ResourceId).To(BeEquivalentTo(latticeService.Id)) + g.Expect(*output.Items[0].DestinationArn).To(BeEquivalentTo(bucketArn)) + }).Should(Succeed()) + }) + + It("creates an access log subscription for VPC Lattice Service when targetRef is GRPCRoute", func() { + accessLogPolicy := &anv1alpha1.AccessLogPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: k8sResourceName, + Namespace: k8snamespace, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(bucketArn), + TargetRef: &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "GRPCRoute", + Name: gwv1alpha2.ObjectName(grpcRoute.Name), + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + }, + }, + } + testFramework.ExpectCreated(ctx, accessLogPolicy) + + Eventually(func(g Gomega) { + // Policy status should be Accepted + alpNamespacedName := types.NamespacedName{ + Name: accessLogPolicy.Name, + Namespace: accessLogPolicy.Namespace, + } + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionTrue)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonAccepted))) + g.Expect(alp.Status.Conditions[0].Message).To(BeEquivalentTo(config.LatticeGatewayControllerName)) + + // VPC Lattice Service should have Access Log Subscription with S3 Bucket destination + latticeService := testFramework.GetVpcLatticeService(ctx, core.NewGRPCRoute(*grpcRoute)) + output, err := testFramework.LatticeClient.ListAccessLogSubscriptions(&vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: latticeService.Arn, + }) + g.Expect(err).To(BeNil()) + g.Expect(len(output.Items)).To(BeEquivalentTo(1)) + g.Expect(output.Items[0].ResourceId).To(BeEquivalentTo(latticeService.Id)) + g.Expect(*output.Items[0].DestinationArn).To(BeEquivalentTo(bucketArn)) + }).Should(Succeed()) + }) + + It("creates access log subscriptions with Bucket, Log Group, and Delivery Stream destinations on the same targetRef", func() { + // Create Access Log Policy for S3 Bucket + s3AccessLogPolicy := &anv1alpha1.AccessLogPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: k8sResourceName + "-bucket", + Namespace: k8snamespace, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(bucketArn), + TargetRef: &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "Gateway", + Name: gwv1alpha2.ObjectName(testGateway.Name), + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + }, + }, + } + testFramework.ExpectCreated(ctx, s3AccessLogPolicy) + + // Create Access Log Policy for CloudWatch Log Group + cwAccessLogPolicy := &anv1alpha1.AccessLogPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: k8sResourceName + "-log-group", + Namespace: k8snamespace, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(logGroupArn), + TargetRef: &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "Gateway", + Name: gwv1alpha2.ObjectName(testGateway.Name), + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + }, + }, + } + testFramework.ExpectCreated(ctx, cwAccessLogPolicy) + + // Create Access Log Policy for Firehose Delivery Stream + fhAccessLogPolicy := &anv1alpha1.AccessLogPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: k8sResourceName + "-delivery-stream", + Namespace: k8snamespace, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(deliveryStreamArn), + TargetRef: &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "Gateway", + Name: gwv1alpha2.ObjectName(testGateway.Name), + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + }, + }, + } + testFramework.ExpectCreated(ctx, fhAccessLogPolicy) + + // Service Network should have Access Log Subscription for each destination type + Eventually(func(g Gomega) { + output, err := testFramework.LatticeClient.ListAccessLogSubscriptions(&vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: testServiceNetwork.Arn, + }) + g.Expect(err).To(BeNil()) + g.Expect(len(output.Items)).To(BeEquivalentTo(3)) + + getDestinationArn := func(s *vpclattice.AccessLogSubscriptionSummary) string { + return *s.DestinationArn + } + + g.Expect(output.Items).To(ContainElement(WithTransform(getDestinationArn, Equal(bucketArn)))) + g.Expect(output.Items).To(ContainElement(WithTransform(getDestinationArn, Equal(logGroupArn)))) + g.Expect(output.Items).To(ContainElement(WithTransform(getDestinationArn, Equal(deliveryStreamArn)))) + }).Should(Succeed()) + + // Every Access Log Policy status should be Accepted + for _, accessLogPolicy := range []*anv1alpha1.AccessLogPolicy{s3AccessLogPolicy, cwAccessLogPolicy} { + alpNamespacedName := types.NamespacedName{ + Name: accessLogPolicy.Name, + Namespace: accessLogPolicy.Namespace, + } + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + Expect(err).To(BeNil()) + Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionTrue)) + Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(1)) + Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonAccepted))) + Expect(alp.Status.Conditions[0].Message).To(BeEquivalentTo(config.LatticeGatewayControllerName)) + } + }) + + It("sets Access Log Policy status to Conflicted when creating a new policy for the same targetRef and destination type", func() { + accessLogPolicy1 := &anv1alpha1.AccessLogPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: k8sResourceName + "-1", + Namespace: k8snamespace, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(bucketArn), + TargetRef: &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "Gateway", + Name: gwv1alpha2.ObjectName(testGateway.Name), + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + }, + }, + } + testFramework.ExpectCreated(ctx, accessLogPolicy1) + + accessLogPolicy2 := &anv1alpha1.AccessLogPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: k8sResourceName + "-2", + Namespace: k8snamespace, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(bucketArn), + TargetRef: &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "Gateway", + Name: gwv1alpha2.ObjectName(testGateway.Name), + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + }, + }, + } + testFramework.ExpectCreated(ctx, accessLogPolicy2) + + // Policy status should be Conflicted + Eventually(func(g Gomega) { + alpNamespacedName := types.NamespacedName{ + Name: accessLogPolicy2.Name, + Namespace: accessLogPolicy2.Namespace, + } + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionFalse)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonConflicted))) + g.Expect(alp.Status.Conditions[0].Message).To(BeEquivalentTo(config.LatticeGatewayControllerName)) + }).Should(Succeed()) + }) + + It("sets Access Log Policy status to Invalid when the destination does not exist", func() { + accessLogPolicy := &anv1alpha1.AccessLogPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: k8sResourceName, + Namespace: k8snamespace, + }, + Spec: anv1alpha1.AccessLogPolicySpec{ + DestinationArn: aws.String(bucketArn + "foo"), + TargetRef: &gwv1alpha2.PolicyTargetReference{ + Group: gwv1beta1.GroupName, + Kind: "Gateway", + Name: gwv1alpha2.ObjectName(testGateway.Name), + Namespace: (*gwv1alpha2.Namespace)(aws.String(k8snamespace)), + }, + }, + } + testFramework.ExpectCreated(ctx, accessLogPolicy) + + // Policy status should be Invalid + Eventually(func(g Gomega) { + alpNamespacedName := types.NamespacedName{ + Name: accessLogPolicy.Name, + Namespace: accessLogPolicy.Namespace, + } + alp := &anv1alpha1.AccessLogPolicy{} + err := testFramework.Client.Get(ctx, alpNamespacedName, alp) + g.Expect(err).To(BeNil()) + g.Expect(len(alp.Status.Conditions)).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Type).To(BeEquivalentTo(string(gwv1alpha2.PolicyConditionAccepted))) + g.Expect(alp.Status.Conditions[0].Status).To(BeEquivalentTo(metav1.ConditionFalse)) + g.Expect(alp.Status.Conditions[0].ObservedGeneration).To(BeEquivalentTo(1)) + g.Expect(alp.Status.Conditions[0].Reason).To(BeEquivalentTo(string(gwv1alpha2.PolicyReasonInvalid))) + g.Expect(alp.Status.Conditions[0].Message).To(BeEquivalentTo(config.LatticeGatewayControllerName)) + }).Should(Succeed()) + }) + + AfterEach(func() { + // TODO: Remove this block when DeleteAccessLogPolicy reconciliation is added + // Delete Access Log Subscriptions on test Service Network + listInput := &vpclattice.ListAccessLogSubscriptionsInput{ + ResourceIdentifier: testServiceNetwork.Arn, + } + output, err := testFramework.LatticeClient.ListAccessLogSubscriptionsWithContext(ctx, listInput) + Expect(err).To(BeNil()) + for _, als := range output.Items { + deleteInput := &vpclattice.DeleteAccessLogSubscriptionInput{ + AccessLogSubscriptionIdentifier: als.Arn, + } + _, err := testFramework.LatticeClient.DeleteAccessLogSubscriptionWithContext(ctx, deleteInput) + Expect(err).To(BeNil()) + } + + // Delete Access Log Policies in test namespace + alps := &anv1alpha1.AccessLogPolicyList{} + err = testFramework.Client.List(ctx, alps, client.InNamespace(k8snamespace)) + Expect(err).To(BeNil()) + for _, alp := range alps.Items { + testFramework.ExpectDeletedThenNotFound(ctx, &alp) + } + }) + + AfterAll(func() { + // Delete Kubernetes Routes, Services, and Deployments + testFramework.ExpectDeleted(ctx, httpRoute, grpcRoute) + testFramework.SleepForRouteDeletion() + testFramework.ExpectDeletedThenNotFound(ctx, + httpRoute, + grpcRoute, + httpK8sService, + grpcK8sService, + httpDeployment, + grpcDeployment, + ) + + // Delete S3 Bucket contents + output, err := s3Client.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{ + Bucket: aws.String(bucketName), + }) + Expect(err).To(BeNil()) + for _, object := range output.Contents { + _, err := s3Client.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(bucketName), + Key: object.Key, + }) + Expect(err).To(BeNil()) + } + + // Delete S3 Bucket + _, err = s3Client.DeleteBucketWithContext(ctx, &s3.DeleteBucketInput{ + Bucket: aws.String(bucketName), + }) + Expect(err).To(BeNil()) + + // Delete CloudWatch Log Group + _, err = logsClient.DeleteLogGroupWithContext(ctx, &cloudwatchlogs.DeleteLogGroupInput{ + LogGroupName: aws.String(logGroupName), + }) + Expect(err).To(BeNil()) + + // Delete Firehose Delivery Stream + Eventually(func(g Gomega) { + describeDeliveryStreamOutput, err := firehoseClient.DescribeDeliveryStreamWithContext(ctx, &firehose.DescribeDeliveryStreamInput{ + DeliveryStreamName: aws.String(deliveryStreamName), + }) + Expect(err).To(BeNil()) + Expect(*describeDeliveryStreamOutput.DeliveryStreamDescription.DeliveryStreamStatus).To(BeEquivalentTo(firehose.DeliveryStreamStatusActive)) + }) + _, err = firehoseClient.DeleteDeliveryStreamWithContext(ctx, &firehose.DeleteDeliveryStreamInput{ + DeliveryStreamName: aws.String(deliveryStreamName), + }) + Expect(err).To(BeNil()) + + // Detach managed policies from IAM Role + policies, err := iamClient.ListAttachedRolePoliciesWithContext(ctx, &iam.ListAttachedRolePoliciesInput{ + RoleName: aws.String(deliveryStreamRoleName), + }) + Expect(err).To(BeNil()) + for _, policy := range policies.AttachedPolicies { + _, err := iamClient.DetachRolePolicyWithContext(ctx, &iam.DetachRolePolicyInput{ + RoleName: aws.String(deliveryStreamRoleName), + PolicyArn: policy.PolicyArn, + }) + Expect(err).To(BeNil()) + } + + // Delete inline policies from IAM Role + inlinePolicies, err := iamClient.ListRolePoliciesWithContext(ctx, &iam.ListRolePoliciesInput{ + RoleName: aws.String(deliveryStreamRoleName), + }) + Expect(err).To(BeNil()) + for _, policyName := range inlinePolicies.PolicyNames { + _, err := iamClient.DeleteRolePolicyWithContext(ctx, &iam.DeleteRolePolicyInput{ + RoleName: aws.String(deliveryStreamRoleName), + PolicyName: policyName, + }) + Expect(err).To(BeNil()) + } + + // Delete IAM Role + _, err = iamClient.DeleteRoleWithContext(ctx, &iam.DeleteRoleInput{ + RoleName: aws.String(deliveryStreamRoleName), + }) + Expect(err).To(BeNil()) + }) +})