Skip to content

Commit

Permalink
Support cache manifest
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Jun 24, 2024
1 parent 44807f4 commit 1265a53
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 8 deletions.
11 changes: 8 additions & 3 deletions crproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,14 @@ func (c *CRProxy) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
}
}

if c.storageDriver != nil && info.Blobs != "" {
c.cacheBlobResponse(rw, r, info)
return
if c.storageDriver != nil {
if info.Blobs != "" {
c.cacheBlobResponse(rw, r, info)
return
} else if info.Manifests != "" {
c.cacheManifestResponse(rw, r, info)
return
}
}
c.directResponse(rw, r, info)
}
Expand Down
162 changes: 162 additions & 0 deletions crproxy_manifest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package crproxy

import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/textproto"
"path"
"strconv"
"strings"

"github.com/distribution/distribution/v3/registry/api/errcode"
)

func manifestRevisionsCachePath(host, image, tagOrBlob string) string {
return path.Join("/docker/registry/v2/repositories", host, image, "_manifests/revisions/sha256", tagOrBlob, "link")
}

func manifestTagCachePath(host, image, tagOrBlob string) string {
return path.Join("/docker/registry/v2/repositories", host, image, "_manifests/tags", tagOrBlob, "current/link")
}

func (c *CRProxy) cacheManifestResponse(rw http.ResponseWriter, r *http.Request, info *PathInfo) {
cli := c.getClientset(info.Host, info.Image)
resp, err := c.doWithAuth(cli, r, info.Host)
if err != nil {
if c.cachedManifest(rw, r, info) {
return
}
if c.logger != nil {
c.logger.Println("failed to request", info.Host, info.Image, err)
}
errcode.ServeJSON(rw, errcode.ErrorCodeUnknown)
return
}
defer func() {
resp.Body.Close()
}()

switch resp.StatusCode {
case http.StatusUnauthorized, http.StatusForbidden:
errcode.ServeJSON(rw, errcode.ErrorCodeDenied)
return
}

if resp.StatusCode >= http.StatusInternalServerError {
if c.cachedManifest(rw, r, info) {
return
}
}

resp.Header.Del("Docker-Ratelimit-Source")

header := rw.Header()
for k, v := range resp.Header {
key := textproto.CanonicalMIMEHeaderKey(k)
header[key] = v
}

if r.Method == http.MethodHead {
rw.WriteHeader(resp.StatusCode)
return
}

body, err := io.ReadAll(resp.Body)
if err != nil {
c.errorResponse(rw, r, err)
return
}
err = c.cacheManifestContent(context.Background(), info, body)
if err != nil {
c.errorResponse(rw, r, err)
return
}
rw.WriteHeader(resp.StatusCode)
rw.Write(body)
}

func (c *CRProxy) cacheManifestContent(ctx context.Context, info *PathInfo, content []byte) error {
h := sha256.New()
h.Write(content)
hash := hex.EncodeToString(h.Sum(nil)[:])

if strings.HasPrefix(info.Manifests, "sha256:") {
if info.Manifests[7:] != hash {
return fmt.Errorf("expected hash %s is not same to %s, %s", info.Manifests[7:], hash)
}
} else {
manifestLinkPath := manifestTagCachePath(info.Host, info.Image, info.Manifests)
err := c.storageDriver.PutContent(ctx, manifestLinkPath, []byte("sha256:"+hash))
if err != nil {
return err
}
}

manifestLinkPath := manifestRevisionsCachePath(info.Host, info.Image, hash)
err := c.storageDriver.PutContent(ctx, manifestLinkPath, []byte("sha256:"+hash))
if err != nil {
return err
}

blobCachePath := blobCachePath(hash)
err = c.storageDriver.PutContent(ctx, blobCachePath, content)
if err != nil {
return err
}

return nil
}

