Skip to content

Commit

Permalink
Merge pull request #2622 from k8s-infra-cherrypick-robot/cherry-pick-2616-to-release-1.31
Browse files Browse the repository at this point in the history

[release-1.31] fix: add serial format limit to fix OOM issue when formatting a few disks in parallel in csi-azuredisk-node
  • Loading branch information
andyzhangx authored Nov 13, 2024
2 parents abd4c31 + 4ac69ae commit 2c5494b
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 17 deletions.
8 changes: 6 additions & 2 deletions pkg/azuredisk/azuredisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ type DriverCore struct {
removeNotReadyTaint bool
kubeClient kubernetes.Interface
// a timed cache storing volume stats <volumeID, volumeStats>
volStatsCache azcache.Resource
volStatsCache azcache.Resource
maxConcurrentFormat int64
concurrentFormatTimeout int64
}

// Driver is the v1 implementation of the Azure Disk CSI Driver.
Expand Down Expand Up @@ -176,6 +178,8 @@ func newDriverV1(options *DriverOptions) *Driver {
driver.endpoint = options.Endpoint
driver.disableAVSetNodes = options.DisableAVSetNodes
driver.removeNotReadyTaint = options.RemoveNotReadyTaint
driver.maxConcurrentFormat = options.MaxConcurrentFormat
driver.concurrentFormatTimeout = options.ConcurrentFormatTimeout
driver.volumeLocks = volumehelper.NewVolumeLocks()
driver.ioHandler = azureutils.NewOSIOHandler()
driver.hostUtil = hostutil.NewHostUtil()
Expand Down Expand Up @@ -263,7 +267,7 @@ func newDriverV1(options *DriverOptions) *Driver {
}
}

