Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): repair corrupted blobs when pushed again #1927

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions pkg/storage/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,94 @@
return artManifest, nil
}

// Get blob descriptor from it's manifest contents, if blob can not be found it will return error.
func GetBlobDescriptorFromRepo(imgStore storageTypes.ImageStore, repo string, blobDigest godigest.Digest,
log zlog.Logger,
) (ispec.Descriptor, error) {
index, err := GetIndex(imgStore, repo, log)
if err != nil {
return ispec.Descriptor{}, err
}

return GetBlobDescriptorFromIndex(imgStore, index, repo, blobDigest, log)
}

func GetBlobDescriptorFromIndex(imgStore storageTypes.ImageStore, index ispec.Index, repo string,
blobDigest godigest.Digest, log zlog.Logger,
) (ispec.Descriptor, error) {
for _, desc := range index.Manifests {
if desc.Digest == blobDigest {
return desc, nil
}

switch desc.MediaType {
case ispec.MediaTypeImageManifest:
if foundDescriptor, err := getBlobDescriptorFromManifest(imgStore, repo, blobDigest, desc, log); err == nil {
return foundDescriptor, nil
}
case ispec.MediaTypeImageIndex:
indexImage, err := GetImageIndex(imgStore, repo, desc.Digest, log)
if err != nil {
return ispec.Descriptor{}, err
}

if foundDescriptor, err := GetBlobDescriptorFromIndex(imgStore, indexImage, repo, blobDigest, log); err == nil {
return foundDescriptor, nil
}
case oras.MediaTypeArtifactManifest:
if foundDescriptor, err := getBlobDescriptorFromORASManifest(imgStore, repo, blobDigest, desc, log); err == nil {
return foundDescriptor, nil
}
}
}

return ispec.Descriptor{}, zerr.ErrBlobNotFound
}

func getBlobDescriptorFromManifest(imgStore storageTypes.ImageStore, repo string, blobDigest godigest.Digest,
desc ispec.Descriptor, log zlog.Logger,
) (ispec.Descriptor, error) {
manifest, err := GetImageManifest(imgStore, repo, desc.Digest, log)
if err != nil {
return ispec.Descriptor{}, err
}

if manifest.Config.Digest == blobDigest {
return manifest.Config, nil
}

for _, layer := range manifest.Layers {
if layer.Digest == blobDigest {
return layer, nil
}
}

return ispec.Descriptor{}, zerr.ErrBlobNotFound
}

func getBlobDescriptorFromORASManifest(imgStore storageTypes.ImageStore, repo string, blobDigest godigest.Digest,
desc ispec.Descriptor, log zlog.Logger,
) (ispec.Descriptor, error) {
manifest, err := GetOrasManifestByDigest(imgStore, repo, desc.Digest, log)
if err != nil {
return ispec.Descriptor{}, err
}

Check warning on line 883 in pkg/storage/common/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/common/common.go#L882-L883

Added lines #L882 - L883 were not covered by tests

for _, layer := range manifest.Blobs {
if layer.Digest == blobDigest {
return ispec.Descriptor{
MediaType: layer.MediaType,
Size: layer.Size,
Digest: layer.Digest,
ArtifactType: layer.ArtifactType,
Annotations: layer.Annotations,
}, nil
}
}

return ispec.Descriptor{}, zerr.ErrBlobNotFound
}

func IsSupportedMediaType(mediaType string) bool {
return mediaType == ispec.MediaTypeImageIndex ||
mediaType == ispec.MediaTypeImageManifest ||
Expand Down
82 changes: 82 additions & 0 deletions pkg/storage/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/storage/cache"
common "zotregistry.io/zot/pkg/storage/common"
"zotregistry.io/zot/pkg/storage/imagestore"
"zotregistry.io/zot/pkg/storage/local"
. "zotregistry.io/zot/pkg/test/image-utils"
"zotregistry.io/zot/pkg/test/mocks"
Expand Down Expand Up @@ -420,6 +421,87 @@ func TestGetImageIndexErrors(t *testing.T) {
})
}

