Skip to content

Commit

Permalink
per discussion of yesterday
Browse files Browse the repository at this point in the history
Signed-off-by: Xiaoxuan Wang <[email protected]>
  • Loading branch information
wangxiaoxuan273 committed Sep 12, 2023
1 parent 4c442f5 commit cce32a1
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 313 deletions.
4 changes: 2 additions & 2 deletions content/oci/deletableOci.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type DeletableStore struct {

storage *Storage
tagResolver *resolver.Memory
graph *graph.MemoryWithDelete
graph *graph.DeletableMemory
}

// NewDeletableStore returns a new DeletableStore.
Expand All @@ -82,7 +82,7 @@ func NewDeletableStoreWithContext(ctx context.Context, root string) (*DeletableS
indexPath: filepath.Join(rootAbs, ociImageIndexFile),
storage: storage,
tagResolver: resolver.NewMemory(),
graph: graph.NewMemoryWithDelete(),
graph: graph.NewDeletableMemory(),
}

if err := ensureDir(filepath.Join(rootAbs, ociBlobsDir)); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion content/oci/readonlyoci.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func loadIndex(ctx context.Context, index *ocispec.Index, fetcher content.Fetche
}

// loadIndex loads index into memory.
func loadIndexWithMemoryWithDelete(ctx context.Context, index *ocispec.Index, fetcher content.Fetcher, tagger content.Tagger, graph *graph.MemoryWithDelete) error {
func loadIndexWithMemoryWithDelete(ctx context.Context, index *ocispec.Index, fetcher content.Fetcher, tagger content.Tagger, graph *graph.DeletableMemory) error {
for _, desc := range index.Manifests {
if err := tagger.Tag(ctx, deleteAnnotationRefName(desc), desc.Digest.String()); err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions internal/container/set/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@ func (s Set[T]) Contains(item T) bool {
_, ok := s[item]
return ok
}

// Delete deletes an item from the set
func (s Set[T]) Delete(item T) {
delete(s, item)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,41 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
"oras.land/oras-go/v2/internal/container/set"
"oras.land/oras-go/v2/internal/descriptor"
"oras.land/oras-go/v2/internal/status"
"oras.land/oras-go/v2/internal/syncutil"
)

// MemoryWithDelete is a MemoryWithDelete based PredecessorFinder.
type MemoryWithDelete struct {
indexed sync.Map // map[descriptor.Descriptor]any, this variable is only used by IndexAll
predecessors map[descriptor.Descriptor]map[descriptor.Descriptor]ocispec.Descriptor
successors map[descriptor.Descriptor]map[descriptor.Descriptor]ocispec.Descriptor
lock sync.Mutex
// DeletableMemory is a DeletableMemory based PredecessorFinder.
type DeletableMemory struct {
nodes map[descriptor.Descriptor]ocispec.Descriptor // nodes saves the map keys of ocispec.Descriptor
predecessors map[descriptor.Descriptor]set.Set[descriptor.Descriptor]
successors map[descriptor.Descriptor]set.Set[descriptor.Descriptor]
lock sync.RWMutex
}

// NewMemoryWithDelete creates a new MemoryWithDelete PredecessorFinder.
func NewMemoryWithDelete() *MemoryWithDelete {
return &MemoryWithDelete{
predecessors: make(map[descriptor.Descriptor]map[descriptor.Descriptor]ocispec.Descriptor),
successors: make(map[descriptor.Descriptor]map[descriptor.Descriptor]ocispec.Descriptor),
// NewDeletableMemory creates a new DeletableMemory.
func NewDeletableMemory() *DeletableMemory {
return &DeletableMemory{
nodes: make(map[descriptor.Descriptor]ocispec.Descriptor),
predecessors: make(map[descriptor.Descriptor]set.Set[descriptor.Descriptor]),
successors: make(map[descriptor.Descriptor]set.Set[descriptor.Descriptor]),
}
}

// Index indexes predecessors for each direct successor of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *MemoryWithDelete) Index(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) error {
successors, err := content.Successors(ctx, fetcher, node)
if err != nil {
return err
}

m.lock.Lock()
defer m.lock.Unlock()
m.index(ctx, node, successors)
return nil
func (m *DeletableMemory) Index(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) error {
_, err := m.index(ctx, fetcher, node)
return err
}

// Index indexes predecessors for all the successors of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *MemoryWithDelete) IndexAll(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) error {
func (m *DeletableMemory) IndexAll(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) error {
// track content status
tracker := status.NewTracker()

Expand All @@ -74,32 +69,21 @@ func (m *MemoryWithDelete) IndexAll(ctx context.Context, fetcher content.Fetcher
return nil
}

// skip the node if it has been indexed
key := descriptor.FromOCI(desc)
_, exists := m.indexed.Load(key)
if exists {
return nil
}

successors, err := content.Successors(ctx, fetcher, desc)
successors, err := m.index(ctx, fetcher, desc)
if err != nil {
if errors.Is(err, errdef.ErrNotFound) {
// skip the node if it does not exist
return nil
}
return err
}
m.index(ctx, desc, successors)
m.indexed.Store(key, nil)

if len(successors) > 0 {
// traverse and index successors
return syncutil.Go(ctx, nil, fn, successors...)
}
return nil
}
m.lock.Lock()
defer m.lock.Unlock()
return syncutil.Go(ctx, nil, fn, node)
}

Expand All @@ -109,65 +93,63 @@ func (m *MemoryWithDelete) IndexAll(ctx context.Context, fetcher content.Fetcher
// Like other operations, calling Predecessors() is go-routine safe. However,
// it does not necessarily correspond to any consistent snapshot of the stored
// contents.
func (m *MemoryWithDelete) Predecessors(_ context.Context, node ocispec.Descriptor) ([]ocispec.Descriptor, error) {
func (m *DeletableMemory) Predecessors(_ context.Context, node ocispec.Descriptor) ([]ocispec.Descriptor, error) {
m.lock.RLock()
defer m.lock.RUnlock()

key := descriptor.FromOCI(node)
_, exists := m.predecessors[key]
set, exists := m.predecessors[key]
if !exists {
return nil, nil
}
var res []ocispec.Descriptor
for _, v := range m.predecessors[key] {
res = append(res, v)
for k := range set {
res = append(res, m.nodes[k])
}
return res, nil
}

// Remove removes the node from its predecessors and successors.
func (m *MemoryWithDelete) Remove(ctx context.Context, node ocispec.Descriptor) error {
func (m *DeletableMemory) Remove(ctx context.Context, node ocispec.Descriptor) error {
nodeKey := descriptor.FromOCI(node)
m.lock.Lock()
defer m.lock.Unlock()
// remove the node from its successors' predecessor list
for successorKey := range m.successors[nodeKey] {
delete(m.predecessors[successorKey], nodeKey)
m.predecessors[successorKey].Delete(successorKey)
}
m.removeEntriesFromMaps(ctx, node)
delete(m.successors, nodeKey)
return nil
}

// index indexes predecessors for each direct successor of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *MemoryWithDelete) index(ctx context.Context, node ocispec.Descriptor, successors []ocispec.Descriptor) {
m.createEntriesInMaps(ctx, node)
if len(successors) == 0 {
return
func (m *DeletableMemory) index(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) ([]ocispec.Descriptor, error) {
successors, err := content.Successors(ctx, fetcher, node)
if err != nil {
return nil, err
}
predecessorKey := descriptor.FromOCI(node)
m.lock.Lock()
defer m.lock.Unlock()

// index the node
nodeKey := descriptor.FromOCI(node)
m.nodes[nodeKey] = node

// index the successors and predecessors
successorSet := set.New[descriptor.Descriptor]()
m.successors[nodeKey] = successorSet

for _, successor := range successors {
successorKey := descriptor.FromOCI(successor)
// store in m.predecessors, MemoryWithDelete.predecessors[successorKey].Store(node)
if _, exists := m.predecessors[successorKey]; !exists {
m.predecessors[successorKey] = make(map[descriptor.Descriptor]ocispec.Descriptor)
successorSet.Add(successorKey)
predecessorSet, exists := m.predecessors[nodeKey]
if !exists {
predecessorSet = set.New[descriptor.Descriptor]()
m.predecessors[successorKey] = predecessorSet
}
m.predecessors[successorKey][predecessorKey] = node
// store in m.successors, MemoryWithDelete.successors[predecessorKey].Store(successor)
m.successors[predecessorKey][successorKey] = successor
}
}

func (m *MemoryWithDelete) createEntriesInMaps(ctx context.Context, node ocispec.Descriptor) {
key := descriptor.FromOCI(node)
if _, hasEntry := m.predecessors[key]; !hasEntry {
m.predecessors[key] = make(map[descriptor.Descriptor]ocispec.Descriptor)
}
if _, hasEntry := m.successors[key]; !hasEntry {
m.successors[key] = make(map[descriptor.Descriptor]ocispec.Descriptor)
predecessorSet.Add(nodeKey)
}
}

func (m *MemoryWithDelete) removeEntriesFromMaps(ctx context.Context, node ocispec.Descriptor) {
key := descriptor.FromOCI(node)
delete(m.successors, key)
m.indexed.Delete(key) // pending
return successors, nil
}
Loading

0 comments on commit cce32a1

Please sign in to comment.