From b6c45cc2d8693fd42d321fef90b909a9ac5906a5 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 22 Jan 2024 17:20:52 -0600 Subject: [PATCH 1/3] Refactor full join iterator to allow access to build tracker Signed-off-by: Jason Lowe --- .../rapids/AbstractGpuJoinIterator.scala | 14 +- .../sql/rapids/execution/GpuHashJoin.scala | 186 ++++++++++++------ 2 files changed, 127 insertions(+), 73 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala index 37d636bf82e..91e5235a22c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,7 +79,8 @@ abstract class AbstractGpuJoinIterator( */ protected def setupNextGatherer(): Option[JoinGatherer] - protected def getFinalBatch(): Option[ColumnarBatch] = None + /** Whether to automatically call close() on this iterator when it is exhausted. */ + protected val shouldAutoCloseOnExhaust: Boolean = true override def hasNext: Boolean = { if (closed) { @@ -107,12 +108,9 @@ abstract class AbstractGpuJoinIterator( } } } - if (nextCb.isEmpty) { - nextCb = getFinalBatch() - if (nextCb.isEmpty) { - // Nothing is left to return so close ASAP. - opTime.ns(close()) - } + if (nextCb.isEmpty && shouldAutoCloseOnExhaust) { + // Nothing is left to return so close ASAP. + opTime.ns(close()) } nextCb.isDefined } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala index cbaa1cbe47c..94d48d22f8c 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -557,19 +557,15 @@ class ConditionalHashJoinIterator( } } -/** - * An iterator that does a hash full join against a stream of batches. It does this by - * doing a left or right outer join and keeping track of the hits on the build side. It then - * produces a final batch of all the build side rows that were not already included. - */ -class HashFullJoinIterator( + +class HashFullJoinStreamSideIterator( built: LazySpillableColumnarBatch, boundBuiltKeys: Seq[Expression], + buildSideTrackerInit: Option[SpillableColumnarBatch], stream: Iterator[LazySpillableColumnarBatch], boundStreamKeys: Seq[Expression], streamAttributes: Seq[Attribute], - boundCondition: Option[GpuExpression], - numFirstConditionTableColumns: Int, + compiledCondition: Option[CompiledExpression], targetSize: Long, buildSide: GpuBuildSide, compareNullsEqual: Boolean, // This is a workaround to how cudf support joins for structs @@ -588,18 +584,13 @@ class HashFullJoinIterator( joinTime = joinTime) { // Full Join is implemented via LeftOuter or RightOuter join, depending on the build side. 
private val useLeftOuterJoin = (buildSide == GpuBuildRight) - private val numBuiltRows = built.numRows - - private[this] var builtSideTracker : Option[SpillableColumnarBatch] = None private val nullEquality = if (compareNullsEqual) NullEquality.EQUAL else NullEquality.UNEQUAL - private val compiledConditionRes: Option[CompiledExpression] = boundCondition.map { gpuExpr => - use(opTime.ns(gpuExpr.convertToAst(numFirstConditionTableColumns).compile())) - } + private[this] var builtSideTracker: Option[SpillableColumnarBatch] = buildSideTrackerInit private def unconditionalLeftJoinGatherMaps( - leftKeys: Table, rightKeys: Table) : Array[GatherMap] = { + leftKeys: Table, rightKeys: Table): Array[GatherMap] = { if (useLeftOuterJoin) { leftKeys.leftJoinGatherMaps(rightKeys, compareNullsEqual) } else { @@ -614,7 +605,7 @@ class HashFullJoinIterator( leftData: LazySpillableColumnarBatch, rightKeys: Table, rightData: LazySpillableColumnarBatch, - compiledCondition: CompiledExpression) : Array[GatherMap] = { + compiledCondition: CompiledExpression): Array[GatherMap] = { withResource(GpuColumnVector.from(leftData.getBatch)) { leftTable => withResource(GpuColumnVector.from(rightData.getBatch)) { rightTable => if (useLeftOuterJoin) { @@ -637,8 +628,8 @@ class HashFullJoinIterator( rightData: LazySpillableColumnarBatch): Option[JoinGatherer] = { withResource(new NvtxWithMetrics("full hash join gather map", NvtxColor.ORANGE, joinTime)) { _ => - val maps = compiledConditionRes.map { compiledCondition => - conditionalLeftJoinGatherMaps(leftKeys, leftData, rightKeys, rightData, compiledCondition) + val maps = compiledCondition.map { condition => + conditionalLeftJoinGatherMaps(leftKeys, leftData, rightKeys, rightData, condition) }.getOrElse { unconditionalLeftJoinGatherMaps(leftKeys, rightKeys) } @@ -674,55 +665,20 @@ class HashFullJoinIterator( } } + // Need to avoid close on exhaust so others can access the built side tracker after iteration. 
+ override protected val shouldAutoCloseOnExhaust: Boolean = false + + def releaseBuiltSideTracker(): Option[SpillableColumnarBatch] = { + val result = builtSideTracker + builtSideTracker = None + result + } + override def close(): Unit = { if (!closed) { super.close() - compiledConditionRes.foreach(_.close()) builtSideTracker.foreach(_.close()) - } - } - - override def getFinalBatch(): Option[ColumnarBatch] = { - withResource(new NvtxWithMetrics("get final batch", - NvtxColor.ORANGE, joinTime)) { _ => - builtSideTracker match { - case None => None - case Some(tracker) => { - val filteredBatch = withResource(tracker) { scb => - withResource(scb.getColumnarBatch()) { trackerBatch => - withResource(GpuColumnVector.from(trackerBatch)) { trackerTab => - val batch = built.getBatch - withResource(GpuColumnVector.from(batch)) { builtTable => - withResource(builtTable.filter(trackerTab.getColumn(0))) { filterTab => - GpuColumnVector.from(filterTab, GpuColumnVector.extractTypes(batch)) - } - } - } - } - } - // Combine build-side columns with null columns for stream side - val ret = withResource(filteredBatch) { builtBatch => - val numFilterRows = builtBatch.numRows() - if (numFilterRows > 0) { - val streamColumns = streamAttributes.safeMap { attr => - GpuColumnVector.fromNull(numFilterRows, attr.dataType) - } - withResource(new ColumnarBatch(streamColumns.toArray, numFilterRows)) { streamBatch => - buildSide match { - case GpuBuildRight => - Some(GpuColumnVector.combineColumns(streamBatch, builtBatch)) - case GpuBuildLeft => - Some(GpuColumnVector.combineColumns(builtBatch, streamBatch)) - } - } - } else { - None - } - } - builtSideTracker = None - ret - } - } + builtSideTracker = None } } @@ -768,7 +724,7 @@ class HashFullJoinIterator( } } }.getOrElse { - trueColumnTable(numBuiltRows) + trueColumnTable(built.numRows) } withResource(builtTrackingTable) { trackingTable => withResource(Scalar.fromBool(false)) { falseScalar => @@ -787,6 +743,106 @@ class HashFullJoinIterator( } } +/** + * An iterator that does a hash full join against a stream of batches. It does this by + * doing a left or right outer join and keeping track of the hits on the build side. It then + * produces a final batch of all the build side rows that were not already included. 
+ */ +class HashFullJoinIterator( + built: LazySpillableColumnarBatch, + boundBuiltKeys: Seq[Expression], + stream: Iterator[LazySpillableColumnarBatch], + boundStreamKeys: Seq[Expression], + streamAttributes: Seq[Attribute], + boundCondition: Option[GpuExpression], + numFirstConditionTableColumns: Int, + targetSize: Long, + buildSide: GpuBuildSide, + compareNullsEqual: Boolean, // This is a workaround to how cudf support joins for structs + opTime: GpuMetric, + joinTime: GpuMetric) extends Iterator[ColumnarBatch] with TaskAutoCloseableResource { + + private val compiledCondition: Option[CompiledExpression] = boundCondition.map { gpuExpr => + use(opTime.ns(gpuExpr.convertToAst(numFirstConditionTableColumns).compile())) + } + + private val streamJoinIter = new HashFullJoinStreamSideIterator(built, boundBuiltKeys, None, + stream, boundStreamKeys, streamAttributes, compiledCondition, targetSize, buildSide, + compareNullsEqual, opTime, joinTime) + + private var finalBatch: Option[ColumnarBatch] = None + + override def hasNext: Boolean = { + if (streamJoinIter.hasNext || finalBatch.isDefined) { + true + } else { + finalBatch = getFinalBatch() + finalBatch.isDefined + } + } + + override def next(): ColumnarBatch = { + if (!hasNext) { + throw new NoSuchElementException("batches exhausted") + } + if (streamJoinIter.hasNext) { + streamJoinIter.next() + } else { + val batch = finalBatch.get + finalBatch = None + batch + } + } + + override def close(): Unit = { + if (!closed) { + super.close() + finalBatch.foreach(_.close()) + finalBatch = None + } + } + + private def getFinalBatch(): Option[ColumnarBatch] = { + withResource(new NvtxWithMetrics("get final batch", NvtxColor.ORANGE, joinTime)) { _ => + streamJoinIter.releaseBuiltSideTracker() match { + case None => None + case Some(tracker) => + val filteredBatch = withResource(tracker) { scb => + withResource(scb.getColumnarBatch()) { trackerBatch => + withResource(GpuColumnVector.from(trackerBatch)) { trackerTab => + val batch = built.getBatch + withResource(GpuColumnVector.from(batch)) { builtTable => + withResource(builtTable.filter(trackerTab.getColumn(0))) { filterTab => + GpuColumnVector.from(filterTab, GpuColumnVector.extractTypes(batch)) + } + } + } + } + } + // Combine build-side columns with null columns for stream side + withResource(filteredBatch) { builtBatch => + val numFilterRows = builtBatch.numRows() + if (numFilterRows > 0) { + val streamColumns = streamAttributes.safeMap { attr => + GpuColumnVector.fromNull(numFilterRows, attr.dataType) + } + withResource(new ColumnarBatch(streamColumns.toArray, numFilterRows)) { streamBatch => + buildSide match { + case GpuBuildRight => + Some(GpuColumnVector.combineColumns(streamBatch, builtBatch)) + case GpuBuildLeft => + Some(GpuColumnVector.combineColumns(builtBatch, streamBatch)) + } + } + } else { + None + } + } + } + } + } +} + class HashedExistenceJoinIterator( spillableBuiltBatch: LazySpillableColumnarBatch, boundBuildKeys: Seq[GpuExpression], From 8c9a93c32f08b10c48e59bf7048ebe2399942b54 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 23 Jan 2024 11:07:53 -0600 Subject: [PATCH 2/3] Add comments and close stream iterator earlier --- .../sql/rapids/execution/GpuHashJoin.scala | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala index 94d48d22f8c..e2f6cf7ba28 100644 --- 
a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala
@@ -558,6 +558,28 @@ class ConditionalHashJoinIterator(
 }
 
 
+/**
+ * An iterator that does only the stream side of a hash full join. In other words, it performs the
+ * left or right outer join for the stream side's view of a full outer join. As the join is
+ * performed, the build-side rows that are referenced during the join are tracked and can be
+ * retrieved after the iteration has completed to assist in performing the anti-join needed to
+ * produce the final results of the full outer join.
+ *
+ * @param built spillable form of the build side table. This will be closed by the iterator.
+ * @param boundBuiltKeys bound expressions for the build side equi-join keys
+ * @param buildSideTrackerInit initial value of the build side row tracker, if any. This will be
+ *                             closed by the iterator.
+ * @param stream iterator to produce batches for the stream side table
+ * @param boundStreamKeys bound expressions for the stream side equi-join keys
+ * @param streamAttributes schema of the stream side table
+ * @param compiledCondition compiled AST expression for the inequality condition of the join,
+ *                          if any. NOTE: This will *not* be closed by the iterator.
+ * @param targetSize target GPU batch size in bytes
+ * @param buildSide which side of the join is being used for the build side
+ * @param compareNullsEqual whether to compare nulls as equal during the join
+ * @param opTime metric to update for total operation time
+ * @param joinTime metric to update for join time
+ */
 class HashFullJoinStreamSideIterator(
     built: LazySpillableColumnarBatch,
     boundBuiltKeys: Seq[Expression],
@@ -668,6 +690,11 @@ class HashFullJoinStreamSideIterator(
   // Need to avoid close on exhaust so others can access the built side tracker after iteration.
   override protected val shouldAutoCloseOnExhaust: Boolean = false
 
+  /**
+   * Retrieve the tracking data for the build side rows that have been referenced during the
+   * join. This is normally called after iteration has completed. The caller takes ownership
+   * of the resulting data and is responsible for closing it.
+   */
   def releaseBuiltSideTracker(): Option[SpillableColumnarBatch] = {
     val result = builtSideTracker
     builtSideTracker = None
     result
   }
@@ -747,6 +774,18 @@ class HashFullJoinStreamSideIterator(
  * An iterator that does a hash full join against a stream of batches. It does this by
  * doing a left or right outer join and keeping track of the hits on the build side. It then
  * produces a final batch of all the build side rows that were not already included.
+ *
+ * @param built spillable form of the build side table. This will be closed by the iterator.
+ * @param boundBuiltKeys bound expressions for the build side equi-join keys + * @param stream iterator to produce batches for the stream side table + * @param boundStreamKeys bound expressions for the stream side equi-join keys + * @param streamAttributes schema of the stream side table + * @param boundCondition expression for the inequality condition of the join, if any + * @param targetSize target GPU batch size in bytes + * @param buildSide which side of the join is being used for the build side + * @param compareNullsEqual whether to compare nulls as equal during the join + * @param opTime metric to update for total operation time + * @param joinTime metric to update for join time */ class HashFullJoinIterator( built: LazySpillableColumnarBatch, @@ -777,6 +816,9 @@ class HashFullJoinIterator( true } else { finalBatch = getFinalBatch() + // Now that we've manifested the final batch, we can close the stream iterator early to free + // GPU resources. + streamJoinIter.close() finalBatch.isDefined } } @@ -797,6 +839,7 @@ class HashFullJoinIterator( override def close(): Unit = { if (!closed) { super.close() + streamJoinIter.close() finalBatch.foreach(_.close()) finalBatch = None } From 86865dcf5a8d46b910fc6dd42a1019ee325a0459 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 23 Jan 2024 11:31:44 -0600 Subject: [PATCH 3/3] Allow existing build side tracker to be specified --- .../spark/sql/rapids/execution/GpuHashJoin.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala index e2f6cf7ba28..8bb15dc06fd 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala @@ -777,6 +777,8 @@ class HashFullJoinStreamSideIterator( * * @param built spillable form of the build side table. This will be closed by the iterator. * @param boundBuiltKeys bound expressions for the build side equi-join keys + * @param buildSideTrackerInit initial value of the build side row tracker, if any. This will be + * closed by the iterator. 
* @param stream iterator to produce batches for the stream side table * @param boundStreamKeys bound expressions for the stream side equi-join keys * @param streamAttributes schema of the stream side table @@ -790,6 +792,7 @@ class HashFullJoinStreamSideIterator( class HashFullJoinIterator( built: LazySpillableColumnarBatch, boundBuiltKeys: Seq[Expression], + buildSideTrackerInit: Option[SpillableColumnarBatch], stream: Iterator[LazySpillableColumnarBatch], boundStreamKeys: Seq[Expression], streamAttributes: Seq[Attribute], @@ -805,9 +808,9 @@ class HashFullJoinIterator( use(opTime.ns(gpuExpr.convertToAst(numFirstConditionTableColumns).compile())) } - private val streamJoinIter = new HashFullJoinStreamSideIterator(built, boundBuiltKeys, None, - stream, boundStreamKeys, streamAttributes, compiledCondition, targetSize, buildSide, - compareNullsEqual, opTime, joinTime) + private val streamJoinIter = new HashFullJoinStreamSideIterator(built, boundBuiltKeys, + buildSideTrackerInit, stream, boundStreamKeys, streamAttributes, compiledCondition, targetSize, + buildSide, compareNullsEqual, opTime, joinTime) private var finalBatch: Option[ColumnarBatch] = None @@ -1118,7 +1121,7 @@ trait GpuHashJoin extends GpuExec { opTime, joinTime) case FullOuter => - new HashFullJoinIterator(spillableBuiltBatch, boundBuildKeys, lazyStream, + new HashFullJoinIterator(spillableBuiltBatch, boundBuildKeys, None, lazyStream, boundStreamKeys, streamedPlan.output, boundCondition, numFirstConditionTableColumns, targetSize, buildSide, compareNullsEqual, opTime, joinTime) case _ =>
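
The two-phase scheme these patches factor apart (a stream-side outer join that records which build rows were referenced, then an anti-join that emits the build rows that were never hit) can be sketched in miniature. The Scala below is not the plugin's code: FullOuterJoinSketch, streamSidePass, and finalPass are hypothetical names, plain collections stand in for ColumnarBatches, and a Boolean array plays the role of the spillable builtSideTracker. It only illustrates the shape of the algorithm; the real iterators also handle spilling, compiled AST join conditions, and metrics.

// Simplified, CPU-only sketch of the two-phase full outer join that this patch series
// factors into HashFullJoinStreamSideIterator plus a final anti-join batch. The names,
// Int keys, and String rows here are illustrative only, not the plugin's GPU types.
object FullOuterJoinSketch {

  // Phase 1 (the stream-side pass): outer join from the stream side's point of view,
  // marking every build row that found a match in the hits tracker.
  def streamSidePass(
      build: IndexedSeq[(Int, String)],
      stream: Iterator[(Int, String)],
      hits: Array[Boolean]): Iterator[(Option[String], Option[String])] = {
    val buildIndex: Map[Int, Seq[Int]] = build.indices.groupBy(i => build(i)._1)
    stream.flatMap { case (key, streamRow) =>
      buildIndex.get(key) match {
        case Some(rowIds) =>
          rowIds.map { i => hits(i) = true; (Some(build(i)._2), Some(streamRow)) }
        case None =>
          Seq((None, Some(streamRow))) // stream row with no build-side match
      }
    }
  }

  // Phase 2 (the final batch): anti-join against the tracker, emitting the build rows that
  // were never referenced, padded with nulls (None) on the stream side.
  def finalPass(
      build: IndexedSeq[(Int, String)],
      hits: Array[Boolean]): Iterator[(Option[String], Option[String])] =
    build.indices.iterator.collect { case i if !hits(i) => (Some(build(i)._2), None) }

  def main(args: Array[String]): Unit = {
    val build = IndexedSeq(1 -> "b1", 2 -> "b2", 3 -> "b3")
    val stream = Iterator(2 -> "s2", 4 -> "s4")
    // Plays the role of builtSideTracker; it could also be seeded from an earlier pass.
    val hits = Array.fill(build.size)(false)
    val joined = streamSidePass(build, stream, hits).toList ++ finalPass(build, hits).toList
    joined.foreach(println) // the full outer join of build and stream on the Int key
  }
}

As in the patches, the final pass is only correct once the stream side has been fully consumed, which is why HashFullJoinIterator manifests the final batch only after streamJoinIter is exhausted, and why the buildSideTrackerInit parameter added in patch 3 lets the hit tracker be carried into another stream-side pass instead of starting from scratch.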