diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index f4cce21044..2f9b383f55 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -249,6 +249,8 @@ const ( LoadBalancerSkuBasic = "basic" // LoadBalancerSkuStandard is the load balancer standard sku LoadBalancerSkuStandard = "standard" + // LoadBalancerSkuService is the load balancer service sku + LoadBalancerSkuService = "service" // ServiceAnnotationLoadBalancerInternal is the annotation used on the service ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/azure-load-balancer-internal" diff --git a/pkg/provider/azure.go b/pkg/provider/azure.go index 1b1a8b2c93..0b5e5805c4 100644 --- a/pkg/provider/azure.go +++ b/pkg/provider/azure.go @@ -168,7 +168,7 @@ type Config struct { // the `Tags` is changed. However, the old tags would be deleted if they are neither included in `Tags` nor // in `SystemTags` after the update of `Tags`. SystemTags string `json:"systemTags,omitempty" yaml:"systemTags,omitempty"` - // Sku of Load Balancer and Public IP. Candidate values are: basic and standard. + // Sku of Load Balancer and Public IP. Candidate values are: basic, standard and service // If not set, it will be default to basic. 
LoadBalancerSku string `json:"loadBalancerSku,omitempty" yaml:"loadBalancerSku,omitempty"` // LoadBalancerName determines the specific name of the load balancer user want to use, working with @@ -658,10 +658,24 @@ func (az *Cloud) InitializeCloudFromConfig(ctx context.Context, config *Config, } } + if az.useServiceLoadBalancer() && !az.isLBBackendPoolTypePodIP() { + err := fmt.Errorf("BackendPoolType is not POD IP for Service LB SKU") + klog.Fatal(err) + return err + } + + if !az.useServiceLoadBalancer() && az.isLBBackendPoolTypePodIP() { + err := fmt.Errorf("LB SKU type is not Service LB for BackendPoolType of POD IP") + klog.Fatal(err) + return err + } + if az.isLBBackendPoolTypeNodeIPConfig() { az.LoadBalancerBackendPool = newBackendPoolTypeNodeIPConfig(az) } else if az.isLBBackendPoolTypeNodeIP() { az.LoadBalancerBackendPool = newBackendPoolTypeNodeIP(az) + } else if az.isLBBackendPoolTypePodIP() { + az.LoadBalancerBackendPool = newBackendPoolTypePodIP(az) } if az.useMultipleStandardLoadBalancers() { @@ -824,6 +838,10 @@ func (az *Cloud) isLBBackendPoolTypeNodeIP() bool { return strings.EqualFold(az.LoadBalancerBackendPoolConfigurationType, consts.LoadBalancerBackendPoolConfigurationTypeNodeIP) } +func (az *Cloud) isLBBackendPoolTypePodIP() bool { + return strings.EqualFold(az.LoadBalancerBackendPoolConfigurationType, consts.LoadBalancerBackendPoolConfigurationTypePODIP) +} + func (az *Cloud) getPutVMSSVMBatchSize() int { return az.PutVMSSVMBatchSize } diff --git a/pkg/provider/azure_loadbalancer.go b/pkg/provider/azure_loadbalancer.go index 90400b4ca7..068db21040 100644 --- a/pkg/provider/azure_loadbalancer.go +++ b/pkg/provider/azure_loadbalancer.go @@ -41,6 +41,7 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/ptr" "k8s.io/utils/strings/slices" + "k8s.io/apimachinery/pkg/util/intstr" azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache" "sigs.k8s.io/cloud-provider-azure/pkg/consts" @@ -2685,7 +2686,8 @@ func (az *Cloud) getExpectedLBRules( // 
https://github.com/kubernetes/kubernetes/blob/7c013c3f64db33cf19f38bb2fc8d9182e42b0b7b/pkg/proxy/healthcheck/service_health.go#L236 var nodeEndpointHealthprobe *network.Probe var nodeEndpointHealthprobeAdded bool - if servicehelpers.NeedsHealthCheck(service) && !(consts.IsPLSEnabled(service.Annotations) && consts.IsPLSProxyProtocolEnabled(service.Annotations)) { + if servicehelpers.NeedsHealthCheck(service) && !(consts.IsPLSEnabled(service.Annotations) && consts.IsPLSProxyProtocolEnabled(service.Annotations)) && + !az.useServiceLoadBalancer() { podPresencePath, podPresencePort := servicehelpers.GetServiceHealthCheckPathPort(service) lbRuleName := az.getLoadBalancerRuleName(service, v1.ProtocolTCP, podPresencePort, isIPv6) probeInterval, numberOfProbes, err := az.getHealthProbeConfigProbeIntervalAndNumOfProbe(service, podPresencePort) @@ -2706,7 +2708,8 @@ func (az *Cloud) getExpectedLBRules( var useSharedProbe bool if az.useSharedLoadBalancerHealthProbeMode() && - !strings.EqualFold(string(service.Spec.ExternalTrafficPolicy), string(v1.ServiceExternalTrafficPolicyLocal)) { + !strings.EqualFold(string(service.Spec.ExternalTrafficPolicy), string(v1.ServiceExternalTrafficPolicyLocal)) && + !az.useServiceLoadBalancer() { nodeEndpointHealthprobe = az.buildClusterServiceSharedProbe() useSharedProbe = true } @@ -2789,6 +2792,11 @@ func (az *Cloud) getExpectedLBRules( klog.V(2).ErrorS(err, "error occurred when buildHealthProbeRulesForPort", "service", service.Name, "namespace", service.Namespace, "rule-name", lbRuleName, "port", port.Port) } + + if az.useServiceLoadBalancer() { + isNoHealthProbeRule = true + } + if !isNoHealthProbeRule { portprobe, err := az.buildHealthProbeRulesForPort(service, port, lbRuleName, nodeEndpointHealthprobe, useSharedProbe) if err != nil { @@ -2815,6 +2823,18 @@ func (az *Cloud) getExpectedLBRules( props.BackendPort = ptr.To(port.NodePort) props.EnableFloatingIP = ptr.To(false) } + + if az.useServiceLoadBalancer() { + //If Interger value of 
TargetPort is present in the service, use it as the backend port. + //We current don't support string(named Port). + if port.TargetPort.Type == intstr.Int && port.TargetPort.IntVal != 0 { + props.BackendPort = ptr.To(port.TargetPort.IntVal) + } else { + props.BackendPort = ptr.To(port.Port) + } + props.EnableFloatingIP = ptr.To(false) + } + expectedRules = append(expectedRules, network.LoadBalancingRule{ Name: &lbRuleName, LoadBalancingRulePropertiesFormat: props, diff --git a/pkg/provider/azure_loadbalancer_backendpool.go b/pkg/provider/azure_loadbalancer_backendpool.go index 2cf1047614..e150439bee 100644 --- a/pkg/provider/azure_loadbalancer_backendpool.go +++ b/pkg/provider/azure_loadbalancer_backendpool.go @@ -26,6 +26,7 @@ import ( "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2022-07-01/network" v1 "k8s.io/api/core/v1" + discovery_v1 "k8s.io/api/discovery/v1" cloudprovider "k8s.io/cloud-provider" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" @@ -911,3 +912,203 @@ func removeNodeIPAddressesFromBackendPool( return changed } + +type backendPoolTypePodIP struct { + *Cloud +} + +func newBackendPoolTypePodIP(c *Cloud) BackendPool { + return &backendPoolTypePodIP{c} +} + +func (bpi *backendPoolTypePodIP) CleanupVMSetFromBackendPoolByCondition(_ *network.LoadBalancer, _ *v1.Service, _ []*v1.Node, _ string, _ func(string) bool) (*network.LoadBalancer, error) { + return nil, errors.New("CleanupVMSetFromBackendPoolByCondition is not implemented for pod IP backend pool") +} + +func (bpi *backendPoolTypePodIP) EnsureHostsInPool(service *v1.Service, _ []*v1.Node, _, _, clusterName, lbName string, backendPool network.BackendAddressPool) error { + isIPv6 := isBackendPoolIPv6(ptr.Deref(backendPool.Name, "")) + + var ( + changed bool + err error + podIPsToBeAdded []string + endpointSliceName string + endpointSliceNames [] string + numOfAdd int + ) + + endpointSliceList, err := bpi.getEndpointSliceListForService(service) + + if err != nil { + 
klog.Errorf("bpi.EnsureHostsInPool: failed to get endpoint slice list for service %q, error: %s", service.Name, err.Error()) + return err + } + + lbBackendPoolName := bpi.getBackendPoolNameForService(service, clusterName, isIPv6) + + /* Remove all addresses from the backend pool and add the addresses from all the + endpoint-slices pertaining to a service.*/ + if strings.EqualFold(ptr.Deref(backendPool.Name, ""), lbBackendPoolName) && + backendPool.BackendAddressPoolPropertiesFormat != nil { + if backendPool.LoadBalancerBackendAddresses == nil { + lbBackendPoolAddresses := make([]network.LoadBalancerBackendAddress, 0) + backendPool.LoadBalancerBackendAddresses = &lbBackendPoolAddresses + } else { + removeNodeIPAddressesFromBackendPool(backendPool, []string{}, true, false) + } + + for _, ES := range endpointSliceList { + + if ES.AddressType == discovery_v1.AddressTypeIPv6 && !isIPv6 { + continue + } + + if ES.AddressType == discovery_v1.AddressTypeIPv4 && isIPv6 { + continue + } + + for _, endpoint := range ES.Endpoints { + + if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready { + // Skip endpoints that are not ready + continue + } + + for _, address := range endpoint.Addresses { + klog.V(6).Infof("bpi.EnsureHostsInPool: adding ip address %s", address) + podIPsToBeAdded = append(podIPsToBeAdded, address) + endpointSliceName = strings.ToLower(fmt.Sprintf("%s/%s", ES.Namespace, ES.Name)) + endpointSliceNames = append(endpointSliceNames, endpointSliceName) + numOfAdd++ + } + } + } + + changed = bpi.addPodIPAddressesToBackendPool(&backendPool, podIPsToBeAdded, endpointSliceNames) + } + + if changed { + klog.V(2).Infof("bpi.EnsureHostsInPool: updating backend pool %s of load balancer %s to add %d pods", lbBackendPoolName, lbName, numOfAdd) + if err := bpi.CreateOrUpdateLBBackendPool(lbName, backendPool); err != nil { + return fmt.Errorf("bpi.EnsureHostsInPool: failed to update backend pool %s: %w", lbBackendPoolName, err) + } + } + + return nil +} + +func 
(bpi *backendPoolTypePodIP) GetBackendPrivateIPs(clusterName string, service *v1.Service, lb *network.LoadBalancer) ([]string, []string) { + serviceName := getServiceName(service) + + lbBackendPoolNames := bpi.getBackendPoolNamesForService(service, clusterName) + + if lb.LoadBalancerPropertiesFormat == nil || lb.LoadBalancerPropertiesFormat.BackendAddressPools == nil { + return nil, nil + } + + backendPrivateIPv4s, backendPrivateIPv6s := utilsets.NewString(), utilsets.NewString() + for _, bp := range *lb.BackendAddressPools { + found, _ := isLBBackendPoolsExisting(lbBackendPoolNames, bp.Name) + if found { + klog.V(10).Infof("bpi.GetBackendPrivateIPs for service (%s): found wanted backendpool %s", serviceName, ptr.Deref(bp.Name, "")) + if bp.BackendAddressPoolPropertiesFormat != nil && bp.LoadBalancerBackendAddresses != nil { + for _, backendAddress := range *bp.LoadBalancerBackendAddresses { + ipAddress := backendAddress.IPAddress + if ipAddress != nil { + klog.V(2).Infof("bpi.GetBackendPrivateIPs for service (%s): lb backendpool - found private IP %q", serviceName, *ipAddress) + if utilnet.IsIPv4String(*ipAddress) { + backendPrivateIPv4s.Insert(*ipAddress) + } else if utilnet.IsIPv6String(*ipAddress) { + backendPrivateIPv6s.Insert(*ipAddress) + } + } else { + klog.V(4).Infof("bpi.GetBackendPrivateIPs for service (%s): lb backendpool - found null private IP", serviceName) + } + } + } + } else { + klog.V(10).Infof("bpi.GetBackendPrivateIPs for service (%s): found unmanaged backendpool %s", serviceName, ptr.Deref(bp.Name, "")) + } + } + return backendPrivateIPv4s.UnsortedList(), backendPrivateIPv6s.UnsortedList() +} + +func (bpi *backendPoolTypePodIP) ReconcileBackendPools(clusterName string, service *v1.Service, lb *network.LoadBalancer) (bool, bool, *network.LoadBalancer, error) { + var newBackendPools []network.BackendAddressPool + if lb.BackendAddressPools != nil { + newBackendPools = *lb.BackendAddressPools + } + + var backendPoolsUpdated bool + 
foundBackendPools := map[bool]bool{} + serviceName := getServiceName(service) + + lbBackendPoolNames := bpi.getBackendPoolNamesForService(service, clusterName) + // bp is never preconfigured in case of pods + isBackendPoolPreConfigured := false + + for i := len(newBackendPools) - 1; i >= 0; i-- { + bp := newBackendPools[i] + found, isIPv6 := isLBBackendPoolsExisting(lbBackendPoolNames, bp.Name) + if found { + klog.V(10).Infof("bpi.ReconcileBackendPools for service (%s): found wanted backendpool. Not adding anything", serviceName) + foundBackendPools[isIPv6] = true + } else { + klog.V(10).Infof("bpi.ReconcileBackendPools for service (%s): found unmanaged backendpool %s", serviceName, *bp.Name) + } + } + + for _, ipFamily := range service.Spec.IPFamilies { + if foundBackendPools[ipFamily == v1.IPv6Protocol] { + continue + } + isBackendPoolPreConfigured = newBackendPool(lb, isBackendPoolPreConfigured, + bpi.PreConfiguredBackendPoolLoadBalancerTypes, serviceName, + lbBackendPoolNames[ipFamily == v1.IPv6Protocol]) + backendPoolsUpdated = true + } + + return isBackendPoolPreConfigured, backendPoolsUpdated, lb, nil +} + +func (az *Cloud) addPodIPAddressesToBackendPool(backendPool *network.BackendAddressPool, podIPAddresses []string, endpointSliceNames []string) bool { + //TBD:(Kartick) Do we need to populate vnet Id as POD IPs are from overlay. Check... 
+ vnetID := az.getVnetResourceID() + if backendPool.BackendAddressPoolPropertiesFormat != nil { + if backendPool.VirtualNetwork == nil || + backendPool.VirtualNetwork.ID == nil { + backendPool.VirtualNetwork = &network.SubResource{ + ID: &vnetID, + } + } + } else { + backendPool.BackendAddressPoolPropertiesFormat = &network.BackendAddressPoolPropertiesFormat{ + VirtualNetwork: &network.SubResource{ + ID: &vnetID, + }, + } + } + + if backendPool.LoadBalancerBackendAddresses == nil { + lbBackendPoolAddresses := make([]network.LoadBalancerBackendAddress, 0) + backendPool.LoadBalancerBackendAddresses = &lbBackendPoolAddresses + } + + var changed bool + addresses := *backendPool.LoadBalancerBackendAddresses + for _, ipAddress := range podIPAddresses { + if !hasIPAddressInBackendPool(backendPool, ipAddress) { + klog.V(4).Infof("bi.addPodIPAddressesToBackendPool: adding %s to the backend pool %s", ipAddress, ptr.Deref(backendPool.Name, "")) + //TBD:(Kartick) Populate the slice_name later... + addresses = append(addresses, network.LoadBalancerBackendAddress{ + Name: ptr.To(ipAddress), + LoadBalancerBackendAddressPropertiesFormat: &network.LoadBalancerBackendAddressPropertiesFormat{ + IPAddress: ptr.To(ipAddress), + }, + }) + changed = true + } + } + backendPool.LoadBalancerBackendAddresses = &addresses + return changed +} \ No newline at end of file diff --git a/pkg/provider/azure_local_services.go b/pkg/provider/azure_local_services.go index d8ffd3bf5c..8bde0ef8c6 100644 --- a/pkg/provider/azure_local_services.go +++ b/pkg/provider/azure_local_services.go @@ -409,7 +409,7 @@ func getLocalServiceBackendPoolName(serviceName string, ipv6 bool) string { // getBackendPoolNameForService determine the expected backend pool name // by checking the external traffic policy of the service. 
func (az *Cloud) getBackendPoolNameForService(service *v1.Service, clusterName string, ipv6 bool) string { - if !isLocalService(service) || !az.useMultipleStandardLoadBalancers() { + if !isLocalService(service) || (!az.useMultipleStandardLoadBalancers() && !az.useServiceLoadBalancer()) { return getBackendPoolName(clusterName, ipv6) } return getLocalServiceBackendPoolName(getServiceName(service), ipv6) @@ -623,3 +623,23 @@ func (az *Cloud) reconcileIPsInLocalServiceBackendPoolsAsync( } } } + +func (az *Cloud) getEndpointSliceListForService(service *v1.Service) ([]*discovery_v1.EndpointSlice, error) { + + var ( + esList []*discovery_v1.EndpointSlice + ) + + // Retrieving only from the cache to avoid an expensive listing from the k8s API server, as the Informer + // code path listens to updates from the k8s API server and stores them in the cache. + az.endpointSlicesCache.Range(func(key, value interface{}) bool { + endpointSlice := value.(*discovery_v1.EndpointSlice) + if strings.EqualFold(getServiceNameOfEndpointSlice(endpointSlice), service.Name) && + strings.EqualFold(endpointSlice.Namespace, service.Namespace) { + esList = append(esList, endpointSlice) + } + return true + }) + + return esList, nil +} \ No newline at end of file diff --git a/pkg/provider/azure_wrap.go b/pkg/provider/azure_wrap.go index cb7dd54f94..4ce6f812d2 100644 --- a/pkg/provider/azure_wrap.go +++ b/pkg/provider/azure_wrap.go @@ -57,6 +57,10 @@ func (az *Cloud) useStandardLoadBalancer() bool { return strings.EqualFold(az.LoadBalancerSku, consts.LoadBalancerSkuStandard) } +func (az *Cloud) useServiceLoadBalancer() bool { + return strings.EqualFold(az.LoadBalancerSku, consts.LoadBalancerSkuService) +} + func (az *Cloud) excludeMasterNodesFromStandardLB() bool { return az.ExcludeMasterFromStandardLB != nil && *az.ExcludeMasterFromStandardLB }