Skip to content

Commit

Permalink
Remove Alluxio support (#11920)
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Lowe <[email protected]>
  • Loading branch information
jlowe authored Jan 8, 2025
1 parent a62df8c commit 421b26f
Show file tree
Hide file tree
Showing 39 changed files with 94 additions and 2,565 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,8 +63,7 @@ abstract class GpuDeltaParquetFileFormatBase extends GpuReadParquetFileFormat {
pushedFilters,
fileScan.rapidsConf,
fileScan.allMetrics,
fileScan.queryUsesInputFile,
fileScan.alluxioPathsMap)
fileScan.queryUsesInputFile)
}

override def buildReaderWithPartitionValuesAndMetrics(
Expand All @@ -75,8 +74,7 @@ abstract class GpuDeltaParquetFileFormatBase extends GpuReadParquetFileFormat {
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration,
metrics: Map[String, GpuMetric],
alluxioPathReplacementMap: Option[Map[String, String]])
metrics: Map[String, GpuMetric])
: PartitionedFile => Iterator[InternalRow] = {
super.buildReaderWithPartitionValuesAndMetrics(
sparkSession,
Expand All @@ -86,8 +84,7 @@ abstract class GpuDeltaParquetFileFormatBase extends GpuReadParquetFileFormat {
filters,
options,
hadoopConf,
metrics,
alluxioPathReplacementMap)
metrics)
}

override def supportFieldName(name: String): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,8 +63,7 @@ trait GpuDeltaParquetFileFormat extends GpuReadParquetFileFormat {
pushedFilters,
fileScan.rapidsConf,
fileScan.allMetrics,
fileScan.queryUsesInputFile,
fileScan.alluxioPathsMap)
fileScan.queryUsesInputFile)
}

override def buildReaderWithPartitionValuesAndMetrics(
Expand All @@ -75,8 +74,7 @@ trait GpuDeltaParquetFileFormat extends GpuReadParquetFileFormat {
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration,
metrics: Map[String, GpuMetric],
alluxioPathReplacementMap: Option[Map[String, String]])
metrics: Map[String, GpuMetric])
: PartitionedFile => Iterator[InternalRow] = {
super.buildReaderWithPartitionValuesAndMetrics(
sparkSession,
Expand All @@ -86,8 +84,7 @@ trait GpuDeltaParquetFileFormat extends GpuReadParquetFileFormat {
filters,
options,
hadoopConf,
metrics,
alluxioPathReplacementMap)
metrics)
}

