Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DFBUGS-915: Prevent dataloss due to the concurrent RPC calls (occurrence is very low) #430

Open
wants to merge 4 commits into
base: release-4.13
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions internal/cephfs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,26 @@ func (ns *NodeServer) NodePublishVolume(
targetPath := req.GetTargetPath()
volID := fsutil.VolumeID(req.GetVolumeId())

// Considering kubelet make sure the stage and publish operations
// are serialized, we dont need any extra locking in nodePublish
if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

if err := util.CreateMountPoint(targetPath); err != nil {
return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

volOptions := &store.VolumeOptions{}
defer volOptions.Destroy()

if err := volOptions.DetectMounter(req.GetVolumeContext()); err != nil {
return nil, status.Errorf(codes.Internal, "failed to detect mounter for volume %s: %v", volID, err.Error())
}

volMounter, err := mounter.New(volOptions)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error())
}

if err = util.CreateMountPoint(targetPath); err != nil {
log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err)

return nil, status.Error(codes.Internal, err.Error())
Expand Down Expand Up @@ -535,9 +551,18 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, err
}

// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.
targetPath := req.GetTargetPath()
volID := req.GetVolumeId()
if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

// stop the health-checker that may have been started in NodeGetVolumeStats()
ns.healthChecker.StopChecker(volID, targetPath)

isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)
Expand All @@ -559,7 +584,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
isMnt = true
}
if !isMnt {
if err = os.RemoveAll(targetPath); err != nil {
if err = os.Remove(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand Down
22 changes: 16 additions & 6 deletions internal/rbd/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,8 +694,12 @@ func (ns *NodeServer) NodePublishVolume(
volID := req.GetVolumeId()
stagingPath += "/" + volID

// Considering kubelet make sure the stage and publish operations
// are serialized, we dont need any extra locking in nodePublish
if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

// Check if that target path exists properly
notMnt, err := ns.createTargetMountPath(ctx, targetPath, isBlock)
Expand Down Expand Up @@ -888,8 +892,14 @@ func (ns *NodeServer) NodeUnpublishVolume(
}

targetPath := req.GetTargetPath()
// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.

if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

isMnt, err := ns.Mounter.IsMountPoint(targetPath)
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -902,7 +912,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, status.Error(codes.NotFound, err.Error())
}
if !isMnt {
if err = os.RemoveAll(targetPath); err != nil {
if err = os.Remove(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand All @@ -913,7 +923,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, status.Error(codes.Internal, err.Error())
}

if err = os.RemoveAll(targetPath); err != nil {
if err = os.Remove(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand Down
3 changes: 3 additions & 0 deletions internal/util/idlocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (

// SnapshotOperationAlreadyExistsFmt string format to return for concurrent operation.
SnapshotOperationAlreadyExistsFmt = "an operation with the given Snapshot ID %s already exists"

// TargetPathOperationAlreadyExistsFmt string format to return for concurrent operation on target path.
TargetPathOperationAlreadyExistsFmt = "an operation with the given target path %s already exists"
)

// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs
Expand Down
Loading