diff --git a/pkg/controllers/dataexport/reconcile.go b/pkg/controllers/dataexport/reconcile.go index 25ab340bf..e3b145a9a 100644 --- a/pkg/controllers/dataexport/reconcile.go +++ b/pkg/controllers/dataexport/reconcile.go @@ -1885,6 +1885,15 @@ func startTransferJob( psaJobUid = getAnnotationValue(dataExport, utils.PsaUIDKey) psaJobGid = getAnnotationValue(dataExport, utils.PsaGIDKey) } + nodeLabel := make(map[string]string) + kdmpData, err := core.Instance().GetConfigMap(jobConfigMap, jobConfigMapNs) + if err != nil { + return "", err + } + pxbJobNodeLabelValue, ok := kdmpData.Data[drivers.PxbJobNodeLabelKey] + if ok && pxbJobNodeLabelValue != "" { + nodeLabel[drivers.PxbJobNodeLabelKey] = pxbJobNodeLabelValue + } switch drv.Name() { case drivers.Rsync: return drv.StartJob( @@ -1929,6 +1938,7 @@ func startTransferJob( drivers.WithExcludeFileList(excludeFileList), drivers.WithPodDatapathType(podDataPath), drivers.WithJobConfigMap(jobConfigMap), + drivers.WithNodeAffinity(nodeLabel), drivers.WithJobConfigMapNs(jobConfigMapNs), drivers.WithNfsServer(nfsServerAddr), drivers.WithNfsExportDir(nfsExportPath), @@ -1951,6 +1961,7 @@ func startTransferJob( drivers.WithCertSecretNamespace(dataExport.Spec.Destination.Namespace), drivers.WithJobConfigMap(jobConfigMap), drivers.WithJobConfigMapNs(jobConfigMapNs), + drivers.WithNodeAffinity(nodeLabel), drivers.WithNfsServer(nfsServerAddr), drivers.WithNfsExportDir(nfsExportPath), drivers.WithPodUserId(psaJobUid), diff --git a/pkg/controllers/resourceexport/reconcile.go b/pkg/controllers/resourceexport/reconcile.go index 56fb7b4a9..7f189be50 100644 --- a/pkg/controllers/resourceexport/reconcile.go +++ b/pkg/controllers/resourceexport/reconcile.go @@ -405,6 +405,16 @@ func startNfsResourceJob( logrus.Errorf("failed to create NFS cred secret: %v", err) return "", fmt.Errorf("failed to create NFS cred secret: %v", err) } + nodeLabel := make(map[string]string) + kdmpData, err := core.Instance().GetConfigMap(jobConfigMap, jobConfigMapNs) + if err != nil { + return "", err + } + pxbJobNodeLabelValue, ok := kdmpData.Data[drivers.PxbJobNodeLabelKey] + if ok && pxbJobNodeLabelValue != "" { + nodeLabel[drivers.PxbJobNodeLabelKey] = pxbJobNodeLabelValue + } + switch drv.Name() { case drivers.NFSBackup: return drv.StartJob( @@ -420,6 +430,7 @@ func startNfsResourceJob( drivers.WithAppCRNamespace(re.Spec.Source.Namespace), drivers.WithNamespace(re.Namespace), drivers.WithResoureBackupName(re.Name), + drivers.WithNodeAffinity(nodeLabel), drivers.WithResoureBackupNamespace(re.Namespace), drivers.WithNfsMountOption(bl.Location.NFSConfig.MountOptions), drivers.WithNfsExportDir(bl.Location.NFSConfig.SubPath), @@ -438,6 +449,7 @@ func startNfsResourceJob( drivers.WithAppCRNamespace(re.Spec.Source.Namespace), drivers.WithNamespace(re.Namespace), drivers.WithResoureBackupName(re.Name), + drivers.WithNodeAffinity(nodeLabel), drivers.WithResoureBackupNamespace(re.Namespace), drivers.WithNfsMountOption(bl.Location.NFSConfig.MountOptions), drivers.WithNfsExportDir(bl.Location.NFSConfig.SubPath), diff --git a/pkg/drivers/drivers.go b/pkg/drivers/drivers.go index 921718a49..2faaa2be5 100644 --- a/pkg/drivers/drivers.go +++ b/pkg/drivers/drivers.go @@ -123,7 +123,8 @@ const ( var ( // ErrJobFailed is a know error for a data transfer job failure. - ErrJobFailed = fmt.Errorf("data transfer job failed") + ErrJobFailed = fmt.Errorf("data transfer job failed") + PxbJobNodeLabelKey = "PXB_JOB_NODE_AFFINITY_LABEL" ) // Interface defines a data export driver behaviour. diff --git a/pkg/drivers/kopiabackup/kopiabackup.go b/pkg/drivers/kopiabackup/kopiabackup.go index 2fd6f5e32..7069a627b 100644 --- a/pkg/drivers/kopiabackup/kopiabackup.go +++ b/pkg/drivers/kopiabackup/kopiabackup.go @@ -410,6 +410,31 @@ func jobFor( job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName)) } + // Add node affnity to the job spec + if len(jobOption.NodeAffinity) > 0 { + matchExpressions := []corev1.NodeSelectorRequirement{} + for key, val := range jobOption.NodeAffinity { + expression := corev1.NodeSelectorRequirement{ + Key: key, + Operator: corev1.NodeSelectorOpIn, + Values: []string{val}, + } + matchExpressions = append(matchExpressions, expression) + } + + job.Spec.Template.Spec.Affinity = &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: matchExpressions, + }, + }, + }, + }, + } + } + if len(jobOption.NfsServer) != 0 { volumeMount := corev1.VolumeMount{ Name: utils.NfsVolumeName, diff --git a/pkg/drivers/kopiarestore/kopiarestore.go b/pkg/drivers/kopiarestore/kopiarestore.go index 5e700428c..7b3204625 100644 --- a/pkg/drivers/kopiarestore/kopiarestore.go +++ b/pkg/drivers/kopiarestore/kopiarestore.go @@ -306,6 +306,29 @@ func jobFor( if len(imageRegistrySecret) != 0 { job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName)) } + if len(jobOption.NodeAffinity) > 0 { + matchExpressions := []corev1.NodeSelectorRequirement{} + for key, val := range jobOption.NodeAffinity { + expression := corev1.NodeSelectorRequirement{ + Key: key, + Operator: corev1.NodeSelectorOpIn, + Values: []string{val}, + } + matchExpressions = append(matchExpressions, expression) + } + + job.Spec.Template.Spec.Affinity = &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: matchExpressions, + }, + }, + }, + }, + } + } if drivers.CertFilePath != "" { volumeMount := corev1.VolumeMount{ diff --git a/pkg/drivers/nfsbackup/nfsbackup.go b/pkg/drivers/nfsbackup/nfsbackup.go index 6e576d20b..99778df0d 100644 --- a/pkg/drivers/nfsbackup/nfsbackup.go +++ b/pkg/drivers/nfsbackup/nfsbackup.go @@ -286,6 +286,31 @@ func jobForBackupResource( return nil, err } + // Add node affnity to the job spec + if len(jobOption.NodeAffinity) > 0 { + matchExpressions := []corev1.NodeSelectorRequirement{} + for key, val := range jobOption.NodeAffinity { + expression := corev1.NodeSelectorRequirement{ + Key: key, + Operator: corev1.NodeSelectorOpIn, + Values: []string{val}, + } + matchExpressions = append(matchExpressions, expression) + } + + job.Spec.Template.Spec.Affinity = &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: matchExpressions, + }, + }, + }, + }, + } + } + // Add the image secret in job spec only if it is present in the stork deployment. if len(imageRegistrySecret) != 0 { job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobOption.RestoreExportName)) diff --git a/pkg/drivers/nfsrestore/nfsrestore.go b/pkg/drivers/nfsrestore/nfsrestore.go index b9be4c7af..98f4981b4 100644 --- a/pkg/drivers/nfsrestore/nfsrestore.go +++ b/pkg/drivers/nfsrestore/nfsrestore.go @@ -325,6 +325,32 @@ func jobForRestoreResource( if err != nil { return nil, err } + + // Add node affnity to the job spec + if len(jobOption.NodeAffinity) > 0 { + matchExpressions := []corev1.NodeSelectorRequirement{} + for key, val := range jobOption.NodeAffinity { + expression := corev1.NodeSelectorRequirement{ + Key: key, + Operator: corev1.NodeSelectorOpIn, + Values: []string{val}, + } + matchExpressions = append(matchExpressions, expression) + } + + job.Spec.Template.Spec.Affinity = &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: matchExpressions, + }, + }, + }, + }, + } + } + // Add the image secret in job spec only if it is present in the stork deployment. if len(imageRegistrySecret) != 0 { job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobOption.RestoreExportName))