Skip to content

Commit

Permalink
Merge pull request #1980 from k8s-infra-cherrypick-robot/cherry-pick-…
Browse files Browse the repository at this point in the history
…1953-to-release-1.29

[release-1.29] fix: Ultra and PremiumV2 disk snapshot delay issue
  • Loading branch information
k8s-ci-robot authored Sep 27, 2023
2 parents 143756c + f7f50c8 commit 781ebc6
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 29 deletions.
29 changes: 17 additions & 12 deletions pkg/azuredisk/azuredisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type DriverOptions struct {
EnableWindowsHostProcess bool
GetNodeIDFromIMDS bool
EnableOtelTracing bool
WaitForSnapshotReady bool
}

// CSIDriver defines the interface for a CSI driver.
Expand Down Expand Up @@ -123,6 +124,7 @@ type DriverCore struct {
enableWindowsHostProcess bool
getNodeIDFromIMDS bool
enableOtelTracing bool
shouldWaitForSnapshotReady bool
}

// Driver is the v1 implementation of the Azure Disk CSI Driver.
Expand Down Expand Up @@ -164,6 +166,7 @@ func newDriverV1(options *DriverOptions) *Driver {
driver.enableWindowsHostProcess = options.EnableWindowsHostProcess
driver.getNodeIDFromIMDS = options.GetNodeIDFromIMDS
driver.enableOtelTracing = options.EnableOtelTracing
driver.shouldWaitForSnapshotReady = options.WaitForSnapshotReady
driver.volumeLocks = volumehelper.NewVolumeLocks()
driver.ioHandler = azureutils.NewOSIOHandler()
driver.hostUtil = hostutil.NewHostUtil()
Expand Down Expand Up @@ -418,29 +421,31 @@ func (d *DriverCore) getHostUtil() hostUtil {
return d.hostUtil
}

// getSnapshotCopyCompletionPercent returns the completion percent of copy snapshot
func (d *DriverCore) getSnapshotCopyCompletionPercent(ctx context.Context, subsID, resourceGroup, copySnapshotName string) (float64, error) {
copySnapshot, rerr := d.cloud.SnapshotsClient.Get(ctx, subsID, resourceGroup, copySnapshotName)
// getSnapshotCompletionPercent returns the completion percent of the snapshot.
func (d *DriverCore) getSnapshotCompletionPercent(ctx context.Context, subsID, resourceGroup, snapshotName string) (float64, error) {
copySnapshot, rerr := d.cloud.SnapshotsClient.Get(ctx, subsID, resourceGroup, snapshotName)
if rerr != nil {
return 0.0, rerr.Error()
}

if copySnapshot.SnapshotProperties == nil || copySnapshot.SnapshotProperties.CompletionPercent == nil {
return 0.0, fmt.Errorf("copy snapshot(%s) under rg(%s) has no SnapshotProperties or CompletionPercent is nil", copySnapshotName, resourceGroup)
// If SnapshotProperties or CompletionPercent is nil, the snapshot is treated as complete
klog.V(2).Infof("snapshot(%s) under rg(%s) has no SnapshotProperties or CompletionPercent is nil", snapshotName, resourceGroup)
return 100.0, nil
}

return *copySnapshot.SnapshotProperties.CompletionPercent, nil
}

// waitForSnapshotCopy wait for copy incremental snapshot to a new region until completionPercent is 100.0
func (d *DriverCore) waitForSnapshotCopy(ctx context.Context, subsID, resourceGroup, copySnapshotName string, intervel, timeout time.Duration) error {
completionPercent, err := d.getSnapshotCopyCompletionPercent(ctx, subsID, resourceGroup, copySnapshotName)
// waitForSnapshotReady waits until the snapshot's completionPercent reaches 100.0
func (d *DriverCore) waitForSnapshotReady(ctx context.Context, subsID, resourceGroup, snapshotName string, intervel, timeout time.Duration) error {
completionPercent, err := d.getSnapshotCompletionPercent(ctx, subsID, resourceGroup, snapshotName)
if err != nil {
return err
}

if completionPercent >= float64(100.0) {
klog.V(2).Infof("copy snapshot(%s) under rg(%s) complete", copySnapshotName, resourceGroup)
klog.V(2).Infof("snapshot(%s) under rg(%s) complete", snapshotName, resourceGroup)
return nil
}

Expand All @@ -449,18 +454,18 @@ func (d *DriverCore) waitForSnapshotCopy(ctx context.Context, subsID, resourceGr
for {
select {
case <-timeTick:
completionPercent, err = d.getSnapshotCopyCompletionPercent(ctx, subsID, resourceGroup, copySnapshotName)
completionPercent, err = d.getSnapshotCompletionPercent(ctx, subsID, resourceGroup, snapshotName)
if err != nil {
return err
}

if completionPercent >= float64(100.0) {
klog.V(2).Infof("copy snapshot(%s) under rg(%s) complete", copySnapshotName, resourceGroup)
klog.V(2).Infof("snapshot(%s) under rg(%s) complete", snapshotName, resourceGroup)
return nil
}
klog.V(2).Infof("copy snapshot(%s) under rg(%s) completionPercent: %f", copySnapshotName, resourceGroup, completionPercent)
klog.V(2).Infof("snapshot(%s) under rg(%s) completionPercent: %f", snapshotName, resourceGroup, completionPercent)
case <-timeAfter:
return fmt.Errorf("timeout waiting for copy snapshot(%s) under rg(%s)", copySnapshotName, resourceGroup)
return fmt.Errorf("timeout waiting for snapshot(%s) under rg(%s)", snapshotName, resourceGroup)
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/azuredisk/azuredisk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func TestGetDefaultDiskMBPSReadWrite(t *testing.T) {
}
}

func TestWaitForSnapshotCopy(t *testing.T) {
func TestWaitForSnapshot(t *testing.T) {
testCases := []struct {
name string
testFunc func(t *testing.T)
Expand All @@ -284,15 +284,15 @@ func TestWaitForSnapshotCopy(t *testing.T) {
RawError: fmt.Errorf("invalid snapshotID"),
}
mockSnapshotClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(snapshot, rerr).AnyTimes()
err := d.waitForSnapshotCopy(context.Background(), subID, resourceGroup, snapshotID, intervel, timeout)
err := d.waitForSnapshotReady(context.Background(), subID, resourceGroup, snapshotID, intervel, timeout)

wantErr := true
subErrMsg := "invalid snapshotID"
if (err != nil) != wantErr {
t.Errorf("waitForSnapshotCopy() error = %v, wantErr %v", err, wantErr)
t.Errorf("waitForSnapshotReady() error = %v, wantErr %v", err, wantErr)
}
if err != nil && !strings.Contains(err.Error(), subErrMsg) {
t.Errorf("waitForSnapshotCopy() error = %v, wantErr %v", err, subErrMsg)
t.Errorf("waitForSnapshotReady() error = %v, wantErr %v", err, subErrMsg)
}
},
},
Expand Down Expand Up @@ -324,15 +324,15 @@ func TestWaitForSnapshotCopy(t *testing.T) {
mockSnapshotClient := mocksnapshotclient.NewMockInterface(ctrl)
d.getCloud().SnapshotsClient = mockSnapshotClient
mockSnapshotClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(snapshot, nil).AnyTimes()
err := d.waitForSnapshotCopy(context.Background(), subID, resourceGroup, snapshotID, intervel, timeout)
err := d.waitForSnapshotReady(context.Background(), subID, resourceGroup, snapshotID, intervel, timeout)

wantErr := true
subErrMsg := "timeout"
if (err != nil) != wantErr {
t.Errorf("waitForSnapshotCopy() error = %v, wantErr %v", err, wantErr)
t.Errorf("waitForSnapshotReady() error = %v, wantErr %v", err, wantErr)
}
if err != nil && !strings.Contains(err.Error(), subErrMsg) {
t.Errorf("waitForSnapshotCopy() error = %v, wantErr %v", err, subErrMsg)
t.Errorf("waitForSnapshotReady() error = %v, wantErr %v", err, subErrMsg)
}
},
},
Expand Down Expand Up @@ -364,15 +364,15 @@ func TestWaitForSnapshotCopy(t *testing.T) {
mockSnapshotClient := mocksnapshotclient.NewMockInterface(ctrl)
d.getCloud().SnapshotsClient = mockSnapshotClient
mockSnapshotClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(snapshot, nil).AnyTimes()
err := d.waitForSnapshotCopy(context.Background(), subID, resourceGroup, snapshotID, intervel, timeout)
err := d.waitForSnapshotReady(context.Background(), subID, resourceGroup, snapshotID, intervel, timeout)

wantErr := false
subErrMsg := ""
if (err != nil) != wantErr {
t.Errorf("waitForSnapshotCopy() error = %v, wantErr %v", err, wantErr)
t.Errorf("waitForSnapshotReady() error = %v, wantErr %v", err, wantErr)
}
if err != nil && !strings.Contains(err.Error(), subErrMsg) {
t.Errorf("waitForSnapshotCopy() error = %v, wantErr %v", err, subErrMsg)
t.Errorf("waitForSnapshotReady() error = %v, wantErr %v", err, subErrMsg)
}
},
},
Expand Down
20 changes: 15 additions & 5 deletions pkg/azuredisk/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ import (
)

const (
waitForSnapshotCopyInterval = 5 * time.Second
waitForSnapshotCopyTimeout = 10 * time.Minute
waitForSnapshotReadyInterval = 5 * time.Second
waitForSnapshotReadyTimeout = 10 * time.Minute
)

// listVolumeStatus explains the return status of `listVolumesByResourceGroup`
Expand Down Expand Up @@ -916,7 +916,11 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ
}
}

mc := metrics.NewMetricContext(consts.AzureDiskCSIDriverName, "controller_create_snapshot", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
metricsRequest := "controller_create_snapshot"
if crossRegionSnapshotName != "" {
metricsRequest = "controller_create_snapshot_cross_region"
}
mc := metrics.NewMetricContext(consts.AzureDiskCSIDriverName, metricsRequest, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
isOperationSucceeded := false
defer func() {
mc.ObserveOperationWithResult(isOperationSucceeded, consts.SourceResourceID, sourceVolumeID, consts.SnapshotName, snapshotName)
Expand All @@ -931,6 +935,12 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ
azureutils.SleepIfThrottled(rerr.Error(), consts.SnapshotOpThrottlingSleepSec)
return nil, status.Error(codes.Internal, fmt.Sprintf("create snapshot error: %v", rerr.Error()))
}

if d.shouldWaitForSnapshotReady {
if err := d.waitForSnapshotReady(ctx, subsID, resourceGroup, snapshotName, waitForSnapshotReadyInterval, waitForSnapshotReadyTimeout); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("waitForSnapshotReady(%s, %s, %s) failed with %v", subsID, resourceGroup, snapshotName, err))
}
}
klog.V(2).Infof("create snapshot(%s) under rg(%s) region(%s) successfully", snapshotName, resourceGroup, d.cloud.Location)

csiSnapshot, err := d.getSnapshotByID(ctx, subsID, resourceGroup, snapshotName, sourceVolumeID)
Expand Down Expand Up @@ -961,8 +971,8 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ
}
klog.V(2).Infof("create snapshot(%s) under rg(%s) region(%s) successfully", crossRegionSnapshotName, resourceGroup, location)

if err := d.waitForSnapshotCopy(ctx, subsID, resourceGroup, crossRegionSnapshotName, waitForSnapshotCopyInterval, waitForSnapshotCopyTimeout); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to wait for snapshot copy cross region: %v", err.Error()))
if err := d.waitForSnapshotReady(ctx, subsID, resourceGroup, crossRegionSnapshotName, waitForSnapshotReadyInterval, waitForSnapshotReadyTimeout); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("waitForSnapshotReady(%s, %s, %s) failed with %v", subsID, resourceGroup, crossRegionSnapshotName, err))
}

