Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed Failing Build for Databricks 11.3 on branch 23.12 [databricks] #10435

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions integration_tests/src/main/python/spark_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,6 @@ def is_spark_330_or_later():
def is_spark_340_or_later():
    """True when the Spark version under test is 3.4.0 or newer."""
    current = spark_version()
    return current >= "3.4.0"

def is_spark_341():
    """True only when the Spark version under test is exactly 3.4.1."""
    current = spark_version()
    return current == "3.4.1"

def is_spark_350_or_later():
    """True when the Spark version under test is 3.5.0 or newer."""
    current = spark_version()
    return current >= "3.5.0"

Expand Down
17 changes: 10 additions & 7 deletions integration_tests/src/main/python/udf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pytest

from conftest import is_at_least_precommit_run, is_not_utc
from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_341
from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_340_or_later

from pyspark.sql.pandas.utils import require_minimum_pyarrow_version, require_minimum_pandas_version

Expand Down Expand Up @@ -43,6 +43,12 @@
import pyarrow
from typing import Iterator, Tuple


if is_databricks_runtime() and is_spark_340_or_later():
# Databricks 13.3 does not use separate reader/writer threads for Python UDFs
# which can lead to hangs. Skipping these tests until the Python UDF handling is updated.
pytestmark = pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/9493")

