Skip to content

Commit

Permalink
now queryCache and resolveCache only cache PersistentCache
Browse files Browse the repository at this point in the history
  • Loading branch information
bachue committed Feb 5, 2024
1 parent 8813351 commit 1fcb224
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 93 deletions.
103 changes: 43 additions & 60 deletions storagev2/region/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"strings"
"sync"
"time"
"unsafe"

"github.com/qiniu/go-sdk/v7/internal/cache"
"github.com/qiniu/go-sdk/v7/internal/clientv2"
Expand Down Expand Up @@ -108,10 +107,10 @@ type (
const cacheFileName = "query_v4_01.cache.json"

var (
queryCaches map[uint64]BucketRegionsQuery
queryCachesLock sync.Mutex
defaultResolver = resolver.NewDefaultResolver()
defaultChooser = chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil))
persistentCaches map[uint64]*cache.Cache
persistentCachesLock sync.Mutex
defaultResolver = resolver.NewDefaultResolver()
defaultChooser = chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil))
)

// NewBucketRegionsQuery 创建空间区域查询器
Expand All @@ -132,18 +131,43 @@ func NewBucketRegionsQuery(bucketHosts Endpoints, opts *BucketRegionsQueryOption
opts.PersistentDuration = time.Minute
}

crc64Value := calcBucketRegionsQueryCrc64(bucketHosts, opts)
queryCachesLock.Lock()
defer queryCachesLock.Unlock()

if queryCaches == nil {
queryCaches = make(map[uint64]BucketRegionsQuery)
persistentCache, err := getPersistentCache(opts)
if err != nil {
return nil, err
}
r := opts.Resolver
cs := opts.Chooser
bf := opts.Backoff
if r == nil {
r = defaultResolver
}
if cs == nil {
cs = defaultChooser
}
return &bucketRegionsQuery{
bucketHosts: bucketHosts,
cache: persistentCache,
client: makeBucketQueryClient(opts.Client, bucketHosts, !opts.UseInsecureProtocol, opts.RetryMax, opts.HostFreezeDuration, r, cs, bf),
useHttps: !opts.UseInsecureProtocol,
}, nil
}

