Skip to content

Commit

Permalink
value lru to rwmutex
Browse files Browse the repository at this point in the history
  • Loading branch information
rekby committed Sep 14, 2023
1 parent 2b2facb commit 5b3f632
Show file tree
Hide file tree
Showing 13 changed files with 409 additions and 259 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/jonboulle/clockwork v0.4.0
github.com/letsencrypt/pebble/v2 v2.4.0
github.com/rekby/fastuuid v0.9.0
github.com/rekby/safemutex v0.2.0
github.com/rekby/safemutex v0.2.1
golang.org/x/time v0.3.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ github.com/rekby/fastuuid v0.9.0 h1:iQk8V/AyqSrgQAtKRdqx/CVep+CaKwaSWeerw1yEP3Q=
github.com/rekby/fastuuid v0.9.0/go.mod h1:qP8Lh0BH2+4rNGVRDHmDpkvE/ZuLUhjmKpRWjx+WesY=
github.com/rekby/fixenv v0.3.1 h1:zOPocbQmcsxSIjiVu5U+9JAfeu6WeLN7a9ryZkGTGJY=
github.com/rekby/fixenv v0.3.1/go.mod h1:/b5LRc06BYJtslRtHKxsPWFT/ySpHV+rWvzTg+XWk4c=
github.com/rekby/safemutex v0.2.0 h1:iEfcPqsR3EApwWHwdHvp+srN9Wfna+IG8bSpN467Jmk=
github.com/rekby/safemutex v0.2.0/go.mod h1:6I/yJdmctX0RmxEp00RzYBJJXl3ona8PsBiIDqg0v+U=
github.com/rekby/safemutex v0.2.1 h1:4PzkGz1dlDpsI1+601WCqxlN2xbiNuQIjYZCOZI+Xp4=
github.com/rekby/safemutex v0.2.1/go.mod h1:6I/yJdmctX0RmxEp00RzYBJJXl3ona8PsBiIDqg0v+U=
github.com/rekby/zapcontext v0.0.4 h1:85600nHTteGCLcuOhGp/SzXHymm9QcCA5sn+MPKCodY=
github.com/rekby/zapcontext v0.0.4/go.mod h1:lTIxvHAwWXBZBPPfEvmAEXPbVEcTwd52VaASZWZWcxI=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down
79 changes: 40 additions & 39 deletions internal/cache/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,72 +3,73 @@ package cache
import (
"context"
"github.com/rekby/lets-proxy2/internal/log"
"github.com/rekby/safemutex"
zc "github.com/rekby/zapcontext"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"

zc "github.com/rekby/zapcontext"
"go.uber.org/zap"
)

type DiskCache struct {
Dir string

mu sync.RWMutex
mu safemutex.RWMutex[struct{}]
}

func (c *DiskCache) filepath(key string) string {
return filepath.Join(c.Dir, diskCacheSanitizeKey(key))
}

