From 08b068eb9187ffa79130e6343c363fcbee07b6e9 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Wed, 20 Nov 2024 10:07:38 +0100 Subject: [PATCH 1/4] rbd: take lock on targetpath during node operation We should not depend on the CO to serialize requests; instead, we need our own internal locks to ensure that we don't run concurrent operations for the same request. Signed-off-by: Madhu Rajanna (cherry picked from commit 38c0e64307d4fb108b95498d2220fe870f8440c8) --- internal/rbd/nodeserver.go | 18 ++++++++++++++---- internal/util/idlocker.go | 3 +++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index c4b7396acd3..6d2f680435e 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -694,8 +694,12 @@ func (ns *NodeServer) NodePublishVolume( volID := req.GetVolumeId() stagingPath += "/" + volID - // Considering kubelet make sure the stage and publish operations - // are serialized, we dont need any extra locking in nodePublish + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) // Check if that target path exists properly notMnt, err := ns.createTargetMountPath(ctx, targetPath, isBlock) @@ -888,8 +892,14 @@ func (ns *NodeServer) NodeUnpublishVolume( } targetPath := req.GetTargetPath() - // considering kubelet make sure node operations like unpublish/unstage...etc can not be called - // at same time, an explicit locking at time of nodeunpublish is not required. 
+ + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) + isMnt, err := ns.Mounter.IsMountPoint(targetPath) if err != nil { if os.IsNotExist(err) { diff --git a/internal/util/idlocker.go b/internal/util/idlocker.go index 92733c19c9c..211081a1372 100644 --- a/internal/util/idlocker.go +++ b/internal/util/idlocker.go @@ -28,6 +28,9 @@ const ( // SnapshotOperationAlreadyExistsFmt string format to return for concurrent operation. SnapshotOperationAlreadyExistsFmt = "an operation with the given Snapshot ID %s already exists" + + // TargetPathOperationAlreadyExistsFmt string format to return for concurrent operation on target path. + TargetPathOperationAlreadyExistsFmt = "an operation with the given target path %s already exists" ) // VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs From bfcac0b51eeeafecbadfd5c79cf4c377ca703885 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Wed, 20 Nov 2024 10:12:59 +0100 Subject: [PATCH 2/4] cephfs: take lock on targetpath on node operation We should not depend on the CO to serialize requests; instead, we need our own internal locks to ensure that we don't run concurrent operations for the same request. 
Signed-off-by: Madhu Rajanna (cherry picked from commit 38b0a4cbadfb20e818e76cc0d3b603274513ec8e) --- internal/cephfs/nodeserver.go | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index e957d260c29..491fe783049 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -445,10 +445,26 @@ func (ns *NodeServer) NodePublishVolume( targetPath := req.GetTargetPath() volID := fsutil.VolumeID(req.GetVolumeId()) - // Considering kubelet make sure the stage and publish operations - // are serialized, we dont need any extra locking in nodePublish + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) - if err := util.CreateMountPoint(targetPath); err != nil { + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) + + volOptions := &store.VolumeOptions{} + defer volOptions.Destroy() + + if err := volOptions.DetectMounter(req.GetVolumeContext()); err != nil { + return nil, status.Errorf(codes.Internal, "failed to detect mounter for volume %s: %v", volID, err.Error()) + } + + volMounter, err := mounter.New(volOptions) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error()) + } + + if err = util.CreateMountPoint(targetPath); err != nil { log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err) return nil, status.Error(codes.Internal, err.Error()) @@ -535,9 +551,18 @@ func (ns *NodeServer) NodeUnpublishVolume( return nil, err } - // considering kubelet make sure node operations like unpublish/unstage...etc can not be called - // at same time, an explicit locking at time of nodeunpublish is not required. 
targetPath := req.GetTargetPath() + volID := req.GetVolumeId() + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) + + // stop the health-checker that may have been started in NodeGetVolumeStats() + ns.healthChecker.StopChecker(volID, targetPath) + isMnt, err := util.IsMountPoint(ns.Mounter, targetPath) if err != nil { log.ErrorLog(ctx, "stat failed: %v", err) From 391467b7ced40affee58e18e71797bb17303c24b Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Wed, 20 Nov 2024 10:14:26 +0100 Subject: [PATCH 3/4] cephfs: use os.Remove to remove directory Using os.RemoveAll removes everything in the directory. After the Umount we should use os.Remove, which removes only the empty directory. Signed-off-by: Madhu Rajanna (cherry picked from commit ffa8eaf5ddc3a6f3041bf010f76e7d71c0a08b8d) --- internal/cephfs/nodeserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index 491fe783049..c51f083f644 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -584,7 +584,7 @@ func (ns *NodeServer) NodeUnpublishVolume( isMnt = true } if !isMnt { - if err = os.RemoveAll(targetPath); err != nil { + if err = os.Remove(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } From 33302c89ebdeda9274c971ce4f24294744366cb3 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Wed, 20 Nov 2024 10:15:40 +0100 Subject: [PATCH 4/4] rbd: use os.Remove to remove directory Using os.RemoveAll removes everything in the directory. After the Umount we should use os.Remove, which removes only the empty directory. Signed-off-by: Madhu Rajanna (cherry picked from commit 39cc628adf1da1ab78f3aacfb7fe8f53e9a1fd29) --- internal/rbd/nodeserver.go 
| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index 6d2f680435e..3b4aaedc8ce 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -912,7 +912,7 @@ func (ns *NodeServer) NodeUnpublishVolume( return nil, status.Error(codes.NotFound, err.Error()) } if !isMnt { - if err = os.RemoveAll(targetPath); err != nil { + if err = os.Remove(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -923,7 +923,7 @@ func (ns *NodeServer) NodeUnpublishVolume( return nil, status.Error(codes.Internal, err.Error()) } - if err = os.RemoveAll(targetPath); err != nil { + if err = os.Remove(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) }