klog.V(2).Infof("begin to delete snapshot(%s) under rg(%s) region(%s)", snapshotName, resourceGroup, d.cloud.Location)
Expand Down
2 changes: 1 addition & 1 deletion pkg/azuredisk/controllerserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,7 @@ func TestCreateSnapshot(t *testing.T) {
mockSnapshotClient.EXPECT().CreateOrUpdate(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
mockSnapshotClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(snapshot, rerr).AnyTimes()
_, err := d.CreateSnapshot(context.Background(), req)
expectedErr := status.Errorf(codes.Internal, "get snapshot unit-test from rg(rg) error: Retriable: false, RetryAfter: 0s, HTTPStatusCode: 0, RawError: get snapshot error")
expectedErr := status.Errorf(codes.Internal, "waitForSnapshotReady(, rg, unit-test) failed with Retriable: false, RetryAfter: 0s, HTTPStatusCode: 0, RawError: get snapshot error")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("actualErr: (%v), expectedErr: (%v)", err, expectedErr)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/azuredisk/controllerserver_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,9 @@ func (d *DriverV2) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRe
azureutils.SleepIfThrottled(rerr.Error(), consts.SnapshotOpThrottlingSleepSec)
return nil, status.Error(codes.Internal, fmt.Sprintf("create snapshot error: %v", rerr.Error()))
}
if err := d.waitForSnapshotReady(ctx, subsID, resourceGroup, snapshotName, waitForSnapshotReadyInterval, waitForSnapshotReadyTimeout); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("waitForSnapshotReady(%s, %s, %s) failed with %v", subsID, resourceGroup, snapshotName, err))
}
klog.V(2).Infof("create snapshot(%s) under rg(%s) successfully", snapshotName, resourceGroup)

