Skip to content

Commit

Permalink
feat(api): repair corrupted blobs when pushed again
Browse files Browse the repository at this point in the history
CheckBlob() returns ErrBlobNotFound on corrupted blobs

closes #1922

Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk committed Oct 12, 2023
1 parent ab45356 commit bb0312c
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 13 deletions.
41 changes: 28 additions & 13 deletions pkg/storage/imagestore/imagestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,20 +862,11 @@ func (is *ImageStore) FinishBlobUpload(repo, uuid string, body io.Reader, dstDig
return err
}

fileReader, err := is.storeDriver.Reader(src, 0)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open file")

return zerr.ErrUploadNotFound
}

defer fileReader.Close()

srcDigest, err := godigest.FromReader(fileReader)
srcDigest, err := getBlobDigest(is, src)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob")

return zerr.ErrBadBlobDigest
return err
}

if srcDigest != dstDigest {
Expand Down Expand Up @@ -1130,9 +1121,17 @@ func (is *ImageStore) CheckBlob(repo string, digest godigest.Digest) (bool, int6

binfo, err := is.storeDriver.Stat(blobPath)
if err == nil && binfo.Size() > 0 {
is.log.Debug().Str("blob path", blobPath).Msg("blob path found")
blobDigest, _ := getBlobDigest(is, blobPath)
// validate that found blob is not corrupted
if digest == blobDigest {
is.log.Debug().Str("blob path", blobPath).Msg("blob path found")

return true, binfo.Size(), nil
} else {
is.log.Debug().Str("blob path", blobPath).Msg("found blob path is corrupted")

return true, binfo.Size(), nil
return false, -1, zerr.ErrBlobNotFound
}
}
// otherwise is a 'deduped' blob (empty file)

Expand Down Expand Up @@ -1630,6 +1629,22 @@ func (is *ImageStore) deleteBlob(repo string, digest godigest.Digest) error {
return nil
}

func getBlobDigest(imgStore *ImageStore, path string) (godigest.Digest, error) {
fileReader, err := imgStore.storeDriver.Reader(path, 0)
if err != nil {
return "", zerr.ErrUploadNotFound
}

defer fileReader.Close()

digest, err := godigest.FromReader(fileReader)
if err != nil {
return "", zerr.ErrBadBlobDigest
}

return digest, nil
}

func (is *ImageStore) GetAllBlobs(repo string) ([]string, error) {
dir := path.Join(is.rootDir, repo, "blobs", "sha256")

Expand Down
66 changes: 66 additions & 0 deletions pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,72 @@ func TestDeleteBlobsInUse(t *testing.T) {
}
}

func TestReuploadCorruptedBlob(t *testing.T) {
for _, testcase := range testCases {
testcase := testcase
t.Run(testcase.testCaseName, func(t *testing.T) {
var imgStore storageTypes.ImageStore
var testDir, tdir string
var store driver.StorageDriver
var driver storageTypes.Driver

log := log.Logger{Logger: zerolog.New(os.Stdout)}
metrics := monitoring.NewMetricsServer(false, log)

if testcase.storageType == storageConstants.S3StorageDriverName {
tskip.SkipS3(t)

uuid, err := guuid.NewV4()
if err != nil {
panic(err)
}

testDir = path.Join("/oci-repo-test", uuid.String())
tdir = t.TempDir()

store, imgStore, _ = createObjectsStore(testDir, tdir)
driver = s3.New(store)
defer cleanupStorage(store, testDir)
} else {
tdir = t.TempDir()
cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{
RootDir: tdir,
Name: "cache",
UseRelPaths: true,
}, log)
driver = local.New(true)
imgStore = imagestore.NewImageStore(tdir, tdir, true,
true, log, metrics, nil, driver, cacheDriver)
}

Convey("CheckBlob() should return BlobNotFound on corrupted blobs", t, func() {
storeController := storage.StoreController{DefaultStore: imgStore}

image := CreateRandomImage()

err := WriteImageToFileSystem(image, repoName, tag, storeController)
So(err, ShouldBeNil)

blobDigest := godigest.FromBytes(image.Layers[0])
blobPath := imgStore.BlobPath(repoName, blobDigest)

ok, size, err := imgStore.CheckBlob(repoName, blobDigest)
So(ok, ShouldBeTrue)
So(size, ShouldEqual, len(image.Layers[0]))
So(err, ShouldBeNil)

_, err = driver.WriteFile(blobPath, []byte("corrupted"))
So(err, ShouldBeNil)

ok, size, err = imgStore.CheckBlob(repoName, blobDigest)
So(ok, ShouldBeFalse)
So(size, ShouldNotEqual, len(image.Layers[0]))
So(err, ShouldEqual, zerr.ErrBlobNotFound)
})
})
}
}

func TestStorageHandler(t *testing.T) {
for _, testcase := range testCases {
testcase := testcase
Expand Down

0 comments on commit bb0312c

Please sign in to comment.