Skip to content

Commit

Permalink
[DOP-13845] - add avro tests with responses
Browse files: browse the repository at this point in the history
  • Loading branch information
maxim-lixakov committed Apr 27, 2024
1 parent 5f961c1 commit 5c097a7
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 0 deletions.
1 change: 1 addition & 0 deletions requirements/tests/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ pytest<8
pytest-lazy-fixture
pytest-mock
pytest-rerunfailures
responses
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import contextlib

import pytest
import responses

from onetl._util.spark import get_spark_version
from onetl._util.version import Version
Expand Down Expand Up @@ -159,3 +160,54 @@ def test_avro_serialize_and_parse_column(
assert isinstance(serialized_df.schema["combined"].dataType, BinaryType)
parsed_df = serialized_df.select(avro.parse_column(column_type("combined")))
assert combined_df.select("combined").collect() == parsed_df.collect()


@pytest.mark.parametrize("column_type", [str, col])
def test_avro_serialize_and_parse_no_schema(
    spark,
    local_fs_file_df_connection_with_path,
    file_df_dataframe,
    column_type,
):
    """Check that ``Avro.parse_column`` raises when ``Avro`` has no schema.

    All columns of ``file_df_dataframe`` are packed into a single ``combined``
    struct column, which is serialized with a schema-less ``Avro()``. The
    serialized column must be of ``BinaryType``, and parsing it back must raise
    ``ValueError``.

    ``column_type`` is either ``str`` or ``col`` (per the parametrization), so
    the column reference is built both ways. ``spark`` and
    ``local_fs_file_df_connection_with_path`` are requested as fixtures but are
    not referenced in the body.
    """
    import re

    from pyspark.sql.functions import struct
    from pyspark.sql.types import BinaryType

    df = file_df_dataframe
    avro = Avro()

    # Pack every source column into one struct column so it is serialized as a whole.
    combined_df = df.withColumn("combined", struct([col(c) for c in df.columns]))
    serialized_df = combined_df.select(avro.serialize_column(column_type("combined")))
    assert isinstance(serialized_df.schema["combined"].dataType, BinaryType)

    # ``match`` is applied as a regular expression, so escape the literal message;
    # otherwise "." would match any character instead of a literal dot.
    expected_message = re.escape(
        "Avro.parse_column can be used only with defined `schema_dict` or `schema_url`",
    )
    with pytest.raises(ValueError, match=expected_message):
        serialized_df.select(avro.parse_column(column_type("combined")))


@pytest.mark.parametrize("column_type", [str, col])
@responses.activate
def test_avro_serialize_and_parse_with_schema_url(
    spark,
    local_fs_file_df_connection_with_path,
    file_df_dataframe,
    column_type,
    avro_schema,
):
    """Round-trip a struct column through ``Avro`` configured with ``schema_url``.

    A mocked GET response serving ``avro_schema`` as JSON is registered for the
    schema URL. All columns of ``file_df_dataframe`` are packed into a single
    ``combined`` struct column, serialized (the result must be ``BinaryType``),
    parsed back, and compared with the original ``combined`` values.

    ``column_type`` is either ``str`` or ``col`` (per the parametrization).
    """
    from pyspark.sql.functions import struct
    from pyspark.sql.types import BinaryType

    # Register the mocked response before the format object is created.
    schema_url = "http://example.com/avro_schema"
    responses.add(responses.GET, schema_url, json=avro_schema, status=200)

    source_df = file_df_dataframe
    avro_format = Avro(schema_url=schema_url)

    # Pack every source column into one struct column.
    struct_fields = [col(name) for name in source_df.columns]
    with_struct_df = source_df.withColumn("combined", struct(struct_fields))

    serialize_expr = avro_format.serialize_column(column_type("combined"))
    binary_df = with_struct_df.select(serialize_expr)
    serialized_type = binary_df.schema["combined"].dataType
    assert isinstance(serialized_type, BinaryType)

    parse_expr = avro_format.parse_column(column_type("combined"))
    restored_df = binary_df.select(parse_expr)

    expected_rows = with_struct_df.select("combined").collect()
    actual_rows = restored_df.collect()
    assert expected_rows == actual_rows

0 comments on commit 5c097a7

Please sign in to comment.