override def supportFieldName(name: String): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -68,11 +68,8 @@ case class GpuDelta24xParquetFileFormat(
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration,
metrics: Map[String, GpuMetric],
alluxioPathReplacementMap: Option[Map[String, String]])
metrics: Map[String, GpuMetric])
: PartitionedFile => Iterator[InternalRow] = {


val dataReader = super.buildReaderWithPartitionValuesAndMetrics(
sparkSession,
dataSchema,
Expand All @@ -81,8 +78,7 @@ case class GpuDelta24xParquetFileFormat(
if (disablePushDown) Seq.empty else filters,
options,
hadoopConf,
metrics,
alluxioPathReplacementMap)
metrics)

val delVecs = broadcastDvMap
val maxDelVecScatterBatchSize = RapidsConf
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -65,8 +65,7 @@ case class GpuDeltaParquetFileFormat(
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration,
metrics: Map[String, GpuMetric],
alluxioPathReplacementMap: Option[Map[String, String]])
metrics: Map[String, GpuMetric])
: PartitionedFile => Iterator[InternalRow] = {

val dataReader = super.buildReaderWithPartitionValuesAndMetrics(
Expand All @@ -77,8 +76,7 @@ case class GpuDeltaParquetFileFormat(
filters,
options,
hadoopConf,
metrics,
alluxioPathReplacementMap)
metrics)

val delVecs = broadcastDvMap
val maxDelVecScatterBatchSize = RapidsConf
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -65,8 +65,7 @@ case class GpuDeltaParquetFileFormat(
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration,
metrics: Map[String, GpuMetric],
alluxioPathReplacementMap: Option[Map[String, String]])
metrics: Map[String, GpuMetric])
: PartitionedFile => Iterator[InternalRow] = {

val dataReader = super.buildReaderWithPartitionValuesAndMetrics(
Expand All @@ -77,8 +76,7 @@ case class GpuDeltaParquetFileFormat(
filters,
options,
hadoopConf,
metrics,
alluxioPathReplacementMap)
metrics)

val delVecs = broadcastDvMap
val maxDelVecScatterBatchSize = RapidsConf
Expand Down
12 changes: 1 addition & 11 deletions docs/additional-functionality/advanced_configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,6 @@ For commonly used configurations and examples of setting options, please refer t

Name | Description | Default Value | Applicable at
-----|-------------|--------------|--------------
<a name="alluxio.automount.enabled"></a>spark.rapids.alluxio.automount.enabled|Enable the feature of auto mounting the cloud storage to Alluxio. It requires the Alluxio master is the same node of Spark driver node. The Alluxio master's host and port will be read from alluxio.master.hostname and alluxio.master.rpc.port(default: 19998) from ALLUXIO_HOME/conf/alluxio-site.properties, then replace a cloud path which matches spark.rapids.alluxio.bucket.regex like "s3://bar/b.csv" to "alluxio://0.1.2.3:19998/bar/b.csv", and the bucket "s3://bar" will be mounted to "/bar" in Alluxio automatically.|false|Runtime
<a name="alluxio.bucket.regex"></a>spark.rapids.alluxio.bucket.regex|A regex to decide which bucket should be auto-mounted to Alluxio. E.g. when setting as "^s3://bucket.*", the bucket which starts with "s3://bucket" will be mounted to Alluxio and the path "s3://bucket-foo/a.csv" will be replaced to "alluxio://0.1.2.3:19998/bucket-foo/a.csv". It's only valid when setting spark.rapids.alluxio.automount.enabled=true. The default value matches all the buckets in "s3://" or "s3a://" scheme.|^s3a{0,1}://.*|Runtime
<a name="alluxio.home"></a>spark.rapids.alluxio.home|The Alluxio installation home path or link to the installation home path. |/opt/alluxio|Startup
<a name="alluxio.large.file.threshold"></a>spark.rapids.alluxio.large.file.threshold|The threshold is used to identify whether average size of files is large when reading from S3. If reading large files from S3 and the disks used by Alluxio are slow, directly reading from S3 is better than reading caches from Alluxio, because S3 network bandwidth is faster than local disk. This improvement takes effect when spark.rapids.alluxio.slow.disk is enabled.|67108864|Runtime
<a name="alluxio.master"></a>spark.rapids.alluxio.master|The Alluxio master hostname. If not set, read Alluxio master URL from spark.rapids.alluxio.home locally. This config is useful when Alluxio master and Spark driver are not co-located.||Startup
<a name="alluxio.master.port"></a>spark.rapids.alluxio.master.port|The Alluxio master port. If not set, read Alluxio master port from spark.rapids.alluxio.home locally. This config is useful when Alluxio master and Spark driver are not co-located.|19998|Startup
<a name="alluxio.pathsToReplace"></a>spark.rapids.alluxio.pathsToReplace|List of paths to be replaced with corresponding Alluxio scheme. E.g. when configure is set to "s3://foo->alluxio://0.1.2.3:19998/foo,gs://bar->alluxio://0.1.2.3:19998/bar", it means: "s3://foo/a.csv" will be replaced to "alluxio://0.1.2.3:19998/foo/a.csv" and "gs://bar/b.csv" will be replaced to "alluxio://0.1.2.3:19998/bar/b.csv". To use this config, you have to mount the buckets to Alluxio by yourself. If you set this config, spark.rapids.alluxio.automount.enabled won't be valid.|None|Startup
<a name="alluxio.replacement.algo"></a>spark.rapids.alluxio.replacement.algo|The algorithm used when replacing the UFS path with the Alluxio path. CONVERT_TIME and TASK_TIME are the valid options. CONVERT_TIME indicates that we do it when we convert it to a GPU file read, this has extra overhead of creating an entirely new file index, which requires listing the files and getting all new file info from Alluxio. TASK_TIME replaces the path as late as possible inside of the task. By waiting and replacing it at task time, it just replaces the path without fetching the file information again, this is faster but doesn't update locality information if that has a bit impact on performance.|TASK_TIME|Runtime
<a name="alluxio.slow.disk"></a>spark.rapids.alluxio.slow.disk|Indicates whether the disks used by Alluxio are slow. If it's true and reading S3 large files, Rapids Accelerator reads from S3 directly instead of reading from Alluxio caches. Refer to spark.rapids.alluxio.large.file.threshold which defines a threshold that identifying whether files are large. Typically, it's slow disks if speed is less than 300M/second. If using convert time spark.rapids.alluxio.replacement.algo, this may not apply to all file types like Delta files|true|Runtime
<a name="alluxio.user"></a>spark.rapids.alluxio.user|Alluxio user is set on the Alluxio client, which is used to mount or get information. By default it should be the user that running the Alluxio processes. The default value is ubuntu.|ubuntu|Runtime
<a name="filecache.allowPathRegexp"></a>spark.rapids.filecache.allowPathRegexp|A regular expression to decide which paths will be cached when the file cache is enabled. If this is not set, then all paths are allowed to cache. If a path is allowed by this regexp but blocked by spark.rapids.filecache.blockPathRegexp, then the path is blocked to cache.|None|Startup
<a name="filecache.blockPathRegexp"></a>spark.rapids.filecache.blockPathRegexp|A regular expression to decide which paths will not be cached when the file cache is enabled. If a path is blocked by this regexp but is allowed by spark.rapids.filecache.allowPathRegexp, then the path is blocked.|None|Startup
<a name="filecache.checkStale"></a>spark.rapids.filecache.checkStale|Controls whether the cached is checked for being out of date with respect to the input file. When enabled, the data that has been cached locally for a file will be invalidated if the file is updated after being cached. This feature is only necessary if an input file for a Spark application can be changed during the lifetime of the application. If an individual input file will not be overwritten during the Spark application then performance may be improved by setting this to false.|true|Startup
Expand Down Expand Up @@ -69,7 +59,7 @@ Name | Description | Default Value | Applicable at
<a name="sql.castFloatToString.enabled"></a>spark.rapids.sql.castFloatToString.enabled|Casting from floating point types to string on the GPU returns results that have a different precision than the default results of Spark.|true|Runtime
<a name="sql.castStringToFloat.enabled"></a>spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|true|Runtime
<a name="sql.castStringToTimestamp.enabled"></a>spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false|Runtime
<a name="sql.coalescing.reader.numFilterParallel"></a>spark.rapids.sql.coalescing.reader.numFilterParallel|This controls the number of files the coalescing reader will run in each thread when it filters blocks for reading. If this value is greater than zero the files will be filtered in a multithreaded manner where each thread filters the number of files set by this config. If this is set to zero the files are filtered serially. This uses the same thread pool as the multithreaded reader, see spark.rapids.sql.multiThreadedRead.numThreads. Note that filtering multithreaded is useful with Alluxio.|0|Runtime
<a name="sql.coalescing.reader.numFilterParallel"></a>spark.rapids.sql.coalescing.reader.numFilterParallel|This controls the number of files the coalescing reader will run in each thread when it filters blocks for reading. If this value is greater than zero the files will be filtered in a multithreaded manner where each thread filters the number of files set by this config. If this is set to zero the files are filtered serially. This uses the same thread pool as the multithreaded reader, see spark.rapids.sql.multiThreadedRead.numThreads.|0|Runtime
<a name="sql.concurrentWriterPartitionFlushSize"></a>spark.rapids.sql.concurrentWriterPartitionFlushSize|The flush size of the concurrent writer cache in bytes for each partition. If specified spark.sql.maxConcurrentOutputFileWriters, use concurrent writer to write data. Concurrent writer first caches data for each partition and begins to flush the data if it finds one partition with a size that is greater than or equal to this config. The default value is 0, which will try to select a size based off of file type specific configs. E.g.: It uses `write.parquet.row-group-size-bytes` config for Parquet type and `orc.stripe.size` config for Orc type. If the value is greater than 0, will use this positive value.Max value may get better performance but not always, because concurrent writer uses spillable cache and big value may cause more IO swaps.|0|Runtime
<a name="sql.csv.read.decimal.enabled"></a>spark.rapids.sql.csv.read.decimal.enabled|CSV reading is not 100% compatible when reading decimals.|false|Runtime
<a name="sql.csv.read.double.enabled"></a>spark.rapids.sql.csv.read.double.enabled|CSV reading is not 100% compatible when reading doubles.|true|Runtime
Expand Down
10 changes: 1 addition & 9 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2020-2024, NVIDIA CORPORATION.
Copyright (c) 2020-2025, NVIDIA CORPORATION.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -831,7 +831,6 @@
<spark-rapids-jni.version>25.02.0-SNAPSHOT</spark-rapids-jni.version>
<spark-rapids-private.version>25.02.0-SNAPSHOT</spark-rapids-private.version>
<scala.binary.version>2.12</scala.binary.version>
<alluxio.client.version>2.8.0</alluxio.client.version>
<scala.recompileMode>incremental</scala.recompileMode>
<scala.version>2.12.15</scala.version>
<!--
Expand Down Expand Up @@ -1078,13 +1077,6 @@
<artifactId>scallop_${scala.binary.version}</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<!-- Used for Alluxio mounting -->
<groupId>org.alluxio</groupId>
<artifactId>alluxio-shaded-client</artifactId>
<version>${alluxio.client.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<!-- For shade, spark 3.2 and earlier uses an older version(0.9.0), which doesn't
contain the required api we use.
Expand Down
10 changes: 1 addition & 9 deletions scala2.13/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2020-2024, NVIDIA CORPORATION.
Copyright (c) 2020-2025, NVIDIA CORPORATION.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -831,7 +831,6 @@
<spark-rapids-jni.version>25.02.0-SNAPSHOT</spark-rapids-jni.version>
<spark-rapids-private.version>25.02.0-SNAPSHOT</spark-rapids-private.version>
<scala.binary.version>2.13</scala.binary.version>
<alluxio.client.version>2.8.0</alluxio.client.version>
<scala.recompileMode>incremental</scala.recompileMode>
<scala.version>2.13.14</scala.version>
<!--
Expand Down Expand Up @@ -1078,13 +1077,6 @@
<artifactId>scallop_${scala.binary.version}</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<!-- Used for Alluxio mounting -->
<groupId>org.alluxio</groupId>
<artifactId>alluxio-shaded-client</artifactId>
<version>${alluxio.client.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<!-- For shade, spark 3.2 and earlier uses an older version(0.9.0), which doesn't
contain the required api we use.
Expand Down
7 changes: 1 addition & 6 deletions scala2.13/sql-plugin/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2020-2024, NVIDIA CORPORATION.
Copyright (c) 2020-2025, NVIDIA CORPORATION.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -92,11 +92,6 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<!-- Used for Alluxio mounting -->
<groupId>org.alluxio</groupId>
<artifactId>alluxio-shaded-client</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
7 changes: 1 addition & 6 deletions sql-plugin/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2020-2024, NVIDIA CORPORATION.
Copyright (c) 2020-2025, NVIDIA CORPORATION.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -92,11 +92,6 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<!-- Used for Alluxio mounting -->
<groupId>org.alluxio</groupId>
<artifactId>alluxio-shaded-client</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -358,8 +358,6 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
false, // ignoreMissingFiles
false, // ignoreCorruptFiles
false, // useFieldId
scala.collection.immutable.Map$.MODULE$.empty(), // alluxioPathReplacementMap
false, // alluxioReplacementTaskTime
queryUsesInputFile,
true, // keepReadsInOrder
new CombineConf(
Expand Down
Loading

0 comments on commit 421b26f

Please sign in to comment.