diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala
index 37d636bf82e..91e5235a22c 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -79,7 +79,8 @@ abstract class AbstractGpuJoinIterator(
    */
   protected def setupNextGatherer(): Option[JoinGatherer]
 
-  protected def getFinalBatch(): Option[ColumnarBatch] = None
+  /** Whether to automatically call close() on this iterator when it is exhausted. */
+  protected val shouldAutoCloseOnExhaust: Boolean = true
 
   override def hasNext: Boolean = {
     if (closed) {
@@ -107,12 +108,9 @@ abstract class AbstractGpuJoinIterator(
         }
       }
     }
-    if (nextCb.isEmpty) {
-      nextCb = getFinalBatch()
-      if (nextCb.isEmpty) {
-        // Nothing is left to return so close ASAP.
-        opTime.ns(close())
-      }
+    if (nextCb.isEmpty && shouldAutoCloseOnExhaust) {
+      // Nothing is left to return so close ASAP.
+      opTime.ns(close())
     }
     nextCb.isDefined
   }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala
index cbaa1cbe47c..8bb15dc06fd 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -557,19 +557,37 @@ class ConditionalHashJoinIterator(
   }
 }
 
+
 /**
- * An iterator that does a hash full join against a stream of batches.  It does this by
- * doing a left or right outer join and keeping track of the hits on the build side.  It then
- * produces a final batch of all the build side rows that were not already included.
+ * An iterator that does the stream-side only of a hash full join  In other words, it performs the
+ * left or right outer join for the stream side's view of a full outer join. As the join is
+ * performed, the build-side rows that are referenced during the join are tracked and can be
+ * retrieved after the iteration has completed to assist in performing the anti-join needed to
+ * produce the final results needed for the full outer join.
+ *
+ * @param built spillable form of the build side table. This will be closed by the iterator.
+ * @param boundBuiltKeys bound expressions for the build side equi-join keys
+ * @param buildSideTrackerInit initial value of the build side row tracker, if any. This will be
+ *                             closed by the iterator.
+ * @param stream iterator to produce batches for the stream side table
+ * @param boundStreamKeys bound expressions for the stream side equi-join keys
+ * @param streamAttributes schema of the stream side table
+ * @param compiledCondition compiled AST expression for the inequality condition of the join,
+ *                          if any. NOTE: This will *not* be closed by the iterator.
+ * @param targetSize target GPU batch size in bytes
+ * @param buildSide which side of the join is being used for the build side
+ * @param compareNullsEqual whether to compare nulls as equal during the join
+ * @param opTime metric to update for total operation time
+ * @param joinTime metric to update for join time
  */
