Skip to content

Commit

Permalink
cephfs: take lock on targetpath on node operation
Browse files Browse the repository at this point in the history
We should not be dependent on the CO to ensure
that it will serialize the request instead of
that we need to have own internal locks to ensure
that we dont do concurrent operations for same
request.

Signed-off-by: Madhu Rajanna <[email protected]>
(cherry picked from commit 38b0a4c)
  • Loading branch information
Madhu-1 authored and yati1998 committed Dec 6, 2024
1 parent 08b068e commit bfcac0b
Showing 1 changed file with 30 additions and 5 deletions.
35 changes: 30 additions & 5 deletions internal/cephfs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,26 @@ func (ns *NodeServer) NodePublishVolume(
targetPath := req.GetTargetPath()
volID := fsutil.VolumeID(req.GetVolumeId())

// Considering kubelet make sure the stage and publish operations
// are serialized, we dont need any extra locking in nodePublish
if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

if err := util.CreateMountPoint(targetPath); err != nil {
return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

volOptions := &store.VolumeOptions{}
defer volOptions.Destroy()

if err := volOptions.DetectMounter(req.GetVolumeContext()); err != nil {
return nil, status.Errorf(codes.Internal, "failed to detect mounter for volume %s: %v", volID, err.Error())
}

volMounter, err := mounter.New(volOptions)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error())
}

if err = util.CreateMountPoint(targetPath); err != nil {
log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err)

return nil, status.Error(codes.Internal, err.Error())
Expand Down Expand Up @@ -535,9 +551,18 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, err
}

// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.
targetPath := req.GetTargetPath()
volID := req.GetVolumeId()
if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

// stop the health-checker that may have been started in NodeGetVolumeStats()
ns.healthChecker.StopChecker(volID, targetPath)

isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)
Expand Down

0 comments on commit bfcac0b

Please sign in to comment.