Skip to content

Commit

Permalink
PB-7476: Adding node affinity to kopia backup and restore jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
shkumari-px committed Aug 6, 2024
1 parent 855337d commit ad0842a
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 1 deletion.
13 changes: 13 additions & 0 deletions pkg/controllers/dataexport/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -1885,6 +1885,11 @@ func startTransferJob(
psaJobUid = getAnnotationValue(dataExport, utils.PsaUIDKey)
psaJobGid = getAnnotationValue(dataExport, utils.PsaGIDKey)
}
nodeLabel, err := utils.GetNodeLabelFromDeployment(jobConfigMap, jobConfigMapNs, drivers.PxbJobNodeLabelKey)
if err != nil {
return "", err
}

switch drv.Name() {
case drivers.Rsync:
return drv.StartJob(
Expand Down Expand Up @@ -1929,6 +1934,7 @@ func startTransferJob(
drivers.WithExcludeFileList(excludeFileList),
drivers.WithPodDatapathType(podDataPath),
drivers.WithJobConfigMap(jobConfigMap),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithJobConfigMapNs(jobConfigMapNs),
drivers.WithNfsServer(nfsServerAddr),
drivers.WithNfsExportDir(nfsExportPath),
Expand All @@ -1951,6 +1957,7 @@ func startTransferJob(
drivers.WithCertSecretNamespace(dataExport.Spec.Destination.Namespace),
drivers.WithJobConfigMap(jobConfigMap),
drivers.WithJobConfigMapNs(jobConfigMapNs),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithNfsServer(nfsServerAddr),
drivers.WithNfsExportDir(nfsExportPath),
drivers.WithPodUserId(psaJobUid),
Expand Down Expand Up @@ -2378,6 +2385,11 @@ func startNfsCSIRestoreVolumeJob(
logrus.Errorf("failed to create NFS cred secret: %v", err)
return "", fmt.Errorf("failed to create NFS cred secret: %v", err)
}
nodeLabel, err := utils.GetNodeLabelFromDeployment(jobConfigMap, jobConfigMapNs, drivers.PxbJobNodeLabelKey)
if err != nil {
return "", err
}

switch drv.Name() {
case drivers.NFSCSIRestore:
return drv.StartJob(
Expand All @@ -2392,6 +2404,7 @@ func startNfsCSIRestoreVolumeJob(
drivers.WithNfsSubPath(bl.Location.Path),
drivers.WithPodUserId(psaJobUid),
drivers.WithPodGroupId(psaJobGid),
drivers.WithNodeAffinity(nodeLabel),
)
}
return "", fmt.Errorf("unknown driver for nfs csi volume restore: %s", drv.Name())
Expand Down
8 changes: 8 additions & 0 deletions pkg/controllers/resourceexport/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,12 @@ func startNfsResourceJob(
logrus.Errorf("failed to create NFS cred secret: %v", err)
return "", fmt.Errorf("failed to create NFS cred secret: %v", err)
}

nodeLabel, err := utils.GetNodeLabelFromDeployment(jobConfigMap, jobConfigMapNs, drivers.PxbJobNodeLabelKey)
if err != nil {
return "", err
}

switch drv.Name() {
case drivers.NFSBackup:
return drv.StartJob(
Expand All @@ -420,6 +426,7 @@ func startNfsResourceJob(
drivers.WithAppCRNamespace(re.Spec.Source.Namespace),
drivers.WithNamespace(re.Namespace),
drivers.WithResoureBackupName(re.Name),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithResoureBackupNamespace(re.Namespace),
drivers.WithNfsMountOption(bl.Location.NFSConfig.MountOptions),
drivers.WithNfsExportDir(bl.Location.NFSConfig.SubPath),
Expand All @@ -438,6 +445,7 @@ func startNfsResourceJob(
drivers.WithAppCRNamespace(re.Spec.Source.Namespace),
drivers.WithNamespace(re.Namespace),
drivers.WithResoureBackupName(re.Name),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithResoureBackupNamespace(re.Namespace),
drivers.WithNfsMountOption(bl.Location.NFSConfig.MountOptions),
drivers.WithNfsExportDir(bl.Location.NFSConfig.SubPath),
Expand Down
3 changes: 2 additions & 1 deletion pkg/drivers/drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ const (

var (
// ErrJobFailed is a know error for a data transfer job failure.
ErrJobFailed = fmt.Errorf("data transfer job failed")
ErrJobFailed = fmt.Errorf("data transfer job failed")
PxbJobNodeLabelKey = "PXB_JOB_NODE_AFFINITY_LABEL"
)

// Interface defines a data export driver behaviour.
Expand Down
12 changes: 12 additions & 0 deletions pkg/drivers/kopiabackup/kopiabackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func jobFor(
jobName string,
resources corev1.ResourceRequirements,
nodeName string,
live bool,
) (*batchv1.Job, error) {
backupName := jobName

Expand Down Expand Up @@ -410,6 +411,14 @@ func jobFor(
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName))
}

// Add node affinity to the job spec
if !live {
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if err != nil {
return nil, err
}
}

if len(jobOption.NfsServer) != 0 {
volumeMount := corev1.VolumeMount{
Name: utils.NfsVolumeName,
Expand Down Expand Up @@ -494,6 +503,7 @@ func buildJob(jobName string, jobOptions drivers.JobOpts) (*batchv1.Job, error)
}
var resourceNamespace string
var nodeName string
var live bool
// filter out the pods that are create by us
for _, pod := range pods {
labels := pod.ObjectMeta.Labels
Expand All @@ -504,6 +514,7 @@ func buildJob(jobName string, jobOptions drivers.JobOpts) (*batchv1.Job, error)
// get the nodeName, if the pods is in Running state, So that we can schedule
// kopia job on the same node.
nodeName = pod.Spec.NodeName
live = true
break
}
}
Expand All @@ -518,6 +529,7 @@ func buildJob(jobName string, jobOptions drivers.JobOpts) (*batchv1.Job, error)
jobName,
resources,
nodeName,
live,
)
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/drivers/kopiarestore/kopiarestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ func jobFor(
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName))
}

// Add node affinity to the job spec
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if(err != nil){
return nil, err
}

if drivers.CertFilePath != "" {
volumeMount := corev1.VolumeMount{
Name: utils.TLSCertMountVol,
Expand Down
6 changes: 6 additions & 0 deletions pkg/drivers/nfsbackup/nfsbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ func jobForBackupResource(
return nil, err
}

// Add node affinity to the job spec
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if(err != nil){
return nil, err
}

// Add the image secret in job spec only if it is present in the stork deployment.
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobOption.RestoreExportName))
Expand Down
7 changes: 7 additions & 0 deletions pkg/drivers/nfscsirestore/nfscsirestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,13 @@ func jobForRestoreCSISnapshot(
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName))
}

// Add node affinity to the job spec
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if(err != nil){
return nil, err
}

if len(jobOption.NfsServer) != 0 {
volumeMount := corev1.VolumeMount{
Name: utils.NfsVolumeName,
Expand Down
7 changes: 7 additions & 0 deletions pkg/drivers/nfsrestore/nfsrestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,13 @@ func jobForRestoreResource(
if err != nil {
return nil, err
}

// Add node affinity to the job spec
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if(err != nil){
return nil, err
}

// Add the image secret in job spec only if it is present in the stork deployment.
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobOption.RestoreExportName))
Expand Down
41 changes: 41 additions & 0 deletions pkg/drivers/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,20 @@ func GetNodeAffinityFromDeployment(name, namespace string) (*corev1.NodeAffinity
return deploy.Spec.Template.Spec.Affinity.NodeAffinity, nil
}

// GetNodeLabelFromDeployment gets node label from deployment
func GetNodeLabelFromDeployment(name, namespace, key string) (map[string]string, error) {
nodeLabel := make(map[string]string)
deploy, err := core.Instance().GetConfigMap(name, namespace)
if err != nil {
return nil, err
}
value, ok := deploy.Data[key]
if ok && value != "" {
nodeLabel[key] = value
}
return nodeLabel, nil
}

// IsJobPodMountFailed - checks for mount failure in a Job pod
func IsJobPodMountFailed(job *batchv1.Job, namespace string) bool {
fn := "IsJobPodMountFailed"
Expand Down Expand Up @@ -1085,3 +1099,30 @@ func GetOcpNsUidGid(nsName string, psaJobUid string, psaJobGid string) (string,
}
return psaJobUid, psaJobGid, isOcp, nil
}

// AddNodeAffinityToJob adds node affinity to the job spec
func AddNodeAffinityToJob(job *batchv1.Job, jobOption drivers.JobOpts) (*batchv1.Job, error) {
if len(jobOption.NodeAffinity) > 0 {
matchExpressions := []corev1.NodeSelectorRequirement{}
for key, val := range jobOption.NodeAffinity {
expression := corev1.NodeSelectorRequirement{
Key: key,
Operator: corev1.NodeSelectorOpIn,
Values: []string{val},
}
matchExpressions = append(matchExpressions, expression)
}
job.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: matchExpressions,
},
},
},
},
}
}
return job, nil
}

0 comments on commit ad0842a

Please sign in to comment.