Skip to content

Commit

Permalink
Fix big cache
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Feb 10, 2025
1 parent 2a8db4d commit 49d578f
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 64 deletions.
98 changes: 75 additions & 23 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,22 +338,50 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
}

c.rateLimit(rw, r, info.Blobs, info, t, value.Size, start)
if value.BigCache {
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, value.Size)
return
}

c.serveCachedBlob(rw, r, info.Blobs, info, t, value.Size)
return
}

stat, err := c.cache.StatBlob(ctx, info.Blobs)
if err == nil {
if c.serveCachedBlobHead(rw, r, stat.Size()) {
if c.bigCache != nil && stat.Size() >= int64(c.bigCacheSize) {
stat, err := c.bigCache.StatBlob(ctx, info.Blobs)
if err == nil {
if c.serveCachedBlobHead(rw, r, stat.Size()) {
return
}

c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
return
}
} else {
if c.serveCachedBlobHead(rw, r, stat.Size()) {
return
}

c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
return
}
} else {
stat, err := c.bigCache.StatBlob(ctx, info.Blobs)
if err == nil {
if c.serveCachedBlobHead(rw, r, stat.Size()) {
return
}

c.rateLimit(rw, r, info.Blobs, info, t, value.Size, start)
c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
return
c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
return
}
}

c.rateLimit(rw, r, info.Blobs, info, t, value.Size, start)
select {
case <-ctx.Done():
return
Expand All @@ -366,18 +394,36 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
utils.ServeError(rw, r, value.Error, 0)
return
}

if c.serveCachedBlobHead(rw, r, value.Size) {
return
}

c.rateLimit(rw, r, info.Blobs, info, t, value.Size, start)
c.serveCachedBlob(rw, r, info.Blobs, info, t, value.Size)
return
}

if value.BigCache {
stat, err = c.bigCache.StatBlob(ctx, info.Blobs)
if err == nil {
if c.serveCachedBlobHead(rw, r, stat.Size()) {
return
}

c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
return
}
}

stat, err = c.cache.StatBlob(ctx, info.Blobs)
if err == nil {
if c.serveCachedBlobHead(rw, r, stat.Size()) {
return
}

c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
return
}
Expand Down Expand Up @@ -475,15 +521,15 @@ func (c *Agent) cacheBlob(info *BlobInfo) (int64, func() error, int, error) {
if err != nil {
return fmt.Errorf("Put to big cache: %w", err)
}
c.blobCache.PutNoTTL(info.Blobs, size)
c.blobCache.PutNoTTL(info.Blobs, size, true)
return nil
}

size, err := c.cache.PutBlob(ctx, info.Blobs, resp.Body)
if err != nil {
return fmt.Errorf("Put to cache: %w", err)
}
c.blobCache.Put(info.Blobs, size)
c.blobCache.Put(info.Blobs, size, false)
return nil
}

Expand All @@ -508,28 +554,29 @@ func (c *Agent) serveCachedBlobHead(rw http.ResponseWriter, r *http.Request, siz
return false
}

func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, size int64) {
func (c *Agent) serveBigCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, size int64) {
referer := r.RemoteAddr
if info != nil {
referer = fmt.Sprintf("%d-%d:%s:%s/%s", t.RegistryID, t.TokenID, referer, info.Host, info.Image)
}

if c.bigCache != nil && c.bigCacheSize > 0 && size >= int64(c.bigCacheSize) {
u, err := c.bigCache.RedirectBlob(r.Context(), blob, referer)
if err != nil {
c.logger.Info("failed to redirect blob", "digest", blob, "error", err)
c.blobCache.Remove(info.Blobs)
utils.ServeError(rw, r, errcode.ErrorCodeUnknown, 0)
return
}

c.blobCache.PutNoTTL(info.Blobs, size)

c.logger.Info("Cache hit", "digest", blob, "url", u)
http.Redirect(rw, r, u, http.StatusTemporaryRedirect)
u, err := c.bigCache.RedirectBlob(r.Context(), blob, referer)
if err != nil {
c.logger.Info("failed to redirect blob", "digest", blob, "error", err)
c.blobCache.Remove(info.Blobs)
utils.ServeError(rw, r, errcode.ErrorCodeUnknown, 0)
return
}

c.blobCache.PutNoTTL(info.Blobs, size, true)

c.logger.Info("Big Cache hit", "digest", blob, "url", u)
http.Redirect(rw, r, u, http.StatusTemporaryRedirect)
return
}

func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, size int64) {

if c.blobNoRedirectSize < 0 || int64(c.blobNoRedirectSize) > size {
data, err := c.cache.GetBlob(r.Context(), info.Blobs)
if err != nil {
Expand All @@ -540,7 +587,7 @@ func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob st
}
defer data.Close()

c.blobCache.Put(info.Blobs, size)
c.blobCache.Put(info.Blobs, size, false)

rw.Header().Set("Content-Length", strconv.FormatInt(size, 10))
rw.Header().Set("Content-Type", "application/octet-stream")
Expand All @@ -561,6 +608,11 @@ func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob st
// fallback to redirect
}

referer := r.RemoteAddr
if info != nil {
referer = fmt.Sprintf("%d-%d:%s:%s/%s", t.RegistryID, t.TokenID, referer, info.Host, info.Image)
}

u, err := c.cache.RedirectBlob(r.Context(), blob, referer)
if err != nil {
c.logger.Info("failed to redirect blob", "digest", blob, "error", err)
Expand All @@ -569,7 +621,7 @@ func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob st
return
}

c.blobCache.Put(info.Blobs, size)
c.blobCache.Put(info.Blobs, size, false)

c.logger.Info("Cache hit", "digest", blob, "url", u)
http.Redirect(rw, r, u, http.StatusTemporaryRedirect)
Expand Down
11 changes: 7 additions & 4 deletions agent/blob_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,23 @@ func (m *blobCache) PutError(key string, err error, sc int) {
}, m.duration)
}

