diff --git a/src/viadot/orchestration/prefect/flows/__init__.py b/src/viadot/orchestration/prefect/flows/__init__.py
index a5c8168d6..946239b37 100644
--- a/src/viadot/orchestration/prefect/flows/__init__.py
+++ b/src/viadot/orchestration/prefect/flows/__init__.py
@@ -12,6 +12,7 @@
 from .exchange_rates_to_redshift_spectrum import exchange_rates_api_to_redshift_spectrum
 from .genesys_to_adls import genesys_to_adls
 from .hubspot_to_adls import hubspot_to_adls
+from .mediatool_to_adls import mediatool_to_adls
 from .mindful_to_adls import mindful_to_adls
 from .outlook_to_adls import outlook_to_adls
 from .sap_to_parquet import sap_to_parquet
@@ -41,6 +42,7 @@
     "exchange_rates_to_databricks",
     "genesys_to_adls",
     "hubspot_to_adls",
+    "mediatool_to_adls",
     "mindful_to_adls",
     "outlook_to_adls",
     "sap_to_parquet",
diff --git a/src/viadot/orchestration/prefect/flows/mediatool_to_adls.py b/src/viadot/orchestration/prefect/flows/mediatool_to_adls.py
new file mode 100644
index 000000000..57819aa31
--- /dev/null
+++ b/src/viadot/orchestration/prefect/flows/mediatool_to_adls.py
@@ -0,0 +1,67 @@
+"""'mediatool_to_adls.py'."""
+
+from typing import Any
+
+from prefect import flow
+from prefect.task_runners import ConcurrentTaskRunner
+
+from viadot.orchestration.prefect.tasks import df_to_adls, mediatool_to_df
+
+
+@flow(
+    name="Mediatool extraction to ADLS",
+    description="Extract data from Mediatool and load it into Azure Data Lake Storage.",
+    retries=1,
+    retry_delay_seconds=60,
+    task_runner=ConcurrentTaskRunner,
+)
+def mediatool_to_adls(
+    config_key: str | None = None,
+    azure_key_vault_secret: str | None = None,
+    organization_ids: list[str] | None = None,
+    media_entries_columns: list[str] | None = None,
+    adls_credentials: dict[str, Any] | None = None,
+    adls_config_key: str | None = None,
+    adls_azure_key_vault_secret: str | None = None,
+    adls_path: str | None = None,
+    adls_path_overwrite: bool = False,
+) -> None:
+    """Download data from Mediatool to Azure Data Lake.
+
+    Args:
+        config_key (str, optional): The key in the viadot config holding relevant
+            credentials. Defaults to None.
+        azure_key_vault_secret (str, optional): The name of the Azure Key Vault secret
+            where credentials are stored. Defaults to None.
+        organization_ids (list[str], optional): List of organization IDs.
+            Defaults to None.
+        media_entries_columns (list[str], optional): Columns to get from media entries.
+            Defaults to None.
+        adls_credentials (dict[str, Any], optional): The credentials as a dictionary.
+            Defaults to None.
+        adls_config_key (str, optional): The key in the viadot config holding relevant
+            credentials. Defaults to None.
+        adls_azure_key_vault_secret (str, optional): The name of the Azure Key Vault
+            secret containing a dictionary with ACCOUNT_NAME and Service Principal
+            credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
+            Defaults to None.
+        adls_path (str, optional): Azure Data Lake destination file path.
+            Defaults to None.
+        adls_path_overwrite (bool, optional): Whether to overwrite the file in ADLS.
+            Defaults to False.
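+
+    Example (illustrative only; the config keys, organization ID, and path below
+    are placeholder values, not ones shipped with viadot):
+        mediatool_to_adls(
+            config_key="mediatool",
+            organization_ids=["<org_id>"],
+            adls_config_key="adls",
+            adls_path="raw/mediatool/media_entries.parquet",
+            adls_path_overwrite=True,
+        )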
+ """ + data_frame = mediatool_to_df( + config_key=config_key, + azure_key_vault_secret=azure_key_vault_secret, + organization_ids=organization_ids, + media_entries_columns=media_entries_columns, + ) + + return df_to_adls( + df=data_frame, + path=adls_path, + credentials=adls_credentials, + credentials_secret=adls_azure_key_vault_secret, + config_key=adls_config_key, + overwrite=adls_path_overwrite, + ) diff --git a/src/viadot/orchestration/prefect/tasks/__init__.py b/src/viadot/orchestration/prefect/tasks/__init__.py index d3f69b75c..9382594e8 100644 --- a/src/viadot/orchestration/prefect/tasks/__init__.py +++ b/src/viadot/orchestration/prefect/tasks/__init__.py @@ -13,6 +13,7 @@ from .git import clone_repo from .hubspot import hubspot_to_df from .luma import luma_ingest_task +from .mediatool import mediatool_to_df from .mindful import mindful_to_df from .minio import df_to_minio from .outlook import outlook_to_df @@ -44,6 +45,7 @@ "genesys_to_df", "hubspot_to_df", "luma_ingest_task", + "mediatool_to_df", "mindful_to_df", "outlook_to_df", "s3_upload_file", diff --git a/src/viadot/orchestration/prefect/tasks/mediatool.py b/src/viadot/orchestration/prefect/tasks/mediatool.py new file mode 100644 index 000000000..a2d0dbd6f --- /dev/null +++ b/src/viadot/orchestration/prefect/tasks/mediatool.py @@ -0,0 +1,160 @@ +"""'mediatool.py'.""" + +import pandas as pd +from prefect import get_run_logger, task + +from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError +from viadot.orchestration.prefect.utils import get_credentials +from viadot.sources import Mediatool +from viadot.utils import join_dfs + + +@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60) +def mediatool_to_df( + config_key: str | None = None, + azure_key_vault_secret: str | None = None, + organization_ids: list[str] | None = None, + media_entries_columns: list[str] | None = None, +) -> pd.DataFrame: + """Task to download data from Mediatool API. + + Data from different endpoints are retrieved and combined. A final object is created + containing data for all organizations from the list. + + Args: + config_key (str, optional): The key in the viadot config holding relevant + credentials. Defaults to None. + azure_key_vault_secret (str, optional): The name of the Azure Key Vault secret + where credentials are stored. Defaults to None. + organization_ids (list[str], optional): List of organization IDs. + Defaults to None. + media_entries_columns (list[str], optional): Columns to get from media entries. + Defaults to None. + + Raises: + ValueError: 'organization_ids' argument is None, and one is mandatory. + ValueError: One 'organization_id' is not in organizations. + + Returns: + pd.DataFrame: The response data as a Pandas Data Frame. + """ + logger = get_run_logger() + + if not (azure_key_vault_secret or config_key): + raise MissingSourceCredentialsError + + if not config_key: + credentials = get_credentials(azure_key_vault_secret) + + mediatool = Mediatool( + credentials=credentials, + config_key=config_key, + ) + # first method ORGANIZATIONS + method = "organizations" + organizations_data = mediatool.api_connection(get_data_from=method) + df_organizations = mediatool.to_df(data=organizations_data, column_suffix=method) + + if organization_ids is None: + message = "No organizations were defined." 
+        raise ValueError(message)
+
+    list_of_organizations_df = []
+    for organization_id in organization_ids:
+        if organization_id in df_organizations["_id_organizations"].unique():
+            logger.info(f"Downloading data for: {organization_id} ...")
+
+            # Extract media entries per organization.
+            media_entries_data = mediatool.api_connection(
+                get_data_from="media_entries",
+                organization_id=organization_id,
+            )
+            df_media_entries = mediatool.to_df(
+                data=media_entries_data, columns=media_entries_columns
+            )
+            unique_vehicle_ids = df_media_entries["vehicleId"].unique()
+            unique_media_type_ids = df_media_entries["mediaTypeId"].unique()
+
+            # Extract vehicles.
+            method = "vehicles"
+            vehicles_data = mediatool.api_connection(
+                get_data_from=method,
+                vehicle_ids=unique_vehicle_ids,
+            )
+            df_vehicles = mediatool.to_df(data=vehicles_data, column_suffix=method)
+
+            # Extract campaigns.
+            method = "campaigns"
+            campaigns_data = mediatool.api_connection(
+                get_data_from=method,
+                organization_id=organization_id,
+            )
+            df_campaigns = mediatool.to_df(data=campaigns_data, column_suffix=method)
+
+            # Extract media types.
+            method = "media_types"
+            media_types_data = mediatool.api_connection(
+                get_data_from=method,
+                media_type_ids=unique_media_type_ids,
+            )
+            df_media_types = mediatool.to_df(
+                data=media_types_data, column_suffix=method
+            )
+
+            # Join media entries & organizations.
+            df_merged_entries_orgs = join_dfs(
+                df_left=df_media_entries,
+                df_right=df_organizations,
+                left_on="organizationId",
+                right_on="_id_organizations",
+                columns_from_right_df=[
+                    "_id_organizations",
+                    "name_organizations",
+                    "abbreviation_organizations",
+                ],
+                how="left",
+            )
+
+            # Join the previous merge & campaigns.
+            df_merged_campaigns = join_dfs(
+                df_left=df_merged_entries_orgs,
+                df_right=df_campaigns,
+                left_on="campaignId",
+                right_on="_id_campaigns",
+                columns_from_right_df=[
+                    "_id_campaigns",
+                    "name_campaigns",
+                    "conventionalName_campaigns",
+                ],
+                how="left",
+            )
+
+            # Join the previous merge & vehicles.
+            df_merged_vehicles = join_dfs(
+                df_left=df_merged_campaigns,
+                df_right=df_vehicles,
+                left_on="vehicleId",
+                right_on="_id_vehicles",
+                columns_from_right_df=["_id_vehicles", "name_vehicles"],
+                how="left",
+            )
+
+            # Join the previous merge & media types.
+            df_merged_media_types = join_dfs(
+                df_left=df_merged_vehicles,
+                df_right=df_media_types,
+                left_on="mediaTypeId",
+                right_on="_id_media_types",
+                columns_from_right_df=["_id_media_types", "name_media_types"],
+                how="left",
+            )
+
+            list_of_organizations_df.append(df_merged_media_types)
+
+        else:
+            message = (
+                f"Organization {organization_id} was not found in the organizations list."
+            )
+            raise ValueError(message)
+
+    return pd.concat(list_of_organizations_df)
diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py
index 5644114a4..70a374ae7 100644
--- a/src/viadot/sources/__init__.py
+++ b/src/viadot/sources/__init__.py
@@ -10,6 +10,7 @@
 from .exchange_rates import ExchangeRates
 from .genesys import Genesys
 from .hubspot import Hubspot
+from .mediatool import Mediatool
 from .mindful import Mindful
 from .outlook import Outlook
 from .sftp import Sftp
@@ -26,6 +27,7 @@
     "ExchangeRates",
     "Genesys",
     "Hubspot",
+    "Mediatool",
     "Mindful",
     "Sftp",
     "Outlook",
diff --git a/src/viadot/sources/mediatool.py b/src/viadot/sources/mediatool.py
new file mode 100644
index 000000000..fb018deab
--- /dev/null
+++ b/src/viadot/sources/mediatool.py
@@ -0,0 +1,334 @@
+"""'mediatool.py'."""
+
+import json
+
+import pandas as pd
+from pydantic import BaseModel
+
+from viadot.config import get_source_credentials
+from viadot.exceptions import APIError, CredentialError
+from viadot.sources.base import Source
+from viadot.utils import add_viadot_metadata_columns, handle_api_response
+
+
+class MediatoolCredentials(BaseModel):
+    """Validate the values in the Mediatool credentials dictionary.
+
+    Two key values are held in the Mediatool connector:
+        - user_id: The unique ID of the user.
+        - token: A unique token to be used as the password for API requests.
+
+    Args:
+        BaseModel (pydantic.main.ModelMetaclass): A base class for creating
+            Pydantic models.
+    """
+
+    user_id: str
+    token: str
+
+
+class Mediatool(Source):
+    """Class implementing the Mediatool API.
+
+    Download data from the Mediatool platform. Using the Mediatool class, the user
+    can download organizations, media entries, campaigns, vehicles, and media types
+    data.
+    """
+
+    def __init__(
+        self,
+        *args,
+        credentials: MediatoolCredentials | None = None,
+        config_key: str | None = None,
+        user_id: str | None = None,
+        **kwargs,
+    ):
+        """Create an instance of the Mediatool class.
+
+        Args:
+            credentials (MediatoolCredentials, optional): Mediatool credentials.
+                Defaults to None.
+            config_key (str, optional): The key in the viadot config holding relevant
+                credentials. Defaults to None.
+            user_id (str, optional): User ID. Defaults to None.
+        """
+        credentials = credentials or get_source_credentials(config_key) or None
+        if credentials is None:
+            message = "Missing credentials."
+            raise CredentialError(message)
+
+        validated_creds = dict(MediatoolCredentials(**credentials))
+        super().__init__(*args, credentials=validated_creds, **kwargs)
+
+        self.header = {"Authorization": f"Bearer {credentials.get('token')}"}
+        self.user_id = user_id or credentials.get("user_id")
+
+        self.url_abbreviation = None
+
+    def _rename_columns(
+        self,
+        df: pd.DataFrame,
+        column_suffix: str,
+    ) -> pd.DataFrame:
+        """Rename columns by appending a suffix.
+
+        Args:
+            df (pd.DataFrame): Incoming DataFrame.
+            column_suffix (str): String to be added at the end of each column name.
+
+        Returns:
+            pd.DataFrame: Modified DataFrame.
+        """
+        column_suffix = column_suffix.split("get_")[-1]
+        dict_mapped_names = {
+            column_name: f"{column_name}_{column_suffix}" for column_name in df.columns
+        }
+
+        return df.rename(columns=dict_mapped_names)
+
+    def _get_organizations(
+        self,
+        user_id: str | None = None,
+    ) -> list[dict[str, str]]:
+        """Get organizations data based on the user ID.
+
+        Args:
+            user_id (str, optional): User ID. Defaults to None.
+
+        Returns:
+            list[dict[str, str]]: A list of dictionaries with organization data.
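+
+        Example of the returned structure (illustrative values):
+            [{"_id": "abc123", "name": "Org1", "abbreviation": "O1"}]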
+ """ + user_id = user_id or self.user_id + url_organizations = f"https://api.mediatool.com/users/{user_id}/organizations" + + response = handle_api_response( + url=url_organizations, + headers=self.header, + method="GET", + ) + response_dict = json.loads(response.text) + organizations = response_dict["organizations"] + + list_organizations = [] + for org in organizations: + list_organizations.append( + { + "_id": org.get("_id"), + "name": org.get("name"), + "abbreviation": org.get("abbreviation"), + } + ) + + return list_organizations + + def _get_media_entries( + self, + organization_id: str, + ) -> list[dict[str, str]]: + """Data for media entries. + + Args: + organization_id (str): Organization ID. + + Returns: + list[dict[str, str]]: A list of dicts will be returned. + """ + url = ( + "https://api.mediatool.com/searchmediaentries?q=" + + f'{{"organizationId": "{organization_id}"}}' + ) + + response = handle_api_response( + url=url, + headers=self.header, + method="GET", + ) + response_dict = json.loads(response.text) + + return response_dict["mediaEntries"] + + def _get_vehicles( + self, + vehicle_ids: list[str], + ) -> list[dict[str, str]]: + """Vehicles data based on the organization IDs. + + Args: + vehicle_ids (list[str]): List of organization IDs. + + Raises: + APIError: Mediatool API does not recognise the vehicle id. + + Returns: + list[dict[str, str]]: A list of dicts will be returned. + """ + response_list = [] + missing_vehicles = [] + + for vid in vehicle_ids: + url = f"https://api.mediatool.com/vehicles/{vid}" + try: + response = handle_api_response( + url=url, + headers=self.header, + method="GET", + ) + except APIError: + missing_vehicles.append(vid) + else: + response_dict = json.loads(response.text) + response_list.append(response_dict["vehicle"]) + + if missing_vehicles: + self.logger.error(f"Vehicle were not found for: {missing_vehicles}.") + + return response_list + + def _get_campaigns( + self, + organization_id: str, + ) -> list[dict[str, str]]: + """Campaign data based on the organization ID. + + Args: + organization_id (str): Organization ID. + + Returns: + list[dict[str, str]]: A list of dicts will be returned. + """ + url_campaigns = ( + f"https://api.mediatool.com/organizations/{organization_id}/campaigns" + ) + + response = handle_api_response( + url=url_campaigns, + headers=self.header, + method="GET", + ) + response_dict = json.loads(response.text) + + return response_dict["campaigns"] + + def _get_media_types( + self, + media_type_ids: list[str], + ) -> list[dict[str, str]]: + """Media types data based on the media types ID. + + Args: + media_type_ids (list[str]): List of media type IDs. + + Returns: + list[dict[str, str]]: A list of dicts will be returned. + """ + list_media_types = [] + for id_media_type in media_type_ids: + response = handle_api_response( + url=f"https://api.mediatool.com/mediatypes/{id_media_type}", + headers=self.header, + method="GET", + ) + response_dict = json.loads(response.text) + list_media_types.append( + { + "_id": response_dict.get("mediaType").get("_id"), + "name": response_dict.get("mediaType").get("name"), + "type": response_dict.get("mediaType").get("type"), + } + ) + + return list_media_types + + def api_connection( + self, + get_data_from: str, + organization_id: str | None = None, + vehicle_ids: list[str] | None = None, + media_type_ids: list[str] | None = None, + ) -> list[dict[str, str]]: + """General method to connect to Mediatool API and generate the response. 
+
+        Args:
+            get_data_from (str): The name of the endpoint to extract data from.
+            organization_id (str, optional): Organization ID. Defaults to None.
+            vehicle_ids (list[str], optional): List of vehicle IDs. Defaults to None.
+            media_type_ids (list[str], optional): List of media type IDs.
+                Defaults to None.
+
+        Returns:
+            list[dict[str, str]]: Data from the Mediatool API connection.
+        """
+        self.url_abbreviation = get_data_from
+
+        if self.url_abbreviation == "organizations":
+            returned_data = self._get_organizations(self.user_id)
+
+        elif self.url_abbreviation == "media_entries":
+            returned_data = self._get_media_entries(organization_id=organization_id)
+
+        elif self.url_abbreviation == "vehicles":
+            returned_data = self._get_vehicles(vehicle_ids=vehicle_ids)
+
+        elif self.url_abbreviation == "campaigns":
+            returned_data = self._get_campaigns(organization_id=organization_id)
+
+        elif self.url_abbreviation == "media_types":
+            returned_data = self._get_media_types(media_type_ids=media_type_ids)
+
+        else:
+            message = f"Unrecognized endpoint: '{get_data_from}'."
+            raise ValueError(message)
+
+        return returned_data
+
+    @add_viadot_metadata_columns
+    def to_df(
+        self,
+        if_empty: str = "warn",
+        **kwargs,
+    ) -> pd.DataFrame:
+        """Generate a pandas DataFrame from the response data, plus viadot metadata.
+
+        Args:
+            if_empty (str, optional): What to do if the fetch produces no data.
+                Defaults to "warn".
+            kwargs: Supported keyword arguments: 'data' (the records to load),
+                'column_suffix' (suffix to append to column names), and 'columns'
+                (subset of columns to keep).
+
+        Returns:
+            pd.DataFrame: The response data as a pandas DataFrame plus viadot
+                metadata.
+        """
+        data = kwargs.get("data", False)
+        column_suffix = kwargs.get("column_suffix", None)
+        columns = kwargs.get("columns", None)
+
+        super().to_df(if_empty=if_empty)
+
+        data_frame = pd.DataFrame.from_dict(data)
+
+        if column_suffix == "campaigns":
+            data_frame.replace(
+                to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"],
+                value=["", ""],
+                regex=True,
+                inplace=True,
+            )
+
+        if column_suffix:
+            data_frame = self._rename_columns(
+                df=data_frame, column_suffix=column_suffix
+            )
+
+        if columns:
+            if set(columns).issubset(set(data_frame.columns)):
+                data_frame = data_frame[columns]
+            else:
+                self.logger.error(
+                    f"Columns '{', '.join(columns)}' were not found in the data. "
+                    + "All columns will be returned."
+                )
+
+        if data_frame.empty:
+            self._handle_if_empty(
+                if_empty=if_empty,
+                message="The response does not contain any data.",
+            )
+        else:
+            self.logger.info(
+                "Successfully downloaded data from "
+                + f"the Mediatool API ({self.url_abbreviation})."
+            )
+
+        return data_frame
diff --git a/src/viadot/utils.py b/src/viadot/utils.py
index 4b53beb29..a7916ae92 100644
--- a/src/viadot/utils.py
+++ b/src/viadot/utils.py
@@ -479,6 +479,40 @@ def df_snakecase_column_names(df: pd.DataFrame) -> pd.DataFrame:
     return df
 
 
+def join_dfs(
+    df_left: pd.DataFrame,
+    df_right: pd.DataFrame,
+    left_on: str,
+    right_on: str,
+    columns_from_right_df: list[str] | None = None,
+    how: Literal["left", "right", "outer", "inner", "cross"] = "left",
+) -> pd.DataFrame:
+    """Combine DataFrames according to the chosen merge method.
+
+    Args:
+        df_left (pd.DataFrame): Left dataframe.
+        df_right (pd.DataFrame): Right dataframe.
+        left_on (str): Column or index level names to join on in the left DataFrame.
+        right_on (str): Column or index level names to join on in the right DataFrame.
+        columns_from_right_df (list[str], optional): List of columns to get from the
+            right dataframe. Defaults to None.
+        how (Literal["left", "right", "outer", "inner", "cross"], optional): Type of
+            merge to be performed. Defaults to "left".
+
+    Returns:
+        pd.DataFrame: Final dataframe after merging.
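+
+    Example (hypothetical frames, for illustration only):
+        users = pd.DataFrame({"id": [1], "name": ["a"]})
+        orders = pd.DataFrame({"user_id": [1], "total": [9.99]})
+        merged = join_dfs(users, orders, left_on="id", right_on="user_id", how="left")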
+ """ + if columns_from_right_df is None: + columns_from_right_df = df_right.columns + + return df_left.merge( + df_right[columns_from_right_df], + left_on=left_on, + right_on=right_on, + how=how, + ) + + def add_viadot_metadata_columns(func: Callable) -> Callable: """A decorator for the 'to_df()' method. diff --git a/tests/.env.example b/tests/.env.example index ec8179361..fb3ec6076 100644 --- a/tests/.env.example +++ b/tests/.env.example @@ -4,12 +4,16 @@ TEST_ADLS_FILE_PATH_PARQUET= TEST_ADLS_FILE_PATH_CSV= # Sensitive variables. +VIADOT_TEST_ADLS_AZURE_KEY_VAULT_SECRET= AZURE_ORG_NAME= AZURE_PROJECT_NAME= AZURE_REPO_NAME= AZURE_REPO_URL= DBT_REPO_URL= C4C_API_URL= +VIADOT_TEST_MEDIATOOL_ADLS_AZURE_KEY_VAULT_SECRET= +VIADOT_TEST_MEDIATOOL_ADLS_PATH= +VIADOT_TEST_MEDIATOOL_ORG= VIADOT_SHAREPOINT_URL= VIADOT_S3_BUCKET= LUMA_URL=http://localhost:8000 diff --git a/tests/conftest.py b/tests/conftest.py index c43ea92e7..e1dec73c5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -200,3 +200,23 @@ def databricks_credentials_secret(): @pytest.fixture(scope="session", autouse=True) def c4c_credentials_secret(): return os.environ.get("VIADOT_TEST_C4C_CREDENTIALS_SECRET") + + +@pytest.fixture(scope="session", autouse=True) +def VIADOT_TEST_MEDIATOOL_ORG(): + return os.environ.get("VIADOT_TEST_MEDIATOOL_ORG") + + +@pytest.fixture(scope="session", autouse=True) +def VIADOT_TEST_MEDIATOOL_ADLS_AZURE_KEY_VAULT_SECRET(): + return os.environ.get("VIADOT_TEST_MEDIATOOL_ADLS_AZURE_KEY_VAULT_SECRET") + + +@pytest.fixture(scope="session", autouse=True) +def VIADOT_TEST_MEDIATOOL_ADLS_PATH(): + return os.environ.get("VIADOT_TEST_MEDIATOOL_ADLS_PATH") + + +@pytest.fixture(scope="session", autouse=True) +def VIADOT_TEST_ADLS_AZURE_KEY_VAULT_SECRET(): + return os.environ.get("VIADOT_TEST_ADLS_AZURE_KEY_VAULT_SECRET") diff --git a/tests/integration/orchestration/prefect/flows/test_mediatool.py b/tests/integration/orchestration/prefect/flows/test_mediatool.py new file mode 100644 index 000000000..2b3aff3b9 --- /dev/null +++ b/tests/integration/orchestration/prefect/flows/test_mediatool.py @@ -0,0 +1,43 @@ +"""'test_mediatool.py'.""" + +from viadot.orchestration.prefect.flows import mediatool_to_adls + + +media_entries_columns = [ + "_id", + "organizationId", + "mediaTypeId", + "vehicleId", + "startDate", + "endDate", + "businessAreaId", + "campaignObjectiveId", + "campaignId", + "netMediaCostChosenCurrency", + "currencyId", + "eurExchangeRate", + "netMediaCostEur", + "fee", + "totalFeeOfNetMediaCostEur", + "totalCostToClientChosenCurrency", + "totalCostToClientEur", + "nonBiddableMediaCostEur", +] + + +def test_genesys_to_adls( + VIADOT_TEST_MEDIATOOL_ADLS_AZURE_KEY_VAULT_SECRET, + VIADOT_TEST_MEDIATOOL_ORG, + VIADOT_TEST_MEDIATOOL_ADLS_PATH, + VIADOT_TEST_ADLS_AZURE_KEY_VAULT_SECRET, +): + state = mediatool_to_adls( + azure_key_vault_secret=VIADOT_TEST_MEDIATOOL_ADLS_AZURE_KEY_VAULT_SECRET, + organization_ids=VIADOT_TEST_MEDIATOOL_ORG, + media_entries_columns=media_entries_columns, + adls_path=VIADOT_TEST_MEDIATOOL_ADLS_PATH, + adls_azure_key_vault_secret=VIADOT_TEST_ADLS_AZURE_KEY_VAULT_SECRET, + adls_path_overwrite=True, + ) + + assert state.is_successful() diff --git a/tests/unit/test_mediatool.py b/tests/unit/test_mediatool.py new file mode 100644 index 000000000..f472e449e --- /dev/null +++ b/tests/unit/test_mediatool.py @@ -0,0 +1,164 @@ +"""'test_mediatool.py'.""" + +import json +from unittest.mock import MagicMock, patch + +import pandas as pd +import pytest + +from viadot.exceptions import 
+from viadot.sources import Mediatool
+from viadot.sources.mediatool import MediatoolCredentials
+
+
+variables = {
+    "credentials": {"user_id": "test_user", "token": "test_token"},
+    "organizations": {
+        "organizations": [{"_id": "1", "name": "Org1", "abbreviation": "O1"}]
+    },
+    "media_entries": {"mediaEntries": [{"_id": "1", "name": "Entry1"}]},
+    "vehicle": {"vehicle": {"_id": "1", "name": "Vehicle1"}},
+    "campaigns": {"campaigns": [{"_id": "1", "name": "Campaign1"}]},
+    "media_types": {"mediaType": {"_id": "1", "name": "Type1", "type": "Type"}},
+}
+
+
+@pytest.mark.basic
+def test_mediatool_credentials():
+    """Test Mediatool credentials."""
+    MediatoolCredentials(user_id="test_user", token="test_token")  # noqa: S106
+
+
+@pytest.mark.basic
+@patch("viadot.sources.mediatool.get_source_credentials", return_value=None)
+def test_missing_credentials(mock_get_source_credentials):
+    """Test that an error is raised when credentials are missing."""
+    with pytest.raises(CredentialError):
+        Mediatool()
+
+    mock_get_source_credentials.assert_called_once()
+
+
+@pytest.mark.functions
+@patch("viadot.sources.mediatool.handle_api_response")
+def test_get_organizations(mock_handle_api_response):
+    """Test Mediatool `_get_organizations` function."""
+    mock_response = MagicMock()
+    mock_response.text = json.dumps(variables["organizations"])
+    mock_handle_api_response.return_value = mock_response
+
+    mediatool = Mediatool(credentials=variables["credentials"])
+
+    result = mediatool._get_organizations(user_id="test_user")
+    expected_result = [{"_id": "1", "name": "Org1", "abbreviation": "O1"}]
+    assert result == expected_result
+
+
+@pytest.mark.functions
+@patch("viadot.sources.mediatool.handle_api_response")
+def test_get_media_entries(mock_handle_api_response):
+    """Test Mediatool `_get_media_entries` function."""
+    mock_response = MagicMock()
+    mock_response.text = json.dumps(variables["media_entries"])
+    mock_handle_api_response.return_value = mock_response
+
+    mediatool = Mediatool(credentials=variables["credentials"])
+
+    result = mediatool._get_media_entries(organization_id="org_id")
+    expected_result = [{"_id": "1", "name": "Entry1"}]
+    assert result == expected_result
+
+
+@pytest.mark.functions
+@patch("viadot.sources.mediatool.handle_api_response")
+def test_get_vehicles(mock_handle_api_response):
+    """Test Mediatool `_get_vehicles` function."""
+    mock_response = MagicMock()
+    mock_response.text = json.dumps(variables["vehicle"])
+    mock_handle_api_response.return_value = mock_response
+
+    mediatool = Mediatool(credentials=variables["credentials"])
+
+    result = mediatool._get_vehicles(vehicle_ids=["1"])
+    expected_result = [{"_id": "1", "name": "Vehicle1"}]
+    assert result == expected_result
+
+
+@pytest.mark.functions
+@patch("viadot.sources.mediatool.handle_api_response")
+def test_get_campaigns(mock_handle_api_response):
+    """Test Mediatool `_get_campaigns` function."""
+    mock_response = MagicMock()
+    mock_response.text = json.dumps(variables["campaigns"])
+    mock_handle_api_response.return_value = mock_response
+
+    mediatool = Mediatool(credentials=variables["credentials"])
+
+    result = mediatool._get_campaigns(organization_id="org_id")
+    expected_result = [{"_id": "1", "name": "Campaign1"}]
+    assert result == expected_result
+
+
+@pytest.mark.functions
+@patch("viadot.sources.mediatool.handle_api_response")
+def test_get_media_types(mock_handle_api_response):
+    """Test Mediatool `_get_media_types` function."""
+    mock_response = MagicMock()
+    mock_response.text = json.dumps(variables["media_types"])
+    mock_handle_api_response.return_value = mock_response
+
+    mediatool = Mediatool(credentials=variables["credentials"])
+
+    result = mediatool._get_media_types(media_type_ids=["1"])
+    expected_result = [{"_id": "1", "name": "Type1", "type": "Type"}]
+    assert result == expected_result
+
+
+@pytest.mark.functions
+def test_rename_columns():
+    """Test Mediatool `_rename_columns` function."""
+    df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
+
+    mediatool = Mediatool(credentials=variables["credentials"])
+
+    result = mediatool._rename_columns(df, column_suffix="test")
+    expected_result = pd.DataFrame({"col1_test": [1, 2], "col2_test": [3, 4]})
+    pd.testing.assert_frame_equal(result, expected_result)
+
+
+@pytest.mark.connect
+@patch("viadot.sources.mediatool.handle_api_response")
+def test_api_connection(mock_handle_api_response):
+    """Test Mediatool `api_connection` method."""
+    mock_response = MagicMock()
+    mock_response.text = json.dumps(variables["organizations"])
+    mock_handle_api_response.return_value = mock_response
+
+    mediatool = Mediatool(credentials=variables["credentials"])
+
+    result = mediatool.api_connection(get_data_from="organizations")
+    expected_result = [{"_id": "1", "name": "Org1", "abbreviation": "O1"}]
+    assert result == expected_result
+
+
+@pytest.mark.functions
+@patch("viadot.sources.mediatool.handle_api_response")
+def test_to_df(mock_handle_api_response):
+    """Test Mediatool `to_df` method."""
+    mock_response = MagicMock()
+    mock_response.text = json.dumps({"mediaEntries": [{"_id": "1", "name": "Entry1"}]})
+    mock_handle_api_response.return_value = mock_response
+
+    mediatool = Mediatool(credentials=variables["credentials"])
+
+    data = [{"_id": "1", "name": "Entry1"}]
+    result_df = mediatool.to_df(data=data, column_suffix="media_entries")
+    result_df.drop(
+        columns=["_viadot_source", "_viadot_downloaded_at_utc"],
+        inplace=True,
+    )
+    expected_result = pd.DataFrame(
+        {"_id_media_entries": ["1"], "name_media_entries": ["Entry1"]}
+    )
+    pd.testing.assert_frame_equal(result_df, expected_result)