Skip to content

Commit

Permalink
Add VolumeGroupReplication support
Browse files Browse the repository at this point in the history
Signed-off-by: Elena Gershkovich <[email protected]>
  • Loading branch information
ELENAGER committed Apr 8, 2024
1 parent 9d58e61 commit 04b8580
Show file tree
Hide file tree
Showing 8 changed files with 487 additions and 134 deletions.
20 changes: 20 additions & 0 deletions config/dr-cluster/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,26 @@ rules:
- get
- list
- watch
- apiGroups:
- cache.storage.ramendr.io
resources:
- volumegroupreplicationclasses
verbs:
- get
- list
- watch
- apiGroups:
- cache.storage.ramendr.io
resources:
- volumegroupreplications
verbs:
- create
- update
- delete
- get
- list
- watch
- patch
- apiGroups:
- storage.k8s.io
resources:
Expand Down
20 changes: 20 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ rules:
- get
- list
- watch
- apiGroups:
- cache.storage.ramendr.io
resources:
- volumegroupreplicationclasses
verbs:
- get
- list
- watch
- apiGroups:
- cache.storage.ramendr.io
resources:
- volumegroupreplications
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- cluster.open-cluster-management.io
resources:
Expand Down
4 changes: 4 additions & 0 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
snapv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
ocmclv1 "github.com/open-cluster-management/api/cluster/v1"
ocmworkv1 "github.com/open-cluster-management/api/work/v1"
volgroup "github.com/rakeshgm/volgroup-shim-operator/api/v1alpha1"
viewv1beta1 "github.com/stolostron/multicloud-operators-foundation/pkg/apis/view/v1beta1"
plrv1 "github.com/stolostron/multicloud-operators-placementrule/pkg/apis/apps/v1"
cpcv1 "open-cluster-management.io/config-policy-controller/api/v1"
Expand Down Expand Up @@ -199,6 +200,9 @@ var _ = BeforeSuite(func() {
err = volrep.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

err = volgroup.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

err = volsyncv1alpha1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

Expand Down
58 changes: 55 additions & 3 deletions controllers/volumereplicationgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

volrep "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1"
"github.com/google/uuid"
volgroup "github.com/rakeshgm/volgroup-shim-operator/api/v1alpha1"
"github.com/ramendr/ramen/controllers/kubeobjects"
"github.com/ramendr/ramen/controllers/kubeobjects/velero"
"golang.org/x/exp/maps" // TODO replace with "maps" in go1.21+
Expand Down Expand Up @@ -92,7 +93,8 @@ func (r *VolumeReplicationGroupReconciler) SetupWithManager(
builder.WithPredicates(rmnutil.CreateOrDeleteOrResourceVersionUpdatePredicate{}),
).
Watches(&corev1.ConfigMap{}, handler.EnqueueRequestsFromMapFunc(r.configMapFun)).
Owns(&volrep.VolumeReplication{})
Owns(&volrep.VolumeReplication{}).
Owns(&volgroup.VolumeGroupReplication{})

if !ramenConfig.VolSync.Disabled {
r.Log.Info("VolSync enabled; adding owns and watches")
Expand Down Expand Up @@ -358,6 +360,8 @@ func filterPVC(reader client.Reader, pvc *corev1.PersistentVolumeClaim, log logr
// +kubebuilder:rbac:groups=ramendr.openshift.io,resources=volumereplicationgroups/finalizers,verbs=update
// +kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplications,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplicationclasses,verbs=get;list;watch
// +kubebuilder:rbac:groups=cache.storage.ramendr.io,resources=volumegroupreplications,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cache.storage.ramendr.io,resources=volumegroupreplicationclasses,verbs=get;list;watch
// +kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get;list;watch;create;update
// +kubebuilder:rbac:groups=storage.k8s.io,resources=volumeattachments,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
Expand Down Expand Up @@ -397,6 +401,7 @@ func (r *VolumeReplicationGroupReconciler) Reconcile(ctx context.Context, req ct
volRepPVCs: []corev1.PersistentVolumeClaim{},
volSyncPVCs: []corev1.PersistentVolumeClaim{},
replClassList: &volrep.VolumeReplicationClassList{},
grpReplClassList: &volgroup.VolumeGroupReplicationClassList{},
namespacedName: req.NamespacedName.String(),
objectStorers: make(map[string]cachedObjectStorer),
storageClassCache: make(map[string]*storagev1.StorageClass),
Expand Down Expand Up @@ -459,10 +464,12 @@ type VRGInstance struct {
volRepPVCs []corev1.PersistentVolumeClaim
volSyncPVCs []corev1.PersistentVolumeClaim
replClassList *volrep.VolumeReplicationClassList
grpReplClassList *volgroup.VolumeGroupReplicationClassList
storageClassCache map[string]*storagev1.StorageClass
vrgObjectProtected *metav1.Condition
kubeObjectsProtected *metav1.Condition
vrcUpdated bool
vgrcUpdated bool
namespacedName string
volSyncHandler *volsync.VSHandler
objectStorers map[string]cachedObjectStorer
Expand Down Expand Up @@ -638,6 +645,8 @@ func (v *VRGInstance) listPVCsByPVCSelector(labelSelector metav1.LabelSelector,
}

// updatePVCList fetches and updates the PVC list to process for the current instance of VRG
//
//nolint:cyclop
func (v *VRGInstance) updatePVCList() error {
pvcList, err := v.listPVCsByVrgPVCSelector()
if err != nil {
Expand All @@ -663,6 +672,16 @@ func (v *VRGInstance) updatePVCList() error {
v.vrcUpdated = true
}

if !v.vgrcUpdated {
if err := v.updateGroupReplicationClassList(); err != nil {
v.log.Error(err, "Failed to get VolumeGroupReplicationClass list")

return fmt.Errorf("failed to get VolumeGroupReplicationClass list")
}

v.vgrcUpdated = true
}

if rmnutil.ResourceIsDeleted(v.instance) {
v.separatePVCsUsingVRGStatus(pvcList)
v.log.Info(fmt.Sprintf("Separated PVCs (%d) into VolRepPVCs (%d) and VolSyncPVCs (%d)",
Expand All @@ -671,10 +690,11 @@ func (v *VRGInstance) updatePVCList() error {
return nil
}

if len(v.replClassList.Items) == 0 {
if len(v.replClassList.Items) == 0 && len(v.grpReplClassList.Items) == 0 {
v.volSyncPVCs = make([]corev1.PersistentVolumeClaim, len(pvcList.Items))
numCopied := copy(v.volSyncPVCs, pvcList.Items)
v.log.Info("No VolumeReplicationClass available. Using all PVCs with VolSync", "pvcCount", numCopied)
v.log.Info("No VolumeReplicationClass or VolumeGroupReplicationClass available. Using all PVCs with VolSync",
"pvcCount", numCopied)

return nil
}
Expand Down Expand Up @@ -703,6 +723,26 @@ func (v *VRGInstance) updateReplicationClassList() error {
return nil
}

func (v *VRGInstance) updateGroupReplicationClassList() error {
labelSelector := v.instance.Spec.Async.ReplicationClassSelector

v.log.Info("Fetching VolumeGroupReplicationClass", "labeled", labels.Set(labelSelector.MatchLabels))
listOptions := []client.ListOption{
client.MatchingLabels(labelSelector.MatchLabels),
}

if err := v.reconciler.List(v.ctx, v.grpReplClassList, listOptions...); err != nil {
v.log.Error(err, "Failed to list Group Replication Classes",
"labeled", labels.Set(labelSelector.MatchLabels))

return fmt.Errorf("failed to list Group Replication Classes, %w", err)
}

v.log.Info("Number of Group Replication Classes", "count", len(v.grpReplClassList.Items))

return nil
}

func (v *VRGInstance) separatePVCsUsingVRGStatus(pvcList *corev1.PersistentVolumeClaimList) {
for idx := range pvcList.Items {
pvc := &pvcList.Items[idx]
Expand All @@ -719,6 +759,7 @@ func (v *VRGInstance) separatePVCsUsingVRGStatus(pvcList *corev1.PersistentVolum
}
}

//nolint:gocognit,cyclop
func (v *VRGInstance) separatePVCsUsingStorageClassProvisioner(pvcList *corev1.PersistentVolumeClaimList) error {
for idx := range pvcList.Items {
pvc := &pvcList.Items[idx]
Expand Down Expand Up @@ -746,6 +787,17 @@ func (v *VRGInstance) separatePVCsUsingStorageClassProvisioner(pvcList *corev1.P
}
}

if !replicationClassMatchFound {
for _, replicationClass := range v.grpReplClassList.Items {
if storageClass.Provisioner == replicationClass.Spec.Provisioner {
v.volRepPVCs = append(v.volRepPVCs, *pvc)
replicationClassMatchFound = true

break
}
}
}

if !replicationClassMatchFound {
v.volSyncPVCs = append(v.volSyncPVCs, *pvc)
}
Expand Down
Loading

0 comments on commit 04b8580

Please sign in to comment.