Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor full join iterator to allow access to build tracker #10246

Merged
merged 3 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,7 +79,8 @@ abstract class AbstractGpuJoinIterator(
*/
protected def setupNextGatherer(): Option[JoinGatherer]

protected def getFinalBatch(): Option[ColumnarBatch] = None
/** Whether to automatically call close() on this iterator when it is exhausted. */
protected val shouldAutoCloseOnExhaust: Boolean = true

override def hasNext: Boolean = {
if (closed) {
Expand Down Expand Up @@ -107,12 +108,9 @@ abstract class AbstractGpuJoinIterator(
}
}
}
if (nextCb.isEmpty) {
nextCb = getFinalBatch()
if (nextCb.isEmpty) {
// Nothing is left to return so close ASAP.
opTime.ns(close())
}
if (nextCb.isEmpty && shouldAutoCloseOnExhaust) {
// Nothing is left to return so close ASAP.
opTime.ns(close())
}
nextCb.isDefined
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -557,19 +557,37 @@ class ConditionalHashJoinIterator(
}
}


/**
* An iterator that does a hash full join against a stream of batches. It does this by
* doing a left or right outer join and keeping track of the hits on the build side. It then
* produces a final batch of all the build side rows that were not already included.
* An iterator that does the stream-side only of a hash full join In other words, it performs the
* left or right outer join for the stream side's view of a full outer join. As the join is
* performed, the build-side rows that are referenced during the join are tracked and can be
* retrieved after the iteration has completed to assist in performing the anti-join needed to
* produce the final results needed for the full outer join.
*
* @param built spillable form of the build side table. This will be closed by the iterator.
* @param boundBuiltKeys bound expressions for the build side equi-join keys
* @param buildSideTrackerInit initial value of the build side row tracker, if any. This will be
* closed by the iterator.
* @param stream iterator to produce batches for the stream side table
* @param boundStreamKeys bound expressions for the stream side equi-join keys
* @param streamAttributes schema of the stream side table
* @param compiledCondition compiled AST expression for the inequality condition of the join,
* if any. NOTE: This will *not* be closed by the iterator.
* @param targetSize target GPU batch size in bytes
* @param buildSide which side of the join is being used for the build side
* @param compareNullsEqual whether to compare nulls as equal during the join
* @param opTime metric to update for total operation time
* @param joinTime metric to update for join time
*/
class HashFullJoinIterator(
class HashFullJoinStreamSideIterator(
built: LazySpillableColumnarBatch,
boundBuiltKeys: Seq[Expression],
buildSideTrackerInit: Option[SpillableColumnarBatch],
stream: Iterator[LazySpillableColumnarBatch],
boundStreamKeys: Seq[Expression],
streamAttributes: Seq[Attribute],
boundCondition: Option[GpuExpression],
numFirstConditionTableColumns: Int,
compiledCondition: Option[CompiledExpression],
targetSize: Long,
buildSide: GpuBuildSide,
compareNullsEqual: Boolean, // This is a workaround to how cudf support joins for structs
Expand All @@ -588,18 +606,13 @@ class HashFullJoinIterator(
joinTime = joinTime) {
// Full Join is implemented via LeftOuter or RightOuter join, depending on the build side.
private val useLeftOuterJoin = (buildSide == GpuBuildRight)
private val numBuiltRows = built.numRows

private[this] var builtSideTracker : Option[SpillableColumnarBatch] = None

private val nullEquality = if (compareNullsEqual) NullEquality.EQUAL else NullEquality.UNEQUAL

private val compiledConditionRes: Option[CompiledExpression] = boundCondition.map { gpuExpr =>
use(opTime.ns(gpuExpr.convertToAst(numFirstConditionTableColumns).compile()))
}
private[this] var builtSideTracker: Option[SpillableColumnarBatch] = buildSideTrackerInit

private def unconditionalLeftJoinGatherMaps(
leftKeys: Table, rightKeys: Table) : Array[GatherMap] = {
leftKeys: Table, rightKeys: Table): Array[GatherMap] = {
if (useLeftOuterJoin) {
leftKeys.leftJoinGatherMaps(rightKeys, compareNullsEqual)
} else {
Expand All @@ -614,7 +627,7 @@ class HashFullJoinIterator(
leftData: LazySpillableColumnarBatch,
rightKeys: Table,
rightData: LazySpillableColumnarBatch,
compiledCondition: CompiledExpression) : Array[GatherMap] = {
compiledCondition: CompiledExpression): Array[GatherMap] = {
withResource(GpuColumnVector.from(leftData.getBatch)) { leftTable =>
withResource(GpuColumnVector.from(rightData.getBatch)) { rightTable =>
if (useLeftOuterJoin) {
Expand All @@ -637,8 +650,8 @@ class HashFullJoinIterator(
rightData: LazySpillableColumnarBatch): Option[JoinGatherer] = {
withResource(new NvtxWithMetrics("full hash join gather map",
NvtxColor.ORANGE, joinTime)) { _ =>
val maps = compiledConditionRes.map { compiledCondition =>
conditionalLeftJoinGatherMaps(leftKeys, leftData, rightKeys, rightData, compiledCondition)
val maps = compiledCondition.map { condition =>
conditionalLeftJoinGatherMaps(leftKeys, leftData, rightKeys, rightData, condition)
}.getOrElse {
unconditionalLeftJoinGatherMaps(leftKeys, rightKeys)
}
Expand Down Expand Up @@ -674,55 +687,25 @@ class HashFullJoinIterator(
}
}

// Need to avoid close on exhaust so others can access the built side tracker after iteration.
override protected val shouldAutoCloseOnExhaust: Boolean = false

/**
* Retrieve the tracking data for the build side rows that have been referenced during the
* join. This is normally called after iteration has completed. The caller takes ownership
* of the resulting data and is responsible for closing it.
*/
def releaseBuiltSideTracker(): Option[SpillableColumnarBatch] = {
val result = builtSideTracker
builtSideTracker = None
result
}

override def close(): Unit = {
if (!closed) {
super.close()
compiledConditionRes.foreach(_.close())
builtSideTracker.foreach(_.close())
}
}

override def getFinalBatch(): Option[ColumnarBatch] = {
withResource(new NvtxWithMetrics("get final batch",
NvtxColor.ORANGE, joinTime)) { _ =>
builtSideTracker match {
case None => None
case Some(tracker) => {
val filteredBatch = withResource(tracker) { scb =>
withResource(scb.getColumnarBatch()) { trackerBatch =>
withResource(GpuColumnVector.from(trackerBatch)) { trackerTab =>
val batch = built.getBatch
withResource(GpuColumnVector.from(batch)) { builtTable =>
withResource(builtTable.filter(trackerTab.getColumn(0))) { filterTab =>
GpuColumnVector.from(filterTab, GpuColumnVector.extractTypes(batch))
}
}
}
}
}
// Combine build-side columns with null columns for stream side
val ret = withResource(filteredBatch) { builtBatch =>
val numFilterRows = builtBatch.numRows()
if (numFilterRows > 0) {
val streamColumns = streamAttributes.safeMap { attr =>
GpuColumnVector.fromNull(numFilterRows, attr.dataType)
}
withResource(new ColumnarBatch(streamColumns.toArray, numFilterRows)) { streamBatch =>
buildSide match {
case GpuBuildRight =>
Some(GpuColumnVector.combineColumns(streamBatch, builtBatch))
case GpuBuildLeft =>
Some(GpuColumnVector.combineColumns(builtBatch, streamBatch))
}
}
} else {
None
}
}
builtSideTracker = None
ret
}
}
builtSideTracker = None
}
}

Expand Down Expand Up @@ -768,7 +751,7 @@ class HashFullJoinIterator(
}
}
}.getOrElse {
trueColumnTable(numBuiltRows)
trueColumnTable(built.numRows)
}
withResource(builtTrackingTable) { trackingTable =>
withResource(Scalar.fromBool(false)) { falseScalar =>
Expand All @@ -787,6 +770,125 @@ class HashFullJoinIterator(
}
}

/**
* An iterator that does a hash full join against a stream of batches. It does this by
* doing a left or right outer join and keeping track of the hits on the build side. It then
* produces a final batch of all the build side rows that were not already included.
*
* @param built spillable form of the build side table. This will be closed by the iterator.
* @param boundBuiltKeys bound expressions for the build side equi-join keys
* @param buildSideTrackerInit initial value of the build side row tracker, if any. This will be
* closed by the iterator.
* @param stream iterator to produce batches for the stream side table
* @param boundStreamKeys bound expressions for the stream side equi-join keys
* @param streamAttributes schema of the stream side table
* @param boundCondition expression for the inequality condition of the join, if any
* @param targetSize target GPU batch size in bytes
* @param buildSide which side of the join is being used for the build side
* @param compareNullsEqual whether to compare nulls as equal during the join
* @param opTime metric to update for total operation time
* @param joinTime metric to update for join time
*/
class HashFullJoinIterator(
built: LazySpillableColumnarBatch,
boundBuiltKeys: Seq[Expression],
buildSideTrackerInit: Option[SpillableColumnarBatch],
stream: Iterator[LazySpillableColumnarBatch],
boundStreamKeys: Seq[Expression],
streamAttributes: Seq[Attribute],
boundCondition: Option[GpuExpression],
numFirstConditionTableColumns: Int,
targetSize: Long,
buildSide: GpuBuildSide,
compareNullsEqual: Boolean, // This is a workaround to how cudf support joins for structs
opTime: GpuMetric,
joinTime: GpuMetric) extends Iterator[ColumnarBatch] with TaskAutoCloseableResource {

private val compiledCondition: Option[CompiledExpression] = boundCondition.map { gpuExpr =>
use(opTime.ns(gpuExpr.convertToAst(numFirstConditionTableColumns).compile()))
}

private val streamJoinIter = new HashFullJoinStreamSideIterator(built, boundBuiltKeys,
buildSideTrackerInit, stream, boundStreamKeys, streamAttributes, compiledCondition, targetSize,
buildSide, compareNullsEqual, opTime, joinTime)

private var finalBatch: Option[ColumnarBatch] = None

override def hasNext: Boolean = {
if (streamJoinIter.hasNext || finalBatch.isDefined) {
true
} else {
finalBatch = getFinalBatch()
// Now that we've manifested the final batch, we can close the stream iterator early to free
// GPU resources.
streamJoinIter.close()
finalBatch.isDefined
}
}

override def next(): ColumnarBatch = {
if (!hasNext) {
throw new NoSuchElementException("batches exhausted")
}
if (streamJoinIter.hasNext) {
streamJoinIter.next()
} else {
val batch = finalBatch.get
finalBatch = None
batch
}
}

override def close(): Unit = {
if (!closed) {
super.close()
streamJoinIter.close()
finalBatch.foreach(_.close())
finalBatch = None
}
}

private def getFinalBatch(): Option[ColumnarBatch] = {
withResource(new NvtxWithMetrics("get final batch", NvtxColor.ORANGE, joinTime)) { _ =>
streamJoinIter.releaseBuiltSideTracker() match {
case None => None
case Some(tracker) =>
val filteredBatch = withResource(tracker) { scb =>
withResource(scb.getColumnarBatch()) { trackerBatch =>
withResource(GpuColumnVector.from(trackerBatch)) { trackerTab =>
val batch = built.getBatch
withResource(GpuColumnVector.from(batch)) { builtTable =>
withResource(builtTable.filter(trackerTab.getColumn(0))) { filterTab =>
GpuColumnVector.from(filterTab, GpuColumnVector.extractTypes(batch))
}
}
}
}
}
// Combine build-side columns with null columns for stream side
withResource(filteredBatch) { builtBatch =>
val numFilterRows = builtBatch.numRows()
if (numFilterRows > 0) {
val streamColumns = streamAttributes.safeMap { attr =>
GpuColumnVector.fromNull(numFilterRows, attr.dataType)
}
withResource(new ColumnarBatch(streamColumns.toArray, numFilterRows)) { streamBatch =>
buildSide match {
case GpuBuildRight =>
Some(GpuColumnVector.combineColumns(streamBatch, builtBatch))
case GpuBuildLeft =>
Some(GpuColumnVector.combineColumns(builtBatch, streamBatch))
}
}
} else {
None
}
}
}
}
}
}

class HashedExistenceJoinIterator(
spillableBuiltBatch: LazySpillableColumnarBatch,
boundBuildKeys: Seq[GpuExpression],
Expand Down Expand Up @@ -1019,7 +1121,7 @@ trait GpuHashJoin extends GpuExec {
opTime,
joinTime)
case FullOuter =>
new HashFullJoinIterator(spillableBuiltBatch, boundBuildKeys, lazyStream,
new HashFullJoinIterator(spillableBuiltBatch, boundBuildKeys, None, lazyStream,
boundStreamKeys, streamedPlan.output, boundCondition, numFirstConditionTableColumns,
targetSize, buildSide, compareNullsEqual, opTime, joinTime)
case _ =>
Expand Down