Skip to content

Commit

Permalink
feat(api): repair corrupted blobs when pushed again
Browse files Browse the repository at this point in the history
CheckBlob() returns ErrBlobNotFound on corrupted blobs

closes #1922

Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk committed Oct 20, 2023
1 parent 1675f30 commit 575fdb8
Show file tree
Hide file tree
Showing 10 changed files with 467 additions and 51 deletions.
88 changes: 88 additions & 0 deletions pkg/storage/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,94 @@ func GetOrasManifestByDigest(imgStore storageTypes.ImageStore, repo string, dige
return artManifest, nil
}

// Get blob descriptor from it's manifest contents, if blob can not be found it will return error.
func GetBlobDescriptorFromRepo(imgStore storageTypes.ImageStore, repo string, blobDigest godigest.Digest,
log zlog.Logger,
) (ispec.Descriptor, error) {
index, err := GetIndex(imgStore, repo, log)
if err != nil {
return ispec.Descriptor{}, err
}

return GetBlobDescriptorFromIndex(imgStore, index, repo, blobDigest, log)
}

func GetBlobDescriptorFromIndex(imgStore storageTypes.ImageStore, index ispec.Index, repo string,
blobDigest godigest.Digest, log zlog.Logger,
) (ispec.Descriptor, error) {
for _, desc := range index.Manifests {
if desc.Digest == blobDigest {
return desc, nil
}

switch desc.MediaType {
case ispec.MediaTypeImageManifest:
if foundDescriptor, err := getBlobDescriptorFromManifest(imgStore, repo, blobDigest, desc, log); err == nil {
return foundDescriptor, nil
}
case ispec.MediaTypeImageIndex:
indexImage, err := GetImageIndex(imgStore, repo, desc.Digest, log)
if err != nil {
return ispec.Descriptor{}, err
}

if foundDescriptor, err := GetBlobDescriptorFromIndex(imgStore, indexImage, repo, blobDigest, log); err == nil {
return foundDescriptor, nil
}
case oras.MediaTypeArtifactManifest:
if foundDescriptor, err := getBlobDescriptorFromORASManifest(imgStore, repo, blobDigest, desc, log); err == nil {
return foundDescriptor, nil
}
}
}

return ispec.Descriptor{}, zerr.ErrBlobNotFound
}

func getBlobDescriptorFromManifest(imgStore storageTypes.ImageStore, repo string, blobDigest godigest.Digest,
desc ispec.Descriptor, log zlog.Logger,
) (ispec.Descriptor, error) {
manifest, err := GetImageManifest(imgStore, repo, desc.Digest, log)
if err != nil {
return ispec.Descriptor{}, err
}

if manifest.Config.Digest == blobDigest {
return manifest.Config, nil
}

for _, layer := range manifest.Layers {
if layer.Digest == blobDigest {
return layer, nil
}
}

return ispec.Descriptor{}, zerr.ErrBlobNotFound
}

func getBlobDescriptorFromORASManifest(imgStore storageTypes.ImageStore, repo string, blobDigest godigest.Digest,
desc ispec.Descriptor, log zlog.Logger,
) (ispec.Descriptor, error) {
manifest, err := GetOrasManifestByDigest(imgStore, repo, desc.Digest, log)
if err != nil {
return ispec.Descriptor{}, err
}

Check warning on line 883 in pkg/storage/common/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/common/common.go#L882-L883

Added lines #L882 - L883 were not covered by tests

for _, layer := range manifest.Blobs {
if layer.Digest == blobDigest {
return ispec.Descriptor{
MediaType: layer.MediaType,
Size: layer.Size,
Digest: layer.Digest,
ArtifactType: layer.ArtifactType,
Annotations: layer.Annotations,
}, nil
}
}

return ispec.Descriptor{}, zerr.ErrBlobNotFound
}

func IsSupportedMediaType(mediaType string) bool {
return mediaType == ispec.MediaTypeImageIndex ||
mediaType == ispec.MediaTypeImageManifest ||
Expand Down
82 changes: 82 additions & 0 deletions pkg/storage/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/storage/cache"
common "zotregistry.io/zot/pkg/storage/common"
"zotregistry.io/zot/pkg/storage/imagestore"
"zotregistry.io/zot/pkg/storage/local"
. "zotregistry.io/zot/pkg/test/image-utils"
"zotregistry.io/zot/pkg/test/mocks"
Expand Down Expand Up @@ -420,6 +421,87 @@ func TestGetImageIndexErrors(t *testing.T) {
})
}

