Skip to content

Commit

Permalink
Refactor store index into its structure
Browse files Browse the repository at this point in the history
Kubernetes-commit: 7c94ce3076a96acab4c7e88489cd596f1aad40e0
  • Loading branch information
wojtek-t authored and k8s-publishing-bot committed May 5, 2022
1 parent 4fbef5b commit b69a16c
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 79 deletions.
179 changes: 105 additions & 74 deletions tools/cache/thread_safe_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,89 @@ type ThreadSafeStore interface {
Resync() error
}

// storeIndex implements the indexing functionality for Store interface
type storeIndex struct {
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}

func (i *storeIndex) reset() {
i.indices = Indices{}
}

func (i *storeIndex) getKeysFromIndex(indexName string, obj interface{}) (sets.String, error) {
indexFunc := i.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}

indexedValues, err := indexFunc(obj)
if err != nil {
return nil, err
}
index := i.indices[indexName]

var storeKeySet sets.String
if len(indexedValues) == 1 {
// In majority of cases, there is exactly one value matching.
// Optimize the most common path - deduping is not needed here.
storeKeySet = index[indexedValues[0]]
} else {
// Need to de-dupe the return list.
// Since multiple keys are allowed, this can happen.
storeKeySet = sets.String{}
for _, indexedValue := range indexedValues {
for key := range index[indexedValue] {
storeKeySet.Insert(key)
}
}
}

return storeKeySet, nil
}

func (i *storeIndex) getKeysByIndex(indexName, indexedValue string) (sets.String, error) {
indexFunc := i.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}

index := i.indices[indexName]
return index[indexedValue], nil
}

func (i *storeIndex) getIndexValues(indexName string) []string {
index := i.indices[indexName]
names := make([]string, 0, len(index))
for key := range index {
names = append(names, key)
}
return names
}

func (i *storeIndex) addIndexers(newIndexers Indexers) error {
oldKeys := sets.StringKeySet(i.indexers)
newKeys := sets.StringKeySet(newIndexers)

if oldKeys.HasAny(newKeys.List()...) {
return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
}

for k, v := range newIndexers {
i.indexers[k] = v
}
return nil
}

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}

// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
// index implements the indexing functionality
index *storeIndex
}

func (c *threadSafeMap) Add(key string, obj interface{}) {
Expand All @@ -79,14 +153,14 @@ func (c *threadSafeMap) Update(key string, obj interface{}) {
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
c.index.updateIndices(oldObject, obj, key)
}

func (c *threadSafeMap) Delete(key string) {
c.lock.Lock()
defer c.lock.Unlock()
if obj, exists := c.items[key]; exists {
c.updateIndices(obj, nil, key)
c.index.updateIndices(obj, nil, key)
delete(c.items, key)
}
}
Expand Down Expand Up @@ -126,9 +200,9 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st
c.items = items

// rebuild any index
c.indices = Indices{}
c.index.reset()
for key, item := range c.items {
c.updateIndices(nil, item, key)
c.index.updateIndices(nil, item, key)
}
}

Expand All @@ -138,32 +212,10 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{},
c.lock.RLock()
defer c.lock.RUnlock()

indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}

indexedValues, err := indexFunc(obj)
storeKeySet, err := c.index.getKeysFromIndex(indexName, obj)
if err != nil {
return nil, err
}
index := c.indices[indexName]

var storeKeySet sets.String
if len(indexedValues) == 1 {
// In majority of cases, there is exactly one value matching.
// Optimize the most common path - deduping is not needed here.
storeKeySet = index[indexedValues[0]]
} else {
// Need to de-dupe the return list.
// Since multiple keys are allowed, this can happen.
storeKeySet = sets.String{}
for _, indexedValue := range indexedValues {
for key := range index[indexedValue] {
storeKeySet.Insert(key)
}
}
}

list := make([]interface{}, 0, storeKeySet.Len())
for storeKey := range storeKeySet {
Expand All @@ -177,14 +229,10 @@ func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{},
c.lock.RLock()
defer c.lock.RUnlock()

indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
set, err := c.index.getKeysByIndex(indexName, indexedValue)
if err != nil {
return nil, err
}

index := c.indices[indexName]

set := index[indexedValue]
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
Expand All @@ -199,31 +247,22 @@ func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, err
c.lock.RLock()
defer c.lock.RUnlock()

indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
set, err := c.index.getKeysByIndex(indexName, indexedValue)
if err != nil {
return nil, err
}

index := c.indices[indexName]

set := index[indexedValue]
return set.List(), nil
}

