From bd492c65573f1544c9721968951a0672c6614d1f Mon Sep 17 00:00:00 2001 From: judynah Date: Wed, 25 Oct 2023 15:24:06 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=A8=20changed=20len?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/integration/flows/test_customer_gauge_to_adls.py | 6 +++--- viadot/flows/customer_gauge_to_adls.py | 10 ++++++++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/tests/integration/flows/test_customer_gauge_to_adls.py b/tests/integration/flows/test_customer_gauge_to_adls.py index 155bbca51..f077ee875 100644 --- a/tests/integration/flows/test_customer_gauge_to_adls.py +++ b/tests/integration/flows/test_customer_gauge_to_adls.py @@ -39,7 +39,7 @@ def test_customer_gauge_to_adls_run_flow(mocked_class): ) result = flow.run() assert result.is_successful() - assert len(flow.tasks) == 5 + assert len(flow.tasks) == 10 os.remove("test_customer_gauge_to_adls_flow_run.parquet") os.remove("test_customer_gauge_to_adls_flow_run.json") @@ -58,11 +58,11 @@ def test_customer_gauge_to_adls_run_flow_validation_success(mocked_class): adls_dir_path=ADLS_DIR_PATH, adls_file_name=ADLS_FILE_NAME, overwrite_adls=True, - validate_df_dict={"column_size":{"user_name":2}}, + validate_df_dict={"column_size": {"user_address_state": 2}}, ) result = flow.run() assert result.is_successful() - assert len(flow.tasks) == 6 + assert len(flow.tasks) == 11 os.remove("test_customer_gauge_to_adls_flow_run.parquet") os.remove("test_customer_gauge_to_adls_flow_run.json") diff --git a/viadot/flows/customer_gauge_to_adls.py b/viadot/flows/customer_gauge_to_adls.py index 22cd00806..8fb8556c9 100644 --- a/viadot/flows/customer_gauge_to_adls.py +++ b/viadot/flows/customer_gauge_to_adls.py @@ -177,9 +177,9 @@ def gen_flow(self) -> Flow: if self.validate_df_dict: validation_task = validate_df.bind( - df=customerg_df, tests=self.validate_df_dict, flow=self + customerg_df, tests=self.validate_df_dict, flow=self ) - validation_task.set_upstream(df=customerg_df, flow=self) + validation_task.set_upstream(customerg_df, flow=self) if self.anonymize == True: anonymized_df = anonymize_df.bind( @@ -191,6 +191,12 @@ def gen_flow(self) -> Flow: days=self.days, flow=self, ) + + if self.validate_df_dict: + validation_task = validate_df.bind( + anonymized_df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(anonymized_df, flow=self) df_with_metadata = add_ingestion_metadata_task.bind( anonymized_df, flow=self