Skip to content

Commit

Permalink
Merge pull request #398 from portworx/PB-7476-sync
Browse files Browse the repository at this point in the history
PB-7476-sync: Merging commits from 1.2.15 to master and updating node label as per the best practices
  • Loading branch information
shkumari-px authored Oct 5, 2024
2 parents f4c7e34 + 8298317 commit 7cde58b
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 25 deletions.
13 changes: 13 additions & 0 deletions pkg/controllers/dataexport/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -1903,6 +1903,11 @@ func startTransferJob(
psaJobUid = getAnnotationValue(dataExport, utils.PsaUIDKey)
psaJobGid = getAnnotationValue(dataExport, utils.PsaGIDKey)
}
nodeLabel, err := utils.GetNodeLabelFromDeployment(jobConfigMap, jobConfigMapNs, drivers.PxbJobNodeLabelKey)
if err != nil {
return "", err
}

switch drv.Name() {
case drivers.Rsync:
return drv.StartJob(
Expand Down Expand Up @@ -1947,6 +1952,7 @@ func startTransferJob(
drivers.WithExcludeFileList(excludeFileList),
drivers.WithPodDatapathType(podDataPath),
drivers.WithJobConfigMap(jobConfigMap),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithJobConfigMapNs(jobConfigMapNs),
drivers.WithNfsServer(nfsServerAddr),
drivers.WithNfsExportDir(nfsExportPath),
Expand All @@ -1969,6 +1975,7 @@ func startTransferJob(
drivers.WithCertSecretNamespace(dataExport.Spec.Destination.Namespace),
drivers.WithJobConfigMap(jobConfigMap),
drivers.WithJobConfigMapNs(jobConfigMapNs),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithNfsServer(nfsServerAddr),
drivers.WithNfsExportDir(nfsExportPath),
drivers.WithPodUserId(psaJobUid),
Expand Down Expand Up @@ -2397,6 +2404,11 @@ func startNfsCSIRestoreVolumeJob(
logrus.Errorf("failed to create NFS cred secret: %v", err)
return "", fmt.Errorf("failed to create NFS cred secret: %v", err)
}
nodeLabel, err := utils.GetNodeLabelFromDeployment(jobConfigMap, jobConfigMapNs, drivers.PxbJobNodeLabelKey)
if err != nil {
return "", err
}

switch drv.Name() {
case drivers.NFSCSIRestore:
return drv.StartJob(
Expand All @@ -2411,6 +2423,7 @@ func startNfsCSIRestoreVolumeJob(
drivers.WithNfsSubPath(bl.Location.Path),
drivers.WithPodUserId(psaJobUid),
drivers.WithPodGroupId(psaJobGid),
drivers.WithNodeAffinity(nodeLabel),
)
}
return "", fmt.Errorf("unknown driver for nfs csi volume restore: %s", drv.Name())
Expand Down
8 changes: 8 additions & 0 deletions pkg/controllers/resourceexport/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,12 @@ func startNfsResourceJob(
logrus.Errorf("failed to create NFS cred secret: %v", err)
return "", fmt.Errorf("failed to create NFS cred secret: %v", err)
}

nodeLabel, err := utils.GetNodeLabelFromDeployment(jobConfigMap, jobConfigMapNs, drivers.PxbJobNodeLabelKey)
if err != nil {
return "", err
}

switch drv.Name() {
case drivers.NFSBackup:
return drv.StartJob(
Expand All @@ -427,6 +433,7 @@ func startNfsResourceJob(
drivers.WithAppCRNamespace(re.Spec.Source.Namespace),
drivers.WithNamespace(re.Namespace),
drivers.WithResoureBackupName(re.Name),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithResoureBackupNamespace(re.Namespace),
drivers.WithNfsMountOption(bl.Location.NFSConfig.MountOptions),
drivers.WithNfsExportDir(bl.Location.NFSConfig.SubPath),
Expand All @@ -445,6 +452,7 @@ func startNfsResourceJob(
drivers.WithAppCRNamespace(re.Spec.Source.Namespace),
drivers.WithNamespace(re.Namespace),
drivers.WithResoureBackupName(re.Name),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithResoureBackupNamespace(re.Namespace),
drivers.WithNfsMountOption(bl.Location.NFSConfig.MountOptions),
drivers.WithNfsExportDir(bl.Location.NFSConfig.SubPath),
Expand Down
3 changes: 2 additions & 1 deletion pkg/drivers/drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ const (

var (
// ErrJobFailed is a know error for a data transfer job failure.
ErrJobFailed = fmt.Errorf("data transfer job failed")
ErrJobFailed = fmt.Errorf("data transfer job failed")
PxbJobNodeLabelKey = "pxb_job_node_affinity_label"
)

// Interface defines a data export driver behaviour.
Expand Down
60 changes: 36 additions & 24 deletions pkg/drivers/kopiabackup/kopiabackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func jobFor(
jobName string,
resources corev1.ResourceRequirements,
nodeName string,
live bool,
) (*batchv1.Job, error) {
backupName := jobName

Expand Down Expand Up @@ -401,15 +402,41 @@ func jobFor(
}
}

if len(nodeName) != 0 {
job.Spec.Template.Spec.NodeName = nodeName
}

// Add the image secret in job spec only if it is present in the stork deployment.
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName))
}

// Add node affinity to the job spec
if !live {
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if err != nil {
return nil, err
}
} else {
accessModes, err := utils.GetAccessModeFromPvc(jobOption.SourcePVCName, jobOption.SourcePVCNamespace)
if err != nil {
return nil, err
}

var singleNodeMount bool
for _, val := range accessModes {
if val == "ReadWriteOnce" || val == "ReadWriteOncePod" {
singleNodeMount = true
break
}
}

if singleNodeMount && len(nodeName) != 0 {
job.Spec.Template.Spec.NodeName = nodeName
} else if !singleNodeMount {
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if err != nil {
return nil, err
}
}
}

if len(jobOption.NfsServer) != 0 {
volumeMount := corev1.VolumeMount{
Name: utils.NfsVolumeName,
Expand Down Expand Up @@ -493,8 +520,8 @@ func buildJob(jobName string, jobOptions drivers.JobOpts) (*batchv1.Job, error)
return nil, fmt.Errorf(errMsg)
}
var resourceNamespace string
var live bool
var nodeName string
var live bool
// filter out the pods that are create by us
for _, pod := range pods {
labels := pod.ObjectMeta.Labels
Expand All @@ -505,11 +532,12 @@ func buildJob(jobName string, jobOptions drivers.JobOpts) (*batchv1.Job, error)
// get the nodeName, if the pods is in Running state, So that we can schedule
// kopia job on the same node.
nodeName = pod.Spec.NodeName
live = true
break
}
}
resourceNamespace = jobOptions.Namespace
if err := utils.SetupServiceAccount(jobName, resourceNamespace, roleFor(live)); err != nil {
if err := utils.SetupServiceAccount(jobName, resourceNamespace, roleFor()); err != nil {
errMsg := fmt.Sprintf("error creating service account %s/%s: %v", resourceNamespace, jobName, err)
logrus.Errorf("%s: %v", fn, errMsg)
return nil, fmt.Errorf(errMsg)
Expand All @@ -519,10 +547,11 @@ func buildJob(jobName string, jobOptions drivers.JobOpts) (*batchv1.Job, error)
jobName,
resources,
nodeName,
live,
)
}

func roleFor(live bool) *rbacv1.Role {
func roleFor() *rbacv1.Role {
role := &rbacv1.Role{
Rules: []rbacv1.PolicyRule{
{
Expand All @@ -532,22 +561,5 @@ func roleFor(live bool) *rbacv1.Role {
},
},
}
// Only live backup, we will add the hostaccess and privilege option.
if live {
hostAccessRule := rbacv1.PolicyRule{
APIGroups: []string{"security.openshift.io"},
Resources: []string{"securitycontextconstraints"},
ResourceNames: []string{"hostaccess"},
Verbs: []string{"use"},
}
role.Rules = append(role.Rules, hostAccessRule)
PrivilegedRule := rbacv1.PolicyRule{
APIGroups: []string{"security.openshift.io"},
Resources: []string{"securitycontextconstraints"},
ResourceNames: []string{"privileged"},
Verbs: []string{"use"},
}
role.Rules = append(role.Rules, PrivilegedRule)
}
return role
}
6 changes: 6 additions & 0 deletions pkg/drivers/kopiarestore/kopiarestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ func jobFor(
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName))
}

// Add node affinity to the job spec
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if err != nil {
return nil, err
}

if drivers.CertFilePath != "" {
volumeMount := corev1.VolumeMount{
Name: utils.TLSCertMountVol,
Expand Down
6 changes: 6 additions & 0 deletions pkg/drivers/nfsbackup/nfsbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,12 @@ func jobForBackupResource(
}
}

// Add node affinity to the job spec
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if err != nil {
return nil, err
}

// Add the image secret in job spec only if it is present in the stork deployment.
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobOption.RestoreExportName))
Expand Down
7 changes: 7 additions & 0 deletions pkg/drivers/nfscsirestore/nfscsirestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,13 @@ func jobForRestoreCSISnapshot(
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName))
}

// Add node affinity to the job spec
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if err != nil {
return nil, err
}

if len(jobOption.NfsServer) != 0 {
volumeMount := corev1.VolumeMount{
Name: utils.NfsVolumeName,
Expand Down
7 changes: 7 additions & 0 deletions pkg/drivers/nfsrestore/nfsrestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,13 @@ func jobForRestoreResource(
if err != nil {
return nil, err
}

// Add node affinity to the job spec
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if err != nil {
return nil, err
}

// Add the image secret in job spec only if it is present in the stork deployment.
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobOption.RestoreExportName))
Expand Down
51 changes: 51 additions & 0 deletions pkg/drivers/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,20 @@ func GetNodeAffinityFromDeployment(name, namespace string) (*corev1.NodeAffinity
return deploy.Spec.Template.Spec.Affinity.NodeAffinity, nil
}

// GetNodeLabelFromDeployment gets node label from deployment
func GetNodeLabelFromDeployment(name, namespace, key string) (map[string]string, error) {
nodeLabel := make(map[string]string)
deploy, err := core.Instance().GetConfigMap(name, namespace)
if err != nil {
return nil, err
}
value, ok := deploy.Data[key]
if ok && value != "" {
nodeLabel[key] = value
}
return nodeLabel, nil
}

// IsJobPodMountFailed - checks for mount failure in a Job pod
func IsJobPodMountFailed(job *batchv1.Job, namespace string) bool {
fn := "IsJobPodMountFailed"
Expand Down Expand Up @@ -1127,3 +1141,40 @@ func PauseCleanupResource() (time.Duration, error) {
}
return pauseCleanupVal, nil
}

// AddNodeAffinityToJob adds node affinity to the job spec
func AddNodeAffinityToJob(job *batchv1.Job, jobOption drivers.JobOpts) (*batchv1.Job, error) {
if len(jobOption.NodeAffinity) > 0 {
matchExpressions := []corev1.NodeSelectorRequirement{}
for key, val := range jobOption.NodeAffinity {
expression := corev1.NodeSelectorRequirement{
Key: key,
Operator: corev1.NodeSelectorOpIn,
Values: []string{val},
}
matchExpressions = append(matchExpressions, expression)
}
job.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: matchExpressions,
},
},
},
},
}
}
return job, nil
}

// GetAccessModeFromPvc gets the access modes of the pvc
func GetAccessModeFromPvc(srcPvcName, srcPvcNameSpace string) ([]corev1.PersistentVolumeAccessMode, error) {
srcPvc, err := core.Instance().GetPersistentVolumeClaim(srcPvcName, srcPvcNameSpace)
if err != nil {
return nil, err
}
accessModes := srcPvc.Status.AccessModes
return accessModes, nil
}

0 comments on commit 7cde58b

Please sign in to comment.