Skip to content

Commit

Permalink
Set or update peerClasses for VRG
Browse files Browse the repository at this point in the history
Update works to ensure VRG is updated with peerClasses that it requires,
based on reported PVCs that the VRG is attempting to protect. If a VRG
is attempting to protect a PVC for which it is lacking a peerClass and
that is available as part of the DRPolicy its peerClasses are updated.

For existing peerClasses the VRG information is not updated, this is done
to avoid any protection mechanism conflicts. For example, if a VRG
carried a peerClass without the replicationID (i.e., it would choose to
protect the PVC using Volsync and VolumeSnapshots), then it is not
updated with a peerClass that NOW supports native VolumeReplication, as
that would void existing protection.

To change replication schemes a workload needs to be DR disabled and then
reenabled to catch up to the latest available peer information for an SC.

Signed-off-by: Shyamsundar Ranganathan <[email protected]>
  • Loading branch information
ShyamsundarR authored and BenamarMk committed Nov 6, 2024
1 parent a6b1547 commit 80ba156
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 27 deletions.
179 changes: 153 additions & 26 deletions internal/controller/drplacementcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -1547,7 +1547,7 @@ func (d *DRPCInstance) createVRGManifestWork(homeCluster string, repState rmn.Re
d.log.Info("Creating VRG ManifestWork", "ReplicationState", repState,
"Last State:", d.getLastDRState(), "cluster", homeCluster)

newVRG := d.newVRG(homeCluster, repState)
newVRG := d.newVRG(homeCluster, repState, nil)
annotations := make(map[string]string)

annotations[DRPCNameAnnotation] = d.instance.Name
Expand Down Expand Up @@ -1596,11 +1596,133 @@ func (d *DRPCInstance) ensureVRGManifestWork(homeCluster string) error {
homeCluster)
}

d.updateVRGOptionalFields(vrg, homeCluster)
d.updateVRGOptionalFields(vrg, d.vrgs[homeCluster], homeCluster)

return d.mwu.UpdateVRGManifestWork(vrg, mw)
}

// hasPeerClass finds a peer in the passed in list of peerClasses and returns true if a peer matches the passed in
// storage class name and represents the cluster in the clusterIDs list
// Also see peerClassMatchesPeer
// hasPeerClass reports whether the passed in list of peerClasses contains a peer that matches both the
// given storage class name and (exactly, in order) the given clusterIDs list.
// Also see peerClassMatchesPeer
func hasPeerClass(vrgPeerClasses []rmn.PeerClass, scName string, clusterIDs []string) bool {
	for _, peerClass := range vrgPeerClasses {
		if peerClass.StorageClassName == scName && slices.Equal(peerClass.ClusterIDs, clusterIDs) {
			return true
		}
	}

	return false
}

// updatePeers see updateVRGDRTypeSpec
func updatePeers(
vrgFromView *rmn.VolumeReplicationGroup,
vrgPeerClasses, policyPeerClasses []rmn.PeerClass,
) []rmn.PeerClass {
peerClasses := vrgPeerClasses

for pvcIdx := range vrgFromView.Status.ProtectedPVCs {
for policyPeerClassIdx := range policyPeerClasses {
if policyPeerClasses[policyPeerClassIdx].StorageClassName ==
*vrgFromView.Status.ProtectedPVCs[pvcIdx].StorageClassName {
if hasPeerClass(
vrgPeerClasses,
*vrgFromView.Status.ProtectedPVCs[pvcIdx].StorageClassName,
policyPeerClasses[policyPeerClassIdx].ClusterIDs,
) {
break
}

peerClasses = append(
peerClasses,
policyPeerClasses[policyPeerClassIdx],
)
}
}
}

return peerClasses
}

// updateVRGAsyncSpec see updateVRGDRTypeSpec
func (d *DRPCInstance) updateVRGAsyncSpec(vrgFromView, vrg *rmn.VolumeReplicationGroup) {
// vrg will be updated with final contents of asyncSpec
asyncSpec := d.newVRGSpecAsync()
if len(asyncSpec.PeerClasses) == 0 {
// Retain peerClasses from VRG
if vrg.Spec.Async != nil && len(vrg.Spec.Async.PeerClasses) != 0 {
asyncSpec.PeerClasses = vrg.Spec.Async.PeerClasses
}

vrg.Spec.Async = asyncSpec

return
}

// If there is no async spec in VRG, update it with generated spec
// NOTE: Cannot happen! VRG is type Async and is being updated so Async cannot be nil, this is just safety
if vrg.Spec.Async == nil {
vrg.Spec.Async = asyncSpec

return
}

asyncSpec.PeerClasses = updatePeers(vrgFromView, vrg.Spec.Async.PeerClasses, d.drPolicy.Status.Async.PeerClasses)

// TODO: prune peerClasses not in policy and not in use by VRG

vrg.Spec.Async = asyncSpec
}

