Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add VidClub source and Prefect tasks #1044

Merged
merged 21 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions src/viadot/orchestration/prefect/flows/vid_club_to_adls.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Download data from Vid Club API and load it into Azure Data Lake Storage."""

from typing import Any, Dict, List, Literal
from typing import Any, Literal

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import df_to_adls, vid_club_to_df


Expand All @@ -13,32 +15,31 @@
retry_delay_seconds=60,
task_runner=ConcurrentTaskRunner,
)
def vid_club_to_adls(
*args: List[Any],
endpoint: Literal["jobs", "product", "company", "survey"] = None,
def vid_club_to_adls( # noqa: PLR0913
*args: list[Any],
endpoint: Literal["jobs", "product", "company", "survey"] | None = None,
from_date: str = "2022-03-22",
to_date: str = None,
to_date: str | None = None,
items_per_page: int = 100,
region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None,
region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None,
days_interval: int = 30,
cols_to_drop: List[str] = None,
cols_to_drop: list[str] | None = None,
config_key: str | None = None,
azure_key_vault_secret: str | None = None,
adls_config_key: str | None = None,
adls_azure_key_vault_secret: str | None = None,
adls_path: str | None = None,
adls_path_overwrite: bool = False,
validate_df_dict: dict = None,
validate_df_dict: dict | None = None,
timeout: int = 3600,
**kwargs: Dict[str, Any]
**kwargs: dict[str, Any],
) -> None:
"""
Flow for downloading data from the Vid Club via API to a CSV or Parquet file.
"""Flow for downloading data from the Vid Club via API to a CSV or Parquet file.

Then upload it to Azure Data Lake.

Args:
endpoint (Literal["jobs", "product", "company", "survey"], optional): The
endpoint (Literal["jobs", "product", "company", "survey"], optional): The
endpoint source to be accessed. Defaults to None.
from_date (str, optional): Start date for the query, by default is the oldest
date in the data 2022-03-22.
Expand Down Expand Up @@ -85,7 +86,7 @@ def vid_club_to_adls(
azure_key_vault_secret=azure_key_vault_secret,
validate_df_dict=validate_df_dict,
timeout=timeout,
kawrgs=kwargs
kawrgs=kwargs,
)

return df_to_adls(
Expand Down
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from .sftp import sftp_list, sftp_to_df
from .sharepoint import sharepoint_download_file, sharepoint_to_df
from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df
from .supermetrics import supermetrics_to_df
from .vid_club import vid_club_to_df


Expand Down Expand Up @@ -63,4 +64,5 @@
"sql_server_query",
"sql_server_to_df",
"vid_club_to_df",
"supermetrics_to_df",
]
35 changes: 18 additions & 17 deletions src/viadot/orchestration/prefect/tasks/vid_club.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,36 @@
"""Task for downloading data from Vid Club Cloud API."""

from typing import Any, Dict, List, Literal
from typing import Any, Literal

import pandas as pd
from prefect import task
from viadot.sources import VidClub
from viadot.orchestration.prefect.utils import get_credentials

from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources import VidClub


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=2 * 60 * 60)
def vid_club_to_df(
*args: List[Any],
endpoint: Literal["jobs", "product", "company", "survey"] = None,
def vid_club_to_df( # noqa: PLR0913
*args: list[Any],
endpoint: Literal["jobs", "product", "company", "survey"] | None = None,
from_date: str = "2022-03-22",
to_date: str = None,
to_date: str | None = None,
items_per_page: int = 100,
region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None,
region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None,
days_interval: int = 30,
cols_to_drop: List[str] = None,
cols_to_drop: list[str] | None = None,
azure_key_vault_secret: str | None = None,
adls_config_key: str | None = None,
validate_df_dict: dict = None,
validate_df_dict: dict | None = None,
timeout: int = 3600,
**kwargs: Dict[str, Any],
**kwargs: dict[str, Any],
) -> pd.DataFrame:
"""
Task to downloading data from Vid Club APIs to Pandas DataFrame.
"""Task for downloading data from Vid Club APIs to Pandas DataFrame.

Args:
endpoint (Literal["jobs", "product", "company", "survey"], optional): The endpoint
source to be accessed. Defaults to None.
endpoint (Literal["jobs", "product", "company", "survey"], optional):
The endpoint source to be accessed. Defaults to None.
from_date (str, optional): Start date for the query, by default is the oldest
date in the data 2022-03-22.
to_date (str, optional): End date for the query. By default None,
Expand Down Expand Up @@ -72,7 +73,7 @@ def vid_club_to_df(
vid_club_credentials=credentials,
validate_df_dict=validate_df_dict,
timeout=timeout,
kwargs=kwargs
)
kwargs=kwargs,
)

return vc_obj.to_df()
5 changes: 2 additions & 3 deletions src/viadot/sources/vid_club.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ def __init__(

Args:
endpoint (Literal["jobs", "product", "company", "survey"], optional): The
endpoint source to be accessed. Defaults to None.
endpoint source to be accessed. Defaults to None.
from_date (str, optional): Start date for the query, by default is the
oldest date in the data 2022-03-22.
oldest date in the data 2022-03-22.
to_date (str, optional): End date for the query. By default None,
which will be executed as datetime.today().strftime("%Y-%m-%d") in code.
items_per_page (int, optional): Number of entries per page. Defaults to 100.
Expand Down Expand Up @@ -324,7 +324,6 @@ def get_response(

def to_df(
self,
if_empty: str = "warn",
trymzet marked this conversation as resolved.
Show resolved Hide resolved
) -> pd.DataFrame:
"""Looping get_response and iterating by date ranges defined in intervals.