arrow_udf_conf = {
'spark.sql.execution.arrow.pyspark.enabled': 'true',
'spark.rapids.sql.exec.WindowInPandasExec': 'true',
Expand Down Expand Up @@ -176,10 +182,7 @@ def group_size_udf(to_process: pd.Series) -> int:

low_upper_win = Window.partitionBy('a').orderBy('b').rowsBetween(-3, 3)

running_win_param = pytest.param(pre_cur_win, marks=pytest.mark.xfail(
condition=is_databricks_runtime() and is_spark_341(),
reason='DB13.3 wrongly uses RunningWindowFunctionExec to evaluate a PythonUDAF and it will fail even on CPU'))
udf_windows = [no_part_win, unbounded_win, cur_follow_win, running_win_param, low_upper_win]
udf_windows = [no_part_win, unbounded_win, cur_follow_win, pre_cur_win, low_upper_win]
window_ids = ['No_Partition', 'Unbounded', 'Unbounded_Following', 'Unbounded_Preceding',
'Lower_Upper']

Expand Down Expand Up @@ -335,8 +338,8 @@ def create_df(spark, data_gen, left_length, right_length):
@ignore_order
@pytest.mark.parametrize('data_gen', [ShortGen(nullable=False)], ids=idfn)
def test_cogroup_apply_udf(data_gen):
def asof_join(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
return pd.merge_ordered(left, right)
def asof_join(l, r):
return pd.merge_asof(l, r, on='a', by='b')

def do_it(spark):
left, right = create_df(spark, data_gen, 500, 500)
Expand Down
10 changes: 2 additions & 8 deletions jenkins/databricks/build.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash
#
# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2020-2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,13 +52,7 @@ declare -A artifacts
initialize()
{
# install rsync to be used for copying onto the databricks nodes
sudo apt install -y rsync

if [[ ! -d $HOME/apache-maven-3.6.3 ]]; then
wget https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz -P /tmp
tar xf /tmp/apache-maven-3.6.3-bin.tar.gz -C $HOME
sudo ln -s $HOME/apache-maven-3.6.3/bin/mvn /usr/local/bin/mvn
fi
sudo apt install -y maven rsync
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not revert this part; pinning a deterministic Maven version is more robust either way, regardless of whether the apt package for Maven is available again.


# Archive file location of the plugin repository
SPARKSRCTGZ=${SPARKSRCTGZ:-''}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ private static StructType structFromTypes(DataType[] format) {
return new StructType(fields);
}

public static StructType structFromAttributes(List<Attribute> format) {
private static StructType structFromAttributes(List<Attribute> format) {
StructField[] fields = new StructField[format.size()];
int i = 0;
for (Attribute attribute: format) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@ package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq

import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType}

/**
* Utility class with methods for calculating various metrics about GPU memory usage
* prior to allocation, along with some operations with batches.
* prior to allocation.
*/
object GpuBatchUtils {

Expand Down Expand Up @@ -179,37 +175,4 @@ object GpuBatchUtils {
bytes
}
}

/**
* Concatenate the input batches into a single one.
* The caller is responsible for closing the returned batch.
*
* @param spillBatches the batches to be concatenated, will be closed after the call
* returns.
* @return the concatenated SpillableColumnarBatch or None if the input is empty.
*/
def concatSpillBatchesAndClose(
spillBatches: Seq[SpillableColumnarBatch]): Option[SpillableColumnarBatch] = {
val retBatch = if (spillBatches.length >= 2) {
// two or more batches, concatenate them
// NOTE(review): withRetryNoSplit presumably retries the body on GPU OOM,
// spilling/re-materializing the input batches between attempts — confirm.
val (concatTable, types) = RmmRapidsRetryIterator.withRetryNoSplit(spillBatches) { _ =>
// Materialize each spillable batch, extract the column types from the
// first one, convert all to cudf Tables, and concatenate them.
// withResource closes the intermediate batches/tables on all paths.
withResource(spillBatches.safeMap(_.getColumnarBatch())) { batches =>
val batchTypes = GpuColumnVector.extractTypes(batches.head)
withResource(batches.safeMap(GpuColumnVector.from)) { tables =>
(Table.concatenate(tables: _*), batchTypes)
}
}
}
// Make the concatenated table spillable.
withResource(concatTable) { _ =>
SpillableColumnarBatch(GpuColumnVector.from(concatTable, types),
SpillPriorities.ACTIVE_BATCHING_PRIORITY)
}
} else if (spillBatches.length == 1) {
// only one batch: hand it back as-is; ownership passes to the caller
spillBatches.head
} else null
// Option(null) yields None for the empty-input case
Option(retBatch)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package org.apache.spark.sql.rapids.execution
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.nvidia.spark.rapids.{GpuBatchUtils, GpuColumnVector, GpuExpression, GpuHashPartitioningBase, GpuMetric, SpillableColumnarBatch, SpillPriorities, TaskAutoCloseableResource}
import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuHashPartitioningBase, GpuMetric, RmmRapidsRetryIterator, SpillableColumnarBatch, SpillPriorities, TaskAutoCloseableResource}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

Expand All @@ -40,7 +41,27 @@ object GpuSubPartitionHashJoin {
*/
def concatSpillBatchesAndClose(
spillBatches: Seq[SpillableColumnarBatch]): Option[SpillableColumnarBatch] = {
GpuBatchUtils.concatSpillBatchesAndClose(spillBatches)
val retBatch = if (spillBatches.length >= 2) {
// two or more batches, concatenate them
val (concatTable, types) = RmmRapidsRetryIterator.withRetryNoSplit(spillBatches) { _ =>
withResource(spillBatches.safeMap(_.getColumnarBatch())) { batches =>
val batchTypes = GpuColumnVector.extractTypes(batches.head)
withResource(batches.safeMap(GpuColumnVector.from)) { tables =>
(Table.concatenate(tables: _*), batchTypes)
}
}
}
// Make the concatenated table spillable.
withResource(concatTable) { _ =>
SpillableColumnarBatch(GpuColumnVector.from(concatTable, types),
SpillPriorities.ACTIVE_BATCHING_PRIORITY)
}
} else if (spillBatches.length == 1) {
// only one batch
spillBatches.head
} else null

Option(retBatch)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import ai.rapids.cudf
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.python.shims.GpuBasePythonRunner
import org.apache.spark.sql.rapids.execution.python.shims.GpuPythonArrowOutput
import org.apache.spark.sql.rapids.shims.DataTypeUtilsShim
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand Down Expand Up @@ -196,7 +195,7 @@ private[python] object BatchGroupUtils {
def executePython[IN](
pyInputIterator: Iterator[IN],
output: Seq[Attribute],
pyRunner: GpuBasePythonRunner[IN],
pyRunner: GpuPythonRunnerBase[IN],
outputRows: GpuMetric,
outputBatches: GpuMetric): Iterator[ColumnarBatch] = {
val context = TaskContext.get()
Expand Down Expand Up @@ -395,72 +394,38 @@ private[python] object BatchGroupedIterator {
class CombiningIterator(
inputBatchQueue: BatchQueue,
pythonOutputIter: Iterator[ColumnarBatch],
pythonArrowReader: GpuArrowOutput,
pythonArrowReader: GpuPythonArrowOutput,
numOutputRows: GpuMetric,
numOutputBatches: GpuMetric) extends Iterator[ColumnarBatch] {

// This is only for the input.
private var pendingInput: Option[SpillableColumnarBatch] = None
Option(TaskContext.get()).foreach(onTaskCompletion(_)(pendingInput.foreach(_.close())))

// The Python output should line up row for row so we only look at the Python output
// iterator and no need to check the `inputPending` who will be consumed when draining
// the Python output.
override def hasNext: Boolean = pythonOutputIter.hasNext
// For `hasNext` we are waiting on the queue to have something inserted into it
// instead of waiting for a result to be ready from Python. The reason for this
// is to let us know the target number of rows in the batch that we want when reading.
// It is a bit hacked up but it works. In the future when we support spilling we should
// store the number of rows separate from the batch. That way we can get the target batch
// size out without needing to grab the GpuSemaphore which we cannot do if we might block
// on a read operation.
override def hasNext: Boolean = inputBatchQueue.hasNext || pythonOutputIter.hasNext

override def next(): ColumnarBatch = {
val numRows = inputBatchQueue.peekBatchNumRows()
val numRows = inputBatchQueue.peekBatchSize
// Updates the expected batch size for next read
pythonArrowReader.setMinReadTargetNumRows(numRows)
pythonArrowReader.setMinReadTargetBatchSize(numRows)
// Reads next batch from Python and combines it with the input batch by the left side.
withResource(pythonOutputIter.next()) { cbFromPython =>
// Here may get a batch has a larger rows number than the current input batch.
assert(cbFromPython.numRows() >= numRows,
s"Expects >=$numRows rows but got ${cbFromPython.numRows()} from the Python worker")
withResource(concatInputBatch(cbFromPython.numRows())) { concated =>
assert(cbFromPython.numRows() == numRows)
withResource(inputBatchQueue.remove()) { origBatch =>
numOutputBatches += 1
numOutputRows += numRows
GpuColumnVector.combineColumns(concated, cbFromPython)
combine(origBatch, cbFromPython)
}
}
}

private def concatInputBatch(targetNumRows: Int): ColumnarBatch = {
withResource(mutable.ArrayBuffer[SpillableColumnarBatch]()) { buf =>
var curNumRows = pendingInput.map(_.numRows()).getOrElse(0)
pendingInput.foreach(buf.append(_))
pendingInput = None
while (curNumRows < targetNumRows) {
val scb = inputBatchQueue.remove()
if (scb != null) {
buf.append(scb)
curNumRows = curNumRows + scb.numRows()
}
}
assert(buf.nonEmpty, "The input queue is empty")

if (curNumRows > targetNumRows) {
// Need to split the last batch
val Array(first, second) = withRetryNoSplit(buf.remove(buf.size - 1)) { lastScb =>
val splitIdx = lastScb.numRows() - (curNumRows - targetNumRows)
withResource(lastScb.getColumnarBatch()) { lastCb =>
val batchTypes = GpuColumnVector.extractTypes(lastCb)
withResource(GpuColumnVector.from(lastCb)) { table =>
table.contiguousSplit(splitIdx).safeMap(
SpillableColumnarBatch(_, batchTypes, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
}
}
}
buf.append(first)
pendingInput = Some(second)
}

val ret = GpuBatchUtils.concatSpillBatchesAndClose(buf.toSeq)
// "ret" should be non empty because we checked the buf is not empty ahead.
withResource(ret.get) { concatedScb =>
concatedScb.getColumnarBatch()
}
} // end of withResource(mutable.ArrayBuffer)
private def combine(lBatch: ColumnarBatch, rBatch: ColumnarBatch): ColumnarBatch = {
// Zip the two batches column-wise into a single batch. Each column's ref
// count is bumped so the returned batch holds its own references,
// independent of the lifetime of the input batches.
// Row count is taken from lBatch; the caller asserts both sides have the
// same number of rows before invoking this.
val lColumns = GpuColumnVector.extractColumns(lBatch).map(_.incRefCount())
val rColumns = GpuColumnVector.extractColumns(rBatch).map(_.incRefCount())
new ColumnarBatch(lColumns ++ rColumns, lBatch.numRows())
}

}
Expand Down Expand Up @@ -595,4 +560,3 @@ class CoGroupedIterator(
keyOrdering.compare(leftKeyRow, rightKeyRow)
}
}

Loading
Loading