func TestGetBlobDescriptorFromRepo(t *testing.T) {
log := log.Logger{Logger: zerolog.New(os.Stdout)}
metrics := monitoring.NewMetricsServer(false, log)

tdir := t.TempDir()
cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{
RootDir: tdir,
Name: "cache",
UseRelPaths: true,
}, log)

driver := local.New(true)
imgStore := imagestore.NewImageStore(tdir, tdir, true,
true, log, metrics, nil, driver, cacheDriver)

repoName := "zot-test"

Convey("Test error paths", t, func() {
storeController := storage.StoreController{DefaultStore: imgStore}

image := CreateRandomMultiarch()

tag := "index"

err := WriteMultiArchImageToFileSystem(image, repoName, tag, storeController)
So(err, ShouldBeNil)

blob := image.Images[0].Layers[0]
blobDigest := godigest.FromBytes(blob)
blobSize := len(blob)

desc, err := common.GetBlobDescriptorFromIndex(imgStore, ispec.Index{Manifests: []ispec.Descriptor{
{
Digest: image.Digest(),
MediaType: ispec.MediaTypeImageIndex,
},
}}, repoName, blobDigest, log)
So(err, ShouldBeNil)
So(desc.Digest, ShouldEqual, blobDigest)
So(desc.Size, ShouldEqual, blobSize)

desc, err = common.GetBlobDescriptorFromRepo(imgStore, repoName, blobDigest, log)
So(err, ShouldBeNil)
So(desc.Digest, ShouldEqual, blobDigest)
So(desc.Size, ShouldEqual, blobSize)

indexBlobPath := imgStore.BlobPath(repoName, image.Digest())
err = os.Chmod(indexBlobPath, 0o000)
So(err, ShouldBeNil)

defer func() {
err = os.Chmod(indexBlobPath, 0o644)
So(err, ShouldBeNil)
}()

_, err = common.GetBlobDescriptorFromIndex(imgStore, ispec.Index{Manifests: []ispec.Descriptor{
{
Digest: image.Digest(),
MediaType: ispec.MediaTypeImageIndex,
},
}}, repoName, blobDigest, log)
So(err, ShouldNotBeNil)

manifestDigest := image.Images[0].Digest()
manifestBlobPath := imgStore.BlobPath(repoName, manifestDigest)
err = os.Chmod(manifestBlobPath, 0o000)
So(err, ShouldBeNil)

defer func() {
err = os.Chmod(manifestBlobPath, 0o644)
So(err, ShouldBeNil)
}()

_, err = common.GetBlobDescriptorFromRepo(imgStore, repoName, blobDigest, log)
So(err, ShouldNotBeNil)

_, err = common.GetBlobDescriptorFromRepo(imgStore, repoName, manifestDigest, log)
So(err, ShouldBeNil)
})
}

