Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

value lru to rwmutex #212

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/jonboulle/clockwork v0.4.0
github.com/letsencrypt/pebble/v2 v2.4.0
github.com/rekby/fastuuid v0.9.0
github.com/rekby/safemutex v0.2.0
github.com/rekby/safemutex v0.2.1
golang.org/x/time v0.3.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ github.com/rekby/fastuuid v0.9.0 h1:iQk8V/AyqSrgQAtKRdqx/CVep+CaKwaSWeerw1yEP3Q=
github.com/rekby/fastuuid v0.9.0/go.mod h1:qP8Lh0BH2+4rNGVRDHmDpkvE/ZuLUhjmKpRWjx+WesY=
github.com/rekby/fixenv v0.3.1 h1:zOPocbQmcsxSIjiVu5U+9JAfeu6WeLN7a9ryZkGTGJY=
github.com/rekby/fixenv v0.3.1/go.mod h1:/b5LRc06BYJtslRtHKxsPWFT/ySpHV+rWvzTg+XWk4c=
github.com/rekby/safemutex v0.2.0 h1:iEfcPqsR3EApwWHwdHvp+srN9Wfna+IG8bSpN467Jmk=
github.com/rekby/safemutex v0.2.0/go.mod h1:6I/yJdmctX0RmxEp00RzYBJJXl3ona8PsBiIDqg0v+U=
github.com/rekby/safemutex v0.2.1 h1:4PzkGz1dlDpsI1+601WCqxlN2xbiNuQIjYZCOZI+Xp4=
github.com/rekby/safemutex v0.2.1/go.mod h1:6I/yJdmctX0RmxEp00RzYBJJXl3ona8PsBiIDqg0v+U=
github.com/rekby/zapcontext v0.0.4 h1:85600nHTteGCLcuOhGp/SzXHymm9QcCA5sn+MPKCodY=
github.com/rekby/zapcontext v0.0.4/go.mod h1:lTIxvHAwWXBZBPPfEvmAEXPbVEcTwd52VaASZWZWcxI=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down
79 changes: 40 additions & 39 deletions internal/cache/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,72 +3,73 @@ package cache
import (
"context"
"github.com/rekby/lets-proxy2/internal/log"
"github.com/rekby/safemutex"
zc "github.com/rekby/zapcontext"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"

zc "github.com/rekby/zapcontext"
"go.uber.org/zap"
)

type DiskCache struct {
Dir string

mu sync.RWMutex
mu safemutex.RWMutex[struct{}]
}

func (c *DiskCache) filepath(key string) string {
return filepath.Join(c.Dir, diskCacheSanitizeKey(key))
}

