Skip to content

Commit

Permalink
Handled multithread issue for attach and detach 7598035 (#151)
Browse files Browse the repository at this point in the history
* Handled multithread issue for attach and detach

* Fixed error
  • Loading branch information
prgavali authored and arahamad-zz committed Dec 18, 2019
1 parent 75ce88d commit b94a5d7
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 35 deletions.
22 changes: 12 additions & 10 deletions volume-providers/vpc/provider/attach_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,33 @@ func (vpcs *VPCSession) AttachVolume(volumeAttachmentRequest provider.VolumeAtta
return nil, err
}
var volumeAttachResult *models.VolumeAttachment
// First , check if volume is already attached or attaching to given instance
vpcs.Logger.Info("Checking if volume is already attached ")
currentVolAttachment, err := vpcs.GetVolumeAttachment(volumeAttachmentRequest)
if err == nil && currentVolAttachment != nil && currentVolAttachment.Status != StatusDetaching {
vpcs.Logger.Info("Volume is already attached", zap.Reflect("currentVolAttachment", currentVolAttachment))
return currentVolAttachment, nil
}
//Try attaching volume if it's not already attached or there is error in getting current volume attachment
vpcs.Logger.Info("Attaching volume from VPC provider...", zap.Bool("IKSEnabled?", vpcs.Config.IsIKS))
var varp *provider.VolumeAttachmentResponse
volumeAttachment := models.NewVolumeAttachment(volumeAttachmentRequest)

err = vpcs.APIRetry.FlexyRetry(vpcs.Logger, func() (error, bool) {
// First , check if volume is already attached or attaching to given instance
vpcs.Logger.Info("Checking if volume is already attached by other thread")
currentVolAttachment, err := vpcs.GetVolumeAttachment(volumeAttachmentRequest)
if err == nil && currentVolAttachment != nil && currentVolAttachment.Status != StatusDetaching {
vpcs.Logger.Info("Volume is already attached", zap.Reflect("currentVolAttachment", currentVolAttachment))
varp = currentVolAttachment
return nil, true // stop retry volume already attached
}
//Try attaching volume if it's not already attached or there is error in getting current volume attachment
vpcs.Logger.Info("Attaching volume from VPC provider...", zap.Bool("IKSEnabled?", vpcs.Config.IsIKS))
volumeAttachResult, err = vpcs.APIClientVolAttachMgr.AttachVolume(&volumeAttachment, vpcs.Logger)
// Keep retry, until we get the proper volumeAttachResult object
if err != nil {
return err, skipRetryForAttach(err, vpcs.Config.IsIKS)
}
varp = volumeAttachResult.ToVolumeAttachmentResponse(vpcs.Config.VPCBlockProviderType)
return err, true // stop retry as no error
})

if err != nil {
userErr := userError.GetUserError(string(userError.VolumeAttachFailed), err, volumeAttachmentRequest.VolumeID, volumeAttachmentRequest.InstanceID)
return nil, userErr
}
varp := volumeAttachResult.ToVolumeAttachmentResponse(vpcs.Config.VPCBlockProviderType)
vpcs.Logger.Info("Successfully attached volume from VPC provider", zap.Reflect("volumeResponse", varp))
return varp, nil
}
Expand Down
45 changes: 23 additions & 22 deletions volume-providers/vpc/provider/detach_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,33 @@ func (vpcs *VPCSession) DetachVolume(volumeAttachmentTemplate provider.VolumeAtt
}

