Skip to content

Commit

Permalink
PB-7476: Adding node affinity to kopia backup and restore jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
shkumari-px committed Jul 19, 2024
1 parent 4a8c1c5 commit 6573e00
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 1 deletion.
11 changes: 11 additions & 0 deletions pkg/controllers/dataexport/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -1885,6 +1885,15 @@ func startTransferJob(
psaJobUid = getAnnotationValue(dataExport, utils.PsaUIDKey)
psaJobGid = getAnnotationValue(dataExport, utils.PsaGIDKey)
}
nodeLabel := make(map[string]string)
kdmpData, err := core.Instance().GetConfigMap(jobConfigMap, jobConfigMapNs)
if err != nil {
return "", err
}
pxbJobNodeLabelValue, ok := kdmpData.Data[drivers.PxbJobNodeLabelKey]
if ok && pxbJobNodeLabelValue != "" {
nodeLabel[drivers.PxbJobNodeLabelKey] = pxbJobNodeLabelValue
}
switch drv.Name() {
case drivers.Rsync:
return drv.StartJob(
Expand Down Expand Up @@ -1929,6 +1938,7 @@ func startTransferJob(
drivers.WithExcludeFileList(excludeFileList),
drivers.WithPodDatapathType(podDataPath),
drivers.WithJobConfigMap(jobConfigMap),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithJobConfigMapNs(jobConfigMapNs),
drivers.WithNfsServer(nfsServerAddr),
drivers.WithNfsExportDir(nfsExportPath),
Expand All @@ -1951,6 +1961,7 @@ func startTransferJob(
drivers.WithCertSecretNamespace(dataExport.Spec.Destination.Namespace),
drivers.WithJobConfigMap(jobConfigMap),
drivers.WithJobConfigMapNs(jobConfigMapNs),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithNfsServer(nfsServerAddr),
drivers.WithNfsExportDir(nfsExportPath),
drivers.WithPodUserId(psaJobUid),
Expand Down
12 changes: 12 additions & 0 deletions pkg/controllers/resourceexport/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,16 @@ func startNfsResourceJob(
logrus.Errorf("failed to create NFS cred secret: %v", err)
return "", fmt.Errorf("failed to create NFS cred secret: %v", err)
}
nodeLabel := make(map[string]string)
kdmpData, err := core.Instance().GetConfigMap(jobConfigMap, jobConfigMapNs)
if err != nil {
return "", err
}
pxbJobNodeLabelValue, ok := kdmpData.Data[drivers.PxbJobNodeLabelKey]
if ok && pxbJobNodeLabelValue != "" {
nodeLabel[drivers.PxbJobNodeLabelKey] = pxbJobNodeLabelValue
}

switch drv.Name() {
case drivers.NFSBackup:
return drv.StartJob(
Expand All @@ -420,6 +430,7 @@ func startNfsResourceJob(
drivers.WithAppCRNamespace(re.Spec.Source.Namespace),
drivers.WithNamespace(re.Namespace),
drivers.WithResoureBackupName(re.Name),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithResoureBackupNamespace(re.Namespace),
drivers.WithNfsMountOption(bl.Location.NFSConfig.MountOptions),
drivers.WithNfsExportDir(bl.Location.NFSConfig.SubPath),
Expand All @@ -438,6 +449,7 @@ func startNfsResourceJob(
drivers.WithAppCRNamespace(re.Spec.Source.Namespace),
drivers.WithNamespace(re.Namespace),
drivers.WithResoureBackupName(re.Name),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithResoureBackupNamespace(re.Namespace),
drivers.WithNfsMountOption(bl.Location.NFSConfig.MountOptions),
drivers.WithNfsExportDir(bl.Location.NFSConfig.SubPath),
Expand Down
3 changes: 2 additions & 1 deletion pkg/drivers/drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ const (

var (
// ErrJobFailed is a know error for a data transfer job failure.
ErrJobFailed = fmt.Errorf("data transfer job failed")
ErrJobFailed = fmt.Errorf("data transfer job failed")
PxbJobNodeLabelKey = "PXB_JOB_NODE_AFFINITY_LABEL"
)

// Interface defines a data export driver behaviour.
Expand Down
25 changes: 25 additions & 0 deletions pkg/drivers/kopiabackup/kopiabackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,31 @@ func jobFor(
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName))
}

// Add node affnity to the job spec
if len(jobOption.NodeAffinity) > 0 {
matchExpressions := []corev1.NodeSelectorRequirement{}
for key, val := range jobOption.NodeAffinity {
expression := corev1.NodeSelectorRequirement{
Key: key,
Operator: corev1.NodeSelectorOpIn,
Values: []string{val},
}
matchExpressions = append(matchExpressions, expression)
}

job.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: matchExpressions,
},
},
},
},
}
}

if len(jobOption.NfsServer) != 0 {
volumeMount := corev1.VolumeMount{
Name: utils.NfsVolumeName,
Expand Down
23 changes: 23 additions & 0 deletions pkg/drivers/kopiarestore/kopiarestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,29 @@ func jobFor(
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName))
}
if len(jobOption.NodeAffinity) > 0 {
matchExpressions := []corev1.NodeSelectorRequirement{}
for key, val := range jobOption.NodeAffinity {
expression := corev1.NodeSelectorRequirement{
Key: key,
Operator: corev1.NodeSelectorOpIn,
Values: []string{val},
}
matchExpressions = append(matchExpressions, expression)
}

job.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: matchExpressions,
},
},
},
},
}
}

if drivers.CertFilePath != "" {
volumeMount := corev1.VolumeMount{
Expand Down
25 changes: 25 additions & 0 deletions pkg/drivers/nfsbackup/nfsbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,31 @@ func jobForBackupResource(
return nil, err
}

// Add node affnity to the job spec
if len(jobOption.NodeAffinity) > 0 {
matchExpressions := []corev1.NodeSelectorRequirement{}
for key, val := range jobOption.NodeAffinity {
expression := corev1.NodeSelectorRequirement{
Key: key,
Operator: corev1.NodeSelectorOpIn,
Values: []string{val},
}
matchExpressions = append(matchExpressions, expression)
}

job.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: matchExpressions,
},
},
},
},
}
}

// Add the image secret in job spec only if it is present in the stork deployment.
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobOption.RestoreExportName))
Expand Down
26 changes: 26 additions & 0 deletions pkg/drivers/nfsrestore/nfsrestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,32 @@ func jobForRestoreResource(
if err != nil {
return nil, err
}

// Add node affnity to the job spec
if len(jobOption.NodeAffinity) > 0 {
matchExpressions := []corev1.NodeSelectorRequirement{}
for key, val := range jobOption.NodeAffinity {
expression := corev1.NodeSelectorRequirement{
Key: key,
Operator: corev1.NodeSelectorOpIn,
Values: []string{val},
}
matchExpressions = append(matchExpressions, expression)
}

job.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: matchExpressions,
},
},
},
},
}
}

// Add the image secret in job spec only if it is present in the stork deployment.
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobOption.RestoreExportName))
Expand Down

0 comments on commit 6573e00

Please sign in to comment.