[DOP-13845] - implement Avro.parse_column, Avro.serialize_column #265

Merged
merged 4 commits on Apr 27, 2024

Changes from all commits
1 change: 1 addition & 0 deletions docs/changelog/next_release/265.feature.rst
@@ -0,0 +1 @@
Add ``Avro.parse_column`` and ``Avro.serialize_column`` methods to enhance the handling of Avro binary data within Spark. These methods allow for direct parsing of binary Avro data into structured Spark DataFrame columns and serialization of Spark DataFrame columns back into Avro binary format.
90 changes: 90 additions & 0 deletions docs/connection/db_connection/kafka/format_handling.rst
@@ -152,3 +152,93 @@ For serializing data into JSON format and sending it back to Kafka, use the :obj
    # |{"name":"Alice","age":20}|
    # |{"name":"Bob","age":25} |
    # +-------------------------+

Avro Format Handling
--------------------

``DBReader``
~~~~~~~~~~~~

To process Avro-formatted data from Kafka, use the :obj:`Avro.parse_column <onetl.file.format.avro.Avro.parse_column>` method. It converts a column containing Avro binary data directly into a structured Spark DataFrame column using a predefined Avro schema.

.. code-block:: python

    from pyspark.sql import SparkSession

    from onetl.db import DBReader
    from onetl.file.format import Avro
    from onetl.connection import Kafka

    spark = SparkSession.builder.appName("KafkaAvroExample").getOrCreate()

    kafka = Kafka(...)
    avro = Avro(
        schema_dict={
            "type": "record",
            "name": "Person",
            "fields": [{"name": "name", "type": "string"}, {"name": "age", "type": "int"}],
        }
    )

    reader = DBReader(
        connection=kafka,
        topic="topic_name",
    )
    df = reader.run()

    df.show()
    # +----+------------------------------------+----------+---------+------+-----------------------+-------------+
    # |key |value                               |topic     |partition|offset|timestamp              |timestampType|
    # +----+------------------------------------+----------+---------+------+-----------------------+-------------+
    # |[31]|[02 02 02 08 76 6... (binary data)] |topicAvro |0        |0     |2024-04-24 13:02:25.911|0            |
    # |[32]|[02 04 02 08 76 6... (binary data)] |topicAvro |0        |1     |2024-04-24 13:02:25.922|0            |
    # +----+------------------------------------+----------+---------+------+-----------------------+-------------+

    parsed_df = df.select(avro.parse_column("value"))
    parsed_df.show()
    # +-----+----+
    # | name| age|
    # +-----+----+
    # |Alice|  20|
    # |  Bob|  25|
    # +-----+----+
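
:obj:`Avro.parse_column <onetl.file.format.avro.Avro.parse_column>` accepts either a column name or a ``Column`` object, so it can also be applied to an expression. A small sketch, reusing the same ``avro`` and ``df`` as above:

.. code-block:: python

    from pyspark.sql.functions import col

    parsed_df = df.select(avro.parse_column(col("value")))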

``DBWriter``
~~~~~~~~~~~~

To serialize structured data into Avro format and write it back to a Kafka topic, use the :obj:`Avro.serialize_column <onetl.file.format.avro.Avro.serialize_column>` method.

.. code-block:: python

    from onetl.db import DBWriter
    from onetl.file.format import Avro
    from onetl.connection import Kafka

    kafka = Kafka(...)
    avro = Avro(
        schema_dict={
            "type": "record",
            "name": "Person",
            "fields": [{"name": "name", "type": "string"}, {"name": "age", "type": "int"}],
        }
    )
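
    # NOTE: `df` is assumed to already exist and to contain a struct column "value".
    # For illustration only, such a DataFrame could be built like this
    # (assuming an active SparkSession `spark`, e.g. the one from the DBReader example above):
    from pyspark.sql.functions import col, struct

    df = spark.createDataFrame([("Alice", 20), ("Bob", 25)], ["name", "age"]).select(
        struct(col("name"), col("age").cast("int").alias("age")).alias("value"),
    )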

    df.select("value").show()
    # +-----------+
    # |value      |
    # +-----------+
    # |{Alice, 20}|
    # |{Bob, 25}  |
    # +-----------+

    # serializing data into Avro format
    serialized_df = df.select(avro.serialize_column("value"))

    serialized_df.show()
    # +------------------------------------+
    # |value                               |
    # +------------------------------------+
    # |[02 02 02 08 76 6... (binary data)] |
    # |[02 04 02 08 76 6... (binary data)] |
    # +------------------------------------+
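
To send the serialized rows to Kafka, pass the resulting DataFrame to ``DBWriter``. A minimal sketch, mirroring the ``DBReader`` configuration above (the topic name and parameter are assumptions taken from that example, and the topic is assumed to exist):

.. code-block:: python

    writer = DBWriter(
        connection=kafka,
        topic="topic_name",
    )
    writer.run(serialized_df)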
2 changes: 1 addition & 1 deletion docs/file_df/file_formats/avro.rst
@@ -6,4 +6,4 @@ Avro
.. currentmodule:: onetl.file.format.avro

.. autoclass:: Avro
    :members: get_packages
    :members: get_packages, parse_column, serialize_column
179 changes: 178 additions & 1 deletion onetl/file/format/avro.py
@@ -20,7 +20,8 @@
from onetl.hooks import slot, support_hooks

if TYPE_CHECKING:
    from pyspark.sql import DataFrameReader, DataFrameWriter, SparkSession
    from pyspark.sql import Column, DataFrameReader, DataFrameWriter, SparkSession


PROHIBITED_OPTIONS = frozenset(
    (
@@ -189,8 +190,184 @@ def apply_to_writer(self, writer: DataFrameWriter) -> DataFrameWriter:
        options["avroSchema"] = json.dumps(self.schema_dict)
        return writer.format(self.name).options(**options)

    def parse_column(self, column: str | Column) -> Column:
        """
        Parses an Avro binary column into a structured Spark SQL column using Spark's
        `from_avro <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.avro.functions.from_avro.html>`_ function, based on the schema provided within the class.

        .. note::

            Can be used only with Spark 3.x+

        .. warning::

            If ``schema_url`` is provided, the ``requests`` library is used to fetch the schema from the URL. It should be installed manually, like this:

            .. code:: bash

                pip install requests

        Parameters
        ----------
        column : str | Column
            The name of the column or the Column object containing Avro binary data to parse.

        Returns
        -------
        Column
            A new Column object with data parsed from Avro binary to the specified structured format.

        Raises
        ------
        ValueError
            If the Spark version is less than 3.x, or if neither ``schema_dict`` nor ``schema_url`` is defined.
        ImportError
            If ``schema_url`` is used and the ``requests`` library is not installed.

        Examples
        --------
        .. code:: python

            from pyspark.sql import SparkSession

            from onetl.file.format import Avro

            spark = SparkSession.builder.appName("AvroParsingExample").getOrCreate()
            schema_dict = {
                "type": "record",
                "name": "Person",
                "fields": [{"name": "name", "type": "string"}],
            }
            avro = Avro(schema_dict=schema_dict)
            df = spark.createDataFrame([("bytes_data_here",)], ["avro_data"])

            parsed_df = df.select(avro.parse_column("avro_data"))
            parsed_df.show()

        """
        from pyspark.sql import Column, SparkSession  # noqa: WPS442
        from pyspark.sql.functions import col

        spark = SparkSession._instantiatedSession  # noqa: WPS437
        self.check_if_supported(spark)
        self._check_spark_version_for_serialization(spark)

        from pyspark.sql.avro.functions import from_avro

        if isinstance(column, Column):
            column_name = column._jc.toString()  # noqa: WPS437
        else:
            column_name, column = column, col(column).cast("binary")

        schema = self._get_schema_json()
        if not schema:
            raise ValueError("Avro.parse_column can be used only with defined `schema_dict` or `schema_url`")

        return from_avro(column, schema).alias(column_name)

    def serialize_column(self, column: str | Column) -> Column:
        """
        Serializes a structured Spark SQL column into an Avro binary column using Spark's
        `to_avro <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.avro.functions.to_avro.html#pyspark.sql.avro.functions.to_avro>`_ function.

        .. note::

            Can be used only with Spark 3.x+

        .. warning::

            If ``schema_url`` is provided, the ``requests`` library is used to fetch the schema from the URL. It should be installed manually, like this:

            .. code:: bash

                pip install requests

        Parameters
        ----------
        column : str | Column
            The name of the column or the Column object containing the data to serialize to Avro format.

        Returns
        -------
        Column
            A new Column object with data serialized from Spark SQL structures to Avro binary.

        Raises
        ------
        ValueError
            If the Spark version is less than 3.x.
        ImportError
            If ``schema_url`` is used and the ``requests`` library is not installed.

        Examples
        --------
        .. code:: python

            from pyspark.sql import SparkSession

            from onetl.file.format import Avro

            spark = SparkSession.builder.appName("AvroSerializationExample").getOrCreate()
            schema_dict = {
                "type": "record",
                "name": "Person",
                "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": "string"}],
            }

            avro = Avro(schema_dict=schema_dict)
            df = spark.createDataFrame([(1, "John Doe"), (2, "Jane Doe")], ["id", "name"])

            serialized_df = df.select(avro.serialize_column("name"))
            serialized_df.show()

        """
        from pyspark.sql import Column, SparkSession  # noqa: WPS442
        from pyspark.sql.functions import col

        spark = SparkSession._instantiatedSession  # noqa: WPS437
        self.check_if_supported(spark)
        self._check_spark_version_for_serialization(spark)

        from pyspark.sql.avro.functions import to_avro

        if isinstance(column, Column):
            column_name = column._jc.toString()  # noqa: WPS437
        else:
            column_name, column = column, col(column)

        schema = self._get_schema_json()
        return to_avro(column, schema).alias(column_name)

    @validator("schema_dict", pre=True)
    def _parse_schema_from_json(cls, value):
        if isinstance(value, (str, bytes)):
            return json.loads(value)
        return value

    def _check_spark_version_for_serialization(self, spark: SparkSession):
        spark_version = get_spark_version(spark)
        if spark_version.major < 3:
            class_name = self.__class__.__name__
            error_msg = (
                f"`{class_name}.parse_column` or `{class_name}.serialize_column` are available "
                f"only since Spark 3.x, but got {spark_version}."
            )
            raise ValueError(error_msg)

    def _get_schema_json(self) -> str:
        if self.schema_dict:
            return json.dumps(self.schema_dict)
        elif self.schema_url:
            try:
                import requests

                response = requests.get(self.schema_url)  # noqa: S113
                return response.text
            except ImportError as e:
                raise ImportError(
                    "The 'requests' library is required to use 'schema_url' but is not installed. "
                    "Install it with 'pip install requests' or avoid using 'schema_url'.",
                ) from e
        else:
            return ""
2 changes: 2 additions & 0 deletions requirements/tests/base.txt
@@ -4,3 +4,5 @@ pytest<8
pytest-lazy-fixture
pytest-mock
pytest-rerunfailures
requests
responses