Skip to content

Commit

Permalink
feat(api): repair corrupted blobs when pushed again
Browse files Browse the repository at this point in the history
CheckBlob() returns ErrBlobNotFound on corrupted blobs

closes #1922

Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk committed Oct 13, 2023
1 parent 458d40f commit 1ab971b
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 47 deletions.
120 changes: 102 additions & 18 deletions pkg/storage/imagestore/imagestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,20 +862,11 @@ func (is *ImageStore) FinishBlobUpload(repo, uuid string, body io.Reader, dstDig
return err
}

fileReader, err := is.storeDriver.Reader(src, 0)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open file")

return zerr.ErrUploadNotFound
}

defer fileReader.Close()

srcDigest, err := godigest.FromReader(fileReader)
srcDigest, err := getBlobDigest(is, src)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob")

return zerr.ErrBadBlobDigest
return err
}

if srcDigest != dstDigest {
Expand All @@ -902,7 +893,7 @@ func (is *ImageStore) FinishBlobUpload(repo, uuid string, body io.Reader, dstDig
defer is.Unlock(&lockLatency)

if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) {
err = is.DedupeBlob(src, dstDigest, dst)
err = is.DedupeBlob(src, dstDigest, repo, dst)
if err := inject.Error(err); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to dedupe blob")
Expand Down Expand Up @@ -981,7 +972,7 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi
dst := is.BlobPath(repo, dstDigest)

if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) {
if err := is.DedupeBlob(src, dstDigest, dst); err != nil {
if err := is.DedupeBlob(src, dstDigest, repo, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to dedupe blob")

Expand All @@ -999,7 +990,7 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi
return uuid, int64(nbytes), nil
}

func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dst string) error {
func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dstRepo string, dst string) error {
retry:
is.log.Debug().Str("src", src).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: enter")

Expand Down Expand Up @@ -1029,12 +1020,11 @@ retry:
} else {
// cache record exists, but due to GC and upgrades from older versions,
// disk content and cache records may go out of sync

if is.cache.UsesRelativePaths() {
dstRecord = path.Join(is.rootDir, dstRecord)
}

_, err := is.storeDriver.Stat(dstRecord)
blobInfo, err := is.storeDriver.Stat(dstRecord)
if err != nil {
is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat")
// the actual blob on disk may have been removed by GC, so sync the cache
Expand Down Expand Up @@ -1062,6 +1052,21 @@ retry:

return err
}
} else {
// if it's same file then it was already uploaded, check if blob is corrupted
if size, err := getBlobSizeFromRepoManifests(is, dstRepo, dstDigest, is.log); err == nil {
if size != blobInfo.Size() {
if err := is.storeDriver.Move(src, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("dedupe: unable to rename blob")

return err
}

Check warning on line 1063 in pkg/storage/imagestore/imagestore.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/imagestore/imagestore.go#L1059-L1063

Added lines #L1059 - L1063 were not covered by tests

is.log.Debug().Str("src", src).Msg("dedupe: remove")

return nil

Check warning on line 1067 in pkg/storage/imagestore/imagestore.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/imagestore/imagestore.go#L1065-L1067

Added lines #L1065 - L1067 were not covered by tests
}
}
}

// remove temp blobupload
Expand Down Expand Up @@ -1130,9 +1135,20 @@ func (is *ImageStore) CheckBlob(repo string, digest godigest.Digest) (bool, int6

binfo, err := is.storeDriver.Stat(blobPath)
if err == nil && binfo.Size() > 0 {
is.log.Debug().Str("blob path", blobPath).Msg("blob path found")
// try to find blob size in blob descriptors, if blob can not be found
size, err := getBlobSizeFromRepoManifests(is, repo, digest, is.log)
if err != nil || size == binfo.Size() {
// blob not found in descriptors, can not compare, just return
is.log.Debug().Str("blob path", blobPath).Msg("blob path found")

return true, binfo.Size(), nil //nolint: nilerr
}

if size != binfo.Size() {
is.log.Debug().Str("blob path", blobPath).Msg("blob path found, but it's corrupted")

return true, binfo.Size(), nil
return false, -1, zerr.ErrBlobNotFound
}
}
// otherwise is a 'deduped' blob (empty file)

Expand Down Expand Up @@ -1630,6 +1646,74 @@ func (is *ImageStore) deleteBlob(repo string, digest godigest.Digest) error {
return nil
}

// Get blob size from it's manifest contents, if blob can not be found (manifest not uploaded yet) it will return -1.
func getBlobSizeFromRepoManifests(imgStore *ImageStore, repo string, blobDigest godigest.Digest, log zlog.Logger,
) (int64, error) {
index, err := common.GetIndex(imgStore, repo, log)
if err != nil {
return -1, err
}

for _, desc := range index.Manifests {
switch desc.MediaType {
case ispec.MediaTypeImageManifest:
if size, _ := getBlobSizeFromManifest(imgStore, repo, blobDigest, desc, log); size > -1 {
return size, nil
}
case ispec.MediaTypeImageIndex:
indexImage, err := common.GetImageIndex(imgStore, repo, desc.Digest, log)
if err != nil {
return -1, err
}

Check warning on line 1667 in pkg/storage/imagestore/imagestore.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/imagestore/imagestore.go#L1666-L1667

Added lines #L1666 - L1667 were not covered by tests

for _, indexManifestDesc := range indexImage.Manifests {
if size, _ := getBlobSizeFromManifest(imgStore, repo, blobDigest, indexManifestDesc, log); size > -1 {
return size, nil
}

Check warning on line 1672 in pkg/storage/imagestore/imagestore.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/imagestore/imagestore.go#L1671-L1672

Added lines #L1671 - L1672 were not covered by tests
}
}
}

return -1, zerr.ErrBlobNotFound
}

func getBlobSizeFromManifest(imgStore *ImageStore, repo string, blobDigest godigest.Digest,
desc ispec.Descriptor, log zlog.Logger,
) (int64, error) {
manifest, err := common.GetImageManifest(imgStore, repo, desc.Digest, log)
if err != nil {
return -1, err
}

Check warning on line 1686 in pkg/storage/imagestore/imagestore.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/imagestore/imagestore.go#L1685-L1686

Added lines #L1685 - L1686 were not covered by tests

if manifest.Config.Digest == blobDigest {
return manifest.Config.Size, nil
}

for _, layer := range manifest.Layers {
if layer.Digest == blobDigest {
return layer.Size, nil
}
}

return -1, nil
}

func getBlobDigest(imgStore *ImageStore, path string) (godigest.Digest, error) {
fileReader, err := imgStore.storeDriver.Reader(path, 0)
if err != nil {
return "", zerr.ErrUploadNotFound
}

defer fileReader.Close()

digest, err := godigest.FromReader(fileReader)
if err != nil {
return "", zerr.ErrBadBlobDigest
}

Check warning on line 1712 in pkg/storage/imagestore/imagestore.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/imagestore/imagestore.go#L1711-L1712

Added lines #L1711 - L1712 were not covered by tests

return digest, nil
}

func (is *ImageStore) GetAllBlobs(repo string) ([]string, error) {
dir := path.Join(is.rootDir, repo, "blobs", "sha256")

Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ func FuzzDedupeBlob(f *testing.F) {
t.Error(err)
}

err = imgStore.DedupeBlob(src, blobDigest, dst)
err = imgStore.DedupeBlob(src, blobDigest, "repoName", dst)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -1514,7 +1514,7 @@ func TestDedupe(t *testing.T) {
Convey("Dedupe", t, func(c C) {
Convey("Nil ImageStore", func() {
var is storageTypes.ImageStore
So(func() { _ = is.DedupeBlob("", "", "") }, ShouldPanic)
So(func() { _ = is.DedupeBlob("", "", "", "") }, ShouldPanic)
})

Convey("Valid ImageStore", func() {
Expand All @@ -1530,7 +1530,7 @@ func TestDedupe(t *testing.T) {

il := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver)

So(il.DedupeBlob("", "", ""), ShouldNotBeNil)
So(il.DedupeBlob("", "", "", ""), ShouldNotBeNil)
})
})
}
Expand Down
44 changes: 22 additions & 22 deletions pkg/storage/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3552,7 +3552,7 @@ func TestS3DedupeErr(t *testing.T) {
digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest")

// trigger unable to insert blob record
err := imgStore.DedupeBlob("", digest, "")
err := imgStore.DedupeBlob("", digest, "", "")
So(err, ShouldNotBeNil)

imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
Expand All @@ -3565,11 +3565,11 @@ func TestS3DedupeErr(t *testing.T) {
})

// trigger unable to rename blob
err = imgStore.DedupeBlob("", digest, "dst")
err = imgStore.DedupeBlob("", digest, "", "dst")
So(err, ShouldNotBeNil)

// trigger retry
err = imgStore.DedupeBlob("", digest, "dst")
err = imgStore.DedupeBlob("", digest, "", "dst")
So(err, ShouldNotBeNil)
})

Expand All @@ -3586,11 +3586,11 @@ func TestS3DedupeErr(t *testing.T) {
})

digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest")
err := imgStore.DedupeBlob("", digest, "dst")
err := imgStore.DedupeBlob("", digest, "", "dst")
So(err, ShouldBeNil)

// error will be triggered in driver.SameFile()
err = imgStore.DedupeBlob("", digest, "dst2")
err = imgStore.DedupeBlob("", digest, "", "dst2")
So(err, ShouldBeNil)
})

