diff --git a/tests/integration/flows/test_hubspot_to_adls.py b/tests/integration/flows/test_hubspot_to_adls.py
index a67d00aca..d960fc079 100644
--- a/tests/integration/flows/test_hubspot_to_adls.py
+++ b/tests/integration/flows/test_hubspot_to_adls.py
@@ -6,6 +6,7 @@
 import pytest
 
 from viadot.flows import HubspotToADLS
+from viadot.exceptions import ValidationError
 
 DATA = {
     "id": {"0": "820306930"},
@@ -60,3 +61,81 @@ def test_hubspot_to_adls_flow_run(mocked_class):
     assert result.is_successful()
     os.remove("test_hubspot_to_adls_flow_run.parquet")
     os.remove("test_hubspot_to_adls_flow_run.json")
+
+
+@mock.patch(
+    "viadot.tasks.HubspotToDF.run",
+    return_value=pd.DataFrame(data=DATA),
+)
+@pytest.mark.run
+def test_hubspot_to_adls_flow_run_validate_fail(mocked_class):
+    flow = HubspotToADLS(
+        "test_hubspot_to_adls_flow_run",
+        hubspot_credentials_key="HUBSPOT",
+        endpoint="line_items",
+        filters=[
+            {
+                "filters": [
+                    {
+                        "propertyName": "createdate",
+                        "operator": "BETWEEN",
+                        "highValue": "2021-01-01",
+                        "value": "2021-01-01",
+                    },
+                    {"propertyName": "quantity", "operator": "EQ", "value": "2"},
+                ]
+            },
+            {
+                "filters": [
+                    {"propertyName": "amount", "operator": "EQ", "value": "3744.000"}
+                ]
+            },
+        ],
+        overwrite_adls=True,
+        adls_dir_path=ADLS_DIR_PATH,
+        adls_file_name=ADLS_FILE_NAME,
+        validate_df_dict={"column_size": {"id": 0}},
+    )
+    try:
+        flow.run()
+    except ValidationError:
+        pass
+
+
+@mock.patch(
+    "viadot.tasks.HubspotToDF.run",
+    return_value=pd.DataFrame(data=DATA),
+)
+@pytest.mark.run
+def test_hubspot_to_adls_flow_run_validate_success(mocked_class):
+    flow = HubspotToADLS(
+        "test_hubspot_to_adls_flow_run",
+        hubspot_credentials_key="HUBSPOT",
+        endpoint="line_items",
+        filters=[
+            {
+                "filters": [
+                    {
+                        "propertyName": "createdate",
+                        "operator": "BETWEEN",
+                        "highValue": "2021-01-01",
+                        "value": "2021-01-01",
+                    },
+                    {"propertyName": "quantity", "operator": "EQ", "value": "2"},
+                ]
+            },
+            {
+                "filters": [
+                    {"propertyName": "amount", "operator": "EQ", "value": "3744.000"}
+                ]
+            },
+        ],
+        overwrite_adls=True,
+        adls_dir_path=ADLS_DIR_PATH,
+        adls_file_name=ADLS_FILE_NAME,
+        validate_df_dict={"column_unique_values": ["id"]},
+    )
+    result = flow.run()
+    assert result.is_successful()
+    os.remove("test_hubspot_to_adls_flow_run.parquet")
+    os.remove("test_hubspot_to_adls_flow_run.json")
diff --git a/viadot/flows/hubspot_to_adls.py b/viadot/flows/hubspot_to_adls.py
index 8f79d5d1b..7a2fe4387 100644
--- a/viadot/flows/hubspot_to_adls.py
+++ b/viadot/flows/hubspot_to_adls.py
@@ -13,6 +13,7 @@
     df_to_parquet,
     dtypes_to_json_task,
     update_dtypes_dict,
+    validate_df,
 )
 
 from viadot.tasks import AzureDataLakeUpload, HubspotToDF
@@ -33,6 +34,7 @@ def __init__(
         adls_dir_path: str = None,
         if_exists: Literal["replace", "append", "delete"] = "replace",
         overwrite_adls: bool = True,
+        validate_df_dict: Dict[str, Any] = None,
         vault_name: str = None,
         sp_credentials_secret: str = None,
         *args: List[any],
@@ -75,6 +77,8 @@ def __init__(
             output_file_extension (str, optional): Output file extension. Defaults to ".parquet".
             if_exists (Literal, optional): What to do if the table exists. Defaults to "replace".
             overwrite_adls (str, optional): Whether to overwrite the destination file in ADLS. Defaults to True.
+            validate_df_dict (Dict[str, Any], optional): A dictionary with an optional list of tests to verify the output
+                dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None.
             vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None.
             sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
         """
@@ -86,6 +90,7 @@ def __init__(
         self.hubspot_credentials = hubspot_credentials
         self.hubspot_credentials_key = hubspot_credentials_key
         self.output_file_extension = output_file_extension
+        self.validate_df_dict = validate_df_dict
         self.local_file_path = (
             local_file_path or self.slugify(name) + self.output_file_extension
         )
@@ -137,6 +142,12 @@ def gen_flow(self) -> Flow:
         df_viadot_downloaded = add_ingestion_metadata_task.bind(df=df, flow=self)
         dtypes_dict = df_get_data_types_task.bind(df_viadot_downloaded, flow=self)
 
+        if self.validate_df_dict:
+            validation_task = validate_df.bind(
+                df, tests=self.validate_df_dict, flow=self
+            )
+            validation_task.set_upstream(df, flow=self)
+
         df_to_be_loaded = df_map_mixed_dtypes_for_parquet(
             df_viadot_downloaded, dtypes_dict, flow=self
         )
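
Note for reviewers: the two new tests exercise the "column_size" and "column_unique_values" checks of the `validate_df` task. As a rough sketch of the semantics being relied on (this illustrates the expected behavior, not the actual implementation in `task_utils`; `validate_df_sketch` is a hypothetical name, and `ValidationError` here stands in for `viadot.exceptions.ValidationError`):

from typing import Any, Dict

import pandas as pd


class ValidationError(Exception):
    """Stand-in for viadot.exceptions.ValidationError in this sketch."""


def validate_df_sketch(df: pd.DataFrame, tests: Dict[str, Any]) -> None:
    # "column_size": {"id": 0} expects every value of "id", cast to str, to
    # have length 0. The DATA fixture's id "820306930" has length 9, so the
    # *_validate_fail test configures a check that cannot pass.
    for column, size in tests.get("column_size", {}).items():
        if not df[column].astype(str).str.len().eq(size).all():
            raise ValidationError(f"Column '{column}' has values of length != {size}.")

    # "column_unique_values": ["id"] expects all values of "id" to be unique,
    # which holds trivially for the single-row DATA fixture, so the
    # *_validate_success test expects a successful flow run.
    for column in tests.get("column_unique_values", []):
        if not df[column].is_unique:
            raise ValidationError(f"Column '{column}' contains duplicate values.")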