var response *http.Response
// First , check if volume is already attached to given instance
vpcs.Logger.Info("Checking if volume is already attached ")
currentVolAttachment, err := vpcs.GetVolumeAttachment(volumeAttachmentTemplate)
if err == nil && currentVolAttachment.Status != StatusDetaching {
// If no error and current volume is not already in detaching state ( i.e in attached or attaching state) attemp to detach
vpcs.Logger.Info("Found volume attachment", zap.Reflect("currentVolAttachment", currentVolAttachment))
volumeAttachment := models.NewVolumeAttachment(volumeAttachmentTemplate)
volumeAttachment.ID = currentVolAttachment.VPCVolumeAttachment.ID
vpcs.Logger.Info("Detaching volume from VPC provider...")
var volumeAttachment models.VolumeAttachment

err = vpcs.APIRetry.FlexyRetry(vpcs.Logger, func() (error, bool) {
err = vpcs.APIRetry.FlexyRetry(vpcs.Logger, func() (error, bool) {
// First , check if volume is already attached to given instance
vpcs.Logger.Info("Checking if volume is already attached ")
currentVolAttachment, err := vpcs.GetVolumeAttachment(volumeAttachmentTemplate)
if err == nil && currentVolAttachment.Status != StatusDetaching {
// If no error and current volume is not already in detaching state ( i.e in attached or attaching state) attemp to detach
vpcs.Logger.Info("Found volume attachment", zap.Reflect("currentVolAttachment", currentVolAttachment))
volumeAttachment := models.NewVolumeAttachment(volumeAttachmentTemplate)
volumeAttachment.ID = currentVolAttachment.VPCVolumeAttachment.ID
vpcs.Logger.Info("Detaching volume from VPC provider...")
response, err = vpcs.APIClientVolAttachMgr.DetachVolume(&volumeAttachment, vpcs.Logger)
return err, err == nil // Retry in case of all errors
})
if err != nil {
userErr := userError.GetUserError(string(userError.VolumeDetachFailed), err, volumeAttachmentTemplate.VolumeID, volumeAttachmentTemplate.InstanceID, volumeAttachment.ID)
vpcs.Logger.Error("Volume detach failed with error", zap.Error(err))
return response, userErr
}
vpcs.Logger.Info("Successfully detached volume from VPC provider", zap.Reflect("resp", response))
return response, nil
}
vpcs.Logger.Info("No volume attachment found for", zap.Reflect("currentVolAttachment", currentVolAttachment), zap.Error(err))
// consider volume detach success if its already in Detaching or VolumeAttachment is not found
response = &http.Response{
StatusCode: http.StatusOK,
vpcs.Logger.Info("No volume attachment found for", zap.Reflect("currentVolAttachment", currentVolAttachment), zap.Error(err))
// consider volume detach success if its already in Detaching or VolumeAttachment is not found
response = &http.Response{
StatusCode: http.StatusOK,
}
return nil, true // skip retry if volume is not found OR alreadd in detaching state
})
if err != nil {
userErr := userError.GetUserError(string(userError.VolumeDetachFailed), err, volumeAttachmentTemplate.VolumeID, volumeAttachmentTemplate.InstanceID, volumeAttachment.ID)
vpcs.Logger.Error("Volume detach failed with error", zap.Error(err))
return response, userErr
}
vpcs.Logger.Info("Successfully detached volume from VPC provider", zap.Reflect("resp", response))
return response, nil
}
6 changes: 3 additions & 3 deletions volume-providers/vpc/vpcclient/instances/detach_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ func (vs *VolumeAttachService) DetachVolume(volumeAttachmentTemplate *models.Vol
if err != nil {
ctxLogger.Error("Error occured while deleting volume attachment", zap.Error(err))
if resp != nil && resp.StatusCode == http.StatusNotFound {
// volume attachment is deleted. So do not want to retry
// volume Attachment is deleted. So do not want to retry
ctxLogger.Info("Exit DetachVolume", zap.Any("resp", resp.StatusCode), zap.Error(err), zap.Error(apiErr))
return resp, apiErr
}
}

ctxLogger.Info("Successfuly deleted the volume attachment")
ctxLogger.Info("Exit DetachVolume", zap.Error(err), zap.Error(apiErr))
return resp, err
}

0 comments on commit b94a5d7

Please sign in to comment.