Skip to content

Commit

Permalink
use annotation for exposed affinity for dataflow and store info in jo…
Browse files Browse the repository at this point in the history
…b's annotation (#4277)

Signed-off-by: xliuqq <[email protected]>
  • Loading branch information
xliuqq authored Aug 27, 2024
1 parent 6e33346 commit b2e4e61
Show file tree
Hide file tree
Showing 17 changed files with 208 additions and 135 deletions.
12 changes: 8 additions & 4 deletions docs/en/samples/dataflow_affinity.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ metadata:
name: phy
spec:
mounts:
- mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/hbase
name: hbase
- mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/flink
name: flink
---
Expand All @@ -65,6 +63,9 @@ apiVersion: data.fluid.io/v1alpha1
kind: DataLoad
metadata:
name: loadA
annotations:
# Built-in labels: these are exposed by default and do not need to be set explicitly. Custom label names are also supported here.
fluid.io/affinity.labels: "kubernetes.io/hostname,topology.kubernetes.io/zone,topology.kubernetes.io/region"
spec:
dataset:
name: phy
Expand Down Expand Up @@ -97,15 +98,15 @@ When DataLoad B is running, you will find the affinity of its pod including

The DataFlow consists of DataLoad A and DataLoad B. DataLoad A requires running on GPU nodes through customized label `node.kubernetes.io/instance-type`, while DataLoad B requires running on nodes with the same label value as DataLoad A (GPU nodes).

- If DataLoad C (runAfter B) requires the affinity of this label, DataLoad B also needs to set the annotation `fluid.io/affinity.labels: node.kubernetes.io/instance-type`