func (c *DiskCache) Get(ctx context.Context, key string) ([]byte, error) {
c.mu.RLock()
defer c.mu.RUnlock()

filePath := c.filepath(key)

logLevel := zapcore.DebugLevel
res, err := ioutil.ReadFile(filePath)
if err != nil {
if os.IsNotExist(err) {
err = ErrCacheMiss
} else {
logLevel = zapcore.ErrorLevel
func (c *DiskCache) Get(ctx context.Context, key string) (res []byte, err error) {
c.mu.RLock(func(synced struct{}) {
filePath := c.filepath(key)

logLevel := zapcore.DebugLevel
res, err = os.ReadFile(filePath)
if err != nil {
if os.IsNotExist(err) {
err = ErrCacheMiss
} else {
logLevel = zapcore.ErrorLevel
}
}
}

log.LevelParamCtx(ctx, logLevel, "Got from disk cache", zap.String("dir", c.Dir), zap.String("key", key),
zap.String("file", filePath), zap.Error(err))
log.LevelParamCtx(ctx, logLevel, "Got from disk cache", zap.String("dir", c.Dir), zap.String("key", key),
zap.String("file", filePath), zap.Error(err))
})

return res, err
}

func (c *DiskCache) Put(ctx context.Context, key string, data []byte) error {
c.mu.Lock()
defer c.mu.Unlock()
func (c *DiskCache) Put(ctx context.Context, key string, data []byte) (err error) {
c.mu.Lock(func(synced struct{}) struct{} {
zc.L(ctx).Debug("Put to disk cache", zap.String("dir", c.Dir), zap.String("key", key))
err = os.WriteFile(c.filepath(key), data, 0600)
zc.L(ctx).Debug("Put to disk cache result.", zap.String("dir", c.Dir), zap.String("key", key),
zap.Error(err))

return synced
})

zc.L(ctx).Debug("Put to disk cache", zap.String("dir", c.Dir), zap.String("key", key))
err := ioutil.WriteFile(c.filepath(key), data, 0600)
zc.L(ctx).Debug("Put to disk cache result.", zap.String("dir", c.Dir), zap.String("key", key),
zap.Error(err))
return err
}

func (c *DiskCache) Delete(ctx context.Context, key string) error {
c.mu.Lock()
defer c.mu.Unlock()
func (c *DiskCache) Delete(ctx context.Context, key string) (err error) {
c.mu.Lock(func(synced struct{}) struct{} {
zc.L(ctx).Debug("Delete from cache", zap.String("dir", c.Dir), zap.String("key", key))
err = os.Remove(c.filepath(key))
zc.L(ctx).Debug("Delete from cache result", zap.String("dir", c.Dir), zap.String("key", key),
zap.Error(err))

if os.IsNotExist(err) {
err = nil
}

zc.L(ctx).Debug("Delete from cache", zap.String("dir", c.Dir), zap.String("key", key))
err := os.Remove(c.filepath(key))
zc.L(ctx).Debug("Delete from cache result", zap.String("dir", c.Dir), zap.String("key", key),
zap.Error(err))
return synced
})

if os.IsNotExist(err) {
err = nil
}
return err
}

Expand Down
42 changes: 23 additions & 19 deletions internal/cache/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@ package cache

import (
"context"
"sync"

"github.com/rekby/safemutex"
zc "github.com/rekby/zapcontext"
"go.uber.org/zap"
)

type MemoryCache struct {
Name string // use for log

mu sync.RWMutex
m map[string][]byte
stateMutex safemutex.RWMutexWithPointers[map[string][]byte]
}

func NewMemoryCache(name string) *MemoryCache {
return &MemoryCache{
Name: name,
m: make(map[string][]byte),
Name: name,
stateMutex: safemutex.RWNewWithPointers(map[string][]byte{}),
}
}

Expand All @@ -28,12 +26,14 @@ func (c *MemoryCache) Get(ctx context.Context, key string) (data []byte, err err
zap.String("key", key), zap.Int("data_len", len(data)), zap.Error(err))
}()

c.mu.RLock()
defer c.mu.RUnlock()
if resp, exist := c.m[key]; exist {
return resp, nil
}
return nil, ErrCacheMiss
c.stateMutex.RLock(func(synced map[string][]byte) {
if resp, exist := synced[key]; exist {
data = resp
return
}
err = ErrCacheMiss
})
return data, err
}

func (c *MemoryCache) Put(ctx context.Context, key string, data []byte) (err error) {
Expand All @@ -42,9 +42,13 @@ func (c *MemoryCache) Put(ctx context.Context, key string, data []byte) (err err
zap.String("key", key), zap.Int("data_len", len(data)), zap.Error(err))
}()

c.mu.Lock()
defer c.mu.Unlock()
c.m[key] = data
localCopy := make([]byte, len(data))
copy(localCopy, data)
c.stateMutex.Lock(func(synced map[string][]byte) map[string][]byte {
synced[key] = localCopy
return synced
})

return nil
}

Expand All @@ -54,10 +58,10 @@ func (c *MemoryCache) Delete(ctx context.Context, key string) (err error) {
zap.String("key", key), zap.Error(err))
}()

c.mu.Lock()
defer c.mu.Unlock()

delete(c.m, key)
c.stateMutex.Lock(func(synced map[string][]byte) map[string][]byte {
delete(synced, key)
return synced
})

return nil
}
111 changes: 61 additions & 50 deletions internal/cache/value_lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package cache

import (
"context"
"github.com/rekby/safemutex"
"math"
"sort"
"sync"
"sync/atomic"

zc "github.com/rekby/zapcontext"
Expand All @@ -28,14 +28,19 @@ type MemoryValueLRU struct {
CleanCount int

lastTime uint64
mu sync.RWMutex
m map[string]*memoryValueLRUItem // stored always non nil item
mu safemutex.RWMutexWithPointers[memoryValueLRUSynced]
}

type memoryValueLRUSynced struct {
Items map[string]*memoryValueLRUItem // always stored non nil items
}

func NewMemoryValueLRU(name string) *MemoryValueLRU {
return &MemoryValueLRU{
Name: name,
m: make(map[string]*memoryValueLRUItem, defaultMemoryLimitSize+1),
Name: name,
mu: safemutex.RWNewWithPointers(memoryValueLRUSynced{
Items: make(map[string]*memoryValueLRUItem, defaultMemoryLimitSize+1),
}),
MaxSize: defaultMemoryLimitSize,
CleanCount: defaultLRUCleanCount,
}
Expand All @@ -47,14 +52,16 @@ func (c *MemoryValueLRU) Get(ctx context.Context, key string) (value interface{}
zap.String("key", key), zap.Reflect("value", value), zap.Error(err))
}()

c.mu.RLock()
defer c.mu.RUnlock()
c.mu.RLock(func(synced memoryValueLRUSynced) {
if resp, exist := synced.Items[key]; exist {
resp.lastUsedTime.Store(c.time())
value = resp.value
return
}
err = ErrCacheMiss
})

if resp, exist := c.m[key]; exist {
resp.lastUsedTime.Store(c.time())
return resp.value, nil
}
return nil, ErrCacheMiss
return value, err
}

