Skip to content

Commit

Permalink
Add VolumeGroupReplication support
Browse files Browse the repository at this point in the history
Signed-off-by: Elena Gershkovich <[email protected]>
  • Loading branch information
ELENAGER committed Jun 13, 2024
1 parent d3b1b8c commit b960f8d
Show file tree
Hide file tree
Showing 5 changed files with 444 additions and 136 deletions.
53 changes: 51 additions & 2 deletions controllers/volumereplicationgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

volrep "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1"
"github.com/google/uuid"
volgroup "github.com/rakeshgm/volgroup-shim-operator/api/v1alpha1"
"github.com/ramendr/ramen/controllers/kubeobjects"
"github.com/ramendr/ramen/controllers/kubeobjects/velero"
"golang.org/x/exp/maps" // TODO replace with "maps" in go1.21+
Expand Down Expand Up @@ -97,7 +98,8 @@ func (r *VolumeReplicationGroupReconciler) SetupWithManager(
builder.WithPredicates(rmnutil.CreateOrDeleteOrResourceVersionUpdatePredicate{}),
).
Watches(&corev1.ConfigMap{}, handler.EnqueueRequestsFromMapFunc(r.configMapFun)).
Owns(&volrep.VolumeReplication{})
Owns(&volrep.VolumeReplication{}).
Owns(&volgroup.VolumeGroupReplication{})

if !ramenConfig.VolSync.Disabled {
r.Log.Info("VolSync enabled; adding owns and watches")
Expand Down Expand Up @@ -402,6 +404,7 @@ func (r *VolumeReplicationGroupReconciler) Reconcile(ctx context.Context, req ct
volRepPVCs: []corev1.PersistentVolumeClaim{},
volSyncPVCs: []corev1.PersistentVolumeClaim{},
replClassList: &volrep.VolumeReplicationClassList{},
grpReplClassList: &volgroup.VolumeGroupReplicationClassList{},
namespacedName: req.NamespacedName.String(),
objectStorers: make(map[string]cachedObjectStorer),
storageClassCache: make(map[string]*storagev1.StorageClass),
Expand Down Expand Up @@ -466,10 +469,12 @@ type VRGInstance struct {
volRepPVCs []corev1.PersistentVolumeClaim
volSyncPVCs []corev1.PersistentVolumeClaim
replClassList *volrep.VolumeReplicationClassList
grpReplClassList *volgroup.VolumeGroupReplicationClassList
storageClassCache map[string]*storagev1.StorageClass
vrgObjectProtected *metav1.Condition
kubeObjectsProtected *metav1.Condition
vrcUpdated bool
vgrcUpdated bool
namespacedName string
volSyncHandler *volsync.VSHandler
objectStorers map[string]cachedObjectStorer
Expand Down Expand Up @@ -645,6 +650,8 @@ func (v *VRGInstance) listPVCsByPVCSelector(labelSelector metav1.LabelSelector,
}

// updatePVCList fetches and updates the PVC list to process for the current instance of VRG
//
//nolint:cyclop
func (v *VRGInstance) updatePVCList() error {
pvcList, err := v.listPVCsByVrgPVCSelector()
if err != nil {
Expand All @@ -670,6 +677,16 @@ func (v *VRGInstance) updatePVCList() error {
v.vrcUpdated = true
}

if !v.vgrcUpdated {
if err := v.updateGroupReplicationClassList(); err != nil {
v.log.Error(err, "Failed to get VolumeGroupReplicationClass list")

return fmt.Errorf("failed to get VolumeGroupReplicationClass list")
}

v.vgrcUpdated = true
}

if rmnutil.ResourceIsDeleted(v.instance) {
v.separatePVCsUsingVRGStatus(pvcList)
v.log.Info(fmt.Sprintf("Separated PVCs (%d) into VolRepPVCs (%d) and VolSyncPVCs (%d)",
Expand All @@ -678,7 +695,7 @@ func (v *VRGInstance) updatePVCList() error {
return nil
}

if len(v.replClassList.Items) == 0 {
if len(v.replClassList.Items) == 0 && len(v.grpReplClassList.Items) == 0 {
v.volSyncPVCs = make([]corev1.PersistentVolumeClaim, len(pvcList.Items))
numCopied := copy(v.volSyncPVCs, pvcList.Items)
v.log.Info("No VolumeReplicationClass available. Using all PVCs with VolSync", "pvcCount", numCopied)
Expand Down Expand Up @@ -710,6 +727,26 @@ func (v *VRGInstance) updateReplicationClassList() error {
return nil
}

func (v *VRGInstance) updateGroupReplicationClassList() error {
labelSelector := v.instance.Spec.Async.ReplicationClassSelector

v.log.Info("Fetching VolumeGroupReplicationClass", "labeled", labels.Set(labelSelector.MatchLabels))
listOptions := []client.ListOption{
client.MatchingLabels(labelSelector.MatchLabels),
}

if err := v.reconciler.List(v.ctx, v.grpReplClassList, listOptions...); err != nil {
v.log.Error(err, "Failed to list Group Replication Classes",
"labeled", labels.Set(labelSelector.MatchLabels))

return fmt.Errorf("failed to list Group Replication Classes, %w", err)
}

v.log.Info("Number of Group Replication Classes", "count", len(v.grpReplClassList.Items))

return nil
}

func (v *VRGInstance) separatePVCsUsingVRGStatus(pvcList *corev1.PersistentVolumeClaimList) {
for idx := range pvcList.Items {
pvc := &pvcList.Items[idx]
Expand All @@ -726,6 +763,7 @@ func (v *VRGInstance) separatePVCsUsingVRGStatus(pvcList *corev1.PersistentVolum
}
}

//nolint:gocognit,cyclop
func (v *VRGInstance) separatePVCsUsingStorageClassProvisioner(pvcList *corev1.PersistentVolumeClaimList) error {
for idx := range pvcList.Items {
pvc := &pvcList.Items[idx]
Expand Down Expand Up @@ -753,6 +791,17 @@ func (v *VRGInstance) separatePVCsUsingStorageClassProvisioner(pvcList *corev1.P
}
}

if !replicationClassMatchFound {
for _, replicationClass := range v.grpReplClassList.Items {
if storageClass.Provisioner == replicationClass.Spec.Provisioner {
v.volRepPVCs = append(v.volRepPVCs, *pvc)
replicationClassMatchFound = true

break
}
}
}

if !replicationClassMatchFound {
v.volSyncPVCs = append(v.volSyncPVCs, *pvc)
}
Expand Down
Loading

0 comments on commit b960f8d

Please sign in to comment.