Commit

Shade ORC dependencies and avoid Spark-provided ORC search applier to provide compatibility for Spark with hadoop 3.2 profile
jlowe committed Aug 12, 2019
1 parent ae17478 commit 5a373d6
Showing 7 changed files with 409 additions and 5 deletions.
13 changes: 13 additions & 0 deletions NOTICE
@@ -0,0 +1,13 @@
RAPIDS plugin for Apache Spark
Copyright (c) 2019, NVIDIA CORPORATION

// ------------------------------------------------------------------
// NOTICE file corresponding to the section 4d of The Apache License,
// Version 2.0, in this case for
// ------------------------------------------------------------------

Apache Spark
Copyright 2014 and onwards The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
24 changes: 24 additions & 0 deletions NOTICE-binary
@@ -0,0 +1,24 @@
RAPIDS plugin for Apache Spark
Copyright (c) 2019, NVIDIA CORPORATION

// ------------------------------------------------------------------
// NOTICE file corresponding to the section 4d of The Apache License,
// Version 2.0, in this case for
// ------------------------------------------------------------------

Apache Spark
Copyright 2014 and onwards The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

---------------------------------------------------------------------

Apache ORC
Copyright 2013-2019 The Apache Software Foundation

This product includes software developed by The Apache Software
Foundation (http://www.apache.org/).

This product includes software developed by Hewlett-Packard:
(c) Copyright [2014-2015] Hewlett-Packard Development Company, L.P
5 changes: 5 additions & 0 deletions README.md
@@ -17,3 +17,8 @@ enable it again.
> spark-shell --jars 'rapids-4-spark-0.8-SNAPSHOT.jar,cudf-0.8-SNAPSHOT-cuda10.jar' --conf spark.sql.extensions=ai.rapids.spark.Plugin
```

## Notes on Building

The build requires Apache Spark 3.0+ built against the nohive ORC classifier. Building with a
version of Spark built without nohive support (e.g., the Apache Spark hadoop-3.2 profile)
will cause errors during build and test due to the conflicting ORC jars.
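
A quick way to tell whether a given Spark build provides the nohive ORC jars is to probe for the relocated storage-api classes. Below is a minimal sketch, not part of this commit: the `CheckNohiveOrc` object is hypothetical, though the `org.apache.orc.storage` package is where the nohive classifier relocates the hive-storage-api classes.

```scala
// Hypothetical probe, not part of this commit: the nohive ORC jars
// relocate hive-storage-api classes under org.apache.orc.storage, so
// this class resolves only when nohive ORC is on the classpath.
object CheckNohiveOrc {
  def main(args: Array[String]): Unit = {
    val probe = "org.apache.orc.storage.ql.io.sarg.SearchArgument"
    val onClasspath =
      try { Class.forName(probe); true }
      catch { case _: ClassNotFoundException => false }
    println(s"nohive ORC on classpath: $onClasspath")
  }
}
```

Run it with the Spark jars on the classpath; `true` indicates a nohive build.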
85 changes: 82 additions & 3 deletions pom.xml
@@ -20,6 +20,9 @@
<cuda.version>cuda10</cuda.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.8</scala.version>
<orc.version>1.5.5</orc.version>
<orc.classifier>nohive</orc.classifier>
<rapids.shade.package>ai.rapids.spark.shaded</rapids.shade.package>
<test.exclude.tags></test.exclude.tags>
<test.include.tags></test.include.tags>
</properties>
@@ -43,9 +46,76 @@
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>3.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>${orc.version}</version>
<classifier>${orc.classifier}</classifier>
<exclusions>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-mapreduce</artifactId>
<version>${orc.version}</version>
<classifier>${orc.classifier}</classifier>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-shims</artifactId>
<version>${orc.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- WE CANNOT SUPPORT XGBOOST YET BECAUSE OF A MISMATCH IN SCALA VERSIONS
<dependency>
@@ -66,6 +136,15 @@
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.8</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
78 changes: 78 additions & 0 deletions sql-plugin/pom.xml
@@ -31,6 +31,16 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<classifier>${orc.classifier}</classifier>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-mapreduce</artifactId>
<classifier>${orc.classifier}</classifier>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
@@ -39,7 +49,75 @@
</dependencies>

<build>
<resources>
<resource>
<directory>${project.basedir}/..</directory>
<targetPath>META-INF</targetPath>
<includes>
<!-- The NOTICE will be taken care of by the antrun task below -->
<include>LICENSE</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>copy-notice</id>
<goals>
<goal>run</goal>
</goals>
<phase>process-resources</phase>
<configuration>
<target>
<!-- copy NOTICE-binary to NOTICE -->
<copy
todir="${project.build.directory}/classes/META-INF/"
verbose="true">
<fileset dir="${project.basedir}/..">
<include name="NOTICE-binary"/>
</fileset>
<mapper type="glob" from="*-binary" to="*"/>
</copy>
</target>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>true</createDependencyReducedPom>
<artifactSet>
<includes>
<include>org.apache.orc:orc-core</include>
<include>org.apache.orc:orc-mapreduce</include>
<include>org.apache.orc:orc-shims</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>org.apache.orc</pattern>
<shadedPattern>${rapids.shade.package}.orc</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.protobuf25</pattern>
<shadedPattern>${rapids.shade.package}.protobuf25</shadedPattern>
</relocation>
</relocations>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- disable surefire as we are using scalatest only -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
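
The relocations above rewrite the bundled ORC classes (and the protobuf copy the nohive ORC jars carry under `com.google.protobuf25`) into the `${rapids.shade.package}` namespace, so they can coexist with whatever ORC classes Spark itself ships. A sketch of the runtime effect follows; the `ShowRelocation` object is illustrative only, and the shaded package name is derived from the `rapids.shade.package` property in the parent pom.

```scala
// Illustrative sketch: inside the shaded plugin jar, the bundled ORC
// classes live under the relocated package, side by side with the
// Spark-provided ORC classes that keep their original names.
object ShowRelocation {
  private def status(name: String): String =
    try { Class.forName(name); "present" }
    catch { case _: ClassNotFoundException => "absent" }

  def main(args: Array[String]): Unit = {
    // Bundled copy, rewritten by the maven-shade-plugin relocation above.
    println(s"shaded:   ${status("ai.rapids.spark.shaded.orc.OrcFile")}")
    // Spark-provided copy, untouched by the relocation.
    println(s"unshaded: ${status("org.apache.orc.OrcFile")}")
  }
}
```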
18 changes: 16 additions & 2 deletions sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala
@@ -48,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.OrcFilters
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType
@@ -68,10 +69,14 @@ class GpuOrcScan(
readDataSchema, readPartitionSchema, options, pushedFilters) with GpuScan {

override def createReaderFactory(): PartitionReaderFactory = {
// Unset any serialized search argument setup by Spark's OrcScanBuilder as
// it will be incompatible due to shading and potential ORC classifier mismatch.
hadoopConf.unset(OrcConf.KRYO_SARG.getAttribute)

val broadcastedConf = sparkSession.sparkContext.broadcast(
new GpuSerializableConfiguration(hadoopConf))
GpuOrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
dataSchema, readDataSchema, readPartitionSchema, pushedFilters, rapidsConf)
}
}

@@ -92,6 +97,7 @@ case class GpuOrcPartitionReaderFactory(
dataSchema: StructType,
readDataSchema: StructType,
partitionSchema: StructType,
pushedFilters: Array[Filter],
@transient rapidsConf: RapidsConf) extends FilePartitionReaderFactory {
private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
private val debugDumpPrefix = rapidsConf.orcDebugDumpPrefix
@@ -108,8 +114,9 @@
OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, orcSchemaString)
OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)

val fullSchema = StructType(dataSchema ++ partitionSchema)
val reader = new GpuOrcPartitionReader(conf, partFile, dataSchema, readDataSchema,
fullSchema, pushedFilters, debugDumpPrefix)
ColumnarPartitionReaderWithPartitionValues.newReader(partFile, reader, partitionSchema)
}
}
@@ -155,6 +162,8 @@ class GpuOrcPartitionReader(
partFile: PartitionedFile,
dataSchema: StructType,
readDataSchema: StructType,
fullSchema: StructType,
pushedFilters: Array[Filter],
debugDumpPrefix: String) extends PartitionReader[ColumnarBatch] with Logging {
private var batch: Option[ColumnarBatch] = None
private var isExhausted: Boolean = false
@@ -598,6 +607,11 @@
splitStripes: Seq[StripeInformation],
useUTCTimestamp: Boolean): Option[Table] = {
val readerOpts = OrcInputFormat.buildOptions(conf, orcReader, partFile.start, partFile.length)
// create the search argument if we have pushed filters
OrcFilters.createFilter(fullSchema, pushedFilters).foreach { f =>
readerOpts.searchArgument(f, fullSchema.fieldNames)
}

val updatedReadSchema = checkSchemaCompatibility(orcReader.getSchema, readerOpts.getSchema,
readerOpts.getIsSchemaEvolutionCaseAware)

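The two changes to this file work together: the Kryo-serialized search argument that Spark's `OrcScanBuilder` stores in the Hadoop configuration was serialized with the unshaded ORC classes, so the shaded reader cannot deserialize it; the plugin therefore unsets it and rebuilds an equivalent `SearchArgument` from the pushed-down filters. A condensed sketch of that flow follows, assuming `OrcFilters.createFilter` mirrors the `Option[SearchArgument]` signature of Spark's built-in `OrcFilters`; the object name and the example filter are illustrative.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.orc.OrcConf
import org.apache.spark.sql.sources.{Filter, GreaterThan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object SargRebuildSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Drop the Kryo-serialized SearchArgument Spark attached: it references
    // the unshaded ORC/Hive classes and cannot be read back by the shaded
    // copies bundled in this plugin.
    conf.unset(OrcConf.KRYO_SARG.getAttribute)

    // Rebuild an equivalent search argument from the pushed filters.
    val fullSchema = StructType(Seq(StructField("x", IntegerType)))
    val pushed: Array[Filter] = Array(GreaterThan("x", 5))
    println(s"pushed ${pushed.length} filter(s) against " +
      fullSchema.fieldNames.mkString(","))
    // In the reader itself (see the diff above), the rebuilt filter is applied:
    // OrcFilters.createFilter(fullSchema, pushed).foreach { sarg =>
    //   readerOpts.searchArgument(sarg, fullSchema.fieldNames)
    // }
  }
}
```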
