Skip to content

Commit

Permalink
feat: support deletion for memory graph (#606)
Browse files Browse the repository at this point in the history
Signed-off-by: Xiaoxuan Wang <[email protected]>

Part 1/4 of #454

Based on draft PR #582 

Current behavior regarding `graph.Memory.Remove(node)`:
* `node` entry in `m.successors` and `m.nodes` is removed.
* `node` is removed from its successors predecessors list.
* `node` entry in `m.predecessors` is NOT removed, **unless all its
predecessors no longer exist**.
* `node` is NOT removed from its predecessors' `m.successors` list. The
`m.successors` is always in accordance with the actual content.

Signed-off-by: Xiaoxuan Wang <[email protected]>
  • Loading branch information
wangxiaoxuan273 authored Oct 16, 2023
1 parent e8e4f84 commit 459a246
Show file tree
Hide file tree
Showing 4 changed files with 692 additions and 47 deletions.
5 changes: 5 additions & 0 deletions internal/container/set/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@ func (s Set[T]) Contains(item T) bool {
_, ok := s[item]
return ok
}

// Delete deletes an item from the set.
func (s Set[T]) Delete(item T) {
delete(s, item)
}
8 changes: 8 additions & 0 deletions internal/container/set/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,12 @@ func TestSet(t *testing.T) {
if got, want := len(set), 2; got != want {
t.Errorf("len(Set) = %v, want %v", got, want)
}
// test deleting a key
set.Delete(key1)
if got, want := set.Contains(key1), false; got != want {
t.Errorf("Set.Contains(%s) = %v, want %v", key1, got, want)
}
if got, want := len(set), 1; got != want {
t.Errorf("len(Set) = %v, want %v", got, want)
}
}
114 changes: 67 additions & 47 deletions internal/graph/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,68 +23,54 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
"oras.land/oras-go/v2/internal/container/set"
"oras.land/oras-go/v2/internal/descriptor"
"oras.land/oras-go/v2/internal/status"
"oras.land/oras-go/v2/internal/syncutil"
)

// Memory is a memory based PredecessorFinder.
type Memory struct {
predecessors sync.Map // map[descriptor.Descriptor]map[descriptor.Descriptor]ocispec.Descriptor
indexed sync.Map // map[descriptor.Descriptor]any
nodes map[descriptor.Descriptor]ocispec.Descriptor // nodes saves the map keys of ocispec.Descriptor
predecessors map[descriptor.Descriptor]set.Set[descriptor.Descriptor]
successors map[descriptor.Descriptor]set.Set[descriptor.Descriptor]
lock sync.RWMutex
}

// NewMemory creates a new memory PredecessorFinder.
func NewMemory() *Memory {
return &Memory{}
return &Memory{
nodes: make(map[descriptor.Descriptor]ocispec.Descriptor),
predecessors: make(map[descriptor.Descriptor]set.Set[descriptor.Descriptor]),
successors: make(map[descriptor.Descriptor]set.Set[descriptor.Descriptor]),
}
}

// Index indexes predecessors for each direct successor of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *Memory) Index(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) error {
successors, err := content.Successors(ctx, fetcher, node)
if err != nil {
return err
}

m.index(ctx, node, successors)
return nil
_, err := m.index(ctx, fetcher, node)
return err
}

// Index indexes predecessors for all the successors of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *Memory) IndexAll(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) error {
// track content status
tracker := status.NewTracker()

var fn syncutil.GoFunc[ocispec.Descriptor]
fn = func(ctx context.Context, region *syncutil.LimitedRegion, desc ocispec.Descriptor) error {
// skip the node if other go routine is working on it
_, committed := tracker.TryCommit(desc)
if !committed {
return nil
}

// skip the node if it has been indexed
key := descriptor.FromOCI(desc)
_, exists := m.indexed.Load(key)
if exists {
return nil
}

successors, err := content.Successors(ctx, fetcher, desc)
successors, err := m.index(ctx, fetcher, desc)
if err != nil {
if errors.Is(err, errdef.ErrNotFound) {
// skip the node if it does not exist
return nil
}
return err
}
m.index(ctx, desc, successors)
m.indexed.Store(key, nil)

if len(successors) > 0 {
// traverse and index successors
return syncutil.Go(ctx, nil, fn, successors...)
Expand All @@ -96,39 +82,73 @@ func (m *Memory) IndexAll(ctx context.Context, fetcher content.Fetcher, node oci

// Predecessors returns the nodes directly pointing to the current node.
// Predecessors returns nil without error if the node does not exists in the
// store.
// Like other operations, calling Predecessors() is go-routine safe. However,
// it does not necessarily correspond to any consistent snapshot of the stored
// contents.
// store. Like other operations, calling Predecessors() is go-routine safe.
// However, it does not necessarily correspond to any consistent snapshot of
// the stored contents.
func (m *Memory) Predecessors(_ context.Context, node ocispec.Descriptor) ([]ocispec.Descriptor, error) {
m.lock.RLock()
defer m.lock.RUnlock()

key := descriptor.FromOCI(node)
value, exists := m.predecessors.Load(key)
set, exists := m.predecessors[key]
if !exists {
return nil, nil
}
predecessors := value.(*sync.Map)

var res []ocispec.Descriptor
predecessors.Range(func(key, value interface{}) bool {
res = append(res, value.(ocispec.Descriptor))
return true
})
for k := range set {
res = append(res, m.nodes[k])
}
return res, nil
}

// Remove removes the node from its predecessors and successors.
func (m *Memory) Remove(ctx context.Context, node ocispec.Descriptor) error {
m.lock.Lock()
defer m.lock.Unlock()

nodeKey := descriptor.FromOCI(node)
// remove the node from its successors' predecessor list
for successorKey := range m.successors[nodeKey] {
predecessorEntry := m.predecessors[successorKey]
predecessorEntry.Delete(nodeKey)

// if none of the predecessors of the node still exists, we remove the
// predecessors entry. Otherwise, we do not remove the entry.
if len(predecessorEntry) == 0 {
delete(m.predecessors, successorKey)
}
}
delete(m.successors, nodeKey)
delete(m.nodes, nodeKey)
return nil
}

// index indexes predecessors for each direct successor of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *Memory) index(ctx context.Context, node ocispec.Descriptor, successors []ocispec.Descriptor) {
if len(successors) == 0 {
return
func (m *Memory) index(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) ([]ocispec.Descriptor, error) {
successors, err := content.Successors(ctx, fetcher, node)
if err != nil {
return nil, err
}
m.lock.Lock()
defer m.lock.Unlock()

// index the node
nodeKey := descriptor.FromOCI(node)
m.nodes[nodeKey] = node

predecessorKey := descriptor.FromOCI(node)
// for each successor, put it into the node's successors list, and
// put node into the succeesor's predecessors list
successorSet := set.New[descriptor.Descriptor]()
m.successors[nodeKey] = successorSet
for _, successor := range successors {
successorKey := descriptor.FromOCI(successor)
value, _ := m.predecessors.LoadOrStore(successorKey, &sync.Map{})
predecessors := value.(*sync.Map)
predecessors.Store(predecessorKey, node)
successorSet.Add(successorKey)
predecessorSet, exists := m.predecessors[successorKey]
if !exists {
predecessorSet = set.New[descriptor.Descriptor]()
m.predecessors[successorKey] = predecessorSet
}
predecessorSet.Add(nodeKey)
}
return successors, nil
}
Loading

0 comments on commit 459a246

Please sign in to comment.