From 92d2a4be70bb20cca3448cfbcfcaa7cf2fe68633 Mon Sep 17 00:00:00 2001 From: judynah Date: Wed, 25 Oct 2023 13:49:22 +0200 Subject: [PATCH 1/4] =?UTF-8?q?=E2=9C=A8=20added=20validate=5Fdf=20and=20t?= =?UTF-8?q?ests=20to=20customer=5Fgauge?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../flows/test_customer_gauge_to_adls.py | 49 +++++++++++++++++++ viadot/flows/customer_gauge_to_adls.py | 12 +++++ 2 files changed, 61 insertions(+) diff --git a/tests/integration/flows/test_customer_gauge_to_adls.py b/tests/integration/flows/test_customer_gauge_to_adls.py index e6cdf1545..155bbca51 100644 --- a/tests/integration/flows/test_customer_gauge_to_adls.py +++ b/tests/integration/flows/test_customer_gauge_to_adls.py @@ -5,6 +5,7 @@ import pytest from viadot.flows import CustomerGaugeToADLS +from viadot.exceptions import ValidationError DATA = { "user_name": ["Jane", "Bob"], @@ -38,5 +39,53 @@ def test_customer_gauge_to_adls_run_flow(mocked_class): ) result = flow.run() assert result.is_successful() + assert len(flow.tasks) == 5 os.remove("test_customer_gauge_to_adls_flow_run.parquet") os.remove("test_customer_gauge_to_adls_flow_run.json") + +@mock.patch( + "viadot.tasks.CustomerGaugeToDF.run", + return_value=pd.DataFrame(data=DATA), +) +@pytest.mark.run +def test_customer_gauge_to_adls_run_flow_validation_success(mocked_class): + flow = CustomerGaugeToADLS( + "test_customer_gauge_to_adls_flow_run", + endpoint="responses", + total_load=False, + anonymize=True, + columns_to_anonymize=COLUMNS, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + overwrite_adls=True, + validate_df_dict={"column_size":{"user_name":2}}, + ) + result = flow.run() + assert result.is_successful() + assert len(flow.tasks) == 6 + + os.remove("test_customer_gauge_to_adls_flow_run.parquet") + os.remove("test_customer_gauge_to_adls_flow_run.json") + +@mock.patch( + "viadot.tasks.CustomerGaugeToDF.run", + return_value=pd.DataFrame(data=DATA), +) +@pytest.mark.run +def test_customer_gauge_to_adls_run_flow_validation_failure(mocked_class): + flow = CustomerGaugeToADLS( + "test_customer_gauge_to_adls_flow_run", + endpoint="responses", + total_load=False, + anonymize=True, + columns_to_anonymize=COLUMNS, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + overwrite_adls=True, + validate_df_dict={"column_size":{"user_name":5}}, + ) + try: + flow.run() + except ValidationError: + pass + diff --git a/viadot/flows/customer_gauge_to_adls.py b/viadot/flows/customer_gauge_to_adls.py index 8053aeda3..22cd00806 100644 --- a/viadot/flows/customer_gauge_to_adls.py +++ b/viadot/flows/customer_gauge_to_adls.py @@ -17,6 +17,7 @@ df_to_parquet, dtypes_to_json_task, update_dtypes_dict, + validate_df, ) from viadot.tasks import AzureDataLakeUpload, CustomerGaugeToDF @@ -52,6 +53,7 @@ def __init__( adls_sp_credentials_secret: str = None, overwrite_adls: bool = False, if_exists: str = "replace", + validate_df_dict: dict = None, timeout: int = 3600, *args: List[Any], **kwargs: Dict[str, Any] @@ -91,6 +93,7 @@ def __init__( ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. + validate_df_dict (Dict[str], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. if_exists (str, optional): What to do if the file exists. Defaults to "replace". timeout (int, optional): The time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. """ @@ -105,6 +108,9 @@ def __init__( self.end_date = end_date self.customer_gauge_credentials_secret = customer_gauge_credentials_secret + #validate_df + self.validate_df_dict = validate_df_dict + # anonymize_df self.anonymize = anonymize self.columns_to_anonymize = columns_to_anonymize @@ -169,6 +175,12 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + validation_task = validate_df.bind( + df=customerg_df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(df=customerg_df, flow=self) + if self.anonymize == True: anonymized_df = anonymize_df.bind( customerg_df, From bd492c65573f1544c9721968951a0672c6614d1f Mon Sep 17 00:00:00 2001 From: judynah Date: Wed, 25 Oct 2023 15:24:06 +0200 Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=8E=A8=20changed=20len?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/integration/flows/test_customer_gauge_to_adls.py | 6 +++--- viadot/flows/customer_gauge_to_adls.py | 10 ++++++++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/tests/integration/flows/test_customer_gauge_to_adls.py b/tests/integration/flows/test_customer_gauge_to_adls.py index 155bbca51..f077ee875 100644 --- a/tests/integration/flows/test_customer_gauge_to_adls.py +++ b/tests/integration/flows/test_customer_gauge_to_adls.py @@ -39,7 +39,7 @@ def test_customer_gauge_to_adls_run_flow(mocked_class): ) result = flow.run() assert result.is_successful() - assert len(flow.tasks) == 5 + assert len(flow.tasks) == 10 os.remove("test_customer_gauge_to_adls_flow_run.parquet") os.remove("test_customer_gauge_to_adls_flow_run.json") @@ -58,11 +58,11 @@ def test_customer_gauge_to_adls_run_flow_validation_success(mocked_class): adls_dir_path=ADLS_DIR_PATH, adls_file_name=ADLS_FILE_NAME, overwrite_adls=True, - validate_df_dict={"column_size":{"user_name":2}}, + validate_df_dict={"column_size": {"user_address_state": 2}}, ) result = flow.run() assert result.is_successful() - assert len(flow.tasks) == 6 + assert len(flow.tasks) == 11 os.remove("test_customer_gauge_to_adls_flow_run.parquet") os.remove("test_customer_gauge_to_adls_flow_run.json") diff --git a/viadot/flows/customer_gauge_to_adls.py b/viadot/flows/customer_gauge_to_adls.py index 22cd00806..8fb8556c9 100644 --- a/viadot/flows/customer_gauge_to_adls.py +++ b/viadot/flows/customer_gauge_to_adls.py @@ -177,9 +177,9 @@ def gen_flow(self) -> Flow: if self.validate_df_dict: validation_task = validate_df.bind( - df=customerg_df, tests=self.validate_df_dict, flow=self + customerg_df, tests=self.validate_df_dict, flow=self ) - validation_task.set_upstream(df=customerg_df, flow=self) + validation_task.set_upstream(customerg_df, flow=self) if self.anonymize == True: anonymized_df = anonymize_df.bind( @@ -191,6 +191,12 @@ def gen_flow(self) -> Flow: days=self.days, flow=self, ) + + if self.validate_df_dict: + validation_task = validate_df.bind( + anonymized_df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(anonymized_df, flow=self) df_with_metadata = add_ingestion_metadata_task.bind( anonymized_df, flow=self From d144bd665afb23857f1d05d82e41f03b3ceb110b Mon Sep 17 00:00:00 2001 From: judynah Date: Wed, 25 Oct 2023 16:04:58 +0200 Subject: [PATCH 3/4] :bug: removed if statement --- tests/integration/flows/test_customer_gauge_to_adls.py | 3 +++ viadot/flows/customer_gauge_to_adls.py | 6 ------ 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/integration/flows/test_customer_gauge_to_adls.py b/tests/integration/flows/test_customer_gauge_to_adls.py index f077ee875..cbb79dd43 100644 --- a/tests/integration/flows/test_customer_gauge_to_adls.py +++ b/tests/integration/flows/test_customer_gauge_to_adls.py @@ -88,4 +88,7 @@ def test_customer_gauge_to_adls_run_flow_validation_failure(mocked_class): flow.run() except ValidationError: pass + + os.remove("test_customer_gauge_to_adls_flow_run.parquet") + os.remove("test_customer_gauge_to_adls_flow_run.json") diff --git a/viadot/flows/customer_gauge_to_adls.py b/viadot/flows/customer_gauge_to_adls.py index 8fb8556c9..13eb5d85d 100644 --- a/viadot/flows/customer_gauge_to_adls.py +++ b/viadot/flows/customer_gauge_to_adls.py @@ -191,12 +191,6 @@ def gen_flow(self) -> Flow: days=self.days, flow=self, ) - - if self.validate_df_dict: - validation_task = validate_df.bind( - anonymized_df, tests=self.validate_df_dict, flow=self - ) - validation_task.set_upstream(anonymized_df, flow=self) df_with_metadata = add_ingestion_metadata_task.bind( anonymized_df, flow=self From a8c331012f5b7f0be99236e26b4fbdb5e9d546d6 Mon Sep 17 00:00:00 2001 From: judynah Date: Wed, 25 Oct 2023 16:28:13 +0200 Subject: [PATCH 4/4] :art: removed empty line --- viadot/flows/customer_gauge_to_adls.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/viadot/flows/customer_gauge_to_adls.py b/viadot/flows/customer_gauge_to_adls.py index 13eb5d85d..a681ee04e 100644 --- a/viadot/flows/customer_gauge_to_adls.py +++ b/viadot/flows/customer_gauge_to_adls.py @@ -53,7 +53,7 @@ def __init__( adls_sp_credentials_secret: str = None, overwrite_adls: bool = False, if_exists: str = "replace", - validate_df_dict: dict = None, + validate_df_dict: dict = None, timeout: int = 3600, *args: List[Any], **kwargs: Dict[str, Any] @@ -108,7 +108,7 @@ def __init__( self.end_date = end_date self.customer_gauge_credentials_secret = customer_gauge_credentials_secret - #validate_df + # validate_df self.validate_df_dict = validate_df_dict # anonymize_df