Skip to content

Commit

Permalink
add support for optional progress updates channel
Browse files Browse the repository at this point in the history
Signed-off-by: Avi Deitcher <[email protected]>
  • Loading branch information
deitch committed Dec 20, 2023
1 parent 98758b7 commit 178a2e8
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 2 deletions.
27 changes: 25 additions & 2 deletions copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,23 @@ type CopyGraphOptions struct {
// source storage to fetch large blobs.
// If FindSuccessors is nil, content.Successors will be used.
FindSuccessors func(ctx context.Context, fetcher content.Fetcher, desc ocispec.Descriptor) ([]ocispec.Descriptor, error)

// UpdateChannel is an optional channel to receive progress updates.
// Each update will include the number of bytes copied for a particular blob
// or manifest, the expected total size, and the descriptor of the blob or
// manifest. It is up to the consumer of the channel to differentiate
// between updates among different blobs and manifests; no mechanism is
// provided for distinguishing between them, other than the descriptor
// passed with each update. The total size of downloads of all blobs and
// manifests is not provided, as it is not known. You can calculate the
// percentage downloaded for a particular blob in an individual update
// based on the total size of that blob, which is provided in the
// descriptor, and the number of bytes copied, which is provided in the
// update.
// Updates are sent each time a block is copied. The number of bytes copied
// depends upon io.Copy, which, by default, is 32KB. As of now, this cannot
// be changed. We may provided that capability in a future update.
UpdateChannel chan<- CopyUpdate
}

// Copy copies a rooted directed acyclic graph (DAG) with the tagged root node
Expand Down Expand Up @@ -266,11 +283,17 @@ func copyGraph(ctx context.Context, src content.ReadOnlyStorage, dst content.Sto
}

// doCopyNode copies a single content from the source CAS to the destination CAS.
func doCopyNode(ctx context.Context, src content.ReadOnlyStorage, dst content.Storage, desc ocispec.Descriptor) error {
func doCopyNode(ctx context.Context, src content.ReadOnlyStorage, dst content.Storage, desc ocispec.Descriptor, ch chan<- CopyUpdate) error {
rc, err := src.Fetch(ctx, desc)
if err != nil {
return err
}
if ch != nil {
rc = &progressReader{
c: ch,
r: rc,
}
}
defer rc.Close()
err = dst.Push(ctx, desc, rc)
if err != nil && !errors.Is(err, errdef.ErrAlreadyExists) {
Expand All @@ -291,7 +314,7 @@ func copyNode(ctx context.Context, src content.ReadOnlyStorage, dst content.Stor
}
}

if err := doCopyNode(ctx, src, dst, desc); err != nil {
if err := doCopyNode(ctx, src, dst, desc, opts.UpdateChannel); err != nil {
return err
}

Expand Down
46 changes: 46 additions & 0 deletions progress_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package oras

import (
"io"

ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

type CopyUpdate struct {
Copied int64
Descriptor ocispec.Descriptor
}

type progressReader struct {
desc ocispec.Descriptor
r io.ReadCloser
c chan<- CopyUpdate
}

func (p *progressReader) Close() error {
close(p.c)
return p.r.Close()
}

func (p *progressReader) Read(buf []byte) (int, error) {
n, err := p.r.Read(buf)
if n > 0 {
p.c <- CopyUpdate{Copied: int64(n), Descriptor: p.desc}
}
return n, err
}

0 comments on commit 178a2e8

Please sign in to comment.