Commit 712f269
Ubuntu committed Sep 26, 2023
1 parent 47dbe79
Showing 114 changed files with 22,144 additions and 0 deletions.
228 changes: 228 additions & 0 deletions
website/versioned_docs/version-0.11.3/Deploy Models/Overview.md

---
title: Spark Serving
hide_title: true
sidebar_label: About
---

<img src="https://mmlspark.blob.core.windows.net/graphics/SparkServing3.svg" width="90" align="left" />

# Spark Serving

### An Engine for Deploying Spark Jobs as Distributed Web Services

- **Distributed**: Takes full advantage of the node, JVM, and thread level
  parallelism that Spark is famous for.
- **Fast**: No single-node bottlenecks and no round trips to Python.
  Requests can be routed directly to and from worker JVMs through
  network switches. Spin up a web service in a matter of seconds.
- **Low Latency**: When using continuous serving,
  you can achieve latencies as low as 1 millisecond.
- **Deployable Anywhere**: Works anywhere that runs Spark, such as
  Databricks, HDInsight, AZTK, DSVMs, local machines, or your own
  cluster. Usable from Spark, PySpark, and SparklyR.
- **Lightweight**: No dependence on costly Kafka or
  Kubernetes clusters.
- **Idiomatic**: Uses the same API as batch and structured streaming.
- **Flexible**: Spin up and manage several services on a single Spark
  cluster, with synchronous and asynchronous service management and
  extensibility. Deploy any Spark job that is expressible as a
  structured streaming query. Use serving sources/sinks with other
  Spark data sources/sinks for more complex deployments.

## Usage

### Jupyter Notebook Examples

- [Deploy a classifier trained on the Adult Census Dataset](../Quickstart%20-%20Deploying%20a%20Classifier)
- More coming soon!

### Spark Serving Hello World

```python
import synapse.ml
import pyspark
from pyspark.sql.functions import udf, col, length
from pyspark.sql.types import *

df = spark.readStream.server() \
    .address("localhost", 8888, "my_api") \
    .load() \
    .parseRequest(StructType().add("foo", StringType()).add("bar", IntegerType()))

replies = df.withColumn("fooLength", length(col("foo"))) \
    .makeReply("fooLength")

server = replies \
    .writeStream \
    .server() \
    .replyTo("my_api") \
    .queryName("my_query") \
    .option("checkpointLocation", "file:///path/to/checkpoints") \
    .start()
```
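
Once the query above is running, you can exercise the service with any HTTP client. Here's a minimal sketch of a client call, assuming the service is reachable at `localhost:8888/my_api` as configured above:

```python
import requests

# Hypothetical client call for the Hello World service above; the JSON keys
# must match the parsed request schema ("foo": string, "bar": integer).
payload = '{"foo": "hello", "bar": 42}'
r = requests.post("http://localhost:8888/my_api", data=payload)
print(r.text)  # the reply carries "fooLength", the length of "foo"
```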

### Deploying a Deep Network with the CNTKModel

```python
import synapse.ml
from synapse.ml.cntk import CNTKModel
import pyspark
from pyspark.sql.functions import udf, col

df = spark.readStream.server() \
    .address("localhost", 8888, "my_api") \
    .load() \
    .parseRequest(<Insert your model's input schema here>)

# See the notebook examples for how to create and save several
# examples of CNTK models
network = CNTKModel.load("file:///path/to/my_cntkmodel.mml")

transformed_df = network.transform(df).makeReply(<Whatever column you wish to send back>)

server = transformed_df \
    .writeStream \
    .server() \
    .replyTo("my_api") \
    .queryName("my_query") \
    .option("checkpointLocation", "file:///path/to/checkpoints") \
    .start()
```

## Architecture

Spark Serving adds special streaming sources and sinks to turn any
structured streaming job into a web service. Spark Serving comes
with two deployment options that vary based on what form of load balancing
is being used.

In brief, you can use:

- `spark.readStream.server()`: for head node load balanced services
- `spark.readStream.distributedServer()`: for custom load balanced services
- `spark.readStream.continuousServer()`: for a custom load balanced, submillisecond-latency continuous server

to create the serving DataFrames, and use the equivalent statements after `df.writeStream`
to reply to the web requests.
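
For orientation, here's a minimal sketch of the three read-side entry points, assuming SynapseML is installed, a `spark` session exists, and the write side mirrors the read side as described above:

```python
import synapse.ml  # registers the serving sources/sinks on Spark's stream readers/writers

# Each reader still needs .address(...), .load(), and .parseRequest(...) as in
# the Hello World example; only the entry points differ.
head_balanced = spark.readStream.server()               # head node load balanced
custom_balanced = spark.readStream.distributedServer()  # bring your own load balancer
continuous = spark.readStream.continuousServer()        # sub-millisecond continuous serving
```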

### Head Node Load Balanced

You can deploy head node load balancing with the `HTTPSource` and
`HTTPSink` classes. This mode spins up a queue on the head node,
distributes work across partitions, then collects response data back to
the head node. All HTTP requests are kept and replied to on the head
node. In both Python and Scala, these classes can be accessed by using
`spark.readStream.server()` after importing SynapseML.
This mode allows for more complex windowing, repartitioning, and
SQL operations. This option is also ideal for rapid setup and testing,
as it doesn't require any further load balancing or network
switches. A diagram of this configuration can be seen in this image:

<p align="center">
<img src="https://mmlspark.blob.core.windows.net/graphics/HeadNodeDistributed2.png" width="600" />
</p>

### Fully Distributed (Custom Load Balancer)

You can configure Spark Serving for a custom load balancer using the
`DistributedHTTPSource` and `DistributedHTTPSink` classes. This mode
spins up servers on each executor JVM.
In both Python and Scala, these classes can be accessed by using
`spark.readStream.distributedServer()` after importing SynapseML.
Each server feeds its
executor's partitions in parallel. This mode is key for high throughput
and low latency, as data doesn't need to be transferred to and from the
head node. This deployment results in several web services that all
route into the same Spark computation. You can deploy an external load
balancer to unify the executors' services under a single IP address.
Support for automatic load balancer management and deployment is
targeted for the next release of SynapseML. A diagram of this
configuration can be seen here:

<p align="center">
<img src="https://mmlspark.blob.core.windows.net/graphics/FullyDistributed2.png" width="600" />
</p>
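
A rough sketch of this mode, assuming the write-side `distributedServer()` call mirrors the read side (per the overview above) and reusing the toy `foo`/`fooLength` schema from the Hello World example:

```python
import synapse.ml  # registers the serving sources/sinks
from pyspark.sql.functions import length, col
from pyspark.sql.types import StructType, StringType

# Each executor JVM hosts its own endpoint on the given port; an external
# load balancer (not shown) can unify them under one address.
df = spark.readStream.distributedServer() \
    .address("localhost", 8888, "my_api") \
    .load() \
    .parseRequest(StructType().add("foo", StringType()))

server = df.withColumn("fooLength", length(col("foo"))) \
    .makeReply("fooLength") \
    .writeStream \
    .distributedServer() \
    .replyTo("my_api") \
    .queryName("my_distributed_query") \
    .option("checkpointLocation", "file:///path/to/checkpoints") \
    .start()
```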

Queries that involve data movement across workers, such as a nontrivial
SQL join, need special consideration. The user must ensure that the
right machine replies to each request. One can route data back to the
originating partition with a broadcast join. In the future, request
routing will be automatically handled by the sink.

### Sub-Millisecond Latency with Continuous Processing

<p align="center">
<img src="https://mmlspark.blob.core.windows.net/graphics/latency_comparison.png" width="600" />
</p>

Continuous processing can be enabled by hooking into the `HTTPSourceV2` class using:

    spark.readStream.continuousServer()
      ...

In continuous serving, much like continuous streaming, you need to add a trigger to your write statement:

    df.writeStream
      .continuousServer()
      .trigger(continuous="1 second")
      ...
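
Putting the pieces together, a minimal end-to-end continuous-serving sketch might look like the following. It reuses the toy `foo`/`fooLength` schema and the option names from the Hello World example; treat it as a sketch rather than a verified recipe.

```python
import synapse.ml  # registers the serving sources/sinks
from pyspark.sql.functions import length, col
from pyspark.sql.types import StructType, StringType

# Continuous read side: same pattern as the Hello World example,
# but backed by HTTPSourceV2.
df = spark.readStream.continuousServer() \
    .address("localhost", 8888, "my_api") \
    .load() \
    .parseRequest(StructType().add("foo", StringType()))

# Continuous write side: note the continuous trigger required for this mode.
server = df.withColumn("fooLength", length(col("foo"))) \
    .makeReply("fooLength") \
    .writeStream \
    .continuousServer() \
    .trigger(continuous="1 second") \
    .replyTo("my_api") \
    .queryName("my_continuous_query") \
    .option("checkpointLocation", "file:///path/to/checkpoints") \
    .start()
```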

The architecture is similar to the custom load balancer setup described earlier.
More specifically, Spark will manage a web service on each partition.
These web services can be unified using an Azure Load Balancer,
a Kubernetes service endpoint, an Azure Application Gateway, or any other way to load balance a distributed service.
It's currently the user's responsibility to optionally unify these services as they see fit.
In the future, we'll include options to dynamically spin up and manage a load balancer.

#### Databricks Setup

Databricks is a managed architecture that restricts
all incoming traffic to the nodes of the cluster.
If you create a web service on your Databricks cluster (head or worker nodes),
your cluster can communicate with the service, but the outside world can't.
However, in the future, Databricks will support Virtual Network Injection, so this problem will not arise.
In the meantime, you must use SSH tunneling to forward the services to one or more other machines
that act as a networking gateway. This gateway can be any machine that accepts SSH traffic and requests.
We have included settings to automatically configure this SSH tunneling for convenience.

##### Linux Gateway Setup - Azure

1. [Create a Linux VM using SSH](https://docs.microsoft.com/en-us/azure/virtual-machines/linux/quick-create-portal)
2. [Open ports 8000-9999 from the Azure portal](https://docs.microsoft.com/en-us/azure/virtual-machines/windows/nsg-quickstart-portal)
3. Open the port on the firewall on the VM
   ```bash
   firewall-cmd --zone=public --add-port=8000-10000/tcp --permanent
   firewall-cmd --reload
   echo "GatewayPorts yes" >> /etc/ssh/sshd_config
   service ssh --full-restart
   ```
4. Add your private key to a private container in [Azure Storage Blob](https://docs.microsoft.com/en-us/azure/storage/common/storage-quickstart-create-account?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&tabs=portal).
5. Generate a SAS link for your key and save it.
6. Include the following parameters on your reader to configure the SSH tunneling:

       serving_inputs = (spark.readStream.continuousServer()
           .option("numPartitions", 1)
           .option("forwarding.enabled", True)  # enable ssh forwarding to a gateway machine
           .option("forwarding.username", "username")
           .option("forwarding.sshHost", "ip or dns")
           .option("forwarding.keySas", "SAS url from the previous step")
           .address("localhost", 8904, "my_api")
           .load())

This setup makes your service require an extra network hop, which affects latency.
It's important to pick a gateway that has good connectivity to your Spark cluster.
For best performance and ease of configuration, we suggest using Spark Serving
on an open cluster environment such as Kubernetes, Mesos, or Azure Batch.

## Parameters

| Parameter Name | Description | Necessary | Default Value | Applicable When |
| --- | --- | --- | --- | --- |
| host | The host to spin up a server on | yes | | |
| port | The starting port when creating the web services. Web services will increment this port several times to find an open port. In the future, the flexibility of this param will be expanded | yes | | |
| name | The path of the API a user would call. The format is `hostname:port/name` | yes | | |
| forwarding.enabled | Whether to forward the services to a gateway machine | no | false | When you need to forward services out of a protected network. Only supported for continuous serving. |
| forwarding.username | The username to connect to on the remote host | no | | |
| forwarding.sshport | The port to ssh connect to | no | 22 | |
| forwarding.sshHost | The host of the gateway machine | no | | |
| forwarding.keySas | A secure access link that can be used to automatically download the required ssh private key | no | | Sometimes more convenient than a directory |
| forwarding.keyDir | A directory on the machines holding the private key | no | "~/.ssh" | Useful if you can't send keys over the wire securely |

114 changes: 114 additions & 0 deletions
...sioned_docs/version-0.11.3/Deploy Models/Quickstart - Deploying a Classifier.md

---
title: Quickstart - Deploying a Classifier
hide_title: true
status: stable
---

## Model Deployment with Spark Serving

In this example, we predict incomes from the *Adult Census* dataset and then use Spark Serving to deploy the model as a real-time web service.

First, we import the needed packages:
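
The original import cell isn't shown in this diff; a minimal sketch of what it might contain, assuming a SynapseML-enabled Spark environment, is:

```python
# Hypothetical import cell (the actual cell is elided in this diff).
# Assumes SynapseML is installed on the cluster.
import synapse.ml
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
```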

Now let's read the data and split it into train and test sets:

```python
data = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/AdultCensusIncome.parquet"
)
data = data.select(["education", "marital-status", "hours-per-week", "income"])
train, test = data.randomSplit([0.75, 0.25], seed=123)
train.limit(10).toPandas()
```

`TrainClassifier` can be used to initialize and fit a model; it wraps SparkML classifiers.
You can use `help(synapse.ml.train.TrainClassifier)` to view the different parameters.

Note that it implicitly converts the data into the format expected by the algorithm: it
tokenizes and hashes strings, one-hot encodes categorical variables, assembles the features into a vector,
and so on. The parameter `numFeatures` controls the number of hashed features.

```python
from synapse.ml.train import TrainClassifier
from pyspark.ml.classification import LogisticRegression

model = TrainClassifier(
    model=LogisticRegression(), labelCol="income", numFeatures=256
).fit(train)
```

After the model is trained, we score it against the test dataset and view metrics.

```python
from synapse.ml.train import ComputeModelStatistics, TrainedClassifierModel

prediction = model.transform(test)
prediction.printSchema()
```

```python
metrics = ComputeModelStatistics().transform(prediction)
metrics.limit(10).toPandas()
```

First, we define the web service input/output.
For more information, you can visit the [documentation for Spark Serving](https://github.com/Microsoft/SynapseML/blob/master/docs/mmlspark-serving.md).

```python
from pyspark.sql.types import *
from synapse.ml.io import *
import uuid

serving_inputs = (
    spark.readStream.server()
    .address("localhost", 8898, "my_api")
    .option("name", "my_api")
    .load()
    .parseRequest("my_api", test.schema)
)

serving_outputs = model.transform(serving_inputs).makeReply("prediction")

server = (
    serving_outputs.writeStream.server()
    .replyTo("my_api")
    .queryName("my_query")
    .option("checkpointLocation", "file:///tmp/checkpoints-{}".format(uuid.uuid1()))
    .start()
)
```

Test the web service:

```python
import requests

data = '{"education":" 10th","marital-status":"Divorced","hours-per-week":40.0}'
r = requests.post(data=data, url="http://localhost:8898/my_api")
print("Response {}".format(r.text))
```

```python
import requests

data = '{"education":" Masters","marital-status":"Married-civ-spouse","hours-per-week":40.0}'
r = requests.post(data=data, url="http://localhost:8898/my_api")
print("Response {}".format(r.text))
```

```python
import time

time.sleep(20)  # wait for the server to finish setting up (just to be safe)
server.stop()
```