func (c *MemoryValueLRU) Put(ctx context.Context, key string, value interface{}) (err error) {
Expand All @@ -63,13 +70,16 @@ func (c *MemoryValueLRU) Put(ctx context.Context, key string, value interface{})
zap.String("key", key), zap.Reflect("data_len", value), zap.Error(err))
}()

c.mu.Lock()
c.m[key] = &memoryValueLRUItem{key: key, value: value, lastUsedTime: newUint64Atomic(c.time())}
if len(c.m) > c.MaxSize {
// handlepanic: no external call
go c.clean()
}
c.mu.Unlock()
c.mu.Lock(func(synced memoryValueLRUSynced) memoryValueLRUSynced {
synced.Items[key] = &memoryValueLRUItem{key: key, value: value, lastUsedTime: newUint64Atomic(c.time())}
if len(synced.Items) > c.MaxSize {
// handlepanic: no external call
go c.clean()
}

return synced
})

return nil
}

Expand All @@ -79,10 +89,10 @@ func (c *MemoryValueLRU) Delete(ctx context.Context, key string) (err error) {
zap.String("key", key), zap.Error(err))
}()

c.mu.Lock()
defer c.mu.Unlock()

delete(c.m, key)
c.mu.Lock(func(synced memoryValueLRUSynced) memoryValueLRUSynced {
delete(synced.Items, key)
return synced
})

return nil
}
Expand All @@ -97,20 +107,19 @@ func (c *MemoryValueLRU) time() uint64 {
}

func (c *MemoryValueLRU) renumberTime() {
c.mu.Lock()

items := c.getSortedItems()
for i, item := range items {
item.lastUsedTime.Store(uint64(i))
}

c.mu.Unlock()
c.mu.Lock(func(synced memoryValueLRUSynced) memoryValueLRUSynced {
items := c.getSortedItems(&synced)
for i, item := range items {
item.lastUsedTime.Store(uint64(i))
}
return synced
})
}

// must called from locked state
func (c *MemoryValueLRU) getSortedItems() []*memoryValueLRUItem {
items := make([]*memoryValueLRUItem, 0, len(c.m))
for _, item := range c.m {
func (c *MemoryValueLRU) getSortedItems(synced *memoryValueLRUSynced) []*memoryValueLRUItem {
items := make([]*memoryValueLRUItem, 0, len(synced.Items))
for _, item := range synced.Items {
items = append(items, item)
}

Expand All @@ -121,28 +130,30 @@ func (c *MemoryValueLRU) getSortedItems() []*memoryValueLRUItem {
}

func (c *MemoryValueLRU) clean() {
c.mu.Lock()
defer c.mu.Unlock()
c.mu.Lock(func(synced memoryValueLRUSynced) memoryValueLRUSynced {
if c.CleanCount == 0 {
return synced
}

if c.CleanCount == 0 {
return
}
if len(synced.Items) <= c.MaxSize {
return synced
}

if len(c.m) <= c.MaxSize {
return
}
if c.CleanCount >= c.MaxSize {
synced.Items = make(map[string]*memoryValueLRUItem, c.MaxSize+1)
return synced
}

if c.CleanCount >= c.MaxSize {
c.m = make(map[string]*memoryValueLRUItem, c.MaxSize+1)
return
}
items := c.getSortedItems(&synced)

items := c.getSortedItems()
for i := 0; i < c.CleanCount; i++ {
delete(synced.Items, items[i].key)
}

for i := 0; i < c.CleanCount; i++ {
delete(c.m, items[i].key)
}
return synced
})
}

func newUint64Atomic(val uint64) atomic.Uint64 {
var v atomic.Uint64
v.Store(val)
Expand Down
Loading