Skip to content

Commit

Permalink
Support Creating Access Log Policies (#430)
Browse files Browse the repository at this point in the history
Added support for creating Access Log Policies

---------

Co-authored-by: Shawn Kaplan <[email protected]>
  • Loading branch information
xWink and Shawn Kaplan authored Oct 13, 2023
1 parent 24cedb3 commit c3d0ab9
Show file tree
Hide file tree
Showing 17 changed files with 1,961 additions and 34 deletions.
104 changes: 87 additions & 17 deletions controllers/accesslogpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,26 @@ import (
"context"
"fmt"

"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
pkg_builder "sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/aws"
"github.com/aws/aws-application-networking-k8s/pkg/aws/services"
"github.com/aws/aws-application-networking-k8s/pkg/config"
"github.com/aws/aws-application-networking-k8s/pkg/deploy"
"github.com/aws/aws-application-networking-k8s/pkg/gateway"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
lattice_runtime "github.com/aws/aws-application-networking-k8s/pkg/runtime"
"github.com/aws/aws-application-networking-k8s/pkg/utils"
Expand All @@ -50,6 +56,7 @@ type accessLogPolicyReconciler struct {
scheme *runtime.Scheme
finalizerManager k8s.FinalizerManager
eventRecorder record.EventRecorder
modelBuilder gateway.AccessLogSubscriptionModelBuilder
stackDeployer deploy.StackDeployer
cloud aws.Cloud
stackMarshaller deploy.StackMarshaller
Expand All @@ -65,7 +72,8 @@ func RegisterAccessLogPolicyController(
scheme := mgr.GetScheme()
evtRec := mgr.GetEventRecorderFor("accesslogpolicy")

stackDeployer := deploy.NewServiceNetworkStackDeployer(log, cloud, mgrClient)
modelBuilder := gateway.NewAccessLogSubscriptionModelBuilder(log, mgrClient)
stackDeployer := deploy.NewAccessLogSubscriptionStackDeployer(log, cloud, mgrClient)
stackMarshaller := deploy.NewDefaultStackMarshaller()

r := &accessLogPolicyReconciler{
Expand All @@ -74,13 +82,14 @@ func RegisterAccessLogPolicyController(
scheme: scheme,
finalizerManager: finalizerManager,
eventRecorder: evtRec,
modelBuilder: modelBuilder,
stackDeployer: stackDeployer,
cloud: cloud,
stackMarshaller: stackMarshaller,
}

builder := ctrl.NewControllerManagedBy(mgr).
For(&anv1alpha1.AccessLogPolicy{})
For(&anv1alpha1.AccessLogPolicy{}, pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{}))

return builder.Complete(r)
}
Expand All @@ -105,6 +114,24 @@ func (r *accessLogPolicyReconciler) reconcile(ctx context.Context, req ctrl.Requ
return client.IgnoreNotFound(err)
}

if alp.Spec.TargetRef.Group != gwv1beta1.GroupName {
message := "The targetRef's Group must be " + gwv1beta1.GroupName
err := r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonInvalid, message)
if err != nil {
return err
}
return nil
}

if !slices.Contains([]string{"Gateway", "HTTPRoute", "GRPCRoute"}, string(alp.Spec.TargetRef.Kind)) {
message := "The targetRef's Kind must be Gateway, HTTPRoute, or GRPCRoute"
err := r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonInvalid, message)
if err != nil {
return err
}
return nil
}

if !alp.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, alp)
} else {
Expand All @@ -128,27 +155,40 @@ func (r *accessLogPolicyReconciler) reconcileUpsert(ctx context.Context, alp *an
return err
}

if !r.targetRefExists(ctx, alp) {
r.log.Infof("Could not find Acces Log Policy targetRef %s %s",
alp.Spec.TargetRef.Kind, alp.Spec.TargetRef.Name)
err := r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonTargetNotFound)
targetRefExists, err := r.targetRefExists(ctx, alp)
if err != nil {
return err
}
if !targetRefExists {
message := "The targetRef could not be found"
err := r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonTargetNotFound, message)
if err != nil {
return fmt.Errorf("failed to update Access Log Policy status, %w", err)
return err
}
return nil
}

// TODO: Create VPC Lattice Access Log Subscription
err = r.buildAndDeployModel(ctx, alp)
if err != nil {
if services.IsConflictError(err) {
message := "An Access Log Policy with a destinationArn for the same destination type already exists for this targetRef"
return r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonConflicted, message)
} else if services.IsInvalidError(err) {
message := "The AWS resource with the provided destinationArn could not be found"
return r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonInvalid, message)
}
return err
}

