-
Notifications
You must be signed in to change notification settings - Fork 4.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HIVE-28548: Subdivide memory size allocated to parallel operators #5478
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,7 @@ | |
import org.apache.hadoop.hive.common.type.TimestampTZ; | ||
import org.apache.hadoop.hive.conf.HiveConf; | ||
import org.apache.hadoop.hive.llap.LlapDaemonInfo; | ||
import org.apache.hadoop.hive.llap.LlapUtil; | ||
import org.apache.hadoop.hive.ql.CompilationOpContext; | ||
import org.apache.hadoop.hive.ql.metadata.HiveException; | ||
import org.apache.hadoop.hive.ql.parse.OpParseContext; | ||
|
@@ -389,14 +390,14 @@ protected void initializeOp(Configuration hconf) throws HiveException { | |
isLlap = LlapDaemonInfo.INSTANCE.isLlap(); | ||
numExecutors = isLlap ? LlapDaemonInfo.INSTANCE.getNumExecutors() : 1; | ||
firstRow = true; | ||
memoryMXBean = ManagementFactory.getMemoryMXBean(); | ||
maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax(); | ||
// estimate the number of hash table entries based on the size of each | ||
// entry. Since the size of an entry | ||
// is not known, estimate that based on the number of entries | ||
if (hashAggr) { | ||
computeMaxEntriesHashAggr(); | ||
} | ||
memoryMXBean = ManagementFactory.getMemoryMXBean(); | ||
maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax(); | ||
memoryThreshold = this.getConf().getMemoryThreshold(); | ||
LOG.info("isTez: {} isLlap: {} numExecutors: {} maxMemory: {}", isTez, isLlap, numExecutors, maxMemory); | ||
} | ||
|
@@ -411,13 +412,12 @@ protected void initializeOp(Configuration hconf) throws HiveException { | |
* aggregation only | ||
**/ | ||
private void computeMaxEntriesHashAggr() throws HiveException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we pass |
||
float memoryPercentage = this.getConf().getGroupByMemoryUsage(); | ||
if (isTez) { | ||
maxHashTblMemory = (long) (memoryPercentage * getConf().getMaxMemoryAvailable()); | ||
} else { | ||
maxHashTblMemory = (long) (memoryPercentage * Runtime.getRuntime().maxMemory()); | ||
final float memoryPercentage = this.getConf().getGroupByMemoryUsage(); | ||
maxHashTblMemory = (long) (memoryPercentage * maxMemory); | ||
if (LOG.isInfoEnabled()) { | ||
LOG.info("Max hash table memory: {} ({} * {})", LlapUtil.humanReadableByteCount(maxHashTblMemory), | ||
LlapUtil.humanReadableByteCount(maxMemory), memoryPercentage); | ||
} | ||
LOG.info("Max hash table memory: {} bytes", maxHashTblMemory); | ||
estimateRowSize(); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -385,17 +385,22 @@ public static int countOperatorsUpstream(Operator<?> start, Set<Class<? extends | |
} | ||
|
||
public static void setMemoryAvailable(final List<Operator<? extends OperatorDesc>> operators, | ||
final long memoryAvailableToTask) { | ||
if (operators == null) { | ||
final long memoryAvailable) { | ||
if (operators == null || operators.isEmpty()) { | ||
return; | ||
} | ||
|
||
final long memoryAvailablePerOperator = memoryAvailable / operators.size(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what if we have uneven distribution? won't it cause even more frequent OOMs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Uneven allocations could happen when SWO merges different types of operators or merged operators consume memory differently. This change would not increase the risk of OOM, but the more memory-consuming GroupByOperator has to flush itself more than before. Let's say the maxMemoryAvailable is 1024MB and 2 GroupByOperators are merged. Previously, it was possible that GBO1 used only 100MB and GBO2 used 800MB as the 1024MB are overcommitted. With this PR, GBO1 would use 100MB and GBO2 would maintain itself so that it uses up to 500MB. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we could use a number less than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In my impression, my naive approach is likely to work in most cases because SWO is likely to merge similar processing. But it would be nice if we could find good heuristics to calculate the cost here. |
||
Preconditions.checkArgument(memoryAvailablePerOperator > 0); | ||
if (operators.size() > 1) { | ||
LOG.info("Assigning {} bytes to {} operators", memoryAvailablePerOperator, operators.size()); | ||
} | ||
for (Operator<? extends OperatorDesc> op : operators) { | ||
if (op.getConf() != null) { | ||
op.getConf().setMaxMemoryAvailable(memoryAvailableToTask); | ||
op.getConf().setMaxMemoryAvailable(memoryAvailablePerOperator); | ||
} | ||
if (op.getChildOperators() != null && !op.getChildOperators().isEmpty()) { | ||
setMemoryAvailable(op.getChildOperators(), memoryAvailableToTask); | ||
setMemoryAvailable(op.getChildOperators(), memoryAvailablePerOperator); | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -450,7 +450,7 @@ public void initialize(Configuration hconf) throws HiveException { | |||||||
+ "minReductionHashAggr:{} ", maxHtEntries, groupingSets.length, | ||||||||
numRowsCompareHashAggr, minReductionHashAggr); | ||||||||
} | ||||||||
computeMemoryLimits(); | ||||||||
computeMemoryLimits(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree but I am wondering if we should take # of executors into account if it is LLAP. I'd like you to find an LLAP expert if possible... hive/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Lines 875 to 877 in 18f34e7
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cc @abstractdog |
||||||||
LOG.debug("using hash aggregation processing mode"); | ||||||||
|
||||||||
if (keyWrappersBatch.getVectorHashKeyWrappers()[0] instanceof VectorHashKeyWrapperGeneral) { | ||||||||
|
@@ -616,7 +616,7 @@ private KeyWrapper cloneKeyWrapper(VectorHashKeyWrapperBase from) { | |||||||
/** | ||||||||
* Computes the memory limits for hash table flush (spill). | ||||||||
*/ | ||||||||
private void computeMemoryLimits() { | ||||||||
private void computeMemoryLimits(boolean isTez) { | ||||||||
JavaDataModel model = JavaDataModel.get(); | ||||||||
|
||||||||
fixedHashEntrySize = | ||||||||
|
@@ -625,7 +625,7 @@ private void computeMemoryLimits() { | |||||||
aggregationBatchInfo.getAggregatorsFixedSize(); | ||||||||
|
||||||||
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); | ||||||||
maxMemory = isLlap ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax(); | ||||||||
maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax(); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess Tez without LLAP also should use this conf. This is the only case where checking if it is LLAP or not when using getMaxMemoryAvailable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cc @abstractdog |
||||||||
memoryThreshold = conf.getMemoryThreshold(); | ||||||||
// Tests may leave this uninitialized, so better set it to 1 | ||||||||
if (memoryThreshold == 0.0f) { | ||||||||
|
@@ -634,15 +634,16 @@ private void computeMemoryLimits() { | |||||||
|
||||||||
maxHashTblMemory = (int)(maxMemory * memoryThreshold); | ||||||||
|
||||||||
if (LOG.isDebugEnabled()) { | ||||||||
LOG.debug("GBY memory limits - isLlap: {} maxMemory: {} ({} * {}) fixSize:{} (key:{} agg:{})", | ||||||||
isLlap, | ||||||||
LlapUtil.humanReadableByteCount(maxHashTblMemory), | ||||||||
LlapUtil.humanReadableByteCount(maxMemory), | ||||||||
memoryThreshold, | ||||||||
fixedHashEntrySize, | ||||||||
keyWrappersBatch.getKeysFixedSize(), | ||||||||
aggregationBatchInfo.getAggregatorsFixedSize()); | ||||||||
if (LOG.isInfoEnabled()) { | ||||||||
LOG.info("GBY memory limits - isTez: {} isLlap: {} maxHashTblMemory: {} ({} * {}) fixSize:{} (key:{} agg:{})", | ||||||||
isTez, | ||||||||
isLlap, | ||||||||
LlapUtil.humanReadableByteCount(maxHashTblMemory), | ||||||||
LlapUtil.humanReadableByteCount(maxMemory), | ||||||||
memoryThreshold, | ||||||||
fixedHashEntrySize, | ||||||||
keyWrappersBatch.getKeysFixedSize(), | ||||||||
aggregationBatchInfo.getAggregatorsFixedSize()); | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is that safe to use
memoryMXBean.getHeapMemoryUsage().getMax()
? when
Runtime.getRuntime().maxMemory()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume the maximum size of heap size is determined by YARN, but I don't have any real hints justifying my hypotheses. I copied the line as it is