Skip to content

Commit

Permalink
🎨 changed len
Browse files Browse the repository at this point in the history
  • Loading branch information
judynah committed Oct 25, 2023
1 parent 92d2a4b commit bd492c6
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
6 changes: 3 additions & 3 deletions tests/integration/flows/test_customer_gauge_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_customer_gauge_to_adls_run_flow(mocked_class):
)
result = flow.run()
assert result.is_successful()
assert len(flow.tasks) == 5
assert len(flow.tasks) == 10
os.remove("test_customer_gauge_to_adls_flow_run.parquet")
os.remove("test_customer_gauge_to_adls_flow_run.json")

Expand All @@ -58,11 +58,11 @@ def test_customer_gauge_to_adls_run_flow_validation_success(mocked_class):
adls_dir_path=ADLS_DIR_PATH,
adls_file_name=ADLS_FILE_NAME,
overwrite_adls=True,
validate_df_dict={"column_size":{"user_name":2}},
validate_df_dict={"column_size": {"user_address_state": 2}},
)
result = flow.run()
assert result.is_successful()
assert len(flow.tasks) == 6
assert len(flow.tasks) == 11

os.remove("test_customer_gauge_to_adls_flow_run.parquet")
os.remove("test_customer_gauge_to_adls_flow_run.json")
Expand Down
10 changes: 8 additions & 2 deletions viadot/flows/customer_gauge_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ def gen_flow(self) -> Flow:

if self.validate_df_dict:
validation_task = validate_df.bind(
df=customerg_df, tests=self.validate_df_dict, flow=self
customerg_df, tests=self.validate_df_dict, flow=self
)
validation_task.set_upstream(df=customerg_df, flow=self)
validation_task.set_upstream(customerg_df, flow=self)

if self.anonymize == True:
anonymized_df = anonymize_df.bind(
Expand All @@ -191,6 +191,12 @@ def gen_flow(self) -> Flow:
days=self.days,
flow=self,
)

if self.validate_df_dict:
validation_task = validate_df.bind(
anonymized_df, tests=self.validate_df_dict, flow=self
)
validation_task.set_upstream(anonymized_df, flow=self)

df_with_metadata = add_ingestion_metadata_task.bind(
anonymized_df, flow=self
Expand Down

0 comments on commit bd492c6

Please sign in to comment.