Skip to content

Commit

Permalink
[DOP-13846] - update test for xml parse column
Browse files Browse the repository at this point in the history
  • Loading branch information
maxim-lixakov committed May 3, 2024
1 parent e32b7e7 commit dc73f99
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 54 deletions.
6 changes: 3 additions & 3 deletions docs/connection/db_connection/kafka/format_handling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ To process XML formatted data from Kafka, use the :obj:`XML.parse_column <onetl.
df.show()
# +----+--------------------------------------------------------------------------------------------+----------+---------+------+-----------------------+-------------+
# |key |value |topic |partition|offset|timestamp |timestampType|
# +----+--------------------------------------------------------------------------------------------+----------+---------+------+-----------------------+-------------+
# |[31]|"<?xml version=\"1.0\" encoding=\"UTF-8\"?><person><name>Alice</name><age>20</age></person>"|topicXML |0 |0 |2024-04-24 13:02:25.911|0 |
# |[32]|"<?xml version=\"1.0\" encoding=\"UTF-8\"?><person><name>Bob</name><age>25</age></person>" |topicXML |0 |1 |2024-04-24 13:02:25.922|0 |
# +----+--------------------------------------------------------------------------------------------+----------+---------+------+-----------------------+-------------+
# |[31]|"<person><name>Alice</name><age>20</age></person>" |topicXML |0 |0 |2024-04-24 13:02:25.911|0 |
# |[32]|"<person><name>Bob</name><age>25</age></person>" |topicXML |0 |1 |2024-04-24 13:02:25.922|0 |
# +----+--------------------------------------------------------------------------------------------+----------+---------+------+-----------------------+-------------+
xml_schema = StructType(
Expand Down
2 changes: 1 addition & 1 deletion onetl/file/format/xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def parse_column(self, column: str | Column, schema: StructType) -> Column:
.. note::
This method parses each XML string in the DataFrame individually; therefore, each string must contain exactly one occurrence of the ``rowTag`` without any surrounding root tags. If your XML data includes a root tag that encapsulates the row tags, you must preprocess the XML to remove or ignore this root tag before parsing.
This method parses each DataFrame row individually; therefore, for a specific column, each row must contain exactly one occurrence of the ``rowTag`` without any surrounding root tags. If your XML data includes a root tag that encapsulates the row tags, you must preprocess the XML to remove or ignore this root tag before parsing.
.. code-block:: xml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
Do not test all the possible options and combinations, we are not testing Spark here.
"""

import datetime

import pytest

from onetl._util.spark import get_spark_version
from onetl.file import FileDFReader, FileDFWriter
from onetl.file.format import XML

try:
from pyspark.sql import Row
from pyspark.sql.functions import col

from tests.util.assert_df import assert_equal_df
Expand Down Expand Up @@ -170,58 +173,39 @@ def test_xml_reader_with_attributes(
assert_equal_df(read_df, expected_xml_attributes_df, order_by="id")


@pytest.mark.parametrize(
"xml_input, expected_row",
[
(
"""<item>
<id>1</id>
<str_value>Alice</str_value>
<int_value>123</int_value>
<date_value>2021-01-01</date_value>
<datetime_value>2021-01-01T07:01:01Z</datetime_value>
<float_value>1.23</float_value>
</item>""",
Row(
xml_string=Row(
id=1,
str_value="Alice",
int_value=123,
date_value=datetime.date(2021, 1, 1),
datetime_value=datetime.datetime(2021, 1, 1, 7, 1, 1),
float_value=1.23,
),
),
),
],
ids=["basic-case"],
)
@pytest.mark.parametrize("column_type", [str, col])
def test_xml_parse_column(
spark,
local_fs_file_df_connection_with_path_and_files,
expected_xml_attributes_df,
file_df_dataframe,
file_df_schema,
column_type,
):
from pyspark.sql.types import StringType

def test_xml_parse_column(spark, xml_input: str, expected_row: Row, column_type, file_df_schema):
from onetl.file.format import XML

spark_version = get_spark_version(spark)
if spark_version.major < 3:
pytest.skip("XML files are supported on Spark 3.x only")

def to_xml(row):
# convert datetime to UTC
import pytz

utc_datetime = row.datetime_value.astimezone(pytz.utc)
utc_datetime_str = utc_datetime.isoformat()

return f"""<item>
<id>{row.id}</id>
<str_value>{row.str_value}</str_value>
<int_value>{row.int_value}</int_value>
<date_value>{row.date_value}</date_value>
<datetime_value>{utc_datetime_str}</datetime_value>
<float_value>{row.float_value}</float_value>
</item>"""

xml_rdd = spark.sparkContext.parallelize(expected_xml_attributes_df.rdd.map(to_xml).collect())
df = spark.createDataFrame(xml_rdd, StringType()).toDF("xml_string")

xml = XML(row_tag="item")
df = spark.createDataFrame([(xml_input,)], ["xml_string"])
parsed_df = df.select(xml.parse_column(column_type("xml_string"), schema=file_df_schema))
transformed_df = parsed_df.select(
"xml_string.id",
"xml_string.str_value",
"xml_string.int_value",
"xml_string.date_value",
"xml_string.datetime_value",
"xml_string.float_value",
)
expected_df_selected = expected_xml_attributes_df.select(
"id",
"str_value",
"int_value",
"date_value",
"datetime_value",
"float_value",
)
assert_equal_df(transformed_df, expected_df_selected)
result_row = parsed_df.first()

assert result_row == expected_row

0 comments on commit dc73f99

Please sign in to comment.