Skip to content

Commit

Permalink
✨ Added validate_df_dict param tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jkrobicki committed Oct 25, 2023
1 parent ff50849 commit 25e66b2
Showing 1 changed file with 69 additions and 1 deletion.
70 changes: 69 additions & 1 deletion tests/integration/flows/test_bigquery_to_adls.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import os

import pendulum
from prefect.tasks.secrets import PrefectSecret
import pytest
from unittest import mock
import pandas as pd

from prefect.tasks.secrets import PrefectSecret
from viadot.flows import BigQueryToADLS
from viadot.tasks import AzureDataLakeRemove

from viadot.exceptions import ValidationError

ADLS_DIR_PATH = "raw/tests/"
ADLS_FILE_NAME = str(pendulum.now("utc")) + ".parquet"
BIGQ_CREDENTIAL_KEY = "BIGQUERY-TESTS"
Expand Down Expand Up @@ -72,6 +77,69 @@ def test_bigquery_to_adls_false():
assert result.is_failed()
os.remove("test_bigquery_to_adls_overwrite_false.parquet")
os.remove("test_bigquery_to_adls_overwrite_false.json")


# Minimal two-row fixture served by the mocked ``BigQueryToDF.run`` task:
# two banner records, one per country.
DATA = dict(
    type=["banner", "banner"],
    country=["PL", "DE"],
)


@mock.patch(
    "viadot.tasks.BigQueryToDF.run",
    return_value=pd.DataFrame(data=DATA),
)
@pytest.mark.run
def test_bigquery_to_adls_validate_df_fail(mocked_data):
    """Validation must fail: expected column "test" is absent from DATA.

    ``BigQueryToDF.run`` is mocked so no real BigQuery call happens; the
    flow's validation step receives DATA and must reject it because
    ``column_list_to_match`` names a column ("test") the DataFrame lacks.
    """
    flow_bigquery = BigQueryToADLS(
        name="Test BigQuery to ADLS validate df fail",
        dataset_name="official_empty",
        table_name="space",
        credentials_key=BIGQ_CREDENTIAL_KEY,
        adls_file_name=ADLS_FILE_NAME,
        overwrite_adls=True,
        adls_dir_path=ADLS_DIR_PATH,
        adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET,
        validation_df_dict={"column_list_to_match": ["type", "country", "test"]},
    )

    try:
        # Previously the result was assigned but never asserted and the
        # except clause silently passed, so this test could never fail.
        # Now: either the flow raises ValidationError (accepted), or it
        # must return a failed state.
        result = flow_bigquery.run()
        assert result.is_failed()
    except ValidationError:
        pass

    os.remove("test_bigquery_to_adls_validate_df_fail.parquet")
    os.remove("test_bigquery_to_adls_validate_df_fail.json")


@mock.patch(
    "viadot.tasks.BigQueryToDF.run",
    return_value=pd.DataFrame(data=DATA),
)
@pytest.mark.run
def test_bigquery_to_adls_validate_df_success(mocked_data):
    """Validation must pass: DATA holds exactly the expected columns.

    ``BigQueryToDF.run`` is mocked to return DATA, whose columns match
    ``column_list_to_match`` exactly, so the flow and every task in it
    should finish successfully.
    """
    flow_bigquery = BigQueryToADLS(
        name="Test BigQuery to ADLS validate df success",
        dataset_name="official_empty",
        table_name="space",
        credentials_key=BIGQ_CREDENTIAL_KEY,
        adls_file_name=ADLS_FILE_NAME,
        overwrite_adls=True,
        adls_dir_path=ADLS_DIR_PATH,
        adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET,
        validation_df_dict={"column_list_to_match": ["type", "country"]},
    )
    # Run the flow exactly once. The original ran it twice back-to-back,
    # discarding the first result and re-uploading the file for no reason.
    result = flow_bigquery.run()
    assert result.is_successful()

    task_results = result.result.values()
    assert all(task_result.is_successful() for task_result in task_results)

    os.remove("test_bigquery_to_adls_validate_df_success.parquet")
    os.remove("test_bigquery_to_adls_validate_df_success.json")

    # NOTE(review): rm is only instantiated here; confirm rm.run(...) is
    # invoked in the (hidden) lines that follow this hunk, otherwise the
    # uploaded ADLS file is never cleaned up.
    rm = AzureDataLakeRemove(
        path=ADLS_DIR_PATH + ADLS_FILE_NAME, vault_name="azuwevelcrkeyv001s"
    )
Expand Down

0 comments on commit 25e66b2

Please sign in to comment.