From f92801bd332c1672c835f8ff0fe0f755bef4b5a0 Mon Sep 17 00:00:00 2001
From: Elena Gershkovich
Date: Tue, 5 Mar 2024 13:40:46 +0200
Subject: [PATCH] Add VolumeGroupReplication support

Signed-off-by: Elena Gershkovich
---
 config/dr-cluster/rbac/role.yaml              |  20 +
 config/rbac/role.yaml                         |  20 +
 controllers/suite_test.go                     |   4 +
 .../volumereplicationgroup_controller.go      |  58 ++-
 controllers/vrg_volrep.go                     | 344 +++++++++++++-----
 go.mod                                        |   1 +
 go.sum                                        |   2 +
 main.go                                       |   2 +
 8 files changed, 347 insertions(+), 104 deletions(-)

diff --git a/config/dr-cluster/rbac/role.yaml b/config/dr-cluster/rbac/role.yaml
index 87d81c91ca..d6eb35c472 100644
--- a/config/dr-cluster/rbac/role.yaml
+++ b/config/dr-cluster/rbac/role.yaml
@@ -123,6 +123,26 @@ rules:
   - get
   - list
   - watch
+- apiGroups:
+  - cache.storage.ramendr.io
+  resources:
+  - volumegroupreplicationclasses
+  verbs:
+  - get
+  - list
+  - watch
+- apiGroups:
+  - cache.storage.ramendr.io
+  resources:
+  - volumegroupreplications
+  verbs:
+  - create
+  - update
+  - delete
+  - get
+  - list
+  - watch
+  - patch
 - apiGroups:
   - storage.k8s.io
   resources:
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 4573e3c8c5..f65ba664f5 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -67,6 +67,26 @@ rules:
   - get
   - list
   - watch
+- apiGroups:
+  - cache.storage.ramendr.io
+  resources:
+  - volumegroupreplicationclasses
+  verbs:
+  - get
+  - list
+  - watch
+- apiGroups:
+  - cache.storage.ramendr.io
+  resources:
+  - volumegroupreplications
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
 - apiGroups:
   - cluster.open-cluster-management.io
   resources:
diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index b14f349378..5a2f16650f 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -34,6 +34,7 @@ import (
     snapv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
     ocmclv1 "github.com/open-cluster-management/api/cluster/v1"
     ocmworkv1 "github.com/open-cluster-management/api/work/v1"
+    volgroup "github.com/rakeshgm/volgroup-shim-operator/api/v1alpha1"
     viewv1beta1 "github.com/stolostron/multicloud-operators-foundation/pkg/apis/view/v1beta1"
     plrv1 "github.com/stolostron/multicloud-operators-placementrule/pkg/apis/apps/v1"
     cpcv1 "open-cluster-management.io/config-policy-controller/api/v1"
@@ -199,6 +200,9 @@ var _ = BeforeSuite(func() {
     err = volrep.AddToScheme(scheme.Scheme)
     Expect(err).NotTo(HaveOccurred())

+    err = volgroup.AddToScheme(scheme.Scheme)
+    Expect(err).NotTo(HaveOccurred())
+
     err = volsyncv1alpha1.AddToScheme(scheme.Scheme)
     Expect(err).NotTo(HaveOccurred())

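The scheme registration added above can be sanity-checked in isolation. A minimal sketch, assuming only the generated AddToScheme from the volgroup-shim-operator module this patch depends on:

    package main

    import (
    	"fmt"

    	volgroup "github.com/rakeshgm/volgroup-shim-operator/api/v1alpha1"
    	"k8s.io/client-go/kubernetes/scheme"
    )

    func main() {
    	// Registering the API group makes the new kinds resolvable via the scheme.
    	if err := volgroup.AddToScheme(scheme.Scheme); err != nil {
    		panic(err)
    	}

    	gvks, _, err := scheme.Scheme.ObjectKinds(&volgroup.VolumeGroupReplication{})
    	if err != nil {
    		panic(err)
    	}

    	// Expect cache.storage.ramendr.io/v1alpha1, Kind=VolumeGroupReplication.
    	fmt.Println(gvks)
    }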
diff --git a/controllers/volumereplicationgroup_controller.go b/controllers/volumereplicationgroup_controller.go
index c27be224ba..69445438a5 100644
--- a/controllers/volumereplicationgroup_controller.go
+++ b/controllers/volumereplicationgroup_controller.go
@@ -15,6 +15,7 @@ import (
     volrep "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1"
     "github.com/google/uuid"
+    volgroup "github.com/rakeshgm/volgroup-shim-operator/api/v1alpha1"
     "github.com/ramendr/ramen/controllers/kubeobjects"
     "github.com/ramendr/ramen/controllers/kubeobjects/velero"
     "golang.org/x/exp/maps" // TODO replace with "maps" in go1.21+
@@ -92,7 +93,8 @@ func (r *VolumeReplicationGroupReconciler) SetupWithManager(
             builder.WithPredicates(rmnutil.CreateOrDeleteOrResourceVersionUpdatePredicate{}),
         ).
         Watches(&corev1.ConfigMap{}, handler.EnqueueRequestsFromMapFunc(r.configMapFun)).
-        Owns(&volrep.VolumeReplication{})
+        Owns(&volrep.VolumeReplication{}).
+        Owns(&volgroup.VolumeGroupReplication{})

     if !ramenConfig.VolSync.Disabled {
         r.Log.Info("VolSync enabled; adding owns and watches")
@@ -358,6 +360,8 @@ func filterPVC(reader client.Reader, pvc *corev1.PersistentVolumeClaim, log logr
 // +kubebuilder:rbac:groups=ramendr.openshift.io,resources=volumereplicationgroups/finalizers,verbs=update
 // +kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplications,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplicationclasses,verbs=get;list;watch
+// +kubebuilder:rbac:groups=cache.storage.ramendr.io,resources=volumegroupreplications,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=cache.storage.ramendr.io,resources=volumegroupreplicationclasses,verbs=get;list;watch
 // +kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get;list;watch;create;update
 // +kubebuilder:rbac:groups=storage.k8s.io,resources=volumeattachments,verbs=get;list;watch
 // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
@@ -397,6 +401,7 @@ func (r *VolumeReplicationGroupReconciler) Reconcile(ctx context.Context, req ct
         volRepPVCs:        []corev1.PersistentVolumeClaim{},
         volSyncPVCs:       []corev1.PersistentVolumeClaim{},
         replClassList:     &volrep.VolumeReplicationClassList{},
+        grpReplClassList:  &volgroup.VolumeGroupReplicationClassList{},
         namespacedName:    req.NamespacedName.String(),
         objectStorers:     make(map[string]cachedObjectStorer),
         storageClassCache: make(map[string]*storagev1.StorageClass),
@@ -459,10 +464,12 @@ type VRGInstance struct {
     volRepPVCs           []corev1.PersistentVolumeClaim
     volSyncPVCs          []corev1.PersistentVolumeClaim
     replClassList        *volrep.VolumeReplicationClassList
+    grpReplClassList     *volgroup.VolumeGroupReplicationClassList
     storageClassCache    map[string]*storagev1.StorageClass
     vrgObjectProtected   *metav1.Condition
     kubeObjectsProtected *metav1.Condition
     vrcUpdated           bool
+    vgrcUpdated          bool
     namespacedName       string
     volSyncHandler       *volsync.VSHandler
     objectStorers        map[string]cachedObjectStorer
@@ -638,6 +645,8 @@ func (v *VRGInstance) listPVCsByPVCSelector(labelSelector metav1.LabelSelector,
 }

 // updatePVCList fetches and updates the PVC list to process for the current instance of VRG
+//
+//nolint:cyclop
 func (v *VRGInstance) updatePVCList() error {
     pvcList, err := v.listPVCsByVrgPVCSelector()
     if err != nil {
@@ -663,6 +672,16 @@ func (v *VRGInstance) updatePVCList() error {
         v.vrcUpdated = true
     }

+    if !v.vgrcUpdated {
+        if err := v.updateGroupReplicationClassList(); err != nil {
+            v.log.Error(err, "Failed to get VolumeGroupReplicationClass list")
+
+            return fmt.Errorf("failed to get VolumeGroupReplicationClass list")
+        }
+
+        v.vgrcUpdated = true
+    }
+
     if rmnutil.ResourceIsDeleted(v.instance) {
         v.separatePVCsUsingVRGStatus(pvcList)
         v.log.Info(fmt.Sprintf("Separated PVCs (%d) into VolRepPVCs (%d) and VolSyncPVCs (%d)",
@@ -671,10 +690,11 @@ func (v *VRGInstance) updatePVCList() error {
         return nil
     }

-    if len(v.replClassList.Items) == 0 {
+    if len(v.replClassList.Items) == 0 && len(v.grpReplClassList.Items) == 0 {
         v.volSyncPVCs = make([]corev1.PersistentVolumeClaim, len(pvcList.Items))
         numCopied := copy(v.volSyncPVCs, pvcList.Items)
-        v.log.Info("No VolumeReplicationClass available. Using all PVCs with VolSync", "pvcCount", numCopied)
+        v.log.Info("No VolumeReplicationClass or VolumeGroupReplicationClass available. Using all PVCs with VolSync",
+            "pvcCount", numCopied)

         return nil
     }
@@ -703,6 +723,26 @@ func (v *VRGInstance) updateReplicationClassList() error {
     return nil
 }

+func (v *VRGInstance) updateGroupReplicationClassList() error {
+    labelSelector := v.instance.Spec.Async.ReplicationClassSelector
+
+    v.log.Info("Fetching VolumeGroupReplicationClass", "labeled", labels.Set(labelSelector.MatchLabels))
+    listOptions := []client.ListOption{
+        client.MatchingLabels(labelSelector.MatchLabels),
+    }
+
+    if err := v.reconciler.List(v.ctx, v.grpReplClassList, listOptions...); err != nil {
+        v.log.Error(err, "Failed to list Group Replication Classes",
+            "labeled", labels.Set(labelSelector.MatchLabels))
+
+        return fmt.Errorf("failed to list Group Replication Classes, %w", err)
+    }
+
+    v.log.Info("Number of Group Replication Classes", "count", len(v.grpReplClassList.Items))
+
+    return nil
+}
+
 func (v *VRGInstance) separatePVCsUsingVRGStatus(pvcList *corev1.PersistentVolumeClaimList) {
     for idx := range pvcList.Items {
         pvc := &pvcList.Items[idx]
@@ -719,6 +759,7 @@ func (v *VRGInstance) separatePVCsUsingVRGStatus(pvcList *corev1.PersistentVolum
     }
 }

+//nolint:gocognit,cyclop
 func (v *VRGInstance) separatePVCsUsingStorageClassProvisioner(pvcList *corev1.PersistentVolumeClaimList) error {
     for idx := range pvcList.Items {
         pvc := &pvcList.Items[idx]
@@ -746,6 +787,17 @@ func (v *VRGInstance) separatePVCsUsingStorageClassProvisioner(pvcList *corev1.P
             }
         }

+        if !replicationClassMatchFound {
+            for _, replicationClass := range v.grpReplClassList.Items {
+                if storageClass.Provisioner == replicationClass.Spec.Provisioner {
+                    v.volRepPVCs = append(v.volRepPVCs, *pvc)
+                    replicationClassMatchFound = true
+
+                    break
+                }
+            }
+        }
+
         if !replicationClassMatchFound {
             v.volSyncPVCs = append(v.volSyncPVCs, *pvc)
         }
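The PVC separation above now has a three-step fallback: match a VolumeReplicationClass by provisioner, then a VolumeGroupReplicationClass, then hand the PVC to VolSync. A minimal standalone sketch of that order (hypothetical helper, simplified from the loops above):

    package sketch

    // backendFor mirrors separatePVCsUsingStorageClassProvisioner: a PVC is
    // protected with (group) volume replication when any replication class
    // shares its StorageClass provisioner; otherwise VolSync is used.
    func backendFor(provisioner string, vrcProvisioners, vgrcProvisioners []string) string {
    	for _, p := range vrcProvisioners {
    		if p == provisioner {
    			return "volrep"
    		}
    	}

    	for _, p := range vgrcProvisioners {
    		if p == provisioner {
    			// Grouped PVCs are also queued on volRepPVCs above.
    			return "volrep"
    		}
    	}

    	return "volsync"
    }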
Using all PVCs with VolSync", + "pvcCount", numCopied) return nil } @@ -703,6 +723,26 @@ func (v *VRGInstance) updateReplicationClassList() error { return nil } +func (v *VRGInstance) updateGroupReplicationClassList() error { + labelSelector := v.instance.Spec.Async.ReplicationClassSelector + + v.log.Info("Fetching VolumeGroupReplicationClass", "labeled", labels.Set(labelSelector.MatchLabels)) + listOptions := []client.ListOption{ + client.MatchingLabels(labelSelector.MatchLabels), + } + + if err := v.reconciler.List(v.ctx, v.grpReplClassList, listOptions...); err != nil { + v.log.Error(err, "Failed to list Group Replication Classes", + "labeled", labels.Set(labelSelector.MatchLabels)) + + return fmt.Errorf("failed to list Group Replication Classes, %w", err) + } + + v.log.Info("Number of Group Replication Classes", "count", len(v.grpReplClassList.Items)) + + return nil +} + func (v *VRGInstance) separatePVCsUsingVRGStatus(pvcList *corev1.PersistentVolumeClaimList) { for idx := range pvcList.Items { pvc := &pvcList.Items[idx] @@ -719,6 +759,7 @@ func (v *VRGInstance) separatePVCsUsingVRGStatus(pvcList *corev1.PersistentVolum } } +//nolint:gocognit,cyclop func (v *VRGInstance) separatePVCsUsingStorageClassProvisioner(pvcList *corev1.PersistentVolumeClaimList) error { for idx := range pvcList.Items { pvc := &pvcList.Items[idx] @@ -746,6 +787,17 @@ func (v *VRGInstance) separatePVCsUsingStorageClassProvisioner(pvcList *corev1.P } } + if !replicationClassMatchFound { + for _, replicationClass := range v.grpReplClassList.Items { + if storageClass.Provisioner == replicationClass.Spec.Provisioner { + v.volRepPVCs = append(v.volRepPVCs, *pvc) + replicationClassMatchFound = true + + break + } + } + } + if !replicationClassMatchFound { v.volSyncPVCs = append(v.volSyncPVCs, *pvc) } diff --git a/controllers/vrg_volrep.go b/controllers/vrg_volrep.go index 55239e9d65..9cccf631bc 100644 --- a/controllers/vrg_volrep.go +++ b/controllers/vrg_volrep.go @@ -15,6 +15,7 @@ import ( volrep "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1" volrepController "github.com/csi-addons/kubernetes-csi-addons/controllers/replication.storage" + volgroup "github.com/rakeshgm/volgroup-shim-operator/api/v1alpha1" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -253,7 +254,7 @@ func (v *VRGInstance) updateProtectedPVCs(pvc *corev1.PersistentVolumeClaim) err func setPVCStorageIdentifiers( protectedPVC *ramendrv1alpha1.ProtectedPVC, storageClass *storagev1.StorageClass, - volumeReplicationClass *volrep.VolumeReplicationClass, + volumeReplicationClass client.Object, ) { protectedPVC.StorageIdentifiers.StorageProvisioner = storageClass.Provisioner @@ -264,9 +265,9 @@ func setPVCStorageIdentifiers( } } - if value, ok := volumeReplicationClass.Labels[VolumeReplicationIDLabel]; ok { + if value, ok := volumeReplicationClass.GetLabels()[VolumeReplicationIDLabel]; ok { protectedPVC.StorageIdentifiers.ReplicationID.ID = value - if modes, ok := volumeReplicationClass.Labels[MModesLabel]; ok { + if modes, ok := volumeReplicationClass.GetLabels()[MModesLabel]; ok { protectedPVC.StorageIdentifiers.ReplicationID.Modes = MModesFromCSV(modes) } } @@ -898,9 +899,13 @@ func (v *VRGInstance) reconcileMissingVR(pvc *corev1.PersistentVolumeClaim, log return !vrMissing, !requeue } - volRep := &volrep.VolumeReplication{} vrNamespacedName := types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace} + volRep, vrTypeErr := 
@@ -930,6 +935,102 @@ func (v *VRGInstance) reconcileMissingVR(pvc *corev1.PersistentVolumeClaim, log
     return vrMissing, !requeue
 }

+func (v *VRGInstance) getVolumeGroupReplicationState(state volrep.ReplicationState) (volgroup.ReplicationState, error) {
+    var volGroupRepState volgroup.ReplicationState
+
+    switch state {
+    case volrep.Primary:
+        volGroupRepState = volgroup.Primary
+    case volrep.Secondary, volrep.Resync:
+        volGroupRepState = volgroup.Secondary
+    default:
+        v.log.Info(fmt.Sprintf("invalid Replication State %s", string(state)))
+
+        return volGroupRepState, fmt.Errorf("invalid Replication State %s", string(state))
+    }
+
+    return volGroupRepState, nil
+}
+
+func (v *VRGInstance) getVolumeReplication(
+    vrNamespacedName *types.NamespacedName,
+) (client.Object, error) {
+    volumeReplicationClass, vrTypeErr := v.selectVolumeReplicationClass(*vrNamespacedName)
+    if vrTypeErr != nil {
+        return nil, vrTypeErr
+    }
+
+    switch volumeReplicationClass.(type) {
+    case *volgroup.VolumeGroupReplicationClass:
+        vrNamespacedName.Name = vrNamespacedName.Namespace + "-vgr"
+
+        return &volgroup.VolumeGroupReplication{}, nil
+    case *volrep.VolumeReplicationClass:
+        return &volrep.VolumeReplication{}, nil
+    default:
+        return nil, fmt.Errorf("unknown replication class type %T", volumeReplicationClass)
+    }
+}
+
+func (v *VRGInstance) createVolumeReplication(vrNamespacedName *types.NamespacedName,
+    state volrep.ReplicationState,
+) (client.Object, error) {
+    volumeReplicationClass, err := v.selectVolumeReplicationClass(*vrNamespacedName)
+    if err != nil {
+        return nil, err
+    }
+
+    switch volumeReplicationClass.(type) {
+    case *volgroup.VolumeGroupReplicationClass:
+        volGroupRepState, err := v.getVolumeGroupReplicationState(state)
+        if err != nil {
+            return nil, err
+        }
+
+        vrNamespacedName.Name = vrNamespacedName.Namespace + "-vgr"
+
+        return &volgroup.VolumeGroupReplication{
+            ObjectMeta: metav1.ObjectMeta{
+                Name:      vrNamespacedName.Name,
+                Namespace: vrNamespacedName.Namespace,
+                Labels:    rmnutil.OwnerLabels(v.instance),
+            },
+            Spec: volgroup.VolumeGroupReplicationSpec{
+                ReplicationState:            volGroupRepState,
+                VolumeGroupReplicationClass: volumeReplicationClass.GetName(),
+                Selector: &metav1.LabelSelector{
+                    MatchLabels: map[string]string{
+                        "app": vrNamespacedName.Namespace,
+                    },
+                },
+            },
+        }, nil
+    case *volrep.VolumeReplicationClass:
+        return &volrep.VolumeReplication{
+            ObjectMeta: metav1.ObjectMeta{
+                Name:      vrNamespacedName.Name,
+                Namespace: vrNamespacedName.Namespace,
+                Labels:    rmnutil.OwnerLabels(v.instance),
+            },
+            Spec: volrep.VolumeReplicationSpec{
+                DataSource: corev1.TypedLocalObjectReference{
+                    Kind:     "PersistentVolumeClaim",
+                    Name:     vrNamespacedName.Name,
+                    APIGroup: new(string),
+                },
+                ReplicationState:       state,
+                VolumeReplicationClass: volumeReplicationClass.GetName(),
+                AutoResync:             v.autoResync(state),
+            },
+        }, nil
+    default:
+        return nil, fmt.Errorf("unknown replication class type %T", volumeReplicationClass)
+    }
+}
+
 func (v *VRGInstance) deleteClusterDataInS3Stores(log logr.Logger) error {
     log.Info("Delete cluster data in", "s3Profiles", v.instance.Spec.S3Profiles)

@@ -1078,7 +1179,10 @@ func (v *VRGInstance) createOrUpdateVR(vrNamespacedName types.NamespacedName,
 ) (bool, bool, error) {
     const requeue = true

-    volRep := &volrep.VolumeReplication{}
+    volRep, vrTypeErr := v.getVolumeReplication(&vrNamespacedName)
+    if vrTypeErr != nil {
+        return requeue, false, vrTypeErr
+    }

     err := v.reconciler.Get(v.ctx, vrNamespacedName, volRep)
     if err != nil {
@@ -1137,70 +1241,71 @@ func (v *VRGInstance) autoResync(state volrep.ReplicationState) bool {
 // - a boolean indicating if a reconcile requeue is required
 // - a boolean indicating if VR is already at the desired state
 // - any errors during the process of updating the resource
-func (v *VRGInstance) updateVR(volRep *volrep.VolumeReplication,
+func (v *VRGInstance) updateVR(volRep client.Object,
     state volrep.ReplicationState, log logr.Logger,
 ) (bool, bool, error) {
     const requeue = true

-    // If state is already as desired, check the status
-    if volRep.Spec.ReplicationState == state && volRep.Spec.AutoResync == v.autoResync(state) {
-        log.Info("VolumeReplication and VolumeReplicationGroup state and autoresync match. Proceeding to status check")
+    switch obj := volRep.(type) {
+    case *volrep.VolumeReplication:
+        // If state is already as desired, check the status
+        if obj.Spec.ReplicationState == state && obj.Spec.AutoResync == v.autoResync(state) {
+            log.Info("VolumeReplication and VolumeReplicationGroup state and autoresync match. Proceeding to status check")

-        return !requeue, v.checkVRStatus(volRep), nil
-    }
+            return !requeue, v.checkVRStatus(obj, obj.Status.Conditions, obj.Status.ObservedGeneration), nil
+        }
+
+        obj.Spec.ReplicationState = state
+        obj.Spec.AutoResync = v.autoResync(state)
+    case *volgroup.VolumeGroupReplication:
+        groupState, err := v.getVolumeGroupReplicationState(state)
+        if err != nil {
+            return requeue, false, err
+        }

-    volRep.Spec.ReplicationState = state
-    volRep.Spec.AutoResync = v.autoResync(state)
+        if obj.Spec.ReplicationState == groupState {
+            log.Info("VolumeGroupReplication and VolumeReplicationGroup state match. Proceeding to status check")
+
+            return !requeue, v.checkVRStatus(obj, obj.Status.Conditions, obj.Status.ObservedGeneration), nil
+        }
+
+        obj.Spec.ReplicationState = groupState
+    default:
+        v.log.Info(fmt.Sprintf("invalid VolumeReplication (%s:%s)", volRep.GetName(), volRep.GetNamespace()))
+
+        return requeue, false, fmt.Errorf("invalid VolumeReplication (%s:%s)", volRep.GetName(), volRep.GetNamespace())
+    }

     if err := v.reconciler.Update(v.ctx, volRep); err != nil {
         log.Error(err, "Failed to update VolumeReplication resource",
-            "name", volRep.Name, "namespace", volRep.Namespace,
+            "name", volRep.GetName(), "namespace", volRep.GetNamespace(),
             "state", state)
         rmnutil.ReportIfNotPresent(v.reconciler.eventRecorder, v.instance, corev1.EventTypeWarning,
             rmnutil.EventReasonVRUpdateFailed, err.Error())

         msg := "Failed to update VolumeReplication resource"
-        v.updatePVCDataReadyCondition(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg)
+        v.updatePVCDataReadyCondition(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonError, msg)

         return requeue, false, fmt.Errorf("failed to update VolumeReplication resource"+
             " (%s/%s) as %s, belonging to VolumeReplicationGroup (%s/%s), %w",
-            volRep.Namespace, volRep.Name, state,
+            volRep.GetNamespace(), volRep.GetName(), state,
             v.instance.Namespace, v.instance.Name, err)
     }

     log.Info(fmt.Sprintf("Updated VolumeReplication resource (%s/%s) with state %s",
-        volRep.Name, volRep.Namespace, state))
+        volRep.GetName(), volRep.GetNamespace(), state))
     // Just updated the state of the VolRep. Mark it as progressing.
msg := "Updated VolumeReplication resource for PVC" - v.updatePVCDataReadyCondition(volRep.Namespace, volRep.Name, VRGConditionReasonProgressing, msg) + v.updatePVCDataReadyCondition(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonProgressing, msg) return !requeue, false, nil } // createVR creates a VolumeReplication CR with a PVC as its data source. func (v *VRGInstance) createVR(vrNamespacedName types.NamespacedName, state volrep.ReplicationState) error { - volumeReplicationClass, err := v.selectVolumeReplicationClass(vrNamespacedName) + volRep, err := v.createVolumeReplication(&vrNamespacedName, state) if err != nil { - return fmt.Errorf("failed to find the appropriate VolumeReplicationClass (%s) %w", - v.instance.Name, err) - } - - volRep := &volrep.VolumeReplication{ - ObjectMeta: metav1.ObjectMeta{ - Name: vrNamespacedName.Name, - Namespace: vrNamespacedName.Namespace, - Labels: rmnutil.OwnerLabels(v.instance), - }, - Spec: volrep.VolumeReplicationSpec{ - DataSource: corev1.TypedLocalObjectReference{ - Kind: "PersistentVolumeClaim", - Name: vrNamespacedName.Name, - APIGroup: new(string), - }, - ReplicationState: state, - VolumeReplicationClass: volumeReplicationClass.GetName(), - AutoResync: v.autoResync(state), - }, + return err } if !vrgInAdminNamespace(v.instance, v.ramenConfig) { @@ -1209,7 +1314,7 @@ func (v *VRGInstance) createVR(vrNamespacedName types.NamespacedName, state volr // when VRG is not in the admin namespace. if err := ctrl.SetControllerReference(v.instance, volRep, v.reconciler.Scheme); err != nil { return fmt.Errorf("failed to set owner reference to VolumeReplication resource (%s/%s), %w", - volRep.Name, volRep.Namespace, err) + volRep.GetName(), volRep.GetNamespace(), err) } } @@ -1227,9 +1332,11 @@ func (v *VRGInstance) createVR(vrNamespacedName types.NamespacedName, state volr // VolumeReplicationGroup has the same name as pvc. But in future if it changes // functions to be changed would be processVRAsPrimary(), processVRAsSecondary() // to either receive pvc NamespacedName or pvc itself as an additional argument. 
+//
+//nolint:funlen,gocognit,cyclop
 func (v *VRGInstance) selectVolumeReplicationClass(
     namespacedName types.NamespacedName,
-) (*volrep.VolumeReplicationClass, error) {
+) (client.Object, error) {
     if !v.vrcUpdated {
         if err := v.updateReplicationClassList(); err != nil {
             v.log.Error(err, "Failed to get VolumeReplicationClass list")
@@ -1240,12 +1347,6 @@ func (v *VRGInstance) selectVolumeReplicationClass(
         v.vrcUpdated = true
     }

-    if len(v.replClassList.Items) == 0 {
-        v.log.Info("No VolumeReplicationClass available")
-
-        return nil, fmt.Errorf("no VolumeReplicationClass available")
-    }
-
     storageClass, err := v.getStorageClass(namespacedName)
     if err != nil {
         v.log.Info(fmt.Sprintf("Failed to get the storageclass of pvc %s",
@@ -1276,6 +1377,39 @@ func (v *VRGInstance) selectVolumeReplicationClass(
         }
     }

+    if !v.vgrcUpdated {
+        if err := v.updateGroupReplicationClassList(); err != nil {
+            v.log.Error(err, "Failed to get VolumeGroupReplicationClass list")
+
+            return nil, fmt.Errorf("failed to get VolumeGroupReplicationClass list")
+        }
+
+        v.vgrcUpdated = true
+    }
+
+    for index := range v.grpReplClassList.Items {
+        grpReplicationClass := &v.grpReplClassList.Items[index]
+        if storageClass.Provisioner != grpReplicationClass.Spec.Provisioner {
+            continue
+        }
+
+        schedulingInterval, found := grpReplicationClass.Spec.Parameters["schedulingInterval"]
+        if !found {
+            v.log.Info("No schedulingInterval in VolumeGroupReplicationClass available")
+
+            // schedule not present in parameters of this replicationClass.
+            continue
+        }
+
+        // ReplicationClass that matches both VRG schedule and pvc provisioner
+        if schedulingInterval == v.instance.Spec.Async.SchedulingInterval {
+            v.log.Info(fmt.Sprintf("Found VolumeGroupReplicationClass that matches provisioner and schedule %s/%s",
+                storageClass.Provisioner, v.instance.Spec.Async.SchedulingInterval))
+
+            return grpReplicationClass, nil
+        }
+    }
+
     v.log.Info(fmt.Sprintf("No VolumeReplicationClass found to match provisioner and schedule %s/%s",
         storageClass.Provisioner, v.instance.Spec.Async.SchedulingInterval))

@@ -1332,30 +1466,32 @@ func (v *VRGInstance) getStorageClass(namespacedName types.NamespacedName) (*sto

 // checkVRStatus checks if the VolumeReplication resource has the desired status for the
 // current generation and returns true if so, false otherwise
-func (v *VRGInstance) checkVRStatus(volRep *volrep.VolumeReplication) bool {
+func (v *VRGInstance) checkVRStatus(volRep client.Object,
+    conditions []metav1.Condition, observedGeneration int64,
+) bool {
     // When the generation in the status is updated, VRG would get a reconcile
     // as it owns VolumeReplication resource.
-    if volRep.Generation != volRep.Status.ObservedGeneration {
+    if volRep.GetGeneration() != observedGeneration {
         v.log.Info(fmt.Sprintf("Generation mismatch in status for VolumeReplication resource (%s/%s)",
-            volRep.Name, volRep.Namespace))
+            volRep.GetName(), volRep.GetNamespace()))

         msg := "VolumeReplication generation not updated in status"
-        v.updatePVCDataReadyCondition(volRep.Namespace, volRep.Name, VRGConditionReasonProgressing, msg)
+        v.updatePVCDataReadyCondition(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonProgressing, msg)

         return false
     }

     switch {
     case v.instance.Spec.ReplicationState == ramendrv1alpha1.Primary:
-        return v.validateVRStatus(volRep, ramendrv1alpha1.Primary)
+        return v.validateVRStatus(volRep, ramendrv1alpha1.Primary, conditions)
     case v.instance.Spec.ReplicationState == ramendrv1alpha1.Secondary:
-        return v.validateVRStatus(volRep, ramendrv1alpha1.Secondary)
+        return v.validateVRStatus(volRep, ramendrv1alpha1.Secondary, conditions)
     default:
         v.log.Info(fmt.Sprintf("invalid Replication State %s for VolumeReplicationGroup (%s:%s)",
             string(v.instance.Spec.ReplicationState), v.instance.Name, v.instance.Namespace))

         msg := "VolumeReplicationGroup state invalid"
-        v.updatePVCDataReadyCondition(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg)
+        v.updatePVCDataReadyCondition(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonError, msg)

         return false
     }
@@ -1366,7 +1502,9 @@ func (v *VRGInstance) checkVRStatus(volRep *volrep.VolumeReplication) bool {
 // - When replication state is Primary, only Completed condition is checked.
 // - When replication state is Secondary, all 3 conditions for Completed/Degraded/Resyncing is
 //   checked and ensured healthy.
-func (v *VRGInstance) validateVRStatus(volRep *volrep.VolumeReplication, state ramendrv1alpha1.ReplicationState) bool {
+func (v *VRGInstance) validateVRStatus(volRep client.Object, state ramendrv1alpha1.ReplicationState,
+    conditions []metav1.Condition,
+) bool {
     var (
         stateString string
         action      string
@@ -1382,33 +1520,33 @@ func (v *VRGInstance) validateVRStatus(volRep *volrep.VolumeReplication, state r
     }

     // it should be completed
-    conditionMet, msg := isVRConditionMet(volRep, volrepController.ConditionCompleted, metav1.ConditionTrue)
+    conditionMet, msg := isVRConditionMet(volRep, conditions, volrepController.ConditionCompleted, metav1.ConditionTrue)
     if !conditionMet {
         defaultMsg := fmt.Sprintf("VolumeReplication resource for pvc not %s to %s", action, stateString)
-        v.updatePVCDataReadyConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+        v.updatePVCDataReadyConditionHelper(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonError, msg,
             defaultMsg)

-        v.updatePVCDataProtectedConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+        v.updatePVCDataProtectedConditionHelper(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonError, msg,
             defaultMsg)

-        v.log.Info(fmt.Sprintf("%s (VolRep: %s/%s)", defaultMsg, volRep.Name, volRep.Namespace))
+        v.log.Info(fmt.Sprintf("%s (VolRep: %s/%s)", defaultMsg, volRep.GetName(), volRep.GetNamespace()))

         return false
     }

     // if primary, all checks are completed
     if state == ramendrv1alpha1.Secondary {
-        return v.validateAdditionalVRStatusForSecondary(volRep)
+        return v.validateAdditionalVRStatusForSecondary(volRep, conditions)
     }

     msg = "PVC in the VolumeReplicationGroup is ready for use"
-    v.updatePVCDataReadyCondition(volRep.Namespace, volRep.Name, VRGConditionReasonReady, msg)
-    v.updatePVCDataProtectedCondition(volRep.Namespace, volRep.Name, VRGConditionReasonReady, msg)
-    v.updatePVCLastSyncTime(volRep.Namespace, volRep.Name, volRep.Status.LastSyncTime)
-    v.updatePVCLastSyncDuration(volRep.Namespace, volRep.Name, volRep.Status.LastSyncDuration)
-    v.updatePVCLastSyncBytes(volRep.Namespace, volRep.Name, volRep.Status.LastSyncBytes)
-    v.log.Info(fmt.Sprintf("VolumeReplication resource %s/%s is ready for use", volRep.Name,
-        volRep.Namespace))
+    v.updatePVCDataReadyCondition(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonReady, msg)
+    v.updatePVCDataProtectedCondition(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonReady, msg)
+    // TODO: restore the LastSync* status updates once they can be read generically
+    // from both VolumeReplication and VolumeGroupReplication; volRep is now a
+    // client.Object, so the typed Status fields are no longer accessible here.
+    // v.updatePVCLastSyncTime(volRep.GetNamespace(), volRep.GetName(), volRep.Status.LastSyncTime)
+    // v.updatePVCLastSyncDuration(volRep.GetNamespace(), volRep.GetName(), volRep.Status.LastSyncDuration)
+    // v.updatePVCLastSyncBytes(volRep.GetNamespace(), volRep.GetName(), volRep.Status.LastSyncBytes)
+    v.log.Info(fmt.Sprintf("VolumeReplication resource %s/%s is ready for use", volRep.GetName(),
+        volRep.GetNamespace()))

     return true
 }
@@ -1430,92 +1568,92 @@ func (v *VRGInstance) validateVRStatus(volRep *volrep.VolumeReplication, state r
 // With 2nd condition being met,
 // ProtectedPVC.Conditions[DataReady] = True
 // ProtectedPVC.Conditions[DataProtected] = True
-func (v *VRGInstance) validateAdditionalVRStatusForSecondary(volRep *volrep.VolumeReplication) bool {
-    v.updatePVCLastSyncTime(volRep.Namespace, volRep.Name, nil)
-    v.updatePVCLastSyncDuration(volRep.Namespace, volRep.Name, nil)
-    v.updatePVCLastSyncBytes(volRep.Namespace, volRep.Name, nil)
+func (v *VRGInstance) validateAdditionalVRStatusForSecondary(volRep client.Object, conditions []metav1.Condition) bool {
+    v.updatePVCLastSyncTime(volRep.GetNamespace(), volRep.GetName(), nil)
+    v.updatePVCLastSyncDuration(volRep.GetNamespace(), volRep.GetName(), nil)
+    v.updatePVCLastSyncBytes(volRep.GetNamespace(), volRep.GetName(), nil)

-    conditionMet, _ := isVRConditionMet(volRep, volrepController.ConditionResyncing, metav1.ConditionTrue)
+    conditionMet, _ := isVRConditionMet(volRep, conditions, volrepController.ConditionResyncing, metav1.ConditionTrue)
     if !conditionMet {
-        return v.checkResyncCompletionAsSecondary(volRep)
+        return v.checkResyncCompletionAsSecondary(volRep, conditions)
     }

-    conditionMet, msg := isVRConditionMet(volRep, volrepController.ConditionDegraded, metav1.ConditionTrue)
+    conditionMet, msg := isVRConditionMet(volRep, conditions, volrepController.ConditionDegraded, metav1.ConditionTrue)
     if !conditionMet {
-        v.updatePVCDataProtectedConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+        v.updatePVCDataProtectedConditionHelper(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonError, msg,
             "VolumeReplication resource for pvc is not in Degraded condition while resyncing")

-        v.updatePVCDataReadyConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+        v.updatePVCDataReadyConditionHelper(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonError, msg,
             "VolumeReplication resource for pvc is not in Degraded condition while resyncing")

         v.log.Info(fmt.Sprintf("VolumeReplication resource is not in degraded condition while"+
-            " resyncing is true (%s/%s)", volRep.Name, volRep.Namespace))
+            " resyncing is true (%s/%s)", volRep.GetName(), volRep.GetNamespace()))

         return false
     }

     msg = "VolumeReplication resource for the pvc is syncing as Secondary"
-    v.updatePVCDataReadyCondition(volRep.Namespace, volRep.Name, VRGConditionReasonReplicating, msg)
-    v.updatePVCDataProtectedCondition(volRep.Namespace, volRep.Name, VRGConditionReasonReplicating, msg)
+    v.updatePVCDataReadyCondition(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonReplicating, msg)
+    v.updatePVCDataProtectedCondition(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonReplicating, msg)

     v.log.Info(fmt.Sprintf("VolumeReplication resource for the pvc is syncing as Secondary (%s/%s)",
-        volRep.Name, volRep.Namespace))
+        volRep.GetName(), volRep.GetNamespace()))

     return true
 }

 // checkResyncCompletionAsSecondary returns true if resync status is complete as secondary, false otherwise
-func (v *VRGInstance) checkResyncCompletionAsSecondary(volRep *volrep.VolumeReplication) bool {
-    conditionMet, msg := isVRConditionMet(volRep, volrepController.ConditionResyncing, metav1.ConditionFalse)
+func (v *VRGInstance) checkResyncCompletionAsSecondary(volRep client.Object, conditions []metav1.Condition) bool {
+    conditionMet, msg := isVRConditionMet(volRep, conditions, volrepController.ConditionResyncing, metav1.ConditionFalse)
     if !conditionMet {
         defaultMsg := "VolumeReplication resource for pvc not syncing as Secondary"
-        v.updatePVCDataReadyConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+        v.updatePVCDataReadyConditionHelper(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonError, msg,
             defaultMsg)

-        v.updatePVCDataProtectedConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+        v.updatePVCDataProtectedConditionHelper(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonError, msg,
             defaultMsg)

-        v.log.Info(fmt.Sprintf("%s (VolRep: %s/%s)", defaultMsg, volRep.Name, volRep.Namespace))
+        v.log.Info(fmt.Sprintf("%s (VolRep: %s/%s)", defaultMsg, volRep.GetName(), volRep.GetNamespace()))

         return false
     }

-    conditionMet, msg = isVRConditionMet(volRep, volrepController.ConditionDegraded, metav1.ConditionFalse)
+    conditionMet, msg = isVRConditionMet(volRep, conditions, volrepController.ConditionDegraded, metav1.ConditionFalse)
     if !conditionMet {
         defaultMsg := "VolumeReplication resource for pvc is not syncing and is degraded as Secondary"
-        v.updatePVCDataReadyConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+        v.updatePVCDataReadyConditionHelper(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonError, msg,
             defaultMsg)

-        v.updatePVCDataProtectedConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+        v.updatePVCDataProtectedConditionHelper(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonError, msg,
             defaultMsg)

-        v.log.Info(fmt.Sprintf("%s (VolRep: %s/%s)", defaultMsg, volRep.Name, volRep.Namespace))
+        v.log.Info(fmt.Sprintf("%s (VolRep: %s/%s)", defaultMsg, volRep.GetName(), volRep.GetNamespace()))

         return false
     }

     msg = "VolumeReplication resource for the pvc as Secondary is in sync with Primary"
-    v.updatePVCDataReadyCondition(volRep.Namespace, volRep.Name, VRGConditionReasonReplicated, msg)
-    v.updatePVCDataProtectedCondition(volRep.Namespace, volRep.Name, VRGConditionReasonDataProtected, msg)
+    v.updatePVCDataReadyCondition(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonReplicated, msg)
+    v.updatePVCDataProtectedCondition(volRep.GetNamespace(), volRep.GetName(), VRGConditionReasonDataProtected, msg)

     v.log.Info(fmt.Sprintf("data sync completed as both degraded and resyncing are false for"+
-        " secondary VolRep (%s/%s)", volRep.Name, volRep.Namespace))
+        " secondary VolRep (%s/%s)", volRep.GetName(), volRep.GetNamespace()))

     return true
 }

-func isVRConditionMet(volRep *volrep.VolumeReplication,
+func isVRConditionMet(volRep client.Object, conditions []metav1.Condition,
     conditionType string,
     desiredStatus metav1.ConditionStatus,
 ) (bool, string) {
-    volRepCondition := findCondition(volRep.Status.Conditions, conditionType)
+    volRepCondition := findCondition(conditions, conditionType)
     if volRepCondition == nil {
         msg := fmt.Sprintf("Failed to get the %s condition from status of VolumeReplication resource.", conditionType)

         return false, msg
     }

-    if volRep.Generation != volRepCondition.ObservedGeneration {
+    if volRep.GetGeneration() != volRepCondition.ObservedGeneration {
         msg := fmt.Sprintf("Stale generation for condition %s from status of VolumeReplication resource.", conditionType)

         return false, msg
@@ -1711,7 +1849,10 @@ func setPVCClusterDataProtectedCondition(protectedPVC *ramendrv1alpha1.Protected
 // ensureVRDeletedFromAPIServer adds an additional step to ensure that we wait for volumereplication deletion
 // from API server before moving ahead with vrg finalizer removal.
 func (v *VRGInstance) ensureVRDeletedFromAPIServer(vrNamespacedName types.NamespacedName, log logr.Logger) error {
-    volRep := &volrep.VolumeReplication{}
+    volRep, vrTypeErr := v.getVolumeReplication(&vrNamespacedName)
+    if vrTypeErr != nil {
+        return vrTypeErr
+    }

     err := v.reconciler.APIReader.Get(v.ctx, vrNamespacedName, volRep)
     if err == nil {
@@ -1733,13 +1874,14 @@ func (v *VRGInstance) ensureVRDeletedFromAPIServer(vrNamespacedName types.Namesp

 // deleteVR deletes a VolumeReplication instance if found
 func (v *VRGInstance) deleteVR(vrNamespacedName types.NamespacedName, log logr.Logger) error {
-    cr := &volrep.VolumeReplication{
-        ObjectMeta: metav1.ObjectMeta{
-            Name:      vrNamespacedName.Name,
-            Namespace: vrNamespacedName.Namespace,
-        },
+    cr, vrTypeErr := v.getVolumeReplication(&vrNamespacedName)
+    if vrTypeErr != nil {
+        return vrTypeErr
     }

+    cr.SetName(vrNamespacedName.Name)
+    cr.SetNamespace(vrNamespacedName.Namespace)
+
     err := v.reconciler.Delete(v.ctx, cr)
     if err != nil {
         if !k8serrors.IsNotFound(err) {
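selectVolumeReplicationClass now falls through to VolumeGroupReplicationClasses, matching on two keys. Reduced to a predicate (hypothetical helper, not part of the patch):

    package sketch

    // matchesGroupClass mirrors the matching rule above: the class must be backed
    // by the PVC's StorageClass provisioner, and its schedulingInterval parameter
    // must equal the VRG's spec.async.schedulingInterval.
    func matchesGroupClass(scProvisioner, vrgInterval, classProvisioner string,
    	classParams map[string]string,
    ) bool {
    	if scProvisioner != classProvisioner {
    		return false
    	}

    	interval, found := classParams["schedulingInterval"]

    	return found && interval == vrgInterval
    }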
diff --git a/go.mod b/go.mod
index fbd730628a..9a92c59c59 100644
--- a/go.mod
+++ b/go.mod
@@ -19,6 +19,7 @@ require (
     github.com/operator-framework/api v0.17.6
     github.com/pkg/errors v0.9.1
     github.com/prometheus/client_golang v1.16.0
+    github.com/rakeshgm/volgroup-shim-operator v0.0.0-20240304154104-19fa9aea1444
     github.com/ramendr/ramen/api v0.0.0-20240117171503-e11c56eac24d
     github.com/ramendr/recipe v0.0.0-20230817160432-729dc7fd8932
     github.com/stolostron/multicloud-operators-foundation v0.0.0-20220824091202-e9cd9710d009
diff --git a/go.sum b/go.sum
index 49e17eb023..8bccb09868 100644
--- a/go.sum
+++ b/go.sum
@@ -277,6 +277,8 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO
 github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
 github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg=
 github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
+github.com/rakeshgm/volgroup-shim-operator v0.0.0-20240304154104-19fa9aea1444 h1:IetfzsnrKvfbr64hCzV5tfRU9caPRtksxXl1BG7Crfg=
+github.com/rakeshgm/volgroup-shim-operator v0.0.0-20240304154104-19fa9aea1444/go.mod h1:Dt/6XAyRYH+jiCmDD1p/lDEGWYMok5bHcxv8jYhJgFU=
 github.com/ramendr/recipe v0.0.0-20230817160432-729dc7fd8932 h1:n89W9K2gDa0XwdIVuWyg53hPgaR97DfGVi9o2V0WcWA=
 github.com/ramendr/recipe v0.0.0-20230817160432-729dc7fd8932/go.mod h1:QHVQXKgNId8EfvNd+Y6JcTrsXwTImtSFkV4IsiOkwCw=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
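The go.mod/go.sum entries above pin the shim operator to a pseudo-version; assuming a standard module setup, they can be reproduced with:

    go get github.com/rakeshgm/volgroup-shim-operator@v0.0.0-20240304154104-19fa9aea1444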
diff --git a/main.go b/main.go
index 6897e99b45..c4fed052c0 100644
--- a/main.go
+++ b/main.go
@@ -18,6 +18,7 @@ import (
     snapv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
     clrapiv1beta1 "github.com/open-cluster-management-io/api/cluster/v1beta1"
     ocmworkv1 "github.com/open-cluster-management/api/work/v1"
+    volgroup "github.com/rakeshgm/volgroup-shim-operator/api/v1alpha1"
     viewv1beta1 "github.com/stolostron/multicloud-operators-foundation/pkg/apis/view/v1beta1"
     plrv1 "github.com/stolostron/multicloud-operators-placementrule/pkg/apis/apps/v1"
     velero "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
@@ -108,6 +109,7 @@ func configureController(ramenConfig *ramendrv1alpha1.RamenConfig) error {
     } else {
         utilruntime.Must(velero.AddToScheme(scheme))
         utilruntime.Must(volrep.AddToScheme(scheme))
+        utilruntime.Must(volgroup.AddToScheme(scheme))
         utilruntime.Must(volsyncv1alpha1.AddToScheme(scheme))
         utilruntime.Must(snapv1.AddToScheme(scheme))
         utilruntime.Must(recipe.AddToScheme(scheme))
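For reference, the VolumeGroupReplication shape built by createVolumeReplication above corresponds to the following hand-written equivalent (illustrative sketch; field names are taken from this patch, while the helper function and its arguments are assumed):

    package sketch

    import (
    	"context"

    	volgroup "github.com/rakeshgm/volgroup-shim-operator/api/v1alpha1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"sigs.k8s.io/controller-runtime/pkg/client"
    )

    // createSampleVGR creates the one VolumeGroupReplication per namespace that
    // the controller manages: named "<namespace>-vgr" and selecting the
    // namespace's PVCs through the app=<namespace> label convention used above.
    func createSampleVGR(ctx context.Context, c client.Client, namespace, className string) error {
    	vgr := &volgroup.VolumeGroupReplication{
    		ObjectMeta: metav1.ObjectMeta{
    			Name:      namespace + "-vgr",
    			Namespace: namespace,
    		},
    		Spec: volgroup.VolumeGroupReplicationSpec{
    			ReplicationState:            volgroup.Primary,
    			VolumeGroupReplicationClass: className,
    			Selector: &metav1.LabelSelector{
    				MatchLabels: map[string]string{"app": namespace},
    			},
    		},
    	}

    	return c.Create(ctx, vgr)
    }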