Skip to content

Commit

Permalink
Enhance subnet selection
Browse files Browse the repository at this point in the history
  • Loading branch information
Joseph Chen committed Jan 30, 2024
1 parent aed881c commit 7917117
Show file tree
Hide file tree
Showing 11 changed files with 654 additions and 61 deletions.
138 changes: 114 additions & 24 deletions pkg/awsutils/awsutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net"
"os"
"regexp"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -85,7 +86,7 @@ var log = logger.Get()
// APIs defines interfaces calls for adding/getting/deleting ENIs/secondary IPs. The APIs are not thread-safe.
type APIs interface {
// AllocENI creates an ENI and attaches it to the instance
AllocENI(useCustomCfg bool, sg []*string, subnet string, numIPs int) (eni string, err error)
AllocENI(useCustomCfg bool, sg []*string, customSubnet string, numIPs int) (eni string, err error)

// FreeENI detaches ENI interface and deletes it
FreeENI(eniName string) error
Expand Down Expand Up @@ -197,6 +198,7 @@ type EC2InstanceMetadataCache struct {
primaryENImac string
availabilityZone string
region string
vpcID string

unmanagedENIs StringSet
useCustomNetworking bool
Expand Down Expand Up @@ -439,14 +441,22 @@ func (cache *EC2InstanceMetadataCache) initWithEC2Metadata(ctx context.Context)
}
log.Debugf("%s is the primary ENI of this instance", cache.primaryENI)

// retrieve sub-id
// retrieve subnet-id
cache.subnetID, err = cache.imds.GetSubnetID(ctx, mac)
if err != nil {
awsAPIErrInc("GetSubnetID", err)
return err
}
log.Debugf("Found subnet-id: %s ", cache.subnetID)

// retrieve vpc-id
cache.vpcID, err = cache.imds.GetVpcID(ctx, mac)
if err != nil {
awsAPIErrInc("GetVpcID", err)
return err
}
log.Debugf("Found vpc-id: %s ", cache.vpcID)

// We use the ctx here for testing, since we spawn go-routines above which will run forever.
select {
case <-ctx.Done():
Expand Down Expand Up @@ -724,8 +734,8 @@ func (cache *EC2InstanceMetadataCache) awsGetFreeDeviceNumber() (int, error) {

// AllocENI creates an ENI and attaches it to the instance
// returns: newly created ENI ID
func (cache *EC2InstanceMetadataCache) AllocENI(useCustomCfg bool, sg []*string, subnet string, numIPs int) (string, error) {
eniID, err := cache.createENI(useCustomCfg, sg, subnet, numIPs)
func (cache *EC2InstanceMetadataCache) AllocENI(useCustomCfg bool, sg []*string, customSubnet string, numIPs int) (string, error) {
eniID, err := cache.createENI(useCustomCfg, sg, customSubnet, numIPs)
if err != nil {
return "", errors.Wrap(err, "AllocENI: failed to create ENI")
}
Expand Down Expand Up @@ -796,7 +806,7 @@ func (cache *EC2InstanceMetadataCache) attachENI(eniID string) (string, error) {
}

// return ENI id, error
func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string, subnet string, numIPs int) (string, error) {
func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string, customSubnet string, numIPs int) (string, error) {
eniDescription := eniDescriptionPrefix + cache.instanceID
tags := map[string]string{
eniCreatedAtTagKey: time.Now().Format(time.RFC3339),
Expand All @@ -819,6 +829,10 @@ func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string

log.Infof("Trying to allocate %d IP addresses on new ENI", needIPs)
log.Debugf("PD enabled - %t", cache.enablePrefixDelegation)

result := &ec2.CreateNetworkInterfaceOutput{}

log.Info("Using same config as the primary interface for the new ENI")
input := &ec2.CreateNetworkInterfaceInput{}

if cache.enablePrefixDelegation {
Expand All @@ -838,33 +852,109 @@ func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string
SecondaryPrivateIpAddressCount: aws.Int64(int64(needIPs)),
}
}
if useCustomCfg {
log.Info("Using a custom network config for the new ENI")
if len(sg) != 0 {
input.Groups = sg
} else {
log.Warnf("No custom networking security group found, will use the node's primary ENI's SG: %v", aws.StringValueSlice(input.Groups))

var err error
if cache.useCustomNetworking {
input = createEniUsingCustomCfg(sg, customSubnet, input)
log.Infof("Creating ENI with security groups: %v in subnet: %s", aws.StringValueSlice(input.Groups), aws.StringValue(input.SubnetId))

start := time.Now()
result, err = cache.ec2SVC.CreateNetworkInterfaceWithContext(context.Background(), input)
prometheusmetrics.Ec2ApiReq.WithLabelValues("CreateNetworkInterface").Inc()
prometheusmetrics.AwsAPILatency.WithLabelValues("CreateNetworkInterface", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start))
if err == nil {
log.Infof("Created a new ENI: %s", aws.StringValue(result.NetworkInterface.NetworkInterfaceId))
return *result.NetworkInterface.NetworkInterfaceId, nil
}
input.SubnetId = aws.String(subnet)
checkAPIErrorAndBroadcastEvent(err, "ec2:CreateNetworkInterface")
awsAPIErrInc("CreateNetworkInterface", err)
prometheusmetrics.Ec2ApiErr.WithLabelValues("CreateNetworkInterface").Inc()
log.Errorf("Failed to CreateNetworkInterface %v for subnet %s", err, customSubnet)
} else {
log.Info("Using same config as the primary interface for the new ENI")
subnetResult, vpcErr := cache.getVpcSubnets()
if vpcErr != nil {
return "", errors.Wrap(vpcErr, "failed to get subnets in vpc")
}

for _, subnet := range subnetResult {
if *subnet.SubnetId != cache.subnetID {
if !validTag(subnet) {
continue
}
}
log.Infof("Creating ENI with security groups: %v in subnet: %s", aws.StringValueSlice(input.Groups), aws.StringValue(input.SubnetId))

input.SubnetId = subnet.SubnetId
start := time.Now()
result, err = cache.ec2SVC.CreateNetworkInterfaceWithContext(context.Background(), input)
prometheusmetrics.Ec2ApiReq.WithLabelValues("CreateNetworkInterface").Inc()
prometheusmetrics.AwsAPILatency.WithLabelValues("CreateNetworkInterface", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start))
if err == nil {
log.Infof("Created a new ENI: %s", aws.StringValue(result.NetworkInterface.NetworkInterfaceId))
return *result.NetworkInterface.NetworkInterfaceId, nil
}
checkAPIErrorAndBroadcastEvent(err, "ec2:CreateNetworkInterface")
awsAPIErrInc("CreateNetworkInterface", err)
prometheusmetrics.Ec2ApiErr.WithLabelValues("CreateNetworkInterface").Inc()
log.Errorf("Failed to CreateNetworkInterface %v for subnet %s", err, *subnet.SubnetId)
}
}
return "", errors.Wrap(err, "failed to create network interface")
}

log.Infof("Creating ENI with security groups: %v in subnet: %s", aws.StringValueSlice(input.Groups), aws.StringValue(input.SubnetId))
func (cache *EC2InstanceMetadataCache) getVpcSubnets() ([]*ec2.Subnet, error) {
describeSubnetInput := &ec2.DescribeSubnetsInput{
Filters: []*ec2.Filter{
{
Name: aws.String("vpc-id"),
Values: []*string{aws.String(cache.vpcID)},
},
{
Name: aws.String("availability-zone"),
Values: []*string{aws.String(cache.availabilityZone)},
},
},
}

start := time.Now()
result, err := cache.ec2SVC.CreateNetworkInterfaceWithContext(context.Background(), input)
prometheusmetrics.Ec2ApiReq.WithLabelValues("CreateNetworkInterface").Inc()
prometheusmetrics.AwsAPILatency.WithLabelValues("CreateNetworkInterface", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start))
subnetResult, err := cache.ec2SVC.DescribeSubnetsWithContext(context.Background(), describeSubnetInput)
prometheusmetrics.Ec2ApiReq.WithLabelValues("DescribeSubnets").Inc()
prometheusmetrics.AwsAPILatency.WithLabelValues("DescribeSubnets", fmt.Sprint(err != nil), awsReqStatus(err)).Observe(msSince(start))
if err != nil {
checkAPIErrorAndBroadcastEvent(err, "ec2:CreateNetworkInterface")
awsAPIErrInc("CreateNetworkInterface", err)
prometheusmetrics.Ec2ApiErr.WithLabelValues("CreateNetworkInterface").Inc()
log.Errorf("Failed to CreateNetworkInterface %v", err)
return "", errors.Wrap(err, "failed to create network interface")
checkAPIErrorAndBroadcastEvent(err, "ec2:DescribeSubnets")
awsAPIErrInc("DescribeSubnets", err)
prometheusmetrics.Ec2ApiErr.WithLabelValues("DescribeSubnets").Inc()
return nil, errors.Wrap(err, "AllocENI: unable to describe subnets")
}

// Sort the subnet by available IP address counter (desc order) before determining subnet to use
sort.SliceStable(subnetResult.Subnets, func(i, j int) bool {
return *subnetResult.Subnets[j].AvailableIpAddressCount < *subnetResult.Subnets[i].AvailableIpAddressCount
})

return subnetResult.Subnets, nil
}

func validTag(subnet *ec2.Subnet) bool {
for _, tag := range subnet.Tags {
if strings.HasPrefix(*tag.Key, "kubernetes.io") || strings.HasSuffix(*tag.Key, "cluster-name") {
return true
}
}
log.Infof("Created a new ENI: %s", aws.StringValue(result.NetworkInterface.NetworkInterfaceId))
return aws.StringValue(result.NetworkInterface.NetworkInterfaceId), nil
return false
}

func createEniUsingCustomCfg(sg []*string, customSubnet string, input *ec2.CreateNetworkInterfaceInput) *ec2.CreateNetworkInterfaceInput {
log.Info("Using a custom network config for the new ENI")

if len(sg) != 0 {
input.Groups = sg
} else {
log.Warnf("No custom networking security group found, will use the node's primary ENI's SG: %v", aws.StringValueSlice(input.Groups))
}
input.SubnetId = aws.String(customSubnet)

return input
}

// buildENITags computes the desired AWS Tags for eni
Expand Down
Loading

0 comments on commit 7917117

Please sign in to comment.