From 25e66b2a89d59a7bed515ea6f794923242ab4f0f Mon Sep 17 00:00:00 2001
From: jkrobicki
Date: Wed, 25 Oct 2023 14:41:23 +0200
Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Added=20validate=5Fdf=5Fdict=20para?=
 =?UTF-8?q?m=20tests?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Review notes:
- test_bigquery_to_adls_validate_df_fail now asserts that the returned flow
  state is failed (with a ValidationError among the task results) instead of
  swallowing the exception and asserting nothing.
- test_bigquery_to_adls_validate_df_success runs the flow once instead of twice.
- NOTE(review): the subject mentions `validate_df_dict` while the tests pass
  `validation_df_dict`; confirm the keyword against BigQueryToADLS.__init__.
- NOTE(review): blank lines were reconstructed from the hunk line counts, and
  the post-image hash on the `index` line predates these edits.

 .../flows/test_bigquery_to_adls.py            | 84 ++++++++++++++++++-
 1 file changed, 83 insertions(+), 1 deletion(-)

diff --git a/tests/integration/flows/test_bigquery_to_adls.py b/tests/integration/flows/test_bigquery_to_adls.py
index 1a867b58c..f986d0791 100644
--- a/tests/integration/flows/test_bigquery_to_adls.py
+++ b/tests/integration/flows/test_bigquery_to_adls.py
@@ -1,11 +1,16 @@
 import os
 
 import pendulum
-from prefect.tasks.secrets import PrefectSecret
+import pytest
+from unittest import mock
+import pandas as pd
 
+from prefect.tasks.secrets import PrefectSecret
 from viadot.flows import BigQueryToADLS
 from viadot.tasks import AzureDataLakeRemove
 
+from viadot.exceptions import ValidationError
+
 ADLS_DIR_PATH = "raw/tests/"
 ADLS_FILE_NAME = str(pendulum.now("utc")) + ".parquet"
 BIGQ_CREDENTIAL_KEY = "BIGQUERY-TESTS"
@@ -72,6 +77,83 @@ def test_bigquery_to_adls_false():
     assert result.is_failed()
     os.remove("test_bigquery_to_adls_overwrite_false.parquet")
     os.remove("test_bigquery_to_adls_overwrite_false.json")
+
+
+# Data for the DataFrame returned by the mocked BigQueryToDF.run below.
+DATA = {
+    "type": ["banner", "banner"],
+    "country": ["PL", "DE"],
+}
+
+
+@mock.patch(
+    "viadot.tasks.BigQueryToDF.run",
+    return_value=pd.DataFrame(data=DATA),
+)
+@pytest.mark.run
+def test_bigquery_to_adls_validate_df_fail(mocked_data):
+    """Expect a failed run: column "test" is required but missing from DATA."""
+    flow_bigquery = BigQueryToADLS(
+        name="Test BigQuery to ADLS validate df fail",
+        dataset_name="official_empty",
+        table_name="space",
+        credentials_key=BIGQ_CREDENTIAL_KEY,
+        adls_file_name=ADLS_FILE_NAME,
+        overwrite_adls=True,
+        adls_dir_path=ADLS_DIR_PATH,
+        adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET,
+        validation_df_dict={"column_list_to_match": ["type", "country", "test"]},
+    )
+
+    result = flow_bigquery.run()
+
+    # Assert on the returned state (as test_bigquery_to_adls_false does) rather
+    # than swallowing ValidationError, which let the test pass unconditionally.
+    assert result.is_failed()
+    assert any(
+        isinstance(task_state.result, ValidationError)
+        for task_state in result.result.values()
+    )
+
+    # The local files may never be written when the flow fails early, so only
+    # remove the ones that exist.
+    for leftover in (
+        "test_bigquery_to_adls_validate_df_fail.parquet",
+        "test_bigquery_to_adls_validate_df_fail.json",
+    ):
+        if os.path.exists(leftover):
+            os.remove(leftover)
+
+
+@mock.patch(
+    "viadot.tasks.BigQueryToDF.run",
+    return_value=pd.DataFrame(data=DATA),
+)
+@pytest.mark.run
+def test_bigquery_to_adls_validate_df_success(mocked_data):
+    """Expect the flow and every task to succeed when the columns match DATA."""
+    flow_bigquery = BigQueryToADLS(
+        name="Test BigQuery to ADLS validate df success",
+        dataset_name="official_empty",
+        table_name="space",
+        credentials_key=BIGQ_CREDENTIAL_KEY,
+        adls_file_name=ADLS_FILE_NAME,
+        overwrite_adls=True,
+        adls_dir_path=ADLS_DIR_PATH,
+        adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET,
+        validation_df_dict={"column_list_to_match": ["type", "country"]},
+    )
+
+    # Run the flow once; the original ran it twice and discarded the first result.
+    result = flow_bigquery.run()
+    assert result.is_successful()
+
+    task_results = result.result.values()
+    assert all(task_result.is_successful() for task_result in task_results)
+
+    os.remove("test_bigquery_to_adls_validate_df_success.parquet")
+    os.remove("test_bigquery_to_adls_validate_df_success.json")
+
     rm = AzureDataLakeRemove(
         path=ADLS_DIR_PATH + ADLS_FILE_NAME, vault_name="azuwevelcrkeyv001s"
     )