From b69a16cf38a529196b8d9a9846bcfc3a4d2a107f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 5 May 2022 17:30:24 +0200 Subject: [PATCH 1/2] Refactor store index into its structure Kubernetes-commit: 7c94ce3076a96acab4c7e88489cd596f1aad40e0 --- tools/cache/thread_safe_store.go | 179 +++++++++++++++----------- tools/cache/thread_safe_store_test.go | 10 +- 2 files changed, 110 insertions(+), 79 deletions(-) diff --git a/tools/cache/thread_safe_store.go b/tools/cache/thread_safe_store.go index 1182ea144b..686b94e217 100644 --- a/tools/cache/thread_safe_store.go +++ b/tools/cache/thread_safe_store.go @@ -59,15 +59,89 @@ type ThreadSafeStore interface { Resync() error } +// storeIndex implements the indexing functionality for Store interface +type storeIndex struct { + // indexers maps a name to an IndexFunc + indexers Indexers + // indices maps a name to an Index + indices Indices +} + +func (i *storeIndex) reset() { + i.indices = Indices{} +} + +func (i *storeIndex) getKeysFromIndex(indexName string, obj interface{}) (sets.String, error) { + indexFunc := i.indexers[indexName] + if indexFunc == nil { + return nil, fmt.Errorf("Index with name %s does not exist", indexName) + } + + indexedValues, err := indexFunc(obj) + if err != nil { + return nil, err + } + index := i.indices[indexName] + + var storeKeySet sets.String + if len(indexedValues) == 1 { + // In majority of cases, there is exactly one value matching. + // Optimize the most common path - deduping is not needed here. + storeKeySet = index[indexedValues[0]] + } else { + // Need to de-dupe the return list. + // Since multiple keys are allowed, this can happen. + storeKeySet = sets.String{} + for _, indexedValue := range indexedValues { + for key := range index[indexedValue] { + storeKeySet.Insert(key) + } + } + } + + return storeKeySet, nil +} + +func (i *storeIndex) getKeysByIndex(indexName, indexedValue string) (sets.String, error) { + indexFunc := i.indexers[indexName] + if indexFunc == nil { + return nil, fmt.Errorf("Index with name %s does not exist", indexName) + } + + index := i.indices[indexName] + return index[indexedValue], nil +} + +func (i *storeIndex) getIndexValues(indexName string) []string { + index := i.indices[indexName] + names := make([]string, 0, len(index)) + for key := range index { + names = append(names, key) + } + return names +} + +func (i *storeIndex) addIndexers(newIndexers Indexers) error { + oldKeys := sets.StringKeySet(i.indexers) + newKeys := sets.StringKeySet(newIndexers) + + if oldKeys.HasAny(newKeys.List()...) { + return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys)) + } + + for k, v := range newIndexers { + i.indexers[k] = v + } + return nil +} + // threadSafeMap implements ThreadSafeStore type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} - // indexers maps a name to an IndexFunc - indexers Indexers - // indices maps a name to an Index - indices Indices + // index implements the indexing functionality + index *storeIndex } func (c *threadSafeMap) Add(key string, obj interface{}) { @@ -79,14 +153,14 @@ func (c *threadSafeMap) Update(key string, obj interface{}) { defer c.lock.Unlock() oldObject := c.items[key] c.items[key] = obj - c.updateIndices(oldObject, obj, key) + c.index.updateIndices(oldObject, obj, key) } func (c *threadSafeMap) Delete(key string) { c.lock.Lock() defer c.lock.Unlock() if obj, exists := c.items[key]; exists { - c.updateIndices(obj, nil, key) + c.index.updateIndices(obj, nil, key) delete(c.items, key) } } @@ -126,9 +200,9 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st c.items = items // rebuild any index - c.indices = Indices{} + c.index.reset() for key, item := range c.items { - c.updateIndices(nil, item, key) + c.index.updateIndices(nil, item, key) } } @@ -138,32 +212,10 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, c.lock.RLock() defer c.lock.RUnlock() - indexFunc := c.indexers[indexName] - if indexFunc == nil { - return nil, fmt.Errorf("Index with name %s does not exist", indexName) - } - - indexedValues, err := indexFunc(obj) + storeKeySet, err := c.index.getKeysFromIndex(indexName, obj) if err != nil { return nil, err } - index := c.indices[indexName] - - var storeKeySet sets.String - if len(indexedValues) == 1 { - // In majority of cases, there is exactly one value matching. - // Optimize the most common path - deduping is not needed here. - storeKeySet = index[indexedValues[0]] - } else { - // Need to de-dupe the return list. - // Since multiple keys are allowed, this can happen. - storeKeySet = sets.String{} - for _, indexedValue := range indexedValues { - for key := range index[indexedValue] { - storeKeySet.Insert(key) - } - } - } list := make([]interface{}, 0, storeKeySet.Len()) for storeKey := range storeKeySet { @@ -177,14 +229,10 @@ func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, c.lock.RLock() defer c.lock.RUnlock() - indexFunc := c.indexers[indexName] - if indexFunc == nil { - return nil, fmt.Errorf("Index with name %s does not exist", indexName) + set, err := c.index.getKeysByIndex(indexName, indexedValue) + if err != nil { + return nil, err } - - index := c.indices[indexName] - - set := index[indexedValue] list := make([]interface{}, 0, set.Len()) for key := range set { list = append(list, c.items[key]) @@ -199,14 +247,10 @@ func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, err c.lock.RLock() defer c.lock.RUnlock() - indexFunc := c.indexers[indexName] - if indexFunc == nil { - return nil, fmt.Errorf("Index with name %s does not exist", indexName) + set, err := c.index.getKeysByIndex(indexName, indexedValue) + if err != nil { + return nil, err } - - index := c.indices[indexName] - - set := index[indexedValue] return set.List(), nil } @@ -214,16 +258,11 @@ func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string { c.lock.RLock() defer c.lock.RUnlock() - index := c.indices[indexName] - names := make([]string, 0, len(index)) - for key := range index { - names = append(names, key) - } - return names + return c.index.getIndexValues(indexName) } func (c *threadSafeMap) GetIndexers() Indexers { - return c.indexers + return c.index.indexers } func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { @@ -234,17 +273,7 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { return fmt.Errorf("cannot add indexers to running index") } - oldKeys := sets.StringKeySet(c.indexers) - newKeys := sets.StringKeySet(newIndexers) - - if oldKeys.HasAny(newKeys.List()...) { - return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys)) - } - - for k, v := range newIndexers { - c.indexers[k] = v - } - return nil + return c.index.addIndexers(newIndexers) } // updateIndices modifies the objects location in the managed indexes: @@ -252,10 +281,10 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { // - for update you must provide both the oldObj and the newObj // - for delete you must provide only the oldObj // updateIndices must be called from a function that already has a lock on the cache -func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) { +func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) { var oldIndexValues, indexValues []string var err error - for name, indexFunc := range c.indexers { + for name, indexFunc := range i.indexers { if oldObj != nil { oldIndexValues, err = indexFunc(oldObj) } else { @@ -274,10 +303,10 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } - index := c.indices[name] + index := i.indices[name] if index == nil { index = Index{} - c.indices[name] = index + i.indices[name] = index } if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] { @@ -286,15 +315,15 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke } for _, value := range oldIndexValues { - c.deleteKeyFromIndex(key, value, index) + i.deleteKeyFromIndex(key, value, index) } for _, value := range indexValues { - c.addKeyToIndex(key, value, index) + i.addKeyToIndex(key, value, index) } } } -func (c *threadSafeMap) addKeyToIndex(key, indexValue string, index Index) { +func (i *storeIndex) addKeyToIndex(key, indexValue string, index Index) { set := index[indexValue] if set == nil { set = sets.String{} @@ -303,7 +332,7 @@ func (c *threadSafeMap) addKeyToIndex(key, indexValue string, index Index) { set.Insert(key) } -func (c *threadSafeMap) deleteKeyFromIndex(key, indexValue string, index Index) { +func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, index Index) { set := index[indexValue] if set == nil { return @@ -325,8 +354,10 @@ func (c *threadSafeMap) Resync() error { // NewThreadSafeStore creates a new instance of ThreadSafeStore. func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore { return &threadSafeMap{ - items: map[string]interface{}{}, - indexers: indexers, - indices: indices, + items: map[string]interface{}{}, + index: &storeIndex{ + indexers: indexers, + indices: indices, + }, } } diff --git a/tools/cache/thread_safe_store_test.go b/tools/cache/thread_safe_store_test.go index c4cbd4d08f..53f6020831 100644 --- a/tools/cache/thread_safe_store_test.go +++ b/tools/cache/thread_safe_store_test.go @@ -43,7 +43,7 @@ func TestThreadSafeStoreDeleteRemovesEmptySetsFromIndex(t *testing.T) { store.Add(testKey, testKey) // Assumption check, there should be a set for the `testKey` with one element in the added index - set := store.indices[testIndexer][testKey] + set := store.index.indices[testIndexer][testKey] if len(set) != 1 { t.Errorf("Initial assumption of index backing string set having 1 element failed. Actual elements: %d", len(set)) @@ -51,7 +51,7 @@ func TestThreadSafeStoreDeleteRemovesEmptySetsFromIndex(t *testing.T) { } store.Delete(testKey) - set, present := store.indices[testIndexer][testKey] + set, present := store.index.indices[testIndexer][testKey] if present { t.Errorf("Index backing string set not deleted from index. Set length: %d", len(set)) @@ -76,7 +76,7 @@ func TestThreadSafeStoreAddKeepsNonEmptySetPostDeleteFromIndex(t *testing.T) { store.Add("delete", "delete") // Assumption check, there should be a set for the `testIndex` with two elements - set := store.indices[testIndexer][testIndex] + set := store.index.indices[testIndexer][testIndex] if len(set) != 2 { t.Errorf("Initial assumption of index backing string set having 2 elements failed. Actual elements: %d", len(set)) @@ -84,7 +84,7 @@ func TestThreadSafeStoreAddKeepsNonEmptySetPostDeleteFromIndex(t *testing.T) { } store.Delete("delete") - set, present := store.indices[testIndexer][testIndex] + set, present := store.index.indices[testIndexer][testIndex] if !present { t.Errorf("Index backing string set erroneously deleted from index.") @@ -114,7 +114,7 @@ func TestThreadSafeStoreIndexingFunctionsWithMultipleValues(t *testing.T) { assert := assert.New(t) compare := func(key string, expected []string) error { - values := store.indices[testIndexer][key].List() + values := store.index.indices[testIndexer][key].List() if cmp.Equal(values, expected) { return nil } From 5e7ba1f8d7cb93e3c9d109c61cd6957a5115c0c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 2 Nov 2022 13:02:29 +0100 Subject: [PATCH 2/2] Minor cleanup of thread safe store Kubernetes-commit: 72194de96f0bb5c1c1e3247a129cf9e3f11972ef --- tools/cache/thread_safe_store.go | 140 +++++++++++++++---------------- 1 file changed, 70 insertions(+), 70 deletions(-) diff --git a/tools/cache/thread_safe_store.go b/tools/cache/thread_safe_store.go index 686b94e217..145e93ee53 100644 --- a/tools/cache/thread_safe_store.go +++ b/tools/cache/thread_safe_store.go @@ -135,6 +135,76 @@ func (i *storeIndex) addIndexers(newIndexers Indexers) error { return nil } +// updateIndices modifies the objects location in the managed indexes: +// - for create you must provide only the newObj +// - for update you must provide both the oldObj and the newObj +// - for delete you must provide only the oldObj +// updateIndices must be called from a function that already has a lock on the cache +func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) { + var oldIndexValues, indexValues []string + var err error + for name, indexFunc := range i.indexers { + if oldObj != nil { + oldIndexValues, err = indexFunc(oldObj) + } else { + oldIndexValues = oldIndexValues[:0] + } + if err != nil { + panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) + } + + if newObj != nil { + indexValues, err = indexFunc(newObj) + } else { + indexValues = indexValues[:0] + } + if err != nil { + panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) + } + + index := i.indices[name] + if index == nil { + index = Index{} + i.indices[name] = index + } + + if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] { + // We optimize for the most common case where indexFunc returns a single value which has not been changed + continue + } + + for _, value := range oldIndexValues { + i.deleteKeyFromIndex(key, value, index) + } + for _, value := range indexValues { + i.addKeyToIndex(key, value, index) + } + } +} + +func (i *storeIndex) addKeyToIndex(key, indexValue string, index Index) { + set := index[indexValue] + if set == nil { + set = sets.String{} + index[indexValue] = set + } + set.Insert(key) +} + +func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, index Index) { + set := index[indexValue] + if set == nil { + return + } + set.Delete(key) + // If we don't delete the set when zero, indices with high cardinality + // short lived resources can cause memory to increase over time from + // unused empty sets. See `kubernetes/kubernetes/issues/84959`. + if len(set) == 0 { + delete(index, indexValue) + } +} + // threadSafeMap implements ThreadSafeStore type threadSafeMap struct { lock sync.RWMutex @@ -276,76 +346,6 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { return c.index.addIndexers(newIndexers) } -// updateIndices modifies the objects location in the managed indexes: -// - for create you must provide only the newObj -// - for update you must provide both the oldObj and the newObj -// - for delete you must provide only the oldObj -// updateIndices must be called from a function that already has a lock on the cache -func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) { - var oldIndexValues, indexValues []string - var err error - for name, indexFunc := range i.indexers { - if oldObj != nil { - oldIndexValues, err = indexFunc(oldObj) - } else { - oldIndexValues = oldIndexValues[:0] - } - if err != nil { - panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) - } - - if newObj != nil { - indexValues, err = indexFunc(newObj) - } else { - indexValues = indexValues[:0] - } - if err != nil { - panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) - } - - index := i.indices[name] - if index == nil { - index = Index{} - i.indices[name] = index - } - - if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] { - // We optimize for the most common case where indexFunc returns a single value which has not been changed - continue - } - - for _, value := range oldIndexValues { - i.deleteKeyFromIndex(key, value, index) - } - for _, value := range indexValues { - i.addKeyToIndex(key, value, index) - } - } -} - -func (i *storeIndex) addKeyToIndex(key, indexValue string, index Index) { - set := index[indexValue] - if set == nil { - set = sets.String{} - index[indexValue] = set - } - set.Insert(key) -} - -func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, index Index) { - set := index[indexValue] - if set == nil { - return - } - set.Delete(key) - // If we don't delete the set when zero, indices with high cardinality - // short lived resources can cause memory to increase over time from - // unused empty sets. See `kubernetes/kubernetes/issues/84959`. - if len(set) == 0 { - delete(index, indexValue) - } -} - func (c *threadSafeMap) Resync() error { // Nothing to do return nil