Skip to content

Commit

Permalink
refactor: allow variants to implement their own k8s providerID parsin…
Browse files Browse the repository at this point in the history
…g logic

== Motivation ==

Allow further variant specific customization

== Details ==

This change adds the ability for variants to implement their own logic
to parse a k8s providerID into an identifier specific to that variant.
This is done by adding a new method 'NodeId' on each variant that takes
the providerID (in pre-parsed url format) and returns a respective NodeId.

NodeId is a refactor of the previous InstanceId which was inadequately named
for variants other than EC2 instances (e.g. fargate). I have also taken this
opportunity to add a bit more strong typing to the variant methods to make
these methods a bit easier to grok and safer to implement. I have also renamed
the KubernetesInstanceId to KubernetesProviderId which felt a more adequate name
as well. Finally, in order to prevent circular dependencies and to maintain
what I felt was a more logical concept of a NodeId, we have created a new
submodule in the v1 package named 'awsnode' which contains the NodeId type.
This allows both the base v1 module and the variant submodule to use this
NodeId type without circular dependencies.

== Testing ==

make
  • Loading branch information
sanjams2 committed Jun 4, 2024
1 parent cea2af6 commit 2bb8e6f
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 96 deletions.
10 changes: 5 additions & 5 deletions pkg/controllers/tagging/tagging_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,17 +225,17 @@ func (tc *Controller) process() bool {
recordWorkItemLatencyMetrics(workItemDequeuingTimeWorkItemMetric, timeTaken)
klog.Infof("Dequeuing latency %f seconds", timeTaken)

instanceID, err := awsv1.KubernetesInstanceID(workItem.node.Spec.ProviderID).MapToAWSInstanceID()
instanceID, err := awsv1.KubernetesProviderId(workItem.node.Spec.ProviderID).MapToAWSNodeId()
if err != nil {
err = fmt.Errorf("Error in getting instanceID for node %s, error: %v", workItem.node.GetName(), err)
utilruntime.HandleError(err)
return nil
}
klog.Infof("Instance ID of work item %s is %s", workItem, instanceID)

if variant.IsVariantNode(string(instanceID)) {
if variant.IsVariantNode(instanceID) {
klog.Infof("Skip processing the node %s since it is a %s node",
instanceID, variant.NodeType(string(instanceID)))
instanceID, variant.NodeType(instanceID))
tc.workqueue.Forget(obj)
return nil
}
Expand Down Expand Up @@ -297,7 +297,7 @@ func (tc *Controller) tagEc2Instance(node *v1.Node) error {
return nil
}

instanceID, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
instanceID, _ := awsv1.KubernetesProviderId(node.Spec.ProviderID).MapToAWSNodeId()

err := tc.cloud.TagResource(string(instanceID), tc.tags)

Expand Down Expand Up @@ -349,7 +349,7 @@ func (tc *Controller) untagNodeResources(node *v1.Node) error {
// untagEc2Instances deletes the provided tags to each EC2 instances in
// the cluster.
func (tc *Controller) untagEc2Instance(node *v1.Node) error {
instanceID, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
instanceID, _ := awsv1.KubernetesProviderId(node.Spec.ProviderID).MapToAWSNodeId()

err := tc.cloud.UntagResource(string(instanceID), tc.tags)

Expand Down
53 changes: 27 additions & 26 deletions pkg/providers/v1/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"k8s.io/cloud-provider-aws/pkg/providers/v1/awsnode"
"net"
"regexp"
"sort"
Expand Down Expand Up @@ -418,7 +419,7 @@ func InstanceIDIndexFunc(obj interface{}) ([]string, error) {
// provider ID hasn't been populated yet
return []string{""}, nil
}
instanceID, err := KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
instanceID, err := KubernetesProviderId(node.Spec.ProviderID).MapToAWSNodeId()
if err != nil {
//logging the error as warning as Informer.AddIndexers would panic if there is an error
klog.Warningf("error mapping node %q's provider ID %q to instance ID: %v", node.Name, node.Spec.ProviderID, err)
Expand Down Expand Up @@ -832,16 +833,16 @@ func extractIPv6NodeAddresses(instance *ec2.Instance) ([]v1.NodeAddress, error)
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (c *Cloud) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
instanceID, err := KubernetesInstanceID(providerID).MapToAWSInstanceID()
instanceID, err := KubernetesProviderId(providerID).MapToAWSNodeId()
if err != nil {
return nil, err
}

if v := variant.GetVariant(string(instanceID)); v != nil {
return v.NodeAddresses(string(instanceID), c.vpcID)
if v := variant.GetVariant(instanceID); v != nil {
return v.NodeAddresses(instanceID, c.vpcID)
}

instance, err := describeInstance(c.ec2, instanceID)
instance, err := describeInstance(c.ec2, string(instanceID))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -871,17 +872,17 @@ func (c *Cloud) NodeAddressesByProviderID(ctx context.Context, providerID string
// InstanceExistsByProviderID returns true if the instance with the given provider id still exists.
// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
func (c *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
instanceID, err := KubernetesInstanceID(providerID).MapToAWSInstanceID()
instanceID, err := KubernetesProviderId(providerID).MapToAWSNodeId()
if err != nil {
return false, err
}

if v := variant.GetVariant(string(instanceID)); v != nil {
return v.InstanceExists(string(instanceID), c.vpcID)
if v := variant.GetVariant(instanceID); v != nil {
return v.InstanceExists(instanceID, c.vpcID)
}

request := &ec2.DescribeInstancesInput{
InstanceIds: []*string{instanceID.awsString()},
InstanceIds: []*string{instanceID.AwsString()},
}

instances, err := c.ec2.DescribeInstances(request)
Expand Down Expand Up @@ -910,17 +911,17 @@ func (c *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID strin

// InstanceShutdownByProviderID returns true if the instance is terminated
func (c *Cloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
instanceID, err := KubernetesInstanceID(providerID).MapToAWSInstanceID()
instanceID, err := KubernetesProviderId(providerID).MapToAWSNodeId()
if err != nil {
return false, err
}

if v := variant.GetVariant(string(instanceID)); v != nil {
return v.InstanceShutdown(string(instanceID), c.vpcID)
if v := variant.GetVariant(instanceID); v != nil {
return v.InstanceShutdown(instanceID, c.vpcID)
}

request := &ec2.DescribeInstancesInput{
InstanceIds: []*string{instanceID.awsString()},
InstanceIds: []*string{instanceID.AwsString()},
}

instances, err := c.ec2.DescribeInstances(request)
Expand Down Expand Up @@ -969,16 +970,16 @@ func (c *Cloud) InstanceID(ctx context.Context, nodeName types.NodeName) (string
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (c *Cloud) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
instanceID, err := KubernetesInstanceID(providerID).MapToAWSInstanceID()
instanceID, err := KubernetesProviderId(providerID).MapToAWSNodeId()
if err != nil {
return "", err
}

if v := variant.GetVariant(string(instanceID)); v != nil {
return v.InstanceTypeByProviderID(string(instanceID))
if v := variant.GetVariant(instanceID); v != nil {
return v.InstanceTypeByProviderID(instanceID)
}

instance, err := describeInstance(c.ec2, instanceID)
instance, err := describeInstance(c.ec2, string(instanceID))
if err != nil {
return "", err
}
Expand Down Expand Up @@ -1010,13 +1011,13 @@ func (c *Cloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (c *Cloud) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) {
instanceID, err := KubernetesInstanceID(providerID).MapToAWSInstanceID()
instanceID, err := KubernetesProviderId(providerID).MapToAWSNodeId()
if err != nil {
return cloudprovider.Zone{}, err
}

if v := variant.GetVariant(string(instanceID)); v != nil {
return v.GetZone(string(instanceID), c.vpcID, c.region)
if v := variant.GetVariant(instanceID); v != nil {
return v.GetZone(instanceID, c.vpcID, c.region)
}

instance, err := c.getInstanceByID(string(instanceID))
Expand Down Expand Up @@ -2651,7 +2652,7 @@ func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error)

// Open security group ingress rules on the instances so that the load balancer can talk to them
// Will also remove any security groups ingress rules for the load balancer that are _not_ needed for allInstances
func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancerDescription, instances map[InstanceID]*ec2.Instance, annotations map[string]string) error {
func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancerDescription, instances map[awsnode.NodeId]*ec2.Instance, annotations map[string]string) error {
if c.cfg.Global.DisableSecurityGroupIngress {
return nil
}
Expand Down Expand Up @@ -3228,15 +3229,15 @@ func nodeNameToIPAddress(nodeName string) string {
return strings.ReplaceAll(nodeName, "-", ".")
}

func (c *Cloud) nodeNameToInstanceID(nodeName types.NodeName) (InstanceID, error) {
func (c *Cloud) nodeNameToInstanceID(nodeName types.NodeName) (awsnode.NodeId, error) {
if strings.HasPrefix(string(nodeName), rbnNamePrefix) {
// depending on if you use a RHEL (e.g. AL2) or Debian (e.g. standard Ubuntu) based distribution, the
// hostname on the machine may be either i-00000000000000001 or i-00000000000000001.region.compute.internal.
// This handles both scenarios by returning anything before the first '.' in the node name if it has an RBN prefix.
if idx := strings.IndexByte(string(nodeName), '.'); idx != -1 {
return InstanceID(nodeName[0:idx]), nil
return awsnode.NodeId(nodeName[0:idx]), nil
}
return InstanceID(nodeName), nil
return awsnode.NodeId(nodeName), nil
}
if len(nodeName) == 0 {
return "", fmt.Errorf("no nodeName provided")
Expand All @@ -3254,10 +3255,10 @@ func (c *Cloud) nodeNameToInstanceID(nodeName types.NodeName) (InstanceID, error
return "", fmt.Errorf("node has no providerID")
}

return KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
return KubernetesProviderId(node.Spec.ProviderID).MapToAWSNodeId()
}

func (c *Cloud) instanceIDToNodeName(instanceID InstanceID) (types.NodeName, error) {
func (c *Cloud) instanceIDToNodeName(instanceID awsnode.NodeId) (types.NodeName, error) {
if len(instanceID) == 0 {
return "", fmt.Errorf("no instanceID provided")
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/providers/v1/aws_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"k8s.io/apimachinery/pkg/types"

"k8s.io/cloud-provider-aws/pkg/providers/v1/iface"
)

Expand Down Expand Up @@ -67,5 +66,5 @@ func newAWSInstance(ec2Service iface.EC2, instance *ec2.Instance) *awsInstance {

// Gets the full information about this instance from the EC2 API
func (i *awsInstance) describeInstance() (*ec2.Instance, error) {
return describeInstance(i.ec2, InstanceID(i.awsID))
return describeInstance(i.ec2, i.awsID)
}
7 changes: 4 additions & 3 deletions pkg/providers/v1/aws_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"crypto/sha1"
"encoding/hex"
"fmt"
"k8s.io/cloud-provider-aws/pkg/providers/v1/awsnode"
"reflect"
"regexp"
"strconv"
Expand Down Expand Up @@ -781,7 +782,7 @@ func (c *Cloud) chunkTargetDescriptions(targets []*elbv2.TargetDescription, chun

// updateInstanceSecurityGroupsForNLB will adjust securityGroup's settings to allow inbound traffic into instances from clientCIDRs and portMappings.
// TIP: if either instances or clientCIDRs or portMappings are nil, then the securityGroup rules for lbName are cleared.
func (c *Cloud) updateInstanceSecurityGroupsForNLB(lbName string, instances map[InstanceID]*ec2.Instance, subnetCIDRs []string, clientCIDRs []string, portMappings []nlbPortMapping) error {
func (c *Cloud) updateInstanceSecurityGroupsForNLB(lbName string, instances map[awsnode.NodeId]*ec2.Instance, subnetCIDRs []string, clientCIDRs []string, portMappings []nlbPortMapping) error {
if c.cfg.Global.DisableSecurityGroupIngress {
return nil
}
Expand Down Expand Up @@ -1430,7 +1431,7 @@ func (c *Cloud) ensureLoadBalancerHealthCheck(loadBalancer *elb.LoadBalancerDesc
}

// Makes sure that exactly the specified hosts are registered as instances with the load balancer
func (c *Cloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstances []*elb.Instance, instanceIDs map[InstanceID]*ec2.Instance) error {
func (c *Cloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstances []*elb.Instance, instanceIDs map[awsnode.NodeId]*ec2.Instance) error {
expected := sets.NewString()
for id := range instanceIDs {
expected.Insert(string(id))
Expand Down Expand Up @@ -1607,7 +1608,7 @@ func proxyProtocolEnabled(backend *elb.BackendServerDescription) bool {
// findInstancesForELB gets the EC2 instances corresponding to the Nodes, for setting up an ELB
// We ignore Nodes (with a log message) where the instanceid cannot be determined from the provider,
// and we ignore instances which are not found
func (c *Cloud) findInstancesForELB(nodes []*v1.Node, annotations map[string]string) (map[InstanceID]*ec2.Instance, error) {
func (c *Cloud) findInstancesForELB(nodes []*v1.Node, annotations map[string]string) (map[awsnode.NodeId]*ec2.Instance, error) {

targetNodes := filterTargetNodes(nodes, annotations)

Expand Down
4 changes: 2 additions & 2 deletions pkg/providers/v1/aws_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package aws
import (
"context"
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"k8s.io/cloud-provider-aws/pkg/providers/v1/awsnode"
"k8s.io/klog/v2"

cloudprovider "k8s.io/cloud-provider"
Expand Down Expand Up @@ -114,7 +114,7 @@ func (c *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudpro
if instanceID != "" {
_, found := instances[instanceID]
if found {
node, err := c.instanceIDToNodeName(InstanceID(instanceID))
node, err := c.instanceIDToNodeName(awsnode.NodeId(instanceID))
if err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/providers/v1/awsnode/identifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package awsnode

import "github.com/aws/aws-sdk-go/aws"

// NodeId is the AWS-side identifier of a Kubernetes node. It replaces the
// earlier InstanceId name, which did not fit node variants that are not EC2
// instances (e.g. fargate).
type NodeId string

// AwsString returns the identifier as a *string, the form used by AWS SDK
// request fields such as ec2.DescribeInstancesInput.InstanceIds.
func (id NodeId) AwsString() *string {
	raw := string(id)
	return aws.String(raw)
}
Loading

0 comments on commit 2bb8e6f

Please sign in to comment.