func (c *CRProxy) cachedManifest(rw http.ResponseWriter, r *http.Request, info *PathInfo) bool {
ctx := r.Context()
var manifestLinkPath string
if strings.HasPrefix(info.Manifests, "sha256:") {
manifestLinkPath = manifestRevisionsCachePath(info.Host, info.Image, info.Manifests[7:])
} else {
manifestLinkPath = manifestTagCachePath(info.Host, info.Image, info.Manifests)
}

content, err := c.storageDriver.GetContent(ctx, manifestLinkPath)
if err == nil {
digest := string(content)
blobCachePath := blobCachePath(digest)
content, err := c.storageDriver.GetContent(ctx, blobCachePath)
if err == nil {

mt := struct {
MediaType string `json:"mediaType"`
}{}
err := json.Unmarshal(content, &mt)
if err != nil {
if c.logger != nil {
c.logger.Println("Manifest blob cache err", blobCachePath, err)
}
return false
}
if c.logger != nil {
c.logger.Println("Manifest blob cache hit", blobCachePath)
}
rw.Header().Set("docker-content-digest", digest)
rw.Header().Set("Content-Type", mt.MediaType)
rw.Header().Set("Content-Length", strconv.FormatInt(int64(len(content)), 10))
if r.Method != http.MethodHead {
rw.Write(content)
}
return true
}
if c.logger != nil {
c.logger.Println("Manifest blob cache missed", blobCachePath, err)
}
} else {
if c.logger != nil {
c.logger.Println("Manifest cache missed", manifestLinkPath, err)
}
}

return false
}
37 changes: 32 additions & 5 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *CRProxy) SyncImageLayer(ctx context.Context, image string, filter func(
defer c.bytesPool.Put(buf)

uniq := map[digest.Digest]struct{}{}
cb0 := func(dgst digest.Digest, size int64, pf *manifestlist.PlatformSpec) error {
blobCallback := func(dgst digest.Digest, size int64, pf *manifestlist.PlatformSpec) error {
_, ok := uniq[dgst]
if ok {
if cb != nil {
Expand Down Expand Up @@ -255,14 +255,28 @@ func (c *CRProxy) SyncImageLayer(ctx context.Context, image string, filter func(
return nil
}

err = getLayerFromManifestList(ctx, ms, ref, filter, cb0)
manifestCallback := func(tagOrHash string, m distribution.Manifest) error {
_, playload, err := m.Payload()
if err != nil {
return err
}
return c.cacheManifestContent(ctx, &PathInfo{
Host: info.Host,
Image: info.Name,
Manifests: tagOrHash,
}, playload)
}

err = getLayerFromManifestList(ctx, ms, ref, filter, blobCallback, manifestCallback)
if err != nil {
return err
}
return nil
}

func getLayerFromManifestList(ctx context.Context, ms distribution.ManifestService, ref reference.Reference, filter func(pf manifestlist.PlatformSpec) bool, cb func(dgst digest.Digest, size int64, pf *manifestlist.PlatformSpec) error) error {
func getLayerFromManifestList(ctx context.Context, ms distribution.ManifestService, ref reference.Reference, filter func(pf manifestlist.PlatformSpec) bool,
digestCallback func(dgst digest.Digest, size int64, pf *manifestlist.PlatformSpec) error,
manifestCallback func(tagOrHash string, m distribution.Manifest) error) error {
var (
m distribution.Manifest
err error
Expand All @@ -273,11 +287,20 @@ func getLayerFromManifestList(ctx context.Context, ms distribution.ManifestServi
if err != nil {
return err
}
err = manifestCallback(r.Digest().String(), m)
if err != nil {
return err
}
case reference.Tagged:
tag := r.Tag()
m, err = ms.Get(ctx, "", distribution.WithTag(r.Tag()))
if err != nil {
return err
}
err = manifestCallback(tag, m)
if err != nil {
return err
}
default:
return fmt.Errorf("%s no reference to any source", ref)
}
Expand All @@ -293,8 +316,12 @@ func getLayerFromManifestList(ctx context.Context, ms distribution.ManifestServi
if err != nil {
return err
}
err = manifestCallback(mfest.Digest.String(), m0)
if err != nil {
return err
}
err = getLayerFromManifest(m0, func(dgst digest.Digest, size int64) error {
return cb(dgst, size, &mfest.Platform)
return digestCallback(dgst, size, &mfest.Platform)
})
if err != nil {
return err
Expand All @@ -303,7 +330,7 @@ func getLayerFromManifestList(ctx context.Context, ms distribution.ManifestServi
return nil
default:
return getLayerFromManifest(m, func(dgst digest.Digest, size int64) error {
return cb(dgst, size, nil)
return digestCallback(dgst, size, nil)
})
}
}
Expand Down

0 comments on commit 1265a53

Please sign in to comment.