Skip to content

Commit

Permalink
chore: improve cluster conversion (#8712)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyelei authored Dec 26, 2024
1 parent a52a75c commit 0bb5b03
Showing 1 changed file with 183 additions and 0 deletions.
183 changes: 183 additions & 0 deletions apis/apps/v1alpha1/cluster_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package v1alpha1

import (
"sort"
"strings"

"github.com/jinzhu/copier"
"golang.org/x/exp/maps"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/conversion"
Expand Down Expand Up @@ -111,11 +115,72 @@ func (r *Cluster) changesToCluster(cluster *appsv1.Cluster) {
// status
// components
// - message: ComponentMessageMap -> map[string]string
if len(r.Spec.ClusterDefRef) > 0 {
cluster.Spec.ClusterDef = r.Spec.ClusterDefRef
}

if r.Spec.TerminationPolicy == Halt {
cluster.Spec.TerminationPolicy = appsv1.DoNotTerminate
} else {
cluster.Spec.TerminationPolicy = appsv1.TerminationPolicyType(r.Spec.TerminationPolicy)
}

r.toClusterServices(cluster)

for i := range r.Spec.ComponentSpecs {
compSpec := r.Spec.ComponentSpecs[i]
r.toComponentSpec(compSpec, &cluster.Spec.ComponentSpecs[i], compSpec.Name)
}
var shardingRequiredPodAntiAffinity []string
for i := range r.Spec.ShardingSpecs {
shardingSpec := r.Spec.ShardingSpecs[i]
podAntiAffinityRequired := r.toComponentSpec(shardingSpec.Template, &cluster.Spec.Shardings[i].Template, shardingSpec.Name)
if podAntiAffinityRequired {
shardingRequiredPodAntiAffinity = append(shardingRequiredPodAntiAffinity, shardingSpec.Name)
}
}
if len(shardingRequiredPodAntiAffinity) > 0 {
if cluster.Annotations == nil {
cluster.Annotations = make(map[string]string)
}
cluster.Annotations["apps.kubeblocks.io/shard-pod-anti-affinity"] = strings.Join(shardingRequiredPodAntiAffinity, ",")
}
}

func (r *Cluster) toClusterServices(cluster *appsv1.Cluster) {
for i := range r.Spec.Services {
clusterSVC := r.Spec.Services[i]
if len(clusterSVC.ShardingSelector) > 0 && len(clusterSVC.ComponentSelector) == 0 {
cluster.Spec.Services[i].ComponentSelector = clusterSVC.ShardingSelector
}
}
}

func (r *Cluster) toSchedulingPolicy(affinity *Affinity, tolerations []corev1.Toleration, compName string) *appsv1.SchedulingPolicy {
if affinity == nil && len(tolerations) == 0 {
return nil
}
schedulingPolicy := &appsv1.SchedulingPolicy{}
schedulingPolicy.Tolerations = tolerations

schedulingPolicy.Affinity = convertToAffinity(r.Name, compName, affinity)
schedulingPolicy.TopologySpreadConstraints = convertTopologySpreadConstraints4Legacy(r.Name, compName, affinity)
return schedulingPolicy
}

func (r *Cluster) toComponentSpec(fromCompSpec ClusterComponentSpec, toCompSpec *appsv1.ClusterComponentSpec, componentName string) bool {
var requiredPodAntiAffinity bool
if r.Spec.SchedulingPolicy == nil && toCompSpec.SchedulingPolicy == nil {
affinity := fromCompSpec.Affinity
if affinity == nil {
affinity = buildAffinity(r)
}
if affinity != nil && affinity.PodAntiAffinity == Required {
requiredPodAntiAffinity = true
}
toCompSpec.SchedulingPolicy = r.toSchedulingPolicy(affinity, fromCompSpec.Tolerations, componentName)
}
return requiredPodAntiAffinity
}

func (r *Cluster) changesFromCluster(cluster *appsv1.Cluster) {
Expand All @@ -137,6 +202,9 @@ func (r *Cluster) changesFromCluster(cluster *appsv1.Cluster) {
// components
// - message: ComponentMessageMap -> map[string]string
// appsv1.TerminationPolicyType is a subset of appsv1alpha1.TerminationPolicyType, it can be converted directly.
if len(cluster.Spec.ClusterDef) > 0 {
r.Spec.ClusterDefRef = cluster.Spec.ClusterDef
}
}

type clusterConverter struct {
Expand Down Expand Up @@ -283,3 +351,118 @@ func (c *clusterConverter) toCluster(cluster *Cluster) {
}
}
}

func convertToAffinity(clusterName, compName string, compAffinity *Affinity) *corev1.Affinity {
if compAffinity == nil {
return nil
}
affinity := new(corev1.Affinity)
// Build NodeAffinity
var matchExpressions []corev1.NodeSelectorRequirement
nodeLabelKeys := maps.Keys(compAffinity.NodeLabels)
// NodeLabels must be ordered
sort.Strings(nodeLabelKeys)
for _, key := range nodeLabelKeys {
values := strings.Split(compAffinity.NodeLabels[key], ",")
matchExpressions = append(matchExpressions, corev1.NodeSelectorRequirement{
Key: key,
Operator: corev1.NodeSelectorOpIn,
Values: values,
})
}
if len(matchExpressions) > 0 {
nodeSelectorTerm := corev1.NodeSelectorTerm{
MatchExpressions: matchExpressions,
}
affinity.NodeAffinity = &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{nodeSelectorTerm},
},
}
}
// Build PodAntiAffinity
var podAntiAffinity *corev1.PodAntiAffinity
var podAffinityTerms []corev1.PodAffinityTerm
for _, topologyKey := range compAffinity.TopologyKeys {
podAffinityTerms = append(podAffinityTerms, corev1.PodAffinityTerm{
TopologyKey: topologyKey,
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app.kubernetes.io/instance": clusterName,
"apps.kubeblocks.io/component-name": compName,
},
},
})
}
if compAffinity.PodAntiAffinity == Required {
podAntiAffinity = &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: podAffinityTerms,
}
} else {
var weightedPodAffinityTerms []corev1.WeightedPodAffinityTerm
for _, podAffinityTerm := range podAffinityTerms {
weightedPodAffinityTerms = append(weightedPodAffinityTerms, corev1.WeightedPodAffinityTerm{
Weight: 100,
PodAffinityTerm: podAffinityTerm,
})
}
podAntiAffinity = &corev1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: weightedPodAffinityTerms,
}
}
affinity.PodAntiAffinity = podAntiAffinity
return affinity
}

