Skip to content

Commit 3b1dd0e

Browse files
feat: Public GC function of oci.Store (#656)
Part of #472
Signed-off-by: Xiaoxuan Wang <[email protected]>
1 parent c34d275 commit 3b1dd0e

File tree

4 files changed

+387
-0
lines changed

4 files changed

+387
-0
lines changed

content/oci/oci.go

+93
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"fmt"
2525
"io"
2626
"os"
27+
"path"
2728
"path/filepath"
2829
"sync"
2930

@@ -454,6 +455,77 @@ func (s *Store) writeIndexFile() error {
454455
return os.WriteFile(s.indexPath, indexJSON, 0666)
455456
}
456457

458+
// reloadIndex reloads the index and updates metadata by creating a new store.
459+
func (s *Store) reloadIndex(ctx context.Context) error {
460+
newStore, err := NewWithContext(ctx, s.root)
461+
if err != nil {
462+
return err
463+
}
464+
s.index = newStore.index
465+
s.storage = newStore.storage
466+
s.tagResolver = newStore.tagResolver
467+
s.graph = newStore.graph
468+
return nil
469+
}
470+
471+
// GC removes garbage from Store. Unsaved index will be lost. To prevent unexpected
472+
// loss, call SaveIndex() before GC or set AutoSaveIndex to true.
473+
// The garbage to be cleaned are:
474+
// - unreferenced (dangling) blobs in Store which have no predecessors
475+
// - garbage blobs in the storage whose metadata is not stored in Store
476+
func (s *Store) GC(ctx context.Context) error {
477+
s.sync.Lock()
478+
defer s.sync.Unlock()
479+
480+
// get reachable nodes by reloading the index
481+
err := s.reloadIndex(ctx)
482+
if err != nil {
483+
return fmt.Errorf("unable to reload index: %w", err)
484+
}
485+
reachableNodes := s.graph.DigestSet()
486+
487+
// clean up garbage blobs in the storage
488+
rootpath := filepath.Join(s.root, ocispec.ImageBlobsDir)
489+
algDirs, err := os.ReadDir(rootpath)
490+
if err != nil {
491+
return err
492+
}
493+
for _, algDir := range algDirs {
494+
if !algDir.IsDir() {
495+
continue
496+
}
497+
alg := algDir.Name()
498+
// skip unsupported directories
499+
if !isKnownAlgorithm(alg) {
500+
continue
501+
}
502+
algPath := path.Join(rootpath, alg)
503+
digestEntries, err := os.ReadDir(algPath)
504+
if err != nil {
505+
return err
506+
}
507+
for _, digestEntry := range digestEntries {
508+
if err := isContextDone(ctx); err != nil {
509+
return err
510+
}
511+
dgst := digestEntry.Name()
512+
blobDigest := digest.NewDigestFromEncoded(digest.Algorithm(alg), dgst)
513+
if err := blobDigest.Validate(); err != nil {
514+
// skip irrelevant content
515+
continue
516+
}
517+
if !reachableNodes.Contains(blobDigest) {
518+
// remove the blob from storage if it does not exist in Store
519+
err = os.Remove(path.Join(algPath, dgst))
520+
if err != nil {
521+
return err
522+
}
523+
}
524+
}
525+
}
526+
return nil
527+
}
528+
457529
// unsafeStore is used to bypass lock restrictions in Delete.
458530
type unsafeStore struct {
459531
*Store
@@ -467,6 +539,17 @@ func (s *unsafeStore) Predecessors(ctx context.Context, node ocispec.Descriptor)
467539
return s.graph.Predecessors(ctx, node)
468540
}
469541

// isContextDone returns an error if the context is done.
//
// It returns nil while ctx is still active. Once ctx has been canceled or its
// deadline has passed, it returns ctx.Err() (context.Canceled or
// context.DeadlineExceeded for the standard context implementations).
// The call never blocks.
//
// Reference: https://pkg.go.dev/context#Context
func isContextDone(ctx context.Context) error {
	// The Context contract states that Err is nil until Done is closed and
	// non-nil afterwards, so a non-blocking receive on Done is not needed.
	return ctx.Err()
}
552+
470553
// validateReference validates ref.
471554
func validateReference(ref string) error {
472555
if ref == "" {
@@ -476,3 +559,13 @@ func validateReference(ref string) error {
476559
// TODO: may enforce more strict validation if needed.
477560
return nil
478561
}
562+
563+
// isKnownAlgorithm checks is a string is a supported hash algorithm
564+
func isKnownAlgorithm(alg string) bool {
565+
switch digest.Algorithm(alg) {
566+
case digest.SHA256, digest.SHA512, digest.SHA384:
567+
return true
568+
default:
569+
return false
570+
}
571+
}

content/oci/oci_test.go

+206
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"fmt"
2525
"io"
2626
"os"
27+
"path"
2728
"path/filepath"
2829
"reflect"
2930
"strconv"
@@ -2844,6 +2845,199 @@ func TestStore_UntagErrorPath(t *testing.T) {
28442845
}
28452846
}
28462847

2848+
func TestStore_GC(t *testing.T) {
2849+
tempDir := t.TempDir()
2850+
s, err := New(tempDir)
2851+
if err != nil {
2852+
t.Fatal("New() error =", err)
2853+
}
2854+
ctx := context.Background()
2855+
2856+
// generate test content
2857+
var blobs [][]byte
2858+
var descs []ocispec.Descriptor
2859+
appendBlob := func(mediaType string, blob []byte) {
2860+
blobs = append(blobs, blob)
2861+
descs = append(descs, ocispec.Descriptor{
2862+
MediaType: mediaType,
2863+
Digest: digest.FromBytes(blob),
2864+
Size: int64(len(blob)),
2865+
})
2866+
}
2867+
generateManifest := func(config ocispec.Descriptor, subject *ocispec.Descriptor, layers ...ocispec.Descriptor) {
2868+
manifest := ocispec.Manifest{
2869+
Config: config,
2870+
Subject: subject,
2871+
Layers: layers,
2872+
}
2873+
manifestJSON, err := json.Marshal(manifest)
2874+
if err != nil {
2875+
t.Fatal(err)
2876+
}
2877+
appendBlob(ocispec.MediaTypeImageManifest, manifestJSON)
2878+
}
2879+
generateImageIndex := func(manifests ...ocispec.Descriptor) {
2880+
index := ocispec.Index{
2881+
Manifests: manifests,
2882+
}
2883+
indexJSON, err := json.Marshal(index)
2884+
if err != nil {
2885+
t.Fatal(err)
2886+
}
2887+
appendBlob(ocispec.MediaTypeImageIndex, indexJSON)
2888+
}
2889+
generateArtifactManifest := func(blobs ...ocispec.Descriptor) {
2890+
var manifest spec.Artifact
2891+
manifest.Blobs = append(manifest.Blobs, blobs...)
2892+
manifestJSON, err := json.Marshal(manifest)
2893+
if err != nil {
2894+
t.Fatal(err)
2895+
}
2896+
appendBlob(spec.MediaTypeArtifactManifest, manifestJSON)
2897+
}
2898+
2899+
appendBlob(ocispec.MediaTypeImageConfig, []byte("config")) // Blob 0
2900+
appendBlob(ocispec.MediaTypeImageLayer, []byte("blob")) // Blob 1
2901+
appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer")) // Blob 2, dangling layer
2902+
generateManifest(descs[0], nil, descs[1]) // Blob 3, valid manifest
2903+
generateManifest(descs[0], &descs[3], descs[1]) // Blob 4, referrer of a valid manifest, not in index.json, should be cleaned with current implementation
2904+
appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer 2")) // Blob 5, dangling layer
2905+
generateArtifactManifest(descs[4]) // blob 6, dangling artifact
2906+
generateManifest(descs[0], &descs[5], descs[1]) // Blob 7, referrer of a dangling manifest
2907+
appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer 3")) // Blob 8, dangling layer
2908+
generateArtifactManifest(descs[6]) // blob 9, dangling artifact
2909+
generateImageIndex(descs[7], descs[5]) // blob 10, dangling image index
2910+
appendBlob(ocispec.MediaTypeImageLayer, []byte("garbage layer 1")) // Blob 11, garbage layer 1
2911+
generateManifest(descs[0], nil, descs[4]) // Blob 12, garbage manifest 1
2912+
appendBlob(ocispec.MediaTypeImageConfig, []byte("garbage config")) // Blob 13, garbage config
2913+
appendBlob(ocispec.MediaTypeImageLayer, []byte("garbage layer 2")) // Blob 14, garbage layer 2
2914+
generateManifest(descs[6], nil, descs[7]) // Blob 15, garbage manifest 2
2915+
generateManifest(descs[0], &descs[13], descs[1]) // Blob 16, referrer of a garbage manifest
2916+
2917+
// push blobs 0 - blobs 10 into s
2918+
for i := 0; i <= 10; i++ {
2919+
err := s.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
2920+
if err != nil {
2921+
t.Errorf("failed to push test content to src: %d: %v", i, err)
2922+
}
2923+
}
2924+
2925+
// remove blobs 4 - blobs 10 from index.json
2926+
for i := 4; i <= 10; i++ {
2927+
s.tagResolver.Untag(string(descs[i].Digest))
2928+
}
2929+
s.SaveIndex()
2930+
2931+
// push blobs 11 - blobs 16 into s.storage, making them garbage as their metadata
2932+
// doesn't exist in s
2933+
for i := 11; i < len(blobs); i++ {
2934+
err := s.storage.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
2935+
if err != nil {
2936+
t.Errorf("failed to push test content to src: %d: %v", i, err)
2937+
}
2938+
}
2939+
2940+
// confirm that all the blobs are in the storage
2941+
for i := 11; i < len(blobs); i++ {
2942+
exists, err := s.Exists(ctx, descs[i])
2943+
if err != nil {
2944+
t.Fatal(err)
2945+
}
2946+
if !exists {
2947+
t.Fatalf("descs[%d] should exist", i)
2948+
}
2949+
}
2950+
2951+
// perform GC
2952+
if err = s.GC(ctx); err != nil {
2953+
t.Fatal(err)
2954+
}
2955+
2956+
// verify existence
2957+
wantExistence := []bool{true, true, false, true, false, false, false, false, false, false, false, false, false, false, false, false, false}
2958+
for i, wantValue := range wantExistence {
2959+
exists, err := s.Exists(ctx, descs[i])
2960+
if err != nil {
2961+
t.Fatal(err)
2962+
}
2963+
if exists != wantValue {
2964+
t.Fatalf("want existence %d to be %v, got %v", i, wantValue, exists)
2965+
}
2966+
}
2967+
}
2968+
2969+
func TestStore_GCErrorPath(t *testing.T) {
2970+
tempDir := t.TempDir()
2971+
s, err := New(tempDir)
2972+
if err != nil {
2973+
t.Fatal("New() error =", err)
2974+
}
2975+
ctx := context.Background()
2976+
2977+
// generate test content
2978+
var blobs [][]byte
2979+
var descs []ocispec.Descriptor
2980+
appendBlob := func(mediaType string, blob []byte) {
2981+
blobs = append(blobs, blob)
2982+
descs = append(descs, ocispec.Descriptor{
2983+
MediaType: mediaType,
2984+
Digest: digest.FromBytes(blob),
2985+
Size: int64(len(blob)),
2986+
})
2987+
}
2988+
appendBlob(ocispec.MediaTypeImageLayer, []byte("valid blob")) // Blob 0
2989+
2990+
// push the valid blob
2991+
err = s.Push(ctx, descs[0], bytes.NewReader(blobs[0]))
2992+
if err != nil {
2993+
t.Error("failed to push test content to src")
2994+
}
2995+
2996+
// write random contents
2997+
algPath := path.Join(tempDir, "blobs")
2998+
dgstPath := path.Join(algPath, "sha256")
2999+
if err := os.WriteFile(path.Join(algPath, "other"), []byte("random"), 0444); err != nil {
3000+
t.Fatal("error calling WriteFile(), error =", err)
3001+
}
3002+
if err := os.WriteFile(path.Join(dgstPath, "other2"), []byte("random2"), 0444); err != nil {
3003+
t.Fatal("error calling WriteFile(), error =", err)
3004+
}
3005+
3006+
// perform GC
3007+
if err = s.GC(ctx); err != nil {
3008+
t.Fatal(err)
3009+
}
3010+
3011+
appendBlob(ocispec.MediaTypeImageLayer, []byte("valid blob 2")) // Blob 1
3012+
3013+
// push the valid blob
3014+
err = s.Push(ctx, descs[1], bytes.NewReader(blobs[1]))
3015+
if err != nil {
3016+
t.Error("failed to push test content to src")
3017+
}
3018+
3019+
// unknown algorithm
3020+
if err := os.Mkdir(path.Join(algPath, "sha666"), 0777); err != nil {
3021+
t.Fatal(err)
3022+
}
3023+
if err = s.GC(ctx); err != nil {
3024+
t.Fatal("this error should be silently ignored")
3025+
}
3026+
3027+
// os.Remove() error
3028+
badDigest := digest.FromBytes([]byte("bad digest")).Encoded()
3029+
badPath := path.Join(algPath, "sha256", badDigest)
3030+
if err := os.Mkdir(badPath, 0777); err != nil {
3031+
t.Fatal(err)
3032+
}
3033+
if err := os.WriteFile(path.Join(badPath, "whatever"), []byte("extra content"), 0444); err != nil {
3034+
t.Fatal("error calling WriteFile(), error =", err)
3035+
}
3036+
if err = s.GC(ctx); err == nil {
3037+
t.Fatal("expect an error when os.Remove()")
3038+
}
3039+
}
3040+
28473041
func equalDescriptorSet(actual []ocispec.Descriptor, expected []ocispec.Descriptor) bool {
28483042
if len(actual) != len(expected) {
28493043
return false
@@ -2863,3 +3057,15 @@ func equalDescriptorSet(actual []ocispec.Descriptor, expected []ocispec.Descript
28633057
}
28643058
return true
28653059
}
3060+
3061+
func Test_isContextDone(t *testing.T) {
3062+
ctx := context.Background()
3063+
ctx, cancel := context.WithCancel(ctx)
3064+
if err := isContextDone(ctx); err != nil {
3065+
t.Errorf("expect error = %v, got %v", nil, err)
3066+
}
3067+
cancel()
3068+
if err := isContextDone(ctx); err != context.Canceled {
3069+
t.Errorf("expect error = %v, got %v", context.Canceled, err)
3070+
}
3071+
}

internal/graph/memory.go

+10
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"errors"
2121
"sync"
2222

23+
"github.com/opencontainers/go-digest"
2324
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
2425
"oras.land/oras-go/v2/content"
2526
"oras.land/oras-go/v2/errdef"
@@ -147,6 +148,15 @@ func (m *Memory) Remove(node ocispec.Descriptor) []ocispec.Descriptor {
147148
return danglings
148149
}
149150

151+
// DigestSet returns the set of node digest in memory.
152+
func (m *Memory) DigestSet() set.Set[digest.Digest] {
153+
s := set.New[digest.Digest]()
154+
for desc := range m.nodes {
155+
s.Add(desc.Digest)
156+
}
157+
return s
158+
}
159+
150160
// index indexes predecessors for each direct successor of the given node.
151161
func (m *Memory) index(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) ([]ocispec.Descriptor, error) {
152162
successors, err := content.Successors(ctx, fetcher, node)

0 commit comments

Comments
 (0)