From 04b85803c6897c5c31cd5148303972b23a1af130 Mon Sep 17 00:00:00 2001
From: Elena Gershkovich <elenage@il.ibm.com>
Date: Tue, 5 Mar 2024 13:40:46 +0200
Subject: [PATCH] Add VolumeGroupReplication support
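
The VolumeReplicationGroup controller can now protect PVCs through
VolumeGroupReplication resources in addition to per-PVC
VolumeReplication. The controller watches and owns
VolumeGroupReplication, lists VolumeGroupReplicationClasses matching
the VRG's replication class selector, and selects a group class when
its provisioner and schedulingInterval parameter match the PVC's
StorageClass and the VRG's async scheduling interval. A single
VolumeGroupReplication, named after its class with a "-vgr" suffix,
is shared by all PVCs selected by the VRG and is deleted only once at
most one protected PVC remains. RBAC rules for the
cache.storage.ramendr.io group are added to both roles.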

Signed-off-by: Elena Gershkovich <elenage@il.ibm.com>
---
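Reviewer notes (below the "---" marker, so they stay out of the
commit message):

selectVolumeReplicationClass() now falls back to the
VolumeGroupReplicationClass list when no VolumeReplicationClass
matches a PVC: a group class qualifies when its spec.provisioner
equals the provisioner of the PVC's StorageClass and its
schedulingInterval parameter equals the VRG's
spec.async.schedulingInterval. A minimal sketch of such a class
follows; the metadata values are hypothetical, and the apiVersion is
inferred from the cache.storage.ramendr.io RBAC group and the
v1alpha1 import used by this patch:

  apiVersion: cache.storage.ramendr.io/v1alpha1
  kind: VolumeGroupReplicationClass
  metadata:
    name: vgr-class-sample
  spec:
    # must equal the provisioner of the StorageClass backing the PVCs
    provisioner: rbd.csi.ceph.com
    parameters:
      # must equal spec.async.schedulingInterval of the VRG
      schedulingInterval: "5m"
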
 config/dr-cluster/rbac/role.yaml              |  20 +
 config/rbac/role.yaml                         |  20 +
 controllers/suite_test.go                     |   4 +
 .../volumereplicationgroup_controller.go      |  58 +-
 controllers/vrg_volrep.go                     | 514 +++++++++++++-----
 go.mod                                        |   1 +
 go.sum                                        |   2 +
 main.go                                       |   2 +
 8 files changed, 487 insertions(+), 134 deletions(-)
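
Unlike VolumeReplication, which is created per PVC and named after
the PVC, createVolumeGroupReplication() creates one resource per
class, named <VolumeGroupReplicationClass name> + "-vgr" and shared
by every PVC matching the VRG's pvcSelector. A sketch of the
resulting object, with hypothetical names and selector values:

  apiVersion: cache.storage.ramendr.io/v1alpha1
  kind: VolumeGroupReplication
  metadata:
    name: vgr-class-sample-vgr
    namespace: app-namespace          # namespace of the protected PVCs
  spec:
    replicationState: primary         # secondary when the VRG is Secondary
    volumeGroupReplicationClass: vgr-class-sample
    selector:                         # copied from the VRG's spec.pvcSelector
      matchLabels:
        appname: sample-app

deleteVolumeGroupReplication() removes this shared resource only when
at most one PVC still matches its selector; while more PVCs are
protected, deletion is deferred.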

diff --git a/config/dr-cluster/rbac/role.yaml b/config/dr-cluster/rbac/role.yaml
index 87d81c91ca..d6eb35c472 100644
--- a/config/dr-cluster/rbac/role.yaml
+++ b/config/dr-cluster/rbac/role.yaml
@@ -123,6 +123,26 @@ rules:
   - get
   - list
   - watch
+- apiGroups:
+  - cache.storage.ramendr.io
+  resources:
+  - volumegroupreplicationclasses
+  verbs:
+  - get
+  - list
+  - watch
+- apiGroups:
+  - cache.storage.ramendr.io
+  resources:
+  - volumegroupreplications
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
 - apiGroups:
   - storage.k8s.io
   resources:
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 4573e3c8c5..f65ba664f5 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -67,6 +67,26 @@ rules:
   - get
   - list
   - watch
+- apiGroups:
+  - cache.storage.ramendr.io
+  resources:
+  - volumegroupreplicationclasses
+  verbs:
+  - get
+  - list
+  - watch
+- apiGroups:
+  - cache.storage.ramendr.io
+  resources:
+  - volumegroupreplications
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
 - apiGroups:
   - cluster.open-cluster-management.io
   resources:
diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index b14f349378..5a2f16650f 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -34,6 +34,7 @@ import (
 	snapv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
 	ocmclv1 "github.com/open-cluster-management/api/cluster/v1"
 	ocmworkv1 "github.com/open-cluster-management/api/work/v1"
+	volgroup "github.com/rakeshgm/volgroup-shim-operator/api/v1alpha1"
 	viewv1beta1 "github.com/stolostron/multicloud-operators-foundation/pkg/apis/view/v1beta1"
 	plrv1 "github.com/stolostron/multicloud-operators-placementrule/pkg/apis/apps/v1"
 	cpcv1 "open-cluster-management.io/config-policy-controller/api/v1"
@@ -199,6 +200,9 @@ var _ = BeforeSuite(func() {
 	err = volrep.AddToScheme(scheme.Scheme)
 	Expect(err).NotTo(HaveOccurred())
 
+	err = volgroup.AddToScheme(scheme.Scheme)
+	Expect(err).NotTo(HaveOccurred())
+
 	err = volsyncv1alpha1.AddToScheme(scheme.Scheme)
 	Expect(err).NotTo(HaveOccurred())
 
diff --git a/controllers/volumereplicationgroup_controller.go b/controllers/volumereplicationgroup_controller.go
index af5bdcea07..0284fd2bf9 100644
--- a/controllers/volumereplicationgroup_controller.go
+++ b/controllers/volumereplicationgroup_controller.go
@@ -15,6 +15,7 @@ import (
 
 	volrep "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1"
 	"github.com/google/uuid"
+	volgroup "github.com/rakeshgm/volgroup-shim-operator/api/v1alpha1"
 	"github.com/ramendr/ramen/controllers/kubeobjects"
 	"github.com/ramendr/ramen/controllers/kubeobjects/velero"
 	"golang.org/x/exp/maps" // TODO replace with "maps" in go1.21+
@@ -92,7 +93,8 @@ func (r *VolumeReplicationGroupReconciler) SetupWithManager(
 			builder.WithPredicates(rmnutil.CreateOrDeleteOrResourceVersionUpdatePredicate{}),
 		).
 		Watches(&corev1.ConfigMap{}, handler.EnqueueRequestsFromMapFunc(r.configMapFun)).
-		Owns(&volrep.VolumeReplication{})
+		Owns(&volrep.VolumeReplication{}).
+		Owns(&volgroup.VolumeGroupReplication{})
 
 	if !ramenConfig.VolSync.Disabled {
 		r.Log.Info("VolSync enabled; adding owns and watches")
@@ -358,6 +360,8 @@ func filterPVC(reader client.Reader, pvc *corev1.PersistentVolumeClaim, log logr
 // +kubebuilder:rbac:groups=ramendr.openshift.io,resources=volumereplicationgroups/finalizers,verbs=update
 // +kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplications,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplicationclasses,verbs=get;list;watch
+// +kubebuilder:rbac:groups=cache.storage.ramendr.io,resources=volumegroupreplications,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=cache.storage.ramendr.io,resources=volumegroupreplicationclasses,verbs=get;list;watch
 // +kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get;list;watch;create;update
 // +kubebuilder:rbac:groups=storage.k8s.io,resources=volumeattachments,verbs=get;list;watch
 // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
@@ -397,6 +401,7 @@ func (r *VolumeReplicationGroupReconciler) Reconcile(ctx context.Context, req ct
 		volRepPVCs:        []corev1.PersistentVolumeClaim{},
 		volSyncPVCs:       []corev1.PersistentVolumeClaim{},
 		replClassList:     &volrep.VolumeReplicationClassList{},
+		grpReplClassList:  &volgroup.VolumeGroupReplicationClassList{},
 		namespacedName:    req.NamespacedName.String(),
 		objectStorers:     make(map[string]cachedObjectStorer),
 		storageClassCache: make(map[string]*storagev1.StorageClass),
@@ -459,10 +464,12 @@ type VRGInstance struct {
 	volRepPVCs           []corev1.PersistentVolumeClaim
 	volSyncPVCs          []corev1.PersistentVolumeClaim
 	replClassList        *volrep.VolumeReplicationClassList
+	grpReplClassList     *volgroup.VolumeGroupReplicationClassList
 	storageClassCache    map[string]*storagev1.StorageClass
 	vrgObjectProtected   *metav1.Condition
 	kubeObjectsProtected *metav1.Condition
 	vrcUpdated           bool
+	vgrcUpdated          bool
 	namespacedName       string
 	volSyncHandler       *volsync.VSHandler
 	objectStorers        map[string]cachedObjectStorer
@@ -638,6 +645,8 @@ func (v *VRGInstance) listPVCsByPVCSelector(labelSelector metav1.LabelSelector,
 }
 
 // updatePVCList fetches and updates the PVC list to process for the current instance of VRG
+//
+//nolint:cyclop
 func (v *VRGInstance) updatePVCList() error {
 	pvcList, err := v.listPVCsByVrgPVCSelector()
 	if err != nil {
@@ -663,6 +672,16 @@ func (v *VRGInstance) updatePVCList() error {
 		v.vrcUpdated = true
 	}
 
+	if !v.vgrcUpdated {
+		if err := v.updateGroupReplicationClassList(); err != nil {
+			v.log.Error(err, "Failed to get VolumeGroupReplicationClass list")
+
+			return fmt.Errorf("failed to get VolumeGroupReplicationClass list")
+		}
+
+		v.vgrcUpdated = true
+	}
+
 	if rmnutil.ResourceIsDeleted(v.instance) {
 		v.separatePVCsUsingVRGStatus(pvcList)
 		v.log.Info(fmt.Sprintf("Separated PVCs (%d) into VolRepPVCs (%d) and VolSyncPVCs (%d)",
@@ -671,10 +690,11 @@ func (v *VRGInstance) updatePVCList() error {
 		return nil
 	}
 
-	if len(v.replClassList.Items) == 0 {
+	if len(v.replClassList.Items) == 0 && len(v.grpReplClassList.Items) == 0 {
 		v.volSyncPVCs = make([]corev1.PersistentVolumeClaim, len(pvcList.Items))
 		numCopied := copy(v.volSyncPVCs, pvcList.Items)
-		v.log.Info("No VolumeReplicationClass available. Using all PVCs with VolSync", "pvcCount", numCopied)
+		v.log.Info("No VolumeReplicationClass or VolumeGroupReplicationClass available. Using all PVCs with VolSync",
+			"pvcCount", numCopied)
 
 		return nil
 	}
@@ -703,6 +723,26 @@ func (v *VRGInstance) updateReplicationClassList() error {
 	return nil
 }
 
+func (v *VRGInstance) updateGroupReplicationClassList() error {
+	labelSelector := v.instance.Spec.Async.ReplicationClassSelector
+
+	v.log.Info("Fetching VolumeGroupReplicationClass", "labeled", labels.Set(labelSelector.MatchLabels))
+	listOptions := []client.ListOption{
+		client.MatchingLabels(labelSelector.MatchLabels),
+	}
+
+	if err := v.reconciler.List(v.ctx, v.grpReplClassList, listOptions...); err != nil {
+		v.log.Error(err, "Failed to list VolumeGroupReplicationClasses",
+			"labeled", labels.Set(labelSelector.MatchLabels))
+
+		return fmt.Errorf("failed to list VolumeGroupReplicationClasses, %w", err)
+	}
+
+	v.log.Info("Number of VolumeGroupReplicationClasses", "count", len(v.grpReplClassList.Items))
+
+	return nil
+}
+
 func (v *VRGInstance) separatePVCsUsingVRGStatus(pvcList *corev1.PersistentVolumeClaimList) {
 	for idx := range pvcList.Items {
 		pvc := &pvcList.Items[idx]
@@ -719,6 +759,7 @@ func (v *VRGInstance) separatePVCsUsingVRGStatus(pvcList *corev1.PersistentVolum
 	}
 }
 
+//nolint:gocognit,cyclop
 func (v *VRGInstance) separatePVCsUsingStorageClassProvisioner(pvcList *corev1.PersistentVolumeClaimList) error {
 	for idx := range pvcList.Items {
 		pvc := &pvcList.Items[idx]
@@ -746,6 +787,17 @@ func (v *VRGInstance) separatePVCsUsingStorageClassProvisioner(pvcList *corev1.P
 			}
 		}
 
+		if !replicationClassMatchFound {
+			for _, replicationClass := range v.grpReplClassList.Items {
+				if storageClass.Provisioner == replicationClass.Spec.Provisioner {
+					v.volRepPVCs = append(v.volRepPVCs, *pvc)
+					replicationClassMatchFound = true
+
+					break
+				}
+			}
+		}
+
 		if !replicationClassMatchFound {
 			v.volSyncPVCs = append(v.volSyncPVCs, *pvc)
 		}
diff --git a/controllers/vrg_volrep.go b/controllers/vrg_volrep.go
index 8ccd6ff9b2..25a4abf179 100644
--- a/controllers/vrg_volrep.go
+++ b/controllers/vrg_volrep.go
@@ -15,10 +15,12 @@ import (
 
 	volrep "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1"
 	volrepController "github.com/csi-addons/kubernetes-csi-addons/controllers/replication.storage"
+	volgroup "github.com/rakeshgm/volgroup-shim-operator/api/v1alpha1"
 	corev1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -37,8 +39,7 @@ func logWithPvcName(log logr.Logger, pvc *corev1.PersistentVolumeClaim) logr.Log
 func (v *VRGInstance) reconcileVolRepsAsPrimary() {
 	for idx := range v.volRepPVCs {
 		pvc := &v.volRepPVCs[idx]
-		pvcNamespacedName := types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}
-		log := v.log.WithValues("pvc", pvcNamespacedName.String())
+		log := logWithPvcName(v.log, pvc)
 
 		if v.pvcUnprotectVolRepIfDeleted(*pvc, log) {
 			continue
@@ -62,7 +63,7 @@ func (v *VRGInstance) reconcileVolRepsAsPrimary() {
 		}
 
 		// If VR did not reach primary state, it is fine to still upload the PV and continue processing
-		requeueResult, _, err := v.processVRAsPrimary(pvcNamespacedName, log)
+		requeueResult, _, err := v.processVRAsPrimary(pvc, log)
 		if requeueResult {
 			v.requeue()
 		}
@@ -152,9 +153,7 @@ func (v *VRGInstance) reconcileVRAsSecondary(pvc *corev1.PersistentVolumeClaim,
 		return requeue, false, skip
 	}
 
-	pvcNamespacedName := types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}
-
-	requeueResult, ready, err := v.processVRAsSecondary(pvcNamespacedName, log)
+	requeueResult, ready, err := v.processVRAsSecondary(pvc, log)
 	if err != nil {
 		log.Info("Failure in getting or creating VolumeReplication resource for PersistentVolumeClaim",
 			"errorValue", err)
@@ -227,7 +226,7 @@ func (v *VRGInstance) updateProtectedPVCs(pvc *corev1.PersistentVolumeClaim) err
 			pvcNamespacedName, err)
 	}
 
-	volumeReplicationClass, err := v.selectVolumeReplicationClass(pvcNamespacedName)
+	volumeReplicationClass, err := v.selectVolumeReplicationClass(pvc)
 	if err != nil {
 		return fmt.Errorf("failed to find the appropriate VolumeReplicationClass (%s) %w",
 			v.instance.Name, err)
@@ -253,7 +252,7 @@ func (v *VRGInstance) updateProtectedPVCs(pvc *corev1.PersistentVolumeClaim) err
 func setPVCStorageIdentifiers(
 	protectedPVC *ramendrv1alpha1.ProtectedPVC,
 	storageClass *storagev1.StorageClass,
-	volumeReplicationClass *volrep.VolumeReplicationClass,
+	volumeReplicationClass client.Object,
 ) {
 	protectedPVC.StorageIdentifiers.StorageProvisioner = storageClass.Provisioner
 
@@ -264,9 +263,9 @@ func setPVCStorageIdentifiers(
 		}
 	}
 
-	if value, ok := volumeReplicationClass.Labels[VolumeReplicationIDLabel]; ok {
+	if value, ok := volumeReplicationClass.GetLabels()[VolumeReplicationIDLabel]; ok {
 		protectedPVC.StorageIdentifiers.ReplicationID.ID = value
-		if modes, ok := volumeReplicationClass.Labels[MModesLabel]; ok {
+		if modes, ok := volumeReplicationClass.GetLabels()[MModesLabel]; ok {
 			protectedPVC.StorageIdentifiers.ReplicationID.Modes = MModesFromCSV(modes)
 		}
 	}
@@ -822,8 +821,6 @@ func (v *VRGInstance) pvcsUnprotectVolRep(pvcs []corev1.PersistentVolumeClaim) {
 func (v *VRGInstance) reconcileVRForDeletion(pvc *corev1.PersistentVolumeClaim, log logr.Logger) bool {
 	const requeue = true
 
-	pvcNamespacedName := types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}
-
 	if v.instance.Spec.ReplicationState == ramendrv1alpha1.Secondary {
 		requeueResult, ready, skip := v.reconcileVRAsSecondary(pvc, log)
 		if requeueResult {
@@ -839,7 +836,7 @@ func (v *VRGInstance) reconcileVRForDeletion(pvc *corev1.PersistentVolumeClaim,
 			return !requeue
 		}
 	} else {
-		requeueResult, ready, err := v.processVRAsPrimary(pvcNamespacedName, log)
+		requeueResult, ready, err := v.processVRAsPrimary(pvc, log)
 		switch {
 		case err != nil:
 			log.Info("Requeuing due to failure in getting or creating VolumeReplication resource for PersistentVolumeClaim",
@@ -859,9 +856,7 @@ func (v *VRGInstance) reconcileVRForDeletion(pvc *corev1.PersistentVolumeClaim,
 func (v *VRGInstance) undoPVCFinalizersAndPVRetention(pvc *corev1.PersistentVolumeClaim, log logr.Logger) bool {
 	const requeue = true
 
-	pvcNamespacedName := types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}
-
-	if err := v.deleteVR(pvcNamespacedName, log); err != nil {
+	if err := v.deleteVR(pvc, log); err != nil {
 		log.Info("Requeuing due to failure in finalizing VolumeReplication resource for PersistentVolumeClaim",
 			"errorValue", err)
 
@@ -898,9 +893,13 @@ func (v *VRGInstance) reconcileMissingVR(pvc *corev1.PersistentVolumeClaim, log
 		return !vrMissing, !requeue
 	}
 
-	volRep := &volrep.VolumeReplication{}
 	vrNamespacedName := types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}
 
+	volRep, vrTypeErr := v.getVolumeReplication(pvc, &vrNamespacedName)
+	if vrTypeErr != nil {
+		return !vrMissing, requeue
+	}
+
 	err := v.reconciler.Get(v.ctx, vrNamespacedName, volRep)
 	if err == nil {
 		if rmnutil.ResourceIsDeleted(volRep) {
@@ -930,6 +929,83 @@ func (v *VRGInstance) reconcileMissingVR(pvc *corev1.PersistentVolumeClaim, log
 	return vrMissing, !requeue
 }
 
+func (v *VRGInstance) getVolumeGroupReplicationState(state volrep.ReplicationState) (volgroup.ReplicationState, error) {
+	var volGroupRepState volgroup.ReplicationState
+
+	switch state {
+	case volrep.Primary:
+		volGroupRepState = volgroup.Primary
+	case volrep.Secondary, volrep.Resync:
+		// Both Secondary and Resync map to the group Secondary state.
+		volGroupRepState = volgroup.Secondary
+	default:
+		v.log.Info(fmt.Sprintf("invalid Replication State %s", string(state)))
+
+		return volGroupRepState, fmt.Errorf("invalid Replication State %s", string(state))
+	}
+
+	return volGroupRepState, nil
+}
+
+func (v *VRGInstance) getVolumeReplication(pvc *corev1.PersistentVolumeClaim,
+	vrNamespacedName *types.NamespacedName,
+) (client.Object, error) {
+	volumeReplicationClass, vrTypeErr := v.selectVolumeReplicationClass(pvc)
+	if vrTypeErr != nil {
+		return nil, vrTypeErr
+	}
+
+	// Return an empty object of the matching concrete type; the caller
+	// issues the actual Get using vrNamespacedName.
+	switch volumeReplicationClass.(type) {
+	case *volgroup.VolumeGroupReplicationClass:
+		vrNamespacedName.Name = volumeReplicationClass.GetName() + "-vgr"
+
+		return &volgroup.VolumeGroupReplication{}, nil
+	case *volrep.VolumeReplicationClass:
+		return &volrep.VolumeReplication{}, nil
+	default:
+		return nil, fmt.Errorf("unexpected replication class type %T", volumeReplicationClass)
+	}
+}
+
+func (v *VRGInstance) createVolumeReplication(pvc *corev1.PersistentVolumeClaim,
+	state volrep.ReplicationState, volumeReplicationClass client.Object,
+) client.Object {
+	return &volrep.VolumeReplication{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      pvc.Name,
+			Namespace: pvc.Namespace,
+		},
+		Spec: volrep.VolumeReplicationSpec{
+			DataSource: corev1.TypedLocalObjectReference{
+				Kind:     "PersistentVolumeClaim",
+				Name:     pvc.Name,
+				APIGroup: new(string),
+			},
+			ReplicationState:       state,
+			VolumeReplicationClass: volumeReplicationClass.GetName(),
+			AutoResync:             v.autoResync(state),
+		},
+	}
+}
+
+func (v *VRGInstance) createVolumeGroupReplication(pvc *corev1.PersistentVolumeClaim,
+	state volgroup.ReplicationState, volumeReplicationClass client.Object,
+) client.Object {
+	return &volgroup.VolumeGroupReplication{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      volumeReplicationClass.GetName() + "-vgr",
+			Namespace: pvc.Namespace,
+		},
+		Spec: volgroup.VolumeGroupReplicationSpec{
+			ReplicationState:            state,
+			VolumeGroupReplicationClass: volumeReplicationClass.GetName(),
+			Selector:                    &v.instance.Spec.PVCSelector,
+		},
+	}
+}
+
 func (v *VRGInstance) deleteClusterDataInS3Stores(log logr.Logger) error {
 	log.Info("Delete cluster data in", "s3Profiles", v.instance.Spec.S3Profiles)
 
@@ -1003,9 +1079,9 @@ func (v *VRGInstance) s3StoreDo(do func(ObjectStorer) error, msg, s3ProfileName
 //   - a boolean indicating if a reconcile requeue is required
 //   - a boolean indicating if VR is already at the desired state
 //   - any errors during processing
-func (v *VRGInstance) processVRAsPrimary(vrNamespacedName types.NamespacedName, log logr.Logger) (bool, bool, error) {
+func (v *VRGInstance) processVRAsPrimary(pvc *corev1.PersistentVolumeClaim, log logr.Logger) (bool, bool, error) {
 	if v.instance.Spec.Async != nil {
-		return v.createOrUpdateVR(vrNamespacedName, volrep.Primary, log)
+		return v.createOrUpdateVR(pvc, volrep.Primary, log)
 	}
 
 	// TODO: createOrUpdateVR does two things. It modifies the VR and also
@@ -1017,11 +1093,11 @@ func (v *VRGInstance) processVRAsPrimary(vrNamespacedName types.NamespacedName,
 	// condition where both async and sync are enabled at the same time.
 	if v.instance.Spec.Sync != nil {
 		msg := "PVC in the VolumeReplicationGroup is ready for use"
-		v.updatePVCDataReadyCondition(vrNamespacedName.Namespace, vrNamespacedName.Name, VRGConditionReasonReady, msg)
-		v.updatePVCDataProtectedCondition(vrNamespacedName.Namespace, vrNamespacedName.Name, VRGConditionReasonReady, msg)
-		v.updatePVCLastSyncTime(vrNamespacedName.Namespace, vrNamespacedName.Name, nil)
-		v.updatePVCLastSyncDuration(vrNamespacedName.Namespace, vrNamespacedName.Name, nil)
-		v.updatePVCLastSyncBytes(vrNamespacedName.Namespace, vrNamespacedName.Name, nil)
+		v.updatePVCDataReadyCondition(pvc.Namespace, pvc.Name, VRGConditionReasonReady, msg)
+		v.updatePVCDataProtectedCondition(pvc.Namespace, pvc.Name, VRGConditionReasonReady, msg)
+		v.updatePVCLastSyncTime(pvc.Namespace, pvc.Name, nil)
+		v.updatePVCLastSyncDuration(pvc.Namespace, pvc.Name, nil)
+		v.updatePVCLastSyncBytes(pvc.Namespace, pvc.Name, nil)
 
 		return false, true, nil
 	}
@@ -1035,9 +1111,9 @@ func (v *VRGInstance) processVRAsPrimary(vrNamespacedName types.NamespacedName,
 //   - a boolean indicating if a reconcile requeue is required
 //   - a boolean indicating if VR is already at the desired state
 //   - any errors during processing
-func (v *VRGInstance) processVRAsSecondary(vrNamespacedName types.NamespacedName, log logr.Logger) (bool, bool, error) {
+func (v *VRGInstance) processVRAsSecondary(pvc *corev1.PersistentVolumeClaim, log logr.Logger) (bool, bool, error) {
 	if v.instance.Spec.Async != nil {
-		return v.createOrUpdateVR(vrNamespacedName, volrep.Secondary, log)
+		return v.createOrUpdateVR(pvc, volrep.Secondary, log)
 	}
 
 	// TODO: createOrUpdateVR does two things. It modifies the VR and also
@@ -1049,12 +1125,12 @@ func (v *VRGInstance) processVRAsSecondary(vrNamespacedName types.NamespacedName
 	// condition where both async and sync are enabled at the same time.
 	if v.instance.Spec.Sync != nil {
 		msg := "VolumeReplication resource for the pvc as Secondary is in sync with Primary"
-		v.updatePVCDataReadyCondition(vrNamespacedName.Namespace, vrNamespacedName.Name, VRGConditionReasonReplicated, msg)
-		v.updatePVCDataProtectedCondition(vrNamespacedName.Namespace, vrNamespacedName.Name, VRGConditionReasonDataProtected,
+		v.updatePVCDataReadyCondition(pvc.Namespace, pvc.Name, VRGConditionReasonReplicated, msg)
+		v.updatePVCDataProtectedCondition(pvc.Namespace, pvc.Name, VRGConditionReasonDataProtected,
 			msg)
-		v.updatePVCLastSyncTime(vrNamespacedName.Namespace, vrNamespacedName.Name, nil)
-		v.updatePVCLastSyncDuration(vrNamespacedName.Namespace, vrNamespacedName.Name, nil)
-		v.updatePVCLastSyncBytes(vrNamespacedName.Namespace, vrNamespacedName.Name, nil)
+		v.updatePVCLastSyncTime(pvc.Namespace, pvc.Name, nil)
+		v.updatePVCLastSyncDuration(pvc.Namespace, pvc.Name, nil)
+		v.updatePVCLastSyncBytes(pvc.Namespace, pvc.Name, nil)
 
 		return false, true, nil
 	}
@@ -1073,12 +1149,17 @@ func (v *VRGInstance) processVRAsSecondary(vrNamespacedName types.NamespacedName
 //   - a boolean indicating if a reconcile requeue is required
 //   - a boolean indicating if VR is already at the desired state
 //   - any errors during processing
-func (v *VRGInstance) createOrUpdateVR(vrNamespacedName types.NamespacedName,
+func (v *VRGInstance) createOrUpdateVR(pvc *corev1.PersistentVolumeClaim,
 	state volrep.ReplicationState, log logr.Logger,
 ) (bool, bool, error) {
 	const requeue = true
 
-	volRep := &volrep.VolumeReplication{}
+	vrNamespacedName := types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}
+
+	volRep, vrTypeErr := v.getVolumeReplication(pvc, &vrNamespacedName)
+	if vrTypeErr != nil {
+		return requeue, false, vrTypeErr
+	}
 
 	err := v.reconciler.Get(v.ctx, vrNamespacedName, volRep)
 	if err != nil {
@@ -1090,7 +1171,7 @@ func (v *VRGInstance) createOrUpdateVR(vrNamespacedName types.NamespacedName,
 			// is it replicating or not. So, mark the protected pvc as error
 			// with condition.status as Unknown.
 			msg := "Failed to get VolumeReplication resource"
-			v.updatePVCDataReadyCondition(vrNamespacedName.Namespace, vrNamespacedName.Name, VRGConditionReasonErrorUnknown, msg)
+			v.updatePVCDataReadyCondition(pvc.Namespace, pvc.Name, VRGConditionReasonErrorUnknown, msg)
 
 			return requeue, false, fmt.Errorf("failed to get VolumeReplication resource"+
 				" (%s/%s) belonging to VolumeReplicationGroup (%s/%s), %w",
@@ -1098,13 +1179,13 @@ func (v *VRGInstance) createOrUpdateVR(vrNamespacedName types.NamespacedName,
 		}
 
 		// Create VR for PVC
-		if err = v.createVR(vrNamespacedName, state); err != nil {
+		if err = v.createVR(pvc, state); err != nil {
 			log.Error(err, "Failed to create VolumeReplication resource", "resource", vrNamespacedName)
 			rmnutil.ReportIfNotPresent(v.reconciler.eventRecorder, v.instance, corev1.EventTypeWarning,
 				rmnutil.EventReasonVRCreateFailed, err.Error())
 
 			msg := "Failed to create VolumeReplication resource"
-			v.updatePVCDataReadyCondition(vrNamespacedName.Namespace, vrNamespacedName.Name, VRGConditionReasonError, msg)
+			v.updatePVCDataReadyCondition(pvc.Namespace, pvc.Name, VRGConditionReasonError, msg)
 
 			return requeue, false, fmt.Errorf("failed to create VolumeReplication resource"+
 				" (%s/%s) belonging to VolumeReplicationGroup (%s/%s), %w",
@@ -1113,12 +1194,12 @@ func (v *VRGInstance) createOrUpdateVR(vrNamespacedName types.NamespacedName,
 
 		// Just created VolRep. Mark status.conditions as Progressing.
 		msg := "Created VolumeReplication resource for PVC"
-		v.updatePVCDataReadyCondition(vrNamespacedName.Namespace, vrNamespacedName.Name, VRGConditionReasonProgressing, msg)
+		v.updatePVCDataReadyCondition(pvc.Namespace, pvc.Name, VRGConditionReasonProgressing, msg)
 
 		return !requeue, false, nil
 	}
 
-	return v.updateVR(volRep, state, log)
+	return v.updateVR(pvc, volRep, state, log)
 }
 
 func (v *VRGInstance) autoResync(state volrep.ReplicationState) bool {
@@ -1137,7 +1218,25 @@ func (v *VRGInstance) autoResync(state volrep.ReplicationState) bool {
 //   - a boolean indicating if a reconcile requeue is required
 //   - a boolean indicating if VR is already at the desired state
 //   - any errors during the process of updating the resource
-func (v *VRGInstance) updateVR(volRep *volrep.VolumeReplication,
+func (v *VRGInstance) updateVR(pvc *corev1.PersistentVolumeClaim, volRep client.Object,
+	state volrep.ReplicationState, log logr.Logger,
+) (bool, bool, error) {
+	const requeue = true
+
+	switch obj := volRep.(type) {
+	case *volrep.VolumeReplication:
+		return v.updateVolumeReplication(pvc, obj, state, log)
+	case *volgroup.VolumeGroupReplication:
+		return v.updateVolumeGroupReplication(pvc, obj, state, log)
+	default:
+		errMsg := fmt.Sprintf("invalid VolumeReplication (%s:%s)", volRep.GetName(), volRep.GetNamespace())
+		v.log.Info(errMsg)
+
+		return requeue, false, fmt.Errorf(errMsg)
+	}
+}
+
+func (v *VRGInstance) updateVolumeReplication(pvc *corev1.PersistentVolumeClaim, volRep *volrep.VolumeReplication,
 	state volrep.ReplicationState, log logr.Logger,
 ) (bool, bool, error) {
 	const requeue = true
@@ -1146,61 +1245,93 @@ func (v *VRGInstance) updateVR(volRep *volrep.VolumeReplication,
 	if volRep.Spec.ReplicationState == state && volRep.Spec.AutoResync == v.autoResync(state) {
 		log.Info("VolumeReplication and VolumeReplicationGroup state and autoresync match. Proceeding to status check")
 
-		return !requeue, v.checkVRStatus(volRep), nil
+		vrStatus := v.checkVRStatus(pvc, volRep, volRep.Status.Conditions, volRep.Status.ObservedGeneration)
+		if vrStatus {
+			v.updatePVCLastSyncTime(pvc.GetNamespace(), pvc.GetName(), volRep.Status.LastSyncTime)
+			v.updatePVCLastSyncDuration(pvc.GetNamespace(), pvc.GetName(), volRep.Status.LastSyncDuration)
+			v.updatePVCLastSyncBytes(pvc.GetNamespace(), pvc.GetName(), volRep.Status.LastSyncBytes)
+		}
+		return !requeue, vrStatus, nil
 	}
 
 	volRep.Spec.ReplicationState = state
 	volRep.Spec.AutoResync = v.autoResync(state)
 
+	return v.updateReplicationObject(pvc, volRep, state, log)
+}
+
+func (v *VRGInstance) updateVolumeGroupReplication(pvc *corev1.PersistentVolumeClaim,
+	volRep *volgroup.VolumeGroupReplication, state volrep.ReplicationState, log logr.Logger,
+) (bool, bool, error) {
+	const requeue = true
+
+	groupState, err := v.getVolumeGroupReplicationState(state)
+	if err != nil {
+		return requeue, false, err
+	}
+
+	if volRep.Spec.ReplicationState == groupState {
+		log.Info("VolumeGroupReplication and VolumeReplicationGroup state match. Proceeding to status check")
+
+		return !requeue, v.checkVRStatus(pvc, volRep, volRep.Status.Conditions, volRep.Status.ObservedGeneration), nil
+	}
+
+	volRep.Spec.ReplicationState = groupState
+
+	return v.updateReplicationObject(pvc, volRep, state, log)
+}
+
+func (v *VRGInstance) updateReplicationObject(pvc *corev1.PersistentVolumeClaim, volRep client.Object,
+	state volrep.ReplicationState, log logr.Logger,
+) (bool, bool, error) {
+	const requeue = true
+
 	if err := v.reconciler.Update(v.ctx, volRep); err != nil {
 		log.Error(err, "Failed to update VolumeReplication resource",
-			"name", volRep.Name, "namespace", volRep.Namespace,
+			"name", volRep.GetName(), "namespace", volRep.GetNamespace(),
 			"state", state)
 		rmnutil.ReportIfNotPresent(v.reconciler.eventRecorder, v.instance, corev1.EventTypeWarning,
 			rmnutil.EventReasonVRUpdateFailed, err.Error())
 
 		msg := "Failed to update VolumeReplication resource"
-		v.updatePVCDataReadyCondition(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg)
+		v.updatePVCDataReadyCondition(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonError, msg)
 
 		return requeue, false, fmt.Errorf("failed to update VolumeReplication resource"+
 			" (%s/%s) as %s, belonging to VolumeReplicationGroup (%s/%s), %w",
-			volRep.Namespace, volRep.Name, state,
+			volRep.GetNamespace(), volRep.GetName(), state,
 			v.instance.Namespace, v.instance.Name, err)
 	}
 
 	log.Info(fmt.Sprintf("Updated VolumeReplication resource (%s/%s) with state %s",
-		volRep.Name, volRep.Namespace, state))
+		volRep.GetName(), volRep.GetNamespace(), state))
 	// Just updated the state of the VolRep. Mark it as progressing.
 	msg := "Updated VolumeReplication resource for PVC"
-	v.updatePVCDataReadyCondition(volRep.Namespace, volRep.Name, VRGConditionReasonProgressing, msg)
+	v.updatePVCDataReadyCondition(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonProgressing, msg)
 
 	return !requeue, false, nil
 }
 
 // createVR creates a VolumeReplication CR with a PVC as its data source.
-func (v *VRGInstance) createVR(vrNamespacedName types.NamespacedName, state volrep.ReplicationState) error {
-	volumeReplicationClass, err := v.selectVolumeReplicationClass(vrNamespacedName)
+func (v *VRGInstance) createVR(pvc *corev1.PersistentVolumeClaim, state volrep.ReplicationState) error {
+	volumeReplicationClass, err := v.selectVolumeReplicationClass(pvc)
 	if err != nil {
-		return fmt.Errorf("failed to find the appropriate VolumeReplicationClass (%s) %w",
-			v.instance.Name, err)
+		return err
 	}
 
-	volRep := &volrep.VolumeReplication{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      vrNamespacedName.Name,
-			Namespace: vrNamespacedName.Namespace,
-			Labels:    rmnutil.OwnerLabels(v.instance),
-		},
-		Spec: volrep.VolumeReplicationSpec{
-			DataSource: corev1.TypedLocalObjectReference{
-				Kind:     "PersistentVolumeClaim",
-				Name:     vrNamespacedName.Name,
-				APIGroup: new(string),
-			},
-			ReplicationState:       state,
-			VolumeReplicationClass: volumeReplicationClass.GetName(),
-			AutoResync:             v.autoResync(state),
-		},
+	var volRep client.Object
+
+	switch volumeReplicationClass.(type) {
+	case *volgroup.VolumeGroupReplicationClass:
+		volGroupRepState, err := v.getVolumeGroupReplicationState(state)
+		if err != nil {
+			return err
+		}
+
+		volRep = v.createVolumeGroupReplication(pvc, volGroupRepState, volumeReplicationClass)
+	case *volrep.VolumeReplicationClass:
+		volRep = v.createVolumeReplication(pvc, state, volumeReplicationClass)
+	default:
+		return fmt.Errorf("failed to create Volume or VolumeGroup replication")
 	}
 
 	if !vrgInAdminNamespace(v.instance, v.ramenConfig) {
@@ -1209,14 +1340,15 @@ func (v *VRGInstance) createVR(vrNamespacedName types.NamespacedName, state volr
 		// when VRG is not in the admin namespace.
 		if err := ctrl.SetControllerReference(v.instance, volRep, v.reconciler.Scheme); err != nil {
 			return fmt.Errorf("failed to set owner reference to VolumeReplication resource (%s/%s), %w",
-				volRep.Name, volRep.Namespace, err)
+				volRep.GetName(), volRep.GetNamespace(), err)
 		}
 	}
 
 	v.log.Info("Creating VolumeReplication resource", "resource", volRep)
 
 	if err := v.reconciler.Create(v.ctx, volRep); err != nil {
-		return fmt.Errorf("failed to create VolumeReplication resource (%s), %w", vrNamespacedName, err)
+		return fmt.Errorf("failed to create VolumeReplication resource (%s/%s), %w",
+			volRep.GetName(), volRep.GetNamespace(), err)
 	}
 
 	return nil
@@ -1227,9 +1359,13 @@ func (v *VRGInstance) createVR(vrNamespacedName types.NamespacedName, state volr
 // VolumeReplicationGroup has the same name as pvc. But in future if it changes
 // functions to be changed would be processVRAsPrimary(), processVRAsSecondary()
 // to either receive pvc NamespacedName or pvc itself as an additional argument.
+//
+//nolint:funlen,gocognit,cyclop
 func (v *VRGInstance) selectVolumeReplicationClass(
-	namespacedName types.NamespacedName,
-) (*volrep.VolumeReplicationClass, error) {
+	pvc *corev1.PersistentVolumeClaim,
+) (client.Object, error) {
+	namespacedName := types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}
+
 	if !v.vrcUpdated {
 		if err := v.updateReplicationClassList(); err != nil {
 			v.log.Error(err, "Failed to get VolumeReplicationClass list")
@@ -1240,12 +1376,6 @@ func (v *VRGInstance) selectVolumeReplicationClass(
 		v.vrcUpdated = true
 	}
 
-	if len(v.replClassList.Items) == 0 {
-		v.log.Info("No VolumeReplicationClass available")
-
-		return nil, fmt.Errorf("no VolumeReplicationClass available")
-	}
-
 	storageClass, err := v.getStorageClass(namespacedName)
 	if err != nil {
 		v.log.Info(fmt.Sprintf("Failed to get the storageclass of pvc %s",
@@ -1276,6 +1406,39 @@ func (v *VRGInstance) selectVolumeReplicationClass(
 		}
 	}
 
+	if !v.vgrcUpdated {
+		if err := v.updateGroupReplicationClassList(); err != nil {
+			v.log.Error(err, "Failed to get VolumeGroupReplicationClass list")
+
+			return nil, fmt.Errorf("failed to get VolumeGroupReplicationClass list")
+		}
+
+		v.vgrcUpdated = true
+	}
+
+	for index := range v.grpReplClassList.Items {
+		grpReplicationClass := &v.grpReplClassList.Items[index]
+		if storageClass.Provisioner != grpReplicationClass.Spec.Provisioner {
+			continue
+		}
+
+		schedulingInterval, found := grpReplicationClass.Spec.Parameters["schedulingInterval"]
+		if !found {
+			v.log.Info("No schedulingInterval in VolumeGroupReplicationClass available")
+
+			// schedule not present in parameters of this replicationClass.
+			continue
+		}
+
+		// ReplicationClass that matches both VRG schedule and pvc provisioner
+		if schedulingInterval == v.instance.Spec.Async.SchedulingInterval {
+			v.log.Info(fmt.Sprintf("Found VolumeGroupReplicationClass that matches provisioner and schedule %s/%s",
+				storageClass.Provisioner, v.instance.Spec.Async.SchedulingInterval))
+
+			return grpReplicationClass, nil
+		}
+	}
+
 	v.log.Info(fmt.Sprintf("No VolumeReplicationClass found to match provisioner and schedule %s/%s",
 		storageClass.Provisioner, v.instance.Spec.Async.SchedulingInterval))
 
@@ -1332,30 +1495,32 @@ func (v *VRGInstance) getStorageClass(namespacedName types.NamespacedName) (*sto
 
 // checkVRStatus checks if the VolumeReplication resource has the desired status for the
 // current generation and returns true if so, false otherwise
-func (v *VRGInstance) checkVRStatus(volRep *volrep.VolumeReplication) bool {
+func (v *VRGInstance) checkVRStatus(pvc *corev1.PersistentVolumeClaim, volRep client.Object,
+	conditions []metav1.Condition, observedGeneration int64,
+) bool {
 	// When the generation in the status is updated, VRG would get a reconcile
 	// as it owns VolumeReplication resource.
-	if volRep.Generation != volRep.Status.ObservedGeneration {
+	if volRep.GetGeneration() != observedGeneration {
 		v.log.Info(fmt.Sprintf("Generation mismatch in status for VolumeReplication resource (%s/%s)",
-			volRep.Name, volRep.Namespace))
+			volRep.GetName(), volRep.GetNamespace()))
 
 		msg := "VolumeReplication generation not updated in status"
-		v.updatePVCDataReadyCondition(volRep.Namespace, volRep.Name, VRGConditionReasonProgressing, msg)
+		v.updatePVCDataReadyCondition(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonProgressing, msg)
 
 		return false
 	}
 
 	switch {
 	case v.instance.Spec.ReplicationState == ramendrv1alpha1.Primary:
-		return v.validateVRStatus(volRep, ramendrv1alpha1.Primary)
+		return v.validateVRStatus(pvc, volRep, ramendrv1alpha1.Primary, conditions)
 	case v.instance.Spec.ReplicationState == ramendrv1alpha1.Secondary:
-		return v.validateVRStatus(volRep, ramendrv1alpha1.Secondary)
+		return v.validateVRStatus(pvc, volRep, ramendrv1alpha1.Secondary, conditions)
 	default:
 		v.log.Info(fmt.Sprintf("invalid Replication State %s for VolumeReplicationGroup (%s:%s)",
 			string(v.instance.Spec.ReplicationState), v.instance.Name, v.instance.Namespace))
 
 		msg := "VolumeReplicationGroup state invalid"
-		v.updatePVCDataReadyCondition(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg)
+		v.updatePVCDataReadyCondition(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonError, msg)
 
 		return false
 	}
@@ -1366,7 +1531,9 @@ func (v *VRGInstance) checkVRStatus(volRep *volrep.VolumeReplication) bool {
 //   - When replication state is Primary, only Completed condition is checked.
 //   - When replication state is Secondary, all 3 conditions for Completed/Degraded/Resyncing is
 //     checked and ensured healthy.
-func (v *VRGInstance) validateVRStatus(volRep *volrep.VolumeReplication, state ramendrv1alpha1.ReplicationState) bool {
+func (v *VRGInstance) validateVRStatus(pvc *corev1.PersistentVolumeClaim, volRep client.Object,
+	state ramendrv1alpha1.ReplicationState, conditions []metav1.Condition,
+) bool {
 	var (
 		stateString string
 		action      string
@@ -1382,33 +1549,30 @@ func (v *VRGInstance) validateVRStatus(volRep *volrep.VolumeReplication, state r
 	}
 
 	// it should be completed
-	conditionMet, msg := isVRConditionMet(volRep, volrepController.ConditionCompleted, metav1.ConditionTrue)
+	conditionMet, msg := isVRConditionMet(volRep, conditions, volrepController.ConditionCompleted, metav1.ConditionTrue)
 	if !conditionMet {
 		defaultMsg := fmt.Sprintf("VolumeReplication resource for pvc not %s to %s", action, stateString)
-		v.updatePVCDataReadyConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+		v.updatePVCDataReadyConditionHelper(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonError, msg,
 			defaultMsg)
 
-		v.updatePVCDataProtectedConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+		v.updatePVCDataProtectedConditionHelper(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonError, msg,
 			defaultMsg)
 
-		v.log.Info(fmt.Sprintf("%s (VolRep: %s/%s)", defaultMsg, volRep.Name, volRep.Namespace))
+		v.log.Info(fmt.Sprintf("%s (VolRep: %s/%s)", defaultMsg, volRep.GetName(), volRep.GetNamespace()))
 
 		return false
 	}
 
 	// if primary, all checks are completed
 	if state == ramendrv1alpha1.Secondary {
-		return v.validateAdditionalVRStatusForSecondary(volRep)
+		return v.validateAdditionalVRStatusForSecondary(pvc, volRep, conditions)
 	}
 
 	msg = "PVC in the VolumeReplicationGroup is ready for use"
-	v.updatePVCDataReadyCondition(volRep.Namespace, volRep.Name, VRGConditionReasonReady, msg)
-	v.updatePVCDataProtectedCondition(volRep.Namespace, volRep.Name, VRGConditionReasonReady, msg)
-	v.updatePVCLastSyncTime(volRep.Namespace, volRep.Name, volRep.Status.LastSyncTime)
-	v.updatePVCLastSyncDuration(volRep.Namespace, volRep.Name, volRep.Status.LastSyncDuration)
-	v.updatePVCLastSyncBytes(volRep.Namespace, volRep.Name, volRep.Status.LastSyncBytes)
-	v.log.Info(fmt.Sprintf("VolumeReplication resource %s/%s is ready for use", volRep.Name,
-		volRep.Namespace))
+	v.updatePVCDataReadyCondition(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonReady, msg)
+	v.updatePVCDataProtectedCondition(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonReady, msg)
+	v.log.Info(fmt.Sprintf("VolumeReplication resource %s/%s is ready for use", volRep.GetName(),
+		volRep.GetNamespace()))
 
 	return true
 }
@@ -1430,92 +1594,96 @@ func (v *VRGInstance) validateVRStatus(volRep *volrep.VolumeReplication, state r
 // With 2nd condition being met,
 // ProtectedPVC.Conditions[DataReady] = True
 // ProtectedPVC.Conditions[DataProtected] = True
-func (v *VRGInstance) validateAdditionalVRStatusForSecondary(volRep *volrep.VolumeReplication) bool {
-	v.updatePVCLastSyncTime(volRep.Namespace, volRep.Name, nil)
-	v.updatePVCLastSyncDuration(volRep.Namespace, volRep.Name, nil)
-	v.updatePVCLastSyncBytes(volRep.Namespace, volRep.Name, nil)
-
-	conditionMet, _ := isVRConditionMet(volRep, volrepController.ConditionResyncing, metav1.ConditionTrue)
+func (v *VRGInstance) validateAdditionalVRStatusForSecondary(pvc *corev1.PersistentVolumeClaim, volRep client.Object,
+	conditions []metav1.Condition,
+) bool {
+	v.updatePVCLastSyncTime(pvc.GetNamespace(), pvc.GetName(), nil)
+	v.updatePVCLastSyncDuration(pvc.GetNamespace(), pvc.GetName(), nil)
+	v.updatePVCLastSyncBytes(pvc.GetNamespace(), pvc.GetName(), nil)
+
+	conditionMet, _ := isVRConditionMet(volRep, conditions, volrepController.ConditionResyncing, metav1.ConditionTrue)
 	if !conditionMet {
-		return v.checkResyncCompletionAsSecondary(volRep)
+		return v.checkResyncCompletionAsSecondary(pvc, volRep, conditions)
 	}
 
-	conditionMet, msg := isVRConditionMet(volRep, volrepController.ConditionDegraded, metav1.ConditionTrue)
+	conditionMet, msg := isVRConditionMet(volRep, conditions, volrepController.ConditionDegraded, metav1.ConditionTrue)
 	if !conditionMet {
-		v.updatePVCDataProtectedConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+		v.updatePVCDataProtectedConditionHelper(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonError, msg,
 			"VolumeReplication resource for pvc is not in Degraded condition while resyncing")
 
-		v.updatePVCDataReadyConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+		v.updatePVCDataReadyConditionHelper(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonError, msg,
 			"VolumeReplication resource for pvc is not in Degraded condition while resyncing")
 
 		v.log.Info(fmt.Sprintf("VolumeReplication resource is not in degraded condition while"+
-			" resyncing is true (%s/%s)", volRep.Name, volRep.Namespace))
+			" resyncing is true (%s/%s)", volRep.GetName(), volRep.GetNamespace()))
 
 		return false
 	}
 
 	msg = "VolumeReplication resource for the pvc is syncing as Secondary"
-	v.updatePVCDataReadyCondition(volRep.Namespace, volRep.Name, VRGConditionReasonReplicating, msg)
-	v.updatePVCDataProtectedCondition(volRep.Namespace, volRep.Name, VRGConditionReasonReplicating, msg)
+	v.updatePVCDataReadyCondition(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonReplicating, msg)
+	v.updatePVCDataProtectedCondition(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonReplicating, msg)
 
 	v.log.Info(fmt.Sprintf("VolumeReplication resource for the pvc is syncing as Secondary (%s/%s)",
-		volRep.Name, volRep.Namespace))
+		volRep.GetName(), volRep.GetNamespace()))
 
 	return true
 }
 
 // checkResyncCompletionAsSecondary returns true if resync status is complete as secondary, false otherwise
-func (v *VRGInstance) checkResyncCompletionAsSecondary(volRep *volrep.VolumeReplication) bool {
-	conditionMet, msg := isVRConditionMet(volRep, volrepController.ConditionResyncing, metav1.ConditionFalse)
+func (v *VRGInstance) checkResyncCompletionAsSecondary(pvc *corev1.PersistentVolumeClaim, volRep client.Object,
+	conditions []metav1.Condition,
+) bool {
+	conditionMet, msg := isVRConditionMet(volRep, conditions, volrepController.ConditionResyncing, metav1.ConditionFalse)
 	if !conditionMet {
 		defaultMsg := "VolumeReplication resource for pvc not syncing as Secondary"
-		v.updatePVCDataReadyConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+		v.updatePVCDataReadyConditionHelper(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonError, msg,
 			defaultMsg)
 
-		v.updatePVCDataProtectedConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+		v.updatePVCDataProtectedConditionHelper(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonError, msg,
 			defaultMsg)
 
-		v.log.Info(fmt.Sprintf("%s (VolRep: %s/%s)", defaultMsg, volRep.Name, volRep.Namespace))
+		v.log.Info(fmt.Sprintf("%s (VolRep: %s/%s)", defaultMsg, volRep.GetName(), volRep.GetNamespace()))
 
 		return false
 	}
 
-	conditionMet, msg = isVRConditionMet(volRep, volrepController.ConditionDegraded, metav1.ConditionFalse)
+	conditionMet, msg = isVRConditionMet(volRep, conditions, volrepController.ConditionDegraded, metav1.ConditionFalse)
 	if !conditionMet {
 		defaultMsg := "VolumeReplication resource for pvc is not syncing and is degraded as Secondary"
-		v.updatePVCDataReadyConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+		v.updatePVCDataReadyConditionHelper(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonError, msg,
 			defaultMsg)
 
-		v.updatePVCDataProtectedConditionHelper(volRep.Namespace, volRep.Name, VRGConditionReasonError, msg,
+		v.updatePVCDataProtectedConditionHelper(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonError, msg,
 			defaultMsg)
 
-		v.log.Info(fmt.Sprintf("%s (VolRep: %s/%s)", defaultMsg, volRep.Name, volRep.Namespace))
+		v.log.Info(fmt.Sprintf("%s (VolRep: %s/%s)", defaultMsg, volRep.GetName(), volRep.GetNamespace()))
 
 		return false
 	}
 
 	msg = "VolumeReplication resource for the pvc as Secondary is in sync with Primary"
-	v.updatePVCDataReadyCondition(volRep.Namespace, volRep.Name, VRGConditionReasonReplicated, msg)
-	v.updatePVCDataProtectedCondition(volRep.Namespace, volRep.Name, VRGConditionReasonDataProtected, msg)
+	v.updatePVCDataReadyCondition(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonReplicated, msg)
+	v.updatePVCDataProtectedCondition(pvc.GetNamespace(), pvc.GetName(), VRGConditionReasonDataProtected, msg)
 
 	v.log.Info(fmt.Sprintf("data sync completed as both degraded and resyncing are false for"+
-		" secondary VolRep (%s/%s)", volRep.Name, volRep.Namespace))
+		" secondary VolRep (%s/%s)", volRep.GetName(), volRep.GetNamespace()))
 
 	return true
 }
 
-func isVRConditionMet(volRep *volrep.VolumeReplication,
+func isVRConditionMet(volRep client.Object, conditions []metav1.Condition,
 	conditionType string,
 	desiredStatus metav1.ConditionStatus,
 ) (bool, string) {
-	volRepCondition := findCondition(volRep.Status.Conditions, conditionType)
+	volRepCondition := findCondition(conditions, conditionType)
 	if volRepCondition == nil {
 		msg := fmt.Sprintf("Failed to get the %s condition from status of VolumeReplication resource.", conditionType)
 
 		return false, msg
 	}
 
-	if volRep.Generation != volRepCondition.ObservedGeneration {
+	if volRep.GetGeneration() != volRepCondition.ObservedGeneration {
 		msg := fmt.Sprintf("Stale generation for condition %s from status of VolumeReplication resource.", conditionType)
 
 		return false, msg
@@ -1710,8 +1878,8 @@ func setPVCClusterDataProtectedCondition(protectedPVC *ramendrv1alpha1.Protected
 
 // ensureVRDeletedFromAPIServer adds an additional step to ensure that we wait for volumereplication deletion
 // from API server before moving ahead with vrg finalizer removal.
-func (v *VRGInstance) ensureVRDeletedFromAPIServer(vrNamespacedName types.NamespacedName, log logr.Logger) error {
-	volRep := &volrep.VolumeReplication{}
+func (v *VRGInstance) ensureVRDeletedFromAPIServer(volRep client.Object, log logr.Logger) error {
+	vrNamespacedName := types.NamespacedName{Name: volRep.GetName(), Namespace: volRep.GetNamespace()}
 
 	err := v.reconciler.APIReader.Get(v.ctx, vrNamespacedName, volRep)
 	if err == nil {
@@ -1732,27 +1900,111 @@ func (v *VRGInstance) ensureVRDeletedFromAPIServer(vrNamespacedName types.Namesp
 }
 
 // deleteVR deletes a VolumeReplication instance if found
-func (v *VRGInstance) deleteVR(vrNamespacedName types.NamespacedName, log logr.Logger) error {
+func (v *VRGInstance) deleteVR(pvc *corev1.PersistentVolumeClaim, log logr.Logger) error {
+	volumeReplicationClass, vrTypeErr := v.selectVolumeReplicationClass(pvc)
+	if vrTypeErr != nil {
+		return vrTypeErr
+	}
+
+	switch volumeReplicationClass.(type) {
+	case *volgroup.VolumeGroupReplicationClass:
+		return v.deleteVolumeGroupReplication(pvc, volumeReplicationClass, log)
+	case *volrep.VolumeReplicationClass:
+		return v.deleteVolumeReplication(pvc, log)
+	default:
+		errMsg := fmt.Sprintf("invalid VolumeReplication (%s:%s)", pvc.GetName(), pvc.GetNamespace())
+		v.log.Info(errMsg)
+
+		return fmt.Errorf(errMsg)
+	}
+}
+
+func (v *VRGInstance) deleteVolumeGroupReplication(pvc *corev1.PersistentVolumeClaim,
+	volumeReplicationClass client.Object, log logr.Logger,
+) error {
+	volRep := &volgroup.VolumeGroupReplication{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      volumeReplicationClass.GetName() + "-vgr",
+			Namespace: pvc.Namespace,
+		},
+	}
+
+	vrNamespacedName := types.NamespacedName{Name: volRep.Name, Namespace: volRep.Namespace}
+
+	err := v.reconciler.Get(v.ctx, vrNamespacedName, volRep)
+	if err != nil {
+		if !k8serrors.IsNotFound(err) {
+			log.Error(err, "Failed to delete VolumeGroupReplication resource")
+
+			return fmt.Errorf("failed to delete VolumeGroupReplication resource (%s/%s), %w",
+				vrNamespacedName.Namespace, vrNamespacedName.Name, err)
+		}
+
+		return nil
+	}
+
+	pvcLabelSelector := volRep.Spec.Selector
+
+	v.log.Info("Fetching PersistentVolumeClaims", "pvcSelector", pvcLabelSelector)
+
+	// Found the VGR; it can be deleted only when at most one PVC is still protected by it
+	pvcList, err := rmnutil.ListPVCsByPVCSelector(v.ctx, v.reconciler.Client, v.log,
+		*pvcLabelSelector,
+		[]string{vrNamespacedName.Namespace},
+		v.instance.Spec.VolSync.Disabled,
+	)
+	if err != nil {
+		return err
+	}
+
+	v.log.Info(fmt.Sprintf("Found %d PVCs using label selector %v", len(pvcList.Items), pvcLabelSelector))
+
+	if len(pvcList.Items) > 1 {
+		log.Error(err, "VolumeGroupReplication resource is in use and cannot be deleted yet")
+
+		return nil
+	}
+
+	selector, err := metav1.LabelSelectorAsSelector(pvcLabelSelector)
+	if err != nil {
+		return err
+	}
+
+	labelMatch := selector.Matches(labels.Set(pvc.GetLabels()))
+	if !labelMatch {
+		v.log.Info(fmt.Sprintf("PVC %s does not match VolumeGroupReplication label selector  %v", pvc.Name, selector))
+
+		return fmt.Errorf("PVC %s does not match VolumeGroupReplication label selector  %v", pvc.Name, selector)
+	}
+
+	return v.deleteReplicationObject(volRep, log)
+}
+
+func (v *VRGInstance) deleteVolumeReplication(pvc *corev1.PersistentVolumeClaim, log logr.Logger) error {
 	cr := &volrep.VolumeReplication{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      vrNamespacedName.Name,
-			Namespace: vrNamespacedName.Namespace,
+			Name:      pvc.Name,
+			Namespace: pvc.Namespace,
 		},
 	}
 
+	return v.deleteReplicationObject(cr, log)
+}
+
+func (v *VRGInstance) deleteReplicationObject(cr client.Object, log logr.Logger) error {
 	err := v.reconciler.Delete(v.ctx, cr)
 	if err != nil {
 		if !k8serrors.IsNotFound(err) {
 			log.Error(err, "Failed to delete VolumeReplication resource")
 
 			return fmt.Errorf("failed to delete VolumeReplication resource (%s/%s), %w",
-				vrNamespacedName.Namespace, vrNamespacedName.Name, err)
+				cr.GetNamespace(), cr.GetName(), err)
 		}
 
 		return nil
 	}
 
-	return v.ensureVRDeletedFromAPIServer(vrNamespacedName, log)
+	return v.ensureVRDeletedFromAPIServer(cr, log)
 }
 
 func (v *VRGInstance) addProtectedAnnotationForPVC(pvc *corev1.PersistentVolumeClaim, log logr.Logger) error {
diff --git a/go.mod b/go.mod
index 2dbd43db25..7c998ae973 100644
--- a/go.mod
+++ b/go.mod
@@ -19,6 +19,7 @@ require (
 	github.com/operator-framework/api v0.17.6
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.16.0
+	github.com/rakeshgm/volgroup-shim-operator v0.0.0-20240304154104-19fa9aea1444
 	github.com/ramendr/ramen/api v0.0.0-20240117171503-e11c56eac24d
 	github.com/ramendr/recipe v0.0.0-20230817160432-729dc7fd8932
 	github.com/stolostron/multicloud-operators-foundation v0.0.0-20220824091202-e9cd9710d009
diff --git a/go.sum b/go.sum
index 49e17eb023..8bccb09868 100644
--- a/go.sum
+++ b/go.sum
@@ -277,6 +277,8 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO
 github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
 github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg=
 github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
+github.com/rakeshgm/volgroup-shim-operator v0.0.0-20240304154104-19fa9aea1444 h1:IetfzsnrKvfbr64hCzV5tfRU9caPRtksxXl1BG7Crfg=
+github.com/rakeshgm/volgroup-shim-operator v0.0.0-20240304154104-19fa9aea1444/go.mod h1:Dt/6XAyRYH+jiCmDD1p/lDEGWYMok5bHcxv8jYhJgFU=
 github.com/ramendr/recipe v0.0.0-20230817160432-729dc7fd8932 h1:n89W9K2gDa0XwdIVuWyg53hPgaR97DfGVi9o2V0WcWA=
 github.com/ramendr/recipe v0.0.0-20230817160432-729dc7fd8932/go.mod h1:QHVQXKgNId8EfvNd+Y6JcTrsXwTImtSFkV4IsiOkwCw=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
diff --git a/main.go b/main.go
index 6897e99b45..c4fed052c0 100644
--- a/main.go
+++ b/main.go
@@ -18,6 +18,7 @@ import (
 	snapv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
 	clrapiv1beta1 "github.com/open-cluster-management-io/api/cluster/v1beta1"
 	ocmworkv1 "github.com/open-cluster-management/api/work/v1"
+	volgroup "github.com/rakeshgm/volgroup-shim-operator/api/v1alpha1"
 	viewv1beta1 "github.com/stolostron/multicloud-operators-foundation/pkg/apis/view/v1beta1"
 	plrv1 "github.com/stolostron/multicloud-operators-placementrule/pkg/apis/apps/v1"
 	velero "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
@@ -108,6 +109,7 @@ func configureController(ramenConfig *ramendrv1alpha1.RamenConfig) error {
 	} else {
 		utilruntime.Must(velero.AddToScheme(scheme))
 		utilruntime.Must(volrep.AddToScheme(scheme))
+		utilruntime.Must(volgroup.AddToScheme(scheme))
 		utilruntime.Must(volsyncv1alpha1.AddToScheme(scheme))
 		utilruntime.Must(snapv1.AddToScheme(scheme))
 		utilruntime.Must(recipe.AddToScheme(scheme))