✨ Add Mediatool source and Prefect tasks (#1038)
* ✨ added mediatool source file.

* ✨ added mediatool task file.

* ✨ added mediatool flow file.

* ✨ added integration test file.

* ✨ added unit test file.

* ✅ added join_dfs function in utilities.

* 📝 updated commented code.

* ✅ updated integration test folder.

* 📝 updated comments.

* 📝 updated unit test file.

* 📝 updated comments.

* 📝 included PR feedback.

* 📝 removed some comments.

* 🎨 Remove ids from test

* 🎨 Improve formatting

* 🎨 Remove parentheses (PT023)

* 🎨 Extra formatting changes

* ♻️  Adding missing values to `conftest.py`

* ♻️  Adding values to

* 🎨 Adding `VIADOT_TEST` prefix to variables

---------

Co-authored-by: Maciej Gardzinski <[email protected]>
Diego-H-S and mgardzinski authored Sep 27, 2024
1 parent 3de17df commit f2cf164
Showing 11 changed files with 832 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
@@ -12,6 +12,7 @@
from .exchange_rates_to_redshift_spectrum import exchange_rates_api_to_redshift_spectrum
from .genesys_to_adls import genesys_to_adls
from .hubspot_to_adls import hubspot_to_adls
from .mediatool_to_adls import mediatool_to_adls
from .mindful_to_adls import mindful_to_adls
from .outlook_to_adls import outlook_to_adls
from .sap_to_parquet import sap_to_parquet
@@ -41,6 +42,7 @@
"exchange_rates_to_databricks",
"genesys_to_adls",
"hubspot_to_adls",
"mediatool_to_adls",
"mindful_to_adls",
"outlook_to_adls",
"sap_to_parquet",
67 changes: 67 additions & 0 deletions src/viadot/orchestration/prefect/flows/mediatool_to_adls.py
@@ -0,0 +1,67 @@
"""'mediatool_to_adls.py'."""

from typing import Any

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import df_to_adls, mediatool_to_df


@flow(
name="Mediatool extraction to ADLS",
description="Extract data from Mediatool and load it into Azure Data Lake Storage.",
retries=1,
retry_delay_seconds=60,
task_runner=ConcurrentTaskRunner,
)
def mediatool_to_adls(
config_key: str | None = None,
azure_key_vault_secret: str | None = None,
organization_ids: list[str] | None = None,
media_entries_columns: list[str] | None = None,
adls_credentials: dict[str, Any] | None = None,
adls_config_key: str | None = None,
adls_azure_key_vault_secret: str | None = None,
adls_path: str | None = None,
adls_path_overwrite: bool = False,
) -> None:
"""Download data from Mediatool to Azure Data Lake.
Args:
config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
azure_key_vault_secret (str, optional): The name of the Azure Key Vault secret
where credentials are stored. Defaults to None.
organization_ids (list[str], optional): List of organization IDs.
Defaults to None.
media_entries_columns (list[str], optional): Columns to get from media entries.
Defaults to None.
adls_credentials (dict[str, Any], optional): The credentials as a dictionary.
Defaults to None.
adls_config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
adls_azure_key_vault_secret (str, optional): The name of the Azure Key Vault
secret containing a dictionary with ACCOUNT_NAME and Service Principal
credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
Defaults to None.
adls_path (str, optional): Azure Data Lake destination file path.
Defaults to None.
adls_path_overwrite (bool, optional): Whether to overwrite the file in ADLS.
Defaults to False.
"""
data_frame = mediatool_to_df(
config_key=config_key,
azure_key_vault_secret=azure_key_vault_secret,
organization_ids=organization_ids,
media_entries_columns=media_entries_columns,
)

return df_to_adls(
df=data_frame,
path=adls_path,
credentials=adls_credentials,
credentials_secret=adls_azure_key_vault_secret,
config_key=adls_config_key,
overwrite=adls_path_overwrite,
)
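
For orientation, a minimal invocation of the new flow could look like the sketch below. The config keys, organization ID, column list, and ADLS path are illustrative placeholders, not values taken from this repository.

from viadot.orchestration.prefect.flows import mediatool_to_adls

# All values below are placeholders used only for illustration.
mediatool_to_adls(
    config_key="mediatool",      # viadot config entry holding Mediatool credentials
    adls_config_key="adls",      # viadot config entry holding ADLS credentials
    organization_ids=["000000000000000000000000"],
    media_entries_columns=["_id", "organizationId", "campaignId", "vehicleId", "mediaTypeId"],
    adls_path="raw/mediatool/media_entries.parquet",
    adls_path_overwrite=True,
)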
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/tasks/__init__.py
@@ -13,6 +13,7 @@
from .git import clone_repo
from .hubspot import hubspot_to_df
from .luma import luma_ingest_task
from .mediatool import mediatool_to_df
from .mindful import mindful_to_df
from .minio import df_to_minio
from .outlook import outlook_to_df
@@ -44,6 +45,7 @@
"genesys_to_df",
"hubspot_to_df",
"luma_ingest_task",
"mediatool_to_df",
"mindful_to_df",
"outlook_to_df",
"s3_upload_file",
160 changes: 160 additions & 0 deletions src/viadot/orchestration/prefect/tasks/mediatool.py
@@ -0,0 +1,160 @@
"""'mediatool.py'."""

import pandas as pd
from prefect import get_run_logger, task

from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources import Mediatool
from viadot.utils import join_dfs


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60)
def mediatool_to_df(
config_key: str | None = None,
azure_key_vault_secret: str | None = None,
organization_ids: list[str] | None = None,
media_entries_columns: list[str] | None = None,
) -> pd.DataFrame:
"""Task to download data from Mediatool API.
Data from different endpoints are retrieved and combined. A final object is created
containing data for all organizations from the list.
Args:
config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
azure_key_vault_secret (str, optional): The name of the Azure Key Vault secret
where credentials are stored. Defaults to None.
organization_ids (list[str], optional): List of organization IDs.
Defaults to None.
media_entries_columns (list[str], optional): Columns to get from media entries.
Defaults to None.

Raises:
ValueError: If 'organization_ids' is None; at least one organization ID is required.
ValueError: If an organization ID is not found among the available organizations.

Returns:
pd.DataFrame: The combined data as a pandas DataFrame.
"""
logger = get_run_logger()

if not (azure_key_vault_secret or config_key):
raise MissingSourceCredentialsError

credentials = get_credentials(azure_key_vault_secret) if not config_key else None

mediatool = Mediatool(
credentials=credentials,
config_key=config_key,
)
# First, fetch the organizations so the requested organization IDs can be validated.
method = "organizations"
organizations_data = mediatool.api_connection(get_data_from=method)
df_organizations = mediatool.to_df(data=organizations_data, column_suffix=method)

if organization_ids is None:
message = "No organizations were defined."
raise ValueError(message)

list_of_organizations_df = []
for organization_id in organization_ids:
if organization_id in df_organizations["_id_organizations"].unique():
logger.info(f"Downloading data for: {organization_id} ...")

# extract media entries per organization
media_entries_data = mediatool.api_connection(
get_data_from="media_entries",
organization_id=organization_id,
)
df_media_entries = mediatool.to_df(
data=media_entries_data, drop_columns=media_entries_columns
)
unique_vehicle_ids = df_media_entries["vehicleId"].unique()
unique_media_type_ids = df_media_entries["mediaTypeId"].unique()

# extract vehicles
method = "vehicles"
vehicles_data = mediatool.api_connection(
get_data_from=method,
vehicle_ids=unique_vehicle_ids,
)
df_vehicles = mediatool.to_df(data=vehicles_data, column_suffix=method)

# extract campaigns
method = "campaigns"
campaigns_data = mediatool.api_connection(
get_data_from=method,
organization_id=organization_id,
)
df_campaigns = mediatool.to_df(data=campaigns_data, column_suffix=method)

# extract media types
method = "media_types"
media_types_data = mediatool.api_connection(
get_data_from=method,
media_type_ids=unique_media_type_ids,
)
df_media_types = mediatool.to_df(
data=media_types_data, column_suffix=method
)

# join media entries & organizations
df_merged_entries_orgs = join_dfs(
df_left=df_media_entries,
df_right=df_organizations,
left_on="organizationId",
right_on="_id_organizations",
columns_from_right_df=[
"_id_organizations",
"name_organizations",
"abbreviation_organizations",
],
how="left",
)

# join the previous merge & campaigns
df_merged_campaigns = join_dfs(
df_left=df_merged_entries_orgs,
df_right=df_campaigns,
left_on="campaignId",
right_on="_id_campaigns",
columns_from_right_df=[
"_id_campaigns",
"name_campaigns",
"conventionalName_campaigns",
],
how="left",
)

# join the previous merge & vehicles
df_merged_vehicles = join_dfs(
df_left=df_merged_campaigns,
df_right=df_vehicles,
left_on="vehicleId",
right_on="_id_vehicles",
columns_from_right_df=["_id_vehicles", "name_vehicles"],
how="left",
)

# join the previous merge & media types
df_merged_media_types = join_dfs(
df_left=df_merged_vehicles,
df_right=df_media_types,
left_on="mediaTypeId",
right_on="_id_media_types",
columns_from_right_df=["_id_media_types", "name_media_types"],
how="left",
)

list_of_organizations_df.append(df_merged_media_types)

else:
message = (
f"Organization - {organization_id} not found in organizations list."
)
raise ValueError(message)

return pd.concat(list_of_organizations_df)
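
The task above depends on the join_dfs helper that this PR adds to viadot.utils; its diff is not included in the view above. Based only on how it is called here, a plausible minimal sketch is a thin wrapper around pandas.merge — an assumption, not the committed implementation:

import pandas as pd


def join_dfs(
    df_left: pd.DataFrame,
    df_right: pd.DataFrame,
    left_on: str,
    right_on: str,
    columns_from_right_df: list[str] | None = None,
    how: str = "left",
) -> pd.DataFrame:
    """Join two DataFrames, keeping selected columns from the right-hand one."""
    # Assumed behavior: when no column subset is given, keep all right-hand columns.
    right = df_right if columns_from_right_df is None else df_right[columns_from_right_df]
    return df_left.merge(right, left_on=left_on, right_on=right_on, how=how)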
2 changes: 2 additions & 0 deletions src/viadot/sources/__init__.py
@@ -10,6 +10,7 @@
from .exchange_rates import ExchangeRates
from .genesys import Genesys
from .hubspot import Hubspot
from .mediatool import Mediatool
from .mindful import Mindful
from .outlook import Outlook
from .sftp import Sftp
@@ -26,6 +27,7 @@
"ExchangeRates",
"Genesys",
"Hubspot",
"Mediatool",
"Mindful",
"Sftp",
"Outlook",
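
For reference, the newly exported source can also be exercised on its own. The sketch below uses only the constructor arguments and methods that appear in the task above; the "mediatool" config key is an assumption for illustration.

from viadot.sources import Mediatool

# The "mediatool" config key is assumed here; credentials can also be passed
# directly as a dictionary, as the task above does.
mediatool = Mediatool(credentials=None, config_key="mediatool")

organizations_data = mediatool.api_connection(get_data_from="organizations")
df_organizations = mediatool.to_df(data=organizations_data, column_suffix="organizations")
print(df_organizations.head())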