func TestGetBlobDescriptorFromRepo(t *testing.T) {
log := log.Logger{Logger: zerolog.New(os.Stdout)}
metrics := monitoring.NewMetricsServer(false, log)

tdir := t.TempDir()
cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{
RootDir: tdir,
Name: "cache",
UseRelPaths: true,
}, log)

driver := local.New(true)
imgStore := imagestore.NewImageStore(tdir, tdir, true,
true, log, metrics, nil, driver, cacheDriver)

repoName := "zot-test"

Convey("Test error paths", t, func() {
storeController := storage.StoreController{DefaultStore: imgStore}

image := CreateRandomMultiarch()

tag := "index"

err := WriteMultiArchImageToFileSystem(image, repoName, tag, storeController)
So(err, ShouldBeNil)

blob := image.Images[0].Layers[0]
blobDigest := godigest.FromBytes(blob)
blobSize := len(blob)

desc, err := common.GetBlobDescriptorFromIndex(imgStore, ispec.Index{Manifests: []ispec.Descriptor{
{
Digest: image.Digest(),
MediaType: ispec.MediaTypeImageIndex,
},
}}, repoName, blobDigest, log)
So(err, ShouldBeNil)
So(desc.Digest, ShouldEqual, blobDigest)
So(desc.Size, ShouldEqual, blobSize)

desc, err = common.GetBlobDescriptorFromRepo(imgStore, repoName, blobDigest, log)
So(err, ShouldBeNil)
So(desc.Digest, ShouldEqual, blobDigest)
So(desc.Size, ShouldEqual, blobSize)

indexBlobPath := imgStore.BlobPath(repoName, image.Digest())
err = os.Chmod(indexBlobPath, 0o000)
So(err, ShouldBeNil)

defer func() {
err = os.Chmod(indexBlobPath, 0o644)
So(err, ShouldBeNil)
}()

_, err = common.GetBlobDescriptorFromIndex(imgStore, ispec.Index{Manifests: []ispec.Descriptor{
{
Digest: image.Digest(),
MediaType: ispec.MediaTypeImageIndex,
},
}}, repoName, blobDigest, log)
So(err, ShouldNotBeNil)

manifestDigest := image.Images[0].Digest()
manifestBlobPath := imgStore.BlobPath(repoName, manifestDigest)
err = os.Chmod(manifestBlobPath, 0o000)
So(err, ShouldBeNil)

defer func() {
err = os.Chmod(manifestBlobPath, 0o644)
So(err, ShouldBeNil)
}()

_, err = common.GetBlobDescriptorFromRepo(imgStore, repoName, blobDigest, log)
So(err, ShouldNotBeNil)

_, err = common.GetBlobDescriptorFromRepo(imgStore, repoName, manifestDigest, log)
So(err, ShouldBeNil)
})
}

