Skip to content

Commit

Permalink
Reduce CacheImpl lock contention
Browse files Browse the repository at this point in the history
  • Loading branch information
schlosna committed Jan 9, 2024
1 parent 3ee592d commit 3b4474b
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
import io.fabric8.kubernetes.api.model.HasMetadata;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Stream;

public class BasicItemStore<V extends HasMetadata> implements ItemStore<V> {

private Function<V, String> keyFunction;
private ConcurrentHashMap<String, V> store = new ConcurrentHashMap<>();
private final Function<V, String> keyFunction;
private final ConcurrentMap<String, V> store = new ConcurrentHashMap<>();

public BasicItemStore(Function<V, String> keyFunction) {
this.keyFunction = keyFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@
import io.fabric8.kubernetes.client.utils.Utils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -44,13 +45,14 @@ public class CacheImpl<T extends HasMetadata> implements Cache<T> {
public static final String NAMESPACE_INDEX = "namespace";

// indexers stores index functions by their names
private final Map<String, Function<T, List<String>>> indexers = new HashMap<>();
private final Map<String, Function<T, List<String>>> indexers = Collections.synchronizedMap(new HashMap<>());

// items stores object instances
// @GuardedBy("getLockObj")
private ItemStore<T> items;

// indices stores objects' key by their indices
private final Map<String, Map<String, Set<String>>> indices = new HashMap<>();
private final ConcurrentMap<String, ConcurrentMap<String, Set<String>>> indices = new ConcurrentHashMap<>();

public CacheImpl() {
this(NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::metaNamespaceKeyFunc);
Expand All @@ -71,12 +73,12 @@ public void setItemStore(ItemStore<T> items) {
* @return registered indexers
*/
@Override
public synchronized Map<String, Function<T, List<String>>> getIndexers() {
public Map<String, Function<T, List<String>>> getIndexers() {
return Collections.unmodifiableMap(indexers);
}

@Override
public synchronized void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
public void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
Set<String> intersection = new HashSet<>(indexers.keySet());
intersection.retainAll(indexersNew.keySet());
if (!intersection.isEmpty()) {
Expand All @@ -94,12 +96,15 @@ public synchronized void addIndexers(Map<String, Function<T, List<String>>> inde
* @param obj the object
* @return the old object
*/
public synchronized T put(T obj) {
public T put(T obj) {
if (obj == null) {
return null;
}
String key = getKey(obj);
T oldObj = this.items.put(key, obj);
T oldObj;
synchronized (getLockObject()) {
oldObj = this.items.put(key, obj);
}
this.updateIndices(oldObj, obj, key);
return oldObj;
}
Expand All @@ -110,9 +115,12 @@ public synchronized T put(T obj) {
* @param obj object
* @return the old object
*/
public synchronized T remove(T obj) {
public T remove(T obj) {
String key = getKey(obj);
T old = this.items.remove(key);
T old;
synchronized (getLockObject()) {
old = this.items.remove(key);
}
if (old != null) {
this.deleteFromIndices(old, key);
}
Expand All @@ -126,7 +134,9 @@ public synchronized T remove(T obj) {
*/
@Override
public List<String> listKeys() {
return this.items.keySet().collect(Collectors.toList());
synchronized (getLockObject()) {
return this.items.keySet().collect(Collectors.toList());
}
}

/**
Expand All @@ -146,8 +156,10 @@ public T get(T obj) {
*/
@Override
public String getKey(T obj) {
String result = this.items.getKey(obj);
return result == null ? "" : result;
synchronized (getLockObject()) {
String result = this.items.getKey(obj);
return result == null ? "" : result;
}
}

/**
Expand All @@ -157,7 +169,9 @@ public String getKey(T obj) {
*/
@Override
public List<T> list() {
return this.items.values().collect(Collectors.toList());
synchronized (getLockObject()) {
return this.items.values().collect(Collectors.toList());
}
}

/**
Expand All @@ -168,7 +182,9 @@ public List<T> list() {
*/
@Override
public T getByKey(String key) {
return this.items.get(key);
synchronized (getLockObject()) {
return this.items.get(key);
}
}

/**
Expand All @@ -179,17 +195,17 @@ public T getByKey(String key) {
* @return the list
*/
@Override
public synchronized List<T> index(String indexName, T obj) {
if (!this.indexers.containsKey(indexName)) {
public List<T> index(String indexName, T obj) {
Function<T, List<String>> indexFunc = this.indexers.get(indexName);
if (indexFunc == null) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
Function<T, List<String>> indexFunc = this.indexers.get(indexName);
List<String> indexKeys = indexFunc.apply(obj);
Map<String, Set<String>> index = this.indices.get(indexName);
if (index.isEmpty()) {
return new ArrayList<>();
}

List<String> indexKeys = indexFunc.apply(obj);
Set<String> returnKeySet = new HashSet<>();
for (String indexKey : indexKeys) {
Set<String> set = index.get(indexKey);
Expand All @@ -201,11 +217,23 @@ public synchronized List<T> index(String indexName, T obj) {

List<T> items = new ArrayList<>(returnKeySet.size());
for (String absoluteKey : returnKeySet) {
items.add(this.items.get(absoluteKey));
T item;
synchronized (getLockObject()) {
item = this.items.get(absoluteKey);
}
if (item != null) {
items.add(item);
}
}
return items;
}

private void checkContainsIndex(String indexName) {
if (!this.indexers.containsKey(indexName)) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
}

/**
* Index keys list
*
Expand All @@ -214,17 +242,14 @@ public synchronized List<T> index(String indexName, T obj) {
* @return the list
*/
@Override
public synchronized List<String> indexKeys(String indexName, String indexKey) {
if (!this.indexers.containsKey(indexName)) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
public List<String> indexKeys(String indexName, String indexKey) {
checkContainsIndex(indexName);
Map<String, Set<String>> index = this.indices.get(indexName);
Set<String> set = index.get(indexKey);
List<String> keys = new ArrayList<>(set.size());
for (String key : set) {
keys.add(key);
if (index == null) {
return new ArrayList<>();
}
return keys;
Set<String> set = index.getOrDefault(indexKey, Collections.emptySet());
return new ArrayList<>(set);
}

/**
Expand All @@ -235,18 +260,25 @@ public synchronized List<String> indexKeys(String indexName, String indexKey) {
* @return the list
*/
@Override
public synchronized List<T> byIndex(String indexName, String indexKey) {
if (!this.indexers.containsKey(indexName)) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
public List<T> byIndex(String indexName, String indexKey) {
checkContainsIndex(indexName);
Map<String, Set<String>> index = this.indices.get(indexName);
if (index == null) {
return new ArrayList<>();
}
Set<String> set = index.get(indexKey);
if (set == null) {
return Arrays.asList();
return new ArrayList<>();
}
List<T> items = new ArrayList<>(set.size());
for (String key : set) {
items.add(this.items.get(key));
T item;
synchronized (getLockObject()) {
item = this.items.get(key);
}
if (item != null) {
items.add(item);
}
}
return items;
}
Expand All @@ -265,21 +297,25 @@ void updateIndices(T oldObj, T newObj, String key) {
deleteFromIndices(oldObj, key);
}

for (Map.Entry<String, Function<T, List<String>>> indexEntry : indexers.entrySet()) {
String indexName = indexEntry.getKey();
Function<T, List<String>> indexFunc = indexEntry.getValue();
Map<String, Set<String>> index = this.indices.get(indexName);

updateIndex(key, newObj, indexFunc, index);
synchronized (indexers) {
for (Map.Entry<String, Function<T, List<String>>> indexEntry : indexers.entrySet()) {
String indexName = indexEntry.getKey();
Function<T, List<String>> indexFunc = indexEntry.getValue();
Map<String, Set<String>> index = this.indices.get(indexName);
if (index != null) {
updateIndex(key, newObj, indexFunc, index);
}
}
}
}

private void updateIndex(String key, T newObj, Function<T, List<String>> indexFunc, Map<String, Set<String>> index) {
List<String> indexValues = indexFunc.apply(newObj);
if (indexValues != null && !indexValues.isEmpty()) {
for (String indexValue : indexValues) {
Set<String> indexSet = index.computeIfAbsent(indexValue, k -> new HashSet<>());
indexSet.add(key);
if (indexValue != null) {
index.computeIfAbsent(indexValue, k -> ConcurrentHashMap.newKeySet()).add(key);
}
}
}
}
Expand All @@ -293,21 +329,25 @@ private void updateIndex(String key, T newObj, Function<T, List<String>> indexFu
* @param key the key
*/
private void deleteFromIndices(T oldObj, String key) {
for (Map.Entry<String, Function<T, List<String>>> indexEntry : this.indexers.entrySet()) {
Function<T, List<String>> indexFunc = indexEntry.getValue();
List<String> indexValues = indexFunc.apply(oldObj);
if (indexValues == null || indexValues.isEmpty()) {
continue;
}
synchronized (indexers) {
for (Map.Entry<String, Function<T, List<String>>> indexEntry : this.indexers.entrySet()) {
Function<T, List<String>> indexFunc = indexEntry.getValue();
List<String> indexValues = indexFunc.apply(oldObj);
if (indexValues == null || indexValues.isEmpty()) {
continue;
}

Map<String, Set<String>> index = this.indices.get(indexEntry.getKey());
if (index == null) {
continue;
}
for (String indexValue : indexValues) {
Set<String> indexSet = index.get(indexValue);
if (indexSet != null) {
indexSet.remove(key);
Map<String, Set<String>> index = this.indices.get(indexEntry.getKey());
if (index == null) {
continue;
}
for (String indexValue : indexValues) {
if (indexValue != null) {
Set<String> indexSet = index.get(indexValue);
if (indexSet != null) {
indexSet.remove(key);
}
}
}
}
}
Expand All @@ -319,12 +359,16 @@ private void deleteFromIndices(T oldObj, String key) {
* @param indexName the index name
* @param indexFunc the index func
*/
public synchronized CacheImpl<T> addIndexFunc(String indexName, Function<T, List<String>> indexFunc) {
HashMap<String, Set<String>> index = new HashMap<>();
this.indices.put(indexName, index);
this.indexers.put(indexName, indexFunc);

items.values().forEach(v -> updateIndex(getKey(v), v, indexFunc, index));
public CacheImpl<T> addIndexFunc(String indexName, Function<T, List<String>> indexFunc) {
ConcurrentMap<String, Set<String>> index = new ConcurrentHashMap<>();
synchronized (indexers) {
this.indices.put(indexName, index);
this.indexers.put(indexName, indexFunc);

synchronized (getLockObject()) {
items.values().forEach(v -> updateIndex(getKey(v), v, indexFunc, index));
}
}
return this;
}

Expand Down Expand Up @@ -387,17 +431,19 @@ public static List<String> metaNamespaceIndexFunc(Object obj) {
}

@Override
public synchronized void removeIndexer(String name) {
public void removeIndexer(String name) {
this.indices.remove(name);
this.indexers.remove(name);
}

public boolean isFullState() {
return items.isFullState();
synchronized (getLockObject()) {
return items.isFullState();
}
}

public Object getLockObject() {
return this;
return this.items;
}

}

0 comments on commit 3b4474b

Please sign in to comment.