func (m *blobCache) Put(key string, size int64) {
func (m *blobCache) Put(key string, size int64, bigCache bool) {
m.digest.SetWithTTL(key, blobValue{
Size: size,
Size: size,
BigCache: bigCache,
}, m.duration)
}

func (m *blobCache) PutNoTTL(key string, size int64) {
func (m *blobCache) PutNoTTL(key string, size int64, bigCache bool) {
m.digest.Set(key, blobValue{
Size: size,
Size: size,
BigCache: bigCache,
})
}

type blobValue struct {
Size int64
BigCache bool
Error error
StatusCode int
}
88 changes: 51 additions & 37 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,19 +372,56 @@ func (r *Runner) runOnceManifestSync(ctx context.Context) error {
}

func (r *Runner) blob(ctx context.Context, host, name, blob string, size int64, gotSize, progress *atomic.Int64) error {
u := &url.URL{
Scheme: "https",
Host: host,
Path: fmt.Sprintf("/v2/%s/blobs/%s", name, blob),
}

if size == 0 {
req, err := http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil)
if err != nil {
return err
}
resp, err := r.httpClient.Do(req)
if err != nil {
return err
}
if resp.Body != nil {
_ = resp.Body.Close()
}

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to head blob: status code %d", resp.StatusCode)
}
size = resp.ContentLength
}

if size > 0 {
gotSize.Store(size)
}

var caches []*cache.Cache

if r.bigCache != nil && r.bigCacheSize > 0 && size >= int64(r.bigCacheSize) {
if !r.bigCacheBackup {
caches = []*cache.Cache{r.bigCache}
} else {
caches = append([]*cache.Cache{r.bigCache}, r.caches...)
}
} else {
caches = append(caches, r.caches...)
}

var subCaches []*cache.Cache
for _, cache := range r.caches {
for _, cache := range caches {
stat, err := cache.StatBlob(ctx, blob)
if err == nil {
if size > 0 {
gotSize := stat.Size()
if size == gotSize {
continue
}
r.logger.Error("size is not meeting expectations", "digest", blob, "size", size, "gotSize", gotSize)
} else {
gotSize := stat.Size()
if size == gotSize {
continue
}
r.logger.Error("size is not meeting expectations", "digest", blob, "size", size, "gotSize", gotSize)
}
subCaches = append(subCaches, cache)
}
Expand All @@ -394,12 +431,6 @@ func (r *Runner) blob(ctx context.Context, host, name, blob string, size int64,
return nil
}

u := &url.URL{
Scheme: "https",
Host: host,
Path: fmt.Sprintf("/v2/%s/blobs/%s", name, blob),
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return err
Expand All @@ -411,7 +442,7 @@ func (r *Runner) blob(ctx context.Context, host, name, blob string, size int64,
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to retrieve blob: status code %d", resp.StatusCode)
return fmt.Errorf("failed to get blob: status code %d", resp.StatusCode)
}

r.logger.Info("start sync blob", "digest", blob, "url", u.String())
Expand All @@ -422,32 +453,11 @@ func (r *Runner) blob(ctx context.Context, host, name, blob string, size int64,
}
}

if size > 0 {
gotSize.Store(size)
}
if resp.ContentLength > 0 {
gotSize.Store(resp.ContentLength)
}

body := &readerCounter{
r: resp.Body,
counter: progress,
}

if r.bigCache != nil && r.bigCacheSize > 0 && gotSize.Load() >= int64(r.bigCacheSize) {
if !r.bigCacheBackup {
n, err := r.bigCache.PutBlob(ctx, blob, body)
if err != nil {
return fmt.Errorf("put blob failed: %w", err)
}

r.logger.Info("finish sync blob", "digest", blob, "size", n)
return nil
}

subCaches = append(subCaches, r.bigCache)
}

if len(subCaches) == 1 {
n, err := subCaches[0].PutBlob(ctx, blob, body)
if err != nil {
Expand Down Expand Up @@ -640,6 +650,10 @@ func (r *Runner) manifest(ctx context.Context, messageID int64, host, image, tag
_ = resp.Body.Close()
}

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to head manifest: status code %d", resp.StatusCode)
}

digest := resp.Header.Get("Docker-Content-Digest")
if digest != "" {
for _, cache := range r.caches {
Expand Down Expand Up @@ -682,7 +696,7 @@ func (r *Runner) manifest(ctx context.Context, messageID int64, host, image, tag
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to retrieve manifest: status code %d", resp.StatusCode)
return fmt.Errorf("failed to get manifest: status code %d", resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
Expand Down

0 comments on commit 49d578f

Please sign in to comment.