func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
c.lock.RLock()
defer c.lock.RUnlock()

index := c.indices[indexName]
names := make([]string, 0, len(index))
for key := range index {
names = append(names, key)
}
return names
return c.index.getIndexValues(indexName)
}

func (c *threadSafeMap) GetIndexers() Indexers {
return c.indexers
return c.index.indexers
}

func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
Expand All @@ -234,28 +273,18 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
return fmt.Errorf("cannot add indexers to running index")
}

oldKeys := sets.StringKeySet(c.indexers)
newKeys := sets.StringKeySet(newIndexers)

if oldKeys.HasAny(newKeys.List()...) {
return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
}

for k, v := range newIndexers {
c.indexers[k] = v
}
return nil
return c.index.addIndexers(newIndexers)
}

// updateIndices modifies the objects location in the managed indexes:
// - for create you must provide only the newObj
// - for update you must provide both the oldObj and the newObj
// - for delete you must provide only the oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
var oldIndexValues, indexValues []string
var err error
for name, indexFunc := range c.indexers {
for name, indexFunc := range i.indexers {
if oldObj != nil {
oldIndexValues, err = indexFunc(oldObj)
} else {
Expand All @@ -274,10 +303,10 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}

index := c.indices[name]
index := i.indices[name]
if index == nil {
index = Index{}
c.indices[name] = index
i.indices[name] = index
}

if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
Expand All @@ -286,15 +315,15 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke
}

for _, value := range oldIndexValues {
c.deleteKeyFromIndex(key, value, index)
i.deleteKeyFromIndex(key, value, index)
}
for _, value := range indexValues {
c.addKeyToIndex(key, value, index)
i.addKeyToIndex(key, value, index)
}
}
}

func (c *threadSafeMap) addKeyToIndex(key, indexValue string, index Index) {
func (i *storeIndex) addKeyToIndex(key, indexValue string, index Index) {
set := index[indexValue]
if set == nil {
set = sets.String{}
Expand All @@ -303,7 +332,7 @@ func (c *threadSafeMap) addKeyToIndex(key, indexValue string, index Index) {
set.Insert(key)
}

func (c *threadSafeMap) deleteKeyFromIndex(key, indexValue string, index Index) {
func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, index Index) {
set := index[indexValue]
if set == nil {
return
Expand All @@ -325,8 +354,10 @@ func (c *threadSafeMap) Resync() error {
// NewThreadSafeStore creates a new instance of ThreadSafeStore.
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},
indexers: indexers,
indices: indices,
items: map[string]interface{}{},
index: &storeIndex{
indexers: indexers,
indices: indices,
},
}
}
10 changes: 5 additions & 5 deletions tools/cache/thread_safe_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ func TestThreadSafeStoreDeleteRemovesEmptySetsFromIndex(t *testing.T) {
store.Add(testKey, testKey)

// Assumption check, there should be a set for the `testKey` with one element in the added index
set := store.indices[testIndexer][testKey]
set := store.index.indices[testIndexer][testKey]

if len(set) != 1 {
t.Errorf("Initial assumption of index backing string set having 1 element failed. Actual elements: %d", len(set))
return
}

store.Delete(testKey)
set, present := store.indices[testIndexer][testKey]
set, present := store.index.indices[testIndexer][testKey]

if present {
t.Errorf("Index backing string set not deleted from index. Set length: %d", len(set))
Expand All @@ -76,15 +76,15 @@ func TestThreadSafeStoreAddKeepsNonEmptySetPostDeleteFromIndex(t *testing.T) {
store.Add("delete", "delete")

// Assumption check, there should be a set for the `testIndex` with two elements
set := store.indices[testIndexer][testIndex]
set := store.index.indices[testIndexer][testIndex]

if len(set) != 2 {
t.Errorf("Initial assumption of index backing string set having 2 elements failed. Actual elements: %d", len(set))
return
}

store.Delete("delete")
set, present := store.indices[testIndexer][testIndex]
set, present := store.index.indices[testIndexer][testIndex]

if !present {
t.Errorf("Index backing string set erroneously deleted from index.")
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestThreadSafeStoreIndexingFunctionsWithMultipleValues(t *testing.T) {
assert := assert.New(t)

compare := func(key string, expected []string) error {
values := store.indices[testIndexer][key].List()
values := store.index.indices[testIndexer][key].List()
if cmp.Equal(values, expected) {
return nil
}
Expand Down

0 comments on commit b69a16c

Please sign in to comment.