
xfail and modify allow_non_gpu based on parquet reader type
razajafri committed Jan 17, 2025
1 parent a0455ce commit c34d4e1
Showing 3 changed files with 16 additions and 3 deletions.
3 changes: 3 additions & 0 deletions GpuDeltaParquetFileFormat.scala
@@ -278,6 +278,9 @@ case class GpuDeltaParquetFileFormat(

 object GpuDeltaParquetFileFormat {
   def tagSupportForGpuFileSourceScan(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
+    if (!meta.conf.isParquetPerFileReadEnabled) {
+      meta.willNotWorkOnGpu("Deletion vectors only supported for PERFILE reader")
+    }
   }

   /** Utility method to create a new writable vector */
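For context, the new tag check ties GPU support for deletion-vector scans to the spark-rapids Parquet reader type: only the per-file reader passes, anything else gets the fallback message above. Below is a minimal sketch of how that surfaces from a test's point of view; `with_gpu_session` is the helper from `spark_session.py` shown later in this diff, while the non-PERFILE reader value, the `read_delta` function, and the table path are illustrative assumptions.

```python
from spark_session import with_gpu_session  # test-harness helper used throughout these tests

perfile_conf = {"spark.rapids.sql.format.parquet.reader.type": "PERFILE"}
multithreaded_conf = {"spark.rapids.sql.format.parquet.reader.type": "MULTITHREADED"}

def read_delta(spark):
    # Hypothetical deletion-vector table path, for illustration only.
    return spark.read.format("delta").load("/tmp/DELTA_DATA").collect()

# With PERFILE the scan is not tagged off the GPU by this check.
with_gpu_session(read_delta, conf=perfile_conf)
# With any other reader type the scan is tagged with
# "Deletion vectors only supported for PERFILE reader" and needs allow_non_gpu.
# with_gpu_session(read_delta, conf=multithreaded_conf)
```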
11 changes: 9 additions & 2 deletions integration_tests/src/main/python/delta_lake_test.py
@@ -20,14 +20,15 @@
 from marks import allow_non_gpu, delta_lake, ignore_order
 from parquet_test import reader_opt_confs_no_native
 from spark_session import with_cpu_session, with_gpu_session, is_databricks_runtime, \
-    is_spark_320_or_later, is_spark_340_or_later, supports_delta_lake_deletion_vectors
+    is_spark_320_or_later, is_spark_340_or_later, supports_delta_lake_deletion_vectors, is_databricks143_or_later

 _conf = {'spark.rapids.sql.explain': 'ALL'}

 @delta_lake
 @allow_non_gpu('FileSourceScanExec')
 @pytest.mark.skipif(not (is_databricks_runtime() or is_spark_320_or_later()), \
                     reason="Delta Lake is already configured on Databricks and CI supports Delta Lake OSS with Spark 3.2.x so far")
+@pytest.mark.xfail(condition=is_databricks143_or_later(), reason='We support deletion vectors read on Databricks 14.3')
 def test_delta_metadata_query_fallback(spark_tmp_table_factory):
     table = spark_tmp_table_factory.get()
     def setup_delta_table(spark):
@@ -74,6 +75,7 @@ def merge(spark):
 @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
 @pytest.mark.skipif(not supports_delta_lake_deletion_vectors(),
                     reason="Delta Lake deletion vector support is required")
+@pytest.mark.xfail(condition=is_databricks143_or_later(), reason='We support deletion vectors read on Databricks 14.3')
 def test_delta_deletion_vector_read_fallback(spark_tmp_path, use_cdf):
     data_path = spark_tmp_path + "/DELTA_DATA"
     conf = {"spark.databricks.delta.delete.deletionVectors.persistent": "true"}
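Both tests above rely on a conditional xfail: `pytest.mark.xfail(condition=..., reason=...)` only marks the test as expected-to-fail when the condition is true at collection time, so the fallback assertions keep running everywhere except Databricks 14.3. A standalone sketch of that behavior, with a hard-coded stand-in for the real `is_databricks143_or_later` helper:

```python
import pytest

def fake_is_databricks143_or_later():
    # Stand-in for spark_session.is_databricks143_or_later(); hard-coded here.
    return False

@pytest.mark.xfail(condition=fake_is_databricks143_or_later(),
                   reason='We support deletion vectors read on Databricks 14.3')
def test_fallback_example():
    # With the condition False this runs as a normal test; were it True, a
    # failure would be reported as XFAIL (and a pass as XPASS) instead.
    assert True
```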
@@ -101,6 +103,11 @@ def setup_tables(spark):
 @pytest.mark.parametrize("reader_confs", reader_opt_confs_no_native, ids=idfn)
 @pytest.mark.parametrize("mapping", column_mappings, ids=idfn)
 def test_delta_read_column_mapping(spark_tmp_path, reader_confs, mapping):
+    non_gpu_conf = {}
+    if (reader_confs["spark.rapids.sql.format.parquet.reader.type"] != "PERFILE" and is_databricks143_or_later()):
+        # We only support deletion vector reads on Databricks 14.3
+        non_gpu_conf = {"spark.rapids.sql.test.allowedNonGpu": "FileSourceScanExec,ColumnarToRowExec,FilterExec"}
+
     data_path = spark_tmp_path + "/DELTA_DATA"
     gen_list = [("a", int_gen),
                 ("b", SetValuesGen(StringType(), ["x", "y", "z"])),
@@ -112,7 +119,7 @@ def test_delta_read_column_mapping(spark_tmp_path, reader_confs, mapping):
         "spark.databricks.delta.properties.defaults.minReaderVersion": "2",
         "spark.databricks.delta.properties.defaults.minWriterVersion": "5",
         "spark.sql.parquet.fieldId.read.enabled": "true"
-        })
+        }, non_gpu_conf)
     with_cpu_session(
         lambda spark: gen_df(spark, gen_list).coalesce(1).write.format("delta") \
             .partitionBy("b", "d") \
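The trailing `non_gpu_conf` argument above is an extra dict merged into the read configuration; for PERFILE readers, or off Databricks 14.3, it is empty and changes nothing. The call it closes sits above the visible hunk, so here is a self-contained sketch of the merge semantics, assuming a left-to-right dict-merging helper of the same shape as the repository's `copy_and_update`:

```python
def copy_and_update(*confs):
    # Assumed semantics: combine dicts left to right, later keys win.
    merged = {}
    for conf in confs:
        merged.update(conf)
    return merged

reader_confs = {"spark.rapids.sql.format.parquet.reader.type": "COALESCING"}
non_gpu_conf = {"spark.rapids.sql.test.allowedNonGpu":
                "FileSourceScanExec,ColumnarToRowExec,FilterExec"}

conf = copy_and_update(reader_confs,
                       {"spark.sql.parquet.fieldId.read.enabled": "true"},
                       non_gpu_conf)
# For a non-PERFILE reader on Databricks 14.3 the allowed-non-GPU execs ride
# along with the rest of the session configuration.
print(conf["spark.rapids.sql.test.allowedNonGpu"])
```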
5 changes: 4 additions & 1 deletion integration_tests/src/main/python/spark_session.py
@@ -161,7 +161,7 @@ def with_gpu_session(func, conf={}):
         copy['spark.rapids.sql.test.enabled'] = 'false'
     else:
         copy['spark.rapids.sql.test.enabled'] = 'true'
-        copy['spark.rapids.sql.test.allowedNonGpu'] = ','.join(get_non_gpu_allowed())
+        copy['spark.rapids.sql.test.allowedNonGpu'] = ','.join(filter(None, [copy.get('spark.rapids.sql.test.allowedNonGpu', '')] + get_non_gpu_allowed()))

     copy['spark.rapids.sql.test.validateExecsInGpuPlan'] = ','.join(get_validate_execs_in_gpu_plan())
     return with_spark_session(func, conf=copy)
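A quick standalone check of the concatenation above; `get_non_gpu_allowed()` is replaced by a stand-in (it normally collects exec names from `@allow_non_gpu` marks), and the extra exec name is purely illustrative:

```python
def get_non_gpu_allowed():
    # Stand-in for the pytest helper; the value is illustrative.
    return ["ShuffleExchangeExec"]

copy = {"spark.rapids.sql.test.allowedNonGpu":
        "FileSourceScanExec,ColumnarToRowExec,FilterExec"}
copy["spark.rapids.sql.test.allowedNonGpu"] = ",".join(
    filter(None, [copy.get("spark.rapids.sql.test.allowedNonGpu", "")] + get_non_gpu_allowed()))
print(copy["spark.rapids.sql.test.allowedNonGpu"])
# FileSourceScanExec,ColumnarToRowExec,FilterExec,ShuffleExchangeExec
```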
@@ -275,6 +275,9 @@ def is_databricks122_or_later():
 def is_databricks133_or_later():
     return is_databricks_version_or_later(13, 3)

+def is_databricks143_or_later():
+    return is_databricks_version_or_later(14, 3)
+
 def supports_delta_lake_deletion_vectors():
     if is_databricks_runtime():
         return is_databricks122_or_later()