Expand All @@ -3606,10 +3606,10 @@ func TestS3DedupeErr(t *testing.T) {
})

digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest")
err := imgStore.DedupeBlob("", digest, "dst")
err := imgStore.DedupeBlob("", digest, "", "dst")
So(err, ShouldBeNil)

err = imgStore.DedupeBlob("", digest, "dst2")
err = imgStore.DedupeBlob("", digest, "", "dst2")
So(err, ShouldNotBeNil)
})

Expand All @@ -3622,10 +3622,10 @@ func TestS3DedupeErr(t *testing.T) {
})

digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest")
err := imgStore.DedupeBlob("", digest, "dst")
err := imgStore.DedupeBlob("", digest, "", "dst")
So(err, ShouldBeNil)

err = imgStore.DedupeBlob("", digest, "")
err = imgStore.DedupeBlob("", digest, "", "")
So(err, ShouldNotBeNil)
})

Expand All @@ -3641,10 +3641,10 @@ func TestS3DedupeErr(t *testing.T) {
})

digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest")
err := imgStore.DedupeBlob("", digest, "dst")
err := imgStore.DedupeBlob("", digest, "", "dst")
So(err, ShouldBeNil)

err = imgStore.DedupeBlob("", digest, "dst")
err = imgStore.DedupeBlob("", digest, "", "dst")
So(err, ShouldNotBeNil)
})