// updateVRGSyncSpec see updateVRGDRTypeSpec
func (d *DRPCInstance) updateVRGSyncSpec(vrgFromView, vrg *rmn.VolumeReplicationGroup) {
// vrg will be updated with final contents of syncSpec
syncSpec := d.newVRGSpecSync()
if len(syncSpec.PeerClasses) == 0 {
// Retain peerClasses from VRG
if vrg.Spec.Sync != nil && len(vrg.Spec.Sync.PeerClasses) != 0 {
syncSpec.PeerClasses = vrg.Spec.Sync.PeerClasses
}

vrg.Spec.Sync = syncSpec

return
}

// If there is no sync spec in VRG, update it with generated spec
// NOTE: Cannot happen! VRG is type Sync and is being updated so Sync cannot be nil, this is just safety
if vrg.Spec.Sync == nil {
vrg.Spec.Sync = syncSpec

return
}

syncSpec.PeerClasses = updatePeers(vrgFromView, vrg.Spec.Sync.PeerClasses, d.drPolicy.Status.Sync.PeerClasses)

// TODO: prune peerClasses not in policy and not in use by VRG

vrg.Spec.Sync = syncSpec
}

// updateVRGDRTypeSpec updates VRG Sync/Async spec based on the DR type.
// Update works to ensure VRG is updated with peerClasses that it requires, based on reported PVCs that the VRG is
// attempting to protect. If a VRG is attempting to protect a PVC for which it is lacking a peerClass and that is
// available as part of the DRPolicy its peerClasses are updated. For existing peerClasses the VRG information is
// not updated, this is done to avoid any protection mechanism conflicts. For example, if a VRG carried a peerClass
// without the replicationID (i.e., it would choose to protect the PVC using Volsync and VolumeSnapshots), then it is
// not updated with a peerClass that NOW supports native VolumeReplication, as that would void existing protection.
// To change replication schemes a workload needs to be DR disabled and then reenabled to catch up to the latest
// available peer information for an SC.
func (d *DRPCInstance) updateVRGDRTypeSpec(vrgFromCluster, generatedVRG *rmn.VolumeReplicationGroup) {
	switch d.drType {
	case DRTypeSync:
		d.updateVRGSyncSpec(vrgFromCluster, generatedVRG)
	case DRTypeAsync:
		d.updateVRGAsyncSpec(vrgFromCluster, generatedVRG)
	}
}

