From c80b6c61af762859701fcbeac4a7879adec7bccd Mon Sep 17 00:00:00 2001
From: reus
Date: Fri, 21 Feb 2025 14:39:16 +0800
Subject: [PATCH] Reapply "malloc: refactor mmap allocator (#21332)" (#21406)

This reverts commit d877a55deb0d39b69140365701ff7248a09aa5a0.
---
 pkg/common/malloc/config.go                   |  14 +-
 .../malloc/fixed_size_mmap_allocator.go       | 241 ++++++++++++------
 .../malloc/fixed_size_mmap_allocator_test.go  | 105 ++++++++
 pkg/common/malloc/mmap_darwin.go              |  24 +-
 pkg/common/malloc/mmap_linux.go               |  18 +-
 5 files changed, 305 insertions(+), 97 deletions(-)
 create mode 100644 pkg/common/malloc/fixed_size_mmap_allocator_test.go

diff --git a/pkg/common/malloc/config.go b/pkg/common/malloc/config.go
index ef83a0e645ecb..a4db4d742cc12 100644
--- a/pkg/common/malloc/config.go
+++ b/pkg/common/malloc/config.go
@@ -60,27 +60,27 @@ var defaultConfig = func() *atomic.Pointer[Config] {
 func patchConfig(config Config, delta Config) Config {
     if delta.CheckFraction != nil {
         config.CheckFraction = delta.CheckFraction
-        logutil.Info("malloc set config", zap.Any("CheckFraction", *delta.CheckFraction))
+        logutil.Debug("malloc set config", zap.Any("CheckFraction", *delta.CheckFraction))
     }
     if delta.EnableMetrics != nil {
         config.EnableMetrics = delta.EnableMetrics
-        logutil.Info("malloc set config", zap.Any("EnableMetrics", *delta.EnableMetrics))
+        logutil.Debug("malloc set config", zap.Any("EnableMetrics", *delta.EnableMetrics))
     }
     if delta.FullStackFraction != nil {
         config.FullStackFraction = delta.FullStackFraction
-        logutil.Info("malloc set config", zap.Any("FullStackFraction", *delta.FullStackFraction))
+        logutil.Debug("malloc set config", zap.Any("FullStackFraction", *delta.FullStackFraction))
     }
     if delta.Allocator != nil {
         config.Allocator = delta.Allocator
-        logutil.Info("malloc set config", zap.Any("Allocator", *delta.Allocator))
+        logutil.Debug("malloc set config", zap.Any("Allocator", *delta.Allocator))
     }
     if delta.HashmapSoftLimit != nil {
         config.HashmapSoftLimit = delta.HashmapSoftLimit
-        logutil.Info("malloc set config", zap.Any("HashmapSoftLimit", *delta.HashmapSoftLimit))
+        logutil.Debug("malloc set config", zap.Any("HashmapSoftLimit", *delta.HashmapSoftLimit))
     }
     if delta.HashmapHardLimit != nil {
         config.HashmapHardLimit = delta.HashmapHardLimit
-        logutil.Info("malloc set config", zap.Any("HashmapHardLimit", *delta.HashmapHardLimit))
+        logutil.Debug("malloc set config", zap.Any("HashmapHardLimit", *delta.HashmapHardLimit))
     }
     return config
 }
@@ -89,7 +89,7 @@ func SetDefaultConfig(delta Config) {
     config := *defaultConfig.Load()
     config = patchConfig(config, delta)
     defaultConfig.Store(&config)
-    logutil.Info("malloc: set default config", zap.Any("config", delta))
+    logutil.Debug("malloc: set default config", zap.Any("config", delta))
 }

 func GetDefaultConfig() Config {
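The config.go hunk only lowers these log calls from Info to Debug; the update path itself is unchanged: the default config sits behind an atomic.Pointer[Config], and SetDefaultConfig loads the current value, patches the non-nil fields of the delta, and publishes a fresh pointer. A minimal sketch of that copy-on-write pattern, with demoConfig and setDefaultDemoConfig as hypothetical stand-ins for the real types:

package main

import (
    "fmt"
    "sync/atomic"
)

// demoConfig is a hypothetical one-field stand-in for malloc.Config;
// only non-nil fields of a delta are applied, mirroring patchConfig.
type demoConfig struct {
    EnableMetrics *bool
}

var defaultDemoConfig atomic.Pointer[demoConfig]

// setDefaultDemoConfig mirrors SetDefaultConfig: load the current value,
// copy it, patch the copy, then publish the new pointer atomically.
func setDefaultDemoConfig(delta demoConfig) {
    config := *defaultDemoConfig.Load()
    if delta.EnableMetrics != nil {
        config.EnableMetrics = delta.EnableMetrics
    }
    defaultDemoConfig.Store(&config)
}

func main() {
    defaultDemoConfig.Store(&demoConfig{})

    enabled := true
    setDefaultDemoConfig(demoConfig{EnableMetrics: &enabled})

    fmt.Println(*defaultDemoConfig.Load().EnableMetrics) // true
}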
diff --git a/pkg/common/malloc/fixed_size_mmap_allocator.go b/pkg/common/malloc/fixed_size_mmap_allocator.go
index 940de36115523..5fabd4dedbe03 100644
--- a/pkg/common/malloc/fixed_size_mmap_allocator.go
+++ b/pkg/common/malloc/fixed_size_mmap_allocator.go
@@ -15,33 +15,31 @@ package malloc

 import (
+    "math/bits"
+    "slices"
+    "sync"
+    "sync/atomic"
     "unsafe"

     "golang.org/x/sys/unix"
 )

-const (
-    // Classes with smaller size than smallClassCap will buffer min((smallClassCap/size), maxBuffer1Cap) objects in buffer1
-    smallClassCap = 1 * MB
-    maxBuffer1Cap = 256
-
-    // objects in buffer2 will be MADV_DONTNEED-advised and will not occupy RSS, so it's safe to use a large number
-    buffer2Cap = 1024
-)
-
 type fixedSizeMmapAllocator struct {
     size uint64
-    // buffer1 buffers objects
-    buffer1 chan unsafe.Pointer
-    // buffer2 buffers MADV_DONTNEED objects
-    buffer2 chan unsafe.Pointer
+
+    mu              sync.Mutex // it's OK to use mutex since this allocator will be in ShardedAllocator
+    activeSlabs     []*_Slab   // active slabs
+    maxActiveSlabs  int        // max active slabs
+    standbySlabs    []*_Slab   // slabs still mapped but no physical memory backed
+    maxStandbySlabs int        // max standby slabs

     deallocatorPool *ClosureDeallocatorPool[fixedSizeMmapDeallocatorArgs, *fixedSizeMmapDeallocatorArgs]
 }

 type fixedSizeMmapDeallocatorArgs struct {
-    length uint64
+    slab   *_Slab
     ptr    unsafe.Pointer
+    length uint64
 }

 func (f fixedSizeMmapDeallocatorArgs) As(trait Trait) bool {
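The refactor replaces the two buffering channels with two tiers of slabs: activeSlabs back live allocations, while standbySlabs stay mapped but are not backed by physical memory, so they cost address space rather than RSS. The freeMem/reuseMem helpers this relies on are part of the mmap_linux.go and mmap_darwin.go changes listed in the diffstat but not shown here; the sketch below only illustrates the underlying Linux behavior (MADV_DONTNEED on an anonymous private mapping) with hypothetical names:

//go:build linux

package main

import (
    "fmt"

    "golang.org/x/sys/unix"
)

// Illustrative only: what "standby" means for a slab on Linux. The mapping
// stays reserved, but MADV_DONTNEED lets the kernel reclaim the physical
// pages; the next access faults in fresh zero-filled pages.
func main() {
    const slabBytes = 1 << 20 // hypothetical slab size

    mem, err := unix.Mmap(
        -1, 0, slabBytes,
        unix.PROT_READ|unix.PROT_WRITE,
        unix.MAP_PRIVATE|unix.MAP_ANONYMOUS,
    )
    if err != nil {
        panic(err)
    }
    defer unix.Munmap(mem)

    mem[0] = 42 // the first page is now resident (counted in RSS)

    // "move to standby": keep the address range, give back the memory
    if err := unix.Madvise(mem, unix.MADV_DONTNEED); err != nil {
        panic(err)
    }

    // "reuse": the mapping is still valid; pages come back zeroed on demand
    fmt.Println(mem[0]) // 0
}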
@@ -60,97 +58,196 @@ type MmapInfo struct {

 func (*MmapInfo) IsTrait() {}

+const (
+    maxActiveSlabs  = 256
+    maxActiveBytes  = 1 * MB
+    minActiveSlabs  = 4
+    maxStandbySlabs = 1024
+    maxStandbyBytes = 16 * MB
+    minStandbySlabs = 4
+)
+
 func NewFixedSizeMmapAllocator(
     size uint64,
 ) (ret *fixedSizeMmapAllocator) {
-
-    // if size is larger than smallClassCap, num1 will be zero, buffer1 will be empty
-    num1 := smallClassCap / size
-    if num1 > maxBuffer1Cap {
-        // don't buffer too much, since chans with larger buffer consume more memory
-        num1 = maxBuffer1Cap
-    }
-
     ret = &fixedSizeMmapAllocator{
-        size:    size,
-        buffer1: make(chan unsafe.Pointer, num1),
-        buffer2: make(chan unsafe.Pointer, buffer2Cap),
+        size: size,
+
+        maxActiveSlabs: max(
+            min(
+                maxActiveSlabs,
+                maxActiveBytes/(int(size)*slabCapacity),
+            ),
+            minActiveSlabs,
+        ),
+
+        maxStandbySlabs: max(
+            min(
+                maxStandbySlabs,
+                maxStandbyBytes/(int(size)*slabCapacity),
+            ),
+            minStandbySlabs,
+        ),

         deallocatorPool: NewClosureDeallocatorPool(
             func(hints Hints, args *fixedSizeMmapDeallocatorArgs) {
-
-                if hints&DoNotReuse > 0 {
-                    ret.freeMem(args.ptr)
-                    return
+                empty := args.slab.free(args.ptr)
+                if empty {
+                    ret.freeSlab(args.slab)
                 }
+            },
+        ),
+    }

-                select {
+    return ret
+}

-                case ret.buffer1 <- args.ptr:
-                    // buffer in buffer1
+var _ FixedSizeAllocator = new(fixedSizeMmapAllocator)

-                default:
+func (f *fixedSizeMmapAllocator) Allocate(hints Hints, clearSize uint64) ([]byte, Deallocator, error) {
+    slab, ptr, err := f.allocate()
+    if err != nil {
+        return nil, nil, err
+    }

-                    ret.freeMem(args.ptr)
+    slice := unsafe.Slice(
+        (*byte)(ptr),
+        f.size,
+    )
+    if hints&NoClear == 0 {
+        clear(slice[:min(clearSize, f.size)])
+    }

-                    select {
+    return slice, f.deallocatorPool.Get(fixedSizeMmapDeallocatorArgs{
+        slab:   slab,
+        ptr:    ptr,
+        length: f.size,
+    }), nil
+}

-                    case ret.buffer2 <- args.ptr:
-                        // buffer in buffer2
+func (f *fixedSizeMmapAllocator) allocate() (*_Slab, unsafe.Pointer, error) {
+    f.mu.Lock()
+    defer f.mu.Unlock()

-                    default:
+    // from existing
+    for _, slab := range f.activeSlabs {
+        ptr, ok := slab.allocate()
+        if ok {
+            return slab, ptr, nil
+        }
+    }

-                    }
+    // empty or all full
+    // from standby slabs
+    if len(f.standbySlabs) > 0 {
+        slab := f.standbySlabs[len(f.standbySlabs)-1]
+        f.standbySlabs = f.standbySlabs[:len(f.standbySlabs)-1]
+        reuseMem(slab.base, slab.objectSize*slabCapacity)
+        f.activeSlabs = append(f.activeSlabs, slab)
+        ptr, _ := slab.allocate()
+        return slab, ptr, nil
+    }

-                }
+    // allocate new slab
+    slice, err := unix.Mmap(
+        -1, 0,
+        int(f.size*slabCapacity),
+        unix.PROT_READ|unix.PROT_WRITE,
+        unix.MAP_PRIVATE|unix.MAP_ANONYMOUS,
+    )
+    if err != nil {
+        return nil, nil, err
+    }

-            },
-        ),
+    base := unsafe.Pointer(unsafe.SliceData(slice))
+    slab := &_Slab{
+        base:       base,
+        objectSize: int(f.size),
     }
+    f.activeSlabs = append(f.activeSlabs, slab)

-    return ret
+    ptr, _ := slab.allocate()
+    return slab, ptr, nil
 }

-var _ FixedSizeAllocator = new(fixedSizeMmapAllocator)
+func (f *fixedSizeMmapAllocator) freeSlab(slab *_Slab) {
+    f.mu.Lock() // to prevent new allocation
+    defer f.mu.Unlock()

-func (f *fixedSizeMmapAllocator) Allocate(hints Hints, clearSize uint64) (slice []byte, dec Deallocator, err error) {
+    if len(f.activeSlabs) < f.maxActiveSlabs {
+        return
+    }

-    select {
+    if slab.mask.Load() != 0 {
+        // has new allocation
+        return
+    }

-    case ptr := <-f.buffer1:
-        // from buffer1
-        slice = unsafe.Slice((*byte)(ptr), f.size)
-        if hints&NoClear == 0 {
-            clear(slice[:clearSize])
+    offset := -1
+    for i, s := range f.activeSlabs {
+        if s == slab {
+            offset = i
+            break
         }
+    }
+    if offset == -1 {
+        // already moved
+        return
+    }

-    default:
+    // free slab memory
+    freeMem(slab.base, slab.objectSize*slabCapacity)
+
+    // move to standby slabs
+    f.activeSlabs = slices.Delete(f.activeSlabs, offset, offset+1)
+    f.standbySlabs = append(f.standbySlabs, slab)
+
+    // unmap standby slabs
+    for len(f.standbySlabs) > f.maxStandbySlabs {
+        slab := f.standbySlabs[0]
+        f.standbySlabs = slices.Delete(f.standbySlabs, 0, 1)
+        unix.Munmap(
+            unsafe.Slice(
+                (*byte)(slab.base),
+                slab.objectSize*slabCapacity,
+            ),
+        )
+    }

-        select {
+}

-        case ptr := <-f.buffer2:
-            // from buffer2
-            f.reuseMem(ptr, hints, clearSize)
-            slice = unsafe.Slice((*byte)(ptr), f.size)
+const slabCapacity = 64 // uint64 masked, so 64

-        default:
-            // allocate new
-            slice, err = unix.Mmap(
-                -1, 0,
-                int(f.size),
-                unix.PROT_READ|unix.PROT_WRITE,
-                unix.MAP_PRIVATE|unix.MAP_ANONYMOUS,
-            )
-            if err != nil {
-                return nil, nil, err
-            }
+type _Slab struct {
+    base       unsafe.Pointer
+    objectSize int
+    mask       atomic.Uint64
+}
+
+func (s *_Slab) allocate() (unsafe.Pointer, bool) {
+    for {
+        mask := s.mask.Load()
+        reverse := ^mask
+        if reverse == 0 {
+            // full
+            return nil, false
+        }
+        offset := bits.TrailingZeros64(reverse)
+        addr := unsafe.Add(s.base, offset*s.objectSize)
+        if s.mask.CompareAndSwap(mask, mask|(1<<offset)) {
+            return addr, true
+        }
+    }
+}
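With slabCapacity = 64 objects per slab, a 4 KB size class maps 256 KB slabs, and the constructor caps it at max(min(256, 1 MB / 256 KB), 4) = 4 active slabs and max(min(1024, 16 MB / 256 KB), 4) = 64 standby slabs. Within a slab, allocation is lock-free: allocate finds the lowest clear bit of the uint64 mask via bits.TrailingZeros64 and claims it with a compare-and-swap, retrying on races. The matching _Slab.free (called through args.slab.free in the deallocator) falls outside this excerpt, so the standalone sketch below models both halves with bitmapSlab as an illustrative stand-in, not the patch's code:

package main

import (
    "fmt"
    "math/bits"
    "sync/atomic"
    "unsafe"
)

// bitmapSlab is an illustrative stand-in for the patch's _Slab type;
// the free method is modeled on how the deallocator uses it, not copied
// from the patch.
type bitmapSlab struct {
    base       unsafe.Pointer
    objectSize int
    mask       atomic.Uint64 // bit i set = slot i in use
}

// allocate claims the lowest free slot with a CAS, retrying on races.
func (s *bitmapSlab) allocate() (unsafe.Pointer, bool) {
    for {
        mask := s.mask.Load()
        free := ^mask
        if free == 0 {
            return nil, false // all 64 slots in use
        }
        offset := bits.TrailingZeros64(free)
        addr := unsafe.Add(s.base, offset*s.objectSize)
        if s.mask.CompareAndSwap(mask, mask|(1<<offset)) {
            return addr, true
        }
        // another goroutine claimed a slot first; reload and retry
    }
}

// free clears the slot's bit and reports whether the slab became empty,
// which is the signal the allocator uses to demote a slab to standby.
func (s *bitmapSlab) free(ptr unsafe.Pointer) bool {
    offset := (uintptr(ptr) - uintptr(s.base)) / uintptr(s.objectSize)
    for {
        mask := s.mask.Load()
        next := mask &^ (1 << offset)
        if s.mask.CompareAndSwap(mask, next) {
            return next == 0
        }
    }
}

func main() {
    backing := make([]byte, 64*8) // 64 slots of 8 bytes each
    slab := &bitmapSlab{base: unsafe.Pointer(&backing[0]), objectSize: 8}

    ptr, ok := slab.allocate()
    fmt.Println(ok)             // true
    fmt.Println(slab.free(ptr)) // true: last object freed, slab is empty
}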