diff --git a/go.mod b/go.mod index 040c72a9fc..b5fef2d29f 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 google.golang.org/protobuf v1.28.1 k8s.io/api v0.0.0-20221103075246-5448eb39ae20 - k8s.io/apimachinery v0.0.0-20221028155017-b03a432a2a6d + k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2 k8s.io/klog/v2 v2.80.1 k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 k8s.io/utils v0.0.0-20220922133306-665eaaec4324 @@ -60,5 +60,5 @@ require ( replace ( k8s.io/api => k8s.io/api v0.0.0-20221103075246-5448eb39ae20 - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20221028155017-b03a432a2a6d + k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2 ) diff --git a/go.sum b/go.sum index 67f8066891..36886abd6d 100644 --- a/go.sum +++ b/go.sum @@ -478,8 +478,8 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= k8s.io/api v0.0.0-20221103075246-5448eb39ae20 h1:L4yzu0YYenWmYNm1WG/jJy/6ioJxAlgDewudcjz0uUE= k8s.io/api v0.0.0-20221103075246-5448eb39ae20/go.mod h1:G/3aN5AoXC2S2FQkBj1R024v+pPQBOzaaOCf3qJdj+A= -k8s.io/apimachinery v0.0.0-20221028155017-b03a432a2a6d h1:fg/DbLqFKxFESf3AnU5iwCexZWOUnFFLb0JraG20wZo= -k8s.io/apimachinery v0.0.0-20221028155017-b03a432a2a6d/go.mod h1:zSkBXgO5G/dSQOe256tx5Yo2OJytojpY3bsXu/4/ZJE= +k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2 h1:0nhI6fiyouN4H8MXOcMcCOybGhw4FgxwQbadTKPIRlA= +k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2/go.mod h1:zSkBXgO5G/dSQOe256tx5Yo2OJytojpY3bsXu/4/ZJE= k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4= k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+OGxg8HsuBr/5f6tVAjDu6E= diff --git a/tools/cache/thread_safe_store.go b/tools/cache/thread_safe_store.go index 1182ea144b..145e93ee53 100644 --- a/tools/cache/thread_safe_store.go +++ b/tools/cache/thread_safe_store.go @@ -59,15 +59,159 @@ type ThreadSafeStore interface { Resync() error } +// storeIndex implements the indexing functionality for Store interface +type storeIndex struct { + // indexers maps a name to an IndexFunc + indexers Indexers + // indices maps a name to an Index + indices Indices +} + +func (i *storeIndex) reset() { + i.indices = Indices{} +} + +func (i *storeIndex) getKeysFromIndex(indexName string, obj interface{}) (sets.String, error) { + indexFunc := i.indexers[indexName] + if indexFunc == nil { + return nil, fmt.Errorf("Index with name %s does not exist", indexName) + } + + indexedValues, err := indexFunc(obj) + if err != nil { + return nil, err + } + index := i.indices[indexName] + + var storeKeySet sets.String + if len(indexedValues) == 1 { + // In majority of cases, there is exactly one value matching. + // Optimize the most common path - deduping is not needed here. + storeKeySet = index[indexedValues[0]] + } else { + // Need to de-dupe the return list. + // Since multiple keys are allowed, this can happen. + storeKeySet = sets.String{} + for _, indexedValue := range indexedValues { + for key := range index[indexedValue] { + storeKeySet.Insert(key) + } + } + } + + return storeKeySet, nil +} + +func (i *storeIndex) getKeysByIndex(indexName, indexedValue string) (sets.String, error) { + indexFunc := i.indexers[indexName] + if indexFunc == nil { + return nil, fmt.Errorf("Index with name %s does not exist", indexName) + } + + index := i.indices[indexName] + return index[indexedValue], nil +} + +func (i *storeIndex) getIndexValues(indexName string) []string { + index := i.indices[indexName] + names := make([]string, 0, len(index)) + for key := range index { + names = append(names, key) + } + return names +} + +func (i *storeIndex) addIndexers(newIndexers Indexers) error { + oldKeys := sets.StringKeySet(i.indexers) + newKeys := sets.StringKeySet(newIndexers) + + if oldKeys.HasAny(newKeys.List()...) { + return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys)) + } + + for k, v := range newIndexers { + i.indexers[k] = v + } + return nil +} + +// updateIndices modifies the objects location in the managed indexes: +// - for create you must provide only the newObj +// - for update you must provide both the oldObj and the newObj +// - for delete you must provide only the oldObj +// updateIndices must be called from a function that already has a lock on the cache +func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) { + var oldIndexValues, indexValues []string + var err error + for name, indexFunc := range i.indexers { + if oldObj != nil { + oldIndexValues, err = indexFunc(oldObj) + } else { + oldIndexValues = oldIndexValues[:0] + } + if err != nil { + panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) + } + + if newObj != nil { + indexValues, err = indexFunc(newObj) + } else { + indexValues = indexValues[:0] + } + if err != nil { + panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) + } + + index := i.indices[name] + if index == nil { + index = Index{} + i.indices[name] = index + } + + if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] { + // We optimize for the most common case where indexFunc returns a single value which has not been changed + continue + } + + for _, value := range oldIndexValues { + i.deleteKeyFromIndex(key, value, index) + } + for _, value := range indexValues { + i.addKeyToIndex(key, value, index) + } + } +} + +func (i *storeIndex) addKeyToIndex(key, indexValue string, index Index) { + set := index[indexValue] + if set == nil { + set = sets.String{} + index[indexValue] = set + } + set.Insert(key) +} + +func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, index Index) { + set := index[indexValue] + if set == nil { + return + } + set.Delete(key) + // If we don't delete the set when zero, indices with high cardinality + // short lived resources can cause memory to increase over time from + // unused empty sets. See `kubernetes/kubernetes/issues/84959`. + if len(set) == 0 { + delete(index, indexValue) + } +} + // threadSafeMap implements ThreadSafeStore type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} - // indexers maps a name to an IndexFunc - indexers Indexers - // indices maps a name to an Index - indices Indices + // index implements the indexing functionality + index *storeIndex } func (c *threadSafeMap) Add(key string, obj interface{}) { @@ -79,14 +223,14 @@ func (c *threadSafeMap) Update(key string, obj interface{}) { defer c.lock.Unlock() oldObject := c.items[key] c.items[key] = obj - c.updateIndices(oldObject, obj, key) + c.index.updateIndices(oldObject, obj, key) } func (c *threadSafeMap) Delete(key string) { c.lock.Lock() defer c.lock.Unlock() if obj, exists := c.items[key]; exists { - c.updateIndices(obj, nil, key) + c.index.updateIndices(obj, nil, key) delete(c.items, key) } } @@ -126,9 +270,9 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st c.items = items // rebuild any index - c.indices = Indices{} + c.index.reset() for key, item := range c.items { - c.updateIndices(nil, item, key) + c.index.updateIndices(nil, item, key) } } @@ -138,32 +282,10 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, c.lock.RLock() defer c.lock.RUnlock() - indexFunc := c.indexers[indexName] - if indexFunc == nil { - return nil, fmt.Errorf("Index with name %s does not exist", indexName) - } - - indexedValues, err := indexFunc(obj) + storeKeySet, err := c.index.getKeysFromIndex(indexName, obj) if err != nil { return nil, err } - index := c.indices[indexName] - - var storeKeySet sets.String - if len(indexedValues) == 1 { - // In majority of cases, there is exactly one value matching. - // Optimize the most common path - deduping is not needed here. - storeKeySet = index[indexedValues[0]] - } else { - // Need to de-dupe the return list. - // Since multiple keys are allowed, this can happen. - storeKeySet = sets.String{} - for _, indexedValue := range indexedValues { - for key := range index[indexedValue] { - storeKeySet.Insert(key) - } - } - } list := make([]interface{}, 0, storeKeySet.Len()) for storeKey := range storeKeySet { @@ -177,14 +299,10 @@ func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, c.lock.RLock() defer c.lock.RUnlock() - indexFunc := c.indexers[indexName] - if indexFunc == nil { - return nil, fmt.Errorf("Index with name %s does not exist", indexName) + set, err := c.index.getKeysByIndex(indexName, indexedValue) + if err != nil { + return nil, err } - - index := c.indices[indexName] - - set := index[indexedValue] list := make([]interface{}, 0, set.Len()) for key := range set { list = append(list, c.items[key]) @@ -199,14 +317,10 @@ func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, err c.lock.RLock() defer c.lock.RUnlock() - indexFunc := c.indexers[indexName] - if indexFunc == nil { - return nil, fmt.Errorf("Index with name %s does not exist", indexName) + set, err := c.index.getKeysByIndex(indexName, indexedValue) + if err != nil { + return nil, err } - - index := c.indices[indexName] - - set := index[indexedValue] return set.List(), nil } @@ -214,16 +328,11 @@ func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string { c.lock.RLock() defer c.lock.RUnlock() - index := c.indices[indexName] - names := make([]string, 0, len(index)) - for key := range index { - names = append(names, key) - } - return names + return c.index.getIndexValues(indexName) } func (c *threadSafeMap) GetIndexers() Indexers { - return c.indexers + return c.index.indexers } func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { @@ -234,87 +343,7 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { return fmt.Errorf("cannot add indexers to running index") } - oldKeys := sets.StringKeySet(c.indexers) - newKeys := sets.StringKeySet(newIndexers) - - if oldKeys.HasAny(newKeys.List()...) { - return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys)) - } - - for k, v := range newIndexers { - c.indexers[k] = v - } - return nil -} - -// updateIndices modifies the objects location in the managed indexes: -// - for create you must provide only the newObj -// - for update you must provide both the oldObj and the newObj -// - for delete you must provide only the oldObj -// updateIndices must be called from a function that already has a lock on the cache -func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) { - var oldIndexValues, indexValues []string - var err error - for name, indexFunc := range c.indexers { - if oldObj != nil { - oldIndexValues, err = indexFunc(oldObj) - } else { - oldIndexValues = oldIndexValues[:0] - } - if err != nil { - panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) - } - - if newObj != nil { - indexValues, err = indexFunc(newObj) - } else { - indexValues = indexValues[:0] - } - if err != nil { - panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) - } - - index := c.indices[name] - if index == nil { - index = Index{} - c.indices[name] = index - } - - if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] { - // We optimize for the most common case where indexFunc returns a single value which has not been changed - continue - } - - for _, value := range oldIndexValues { - c.deleteKeyFromIndex(key, value, index) - } - for _, value := range indexValues { - c.addKeyToIndex(key, value, index) - } - } -} - -func (c *threadSafeMap) addKeyToIndex(key, indexValue string, index Index) { - set := index[indexValue] - if set == nil { - set = sets.String{} - index[indexValue] = set - } - set.Insert(key) -} - -func (c *threadSafeMap) deleteKeyFromIndex(key, indexValue string, index Index) { - set := index[indexValue] - if set == nil { - return - } - set.Delete(key) - // If we don't delete the set when zero, indices with high cardinality - // short lived resources can cause memory to increase over time from - // unused empty sets. See `kubernetes/kubernetes/issues/84959`. - if len(set) == 0 { - delete(index, indexValue) - } + return c.index.addIndexers(newIndexers) } func (c *threadSafeMap) Resync() error { @@ -325,8 +354,10 @@ func (c *threadSafeMap) Resync() error { // NewThreadSafeStore creates a new instance of ThreadSafeStore. func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore { return &threadSafeMap{ - items: map[string]interface{}{}, - indexers: indexers, - indices: indices, + items: map[string]interface{}{}, + index: &storeIndex{ + indexers: indexers, + indices: indices, + }, } } diff --git a/tools/cache/thread_safe_store_test.go b/tools/cache/thread_safe_store_test.go index c4cbd4d08f..53f6020831 100644 --- a/tools/cache/thread_safe_store_test.go +++ b/tools/cache/thread_safe_store_test.go @@ -43,7 +43,7 @@ func TestThreadSafeStoreDeleteRemovesEmptySetsFromIndex(t *testing.T) { store.Add(testKey, testKey) // Assumption check, there should be a set for the `testKey` with one element in the added index - set := store.indices[testIndexer][testKey] + set := store.index.indices[testIndexer][testKey] if len(set) != 1 { t.Errorf("Initial assumption of index backing string set having 1 element failed. Actual elements: %d", len(set)) @@ -51,7 +51,7 @@ func TestThreadSafeStoreDeleteRemovesEmptySetsFromIndex(t *testing.T) { } store.Delete(testKey) - set, present := store.indices[testIndexer][testKey] + set, present := store.index.indices[testIndexer][testKey] if present { t.Errorf("Index backing string set not deleted from index. Set length: %d", len(set)) @@ -76,7 +76,7 @@ func TestThreadSafeStoreAddKeepsNonEmptySetPostDeleteFromIndex(t *testing.T) { store.Add("delete", "delete") // Assumption check, there should be a set for the `testIndex` with two elements - set := store.indices[testIndexer][testIndex] + set := store.index.indices[testIndexer][testIndex] if len(set) != 2 { t.Errorf("Initial assumption of index backing string set having 2 elements failed. Actual elements: %d", len(set)) @@ -84,7 +84,7 @@ func TestThreadSafeStoreAddKeepsNonEmptySetPostDeleteFromIndex(t *testing.T) { } store.Delete("delete") - set, present := store.indices[testIndexer][testIndex] + set, present := store.index.indices[testIndexer][testIndex] if !present { t.Errorf("Index backing string set erroneously deleted from index.") @@ -114,7 +114,7 @@ func TestThreadSafeStoreIndexingFunctionsWithMultipleValues(t *testing.T) { assert := assert.New(t) compare := func(key string, expected []string) error { - values := store.indices[testIndexer][key].List() + values := store.index.indices[testIndexer][key].List() if cmp.Equal(values, expected) { return nil }