From 43f45b1ffda15b289da2996822665eb5a44eb19a Mon Sep 17 00:00:00 2001 From: shkumari-px Date: Mon, 13 May 2024 15:30:09 +0000 Subject: [PATCH] PB-6862-vendor-stork: vendor release-24.2.1 stork to kdmp 1.2.12 --- go.mod | 2 +- go.sum | 4 +- .../stork/drivers/volume/kdmp/kdmp.go | 9 + .../stork/drivers/volume/portworx/portworx.go | 169 ++++++++++---- .../stork/drivers/volume/volume.go | 28 +++ .../controllers/applicationbackup.go | 41 +++- .../controllers/applicationrestore.go | 220 +++++++++++------- .../libopenstorage/stork/pkg/cache/cache.go | 33 ++- .../pkg/resourcecollector/persistentvolume.go | 2 +- .../resourcecollector/resourcecollector.go | 9 +- .../stork/pkg/snapshotter/snapshotter_csi.go | 11 +- vendor/modules.txt | 2 +- 12 files changed, 376 insertions(+), 154 deletions(-) diff --git a/go.mod b/go.mod index 7e99a1a07..76e088947 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/hashicorp/go-version v1.6.0 github.com/kubernetes-csi/external-snapshotter/client/v4 v4.2.0 github.com/kubernetes-incubator/external-storage v0.20.4-openstorage-rc7 - github.com/libopenstorage/stork v1.4.1-0.20240424163629-d47b17267ad0 + github.com/libopenstorage/stork v1.4.1-0.20240513102605-2340238c7664 github.com/portworx/pxc v0.33.0 github.com/portworx/sched-ops v1.20.4-rc1.0.20240424161056-966b1a440b16 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 1419cc18e..4c583708f 100644 --- a/go.sum +++ b/go.sum @@ -3362,8 +3362,8 @@ github.com/libopenstorage/stork v1.4.1-0.20230502135851-9cacb19e1df5/go.mod h1:R github.com/libopenstorage/stork v1.4.1-0.20230519043154-cbc10dffaf19/go.mod h1:Xm4DHoViynFXMQKBXGj3IkA77LY2RBFkNtv6vbo3wNw= github.com/libopenstorage/stork v1.4.1-0.20230601053837-5dd68f026569/go.mod h1:+mKPMCPNhS/XOF2RPcNFijkr67CCCWp0o8OXVG6xxAk= github.com/libopenstorage/stork v1.4.1-0.20230610103146-72cf75320066/go.mod h1:Yst+fnOYjWk6SA5pXZBKm19wtiinjxQ/vgYTXI3k80Q= -github.com/libopenstorage/stork v1.4.1-0.20240424163629-d47b17267ad0 h1:n0xSBBNggwwJRP/wVXsfEJPXTWFjDiZcCl7uA+ZqDuU= -github.com/libopenstorage/stork v1.4.1-0.20240424163629-d47b17267ad0/go.mod h1:EraUy+NLSVp4YK7FrFBg61suAvyHcACIuW1jb1PdWt8= +github.com/libopenstorage/stork v1.4.1-0.20240513102605-2340238c7664 h1:bABYni9x1xTkaIzIvfeYj1MpubMp+kjV5K0qP9k1Xb4= +github.com/libopenstorage/stork v1.4.1-0.20240513102605-2340238c7664/go.mod h1:kp5qtpq+BgjL5WqiOpDvbPH1WGReO5AaqXDbb+XpvzM= github.com/libopenstorage/systemutils v0.0.0-20160208220149-44ac83be3ce1/go.mod h1:xwNGC7xiz/BQ/wbMkvHujL8Gjgseg+x41xMek7sKRRQ= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= diff --git a/vendor/github.com/libopenstorage/stork/drivers/volume/kdmp/kdmp.go b/vendor/github.com/libopenstorage/stork/drivers/volume/kdmp/kdmp.go index 21065a5e1..abd6f12c0 100644 --- a/vendor/github.com/libopenstorage/stork/drivers/volume/kdmp/kdmp.go +++ b/vendor/github.com/libopenstorage/stork/drivers/volume/kdmp/kdmp.go @@ -32,6 +32,7 @@ import ( v1 "k8s.io/api/core/v1" k8serror "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" k8shelper "k8s.io/component-helpers/storage/volume" ) @@ -721,6 +722,14 @@ func (k *kdmp) StartRestore( pvc.Name = bkpvInfo.PersistentVolumeClaim pvc.Namespace = restoreNamespace } + // Check if the snapshot size is larger than the pvc size. 
+ // Update the pvc size to that of the snapshot size if so. + pvcQuantity := pvc.Spec.Resources.Requests[v1.ResourceStorage] + if pvcQuantity.CmpInt64(int64(bkpvInfo.TotalSize)) == -1 { + newPvcQuantity := resource.NewQuantity(int64(bkpvInfo.TotalSize), resource.BinarySI) + logrus.Debugf("setting size of pvc %s/%s same as snapshot size %s", pvc.Namespace, pvc.Name, newPvcQuantity.String()) + pvc.Spec.Resources.Requests[v1.ResourceStorage] = *newPvcQuantity + } volumeInfo.PersistentVolumeClaim = bkpvInfo.PersistentVolumeClaim volumeInfo.PersistentVolumeClaimUID = bkpvInfo.PersistentVolumeClaimUID volumeInfo.SourceNamespace = bkpvInfo.Namespace diff --git a/vendor/github.com/libopenstorage/stork/drivers/volume/portworx/portworx.go b/vendor/github.com/libopenstorage/stork/drivers/volume/portworx/portworx.go index 14f98614e..b94aa1020 100644 --- a/vendor/github.com/libopenstorage/stork/drivers/volume/portworx/portworx.go +++ b/vendor/github.com/libopenstorage/stork/drivers/volume/portworx/portworx.go @@ -9,7 +9,6 @@ import ( "math" "os" "path/filepath" - "reflect" "regexp" "strconv" "strings" @@ -252,15 +251,16 @@ var restoreStateCallBackoff = wait.Backoff{ } type portworx struct { - store cache.Store - stopChannel chan struct{} - sdkConn *portworxGrpcConnection - id string - endpoint string - tlsConfig *tls.Config - jwtSharedSecret string - jwtIssuer string - initDone bool + store cache.Store + stopChannel chan struct{} + sdkConn *portworxGrpcConnection + id string + endpoint string + tlsConfig *tls.Config + jwtSharedSecret string + jwtIssuer string + initDone bool + getAdminVolDriver func() (volume.VolumeDriver, error) } type portworxGrpcConnection struct { @@ -506,35 +506,84 @@ func (p *portworx) inspectVolume(volDriver volume.VolumeDriver, volumeID string) Type: "Volume", } } + return p.constructStorkVolume(vols[0]), nil +} +func (p *portworx) constructStorkVolume(vol *api.Volume) *storkvolume.Info { info := &storkvolume.Info{} - info.VolumeID = vols[0].Id + info.VolumeID = vol.Id - info.VolumeName = vols[0].Locator.Name - for _, rset := range vols[0].ReplicaSets { + info.VolumeName = vol.Locator.Name + for _, rset := range vol.ReplicaSets { info.DataNodes = append(info.DataNodes, rset.Nodes...) 
} - if vols[0].Source != nil { - info.ParentID = vols[0].Source.Parent + if vol.Source != nil { + info.ParentID = vol.Source.Parent } info.Labels = make(map[string]string) - for k, v := range vols[0].Spec.GetVolumeLabels() { + for k, v := range vol.Spec.GetVolumeLabels() { info.Labels[k] = v } - for k, v := range vols[0].Locator.GetVolumeLabels() { + for k, v := range vol.Locator.GetVolumeLabels() { info.Labels[k] = v } - info.NeedsAntiHyperconvergence = vols[0].Spec.Sharedv4 && vols[0].Spec.Sharedv4ServiceSpec != nil + info.NeedsAntiHyperconvergence = vol.Spec.Sharedv4 && vol.Spec.Sharedv4ServiceSpec != nil info.WindowsVolume = p.volumePrefersWindowsNodes(info) - info.VolumeSourceRef = vols[0] + info.VolumeSourceRef = vol + + info.AttachedOn = vol.AttachedOn - info.AttachedOn = vols[0].AttachedOn + return info - return info, nil +} + +func (p *portworx) enumerateVolumes(volumeNameList []string) (map[string]*storkvolume.Info, error) { + if !p.initDone { + if err := p.initPortworxClients(); err != nil { + return nil, err + } + } + + volDriver, err := p.getAdminVolDriver() + if err != nil { + return nil, err + } + volMap := make(map[string]*storkvolume.Info) + vols, err := volDriver.Enumerate( + &api.VolumeLocator{ + VolumeIds: volumeNameList, + }, + nil, + ) + if err != nil { + return nil, err + } + // Construct the stork volMap with just the names so that + // if Portworx failed to return the volume info, we can at least + //return the volume name. + for _, volName := range volumeNameList { + storkVol := &storkvolume.Info{ + VolumeID: volName, + } + volMap[volName] = storkVol + } + for _, vol := range vols { + // Construct and overwrite the stork volMap with the + // volume info fetched from Portworx API + if _, ok := volMap[vol.Locator.Name]; ok { + storkVol := p.constructStorkVolume(vol) + volMap[vol.Locator.Name] = storkVol + } else if _, ok := volMap[vol.Id]; ok { + storkVol := p.constructStorkVolume(vol) + volMap[vol.Id] = storkVol + } + } + + return volMap, nil } // volumePrefersWindowsNodes checks if "winshare" label is applied to the volume @@ -745,7 +794,7 @@ func (p *portworx) IsSupportedPVC(coreOps core.Ops, pvc *v1.PersistentVolumeClai if storageClassName != "" { var storageClass *storagev1.StorageClass var err error - if !reflect.ValueOf(storkcache.Instance()).IsNil() { + if storkcache.Instance() != nil { storageClass, err = storkcache.Instance().GetStorageClass(storageClassName) } else { storageClass, err = storage.Instance().GetStorageClass(storageClassName) @@ -813,6 +862,8 @@ func (p *portworx) GetPodVolumes(podSpec *v1.PodSpec, namespace string, includeP // includePendingWFFC - Includes pending volumes in the second return value if they are using WaitForFirstConsumer binding mode var volumes []*storkvolume.Info var pendingWFFCVolumes []*storkvolume.Info + pvcMap := make(map[string]*v1.PersistentVolumeClaim) + var volumeNameList []string for _, volume := range podSpec.Volumes { volumeName := "" isPendingWFFC := false @@ -835,6 +886,18 @@ func (p *portworx) GetPodVolumes(podSpec *v1.PodSpec, namespace string, includeP if pvc.Status.Phase == v1.ClaimPending { // Only include pending volume if requested and storage class has WFFC if includePendingWFFC && isWaitingForFirstConsumer(pvc) { + // For pending volumes we won't query Portworx. 
+ // Populate at least something about these volumes + wffcVol := &storkvolume.Info{ + // this would be empty for pending volumes, but the callers are simply + // checking for the existence of such a volume and not its contents + VolumeName: pvc.Spec.VolumeName, + } + wffcVol.Labels = make(map[string]string) + for k, v := range pvc.ObjectMeta.Annotations { + wffcVol.Labels[k] = v + } + pendingWFFCVolumes = append(pendingWFFCVolumes, wffcVol) isPendingWFFC = true } else { return nil, nil, &storkvolume.ErrPVCPending{ @@ -843,37 +906,49 @@ func (p *portworx) GetPodVolumes(podSpec *v1.PodSpec, namespace string, includeP } } volumeName = pvc.Spec.VolumeName + pvcMap[volumeName] = pvc } else if volume.PortworxVolume != nil { volumeName = volume.PortworxVolume.VolumeID } - - if volumeName != "" { - volumeInfo, err := p.InspectVolume(volumeName) - if err != nil { - logrus.Warnf("Failed to inspect volume %v: %v", volumeName, err) - // If the inspect volume fails return with atleast some info - volumeInfo = &storkvolume.Info{ + // If a volume is pending and WFFC, it doesn't exist in Portworx. + // No need of querying it. + if !isPendingWFFC { + volumeNameList = append(volumeNameList, volumeName) + } + } + var ( + volInfos map[string]*storkvolume.Info + err error + ) + if len(volumeNameList) > 0 { + // Lets get all the volumes in one shot + volInfos, err = p.enumerateVolumes(volumeNameList) + if err != nil { + logrus.Warnf("Failed to enumerate volumes %v: %v", volumeNameList, err) + volInfos = make(map[string]*storkvolume.Info) + for _, volumeName := range volumeNameList { + // Populate at lease something about the volumes + volInfos[volumeName] = &storkvolume.Info{ VolumeName: volumeName, } } - // Add the annotations as volume labels - if len(volumeInfo.Labels) == 0 { - volumeInfo.Labels = make(map[string]string) - } - - if pvc != nil { - for k, v := range pvc.ObjectMeta.Annotations { - volumeInfo.Labels[k] = v - } - } + } + } - if isPendingWFFC { - pendingWFFCVolumes = append(pendingWFFCVolumes, volumeInfo) - } else { - volumes = append(volumes, volumeInfo) + for volumeName, volumeInfo := range volInfos { + // Add the annotations as volume labels + if len(volumeInfo.Labels) == 0 { + volumeInfo.Labels = make(map[string]string) + } + pvc, ok := pvcMap[volumeName] + if ok && pvc != nil { + for k, v := range pvc.ObjectMeta.Annotations { + volumeInfo.Labels[k] = v } } + volumes = append(volumes, volumeInfo) } + return volumes, pendingWFFCVolumes, nil } @@ -1063,7 +1138,7 @@ func (p *portworx) getUserVolDriver(annotations map[string]string, namespace str return volDriver, err } -func (p *portworx) getAdminVolDriver() (volume.VolumeDriver, error) { +func (p *portworx) adminVolDriver() (volume.VolumeDriver, error) { if len(p.jwtSharedSecret) != 0 { claims := &auth.Claims{ Issuer: p.jwtIssuer, @@ -2762,7 +2837,7 @@ func (p *portworx) UpdateMigratedPersistentVolumeSpec( if len(pv.Spec.StorageClassName) != 0 { var sc *storagev1.StorageClass var err error - if !reflect.ValueOf(storkcache.Instance()).IsNil() { + if storkcache.Instance() != nil { sc, err = storkcache.Instance().GetStorageClass(pv.Spec.StorageClassName) } else { sc, err = storage.Instance().GetStorageClass(pv.Spec.StorageClassName) @@ -3718,7 +3793,7 @@ func (p *portworx) getCloudBackupRestoreSpec( var sc *storagev1.StorageClass var err error if len(destStorageClass) != 0 { - if !reflect.ValueOf(storkcache.Instance()).IsNil() { + if storkcache.Instance() != nil { sc, err = storkcache.Instance().GetStorageClass(destStorageClass) } else { sc, err = 
storage.Instance().GetStorageClass(destStorageClass) @@ -4404,6 +4479,8 @@ func (p *portworx) statfsConfigMapNeedsUpdate(existing *v1.ConfigMap, expectedSu func init() { p := &portworx{} + // Defining this override helps in UTs + p.getAdminVolDriver = p.adminVolDriver err := p.Init(nil) if err != nil { logrus.Debugf("Error init'ing portworx driver: %v", err) diff --git a/vendor/github.com/libopenstorage/stork/drivers/volume/volume.go b/vendor/github.com/libopenstorage/stork/drivers/volume/volume.go index 5bd4b310d..acbfd2f6d 100644 --- a/vendor/github.com/libopenstorage/stork/drivers/volume/volume.go +++ b/vendor/github.com/libopenstorage/stork/drivers/volume/volume.go @@ -67,6 +67,16 @@ var ( CSIDriverName, KDMPDriverName, } + + orderedListOfDriversForRestore = []string{ + PortworxDriverName, + AWSDriverName, + AzureDriverName, + GCEDriverName, + LinstorDriverName, + CSIDriverName, + KDMPDriverName, + } ) // Driver defines an external volume driver interface. @@ -399,6 +409,24 @@ func GetPVDriver(pv *v1.PersistentVolume) (string, error) { } } +// GetPVDriverForRestore gets the driver associated with a PV. Returns ErrNotFound if the PV is +// not owned by any available driver +func GetPVDriverForRestore(pv *v1.PersistentVolume) (string, error) { + for _, driverName := range orderedListOfDriversForRestore { + driverInst, ok := volDrivers[driverName] + if !ok { + continue + } + if driverInst.OwnsPV(pv) { + return driverName, nil + } + } + return "", &errors.ErrNotSupported{ + Feature: "VolumeDriver", + Reason: fmt.Sprintf("PV %v provisioned using unsupported driver", pv.Name), + } +} + // ClusterPairNotSupported to be used by drivers that don't support pairing type ClusterPairNotSupported struct{} diff --git a/vendor/github.com/libopenstorage/stork/pkg/applicationmanager/controllers/applicationbackup.go b/vendor/github.com/libopenstorage/stork/pkg/applicationmanager/controllers/applicationbackup.go index d119c9d03..3da6b4452 100644 --- a/vendor/github.com/libopenstorage/stork/pkg/applicationmanager/controllers/applicationbackup.go +++ b/vendor/github.com/libopenstorage/stork/pkg/applicationmanager/controllers/applicationbackup.go @@ -439,6 +439,32 @@ func (a *ApplicationBackupController) handle(ctx context.Context, backup *stork_ return nil } + // if stork got restarted we would have got IncludeResource Memory map cleaned up. + // Hence re-create memory map of it. + if IsBackupObjectTypeVirtualMachine(backup) && + backup.Status.Stage != stork_api.ApplicationBackupStageInitial && + backup.Status.Stage != stork_api.ApplicationBackupStageImportResource && + (len(a.vmIncludeResource[string(backup.UID)]) == 0 || len(a.vmIncludeResourceMap[string(backup.UID)]) == 0) { + logrus.Infof("Stork seems restarted, repopulating VM resource Map for backup %v", backup.Name) + // First VMs from various filters provided. + vmList, objectMap, err := resourcecollector.GetVMIncludeListFromBackup(backup) + if err != nil { + logrus.Debugf("failed to import VM resources, after stork reboot. returning for retry") + return err + } + nsMap := make(map[string]bool) + // Second fetch VM resources from the list of filtered VMs and freeze/thaw rule for each of them. + // also set SkipVmAutoExecRules to true as we dont need to recreate it at this stage. + skipVmAutoRuleCommands := true + vmIncludeResources, objectMap, _, _ := resourcecollector.GetVMIncludeResourceInfoList(vmList, + objectMap, nsMap, skipVmAutoRuleCommands) + + // update in memory data structure for later use. 
+ a.vmIncludeResourceMap[string(backup.UID)] = objectMap + a.vmIncludeResource[string(backup.UID)] = vmIncludeResources + a.vmNsListMap[string(backup.UID)] = nsMap + } + switch backup.Status.Stage { case stork_api.ApplicationBackupStageInitial: // Validate parameters @@ -750,6 +776,12 @@ func (a *ApplicationBackupController) backupVolumes(backup *stork_api.Applicatio var objectMap map[stork_api.ObjectInfo]bool if IsBackupObjectTypeVirtualMachine(backup) { objectMap = a.vmIncludeResourceMap[string(backup.UID)] + if len(objectMap) == 0 { + // for debugging purpose only. + // Its possible that will have empty rsources to backup during schedule backups due + // to vm or namespace being deleted. + logrus.Warnf("found empty includeResources for VM backup during volumeBakup stage") + } } else { objectMap = stork_api.CreateObjectsMap(backup.Spec.IncludeResources) } @@ -1559,7 +1591,6 @@ func (a *ApplicationBackupController) uploadObject( if err != nil { return err } - _, err = writer.Write(data) if err != nil { closeErr := writer.Close() @@ -1776,10 +1807,15 @@ func (a *ApplicationBackupController) backupResources( var objectMap map[stork_api.ObjectInfo]bool if IsBackupObjectTypeVirtualMachine(backup) { objectMap = a.vmIncludeResourceMap[string(backup.UID)] + if len(objectMap) == 0 { + // for debugging purpose + // its possible we will not have any resources during schedule backups due + // vm or namespace deletions + logrus.Warnf("found empty resources for VM backup during resourceBackup stage...") + } } else { objectMap = stork_api.CreateObjectsMap(backup.Spec.IncludeResources) } - namespacelist := backup.Spec.Namespaces // GetResources takes more time, if we have more number of namespaces // So, submitting it in batches and in between each batch, @@ -1927,7 +1963,6 @@ func (a *ApplicationBackupController) backupResources( } return nil } - // Do any additional preparation for the resources if required if err = a.prepareResources(backup, allObjects); err != nil { message := fmt.Sprintf("Error preparing resources for backup: %v", err) diff --git a/vendor/github.com/libopenstorage/stork/pkg/applicationmanager/controllers/applicationrestore.go b/vendor/github.com/libopenstorage/stork/pkg/applicationmanager/controllers/applicationrestore.go index c7f302a89..817233ea8 100644 --- a/vendor/github.com/libopenstorage/stork/pkg/applicationmanager/controllers/applicationrestore.go +++ b/vendor/github.com/libopenstorage/stork/pkg/applicationmanager/controllers/applicationrestore.go @@ -611,6 +611,7 @@ func (a *ApplicationRestoreController) restoreVolumes(restore *storkapi.Applicat pvcCount := 0 restoreDone := 0 backupVolumeInfoMappings := make(map[string][]*storkapi.ApplicationBackupVolumeInfo) + hasPXDdriver := false for _, namespace := range backup.Spec.Namespaces { if _, ok := restore.Spec.NamespaceMapping[namespace]; !ok { continue @@ -648,6 +649,9 @@ func (a *ApplicationRestoreController) restoreVolumes(restore *storkapi.Applicat backupVolumeInfoMappings[volumeBackup.DriverName] = make([]*storkapi.ApplicationBackupVolumeInfo, 0) } backupVolumeInfoMappings[volumeBackup.DriverName] = append(backupVolumeInfoMappings[volumeBackup.DriverName], volumeBackup) + if volumeBackup.DriverName == volume.PortworxDriverName { + hasPXDdriver = true + } } } if restore.Status.Volumes == nil { @@ -659,10 +663,25 @@ func (a *ApplicationRestoreController) restoreVolumes(restore *storkapi.Applicat logrus.Errorf("error in checking backuplocation type") return err } + + // Flag onlyPXDdriver to avoid creating a nfs-restore-pvc 
job for PXD driver(only) exclusively. + // This code checks if there's precisely one entry in the backupVolumeInfoMappings map. + // If such an entry exists, it sets onlyPXDdriver to true if "pxd" is the only driver in the map. + // If there are multiple drivers with PXD, we can still create a resourceExport CR as the count of restored volumes doesn't match the total count of PVCs to be restored. + // If there's only PXD with NFS, skipping resourceExport CR creation for volumes is necessary. + // This prevents duplicating restores, as both PVC counts will match after being initiated from driver.StartRestore in Stork context and marking stage as ApplicationRestoreStageApplications + // Otherwise, there would be redundant restoration of resources, once from nfs-restore-pvc and another from nfs-restore-resource, potentially marking the restore as partial even in new namespace restores. + var skipNfsForPxDOnlyDriver bool + if len(backupVolumeInfoMappings) == 1 { + _, skipNfsForPxDOnlyDriver = backupVolumeInfoMappings[volume.PortworxDriverName] + } if len(restore.Status.Volumes) != pvcCount { - // Here backupVolumeInfoMappings is framed based on driver name mapping, hence startRestore() - // gets called once per driver - if !nfs { + // Portworx driver restore is not supported via job as it can be a secured px volume + // to access the token, we need to run the driver.startRestore in the same context as the stork controller + // this check is equivalent to (if !nfs || (nfs && driverName == volume.PortworxDriverName)) + if !nfs || hasPXDdriver { + // Here backupVolumeInfoMappings is framed based on driver name mapping, hence startRestore() + // gets called once per driver for driverName, vInfos := range backupVolumeInfoMappings { backupVolInfos := vInfos driver, err := volume.Get(driverName) @@ -674,82 +693,95 @@ func (a *ApplicationRestoreController) restoreVolumes(restore *storkapi.Applicat if err != nil { return err } - // For each driver, check if it needs any additional resources to be - // restored before starting the volume restore - objects, err := a.downloadResources(backup, restore.Spec.BackupLocation, restore.Namespace) - if err != nil { - log.ApplicationRestoreLog(restore).Errorf("Error downloading resources: %v", err) - return err - } - // Skip pv/pvc if replacepolicy is set to retain to avoid creating - if restore.Spec.ReplacePolicy == storkapi.ApplicationRestoreReplacePolicyRetain { - backupVolInfos, existingRestoreVolInfos, err = a.skipVolumesFromRestoreList(restore, objects, driver, vInfos) - if err != nil { - log.ApplicationRestoreLog(restore).Errorf("Error while checking pvcs: %v", err) - return err + var preRestoreObjects []runtime.Unstructured + // this check is equivalent to (if nfs && driverName == volume.PortworxDriverName) + if nfs { + if restore.Spec.ReplacePolicy == storkapi.ApplicationRestoreReplacePolicyRetain { + // Skip pv/pvc if replacepolicy is set to retain to avoid creating with empty object + backupVolInfos, existingRestoreVolInfos, err = a.skipVolumesFromRestoreList(restore, nil, driver, vInfos) + if err != nil { + log.ApplicationRestoreLog(restore).Errorf("Error while checking pvcs: %v", err) + return err + } } - } - var storageClassesBytes []byte - if driverName == "csi" { - storageClassesBytes, err = a.downloadObject(backup, backup.Spec.BackupLocation, backup.Namespace, "storageclasses.json", false) + } else { + // For each driver, check if it needs any additional resources to be + // restored before starting the volume restore + objects, err := 
a.downloadResources(backup, restore.Spec.BackupLocation, restore.Namespace) if err != nil { - log.ApplicationRestoreLog(restore).Errorf("Error in a.downloadObject %v", err) + log.ApplicationRestoreLog(restore).Errorf("Error downloading resources: %v", err) return err } - } - preRestoreObjects, err := driver.GetPreRestoreResources(backup, restore, objects, storageClassesBytes) - if err != nil { - log.ApplicationRestoreLog(restore).Errorf("Error getting PreRestore Resources: %v", err) - return err - } - - // Pre-delete resources for CSI driver - if (driverName == "csi" || driverName == "kdmp") && restore.Spec.ReplacePolicy == storkapi.ApplicationRestoreReplacePolicyDelete { - objectMap := storkapi.CreateObjectsMap(restore.Spec.IncludeResources) - objectBasedOnIncludeResources := make([]runtime.Unstructured, 0) - var opts resourcecollector.Options - for _, o := range objects { - skip, err := a.resourceCollector.PrepareResourceForApply( - o, - objects, - objectMap, - restore.Spec.NamespaceMapping, - nil, // no need to set storage class mappings at this stage - nil, - restore.Spec.IncludeOptionalResourceTypes, - nil, - &opts, - restore.Spec.BackupLocation, - restore.Namespace, - ) + // Skip pv/pvc if replacepolicy is set to retain to avoid creating + if restore.Spec.ReplacePolicy == storkapi.ApplicationRestoreReplacePolicyRetain { + backupVolInfos, existingRestoreVolInfos, err = a.skipVolumesFromRestoreList(restore, objects, driver, vInfos) if err != nil { + log.ApplicationRestoreLog(restore).Errorf("Error while checking pvcs: %v", err) return err } - if !skip { - objectBasedOnIncludeResources = append( - objectBasedOnIncludeResources, - o, - ) + } + var storageClassesBytes []byte + if driverName == "csi" { + storageClassesBytes, err = a.downloadObject(backup, backup.Spec.BackupLocation, backup.Namespace, "storageclasses.json", false) + if err != nil { + log.ApplicationRestoreLog(restore).Errorf("Error in a.downloadObject %v", err) + return err } } - tempObjects, err := a.getNamespacedObjectsToDelete( - restore, - objectBasedOnIncludeResources, - ) + preRestoreObjects, err = driver.GetPreRestoreResources(backup, restore, objects, storageClassesBytes) if err != nil { + log.ApplicationRestoreLog(restore).Errorf("Error getting PreRestore Resources: %v", err) return err } - err = a.resourceCollector.DeleteResources( - a.dynamicInterface, - tempObjects, updateCr) - if err != nil { - return err + + // Pre-delete resources for CSI driver + if (driverName == "csi" || driverName == "kdmp") && restore.Spec.ReplacePolicy == storkapi.ApplicationRestoreReplacePolicyDelete { + objectMap := storkapi.CreateObjectsMap(restore.Spec.IncludeResources) + objectBasedOnIncludeResources := make([]runtime.Unstructured, 0) + var opts resourcecollector.Options + for _, o := range objects { + skip, err := a.resourceCollector.PrepareResourceForApply( + o, + objects, + objectMap, + restore.Spec.NamespaceMapping, + nil, // no need to set storage class mappings at this stage + nil, + restore.Spec.IncludeOptionalResourceTypes, + nil, + &opts, + restore.Spec.BackupLocation, + restore.Namespace, + ) + if err != nil { + return err + } + if !skip { + objectBasedOnIncludeResources = append( + objectBasedOnIncludeResources, + o, + ) + } + } + tempObjects, err := a.getNamespacedObjectsToDelete( + restore, + objectBasedOnIncludeResources, + ) + if err != nil { + return err + } + err = a.resourceCollector.DeleteResources( + a.dynamicInterface, + tempObjects, updateCr) + if err != nil { + return err + } } - } - // pvc creation is not 
part of kdmp - if driverName != volume.KDMPDriverName { - if err := a.applyResources(restore, preRestoreObjects, updateCr); err != nil { - return err + // pvc creation is not part of kdmp + if driverName != volume.KDMPDriverName { + if err := a.applyResources(restore, preRestoreObjects, updateCr); err != nil { + return err + } } } restore, err = a.updateRestoreCRInVolumeStage( @@ -841,11 +873,11 @@ func (a *ApplicationRestoreController) restoreVolumes(restore *storkapi.Applicat return err } } - } } - // Check whether ResourceExport is present or not - if nfs { + // If NFS we create resourceExportCR but we will ensure to ignore PX volumes in the restore + // If only PXD driver is present, we will not create PVC job as that is taken care in above loop + if nfs && !skipNfsForPxDOnlyDriver { err = a.client.Update(context.TODO(), restore) if err != nil { time.Sleep(retrySleep) @@ -1319,7 +1351,6 @@ func getNamespacedPVCLocation(pvc *v1.PersistentVolumeClaim) string { // getPVCToPVMapping constructs a mapping of PVC name/namespace to PV objects func getPVCToPVMapping(allObjects []runtime.Unstructured) (map[string]*v1.PersistentVolume, error) { - // Get mapping of PVC name to PV name pvNameToPVCName := make(map[string]string) for _, o := range allObjects { @@ -1365,7 +1396,7 @@ func getPVCToPVMapping(allObjects []runtime.Unstructured) (map[string]*v1.Persis } func isGenericCSIPersistentVolume(pv *v1.PersistentVolume) (bool, error) { - driverName, err := volume.GetPVDriver(pv) + driverName, err := volume.GetPVDriverForRestore(pv) if err != nil { return false, err } @@ -1407,21 +1438,26 @@ func (a *ApplicationRestoreController) skipVolumesFromRestoreList( logrus.Infof("skipping namespace %s for restore", bkupVolInfo.Namespace) continue } + ns := val + var pvcName string // Declare the pvcName variable + if objects != nil { + // get corresponding pvc object from objects list + pvcObject, err := volume.GetPVCFromObjects(objects, bkupVolInfo) + if err != nil { + return newVolInfos, existingInfos, err + } + pvcName = pvcObject.Name - // get corresponding pvc object from objects list - pvcObject, err := volume.GetPVCFromObjects(objects, bkupVolInfo) - if err != nil { - return newVolInfos, existingInfos, err + } else { + pvcName = bkupVolInfo.PersistentVolumeClaim } - - ns := val - pvc, err := core.Instance().GetPersistentVolumeClaim(pvcObject.Name, ns) + pvc, err := core.Instance().GetPersistentVolumeClaim(pvcName, ns) if err != nil { if k8s_errors.IsNotFound(err) { newVolInfos = append(newVolInfos, bkupVolInfo) continue } - return newVolInfos, existingInfos, fmt.Errorf("erorr getting pvc %s/%s: %v", ns, pvcObject.Name, err) + return newVolInfos, existingInfos, fmt.Errorf("error getting pvc %s/%s: %v", ns, pvcName, err) // Update the error message } pvName := pvc.Spec.VolumeName var zones []string @@ -1599,10 +1635,24 @@ func (a *ApplicationRestoreController) applyResources( namespacedName := types.NamespacedName{} namespacedName.Namespace = restore.Namespace namespacedName.Name = restore.Name - - pvNameMappings, err := a.getPVNameMappings(restore, objects) - if err != nil { - return err + // The applyResources is getting called in both the volume stage and resource stage. + // In the volume stage, it is getting called for applying the preRestore object. + // During the volume stage, we will not have restoreVolume updated in the volumeInfo structure. 
+ // In between two driver's PVC restore processing, there is a chance that applicationrestore CR will status.VolumeInfo + // updated with the basic information of the volume, with out restoreVolume name. + // List of prerestore resource for each driver is as follow: + // aws, azure, gke driver does not have any preRestore object. + // kdmp - restore PVC spec but we do not apply it in the volume stage, as we do not call applyResource for kdmp case. + // PXD - for px volumes, we apply the secrets of encrypted volumes. + // That means , we do not need to call getPVNameMappings during volume stage. + // So, avoiding the call to getPVNameMappings, if it getting called from volume stage. + var pvNameMappings map[string]string + var err error + if restore.Status.Stage != storkapi.ApplicationRestoreStageVolumes { + pvNameMappings, err = a.getPVNameMappings(restore, objects) + if err != nil { + return err + } } objectMap := storkapi.CreateObjectsMap(restore.Spec.IncludeResources) tempObjects := make([]runtime.Unstructured, 0) @@ -1903,7 +1953,7 @@ func (a *ApplicationRestoreController) restoreResources( logrus.Debugf("resource export: %s, status: %s", resourceExport.Name, resourceExport.Status.Status) switch resourceExport.Status.Status { case kdmpapi.ResourceExportStatusFailed: - message = fmt.Sprintf("Error applying resources: %v", err) + message = fmt.Sprintf("Error applying resources: %v", resourceExport.Status.Reason) restore.Status.Status = storkapi.ApplicationRestoreStatusFailed restore.Status.Stage = storkapi.ApplicationRestoreStageFinal restore.Status.Reason = message @@ -2305,7 +2355,7 @@ func (a *ApplicationRestoreController) processVMResourcesForVMRestoreFromNFS(res logrus.Debugf("resource export: %s, status: %s", resourceExport.Name, resourceExport.Status.Status) switch resourceExport.Status.Status { case kdmpapi.ResourceExportStatusFailed: - message = fmt.Sprintf("Error applying resources: %v", err) + message = fmt.Sprintf("Error applying resources: %v", resourceExport.Status.Reason) restore.Status.Status = storkapi.ApplicationRestoreStatusFailed restore.Status.Stage = storkapi.ApplicationRestoreStageFinal restore.Status.Reason = message diff --git a/vendor/github.com/libopenstorage/stork/pkg/cache/cache.go b/vendor/github.com/libopenstorage/stork/pkg/cache/cache.go index 4061116f2..41a0c6795 100644 --- a/vendor/github.com/libopenstorage/stork/pkg/cache/cache.go +++ b/vendor/github.com/libopenstorage/stork/pkg/cache/cache.go @@ -43,8 +43,8 @@ type cache struct { } var ( - cacheLock sync.Mutex - sharedInformerCache *cache + cacheLock sync.Mutex + instance SharedInformerCache cacheNotInitializedErr = "shared informer cache has not been initialized yet" ) @@ -52,7 +52,7 @@ var ( func CreateSharedInformerCache(mgr manager.Manager) error { cacheLock.Lock() defer cacheLock.Unlock() - if sharedInformerCache != nil { + if instance != nil { return fmt.Errorf("shared informer cache already initialized") } config, err := rest.InClusterConfig() @@ -71,7 +71,19 @@ func CreateSharedInformerCache(mgr manager.Manager) error { currPod.Name = podResource.Name currPod.Namespace = podResource.Namespace - currPod.Spec.Volumes = podResource.Spec.Volumes + // Only store volumes which we care about. 
+ for _, podVolume := range podResource.Spec.Volumes { + if podVolume.PersistentVolumeClaim != nil { + currPod.Spec.Volumes = append(currPod.Spec.Volumes, podVolume) + } else if podVolume.PortworxVolume != nil { + currPod.Spec.Volumes = append(currPod.Spec.Volumes, podVolume) + } else if podVolume.Ephemeral != nil { + currPod.Spec.Volumes = append(currPod.Spec.Volumes, podVolume) + } else if podVolume.CSI != nil { + currPod.Spec.Volumes = append(currPod.Spec.Volumes, podVolume) + } + } + currPod.Spec.Containers = podResource.Spec.Containers currPod.Spec.NodeName = podResource.Spec.NodeName @@ -83,7 +95,9 @@ func CreateSharedInformerCache(mgr manager.Manager) error { }, } - sharedInformerCache = &cache{} + sharedInformerCache := &cache{} + // Set the global instance + instance = sharedInformerCache sharedInformerCache.controllerCache, err = controllercache.New(config, controllercache.Options{ Scheme: mgr.GetScheme(), TransformByObject: transformMap, @@ -103,7 +117,14 @@ func CreateSharedInformerCache(mgr manager.Manager) error { func Instance() SharedInformerCache { cacheLock.Lock() defer cacheLock.Unlock() - return sharedInformerCache + return instance +} + +// Only used for UTs +func SetTestInstance(s SharedInformerCache) { + cacheLock.Lock() + defer cacheLock.Unlock() + instance = s } // GetStorageClass returns the storage class if present in the cache. diff --git a/vendor/github.com/libopenstorage/stork/pkg/resourcecollector/persistentvolume.go b/vendor/github.com/libopenstorage/stork/pkg/resourcecollector/persistentvolume.go index 982288fa4..89b889e76 100644 --- a/vendor/github.com/libopenstorage/stork/pkg/resourcecollector/persistentvolume.go +++ b/vendor/github.com/libopenstorage/stork/pkg/resourcecollector/persistentvolume.go @@ -207,7 +207,7 @@ func (r *ResourceCollector) preparePVResourceForApply( // checks proper driver by looking at pv name if driverName == "" { var err error - driverName, err = volume.GetPVDriver(&pv) + driverName, err = volume.GetPVDriverForRestore(&pv) if err != nil { return false, err } diff --git a/vendor/github.com/libopenstorage/stork/pkg/resourcecollector/resourcecollector.go b/vendor/github.com/libopenstorage/stork/pkg/resourcecollector/resourcecollector.go index 3114ed28b..ccf244353 100644 --- a/vendor/github.com/libopenstorage/stork/pkg/resourcecollector/resourcecollector.go +++ b/vendor/github.com/libopenstorage/stork/pkg/resourcecollector/resourcecollector.go @@ -3,7 +3,6 @@ package resourcecollector import ( "context" "fmt" - "reflect" "strconv" "strings" "time" @@ -241,7 +240,7 @@ func (r *ResourceCollector) GetResourceTypes( var crdResources []metav1.GroupVersionKind var crdList *stork_api.ApplicationRegistrationList storkcache.Instance() - if !reflect.ValueOf(storkcache.Instance()).IsNil() { + if storkcache.Instance() != nil { crdList, err = storkcache.Instance().ListApplicationRegistrations() } else { crdList, err = r.storkOps.ListApplicationRegistrations() @@ -359,7 +358,7 @@ func (r *ResourceCollector) GetResourcesForType( return nil, nil, err } var crdList *stork_api.ApplicationRegistrationList - if !reflect.ValueOf(storkcache.Instance()).IsNil() { + if storkcache.Instance() != nil { crdList, err = storkcache.Instance().ListApplicationRegistrations() } else { crdList, err = r.storkOps.ListApplicationRegistrations() @@ -407,7 +406,7 @@ func (r *ResourceCollector) GetResourcesExcludingTypes( resourceMap := make(map[types.UID]bool) var crdResources []metav1.GroupVersionKind var crdList *stork_api.ApplicationRegistrationList - if 
!reflect.ValueOf(storkcache.Instance()).IsNil() { + if storkcache.Instance() != nil { crdList, err = storkcache.Instance().ListApplicationRegistrations() } else { crdList, err = r.storkOps.ListApplicationRegistrations() @@ -508,7 +507,7 @@ func (r *ResourceCollector) GetResources( resourceMap := make(map[types.UID]bool) var crdResources []metav1.GroupVersionKind var crdList *stork_api.ApplicationRegistrationList - if !reflect.ValueOf(storkcache.Instance()).IsNil() { + if storkcache.Instance() != nil { crdList, err = storkcache.Instance().ListApplicationRegistrations() } else { crdList, err = r.storkOps.ListApplicationRegistrations() diff --git a/vendor/github.com/libopenstorage/stork/pkg/snapshotter/snapshotter_csi.go b/vendor/github.com/libopenstorage/stork/pkg/snapshotter/snapshotter_csi.go index 145592c05..0c0f2c88d 100644 --- a/vendor/github.com/libopenstorage/stork/pkg/snapshotter/snapshotter_csi.go +++ b/vendor/github.com/libopenstorage/stork/pkg/snapshotter/snapshotter_csi.go @@ -49,11 +49,9 @@ const ( skipResourceAnnotation = "stork.libopenstorage.org/skip-resource" // defaultSnapshotTimeout represents the duration to wait before timing out on snapshot completion - defaultSnapshotTimeout = time.Minute * 5 + defaultSnapshotTimeout = time.Minute * 30 // SnapshotTimeoutKey represents the duration to wait before timing out on snapshot completion SnapshotTimeoutKey = "SNAPSHOT_TIMEOUT" - // restoreTimeout is the duration to wait before timing out the restore - restoreTimeout = time.Minute * 5 // snapDeleteAnnotation needs to be set if volume snapshot is scheduled for deletion snapDeleteAnnotation = "snapshotScheduledForDeletion" // snapRestoreAnnotation needs to be set if volume snapshot is scheduled for restore @@ -741,7 +739,12 @@ func (c *csiDriver) RestoreStatus(pvcName, namespace string) (RestoreInfo, error } } } - + // Lets use SNAPSHOT_TIMEOUT for restoreTimeout as well. + restoreTimeout, err := getSnapshotTimeout() + if err == nil { + logrus.Warnf("failed to obtain timeout value for snapshot %s: %v, falling back on default snapshot timeout value %s", vsName, err, defaultSnapshotTimeout.String()) + restoreTimeout = defaultSnapshotTimeout + } if time.Now().After(pvc.CreationTimestamp.Add(restoreTimeout)) { restoreInfo.Status = StatusFailed restoreInfo.Reason = formatReasonErrorMessage(fmt.Sprintf("PVC restore timeout out after %s", restoreTimeout.String()), vsError, vscError) diff --git a/vendor/modules.txt b/vendor/modules.txt index 3cc2de9fd..d308a9487 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -441,7 +441,7 @@ github.com/libopenstorage/openstorage-sdk-clients/sdk/golang github.com/libopenstorage/secrets github.com/libopenstorage/secrets/aws/credentials github.com/libopenstorage/secrets/k8s -# github.com/libopenstorage/stork v1.4.1-0.20240424163629-d47b17267ad0 +# github.com/libopenstorage/stork v1.4.1-0.20240513102605-2340238c7664 ## explicit; go 1.21 github.com/libopenstorage/stork/drivers github.com/libopenstorage/stork/drivers/volume
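
Reviewer note (not part of the patch): below is a minimal, self-contained Go sketch of the snapshot-size check that this bump vendors into kdmp's StartRestore (see the drivers/volume/kdmp/kdmp.go hunk above). The standalone main, the 1Gi request, and the 2Gi snapshot size are illustrative assumptions only; the vendored code runs the same CmpInt64 comparison against bkpvInfo.TotalSize on the PVC spec rebuilt from the backup.

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/resource"
    )

    func main() {
        // Storage request as recorded in the backed-up PVC spec (assumed value).
        requests := v1.ResourceList{
            v1.ResourceStorage: resource.MustParse("1Gi"),
        }
        // Stand-in for bkpvInfo.TotalSize: snapshot size in bytes (assumed 2Gi).
        snapshotSize := uint64(2 * 1024 * 1024 * 1024)

        // Same comparison as the vendored kdmp.StartRestore change: grow the
        // request only when the snapshot is larger than what the PVC asks for.
        pvcQuantity := requests[v1.ResourceStorage]
        if pvcQuantity.CmpInt64(int64(snapshotSize)) == -1 {
            newPvcQuantity := resource.NewQuantity(int64(snapshotSize), resource.BinarySI)
            requests[v1.ResourceStorage] = *newPvcQuantity
        }

        updated := requests[v1.ResourceStorage]
        fmt.Println(updated.String()) // prints "2Gi"
    }

Building it against the k8s.io/api and k8s.io/apimachinery versions already in go.mod should print 2Gi, i.e. the request is bumped only when the snapshot exceeds it and is left untouched otherwise.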