From b2e4e615ac5b7052d4761db6acb778b447caadaf Mon Sep 17 00:00:00 2001 From: xliuqq Date: Tue, 27 Aug 2024 16:15:49 +0800 Subject: [PATCH] use annotation for exposed affinity for dataflow and store info in job's annotation (#4277) Signed-off-by: xliuqq --- docs/en/samples/dataflow_affinity.md | 12 ++- docs/zh/samples/dataflow_affinity.md | 12 ++- pkg/common/label.go | 9 +- .../dataflowaffinity_controller.go | 73 +++++---------- .../dataflowaffinity_controller_test.go | 26 +++--- pkg/dataflow/affinity.go | 16 ++++ pkg/dataflow/affinity_test.go | 93 +++++++++++++++++++ pkg/dataflow/helper.go | 54 +++-------- pkg/dataflow/helper_test.go | 26 +++--- pkg/dataprocess/generate_values.go | 2 +- pkg/ddc/alluxio/load_data.go | 2 +- pkg/ddc/goosefs/load_data.go | 2 + pkg/ddc/jindocache/load_data.go | 2 +- pkg/ddc/jindofsx/load_data.go | 2 +- pkg/ddc/juicefs/data_load.go | 2 +- pkg/ddc/juicefs/data_migrate.go | 2 +- pkg/utils/affinity.go | 8 ++ 17 files changed, 208 insertions(+), 135 deletions(-) diff --git a/docs/en/samples/dataflow_affinity.md b/docs/en/samples/dataflow_affinity.md index 9210cbc4562..6f9675d5291 100644 --- a/docs/en/samples/dataflow_affinity.md +++ b/docs/en/samples/dataflow_affinity.md @@ -42,8 +42,6 @@ metadata: name: phy spec: mounts: - - mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/hbase - name: hbase - mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/flink name: flink --- @@ -65,6 +63,9 @@ apiVersion: data.fluid.io/v1alpha1 kind: DataLoad metadata: name: loadA + annotations: + # Built-in labels: these need not be set explicitly. Custom label names are also supported. + fluid.io/affinity.labels: "kubernetes.io/hostname,topology.kubernetes.io/zone,topology.kubernetes.io/region" spec: dataset: name: phy @@ -97,6 +98,8 @@ When DataLoad B is running, you will find the the affinity of its pod including The DataFlow consists of DataLoad A and DataLoad B. 
DataLoad A requires running on GPU nodes through customized label `node.kubernetes.io/instance-type`, while DataLoad B requires running on nodes with the same label value as DataLoad A (GPU nodes). +- If DataLoad C (runAfter B) requires the affinity of this label, DataLoad B also needs to set the annotation `fluid.io/affinity.labels: node.kubernetes.io/instance-type` + ```yaml apiVersion: data.fluid.io/v1alpha1 kind: Dataset @@ -104,8 +107,6 @@ metadata: name: phy spec: mounts: - - mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/hbase - name: hbase - mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/flink name: flink --- @@ -127,6 +128,9 @@ apiVersion: data.fluid.io/v1alpha1 kind: DataLoad metadata: name: loadA + annotations: + # The customized label name must be defined here as a prerequisite for subsequent Data Operations to use it to set affinity. + fluid.io/affinity.labels: "node.kubernetes.io/instance-type" spec: dataset: name: phy diff --git a/docs/zh/samples/dataflow_affinity.md b/docs/zh/samples/dataflow_affinity.md index 87b6ac4be08..65cb720568a 100644 --- a/docs/zh/samples/dataflow_affinity.md +++ b/docs/zh/samples/dataflow_affinity.md @@ -42,8 +42,6 @@ metadata: name: phy spec: mounts: - - mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/hbase - name: hbase - mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/flink name: flink --- @@ -65,6 +63,9 @@ apiVersion: data.fluid.io/v1alpha1 kind: DataLoad metadata: name: loadA + annotations: + # 内置的标签,可以不显式设置,支持自定义的标签名 + fluid.io/affinity.labels: "kubernetes.io/hostname,topology.kubernetes.io/zone,topology.kubernetes.io/region" spec: dataset: name: phy @@ -96,7 +97,7 @@ spec: ### 示例1:自定义标签的亲和性 DataFlow 由 DataLoad A, DataLoad B 构成,DataLoad A 通过自定义标签`node.kubernetes.io/instance-type`要求运行在 GPU 节点上,DataLoad B要求运行在跟DataLoad A 同样的标签值的节点(即GPU节点); - +- 如果后续还有 DataLoad C (runAfter B) 需要该标签的亲和性,则 DataLoad B也需要设置 `fluid.io/affinity.labels: "node.kubernetes.io/instance-type"` 的注解; ```yaml 
apiVersion: data.fluid.io/v1alpha1 kind: Dataset @@ -104,8 +105,6 @@ metadata: name: phy spec: mounts: - - mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/hbase - name: hbase - mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/flink name: flink --- @@ -127,6 +126,9 @@ apiVersion: data.fluid.io/v1alpha1 kind: DataLoad metadata: name: loadA + annotations: + # 自定义的标签名,前置操作必须将标签显式定义在这里,后续的 Data Operation 才可以使用该标签设置亲和性 + fluid.io/affinity.labels: "node.kubernetes.io/instance-type" spec: dataset: name: phy diff --git a/pkg/common/label.go b/pkg/common/label.go index b58b7659f5d..d22190d1730 100644 --- a/pkg/common/label.go +++ b/pkg/common/label.go @@ -49,11 +49,14 @@ const ( // LabelNodePublishMothod is a pv label that indicates the method nodePuhlishVolume use LabelNodePublishMothod = LabelAnnotationPrefix + "node-publish-method" - // AnnotationDataFlowAffinityInject is an annotation representing enabled the dataflow affinity injection + // AnnotationDataFlowAffinityInject is an annotation representing enabled the dataflow affinity injection, for internal use. AnnotationDataFlowAffinityInject = LabelAnnotationPrefix + "dataflow-affinity.inject" - // LabelDataFlowAffinityPrefix is a prefix for customized dataflow affinity label name. - LabelDataFlowAffinityPrefix = "fluid.io." + // AnnotationDataFlowAffinityPrefix is a prefix for dataflow affinity label name. + AnnotationDataFlowAffinityPrefix = "dataflow-affinity.fluid.io." + // AnnotationDataFlowAffinityLabelsName is an annotation key name for exposed affinity labels for an operation in a dataflow. + AnnotationDataFlowAffinityLabelsName = LabelAnnotationPrefix + "affinity.labels" + // LabelAnnotationMountingDatasets is a label/annotation key indicating which datasets are currently being used by a pod. 
LabelAnnotationDatasetsInUse = LabelAnnotationPrefix + "datasets-in-use" ) diff --git a/pkg/controllers/v1alpha1/fluidapp/dataflowaffinity/dataflowaffinity_controller.go b/pkg/controllers/v1alpha1/fluidapp/dataflowaffinity/dataflowaffinity_controller.go index ba31ee81ef7..bcf54d40559 100644 --- a/pkg/controllers/v1alpha1/fluidapp/dataflowaffinity/dataflowaffinity_controller.go +++ b/pkg/controllers/v1alpha1/fluidapp/dataflowaffinity/dataflowaffinity_controller.go @@ -20,19 +20,18 @@ import ( "context" "fmt" "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ctrl/watch" "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/go-logr/logr" batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/fluid-cloudnative/fluid/pkg/ctrl/watch" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + "strings" ) const DataOpJobControllerName string = "DataOpJobController" @@ -140,29 +139,26 @@ func (f *DataOpJobReconciler) injectPodNodeLabelsToJob(job *batchv1.Job) error { return fmt.Errorf("error to get node %s: %v", nodeName, err) } - injectLabels := map[string]string{} - // node - injectLabels[common.K8sNodeNameLabelKey] = nodeName - // region - region, exist := node.Labels[common.K8sRegionLabelKey] - if exist { - injectLabels[common.K8sRegionLabelKey] = region - } - // zone - zone, exist := node.Labels[common.K8sZoneLabelKey] - if exist { - injectLabels[common.K8sZoneLabelKey] = zone + annotationsToInject := map[string]string{} + // default inject node, region and zone label + affinityLabels := []string{common.K8sNodeNameLabelKey, common.K8sRegionLabelKey, common.K8sZoneLabelKey} + // customized labels to be 
injected. + customizedLabels := pod.Annotations[common.AnnotationDataFlowAffinityLabelsName] + if len(customizedLabels) != 0 { + affinityLabels = append(affinityLabels, strings.Split(customizedLabels, ",")...) } - // customized labels - if pod.Spec.Affinity != nil && pod.Spec.Affinity.NodeAffinity != nil { - fillCustomizedNodeAffinity(pod.Spec.Affinity.NodeAffinity, injectLabels, node) + fillCustomizedNodeAffinity(annotationsToInject, node.Labels, affinityLabels) + f.Log.Info("injected", "labels", affinityLabels) + // update job Annotations. + if job.Annotations == nil { + job.Annotations = annotationsToInject + } else { + for k, v := range annotationsToInject { + job.Annotations[k] = v + } } - // update job labels, reconciled job is selected by labels so the field will not be nil. - for k, v := range injectLabels { - job.Labels[k] = v - } if err = f.Client.Update(context.TODO(), job); err != nil { return err } @@ -170,32 +166,11 @@ func (f *DataOpJobReconciler) injectPodNodeLabelsToJob(job *batchv1.Job) error { return nil } -func fillCustomizedNodeAffinity(podNodeAffinity *corev1.NodeAffinity, injectLabels map[string]string, node *corev1.Node) { - // prefer - for _, term := range podNodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution { - for _, expression := range term.Preference.MatchExpressions { - // use the actually value in the node. Transform In, NotIn, Exists, DoesNotExist. Gt, and Lt to In. - value, exist := node.Labels[expression.Key] - if exist { - // add customized prefix to distinguish - injectLabels[common.LabelDataFlowAffinityPrefix+expression.Key] = value - } - } - } - - if podNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { - return - } - - // require - for _, term := range podNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms { - for _, expression := range term.MatchExpressions { - // use the actually value in the node. Transform In, NotIn, Exists, DoesNotExist. Gt, and Lt to In. 
- value, exist := node.Labels[expression.Key] - if exist { - // add customized prefix to distinguish - injectLabels[common.LabelDataFlowAffinityPrefix+expression.Key] = value - } +func fillCustomizedNodeAffinity(annotationsToInject map[string]string, nodeLabels map[string]string, exposedLabelNames []string) { + for _, name := range exposedLabelNames { + name = strings.TrimSpace(name) + if value, exist := nodeLabels[name]; exist { + annotationsToInject[common.AnnotationDataFlowAffinityPrefix+name] = value } } } diff --git a/pkg/controllers/v1alpha1/fluidapp/dataflowaffinity/dataflowaffinity_controller_test.go b/pkg/controllers/v1alpha1/fluidapp/dataflowaffinity/dataflowaffinity_controller_test.go index 5bac21acb4e..359d2cce924 100644 --- a/pkg/controllers/v1alpha1/fluidapp/dataflowaffinity/dataflowaffinity_controller_test.go +++ b/pkg/controllers/v1alpha1/fluidapp/dataflowaffinity/dataflowaffinity_controller_test.go @@ -35,10 +35,10 @@ func TestDataOpJobReconciler_injectPodNodeLabelsToJob(t *testing.T) { node *v1.Node } tests := []struct { - name string - args args - wantLabels map[string]string - wantErr bool + name string + args args + wantAnnotations map[string]string + wantErr bool }{ { name: "job with succeed pods", @@ -64,6 +64,9 @@ func TestDataOpJobReconciler_injectPodNodeLabelsToJob(t *testing.T) { Labels: map[string]string{ "controller-uid": "455afc34-93b1-4e75-a6fa-8e13d2c6ca06", }, + Annotations: map[string]string{ + common.AnnotationDataFlowAffinityLabelsName: "k8s.gpu,,", + }, }, Spec: v1.PodSpec{ NodeName: "node01", @@ -101,12 +104,11 @@ func TestDataOpJobReconciler_injectPodNodeLabelsToJob(t *testing.T) { }, }, }, - wantLabels: map[string]string{ - common.LabelAnnotationManagedBy: common.Fluid, - common.K8sNodeNameLabelKey: "node01", - common.K8sRegionLabelKey: "region01", - common.K8sZoneLabelKey: "zone01", - common.LabelDataFlowAffinityPrefix + "k8s.gpu": "true", + wantAnnotations: map[string]string{ + common.AnnotationDataFlowAffinityPrefix + 
common.K8sNodeNameLabelKey: "node01", + common.AnnotationDataFlowAffinityPrefix + common.K8sRegionLabelKey: "region01", + common.AnnotationDataFlowAffinityPrefix + common.K8sZoneLabelKey: "zone01", + common.AnnotationDataFlowAffinityPrefix + "k8s.gpu": "true", }, wantErr: false, }, @@ -164,8 +166,8 @@ func TestDataOpJobReconciler_injectPodNodeLabelsToJob(t *testing.T) { if (err != nil) != tt.wantErr { t.Errorf("injectPodNodeLabelsToJob() error = %v, wantErr %v", err, tt.wantErr) } - if err == nil && !reflect.DeepEqual(tt.args.job.Labels, tt.wantLabels) { - t.Errorf("injectPodNodeLabelsToJob() got = %v, want %v", tt.args.job.Labels, tt.wantLabels) + if err == nil && !reflect.DeepEqual(tt.args.job.Annotations, tt.wantAnnotations) { + t.Errorf("injectPodNodeLabelsToJob() got = %v, want %v", tt.args.job.Labels, tt.wantAnnotations) } }) } diff --git a/pkg/dataflow/affinity.go b/pkg/dataflow/affinity.go index 9a8fcf3904b..7c4928ab6fc 100644 --- a/pkg/dataflow/affinity.go +++ b/pkg/dataflow/affinity.go @@ -26,6 +26,22 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +// InjectAffinityAnnotation inject the affinity annotation for the pod. 
+func InjectAffinityAnnotation(opAnnotation map[string]string, podAnnotation map[string]string) map[string]string { + value, exist := opAnnotation[common.AnnotationDataFlowAffinityLabelsName] + if !exist { + return podAnnotation + } + // return a copy not the origin map + copiedMap := make(map[string]string) + for k, v := range podAnnotation { + copiedMap[k] = v + } + + copiedMap[common.AnnotationDataFlowAffinityLabelsName] = value + return copiedMap +} + // InjectAffinityByRunAfterOp inject the affinity based on preceding operation func InjectAffinityByRunAfterOp(c client.Client, runAfter *datav1alpha1.OperationRef, opNamespace string, currentAffinity *v1.Affinity) (*v1.Affinity, error) { // no previous operation or use default affinity strategy, no need to generate node affinity diff --git a/pkg/dataflow/affinity_test.go b/pkg/dataflow/affinity_test.go index 6be2b190679..06edeae38b6 100644 --- a/pkg/dataflow/affinity_test.go +++ b/pkg/dataflow/affinity_test.go @@ -64,6 +64,99 @@ func TestInjectAffinityByRunAfterOp(t *testing.T) { want: nil, wantErr: false, }, + { + name: "affinity no exist, prefer", + args: args{ + runAfter: &datav1alpha1.OperationRef{ + Kind: "DataLoad", + Name: "test-op", + AffinityStrategy: datav1alpha1.AffinityStrategy{ + Policy: datav1alpha1.PreferAffinityStrategy, + Prefers: []datav1alpha1.Prefer{ + { + Weight: 10, + Name: "not.exist.affinity", + }, + }, + }, + }, + objects: []runtime.Object{ + &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-op", + Namespace: "default", + }, + Status: datav1alpha1.OperationStatus{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: common.K8sRegionLabelKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{"zone1"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + opNamespace: "default", + currentAffinity: nil, + }, + want: nil, 
+ wantErr: false, + }, + { + name: "affinity no exist, required", + args: args{ + runAfter: &datav1alpha1.OperationRef{ + Kind: "DataLoad", + Name: "test-op", + AffinityStrategy: datav1alpha1.AffinityStrategy{ + Policy: datav1alpha1.RequireAffinityStrategy, + Requires: []datav1alpha1.Require{ + { + Name: "not.exist.affinity", + }, + }, + }, + }, + objects: []runtime.Object{ + &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-op", + Namespace: "default", + }, + Status: datav1alpha1.OperationStatus{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: common.K8sRegionLabelKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{"zone1"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + opNamespace: "default", + currentAffinity: nil, + }, + want: nil, + wantErr: false, + }, { name: "no preceding op, error", args: args{ diff --git a/pkg/dataflow/helper.go b/pkg/dataflow/helper.go index 1ea89fbf824..0cc91e33afb 100644 --- a/pkg/dataflow/helper.go +++ b/pkg/dataflow/helper.go @@ -28,12 +28,12 @@ func GenerateNodeAffinity(job *batchv1.Job) (*corev1.NodeAffinity, error) { if job == nil { return nil, nil } - // mot inject, i.e. feature gate not enabled or job is a parallel job. + // not inject, i.e. feature gate not enabled or job is a parallel job. 
if v := job.Annotations[common.AnnotationDataFlowAffinityInject]; v != "true" { return nil, nil } - labels := job.Labels + annotations := job.Annotations nodeAffinity := &corev1.NodeAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ @@ -44,54 +44,24 @@ func GenerateNodeAffinity(job *batchv1.Job) (*corev1.NodeAffinity, error) { }, }, } - // node name - nodeName, exist := labels[common.K8sNodeNameLabelKey] - if !exist { - return nil, errors.New("the affinity label is not set, wait for next reconcile") - } - - nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions = - append(nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, - corev1.NodeSelectorRequirement{ - Key: common.K8sNodeNameLabelKey, - Operator: corev1.NodeSelectorOpIn, - Values: []string{nodeName}, - }) - // region - region, exist := labels[common.K8sRegionLabelKey] - if exist { - nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions = - append(nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, - corev1.NodeSelectorRequirement{ - Key: common.K8sRegionLabelKey, - Operator: corev1.NodeSelectorOpIn, - Values: []string{region}, - }) - } - // zone - zone, exist := labels[common.K8sZoneLabelKey] - if exist { - nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions = - append(nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, - corev1.NodeSelectorRequirement{ - Key: common.K8sZoneLabelKey, - Operator: corev1.NodeSelectorOpIn, - Values: []string{zone}, - }) - } - - // customized labels, start with specific prefix. - for key, value := range labels { - if strings.HasPrefix(key, common.LabelDataFlowAffinityPrefix) { + // affinity labels with specific prefix. 
+ hasInjectedLabels := false + for key, value := range annotations { + if strings.HasPrefix(key, common.AnnotationDataFlowAffinityPrefix) { nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions = append(nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, corev1.NodeSelectorRequirement{ - Key: strings.TrimPrefix(key, common.LabelDataFlowAffinityPrefix), + Key: strings.TrimPrefix(key, common.AnnotationDataFlowAffinityPrefix), Operator: corev1.NodeSelectorOpIn, Values: []string{value}, }) + hasInjectedLabels = true } } + if !hasInjectedLabels { + return nil, errors.New("the affinity label is not set, wait for next reconcile") + } + return nodeAffinity, nil } diff --git a/pkg/dataflow/helper_test.go b/pkg/dataflow/helper_test.go index 4ac991cfe4d..9d89fdd0488 100644 --- a/pkg/dataflow/helper_test.go +++ b/pkg/dataflow/helper_test.go @@ -26,12 +26,10 @@ func TestGenerateNodeLabels(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "jobtest", Annotations: map[string]string{ - common.AnnotationDataFlowAffinityInject: "true", - }, - Labels: map[string]string{ - common.K8sNodeNameLabelKey: "node01", - common.K8sRegionLabelKey: "region01", - common.K8sZoneLabelKey: "zone01", + common.AnnotationDataFlowAffinityInject: "true", + common.AnnotationDataFlowAffinityPrefix + common.K8sNodeNameLabelKey: "node01", + common.AnnotationDataFlowAffinityPrefix + common.K8sRegionLabelKey: "region01", + common.AnnotationDataFlowAffinityPrefix + common.K8sZoneLabelKey: "zone01", }, }, }, @@ -78,13 +76,10 @@ func TestGenerateNodeLabels(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "jobtest", Annotations: map[string]string{ - common.AnnotationDataFlowAffinityInject: "true", - }, - Labels: map[string]string{ - common.AnnotationDataFlowAffinityInject: "true", - common.K8sNodeNameLabelKey: "node01", - common.K8sZoneLabelKey: "zone01", - "fluid.io.k8s.rack": "rack01", + 
common.AnnotationDataFlowAffinityInject: "true", + common.AnnotationDataFlowAffinityPrefix + common.K8sNodeNameLabelKey: "node01", + common.AnnotationDataFlowAffinityPrefix + common.K8sZoneLabelKey: "zone01", + common.AnnotationDataFlowAffinityPrefix + "k8s.rack": "rack01", }, }, }, @@ -125,7 +120,10 @@ func TestGenerateNodeLabels(t *testing.T) { return } if !reflect.DeepEqual(got, tt.want) { - t.Errorf("GenerateNodeAffinity() got = %v, want %v", got, tt.want) + // map traversal causes array order to be different, so only compare the length. + if len(got.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) != len(tt.want.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) { + t.Errorf("GenerateNodeAffinity() got = %v, want %v", got, tt.want) + } } }) } diff --git a/pkg/dataprocess/generate_values.go b/pkg/dataprocess/generate_values.go index 572caddfe67..53f1551eba9 100644 --- a/pkg/dataprocess/generate_values.go +++ b/pkg/dataprocess/generate_values.go @@ -115,7 +115,7 @@ func GenDataProcessValue(dataset *datav1alpha1.Dataset, dataProcess *datav1alpha func transformCommonPart(value *DataProcessValue, dataProcess *datav1alpha1.DataProcess) { value.Name = dataProcess.Name value.DataProcessInfo.Labels = dataProcess.Spec.Processor.PodMetadata.Labels - value.DataProcessInfo.Annotations = dataProcess.Spec.Processor.PodMetadata.Annotations + value.DataProcessInfo.Annotations = dataflow.InjectAffinityAnnotation(dataProcess.Annotations, dataProcess.Spec.Processor.PodMetadata.Annotations) value.Owner = transformer.GenerateOwnerReferenceFromObject(dataProcess) if len(dataProcess.Spec.Processor.ServiceAccountName) != 0 { value.DataProcessInfo.ServiceAccountName = dataProcess.Spec.Processor.ServiceAccountName diff --git a/pkg/ddc/alluxio/load_data.go b/pkg/ddc/alluxio/load_data.go index 698bfddccd7..dd2a7e81fba 100644 --- a/pkg/ddc/alluxio/load_data.go +++ b/pkg/ddc/alluxio/load_data.go @@ -111,7 +111,7 @@ func (e *AlluxioEngine) 
genDataLoadValue(image string, targetDataset *datav1alph LoadMetadata: dataload.Spec.LoadMetadata, Image: image, Labels: dataload.Spec.PodMetadata.Labels, - Annotations: dataload.Spec.PodMetadata.Annotations, + Annotations: dataflow.InjectAffinityAnnotation(dataload.Annotations, dataload.Spec.PodMetadata.Annotations), ImagePullSecrets: imagePullSecrets, Policy: string(dataload.Spec.Policy), Schedule: dataload.Spec.Schedule, diff --git a/pkg/ddc/goosefs/load_data.go b/pkg/ddc/goosefs/load_data.go index 70234cde996..54ef280ae82 100644 --- a/pkg/ddc/goosefs/load_data.go +++ b/pkg/ddc/goosefs/load_data.go @@ -107,6 +107,8 @@ func (e *GooseFSEngine) genDataLoadValue(image string, targetDataset *datav1alph Image: image, Options: dataload.Spec.Options, ImagePullSecrets: imagePullSecrets, + Labels: dataload.Spec.PodMetadata.Labels, + Annotations: dataflow.InjectAffinityAnnotation(dataload.Annotations, dataload.Spec.PodMetadata.Annotations), Policy: string(dataload.Spec.Policy), Schedule: dataload.Spec.Schedule, } diff --git a/pkg/ddc/jindocache/load_data.go b/pkg/ddc/jindocache/load_data.go index e4951ac7098..b422bd7f2b3 100644 --- a/pkg/ddc/jindocache/load_data.go +++ b/pkg/ddc/jindocache/load_data.go @@ -118,7 +118,7 @@ func (e *JindoCacheEngine) genDataLoadValue(image string, runtime *datav1alpha1. 
LoadMetadata: dataload.Spec.LoadMetadata, Image: image, Labels: dataload.Spec.PodMetadata.Labels, - Annotations: dataload.Spec.PodMetadata.Annotations, + Annotations: dataflow.InjectAffinityAnnotation(dataload.Annotations, dataload.Spec.PodMetadata.Annotations), ImagePullSecrets: imagePullSecrets, Policy: string(dataload.Spec.Policy), Schedule: dataload.Spec.Schedule, diff --git a/pkg/ddc/jindofsx/load_data.go b/pkg/ddc/jindofsx/load_data.go index fa5f652097b..b6b3ac04102 100644 --- a/pkg/ddc/jindofsx/load_data.go +++ b/pkg/ddc/jindofsx/load_data.go @@ -118,7 +118,7 @@ func (e *JindoFSxEngine) genDataLoadValue(image string, runtime *datav1alpha1.Ji LoadMetadata: dataload.Spec.LoadMetadata, Image: image, Labels: dataload.Spec.PodMetadata.Labels, - Annotations: dataload.Spec.PodMetadata.Annotations, + Annotations: dataflow.InjectAffinityAnnotation(dataload.Annotations, dataload.Spec.PodMetadata.Annotations), ImagePullSecrets: imagePullSecrets, Policy: string(dataload.Spec.Policy), Schedule: dataload.Spec.Schedule, diff --git a/pkg/ddc/juicefs/data_load.go b/pkg/ddc/juicefs/data_load.go index 493f965cce0..b2a8fd37704 100644 --- a/pkg/ddc/juicefs/data_load.go +++ b/pkg/ddc/juicefs/data_load.go @@ -126,7 +126,7 @@ func (j *JuiceFSEngine) genDataLoadValue(image string, cacheinfo map[string]stri LoadMetadata: dataload.Spec.LoadMetadata, Image: image, Labels: dataload.Spec.PodMetadata.Labels, - Annotations: dataload.Spec.PodMetadata.Annotations, + Annotations: dataflow.InjectAffinityAnnotation(dataload.Annotations, dataload.Spec.PodMetadata.Annotations), ImagePullSecrets: imagePullSecrets, Policy: string(dataload.Spec.Policy), Schedule: dataload.Spec.Schedule, diff --git a/pkg/ddc/juicefs/data_migrate.go b/pkg/ddc/juicefs/data_migrate.go index 70aa8b7e184..effebc1590a 100644 --- a/pkg/ddc/juicefs/data_migrate.go +++ b/pkg/ddc/juicefs/data_migrate.go @@ -99,7 +99,7 @@ func (j *JuiceFSEngine) generateDataMigrateValueFile(r cruntime.ReconcileRequest Image: image, Options: 
map[string]string{}, Labels: dataMigrate.Spec.PodMetadata.Labels, - Annotations: dataMigrate.Spec.PodMetadata.Annotations, + Annotations: dataflow.InjectAffinityAnnotation(dataMigrate.Annotations, dataMigrate.Spec.PodMetadata.Annotations), ImagePullSecrets: imagePullSecrets, Policy: string(dataMigrate.Spec.Policy), Schedule: dataMigrate.Spec.Schedule, diff --git a/pkg/utils/affinity.go b/pkg/utils/affinity.go index 8890b013ac8..436b759c856 100644 --- a/pkg/utils/affinity.go +++ b/pkg/utils/affinity.go @@ -21,6 +21,11 @@ import v1 "k8s.io/api/core/v1" // InjectNodeSelectorRequirements injects(not append) a node selector term to affinity‘s nodeAffinity. func InjectNodeSelectorRequirements(matchExpressions []v1.NodeSelectorRequirement, affinity *v1.Affinity) *v1.Affinity { result := affinity + + if len(matchExpressions) == 0 { + return result + } + if affinity == nil { result = &v1.Affinity{} } @@ -50,6 +55,9 @@ func InjectNodeSelectorRequirements(matchExpressions []v1.NodeSelectorRequiremen func InjectPreferredSchedulingTermsToAffinity(terms []v1.PreferredSchedulingTerm, affinity *v1.Affinity) *v1.Affinity { result := affinity + if len(terms) == 0 { + return result + } if affinity == nil { result = &v1.Affinity{} }