Skip to content

Commit

Permalink
PB-8410 Incorporate the logic not delete the restore job pods when mo…
Browse files Browse the repository at this point in the history
…unt failure occurs within 5 mins

Signed-off-by: vsundarraj-px <[email protected]>
  • Loading branch information
vsundarraj-px committed Nov 16, 2024
1 parent 6f78ad5 commit 21aeaf5
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 4 deletions.
4 changes: 4 additions & 0 deletions pkg/controllers/dataexport/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -1907,6 +1907,8 @@ func startTransferJob(
if err != nil {
return "", err
}
// update latest JobFailureRetryTimeout
utils.UpdateJobFailureTimeOut(jobConfigMap, jobConfigMapNs)

switch drv.Name() {
case drivers.Rsync:
Expand Down Expand Up @@ -2409,6 +2411,8 @@ func startNfsCSIRestoreVolumeJob(
return "", err
}

// update latest JobFailureRetryTimeout
utils.UpdateJobFailureTimeOut(jobConfigMap, jobConfigMapNs)
switch drv.Name() {
case drivers.NFSCSIRestore:
return drv.StartJob(
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/resourceexport/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@ func startNfsResourceJob(
if err != nil {
return "", err
}

// update latest JobFailureRetryTimeout
utils.UpdateJobFailureTimeOut(jobConfigMap, jobConfigMapNs)
switch drv.Name() {
case drivers.NFSBackup:
return drv.StartJob(
Expand Down
64 changes: 61 additions & 3 deletions pkg/drivers/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ const (
kopiaBackupString = "kopiaexecutor backup"
// if providerType in node spec has this string then it is GCP hosted cluster
GCPBasedClusterString = "gce://"
//PxbJobFailRetryLimit defines timeout after job failure due to mount failure
PxbJobFailureRetryTimeoutKey = "MOUNT_FAILURE_RETRY_TIMEOUT"
// PxbJobFailRetryLimit default timeout after job failure due to mount failure
PxbDefaultJobFailureRetryTimeout = "30"
)

var (
Expand All @@ -93,6 +97,8 @@ var volumeAPICallBackoff = wait.Backoff{
Steps: volumeSteps,
}

var JobFailureRetryTimeout time.Duration

// NamespacedName returns a name in form "<namespace>/<name>".
func NamespacedName(namespace, name string) string {
v := types.NamespacedName{
Expand Down Expand Up @@ -876,7 +882,7 @@ func GetNodeLabelFromDeployment(name, namespace, key string) (map[string]string,
// IsJobPodMountFailed - checks for mount failure in a Job pod
func IsJobPodMountFailed(job *batchv1.Job, namespace string) bool {
fn := "IsJobPodMountFailed"

mountFailed := false
pod, err := core.Instance().GetPodsByOwner(job.UID, namespace)
if err != nil {
errMsg := fmt.Sprintf("Getting pod of job [%s/%s] failed: %v", namespace, job.Name, err)
Expand All @@ -899,12 +905,23 @@ func IsJobPodMountFailed(job *batchv1.Job, namespace string) bool {
}
for _, event := range events.Items {
if event.Reason == "FailedMount" && event.Count > 0 {
return true
mountFailed = true
break
}
}
}
}
return false

if mountFailed {
timeSinceStart := time.Since(job.CreationTimestamp.Time)
if timeSinceStart >= JobFailureRetryTimeout {
logrus.Debugf("%v: job error. Timeout elapsed for volume mount failure of pod [%s/%s]", fn, namespace, pod[0].Name)
} else {
logrus.Debugf("%v: error in volume mount for pod [%s/%s]. Retry until timeout", fn, namespace, pod[0].Name)
mountFailed = false
}
}
return mountFailed
}

// Check if a job has failed because of podSecurity violation
Expand Down Expand Up @@ -1178,3 +1195,44 @@ func GetAccessModeFromPvc(srcPvcName, srcPvcNameSpace string) ([]corev1.Persiste
accessModes := srcPvc.Status.AccessModes
return accessModes, nil
}

// GetDataFroMConfigMap reads specific key value from configMap
// returns error if configMap cannot be read from k8s
// returns error if key is not found in configMap
// returns value of the key if key is found in configMap
func GetDataFromConfigMap(name, namespace, key string) (string, error) {

configMap, err := core.Instance().GetConfigMap(name, namespace)
if err != nil {
return "", err
}

if value, ok := configMap.Data[key]; ok {

return value, nil
}

return "", fmt.Errorf("key [%s] not found in configMap [%s/%s]", key, namespace, name)

}

// UpdateJobFailureTimeOut this is called in reconciler before starting a new Job to update JobFailureRetryTimeout value
// if we fail to read the latest values from configMap, we will reset to default value
// return: This function returns nothing.
func UpdateJobFailureTimeOut(jobConfigMap, jobConfigMapNs string) {
fn := "UpdateJobFailureTimeOut"
timeOut, err := GetDataFromConfigMap(jobConfigMap, jobConfigMapNs, PxbJobFailureRetryTimeoutKey)
if err != nil {
logrus.Debugf("%v:failed retry limit not found. Setting to default: %v", fn, err)
timeOut = PxbDefaultJobFailureRetryTimeout
} else {
// we could fail here if the vaue set is invalid or has some junk charectors.
duration, err := time.ParseDuration(timeOut + "s")
if err != nil || duration <= 0 {
logrus.Errorf("invalid %v value set. Should be numberic value > 0. Setting to default limit", PxbJobFailureRetryTimeoutKey)
timeOut = PxbDefaultJobFailureRetryTimeout
}
}
// skipping error cos we know we will never fail here.
JobFailureRetryTimeout, _ = time.ParseDuration(timeOut + "s")
}

0 comments on commit 21aeaf5

Please sign in to comment.