HIVE-28548: Subdivide memory size allocated to parallel operators #5478

Open · wants to merge 1 commit into base: master
16 changes: 8 additions & 8 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hive.common.type.TimestampTZ;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
@@ -389,14 +390,14 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     isLlap = LlapDaemonInfo.INSTANCE.isLlap();
     numExecutors = isLlap ? LlapDaemonInfo.INSTANCE.getNumExecutors() : 1;
     firstRow = true;
+    memoryMXBean = ManagementFactory.getMemoryMXBean();
+    maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
Member: Is it safe to use memoryMXBean.getHeapMemoryUsage().getMax()? Its Javadoc says: "Returns the maximum amount of memory in bytes that can be used for memory management. This method returns -1 if the maximum memory size is undefined." By contrast, Runtime.getRuntime().maxMemory() "returns the maximum amount of memory that the Java virtual machine will attempt to use. If there is no inherent limit then the value Long.MAX_VALUE will be returned."

Author (Contributor): I assume the maximum heap size is determined by YARN, but I don't have any concrete evidence for that hypothesis. I copied the line as it was.
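The reviewer's concern can be checked directly. The following is a minimal sketch, not code from the PR; `safeMaxHeap` is a hypothetical helper showing one defensive way to handle the MXBean's documented -1 ("undefined") case by falling back to Runtime.maxMemory():

```java
import java.lang.management.ManagementFactory;

public class HeapMaxProbe {

    // Prefer the MXBean value, but fall back to Runtime.maxMemory()
    // when the MXBean reports -1, meaning "undefined".
    static long safeMaxHeap(long mxBeanReportedMax) {
        return mxBeanReportedMax > 0 ? mxBeanReportedMax : Runtime.getRuntime().maxMemory();
    }

    public static void main(String[] args) {
        long mxBeanMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); // may be -1
        long runtimeMax = Runtime.getRuntime().maxMemory(); // Long.MAX_VALUE if unbounded
        System.out.println("MXBean max:  " + mxBeanMax);
        System.out.println("Runtime max: " + runtimeMax);
        System.out.println("Safe max:    " + safeMaxHeap(mxBeanMax));
    }
}
```

On a JVM started with an explicit -Xmx the two APIs usually agree, which may be why the copied line has not caused trouble under YARN-managed containers.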

// estimate the number of hash table entries based on the size of each
// entry. Since the size of a entry
// is not known, estimate that based on the number of entries
if (hashAggr) {
computeMaxEntriesHashAggr();
}
-    memoryMXBean = ManagementFactory.getMemoryMXBean();
-    maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
memoryThreshold = this.getConf().getMemoryThreshold();
LOG.info("isTez: {} isLlap: {} numExecutors: {} maxMemory: {}", isTez, isLlap, numExecutors, maxMemory);
}
@@ -411,13 +412,12 @@ protected void initializeOp(Configuration hconf) throws HiveException {
* aggregation only
**/
private void computeMaxEntriesHashAggr() throws HiveException {
Member: Can we pass maxMemory as a parameter here, to avoid reordering issues after a future refactor?

-    float memoryPercentage = this.getConf().getGroupByMemoryUsage();
-    if (isTez) {
-      maxHashTblMemory = (long) (memoryPercentage * getConf().getMaxMemoryAvailable());
-    } else {
-      maxHashTblMemory = (long) (memoryPercentage * Runtime.getRuntime().maxMemory());
+    final float memoryPercentage = this.getConf().getGroupByMemoryUsage();
+    maxHashTblMemory = (long) (memoryPercentage * maxMemory);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Max hash table memory: {} ({} * {})", LlapUtil.humanReadableByteCount(maxHashTblMemory),
+          LlapUtil.humanReadableByteCount(maxMemory), memoryPercentage);
     }
-    LOG.info("Max hash table memory: {} bytes", maxHashTblMemory);
     estimateRowSize();
   }

13 changes: 9 additions & 4 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -385,17 +385,22 @@ public static int countOperatorsUpstream(Operator<?> start, Set<Class<? extends
   }

   public static void setMemoryAvailable(final List<Operator<? extends OperatorDesc>> operators,
-      final long memoryAvailableToTask) {
-    if (operators == null) {
+      final long memoryAvailable) {
+    if (operators == null || operators.isEmpty()) {
       return;
     }

+    final long memoryAvailablePerOperator = memoryAvailable / operators.size();
Member: What if the distribution is uneven? Won't this cause even more frequent OOMs?

Author (Contributor): Uneven allocations can happen when SWO merges different types of operators, or when the merged operators consume memory differently. This change does not increase the risk of OOM, but the more memory-hungry GroupByOperator now has to flush itself more often than before. Say maxMemoryAvailable is 1024MB and two GroupByOperators are merged. Previously it was possible for GBO1 to use only 100MB while GBO2 used 800MB, because the 1024MB was overcommitted. With this PR, GBO1 would use 100MB and GBO2 would keep itself to at most half of the budget, i.e. 512MB.
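The arithmetic in that example is just an even split of the task budget; a standalone sketch (the `perOperatorBudget` helper is hypothetical, but it mirrors the PR's `memoryAvailable / operators.size()`):

```java
public class EvenSplitExample {

    // Even split among sibling operators, mirroring
    // memoryAvailable / operators.size() in OperatorUtils.setMemoryAvailable.
    static long perOperatorBudget(long memoryAvailable, int numOperators) {
        return memoryAvailable / numOperators;
    }

    public static void main(String[] args) {
        long maxMemoryAvailable = 1024L * 1024 * 1024; // 1024MB task budget
        // Two merged GroupByOperators: each is capped at half the budget.
        long cap = perOperatorBudget(maxMemoryAvailable, 2);
        System.out.println(cap / (1024 * 1024) + "MB per operator");
    }
}
```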

Contributor: I think we could divide by a number smaller than operators.size(), since not all operators require significant memory managed through getMaxMemoryAvailable(). For instance, we can likely ignore Select and Filter operators, as they behave more like fire-and-forget operations and do not consume much memory. Given that OperatorDesc.getMaxMemoryAvailable() is referenced by MapJoin, ReduceSink (with TopN), and GroupBy (in hash-aggregation mode), #MapJoin + #(RS with TopN) + #(GBY with hash mode) + 1 (for the rest) could be a candidate.

Author (Contributor): My impression is that this naive approach works in most cases, because SWO tends to merge similar processing. But it would be nice if we could find good heuristics for computing the cost here.
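The suggested heuristic (count only the operators that actually reserve managed memory, plus one share for everything else) could be sketched as follows; this is a hypothetical helper, not code from the PR:

```java
public class MemoryDivisorHeuristic {

    // Divisor per the reviewer's suggestion:
    // #MapJoin + #(ReduceSink with TopN) + #(GroupBy in hash mode) + 1 for the rest.
    static int divisor(int numMapJoins, int numTopNReduceSinks, int numHashGroupBys) {
        return numMapJoins + numTopNReduceSinks + numHashGroupBys + 1;
    }

    public static void main(String[] args) {
        long memoryAvailable = 1024L * 1024 * 1024;
        // A vertex with 1 MapJoin, 0 TopN ReduceSinks, and 2 hash-mode GroupBys:
        // 4 shares, so lightweight Select/Filter operators don't dilute the budget.
        int shares = divisor(1, 0, 2);
        System.out.println(memoryAvailable / shares + " bytes per memory-intensive operator");
    }
}
```

Compared with dividing by operators.size(), this gives the memory-intensive operators larger shares while still reserving a slice for the rest of the pipeline.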

+    Preconditions.checkArgument(memoryAvailablePerOperator > 0);
+    if (operators.size() > 1) {
+      LOG.info("Assigning {} bytes to {} operators", memoryAvailablePerOperator, operators.size());
+    }
     for (Operator<? extends OperatorDesc> op : operators) {
       if (op.getConf() != null) {
-        op.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
+        op.getConf().setMaxMemoryAvailable(memoryAvailablePerOperator);
       }
       if (op.getChildOperators() != null && !op.getChildOperators().isEmpty()) {
-        setMemoryAvailable(op.getChildOperators(), memoryAvailableToTask);
+        setMemoryAvailable(op.getChildOperators(), memoryAvailablePerOperator);
       }
     }
   }
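Note that the new setMemoryAvailable divides the budget at every level of the operator tree: each operator in a sibling list gets memoryAvailable / size, and the same rule is applied recursively to its children with the per-operator share. A standalone sketch of that cascade, using a simplified Op stand-in rather than the actual Hive Operator/OperatorDesc classes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MemoryCascade {

    // Minimal stand-in for Operator<? extends OperatorDesc>.
    static class Op {
        long maxMemoryAvailable;
        final List<Op> children = new ArrayList<>();
    }

    // Mirrors the recursion in OperatorUtils.setMemoryAvailable:
    // split evenly among siblings, then recurse with the per-operator share.
    static void setMemoryAvailable(List<Op> operators, long memoryAvailable) {
        if (operators == null || operators.isEmpty()) {
            return;
        }
        long perOperator = memoryAvailable / operators.size();
        for (Op op : operators) {
            op.maxMemoryAvailable = perOperator;
            setMemoryAvailable(op.children, perOperator);
        }
    }

    // Two root operators; the first has two children of its own.
    static long[] demo() {
        Op a = new Op(), b = new Op(), c = new Op(), d = new Op();
        a.children.add(c);
        a.children.add(d);
        List<Op> roots = new ArrayList<>();
        roots.add(a);
        roots.add(b);
        setMemoryAvailable(roots, 1200L);
        return new long[] { a.maxMemoryAvailable, b.maxMemoryAvailable,
            c.maxMemoryAvailable, d.maxMemoryAvailable };
    }

    public static void main(String[] args) {
        // a and b each get 1200/2 = 600; a's children c and d each get 600/2 = 300.
        System.out.println(Arrays.toString(demo()));
    }
}
```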
@@ -450,7 +450,7 @@ public void initialize(Configuration hconf) throws HiveException {
+ "minReductionHashAggr:{} ", maxHtEntries, groupingSets.length,
numRowsCompareHashAggr, minReductionHashAggr);
}
-    computeMemoryLimits();
+    computeMemoryLimits(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"));
Member (@deniskuzZ, Oct 24, 2024): Can we set isTez in initializeOp, similar to isLlap, and drop isLlap since it's no longer used?

Author (Contributor): I agree, but I am wondering whether we should take the number of executors into account when running on LLAP. I'd like you to find an LLAP expert if possible. The relevant code:

    // TODO: there is no easy and reliable way to compute the memory used by the executor threads and on-heap cache.
    // Assuming the used memory is equally divided among all executors.
    usedMemory = isLlap ? usedMemory / numExecutors : usedMemory;
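The quoted TODO simply attributes an equal slice of the daemon's used heap to each executor. A worked sketch with illustrative numbers (not measurements from any real LLAP daemon):

```java
public class LlapUsedMemoryExample {

    // Mirrors: usedMemory = isLlap ? usedMemory / numExecutors : usedMemory;
    static long perExecutorUsedMemory(long usedMemory, boolean isLlap, int numExecutors) {
        return isLlap ? usedMemory / numExecutors : usedMemory;
    }

    public static void main(String[] args) {
        long usedMemory = 8L * 1024 * 1024 * 1024; // say the daemon heap has 8GB in use
        // With 4 executors, each executor is assumed to account for 2GB of it.
        System.out.println(perExecutorUsedMemory(usedMemory, true, 4) + " bytes per executor");
    }
}
```

As the TODO itself concedes, this is only an approximation: executor threads and the on-heap cache do not actually consume memory evenly.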

LOG.debug("using hash aggregation processing mode");

if (keyWrappersBatch.getVectorHashKeyWrappers()[0] instanceof VectorHashKeyWrapperGeneral) {
@@ -616,7 +616,7 @@ private KeyWrapper cloneKeyWrapper(VectorHashKeyWrapperBase from) {
/**
* Computes the memory limits for hash table flush (spill).
*/
-  private void computeMemoryLimits() {
+  private void computeMemoryLimits(boolean isTez) {
JavaDataModel model = JavaDataModel.get();

fixedHashEntrySize =
@@ -625,7 +625,7 @@ private void computeMemoryLimits() {
aggregationBatchInfo.getAggregatorsFixedSize();

MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
-    maxMemory = isLlap ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
+    maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
Author (@okumin, Oct 7, 2024): I guess Tez without LLAP should also use this conf. This was the only place that checked isLlap instead of isTez when using getMaxMemoryAvailable.
memoryThreshold = conf.getMemoryThreshold();
// Tests may leave this unitialized, so better set it to 1
if (memoryThreshold == 0.0f) {
@@ -634,15 +634,16 @@

maxHashTblMemory = (int)(maxMemory * memoryThreshold);

-    if (LOG.isDebugEnabled()) {
-      LOG.debug("GBY memory limits - isLlap: {} maxMemory: {} ({} * {}) fixSize:{} (key:{} agg:{})",
-          isLlap,
-          LlapUtil.humanReadableByteCount(maxHashTblMemory),
-          LlapUtil.humanReadableByteCount(maxMemory),
-          memoryThreshold,
-          fixedHashEntrySize,
-          keyWrappersBatch.getKeysFixedSize(),
-          aggregationBatchInfo.getAggregatorsFixedSize());
+    if (LOG.isInfoEnabled()) {
+      LOG.info("GBY memory limits - isTez: {} isLlap: {} maxHashTblMemory: {} ({} * {}) fixSize:{} (key:{} agg:{})",
+          isTez,
+          isLlap,
+          LlapUtil.humanReadableByteCount(maxHashTblMemory),
+          LlapUtil.humanReadableByteCount(maxMemory),
+          memoryThreshold,
+          fixedHashEntrySize,
+          keyWrappersBatch.getKeysFixedSize(),
+          aggregationBatchInfo.getAggregatorsFixedSize());
     }
}
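The flush limit computed in this hunk is the product of the operator's memory budget and a threshold, with an unset (0.0f) threshold treated as 1. A sketch of that computation with a hypothetical helper name; note the diff's own line casts the product to int, while this sketch keeps long:

```java
public class HashTableLimitExample {

    // Mirrors computeMemoryLimits(): an unset threshold defaults to 1,
    // then the hash table may grow to maxMemory * memoryThreshold before flushing.
    static long maxHashTblMemory(long maxMemory, float memoryThreshold) {
        if (memoryThreshold == 0.0f) {
            memoryThreshold = 1.0f; // tests may leave it uninitialized
        }
        return (long) (maxMemory * memoryThreshold);
    }

    public static void main(String[] args) {
        long maxMemory = 512L * 1024 * 1024; // per-operator budget after the split
        System.out.println(maxHashTblMemory(maxMemory, 0.9f));
        System.out.println(maxHashTblMemory(maxMemory, 0.0f)); // defaults to the full budget
    }
}
```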

@@ -103,6 +103,13 @@ public class TestVectorGroupByOperator {

HiveConf hconf = new HiveConf();

+  private static GroupByDesc newGroupByDesc() {
+    GroupByDesc desc = new GroupByDesc();
+    long memoryAvailable = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+    desc.setMaxMemoryAvailable(memoryAvailable);
+    return desc;
+  }
+
private static ExprNodeDesc buildColumnDesc(
VectorizationContext ctx,
String column,
@@ -199,7 +206,7 @@ private static Pair<GroupByDesc,VectorGroupByDesc> buildGroupByDescType(
ArrayList<String> outputColumnNames = new ArrayList<String>();
outputColumnNames.add("_col0");

-    GroupByDesc desc = new GroupByDesc();
+    GroupByDesc desc = newGroupByDesc();
VectorGroupByDesc vectorDesc = new VectorGroupByDesc();

desc.setOutputColumnNames(outputColumnNames);
@@ -219,7 +226,7 @@ private static Pair<GroupByDesc,VectorGroupByDesc> buildGroupByDescCountStar(
ArrayList<String> outputColumnNames = new ArrayList<String>();
outputColumnNames.add("_col0");

-    GroupByDesc desc = new GroupByDesc();
+    GroupByDesc desc = newGroupByDesc();
VectorGroupByDesc vectorDesc = new VectorGroupByDesc();
vectorDesc.setVecAggrDescs(
new VectorAggregationDesc[] {
@@ -2425,7 +2432,7 @@ private void testMultiKey(
TypeInfoFactory.getPrimitiveTypeInfo(columnTypes[i])));
}

-    GroupByDesc desc = new GroupByDesc();
+    GroupByDesc desc = newGroupByDesc();
VectorGroupByDesc vectorGroupByDesc = new VectorGroupByDesc();

desc.setOutputColumnNames(outputColumnNames);
@@ -2542,7 +2549,7 @@ private void testKeyTypeAggregate(
outputColumnNames.add("_col0");
outputColumnNames.add("_col1");

-    GroupByDesc desc = new GroupByDesc();
+    GroupByDesc desc = newGroupByDesc();
VectorGroupByDesc vectorGroupByDesc = new VectorGroupByDesc();

desc.setOutputColumnNames(outputColumnNames);