Expand Down
13 changes: 8 additions & 5 deletions tests/integration/orchestration/prefect/flows/test_vid_club.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
from src.viadot.orchestration.prefect.flows import vid_club_to_adls
from src.viadot.sources import AzureDataLake


TEST_FILE_PATH = "test/path/to/adls.parquet"
TEST_SOURCE = "jobs"
TEST_SOURCE = "jobs"
TEST_FROM_DATE = "2023-01-01"
TEST_TO_DATE = "2023-12-31"
ADLS_CREDENTIALS_SECRET = "test_adls_secret"
VIDCLUB_CREDENTIALS_SECRET = "test_vidclub_secret"
ADLS_CREDENTIALS_SECRET = "test_adls_secret" # pragma: allowlist secret # noqa: S105
VIDCLUB_CREDENTIALS_SECRET = (
"test_vidclub_secret" # pragma: allowlist secret # noqa: S105
)


def test_vid_club_to_adls():
lake = AzureDataLake(config_key="adls_test")

assert not lake.exists(TEST_FILE_PATH)

vid_club_to_adls(
Expand All @@ -20,7 +23,7 @@ def test_vid_club_to_adls():
to_date=TEST_TO_DATE,
adls_path=TEST_FILE_PATH,
adls_azure_key_vault_secret=ADLS_CREDENTIALS_SECRET,
vidclub_credentials_secret=VIDCLUB_CREDENTIALS_SECRET
vidclub_credentials_secret=VIDCLUB_CREDENTIALS_SECRET,
)

assert lake.exists(TEST_FILE_PATH)
Expand Down
25 changes: 13 additions & 12 deletions tests/integration/orchestration/prefect/tasks/test_vid_club.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
import pytest
import pandas as pd
from src.viadot.orchestration.prefect.tasks import vid_club_to_df
import pytest

from src.viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from src.viadot.orchestration.prefect.tasks import vid_club_to_df


EXPECTED_DF = pd.DataFrame({
"id": [1, 2],
"name": ["Company A", "Company B"],
"region": ["pl", "ro"]
})
EXPECTED_DF = pd.DataFrame(
{"id": [1, 2], "name": ["Company A", "Company B"], "region": ["pl", "ro"]}
)


class MockVidClub:
    """Test double that the tests patch in place of the ``VidClub`` source.

    It accepts any constructor arguments and ignores them, so the task under
    test can build it exactly as it would build the real source.
    """

    def __init__(self, *args, **kwargs):
        """Accept and discard all positional and keyword arguments."""

    def to_df(self):
        """Return the module-level ``EXPECTED_DF`` fixture unchanged."""
        return EXPECTED_DF


def test_vid_club_to_df(mocker):
mocker.patch('viadot.orchestration.prefect.tasks.VidClub', new=MockVidClub)

mocker.patch("viadot.orchestration.prefect.tasks.VidClub", new=MockVidClub)

df = vid_club_to_df(
endpoint="company",
from_date="2023-01-01",
to_date="2023-12-31",
items_per_page=100,
region="pl",
vidclub_credentials_secret="VIDCLUB"
vidclub_credentials_secret="VIDCLUB", # pragma: allowlist secret # noqa: S106
)

assert isinstance(df, pd.DataFrame)
Expand All @@ -38,7 +37,9 @@ def test_vid_club_to_df(mocker):


def test_vid_club_to_df_missing_credentials(mocker):
mocker.patch('viadot.orchestration.prefect.tasks.get_credentials', return_value=None)
mocker.patch(
"viadot.orchestration.prefect.tasks.get_credentials", return_value=None
)

with pytest.raises(MissingSourceCredentialsError):
vid_club_to_df(
Expand All @@ -47,5 +48,5 @@ def test_vid_club_to_df_missing_credentials(mocker):
to_date="2023-12-31",
items_per_page=100,
region="pl",
vidclub_credentials_secret="VIDCLUB"
vidclub_credentials_secret="VIDCLUB", # pragma: allowlist secret # noqa: S106
)