Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sync): allow remote storage #2007

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ vendor/
.vscode/
examples/config-sync-localhost.json
node_modules
.tool-versions
7 changes: 0 additions & 7 deletions pkg/cli/server/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,13 +380,6 @@ func validateConfiguration(config *config.Config, log zlog.Logger) error {

return zerr.ErrBadConfig
}

// enforce filesystem storage in case sync feature is enabled
if config.Extensions != nil && config.Extensions.Sync != nil {
log.Error().Err(zerr.ErrBadConfig).Msg("sync supports only filesystem storage")

return zerr.ErrBadConfig
}
}

// enforce s3 driver on subpaths in case of using storage driver
Expand Down
1 change: 1 addition & 0 deletions pkg/extensions/config/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Credentials struct {
type Config struct {
Enable *bool
CredentialsFile string
TmpDir string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would remove this.

Registries []RegistryConfig
}

Expand Down
39 changes: 24 additions & 15 deletions pkg/extensions/extension_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package extensions
import (
"net"
"net/url"
"os"
"strings"

zerr "zotregistry.io/zot/errors"
Expand Down Expand Up @@ -41,23 +42,31 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
isPeriodical := len(registryConfig.Content) != 0 && registryConfig.PollInterval != 0
isOnDemand := registryConfig.OnDemand

if isPeriodical || isOnDemand {
service, err := sync.New(registryConfig, config.Extensions.Sync.CredentialsFile,
storeController, metaDB, log)
if err != nil {
return nil, err
}
if !(isPeriodical || isOnDemand) {
continue
}

if isPeriodical {
// add to task scheduler periodic sync
gen := sync.NewTaskGenerator(service, log)
sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority)
}
tmpDir := config.Extensions.Sync.TmpDir
if tmpDir == "" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say, that by default, it should still sync temporarily under each repo, like today(main). if a TmpDir is given in config, then use that, this way all the tests should pass.

right @rchincha ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we do not want to step outside of our "storageRootDir" path. That's all the space we will own and manage, and nothing outside of it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@elee1766 My suggestion is, if there is a cloud backend defined in storage, then instead, do a second step of writing to that backend after the local sync is done

// use an os tmpdir as tmpdir if not set
tmpDir = os.TempDir()
}

if isOnDemand {
// onDemand services used in routes.go
onDemand.Add(service)
}
service, err := sync.New(registryConfig, config.Extensions.Sync.CredentialsFile, tmpDir,
storeController, metaDB, log)
if err != nil {
return nil, err
}

if isPeriodical {
// add to task scheduler periodic sync
gen := sync.NewTaskGenerator(service, log)
sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority)
}

if isOnDemand {
// onDemand services used in routes.go
onDemand.Add(service)
}
}

Expand Down
30 changes: 20 additions & 10 deletions pkg/extensions/sync/local.go → pkg/extensions/sync/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,35 @@ import (
storageTypes "zotregistry.io/zot/pkg/storage/types"
)