err := r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonAccepted)
err = r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonAccepted, config.LatticeGatewayControllerName)
if err != nil {
return fmt.Errorf("failed to update Access Log Policy status, %w", err)
return err
}

return nil
}

func (r *accessLogPolicyReconciler) targetRefExists(ctx context.Context, alp *anv1alpha1.AccessLogPolicy) bool {
func (r *accessLogPolicyReconciler) targetRefExists(ctx context.Context, alp *anv1alpha1.AccessLogPolicy) (bool, error) {
targetRefNamespace := alp.Namespace
if alp.Spec.TargetRef.Namespace != nil {
targetRefNamespace = string(*alp.Spec.TargetRef.Namespace)
Expand All @@ -163,26 +203,56 @@ func (r *accessLogPolicyReconciler) targetRefExists(ctx context.Context, alp *an

switch alp.Spec.TargetRef.Kind {
case "Gateway":
gateway := &gwv1beta1.Gateway{}
err = r.client.Get(ctx, targetRefNamespacedName, gateway)
gw := &gwv1beta1.Gateway{}
err = r.client.Get(ctx, targetRefNamespacedName, gw)
case "HTTPRoute":
httpRoute := &gwv1beta1.HTTPRoute{}
err = r.client.Get(ctx, targetRefNamespacedName, httpRoute)
case "GRPCRoute":
grpcRoute := &gwv1alpha2.GRPCRoute{}
err = r.client.Get(ctx, targetRefNamespacedName, grpcRoute)
default:
r.log.Infof("Access Log Policy targetRef is for an unsupported Kind: %s", alp.Spec.TargetRef.Kind)
return false
return false, fmt.Errorf("access Log Policy targetRef is for an unsupported Kind: %s",
alp.Spec.TargetRef.Kind)
}

return err == nil
if err != nil && !errors.IsNotFound(err) {
return false, err
}

return err == nil, nil
}

func (r *accessLogPolicyReconciler) buildAndDeployModel(
ctx context.Context,
alp *anv1alpha1.AccessLogPolicy,
) error {
stack, _, err := r.modelBuilder.Build(ctx, alp)
if err != nil {
r.eventRecorder.Event(alp, corev1.EventTypeWarning, k8s.AccessLogPolicyEventReasonFailedBuildModel,
fmt.Sprintf("Failed to build model due to %s", err))
return err
}

jsonStack, err := r.stackMarshaller.Marshal(stack)
if err != nil {
return err
}
r.log.Debugw("Successfully built model", "stack", jsonStack)

if err := r.stackDeployer.Deploy(ctx, stack); err != nil {
return err
}
r.log.Debugf("successfully deployed model for stack %s:%s", stack.StackID().Name, stack.StackID().Namespace)

return nil
}

func (r *accessLogPolicyReconciler) updateAccessLogPolicyStatus(
ctx context.Context,
alp *anv1alpha1.AccessLogPolicy,
reason gwv1alpha2.PolicyConditionReason,
message string,
) error {
status := metav1.ConditionTrue
if reason != gwv1alpha2.PolicyReasonAccepted {
Expand All @@ -192,7 +262,7 @@ func (r *accessLogPolicyReconciler) updateAccessLogPolicyStatus(
alp.Status.Conditions = utils.GetNewConditions(alp.Status.Conditions, metav1.Condition{
Type: string(gwv1alpha2.PolicyConditionAccepted),
ObservedGeneration: alp.Generation,
Message: config.LatticeGatewayControllerName,
Message: message,
Status: status,
Reason: string(reason),
})
Expand Down
50 changes: 50 additions & 0 deletions pkg/aws/services/vpclattice.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ type LatticeServiceNameProvider interface {
LatticeServiceName() string
}

type defaultLatticeServiceNameProvider struct {
name string
}

func NewDefaultLatticeServiceNameProvider(name string) *defaultLatticeServiceNameProvider {
return &defaultLatticeServiceNameProvider{
name: name,
}
}

func (p *defaultLatticeServiceNameProvider) LatticeServiceName() string {
return p.name
}

type NotFoundError struct {
ResourceType string
Name string
Expand All @@ -44,6 +58,42 @@ func IsNotFoundError(err error) bool {
return errors.As(err, &nfErr)
}

type ConflictError struct {
ResourceType string
Name string
Message string
}

func (e *ConflictError) Error() string {
return fmt.Sprintf("%s %s had a conflict: %s", e.ResourceType, e.Name, e.Message)
}

func NewConflictError(resourceType string, name string, message string) error {
return &ConflictError{resourceType, name, message}
}

func IsConflictError(err error) bool {
conflictErr := &ConflictError{}
return errors.As(err, &conflictErr)
}

type InvalidError struct {
Message string
}

func (e *InvalidError) Error() string {
return fmt.Sprintf("Invalid input: %s", e.Message)
}

func NewInvalidError(message string) error {
return &InvalidError{message}
}

func IsInvalidError(err error) bool {
invalidErr := &InvalidError{}
return errors.As(err, &invalidErr)
}

type Lattice interface {
vpclatticeiface.VPCLatticeAPI
ListServiceNetworksAsList(ctx context.Context, input *vpclattice.ListServiceNetworksInput) ([]*vpclattice.ServiceNetworkSummary, error)
Expand Down
89 changes: 89 additions & 0 deletions pkg/deploy/lattice/access_log_subscription_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package lattice

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go/service/vpclattice"

"github.com/aws/aws-application-networking-k8s/pkg/aws"
"github.com/aws/aws-application-networking-k8s/pkg/aws/services"
"github.com/aws/aws-application-networking-k8s/pkg/config"
"github.com/aws/aws-application-networking-k8s/pkg/model/lattice"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
)

//go:generate mockgen -destination access_log_subscription_manager_mock.go -package lattice github.com/aws/aws-application-networking-k8s/pkg/deploy/lattice AccessLogSubscriptionManager

type AccessLogSubscriptionManager interface {
Create(ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription) (*lattice.AccessLogSubscriptionStatus, error)
}

type defaultAccessLogSubscriptionManager struct {
log gwlog.Logger
cloud aws.Cloud
}

func NewAccessLogSubscriptionManager(
log gwlog.Logger,
cloud aws.Cloud,
) *defaultAccessLogSubscriptionManager {
return &defaultAccessLogSubscriptionManager{
log: log,
cloud: cloud,
}
}

func (m *defaultAccessLogSubscriptionManager) Create(
ctx context.Context,
accessLogSubscription *lattice.AccessLogSubscription,
) (*lattice.AccessLogSubscriptionStatus, error) {
vpcLatticeSess := m.cloud.Lattice()

var resourceIdentifier string
switch accessLogSubscription.Spec.SourceType {
case lattice.ServiceNetworkSourceType:
serviceNetwork, err := vpcLatticeSess.FindServiceNetwork(ctx, accessLogSubscription.Spec.SourceName, config.AccountID)
if err != nil {
return nil, err
}
resourceIdentifier = *serviceNetwork.SvcNetwork.Arn
case lattice.ServiceSourceType:
serviceNameProvider := services.NewDefaultLatticeServiceNameProvider(accessLogSubscription.Spec.SourceName)
service, err := vpcLatticeSess.FindService(ctx, serviceNameProvider)
if err != nil {
return nil, err
}
resourceIdentifier = *service.Arn
default:
return nil, fmt.Errorf("unsupported source type: %s", accessLogSubscription.Spec.SourceType)
}

createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{
ResourceIdentifier: &resourceIdentifier,
DestinationArn: &accessLogSubscription.Spec.DestinationArn,
Tags: m.cloud.DefaultTags(),
}

createALSOutput, err := vpcLatticeSess.CreateAccessLogSubscriptionWithContext(ctx, createALSInput)
if err != nil {
switch e := err.(type) {
case *vpclattice.ConflictException:
return nil, services.NewConflictError(string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName, e.Message())
case *vpclattice.AccessDeniedException:
return nil, services.NewInvalidError(e.Message())
case *vpclattice.ResourceNotFoundException:
if *e.ResourceType == "SERVICE_NETWORK" || *e.ResourceType == "SERVICE" {
return nil, services.NewNotFoundError(string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName)
}
return nil, services.NewInvalidError(e.Message())
default:
return nil, err
}
}

return &lattice.AccessLogSubscriptionStatus{
Arn: *createALSOutput.Arn,
Id: *createALSOutput.Id,
}, nil
}
Loading

0 comments on commit c3d0ab9

Please sign in to comment.