Skip to content

Commit

Permalink
Merge branch 'main' into sync-remote-storage
Browse files Browse the repository at this point in the history
  • Loading branch information
elee1766 authored Nov 18, 2023
2 parents 36123d7 + 8e7b2d2 commit f3cded9
Show file tree
Hide file tree
Showing 23 changed files with 765 additions and 160 deletions.
1 change: 1 addition & 0 deletions .github/workflows/nightly.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ jobs:
- uses: actions/checkout@v4
- name: Check out source code
uses: actions/checkout@v4
- uses: ./.github/actions/clean-runner
- name: Build image
run: |
make docker-image
Expand Down
8 changes: 4 additions & 4 deletions pkg/api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,22 +264,22 @@ func TestCreateMetaDBDriver(t *testing.T) {

const perms = 0o600

boltDB, err := bbolt.Open(path.Join(dir, "repo.db"), perms, &bbolt.Options{Timeout: time.Second * 10})
boltDB, err := bbolt.Open(path.Join(dir, "meta.db"), perms, &bbolt.Options{Timeout: time.Second * 10})
So(err, ShouldBeNil)

err = boltDB.Close()
So(err, ShouldBeNil)

err = os.Chmod(path.Join(dir, "repo.db"), 0o200)
err = os.Chmod(path.Join(dir, "meta.db"), 0o200)
So(err, ShouldBeNil)

_, err = meta.New(conf.Storage.StorageConfig, log)
So(err, ShouldNotBeNil)

err = os.Chmod(path.Join(dir, "repo.db"), 0o600)
err = os.Chmod(path.Join(dir, "meta.db"), 0o600)
So(err, ShouldBeNil)

defer os.Remove(path.Join(dir, "repo.db"))
defer os.Remove(path.Join(dir, "meta.db"))
})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/server/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ func LoadConfiguration(config *config.Config, configPath string) error {
}

metaData := &mapstructure.Metadata{}
if err := viperInstance.Unmarshal(&config, metadataConfig(metaData)); err != nil {
if err := viperInstance.UnmarshalExact(&config, metadataConfig(metaData)); err != nil {
log.Error().Err(err).Msg("error while unmarshaling new config")

return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/extensions/search/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import (

const (
graphqlQueryPrefix = constants.FullSearchPrefix
DBFileName = "repo.db"
DBFileName = "meta.db"
)

var (
Expand Down
139 changes: 135 additions & 4 deletions pkg/meta/boltdb/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ func New(boltDB *bbolt.DB, log log.Logger) (*BoltDB, error) {
return err
}

_, err = transaction.CreateBucketIfNotExists([]byte(RepoBlobsBuck))
repoBlobsBuck, err := transaction.CreateBucketIfNotExists([]byte(RepoBlobsBuck))
if err != nil {
return err
}

_, err = repoBlobsBuck.CreateBucketIfNotExists([]byte(RepoLastUpdatedBuck))
if err != nil {
return err
}
Expand All @@ -84,6 +89,53 @@ func New(boltDB *bbolt.DB, log log.Logger) (*BoltDB, error) {
}, nil
}

func (bdw *BoltDB) GetAllRepoNames() ([]string, error) {
repoNames := []string{}

err := bdw.DB.View(func(tx *bbolt.Tx) error {
repoMetaBuck := tx.Bucket([]byte(RepoMetaBuck))

return repoMetaBuck.ForEach(func(repo, _ []byte) error {
repoNames = append(repoNames, string(repo))

return nil
})
})

return repoNames, err
}

func (bdw *BoltDB) GetRepoLastUpdated(repo string) time.Time {
lastUpdated := time.Time{}

err := bdw.DB.View(func(tx *bbolt.Tx) error {
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))

lastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))

lastUpdatedBlob := lastUpdatedBuck.Get([]byte(repo))
if len(lastUpdatedBlob) == 0 {
return zerr.ErrRepoMetaNotFound
}

protoTime := &timestamppb.Timestamp{}

err := proto.Unmarshal(lastUpdatedBlob, protoTime)
if err != nil {
return err
}

lastUpdated = *mConvert.GetTime(protoTime)

return nil
})
if err != nil {
return time.Time{}
}

