From 1265a530bfa472cca3b2733bb1d309e2f359cc29 Mon Sep 17 00:00:00 2001
From: Shiming Zhang
Date: Mon, 24 Jun 2024 16:37:53 +0800
Subject: [PATCH] Support cache manifest

---
 crproxy.go          |  11 ++-
 crproxy_manifest.go | 191 ++++++++++++++++++++++++++++++++++++++++++++
 sync.go             |  37 ++++++++--
 3 files changed, 231 insertions(+), 8 deletions(-)
 create mode 100644 crproxy_manifest.go

diff --git a/crproxy.go b/crproxy.go
index a9ab386..eee2ede 100644
--- a/crproxy.go
+++ b/crproxy.go
@@ -529,9 +529,14 @@ func (c *CRProxy) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 		}
 	}
 
-	if c.storageDriver != nil && info.Blobs != "" {
-		c.cacheBlobResponse(rw, r, info)
-		return
+	if c.storageDriver != nil {
+		if info.Blobs != "" {
+			c.cacheBlobResponse(rw, r, info)
+			return
+		} else if info.Manifests != "" {
+			c.cacheManifestResponse(rw, r, info)
+			return
+		}
 	}
 	c.directResponse(rw, r, info)
 }
diff --git a/crproxy_manifest.go b/crproxy_manifest.go
new file mode 100644
index 0000000..90556d3
--- /dev/null
+++ b/crproxy_manifest.go
@@ -0,0 +1,191 @@
+package crproxy
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"net/textproto"
+	"path"
+	"strconv"
+	"strings"
+
+	"github.com/distribution/distribution/v3/registry/api/errcode"
+)
+
+// manifestRevisionsCachePath returns the storage path of the revision link
+// file for the manifest of host/image whose sha256 hex digest is tagOrBlob.
+func manifestRevisionsCachePath(host, image, tagOrBlob string) string {
+	return path.Join("/docker/registry/v2/repositories", host, image, "_manifests/revisions/sha256", tagOrBlob, "link")
+}
+
+// manifestTagCachePath returns the storage path of the current link file for
+// the tag tagOrBlob of host/image.
+func manifestTagCachePath(host, image, tagOrBlob string) string {
+	return path.Join("/docker/registry/v2/repositories", host, image, "_manifests/tags", tagOrBlob, "current/link")
+}
+
+// cacheManifestResponse forwards a manifest request upstream and relays the
+// response to the client. A 200 response body is stored in the cache before
+// it is written out. When the upstream request fails or answers with a 5xx
+// status, a previously cached manifest is served instead if one exists.
+func (c *CRProxy) cacheManifestResponse(rw http.ResponseWriter, r *http.Request, info *PathInfo) {
+	cli := c.getClientset(info.Host, info.Image)
+	resp, err := c.doWithAuth(cli, r, info.Host)
+	if err != nil {
+		if c.cachedManifest(rw, r, info) {
+			return
+		}
+		if c.logger != nil {
+			c.logger.Println("failed to request", info.Host, info.Image, err)
+		}
+		errcode.ServeJSON(rw, errcode.ErrorCodeUnknown)
+		return
+	}
+	defer resp.Body.Close()
+
+	switch resp.StatusCode {
+	case http.StatusUnauthorized, http.StatusForbidden:
+		errcode.ServeJSON(rw, errcode.ErrorCodeDenied)
+		return
+	}
+
+	if resp.StatusCode >= http.StatusInternalServerError {
+		if c.cachedManifest(rw, r, info) {
+			return
+		}
+	}
+
+	resp.Header.Del("Docker-Ratelimit-Source")
+
+	header := rw.Header()
+	for k, v := range resp.Header {
+		key := textproto.CanonicalMIMEHeaderKey(k)
+		header[key] = v
+	}
+
+	if r.Method == http.MethodHead {
+		rw.WriteHeader(resp.StatusCode)
+		return
+	}
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		c.errorResponse(rw, r, err)
+		return
+	}
+
+	// Only a successful response carries a manifest. Storing the body of any
+	// other status (404, 429, 5xx, ...) would repoint the tag link at an
+	// error document, or fail the digest check and hide the upstream status.
+	if resp.StatusCode == http.StatusOK {
+		err = c.cacheManifestContent(context.Background(), info, body)
+		if err != nil {
+			c.errorResponse(rw, r, err)
+			return
+		}
+	}
+	rw.WriteHeader(resp.StatusCode)
+	rw.Write(body)
+}
+
+// cacheManifestContent stores content as the manifest named by info.Manifests
+// for info.Host/info.Image. It writes the blob, the revision link and, when
+// info.Manifests is a tag rather than a "sha256:" digest, the tag link. For a
+// digest reference the content must hash to that digest, otherwise an error
+// is returned and nothing is written.
+func (c *CRProxy) cacheManifestContent(ctx context.Context, info *PathInfo, content []byte) error {
+	sum := sha256.Sum256(content)
+	hash := hex.EncodeToString(sum[:])
+
+	isDigest := strings.HasPrefix(info.Manifests, "sha256:")
+	if isDigest && info.Manifests[7:] != hash {
+		return fmt.Errorf("expected hash %s is not same to %s", info.Manifests[7:], hash)
+	}
+
+	// Write the blob before any link so that a failure part-way through
+	// never leaves a link pointing at content that was not stored.
+	err := c.storageDriver.PutContent(ctx, blobCachePath(hash), content)
+	if err != nil {
+		return err
+	}
+
+	link := []byte("sha256:" + hash)
+	err = c.storageDriver.PutContent(ctx, manifestRevisionsCachePath(info.Host, info.Image, hash), link)
+	if err != nil {
+		return err
+	}
+
+	if !isDigest {
+		err = c.storageDriver.PutContent(ctx, manifestTagCachePath(info.Host, info.Image, info.Manifests), link)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// cachedManifest serves the manifest named by info.Manifests from the cache.
+// It reports whether a response was written; on false nothing has been sent
+// to rw and the caller remains responsible for responding.
+func (c *CRProxy) cachedManifest(rw http.ResponseWriter, r *http.Request, info *PathInfo) bool {
+	ctx := r.Context()
+	var manifestLinkPath string
+	if strings.HasPrefix(info.Manifests, "sha256:") {
+		manifestLinkPath = manifestRevisionsCachePath(info.Host, info.Image, info.Manifests[7:])
+	} else {
+		manifestLinkPath = manifestTagCachePath(info.Host, info.Image, info.Manifests)
+	}
+
+	link, err := c.storageDriver.GetContent(ctx, manifestLinkPath)
+	if err != nil {
+		if c.logger != nil {
+			c.logger.Println("Manifest cache missed", manifestLinkPath, err)
+		}
+		return false
+	}
+
+	// The link file holds the "sha256:<hex>" digest of the manifest blob.
+	// NOTE(review): cacheManifestContent calls blobCachePath with the bare
+	// hex hash while this passes the prefixed digest - confirm that
+	// blobCachePath accepts both forms.
+	digest := string(link)
+	blobPath := blobCachePath(digest)
+	content, err := c.storageDriver.GetContent(ctx, blobPath)
+	if err != nil {
+		if c.logger != nil {
+			c.logger.Println("Manifest blob cache missed", blobPath, err)
+		}
+		return false
+	}
+
+	// Only mediaType is needed, to pick the Content-Type of the response.
+	// NOTE(review): manifests without a mediaType field produce an empty
+	// Content-Type here - confirm whether a fallback is required.
+	mt := struct {
+		MediaType string `json:"mediaType"`
+	}{}
+	err = json.Unmarshal(content, &mt)
+	if err != nil {
+		if c.logger != nil {
+			c.logger.Println("Manifest blob cache err", blobPath, err)
+		}
+		return false
+	}
+	if c.logger != nil {
+		c.logger.Println("Manifest blob cache hit", blobPath)
+	}
+
+	header := rw.Header()
+	header.Set("docker-content-digest", digest)
+	header.Set("Content-Type", mt.MediaType)
+	header.Set("Content-Length", strconv.FormatInt(int64(len(content)), 10))
+	if r.Method != http.MethodHead {
+		rw.Write(content)
+	}
+	return true
+}
diff --git a/sync.go b/sync.go
index 044dd76..b01f042 100644
--- a/sync.go
+++ b/sync.go
@@ -141,7 +141,7 @@ func (c *CRProxy) SyncImageLayer(ctx context.Context, image string, filter func(
 	defer c.bytesPool.Put(buf)
 
 	uniq := map[digest.Digest]struct{}{}
-	cb0 := func(dgst digest.Digest, size int64, pf *manifestlist.PlatformSpec) error {
+	blobCallback := func(dgst digest.Digest, size int64, pf *manifestlist.PlatformSpec) error {
 		_, ok := uniq[dgst]
 		if ok {
 			if cb != nil {
@@ -255,14 +255,28 @@ func (c *CRProxy) SyncImageLayer(ctx context.Context, image string, filter func(
 		return nil
 	}
 
-	err = getLayerFromManifestList(ctx, ms, ref, filter, cb0)
+	manifestCallback := func(tagOrHash string, m distribution.Manifest) error {
+		_, payload, err := m.Payload()
+		if err != nil {
+			return err
+		}
+		return c.cacheManifestContent(ctx, &PathInfo{
+			Host:      info.Host,
+			Image:     info.Name,
+			Manifests: tagOrHash,
+		}, payload)
+	}
+
+	err = getLayerFromManifestList(ctx, ms, ref, filter, blobCallback, manifestCallback)
 	if err != nil {
 		return err
 	}
 	return nil
 }
 
-func getLayerFromManifestList(ctx context.Context, ms distribution.ManifestService, ref reference.Reference, filter func(pf manifestlist.PlatformSpec) bool, cb func(dgst digest.Digest, size int64, pf *manifestlist.PlatformSpec) error) error {
+func getLayerFromManifestList(ctx context.Context, ms distribution.ManifestService, ref reference.Reference, filter func(pf manifestlist.PlatformSpec) bool,
+	digestCallback func(dgst digest.Digest, size int64, pf *manifestlist.PlatformSpec) error,
+	manifestCallback func(tagOrHash string, m distribution.Manifest) error) error {
 	var (
 		m   distribution.Manifest
 		err error
@@ -273,11 +287,20 @@ func getLayerFromManifestList(ctx context.Context, ms distribution.ManifestServi
 		if err != nil {
 			return err
 		}
+		err = manifestCallback(r.Digest().String(), m)
+		if err != nil {
+			return err
+		}
 	case reference.Tagged:
+		tag := r.Tag()
 		m, err = ms.Get(ctx, "", distribution.WithTag(r.Tag()))
 		if err != nil {
 			return err
 		}
+		err = manifestCallback(tag, m)
+		if err != nil {
+			return err
+		}
 	default:
 		return fmt.Errorf("%s no reference to any source", ref)
 	}
@@ -293,8 +316,12 @@ func getLayerFromManifestList(ctx context.Context, ms distribution.ManifestServi
 			if err != nil {
 				return err
 			}
+			err = manifestCallback(mfest.Digest.String(), m0)
+			if err != nil {
+				return err
+			}
 			err = getLayerFromManifest(m0, func(dgst digest.Digest, size int64) error {
-				return cb(dgst, size, &mfest.Platform)
+				return digestCallback(dgst, size, &mfest.Platform)
 			})
 			if err != nil {
 				return err
@@ -303,7 +330,7 @@ func getLayerFromManifestList(ctx context.Context, ms distribution.ManifestServi
 		return nil
 	default:
 		return getLayerFromManifest(m, func(dgst digest.Digest, size int64) error {
-			return cb(dgst, size, nil)
+			return digestCallback(dgst, size, nil)
 		})
 	}
 }