controllers: add controller logic for ProtectedNamespaces

Signed-off-by: Raghavendra Talur <[email protected]>
raghavendra-talur authored and ShyamsundarR committed Mar 30, 2024
1 parent 2f4de7c commit cec9ab5

Showing 13 changed files with 384 additions and 74 deletions.
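
For orientation before the diff: with this commit, a DRPC deployed in the Ramen admin namespace can name the namespaces it protects. The sketch below is illustrative only, not part of the commit; the DRPlacementControlSpec type name, the "ramen-ops" admin namespace, and the application namespaces are assumptions, while the ProtectedNamespaces field and its *[]string shape follow the validation code in the diff.

	package main

	import (
		"fmt"

		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

		rmn "github.com/ramendr/ramen/api/v1alpha1"
	)

	func main() {
		// Assumed namespaces, for illustration only.
		protected := []string{"app-ns-1", "app-ns-2"}

		drpc := &rmn.DRPlacementControl{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "busybox-drpc",
				Namespace: "ramen-ops", // assumed admin namespace name
			},
			Spec: rmn.DRPlacementControlSpec{
				// *[]string, matching drpc.Spec.ProtectedNamespaces in the diff.
				ProtectedNamespaces: &protected,
			},
		}

		fmt.Println(drpc.Name, *drpc.Spec.ProtectedNamespaces)
	}
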
2 changes: 1 addition & 1 deletion controllers/drclusters.go
@@ -232,7 +232,7 @@ func drClusterUndeploy(

for i := range drpolicies.Items {
drpolicy1 := &drpolicies.Items[i]
- clusterNames = clusterNames.Insert(util.DrpolicyClusterNames(drpolicy1)...)
+ clusterNames = clusterNames.Insert(util.DRPolicyClusterNames(drpolicy1)...)
}

if clusterNames.Has(drcluster.Name) {
13 changes: 7 additions & 6 deletions controllers/drplacementcontrol.go
@@ -1307,7 +1307,7 @@ func (d *DRPCInstance) moveVRGToSecondaryEverywhere() bool {

failedCount := 0

- for _, clusterName := range rmnutil.DrpolicyClusterNames(d.drPolicy) {
+ for _, clusterName := range rmnutil.DRPolicyClusterNames(d.drPolicy) {
_, err := d.updateVRGState(clusterName, rmn.Secondary)
if err != nil {
if errors.IsNotFound(err) {
@@ -1331,7 +1331,7 @@ func (d *DRPCInstance) moveVRGToSecondaryEverywhere() bool {
}

func (d *DRPCInstance) cleanupSecondaries(skipCluster string) (bool, error) {
- for _, clusterName := range rmnutil.DrpolicyClusterNames(d.drPolicy) {
+ for _, clusterName := range rmnutil.DRPolicyClusterNames(d.drPolicy) {
if skipCluster == clusterName {
continue
}
@@ -1527,6 +1527,7 @@ func (d *DRPCInstance) generateVRG(dstCluster string, repState rmn.ReplicationSt
},
Spec: rmn.VolumeReplicationGroupSpec{
PVCSelector: d.instance.Spec.PVCSelector,
+ ProtectedNamespaces: d.instance.Spec.ProtectedNamespaces,
ReplicationState: repState,
S3Profiles: AvailableS3Profiles(d.drClusters),
KubeObjectProtection: d.instance.Spec.KubeObjectProtection,
@@ -1571,7 +1572,7 @@ func dRPolicySupportsMetro(drpolicy *rmn.DRPolicy, drclusters []rmn.DRCluster) (
allRegionsMap := make(map[rmn.Region][]string)
metroMap = make(map[rmn.Region][]string)

- for _, managedCluster := range rmnutil.DrpolicyClusterNames(drpolicy) {
+ for _, managedCluster := range rmnutil.DRPolicyClusterNames(drpolicy) {
for _, v := range drclusters {
if v.Name == managedCluster {
allRegionsMap[v.Spec.Region] = append(
@@ -1729,7 +1730,7 @@ func (d *DRPCInstance) cleanupForVolSync(clusterToSkip string) error {

peersReady := true

- for _, clusterName := range rmnutil.DrpolicyClusterNames(d.drPolicy) {
+ for _, clusterName := range rmnutil.DRPolicyClusterNames(d.drPolicy) {
if clusterToSkip == clusterName {
continue
}
@@ -1816,7 +1817,7 @@ func (d *DRPCInstance) ensureVRGManifestWorkOnClusterDeleted(clusterName string)
// a cluster if provided. It returns true if all clusters report secondary for the VRG,
// otherwise, it returns false
func (d *DRPCInstance) ensureVRGIsSecondaryEverywhere(clusterToSkip string) bool {
- for _, clusterName := range rmnutil.DrpolicyClusterNames(d.drPolicy) {
+ for _, clusterName := range rmnutil.DRPolicyClusterNames(d.drPolicy) {
if clusterToSkip == clusterName {
continue
}
@@ -1874,7 +1875,7 @@ func (d *DRPCInstance) ensureVRGIsSecondaryOnCluster(clusterName string) bool {
// has to be ensured. This can only be done at the other cluster which has been moved to
// secondary by now.
func (d *DRPCInstance) ensureDataProtected(targetCluster string) bool {
- for _, clusterName := range rmnutil.DrpolicyClusterNames(d.drPolicy) {
+ for _, clusterName := range rmnutil.DRPolicyClusterNames(d.drPolicy) {
if targetCluster == clusterName {
continue
}
173 changes: 168 additions & 5 deletions controllers/drplacementcontrol_controller.go
@@ -9,6 +9,8 @@ import (
"reflect"
"time"

"golang.org/x/exp/slices"

"github.com/go-logr/logr"
"github.com/google/uuid"
ocmworkv1 "github.com/open-cluster-management/api/work/v1"
@@ -745,6 +747,20 @@ func (r *DRPlacementControlReconciler) Reconcile(ctx context.Context, req ctrl.R
return ctrl.Result{}, nil
}

+ err = ensureDRPCValidNamespace(drpc, ramenConfig)
+ if err != nil {
+ r.recordFailure(ctx, drpc, placementObj, "Error", err.Error(), logger)
+
+ return ctrl.Result{}, err
+ }
+
+ err = r.ensureNoConflictingDRPCs(ctx, drpc, ramenConfig, logger)
+ if err != nil {
+ r.recordFailure(ctx, drpc, placementObj, "Error", err.Error(), logger)
+
+ return ctrl.Result{}, err
+ }
+
drPolicy, err := r.getAndEnsureValidDRPolicy(ctx, drpc, logger)
if err != nil {
r.recordFailure(ctx, drpc, placementObj, "Error", err.Error(), logger)
@@ -1090,7 +1106,7 @@ func (r DRPlacementControlReconciler) updateObjectMetadata(ctx context.Context,
func getDRClusters(ctx context.Context, client client.Client, drPolicy *rmn.DRPolicy) ([]rmn.DRCluster, error) {
drClusters := []rmn.DRCluster{}

- for _, managedCluster := range rmnutil.DrpolicyClusterNames(drPolicy) {
+ for _, managedCluster := range rmnutil.DRPolicyClusterNames(drPolicy) {
drCluster := &rmn.DRCluster{}

err := client.Get(ctx, types.NamespacedName{Name: managedCluster}, drCluster)
@@ -1199,7 +1215,7 @@ func (r *DRPlacementControlReconciler) finalizeDRPC(ctx context.Context, drpc *r
}

// delete manifestworks (VRGs)
- for _, drClusterName := range rmnutil.DrpolicyClusterNames(drPolicy) {
+ for _, drClusterName := range rmnutil.DRPolicyClusterNames(drPolicy) {
err := mwu.DeleteManifestWorksForCluster(drClusterName)
if err != nil {
return fmt.Errorf("%w", err)
@@ -1211,7 +1227,7 @@ func (r *DRPlacementControlReconciler) finalizeDRPC(ctx context.Context, drpc *r
}

// delete MCVs used in the previous call
- if err := r.deleteAllManagedClusterViews(drpc, rmnutil.DrpolicyClusterNames(drPolicy)); err != nil {
+ if err := r.deleteAllManagedClusterViews(drpc, rmnutil.DRPolicyClusterNames(drPolicy)); err != nil {
return fmt.Errorf("error in deleting MCV (%w)", err)
}

@@ -1640,11 +1656,11 @@ func (r *DRPlacementControlReconciler) deleteClonedPlacementRule(ctx context.Con
func (r *DRPlacementControlReconciler) addClusterPeersToPlacementRule(
drPolicy *rmn.DRPolicy, plRule *plrv1.PlacementRule, log logr.Logger,
) error {
- if len(rmnutil.DrpolicyClusterNames(drPolicy)) == 0 {
+ if len(rmnutil.DRPolicyClusterNames(drPolicy)) == 0 {
return fmt.Errorf("DRPolicy %s is missing DR clusters", drPolicy.Name)
}

- for _, v := range rmnutil.DrpolicyClusterNames(drPolicy) {
+ for _, v := range rmnutil.DRPolicyClusterNames(drPolicy) {
plRule.Spec.Clusters = append(plRule.Spec.Clusters, plrv1.GenericClusterReference{Name: v})
}

@@ -2650,3 +2666,150 @@ func constructVRGFromView(viewVRG *rmn.VolumeReplicationGroup) *rmn.VolumeReplic

return vrg
}

+ func ensureDRPCValidNamespace(drpc *rmn.DRPlacementControl, ramenConfig *rmn.RamenConfig) error {
+ if drpcInAdminNamespace(drpc, ramenConfig) {
+ if !ramenConfig.MultiNamespace.FeatureEnabled {
+ return fmt.Errorf("drpc cannot be in admin namespace when multinamespace feature is disabled")
+ }
+
+ if drpc.Spec.ProtectedNamespaces == nil || len(*drpc.Spec.ProtectedNamespaces) == 0 {
+ return fmt.Errorf("drpc in admin namespace must have protected namespaces")
+ }
+
+ adminNamespace := drpcAdminNamespaceName(*ramenConfig)
+ if slices.Contains(*drpc.Spec.ProtectedNamespaces, adminNamespace) {
+ return fmt.Errorf("admin namespace cannot be a protected namespace, admin namespace: %s", adminNamespace)
+ }
+
+ return nil
+ }
+
+ if drpc.Spec.ProtectedNamespaces != nil && len(*drpc.Spec.ProtectedNamespaces) > 0 {
+ adminNamespace := drpcAdminNamespaceName(*ramenConfig)
+
+ return fmt.Errorf("drpc in non-admin namespace(%v) cannot have protected namespaces, admin-namespaces: %v",
+ drpc.Namespace, adminNamespace)
+ }
+
+ return nil
+ }
+
+ func drpcsProtectCommonNamespace(drpcProtectedNs []string, otherDRPCProtectedNs []string) bool {
+ for _, ns := range drpcProtectedNs {
+ if slices.Contains(otherDRPCProtectedNs, ns) {
+ return true
+ }
+ }
+
+ return false
+ }
+
+ func (r *DRPlacementControlReconciler) getProtectedNamespaces(drpc *rmn.DRPlacementControl,
+ log logr.Logger,
+ ) ([]string, error) {
+ if drpc.Spec.ProtectedNamespaces != nil && len(*drpc.Spec.ProtectedNamespaces) > 0 {
+ return *drpc.Spec.ProtectedNamespaces, nil
+ }
+
+ placementObj, err := getPlacementOrPlacementRule(context.TODO(), r.Client, drpc, log)
+ if err != nil {
+ return []string{}, err
+ }
+
+ protectedNamespace, err := selectVRGNamespace(r.Client, log, drpc, placementObj)
+ if err != nil {
+ return []string{}, err
+ }
+
+ return []string{protectedNamespace}, nil
+ }
+
+ func (r *DRPlacementControlReconciler) ensureNoConflictingDRPCs(ctx context.Context,
+ drpc *rmn.DRPlacementControl, ramenConfig *rmn.RamenConfig, log logr.Logger,
+ ) error {
+ drpcList := &rmn.DRPlacementControlList{}
+ if err := r.Client.List(ctx, drpcList); err != nil {
+ return fmt.Errorf("failed to list DRPlacementControls (%w)", err)
+ }
+
+ for i := range drpcList.Items {
+ otherDRPC := &drpcList.Items[i]
+
+ // Skip the drpc itself
+ if otherDRPC.Name == drpc.Name && otherDRPC.Namespace == drpc.Namespace {
+ continue
+ }
+
+ if err := r.twoDRPCsConflict(ctx, drpc, otherDRPC, ramenConfig, log); err != nil {
+ return err
+ }
+ }
+
+ return nil
+ }
+
+ func (r *DRPlacementControlReconciler) twoDRPCsConflict(ctx context.Context,
+ drpc *rmn.DRPlacementControl, otherDRPC *rmn.DRPlacementControl, ramenConfig *rmn.RamenConfig, log logr.Logger,
+ ) error {
+ drpcIsInAdminNamespace := drpcInAdminNamespace(drpc, ramenConfig)
+ otherDRPCIsInAdminNamespace := drpcInAdminNamespace(otherDRPC, ramenConfig)
+
+ // we don't check for conflicts between drpcs in non-admin namespace
+ if !drpcIsInAdminNamespace && !otherDRPCIsInAdminNamespace {
+ return nil
+ }
+
+ // If the drpcs don't have common clusters, they definitely don't conflict
+ common, err := r.drpcHaveCommonClusters(ctx, drpc, otherDRPC, log)
+ if err != nil {
+ return fmt.Errorf("failed to check if drpcs have common clusters (%w)", err)
+ }
+
+ if !common {
+ return nil
+ }
+
+ drpcProtectedNamespaces, err := r.getProtectedNamespaces(drpc, log)
+ if err != nil {
+ return fmt.Errorf("failed to get protected namespaces for drpc: %v, %w", drpc.Name, err)
+ }
+
+ otherDRPCProtectedNamespaces, err := r.getProtectedNamespaces(otherDRPC, log)
+ if err != nil {
+ return fmt.Errorf("failed to get protected namespaces for drpc: %v, %w", otherDRPC.Name, err)
+ }
+
+ conflict := drpcsProtectCommonNamespace(drpcProtectedNamespaces, otherDRPCProtectedNamespaces)
+ if conflict {
+ return fmt.Errorf("drpc: %s and drpc: %s protect the same namespace",
+ drpc.Name, otherDRPC.Name)
+ }
+
+ return nil
+ }
+
+ func drpcInAdminNamespace(drpc *rmn.DRPlacementControl, ramenConfig *rmn.RamenConfig) bool {
+ adminNamespace := drpcAdminNamespaceName(*ramenConfig)
+
+ return adminNamespace == drpc.Namespace
+ }
+
+ func (r *DRPlacementControlReconciler) drpcHaveCommonClusters(ctx context.Context,
+ drpc, otherDRPC *rmn.DRPlacementControl, log logr.Logger,
+ ) (bool, error) {
+ drpolicy, err := r.getDRPolicy(ctx, drpc, log)
+ if err != nil {
+ return false, fmt.Errorf("failed to get DRPolicy %w", err)
+ }
+
+ otherDrpolicy, err := r.getDRPolicy(ctx, otherDRPC, log)
+ if err != nil {
+ return false, fmt.Errorf("failed to get DRPolicy %w", err)
+ }
+
+ drpolicyClusters := rmnutil.DRPolicyClusterNamesAsASet(drpolicy)
+ otherDrpolicyClusters := rmnutil.DRPolicyClusterNamesAsASet(otherDrpolicy)
+
+ return drpolicyClusters.Intersection(otherDrpolicyClusters).Len() > 0, nil
+ }
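
Taken together, the new checks mean two DRPCs conflict only when at least one of them lives in the admin namespace, their DRPolicies share a DR cluster, and their protected-namespace lists overlap. A runnable sketch of the overlap helper (logic mirrors drpcsProtectCommonNamespace above; the package layout and names are illustrative):

	package main

	import (
		"fmt"

		"golang.org/x/exp/slices"
	)

	// protectCommonNamespace returns true if any namespace appears in both
	// protected lists, mirroring drpcsProtectCommonNamespace above.
	func protectCommonNamespace(a, b []string) bool {
		for _, ns := range a {
			if slices.Contains(b, ns) {
				return true
			}
		}

		return false
	}

	func main() {
		fmt.Println(protectCommonNamespace([]string{"ns1", "ns2"}, []string{"ns3"})) // false
		fmt.Println(protectCommonNamespace([]string{"ns1", "ns2"}, []string{"ns2"})) // true
	}
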
4 changes: 2 additions & 2 deletions controllers/drplacementcontrolvolsync.go
@@ -44,7 +44,7 @@ func (d *DRPCInstance) ensureVolSyncReplicationCommon(srcCluster string) error {
// Make sure we have Source and Destination VRGs - Source should already have been created at this point
d.setProgression(rmn.ProgressionEnsuringVolSyncSetup)

- vrgMWCount := d.mwu.GetVRGManifestWorkCount(rmnutil.DrpolicyClusterNames(d.drPolicy))
+ vrgMWCount := d.mwu.GetVRGManifestWorkCount(rmnutil.DRPolicyClusterNames(d.drPolicy))

const maxNumberOfVRGs = 2
if len(d.vrgs) != maxNumberOfVRGs || vrgMWCount != maxNumberOfVRGs {
@@ -291,7 +291,7 @@ func (d *DRPCInstance) createVolSyncDestManifestWork(clusterToSkip string) error
"Last State:", d.getLastDRState(), "homeCluster", clusterToSkip)

// Create or update ManifestWork for all the peers
- for _, dstCluster := range rmnutil.DrpolicyClusterNames(d.drPolicy) {
+ for _, dstCluster := range rmnutil.DRPolicyClusterNames(d.drPolicy) {
if dstCluster == clusterToSkip {
// skip source cluster
continue
8 changes: 4 additions & 4 deletions controllers/drpolicy.go
@@ -25,7 +25,7 @@ func propagateS3Secret(
drClustersMutex.Lock()
defer drClustersMutex.Unlock()

- for _, clusterName := range util.DrpolicyClusterNames(drpolicy) {
+ for _, clusterName := range util.DRPolicyClusterNames(drpolicy) {
if err := drClusterSecretsDeploy(clusterName, drpolicy, drclusters, secretsUtil,
hubOperatorRamenConfig, log); err != nil {
return err
@@ -107,7 +107,7 @@ func drClustersUndeploySecrets(
mustHaveS3Secrets := map[string]sets.String{}

// Determine S3 secrets that must continue to exist per cluster in the policy being deleted
- for _, clusterName := range util.DrpolicyClusterNames(drpolicy) {
+ for _, clusterName := range util.DRPolicyClusterNames(drpolicy) {
mustHaveS3Secrets[clusterName] = drClusterListMustHaveSecrets(drpolicies, drclusters, clusterName,
drpolicy, ramenConfig)
}
@@ -175,7 +175,7 @@ func drClusterListMustHaveS3Profiles(drpolicies rmn.DRPolicyList,
continue
}

- for _, cluster := range util.DrpolicyClusterNames(&drpolicies.Items[idx]) {
+ for _, cluster := range util.DRPolicyClusterNames(&drpolicies.Items[idx]) {
// Skip if not the current cluster
if cluster != clusterName {
continue
@@ -199,7 +199,7 @@ func drPolicySecretNames(drpolicy *rmn.DRPolicy,

var err error

- for _, managedCluster := range util.DrpolicyClusterNames(drpolicy) {
+ for _, managedCluster := range util.DRPolicyClusterNames(drpolicy) {
mcProfileFound := false

s3ProfileName := ""
4 changes: 2 additions & 2 deletions controllers/drpolicy_controller.go
@@ -276,9 +276,9 @@ func hasConflictingDRPolicy(match *ramen.DRPolicy, drclusters *ramen.DRClusterLi
}

func haveOverlappingMetroZones(d1 *ramen.DRPolicy, d2 *ramen.DRPolicy, drclusters *ramen.DRClusterList) bool {
- d1ClusterNames := sets.NewString(util.DrpolicyClusterNames(d1)...)
+ d1ClusterNames := sets.NewString(util.DRPolicyClusterNames(d1)...)
d1SupportsMetro, d1MetroRegions := dRPolicySupportsMetro(d1, drclusters.Items)
- d2ClusterNames := sets.NewString(util.DrpolicyClusterNames(d2)...)
+ d2ClusterNames := sets.NewString(util.DRPolicyClusterNames(d2)...)
d2SupportsMetro, d2MetroRegions := dRPolicySupportsMetro(d2, drclusters.Items)
commonClusters := d1ClusterNames.Intersection(d2ClusterNames)

3 changes: 2 additions & 1 deletion controllers/kubeobjects/velero/requests.go
@@ -370,7 +370,8 @@ func getBackupSpecFromObjectsSpec(objectsSpec kubeobjects.Spec) velero.BackupSpe
IncludedNamespaces: objectsSpec.IncludedNamespaces,
IncludedResources: objectsSpec.IncludedResources,
// exclude VRs from Backup so VRG can create them: see https://github.com/RamenDR/ramen/issues/884
- ExcludedResources: append(objectsSpec.ExcludedResources, "volumereplications.replication.storage.openshift.io"),
+ ExcludedResources: append(objectsSpec.ExcludedResources, "volumereplications.replication.storage.openshift.io",
+ "replicationsources.volsync.backube", "replicationdestinations.volsync.backube"),
LabelSelector: objectsSpec.LabelSelector,
OrLabelSelectors: objectsSpec.OrLabelSelectors,
TTL: metav1.Duration{}, // TODO: set default here
12 changes: 8 additions & 4 deletions controllers/util/drpolicy_util.go
@@ -18,14 +18,18 @@ import (
rmn "github.com/ramendr/ramen/api/v1alpha1"
)

- func DrpolicyClusterNames(drpolicy *rmn.DRPolicy) []string {
+ func DRPolicyClusterNames(drpolicy *rmn.DRPolicy) []string {
return drpolicy.Spec.DRClusters
}

+ func DRPolicyClusterNamesAsASet(drpolicy *rmn.DRPolicy) sets.String {
+ return sets.NewString(DRPolicyClusterNames(drpolicy)...)
+ }
+
func DrpolicyRegionNames(drpolicy *rmn.DRPolicy, drClusters []rmn.DRCluster) []string {
- regionNames := make([]string, len(DrpolicyClusterNames(drpolicy)))
+ regionNames := make([]string, len(DRPolicyClusterNames(drpolicy)))

- for i, v := range DrpolicyClusterNames(drpolicy) {
+ for i, v := range DRPolicyClusterNames(drpolicy) {
regionName := ""

for _, drCluster := range drClusters {
@@ -71,7 +75,7 @@ func GetAllDRPolicies(ctx context.Context, client client.Reader) (rmn.DRPolicyLi
func DRPolicyS3Profiles(drpolicy *rmn.DRPolicy, drclusters []rmn.DRCluster) sets.String {
mustHaveS3Profiles := sets.String{}

- for _, managedCluster := range DrpolicyClusterNames(drpolicy) {
+ for _, managedCluster := range DRPolicyClusterNames(drpolicy) {
s3ProfileName := ""

for i := range drclusters {
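
The new DRPolicyClusterNamesAsASet helper is what lets drpcHaveCommonClusters, earlier in this commit, reduce the common-cluster check to a set intersection. A runnable sketch with made-up cluster names, using the same k8s.io/apimachinery sets package:

	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/util/sets"
	)

	func main() {
		// Each set stands in for DRPolicyClusterNamesAsASet(drpolicy).
		a := sets.NewString("cluster-east", "cluster-west")
		b := sets.NewString("cluster-west", "cluster-north")

		// A non-empty intersection means the two policies share a DR cluster.
		fmt.Println(a.Intersection(b).Len() > 0) // true
	}
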