Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added df_validation to HubspotToADLS #778

Merged
merged 1 commit into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions tests/integration/flows/test_hubspot_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest

from viadot.flows import HubspotToADLS
from viadot.exceptions import ValidationError

DATA = {
"id": {"0": "820306930"},
Expand Down Expand Up @@ -60,3 +61,81 @@ def test_hubspot_to_adls_flow_run(mocked_class):
assert result.is_successful()
os.remove("test_hubspot_to_adls_flow_run.parquet")
os.remove("test_hubspot_to_adls_flow_run.json")


@mock.patch(
"viadot.tasks.HubspotToDF.run",
return_value=pd.DataFrame(data=DATA),
)
@pytest.mark.run
def test_hubspot_to_adls_flow_run_validate_fail(mocked_class):
flow = HubspotToADLS(
"test_hubspot_to_adls_flow_run",
hubspot_credentials_key="HUBSPOT",
endpoint="line_items",
filters=[
{
"filters": [
{
"propertyName": "createdate",
"operator": "BETWEEN",
"highValue": "2021-01-01",
"value": "2021-01-01",
},
{"propertyName": "quantity", "operator": "EQ", "value": "2"},
]
},
{
"filters": [
{"propertyName": "amount", "operator": "EQ", "value": "3744.000"}
]
},
],
overwrite_adls=True,
adls_dir_path=ADLS_DIR_PATH,
adls_file_name=ADLS_FILE_NAME,
validate_df_dict={"column_size": {"id": 0}},
)
try:
flow.run()
except ValidationError:
pass


@mock.patch(
"viadot.tasks.HubspotToDF.run",
return_value=pd.DataFrame(data=DATA),
)
@pytest.mark.run
def test_hubspot_to_adls_flow_run_validate_success(mocked_class):
flow = HubspotToADLS(
"test_hubspot_to_adls_flow_run",
hubspot_credentials_key="HUBSPOT",
endpoint="line_items",
filters=[
{
"filters": [
{
"propertyName": "createdate",
"operator": "BETWEEN",
"highValue": "2021-01-01",
"value": "2021-01-01",
},
{"propertyName": "quantity", "operator": "EQ", "value": "2"},
]
},
{
"filters": [
{"propertyName": "amount", "operator": "EQ", "value": "3744.000"}
]
},
],
overwrite_adls=True,
adls_dir_path=ADLS_DIR_PATH,
adls_file_name=ADLS_FILE_NAME,
validate_df_dict={"column_unique_values": ["id"]},
)
result = flow.run()
assert result.is_successful()
os.remove("test_hubspot_to_adls_flow_run.parquet")
os.remove("test_hubspot_to_adls_flow_run.json")
11 changes: 11 additions & 0 deletions viadot/flows/hubspot_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
df_to_parquet,
dtypes_to_json_task,
update_dtypes_dict,
validate_df,
)
from viadot.tasks import AzureDataLakeUpload, HubspotToDF

Expand All @@ -33,6 +34,7 @@ def __init__(
adls_dir_path: str = None,
if_exists: Literal["replace", "append", "delete"] = "replace",
overwrite_adls: bool = True,
validate_df_dict: Dict[str, Any] = None,
vault_name: str = None,
sp_credentials_secret: str = None,
*args: List[any],
Expand Down Expand Up @@ -75,6 +77,8 @@ def __init__(
output_file_extension (str, optional): Output file extension. Defaults to ".parquet".
if_exists (Literal, optional): What to do if the table exists. Defaults to "replace".
overwrite_adls (str, optional): Whether to overwrite the destination file in ADLS. Defaults to True.
validate_df_dict (Dict[str], optional): A dictionary with optional list of tests to verify the output
dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None.
vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None.
sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
"""
Expand All @@ -86,6 +90,7 @@ def __init__(
self.hubspot_credentials = hubspot_credentials
self.hubspot_credentials_key = hubspot_credentials_key
self.output_file_extension = output_file_extension
self.validate_df_dict = validate_df_dict

self.local_file_path = (
local_file_path or self.slugify(name) + self.output_file_extension
Expand Down Expand Up @@ -137,6 +142,12 @@ def gen_flow(self) -> Flow:
df_viadot_downloaded = add_ingestion_metadata_task.bind(df=df, flow=self)
dtypes_dict = df_get_data_types_task.bind(df_viadot_downloaded, flow=self)

if self.validate_df_dict:
validation_task = validate_df.bind(
df, tests=self.validate_df_dict, flow=self
)
validation_task.set_upstream(df, flow=self)

df_to_be_loaded = df_map_mixed_dtypes_for_parquet(
df_viadot_downloaded, dtypes_dict, flow=self
)
Expand Down
Loading