From 1c878d45560a975516e9d0d8d00496b0f18865c5 Mon Sep 17 00:00:00 2001
From: Leela Srinivas <83946232+lsrinivas-pure@users.noreply.github.com>
Date: Tue, 20 Aug 2024 20:07:38 +0530
Subject: [PATCH] Adding alert and path validation for svmotion test (#2738)

---
 drivers/node/node.go                |  81 ++++++-
 drivers/node/vsphere/vsphere.go     | 314 +++++++++-------------------
 drivers/scheduler/dcos/dcos.go      |   8 +
 drivers/scheduler/k8s/k8s.go        |  16 +-
 drivers/scheduler/scheduler.go      |   6 +-
 tests/basic/misc_test.go            |  88 ++++++++
 tests/basic/storage_pool_test.go    |   4 +-
 tests/basic/upgrade_cluster_test.go |  10 +-
 tests/common.go                     | 127 +++++++++++
 tests/testTriggers.go               |  97 +++++++--
 10 files changed, 499 insertions(+), 252 deletions(-)

diff --git a/drivers/node/node.go b/drivers/node/node.go
index cb67ab982..165bb642e 100644
--- a/drivers/node/node.go
+++ b/drivers/node/node.go
@@ -6,7 +6,6 @@ import (
 	"time"
 
 	"github.com/libopenstorage/openstorage/api"
-	corev1 "github.com/libopenstorage/operator/pkg/apis/core/v1"
 	"github.com/portworx/torpedo/pkg/errors"
 	"github.com/vmware/govmomi/object"
 )
@@ -129,6 +128,64 @@ type InitOptions struct {
 	SpecDir string
 }
 
+// DriveSet describes the set of cloud drives used by a node
+type DriveSet struct {
+	// Configs describes the configuration of the drives present in this set
+	// The key is the volumeID
+	Configs map[string]DriveConfig
+	// NodeID is the id of the node where the drive set is being used/last
+	// used
+	NodeID string
+	// ReservedInstanceID if set is the instance ID of the node that's attempting to transfer the driveset to itself
+	ReservedInstanceID string
+	// SchedulerNodeName is the name of the node in scheduler context
+	SchedulerNodeName string
+	// NodeIndex is the index of the node where the drive set is being
+	// used/last used
+	NodeIndex int
+	// CreateTimestamp is the timestamp when the drive set was created
+	CreateTimestamp time.Time
+	// InstanceID is the cloud provider id of the instance using this drive set
+	InstanceID string
+	// Zone defines the zone in which the node exists
+	Zone string
+	// State state of the drive set from the well defined states
+	State string
+	// Labels associated with this drive set
+	Labels *map[string]string `json:"labels"`
+}
+
+// DriveConfig defines the configuration for a cloud drive
+type DriveConfig struct {
+	// Type defines the type of cloud drive
+	Type string
+	// Size defines the size of the cloud drive in Gi
+	Size int64
+	// ID is the cloud drive id
+	ID string
+	// Path is the path where the drive is attached
+	Path string
+	// Iops is the iops that the drive supports
+	Iops int64
+	// Vpus provide a measure of disk resources available for
+	// performance (IOPS/GBs) of Oracle drives.
+	// Oracle uses VPU in lieu of disk types.
+	Vpus int64
+	// PXType indicates how this drive is being used by PX
+	PXType string
+	// State state of the drive config from the well defined states
+	State string
+	// Labels associated with this drive config
+	Labels map[string]string `json:"labels"`
+	// AttachOptions for cloud drives to be attached
+	AttachOptions map[string]string
+	// Provisioner is a name of provisioner which was used to create a drive
+	Provisioner string
+	// Encryption Key string to be passed in device specs
+	EncryptionKeyInfo string
+	// UUID of VMDK
+	DiskUUID string
+}
+
 // Driver provides the node driver interface
 type Driver interface {
 	// Init initializes the node driver under the given scheduler
@@ -217,7 +274,7 @@ type Driver interface {
 	AddMachine(machineName string) error
 
 	// DetachDisk vdisk from node.
-	DetachDrivesFromVM(stc *corev1.StorageCluster, nodeName string) error
+	DetachDrivesFromVM(nodeName string, configData map[string]DriveSet) error
 
 	//GetCompatibleDatastores
 	GetCompatibleDatastores(portworxNamespace string, datastoreNames []string) ([]*object.Datastore, error)
@@ -257,7 +314,10 @@ type Driver interface {
 	GetSupportedDriveTypes() ([]string, error)
 
 	// StorageVmotion selectively relocates specific disks of a virtual machine to a new datastore
-	StorageVmotion(ctx context.Context, node Node, portworxNamespace string, moveAllDisks bool) error
+	StorageVmotion(ctx context.Context, node Node, portworxNamespace string, moveAllDisks bool) (*object.Datastore, error)
+
+	// GetUUIDFromVMDKPath returns the UUID of the VMDK file
+	GetUUIDFromVMDKPath(ctx context.Context, node Node, vmdkPath string) (string, error)
 
 	// findVMByName finds a virtual machine by its name
 	FindVMByName(vmName string) (*object.VirtualMachine, error)
@@ -311,7 +371,7 @@ func (d *notSupportedDriver) RebootNode(node Node, options RebootNodeOpts) error
 	}
 }
 
-func (d *notSupportedDriver) DetachDrivesFromVM(stc *corev1.StorageCluster, nodeName string) error {
+func (d *notSupportedDriver) DetachDrivesFromVM(nodeName string, configData map[string]DriveSet) error {
 	return &errors.ErrNotSupported{
 		Type:      "Function",
 		Operation: "DetachDrivesFromVM()",
@@ -581,13 +641,20 @@ func (d *notSupportedDriver) RemoveNonRootDisks(node Node) error {
 	}
 }
 
-func (d *notSupportedDriver) StorageVmotion(ctx context.Context, node Node, portworxNamespace string, moveAllDisks bool) error {
-	return &errors.ErrNotSupported{
+func (d *notSupportedDriver) StorageVmotion(ctx context.Context, node Node, portworxNamespace string, moveAllDisks bool) (*object.Datastore, error) {
+	return nil, &errors.ErrNotSupported{
 		Type:      "Function",
 		Operation: "StorageVmotion()",
 	}
 }
 
+func (d *notSupportedDriver) GetUUIDFromVMDKPath(ctx context.Context, node Node, vmdkPath string) (string, error) {
+	return "", &errors.ErrNotSupported{
+		Type:      "Function",
+		Operation: "GetUUIDFromVMDKPath()",
+	}
+}
+
 func (d *notSupportedDriver) FindVMByName(vmName string) (*object.VirtualMachine, error) {
 	return nil, &errors.ErrNotSupported{
 		Type:      "Function",
@@ -600,4 +667,4 @@ func (d *notSupportedDriver) FindDatastoreByName(dsName string) (*object.Datasto
 		Type:      "Function",
 		Operation: "FindDatastoreByName()",
 	}
-}
+}
\ No newline at end of file
diff --git a/drivers/node/vsphere/vsphere.go b/drivers/node/vsphere/vsphere.go
index df8bb6371..45720ce44 100644
--- a/drivers/node/vsphere/vsphere.go
+++ b/drivers/node/vsphere/vsphere.go
@@ -2,7 +2,6 @@ package vsphere
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
 	"net/url"
 	"os"
@@ -12,11 +11,6 @@ import (
 	"strings"
 	"time"
 
-	pxutil "github.com/libopenstorage/operator/drivers/storage/portworx/util"
-	corev1 "github.com/libopenstorage/operator/pkg/apis/core/v1"
-	operatorcorev1 "github.com/libopenstorage/operator/pkg/apis/core/v1"
-	coreops "github.com/portworx/sched-ops/k8s/core"
-	"github.com/portworx/sched-ops/k8s/operator"
 	"github.com/portworx/sched-ops/task"
 	"github.com/portworx/torpedo/drivers/node"
 	"github.com/portworx/torpedo/drivers/node/ssh"
@@ -26,7 +20,6 @@ import (
 	"github.com/vmware/govmomi/object"
 	"github.com/vmware/govmomi/vim25/mo"
 	"github.com/vmware/govmomi/vim25/types"
-	v1 "k8s.io/api/core/v1"
 )
 
 const (
@@ -52,64 +45,6 @@ const (
 	VMReadyRetryInterval = 5 * time.Second
 )
 
-type DriveSet struct {
-	// Configs describes the configuration of the drives present in this set
-	// The key is the volumeID
-	Configs map[string]DriveConfig
-	// NodeID is the id of the node where the drive set is being used/last
-	// used
-	NodeID string
-	// ReservedInstanceID if set is the instance ID of the node that's attempting to transfer the driveset to itself
-	ReservedInstanceID string
-	// SchedulerNodeName is the name of the node in scheduler context
-	SchedulerNodeName string
-	// NodeIndex is the index of the node where the drive set is being
-	// used/last used
-	NodeIndex int
-	// CreateTimestamp is the timestamp when the drive set was created
-	CreateTimestamp time.Time
-	// InstanceID is the cloud provider id of the instance using this drive set
-	InstanceID string
-	// Zone defines the zone in which the node exists
-	Zone string
-	// State state of the drive set from the well defined states
-	State string
-	// Labels associated with this drive set
-	Labels *map[string]string `json:"labels"`
-}
-
-// DriveConfig defines the configuration for a cloud drive
-type DriveConfig struct {
-	// Type defines the type of cloud drive
-	Type string
-	// Size defines the size of the cloud drive in Gi
-	Size int64
-	// ID is the cloud drive id
-	ID string
-	// Path is the path where the drive is attached
-	Path string
-	// Iops is the iops that the drive supports
-	Iops int64
-	// Vpus provide a measure of disk resources available for
-	// performance (IOPS/GBs) of Oracle drives.
-	// Oracle uses VPU in lieu of disk types.
-	Vpus int64
-	// PXType indicates how this drive is being used by PX
-	PXType string
-	// State state of the drive config from the well defined states
-	State string
-	// Labels associated with this drive config
-	Labels map[string]string `json:"labels"`
-	// AttachOptions for cloud drives to be attached
-	AttachOptions map[string]string
-	// Provisioner is a name of provisioner which was used to create a drive
-	Provisioner string
-	// Encryption Key string to be passed in device specs
-	EncryptionKeyInfo string
-	// UUID of VMDK
-	DiskUUID string
-}
-
 // DrivePaths stores the device paths of the disks which will be used by PX.
 type DrivePaths struct {
 	// Storage drives
@@ -269,35 +204,17 @@ func (v *vsphere) getVMFinder() (*find.Finder, error) {
 }
 
 // GetCompatibleDatastores get matching prefix datastores
-func (v *vsphere) GetCompatibleDatastores(portworxNamespace string, datastoreNames []string) ([]*object.Datastore, error) {
+func (v *vsphere) GetCompatibleDatastores(prefixName string, datastoreNames []string) ([]*object.Datastore, error) {
 	var err error
 	datastores, err := v.GetDatastoresFromDatacenter()
 	if err != nil {
 		return nil, err
 	}
-	var stc *operatorcorev1.StorageCluster
-	pxOperator := operator.Instance()
-	stcList, err := pxOperator.ListStorageClusters(portworxNamespace)
-	if err != nil {
-		return nil, fmt.Errorf("Failed to find storage clusters %v ", err)
-	}
-	var selectedDatastore []*object.Datastore
-	stc, err = pxOperator.GetStorageCluster(stcList.Items[0].Name, stcList.Items[0].Namespace)
-	if err != nil {
-		return nil, fmt.Errorf("Failed to find storage cluster %v in namespace %s ", err, portworxNamespace)
-	}
-	var envVariables []v1.EnvVar
-	envVariables = stc.Spec.CommonConfig.Env
-	var prefixName string
-	for _, envVar := range envVariables {
-		if envVar.Name == "VSPHERE_DATASTORE_PREFIX" {
-			prefixName = envVar.Value
-			log.Infof("prefixName %s ", prefixName)
-		}
-	}
+
 	if prefixName == "" {
-		return nil, fmt.Errorf("Failed to find VSPHERE_DATASTORE_PREFIX prefix ")
+		return nil, fmt.Errorf("datastore prefix is empty; VSPHERE_DATASTORE_PREFIX is not set")
 	}
+	var selectedDatastore []*object.Datastore
 	for _, ds := range datastores {
 		if strings.HasPrefix(ds.Name(), prefixName) {
 			log.Infof("Prefix match found for datastore Name %v ", ds.Name())
@@ -380,13 +297,9 @@ func (v *vsphere) connect() error {
 	return nil
 }
 
-// DetachDisk vdisks from node.
-func (v *vsphere) DetachDrivesFromVM(stc *corev1.StorageCluster, nodeName string) error {
-	configData, err := GetCloudDriveConfigmapData(stc)
-	if err != nil {
-		err = fmt.Errorf("Failed to find configData: err %w", err)
-		return err
-	}
+// DetachDrivesFromVM detaches vdisks from the node.
+func (v *vsphere) DetachDrivesFromVM(nodeName string, configData map[string]node.DriveSet) error {
+	// Find the instance VM UUID and then detach.
 	for _, nodeConfigData := range configData {
 		if nodeName == nodeConfigData.SchedulerNodeName {
@@ -394,7 +307,7 @@ func (v *vsphere) DetachDrivesFromVM(stc *corev1.StorageCluster, nodeName string
 			instanceId := nodeConfigData.InstanceID
 			for i := 0; i < len(allDiskPaths); i++ {
 				log.Infof("Diskpath for %v is %v and instance id is %v", nodeConfigData.NodeID, allDiskPaths[i], instanceId)
-				err = v.DetachDisk(instanceId, allDiskPaths[i])
+				err := v.DetachDisk(instanceId, allDiskPaths[i])
 				if err != nil {
 					//log.InfoD("Detach drives from the node failed %v", err)
 					err = fmt.Errorf("Detaching disk: %s on node %s failed: %w", allDiskPaths[i], nodeName, err)
@@ -402,7 +315,7 @@ func (v *vsphere) DetachDrivesFromVM(stc *corev1.StorageCluster, nodeName string
 				}
 			}
 		} else {
-			log.Infof(" Node Name from config %s, expected %s ", nodeConfigData.SchedulerNodeName, nodeName)
+			log.Infof("Node Name from config %s, expected %s ", nodeConfigData.SchedulerNodeName, nodeName)
 		}
 	}
 	return nil
@@ -467,10 +380,10 @@ func matchVirtualDiskAndVolPath(diskPath, volPath string) bool {
 	return diskPath == volPath
 }
 
-// Get virtual disk path.
+// GetDiskPaths returns the virtual disk paths.
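+//
+// (Illustrative sketch, not part of this patch: the detach path above now
+// consumes the decoded cloud-drive configmap directly. Hypothetical caller,
+// using the GetPXCloudDriveConfigMap scheduler method added later in this
+// patch; the harness accessors Inst().S / Inst().N are assumed:
+//
+//	configData, err := Inst().S.GetPXCloudDriveConfigMap(stc)
+//	if err != nil {
+//		return err
+//	}
+//	if err := Inst().N.DetachDrivesFromVM(nodeName, configData); err != nil {
+//		return err
+//	}
+// )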
 // TODO need to filter only of type: DrivePaths
-func GetDiskPaths(driveset DriveSet) []string {
-	diskPaths := []string{}
+func GetDiskPaths(driveset node.DriveSet) []string {
+	var diskPaths []string
 	for vmdkPath, configs := range driveset.Configs {
 		//TODO need to change later
 		log.InfoD("PX type %s ", configs.PXType)
@@ -491,7 +404,7 @@
 }
 
 // GetDatastore
-func GetDatastore(configs DriveConfig) string {
+func GetDatastore(configs node.DriveConfig) string {
 	for key, val := range configs.Labels {
 		if key == "datastore" {
 			return val
@@ -500,19 +413,6 @@
 	}
 	return ""
 }
 
-// GetCloudDriveConfigmapData Get clouddrive configMap data.
-func GetCloudDriveConfigmapData(cluster *corev1.StorageCluster) (map[string]DriveSet, error) {
-	cloudDriveConfigmapName := pxutil.GetCloudDriveConfigMapName(cluster)
-	var PortworxNamespace = "kube-system"
-	cloudDriveConfifmap, _ := coreops.Instance().GetConfigMap(cloudDriveConfigmapName, PortworxNamespace)
-	var configData map[string]DriveSet
-	err := json.Unmarshal([]byte(cloudDriveConfifmap.Data["cloud-drive"]), &configData)
-	if err != nil {
-		return nil, err
-	}
-	return configData, nil
-}
-
 // AddVM adds a new VM object to vmMap
 func (v *vsphere) AddMachine(vmName string) error {
 	var f *find.Finder
@@ -866,39 +766,23 @@ func (v *vsphere) RemoveNonRootDisks(n node.Node) error {
 
 // StorageVmotion relocates the largest disks of a VM from one datastore to another within the same prefix group
 // With moveAllDisks true we will be moving all disks attached to a VM onto same Datastore
 // If moveAllDisks is set to False then we will choose the largest sized disk and move that only to a new Datastore
-func (v *vsphere) StorageVmotion(ctx context.Context, node node.Node, portworxNamespace string, moveAllDisks bool) error {
+func (v *vsphere) StorageVmotion(ctx context.Context, node node.Node, datastorePrefix string, moveAllDisks bool) (*object.Datastore, error) {
 	log.Infof("Trying to find the VM on vSphere: %v", node.Name)
 	vm, err := v.FindVMByIP(node)
 	if err != nil {
-		return fmt.Errorf("error retrieving VM: %v", err)
+		return nil, fmt.Errorf("error retrieving VM: %v", err)
 	}
 
 	var vmProps mo.VirtualMachine
 	err = vm.Properties(ctx, vm.Reference(), []string{"config.hardware"}, &vmProps)
 	if err != nil {
-		return fmt.Errorf("error retrieving VM properties: %v", err)
+		return nil, fmt.Errorf("error retrieving VM properties: %v", err)
 	}
 
 	log.Infof("Trying to fetch all compatible Datastores with the prefix that is set in Px Storage Class spec")
-	compatibleDatastores, err := v.GetCompatibleDatastores(portworxNamespace, []string{})
+	compatibleDatastores, err := v.GetCompatibleDatastores(datastorePrefix, []string{})
 	if err != nil {
-		return fmt.Errorf("error retrieving compatible datastores: %v", err)
-	}
-
-	var stc *operatorcorev1.StorageCluster
-	pxOperator := operator.Instance()
-	stcList, err := pxOperator.ListStorageClusters(portworxNamespace)
-	if err != nil {
-		return fmt.Errorf("Failed to find storage clusters %v ", err)
-	}
-	stc, err = pxOperator.GetStorageCluster(stcList.Items[0].Name, stcList.Items[0].Namespace)
-	if err != nil {
-		return fmt.Errorf("Failed to find storage cluster %v in namespace %s ", err, portworxNamespace)
-	}
-
-	preData, err := GetCloudDriveConfigmapData(stc)
-	if err != nil {
-		return fmt.Errorf("error fetching pre-vMotion cloud drive config: %v", err)
+		return nil, fmt.Errorf("error retrieving compatible datastores: %v", err)
 	}
 
 	originalDatastoreIDs := map[string]string{}
@@ -916,24 +800,47 @@ func (v *vsphere) StorageVmotion(ctx context.Context, node node.Node, portworxNa
 	largestDisks := findLargestDisksOnDatastores(vmProps.Config.Hardware.Device, compatibleDatastores)
 	if len(largestDisks) == 0 {
-		return fmt.Errorf("no large disks found on specified prefix datastores")
+		return nil, fmt.Errorf("no large disks found on specified prefix datastores")
 	}
 
 	sourceDatastore := object.NewDatastore(vm.Client(), largestDisks[0].Datastore)
 	targetDatastores, err = filterTargetDatastores(ctx, sourceDatastore, compatibleDatastores, &vmProps)
 	if err != nil {
-		return fmt.Errorf("error filtering target datastores: %v", err)
+		return nil, fmt.Errorf("error filtering target datastores: %v", err)
+	}
+
+	var targetDatastore *object.Datastore
+	if len(targetDatastores) == 0 {
+		return nil, fmt.Errorf("no compatible datastores available for storage vMotion")
+	}
+	if len(targetDatastores) > 1 {
+		maxAvailableSpace := int64(-1)
+		for _, ds := range targetDatastores {
+			var dsProps mo.Datastore
+			if err := ds.Properties(ctx, ds.Reference(), []string{"summary"}, &dsProps); err == nil {
+				if available := dsProps.Summary.FreeSpace; available > maxAvailableSpace {
+					maxAvailableSpace = available
+					targetDatastore = ds
+				}
+			}
+		}
+	} else {
+		targetDatastore = targetDatastores[0]
+	}
+
+	if targetDatastore == nil {
+		return nil, fmt.Errorf("failed to select a target datastore")
 	}
 
 	if !moveAllDisks {
-		log.Infof("Trying to Move largest disk on VM %v from Datastore %v to Datastore : %v", node.Name, sourceDatastore.Name(), targetDatastores[0].Name())
-		err = initiateStorageVmotion(ctx, vm, largestDisks[:1], targetDatastores)
+		log.Infof("Trying to Move largest disk on VM %v from Datastore %v to Datastore : %v", node.Name, sourceDatastore.Name(), targetDatastore.Name())
+		err = initiateStorageVmotion(ctx, vm, largestDisks[:1], targetDatastore)
 		if err != nil {
-			return fmt.Errorf("error during storage vMotion: %v", err)
+			return nil, fmt.Errorf("error during storage vMotion: %v", err)
 		}
 	} else {
-		log.Infof("Trying to Move all disks of %v from Datastore %v to Datastore : %v", node.Name, sourceDatastore.Name(), targetDatastores[0].Name())
+		log.Infof("Trying to Move all disks of %v from Datastore %v to Datastore : %v", node.Name, sourceDatastore.Name(), targetDatastore.Name())
 		diskLocators := make([]types.VirtualMachineRelocateSpecDiskLocator, 0)
 		for _, device := range vmProps.Config.Hardware.Device {
 			if disk, ok := device.(*types.VirtualDisk); ok {
@@ -944,24 +851,63 @@ func (v *vsphere) StorageVmotion(ctx context.Context, node node.Node, portworxNa
 			}
 		}
 		if len(diskLocators) == 0 {
-			return fmt.Errorf("no disks found on the VM")
+			return nil, fmt.Errorf("no disks found on the VM")
 		}
 		log.Infof("Going to trigger Storage Vmotion for %v", node.Name)
-		err = initiateStorageVmotion(ctx, vm, diskLocators, targetDatastores)
+		err = initiateStorageVmotion(ctx, vm, diskLocators, targetDatastore)
 		if err != nil {
-			return fmt.Errorf("error during storage vMotion: %v", err)
+			return nil, fmt.Errorf("error during storage vMotion: %v", err)
 		}
 		log.Infof("Sleeping for a minute to let config map be updated with latest changes")
 		time.Sleep(1 * time.Minute)
-		postData, err := GetCloudDriveConfigmapData(stc)
-		if err != nil {
-			return fmt.Errorf("error fetching post-vMotion cloud drive config: %v", err)
+	}
+	return targetDatastore, nil
+}
+
+func (v *vsphere) GetUUIDFromVMDKPath(ctx context.Context, node node.Node, vmdkPath string) (string, error) {
+	log.Infof("Trying to find the VM on vSphere: %v", node.Name)
vSphere: %v", node.Name) + vm, err := v.FindVMByIP(node) + if err != nil { + return "", fmt.Errorf("error retrieving VM: %v", err) + } + + var vmProps mo.VirtualMachine + err = vm.Properties(ctx, vm.Reference(), []string{"config.hardware"}, &vmProps) + if err != nil { + return "", fmt.Errorf("error retrieving VM properties: %v", err) + } + + diskUUID := "" + log.Infof("Getting UUID for disk %v on node %s", vmdkPath, node.VolDriverNodeID) + + for _, device := range vmProps.Config.Hardware.Device { + if disk, ok := device.(*types.VirtualDisk); ok { + if backing, ok := disk.Backing.(*types.VirtualDiskFlatVer2BackingInfo); ok { + if backing.FileName == vmdkPath { + diskUUID = backing.Uuid + break + } + } } - if !v.ValidateDatastoreUpdate(preData, postData, node.VolDriverNodeID, targetDatastores[0].Reference().Value) { - return fmt.Errorf("validation failed: datastore updates are not as expected") + } + + if len(diskUUID) > 0 { + // Remove whitespace character + uuidWithoutSpaces := strings.ReplaceAll(diskUUID, " ", "") + //Remove Hyphen + uuidWithoutSpacesAndHyphens := strings.ReplaceAll(uuidWithoutSpaces, "-", "") + // Check if the UUID length is valid + if len(uuidWithoutSpacesAndHyphens) != 32 { + return "", fmt.Errorf("invalid UUID length.Expected UUID to be of length 32bytes, got %v: %v", len(uuidWithoutSpacesAndHyphens), uuidWithoutSpacesAndHyphens) } + + // Insert hyphens at specific positions to match standard UUID format + formattedUUID := fmt.Sprintf("%s-%s-%s-%s-%s", uuidWithoutSpacesAndHyphens[0:8], uuidWithoutSpacesAndHyphens[8:12], uuidWithoutSpacesAndHyphens[12:16], uuidWithoutSpacesAndHyphens[16:20], uuidWithoutSpacesAndHyphens[20:]) + return strings.ToLower(formattedUUID), nil } - return nil + return "", fmt.Errorf("failed to find UUID for disk %v on node %s", vmdkPath, node.VolDriverNodeID) + } // Function to get the datastore's cluster @@ -1089,29 +1035,7 @@ func findLargestDisksOnDatastores(devices []types.BaseVirtualDevice, datastores } // initiateStorageVmotion starts the Storage vMotion process for the disks, targeting a specific datastore. 
-func initiateStorageVmotion(ctx context.Context, vm *object.VirtualMachine, diskLocators []types.VirtualMachineRelocateSpecDiskLocator, datastores []*object.Datastore) error {
-	var targetDatastore *object.Datastore
-	if len(datastores) == 0 {
-		return fmt.Errorf("no compatible datastores available for storage vMotion")
-	}
-	if len(datastores) > 1 {
-		maxAvailableSpace := int64(-1)
-		for _, ds := range datastores {
-			var dsProps mo.Datastore
-			if err := ds.Properties(ctx, ds.Reference(), []string{"summary"}, &dsProps); err == nil {
-				if available := dsProps.Summary.FreeSpace; available > maxAvailableSpace {
-					maxAvailableSpace = available
-					targetDatastore = ds
-				}
-			}
-		}
-	} else {
-		targetDatastore = datastores[0]
-	}
-
-	if targetDatastore == nil {
-		return fmt.Errorf("failed to select a target datastore")
-	}
+func initiateStorageVmotion(ctx context.Context, vm *object.VirtualMachine, diskLocators []types.VirtualMachineRelocateSpecDiskLocator, targetDatastore *object.Datastore) error {
 
 	for i := range diskLocators {
 		diskLocators[i].Datastore = targetDatastore.Reference()
@@ -1121,11 +1045,11 @@ func initiateStorageVmotion(ctx context.Context, vm *object.VirtualMachine, disk
 		Disk: diskLocators,
 	}
 
-	task, err := vm.Relocate(ctx, relocateSpec, types.VirtualMachineMovePriorityDefaultPriority)
+	vcTask, err := vm.Relocate(ctx, relocateSpec, types.VirtualMachineMovePriorityDefaultPriority)
 	if err != nil {
 		return fmt.Errorf("error initiating VM relocate: %v", err)
 	}
-	return task.Wait(ctx)
+	return vcTask.Wait(ctx)
 }
 
 // FindVMByName finds a virtual machine by its name.
@@ -1188,46 +1112,4 @@ func (v *vsphere) FindVMByIP(node node.Node) (*object.VirtualMachine, error) {
 	}
 	return nil, fmt.Errorf("no VM found with the given IP addresses: %v", node.Addresses)
-}
-
-// ValidateDatastoreUpdate validates the cloud drive configmap after Storage vmotion is done
-func (v *vsphere) ValidateDatastoreUpdate(preData, postData map[string]DriveSet, nodeUUID string, targetDatastoreID string) bool {
-	preNodeData, preExists := preData[nodeUUID]
-	postNodeData, postExists := postData[nodeUUID]
-
-	if !preExists || !postExists {
-		return false
-	}
-
-	postDiskDatastores := make(map[string]string)
-	for _, postDrive := range postNodeData.Configs {
-		postDiskDatastores[postDrive.DiskUUID] = postDrive.Labels["datastore"]
-	}
-
-	allMoved := true
-	for _, preDrive := range preNodeData.Configs {
-		postDSName, exists := postDiskDatastores[preDrive.DiskUUID]
-		if !exists {
-			log.Infof("No post-migration data found for disk with UUID %v", preDrive.DiskUUID)
-			return false
-		}
-
-		postDS, err := v.FindDatastoreByName(postDSName)
-		if err != nil {
-			log.Errorf("Failed to find datastore with name %v: %v", postDSName, err)
-			return false
-		}
-		postDSID := postDS.Reference().Value
-
-		if !(postDSID == targetDatastoreID || (preDrive.Labels["datastore"] == postDSName && postDSID == targetDatastoreID)) {
-			log.Infof("Disk with UUID %v did not move to the target datastore %v as expected, or was not already there. This is for Node %v", preDrive.DiskUUID, targetDatastoreID, nodeUUID)
This is for Node %v", preDrive.DiskUUID, targetDatastoreID, nodeUUID) - allMoved = false - } - } - - if allMoved { - log.Infof("Storage vMotion happened successfully for all disks") - } - - return allMoved -} +} \ No newline at end of file diff --git a/drivers/scheduler/dcos/dcos.go b/drivers/scheduler/dcos/dcos.go index 4ce13b197..00a3f43e4 100644 --- a/drivers/scheduler/dcos/dcos.go +++ b/drivers/scheduler/dcos/dcos.go @@ -3,6 +3,7 @@ package dcos import ( "encoding/json" "fmt" + v1 "github.com/libopenstorage/operator/pkg/apis/core/v1" "io/ioutil" "os" "path/filepath" @@ -1190,6 +1191,13 @@ func (d *dcos) StartKubelet(n node.Node, options node.SystemctlOpts) error { } } +func (d *dcos) GetPXCloudDriveConfigMap(cluster *v1.StorageCluster) (map[string]node.DriveSet, error) { + return nil, &errors.ErrNotSupported{ + Type: "Function", + Operation: "GetPXCloudDriveConfigMap()", + } +} + func init() { d := &dcos{} scheduler.Register(SchedName, d) diff --git a/drivers/scheduler/k8s/k8s.go b/drivers/scheduler/k8s/k8s.go index c8ead3dc2..1fb8bbdec 100644 --- a/drivers/scheduler/k8s/k8s.go +++ b/drivers/scheduler/k8s/k8s.go @@ -8,6 +8,7 @@ import ( "encoding/json" baseErrors "errors" "fmt" + pxutil "github.com/libopenstorage/operator/drivers/storage/portworx/util" "io" "io/ioutil" random "math/rand" @@ -36,6 +37,7 @@ import ( apapi "github.com/libopenstorage/autopilot-api/pkg/apis/autopilot/v1alpha1" osapi "github.com/libopenstorage/openstorage/api" "github.com/libopenstorage/openstorage/pkg/units" + operatorcorev1 "github.com/libopenstorage/operator/pkg/apis/core/v1" storkapi "github.com/libopenstorage/stork/pkg/apis/stork/v1alpha1" admissionregistration "github.com/portworx/sched-ops/k8s/admissionregistration" "github.com/portworx/sched-ops/k8s/apiextensions" @@ -8757,6 +8759,18 @@ func rotateTopologyArray(options *scheduler.ScheduleOptions) { } } +// GetPXCloudDriveConfigMap retruns px cloud derive config map data +func (k *K8s) GetPXCloudDriveConfigMap(cluster *operatorcorev1.StorageCluster) (map[string]node.DriveSet, error) { + cloudDriveConfigmapName := pxutil.GetCloudDriveConfigMapName(cluster) + cloudDriveConfifmap, _ := k8sCore.GetConfigMap(cloudDriveConfigmapName, cluster.Namespace) + var configData map[string]node.DriveSet + err := json.Unmarshal([]byte(cloudDriveConfifmap.Data["cloud-drive"]), &configData) + if err != nil { + return nil, err + } + return configData, nil +} + func init() { k := &K8s{} scheduler.Register(SchedName, k) @@ -8810,4 +8824,4 @@ func createClonedStorageClassIfRequired(originalStorageClass *storageapi.Storage } } return clonedSCName, nil -} +} \ No newline at end of file diff --git a/drivers/scheduler/scheduler.go b/drivers/scheduler/scheduler.go index f6c110b0f..af7ae1595 100644 --- a/drivers/scheduler/scheduler.go +++ b/drivers/scheduler/scheduler.go @@ -11,6 +11,7 @@ import ( volsnapv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1" snapv1 "github.com/kubernetes-incubator/external-storage/snapshot/pkg/apis/crd/v1" apapi "github.com/libopenstorage/autopilot-api/pkg/apis/autopilot/v1alpha1" + operatorcorev1 "github.com/libopenstorage/operator/pkg/apis/core/v1" "github.com/portworx/torpedo/drivers/api" "github.com/portworx/torpedo/drivers/node" "github.com/portworx/torpedo/drivers/scheduler/spec" @@ -474,6 +475,9 @@ type Driver interface { StopKubelet(appNode node.Node, opts node.SystemctlOpts) error // StartKubelet starts kubelet on the given node StartKubelet(appNode node.Node, opts node.SystemctlOpts) error + + // 
+	// GetPXCloudDriveConfigMap gets the PX cloud drive config map
+	GetPXCloudDriveConfigMap(cluster *operatorcorev1.StorageCluster) (map[string]node.DriveSet, error)
 }
 
 var (
@@ -557,4 +561,4 @@ type CSICloneRequest struct {
 	Timestamp        string
 	OriginalPVCName  string
 	RestoredPVCName  string
-}
+}
\ No newline at end of file
diff --git a/tests/basic/misc_test.go b/tests/basic/misc_test.go
index daa2b1175..9269b32c2 100644
--- a/tests/basic/misc_test.go
+++ b/tests/basic/misc_test.go
@@ -1,8 +1,10 @@
 package tests
 
 import (
+	ctxt "context"
 	"fmt"
 	"github.com/google/uuid"
+	v1 "k8s.io/api/core/v1"
 	"math/rand"
 	"path"
 	"strings"
@@ -1846,3 +1848,89 @@ var _ = Describe("{VerifyNoPxRestartDueToPxPodStop}", func() {
 	})
 })
+
+// PerformStorageVMotions performs storage vMotion on a randomly selected
+// storage node and validates PX and the cloud drive config afterwards.
+var _ = Describe("{PerformStorageVMotions}", func() {
+
+	JustBeforeEach(func() {
+		StartTorpedoTest("PerformStorageVMotions", "Perform Storage Vmotion and Validate PX", nil, 0)
+	})
+	var contexts []*scheduler.Context
+
+	stepLog := "has to schedule apps and perform storage vmotion"
+	It(stepLog, func() {
+		log.InfoD(stepLog)
+		contexts = make([]*scheduler.Context, 0)
+
+		for i := 0; i < Inst().GlobalScaleFactor; i++ {
+			contexts = append(contexts, ScheduleApplications(fmt.Sprintf("svmotion-%d", i))...)
+		}
+
+		ValidateApplications(contexts)
+
+		stepLog = "Choosing a single Storage Node randomly and performing SV Motion on it"
+		var randomIndex int
+		var moveAllDisks bool
+		Step(stepLog, func() {
+			log.InfoD(stepLog)
+			workerNodes := node.GetStorageNodes()
+			if len(workerNodes) > 0 {
+				randomIndex = rand.Intn(len(workerNodes))
+				log.Infof("Selected worker node %v for storage vmotion", workerNodes[randomIndex].Name)
+			} else {
+				log.FailOnError(fmt.Errorf("no worker nodes available for svmotion"), "No worker nodes available")
+			}
+			// Randomly decide between moving all disks and only the largest one
+			moveAllDisks = rand.Intn(2) == 0
+			if moveAllDisks {
+				log.Infof("Moving all disks on worker node %v", workerNodes[randomIndex].Name)
+			} else {
+				log.Infof("Moving only largest sized disk(s) on worker node %v", workerNodes[randomIndex].Name)
+			}
+
+			stc, err := Inst().V.GetDriver()
+			log.FailOnError(err, "Failed to get storage driver")
+
+			preData, err := Inst().S.GetPXCloudDriveConfigMap(stc)
+			log.FailOnError(err, "Failed to get pre-vMotion cloud drive config")
+
+			envVariables := stc.Spec.CommonConfig.Env
+			var prefixName string
+			for _, envVar := range envVariables {
+				if envVar.Name == "VSPHERE_DATASTORE_PREFIX" {
+					prefixName = envVar.Value
+					log.Infof("prefixName %s", prefixName)
+				}
+			}
+
+			ctx := ctxt.Background()
+			targetDatastore, err := Inst().N.StorageVmotion(ctx, workerNodes[randomIndex], prefixName, moveAllDisks)
+			dash.VerifyFatal(err, nil, fmt.Sprintf("validate storage vmotion on node [%s]", workerNodes[randomIndex].Name))
+
+			postData, err := Inst().S.GetPXCloudDriveConfigMap(stc)
+			log.FailOnError(err, "Failed to get post-vMotion cloud drive config")
+			err = ValidateDatastoreUpdate(preData, postData, workerNodes[randomIndex].VolDriverNodeID, targetDatastore)
+			dash.VerifyFatal(err, nil, fmt.Sprintf("validate datastore update in cloud drive config after storage vmotion on node [%s]", workerNodes[randomIndex].Name))
+		})
+
+		stepLog = "Validate PX on all nodes"
+		Step(stepLog, func() {
+			log.InfoD(stepLog)
+			for _, node := range node.GetStorageDriverNodes() {
+				status, err := IsPxRunningOnNode(&node)
+				log.FailOnError(err, fmt.Sprintf("Failed to check if PX is running on node [%s]", node.Name))
+				dash.VerifySafely(status, true, fmt.Sprintf("verify PX is running on node [%s]", node.Name))
true, fmt.Sprintf("PX is not running on node [%s]", node.Name)) + } + }) + + opts := make(map[string]bool) + opts[scheduler.OptionsWaitForResourceLeakCleanup] = true + ValidateAndDestroy(contexts, opts) + }) + JustAfterEach(func() { + defer EndTorpedoTest() + AfterEachTest(contexts) + }) +}) \ No newline at end of file diff --git a/tests/basic/storage_pool_test.go b/tests/basic/storage_pool_test.go index 182d3a990..a08bf7bfd 100644 --- a/tests/basic/storage_pool_test.go +++ b/tests/basic/storage_pool_test.go @@ -9601,7 +9601,7 @@ func pickPoolToResize(contexts []*scheduler.Context, expandType api.SdkStoragePo } for _, poolID := range poolsWithIO { - log.Infof("checking pool expansion eliginblity of pool [%s] with IOs", poolID) + log.Infof("checking pool expansion eligibility of pool [%s] with IOs", poolID) n, err := GetNodeWithGivenPoolID(poolID) if err != nil { continue @@ -11813,4 +11813,4 @@ var _ = Describe("{PoolDeleteMultiplePools}", func() { AfterEachTest(contexts) }) -}) +}) \ No newline at end of file diff --git a/tests/basic/upgrade_cluster_test.go b/tests/basic/upgrade_cluster_test.go index b71462112..3f77f9127 100644 --- a/tests/basic/upgrade_cluster_test.go +++ b/tests/basic/upgrade_cluster_test.go @@ -116,10 +116,10 @@ var _ = Describe("{UpgradeCluster}", func() { err = Inst().S.UpgradeScheduler(version) if err != nil { - err := Inst().S.RefreshNodeRegistry() - log.FailOnError(err, "Refresh Node Registry failed") - err = Inst().V.RefreshDriverEndpoints() - log.FailOnError(err, "Refresh Driver Endpoints failed") + neErr := Inst().S.RefreshNodeRegistry() + log.FailOnError(neErr, "Refresh Node Registry failed") + neErr = Inst().V.RefreshDriverEndpoints() + log.FailOnError(neErr, "Refresh Driver Endpoints failed") PrintPxctlStatus() PrintK8sClusterInfo() } @@ -311,4 +311,4 @@ func validateClusterNodes(stopSignal <-chan struct{}, mError *error) { itr++ time.Sleep(30 * time.Second) } -} +} \ No newline at end of file diff --git a/tests/common.go b/tests/common.go index 066856053..aae585bb3 100644 --- a/tests/common.go +++ b/tests/common.go @@ -10,6 +10,7 @@ import ( "errors" "flag" "fmt" + "github.com/vmware/govmomi/object" "io/ioutil" "maps" "math" @@ -14466,6 +14467,7 @@ func DeleteTorpedoApps() error { } return nil } + func ValidateNodePDB(minAvailable int, totalNodes int, errChan ...*chan error) { defer func() { if len(errChan) > 0 { @@ -14503,3 +14505,128 @@ func ValidateNodePDB(minAvailable int, totalNodes int, errChan ...*chan error) { } }) } + +// ValidateDatastoreUpdate validates the cloud drive configmap after Storage vmotion is done +func ValidateDatastoreUpdate(preData, postData map[string]node.DriveSet, nodeUUID string, targetDatastore *object.Datastore) error { + preNodeData, preExists := preData[nodeUUID] + postNodeData, postExists := postData[nodeUUID] + + if !preExists { + return fmt.Errorf("pre-migration data not found for node %v", nodeUUID) + } + + if !postExists { + return fmt.Errorf("post-migration data not found for node %v", nodeUUID) + + } + + postDiskDatastores := make(map[string]string) + for _, postDrive := range postNodeData.Configs { + postDiskDatastores[postDrive.DiskUUID] = postDrive.Labels["datastore"] + } + + allMoved := true + for preDiskID, preDrive := range preNodeData.Configs { + postDSName, exists := postDiskDatastores[preDrive.DiskUUID] + if !exists { + return fmt.Errorf("no post-migration data found for disk with UUID %v", preDrive.DiskUUID) + } + + postDS, err := Inst().N.FindDatastoreByName(postDSName) + if err != nil { + return 
fmt.Errorf("failed to find datastore with name %v: %v", postDSName, err) + } + postDSID := postDS.Reference().Value + + if !(postDSID == targetDatastore.Reference().Value) { + log.Errorf("Disk with UUID %v did not move to the target datastore %v as expected, or was not already there. This is for Node %v", preDrive.DiskUUID, targetDatastore.Name(), nodeUUID) + allMoved = false + } + if config, ok := postNodeData.Configs[preDiskID]; ok { + log.Infof("Pre-migration disk config: %+#v", preDrive) + log.Infof("Post-migration disk config: %+#v", config) + return fmt.Errorf("disk config with ID [%v] still exists in the node [%s] after storage vmotion", preDiskID, nodeUUID) + } + + } + + if !allMoved { + return fmt.Errorf("not all disks in the node [%s] moved to the target datastore %v as expected", targetDatastore.Name(), nodeUUID) + } + + alertResponse, err := Inst().V.GetAlertsUsingResourceTypeBySeverity(opsapi.ResourceType_RESOURCE_TYPE_DRIVE, + opsapi.SeverityType_SEVERITY_TYPE_NOTIFY) + if err != nil { + return fmt.Errorf("failed to get alerts for drives: %v", err) + } + alerts := alertResponse.GetAlerts() + + isAlertFound := false + for _, alert := range alerts { + log.Infof("Alert: %+#v", alert) + if alert.ResourceId == nodeUUID { + currTime := time.Now() + alertTime := time.Unix(alert.Timestamp.Seconds, 0) + log.Infof("alert time: %v, current time: %v", alertTime, currTime) + log.Infof("diff time: %v", currTime.Sub(alertTime)) + if !(currTime.Sub(alertTime) < 2*time.Minute) { + alertMsg := alert.Message + if strings.Contains(alertMsg, "New paths after Storage vMotion persisted") && strings.Contains(alertMsg, targetDatastore.Name()) { + log.InfoD("SvMotionMonitoringSuccess found for node %v: %v", nodeUUID, alertMsg) + isAlertFound = true + } + } + } + } + if !isAlertFound { + log.Infof("Alerts generated after storage vmotion are") + for _, alert := range alerts { + log.Infof("Time: %v , Alert: %s", alert.Timestamp.AsTime(), alert.Message) + } + return fmt.Errorf("SvMotionMonitoringSuccess alert not found for node %v", nodeUUID) + } + + ctx := context1.Background() + + n := node.Node{} + + n = node.GetNodesByVoDriverNodeID()[nodeUUID] + + if n.Name == "" { + return fmt.Errorf("node not found with UUID %v", nodeUUID) + } + + for _, config := range postNodeData.Configs { + + dsName := config.Labels["datastore"] + driveID := strings.Split(config.ID, " ") + if len(driveID) < 2 { + return fmt.Errorf("invalid drive id %v", config.ID) + } + dspath := driveID[1] + + vmdkPath := fmt.Sprintf("[%s] %s", dsName, dspath) + UUIDfromVMDKPath, err := Inst().N.GetUUIDFromVMDKPath(ctx, n, vmdkPath) + if err != nil { + return err + } + dash.VerifySafely(UUIDfromVMDKPath, config.DiskUUID, "Verify Drive ID after storage vmotion") + } + driveSet, err := Inst().V.GetDriveSet(&n) + if err != nil { + return err + } + driveSetConfigs := driveSet.Configs + for id, config := range postNodeData.Configs { + + if _, ok := driveSetConfigs[id]; !ok { + return fmt.Errorf("drive with ID %v not found in the drive set", id) + } + if config.Labels["datastore"] != driveSetConfigs[id].Labels["datastore"] { + return fmt.Errorf("datastore mismatch for drive with ID %v", id) + } + + } + + return nil +} \ No newline at end of file diff --git a/tests/testTriggers.go b/tests/testTriggers.go index 00312b211..302e8f034 100644 --- a/tests/testTriggers.go +++ b/tests/testTriggers.go @@ -49,7 +49,6 @@ import ( v1 "k8s.io/api/core/v1" storageapi "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" - metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/portworx/torpedo/drivers/backup" @@ -919,7 +918,12 @@ func TriggerDetachDrives(contexts *[]*scheduler.Context, recordChan *chan *Event var nodeId string storageNodes := node.GetStorageNodes() nodeId = storageNodes[0].VolDriverNodeID - err = Inst().N.DetachDrivesFromVM(stc, storageNodes[0].Name) + pxCloudDriveConfigMap, err := Inst().S.GetPXCloudDriveConfigMap(stc) + if err != nil { + UpdateOutcome(event, err) + return + } + err = Inst().N.DetachDrivesFromVM(storageNodes[0].Name, pxCloudDriveConfigMap) UpdateOutcome(event, err) time.Sleep(1 * time.Minute) statusErr := Inst().V.WaitDriverUpOnNode(storageNodes[0], 10*time.Minute) @@ -12056,18 +12060,11 @@ func TriggerSvMotionSingleNode(contexts *[]*scheduler.Context, recordChan *chan setMetrics(*event) var randomIndex int - var namespace string - var err error var moveAllDisks bool stepLog := "Choosing a single Storage Node randomly and performing SV Motion on it" Step(stepLog, func() { log.InfoD(stepLog) - if namespace, err = Inst().S.GetPortworxNamespace(); err != nil { - log.Errorf("Failed to get portworx namespace. Error : %v", err) - UpdateOutcome(event, err) - return - } workerNodes := node.GetStorageNodes() if len(workerNodes) > 0 { randomIndex = rand.Intn(len(workerNodes)) @@ -12083,8 +12080,43 @@ func TriggerSvMotionSingleNode(contexts *[]*scheduler.Context, recordChan *chan } else { log.Infof("Moving only largest sized disk(s) on worker node %v", workerNodes[randomIndex].Name) } + + stc, err := Inst().V.GetDriver() + if err != nil { + UpdateOutcome(event, err) + return + + } + + preData, err := Inst().S.GetPXCloudDriveConfigMap(stc) + if err != nil { + UpdateOutcome(event, err) + return + } + + var envVariables []v1.EnvVar + envVariables = stc.Spec.CommonConfig.Env + var prefixName string + for _, envVar := range envVariables { + if envVar.Name == "VSPHERE_DATASTORE_PREFIX" { + prefixName = envVar.Value + log.Infof("prefixName %s ", prefixName) + } + } + ctx := ctxt.Background() - err = Inst().N.StorageVmotion(ctx, workerNodes[randomIndex], namespace, moveAllDisks) + targetDatastore, err := Inst().N.StorageVmotion(ctx, workerNodes[randomIndex], prefixName, moveAllDisks) + if err != nil { + UpdateOutcome(event, err) + return + } + + postData, err := Inst().S.GetPXCloudDriveConfigMap(stc) + if err != nil { + err = fmt.Errorf("error fetching post-vMotion cloud drive config: %v", err) + UpdateOutcome(event, err) + } + err = ValidateDatastoreUpdate(preData, postData, workerNodes[randomIndex].VolDriverNodeID, targetDatastore) UpdateOutcome(event, err) }) updateMetrics(*event) @@ -12114,16 +12146,9 @@ func TriggerSvMotionMultipleNodes(contexts *[]*scheduler.Context, recordChan *ch Step(stepLog, func() { log.InfoD(stepLog) - var namespace string var err error maxNodes := 20 - if namespace, err = Inst().S.GetPortworxNamespace(); err != nil { - log.Errorf("Failed to get portworx namespace. 
Error: %v", err) - UpdateOutcome(event, err) - return - } - workerNodes := node.GetStorageNodes() if len(workerNodes) == 0 { log.Infof("No worker nodes available") @@ -12139,10 +12164,32 @@ func TriggerSvMotionMultipleNodes(contexts *[]*scheduler.Context, recordChan *ch if numSelectedNodes > maxNodes { numSelectedNodes = maxNodes } + stc, err := Inst().V.GetDriver() + if err != nil { + UpdateOutcome(event, err) + return + + } + + preData, err := Inst().S.GetPXCloudDriveConfigMap(stc) + if err != nil { + UpdateOutcome(event, err) + return + } var wg sync.WaitGroup wg.Add(numSelectedNodes) + var envVariables []v1.EnvVar + envVariables = stc.Spec.CommonConfig.Env + var prefixName string + for _, envVar := range envVariables { + if envVar.Name == "VSPHERE_DATASTORE_PREFIX" { + prefixName = envVar.Value + log.Infof("prefixName %s ", prefixName) + } + } + for i := 0; i < numSelectedNodes; i++ { go func(node node.Node) { defer wg.Done() @@ -12155,15 +12202,25 @@ func TriggerSvMotionMultipleNodes(contexts *[]*scheduler.Context, recordChan *ch } ctx := ctxt.Background() - if err := Inst().N.StorageVmotion(ctx, node, namespace, moveAllDisks); err != nil { + targetDatastore, err := Inst().N.StorageVmotion(ctx, node, prefixName, moveAllDisks) + if err != nil { log.Errorf("Storage vMotion failed for node %v. Error: %v", node.Name, err) UpdateOutcome(event, err) } + + postData, err := Inst().S.GetPXCloudDriveConfigMap(stc) + if err != nil { + err = fmt.Errorf("error fetching post-vMotion cloud drive config: %v", err) + UpdateOutcome(event, err) + } + err = ValidateDatastoreUpdate(preData, postData, node.VolDriverNodeID, targetDatastore) + UpdateOutcome(event, err) + }(workerNodes[i]) } wg.Wait() - UpdateOutcome(event, nil) + }) updateMetrics(*event) @@ -12706,4 +12763,4 @@ $('#pxtable tr td').each(function(){
-`
+`
\ No newline at end of file
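
Postscript (illustrative, not part of the patch): the check this change wires together, comparing the pre- and post-vMotion cloud-drive configmaps and confirming every disk landed on the chosen datastore, reduces to the following self-contained sketch. The types are trimmed to the fields actually compared, and the sample UUID and datastore names are made up:

	package main

	import "fmt"

	// driveConfig is a reduced view of node.DriveConfig: only the fields the
	// post-vMotion comparison reads.
	type driveConfig struct {
		DiskUUID string
		Labels   map[string]string // carries the "datastore" label
	}

	// validateMove mirrors ValidateDatastoreUpdate's core loop: every disk
	// present before the move must still exist afterwards and must now carry
	// the target datastore label.
	func validateMove(pre, post map[string]driveConfig, targetDS string) error {
		postByUUID := make(map[string]string)
		for _, d := range post {
			postByUUID[d.DiskUUID] = d.Labels["datastore"]
		}
		for id, d := range pre {
			ds, ok := postByUUID[d.DiskUUID]
			if !ok {
				return fmt.Errorf("disk %s (config %s) missing after vMotion", d.DiskUUID, id)
			}
			if ds != targetDS {
				return fmt.Errorf("disk %s on datastore %s, want %s", d.DiskUUID, ds, targetDS)
			}
		}
		return nil
	}

	func main() {
		uuid := "6000c291-2dcb-1a0e-2e46-c2305e4367ec"
		pre := map[string]driveConfig{
			"0": {DiskUUID: uuid, Labels: map[string]string{"datastore": "px-ds-1"}},
		}
		post := map[string]driveConfig{
			"1": {DiskUUID: uuid, Labels: map[string]string{"datastore": "px-ds-2"}},
		}
		fmt.Println(validateMove(pre, post, "px-ds-2")) // prints <nil>
	}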