diff --git a/frontend/csi/node_server.go b/frontend/csi/node_server.go index 4b8814b9f..15afb3138 100644 --- a/frontend/csi/node_server.go +++ b/frontend/csi/node_server.go @@ -43,13 +43,28 @@ var ( publishedISCSISessions, currentISCSISessions utils.ISCSISessions ) +func attemptLock(ctx context.Context, lockContext string) bool { + startTime := time.Now() + utils.Lock(ctx, lockContext, lockID) + + // Fail if the gRPC call came in a long time ago to avoid kubelet 120s timeout + if time.Since(startTime) > defaultNodeReconciliationPeriod { + return false + } + + return true +} + func (p *Plugin) NodeStageVolume( ctx context.Context, req *csi.NodeStageVolumeRequest, ) (*csi.NodeStageVolumeResponse, error) { lockContext := "NodeStageVolume-" + req.GetVolumeId() - utils.Lock(ctx, lockContext, lockID) defer utils.Unlock(ctx, lockContext, lockID) + if !attemptLock(ctx, lockContext) { + return nil, status.Error(codes.Aborted, "too long since request start time") + } + fields := LogFields{"Method": "NodeStageVolume", "Type": "CSI_Node"} ctx = SetContextWorkflow(ctx, WorkflowNodeStage) ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend) @@ -86,9 +101,12 @@ func (p *Plugin) nodeUnstageVolume( ctx context.Context, req *csi.NodeUnstageVolumeRequest, force bool, ) (*csi.NodeUnstageVolumeResponse, error) { lockContext := "NodeUnstageVolume-" + req.GetVolumeId() - utils.Lock(ctx, lockContext, lockID) defer utils.Unlock(ctx, lockContext, lockID) + if !attemptLock(ctx, lockContext) { + return nil, status.Error(codes.Aborted, "too long since request start time") + } + fields := LogFields{ "Method": "NodeUnstageVolume", "Type": "CSI_Node", @@ -165,9 +183,12 @@ func (p *Plugin) NodePublishVolume( ctx context.Context, req *csi.NodePublishVolumeRequest, ) (*csi.NodePublishVolumeResponse, error) { lockContext := "NodePublishVolume-" + req.GetVolumeId() - utils.Lock(ctx, lockContext, lockID) defer utils.Unlock(ctx, lockContext, lockID) + if !attemptLock(ctx, lockContext) { + return nil, status.Error(codes.Aborted, "too long since request start time") + } + ctx = SetContextWorkflow(ctx, WorkflowNodePublish) ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend) @@ -203,9 +224,12 @@ func (p *Plugin) NodeUnpublishVolume( ctx context.Context, req *csi.NodeUnpublishVolumeRequest, ) (*csi.NodeUnpublishVolumeResponse, error) { lockContext := "NodeUnpublishVolume-" + req.GetVolumeId() - utils.Lock(ctx, lockContext, lockID) defer utils.Unlock(ctx, lockContext, lockID) + if !attemptLock(ctx, lockContext) { + return nil, status.Error(codes.Aborted, "too long since request start time") + } + ctx = SetContextWorkflow(ctx, WorkflowNodeUnpublish) fields := LogFields{"Method": "NodeUnpublishVolume", "Type": "CSI_Node"} Logc(ctx).WithFields(fields).Debug(">>>> NodeUnpublishVolume") diff --git a/utils/devices.go b/utils/devices.go index 935b7a581..7f47e574b 100644 --- a/utils/devices.go +++ b/utils/devices.go @@ -14,7 +14,6 @@ import ( "time" "github.com/cenkalti/backoff/v4" - log "github.com/sirupsen/logrus" "golang.org/x/net/context" . "github.com/netapp/trident/logging" @@ -318,7 +317,7 @@ func GetDeviceNameFromMount(ctx context.Context, mountpath string) (string, int, // In the case of a iscsi trace debug, log info about session and what devices are present func listAllISCSIDevices(ctx context.Context) { - if !IsLevelEnabled(log.TraceLevel) { + if GetDefaultLogLevel() != "trace" { // Don't even run the commands if trace logging is not enabled return } diff --git a/utils/mount_linux.go b/utils/mount_linux.go index 72cb4ba03..bf6afa21c 100644 --- a/utils/mount_linux.go +++ b/utils/mount_linux.go @@ -57,6 +57,13 @@ func IsMounted(ctx context.Context, sourceDevice, mountpoint, mountOptions strin Logc(ctx).WithFields(logFields).Debug(">>>> mount_linux.IsMounted") defer Logc(ctx).WithFields(logFields).Debug("<<<< mount_linux.IsMounted") + rawPath, err := filepath.EvalSymlinks(sourceDevice) + if err != nil { + Logc(ctx).Error(err) + rawPath = "" + } + rawPath = strings.TrimPrefix(rawPath, "/dev/") + sourceDevice = strings.TrimPrefix(sourceDevice, "/dev/") // Ensure at least one arg was specified @@ -90,7 +97,7 @@ func IsMounted(ctx context.Context, sourceDevice, mountpoint, mountOptions strin if strings.HasPrefix(procMount.MountSource, "/dev/") { procSourceDevice = strings.TrimPrefix(procMount.MountSource, "/dev/") - if sourceDevice != procSourceDevice { + if sourceDevice != procSourceDevice && rawPath != procSourceDevice { // Resolve any symlinks to get the real device procSourceDevice, err = filepath.EvalSymlinks(procMount.MountSource) if err != nil { @@ -101,7 +108,7 @@ func IsMounted(ctx context.Context, sourceDevice, mountpoint, mountOptions strin } } - if sourceDevice != procSourceDevice { + if sourceDevice != procSourceDevice && rawPath != procSourceDevice { continue } else { Logc(ctx).Debugf("Device found: %v", sourceDevice)