HIVE-28548: Subdivide memory size allocated to parallel operators #5478
Conversation
@@ -625,7 +625,7 @@ private void computeMemoryLimits() {
         aggregationBatchInfo.getAggregatorsFixedSize();

     MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
-    maxMemory = isLlap ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
+    maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
I guess Tez without LLAP should also use this conf. This is the only place that checks whether it is LLAP, rather than Tez, when using getMaxMemoryAvailable.
cc @abstractdog
@@ -411,13 +412,12 @@ protected void initializeOp(Configuration hconf) throws HiveException {
    * aggregation only
    **/
   private void computeMaxEntriesHashAggr() throws HiveException {
can we pass maxMemory as a param here to avoid reordering issues after a future refactor?
@@ -389,14 +390,14 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     isLlap = LlapDaemonInfo.INSTANCE.isLlap();
     numExecutors = isLlap ? LlapDaemonInfo.INSTANCE.getNumExecutors() : 1;
     firstRow = true;
     memoryMXBean = ManagementFactory.getMemoryMXBean();
     maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
is it safe to use memoryMXBean.getHeapMemoryUsage().getMax()? Its Javadoc says: "Returns the maximum amount of memory in bytes that can be used for memory management. This method returns -1 if the maximum memory size is undefined." whereas Runtime.getRuntime().maxMemory() says: "Returns the maximum amount of memory that the Java virtual machine will attempt to use. If there is no inherent limit then the value Long.MAX_VALUE will be returned."
I assume the maximum heap size is determined by YARN, but I don't have any real evidence to justify my hypothesis. I copied the line as it was.
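For reference, a small standalone probe (not part of the patch) shows what the two JVM APIs discussed here return. On typical HotSpot JVMs with an explicit -Xmx both report the same value; they differ only in their undefined cases, as the quoted Javadoc describes.

```java
import java.lang.management.ManagementFactory;

public class MaxHeapProbe {
    public static void main(String[] args) {
        // MemoryMXBean contract: may return -1 if the maximum heap size is undefined.
        long mxBeanMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
        // Runtime contract: returns Long.MAX_VALUE if there is no inherent limit.
        long runtimeMax = Runtime.getRuntime().maxMemory();
        System.out.println("MemoryMXBean max: " + mxBeanMax);
        System.out.println("Runtime max:      " + runtimeMax);
    }
}
```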
      return;
    }

    final long memoryAvailablePerOperator = memoryAvailable / operators.size();
what if we have uneven distribution? won't it cause even more frequent OOMs?
Uneven allocations could happen when SWO merges different types of operators or merged operators consume memory differently. This change would not increase the risk of OOM, but the more memory-consuming GroupByOperator has to flush itself more than before.
Let's say the maxMemoryAvailable is 1024MB and 2 GroupByOperators are merged. Previously, it was possible that GBO1 used only 100MB and GBO2 used 800MB as the 1024MB are overcommitted. With this PR, GBO1 would use 100MB and GBO2 would maintain itself so that it uses up to 500MB.
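The 1024MB example reduces to the even per-operator split the patch performs (numbers are illustrative, not from the code):

```java
public class EvenSplitExample {
    public static void main(String[] args) {
        long maxMemoryAvailable = 1024L * 1024 * 1024; // 1024MB budget for the whole task
        int mergedOperators = 2;                       // e.g. two GroupByOperators merged by SWO
        // Each operator gets an equal share of the budget rather than overcommitting.
        long perOperator = maxMemoryAvailable / mergedOperators;
        System.out.println(perOperator / (1024 * 1024)); // prints 512 (MB cap per operator)
    }
}
```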
@@ -450,7 +450,7 @@ public void initialize(Configuration hconf) throws HiveException {
           + "minReductionHashAggr:{} ", maxHtEntries, groupingSets.length,
           numRowsCompareHashAggr, minReductionHashAggr);
     }
-    computeMemoryLimits();
+    computeMemoryLimits(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"));
can we set isTez in initializeOp, similar to isLlap, and drop isLlap since it's not used?
I agree but I am wondering if we should take # of executors into account if it is LLAP. I'd like you to find an LLAP expert if possible...
hive/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
Lines 875 to 877 in 18f34e7
// TODO: there is no easy and reliable way to compute the memory used by the executor threads and on-heap cache.
// Assuming the used memory is equally divided among all executors.
usedMemory = isLlap ? usedMemory / numExecutors : usedMemory;
cc @abstractdog
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.
      return;
    }

    final long memoryAvailablePerOperator = memoryAvailable / operators.size();
I think we could use a number less than operators.size(), since not all operators require significant memory that should be managed by getMaxMemoryAvailable(). For instance, we can likely ignore select and filter operators, as they behave more like fire-and-forget operations and do not consume much memory. Given that OperatorDesc.getMaxMemoryAvailable() is referenced by MapJoin, ReduceSink (with TopN), and GroupBy (with Hash Aggregate Mode), I think #MapJoin + #(RS with TopN) + #(GBY with HashMode) + 1 (for the rest) could be a candidate.
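A rough sketch of that counting heuristic; the Kind enum and budgetShares are hypothetical illustration names, not Hive API:

```java
import java.util.List;

public class BudgetShares {
    // Hypothetical operator kinds; per the comment above, only the first three
    // actually reference getMaxMemoryAvailable() and deserve their own share.
    enum Kind { MAP_JOIN, RS_TOPN, GBY_HASH, OTHER }

    static int budgetShares(List<Kind> operators) {
        int heavy = 0;
        for (Kind k : operators) {
            if (k != Kind.OTHER) {
                heavy++; // MapJoin, RS with TopN, GBY with hash mode each get a share
            }
        }
        return heavy + 1; // plus one pooled share for all lightweight operators
    }

    public static void main(String[] args) {
        // two heavy operators plus two lightweight ones -> 3 shares instead of 4
        System.out.println(budgetShares(List.of(Kind.MAP_JOIN, Kind.GBY_HASH, Kind.OTHER, Kind.OTHER)));
    }
}
```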
Just for your information, we tested this patch using TPC-DS queries 1-25 on a 10TB dataset. The total execution times were similar, though query 17 became slower (54s → 94s), while query 5 improved (142s → 101s). I haven't investigated the differences in detail, so they could be due to factors like stragglers or other unrelated issues. Overall, I think the total execution times show that the patch doesn't introduce any significant performance problems in typical cases, and the patch seems like a reasonable change to me.
What changes were proposed in this pull request?
Let each operator know the maximum amount of memory it can use.
Why are the changes needed?
We observed OOMs when SharedWorkOptimizer merges heavy operators such as GroupByOperators with map-side hash aggregation. I guess it is hard for a GroupByOperator to control its memory correctly in that case.
I confirmed that Tez has a similar feature that reallocates memory when a task is connected to multiple edges.
https://github.com/apache/tez/blob/rel/release-0.10.4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
https://issues.apache.org/jira/browse/HIVE-28548
I also found it is still problematic when the number of merged operators is huge, e.g. 100, because the per-operator assignment gets tiny. I will handle such cases in HIVE-28549.
Does this PR introduce any user-facing change?
No.
Is the change a dependency upgrade?
No.
How was this patch tested?
Checked local logs. The total memory usage was reduced from 358MB to 71MB.