```yaml
apiVersion: data.fluid.io/v1alpha1
kind: Dataset
metadata:
name: phy
spec:
mounts:
- mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/hbase
name: hbase
- mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/flink
name: flink
---
Expand All @@ -127,6 +128,9 @@ apiVersion: data.fluid.io/v1alpha1
kind: DataLoad
metadata:
name: loadA
annotations:
# The customized label name must be defined here as a prerequisite for subsequent Data Operations to use it to set affinity.
fluid.io/affinity.labels: "node.kubernetes.io/instance-type"
spec:
dataset:
name: phy
Expand Down
12 changes: 7 additions & 5 deletions docs/zh/samples/dataflow_affinity.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ metadata:
name: phy
spec:
mounts:
- mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/hbase
name: hbase
- mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/flink
name: flink
---
Expand All @@ -65,6 +63,9 @@ apiVersion: data.fluid.io/v1alpha1
kind: DataLoad
metadata:
name: loadA
annotations:
# 内置的标签,可以不显式设置,支持自定义的标签名
fluid.io/affinity.labels: "kubernetes.io/hostname,topology.kubernetes.io/zone,topology.kubernetes.io/region"
spec:
dataset:
name: phy
Expand Down Expand Up @@ -96,16 +97,14 @@ spec:
### 示例1:自定义标签的亲和性

DataFlow 由 DataLoad A, DataLoad B 构成,DataLoad A 通过自定义标签`node.kubernetes.io/instance-type`要求运行在 GPU 节点上,DataLoad B要求运行在跟DataLoad A 同样的标签值的节点(即GPU节点);

- 如果后续还有 DataLoad C (runAfter B) 需要该标签的亲和性,则 DataLoad B也需要设置 `fluid.io/affinity.labels: "node.kubernetes.io/instance-type"` 的注解;
```yaml
apiVersion: data.fluid.io/v1alpha1
kind: Dataset
metadata:
name: phy
spec:
mounts:
- mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/hbase
name: hbase
- mountPoint: https://mirrors.tuna.tsinghua.edu.cn/apache/flink
name: flink
---
Expand All @@ -127,6 +126,9 @@ apiVersion: data.fluid.io/v1alpha1
kind: DataLoad
metadata:
name: loadA
annotations:
# 自定义的标签名,前置操作必须将标签显式定义在这里,后续的 Data Operation 才可以使用该标签设置亲和性
fluid.io/affinity.labels: "node.kubernetes.io/instance-type"
spec:
dataset:
name: phy
Expand Down
9 changes: 6 additions & 3 deletions pkg/common/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@ const (
// LabelNodePublishMothod is a pv label that indicates the method nodePublishVolume uses
LabelNodePublishMothod = LabelAnnotationPrefix + "node-publish-method"

// AnnotationDataFlowAffinityInject is an annotation representing enabled the dataflow affinity injection
// AnnotationDataFlowAffinityInject is an annotation indicating that dataflow affinity injection is enabled; for internal use.
AnnotationDataFlowAffinityInject = LabelAnnotationPrefix + "dataflow-affinity.inject"
// LabelDataFlowAffinityPrefix is a prefix for customized dataflow affinity label name.
LabelDataFlowAffinityPrefix = "fluid.io."
// AnnotationDataFlowAffinityPrefix is a prefix for dataflow affinity label name.
AnnotationDataFlowAffinityPrefix = "dataflow-affinity.fluid.io."

// AnnotationDataFlowAffinityLabelsName is an annotation key name for exposed affinity labels for an operation in a dataflow.
AnnotationDataFlowAffinityLabelsName = LabelAnnotationPrefix + "affinity.labels"

// LabelAnnotationDatasetsInUse is a label/annotation key indicating which datasets are currently being used by a pod.
LabelAnnotationDatasetsInUse = LabelAnnotationPrefix + "datasets-in-use"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@ import (
"context"
"fmt"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ctrl/watch"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/fluid-cloudnative/fluid/pkg/ctrl/watch"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"strings"
)

const DataOpJobControllerName string = "DataOpJobController"
Expand Down Expand Up @@ -140,62 +139,38 @@ func (f *DataOpJobReconciler) injectPodNodeLabelsToJob(job *batchv1.Job) error {
return fmt.Errorf("error to get node %s: %v", nodeName, err)
}

injectLabels := map[string]string{}
// node
injectLabels[common.K8sNodeNameLabelKey] = nodeName
// region
region, exist := node.Labels[common.K8sRegionLabelKey]
if exist {
injectLabels[common.K8sRegionLabelKey] = region
}
// zone
zone, exist := node.Labels[common.K8sZoneLabelKey]
if exist {
injectLabels[common.K8sZoneLabelKey] = zone
annotationsToInject := map[string]string{}
// default inject node, region and zone label
affinityLabels := []string{common.K8sNodeNameLabelKey, common.K8sRegionLabelKey, common.K8sZoneLabelKey}
// customized labels to be injected.
customizedLabels := pod.Annotations[common.AnnotationDataFlowAffinityLabelsName]
if len(customizedLabels) != 0 {
affinityLabels = append(affinityLabels, strings.Split(customizedLabels, ",")...)
}

// customized labels
if pod.Spec.Affinity != nil && pod.Spec.Affinity.NodeAffinity != nil {
fillCustomizedNodeAffinity(pod.Spec.Affinity.NodeAffinity, injectLabels, node)
fillCustomizedNodeAffinity(annotationsToInject, node.Labels, affinityLabels)
f.Log.Info("injected", "labels", affinityLabels)
// update job Annotations.
if job.Annotations == nil {
job.Annotations = annotationsToInject
} else {
for k, v := range annotationsToInject {
job.Annotations[k] = v
}
}

// update job labels, reconciled job is selected by labels so the field will not be nil.
for k, v := range injectLabels {
job.Labels[k] = v
}
if err = f.Client.Update(context.TODO(), job); err != nil {
return err
}

return nil
}

func fillCustomizedNodeAffinity(podNodeAffinity *corev1.NodeAffinity, injectLabels map[string]string, node *corev1.Node) {
// prefer
for _, term := range podNodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
for _, expression := range term.Preference.MatchExpressions {
// use the actually value in the node. Transform In, NotIn, Exists, DoesNotExist. Gt, and Lt to In.
value, exist := node.Labels[expression.Key]
if exist {
// add customized prefix to distinguish
injectLabels[common.LabelDataFlowAffinityPrefix+expression.Key] = value
}
}
}

if podNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
return
}

// require
for _, term := range podNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms {
for _, expression := range term.MatchExpressions {
// use the actually value in the node. Transform In, NotIn, Exists, DoesNotExist. Gt, and Lt to In.
value, exist := node.Labels[expression.Key]
if exist {
// add customized prefix to distinguish
injectLabels[common.LabelDataFlowAffinityPrefix+expression.Key] = value
}
// fillCustomizedNodeAffinity records the node's values for the exposed labels
// into annotationsToInject.
//
// Each entry of exposedLabelNames is trimmed of surrounding whitespace and then
// looked up in nodeLabels. When the node carries that label, its value is stored
// in annotationsToInject under the key
// common.AnnotationDataFlowAffinityPrefix + <trimmed name>. Names that are not
// present in nodeLabels are skipped. The exposedLabelNames slice itself is not
// modified.
func fillCustomizedNodeAffinity(annotationsToInject map[string]string, nodeLabels map[string]string, exposedLabelNames []string) {
	for i := range exposedLabelNames {
		labelName := strings.TrimSpace(exposedLabelNames[i])
		labelValue, found := nodeLabels[labelName]
		if !found {
			// The node does not carry this label, so there is nothing to record.
			continue
		}
		annotationsToInject[common.AnnotationDataFlowAffinityPrefix+labelName] = labelValue
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ func TestDataOpJobReconciler_injectPodNodeLabelsToJob(t *testing.T) {
node *v1.Node
}
tests := []struct {
name string
args args
wantLabels map[string]string
wantErr bool
name string
args args
wantAnnotations map[string]string
wantErr bool
}{
{
name: "job with succeed pods",
Expand All @@ -64,6 +64,9 @@ func TestDataOpJobReconciler_injectPodNodeLabelsToJob(t *testing.T) {
Labels: map[string]string{
"controller-uid": "455afc34-93b1-4e75-a6fa-8e13d2c6ca06",
},
Annotations: map[string]string{
common.AnnotationDataFlowAffinityLabelsName: "k8s.gpu,,",
},
},
Spec: v1.PodSpec{
NodeName: "node01",
Expand Down Expand Up @@ -101,12 +104,11 @@ func TestDataOpJobReconciler_injectPodNodeLabelsToJob(t *testing.T) {
},
},
},
wantLabels: map[string]string{
common.LabelAnnotationManagedBy: common.Fluid,
common.K8sNodeNameLabelKey: "node01",
common.K8sRegionLabelKey: "region01",
common.K8sZoneLabelKey: "zone01",
common.LabelDataFlowAffinityPrefix + "k8s.gpu": "true",
wantAnnotations: map[string]string{
common.AnnotationDataFlowAffinityPrefix + common.K8sNodeNameLabelKey: "node01",
common.AnnotationDataFlowAffinityPrefix + common.K8sRegionLabelKey: "region01",
common.AnnotationDataFlowAffinityPrefix + common.K8sZoneLabelKey: "zone01",
common.AnnotationDataFlowAffinityPrefix + "k8s.gpu": "true",
},
wantErr: false,
},
Expand Down Expand Up @@ -164,8 +166,8 @@ func TestDataOpJobReconciler_injectPodNodeLabelsToJob(t *testing.T) {
if (err != nil) != tt.wantErr {
t.Errorf("injectPodNodeLabelsToJob() error = %v, wantErr %v", err, tt.wantErr)
}
if err == nil && !reflect.DeepEqual(tt.args.job.Labels, tt.wantLabels) {
t.Errorf("injectPodNodeLabelsToJob() got = %v, want %v", tt.args.job.Labels, tt.wantLabels)
if err == nil && !reflect.DeepEqual(tt.args.job.Annotations, tt.wantAnnotations) {
t.Errorf("injectPodNodeLabelsToJob() got = %v, want %v", tt.args.job.Labels, tt.wantAnnotations)
}
})
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/dataflow/affinity.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// InjectAffinityAnnotation carries the exposed-affinity-labels annotation
// (common.AnnotationDataFlowAffinityLabelsName) from a data operation's
// annotations over to the pod annotations.
//
// If opAnnotation does not contain that key, podAnnotation is returned as-is
// (the same map, not a copy). Otherwise a new map is returned that holds every
// entry of podAnnotation plus the operation's value for that key; podAnnotation
// itself is left unmodified.
func InjectAffinityAnnotation(opAnnotation map[string]string, podAnnotation map[string]string) map[string]string {
	exposedLabels, found := opAnnotation[common.AnnotationDataFlowAffinityLabelsName]
	if !found {
		return podAnnotation
	}

	// Build a fresh map so the caller's podAnnotation is never mutated.
	merged := make(map[string]string, len(podAnnotation)+1)
	for key, val := range podAnnotation {
		merged[key] = val
	}
	// The operation's value wins over any value already present on the pod.
	merged[common.AnnotationDataFlowAffinityLabelsName] = exposedLabels

	return merged
}

// InjectAffinityByRunAfterOp inject the affinity based on preceding operation
func InjectAffinityByRunAfterOp(c client.Client, runAfter *datav1alpha1.OperationRef, opNamespace string, currentAffinity *v1.Affinity) (*v1.Affinity, error) {
// no previous operation or use default affinity strategy, no need to generate node affinity
Expand Down
93 changes: 93 additions & 0 deletions pkg/dataflow/affinity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,99 @@ func TestInjectAffinityByRunAfterOp(t *testing.T) {
want: nil,
wantErr: false,
},
{
name: "affinity no exist, prefer",
args: args{
runAfter: &datav1alpha1.OperationRef{
Kind: "DataLoad",
Name: "test-op",
AffinityStrategy: datav1alpha1.AffinityStrategy{
Policy: datav1alpha1.PreferAffinityStrategy,
Prefers: []datav1alpha1.Prefer{
{
Weight: 10,
Name: "not.exist.affinity",
},
},
},
},
objects: []runtime.Object{
&datav1alpha1.DataLoad{
ObjectMeta: metav1.ObjectMeta{
Name: "test-op",
Namespace: "default",
},
Status: datav1alpha1.OperationStatus{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: common.K8sRegionLabelKey,
Operator: v1.NodeSelectorOpIn,
Values: []string{"zone1"},
},
},
},
},
},
},
},
},
},
opNamespace: "default",
currentAffinity: nil,
},
want: nil,
wantErr: false,
},
{
name: "affinity no exist, required",
args: args{
runAfter: &datav1alpha1.OperationRef{
Kind: "DataLoad",
Name: "test-op",
AffinityStrategy: datav1alpha1.AffinityStrategy{
Policy: datav1alpha1.RequireAffinityStrategy,
Requires: []datav1alpha1.Require{
{
Name: "not.exist.affinity",
},
},
},
},
objects: []runtime.Object{
&datav1alpha1.DataLoad{
ObjectMeta: metav1.ObjectMeta{
Name: "test-op",
Namespace: "default",
},
Status: datav1alpha1.OperationStatus{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: common.K8sRegionLabelKey,
Operator: v1.NodeSelectorOpIn,
Values: []string{"zone1"},
},
},
},
},
},
},
},
},
},
opNamespace: "default",
currentAffinity: nil,
},
want: nil,
wantErr: false,
},
{
name: "no preceding op, error",
args: args{
Expand Down
Loading

0 comments on commit b2e4e61

Please sign in to comment.