csiSnapshot, err := d.getSnapshotByID(ctx, subsID, resourceGroup, snapshotName, sourceVolumeID)
Expand Down
3 changes: 2 additions & 1 deletion pkg/azuredisk/fake_azuredisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type FakeDriver interface {
checkDiskCapacity(context.Context, string, string, string, int) (bool, error)
checkDiskExists(ctx context.Context, diskURI string) (*compute.Disk, error)
getSnapshotInfo(string) (string, string, string, error)
waitForSnapshotCopy(context.Context, string, string, string, time.Duration, time.Duration) error
waitForSnapshotReady(context.Context, string, string, string, time.Duration, time.Duration) error
getSnapshotByID(context.Context, string, string, string, string) (*csi.Snapshot, error)
ensureMountPoint(string) (bool, error)
ensureBlockTargetFile(string) error
Expand All @@ -112,6 +112,7 @@ func newFakeDriverV1(t *testing.T) (*fakeDriverV1, error) {
driver.hostUtil = azureutils.NewFakeHostUtil()
driver.useCSIProxyGAInterface = true
driver.allowEmptyCloudConfig = true
driver.shouldWaitForSnapshotReady = true

ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
2 changes: 2 additions & 0 deletions pkg/azurediskplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
trafficManagerPort = flag.Int64("traffic-manager-port", 7788, "default traffic manager port")
enableWindowsHostProcess = flag.Bool("enable-windows-host-process", false, "enable windows host process")
enableOtelTracing = flag.Bool("enable-otel-tracing", false, "If set, enable opentelemetry tracing for the driver. The tracing is disabled by default. Configure the exporter endpoint with OTEL_EXPORTER_OTLP_ENDPOINT and other env variables, see https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration.")
waitForSnapshotReady = flag.Bool("wait-for-snapshot-ready", true, "boolean flag to wait for snapshot ready when creating snapshot in same region")
)

func main() {
Expand Down Expand Up @@ -133,6 +134,7 @@ func handle() {
EnableWindowsHostProcess: *enableWindowsHostProcess,
GetNodeIDFromIMDS: *getNodeIDFromIMDS,
EnableOtelTracing: *enableOtelTracing,
WaitForSnapshotReady: *waitForSnapshotReady,
}
driver := azuredisk.NewDriver(&driverOptions)
if driver == nil {
Expand Down

0 comments on commit 781ebc6

Please sign in to comment.