driver.mounter, err = mounter.NewSafeMounter(driver.enableWindowsHostProcess, driver.useCSIProxyGAInterface)
driver.mounter, err = mounter.NewSafeMounter(driver.enableWindowsHostProcess, driver.useCSIProxyGAInterface, int(driver.maxConcurrentFormat), time.Duration(driver.concurrentFormatTimeout)*time.Second)
if err != nil {
klog.Fatalf("Failed to get safe mounter. Error: %v", err)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/azuredisk/azuredisk_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type DriverOptions struct {
Endpoint string
DisableAVSetNodes bool
RemoveNotReadyTaint bool
MaxConcurrentFormat int64
ConcurrentFormatTimeout int64
}

func (o *DriverOptions) AddFlags() *flag.FlagSet {
Expand Down Expand Up @@ -103,6 +105,8 @@ func (o *DriverOptions) AddFlags() *flag.FlagSet {
fs.BoolVar(&o.DisableAVSetNodes, "disable-avset-nodes", false, "disable DisableAvailabilitySetNodes in cloud config for controller")
fs.BoolVar(&o.RemoveNotReadyTaint, "remove-not-ready-taint", true, "remove NotReady taint from node when node is ready")
fs.StringVar(&o.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")
fs.Int64Var(&o.MaxConcurrentFormat, "max-concurrent-format", 2, "maximum number of concurrent format exec calls")
fs.Int64Var(&o.ConcurrentFormatTimeout, "concurrent-format-timeout", 120, "maximum time in seconds duration of a format operation before its concurrency token is released")

return fs
}
3 changes: 2 additions & 1 deletion pkg/azuredisk/azuredisk_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"os"
"reflect"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v6"
"github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -143,7 +144,7 @@ func newDriverV2(options *DriverOptions) *DriverV2 {
}
}

driver.mounter, err = mounter.NewSafeMounter(driver.enableWindowsHostProcess, driver.useCSIProxyGAInterface)
driver.mounter, err = mounter.NewSafeMounter(driver.enableWindowsHostProcess, driver.useCSIProxyGAInterface, int(driver.maxConcurrentFormat), time.Duration(driver.concurrentFormatTimeout)*time.Second)
if err != nil {
klog.Fatalf("Failed to get safe mounter. Error: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/azuredisk/fake_azuredisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func newFakeDriverV1(ctrl *gomock.Controller) (*fakeDriverV1, error) {
driver.diskController = NewManagedDiskController(driver.cloud)
driver.clientFactory = driver.cloud.ComputeClientFactory

mounter, err := mounter.NewSafeMounter(true, driver.useCSIProxyGAInterface)
mounter, err := mounter.NewSafeMounter(true, driver.useCSIProxyGAInterface, int(driver.maxConcurrentFormat), time.Duration(driver.concurrentFormatTimeout)*time.Second)
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/azuredisk/fake_azuredisk_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ limitations under the License.
package azuredisk

import (
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"go.uber.org/mock/gomock"
"k8s.io/client-go/kubernetes/fake"
Expand Down Expand Up @@ -74,7 +76,7 @@ func newFakeDriverV2(ctrl *gomock.Controller) (*fakeDriverV2, error) {
driver.diskController = NewManagedDiskController(driver.cloud)
driver.clientFactory = driver.cloud.ComputeClientFactory

mounter, err := mounter.NewSafeMounter(true, driver.useCSIProxyGAInterface)
mounter, err := mounter.NewSafeMounter(true, driver.useCSIProxyGAInterface, int(driver.maxConcurrentFormat), time.Duration(driver.concurrentFormatTimeout)*time.Second)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/mounter/fake_safe_mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"runtime"
"strings"
"time"

"k8s.io/mount-utils"
"k8s.io/utils/exec"
Expand All @@ -35,7 +36,7 @@ type FakeSafeMounter struct {
// NewFakeSafeMounter creates a mount.SafeFormatAndMount instance suitable for use in unit tests.
func NewFakeSafeMounter() (*mount.SafeFormatAndMount, error) {
if runtime.GOOS == "windows" {
return NewSafeMounter(true, true)
return NewSafeMounter(true, true, 2, time.Duration(120)*time.Second)
}

fakeSafeMounter := FakeSafeMounter{}
Expand Down
10 changes: 5 additions & 5 deletions pkg/mounter/safe_mounter_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ limitations under the License.
package mounter

import (
"time"

"k8s.io/mount-utils"
utilexec "k8s.io/utils/exec"
)

func NewSafeMounter(_, _ bool) (*mount.SafeFormatAndMount, error) {
return &mount.SafeFormatAndMount{
Interface: mount.New(""),
Exec: utilexec.New(),
}, nil
// NewSafeMounter returns a mount.SafeFormatAndMount built from the default
// mounter (mount.New("")) and the default exec implementation
// (utilexec.New()), configured with the option produced by
// mount.WithMaxConcurrentFormat(maxConcurrentFormat, concurrentFormatTimeout).
//
// The two leading bool parameters are ignored by this implementation.
// The returned error is always nil.
func NewSafeMounter(_, _ bool, maxConcurrentFormat int, concurrentFormatTimeout time.Duration) (*mount.SafeFormatAndMount, error) {
	return mount.NewSafeFormatAndMount(
		mount.New(""),
		utilexec.New(),
		mount.WithMaxConcurrentFormat(maxConcurrentFormat, concurrentFormatTimeout),
	), nil
}
3 changes: 2 additions & 1 deletion pkg/mounter/safe_mounter_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ package mounter

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNewSafeMounter(t *testing.T) {
resp, err := NewSafeMounter(true, true)
resp, err := NewSafeMounter(true, true, 2, time.Duration(120)*time.Second)
assert.NotNil(t, resp)
assert.Nil(t, err)
}
9 changes: 4 additions & 5 deletions pkg/mounter/safe_mounter_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"strconv"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
disk "github.com/kubernetes-csi/csi-proxy/client/api/disk/v1"
Expand Down Expand Up @@ -411,13 +412,11 @@ func newCSIProxyMounter() (*csiProxyMounter, error) {
}, nil
}

func NewSafeMounter(enableWindowsHostProcess, useCSIProxyGAInterface bool) (*mount.SafeFormatAndMount, error) {
func NewSafeMounter(enableWindowsHostProcess, useCSIProxyGAInterface bool, maxConcurrentFormat int, concurrentFormatTimeout time.Duration) (*mount.SafeFormatAndMount, error) {
if enableWindowsHostProcess {
klog.V(2).Infof("using windows host process mounter")
return &mount.SafeFormatAndMount{
Interface: NewWinMounter(),
Exec: utilexec.New(),
}, nil
opt := mount.WithMaxConcurrentFormat(maxConcurrentFormat, concurrentFormatTimeout)
return mount.NewSafeFormatAndMount(NewWinMounter(), utilexec.New(), opt), nil
} else {
if useCSIProxyGAInterface {
csiProxyMounter, err := newCSIProxyMounter()
Expand Down

0 comments on commit 2c5494b

Please sign in to comment.