
HIVE-28548: Subdivide memory size allocated to parallel operators #5478

Status: Open. Wants to merge 1 commit into base: master.

Conversation

@okumin (Contributor) commented Oct 1, 2024

What changes were proposed in this pull request?

Let each operator know the maximum amount of memory it can use.

Why are the changes needed?

We observed OOMs when SharedWorkOptimizer merges heavy operators, such as GroupByOperators with map-side hash aggregation. I guess it is hard for each GroupByOperator to control its memory usage correctly in that case.

I confirmed that Tez has a similar feature that reallocates memory when a task is connected to multiple edges.
https://github.com/apache/tez/blob/rel/release-0.10.4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java

https://issues.apache.org/jira/browse/HIVE-28548

I also found that it is still problematic when the number of merged operators is huge, e.g. 100, because the assignment per operator becomes tiny. I will handle such cases in HIVE-28549.
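A quick back-of-the-envelope illustration of why a huge number of merged operators is problematic, using the 75,161,926-byte budget that appears in the logs below. The class and method here are hypothetical stand-ins for the even division this PR performs, not Hive's actual code:

```java
public class PerOperatorBudget {
    // Hypothetical even split, mirroring memoryAvailable / operators.size()
    static long split(long memoryAvailableBytes, int numOperators) {
        return memoryAvailableBytes / numOperators;
    }

    public static void main(String[] args) {
        long budget = 75_161_926L;               // ~71.68 MB, as in the logs below
        System.out.println(split(budget, 5));    // ~14.3 MB each: workable
        System.out.println(split(budget, 100));  // ~0.7 MB each: too tiny for a hash table
    }
}
```

With 100 merged operators, each share is under 1 MB, which is why HIVE-28549 is needed as a follow-up.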

Does this PR introduce any user-facing change?

No.

Is the change a dependency upgrade?

No.

How was this patch tested?

Checked local logs.

--! qt:dataset:src

set hive.auto.convert.join=true;

SELECT *
FROM (SELECT key, count(*) AS num FROM src WHERE key LIKE '%1%' GROUP BY key) t1
LEFT OUTER JOIN (SELECT key, count(*) AS num FROM src WHERE key LIKE '%2%' GROUP BY key) t2 ON t1.key = t2.key
LEFT OUTER JOIN (SELECT key, count(*) AS num FROM src WHERE key LIKE '%3%' GROUP BY key) t3 ON t1.key = t3.key
LEFT OUTER JOIN (SELECT key, count(*) AS num FROM src WHERE key LIKE '%4%' GROUP BY key) t4 ON t1.key = t4.key
LEFT OUTER JOIN (SELECT key, count(*) AS num FROM src WHERE key LIKE '%5%' GROUP BY key) t5 ON t1.key = t5.key;

set hive.vectorized.execution.enabled=false;

SELECT *
FROM (SELECT key, count(*) AS num FROM src WHERE key LIKE '%1%' GROUP BY key) t1
LEFT OUTER JOIN (SELECT key, count(*) AS num FROM src WHERE key LIKE '%2%' GROUP BY key) t2 ON t1.key = t2.key
LEFT OUTER JOIN (SELECT key, count(*) AS num FROM src WHERE key LIKE '%3%' GROUP BY key) t3 ON t1.key = t3.key
LEFT OUTER JOIN (SELECT key, count(*) AS num FROM src WHERE key LIKE '%4%' GROUP BY key) t4 ON t1.key = t4.key
LEFT OUTER JOIN (SELECT key, count(*) AS num FROM src WHERE key LIKE '%5%' GROUP BY key) t5 ON t1.key = t5.key;

The total memory assigned to the merged operators was reduced from 358 MB to 71 MB.

% cat org.apache.hadoop.hive.cli.TestMiniLlapLocalCliDriver-output.txt | grep OperatorUtils | grep Assigning
2024-09-30T23:54:38,486  INFO [TezTR-672468_1_3_0_0_0] exec.OperatorUtils: Assigning 75161926 bytes to 5 operators
2024-09-30T23:54:38,831  INFO [TezTR-672468_1_3_0_0_1] exec.OperatorUtils: Assigning 75161926 bytes to 5 operators
2024-09-30T23:54:42,015  INFO [TezTR-672468_1_4_0_0_0] exec.OperatorUtils: Assigning 75161926 bytes to 5 operators
2024-09-30T23:54:42,310  INFO [TezTR-672468_1_4_0_0_1] exec.OperatorUtils: Assigning 75161926 bytes to 5 operators
% cat org.apache.hadoop.hive.cli.TestMiniLlapLocalCliDriver-output.txt | grep GroupByOperator | grep 'Max hash table'
2024-09-30T23:54:36,280  INFO [TezTR-672468_1_2_0_0_0] exec.GroupByOperator: Max hash table memory: 179.20MB (358.40MB * 0.5)
2024-09-30T23:54:42,015  INFO [TezTR-672468_1_4_0_0_0] exec.GroupByOperator: Max hash table memory: 35.84MB (71.68MB * 0.5)
2024-09-30T23:54:42,017  INFO [TezTR-672468_1_4_0_0_0] exec.GroupByOperator: Max hash table memory: 35.84MB (71.68MB * 0.5)
2024-09-30T23:54:42,018  INFO [TezTR-672468_1_4_0_0_0] exec.GroupByOperator: Max hash table memory: 35.84MB (71.68MB * 0.5)
2024-09-30T23:54:42,018  INFO [TezTR-672468_1_4_0_0_0] exec.GroupByOperator: Max hash table memory: 35.84MB (71.68MB * 0.5)
...
% cat org.apache.hadoop.hive.cli.TestMiniLlapLocalCliDriver-output.txt | grep VectorGroupByOperator | grep 'GBY memory limits'
2024-09-30T23:54:38,491  INFO [TezTR-672468_1_3_0_0_0] vector.VectorGroupByOperator: GBY memory limits - isTez: true isLlap: true maxHashTblMemory: 64.51MB (71.68MB * 0.9) fixSize:660 (key:472 agg:144)
2024-09-30T23:54:38,499  INFO [TezTR-672468_1_3_0_0_0] vector.VectorGroupByOperator: GBY memory limits - isTez: true isLlap: true maxHashTblMemory: 64.51MB (71.68MB * 0.9) fixSize:660 (key:472 agg:144)
2024-09-30T23:54:38,500  INFO [TezTR-672468_1_3_0_0_0] vector.VectorGroupByOperator: GBY memory limits - isTez: true isLlap: true maxHashTblMemory: 64.51MB (71.68MB * 0.9) fixSize:660 (key:472 agg:144)
2024-09-30T23:54:38,500  INFO [TezTR-672468_1_3_0_0_0] vector.VectorGroupByOperator: GBY memory limits - isTez: true isLlap: true maxHashTblMemory: 64.51MB (71.68MB * 0.9) fixSize:660 (key:472 agg:144)
...

sonarqubecloud bot commented Oct 2, 2024

@okumin okumin changed the title [WIP] HIVE-28548: Subdivide memory size allocated to parallel operators HIVE-28548: Subdivide memory size allocated to parallel operators Oct 7, 2024
@@ -625,7 +625,7 @@ private void computeMemoryLimits() {
         aggregationBatchInfo.getAggregatorsFixedSize();

     MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
-    maxMemory = isLlap ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
+    maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
@okumin (Contributor, Author) commented Oct 7, 2024

I guess Tez without LLAP should also use this conf. This was the only place that checked whether LLAP is enabled when using getMaxMemoryAvailable.


@@ -411,13 +412,12 @@ protected void initializeOp(Configuration hconf) throws HiveException {
* aggregation only
**/
private void computeMaxEntriesHashAggr() throws HiveException {
Member

Can we pass maxMemory as a parameter here to avoid ordering issues after future refactoring?

@@ -389,14 +390,14 @@ protected void initializeOp(Configuration hconf) throws HiveException {
isLlap = LlapDaemonInfo.INSTANCE.isLlap();
numExecutors = isLlap ? LlapDaemonInfo.INSTANCE.getNumExecutors() : 1;
firstRow = true;
memoryMXBean = ManagementFactory.getMemoryMXBean();
maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
Member

Is it safe to use memoryMXBean.getHeapMemoryUsage().getMax()?

Returns the maximum amount of memory in bytes that can be used for memory management. This method returns -1 if the maximum memory size is undefined.

whereas Runtime.getRuntime().maxMemory() is documented as:

Returns the maximum amount of memory that the Java virtual machine will attempt to use. If there is no inherent limit then the value Long.MAX_VALUE will be returned.
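The difference between the two APIs in the undefined-max case can be shown with a small defensive sketch. The fallback policy here is an illustrative assumption for the discussion, not what Hive actually does:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

public class HeapMax {
    // getHeapMemoryUsage().getMax() may return -1 when the max is undefined,
    // while Runtime.maxMemory() returns Long.MAX_VALUE when there is no limit.
    // Falling back to Runtime.maxMemory() on -1 is an assumed policy, not Hive's.
    static long heapMaxOrFallback() {
        MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
        long max = bean.getHeapMemoryUsage().getMax();
        return max > 0 ? max : Runtime.getRuntime().maxMemory();
    }

    public static void main(String[] args) {
        System.out.println(heapMaxOrFallback());
    }
}
```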

Contributor Author

I assume the maximum heap size is determined by YARN, but I don't have any solid evidence for that hypothesis, so I copied the line as it was.

return;
}

final long memoryAvailablePerOperator = memoryAvailable / operators.size();
Member

What if the distribution is uneven? Won't it cause even more frequent OOMs?

Contributor Author

Uneven allocations can happen when SWO merges different types of operators or when merged operators consume memory differently. This change does not increase the risk of OOM, but a more memory-hungry GroupByOperator has to flush itself more often than before.

Say maxMemoryAvailable is 1024 MB and 2 GroupByOperators are merged. Previously, GBO1 might use only 100 MB while GBO2 used 800 MB, because the full 1024 MB was overcommitted to both. With this PR, GBO1 would use 100 MB and GBO2 would constrain itself to at most 512 MB.
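The self-maintenance described here amounts to a flush check against the per-operator cap. The class, method name, and threshold parameter below are illustrative assumptions (the 0.5 factor mirrors the "* 0.5" seen in the GroupByOperator logs), not Hive's actual code:

```java
public class HashAggrSketch {
    // Hypothetical sketch: a map-side hash aggregation flushes its hash table
    // once its estimated usage crosses a fraction of the per-operator cap.
    static boolean shouldFlush(long estimatedHashTableBytes, long perOperatorCapBytes,
                               float memoryThreshold) {
        return estimatedHashTableBytes > (long) (perOperatorCapBytes * memoryThreshold);
    }

    public static void main(String[] args) {
        long cap = 512L << 20;  // 1024 MB split evenly across 2 merged GroupByOperators
        System.out.println(shouldFlush(100L << 20, cap, 0.5f)); // GBO1 at 100 MB: keeps going
        System.out.println(shouldFlush(300L << 20, cap, 0.5f)); // GBO2 at 300 MB: flushes
    }
}
```

Under the old behavior both operators compared against the full 1024 MB, so neither would flush until the heap was already at risk.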

@@ -450,7 +450,7 @@ public void initialize(Configuration hconf) throws HiveException {
         + "minReductionHashAggr:{} ", maxHtEntries, groupingSets.length,
         numRowsCompareHashAggr, minReductionHashAggr);
     }
-    computeMemoryLimits();
+    computeMemoryLimits(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"));
@deniskuzZ (Member) commented Oct 24, 2024

Can we set isTez in initializeOp, similar to isLlap, and drop isLlap since it's no longer used?

Contributor Author

I agree, but I am wondering whether we should take the number of executors into account when running on LLAP. I'd like you to find an LLAP expert if possible...

// TODO: there is no easy and reliable way to compute the memory used by the executor threads and on-heap cache.
// Assuming the used memory is equally divided among all executors.
usedMemory = isLlap ? usedMemory / numExecutors : usedMemory;


github-actions bot commented Jan 5, 2025

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.
Feel free to reach out on the [email protected] list if the patch is in need of reviews.

sonarqubecloud bot commented Feb 9, 2025

return;
}

final long memoryAvailablePerOperator = memoryAvailable / operators.size();
Contributor

I think we could use a number less than operators.size() since not all operators require significant memory that should be managed by getMaxMemoryAvailable(). For instance, we can likely ignore select and filter operators, as they behave more like fire-and-forget operations and do not consume much memory. Given that OperatorDesc.getMaxMemoryAvailable() is referenced by MapJoin, ReduceSink (with TopN), and GroupBy (with Hash Aggregate Mode), I think #MapJoin + #(RS with TopN) + #(GBY with HashMode) + 1(for the rest) could be a candidate.
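The suggested divisor could be sketched as follows. The operator-type strings and the method are hypothetical stand-ins for illustration; Hive's real operator descriptors and type checks differ:

```java
import java.util.List;

public class MemoryIntensiveDivisor {
    // Hypothetical sketch of the suggestion above: divide the budget only among
    // operators that consume managed memory, plus one shared slot for the rest.
    static int divisor(List<String> operatorTypes) {
        int heavy = 0;
        for (String t : operatorTypes) {
            // Stand-ins for MapJoin, ReduceSink with TopN, and GroupBy in hash mode.
            if (t.equals("MAPJOIN") || t.equals("RS_TOPN") || t.equals("GBY_HASH")) {
                heavy++;
            }
        }
        return heavy + 1; // +1 share covers all remaining lightweight operators
    }

    public static void main(String[] args) {
        List<String> ops = List.of("SEL", "FIL", "GBY_HASH", "GBY_HASH", "MAPJOIN");
        System.out.println(divisor(ops)); // 4 instead of operators.size() == 5
    }
}
```

Select and filter operators then share a single slot instead of each shrinking the per-operator budget.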

@ngsg (Contributor) commented Mar 28, 2025

Just for your information, we tested this patch using TPC-DS queries 1-25 on a 10TB dataset. The total execution times were similar, though query 17 became slower (54s → 94s), while query 5 improved (142s → 101s). I haven't investigated the differences in detail, so they could be due to factors like stragglers or other unrelated issues. Overall, I think the total execution times show that the patch doesn't introduce any significant performance problems in typical cases, and the patch seems like a reasonable change to me.