func TestIsSignature(t *testing.T) {
Convey("Unknown media type", t, func(c C) {
isSingature := common.IsSignature(ispec.Descriptor{
Expand Down
69 changes: 51 additions & 18 deletions pkg/storage/imagestore/imagestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,20 +866,11 @@
return err
}

fileReader, err := is.storeDriver.Reader(src, 0)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open file")

return zerr.ErrUploadNotFound
}

defer fileReader.Close()

srcDigest, err := godigest.FromReader(fileReader)
srcDigest, err := getBlobDigest(is, src)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob")

return zerr.ErrBadBlobDigest
return err
}

if srcDigest != dstDigest {
Expand All @@ -906,7 +897,7 @@
defer is.Unlock(&lockLatency)

if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) {
err = is.DedupeBlob(src, dstDigest, dst)
err = is.DedupeBlob(src, dstDigest, repo, dst)
if err := inject.Error(err); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to dedupe blob")
Expand Down Expand Up @@ -985,7 +976,7 @@
dst := is.BlobPath(repo, dstDigest)

if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) {
if err := is.DedupeBlob(src, dstDigest, dst); err != nil {
if err := is.DedupeBlob(src, dstDigest, repo, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to dedupe blob")

Expand All @@ -1003,7 +994,7 @@
return uuid, int64(nbytes), nil
}

func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dst string) error {
func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dstRepo string, dst string) error {
retry:
is.log.Debug().Str("src", src).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: enter")

Expand Down Expand Up @@ -1033,12 +1024,11 @@
} else {
// cache record exists, but due to GC and upgrades from older versions,
// disk content and cache records may go out of sync

if is.cache.UsesRelativePaths() {
dstRecord = path.Join(is.rootDir, dstRecord)
}

_, err := is.storeDriver.Stat(dstRecord)
blobInfo, err := is.storeDriver.Stat(dstRecord)
if err != nil {
is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat")
// the actual blob on disk may have been removed by GC, so sync the cache
Expand Down Expand Up @@ -1066,6 +1056,22 @@

return err
}
} else {
// if it's same file then it was already uploaded, check if blob is corrupted
if desc, err := common.GetBlobDescriptorFromRepo(is, dstRepo, dstDigest, is.log); err == nil {
// blob corrupted, replace content
if desc.Size != blobInfo.Size() {
if err := is.storeDriver.Move(src, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("dedupe: unable to rename blob")

return err
}

Check warning on line 1068 in pkg/storage/imagestore/imagestore.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/imagestore/imagestore.go#L1065-L1068

Added lines #L1065 - L1068 were not covered by tests

is.log.Debug().Str("src", src).Msg("dedupe: remove")

return nil
}
}
}

// remove temp blobupload
Expand Down Expand Up @@ -1134,9 +1140,20 @@

binfo, err := is.storeDriver.Stat(blobPath)
if err == nil && binfo.Size() > 0 {
is.log.Debug().Str("blob path", blobPath).Msg("blob path found")
// try to find blob size in blob descriptors, if blob can not be found
desc, err := common.GetBlobDescriptorFromRepo(is, repo, digest, is.log)
if err != nil || desc.Size == binfo.Size() {
// blob not found in descriptors, can not compare, just return
is.log.Debug().Str("blob path", blobPath).Msg("blob path found")

return true, binfo.Size(), nil //nolint: nilerr
}

return true, binfo.Size(), nil
if desc.Size != binfo.Size() {
is.log.Debug().Str("blob path", blobPath).Msg("blob path found, but it's corrupted")

return false, -1, zerr.ErrBlobNotFound
}
}
// otherwise is a 'deduped' blob (empty file)

Expand Down Expand Up @@ -1634,6 +1651,22 @@
return nil
}

func getBlobDigest(imgStore *ImageStore, path string) (godigest.Digest, error) {
fileReader, err := imgStore.storeDriver.Reader(path, 0)
if err != nil {
return "", zerr.ErrUploadNotFound
}

defer fileReader.Close()

digest, err := godigest.FromReader(fileReader)
if err != nil {
return "", zerr.ErrBadBlobDigest
}

Check warning on line 1665 in pkg/storage/imagestore/imagestore.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/imagestore/imagestore.go#L1664-L1665

Added lines #L1664 - L1665 were not covered by tests

return digest, nil
}

func (is *ImageStore) GetAllBlobs(repo string) ([]string, error) {
dir := path.Join(is.rootDir, repo, "blobs", "sha256")

Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ func FuzzDedupeBlob(f *testing.F) {
t.Error(err)
}

err = imgStore.DedupeBlob(src, blobDigest, dst)
err = imgStore.DedupeBlob(src, blobDigest, "repoName", dst)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -1514,7 +1514,7 @@ func TestDedupe(t *testing.T) {
Convey("Dedupe", t, func(c C) {
Convey("Nil ImageStore", func() {
var is storageTypes.ImageStore
So(func() { _ = is.DedupeBlob("", "", "") }, ShouldPanic)
So(func() { _ = is.DedupeBlob("", "", "", "") }, ShouldPanic)
})

Convey("Valid ImageStore", func() {
Expand All @@ -1530,7 +1530,7 @@ func TestDedupe(t *testing.T) {

il := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver)

So(il.DedupeBlob("", "", ""), ShouldNotBeNil)
So(il.DedupeBlob("", "", "", ""), ShouldNotBeNil)
})
})
}
Expand Down
Loading
Loading