-class HashFullJoinIterator(
+class HashFullJoinStreamSideIterator(
     built: LazySpillableColumnarBatch,
     boundBuiltKeys: Seq[Expression],
+    buildSideTrackerInit: Option[SpillableColumnarBatch],
     stream: Iterator[LazySpillableColumnarBatch],
     boundStreamKeys: Seq[Expression],
     streamAttributes: Seq[Attribute],
-    boundCondition: Option[GpuExpression],
-    numFirstConditionTableColumns: Int,
+    compiledCondition: Option[CompiledExpression],
     targetSize: Long,
     buildSide: GpuBuildSide,
     compareNullsEqual: Boolean, // This is a workaround to how cudf support joins for structs
@@ -588,18 +606,13 @@ class HashFullJoinIterator(
       joinTime = joinTime) {
   // Full Join is implemented via LeftOuter or RightOuter join, depending on the build side.
   private val useLeftOuterJoin = (buildSide == GpuBuildRight)
-  private val numBuiltRows = built.numRows
-
-  private[this] var builtSideTracker : Option[SpillableColumnarBatch] = None
 
   private val nullEquality = if (compareNullsEqual) NullEquality.EQUAL else NullEquality.UNEQUAL
 
-  private val compiledConditionRes: Option[CompiledExpression] = boundCondition.map { gpuExpr =>
-    use(opTime.ns(gpuExpr.convertToAst(numFirstConditionTableColumns).compile()))
-  }
+  private[this] var builtSideTracker: Option[SpillableColumnarBatch] = buildSideTrackerInit
 
   private def unconditionalLeftJoinGatherMaps(
-      leftKeys: Table, rightKeys: Table) : Array[GatherMap] = {
+      leftKeys: Table, rightKeys: Table): Array[GatherMap] = {
     if (useLeftOuterJoin) {
       leftKeys.leftJoinGatherMaps(rightKeys, compareNullsEqual)
     } else {
@@ -614,7 +627,7 @@ class HashFullJoinIterator(
       leftData: LazySpillableColumnarBatch,
       rightKeys: Table,
       rightData: LazySpillableColumnarBatch,
-      compiledCondition: CompiledExpression) : Array[GatherMap] = {
+      compiledCondition: CompiledExpression): Array[GatherMap] = {
     withResource(GpuColumnVector.from(leftData.getBatch)) { leftTable =>
       withResource(GpuColumnVector.from(rightData.getBatch)) { rightTable =>
         if (useLeftOuterJoin) {
@@ -637,8 +650,8 @@ class HashFullJoinIterator(
       rightData: LazySpillableColumnarBatch): Option[JoinGatherer] = {
     withResource(new NvtxWithMetrics("full hash join gather map",
       NvtxColor.ORANGE, joinTime)) { _ =>
-      val maps = compiledConditionRes.map { compiledCondition =>
-        conditionalLeftJoinGatherMaps(leftKeys, leftData, rightKeys, rightData, compiledCondition)
+      val maps = compiledCondition.map { condition =>
+        conditionalLeftJoinGatherMaps(leftKeys, leftData, rightKeys, rightData, condition)
       }.getOrElse {
         unconditionalLeftJoinGatherMaps(leftKeys, rightKeys)
       }
@@ -674,55 +687,25 @@ class HashFullJoinIterator(
     }
   }
 
+  // Need to avoid close on exhaust so others can access the built side tracker after iteration.
+  override protected val shouldAutoCloseOnExhaust: Boolean = false
+
+  /**
+   *  Retrieve the tracking data for the build side rows that have been referenced during the
+   *  join. This is normally called after iteration has completed. The caller takes ownership
+   *  of the resulting data and is responsible for closing it.
+   */
+  def releaseBuiltSideTracker(): Option[SpillableColumnarBatch] = {
+    val result = builtSideTracker
+    builtSideTracker = None
+    result
+  }
+
   override def close(): Unit = {
     if (!closed) {
       super.close()
-      compiledConditionRes.foreach(_.close())
       builtSideTracker.foreach(_.close())
-    }
-  }
-
-  override def getFinalBatch(): Option[ColumnarBatch] = {
-    withResource(new NvtxWithMetrics("get final batch",
-      NvtxColor.ORANGE, joinTime)) { _ =>
-      builtSideTracker match {
-        case None => None
-        case Some(tracker) => {
-          val filteredBatch = withResource(tracker) { scb =>
-            withResource(scb.getColumnarBatch()) { trackerBatch =>
-              withResource(GpuColumnVector.from(trackerBatch)) { trackerTab =>
-                val batch = built.getBatch
-                withResource(GpuColumnVector.from(batch)) { builtTable =>
-                  withResource(builtTable.filter(trackerTab.getColumn(0))) { filterTab =>
-                    GpuColumnVector.from(filterTab, GpuColumnVector.extractTypes(batch))
-                  }
-                }
-              }
-            }
-          }
-          // Combine build-side columns with null columns for stream side
-          val ret = withResource(filteredBatch) { builtBatch =>
-            val numFilterRows = builtBatch.numRows()
-            if (numFilterRows > 0) {
-              val streamColumns = streamAttributes.safeMap { attr =>
-                GpuColumnVector.fromNull(numFilterRows, attr.dataType)
-              }
-              withResource(new ColumnarBatch(streamColumns.toArray, numFilterRows)) { streamBatch =>
-                buildSide match {
-                  case GpuBuildRight =>
-                    Some(GpuColumnVector.combineColumns(streamBatch, builtBatch))
-                  case GpuBuildLeft =>
-                    Some(GpuColumnVector.combineColumns(builtBatch, streamBatch))
-                }
-              }
-            } else {
-              None
-            }
-          }
-          builtSideTracker = None
-          ret
-        }
-      }
+      builtSideTracker = None
     }
   }
 
@@ -768,7 +751,7 @@ class HashFullJoinIterator(
           }
         }
       }.getOrElse {
-        trueColumnTable(numBuiltRows)
+        trueColumnTable(built.numRows)
       }
       withResource(builtTrackingTable) { trackingTable =>
         withResource(Scalar.fromBool(false)) { falseScalar =>
@@ -787,6 +770,125 @@ class HashFullJoinIterator(
   }
 }
 
+/**
+ * An iterator that does a hash full join against a stream of batches.  It does this by
+ * doing a left or right outer join and keeping track of the hits on the build side.  It then
+ * produces a final batch of all the build side rows that were not already included.
+ *
+ * @param built spillable form of the build side table. This will be closed by the iterator.
+ * @param boundBuiltKeys bound expressions for the build side equi-join keys
+ * @param buildSideTrackerInit initial value of the build side row tracker, if any. This will be
+ *                             closed by the iterator.
+ * @param stream iterator to produce batches for the stream side table
+ * @param boundStreamKeys bound expressions for the stream side equi-join keys
+ * @param streamAttributes schema of the stream side table
+ * @param boundCondition expression for the inequality condition of the join, if any
+ * @param targetSize target GPU batch size in bytes
+ * @param buildSide which side of the join is being used for the build side
+ * @param compareNullsEqual whether to compare nulls as equal during the join
+ * @param opTime metric to update for total operation time
+ * @param joinTime metric to update for join time
+ */
+class HashFullJoinIterator(
+    built: LazySpillableColumnarBatch,
+    boundBuiltKeys: Seq[Expression],
+    buildSideTrackerInit: Option[SpillableColumnarBatch],
+    stream: Iterator[LazySpillableColumnarBatch],
+    boundStreamKeys: Seq[Expression],
+    streamAttributes: Seq[Attribute],
+    boundCondition: Option[GpuExpression],
+    numFirstConditionTableColumns: Int,
+    targetSize: Long,
+    buildSide: GpuBuildSide,
+    compareNullsEqual: Boolean, // This is a workaround to how cudf support joins for structs
+    opTime: GpuMetric,
+    joinTime: GpuMetric) extends Iterator[ColumnarBatch] with TaskAutoCloseableResource {
+
+  private val compiledCondition: Option[CompiledExpression] = boundCondition.map { gpuExpr =>
+    use(opTime.ns(gpuExpr.convertToAst(numFirstConditionTableColumns).compile()))
+  }
+
+  private val streamJoinIter = new HashFullJoinStreamSideIterator(built, boundBuiltKeys,
+    buildSideTrackerInit, stream, boundStreamKeys, streamAttributes, compiledCondition, targetSize,
+    buildSide, compareNullsEqual, opTime, joinTime)
+
+  private var finalBatch: Option[ColumnarBatch] = None
+
+  override def hasNext: Boolean = {
+    if (streamJoinIter.hasNext || finalBatch.isDefined) {
+      true
+    } else {
+      finalBatch = getFinalBatch()
+      // Now that we've manifested the final batch, we can close the stream iterator early to free
+      // GPU resources.
+      streamJoinIter.close()
+      finalBatch.isDefined
+    }
+  }
+
+  override def next(): ColumnarBatch = {
+    if (!hasNext) {
+      throw new NoSuchElementException("batches exhausted")
+    }
+    if (streamJoinIter.hasNext) {
+      streamJoinIter.next()
+    } else {
+      val batch = finalBatch.get
+      finalBatch = None
+      batch
+    }
+  }
+
+  override def close(): Unit = {
+    if (!closed) {
+      super.close()
+      streamJoinIter.close()
+      finalBatch.foreach(_.close())
+      finalBatch = None
+    }
+  }
+
+  private def getFinalBatch(): Option[ColumnarBatch] = {
+    withResource(new NvtxWithMetrics("get final batch", NvtxColor.ORANGE, joinTime)) { _ =>
+      streamJoinIter.releaseBuiltSideTracker() match {
+        case None => None
+        case Some(tracker) =>
+          val filteredBatch = withResource(tracker) { scb =>
+            withResource(scb.getColumnarBatch()) { trackerBatch =>
+              withResource(GpuColumnVector.from(trackerBatch)) { trackerTab =>
+                val batch = built.getBatch
+                withResource(GpuColumnVector.from(batch)) { builtTable =>
+                  withResource(builtTable.filter(trackerTab.getColumn(0))) { filterTab =>
+                    GpuColumnVector.from(filterTab, GpuColumnVector.extractTypes(batch))
+                  }
+                }
+              }
+            }
+          }
+          // Combine build-side columns with null columns for stream side
+          withResource(filteredBatch) { builtBatch =>
+            val numFilterRows = builtBatch.numRows()
+            if (numFilterRows > 0) {
+              val streamColumns = streamAttributes.safeMap { attr =>
+                GpuColumnVector.fromNull(numFilterRows, attr.dataType)
+              }
+              withResource(new ColumnarBatch(streamColumns.toArray, numFilterRows)) { streamBatch =>
+                buildSide match {
+                  case GpuBuildRight =>
+                    Some(GpuColumnVector.combineColumns(streamBatch, builtBatch))
+                  case GpuBuildLeft =>
+                    Some(GpuColumnVector.combineColumns(builtBatch, streamBatch))
+                }
+              }
+            } else {
+              None
+            }
+          }
+      }
+    }
+  }
+}
+
 class HashedExistenceJoinIterator(
   spillableBuiltBatch: LazySpillableColumnarBatch,
   boundBuildKeys: Seq[GpuExpression],
@@ -1019,7 +1121,7 @@ trait GpuHashJoin extends GpuExec {
           opTime,
           joinTime)
       case FullOuter =>
-        new HashFullJoinIterator(spillableBuiltBatch, boundBuildKeys, lazyStream,
+        new HashFullJoinIterator(spillableBuiltBatch, boundBuildKeys, None, lazyStream,
           boundStreamKeys, streamedPlan.output, boundCondition, numFirstConditionTableColumns,
           targetSize, buildSide, compareNullsEqual, opTime, joinTime)
       case _ =>