[SPARK-45532][DOCS] Restore codetabs for the Protobuf Data Source Guide
### What changes were proposed in this pull request?

This PR restores the [Protobuf Data Source Guide](https://spark.apache.org/docs/latest/sql-data-sources-protobuf.html#python)'s code tabs, which apache#40614 removed while fixing Markdown syntax.

In this PR, we introduce a hidden div to hold the Markdown code-block markers, so that both the Liquid templates and plain Markdown viewers render the page correctly.
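
For reference, each tab in the resulting file is laid out roughly like this (simplified from the Python tab in the diff below); the hidden `d-none` div carries the Markdown code-fence markers, while the rendered site only sees the Liquid `highlight` block:

````
<div data-lang="python" markdown="1">

<div class="d-none">
This div is only used to make markdown editor/viewer happy and does not display on web

```python
</div>

{% highlight python %}
# ... example code ...
{% endhighlight %}

<div class="d-none">
```
</div>

</div>
````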

### Why are the changes needed?

Improve documentation readability and consistency.

### Does this PR introduce _any_ user-facing change?

Yes, documentation changes only.

### How was this patch tested?

#### Doc build

![image](https://github.com/apache/spark/assets/8326978/8aefeee0-92b2-4048-a3f6-108e4c3f309d)

#### Markdown editor and viewer

![image](https://github.com/apache/spark/assets/8326978/283b0820-390a-4540-8713-647c40f956ac)

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#43361 from yaooqinn/SPARK-45532.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
yaooqinn committed Oct 13, 2023
1 parent 96bac6c commit 0257b77
243 changes: 150 additions & 93 deletions docs/sql-data-sources-protobuf.md
@@ -18,7 +18,10 @@ license: |
limitations under the License.
---

* This will become a table of contents (this text will be scraped).
{:toc}

Since the Spark 3.4.0 release, [Spark SQL](sql-programming-guide.html) provides built-in support for reading and writing protobuf data.

## Deploying
The `spark-protobuf` module is external and not included in `spark-submit` or `spark-shell` by default.
@@ -46,45 +49,53 @@ Kafka key-value record will be augmented with some metadata, such as the ingesti

Spark SQL schema is generated based on the protobuf descriptor file or protobuf class passed to `from_protobuf` and `to_protobuf`. The specified protobuf class or protobuf descriptor file must match the data, otherwise, the behavior is undefined: it may fail or return arbitrary results.

<div class="codetabs">

<div data-lang="python" markdown="1">

<div class="d-none">
This div is only used to make markdown editor/viewer happy and does not display on web

```python
</div>

{% highlight python %}

from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

# `from_protobuf` and `to_protobuf` provide two schema choices: via a Protobuf descriptor file,
# or via a shaded Java class.
# Given the input .proto protobuf schema:
# syntax = "proto3"
# message AppEvent {
#   string name = 1;
#   int64 id = 2;
#   string context = 3;
# }

df = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1")\
  .load()

# 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The Protobuf protoc command can be used to generate a protobuf descriptor file for a given .proto file.
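# For example, a descriptor file could be generated with a command along these lines
# (the file names here are placeholders, not part of this guide):
#   protoc --include_imports --descriptor_set_out=app_event.desc app_event.proto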
output = df\
  .select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event"))\
  .where('event.name == "alice"')\
  .select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))

# Alternatively, you can decode and encode the SQL columns into protobuf format using the Protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary results. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.

output = df\
  .select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))\
  .where('event.name == "alice"')

output.printSchema()
# root
@@ -94,61 +105,75 @@ output.printSchema()
# | |-- context: string (nullable = true)

output = output\
  .select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))

query = output\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")\
  .start()

{% endhighlight %}

<div class="d-none">
```
</div>

</div>

<div data-lang="scala" markdown="1">

<div class="d-none">
This div is only used to make markdown editor/viewer happy and does not display on web

### Scala
```scala
</div>

{% highlight scala %}
import org.apache.spark.sql.protobuf.functions._

// `from_protobuf` and `to_protobuf` provide two schema choices: via a Protobuf descriptor file,
// or via a shaded Java class.
// Given the input .proto protobuf schema:
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for a given .proto file.
val output = df
  .select(from_protobuf($"value", "AppEvent", descriptorFilePath) as $"event")
  .where("event.name == \"alice\"")
  .select(to_protobuf($"event", "AppEvent", descriptorFilePath) as $"event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

// Alternatively, you can decode and encode the SQL columns into protobuf format using the Protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary results. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
var output = df
  .select(from_protobuf($"value", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as $"event")
  .where("event.name == \"alice\"")

output.printSchema()
// root
@@ -160,54 +185,67 @@ output.printSchema()
output = output.select(to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as $"event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

{% endhighlight %}

<div class="d-none">
```
</div>
</div>

<div data-lang="java" markdown="1">

<div class="d-none">
This div is only used to make markdown editor/viewer happy and does not display on web

### Java
```java
</div>

{% highlight java %}
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

// `from_protobuf` and `to_protobuf` provide two schema choices: via a Protobuf descriptor file,
// or via a shaded Java class.
// Given the input .proto protobuf schema:
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for a given .proto file.
Dataset<Row> output = df
  .select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
  .where("event.name == \"alice\"")
  .select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));

// Alternatively, you can decode and encode the SQL columns into protobuf format using the Protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary results. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
Dataset<Row> output = df
  .select(
    from_protobuf(col("value"),
      "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
  .where("event.name == \"alice\"");

output.printSchema();
// root
@@ -221,19 +259,28 @@ output = output.select(
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();

{% endhighlight %}

<div class="d-none">
```
</div>
</div>

</div>

## Supported types for Protobuf -> Spark SQL conversion

Currently Spark supports reading [protobuf scalar types](https://developers.google.com/protocol-buffers/docs/proto3#scalar), [enum types](https://developers.google.com/protocol-buffers/docs/proto3#enum), [nested types](https://developers.google.com/protocol-buffers/docs/proto3#nested), and [map types](https://developers.google.com/protocol-buffers/docs/proto3#maps) within Protobuf messages.
In addition to these types, `spark-protobuf` also introduces support for Protobuf `OneOf` fields, which allow you to handle messages that can have multiple possible sets of fields, but where only one set can be present at a time. This is useful when the data you are working with is not always in the same format and you need to handle messages with different sets of fields without encountering errors.

<table class="table">
<tr><th><b>Protobuf type</b></th><th><b>Spark SQL type</b></th></tr>
<table class="table table-striped">
<thead><tr><th><b>Protobuf type</b></th><th><b>Spark SQL type</b></th></tr></thead>
<tr>
<td>boolean</td>
<td>BooleanType</td>
@@ -282,16 +329,12 @@ In addition to the these types, `spark-protobuf` also introduces support for Pro
<td>OneOf</td>
<td>Struct</td>
</tr>
<tr>
<td>Any</td>
<td>StructType</td>
</tr>
</table>

It also supports reading the following Protobuf types: [Timestamp](https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#timestamp) and [Duration](https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#duration).

<table class="table">
<tr><th><b>Protobuf logical type</b></th><th><b>Protobuf schema</b></th><th><b>Spark SQL type</b></th></tr>
<table class="table table-striped">
<thead><tr><th><b>Protobuf logical type</b></th><th><b>Protobuf schema</b></th><th><b>Spark SQL type</b></th></tr></thead>
<tr>
<td>duration</td>
<td>MessageType{seconds: Long, nanos: Int}</td>
@@ -305,10 +348,11 @@ It also supports reading the following Protobuf types [Timestamp](https://develo
</table>

## Supported types for Spark SQL -> Protobuf conversion

Spark supports writing all Spark SQL types into Protobuf. For most types, the mapping from Spark types to Protobuf types is straightforward (e.g., IntegerType gets converted to int).

<table class="table">
<tr><th><b>Spark SQL type</b></th><th><b>Protobuf type</b></th></tr>
<table class="table table-striped">
<thead><tr><th><b>Spark SQL type</b></th><th><b>Protobuf type</b></th></tr></thead>
<tr>
<td>BooleanType</td>
<td>boolean</td>
@@ -356,15 +400,23 @@ Spark supports the writing of all Spark SQL types into Protobuf. For most types,
</table>

## Handling circular references in protobuf fields

One common issue that can arise when working with Protobuf data is the presence of circular references. In Protobuf, a circular reference occurs when a field refers back to itself or to another field that refers back to the original field. This can cause issues when parsing the data, as it can result in infinite loops or other unexpected behavior.
To address this issue, the latest version of spark-protobuf introduces a new feature: the ability to check for circular references through field types. This allows users to use the `recursive.fields.max.depth` option to specify the maximum number of levels of recursion to allow when parsing the schema. By default, `spark-protobuf` will not permit recursive fields, setting `recursive.fields.max.depth` to -1. However, you can set this option to a value from 0 to 10 if needed.

Setting `recursive.fields.max.depth` to 0 drops all recursive fields, setting it to 1 allows one level of recursion, and setting it to 2 allows two levels. A `recursive.fields.max.depth` value greater than 10 is not allowed, as it can lead to performance issues and even stack overflows.

The SQL schema for the protobuf message below will vary based on the value of `recursive.fields.max.depth`.

<div data-lang="proto" markdown="1">
<div class="d-none">
This div is only used to make markdown editor/viewer happy and does not display on web

```protobuf
</div>
{% highlight protobuf %}
syntax = "proto3"
message Person {
  string name = 1;
  Person bff = 2;
@@ -376,4 +428,9 @@ message Person {
0: struct<name: string, bff: null>
1: struct<name: string, bff: struct<name: string, bff: null>>
2: struct<name: string, bff: struct<name: string, bff: struct<name: string, bff: null>>> ...
{% endhighlight %}
<div class="d-none">
```
</div>
</div>
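
As a rough sketch (not part of the original guide), the option is passed through the `options` map of `from_protobuf`; the descriptor file path `descriptor_file_path` and the input DataFrame `df` are placeholders:

```python
from pyspark.sql.protobuf.functions import from_protobuf

# Allow up to two levels of recursion when deriving the schema for `Person`.
# Option values are passed as strings; `descriptor_file_path` points to a descriptor
# file generated from the `Person` message above.
parsed = df.select(
    from_protobuf(
        "value", "Person", descriptor_file_path,
        options={"recursive.fields.max.depth": "2"}
    ).alias("person"))
```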