Expand All @@ -3665,7 +3665,7 @@ func TestS3DedupeErr(t *testing.T) {
digest := godigest.NewDigestFromEncoded(godigest.SHA256,
"7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc")

err := imgStore.DedupeBlob("repo", digest, "dst")
err := imgStore.DedupeBlob("repo", digest, "", "dst")
So(err, ShouldBeNil)

_, _, err = imgStore.CheckBlob("repo", digest)
Expand All @@ -3686,7 +3686,7 @@ func TestS3DedupeErr(t *testing.T) {
digest := godigest.NewDigestFromEncoded(godigest.SHA256,
"7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc")

err := imgStore.DedupeBlob("repo", digest, "dst")
err := imgStore.DedupeBlob("repo", digest, "", "dst")
So(err, ShouldBeNil)

_, _, err = imgStore.CheckBlob("repo", digest)
Expand All @@ -3704,7 +3704,7 @@ func TestS3DedupeErr(t *testing.T) {
digest := godigest.NewDigestFromEncoded(godigest.SHA256,
"7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc")

err := imgStore.DedupeBlob("repo", digest, "dst")
err := imgStore.DedupeBlob("repo", digest, "", "dst")
So(err, ShouldBeNil)

_, _, err = imgStore.CheckBlob("repo", digest)
Expand All @@ -3719,10 +3719,10 @@ func TestS3DedupeErr(t *testing.T) {
digest := godigest.NewDigestFromEncoded(godigest.SHA256,
"7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc")

err := imgStore.DedupeBlob("/src/dst", digest, "/repo1/dst1")
err := imgStore.DedupeBlob("/src/dst", digest, "", "/repo1/dst1")
So(err, ShouldBeNil)

err = imgStore.DedupeBlob("/src/dst", digest, "/repo2/dst2")
err = imgStore.DedupeBlob("/src/dst", digest, "", "/repo2/dst2")
So(err, ShouldBeNil)

// copy cache db to the new imagestore
Expand Down Expand Up @@ -3767,10 +3767,10 @@ func TestS3DedupeErr(t *testing.T) {
digest := godigest.NewDigestFromEncoded(godigest.SHA256,
"7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc")

err := imgStore.DedupeBlob("/src/dst", digest, "/repo1/dst1")
err := imgStore.DedupeBlob("/src/dst", digest, "", "/repo1/dst1")
So(err, ShouldBeNil)

err = imgStore.DedupeBlob("/src/dst", digest, "/repo2/dst2")
err = imgStore.DedupeBlob("/src/dst", digest, "", "/repo2/dst2")
So(err, ShouldBeNil)

// copy cache db to the new imagestore
Expand Down Expand Up @@ -3871,7 +3871,7 @@ func TestS3DedupeErr(t *testing.T) {
},
})

err := imgStore.DedupeBlob("repo", digest, blobPath)
err := imgStore.DedupeBlob("repo", digest, "", blobPath)
So(err, ShouldBeNil)

_, _, err = imgStore.CheckBlob("repo2", digest)
Expand Down Expand Up @@ -3922,19 +3922,19 @@ func TestInjectDedupe(t *testing.T) {
return &FileInfoMock{}, errS3
},
})
err := imgStore.DedupeBlob("blob", "digest", "newblob")
err := imgStore.DedupeBlob("blob", "digest", "", "newblob")
So(err, ShouldBeNil)

injected := inject.InjectFailure(0)
err = imgStore.DedupeBlob("blob", "digest", "newblob")
err = imgStore.DedupeBlob("blob", "digest", "", "newblob")
if injected {
So(err, ShouldNotBeNil)
} else {
So(err, ShouldBeNil)
}

injected = inject.InjectFailure(1)
err = imgStore.DedupeBlob("blob", "digest", "newblob")
err = imgStore.DedupeBlob("blob", "digest", "", "newblob")
if injected {
So(err, ShouldNotBeNil)
} else {
Expand Down
Loading

0 comments on commit 1ab971b

Please sign in to comment.