diff --git a/pkg/operator/ceph/cluster/watcher.go b/pkg/operator/ceph/cluster/watcher.go
index 25edc67d074d..1fbeb1bfe2d8 100644
--- a/pkg/operator/ceph/cluster/watcher.go
+++ b/pkg/operator/ceph/cluster/watcher.go
@@ -53,6 +53,12 @@ type clientCluster struct {
 
 var nodesCheckedForReconcile = sets.New[string]()
 
+// drivers that support fencing, used in naming the networkFence object
+const (
+	rbdDriver    = "rbd"
+	cephfsDriver = "cephfs"
+)
+
 func newClientCluster(client client.Client, namespace string, context *clusterd.Context) *clientCluster {
 	return &clientCluster{
 		client:    client,
@@ -185,9 +191,14 @@ func (c *clientCluster) handleNodeFailure(ctx context.Context, cluster *cephv1.C
 		return nil
 	}
 
-	err = c.unfenceAndDeleteNetworkFence(ctx, *node, cluster)
+	err = c.unfenceAndDeleteNetworkFence(ctx, *node, cluster, rbdDriver)
 	if err != nil {
-		return pkgerror.Wrapf(err, "failed to delete network fence for node %q.", node.Name)
+		return pkgerror.Wrapf(err, "failed to delete rbd network fence for node %q.", node.Name)
+	}
+
+	err = c.unfenceAndDeleteNetworkFence(ctx, *node, cluster, cephfsDriver)
+	if err != nil {
+		return pkgerror.Wrapf(err, "failed to delete cephFS network fence for node %q.", node.Name)
 	}
 
 	return nil
@@ -343,7 +354,7 @@ func listRWOCephFSPV(listPVs *corev1.PersistentVolumeList, cluster *cephv1.CephC
 			continue
 		}
 
-		if pv.Spec.CSI.VolumeAttributes["staticVolume"] == "true" || pv.Spec.CSI.VolumeAttributes["pool"] == "" {
+		if pv.Spec.CSI.VolumeAttributes["staticVolume"] == "true" {
 			logger.Debugf("skipping, static pv %q", pv.Name)
 			continue
 		}
@@ -388,7 +399,7 @@ func (c *clientCluster) fenceRbdImage(
 		return pkgerror.Wrapf(err, "failed to unmarshal rbd status output")
 	}
 	if len(ips) != 0 {
-		err = c.createNetworkFence(ctx, rbdPV, node, cluster, ips)
+		err = c.createNetworkFence(ctx, rbdPV, node, cluster, ips, rbdDriver)
 		if err != nil {
 			return pkgerror.Wrapf(err, "failed to create network fence for node %q", node.Name)
 		}
@@ -428,7 +439,7 @@ func (c *clientCluster) fenceCephFSVolume(
 		return fmt.Errorf("failed to unmarshal cephfs mds output. %v", err)
 	}
 
-	err = c.createNetworkFence(ctx, cephFSPV, node, cluster, ips)
+	err = c.createNetworkFence(ctx, cephFSPV, node, cluster, ips, cephfsDriver)
 	if err != nil {
 		return fmt.Errorf("failed to create network fence for node %q. %v", node.Name, err)
 	}
@@ -500,7 +511,11 @@ func concatenateWatcherIp(address string) string {
 	return watcherIP
 }
 
-func (c *clientCluster) createNetworkFence(ctx context.Context, pv corev1.PersistentVolume, node *corev1.Node, cluster *cephv1.CephCluster, cidr []string) error {
+func fenceResourceName(nodeName, driver string) string {
+	return fmt.Sprintf("%s-%s", nodeName, driver)
+}
+
+func (c *clientCluster) createNetworkFence(ctx context.Context, pv corev1.PersistentVolume, node *corev1.Node, cluster *cephv1.CephCluster, cidr []string, driver string) error {
 	logger.Warningf("Blocking node IP %s", cidr)
 
 	secretName := pv.Annotations["volume.kubernetes.io/provisioner-deletion-secret-name"]
@@ -516,7 +531,7 @@ func (c *clientCluster) createNetworkFence(ctx context.Context, pv corev1.Persis
 
 	networkFence := &addonsv1alpha1.NetworkFence{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      node.Name,
+			Name:      fenceResourceName(node.Name, driver),
 			Namespace: cluster.Namespace,
 			OwnerReferences: []metav1.OwnerReference{
 				*metav1.NewControllerRef(cluster, cephv1.SchemeGroupVersion.WithKind("CephCluster")),
@@ -546,9 +561,9 @@ func (c *clientCluster) createNetworkFence(ctx context.Context, pv corev1.Persis
 	return nil
 }
 
-func (c *clientCluster) unfenceAndDeleteNetworkFence(ctx context.Context, node corev1.Node, cluster *cephv1.CephCluster) error {
+func (c *clientCluster) unfenceAndDeleteNetworkFence(ctx context.Context, node corev1.Node, cluster *cephv1.CephCluster, driver string) error {
 	networkFence := &addonsv1alpha1.NetworkFence{}
-	err := c.client.Get(ctx, types.NamespacedName{Name: node.Name, Namespace: cluster.Namespace}, networkFence)
+	err := c.client.Get(ctx, types.NamespacedName{Name: fenceResourceName(node.Name, driver), Namespace: cluster.Namespace}, networkFence)
 	if err != nil && !errors.IsNotFound(err) {
 		return err
 	} else if errors.IsNotFound(err) {
@@ -565,7 +580,7 @@ func (c *clientCluster) unfenceAndDeleteNetworkFence(ctx context.Context, node c
 	}
 
 	err = wait.PollUntilContextTimeout(ctx, 2*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
-		err = c.client.Get(ctx, types.NamespacedName{Name: node.Name, Namespace: cluster.Namespace}, networkFence)
+		err = c.client.Get(ctx, types.NamespacedName{Name: fenceResourceName(node.Name, driver), Namespace: cluster.Namespace}, networkFence)
 		if err != nil && !errors.IsNotFound(err) {
 			return false, err
 		}
@@ -575,7 +590,7 @@ func (c *clientCluster) unfenceAndDeleteNetworkFence(ctx context.Context, node c
 			return false, err
 		}
 
-		logger.Infof("successfully unfenced network fence cr %q, proceeding with deletion", networkFence.Name)
+		logger.Infof("successfully unfenced %q network fence cr %q, proceeding with deletion", driver, networkFence.Name)
 
 		err = c.client.Delete(ctx, networkFence)
 		if err == nil || errors.IsNotFound(err) {
@@ -585,7 +600,7 @@ func (c *clientCluster) unfenceAndDeleteNetworkFence(ctx context.Context, node c
 		return false, nil
 	})
 	if err != nil {
-		return pkgerror.Wrapf(err, "timeout out deleting the network fence CR %s", networkFence.Name)
+		return pkgerror.Wrapf(err, "timed out deleting the %s network fence CR %s", driver, networkFence.Name)
 	}
 
 	return nil
diff --git a/pkg/operator/ceph/cluster/watcher_test.go b/pkg/operator/ceph/cluster/watcher_test.go
index d37cb84c15d9..5f467d5f84e3 100644
--- a/pkg/operator/ceph/cluster/watcher_test.go
+++ b/pkg/operator/ceph/cluster/watcher_test.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"strings"
 	"testing"
 
 	"github.com/coreos/pkg/capnslog"
@@ -46,7 +47,7 @@ import (
 func getFakeClient(obj ...runtime.Object) client.Client {
 	// Register operator types with the runtime scheme.
 	scheme := scheme.Scheme
-	scheme.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}, &addonsv1alpha1.NetworkFence{})
+	scheme.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}, &addonsv1alpha1.NetworkFence{}, &addonsv1alpha1.NetworkFenceList{})
 	client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(obj...).Build()
 	return client
 }
@@ -175,8 +176,10 @@ func TestHandleNodeFailure(t *testing.T) {
 		switch {
 		case command == "rbd" && args[0] == "status":
 			return `{"watchers":[{"address":"192.168.39.137:0/3762982934","client":4307,"cookie":18446462598732840961}]}`, nil
+		case command == "ceph" && args[0] == "status":
+			return `{"entity":[{"addr": [{"addr": "10.244.0.12:0", "nonce":3247243972}]}], "client_metadata":{"root":"/"}}`, nil
 		case command == "ceph" && args[0] == "tell":
-			return `{"watchers":[{"id":5201,"entity":[{"addr": [{"addr": "10.244.0.12:0", "nonce":3247243972}]}]]}`, nil
+			return `[{"entity":{"addr":{"addr":"10.244.0.12:0","nonce":3247243972}}, "client_metadata":{"root":"/"}}]`, nil
 		}
 		return "", errors.Errorf("unexpected rbd/ceph command %q", args)
 	}
@@ -208,7 +211,7 @@ func TestHandleNodeFailure(t *testing.T) {
 		},
 	}
 
-	pv := &corev1.PersistentVolume{
+	rbdPV := &corev1.PersistentVolume{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "pvc-58469d41-f6c0-4720-b23a-0a0826b841ca",
 			Annotations: map[string]string{
@@ -231,6 +234,29 @@ func TestHandleNodeFailure(t *testing.T) {
 		},
 	}
 
+	cephfsPV := &corev1.PersistentVolume{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "pvc-58469d41-f6c0-4720-b23a-0a0826b842ca",
+			Annotations: map[string]string{
+				"pv.kubernetes.io/provisioned-by":                            fmt.Sprintf("%s.cephfs.csi.ceph.com", ns),
+				"volume.kubernetes.io/provisioner-deletion-secret-name":      "rook-csi-cephfs-provisioner",
+				"volume.kubernetes.io/provisioner-deletion-secret-namespace": ns,
+			},
+		},
+		Spec: corev1.PersistentVolumeSpec{
+			PersistentVolumeSource: corev1.PersistentVolumeSource{
+				CSI: &corev1.CSIPersistentVolumeSource{
+					Driver:       fmt.Sprintf("%s.cephfs.csi.ceph.com", ns),
+					VolumeHandle: "0001-0009-rook-ceph-0000000000000002-24862838-240d-4215-9183-abfc0e9e4001",
+					VolumeAttributes: map[string]string{
+						"fsName":        "myfs",
+						"subvolumeName": "csi-vol-58469d41-f6c0-4720-b23a-0a0826b842ca",
+					},
+				},
+			},
+		},
+	}
+
 	staticRbdPV := &corev1.PersistentVolume{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "pvc-58469d41-f6c0-4720-b23a-0a0826b841cb",
@@ -263,9 +289,11 @@ func TestHandleNodeFailure(t *testing.T) {
 		Spec: corev1.PersistentVolumeSpec{
 			PersistentVolumeSource: corev1.PersistentVolumeSource{
 				CSI: &corev1.CSIPersistentVolumeSource{
-					Driver:           fmt.Sprintf("%s.cephfs.csi.ceph.com", ns),
-					VolumeHandle:     "0001-0009-rook-ceph-0000000000000002-24862838-240d-4215-9183-abfc0e9e4001",
-					VolumeAttributes: map[string]string{},
+					Driver:       fmt.Sprintf("%s.cephfs.csi.ceph.com", ns),
+					VolumeHandle: "0001-0009-rook-ceph-0000000000000002-24862838-240d-4215-9183-abfc0e9e4001",
+					VolumeAttributes: map[string]string{
+						"staticVolume": "true",
+					},
 				},
 			},
 		},
@@ -311,7 +339,10 @@ func TestHandleNodeFailure(t *testing.T) {
 	_, err := c.context.Clientset.CoreV1().Secrets(ns).Create(ctx, secret, metav1.CreateOptions{})
 	assert.NoError(t, err)
 
-	_, err = c.context.Clientset.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{})
+	_, err = c.context.Clientset.CoreV1().PersistentVolumes().Create(ctx, rbdPV, metav1.CreateOptions{})
+	assert.NoError(t, err)
+
+	_, err = c.context.Clientset.CoreV1().PersistentVolumes().Create(ctx, cephfsPV, metav1.CreateOptions{})
 	assert.NoError(t, err)
 
 	_, err = c.context.ApiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, &v1.CustomResourceDefinition{ObjectMeta: metav1.ObjectMeta{Name: "networkfences.csiaddons.openshift.io"}}, metav1.CreateOptions{})
@@ -321,10 +352,33 @@ func TestHandleNodeFailure(t *testing.T) {
 	err = c.handleNodeFailure(ctx, cephCluster, node)
 	assert.NoError(t, err)
 
-	networkFence := &addonsv1alpha1.NetworkFence{}
-	err = c.client.Get(ctx, types.NamespacedName{Name: node.Name, Namespace: cephCluster.Namespace}, networkFence)
+	networkFenceRbd := &addonsv1alpha1.NetworkFence{}
+	err = c.client.Get(ctx, types.NamespacedName{Name: fenceResourceName(node.Name, rbdDriver), Namespace: cephCluster.Namespace}, networkFenceRbd)
 	assert.NoError(t, err)
 
+	networkFenceCephFs := &addonsv1alpha1.NetworkFence{}
+	err = c.client.Get(ctx, types.NamespacedName{Name: fenceResourceName(node.Name, cephfsDriver), Namespace: cephCluster.Namespace}, networkFenceCephFs)
+	assert.NoError(t, err)
+
+	networkFences := &addonsv1alpha1.NetworkFenceList{}
+	err = c.client.List(ctx, networkFences)
+	assert.NoError(t, err)
+	var rbdCount, cephFsCount int
+
+	for _, fence := range networkFences.Items {
+		// Check if the resource is in the desired namespace
+		if fence.Namespace == cephCluster.Namespace {
+			if strings.Contains(fence.Name, rbdDriver) {
+				rbdCount++
+			} else if strings.Contains(fence.Name, cephfsDriver) {
+				cephFsCount++
+			}
+		}
+	}
+
+	assert.Equal(t, 1, rbdCount)
+	assert.Equal(t, 1, cephFsCount)
+
 	// For static rbd pv
 	_, err = c.context.Clientset.CoreV1().PersistentVolumes().Create(ctx, staticRbdPV, metav1.CreateOptions{})
 	assert.NoError(t, err)
@@ -334,7 +388,7 @@ func TestHandleNodeFailure(t *testing.T) {
 
 	rbdVolumesInUse, _ := getCephVolumesInUse(cephCluster, node.Status.VolumesInUse)
 	rbdPVList := listRBDPV(pvList, cephCluster, rbdVolumesInUse)
-	assert.Equal(t, len(rbdPVList), 1) // it will be equal to one since we have one pv provisioned by csi named `PV`
+	assert.Equal(t, len(rbdPVList), 1) // it will be equal to one since we have one pv provisioned by csi named `rbdPV`
 
 	err = c.handleNodeFailure(ctx, cephCluster, node)
 	assert.NoError(t, err)
@@ -352,7 +406,7 @@ func TestHandleNodeFailure(t *testing.T) {
 		cephFSVolumesInUseMap[vol] = struct{}{}
 	}
 	cephFSPVList := listRWOCephFSPV(pvList, cephCluster, cephFSVolumesInUseMap)
-	assert.Equal(t, len(cephFSPVList), 0)
+	assert.Equal(t, len(cephFSPVList), 1) // it will be equal to one since we have one pv provisioned by csi named `cephfsPV`
 
 	err = c.handleNodeFailure(ctx, cephCluster, node)
 	assert.NoError(t, err)
@@ -377,8 +431,12 @@ func TestHandleNodeFailure(t *testing.T) {
 	err = c.handleNodeFailure(ctx, cephCluster, node)
 	assert.NoError(t, err)
 
-	err = c.client.Get(ctx, types.NamespacedName{Name: node.Name, Namespace: cephCluster.Namespace}, networkFence)
+	err = c.client.Get(ctx, types.NamespacedName{Name: fenceResourceName(node.Name, rbdDriver), Namespace: cephCluster.Namespace}, networkFenceRbd)
 	assert.Error(t, err, kerrors.IsNotFound(err))
+
+	err = c.client.Get(ctx, types.NamespacedName{Name: fenceResourceName(node.Name, cephfsDriver), Namespace: cephCluster.Namespace}, networkFenceCephFs)
+	assert.Error(t, err, kerrors.IsNotFound(err))
+
 }
 
 func TestGetCephVolumesInUse(t *testing.T) {
@@ -428,6 +486,11 @@ func TestConcatenateWatcherIp(t *testing.T) {
 	assert.Equal(t, WatcherIP, "192.168.39.137/32")
 }
 
+func TestFenceResourceName(t *testing.T) {
+	fenceName := fenceResourceName("fakenode", "rbd")
+	assert.Equal(t, fenceName, "fakenode-rbd")
+}
+
 func TestOnDeviceCMUpdate(t *testing.T) {
 	// Set DEBUG logging
 	capnslog.SetGlobalLogLevel(capnslog.DEBUG)