From 08d814708a2f62d59b20ae4ae0f94741f0b77c6b Mon Sep 17 00:00:00 2001 From: shkumari-px Date: Mon, 5 Aug 2024 06:26:53 +0000 Subject: [PATCH] PB-7476: Adding node affinity to kopia backup and restore jobs --- pkg/controllers/dataexport/reconcile.go | 13 +++++++ pkg/controllers/resourceexport/reconcile.go | 8 ++++ pkg/drivers/drivers.go | 3 +- pkg/drivers/kopiabackup/kopiabackup.go | 12 ++++++ pkg/drivers/kopiarestore/kopiarestore.go | 6 +++ pkg/drivers/nfsbackup/nfsbackup.go | 6 +++ pkg/drivers/nfscsirestore/nfscsirestore.go | 7 ++++ pkg/drivers/nfsrestore/nfsrestore.go | 7 ++++ pkg/drivers/utils/utils.go | 41 +++++++++++++++++++++ 9 files changed, 102 insertions(+), 1 deletion(-) diff --git a/pkg/controllers/dataexport/reconcile.go b/pkg/controllers/dataexport/reconcile.go index 7e8630537..3b6bc6128 100644 --- a/pkg/controllers/dataexport/reconcile.go +++ b/pkg/controllers/dataexport/reconcile.go @@ -1903,6 +1903,11 @@ func startTransferJob( psaJobUid = getAnnotationValue(dataExport, utils.PsaUIDKey) psaJobGid = getAnnotationValue(dataExport, utils.PsaGIDKey) } + nodeLabel, err := utils.GetNodeLabelFromDeployment(jobConfigMap, jobConfigMapNs, drivers.PxbJobNodeLabelKey) + if err != nil { + return "", err + } + switch drv.Name() { case drivers.Rsync: return drv.StartJob( @@ -1947,6 +1952,7 @@ func startTransferJob( drivers.WithExcludeFileList(excludeFileList), drivers.WithPodDatapathType(podDataPath), drivers.WithJobConfigMap(jobConfigMap), + drivers.WithNodeAffinity(nodeLabel), drivers.WithJobConfigMapNs(jobConfigMapNs), drivers.WithNfsServer(nfsServerAddr), drivers.WithNfsExportDir(nfsExportPath), @@ -1969,6 +1975,7 @@ func startTransferJob( drivers.WithCertSecretNamespace(dataExport.Spec.Destination.Namespace), drivers.WithJobConfigMap(jobConfigMap), drivers.WithJobConfigMapNs(jobConfigMapNs), + drivers.WithNodeAffinity(nodeLabel), drivers.WithNfsServer(nfsServerAddr), drivers.WithNfsExportDir(nfsExportPath), drivers.WithPodUserId(psaJobUid), @@ -2396,6 +2403,11 @@ func startNfsCSIRestoreVolumeJob( logrus.Errorf("failed to create NFS cred secret: %v", err) return "", fmt.Errorf("failed to create NFS cred secret: %v", err) } + nodeLabel, err := utils.GetNodeLabelFromDeployment(jobConfigMap, jobConfigMapNs, drivers.PxbJobNodeLabelKey) + if err != nil { + return "", err + } + switch drv.Name() { case drivers.NFSCSIRestore: return drv.StartJob( @@ -2410,6 +2422,7 @@ func startNfsCSIRestoreVolumeJob( drivers.WithNfsSubPath(bl.Location.Path), drivers.WithPodUserId(psaJobUid), drivers.WithPodGroupId(psaJobGid), + drivers.WithNodeAffinity(nodeLabel), ) } return "", fmt.Errorf("unknown driver for nfs csi volume restore: %s", drv.Name()) diff --git a/pkg/controllers/resourceexport/reconcile.go b/pkg/controllers/resourceexport/reconcile.go index 9edff22a9..f81bacfc2 100644 --- a/pkg/controllers/resourceexport/reconcile.go +++ b/pkg/controllers/resourceexport/reconcile.go @@ -412,6 +412,12 @@ func startNfsResourceJob( logrus.Errorf("failed to create NFS cred secret: %v", err) return "", fmt.Errorf("failed to create NFS cred secret: %v", err) } + + nodeLabel, err := utils.GetNodeLabelFromDeployment(jobConfigMap, jobConfigMapNs, drivers.PxbJobNodeLabelKey) + if err != nil { + return "", err + } + switch drv.Name() { case drivers.NFSBackup: return drv.StartJob( @@ -427,6 +433,7 @@ func startNfsResourceJob( drivers.WithAppCRNamespace(re.Spec.Source.Namespace), drivers.WithNamespace(re.Namespace), drivers.WithResoureBackupName(re.Name), + drivers.WithNodeAffinity(nodeLabel), drivers.WithResoureBackupNamespace(re.Namespace), drivers.WithNfsMountOption(bl.Location.NFSConfig.MountOptions), drivers.WithNfsExportDir(bl.Location.NFSConfig.SubPath), @@ -445,6 +452,7 @@ func startNfsResourceJob( drivers.WithAppCRNamespace(re.Spec.Source.Namespace), drivers.WithNamespace(re.Namespace), drivers.WithResoureBackupName(re.Name), + drivers.WithNodeAffinity(nodeLabel), drivers.WithResoureBackupNamespace(re.Namespace), drivers.WithNfsMountOption(bl.Location.NFSConfig.MountOptions), drivers.WithNfsExportDir(bl.Location.NFSConfig.SubPath), diff --git a/pkg/drivers/drivers.go b/pkg/drivers/drivers.go index 921718a49..2faaa2be5 100644 --- a/pkg/drivers/drivers.go +++ b/pkg/drivers/drivers.go @@ -123,7 +123,8 @@ const ( var ( // ErrJobFailed is a know error for a data transfer job failure. - ErrJobFailed = fmt.Errorf("data transfer job failed") + ErrJobFailed = fmt.Errorf("data transfer job failed") + PxbJobNodeLabelKey = "PXB_JOB_NODE_AFFINITY_LABEL" ) // Interface defines a data export driver behaviour. diff --git a/pkg/drivers/kopiabackup/kopiabackup.go b/pkg/drivers/kopiabackup/kopiabackup.go index 7d70e026a..d50220e0f 100644 --- a/pkg/drivers/kopiabackup/kopiabackup.go +++ b/pkg/drivers/kopiabackup/kopiabackup.go @@ -273,6 +273,7 @@ func jobFor( jobName string, resources corev1.ResourceRequirements, nodeName string, + live bool, ) (*batchv1.Job, error) { backupName := jobName @@ -410,6 +411,14 @@ func jobFor( job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName)) } + // Add node affinity to the job spec + if !live { + job, err = utils.AddNodeAffinityToJob(job, jobOption) + if err != nil { + return nil, err + } + } + if len(jobOption.NfsServer) != 0 { volumeMount := corev1.VolumeMount{ Name: utils.NfsVolumeName, @@ -494,6 +503,7 @@ func buildJob(jobName string, jobOptions drivers.JobOpts) (*batchv1.Job, error) } var resourceNamespace string var nodeName string + var live bool // filter out the pods that are create by us for _, pod := range pods { labels := pod.ObjectMeta.Labels @@ -504,6 +514,7 @@ func buildJob(jobName string, jobOptions drivers.JobOpts) (*batchv1.Job, error) // get the nodeName, if the pods is in Running state, So that we can schedule // kopia job on the same node. nodeName = pod.Spec.NodeName + live = true break } } @@ -518,6 +529,7 @@ func buildJob(jobName string, jobOptions drivers.JobOpts) (*batchv1.Job, error) jobName, resources, nodeName, + live, ) } diff --git a/pkg/drivers/kopiarestore/kopiarestore.go b/pkg/drivers/kopiarestore/kopiarestore.go index 5e700428c..b0a1f5170 100644 --- a/pkg/drivers/kopiarestore/kopiarestore.go +++ b/pkg/drivers/kopiarestore/kopiarestore.go @@ -307,6 +307,12 @@ func jobFor( job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName)) } + // Add node affinity to the job spec + job, err = utils.AddNodeAffinityToJob(job, jobOption) + if(err != nil){ + return nil, err + } + if drivers.CertFilePath != "" { volumeMount := corev1.VolumeMount{ Name: utils.TLSCertMountVol, diff --git a/pkg/drivers/nfsbackup/nfsbackup.go b/pkg/drivers/nfsbackup/nfsbackup.go index b64454a26..967e19211 100644 --- a/pkg/drivers/nfsbackup/nfsbackup.go +++ b/pkg/drivers/nfsbackup/nfsbackup.go @@ -306,6 +306,12 @@ func jobForBackupResource( } } + // Add node affinity to the job spec + job, err = utils.AddNodeAffinityToJob(job, jobOption) + if(err != nil){ + return nil, err + } + // Add the image secret in job spec only if it is present in the stork deployment. if len(imageRegistrySecret) != 0 { job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobOption.RestoreExportName)) diff --git a/pkg/drivers/nfscsirestore/nfscsirestore.go b/pkg/drivers/nfscsirestore/nfscsirestore.go index 2cc6024db..8354cd0ec 100644 --- a/pkg/drivers/nfscsirestore/nfscsirestore.go +++ b/pkg/drivers/nfscsirestore/nfscsirestore.go @@ -286,6 +286,13 @@ func jobForRestoreCSISnapshot( if len(imageRegistrySecret) != 0 { job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName)) } + + // Add node affinity to the job spec + job, err = utils.AddNodeAffinityToJob(job, jobOption) + if(err != nil){ + return nil, err + } + if len(jobOption.NfsServer) != 0 { volumeMount := corev1.VolumeMount{ Name: utils.NfsVolumeName, diff --git a/pkg/drivers/nfsrestore/nfsrestore.go b/pkg/drivers/nfsrestore/nfsrestore.go index 881ef2721..8bbf3f1d0 100644 --- a/pkg/drivers/nfsrestore/nfsrestore.go +++ b/pkg/drivers/nfsrestore/nfsrestore.go @@ -327,6 +327,13 @@ func jobForRestoreResource( if err != nil { return nil, err } + + // Add node affinity to the job spec + job, err = utils.AddNodeAffinityToJob(job, jobOption) + if(err != nil){ + return nil, err + } + // Add the image secret in job spec only if it is present in the stork deployment. if len(imageRegistrySecret) != 0 { job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobOption.RestoreExportName)) diff --git a/pkg/drivers/utils/utils.go b/pkg/drivers/utils/utils.go index ab566c84b..2341d6e70 100644 --- a/pkg/drivers/utils/utils.go +++ b/pkg/drivers/utils/utils.go @@ -859,6 +859,20 @@ func GetNodeAffinityFromDeployment(name, namespace string) (*corev1.NodeAffinity return deploy.Spec.Template.Spec.Affinity.NodeAffinity, nil } +// GetNodeLabelFromDeployment gets node label from deployment +func GetNodeLabelFromDeployment(name, namespace, key string) (map[string]string, error) { + nodeLabel := make(map[string]string) + deploy, err := core.Instance().GetConfigMap(name, namespace) + if err != nil { + return nil, err + } + value, ok := deploy.Data[key] + if ok && value != "" { + nodeLabel[key] = value + } + return nodeLabel, nil +} + // IsJobPodMountFailed - checks for mount failure in a Job pod func IsJobPodMountFailed(job *batchv1.Job, namespace string) bool { fn := "IsJobPodMountFailed" @@ -1127,3 +1141,30 @@ func PauseCleanupResource() (time.Duration, error) { } return pauseCleanupVal, nil } + +// AddNodeAffinityToJob adds node affinity to the job spec +func AddNodeAffinityToJob(job *batchv1.Job, jobOption drivers.JobOpts) (*batchv1.Job, error) { + if len(jobOption.NodeAffinity) > 0 { + matchExpressions := []corev1.NodeSelectorRequirement{} + for key, val := range jobOption.NodeAffinity { + expression := corev1.NodeSelectorRequirement{ + Key: key, + Operator: corev1.NodeSelectorOpIn, + Values: []string{val}, + } + matchExpressions = append(matchExpressions, expression) + } + job.Spec.Template.Spec.Affinity = &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: matchExpressions, + }, + }, + }, + }, + } + } + return job, nil +}