Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor full join iterator to allow access to build tracker #10246

Merged
merged 3 commits into from
Jan 24, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add comments and close stream iterator earlier
  • Loading branch information
jlowe committed Jan 23, 2024
commit 8c9a93c32f08b10c48e59bf7048ebe2399942b54
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,28 @@ class ConditionalHashJoinIterator(
}


/**
* An iterator that does the stream-side only of a hash full join In other words, it performs the
* left or right outer join for the stream side's view of a full outer join. As the join is
* performed, the build-side rows that are referenced during the join are tracked and can be
* retrieved after the iteration has completed to assist in performing the anti-join needed to
* produce the final results needed for the full outer join.
*
* @param built spillable form of the build side table. This will be closed by the iterator.
* @param boundBuiltKeys bound expressions for the build side equi-join keys
* @param buildSideTrackerInit initial value of the build side row tracker, if any. This will be
* closed by the iterator.
* @param stream iterator to produce batches for the stream side table
* @param boundStreamKeys bound expressions for the stream side equi-join keys
* @param streamAttributes schema of the stream side table
* @param compiledCondition compiled AST expression for the inequality condition of the join,
* if any. NOTE: This will *not* be closed by the iterator.
* @param targetSize target GPU batch size in bytes
* @param buildSide which side of the join is being used for the build side
* @param compareNullsEqual whether to compare nulls as equal during the join
* @param opTime metric to update for total operation time
* @param joinTime metric to update for join time
*/
class HashFullJoinStreamSideIterator(
built: LazySpillableColumnarBatch,
boundBuiltKeys: Seq[Expression],
Expand Down Expand Up @@ -668,6 +690,11 @@ class HashFullJoinStreamSideIterator(
// Need to avoid close on exhaust so others can access the built side tracker after iteration.
override protected val shouldAutoCloseOnExhaust: Boolean = false

/**
* Retrieve the tracking data for the build side rows that have been referenced during the
* join. This is normally called after iteration has completed. The caller takes ownership
* of the resulting data and is responsible for closing it.
*/
def releaseBuiltSideTracker(): Option[SpillableColumnarBatch] = {
val result = builtSideTracker
builtSideTracker = None
Expand Down Expand Up @@ -747,6 +774,18 @@ class HashFullJoinStreamSideIterator(
* An iterator that does a hash full join against a stream of batches. It does this by
* doing a left or right outer join and keeping track of the hits on the build side. It then
* produces a final batch of all the build side rows that were not already included.
*
* @param built spillable form of the build side table. This will be closed by the iterator.
* @param boundBuiltKeys bound expressions for the build side equi-join keys
* @param stream iterator to produce batches for the stream side table
* @param boundStreamKeys bound expressions for the stream side equi-join keys
* @param streamAttributes schema of the stream side table
* @param boundCondition expression for the inequality condition of the join, if any
* @param targetSize target GPU batch size in bytes
* @param buildSide which side of the join is being used for the build side
* @param compareNullsEqual whether to compare nulls as equal during the join
* @param opTime metric to update for total operation time
* @param joinTime metric to update for join time
*/
class HashFullJoinIterator(
built: LazySpillableColumnarBatch,
Expand Down Expand Up @@ -777,6 +816,9 @@ class HashFullJoinIterator(
true
} else {
finalBatch = getFinalBatch()
// Now that we've manifested the final batch, we can close the stream iterator early to free
// GPU resources.
streamJoinIter.close()
finalBatch.isDefined
}
}
Expand All @@ -797,6 +839,7 @@ class HashFullJoinIterator(
override def close(): Unit = {
if (!closed) {
super.close()
streamJoinIter.close()
finalBatch.foreach(_.close())
finalBatch = None
}
Expand Down