func convertTopologySpreadConstraints4Legacy(clusterName, compName string, compAffinity *Affinity) []corev1.TopologySpreadConstraint {
if compAffinity == nil {
return nil
}

var topologySpreadConstraints []corev1.TopologySpreadConstraint

var whenUnsatisfiable corev1.UnsatisfiableConstraintAction
if compAffinity.PodAntiAffinity == Required {
whenUnsatisfiable = corev1.DoNotSchedule
} else {
whenUnsatisfiable = corev1.ScheduleAnyway
}
for _, topologyKey := range compAffinity.TopologyKeys {
topologySpreadConstraints = append(topologySpreadConstraints, corev1.TopologySpreadConstraint{
MaxSkew: 1,
WhenUnsatisfiable: whenUnsatisfiable,
TopologyKey: topologyKey,
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app.kubernetes.io/instance": clusterName,
"apps.kubeblocks.io/component-name": compName,
},
},
})
}
return topologySpreadConstraints
}

func buildAffinity(cluster *Cluster) *Affinity {
if cluster.Spec.Affinity != nil {
return cluster.Spec.Affinity
}
affinityTopoKey := func(policyType AvailabilityPolicyType) string {
switch policyType {
case AvailabilityPolicyZone:
return "topology.kubernetes.io/zone"
case AvailabilityPolicyNode:
return "kubernetes.io/hostname"
}
return ""
}
var affinity *Affinity
if len(cluster.Spec.Tenancy) > 0 || len(cluster.Spec.AvailabilityPolicy) > 0 {
affinity = &Affinity{
PodAntiAffinity: Preferred,
TopologyKeys: []string{affinityTopoKey(cluster.Spec.AvailabilityPolicy)},
Tenancy: cluster.Spec.Tenancy,
}
}
return affinity
}

0 comments on commit 0bb5b03

Please sign in to comment.