diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 88b7d546b72c..acb2a30876e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.common.type.TimestampTZ;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
@@ -389,14 +390,14 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     isLlap = LlapDaemonInfo.INSTANCE.isLlap();
     numExecutors = isLlap ? LlapDaemonInfo.INSTANCE.getNumExecutors() : 1;
     firstRow = true;
+    memoryMXBean = ManagementFactory.getMemoryMXBean();
+    maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
     // estimate the number of hash table entries based on the size of each
     // entry. Since the size of a entry
     // is not known, estimate that based on the number of entries
     if (hashAggr) {
       computeMaxEntriesHashAggr();
     }
-    memoryMXBean = ManagementFactory.getMemoryMXBean();
-    maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
     memoryThreshold = this.getConf().getMemoryThreshold();
     LOG.info("isTez: {} isLlap: {} numExecutors: {} maxMemory: {}", isTez, isLlap, numExecutors, maxMemory);
   }
@@ -411,13 +412,12 @@ protected void initializeOp(Configuration hconf) throws HiveException {
    * aggregation only
    **/
   private void computeMaxEntriesHashAggr() throws HiveException {
-    float memoryPercentage = this.getConf().getGroupByMemoryUsage();
-    if (isTez) {
-      maxHashTblMemory = (long) (memoryPercentage * getConf().getMaxMemoryAvailable());
-    } else {
-      maxHashTblMemory = (long) (memoryPercentage * Runtime.getRuntime().maxMemory());
+    final float memoryPercentage = this.getConf().getGroupByMemoryUsage();
+    maxHashTblMemory = (long) (memoryPercentage * maxMemory);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Max hash table memory: {} ({} * {})", LlapUtil.humanReadableByteCount(maxHashTblMemory),
+          LlapUtil.humanReadableByteCount(maxMemory), memoryPercentage);
     }
-    LOG.info("Max hash table memory: {} bytes", maxHashTblMemory);
     estimateRowSize();
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index 31400a2903b5..18508e5e4103 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -385,17 +385,22 @@ public static int countOperatorsUpstream(Operator<?> start, Set<Class<? extends
   }
 
   public static void setMemoryAvailable(final List<Operator<? extends OperatorDesc>> operators,
-      final long memoryAvailableToTask) {
-    if (operators == null) {
+      final long memoryAvailable) {
+    if (operators == null || operators.isEmpty()) {
       return;
     }
 
+    final long memoryAvailablePerOperator = memoryAvailable / operators.size();
+    Preconditions.checkArgument(memoryAvailablePerOperator > 0);
+    if (operators.size() > 1) {
+      LOG.info("Assigning {} bytes to {} operators", memoryAvailablePerOperator, operators.size());
+    }
     for (Operator<? extends OperatorDesc> op : operators) {
       if (op.getConf() != null) {
-        op.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
+        op.getConf().setMaxMemoryAvailable(memoryAvailablePerOperator);
       }
       if (op.getChildOperators() != null && !op.getChildOperators().isEmpty()) {
-        setMemoryAvailable(op.getChildOperators(), memoryAvailableToTask);
+        setMemoryAvailable(op.getChildOperators(), memoryAvailablePerOperator);
       }
     }
   }
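The net effect of the two files above: a task's memory budget is now split evenly across sibling operators before each GroupBy sizes its hash table against its own share, instead of every operator budgeting against the full task (or JVM) memory. A minimal standalone sketch of that arithmetic, assuming the default `hive.map.aggr.hash.percentmemory` of 0.5 (the class and operator names are illustrative, not part of the patch):

```java
import java.util.Arrays;
import java.util.List;

public class MemorySplitSketch {
  /** Mirrors the division now done in OperatorUtils.setMemoryAvailable. */
  static long perOperatorShare(long memoryAvailable, List<String> operators) {
    if (operators == null || operators.isEmpty()) {
      throw new IllegalArgumentException("no operators to assign memory to");
    }
    long share = memoryAvailable / operators.size();
    if (share <= 0) { // stands in for the Preconditions.checkArgument guard
      throw new IllegalArgumentException("non-positive share: " + share);
    }
    return share;
  }

  public static void main(String[] args) {
    long taskMemory = 1L << 30; // 1 GiB handed to the task
    long share = perOperatorShare(taskMemory, Arrays.asList("GBY_1", "GBY_2", "GBY_3", "GBY_4"));
    // Each operator now scales its share, as in computeMaxEntriesHashAggr:
    float memoryPercentage = 0.5f; // hive.map.aggr.hash.percentmemory default
    long maxHashTblMemory = (long) (memoryPercentage * share);
    System.out.println(share + " bytes/operator, " + maxHashTblMemory + " bytes for the hash table");
    // prints: 268435456 bytes/operator, 134217728 bytes for the hash table
  }
}
```

With four GroupBys in a 1 GiB task, each one now budgets 128 MiB for its hash table instead of all four assuming 512 MiB apiece.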
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 505db9e5e611..16687e4156a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -450,7 +450,7 @@ public void initialize(Configuration hconf) throws HiveException {
             + "minReductionHashAggr:{} ", maxHtEntries,
             groupingSets.length, numRowsCompareHashAggr, minReductionHashAggr);
       }
-      computeMemoryLimits();
+      computeMemoryLimits(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"));
       LOG.debug("using hash aggregation processing mode");
 
       if (keyWrappersBatch.getVectorHashKeyWrappers()[0] instanceof VectorHashKeyWrapperGeneral) {
@@ -616,7 +616,7 @@ private KeyWrapper cloneKeyWrapper(VectorHashKeyWrapperBase from) {
   /**
    * Computes the memory limits for hash table flush (spill).
    */
-  private void computeMemoryLimits() {
+  private void computeMemoryLimits(boolean isTez) {
     JavaDataModel model = JavaDataModel.get();
 
     fixedHashEntrySize =
@@ -625,7 +625,7 @@ private void computeMemoryLimits() {
         aggregationBatchInfo.getAggregatorsFixedSize();
 
     MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
-    maxMemory = isLlap ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
+    maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
     memoryThreshold = conf.getMemoryThreshold();
     // Tests may leave this unitialized, so better set it to 1
     if (memoryThreshold == 0.0f) {
@@ -634,15 +634,16 @@
 
     maxHashTblMemory = (int)(maxMemory * memoryThreshold);
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("GBY memory limits - isLlap: {} maxMemory: {} ({} * {}) fixSize:{} (key:{} agg:{})",
-          isLlap,
-          LlapUtil.humanReadableByteCount(maxHashTblMemory),
-          LlapUtil.humanReadableByteCount(maxMemory),
-          memoryThreshold,
-          fixedHashEntrySize,
-          keyWrappersBatch.getKeysFixedSize(),
-          aggregationBatchInfo.getAggregatorsFixedSize());
+    if (LOG.isInfoEnabled()) {
+      LOG.info("GBY memory limits - isTez: {} isLlap: {} maxHashTblMemory: {} ({} * {}) fixSize:{} (key:{} agg:{})",
+          isTez,
+          isLlap,
+          LlapUtil.humanReadableByteCount(maxHashTblMemory),
+          LlapUtil.humanReadableByteCount(maxMemory),
+          memoryThreshold,
+          fixedHashEntrySize,
+          keyWrappersBatch.getKeysFixedSize(),
+          aggregationBatchInfo.getAggregatorsFixedSize());
     }
   }
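A sketch of the base-memory selection that `computeMemoryLimits(boolean isTez)` now performs, with illustrative stand-in values for the conf getters (not part of the patch): on Tez the per-operator budget installed by `OperatorUtils.setMemoryAvailable` is the base, otherwise the JVM heap maximum, and the flush limit is that base scaled by the memory threshold:

```java
import java.lang.management.ManagementFactory;

public class GbyLimitsSketch {
  public static void main(String[] args) {
    // what HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.equals("tez") decides above
    boolean isTez = true;
    long operatorBudget = 256L << 20;  // stand-in for getConf().getMaxMemoryAvailable()
    long maxMemory = isTez
        ? operatorBudget
        : ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    float memoryThreshold = 0.9f;      // stand-in for conf.getMemoryThreshold()
    long maxHashTblMemory = (long) (maxMemory * memoryThreshold);
    System.out.println("flush the hash table above " + maxHashTblMemory + " bytes");
  }
}
```

Keying the decision on the execution engine rather than on `isLlap` matches the non-vectorized GroupByOperator above, which already used `isTez` for the same choice.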
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index f189894169f9..cf09922a8ac4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -103,6 +103,13 @@ public class TestVectorGroupByOperator {
 
   HiveConf hconf = new HiveConf();
 
+  private static GroupByDesc newGroupByDesc() {
+    GroupByDesc desc = new GroupByDesc();
+    long memoryAvailable = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+    desc.setMaxMemoryAvailable(memoryAvailable);
+    return desc;
+  }
+
   private static ExprNodeDesc buildColumnDesc(
       VectorizationContext ctx,
       String column,
@@ -199,7 +206,7 @@ private static Pair<GroupByDesc, VectorGroupByDesc> buildGroupByDescType(
 
     ArrayList<String> outputColumnNames = new ArrayList<String>();
     outputColumnNames.add("_col0");
-    GroupByDesc desc = new GroupByDesc();
+    GroupByDesc desc = newGroupByDesc();
     VectorGroupByDesc vectorDesc = new VectorGroupByDesc();
 
     desc.setOutputColumnNames(outputColumnNames);
@@ -219,7 +226,7 @@ private static Pair<GroupByDesc, VectorGroupByDesc> buildGroupByDescCountStar(
 
     ArrayList<String> outputColumnNames = new ArrayList<String>();
     outputColumnNames.add("_col0");
-    GroupByDesc desc = new GroupByDesc();
+    GroupByDesc desc = newGroupByDesc();
     VectorGroupByDesc vectorDesc = new VectorGroupByDesc();
     vectorDesc.setVecAggrDescs(
         new VectorAggregationDesc[] {
@@ -2425,7 +2432,7 @@ private void testMultiKey(
           TypeInfoFactory.getPrimitiveTypeInfo(columnTypes[i])));
     }
 
-    GroupByDesc desc = new GroupByDesc();
+    GroupByDesc desc = newGroupByDesc();
     VectorGroupByDesc vectorGroupByDesc = new VectorGroupByDesc();
 
     desc.setOutputColumnNames(outputColumnNames);
@@ -2542,7 +2549,7 @@ private void testKeyTypeAggregate(
     outputColumnNames.add("_col0");
     outputColumnNames.add("_col1");
 
-    GroupByDesc desc = new GroupByDesc();
+    GroupByDesc desc = newGroupByDesc();
     VectorGroupByDesc vectorGroupByDesc = new VectorGroupByDesc();
 
     desc.setOutputColumnNames(outputColumnNames);
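The four test call sites now go through `newGroupByDesc()`, so every desc carries a real memory budget and the Tez path in `computeMemoryLimits(true)` no longer computes its limits from an unset (presumably zero) budget. A hypothetical extra check that would make the helper's contract explicit; this method is not in the patch, and it assumes the `getMaxMemoryAvailable` accessor and JUnit's `assertEquals` are available in this test class:

```java
// Hypothetical addition to TestVectorGroupByOperator (illustrative only).
@Test
public void testNewGroupByDescCarriesMemoryBudget() {
  GroupByDesc desc = newGroupByDesc();
  long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
  // The helper pins the budget to the JVM heap max, so the Tez code path in
  // computeMemoryLimits(true) sees the same base the non-Tez path would use.
  assertEquals(heapMax, desc.getMaxMemoryAvailable());
}
```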