func (c *DiskCache) Get(ctx context.Context, key string) ([]byte, error) {
c.mu.RLock()
defer c.mu.RUnlock()

filePath := c.filepath(key)

logLevel := zapcore.DebugLevel
res, err := ioutil.ReadFile(filePath)
if err != nil {
if os.IsNotExist(err) {
err = ErrCacheMiss
} else {
logLevel = zapcore.ErrorLevel
func (c *DiskCache) Get(ctx context.Context, key string) (res []byte, err error) {
c.mu.RLock(func(synced struct{}) {
filePath := c.filepath(key)

logLevel := zapcore.DebugLevel
res, err = os.ReadFile(filePath)
if err != nil {
if os.IsNotExist(err) {
err = ErrCacheMiss
} else {
logLevel = zapcore.ErrorLevel
}
}
}

log.LevelParamCtx(ctx, logLevel, "Got from disk cache", zap.String("dir", c.Dir), zap.String("key", key),
zap.String("file", filePath), zap.Error(err))
log.LevelParamCtx(ctx, logLevel, "Got from disk cache", zap.String("dir", c.Dir), zap.String("key", key),
zap.String("file", filePath), zap.Error(err))
})

return res, err
}

func (c *DiskCache) Put(ctx context.Context, key string, data []byte) error {
c.mu.Lock()
defer c.mu.Unlock()
func (c *DiskCache) Put(ctx context.Context, key string, data []byte) (err error) {
c.mu.Lock(func(synced struct{}) struct{} {
zc.L(ctx).Debug("Put to disk cache", zap.String("dir", c.Dir), zap.String("key", key))
err = os.WriteFile(c.filepath(key), data, 0600)
zc.L(ctx).Debug("Put to disk cache result.", zap.String("dir", c.Dir), zap.String("key", key),
zap.Error(err))

return synced
})

zc.L(ctx).Debug("Put to disk cache", zap.String("dir", c.Dir), zap.String("key", key))
err := ioutil.WriteFile(c.filepath(key), data, 0600)
zc.L(ctx).Debug("Put to disk cache result.", zap.String("dir", c.Dir), zap.String("key", key),
zap.Error(err))
return err
}

func (c *DiskCache) Delete(ctx context.Context, key string) error {
c.mu.Lock()
defer c.mu.Unlock()
func (c *DiskCache) Delete(ctx context.Context, key string) (err error) {
c.mu.Lock(func(synced struct{}) struct{} {
zc.L(ctx).Debug("Delete from cache", zap.String("dir", c.Dir), zap.String("key", key))
err = os.Remove(c.filepath(key))
zc.L(ctx).Debug("Delete from cache result", zap.String("dir", c.Dir), zap.String("key", key),
zap.Error(err))

if os.IsNotExist(err) {
err = nil
}

zc.L(ctx).Debug("Delete from cache", zap.String("dir", c.Dir), zap.String("key", key))
err := os.Remove(c.filepath(key))
zc.L(ctx).Debug("Delete from cache result", zap.String("dir", c.Dir), zap.String("key", key),
zap.Error(err))
return synced
})

if os.IsNotExist(err) {
err = nil
}
return err
}

Expand Down
42 changes: 23 additions & 19 deletions internal/cache/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@ package cache

import (
"context"
"sync"

"github.com/rekby/safemutex"
zc "github.com/rekby/zapcontext"
"go.uber.org/zap"
)

type MemoryCache struct {
Name string // use for log

mu sync.RWMutex
m map[string][]byte
stateMutex safemutex.RWMutexWithPointers[map[string][]byte]
}

func NewMemoryCache(name string) *MemoryCache {
return &MemoryCache{
Name: name,
m: make(map[string][]byte),
Name: name,
stateMutex: safemutex.RWNewWithPointers(map[string][]byte{}),
}
}

Expand All @@ -28,12 +26,14 @@ func (c *MemoryCache) Get(ctx context.Context, key string) (data []byte, err err
zap.String("key", key), zap.Int("data_len", len(data)), zap.Error(err))
}()

c.mu.RLock()
defer c.mu.RUnlock()
if resp, exist := c.m[key]; exist {
return resp, nil
}
return nil, ErrCacheMiss
c.stateMutex.RLock(func(synced map[string][]byte) {
if resp, exist := synced[key]; exist {
data = resp
return
}
err = ErrCacheMiss
})
return data, err
}

func (c *MemoryCache) Put(ctx context.Context, key string, data []byte) (err error) {
Expand All @@ -42,9 +42,13 @@ func (c *MemoryCache) Put(ctx context.Context, key string, data []byte) (err err
zap.String("key", key), zap.Int("data_len", len(data)), zap.Error(err))
}()

c.mu.Lock()
defer c.mu.Unlock()
c.m[key] = data
localCopy := make([]byte, len(data))
copy(localCopy, data)
c.stateMutex.Lock(func(synced map[string][]byte) map[string][]byte {
synced[key] = localCopy
return synced
})

return nil
}

Expand All @@ -54,10 +58,10 @@ func (c *MemoryCache) Delete(ctx context.Context, key string) (err error) {
zap.String("key", key), zap.Error(err))
}()

c.mu.Lock()
defer c.mu.Unlock()

delete(c.m, key)
c.stateMutex.Lock(func(synced map[string][]byte) map[string][]byte {
delete(synced, key)
return synced
})

return nil
}
111 changes: 61 additions & 50 deletions internal/cache/value_lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package cache

import (
"context"
"github.com/rekby/safemutex"
"math"
"sort"
"sync"
"sync/atomic"

zc "github.com/rekby/zapcontext"
Expand All @@ -28,14 +28,19 @@ type MemoryValueLRU struct {
CleanCount int

lastTime uint64
mu sync.RWMutex
m map[string]*memoryValueLRUItem // stored always non nil item
mu safemutex.RWMutexWithPointers[memoryValueLRUSynced]
}

type memoryValueLRUSynced struct {
Items map[string]*memoryValueLRUItem // always stored non nil items
}

func NewMemoryValueLRU(name string) *MemoryValueLRU {
return &MemoryValueLRU{
Name: name,
m: make(map[string]*memoryValueLRUItem, defaultMemoryLimitSize+1),
Name: name,
mu: safemutex.RWNewWithPointers(memoryValueLRUSynced{
Items: make(map[string]*memoryValueLRUItem, defaultMemoryLimitSize+1),
}),
MaxSize: defaultMemoryLimitSize,
CleanCount: defaultLRUCleanCount,
}
Expand All @@ -47,14 +52,16 @@ func (c *MemoryValueLRU) Get(ctx context.Context, key string) (value interface{}
zap.String("key", key), zap.Reflect("value", value), zap.Error(err))
}()

c.mu.RLock()
defer c.mu.RUnlock()
c.mu.RLock(func(synced memoryValueLRUSynced) {
if resp, exist := synced.Items[key]; exist {
resp.lastUsedTime.Store(c.time())
value = resp.value
return
}
err = ErrCacheMiss
})

if resp, exist := c.m[key]; exist {
resp.lastUsedTime.Store(c.time())
return resp.value, nil
}
return nil, ErrCacheMiss
return value, err
}