func TestIsSignature(t *testing.T) {
Convey("Unknown media type", t, func(c C) {
isSingature := common.IsSignature(ispec.Descriptor{
Expand Down
69 changes: 51 additions & 18 deletions pkg/storage/imagestore/imagestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,20 +866,11 @@ func (is *ImageStore) FinishBlobUpload(repo, uuid string, body io.Reader, dstDig
return err
}

fileReader, err := is.storeDriver.Reader(src, 0)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open file")

return zerr.ErrUploadNotFound
}

defer fileReader.Close()

srcDigest, err := godigest.FromReader(fileReader)
srcDigest, err := getBlobDigest(is, src)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob")

return zerr.ErrBadBlobDigest
return err
}

if srcDigest != dstDigest {
Expand All @@ -906,7 +897,7 @@ func (is *ImageStore) FinishBlobUpload(repo, uuid string, body io.Reader, dstDig
defer is.Unlock(&lockLatency)

if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) {
err = is.DedupeBlob(src, dstDigest, dst)
err = is.DedupeBlob(src, dstDigest, repo, dst)
if err := inject.Error(err); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to dedupe blob")
Expand Down Expand Up @@ -985,7 +976,7 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi
dst := is.BlobPath(repo, dstDigest)

if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) {
if err := is.DedupeBlob(src, dstDigest, dst); err != nil {
if err := is.DedupeBlob(src, dstDigest, repo, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to dedupe blob")

Expand All @@ -1003,7 +994,7 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi
return uuid, int64(nbytes), nil
}

func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dst string) error {
func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dstRepo string, dst string) error {
retry:
is.log.Debug().Str("src", src).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: enter")

Expand Down Expand Up @@ -1033,12 +1024,11 @@ retry:
} else {
// cache record exists, but due to GC and upgrades from older versions,
// disk content and cache records may go out of sync

if is.cache.UsesRelativePaths() {
dstRecord = path.Join(is.rootDir, dstRecord)
}

_, err := is.storeDriver.Stat(dstRecord)
blobInfo, err := is.storeDriver.Stat(dstRecord)
if err != nil {
is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat")
// the actual blob on disk may have been removed by GC, so sync the cache
Expand Down Expand Up @@ -1066,6 +1056,22 @@ retry:

return err
}
} else {
// if it's same file then it was already uploaded, check if blob is corrupted
if desc, err := common.GetBlobDescriptorFromRepo(is, dstRepo, dstDigest, is.log); err == nil {
// blob corrupted, replace content
if desc.Size != blobInfo.Size() {
if err := is.storeDriver.Move(src, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("dedupe: unable to rename blob")

return err
}

Check warning on line 1068 in pkg/storage/imagestore/imagestore.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/imagestore/imagestore.go#L1065-L1068

Added lines #L1065 - L1068 were not covered by tests

is.log.Debug().Str("src", src).Msg("dedupe: remove")

return nil
}
}
}

// remove temp blobupload
Expand Down Expand Up @@ -1134,9 +1140,20 @@ func (is *ImageStore) CheckBlob(repo string, digest godigest.Digest) (bool, int6

binfo, err := is.storeDriver.Stat(blobPath)
if err == nil && binfo.Size() > 0 {
is.log.Debug().Str("blob path", blobPath).Msg("blob path found")
// try to find blob size in blob descriptors, if blob can not be found
desc, err := common.GetBlobDescriptorFromRepo(is, repo, digest, is.log)
if err != nil || desc.Size == binfo.Size() {
// blob not found in descriptors, can not compare, just return
is.log.Debug().Str("blob path", blobPath).Msg("blob path found")

return true, binfo.Size(), nil //nolint: nilerr
}

return true, binfo.Size(), nil
if desc.Size != binfo.Size() {
is.log.Debug().Str("blob path", blobPath).Msg("blob path found, but it's corrupted")

return false, -1, zerr.ErrBlobNotFound
}
}
// otherwise is a 'deduped' blob (empty file)

Expand Down Expand Up @@ -1634,6 +1651,22 @@ func (is *ImageStore) deleteBlob(repo string, digest godigest.Digest) error {
return nil
}

func getBlobDigest(imgStore *ImageStore, path string) (godigest.Digest, error) {
fileReader, err := imgStore.storeDriver.Reader(path, 0)
if err != nil {
return "", zerr.ErrUploadNotFound
}

defer fileReader.Close()

digest, err := godigest.FromReader(fileReader)
if err != nil {
return "", zerr.ErrBadBlobDigest
}

Check warning on line 1665 in pkg/storage/imagestore/imagestore.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/imagestore/imagestore.go#L1664-L1665

Added lines #L1664 - L1665 were not covered by tests

return digest, nil
}

func (is *ImageStore) GetAllBlobs(repo string) ([]string, error) {
dir := path.Join(is.rootDir, repo, "blobs", "sha256")

Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ func FuzzDedupeBlob(f *testing.F) {
t.Error(err)
}

err = imgStore.DedupeBlob(src, blobDigest, dst)
err = imgStore.DedupeBlob(src, blobDigest, "repoName", dst)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -1514,7 +1514,7 @@ func TestDedupe(t *testing.T) {
Convey("Dedupe", t, func(c C) {
Convey("Nil ImageStore", func() {
var is storageTypes.ImageStore
So(func() { _ = is.DedupeBlob("", "", "") }, ShouldPanic)
So(func() { _ = is.DedupeBlob("", "", "", "") }, ShouldPanic)
})

Convey("Valid ImageStore", func() {
Expand All @@ -1530,7 +1530,7 @@ func TestDedupe(t *testing.T) {

il := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver)

So(il.DedupeBlob("", "", ""), ShouldNotBeNil)
So(il.DedupeBlob("", "", "", ""), ShouldNotBeNil)
})
})
}
Expand Down
Loading

0 comments on commit 575fdb8

Please sign in to comment.