diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index e957d260c29..c51f083f644 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -445,10 +445,26 @@ func (ns *NodeServer) NodePublishVolume( targetPath := req.GetTargetPath() volID := fsutil.VolumeID(req.GetVolumeId()) - // Considering kubelet make sure the stage and publish operations - // are serialized, we dont need any extra locking in nodePublish + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) - if err := util.CreateMountPoint(targetPath); err != nil { + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) + + volOptions := &store.VolumeOptions{} + defer volOptions.Destroy() + + if err := volOptions.DetectMounter(req.GetVolumeContext()); err != nil { + return nil, status.Errorf(codes.Internal, "failed to detect mounter for volume %s: %v", volID, err.Error()) + } + + volMounter, err := mounter.New(volOptions) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error()) + } + + if err = util.CreateMountPoint(targetPath); err != nil { log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err) return nil, status.Error(codes.Internal, err.Error()) @@ -535,9 +551,18 @@ func (ns *NodeServer) NodeUnpublishVolume( return nil, err } - // considering kubelet make sure node operations like unpublish/unstage...etc can not be called - // at same time, an explicit locking at time of nodeunpublish is not required. targetPath := req.GetTargetPath() + volID := req.GetVolumeId() + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) + + // stop the health-checker that may have been started in NodeGetVolumeStats() + ns.healthChecker.StopChecker(volID, targetPath) + isMnt, err := util.IsMountPoint(ns.Mounter, targetPath) if err != nil { log.ErrorLog(ctx, "stat failed: %v", err) @@ -559,7 +584,7 @@ func (ns *NodeServer) NodeUnpublishVolume( isMnt = true } if !isMnt { - if err = os.RemoveAll(targetPath); err != nil { + if err = os.Remove(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index c4b7396acd3..3b4aaedc8ce 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -694,8 +694,12 @@ func (ns *NodeServer) NodePublishVolume( volID := req.GetVolumeId() stagingPath += "/" + volID - // Considering kubelet make sure the stage and publish operations - // are serialized, we dont need any extra locking in nodePublish + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) // Check if that target path exists properly notMnt, err := ns.createTargetMountPath(ctx, targetPath, isBlock) @@ -888,8 +892,14 @@ func (ns *NodeServer) NodeUnpublishVolume( } targetPath := req.GetTargetPath() - // considering kubelet make sure node operations like unpublish/unstage...etc can not be called - // at same time, an explicit locking at time of nodeunpublish is not required. + + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) + isMnt, err := ns.Mounter.IsMountPoint(targetPath) if err != nil { if os.IsNotExist(err) { @@ -902,7 +912,7 @@ func (ns *NodeServer) NodeUnpublishVolume( return nil, status.Error(codes.NotFound, err.Error()) } if !isMnt { - if err = os.RemoveAll(targetPath); err != nil { + if err = os.Remove(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -913,7 +923,7 @@ func (ns *NodeServer) NodeUnpublishVolume( return nil, status.Error(codes.Internal, err.Error()) } - if err = os.RemoveAll(targetPath); err != nil { + if err = os.Remove(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/internal/util/idlocker.go b/internal/util/idlocker.go index 92733c19c9c..211081a1372 100644 --- a/internal/util/idlocker.go +++ b/internal/util/idlocker.go @@ -28,6 +28,9 @@ const ( // SnapshotOperationAlreadyExistsFmt string format to return for concurrent operation. SnapshotOperationAlreadyExistsFmt = "an operation with the given Snapshot ID %s already exists" + + // TargetPathOperationAlreadyExistsFmt string format to return for concurrent operation on target path. + TargetPathOperationAlreadyExistsFmt = "an operation with the given target path %s already exists" ) // VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs