diff --git a/Dockerfile b/Dockerfile
index 2a33ac7e..e55edf9b 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -32,8 +32,8 @@ RUN make
 
 FROM ubuntu:20.04 as builder
 RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y libdevmapper-dev libjson-c-dev wget pkg-config build-essential
-RUN wget https://go.dev/dl/go1.17.6.linux-amd64.tar.gz
-RUN tar -C /usr/local -xzf go1.17.6.linux-amd64.tar.gz
+RUN wget https://go.dev/dl/go1.18.4.linux-amd64.tar.gz
+RUN tar -C /usr/local -xzf go1.18.4.linux-amd64.tar.gz
 ENV PATH=${PATH}:/usr/local/go/bin
 
 COPY --from=lib-builder /cryptsetup/.libs/libcryptsetup.so /usr/lib/x86_64-linux-gnu/libcryptsetup.so
diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go
index 92941a5d..ca89c9ae 100644
--- a/pkg/gce-pd-csi-driver/controller.go
+++ b/pkg/gce-pd-csi-driver/controller.go
@@ -39,8 +39,8 @@ import (
 )
 
 const (
-	nodeBackoffInitialDuration = 200 * time.Millisecond
-	nodeBackoffMaxDuration     = 5 * time.Minute
+	errorBackoffInitialDuration = 200 * time.Millisecond
+	errorBackoffMaxDuration     = 5 * time.Minute
 )
 
 type GCEControllerServer struct {
@@ -58,11 +58,46 @@ type GCEControllerServer struct {
 	// Aborted error
 	volumeLocks *common.VolumeLocks
 
-	// When the attacher sidecar issues controller publish/unpublish for multiple disks for a given node, the per-instance operation queue in GCE fills up causing attach/detach disk requests to immediately return with an error until the queue drains. nodeBackoff keeps track of any active backoff condition on a given node, and the time when retry of controller publish/unpublish is permissible. A node is marked with backoff when any error is encountered by the driver during controller publish/unpublish calls.
-	// If the controller eventually allows controller publish/publish requests for volumes (because the backoff time expired), and those requests fail, the next backoff retry time will be updated on every failure and capped at 'nodeBackoffMaxDuration'. Also, any successful controller publish/unpublish call will clear the backoff condition for the node.
-	nodeBackoff *flowcontrol.Backoff
+	// There are several kinds of errors that are immediately retried by either
+	// the CSI sidecars or the k8s control plane. The retries consume GCP API
+	// quota, e.g. by doing ListVolumes, and so backoff needs to be used to
+	// prevent quota exhaustion.
+	//
+	// Examples of these errors are the per-instance GCE operation queue getting
+	// full (typically only 32 operations in flight at a time are allowed), and
+	// disks being deleted out from under a PV causing unpublish errors.
+	//
+	// While we need to back off, we also need some semblance of fairness. In
+	// particular, volume unpublish retries happen very quickly, and with
+	// a single backoff per node these retries can prevent any other operation
+	// from making progress, even if it would succeed. Hence we track errors on
+	// node and disk pairs, backing off only for calls matching such a
+	// pair.
+	//
+	// An implication is that in the full operation queue situation, requests
+	// for new disks will not back off the first time. This is acceptable as a
+	// single spurious call will not cause problems for quota exhaustion or make
+	// the operation queue problem worse. This is well compensated by giving
+	// disks where no problems are occurring a chance to be processed.
+	//
+	// errorBackoff keeps track of any active backoff condition on a given node,
+	// and the time when retry of controller publish/unpublish is permissible. A
+	// node and disk pair is marked with backoff when any error is encountered
+	// by the driver during controller publish/unpublish calls. If the
+	// controller eventually allows controller publish/unpublish requests for
+	// volumes (because the backoff time expired), and those requests fail, the
+	// next backoff retry time will be updated on every failure and capped at
+	// 'errorBackoffMaxDuration'. Also, any successful controller
+	// publish/unpublish call will clear the backoff condition for a node and
+	// disk.
+	errorBackoff *csiErrorBackoff
 }
 
+type csiErrorBackoff struct {
+	backoff *flowcontrol.Backoff
+}
+type csiErrorBackoffId string
+
 type workItem struct {
 	ctx        context.Context
 	publishReq *csi.ControllerPublishVolumeRequest
@@ -376,17 +411,18 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
 		return nil, err
 	}
 
-	if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
+	backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId)
+	if gceCS.errorBackoff.blocking(backoffId) {
 		return nil, status.Errorf(codes.Unavailable, "ControllerPublish not permitted on node %q due to backoff condition", req.NodeId)
 	}
 
 	resp, err := gceCS.executeControllerPublishVolume(ctx, req)
 	if err != nil {
-		klog.Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId)
-		gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now())
+		klog.Infof("For node %s adding backoff due to error for volume %s: %v", req.NodeId, req.VolumeId, err)
+		gceCS.errorBackoff.next(backoffId)
 	} else {
 		klog.Infof("For node %s clear backoff due to successful publish of volume %v", req.NodeId, req.VolumeId)
-		gceCS.nodeBackoff.Reset(req.NodeId)
+		gceCS.errorBackoff.reset(backoffId)
 	}
 	return resp, err
 }
@@ -513,17 +549,18 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
 		return nil, err
 	}
 
-	if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
+	backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId)
+	if gceCS.errorBackoff.blocking(backoffId) {
 		return nil, status.Errorf(codes.Unavailable, "ControllerUnpublish not permitted on node %q due to backoff condition", req.NodeId)
 	}
 
 	resp, err := gceCS.executeControllerUnpublishVolume(ctx, req)
 	if err != nil {
 		klog.Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId)
-		gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now())
+		gceCS.errorBackoff.next(backoffId)
 	} else {
 		klog.Infof("For node %s clear backoff due to successful unpublish of volume %v", req.NodeId, req.VolumeId)
-		gceCS.nodeBackoff.Reset(req.NodeId)
+		gceCS.errorBackoff.reset(backoffId)
 	}
 	return resp, err
 }
@@ -1560,3 +1597,24 @@ func pickRandAndConsecutive(slice []string, n int) ([]string, error) {
 	}
 	return ret, nil
 }
+
+func newCsiErrorBackoff() *csiErrorBackoff {
+	return &csiErrorBackoff{flowcontrol.NewBackOff(errorBackoffInitialDuration, errorBackoffMaxDuration)}
+}
+
+func (_ *csiErrorBackoff) backoffId(nodeId, volumeId string) csiErrorBackoffId {
+	return csiErrorBackoffId(fmt.Sprintf("%s:%s", nodeId, volumeId))
+}
+
+func (b *csiErrorBackoff) blocking(id csiErrorBackoffId) bool {
+	blk := b.backoff.IsInBackOffSinceUpdate(string(id), b.backoff.Clock.Now())
+	return blk
+}
+
+func (b *csiErrorBackoff) next(id csiErrorBackoffId) {
+	b.backoff.Next(string(id), b.backoff.Clock.Now())
+}
+
+func (b *csiErrorBackoff) reset(id csiErrorBackoffId) {
+	b.backoff.Reset(string(id))
+}
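
The new csiErrorBackoff is a thin wrapper over client-go's flowcontrol.Backoff, keyed by a "node:volume" string instead of the node alone. A minimal standalone sketch of the keying behavior this change relies on (the IDs and durations below are illustrative, not taken from the driver):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	b := flowcontrol.NewBackOff(200*time.Millisecond, 5*time.Minute)

	// Record a failure for one (node, volume) pair, as next() does.
	b.Next("node-1:vol-a", b.Clock.Now())

	// Only the failing pair is blocked; a different volume on the same node
	// is unaffected, which is the fairness property the comment describes.
	fmt.Println(b.IsInBackOffSinceUpdate("node-1:vol-a", b.Clock.Now())) // true
	fmt.Println(b.IsInBackOffSinceUpdate("node-1:vol-b", b.Clock.Now())) // false

	// A success clears the pair's backoff, as reset() does.
	b.Reset("node-1:vol-a")
	fmt.Println(b.Get("node-1:vol-a")) // 0s
}
```
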
diff --git a/pkg/gce-pd-csi-driver/controller_test.go b/pkg/gce-pd-csi-driver/controller_test.go
index d92d390b..ecf2915b 100644
--- a/pkg/gce-pd-csi-driver/controller_test.go
+++ b/pkg/gce-pd-csi-driver/controller_test.go
@@ -2149,18 +2149,24 @@ type backoffTesterConfig struct {
 	mockMissingInstance bool // used by the backoff tester to mock a missing instance scenario
 }
 
+func newFakeCsiErrorBackoff(tc *clock.FakeClock) *csiErrorBackoff {
+	return &csiErrorBackoff{flowcontrol.NewFakeBackOff(errorBackoffInitialDuration, errorBackoffMaxDuration, tc)}
+}
+
 func TestControllerUnpublishBackoff(t *testing.T) {
+	backoffTesterForUnpublish(t, &backoffTesterConfig{})
+}
+
+func TestControllerUnpublishBackoffMissingInstance(t *testing.T) {
 	backoffTesterForUnpublish(t, &backoffTesterConfig{
 		mockMissingInstance: true,
 	})
-	backoffTesterForUnpublish(t, &backoffTesterConfig{})
 }
 
 func backoffTesterForUnpublish(t *testing.T, config *backoffTesterConfig) {
-	readyToExecute := make(chan chan gce.Signal, 1)
-	disk1 := name + "1"
+	readyToExecute := make(chan chan gce.Signal)
 	cloudDisks := []*gce.CloudDisk{
-		createZonalCloudDisk(disk1),
+		createZonalCloudDisk(name),
 	}
 	fcp, err := gce.CreateFakeCloudProvider(project, zone, cloudDisks)
 	if err != nil {
@@ -2173,7 +2179,7 @@ func backoffTesterForUnpublish(t *testing.T, config *backoffTesterConfig) {
 	instance := &compute.Instance{
 		Name: node,
 		Disks: []*compute.AttachedDisk{
-			{DeviceName: disk1}, // mock attached disks
+			{DeviceName: name}, // mock attached disks
 		},
 	}
 	if !config.mockMissingInstance {
@@ -2187,43 +2193,62 @@ func backoffTesterForUnpublish(t *testing.T, config *backoffTesterConfig) {
 		CloudProvider: fcpBlocking,
 		seen:          map[string]int{},
 		volumeLocks:   common.NewVolumeLocks(),
-		nodeBackoff:   flowcontrol.NewFakeBackOff(nodeBackoffInitialDuration, nodeBackoffMaxDuration, tc),
+		errorBackoff:  newFakeCsiErrorBackoff(tc),
 	}
 
-	key := testNodeID
+	backoffId := driver.cs.errorBackoff.backoffId(testNodeID, testVolumeID)
 	step := 1 * time.Millisecond
-	// Mock an active backoff condition on the node. This will setup a backoff duration of the 'nodeBackoffInitialDuration'.
-	driver.cs.nodeBackoff.Next(key, tc.Now())
+
+	runUnpublishRequest := func(req *csi.ControllerUnpublishVolumeRequest, reportError bool) error {
+		response := make(chan error)
+		go func() {
+			_, err := driver.cs.ControllerUnpublishVolume(context.Background(), req)
+			response <- err
+		}()
+		go func() {
+			executeChan := <-readyToExecute
+			executeChan <- gcecloudprovider.Signal{ReportError: reportError}
+		}()
+		return <-response
+	}
+
+	// Mock an active backoff condition on the node and volume pair.
+	driver.cs.errorBackoff.next(backoffId)
+
+	tc.Step(step)
+	// A request for a different volume should succeed. This volume is not
+	// mounted on the node, so no GCE call will be made (i.e., runUnpublishRequest
+	// doesn't need to be called; the request can be made directly).
+	differentUnpubReq := &csi.ControllerUnpublishVolumeRequest{
+		VolumeId: testVolumeID + "-different",
+		NodeId:   testNodeID,
+	}
+	if _, err := driver.cs.ControllerUnpublishVolume(context.Background(), differentUnpubReq); err != nil {
+		t.Errorf("expected no error on different unpublish, got %v", err)
+	}
+
 	unpubreq := &csi.ControllerUnpublishVolumeRequest{
-		VolumeId: testVolumeID + "1",
+		VolumeId: testVolumeID,
 		NodeId:   testNodeID,
 	}
 	// For the first 199 ms, the backoff condition is true. All controller publish request will be denied with 'Unavailable' error code.
 	for i := 0; i < 199; i++ {
-		tc.Step(step)
 		var err error
 		_, err = driver.cs.ControllerUnpublishVolume(context.Background(), unpubreq)
 		if !isUnavailableError(err) {
 			t.Errorf("unexpected error %v", err)
 		}
+		tc.Step(step)
 	}
 
-	// Mock clock tick for the 200th millisecond. So backoff condition is no longer true.
-	tc.Step(step)
-	runUnpublishRequest := func(req *csi.ControllerUnpublishVolumeRequest) <-chan error {
-		response := make(chan error)
-		go func() {
-			_, err := driver.cs.ControllerUnpublishVolume(context.Background(), req)
-			response <- err
-		}()
-		return response
-	}
-
-	// For a missing instance the driver should return a success code, and the node backoff condition should be cleared.
+	// At the 200th millisecond, the backoff condition is no longer true. The driver should return a success code, and the backoff condition should be cleared.
 	if config.mockMissingInstance {
 		_, err = driver.cs.ControllerUnpublishVolume(context.Background(), unpubreq)
+		if err != nil {
+			t.Errorf("unexpected error %v", err)
+		}
 		// Driver is expected to remove the node key from the backoff map.
-		t1 := driver.cs.nodeBackoff.Get(key)
+		t1 := driver.cs.errorBackoff.backoff.Get(string(backoffId))
 		if t1 != 0 {
 			t.Error("unexpected delay")
 		}
@@ -2231,12 +2256,7 @@ func backoffTesterForUnpublish(t *testing.T, config *backoffTesterConfig) {
 	}
 
 	// mock an error
-	var respUnpublish <-chan error
-	respUnpublish = runUnpublishRequest(unpubreq)
-	execute := <-readyToExecute
-	s1 := gcecloudprovider.Signal{ReportError: true}
-	execute <- s1
-	if err := <-respUnpublish; err == nil {
+	if err := runUnpublishRequest(unpubreq, true); err == nil {
 		t.Errorf("expected error")
 	}
 
@@ -2254,33 +2274,31 @@ func backoffTesterForUnpublish(t *testing.T, config *backoffTesterConfig) {
 	// Mock clock tick for the 600th millisecond. So backoff condition is no longer true.
 	tc.Step(step)
 	// Now mock a successful ControllerUnpublish request, where DetachDisk call succeeds.
-	respUnpublish = runUnpublishRequest(unpubreq)
-	execute = <-readyToExecute
-	s1 = gcecloudprovider.Signal{}
-	execute <- s1
-	if err := <-respUnpublish; err != nil {
+	if err := runUnpublishRequest(unpubreq, false); err != nil {
 		t.Errorf("unexpected error")
 	}
 	// Driver is expected to remove the node key from the backoff map.
-	t1 := driver.cs.nodeBackoff.Get(key)
+	t1 := driver.cs.errorBackoff.backoff.Get(string(backoffId))
 	if t1 != 0 {
 		t.Error("unexpected delay")
 	}
 }
 
 func TestControllerPublishBackoff(t *testing.T) {
+	backoffTesterForPublish(t, &backoffTesterConfig{})
+}
+
+func TestControllerPublishBackoffMissingInstance(t *testing.T) {
 	backoffTesterForPublish(t, &backoffTesterConfig{
 		mockMissingInstance: true,
 	})
-	backoffTesterForPublish(t, &backoffTesterConfig{})
 }
 
 func backoffTesterForPublish(t *testing.T, config *backoffTesterConfig) {
-	readyToExecute := make(chan chan gce.Signal, 1)
-	disk1 := name + "1"
+	readyToExecute := make(chan chan gce.Signal)
 	cloudDisks := []*gce.CloudDisk{
-		createZonalCloudDisk(disk1),
+		createZonalCloudDisk(name),
 	}
 	fcp, err := gce.CreateFakeCloudProvider(project, zone, cloudDisks)
 	if err != nil {
@@ -2305,15 +2323,24 @@ func backoffTesterForPublish(t *testing.T, config *backoffTesterConfig) {
 		CloudProvider: fcpBlocking,
 		seen:          map[string]int{},
 		volumeLocks:   common.NewVolumeLocks(),
-		nodeBackoff:   flowcontrol.NewFakeBackOff(nodeBackoffInitialDuration, nodeBackoffMaxDuration, tc),
+		errorBackoff:  newFakeCsiErrorBackoff(tc),
 	}
 
-	key := testNodeID
+	backoffId := driver.cs.errorBackoff.backoffId(testNodeID, testVolumeID)
 	step := 1 * time.Millisecond
-	// Mock an active backoff condition on the node. This will setup a backoff duration of the 'nodeBackoffInitialDuration'.
-	driver.cs.nodeBackoff.Next(key, tc.Now())
+	// Mock an active backoff condition on the node and volume pair.
+	driver.cs.errorBackoff.next(backoffId)
+
+	// A detach request for a different disk should succeed. As this disk is not
+	// on the instance, the detach will succeed without calling the GCE detach
+	// disk API, so we don't have to go through the blocking cloud provider and
+	// can make the request directly.
+	if _, err := driver.cs.ControllerUnpublishVolume(context.Background(), &csi.ControllerUnpublishVolumeRequest{VolumeId: testVolumeID + "different", NodeId: testNodeID}); err != nil {
+		t.Errorf("expected no error on different unpublish, got %v", err)
+	}
+
 	pubreq := &csi.ControllerPublishVolumeRequest{
-		VolumeId: testVolumeID + "1",
+		VolumeId: testVolumeID,
 		NodeId:   testNodeID,
 		VolumeCapability: &csi.VolumeCapability{
 			AccessType: &csi.VolumeCapability_Mount{
@@ -2336,23 +2363,27 @@ func backoffTesterForPublish(t *testing.T, config *backoffTesterConfig) {
 	// Mock clock tick for the 200th millisecond. So backoff condition is no longer true.
 	tc.Step(step)
-	runPublishRequest := func(req *csi.ControllerPublishVolumeRequest) <-chan error {
+	runPublishRequest := func(req *csi.ControllerPublishVolumeRequest, reportError bool) error {
 		response := make(chan error)
 		go func() {
 			_, err := driver.cs.ControllerPublishVolume(context.Background(), req)
 			response <- err
 		}()
-		return response
+		go func() {
+			executeChan := <-readyToExecute
+			executeChan <- gcecloudprovider.Signal{ReportError: reportError}
+		}()
+		return <-response
 	}
 
-	// For a missing instance the driver should return error code, and the node backoff condition should be set.
+	// For a missing instance the driver should return an error code, and the backoff condition should be set.
 	if config.mockMissingInstance {
 		_, err = driver.cs.ControllerPublishVolume(context.Background(), pubreq)
 		if err == nil {
 			t.Errorf("unexpected error %v", err)
 		}
-		t1 := driver.cs.nodeBackoff.Get(key)
+		t1 := driver.cs.errorBackoff.backoff.Get(string(backoffId))
 		if t1 == 0 {
 			t.Error("expected delay, got none")
 		}
@@ -2360,12 +2391,7 @@ func backoffTesterForPublish(t *testing.T, config *backoffTesterConfig) {
 	}
 
 	// mock an error
-	var respPublish <-chan error
-	respPublish = runPublishRequest(pubreq)
-	execute := <-readyToExecute
-	s1 := gcecloudprovider.Signal{ReportError: true}
-	execute <- s1
-	if err := <-respPublish; err == nil {
+	if err := runPublishRequest(pubreq, true); err == nil {
 		t.Errorf("expected error")
 	}
 
@@ -2383,16 +2409,12 @@ func backoffTesterForPublish(t *testing.T, config *backoffTesterConfig) {
 	// Mock clock tick for the 600th millisecond. So backoff condition is no longer true.
 	tc.Step(step)
 	// Now mock a successful ControllerUnpublish request, where DetachDisk call succeeds.
-	respPublish = runPublishRequest(pubreq)
-	execute = <-readyToExecute
-	s1 = gcecloudprovider.Signal{}
-	execute <- s1
-	if err := <-respPublish; err != nil {
+	if err := runPublishRequest(pubreq, false); err != nil {
 		t.Errorf("unexpected error")
 	}
 	// Driver is expected to remove the node key from the backoff map.
-	t1 := driver.cs.nodeBackoff.Get(key)
+	t1 := driver.cs.errorBackoff.backoff.Get(string(backoffId))
 	if t1 != 0 {
 		t.Error("unexpected delay")
 	}
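
The tests drive the backoff deterministically with a fake clock rather than sleeping. A minimal sketch of that pattern; the fake clock's import path is an assumption, as it depends on the client-go version in go.mod (recent versions of flowcontrol.NewFakeBackOff take a FakeClock from k8s.io/utils/clock/testing):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
	testingclock "k8s.io/utils/clock/testing"
)

func main() {
	tc := testingclock.NewFakeClock(time.Now())
	b := flowcontrol.NewFakeBackOff(200*time.Millisecond, 5*time.Minute, tc)

	b.Next("node-1:vol-a", tc.Now()) // open a 200ms backoff window

	tc.Step(199 * time.Millisecond) // still inside the window
	fmt.Println(b.IsInBackOffSinceUpdate("node-1:vol-a", tc.Now())) // true

	tc.Step(1 * time.Millisecond) // the 200th millisecond: window closed
	fmt.Println(b.IsInBackOffSinceUpdate("node-1:vol-a", tc.Now())) // false
}
```
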
diff --git a/pkg/gce-pd-csi-driver/gce-pd-driver.go b/pkg/gce-pd-csi-driver/gce-pd-driver.go
index bbcd2989..ad523a4b 100644
--- a/pkg/gce-pd-csi-driver/gce-pd-driver.go
+++ b/pkg/gce-pd-csi-driver/gce-pd-driver.go
@@ -25,7 +25,6 @@ import (
 	csi "github.com/container-storage-interface/spec/lib/go/csi"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	"k8s.io/client-go/util/flowcontrol"
 	"k8s.io/klog"
 	"k8s.io/mount-utils"
 	common "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
@@ -162,8 +161,7 @@ func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute) *GC
 		CloudProvider: cloudProvider,
 		seen:          map[string]int{},
 		volumeLocks:   common.NewVolumeLocks(),
-		// flowcontrol uses an exponential backoff policy with a factor of 2
-		nodeBackoff: flowcontrol.NewBackOff(nodeBackoffInitialDuration, nodeBackoffMaxDuration),
+		errorBackoff: newCsiErrorBackoff(),
 	}
 }
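
The deleted comment in NewControllerServer noted that flowcontrol uses an exponential backoff policy with a factor of 2. A small sketch of that growth under repeated failures on a single (node, volume) pair; the literals mirror errorBackoffInitialDuration and errorBackoffMaxDuration from controller.go:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	b := flowcontrol.NewBackOff(200*time.Millisecond, 5*time.Minute)

	for i := 1; i <= 12; i++ {
		b.Next("node-1:vol-a", b.Clock.Now()) // each failed publish/unpublish
		fmt.Printf("failure %2d: retry delay %v\n", i, b.Get("node-1:vol-a"))
	}
	// Prints 200ms, 400ms, 800ms, ... doubling on each failure until it is
	// capped at 5m0s, after which a successful call (Reset) clears the entry.
}
```
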