return lastUpdated
}

func (bdw *BoltDB) SetImageMeta(digest godigest.Digest, imageMeta mTypes.ImageMeta) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
buck := tx.Bucket([]byte(ImageMetaBuck))
Expand Down Expand Up @@ -132,6 +184,7 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference
err = bdw.DB.Update(func(tx *bbolt.Tx) error {
repoBuck := tx.Bucket([]byte(RepoMetaBuck))
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))
imageBuck := tx.Bucket([]byte(ImageMetaBuck))

// 1. Add image data to db if needed
Expand Down Expand Up @@ -226,6 +279,11 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference

protoRepoMeta, repoBlobs = common.AddImageMetaToRepoMeta(protoRepoMeta, repoBlobs, reference, imageMeta)

err = setRepoLastUpdated(repo, time.Now(), repoLastUpdatedBuck)
if err != nil {
return err
}

err = setProtoRepoBlobs(repoBlobs, repoBlobsBuck)
if err != nil {
return err
Expand All @@ -237,6 +295,17 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference
return err
}

func setRepoLastUpdated(repo string, lastUpdated time.Time, repoLastUpdatedBuck *bbolt.Bucket) error {
protoTime := timestamppb.New(lastUpdated)

protoTimeBlob, err := proto.Marshal(protoTime)
if err != nil {
return err
}

return repoLastUpdatedBuck.Put([]byte(repo), protoTimeBlob)
}

func unmarshalProtoRepoBlobs(repo string, repoBlobsBytes []byte) (*proto_go.RepoBlobs, error) {
repoBlobs := &proto_go.RepoBlobs{
Name: repo,
Expand Down Expand Up @@ -1010,6 +1079,7 @@ func (bdw *BoltDB) DecrementRepoStars(repo string) error {
func (bdw *BoltDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
buck := tx.Bucket([]byte(RepoMetaBuck))
repoLastUpdatedBuck := tx.Bucket([]byte(RepoBlobsBuck)).Bucket([]byte(RepoLastUpdatedBuck))

repoMeta.Name = repo

Expand All @@ -1018,7 +1088,35 @@ func (bdw *BoltDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error {
return err
}

return buck.Put([]byte(repo), repoMetaBlob)
err = buck.Put([]byte(repo), repoMetaBlob)
if err != nil {
return err
}

// The last update time is set to 0 in order to force an update in case of a next storage parsing
return setRepoLastUpdated(repo, time.Time{}, repoLastUpdatedBuck)
})

return err
}

func (bdw *BoltDB) DeleteRepoMeta(repo string) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
repoBuck := tx.Bucket([]byte(RepoMetaBuck))
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))

err := repoBuck.Delete([]byte(repo))
if err != nil {
return err
}

err = repoBlobsBuck.Delete([]byte(repo))
if err != nil {
return err
}

return repoLastUpdatedBuck.Delete([]byte(repo))
})

return err
Expand Down Expand Up @@ -1212,6 +1310,7 @@ func (bdw *BoltDB) RemoveRepoReference(repo, reference string, manifestDigest go
repoMetaBuck := tx.Bucket([]byte(RepoMetaBuck))
imageMetaBuck := tx.Bucket([]byte(ImageMetaBuck))
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))