func getPersistentCache(opts *BucketRegionsQueryOptions) (*cache.Cache, error) {
var (
persistentCache *cache.Cache
ok bool
err error
)

crc64Value := calcPersistentCacheCrc64(opts)
persistentCachesLock.Lock()
defer persistentCachesLock.Unlock()

if query, ok := queryCaches[crc64Value]; ok {
return query, nil
} else {
persistentCache, err := cache.NewPersistentCache(
if persistentCaches == nil {
persistentCaches = make(map[uint64]*cache.Cache)
}
if persistentCache, ok = persistentCaches[crc64Value]; !ok {
persistentCache, err = cache.NewPersistentCache(
reflect.TypeOf(&v4QueryCacheValue{}),
opts.PersistentFilePath,
opts.CompactInterval,
Expand All @@ -154,24 +178,9 @@ func NewBucketRegionsQuery(bucketHosts Endpoints, opts *BucketRegionsQueryOption
if err != nil {
return nil, err
}
r := opts.Resolver
cs := opts.Chooser
bf := opts.Backoff
if r == nil {
r = defaultResolver
}
if cs == nil {
cs = defaultChooser
}
query = &bucketRegionsQuery{
bucketHosts: bucketHosts,
cache: persistentCache,
client: makeBucketQueryClient(opts.Client, bucketHosts, !opts.UseInsecureProtocol, opts.RetryMax, opts.HostFreezeDuration, r, cs, bf),
useHttps: !opts.UseInsecureProtocol,
}
queryCaches[crc64Value] = query
return query, nil
persistentCaches[crc64Value] = persistentCache
}
return persistentCache, nil
}

// Query 查询空间区域,返回 region.RegionsProvider
Expand Down Expand Up @@ -306,39 +315,13 @@ func makeBucketQueryClient(

func (opts *BucketRegionsQueryOptions) toBytes() []byte {
bytes := make([]byte, 0, 1024)
bytes = strconv.AppendBool(bytes, opts.UseInsecureProtocol)
bytes = strconv.AppendInt(bytes, int64(opts.CompactInterval), 36)
bytes = append(bytes, []byte(opts.PersistentFilePath)...)
bytes = append(bytes, byte(0))
bytes = strconv.AppendInt(bytes, int64(opts.PersistentDuration), 36)
bytes = strconv.AppendInt(bytes, int64(opts.RetryMax), 36)
bytes = strconv.AppendInt(bytes, int64(opts.HostFreezeDuration), 36)
if opts.Client != nil {
bytes = strconv.AppendUint(bytes, uint64(uintptr(unsafe.Pointer(&opts.Client))), 36)
} else {
bytes = strconv.AppendUint(bytes, 0, 36)
}
if opts.Resolver != nil {
bytes = strconv.AppendUint(bytes, uint64(uintptr(unsafe.Pointer(&opts.Resolver))), 36)
} else {
bytes = strconv.AppendUint(bytes, 0, 36)
}
if opts.Chooser != nil {
bytes = strconv.AppendUint(bytes, uint64(uintptr(unsafe.Pointer(&opts.Chooser))), 36)
} else {
bytes = strconv.AppendUint(bytes, 0, 36)
}
if opts.Backoff != nil {
bytes = strconv.AppendUint(bytes, uint64(uintptr(unsafe.Pointer(&opts.Backoff))), 36)
} else {
bytes = strconv.AppendUint(bytes, 0, 36)
}
return bytes
}

func calcBucketRegionsQueryCrc64(bucketHosts Endpoints, opts *BucketRegionsQueryOptions) uint64 {
hasher := crc64.New(crc64.MakeTable(crc64.ISO))
hasher.Write(bucketHosts.toBytes())
hasher.Write(opts.toBytes())
return hasher.Sum64()
func calcPersistentCacheCrc64(opts *BucketRegionsQueryOptions) uint64 {
return crc64.Checksum(opts.toBytes(), crc64.MakeTable(crc64.ISO))
}
66 changes: 33 additions & 33 deletions storagev2/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type (
cacheResolver struct {
resolver Resolver
cache *cache.Cache
crc64 string
cacheLifetime time.Duration
}

Expand Down Expand Up @@ -78,8 +77,8 @@ type (
const cacheFileName = "resolver_01.cache.json"

var (
resolverCaches map[uint64]*cacheResolver
resolverCachesLock sync.Mutex
persistentCaches map[uint64]*cache.Cache
persistentCachesLock sync.Mutex
staticDefaultResolver Resolver = &defaultResolver{}
)

Expand All @@ -104,40 +103,50 @@ func NewCacheResolver(resolver Resolver, opts *CacheResolverOptions) (Resolver,
resolver = staticDefaultResolver
}

crc64Value := calcCacheResolverCrc64(resolver, opts)
resolverCachesLock.Lock()
defer resolverCachesLock.Unlock()
persistentCache, err := getPersistentCache(opts)
if err != nil {
return nil, err
}
return &cacheResolver{
cache: persistentCache,
cacheLifetime: opts.CacheLifetime,
resolver: resolver,
}, nil
}

if resolverCaches == nil {
resolverCaches = make(map[uint64]*cacheResolver)
func getPersistentCache(opts *CacheResolverOptions) (*cache.Cache, error) {
var (
persistentCache *cache.Cache
ok bool
err error
)

crc64Value := calcPersistentCacheCrc64(opts)
persistentCachesLock.Lock()
defer persistentCachesLock.Unlock()

if persistentCaches == nil {
persistentCaches = make(map[uint64]*cache.Cache)
}

if cresolver, ok := resolverCaches[crc64Value]; ok {
return cresolver, nil
} else {
persistentCache, err := cache.NewPersistentCache(reflect.TypeOf(&resolverCacheValue{}), opts.PersistentFilePath, opts.CompactInterval, opts.PersistentDuration, func(err error) {
if persistentCache, ok = persistentCaches[crc64Value]; !ok {
persistentCache, err = cache.NewPersistentCache(reflect.TypeOf(&resolverCacheValue{}), opts.PersistentFilePath, opts.CompactInterval, opts.PersistentDuration, func(err error) {
log.Warn(fmt.Sprintf("CacheResolver persist error: %s", err))
})
if err != nil {
return nil, err
}
cresolver = &cacheResolver{
cache: persistentCache,
cacheLifetime: opts.CacheLifetime,
resolver: resolver,
crc64: strconv.FormatUint(crc64Value, 36),
}
resolverCaches[crc64Value] = cresolver
return cresolver, nil
persistentCaches[crc64Value] = persistentCache
}
return persistentCache, nil
}

func (resolver *cacheResolver) Resolve(ctx context.Context, host string) ([]net.IP, error) {
lip, err := resolver.localIp(host)
if err != nil {
return nil, err
}
cacheValue, status := resolver.cache.Get(resolver.crc64+":"+lip+":"+host, func() (cache.CacheValue, error) {
cacheValue, status := resolver.cache.Get(lip+":"+host, func() (cache.CacheValue, error) {
var ips []net.IP
if ips, err = resolver.resolver.Resolve(ctx, host); err != nil {
return nil, err
Expand Down Expand Up @@ -180,25 +189,16 @@ func (*cacheResolver) localIp(host string) (string, error) {
return conn.LocalAddr().(*net.UDPAddr).IP.String(), nil
}

func (opts *CacheResolverOptions) toBytes(resolver Resolver) []byte {
func (opts *CacheResolverOptions) toBytes() []byte {
bytes := make([]byte, 0, 1024)
if resolver != nil {
p := reflect.ValueOf(resolver).Pointer()
bytes = strconv.AppendUint(bytes, uint64(p), 36)
} else {
bytes = strconv.AppendUint(bytes, 0, 36)
}
bytes = strconv.AppendInt(bytes, int64(opts.CompactInterval), 36)
bytes = strconv.AppendInt(bytes, int64(opts.HostFreezeDuration), 36)
bytes = strconv.AppendInt(bytes, int64(opts.PersistentDuration), 36)
bytes = strconv.AppendInt(bytes, int64(opts.CacheLifetime), 36)
bytes = append(bytes, []byte(opts.PersistentFilePath)...)
bytes = append(bytes, byte(0))
return bytes
}

func calcCacheResolverCrc64(resolver Resolver, opts *CacheResolverOptions) uint64 {
hasher := crc64.New(crc64.MakeTable(crc64.ISO))
hasher.Write(opts.toBytes(resolver))
return hasher.Sum64()
func calcPersistentCacheCrc64(opts *CacheResolverOptions) uint64 {
return crc64.Checksum(opts.toBytes(), crc64.MakeTable(crc64.ISO))
}

0 comments on commit 1fcb224

Please sign in to comment.