chore: Adding Spark 3.5 support #2182

Open · wants to merge 1 commit into master
README.md (16 additions, 0 deletions)
@@ -112,6 +112,22 @@ In Microsoft Fabric notebooks SynapseML is already installed. To change the version

In Azure Synapse notebooks please place the following in the first cell of your notebook.

- For Spark 3.5 Pools:

```bash
%%configure -f
{
"name": "synapseml",
"conf": {
"spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.3",
"spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
"spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
"spark.yarn.user.classpath.first": "true",
"spark.sql.parquet.enableVectorizedReader": "false"
}
}
```

- For Spark 3.4 Pools:

```bash
…
```
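A quick sanity check that the session picked up the configuration (a minimal sketch; assumes the notebook session was started after the `%%configure` cell above ran, and that both keys land in the session's Spark conf):

```scala
// Minimal sketch: read back the two conf entries set by %%configure.
println(spark.conf.get("spark.jars.packages"))      // expect com.microsoft.azure:synapseml_2.12:1.0.3
println(spark.conf.get("spark.jars.repositories"))  // expect https://mmlspark.azureedge.net/maven
```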
build.sbt (2 additions, 2 deletions)
@@ -7,7 +7,7 @@ import scala.xml.transform.{RewriteRule, RuleTransformer}
import scala.xml.{Node => XmlNode, NodeSeq => XmlNodeSeq, _}

val condaEnvName = "synapseml"
-val sparkVersion = "3.4.1"
+val sparkVersion = "3.5.0"
name := "synapseml"
ThisBuild / organization := "com.microsoft.azure"
ThisBuild / scalaVersion := "2.12.17"
@@ -34,7 +34,7 @@ val extraDependencies = Seq(
"com.jcraft" % "jsch" % "0.1.54",
"org.apache.httpcomponents.client5" % "httpclient5" % "5.1.3",
"org.apache.httpcomponents" % "httpmime" % "4.5.13",
"com.linkedin.isolation-forest" %% "isolation-forest_3.4.2" % "3.0.4"
"com.linkedin.isolation-forest" %% "isolation-forest_3.5.0" % "3.0.5"
exclude("com.google.protobuf", "protobuf-java") exclude("org.apache.spark", "spark-mllib_2.12")
exclude("org.apache.spark", "spark-core_2.12") exclude("org.apache.spark", "spark-avro_2.12")
exclude("org.apache.spark", "spark-sql_2.12"),
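A note on the coordinate above: with sbt's `%%` operator the Scala binary version is appended automatically, so this dependency resolves to `com.linkedin.isolation-forest:isolation-forest_3.5.0_2.12:3.0.5`. LinkedIn bakes the Spark line into the artifact name, which is why the Spark bump shows up both in the name and in the revision. The equivalent single-`%` form, as a sketch with Scala 2.12 assumed:

```scala
// Equivalent explicit form (sketch; Scala 2.12 assumed):
libraryDependencies += "com.linkedin.isolation-forest" % "isolation-forest_3.5.0_2.12" % "3.0.5"
```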
@@ -15,7 +15,7 @@ import org.apache.spark.injections.UDFUtils
import org.apache.spark.ml.ComplexParamsReadable
import org.apache.spark.ml.util._
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types._
import spray.json.DefaultJsonProtocol._
@@ -44,7 +44,7 @@ object BingImageSearch extends ComplexParamsReadable[BingImageSearch] with Serializable
): Lambda = {
Lambda({ df =>
val outputSchema = df.schema.add(bytesCol, BinaryType, nullable = true)
-val encoder = RowEncoder(outputSchema)
+val encoder = ExpressionEncoder(outputSchema)
df.toDF().mapPartitions { rows =>
val futures = rows.map { row: Row =>
(Future {
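This is the first instance of the change that repeats through the rest of the diff: as of Spark 3.5 the `RowEncoder(schema)` factory in `org.apache.spark.sql.catalyst.encoders` is gone (the object now exposes `encoderFor` instead), and `ExpressionEncoder(schema)` is the drop-in replacement producing the same `Encoder[Row]`. A minimal sketch of the pattern, with a hypothetical schema:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical schema for illustration only.
val schema = StructType(Seq(StructField("name", StringType)))

// Spark 3.4 and earlier:
//   val enc = RowEncoder(schema)
// Spark 3.5 drop-in replacement:
val enc: ExpressionEncoder[Row] = ExpressionEncoder(schema)
```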
@@ -13,7 +13,7 @@ import org.apache.http.entity.AbstractHttpEntity
import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap, ParamValidators}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, functions => F, types => T}
@@ -119,7 +119,7 @@ class OpenAIPrompt(override val uid: String) extends Transformer
} else {
row
}
-})(RowEncoder(df.schema))
+})(ExpressionEncoder(df.schema))
}

override def transform(dataset: Dataset[_]): DataFrame = {
@@ -12,7 +12,7 @@ import org.apache.http.entity.{AbstractHttpEntity, StringEntity}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.{ComplexParamsReadable, NamespaceInjections, PipelineModel, Transformer}
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{DataType, StringType, StructType}
import spray.json.DefaultJsonProtocol.StringJsonFormat
@@ -93,7 +93,7 @@ class SpeakerEmotionInference(override val uid: String)
converter(row.getAs[Row](row.fieldIndex(getOutputCol)))
)
new GenericRowWithSchema((row.toSeq.dropRight(1) ++ Seq(ssml)).toArray, newSchema): Row
-})(RowEncoder({
+})(ExpressionEncoder({
newSchema
}))
})
@@ -24,7 +24,7 @@ import org.apache.spark.injections.SConf
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -400,7 +400,7 @@ abstract class SpeechSDKBase extends Transformer
ArrayType(responseTypeBinding.schema)
}

-val enc = RowEncoder(enrichedDf.schema.add(getOutputCol, addedSchema))
+val enc = ExpressionEncoder(enrichedDf.schema.add(getOutputCol, addedSchema))
val sc = df.sparkSession.sparkContext
val bConf = sc.broadcast(new SConf(sc.hadoopConfiguration))
val isUriAudio = df.schema(getAudioDataCol).dataType match {
@@ -15,7 +15,7 @@ import org.apache.hadoop.io.{IOUtils => HUtils}
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util._
import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.util.SerializableConfiguration
@@ -152,7 +152,7 @@ class TextToSpeech(override val uid: String)
}
Row.fromSeq(row.toSeq ++ Seq(errorRow))
}.get
-}(RowEncoder(dataset.schema.add(getErrorCol, SpeechSynthesisError.schema)))
+}(ExpressionEncoder(dataset.schema.add(getErrorCol, SpeechSynthesisError.schema)))
}

override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
@@ -21,7 +21,7 @@ object PackageUtils {
// Use a fixed version for local testing
// val PackageMavenCoordinate = s"$PackageGroup:$PackageName:1.0.8"

-private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.4.1"
+private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.5.0"
val PackageRepository: String = SparkMLRepository

// If testing onnx package with snapshots repo, make sure to switch to using
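`spark-avro` is released in lockstep with Spark itself, so this coordinate has to move with every Spark bump. A hypothetical way to make that coupling explicit (sketch only; `sparkVersion` is a build-level setting and is not actually in scope in this file, which is presumably why the value is hard-coded):

```scala
// Sketch only: assumes a sparkVersion value visible in this scope, which this file lacks.
private val AvroCoordinate = s"org.apache.spark:spark-avro_2.12:$sparkVersion"
```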
@@ -5,7 +5,7 @@ package com.microsoft.azure.synapse.ml.core.schema

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.StructType

import scala.reflect.runtime.universe.TypeTag
@@ -14,7 +14,7 @@ abstract class SparkBindings[T: TypeTag] extends Serializable {

lazy val schema: StructType = enc.schema
private lazy val enc: ExpressionEncoder[T] = ExpressionEncoder[T]().resolveAndBind()
-private lazy val rowEnc: ExpressionEncoder[Row] = RowEncoder(enc.schema).resolveAndBind()
+private lazy val rowEnc: ExpressionEncoder[Row] = ExpressionEncoder(enc.schema).resolveAndBind()

// WARNING: each time you use this function on a dataframe, you should make a new converter.
// Spark does some magic that makes this leak memory if re-used on a
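For context on why both a typed and a row encoder are bound here: the row encoder's serializer maps `Row => InternalRow`, and the typed encoder's deserializer maps `InternalRow => T`; composing them yields the `Row => T` converter this class hands out. A sketch of that shape (hypothetical helper name; both converters are stateful, hence the warning above about making a fresh one per dataframe):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Sketch: compose the two bound encoders into a Row => T converter.
def makeFromRowConverter[T](enc: ExpressionEncoder[T], rowEnc: ExpressionEncoder[Row]): Row => T = {
  val toInternal = rowEnc.createSerializer()    // Row => InternalRow
  val fromInternal = enc.createDeserializer()   // InternalRow => T
  row => fromInternal(toInternal(row))
}
```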
@@ -14,7 +14,7 @@ import org.apache.spark.ml.Transformer
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param._
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
@@ -44,7 +44,7 @@ object LIMEUtils extends SLogging {
case field if colsToSquish.contains(field.name) => StructField(field.name, ArrayType(field.dataType))
case f => f
})
-val encoder = RowEncoder(schema)
+val encoder = ExpressionEncoder(schema)
val indiciesToSquish = colsToSquish.map(df.schema.fieldIndex)
df.mapPartitions { it =>
val isEmpty = it.isEmpty
@@ -12,7 +12,7 @@ import org.apache.spark.ml._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}

@@ -56,7 +56,7 @@ class MultiNGram(override val uid: String)
.map(col => row.getAs[Seq[String]](col))
.reduce(_ ++ _)
Row.fromSeq(row.toSeq :+ mergedNGrams)
-}(RowEncoder(intermediateDF.schema.add(getOutputCol, ArrayType(StringType))))
+}(ExpressionEncoder(intermediateDF.schema.add(getOutputCol, ArrayType(StringType))))
.drop(intermediateOutputCols: _*)
}, dataset.columns.length)
}
@@ -8,7 +8,7 @@ import com.microsoft.azure.synapse.ml.core.schema.BinaryFileSchema
import com.microsoft.azure.synapse.ml.core.utils.AsyncUtils
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.BinaryType
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

@@ -85,7 +85,7 @@ object BinaryFileReader {
timeout: Int
): DataFrame = {
val outputSchema = df.schema.add(bytesCol, BinaryType, nullable = true)
-val encoder = RowEncoder(outputSchema)
+val encoder = ExpressionEncoder(outputSchema)
val hconf = ConfUtils.getHConf(df)

df.mapPartitions { rows =>
@@ -13,7 +13,7 @@ import org.apache.spark.injections.UDFUtils
import org.apache.spark.ml.param._
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -118,7 +118,7 @@ class HTTPTransformer(val uid: String)
override def transform(dataset: Dataset[_]): DataFrame = {
logTransform[DataFrame]({
val df = dataset.toDF()
-val enc = RowEncoder(transformSchema(df.schema))
+val enc = ExpressionEncoder(transformSchema(df.schema))
val colIndex = df.schema.fieldNames.indexOf(getInputCol)
val fromRow = HTTPRequestData.makeFromRowConverter
val toRow = HTTPResponseData.makeToRowConverter
@@ -11,7 +11,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.ml.ImageInjections
import org.apache.spark.ml.image.ImageSchema
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{DataFrame, Row}

import java.awt.color.ColorSpace
@@ -117,7 +117,7 @@

def readFromPaths(df: DataFrame, pathCol: String, imageCol: String = "image"): DataFrame = {
val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema)
-val encoder = RowEncoder(outputSchema)
+val encoder = ExpressionEncoder(outputSchema)
val hconf = ConfUtils.getHConf(df)
df.mapPartitions { rows =>
rows.map { row =>
@@ -133,7 +133,7 @@

def readFromBytes(df: DataFrame, pathCol: String, bytesCol: String, imageCol: String = "image"): DataFrame = {
val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema)
-val encoder = RowEncoder(outputSchema)
+val encoder = ExpressionEncoder(outputSchema)
df.mapPartitions { rows =>
rows.map { row =>
val path = row.getAs[String](pathCol)
@@ -150,7 +150,7 @@
imageCol: String = "image",
dropPrefix: Boolean = false): DataFrame = {
val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema)
-val encoder = RowEncoder(outputSchema)
+val encoder = ExpressionEncoder(outputSchema)
df.mapPartitions { rows =>
rows.map { row =>
val encoded = row.getAs[String](bytesCol)
@@ -9,7 +9,7 @@ import com.microsoft.azure.synapse.ml.param.TransformerParam
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param._
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -35,7 +35,7 @@ trait MiniBatchBase extends Transformer with DefaultParamsWritable with Wrappable
def transform(dataset: Dataset[_]): DataFrame = {
logTransform[DataFrame]({
val outputSchema = transformSchema(dataset.schema)
-implicit val outputEncoder: ExpressionEncoder[Row] = RowEncoder(outputSchema)
+implicit val outputEncoder: ExpressionEncoder[Row] = ExpressionEncoder(outputSchema)
dataset.toDF().mapPartitions { it =>
if (it.isEmpty) {
it
@@ -215,7 +215,7 @@ class FlattenBatch(val uid: String)
override def transform(dataset: Dataset[_]): DataFrame = {
logTransform[DataFrame]({
val outputSchema = transformSchema(dataset.schema)
-implicit val outputEncoder: ExpressionEncoder[Row] = RowEncoder(outputSchema)
+implicit val outputEncoder: ExpressionEncoder[Row] = ExpressionEncoder(outputSchema)

dataset.toDF().mapPartitions(it =>
it.flatMap { rowOfLists =>
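Unlike the call sites above, these two encoders are declared `implicit` rather than passed positionally: `Dataset.mapPartitions` takes its `Encoder` as an implicit parameter, so swapping the factory requires no change at the call site. A minimal sketch (hypothetical passthrough; Spark 3.5 assumed):

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// With an implicit Encoder[Row] in scope, mapPartitions needs no explicit encoder argument.
def passthrough(df: DataFrame): Dataset[Row] = {
  implicit val enc: ExpressionEncoder[Row] = ExpressionEncoder(df.schema)
  df.mapPartitions(it => it)
}
```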
@@ -9,7 +9,7 @@ import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging}
import org.apache.spark.ml.param._
import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable}
import org.apache.spark.ml.{ComplexParamsWritable, Transformer}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}

@@ -39,7 +39,7 @@ class PartitionConsolidator(val uid: String)
} else {
Iterator()
}
-}(RowEncoder(dataset.schema))
+}(ExpressionEncoder(dataset.schema))
}, dataset.columns.length)
}

@@ -17,7 +17,7 @@ import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics}
import org.apache.spark.mllib.linalg.{Matrices, Matrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

@@ -252,7 +252,7 @@ class ComputeModelStatistics(override val uid: String) extends Transformer
confusionMatrix: Matrix,
resultDF: DataFrame): DataFrame = {
val schema = resultDF.schema.add(MetricConstants.ConfusionMatrix, SQLDataTypes.MatrixType)
-resultDF.map { row => Row.fromSeq(row.toSeq :+ confusionMatrix.asML) }(RowEncoder(schema))
+resultDF.map { row => Row.fromSeq(row.toSeq :+ confusionMatrix.asML) }(ExpressionEncoder(schema))
}

private def selectAndCastToDF(dataset: Dataset[_],
@@ -13,7 +13,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.ml.image.ImageSchema
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
@@ -118,7 +118,7 @@ class PatchedImageFileFormat extends ImageFileFormat with Serializable with Logging
if (requiredSchema.isEmpty) {
filteredResult.map(_ => emptyUnsafeRow)
} else {
-val converter = RowEncoder(requiredSchema)
+val converter = ExpressionEncoder(requiredSchema)
filteredResult.map(row => converter.createSerializer()(row))
}
}
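A side observation while this line is being touched (not part of the PR): `createSerializer()` constructs a fresh converter on each call, and here it is invoked inside the per-row `map`. An untested, behavior-preserving sketch that hoists it out of the loop:

```scala
// Untested sketch: build the serializer once and reuse it for every row.
val converter = ExpressionEncoder(requiredSchema)
val serializer = converter.createSerializer()  // Row => InternalRow
filteredResult.map(row => serializer(row))
```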
@@ -7,7 +7,7 @@ import com.microsoft.azure.synapse.ml.io.http.{HTTPRequestData, HTTPResponseData
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
import org.apache.spark.sql.execution.streaming.continuous.HTTPSourceV2
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider, StreamSourceProvider}
@@ -218,7 +218,7 @@ class DistributedHTTPSource(name: String,
private[spark] val infoSchema = new StructType()
.add("machine", StringType).add("ip", StringType).add("id", StringType)

-private[spark] val infoEnc = RowEncoder(infoSchema)
+private[spark] val infoEnc = ExpressionEncoder(infoSchema)

// Access point to run code on nodes through mapPartitions
// TODO do this by hooking deeper into spark,
@@ -284,7 +284,7 @@
.map{ case (id, request) =>
Row.fromSeq(Seq(Row(null, id, null), toRow(request))) //scalastyle:ignore null
}.toIterator
-}(RowEncoder(HTTPSourceV2.Schema))
+}(ExpressionEncoder(HTTPSourceV2.Schema))
}

override def commit(end: OffsetV2): Unit = synchronized {
Expand Down