Reapply "malloc: refactor mmap allocator (matrixorigin#21332)" (matri…
Browse files Browse the repository at this point in the history
…xorigin#21406)

This reverts commit d877a55.
reusee committed Feb 21, 2025
1 parent 5b85c5b commit c80b6c6
Showing 5 changed files with 305 additions and 97 deletions.
14 changes: 7 additions & 7 deletions pkg/common/malloc/config.go
@@ -60,27 +60,27 @@ var defaultConfig = func() *atomic.Pointer[Config] {
 func patchConfig(config Config, delta Config) Config {
 	if delta.CheckFraction != nil {
 		config.CheckFraction = delta.CheckFraction
-		logutil.Info("malloc set config", zap.Any("CheckFraction", *delta.CheckFraction))
+		logutil.Debug("malloc set config", zap.Any("CheckFraction", *delta.CheckFraction))
 	}
 	if delta.EnableMetrics != nil {
 		config.EnableMetrics = delta.EnableMetrics
-		logutil.Info("malloc set config", zap.Any("EnableMetrics", *delta.EnableMetrics))
+		logutil.Debug("malloc set config", zap.Any("EnableMetrics", *delta.EnableMetrics))
 	}
 	if delta.FullStackFraction != nil {
 		config.FullStackFraction = delta.FullStackFraction
-		logutil.Info("malloc set config", zap.Any("FullStackFraction", *delta.FullStackFraction))
+		logutil.Debug("malloc set config", zap.Any("FullStackFraction", *delta.FullStackFraction))
 	}
 	if delta.Allocator != nil {
 		config.Allocator = delta.Allocator
-		logutil.Info("malloc set config", zap.Any("Allocator", *delta.Allocator))
+		logutil.Debug("malloc set config", zap.Any("Allocator", *delta.Allocator))
 	}
 	if delta.HashmapSoftLimit != nil {
 		config.HashmapSoftLimit = delta.HashmapSoftLimit
-		logutil.Info("malloc set config", zap.Any("HashmapSoftLimit", *delta.HashmapSoftLimit))
+		logutil.Debug("malloc set config", zap.Any("HashmapSoftLimit", *delta.HashmapSoftLimit))
 	}
 	if delta.HashmapHardLimit != nil {
 		config.HashmapHardLimit = delta.HashmapHardLimit
-		logutil.Info("malloc set config", zap.Any("HashmapHardLimit", *delta.HashmapHardLimit))
+		logutil.Debug("malloc set config", zap.Any("HashmapHardLimit", *delta.HashmapHardLimit))
 	}
 	return config
 }

@@ -89,7 +89,7 @@ func SetDefaultConfig(delta Config) {
 	config := *defaultConfig.Load()
 	config = patchConfig(config, delta)
 	defaultConfig.Store(&config)
-	logutil.Info("malloc: set default config", zap.Any("config", delta))
+	logutil.Debug("malloc: set default config", zap.Any("config", delta))
 }

 func GetDefaultConfig() Config {
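
Note: the hunks above only downgrade log levels, but they also show the config-patching pattern this file relies on: optional fields are pointers, only non-nil fields of the delta overwrite the stored value, and the result is republished through an atomic pointer. A minimal, self-contained sketch of that pattern (simplified types, not the repository's actual Config) looks like this:

package main

import (
	"fmt"
	"sync/atomic"
)

// Config uses pointer fields so "unset" (nil) is distinguishable from a zero value.
type Config struct {
	CheckFraction *uint32
	EnableMetrics *bool
}

var defaultConfig atomic.Pointer[Config]

func init() {
	defaultConfig.Store(&Config{})
}

// patchConfig copies only the fields that are set in delta.
func patchConfig(config Config, delta Config) Config {
	if delta.CheckFraction != nil {
		config.CheckFraction = delta.CheckFraction
	}
	if delta.EnableMetrics != nil {
		config.EnableMetrics = delta.EnableMetrics
	}
	return config
}

// SetDefaultConfig copies the current value, patches it, and publishes the copy atomically.
func SetDefaultConfig(delta Config) {
	config := *defaultConfig.Load()
	config = patchConfig(config, delta)
	defaultConfig.Store(&config)
}

func main() {
	fraction := uint32(100)
	SetDefaultConfig(Config{CheckFraction: &fraction})
	fmt.Println(*defaultConfig.Load().CheckFraction) // 100
}

Readers always Load() a consistent snapshot; writers copy, patch, and Store(), so no lock is needed on the read path.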
241 changes: 169 additions & 72 deletions pkg/common/malloc/fixed_size_mmap_allocator.go
@@ -15,33 +15,31 @@
 package malloc

 import (
+	"math/bits"
+	"slices"
+	"sync"
+	"sync/atomic"
 	"unsafe"

 	"golang.org/x/sys/unix"
 )

-const (
-	// Classes with smaller size than smallClassCap will buffer min((smallClassCap/size), maxBuffer1Cap) objects in buffer1
-	smallClassCap = 1 * MB
-	maxBuffer1Cap = 256
-
-	// objects in buffer2 will be MADV_DONTNEED-advised and will not occupy RSS, so it's safe to use a large number
-	buffer2Cap = 1024
-)
-
 type fixedSizeMmapAllocator struct {
 	size uint64
-	// buffer1 buffers objects
-	buffer1 chan unsafe.Pointer
-	// buffer2 buffers MADV_DONTNEED objects
-	buffer2 chan unsafe.Pointer
+
+	mu              sync.Mutex // it's OK to use mutex since this allocator will be in ShardedAllocator
+	activeSlabs     []*_Slab   // active slabs
+	maxActiveSlabs  int        // max active slabs
+	standbySlabs    []*_Slab   // slabs still mapped but no physical memory backed
+	maxStandbySlabs int        // max standby slabs

 	deallocatorPool *ClosureDeallocatorPool[fixedSizeMmapDeallocatorArgs, *fixedSizeMmapDeallocatorArgs]
 }

 type fixedSizeMmapDeallocatorArgs struct {
-	length uint64
+	slab   *_Slab
 	ptr    unsafe.Pointer
+	length uint64
 }

 func (f fixedSizeMmapDeallocatorArgs) As(trait Trait) bool {
@@ -60,97 +58,196 @@ type MmapInfo struct {

 func (*MmapInfo) IsTrait() {}

-func NewFixedSizeMmapAllocator(
-	size uint64,
-) (ret *fixedSizeMmapAllocator) {
-
-	// if size is larger than smallClassCap, num1 will be zero, buffer1 will be empty
-	num1 := smallClassCap / size
-	if num1 > maxBuffer1Cap {
-		// don't buffer too much, since chans with larger buffer consume more memory
-		num1 = maxBuffer1Cap
-	}
-
-	ret = &fixedSizeMmapAllocator{
-		size:    size,
-		buffer1: make(chan unsafe.Pointer, num1),
-		buffer2: make(chan unsafe.Pointer, buffer2Cap),
-
-		deallocatorPool: NewClosureDeallocatorPool(
-			func(hints Hints, args *fixedSizeMmapDeallocatorArgs) {
-
-				if hints&DoNotReuse > 0 {
-					ret.freeMem(args.ptr)
-					return
-				}
-
-				select {
-
-				case ret.buffer1 <- args.ptr:
-					// buffer in buffer1
-
-				default:
-
-					ret.freeMem(args.ptr)
-
-					select {
-
-					case ret.buffer2 <- args.ptr:
-						// buffer in buffer2
-
-					default:
-
-					}
-
-				}
-
-			},
-		),
-	}
-
-	return ret
-}
-
-var _ FixedSizeAllocator = new(fixedSizeMmapAllocator)
-
-func (f *fixedSizeMmapAllocator) Allocate(hints Hints, clearSize uint64) (slice []byte, dec Deallocator, err error) {
-
-	select {
-
-	case ptr := <-f.buffer1:
-		// from buffer1
-		slice = unsafe.Slice((*byte)(ptr), f.size)
-		if hints&NoClear == 0 {
-			clear(slice[:clearSize])
-		}
-
-	default:
-
-		select {
-
-		case ptr := <-f.buffer2:
-			// from buffer2
-			f.reuseMem(ptr, hints, clearSize)
-			slice = unsafe.Slice((*byte)(ptr), f.size)
-
-		default:
-			// allocate new
-			slice, err = unix.Mmap(
-				-1, 0,
-				int(f.size),
-				unix.PROT_READ|unix.PROT_WRITE,
-				unix.MAP_PRIVATE|unix.MAP_ANONYMOUS,
-			)
-			if err != nil {
-				return nil, nil, err
-			}
-
-		}
-
-	}
-
-	return slice, f.deallocatorPool.Get(fixedSizeMmapDeallocatorArgs{
-		ptr:    unsafe.Pointer(unsafe.SliceData(slice)),
-		length: f.size,
-	}), nil
-}
+const (
+	maxActiveSlabs  = 256
+	maxActiveBytes  = 1 * MB
+	minActiveSlabs  = 4
+	maxStandbySlabs = 1024
+	maxStandbyBytes = 16 * MB
+	minStandbySlabs = 4
+)
+
+func NewFixedSizeMmapAllocator(
+	size uint64,
+) (ret *fixedSizeMmapAllocator) {
+
+	ret = &fixedSizeMmapAllocator{
+		size: size,
+
+		maxActiveSlabs: max(
+			min(
+				maxActiveSlabs,
+				maxActiveBytes/(int(size)*slabCapacity),
+			),
+			minActiveSlabs,
+		),
+
+		maxStandbySlabs: max(
+			min(
+				maxStandbySlabs,
+				maxStandbyBytes/(int(size)*slabCapacity),
+			),
+			minStandbySlabs,
+		),
+
+		deallocatorPool: NewClosureDeallocatorPool(
+			func(hints Hints, args *fixedSizeMmapDeallocatorArgs) {
+				empty := args.slab.free(args.ptr)
+				if empty {
+					ret.freeSlab(args.slab)
+				}
+			},
+		),
+	}
+
+	return ret
+}
+
+var _ FixedSizeAllocator = new(fixedSizeMmapAllocator)
+
+func (f *fixedSizeMmapAllocator) Allocate(hints Hints, clearSize uint64) ([]byte, Deallocator, error) {
+	slab, ptr, err := f.allocate()
+	if err != nil {
+		return nil, nil, err
+	}
+
+	slice := unsafe.Slice(
+		(*byte)(ptr),
+		f.size,
+	)
+	if hints&NoClear == 0 {
+		clear(slice[:min(clearSize, f.size)])
+	}
+
+	return slice, f.deallocatorPool.Get(fixedSizeMmapDeallocatorArgs{
+		slab:   slab,
+		ptr:    ptr,
+		length: f.size,
+	}), nil
+}
+
+func (f *fixedSizeMmapAllocator) allocate() (*_Slab, unsafe.Pointer, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	// from existing
+	for _, slab := range f.activeSlabs {
+		ptr, ok := slab.allocate()
+		if ok {
+			return slab, ptr, nil
+		}
+	}
+
+	// empty or all full
+	// from standby slabs
+	if len(f.standbySlabs) > 0 {
+		slab := f.standbySlabs[len(f.standbySlabs)-1]
+		f.standbySlabs = f.standbySlabs[:len(f.standbySlabs)-1]
+		reuseMem(slab.base, slab.objectSize*slabCapacity)
+		f.activeSlabs = append(f.activeSlabs, slab)
+		ptr, _ := slab.allocate()
+		return slab, ptr, nil
+	}
+
+	// allocate new slab
+	slice, err := unix.Mmap(
+		-1, 0,
+		int(f.size*slabCapacity),
+		unix.PROT_READ|unix.PROT_WRITE,
+		unix.MAP_PRIVATE|unix.MAP_ANONYMOUS,
+	)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	base := unsafe.Pointer(unsafe.SliceData(slice))
+	slab := &_Slab{
+		base:       base,
+		objectSize: int(f.size),
+	}
+	f.activeSlabs = append(f.activeSlabs, slab)
+
+	ptr, _ := slab.allocate()
+	return slab, ptr, nil
+}
+
+func (f *fixedSizeMmapAllocator) freeSlab(slab *_Slab) {
+	f.mu.Lock() // to prevent new allocation
+	defer f.mu.Unlock()
+
+	if len(f.activeSlabs) < f.maxActiveSlabs {
+		return
+	}
+
+	if slab.mask.Load() != 0 {
+		// has new allocation
+		return
+	}
+
+	offset := -1
+	for i, s := range f.activeSlabs {
+		if s == slab {
+			offset = i
+			break
+		}
+	}
+	if offset == -1 {
+		// already moved
+		return
+	}
+
+	// free slab memory
+	freeMem(slab.base, slab.objectSize*slabCapacity)
+
+	// move to standby slabs
+	f.activeSlabs = slices.Delete(f.activeSlabs, offset, offset+1)
+	f.standbySlabs = append(f.standbySlabs, slab)
+
+	// unmap standby slabs
+	for len(f.standbySlabs) > f.maxStandbySlabs {
+		slab := f.standbySlabs[0]
+		f.standbySlabs = slices.Delete(f.standbySlabs, 0, 1)
+		unix.Munmap(
+			unsafe.Slice(
+				(*byte)(slab.base),
+				slab.objectSize*slabCapacity,
+			),
+		)
+	}
+
+}
+
+const slabCapacity = 64 // uint64 masked, so 64
+
+type _Slab struct {
+	base       unsafe.Pointer
+	objectSize int
+	mask       atomic.Uint64
+}
+
+func (s *_Slab) allocate() (unsafe.Pointer, bool) {
+	for {
+		mask := s.mask.Load()
+		reverse := ^mask
+		if reverse == 0 {
+			// full
+			return nil, false
+		}
+		offset := bits.TrailingZeros64(reverse)
+		addr := unsafe.Add(s.base, offset*s.objectSize)
+		if s.mask.CompareAndSwap(mask, mask|(1<<offset)) {
+			return addr, true
+		}
+	}
+}
+
+func (s *_Slab) free(ptr unsafe.Pointer) bool {
+	offset := (uintptr(ptr) - uintptr(s.base)) / uintptr(s.objectSize)
+	for {
+		mask := s.mask.Load()
+		newMask := mask & ^(uint64(1) << offset)
+		if s.mask.CompareAndSwap(mask, newMask) {
+			return newMask == 0
+		}
+	}
+}
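
Note: the core of the refactor is the _Slab bitmap added above. Each slab holds 64 fixed-size objects backed by one mmap'd region and tracks occupancy in a single atomic uint64, so allocation is a CAS that sets the lowest clear bit and free is a CAS that clears the slot's bit and reports when the slab becomes empty. A minimal, self-contained sketch of just that bitmask technique (hypothetical names, no mmap, slot indices instead of pointers) is:

package main

import (
	"fmt"
	"math/bits"
	"sync/atomic"
)

const slotCount = 64 // one bit per slot in a uint64 mask

type bitmapSlab struct {
	mask atomic.Uint64 // bit i set = slot i in use
}

// allocate returns the index of a free slot, or false if the slab is full.
func (s *bitmapSlab) allocate() (int, bool) {
	for {
		mask := s.mask.Load()
		free := ^mask
		if free == 0 {
			return 0, false // all 64 slots in use
		}
		idx := bits.TrailingZeros64(free)
		if s.mask.CompareAndSwap(mask, mask|(1<<idx)) {
			return idx, true
		}
		// lost the race; retry with a fresh mask
	}
}

// free releases slot idx and reports whether the slab became empty.
func (s *bitmapSlab) free(idx int) bool {
	for {
		mask := s.mask.Load()
		newMask := mask &^ (1 << idx)
		if s.mask.CompareAndSwap(mask, newMask) {
			return newMask == 0
		}
	}
}

func main() {
	var s bitmapSlab
	a, _ := s.allocate()
	b, _ := s.allocate()
	fmt.Println(a, b)      // 0 1
	fmt.Println(s.free(a)) // false: another slot is still in use
	fmt.Println(s.free(b)) // true: the slab is now empty
}

In the diff above, the slot index is turned into an address with unsafe.Add against the slab's mapped base; when free reports an empty slab, freeSlab releases its physical memory via freeMem, moves it to the standby list (still mapped, per the struct comment), and munmaps the oldest standby slabs once maxStandbySlabs is exceeded.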