protoRepoMeta, err := getProtoRepoMeta(repo, repoMetaBuck)
if err != nil {
Expand Down Expand Up @@ -1292,6 +1391,11 @@ func (bdw *BoltDB) RemoveRepoReference(repo, reference string, manifestDigest go
return err
}

err = setRepoLastUpdated(repo, time.Now(), repoLastUpdatedBuck)
if err != nil {
return err
}

protoRepoMeta, repoBlobs = common.RemoveImageFromRepoMeta(protoRepoMeta, repoBlobs, reference)

repoBlobsBytes, err = proto.Marshal(repoBlobs)
Expand Down Expand Up @@ -1934,12 +2038,39 @@ func (bdw *BoltDB) ResetDB() error {
}

func resetBucket(transaction *bbolt.Tx, bucketName string) error {
err := transaction.DeleteBucket([]byte(bucketName))
bucket := transaction.Bucket([]byte(bucketName))
if bucket == nil {
return nil
}

// we need to create the sub buckets if they exits, we'll presume the sub-buckets are not nested more than 1 layer
subBuckets := [][]byte{}

err := bucket.ForEachBucket(func(bucketName []byte) error {
subBuckets = append(subBuckets, bucketName)

return nil
})
if err != nil {
return err
}

_, err = transaction.CreateBucketIfNotExists([]byte(bucketName))
err = transaction.DeleteBucket([]byte(bucketName))
if err != nil {
return err
}

bucket, err = transaction.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
return err
}

for _, subBucket := range subBuckets {
_, err := bucket.CreateBucketIfNotExists(subBucket)
if err != nil {
return err
}
}

return err
}
67 changes: 15 additions & 52 deletions pkg/meta/boltdb/boltdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,58 +63,6 @@ func TestWrapperErrors(t *testing.T) {

ctx := userAc.DeriveContext(context.Background())

Convey("ResetDB", func() {
Convey("RepoMetaBuck", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.RepoMetaBuck))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})

Convey("ImageMetaBuck", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.ImageMetaBuck))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})

Convey("RepoBlobsBuck", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.RepoBlobsBuck))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})

Convey("UserAPIKeysBucket", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.UserAPIKeysBucket))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})

Convey("UserDataBucket", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.UserDataBucket))
})
So(err, ShouldBeNil)

err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})
})

Convey("RemoveRepoReference", func() {
Convey("getProtoRepoMeta errors", func() {
err := setRepoMeta("repo", badProtoBlob, boltdbWrapper.DB)
Expand Down Expand Up @@ -192,6 +140,21 @@ func TestWrapperErrors(t *testing.T) {
})
})

Convey("GetRepoLastUpdated", func() {
Convey("bad blob in db", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
repoBlobsBuck := tx.Bucket([]byte(boltdb.RepoBlobsBuck))
lastUpdatedBuck := repoBlobsBuck.Bucket([]byte(boltdb.RepoLastUpdatedBuck))

return lastUpdatedBuck.Put([]byte("repo"), []byte("bad-blob"))
})
So(err, ShouldBeNil)

lastUpdated := boltdbWrapper.GetRepoLastUpdated("repo")
So(lastUpdated, ShouldEqual, time.Time{})
})
})

Convey("UpdateStatsOnDownload", func() {
Convey("repo meta not found", func() {
err = boltdbWrapper.UpdateStatsOnDownload("repo", "ref")
Expand Down
6 changes: 5 additions & 1 deletion pkg/meta/boltdb/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ package boltdb
const (
ImageMetaBuck = "ImageMeta"
RepoMetaBuck = "RepoMeta"
RepoBlobsBuck = "RepoBlobsMeta"
UserDataBucket = "UserData"
VersionBucket = "Version"
UserAPIKeysBucket = "UserAPIKeys"
)

const (
RepoBlobsBuck = "RepoBlobsMeta"
RepoLastUpdatedBuck = "RepoLastUpdated" // Sub-bucket
)
2 changes: 1 addition & 1 deletion pkg/meta/boltdb/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type DBParameters struct {
func GetBoltDriver(params DBParameters) (*bolt.DB, error) {
const perms = 0o600

boltDB, err := bolt.Open(path.Join(params.RootDir, "repo.db"), perms, &bolt.Options{Timeout: time.Second * 10})
boltDB, err := bolt.Open(path.Join(params.RootDir, "meta.db"), perms, &bolt.Options{Timeout: time.Second * 10})
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit f3cded9

Please sign in to comment.