Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Support for Updating Access Log Policies #442

Merged
merged 2 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions controllers/accesslogpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,17 +162,18 @@ func (r *accessLogPolicyReconciler) reconcileUpsert(ctx context.Context, alp *an
return err
}

if alp.Spec.TargetRef.Namespace != nil && string(*alp.Spec.TargetRef.Namespace) != alp.Namespace {
message := "The targetRef's namespace does not match the access log policy's namespace"
return r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonInvalid, message)
}

targetRefExists, err := r.targetRefExists(ctx, alp)
if err != nil {
return err
}
if !targetRefExists {
message := "The targetRef could not be found"
err := r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonTargetNotFound, message)
if err != nil {
return err
}
return nil
return r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonTargetNotFound, message)
}

stack, err := r.buildAndDeployModel(ctx, alp)
Expand Down Expand Up @@ -272,7 +273,7 @@ func (r *accessLogPolicyReconciler) updateAccessLogPolicyAnnotations(
}

for _, als := range accessLogSubscriptions {
if als.Spec.EventType == core.CreateEvent {
if als.Spec.EventType != core.DeleteEvent {
oldAlp := alp.DeepCopy()
if alp.ObjectMeta.Annotations == nil {
alp.ObjectMeta.Annotations = make(map[string]string)
Expand Down
212 changes: 150 additions & 62 deletions pkg/deploy/lattice/access_log_subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (

type AccessLogSubscriptionManager interface {
Create(ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription) (*lattice.AccessLogSubscriptionStatus, error)
Delete(ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription) error
Update(ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription) (*lattice.AccessLogSubscriptionStatus, error)
Delete(ctx context.Context, accessLogSubscriptionArn string) error
}

type defaultAccessLogSubscriptionManager struct {
Expand All @@ -42,97 +43,143 @@ func (m *defaultAccessLogSubscriptionManager) Create(
) (*lattice.AccessLogSubscriptionStatus, error) {
vpcLatticeSess := m.cloud.Lattice()

var resourceIdentifier string
switch accessLogSubscription.Spec.SourceType {
case lattice.ServiceNetworkSourceType:
serviceNetwork, err := vpcLatticeSess.FindServiceNetwork(ctx, accessLogSubscription.Spec.SourceName, config.AccountID)
if err != nil {
return nil, err
}
resourceIdentifier = *serviceNetwork.SvcNetwork.Arn
case lattice.ServiceSourceType:
serviceNameProvider := services.NewDefaultLatticeServiceNameProvider(accessLogSubscription.Spec.SourceName)
service, err := vpcLatticeSess.FindService(ctx, serviceNameProvider)
if err != nil {
return nil, err
}
resourceIdentifier = *service.Arn
default:
return nil, fmt.Errorf("unsupported source type: %s", accessLogSubscription.Spec.SourceType)
sourceArn, err := m.getSourceArn(ctx, accessLogSubscription.Spec.SourceType, accessLogSubscription.Spec.SourceName)
if err != nil {
return nil, err
}

tags := m.cloud.DefaultTagsMergedWith(services.Tags{
lattice.AccessLogPolicyTagKey: aws.String(accessLogSubscription.Spec.ALPNamespacedName.String()),
})

createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{
ResourceIdentifier: &resourceIdentifier,
ResourceIdentifier: sourceArn,
DestinationArn: &accessLogSubscription.Spec.DestinationArn,
Tags: tags,
}

createALSOutput, err := vpcLatticeSess.CreateAccessLogSubscriptionWithContext(ctx, createALSInput)
if err == nil {
return &lattice.AccessLogSubscriptionStatus{
Arn: *createALSOutput.Arn,
}, nil
}

switch e := err.(type) {
case *vpclattice.AccessDeniedException:
return nil, services.NewInvalidError(e.Message())
case *vpclattice.ResourceNotFoundException:
if *e.ResourceType == "SERVICE_NETWORK" || *e.ResourceType == "SERVICE" {
return nil, services.NewNotFoundError(string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName)
}
return nil, services.NewInvalidError(e.Message())
case *vpclattice.ConflictException:
/*
* Conflict may arise if we retry creation due to a failure elsewhere in the controller,
* so we check if the conflicting ALS was created for the same ALP via its tags.
* If it is the same ALP, return success. Else, return ConflictError.
*/
listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{
ResourceIdentifier: sourceArn,
}
listALSOutput, err := vpcLatticeSess.ListAccessLogSubscriptionsWithContext(ctx, listALSInput)
if err != nil {
return nil, err
}
for _, als := range listALSOutput.Items {
if *als.DestinationArn == accessLogSubscription.Spec.DestinationArn {
listTagsInput := &vpclattice.ListTagsForResourceInput{
ResourceArn: als.Arn,
}
listTagsOutput, err := vpcLatticeSess.ListTagsForResourceWithContext(ctx, listTagsInput)
if err != nil {
return nil, err
}
value, exists := listTagsOutput.Tags[lattice.AccessLogPolicyTagKey]
if exists && *value == accessLogSubscription.Spec.ALPNamespacedName.String() {
return &lattice.AccessLogSubscriptionStatus{
Arn: *als.Arn,
}, nil
}
}
}
return nil, services.NewConflictError(
string(accessLogSubscription.Spec.SourceType),
accessLogSubscription.Spec.SourceName,
e.Message(),
)
default:
return nil, err
}
}

func (m *defaultAccessLogSubscriptionManager) Update(
ctx context.Context,
accessLogSubscription *lattice.AccessLogSubscription,
) (*lattice.AccessLogSubscriptionStatus, error) {
vpcLatticeSess := m.cloud.Lattice()

// If the source is modified, we need to replace the ALS
getALSInput := &vpclattice.GetAccessLogSubscriptionInput{
AccessLogSubscriptionIdentifier: aws.String(accessLogSubscription.Status.Arn),
}
getALSOutput, err := vpcLatticeSess.GetAccessLogSubscriptionWithContext(ctx, getALSInput)
if err != nil {
switch e := err.(type) {
case *vpclattice.AccessDeniedException:
return nil, services.NewInvalidError(e.Message())
case *vpclattice.ResourceNotFoundException:
if *e.ResourceType == "SERVICE_NETWORK" || *e.ResourceType == "SERVICE" {
return nil, services.NewNotFoundError(string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName)
}
return nil, services.NewInvalidError(e.Message())
case *vpclattice.ConflictException:
/*
* Conflict may arise if we retry creation due to a failure elsewhere in the controller,
* so we check if the conflicting ALS was created for the same ALP via its tags.
* If it is the same ALP, return success. Else, return ConflictError.
*/
listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{
ResourceIdentifier: &resourceIdentifier,
}
listALSOutput, err := vpcLatticeSess.ListAccessLogSubscriptionsWithContext(ctx, listALSInput)
if err != nil {
return nil, err
}
for _, als := range listALSOutput.Items {
if *als.DestinationArn == accessLogSubscription.Spec.DestinationArn {
listTagsInput := &vpclattice.ListTagsForResourceInput{
ResourceArn: als.Arn,
}
listTagsOutput, err := vpcLatticeSess.ListTagsForResourceWithContext(ctx, listTagsInput)
if err != nil {
return nil, err
}
value, exists := listTagsOutput.Tags[lattice.AccessLogPolicyTagKey]
if exists && *value == accessLogSubscription.Spec.ALPNamespacedName.String() {
return &lattice.AccessLogSubscriptionStatus{
Arn: *als.Arn,
}, nil
}
}
}
return nil, services.NewConflictError(
string(accessLogSubscription.Spec.SourceType),
accessLogSubscription.Spec.SourceName,
e.Message(),
)
default:
return nil, err
}
}
sourceArn, err := m.getSourceArn(ctx, accessLogSubscription.Spec.SourceType, accessLogSubscription.Spec.SourceName)
if err != nil {
return nil, err
}
if *getALSOutput.ResourceArn != *sourceArn {
return m.replaceAccessLogSubscription(ctx, accessLogSubscription)
}

// Source is not modified, try to update destinationArn in the existing ALS
updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{
AccessLogSubscriptionIdentifier: aws.String(accessLogSubscription.Status.Arn),
DestinationArn: aws.String(accessLogSubscription.Spec.DestinationArn),
}
updateALSOutput, err := vpcLatticeSess.UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput)
if err == nil {
return &lattice.AccessLogSubscriptionStatus{
Arn: *updateALSOutput.Arn,
}, nil
}

return &lattice.AccessLogSubscriptionStatus{
Arn: *createALSOutput.Arn,
}, nil
switch e := err.(type) {
case *vpclattice.AccessDeniedException:
return nil, services.NewInvalidError(e.Message())
case *vpclattice.ResourceNotFoundException:
if *e.ResourceType == "SERVICE_NETWORK" || *e.ResourceType == "SERVICE" {
return nil, services.NewNotFoundError(string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName)
}
return nil, services.NewInvalidError(e.Message())
case *vpclattice.ConflictException:
/*
* A conflict can happen when the destination type of the new ALS is different from the original.
* To gracefully handle this, we create a new ALS with the new destination, then delete the old one.
*/
return m.replaceAccessLogSubscription(ctx, accessLogSubscription)
default:
return nil, err
}
}

func (m *defaultAccessLogSubscriptionManager) Delete(
ctx context.Context,
accessLogSubscription *lattice.AccessLogSubscription,
accessLogSubscriptionArn string,
) error {
vpcLatticeSess := m.cloud.Lattice()
deleteALSInput := &vpclattice.DeleteAccessLogSubscriptionInput{
AccessLogSubscriptionIdentifier: aws.String(accessLogSubscription.Status.Arn),
AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn),
}
_, err := vpcLatticeSess.DeleteAccessLogSubscriptionWithContext(ctx, deleteALSInput)
if err != nil {
Expand All @@ -142,3 +189,44 @@ func (m *defaultAccessLogSubscriptionManager) Delete(
}
return nil
}

func (m *defaultAccessLogSubscriptionManager) getSourceArn(
ctx context.Context,
sourceType lattice.SourceType,
sourceName string,
) (*string, error) {
vpcLatticeSess := m.cloud.Lattice()

switch sourceType {
case lattice.ServiceNetworkSourceType:
serviceNetwork, err := vpcLatticeSess.FindServiceNetwork(ctx, sourceName, config.AccountID)
if err != nil {
return nil, err
}
return serviceNetwork.SvcNetwork.Arn, nil
case lattice.ServiceSourceType:
serviceNameProvider := services.NewDefaultLatticeServiceNameProvider(sourceName)
service, err := vpcLatticeSess.FindService(ctx, serviceNameProvider)
if err != nil {
return nil, err
}
return service.Arn, nil
default:
return nil, fmt.Errorf("unsupported source type: %s", sourceType)
}
}

func (m *defaultAccessLogSubscriptionManager) replaceAccessLogSubscription(
ctx context.Context,
accessLogSubscription *lattice.AccessLogSubscription,
) (*lattice.AccessLogSubscriptionStatus, error) {
newAlsStatus, err := m.Create(ctx, accessLogSubscription)
if err != nil {
return nil, err
}
err = m.Delete(ctx, accessLogSubscription.Status.Arn)
if err != nil {
return nil, err
}
return newAlsStatus, nil
}
17 changes: 16 additions & 1 deletion pkg/deploy/lattice/access_log_subscription_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading