test: Spark3.3 #2074

Closed
wants to merge 36 commits

Commits (36)
5ed7a23  chore: bump to v0.11.2 (#2011)  (mhamilton723, Jul 10, 2023)
22279bc  chore: no-op PR to avoid double publishing  (mhamilton723, Jul 10, 2023)
7ff4c59  docs: remove pre-commit from docs  (mhamilton723, Jul 12, 2023)
54f0b51  fix: modified the search engine in the demo notebook to bing (#2013)  (sherylZhaoCode, Jul 12, 2023)
2d1ee53  build: bump semver from 5.7.1 to 5.7.2 in /website (#2012)  (dependabot[bot], Jul 13, 2023)
e17756e  docs: add prerequisites - openai and cognitive services resources (#2…  (JessicaXYWang, Jul 13, 2023)
c2ce58e  docs: update notebooks - bring back fabric reviewers changes. (#1979)  (JessicaXYWang, Jul 17, 2023)
9e1da76  docs: fix docker link (#2019)  (niehaus59, Jul 18, 2023)
19f898c  docs: Refactor docs and docgen framework (#2021)  (mhamilton723, Jul 19, 2023)
272003b  chore: bump databricks e2e timeout (#2024)  (mhamilton723, Jul 20, 2023)
37d474a  docs: add dead link checker (#2022)  (mhamilton723, Jul 20, 2023)
6c9dfb3  docs: fix broken links (#2025)  (mhamilton723, Jul 20, 2023)
8d56053  docs: continue fixing broken links (#2026)  (mhamilton723, Jul 20, 2023)
3285331  docs: fix broken links (#2027)  (mhamilton723, Jul 20, 2023)
c5294c6  docs: fix broken link (#2032)  (mhamilton723, Jul 24, 2023)
8196541  docs: add QandA notebook. (#2029)  (aydan-at-microsoft, Jul 24, 2023)
bab2bde  build: bump actions/checkout from 2 to 3 (#2030)  (dependabot[bot], Jul 24, 2023)
875b635  chore: remove build exclusions from pipeline.yaml  (mhamilton723, Jul 24, 2023)
6f6ef6e  chore: remove exclusions from pipeline.yml  (mhamilton723, Jul 24, 2023)
e8d865a  docs: fix variable formatting for QandA nb (#2033)  (aydan-at-microsoft, Jul 24, 2023)
8e94d1a  fix: Fix ONNX link (#2035)  (iemejia, Jul 27, 2023)
b009871  fix: Improve LGBM exception and logging (#2037)  (svotaw, Aug 2, 2023)
4cd6a3a  docs: fix broken links (#2042)  (JessicaXYWang, Aug 4, 2023)
be48e85  docs: initial POC of Jessica's fabric doc generator (#2023)  (mhamilton723, Aug 4, 2023)
a0b4ee4  docs: fix small error in docgen docs  (mhamilton723, Aug 7, 2023)
6563189  fix: improve docgen (#2043)  (eisber, Aug 7, 2023)
01640dd  docs: add badges to readme  (mhamilton723, Aug 8, 2023)
ee656e6  feat: Support langchain transformer on fabric (#2036)  (lhrotk, Aug 10, 2023)
250e895  chore: remove secret scanner (#2048)  (mhamilton723, Aug 11, 2023)
1f18559  Fix problem with empty partition assigned to validation data (#2059)  (svotaw, Aug 31, 2023)
d3fe930  chore: fix daily midnight build chronjob  (mhamilton723, Sep 5, 2023)
d4c6028  fix: fixed broken link to developer readme (#2049)  (BrendanWalsh, Sep 5, 2023)
3e2d380  build: bump actions/checkout from 3 to 4 (#2065)  (dependabot[bot], Sep 5, 2023)
b935a54  feat: add Azure Cognitive Search vector store (#2041)  (aydan-at-microsoft, Sep 7, 2023)
212828e  fix: updated gpt-review to version 0.9.5 to fix break (#2069)  (BrendanWalsh, Sep 11, 2023)
cc29c90  chore: fix some of the failing build issues (#2071)  (mhamilton723, Sep 13, 2023)
2 changes: 1 addition & 1 deletion .acrolinx-config.edn
@@ -1,2 +1,2 @@
{:allowed-branchname-matches ["master" "release-.*"]
:allowed-filename-matches ["notebooks" "website"]}
:allowed-filename-matches ["docs" "website"]}
26 changes: 26 additions & 0 deletions .github/workflows/check-dead-links.yml
@@ -0,0 +1,26 @@
name: "Check Dead Links"

on:
workflow_dispatch:
push:
branches: [ "master" ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ "master" ]

jobs:
scan_links:
name: Scan Website for Dead Links
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y wget
- name: Scan for dead links
run: |
wget --spider --recursive --no-verbose --tries=3 --retry-connrefused --no-clobber --directory-prefix=site-check https://microsoft.github.io/SynapseML/
2 changes: 1 addition & 1 deletion .github/workflows/clean-acr.yml
@@ -31,7 +31,7 @@ jobs:
with:
creds: ${{ secrets.clean_acr }}
- name: checkout repo content
uses: actions/checkout@v3 # checkout the repo
uses: actions/checkout@v4 # checkout the repo
- name: setup python
uses: actions/setup-python@v4
with:
2 changes: 1 addition & 1 deletion .github/workflows/codeql.yml
@@ -42,7 +42,7 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v4

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
2 changes: 1 addition & 1 deletion .github/workflows/on-pull-request-target-review.yml
@@ -10,7 +10,7 @@ jobs:
name: Azure OpenAI PR Comment
steps:
- id: review
uses: microsoft/gpt-review@0.9.4
uses: microsoft/gpt-review@0.9.5
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
AZURE_OPENAI_API: ${{ secrets.AZURE_OPENAI_API }}
2 changes: 1 addition & 1 deletion .github/workflows/scorecards.yml
@@ -32,7 +32,7 @@ jobs:

steps:
- name: "Checkout code"
uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8 # v3.1.0
uses: actions/checkout@v4 # v3.1.0
with:
persist-credentials: false

1 change: 1 addition & 0 deletions .gitignore
@@ -86,3 +86,4 @@ metastore_db/
**/build/*
**/dist/*
**/*.egg-info/*

4 changes: 2 additions & 2 deletions CONTRIBUTING.md
@@ -45,14 +45,14 @@ this process:

#### Implement tests

- Set up build environment using the [developer guide](https://microsoft.github.io/SynapseML/docs/reference/developer-readme/)
- Set up build environment using the [developer guide](https://microsoft.github.io/SynapseML/docs/Reference/Developer%20Setup/)
- Test your code locally.
- Add tests using ScalaTests — unit tests are required.
- A sample notebook is required as an end-to-end test.

#### Implement documentation

- Add a [sample Jupyter notebook](notebooks/) that shows the intended use
- Add a [sample Jupyter notebook](docs/) that shows the intended use
case of your algorithm, with instructions in step-by-step manner. (The same
notebook could be used for testing the code.)
- Add in-line ScalaDoc comments to your source code, to generate the [API
53 changes: 27 additions & 26 deletions README.md

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions build.sbt
@@ -25,6 +25,7 @@ val coreDependencies = Seq(
"org.apache.spark" %% "spark-mllib" % sparkVersion % "compile",
"org.apache.spark" %% "spark-avro" % sparkVersion % "provided",
"org.apache.spark" %% "spark-tags" % sparkVersion % "test",
"com.globalmentor" % "hadoop-bare-naked-local-fs" % "0.1.0" % "test",
"org.scalatest" %% "scalatest" % "3.2.14" % "test")
val extraDependencies = Seq(
"org.scalactic" %% "scalactic" % "3.2.14",
@@ -220,7 +221,7 @@ publishDotnetBase := {
packDotnetAssemblyCmd(join(dotnetBaseDir, "target").getAbsolutePath, dotnetBaseDir)
val packagePath = join(dotnetBaseDir,
// Update the version whenever there's a new release
"target", s"SynapseML.DotnetBase.${dotnetedVersion("0.11.1")}.nupkg").getAbsolutePath
"target", s"SynapseML.DotnetBase.${dotnetedVersion("0.11.2")}.nupkg").getAbsolutePath
publishDotnetAssemblyCmd(packagePath, genSleetConfig.value)
}

@@ -381,11 +382,11 @@ publishBadges := {
uploadBadge("master version", version.value, "blue", "master_version3.svg")
}

val uploadNotebooks = TaskKey[Unit]("uploadNotebooks", "upload notebooks to blob storage")
val uploadNotebooks = TaskKey[Unit]("uploadNotebooks", "upload docs to blob storage")
uploadNotebooks := {
val localNotebooksFolder = join(baseDirectory.value.toString, "notebooks").toString
val localNotebooksFolder = join(baseDirectory.value.toString, "docs").toString
val blobNotebooksFolder = version.value
uploadToBlob(localNotebooksFolder, blobNotebooksFolder, "notebooks")
uploadToBlob(localNotebooksFolder, blobNotebooksFolder, "docs")
}

val settings = Seq(
@@ -493,8 +494,8 @@ setupTask := {

val convertNotebooks = TaskKey[Unit]("convertNotebooks", "convert notebooks to markdown for website display")
convertNotebooks := {
runCmdStr("python -m docs.python.documentprojection " +
"--customchannels docs/python/synapseml_channels -c website . docs/manifest.yaml -p")
runCmd(Seq("pip", "install", "-e", "."), wd=join(baseDirectory.value, "tools/docgen"))
runCmd(Seq("python", "__main__.py"), wd=join(baseDirectory.value, "tools/docgen/docgen"))
}

val testWebsiteDocs = TaskKey[Unit]("testWebsiteDocs",
Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@
)
from pyspark.sql.functions import udf
from typing import cast, Optional, TypeVar, Type
from synapse.ml.core.platform import running_on_synapse_internal

OPENAI_API_VERSION = "2022-12-01"
RL = TypeVar("RL", bound="MLReadable")
@@ -125,6 +126,14 @@ def __init__(
self.subscriptionKey = Param(self, "subscriptionKey", "openai api key")
self.url = Param(self, "url", "openai api base")
self.apiVersion = Param(self, "apiVersion", "openai api version")
self.running_on_synapse_internal = running_on_synapse_internal()
if running_on_synapse_internal():
from synapse.ml.fabric.service_discovery import get_fabric_env_config

self._setDefault(
url=get_fabric_env_config().fabric_env_config.ml_workload_endpoint
+ "cognitive/openai"
)
kwargs = self._input_kwargs
if subscriptionKey:
kwargs["subscriptionKey"] = subscriptionKey
@@ -196,10 +205,15 @@ def _transform(self, dataset):
def udfFunction(x):
import openai

openai.api_type = "azure"
openai.api_key = self.getSubscriptionKey()
openai.api_base = self.getUrl()
openai.api_version = self.getApiVersion()
if self.running_on_synapse_internal and not self.isSet(self.url):
from synapse.ml.fabric.prerun.openai_prerun import OpenAIPrerun

OpenAIPrerun(api_base=self.getUrl()).init_personalized_session(None)
else:
openai.api_type = "azure"
openai.api_key = self.getSubscriptionKey()
openai.api_base = self.getUrl()
openai.api_version = self.getApiVersion()
return self.getChain().run(x)

outCol = self.getOutputCol()
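The hunks above make the LangChain transformer Fabric-aware: when `running_on_synapse_internal()` is true and no `url` is set, the OpenAI base defaults to the workload's `cognitive/openai` endpoint and the call runs through a personalized session instead of an API key. A rough usage sketch in PySpark follows; the import path and the `chain`/`inputCol`/`outputCol` parameter names are assumptions (only `subscriptionKey`, `url`, and `apiVersion` are visible in this diff), and the resource names are placeholders.

# Minimal sketch, not code from this PR. Assumed: the import path and the
# chain/inputCol/outputCol parameter names. Outside Fabric you supply the key
# and endpoint explicitly; on Fabric the defaults added above kick in.
from langchain.chains import LLMChain
from langchain.llms import AzureOpenAI
from langchain.prompts import PromptTemplate
from synapse.ml.cognitive.langchain import LangchainTransformer  # assumed import path

llm = AzureOpenAI(deployment_name="text-davinci-003")  # hypothetical deployment
chain = LLMChain(llm=llm, prompt=PromptTemplate.from_template("Summarize: {text}"))

summarizer = LangchainTransformer(
    chain=chain,
    inputCol="text",
    outputCol="summary",
    subscriptionKey="<AZURE_OPENAI_KEY>",        # can be omitted on Fabric
    url="https://<resource>.openai.azure.com/",  # can be omitted on Fabric
)
summaries = summarizer.transform(df)  # df: Spark DataFrame with a "text" column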
Original file line number Diff line number Diff line change
@@ -18,6 +18,8 @@ import org.apache.spark.internal.{Logging => SLogging}
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.ml.{ComplexParamsReadable, NamespaceInjections, PipelineModel}
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.functions.{col, expr, struct, to_json}
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types._
@@ -142,7 +144,7 @@ class AddDocuments(override val uid: String) extends CognitiveServicesBase(uid)
override def responseDataType: DataType = ASResponses.schema
}

object AzureSearchWriter extends IndexParser with SLogging {
object AzureSearchWriter extends IndexParser with IndexJsonGetter with SLogging {

val Logger: Logger = LogManager.getRootLogger

@@ -166,9 +168,11 @@ object AzureSearchWriter extends IndexParser with SLogging {
private def convertFields(fields: Seq[StructField],
keyCol: String,
searchActionCol: String,
vectorCols: Option[Seq[VectorColParams]],
prefix: Option[String]): Seq[IndexField] = {
fields.filterNot(_.name == searchActionCol).map { sf =>
val fullName = prefix.map(_ + sf.name).getOrElse(sf.name)
val isVector = vectorCols.exists(_.exists(_.name == fullName))
val (innerType, _) = sparkTypeToEdmType(sf.dataType)
IndexField(
sf.name,
@@ -177,31 +181,44 @@
if (keyCol == fullName) Some(true) else None,
None, None, None, None,
structFieldToSearchFields(sf.dataType,
keyCol, searchActionCol, prefix = Some(prefix.getOrElse("") + sf.name + "."))
keyCol, searchActionCol, None, prefix = Some(prefix.getOrElse("") + sf.name + ".")),
if (isVector) vectorCols.get.find(_.name == fullName).map(_.dimension) else None,
if (isVector) Some(AzureSearchAPIConstants.VectorConfigName) else None
)
}
}

private def structFieldToSearchFields(schema: DataType,
keyCol: String,
searchActionCol: String,
vectorCols: Option[Seq[VectorColParams]],
prefix: Option[String] = None
): Option[Seq[IndexField]] = {
schema match {
case StructType(fields) => Some(convertFields(fields, keyCol, searchActionCol, prefix))
case ArrayType(StructType(fields), _) => Some(convertFields(fields, keyCol, searchActionCol, prefix))
case StructType(fields) => Some(convertFields(fields, keyCol, searchActionCol, vectorCols, prefix))
// TODO: Support vector search in nested fields
case ArrayType(StructType(fields), _) => Some(convertFields(fields, keyCol, searchActionCol, None, prefix))
case _ => None
}
}

private def parseVectorColsJson(str: String): Seq[VectorColParams] = {
str.parseJson.convertTo[Seq[VectorColParams]]
}

private def dfToIndexJson(schema: StructType,
indexName: String,
keyCol: String,
searchActionCol: String): String = {
searchActionCol: String,
vectorCols: Option[Seq[VectorColParams]]): String = {

val vectorConfig = Some(VectorSearch(Seq(AlgorithmConfigs(AzureSearchAPIConstants.VectorConfigName,
AzureSearchAPIConstants.VectorSearchAlgorithm))))
val is = IndexInfo(
Some(indexName),
structFieldToSearchFields(schema, keyCol, searchActionCol).get,
None, None, None, None, None, None, None, None
structFieldToSearchFields(schema, keyCol, searchActionCol, vectorCols).get,
None, None, None, None, None, None, None, None,
if (vectorCols.isEmpty) None else vectorConfig
)
is.toJson.compactPrint
}
@@ -210,7 +227,7 @@ object AzureSearchWriter extends IndexParser with SLogging {
options: Map[String, String] = Map()): DataFrame = {
val applicableOptions = Set(
"subscriptionKey", "actionCol", "serviceName", "indexName", "indexJson",
"apiVersion", "batchSize", "fatalErrors", "filterNulls", "keyCol"
"apiVersion", "batchSize", "fatalErrors", "filterNulls", "keyCol", "vectorCols"
)

options.keys.foreach(k =>
@@ -224,11 +241,12 @@ object AzureSearchWriter extends IndexParser with SLogging {
val batchSize = options.getOrElse("batchSize", "100").toInt
val fatalErrors = options.getOrElse("fatalErrors", "true").toBoolean
val filterNulls = options.getOrElse("filterNulls", "false").toBoolean
val vectorColsInfo = options.get("vectorCols")

val keyCol = options.get("keyCol")
val indexName = options.getOrElse("indexName", parseIndexJson(indexJsonOpt.get).name.get)
if (indexJsonOpt.isDefined) {
List("keyCol", "indexName").foreach(opt =>
List("keyCol", "indexName", "vectorCols").foreach(opt =>
assert(!options.contains(opt), s"Cannot set both indexJson options and $opt")
)
}
@@ -242,22 +260,41 @@ object AzureSearchWriter extends IndexParser with SLogging {
}
}

val indexJson = indexJsonOpt.getOrElse {
dfToIndexJson(df.schema, indexName, keyCol.get, actionCol)
val (indexJson, preppedDF) = if (getExisting(subscriptionKey, serviceName, apiVersion).contains(indexName)) {
if (indexJsonOpt.isDefined) {
println(f"indexJsonOpt is specified, however an index for $indexName already exists," +
f"we will use the index definition obtained from the existing index instead")
}
val existingIndexJson = getIndexJsonFromExistingIndex(subscriptionKey, serviceName, indexName)
val vectorColNameTypeTuple = getVectorColConf(existingIndexJson)
(existingIndexJson, makeColsCompatible(vectorColNameTypeTuple, df))
} else if (indexJsonOpt.isDefined) {
val vectorColNameTypeTuple = getVectorColConf(indexJsonOpt.get)
(indexJsonOpt.get, makeColsCompatible(vectorColNameTypeTuple, df))
} else {
val vectorCols = vectorColsInfo.map(parseVectorColsJson)
val vectorColNameTypeTuple = vectorCols.map(_.map(vc => (vc.name, "Collection(Edm.Single)"))).getOrElse(Seq.empty)
val newDF = makeColsCompatible(vectorColNameTypeTuple, df)
val inferredIndexJson = dfToIndexJson(newDF.schema, indexName, keyCol.getOrElse(""), actionCol, vectorCols)
(inferredIndexJson, newDF)
}

// TODO: Support vector search in nested fields
// Throws an exception if any nested field is a vector in the schema
parseIndexJson(indexJson).fields.foreach(_.fields.foreach(assertNoNestedVectors))

SearchIndex.createIfNoneExists(subscriptionKey, serviceName, indexJson, apiVersion)

logInfo("checking schema parity")
checkSchemaParity(df.schema, indexJson, actionCol)
checkSchemaParity(preppedDF.schema, indexJson, actionCol)

val df1 = if (filterNulls) {
val collectionColumns = parseIndexJson(indexJson).fields
.filter(_.`type`.startsWith("Collection"))
.map(_.name)
collectionColumns.foldLeft(df) { (ndf, c) => filterOutNulls(ndf, c) }
collectionColumns.foldLeft(preppedDF) { (ndf, c) => filterOutNulls(ndf, c) }
} else {
df
preppedDF
}

new AddDocuments()
@@ -273,6 +310,48 @@ object AzureSearchWriter extends IndexParser with SLogging {
UDFUtils.oldUdf(checkForErrors(fatalErrors) _, ErrorUtils.ErrorSchema)(col("error"), col("input")))
}

private def assertNoNestedVectors(fields: Seq[IndexField]): Unit = {
def checkVectorField(field: IndexField): Unit = {
if (field.dimensions.nonEmpty && field.vectorSearchConfiguration.nonEmpty) {
throw new IllegalArgumentException(s"Nested field ${field.name} is a vector field, vector fields in nested" +
s" fields are not supported.")
}
field.fields.foreach(_.foreach(checkVectorField))
}
fields.foreach(checkVectorField)
}

private def getVectorColConf(indexJson: String): Seq[(String, String)] = {
parseIndexJson(indexJson).fields
.filter(f => f.vectorSearchConfiguration.nonEmpty && f.dimensions.nonEmpty)
.map(f => (f.name, f.`type`))
}
private def makeColsCompatible(vectorColNameTypeTuple: Seq[(String, String)],
df: DataFrame): DataFrame = {
vectorColNameTypeTuple.foldLeft(df) { case (accDF, (colName, colType)) =>
if (!accDF.columns.contains(colName)) {
println(s"Column $colName is specified in either indexJson or vectorCols but not found in dataframe " +
s"columns ${accDF.columns.toList}")
accDF
}
else {
val colDataType = accDF.schema(colName).dataType
assert(colDataType match {
case ArrayType(elementType, _) => elementType == FloatType || elementType == DoubleType
case VectorType => true
case _ => false
}, s"Vector column $colName needs to be one of (ArrayType(FloatType), ArrayType(DoubleType), VectorType)")
if (colDataType.isInstanceOf[ArrayType]) {
accDF.withColumn(colName, accDF(colName).cast(edmTypeToSparkType(colType, None)))
} else {
// first cast vectorUDT to array<double>, then cast it to correct array type
val modifiedDF = accDF.withColumn(colName, vector_to_array(accDF(colName)))
modifiedDF.withColumn(colName, modifiedDF(colName).cast(edmTypeToSparkType(colType, None)))
}
}
}
}
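`makeColsCompatible` accepts a vector column either as a Spark ML vector (`VectorType`) or as an array of floats/doubles and casts it to the `Collection(Edm.Single)` shape the index expects, using `vector_to_array` for the ML-vector case. The same conversion can be done up front from PySpark; the snippet below is an illustrative sketch only, not code from this PR.

# Illustrative: pre-convert an ML VectorUDT column to array<float>, mirroring
# what makeColsCompatible does internally with vector_to_array plus a cast.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import vector_to_array

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("doc1", Vectors.dense([0.1, 0.2, 0.3]))],
    ["id", "embedding"],
)
# dtype="float32" yields array<float>, which lines up with Edm.Single fields.
df = df.withColumn("embedding", vector_to_array("embedding", dtype="float32"))
df.printSchema()  # id: string, embedding: array<float>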

private def isEdmCollection(t: String): Boolean = {
t.startsWith("Collection(") && t.endsWith(")")
}
Expand All @@ -290,6 +369,7 @@ object AzureSearchWriter extends IndexParser with SLogging {
case "Edm.Int64" => LongType
case "Edm.Int32" => IntegerType
case "Edm.Double" => DoubleType
case "Edm.Single" => FloatType
case "Edm.DateTimeOffset" => StringType //See if there's a way to use spark datetimes
case "Edm.GeographyPoint" => StringType
case "Edm.ComplexType" => StructType(fields.get.map(f =>
@@ -310,10 +390,12 @@ object AzureSearchWriter extends IndexParser with SLogging {
case IntegerType => ("Edm.Int32", None)
case LongType => ("Edm.Int64", None)
case DoubleType => ("Edm.Double", None)
case FloatType => ("Edm.Single", None)
case DateType => ("Edm.DateTimeOffset", None)
case StructType(fields) => ("Edm.ComplexType", Some(fields.map { f =>
val (innerType, innerFields) = sparkTypeToEdmType(f.dataType)
IndexField(f.name, innerType, None, None, None, None, None, None, None, None, None, None, innerFields)
IndexField(f.name, innerType, None, None, None, None, None, None, None, None, None, None, innerFields,
None, None) // TODO: Support vector search in nested fields
}))
}
}
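Taken together, these AzureSearch changes let the writer create and populate a vector index: the new `vectorCols` option lists embedding columns with their dimensions, the inferred index JSON marks them as `Collection(Edm.Single)` fields tied to the default vector-search configuration, and an already-existing index is reused as-is. Below is a hedged end-to-end sketch in PySpark; the option names come from the `applicableOptions` set above, but `writeToAzureSearch` as the DataFrame-level entry point and the wildcard import are assumptions about the Python API, and the service name and key are placeholders.

# Sketch only: write a DataFrame with an "embedding" vector column to Azure
# Cognitive Search as a vector index. vectorCols follows the VectorColParams
# shape from the diff above: a JSON list of {"name", "dimension"} objects.
import json
from pyspark.sql.functions import lit
from synapse.ml.cognitive import *  # assumed to attach writeToAzureSearch to DataFrame

(
    df.withColumn("searchAction", lit("upload"))
      .writeToAzureSearch(
          subscriptionKey="<AZURE_SEARCH_ADMIN_KEY>",
          serviceName="<search-service>",
          indexName="my-vector-index",
          keyCol="id",
          actionCol="searchAction",
          vectorCols=json.dumps([{"name": "embedding", "dimension": 3}]),
      )
)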