diff --git a/Archive.zip b/Archive.zip
new file mode 100644
index 0000000000000..19ce400a7d5e1
Binary files /dev/null and b/Archive.zip differ
diff --git a/Existing_ES_Changes.txt b/Existing_ES_Changes.txt
new file mode 100644
index 0000000000000..91eaa2efcdf97
--- /dev/null
+++ b/Existing_ES_Changes.txt
@@ -0,0 +1,50 @@
+
+1. Parent breaker's real-memory breaking excluded for specified child circuit breakers
+
+Files changed:
+- ChildMemoryCircuitBreaker.java
+  Pass the child memory breaker's name along in the call to checkParentLimit().
+
+- ClusterSettings.java
+  Register the HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_EXCLUDE_REAL_MEMORY_FOR setting.
+
+- HierarchyCircuitBreakerService.java
+  Add the setting "indices.breaker.total.exclude_real_memory_for", which takes a list of child circuit breaker names.
+  For these child breakers, the parent real-memory breaker will not trip.
+
+2. Add a hook for plugins to get hold of the HierarchyCircuitBreakerService (see the plugin sketch at the end of this diff):
+
+Files changed:
+- CircuitBreakerServicePlugin.java
+  An interface for plugins that need the HierarchyCircuitBreakerService instance. Plugins receive that instance
+  through setCircuitBreakerService().
+
+- Node.java
+  Use pluginsService to find all plugins implementing CircuitBreakerServicePlugin and pass the breaker service
+  to them.
+
+3. Circuit-breaking wrappers over Java collections (a usage sketch follows this summary)
+
+Files added:
+- CircuitBreakingCollection.java
+- CircuitBreakingHashMap.java
+- CircuitBreakingHashSet.java
+- CircuitBreakingList.java
+- CircuitBreakingMap.java
+- CircuitBreakingSet.java
+- CircuitBreakingTreeMap.java
+- CircuitBreakingTreeSet.java
+  Circuit-breaking wrappers over Java collections. Each wrapper preemptively reserves bytes matching the growth rate of the arrays used in the inner containers' implementations. For TreeMap and TreeSet, an imaginary growth rate similar to ArrayList's is used so that the circuit breaker is updated in bulk with less overhead.
+
+- CBUtilsFactory.java
+  Factory class that takes a circuit breaker instance and provides factory methods, with the same signatures as the
+  inner collections' constructors, for the circuit-breaking wrappers and the finalizing circuit-breaking wrappers.
+
+4. Append index-shard & task-id to the search/throttled search pool thread names
+
+Files changed:
+- SearchService.java
+  Add an AutoCloseable private class ThreadInfoAppenderOnName whose constructor takes the shard ID and task.
+  It appends the shard and task information to the thread name if not already present; close() restores the original thread name.
+
+  It is used in a try-with-resources block wrapping the thread's actual execution.
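
Usage sketch (editorial addition, not part of the diff): a minimal illustration of how the collection wrappers from
item 3 are meant to be used via CBUtilsFactory. It assumes a CircuitBreakerService is available as
circuitBreakerService and that the factory methods carry the usual generic signatures; the method and variable
names (collectWithBreaker, terms, hits, counts) are hypothetical.

    import java.util.List;
    import java.util.Map;
    import org.elasticsearch.common.breaker.CircuitBreaker;
    import org.elasticsearch.common.lease.Releasable;
    import org.elasticsearch.common.util.breakingcollections.CBUtilsFactory;
    import org.elasticsearch.indices.breaker.CircuitBreakerService;

    static void collectWithBreaker(CircuitBreakerService circuitBreakerService, Iterable<String> terms) {
        // Build a factory around the request breaker; every collection it hands out
        // reserves its estimated memory against that breaker as it grows.
        CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);
        CBUtilsFactory cbFactory = new CBUtilsFactory(breaker);

        List<String> hits = cbFactory.newArrayList();
        try {
            for (String term : terms) {
                hits.add(term); // may throw CircuitBreakingException once the reservation trips a breaker
            }
        } finally {
            ((Releasable) hits).close(); // clears the list and returns the reserved bytes to the breaker
        }

        // The "finalizing" variants instead release their reservation when they are garbage collected.
        Map<String, Long> counts = cbFactory.newFinalizingHashMap();
        counts.put("example", 1L);
    }

The non-finalizing wrappers are Releasable, so releasing them explicitly (as above) keeps the breaker accounting
tight; the finalizing variants trade that precision for convenience.
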
diff --git a/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java b/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java index d964c18368ce1..8058c2d2b5a95 100644 --- a/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java +++ b/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java @@ -95,7 +95,7 @@ public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws Cir // Additionally, we need to check that we haven't exceeded the parent's limit try { - parent.checkParentLimit((long) (bytes * overheadConstant), label); + parent.checkParentLimit((long) (bytes * overheadConstant), label, this.name); } catch (CircuitBreakingException e) { // If the parent breaker is tripped, this breaker has to be // adjusted back down because the allocation is "blocked" but the diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 8edb8fca173c9..42c973fdc79d6 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -301,6 +301,7 @@ public void apply(Settings value, Settings current, Settings previous) { HttpTransportSettings.SETTING_HTTP_TRACE_LOG_EXCLUDE, HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING, HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, + HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_EXCLUDE_REAL_MEMORY_FOR, HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING, diff --git a/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CBUtilsFactory.java b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CBUtilsFactory.java new file mode 100644 index 0000000000000..96e86548f24e7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CBUtilsFactory.java @@ -0,0 +1,261 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.common.util.breakingcollections; + +import org.elasticsearch.common.breaker.CircuitBreaker; + +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.SortedSet; + +public class CBUtilsFactory { + private final CircuitBreaker circuitBreaker; + + public CBUtilsFactory(CircuitBreaker circuitBreaker) { + this.circuitBreaker = circuitBreaker; + } + + public List newArrayList() { + return new CircuitBreakingList<>(circuitBreaker); + } + + public List newArrayList(int initialCapacity) { + return new CircuitBreakingList<>(circuitBreaker, initialCapacity); + } + + public List newArrayList(Collection collection) { + return new CircuitBreakingList<>(circuitBreaker, collection); + } + + public List newFinalizingArrayList() { + return new CircuitBreakingList(circuitBreaker) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public List newFinalizingArrayList(int initialCapacity) { + return new CircuitBreakingList(circuitBreaker, initialCapacity) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public List newFinalizingArrayList(Collection collection) { + return new CircuitBreakingList(circuitBreaker, collection) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Map newHashMap() { + return new CircuitBreakingHashMap<>(circuitBreaker); + } + + public Map newHashMap(int initialCapacity, float loadFactor) { + return new CircuitBreakingHashMap<>(circuitBreaker, initialCapacity, loadFactor); + } + + public Map newHashMap(int initialCapacity) { + return new CircuitBreakingHashMap<>(circuitBreaker, initialCapacity); + } + + public Map newFinalizingHashMap() { + return new CircuitBreakingHashMap(circuitBreaker) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Map newFinalizingHashMap(int initialCapacity, float loadFactor) { + return new CircuitBreakingHashMap(circuitBreaker, initialCapacity, loadFactor) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Map newFinalizingHashMap(int initialCapacity) { + return new CircuitBreakingHashMap(circuitBreaker, initialCapacity) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Map newTreeMap() { + return new CircuitBreakingTreeMap<>(circuitBreaker); + } + + public Map newTreeMap(Comparator comparator) { + return new CircuitBreakingTreeMap<>(circuitBreaker, comparator); + } + + public Map newTreeMap(Map m) { + return new CircuitBreakingTreeMap<>(circuitBreaker, m); + } + + public Map newTreeMap(SortedMap m) { + return new CircuitBreakingTreeMap<>(circuitBreaker, m); + } + + public Map newFinalizingTreeMap() { + return new CircuitBreakingTreeMap(circuitBreaker) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Map newFinalizingTreeMap(Comparator comparator) { + return new CircuitBreakingTreeMap(circuitBreaker, comparator) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Map newFinalizingTreeMap(Map m) { + return new CircuitBreakingTreeMap(circuitBreaker, m) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Map newFinalizingTreeMap(SortedMap m) { + return new CircuitBreakingTreeMap(circuitBreaker, m) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Set newHashSet() { + return new CircuitBreakingHashSet<>(circuitBreaker); + } + + 
public Set newHashSet(Collection c) { + return new CircuitBreakingHashSet<>(circuitBreaker, c); + } + + public Set newHashSet(int initialCapacity, float loadFactor) { + return new CircuitBreakingHashSet<>(circuitBreaker, initialCapacity, loadFactor); + } + + public Set newHashSet(int initialCapacity) { + return new CircuitBreakingHashSet<>(circuitBreaker, initialCapacity); + } + + public Set newFinalizingHashSet() { + return new CircuitBreakingHashSet(circuitBreaker) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Set newFinalizingHashSet(Collection c) { + return new CircuitBreakingHashSet(circuitBreaker, c) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Set newFinalizingHashSet(int initialCapacity, float loadFactor) { + return new CircuitBreakingHashSet(circuitBreaker, initialCapacity, loadFactor) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Set newFinalizingHashSet(int initialCapacity) { + return new CircuitBreakingHashSet(circuitBreaker, initialCapacity) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Set newTreeSet() { + return new CircuitBreakingTreeSet<>(circuitBreaker); + } + + public Set newTreeSet(Collection c) { + return new CircuitBreakingTreeSet<>(circuitBreaker, c); + } + + public Set newTreeSet(Comparator comparator) { + return new CircuitBreakingTreeSet<>(circuitBreaker, comparator); + } + + public Set newTreeSet(SortedSet s) { + return new CircuitBreakingTreeSet<>(circuitBreaker, s); + } + + public Set newFinalizingTreeSet() { + return new CircuitBreakingTreeSet(circuitBreaker) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Set newFinalizingTreeSet(Collection c) { + return new CircuitBreakingTreeSet(circuitBreaker, c) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Set newFinalizingTreeSet(Comparator comparator) { + return new CircuitBreakingTreeSet(circuitBreaker, comparator) { + @Override + public void finalize() { + this.close(); + } + }; + } + + public Set newFinalizingTreeSet(SortedSet s) { + return new CircuitBreakingTreeSet(circuitBreaker, s) { + @Override + public void finalize() { + this.close(); + } + }; + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingCollection.java b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingCollection.java new file mode 100644 index 0000000000000..9b3f631c43537 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingCollection.java @@ -0,0 +1,175 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.common.util.breakingcollections; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.lease.Releasable; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.function.Predicate; + +public abstract class CircuitBreakingCollection implements Collection, Releasable { + private final CircuitBreaker circuitBreaker; + protected final Collection collection; + private long requestBytesAdded = 0; + // bytes for the above fields themselves aren't counted. + + public CircuitBreakingCollection(CircuitBreaker circuitBreaker, Collection collection) { + this.circuitBreaker = circuitBreaker; + this.collection = collection; + } + + protected void addToBreaker(long bytes, boolean checkBreaker) { + // Since this method is called after collection already grew, update reservedSize even if breaking. + this.requestBytesAdded += bytes; + if (bytes >= 0 && checkBreaker) { + circuitBreaker.addEstimateBytesAndMaybeBreak(bytes, ""); + } else { + circuitBreaker.addWithoutBreaking(bytes); + } + } + + protected abstract void resizeIfRequired(); + protected abstract long bytesRequired(); + + protected void updateBreaker() { + resizeIfRequired(); + updateBreaker(bytesRequired()); + } + + protected void updateBreaker(long bytesRequired) { + long bytesDiff = bytesRequired - requestBytesAdded; + if (bytesDiff == 0) { + return; + } + // If it breaks, then the already created data will not be accounted for. + // So we first add without breaking, and then check. + addToBreaker(bytesDiff, false); + addToBreaker(0, true); + } + + @Override + public int size() { + return collection.size(); + } + + @Override + public boolean isEmpty() { + return collection.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return collection.contains(o); + } + + @Override + public Iterator iterator() { + return collection.iterator(); + } + + @Override + public Object[] toArray() { + return collection.toArray(); + } + + @Override + public T[] toArray(T[] ts) { + return collection.toArray(ts); + } + + @Override + public boolean add(E e) { + try { + return collection.add(e); + } finally { + updateBreaker(); + } + } + + @Override + public boolean remove(Object o) { + try { + return collection.remove(o); + } finally { + updateBreaker(); + } + } + + @Override + public boolean containsAll(Collection collection) { + return this.collection.containsAll(collection); + } + + @Override + public boolean addAll(Collection collection) { + try { + return this.collection.addAll(collection); + } finally { + updateBreaker(); + } + } + + @Override + public boolean removeAll(Collection collection) { + try { + return this.collection.removeAll(collection); + } finally { + updateBreaker(); + } + } + + @Override + public boolean removeIf(Predicate filter) { + try { + return collection.removeIf(filter); + } finally { + updateBreaker(); + } + } + + @Override + public boolean retainAll(Collection collection) { + try { + return this.collection.retainAll(collection); + } finally { + updateBreaker(); + } + } + + @Override + public void clear() { + try { + collection.clear(); + } finally { + updateBreaker(); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CircuitBreakingCollection that = (CircuitBreakingCollection) o; + return Objects.equals(collection, that.collection); + } + + @Override + public int hashCode() { + return 
Objects.hash(collection); + } + + @Override + public void close() { + clear(); + updateBreaker(0); + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingHashMap.java b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingHashMap.java new file mode 100644 index 0000000000000..a697c73785240 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingHashMap.java @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.util.breakingcollections; + +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.breaker.CircuitBreaker; + +import java.util.HashMap; +import java.util.Optional; + +public class CircuitBreakingHashMap extends CircuitBreakingMap { + private long perElementSize = 0; + private long perElementObjectSize = 0; + private long baseSize = 0; + protected int capacity; + protected int threshold; + protected float loadFactor = DEFAULT_LOAD_FACTOR; + /** + * Copied from HashMap Coretto-1.8.0_292 + */ + static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16 + static final int MAXIMUM_CAPACITY = 1 << 30; + static final float DEFAULT_LOAD_FACTOR = 0.75f; + + // bytes for the above fields themselves aren't counted. + + public CircuitBreakingHashMap(CircuitBreaker circuitBreaker) { + super(circuitBreaker, new HashMap<>()); + updateBreaker(); + } + + public CircuitBreakingHashMap(CircuitBreaker circuitBreaker, int initialCapacity, float loadFactor) { + super(circuitBreaker, new HashMap<>(initialCapacity, loadFactor)); + // Copied from HashMap Coretto-1.8.0_292 + if (initialCapacity < 0) + throw new IllegalArgumentException("Illegal initial capacity: " + + initialCapacity); + if (initialCapacity > MAXIMUM_CAPACITY) + initialCapacity = MAXIMUM_CAPACITY; + if (loadFactor <= 0 || Float.isNaN(loadFactor)) + throw new IllegalArgumentException("Illegal load factor: " + + loadFactor); + this.loadFactor = loadFactor; + this.threshold = tableSizeFor(initialCapacity); + updateBreaker(); + } + + public CircuitBreakingHashMap(CircuitBreaker circuitBreaker, int initialCapacity) { + this(circuitBreaker, initialCapacity, DEFAULT_LOAD_FACTOR); + updateBreaker(); + } + + /** + * Returns a power of two size for the given target capacity. + */ + static int tableSizeFor(int cap) { + int n = cap - 1; + n |= n >>> 1; + n |= n >>> 2; + n |= n >>> 4; + n |= n >>> 8; + n |= n >>> 16; + return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? 
MAXIMUM_CAPACITY : n + 1; + } + + @Override + protected void resizeIfRequired() { + while (size() > threshold) { + // Copy pasted from HashMap + int newCapacity, newThreshold = 0; + if (capacity > 0) { + if (capacity >= MAXIMUM_CAPACITY) { + threshold = Integer.MAX_VALUE; + return; + } else if ((newCapacity = capacity << 1) < MAXIMUM_CAPACITY && capacity >= DEFAULT_INITIAL_CAPACITY) { + newThreshold = threshold << 1; // double threshold + } + } else if (threshold > 0) {// initial capacity was placed in threshold + newCapacity = threshold; + } else { // zero initial threshold signifies using defaults + newCapacity = DEFAULT_INITIAL_CAPACITY; + newThreshold = (int) (DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY); + } + + if (newThreshold == 0) { + float ft = (float) newCapacity * loadFactor; + newThreshold = (newCapacity < MAXIMUM_CAPACITY && ft < (float) MAXIMUM_CAPACITY ? + (int) ft : Integer.MAX_VALUE); + } + this.threshold = newThreshold; + this.capacity = newCapacity; + } + } + + @Override + protected long bytesRequired() { + if (perElementSize == 0) { + calculatePerElementSizes(); + baseSize = RamUsageEstimator.shallowSizeOf(map); + } + return baseSize + capacity * perElementSize + threshold * perElementObjectSize; + } + + protected void calculatePerElementSizes() { + if (map.size() == 0) { + return; + } + Optional> optionalEntry = map.entrySet().stream().findAny(); + Entry entry = optionalEntry.get(); + // there will never be elements for capacity - threshold elements + perElementSize = RamUsageEstimator.shallowSizeOf(entry); + perElementObjectSize = RamUsageEstimator.sizeOfObject(entry.getKey(), 0) + + RamUsageEstimator.sizeOfObject(entry.getValue(), 0); + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingHashSet.java b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingHashSet.java new file mode 100644 index 0000000000000..39e28f01f8dfa --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingHashSet.java @@ -0,0 +1,134 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.common.util.breakingcollections; + +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.breaker.CircuitBreaker; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; + +public class CircuitBreakingHashSet extends CircuitBreakingSet{ + private long perElementSize = 0; + private long perElementObjectSize = 0; + private long baseSize; + protected int capacity; + protected int threshold; + protected float loadFactor = DEFAULT_LOAD_FACTOR; + /** + * Copied from HashMap Coretto-1.8.0_292 + */ + static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16 + static final int MAXIMUM_CAPACITY = 1 << 30; + static final float DEFAULT_LOAD_FACTOR = 0.75f; + + public CircuitBreakingHashSet(CircuitBreaker circuitBreaker) { + super(circuitBreaker, new HashSet<>()); + updateBreaker(); + } + + public CircuitBreakingHashSet(CircuitBreaker circuitBreaker, Collection c) { + super(circuitBreaker, new HashSet<>(c)); + capacity = Math.max((int)((float)c.size() / 0.75F) + 1, 16); + updateBreaker(); + } + + public CircuitBreakingHashSet(CircuitBreaker circuitBreaker, int initialCapacity, float loadFactor) { + super(circuitBreaker, new HashSet<>(initialCapacity, loadFactor)); + // Copied from HashMap Coretto-1.8.0_292 + if (initialCapacity < 0) + throw new IllegalArgumentException("Illegal initial capacity: " + + initialCapacity); + if (initialCapacity > MAXIMUM_CAPACITY) + initialCapacity = MAXIMUM_CAPACITY; + if (loadFactor <= 0 || Float.isNaN(loadFactor)) + throw new IllegalArgumentException("Illegal load factor: " + + loadFactor); + this.loadFactor = loadFactor; + this.threshold = tableSizeFor(initialCapacity); + updateBreaker(); + } + + public CircuitBreakingHashSet(CircuitBreaker circuitBreaker, int initialCapacity) { + this(circuitBreaker, initialCapacity, DEFAULT_LOAD_FACTOR); + } + + /** + * Returns a power of two size for the given target capacity. + */ + static int tableSizeFor(int cap) { + int n = cap - 1; + n |= n >>> 1; + n |= n >>> 2; + n |= n >>> 4; + n |= n >>> 8; + n |= n >>> 16; + return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; + } + + @Override + protected void resizeIfRequired() { + if(size() > threshold) { + // Copy pasted from HashMap + int newCapacity, newThreshold = 0; + if (capacity > 0) { + if (capacity >= MAXIMUM_CAPACITY) { + threshold = Integer.MAX_VALUE; + return; + } else if ((newCapacity = capacity << 1) < MAXIMUM_CAPACITY && capacity >= DEFAULT_INITIAL_CAPACITY) { + newThreshold = threshold << 1; // double threshold + } + } else if (threshold > 0) {// initial capacity was placed in threshold + newCapacity = threshold; + } else { // zero initial threshold signifies using defaults + newCapacity = DEFAULT_INITIAL_CAPACITY; + newThreshold = (int) (DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY); + } + + if (newThreshold == 0) { + float ft = (float) newCapacity * loadFactor; + newThreshold = (newCapacity < MAXIMUM_CAPACITY && ft < (float) MAXIMUM_CAPACITY ? 
+ (int) ft : Integer.MAX_VALUE); + } + this.threshold = newThreshold; + this.capacity = newCapacity; + } + } + + @Override + protected long bytesRequired() { + if (perElementSize == 0) { + calculatePerElementSizes(); + baseSize = RamUsageEstimator.shallowSizeOf(set); + } + return baseSize + capacity * perElementSize + threshold * perElementObjectSize; + } + + protected void calculatePerElementSizes() { + if (set.size() == 0) { + return; + } + Optional optionalElement = set.stream().findAny(); + //estimate size of inner map in the hash set + E element = optionalElement.get(); + Map innerMap = new HashMap<>(); + innerMap.put(element, new Object()); + + Optional> optionalEntry = innerMap.entrySet().stream().findAny(); + Map.Entry entry = optionalEntry.get(); + + // there will never be elements for capacity - threshold elements + perElementSize = RamUsageEstimator.shallowSizeOf(entry); + perElementObjectSize = RamUsageEstimator.sizeOfObject(entry.getKey(), 0) + + RamUsageEstimator.sizeOfObject(entry.getValue(), 0); + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingList.java b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingList.java new file mode 100644 index 0000000000000..f5b1c211ef29f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingList.java @@ -0,0 +1,152 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.util.breakingcollections; + +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.breaker.CircuitBreaker; + +import java.util.*; + +/** + * A wrapper around a List that updates a CircuitBreaker, every time the list's size changes. + * This uses ArrayList internally. + */ +public class CircuitBreakingList extends CircuitBreakingCollection implements List { + + List list; + private static final int DEFAULT_CAPACITY = -1; + int capacity = DEFAULT_CAPACITY; + private long perElementSize = 0; + private long baseSize = 0; + + public CircuitBreakingList(CircuitBreaker circuitBreaker) { + super(circuitBreaker, new ArrayList<>()); + list = (List) super.collection; + updateBreaker(); + } + + public CircuitBreakingList(CircuitBreaker circuitBreaker, int initialCapacity) { + super(circuitBreaker, new ArrayList<>(initialCapacity)); + list = (List) super.collection; + capacity = initialCapacity; + updateBreaker(); + } + + public CircuitBreakingList(CircuitBreaker circuitBreaker, Collection collection) { + this(circuitBreaker, collection.size()); + addAll(collection); + updateBreaker(); + } + + @Override + protected void resizeIfRequired() { + while (size() > capacity) { + // Copy pasted from ArrayList.java(JBR - 11) so that capacity grows same as ArrayList's internal capacity + int minCapacity = size(); + if (capacity == DEFAULT_CAPACITY) { + capacity = Math.max(10, minCapacity); + } else { + int newCapacity = capacity + (capacity >> 1); + if (newCapacity - minCapacity <= 0) { + capacity = minCapacity; + } else { + capacity = newCapacity - 2147483639 <= 0 ? newCapacity : (minCapacity > 2147483639 ? 
2147483647 : 2147483639); + } + } + } + } + + @Override + protected long bytesRequired() { + if (perElementSize == 0) { + calculatePerElementSize(); + baseSize = RamUsageEstimator.shallowSizeOf(list); + } + return baseSize + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + capacity * perElementSize; + } + + private void calculatePerElementSize() { + if(size() == 0) { + return; + } + perElementSize = RamUsageEstimator.sizeOfObject(collection.toArray()[0], 0) + RamUsageEstimator.NUM_BYTES_OBJECT_REF; + } + + public void shrinkReservationToSize() { + if (list instanceof ArrayList) { + ((ArrayList) list).trimToSize(); + capacity = size(); + updateBreaker(); + } else { + throw new UnsupportedOperationException("Can not shrink internal list"); + } + } + + @Override + public boolean addAll(int i, Collection collection) { + try { + return list.addAll(i, collection); + } finally { + updateBreaker(); + } + } + + @Override + public E get(int i) { + return list.get(i); + } + + @Override + public E set(int i, E e) { + return list.set(i, e); + } + + @Override + public void add(int i, E e) { + try { + list.add(i, e); + } finally { + updateBreaker(); + } + } + + @Override + public E remove(int i) { + try { + return list.remove(i); + } finally { + updateBreaker(); + } + } + + @Override + public int indexOf(Object o) { + return list.indexOf(o); + } + + @Override + public int lastIndexOf(Object o) { + return list.lastIndexOf(o); + } + + @Override + public ListIterator listIterator() { + return list.listIterator(); + } + + @Override + public ListIterator listIterator(int i) { + return list.listIterator(i); + } + + @Override + public List subList(int i, int i1) { + return list.subList(i, i1); + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingMap.java b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingMap.java new file mode 100644 index 0000000000000..e8d48b5f4b410 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingMap.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.util.breakingcollections; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.lease.Releasable; + +import java.util.*; + +public abstract class CircuitBreakingMap implements Map, Releasable { + private final CircuitBreaker circuitBreaker; + protected final Map map; + private long requestBytesAdded = 0; + // bytes for the above fields themselves aren't counted. + + public CircuitBreakingMap(CircuitBreaker circuitBreaker, Map map) { + this.circuitBreaker = circuitBreaker; + this.map = map; + } + + protected void addToBreaker(long bytes, boolean checkBreaker) { + // Since this method is called after collection already grew, update reservedSize even if breaking. 
+ this.requestBytesAdded += bytes; + if (bytes >= 0 && checkBreaker) { + circuitBreaker.addEstimateBytesAndMaybeBreak(bytes, ""); + } else { + circuitBreaker.addWithoutBreaking(bytes); + } + } + + protected abstract void resizeIfRequired(); + protected abstract long bytesRequired(); + + protected void updateBreaker() { + resizeIfRequired(); + updateBreaker(bytesRequired()); + } + + protected void updateBreaker(long bytesRequired) { + long bytesDiff = bytesRequired - requestBytesAdded; + if (bytesDiff == 0) { + return; + } + // If it breaks, then the already created data will not be accounted for. + // So we first add without breaking, and then check. + addToBreaker(bytesDiff, false); + addToBreaker(0, true); + } + + @Override + public void close() { + map.clear(); + updateBreaker(0); + } + @Override + public int size() { + return map.size(); + } + + @Override + public boolean isEmpty() { + return map.isEmpty(); + } + + @Override + public boolean containsKey(Object o) { + return map.containsKey(o); + } + + @Override + public boolean containsValue(Object o) { + return map.containsValue(o); + } + + @Override + public V get(Object o) { + return map.get(o); + } + + @Override + public V put(K k, V v) { + try { + return map.put(k, v); + } finally { + updateBreaker(); + } + } + + @Override + public V remove(Object o) { + try { + return map.remove(o); + } finally { + updateBreaker(); + } + } + + @Override + public void putAll(Map map) { + try { + this.map.putAll(map); + } finally { + updateBreaker(); + } + } + + @Override + public void clear() { + try { + map.clear(); + } finally { + updateBreaker(); + } + } + + @Override + public Set keySet() { + return map.keySet(); + } + + @Override + public Collection values() { + return map.values(); + } + + @Override + public Set> entrySet() { + return map.entrySet(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CircuitBreakingMap that = (CircuitBreakingMap) o; + return Objects.equals(map, that.map); + } + + @Override + public int hashCode() { + return Objects.hash(map); + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingSet.java b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingSet.java new file mode 100644 index 0000000000000..7bd5bd6380cf6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingSet.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.common.util.breakingcollections; + +import org.elasticsearch.common.breaker.CircuitBreaker; + +import java.util.Collection; +import java.util.Set; + +public abstract class CircuitBreakingSet extends CircuitBreakingCollection implements Set { + Set set; + public CircuitBreakingSet(CircuitBreaker circuitBreaker, Set set) { + super(circuitBreaker, set); + this.set = set; + } + + @Override + public T[] toArray(T[] ts) { + return set.toArray(ts); + } + + @Override + public boolean add(E o) { + try { + return set.add(o); + } finally { + updateBreaker(); + } + } + + @Override + public boolean addAll(Collection collection) { + try { + return set.addAll(collection); + } finally { + updateBreaker(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingTreeMap.java b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingTreeMap.java new file mode 100644 index 0000000000000..f380a4ff08e54 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingTreeMap.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.util.breakingcollections; + +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.breaker.CircuitBreaker; + +import java.util.Comparator; +import java.util.Map; +import java.util.Optional; +import java.util.SortedMap; +import java.util.TreeMap; + +public class CircuitBreakingTreeMap extends CircuitBreakingMap { + private long imaginaryCapacity; + private long perElementSize = 0; + private long baseSize; + + public CircuitBreakingTreeMap(CircuitBreaker circuitBreaker) { + super(circuitBreaker, new TreeMap<>()); + updateBreaker(); + } + + public CircuitBreakingTreeMap(CircuitBreaker circuitBreaker, Comparator comparator) { + super(circuitBreaker, new TreeMap<>(comparator)); + updateBreaker(); + } + + public CircuitBreakingTreeMap(CircuitBreaker circuitBreaker, Map m) { + super(circuitBreaker, new TreeMap<>(m)); + updateBreaker(); + } + + public CircuitBreakingTreeMap(CircuitBreaker circuitBreaker, SortedMap m) { + super(circuitBreaker, new TreeMap<>(m)); + updateBreaker(); + } + + @Override + protected void resizeIfRequired() { + while (size() > imaginaryCapacity) { + if(imaginaryCapacity == 0) { + imaginaryCapacity = Math.max(10, size()); + } else { + imaginaryCapacity += imaginaryCapacity >> 1; + } + } + } + + @Override + protected long bytesRequired() { + if (perElementSize == 0) { + calculatePerElementSizes(); + baseSize = RamUsageEstimator.shallowSizeOf(map); + } + return baseSize + imaginaryCapacity * perElementSize; + } + + protected void calculatePerElementSizes() { + if(size() == 0) { + return; + } + Optional> optionalEntry = map.entrySet().stream().findAny(); + Entry entry = optionalEntry.get(); + + perElementSize = RamUsageEstimator.shallowSizeOf(entry) + RamUsageEstimator.sizeOfObject(entry.getKey(), 0) + + RamUsageEstimator.sizeOfObject(entry.getValue(), 0); + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingTreeSet.java 
b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingTreeSet.java new file mode 100644 index 0000000000000..74fabca176148 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/breakingcollections/CircuitBreakingTreeSet.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.util.breakingcollections; + +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.breaker.CircuitBreaker; + +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.SortedSet; +import java.util.TreeSet; + +public class CircuitBreakingTreeSet extends CircuitBreakingSet{ + + private long imaginaryCapacity; + private long perElementSize = 0; + private long baseSize; + + protected CircuitBreakingTreeSet(CircuitBreaker circuitBreaker, TreeSet treeSet) { + super(circuitBreaker, treeSet); + updateBreaker(); + } + + public CircuitBreakingTreeSet(CircuitBreaker circuitBreaker) { + this(circuitBreaker, new TreeSet<>()); + } + + public CircuitBreakingTreeSet(CircuitBreaker circuitBreaker, Collection c) { + this(circuitBreaker, new TreeSet<>(c)); + } + + public CircuitBreakingTreeSet(CircuitBreaker circuitBreaker, Comparator comparator) { + this(circuitBreaker, new TreeSet<>(comparator)); + } + + public CircuitBreakingTreeSet(CircuitBreaker circuitBreaker, SortedSet s) { + this(circuitBreaker, new TreeSet<>(s)); + } + + @Override + protected void resizeIfRequired() { + while (size() > imaginaryCapacity) { + if(imaginaryCapacity == 0) { + imaginaryCapacity = Math.max(10, size()); + } else { + imaginaryCapacity += imaginaryCapacity >> 1; + } + } + } + + @Override + protected long bytesRequired() { + if (perElementSize == 0) { + calculatePerElementSizes(); + baseSize = RamUsageEstimator.shallowSizeOf(set); + } + return baseSize + imaginaryCapacity * perElementSize; + } + + protected void calculatePerElementSizes() { + if (size() == 0) { + return; + } + Optional optionalElement = set.stream().findAny(); + //estimate size of inner map in the hash set + E element = optionalElement.get(); + Map innerMap = new HashMap<>(); + innerMap.put(element, new Object()); + + Optional> optionalEntry = innerMap.entrySet().stream().findAny(); + Map.Entry entry = optionalEntry.get(); + + perElementSize = RamUsageEstimator.shallowSizeOf(entry) + RamUsageEstimator.sizeOfObject(entry.getKey(), 0) + + RamUsageEstimator.sizeOfObject(entry.getValue(), 0); + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java index cb868a061a96b..06d3884ce5bd8 100644 --- a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java +++ b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -69,6 +69,11 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { } }, Property.Dynamic, Property.NodeScope); + private static final List TOTAL_CIRCUIT_BREAKER_EXCLUDE_REAL_MEMORY_FOR_DEFAULT = new ArrayList<>(); + 
public static final Setting> TOTAL_CIRCUIT_BREAKER_EXCLUDE_REAL_MEMORY_FOR = + Setting.listSetting("indices.breaker.total.exclude_real_memory_for", TOTAL_CIRCUIT_BREAKER_EXCLUDE_REAL_MEMORY_FOR_DEFAULT, Function.identity(), + Property.NodeScope); + public static final Setting FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING = Setting.memorySizeSetting("indices.breaker.fielddata.limit", "40%", Property.Dynamic, Property.NodeScope); public static final Setting FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING = @@ -98,6 +103,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { new Setting<>("network.breaker.inflight_requests.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope); private final boolean trackRealMemoryUsage; + private final List excludeRealMemoryFor; private volatile BreakerSettings parentSettings; // Tripped count for when redistribution was attempted but wasn't successful @@ -155,6 +161,7 @@ public HierarchyCircuitBreakerService(Settings settings, List c logger.trace(() -> new ParameterizedMessage("parent circuit breaker with settings {}", this.parentSettings)); this.trackRealMemoryUsage = USE_REAL_MEMORY_USAGE_SETTING.get(settings); + this.excludeRealMemoryFor = TOTAL_CIRCUIT_BREAKER_EXCLUDE_REAL_MEMORY_FOR.get(settings); clusterSettings.addSettingsUpdateConsumer(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, this::setTotalCircuitBreakerLimit, this::validateTotalCircuitBreakerLimit); @@ -299,40 +306,45 @@ public long getParentLimit() { /** * Checks whether the parent breaker has been tripped */ - public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException { + public void checkParentLimit(long newBytesReserved, String label, String childBreakerName) throws CircuitBreakingException { final MemoryUsage memoryUsed = memoryUsed(newBytesReserved); long parentLimit = this.parentSettings.getLimit(); if (memoryUsed.totalUsage > parentLimit && overLimitStrategy.overLimit(memoryUsed).totalUsage > parentLimit) { - this.parentTripCount.incrementAndGet(); - final StringBuilder message = new StringBuilder("[parent] Data too large, data for [" + label + "]" + - " would be [" + memoryUsed.totalUsage + "/" + new ByteSizeValue(memoryUsed.totalUsage) + "]" + - ", which is larger than the limit of [" + - parentLimit + "/" + new ByteSizeValue(parentLimit) + "]"); - if (this.trackRealMemoryUsage) { - final long realUsage = memoryUsed.baseUsage; - message.append(", real usage: ["); - message.append(realUsage); - message.append("/"); - message.append(new ByteSizeValue(realUsage)); - message.append("], new bytes reserved: ["); - message.append(newBytesReserved); - message.append("/"); - message.append(new ByteSizeValue(newBytesReserved)); + // should not break if it is real memory and the child breaker is excluded + if (this.trackRealMemoryUsage && excludeRealMemoryFor.contains(childBreakerName)) { + logger.debug("not breaking since child breaker " + childBreakerName + " is excluded"); + } else { + this.parentTripCount.incrementAndGet(); + final StringBuilder message = new StringBuilder("[parent] Data too large, data for [" + label + "]" + + " would be [" + memoryUsed.totalUsage + "/" + new ByteSizeValue(memoryUsed.totalUsage) + "]" + + ", which is larger than the limit of [" + + parentLimit + "/" + new ByteSizeValue(parentLimit) + "]"); + if (this.trackRealMemoryUsage) { + final long realUsage = memoryUsed.baseUsage; + message.append(", real usage: ["); + message.append(realUsage); + message.append("/"); + message.append(new 
ByteSizeValue(realUsage)); + message.append("], new bytes reserved: ["); + message.append(newBytesReserved); + message.append("/"); + message.append(new ByteSizeValue(newBytesReserved)); + message.append("]"); + } + message.append(", usages ["); + message.append(this.breakers.entrySet().stream().map(e -> { + final CircuitBreaker breaker = e.getValue(); + final long breakerUsed = (long)(breaker.getUsed() * breaker.getOverhead()); + return e.getKey() + "=" + breakerUsed + "/" + new ByteSizeValue(breakerUsed); + }).collect(Collectors.joining(", "))); message.append("]"); + // derive durability of a tripped parent breaker depending on whether the majority of memory tracked by + // child circuit breakers is categorized as transient or permanent. + CircuitBreaker.Durability durability = memoryUsed.transientChildUsage >= memoryUsed.permanentChildUsage ? + CircuitBreaker.Durability.TRANSIENT : CircuitBreaker.Durability.PERMANENT; + logger.debug(() -> new ParameterizedMessage("{}", message.toString())); + throw new CircuitBreakingException(message.toString(), memoryUsed.totalUsage, parentLimit, durability); } - message.append(", usages ["); - message.append(this.breakers.entrySet().stream().map(e -> { - final CircuitBreaker breaker = e.getValue(); - final long breakerUsed = (long)(breaker.getUsed() * breaker.getOverhead()); - return e.getKey() + "=" + breakerUsed + "/" + new ByteSizeValue(breakerUsed); - }).collect(Collectors.joining(", "))); - message.append("]"); - // derive durability of a tripped parent breaker depending on whether the majority of memory tracked by - // child circuit breakers is categorized as transient or permanent. - CircuitBreaker.Durability durability = memoryUsed.transientChildUsage >= memoryUsed.permanentChildUsage ? - CircuitBreaker.Durability.TRANSIENT : CircuitBreaker.Durability.PERMANENT; - logger.debug(() -> new ParameterizedMessage("{}", message.toString())); - throw new CircuitBreakingException(message.toString(), memoryUsed.totalUsage, parentLimit, durability); } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 2dd40dd722c1d..74abbed0178ae 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -125,6 +125,7 @@ import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.AnalysisPlugin; import org.elasticsearch.plugins.CircuitBreakerPlugin; +import org.elasticsearch.plugins.CircuitBreakerServicePlugin; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.DiscoveryPlugin; import org.elasticsearch.plugins.EnginePlugin; @@ -563,6 +564,11 @@ protected Node(final Environment initialEnvironment, repositoriesServiceReference::get).stream()) .collect(Collectors.toList()); + pluginsService.filterPlugins(CircuitBreakerServicePlugin.class) + .forEach(plugin -> { + plugin.setCircuitBreakerService(circuitBreakerService); + }); + ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(), settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, systemIndices); diff --git a/server/src/main/java/org/elasticsearch/plugins/CircuitBreakerServicePlugin.java b/server/src/main/java/org/elasticsearch/plugins/CircuitBreakerServicePlugin.java new file mode 100644 index 
0000000000000..bfac18a15daff --- /dev/null +++ b/server/src/main/java/org/elasticsearch/plugins/CircuitBreakerServicePlugin.java @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.plugins; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.indices.breaker.CircuitBreakerService; + +/** + * Plugins that need a hook of the HierarchyCircuitBreakerService + */ +public interface CircuitBreakerServicePlugin { + + /** + * When HierarchyCircuitBreakerService is initialized, this method is invoked. + */ + default void setCircuitBreakerService(CircuitBreakerService circuitBreakerService) { + } + +} diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 147d999a7668c..8bdd9f0694789 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -110,6 +110,7 @@ import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; @@ -329,7 +330,11 @@ public void executeDfsPhase(ShardSearchRequest request, boolean keepStatesInCont @Override public void onResponse(ShardSearchRequest rewritten) { // fork the execution in the search thread pool - runAsync(getExecutor(shard), () -> executeDfsPhase(request, task, keepStatesInContext), listener); + runAsync(getExecutor(shard), () -> { + try (ThreadInfoAppenderOnName ignored = new ThreadInfoAppenderOnName(shard, task)) { + return executeDfsPhase(request, task, keepStatesInContext); + } + }, listener); } @Override @@ -392,7 +397,11 @@ public void onResponse(ShardSearchRequest orig) { } } // fork the execution in the search thread pool - runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener); + runAsync(getExecutor(shard), () -> { + try (ThreadInfoAppenderOnName ignored = new ThreadInfoAppenderOnName(shard, task)) { + return executeQueryPhase(orig, task, keepStatesInContext); + } + }, listener); } @Override @@ -474,10 +483,12 @@ public void executeQueryPhase(InternalScrollSearchRequest request, freeReaderContext(readerContext.id()); throw e; } - runAsync(getExecutor(readerContext.indexShard()), () -> { + final IndexShard shard = readerContext.indexShard(); + runAsync(getExecutor(shard), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext); + ThreadInfoAppenderOnName ignored = new ThreadInfoAppenderOnName(shard, task)) { searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); processScroll(request, readerContext, 
searchContext); queryPhase.execute(searchContext); @@ -496,10 +507,12 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest()); final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest()); final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); - runAsync(getExecutor(readerContext.indexShard()), () -> { + final IndexShard shard = readerContext.indexShard(); + runAsync(getExecutor(shard), () -> { readerContext.setAggregatedDfs(request.dfs()); try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext); + ThreadInfoAppenderOnName ignored = new ThreadInfoAppenderOnName(shard, task)) { searchContext.searcher().setAggregatedDfs(request.dfs()); queryPhase.execute(searchContext); if (searchContext.queryResult().hasSearchContext() == false && readerContext.singleSession()) { @@ -546,10 +559,12 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTa freeReaderContext(readerContext.id()); throw e; } - runAsync(getExecutor(readerContext.indexShard()), () -> { + final IndexShard shard = readerContext.indexShard(); + runAsync(getExecutor(shard), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext); + ThreadInfoAppenderOnName ignored = new ThreadInfoAppenderOnName(shard, task)) { searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null)); searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); processScroll(request, readerContext, searchContext); @@ -570,8 +585,10 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A final ReaderContext readerContext = findReaderContext(request.contextId(), request); final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); - runAsync(getExecutor(readerContext.indexShard()), () -> { - try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) { + final IndexShard shard = readerContext.indexShard(); + runAsync(getExecutor(shard), () -> { + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); + ThreadInfoAppenderOnName ignored = new ThreadInfoAppenderOnName(shard, task)) { if (request.lastEmittedDoc() != null) { searchContext.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); } @@ -1370,4 +1387,30 @@ public void close() { } } } + + /** + * Use this class to append the current search thread's name with the corresponding ShardId and TaskId. + * Upon closing, it resets thread name to that before it renamed. 
+ */
+    private class ThreadInfoAppenderOnName implements AutoCloseable {
+        final String oldName;
+        final String suffix;
+        final boolean nameChanged;
+
+        public ThreadInfoAppenderOnName(IndexShard shard, SearchShardTask task) {
+            oldName = Thread.currentThread().getName();
+            TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
+            suffix = "[" + shard.shardId() + "]" + "[taskId=" + taskId + "]";
+            if (oldName.endsWith(suffix)) {
+                // The suffix is already present on this thread; leave the name untouched.
+                nameChanged = false;
+            } else {
+                nameChanged = true;
+                Thread.currentThread().setName(oldName + suffix);
+            }
+        }
+
+        @Override
+        public void close() {
+            if (nameChanged) {
+                // Restore the original name only if this instance actually modified it.
+                Thread.currentThread().setName(oldName);
+            }
+        }
+    }
 }
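
Closing illustration (editorial addition, not part of the diff): a minimal sketch of the plugin hook from item 2 of
the summary. The class name ExampleBreakerAwarePlugin and the use of SetOnce are hypothetical; the sketch simply
stores the CircuitBreakerService handed over by Node via setCircuitBreakerService() and later derives a
CBUtilsFactory from the request breaker.

    import org.apache.lucene.util.SetOnce;
    import org.elasticsearch.common.breaker.CircuitBreaker;
    import org.elasticsearch.common.util.breakingcollections.CBUtilsFactory;
    import org.elasticsearch.indices.breaker.CircuitBreakerService;
    import org.elasticsearch.plugins.CircuitBreakerServicePlugin;
    import org.elasticsearch.plugins.Plugin;

    public class ExampleBreakerAwarePlugin extends Plugin implements CircuitBreakerServicePlugin {

        // Populated once by Node after the HierarchyCircuitBreakerService has been created.
        private final SetOnce<CircuitBreakerService> breakerService = new SetOnce<>();

        @Override
        public void setCircuitBreakerService(CircuitBreakerService circuitBreakerService) {
            breakerService.set(circuitBreakerService);
        }

        // Called later, e.g. when building per-request data structures.
        CBUtilsFactory requestScopedFactory() {
            CircuitBreaker requestBreaker = breakerService.get().getBreaker(CircuitBreaker.REQUEST);
            return new CBUtilsFactory(requestBreaker);
        }
    }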