type LocalRegistry struct {
type DestinationRegistry struct {
storeController storage.StoreController
tempStorage OciLayoutStorage
metaDB mTypes.MetaDB
log log.Logger
}

func NewLocalRegistry(storeController storage.StoreController, metaDB mTypes.MetaDB, log log.Logger) Local {
return &LocalRegistry{
func NewDestinationRegistry(
storeController storage.StoreController,
tmpStorage OciLayoutStorage,
metaDB mTypes.MetaDB,
log log.Logger,
) Destination {
if tmpStorage == nil {
// to allow passing nil we can do this, noting that it will only work for a local StoreController
tmpStorage = NewOciLayoutStorage(storeController)
}

return &DestinationRegistry{
storeController: storeController,
metaDB: metaDB,
// first we sync from remote (using containers/image copy from docker:// to oci:) to a temp imageStore
// then we copy the image from tempStorage to zot's storage using ImageStore APIs
tempStorage: NewOciLayoutStorage(storeController),
tempStorage: tmpStorage,
log: log,
}
}

func (registry *LocalRegistry) CanSkipImage(repo, tag string, imageDigest digest.Digest) (bool, error) {
func (registry *DestinationRegistry) CanSkipImage(repo, tag string, imageDigest digest.Digest) (bool, error) {
// check image already synced
imageStore := registry.storeController.GetImageStore(repo)

Expand Down Expand Up @@ -75,16 +85,16 @@ func (registry *LocalRegistry) CanSkipImage(repo, tag string, imageDigest digest
return true, nil
}

func (registry *LocalRegistry) GetContext() *types.SystemContext {
func (registry *DestinationRegistry) GetContext() *types.SystemContext {
return registry.tempStorage.GetContext()
}

func (registry *LocalRegistry) GetImageReference(repo, reference string) (types.ImageReference, error) {
func (registry *DestinationRegistry) GetImageReference(repo, reference string) (types.ImageReference, error) {
return registry.tempStorage.GetImageReference(repo, reference)
}

// finalize a syncing image.
func (registry *LocalRegistry) CommitImage(imageReference types.ImageReference, repo, reference string) error {
func (registry *DestinationRegistry) CommitImage(imageReference types.ImageReference, repo, reference string) error {
imageStore := registry.storeController.GetImageStore(repo)

tempImageStore := getImageStoreFromImageReference(imageReference, repo, reference)
Expand Down Expand Up @@ -180,7 +190,7 @@ func (registry *LocalRegistry) CommitImage(imageReference types.ImageReference,
return nil
}

func (registry *LocalRegistry) copyManifest(repo string, manifestContent []byte, reference string,
func (registry *DestinationRegistry) copyManifest(repo string, manifestContent []byte, reference string,
tempImageStore storageTypes.ImageStore,
) error {
imageStore := registry.storeController.GetImageStore(repo)
Expand Down Expand Up @@ -239,7 +249,7 @@ func (registry *LocalRegistry) copyManifest(repo string, manifestContent []byte,
}

// Copy a blob from one image store to another image store.
func (registry *LocalRegistry) copyBlob(repo string, blobDigest digest.Digest, blobMediaType string,
func (registry *DestinationRegistry) copyBlob(repo string, blobDigest digest.Digest, blobMediaType string,
tempImageStore storageTypes.ImageStore,
) error {
imageStore := registry.storeController.GetImageStore(repo)
Expand Down
50 changes: 33 additions & 17 deletions pkg/extensions/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"zotregistry.io/zot/pkg/log"
mTypes "zotregistry.io/zot/pkg/meta/types"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/storage/local"
)

type BaseService struct {
config syncconf.RegistryConfig
credentials syncconf.CredentialsFile
remote Remote
local Local
destination Destination
retryOptions *retry.RetryOptions
contentManager ContentManager
storeController storage.StoreController
Expand All @@ -37,8 +38,13 @@ type BaseService struct {
log log.Logger
}

func New(opts syncconf.RegistryConfig, credentialsFilepath string,
storeController storage.StoreController, metadb mTypes.MetaDB, log log.Logger,
func New(
opts syncconf.RegistryConfig,
credentialsFilepath string,
tmpDir string,
storeController storage.StoreController,
metadb mTypes.MetaDB,
log log.Logger,
) (Service, error) {
service := &BaseService{}

Expand All @@ -60,7 +66,14 @@ func New(opts syncconf.RegistryConfig, credentialsFilepath string,
service.credentials = credentialsFile

service.contentManager = NewContentManager(opts.Content, log)
service.local = NewLocalRegistry(storeController, metadb, log)

tmpImageStore := local.NewImageStore(tmpDir,
false, false, log, nil, nil, nil,
)

tmpStorage := NewOciLayoutStorage(storage.StoreController{DefaultStore: tmpImageStore})

service.destination = NewDestinationRegistry(storeController, tmpStorage, metadb, log)

retryOptions := &retry.RetryOptions{}

Expand Down Expand Up @@ -289,7 +302,7 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
service.log.Info().Str("repo", repo).Msgf("sync: syncing tags %v", tags)

// apply content.destination rule
localRepo := service.contentManager.GetRepoDestination(repo)
destinationRepo := service.contentManager.GetRepoDestination(repo)

for _, tag := range tags {
select {
Expand All @@ -305,7 +318,7 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
var manifestDigest digest.Digest

if err = retry.RetryIfNecessary(ctx, func() error {
manifestDigest, err = service.syncTag(ctx, localRepo, repo, tag)
manifestDigest, err = service.syncTag(ctx, destinationRepo, repo, tag)

return err
}, service.retryOptions); err != nil {
Expand All @@ -322,7 +335,7 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {

if manifestDigest != "" {
if err = retry.RetryIfNecessary(ctx, func() error {
err = service.references.SyncAll(ctx, localRepo, repo, manifestDigest.String())
err = service.references.SyncAll(ctx, destinationRepo, repo, manifestDigest.String())
if errors.Is(err, zerr.ErrSyncReferrerNotFound) {
return nil
}
Expand All @@ -342,8 +355,11 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
return nil
}

func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo, tag string) (digest.Digest, error) {
copyOptions := getCopyOptions(service.remote.GetContext(), service.local.GetContext())
func (service *BaseService) syncTag(
ctx context.Context,
destinationRepo, remoteRepo, tag string,
) (digest.Digest, error) {
copyOptions := getCopyOptions(service.remote.GetContext(), service.destination.GetContext())

policyContext, err := getPolicyContext(service.log)
if err != nil {
Expand Down Expand Up @@ -386,38 +402,38 @@ func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo,
}
}

skipImage, err := service.local.CanSkipImage(localRepo, tag, manifestDigest)
skipImage, err := service.destination.CanSkipImage(destinationRepo, tag, manifestDigest)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("repo", localRepo).Str("reference", tag).
Str("repo", destinationRepo).Str("reference", tag).
Msg("couldn't check if the local image can be skipped")
}

if !skipImage {
localImageRef, err := service.local.GetImageReference(localRepo, tag)
localImageRef, err := service.destination.GetImageReference(destinationRepo, tag)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("repo", localRepo).Str("reference", tag).Msg("couldn't get a local image reference")
Str("repo", destinationRepo).Str("reference", tag).Msg("couldn't get a local image reference")

return "", err
}

service.log.Info().Str("remote image", remoteImageRef.DockerReference().String()).
Str("local image", fmt.Sprintf("%s:%s", localRepo, tag)).Msg("syncing image")
Str("local image", fmt.Sprintf("%s:%s", destinationRepo, tag)).Msg("syncing image")

_, err = copy.Image(ctx, policyContext, localImageRef, remoteImageRef, &copyOptions)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("remote image", remoteImageRef.DockerReference().String()).
Str("local image", fmt.Sprintf("%s:%s", localRepo, tag)).Msg("coulnd't sync image")
Str("local image", fmt.Sprintf("%s:%s", destinationRepo, tag)).Msg("coulnd't sync image")

return "", err
}

err = service.local.CommitImage(localImageRef, localRepo, tag)
err = service.destination.CommitImage(localImageRef, destinationRepo, tag)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("repo", localRepo).Str("reference", tag).Msg("couldn't commit image to local image store")
Str("repo", destinationRepo).Str("reference", tag).Msg("couldn't commit image to local image store")

return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/extensions/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Remote interface {
}

// Local registry.
type Local interface {
type Destination interface {
Registry
// Check if an image is already synced
CanSkipImage(repo, tag string, imageDigest digest.Digest) (bool, error)
Expand Down
12 changes: 6 additions & 6 deletions pkg/extensions/sync/sync_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ func TestService(t *testing.T) {
URLs: []string{"http://localhost"},
}

service, err := New(conf, "", storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
service, err := New(conf, "", os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
So(err, ShouldBeNil)

err = service.SyncRepo(context.Background(), "repo")
So(err, ShouldNotBeNil)
})
}

func TestLocalRegistry(t *testing.T) {
func TestDestinationRegistry(t *testing.T) {
Convey("make StoreController", t, func() {
dir := t.TempDir()

Expand All @@ -185,7 +185,7 @@ func TestLocalRegistry(t *testing.T) {
syncImgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver)
repoName := "repo"

registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, log)
registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, nil, log)
imageReference, err := registry.GetImageReference(repoName, "1.0")
So(err, ShouldBeNil)
So(imageReference, ShouldNotBeNil)
Expand Down Expand Up @@ -302,7 +302,7 @@ func TestLocalRegistry(t *testing.T) {
syncImgStore := local.NewImageStore(dir, true, true, log, metrics, linter, cacheDriver)
repoName := "repo"

registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, log)
registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, nil, log)

err = registry.CommitImage(imageReference, repoName, "1.0")
So(err, ShouldBeNil)
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestLocalRegistry(t *testing.T) {
})

Convey("trigger metaDB error on index manifest in CommitImage()", func() {
registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, mocks.MetaDBMock{
registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, mocks.MetaDBMock{
SetRepoReferenceFn: func(ctx context.Context, repo string, reference string, imageMeta mTypes.ImageMeta) error {
if reference == "1.0" {
return zerr.ErrRepoMetaNotFound
Expand All @@ -351,7 +351,7 @@ func TestLocalRegistry(t *testing.T) {
})

Convey("trigger metaDB error on image manifest in CommitImage()", func() {
registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, mocks.MetaDBMock{
registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, mocks.MetaDBMock{
SetRepoReferenceFn: func(ctx context.Context, repo, reference string, imageMeta mTypes.ImageMeta) error {
return zerr.ErrRepoMetaNotFound
},
Expand Down
1 change: 1 addition & 0 deletions pkg/extensions/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func makeDownstreamServer(
BaseConfig: extconf.BaseConfig{Enable: &defVal},
}
destConfig.Extensions.Sync = syncConfig
destConfig.Extensions.Sync.TmpDir = destDir
destConfig.Log.Output = path.Join(destDir, "sync.log")
destConfig.Log.Level = "debug"

Expand Down
2 changes: 1 addition & 1 deletion test/blackbox/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ BATS=${SCRIPTPATH}/../../hack/tools/bin/bats
PATH=$PATH:${SCRIPTPATH}/../../hack/tools/bin

tests=("pushpull" "pushpull_authn" "delete_images" "referrers" "metadata" "anonymous_policy"
"annotations" "detect_manifest_collision" "cve" "sync" "sync_docker" "sync_replica_cluster"
"annotations" "detect_manifest_collision" "cve" "sync" "sync_docker" "sync_remote" "sync_replica_cluster"
"scrub" "garbage_collect" "metrics" "metrics_minimal")

for test in ${tests[*]}; do
Expand Down
Loading
Loading