func (c *MemoryValueLRU) Put(ctx context.Context, key string, value interface{}) (err error) {
Expand All @@ -63,13 +70,16 @@ func (c *MemoryValueLRU) Put(ctx context.Context, key string, value interface{})
zap.String("key", key), zap.Reflect("data_len", value), zap.Error(err))
}()

c.mu.Lock()
c.m[key] = &memoryValueLRUItem{key: key, value: value, lastUsedTime: newUint64Atomic(c.time())}
if len(c.m) > c.MaxSize {
// handlepanic: no external call
go c.clean()
}
c.mu.Unlock()
c.mu.Lock(func(synced memoryValueLRUSynced) memoryValueLRUSynced {
synced.Items[key] = &memoryValueLRUItem{key: key, value: value, lastUsedTime: newUint64Atomic(c.time())}
if len(synced.Items) > c.MaxSize {
// handlepanic: no external call
go c.clean()
}

return synced
})

return nil
}

Expand All @@ -79,10 +89,10 @@ func (c *MemoryValueLRU) Delete(ctx context.Context, key string) (err error) {
zap.String("key", key), zap.Error(err))
}()

c.mu.Lock()
defer c.mu.Unlock()

delete(c.m, key)
c.mu.Lock(func(synced memoryValueLRUSynced) memoryValueLRUSynced {
delete(synced.Items, key)
return synced
})

return nil
}
Expand All @@ -97,20 +107,19 @@ func (c *MemoryValueLRU) time() uint64 {
}

func (c *MemoryValueLRU) renumberTime() {
c.mu.Lock()

items := c.getSortedItems()
for i, item := range items {
item.lastUsedTime.Store(uint64(i))
}

c.mu.Unlock()
c.mu.Lock(func(synced memoryValueLRUSynced) memoryValueLRUSynced {
items := c.getSortedItems(&synced)
for i, item := range items {
item.lastUsedTime.Store(uint64(i))
}
return synced
})
}

// must called from locked state
func (c *MemoryValueLRU) getSortedItems() []*memoryValueLRUItem {
items := make([]*memoryValueLRUItem, 0, len(c.m))
for _, item := range c.m {
func (c *MemoryValueLRU) getSortedItems(synced *memoryValueLRUSynced) []*memoryValueLRUItem {
items := make([]*memoryValueLRUItem, 0, len(synced.Items))
for _, item := range synced.Items {
items = append(items, item)
}

Expand All @@ -121,28 +130,30 @@ func (c *MemoryValueLRU) getSortedItems() []*memoryValueLRUItem {
}

func (c *MemoryValueLRU) clean() {
c.mu.Lock()
defer c.mu.Unlock()
c.mu.Lock(func(synced memoryValueLRUSynced) memoryValueLRUSynced {
if c.CleanCount == 0 {
return synced
}

if c.CleanCount == 0 {
return
}
if len(synced.Items) <= c.MaxSize {
return synced
}

if len(c.m) <= c.MaxSize {
return
}
if c.CleanCount >= c.MaxSize {
synced.Items = make(map[string]*memoryValueLRUItem, c.MaxSize+1)
return synced
}

if c.CleanCount >= c.MaxSize {
c.m = make(map[string]*memoryValueLRUItem, c.MaxSize+1)
return
}
items := c.getSortedItems(&synced)

items := c.getSortedItems()
for i := 0; i < c.CleanCount; i++ {
delete(synced.Items, items[i].key)
}

for i := 0; i < c.CleanCount; i++ {
delete(c.m, items[i].key)
}
return synced
})
}

func newUint64Atomic(val uint64) atomic.Uint64 {
var v atomic.Uint64
v.Store(val)
Expand Down
Loading

0 comments on commit 5b3f632

Please sign in to comment.