// updateVRGOptionalFields ensures that the optional fields in the VRG object are up to date.
// This function does not modify the following fields:
// - ObjectMeta.Name
Expand All @@ -1613,7 +1735,7 @@ func (d *DRPCInstance) ensureVRGManifestWork(homeCluster string) error {
//
// These fields are either set during the initial creation of the VRG (e.g., name and namespace)
// or updated as needed, such as the PrepareForFinalSync and RunFinalSync fields.
func (d *DRPCInstance) updateVRGOptionalFields(vrg *rmn.VolumeReplicationGroup, homeCluster string) {
func (d *DRPCInstance) updateVRGOptionalFields(vrg, vrgFromView *rmn.VolumeReplicationGroup, homeCluster string) {
vrg.ObjectMeta.Annotations = map[string]string{
DestinationClusterAnnotationKey: homeCluster,
DoNotDeletePVCAnnotation: d.instance.GetAnnotations()[DoNotDeletePVCAnnotation],
Expand All @@ -1624,10 +1746,20 @@ func (d *DRPCInstance) updateVRGOptionalFields(vrg *rmn.VolumeReplicationGroup,
vrg.Spec.ProtectedNamespaces = d.instance.Spec.ProtectedNamespaces
vrg.Spec.S3Profiles = AvailableS3Profiles(d.drClusters)
vrg.Spec.KubeObjectProtection = d.instance.Spec.KubeObjectProtection
vrg.Spec.Async = d.generateVRGSpecAsync()
vrg.Spec.Sync = d.generateVRGSpecSync()
vrg.Spec.VolSync.Disabled = d.volSyncDisabled
d.setVRGAction(vrg)

// If vrgFromView nil, then vrg is newly generated, Sync/Async spec is updated unconditionally
if vrgFromView == nil {
switch d.drType {
case DRTypeSync:
vrg.Spec.Sync = d.newVRGSpecSync()
case DRTypeAsync:
vrg.Spec.Async = d.newVRGSpecAsync()
}
} else {
d.updateVRGDRTypeSpec(vrgFromView, vrg)
}
}

func (d *DRPCInstance) ensurePlacement(homeCluster string) error {
Expand Down Expand Up @@ -1662,7 +1794,11 @@ func (d *DRPCInstance) setVRGAction(vrg *rmn.VolumeReplicationGroup) {
vrg.Spec.Action = action
}

func (d *DRPCInstance) newVRG(dstCluster string, repState rmn.ReplicationState) rmn.VolumeReplicationGroup {
func (d *DRPCInstance) newVRG(
dstCluster string,
repState rmn.ReplicationState,
vrgFromView *rmn.VolumeReplicationGroup,
) rmn.VolumeReplicationGroup {
vrg := rmn.VolumeReplicationGroup{
TypeMeta: metav1.TypeMeta{Kind: "VolumeReplicationGroup", APIVersion: "ramendr.openshift.io/v1alpha1"},
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -1675,34 +1811,25 @@ func (d *DRPCInstance) newVRG(dstCluster string, repState rmn.ReplicationState)
},
}

d.updateVRGOptionalFields(&vrg, dstCluster)
d.updateVRGOptionalFields(&vrg, vrgFromView, dstCluster)

return vrg
}

func (d *DRPCInstance) generateVRGSpecAsync() *rmn.VRGAsyncSpec {
if dRPolicySupportsRegional(d.drPolicy, d.drClusters) {
return &rmn.VRGAsyncSpec{
ReplicationClassSelector: d.drPolicy.Spec.ReplicationClassSelector,
VolumeSnapshotClassSelector: d.drPolicy.Spec.VolumeSnapshotClassSelector,
VolumeGroupSnapshotClassSelector: d.drPolicy.Spec.VolumeGroupSnapshotClassSelector,
SchedulingInterval: d.drPolicy.Spec.SchedulingInterval,
}
func (d *DRPCInstance) newVRGSpecAsync() *rmn.VRGAsyncSpec {
return &rmn.VRGAsyncSpec{
ReplicationClassSelector: d.drPolicy.Spec.ReplicationClassSelector,
VolumeSnapshotClassSelector: d.drPolicy.Spec.VolumeSnapshotClassSelector,
VolumeGroupSnapshotClassSelector: d.drPolicy.Spec.VolumeGroupSnapshotClassSelector,
SchedulingInterval: d.drPolicy.Spec.SchedulingInterval,
PeerClasses: d.drPolicy.Status.Async.PeerClasses,
}

return nil
}

func (d *DRPCInstance) generateVRGSpecSync() *rmn.VRGSyncSpec {
if d.drType == DRTypeSync {
return &rmn.VRGSyncSpec{}
func (d *DRPCInstance) newVRGSpecSync() *rmn.VRGSyncSpec {
return &rmn.VRGSyncSpec{
PeerClasses: d.drPolicy.Status.Sync.PeerClasses,
}

return nil
}

// dRPolicySupportsRegional reports whether the set of region names derived from the DRPolicy and
// its DRClusters contains more than one entry, i.e. the policy spans multiple regions.
func dRPolicySupportsRegional(drpolicy *rmn.DRPolicy, drClusters []rmn.DRCluster) bool {
	return rmnutil.DrpolicyRegionNamesAsASet(drpolicy, drClusters).Len() > 1
}

func dRPolicySupportsMetro(drpolicy *rmn.DRPolicy, drclusters []rmn.DRCluster) (
Expand Down
4 changes: 3 additions & 1 deletion internal/controller/drplacementcontrolvolsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func (d *DRPCInstance) createOrUpdateVolSyncDestManifestWork(srcCluster string)
return ctrlutil.OperationResultNone, err
}

// TODO: Update peerClass from source VRG MW (NOT from view)

opResult, err := d.mwu.CreateOrUpdateVRGManifestWork(
d.instance.Name, d.vrgNamespace,
dstCluster, *vrg, annotations)
Expand Down Expand Up @@ -183,7 +185,7 @@ func (d *DRPCInstance) refreshRDSpec(srcCluster, dstCluster string) (*rmn.Volume
return nil, WaitForSourceCluster
}

dstVRG := d.newVRG(dstCluster, rmn.Secondary)
dstVRG := d.newVRG(dstCluster, rmn.Secondary, nil)
d.resetRDSpec(srcVRG, &dstVRG)

return &dstVRG, nil
Expand Down

0 comments on commit 80ba156

Please sign in to comment.