Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add SAPBW source and Prefect tasks #1053

Merged
merged 22 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .hubspot_to_adls import hubspot_to_adls
from .mindful_to_adls import mindful_to_adls
from .outlook_to_adls import outlook_to_adls
from .sap_bw_to_adls import sap_bw_to_adls
from .sap_to_parquet import sap_to_parquet
from .sap_to_redshift_spectrum import sap_to_redshift_spectrum
from .sharepoint_to_adls import sharepoint_to_adls
Expand All @@ -24,7 +25,6 @@
from .transform import transform
from .transform_and_catalog import transform_and_catalog


__all__ = [
"cloud_for_customers_to_adls",
"cloud_for_customers_to_databricks",
Expand All @@ -39,6 +39,7 @@
"hubspot_to_adls",
"mindful_to_adls",
"outlook_to_adls",
"sap_bw_to_adls",
"sap_to_parquet",
"sap_to_redshift_spectrum",
"sharepoint_to_adls",
Expand Down
70 changes: 70 additions & 0 deletions src/viadot/orchestration/prefect/flows/sap_bw_to_adls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Task to download data from SAP BW API into a Pandas DataFrame."""

from typing import Any

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import df_to_adls, sap_bw_to_df


@flow(
    name="SAP BW extraction to ADLS",
    description="Extract data from SAP BW and load it into Azure Data Lake Storage.",
    retries=1,
    retry_delay_seconds=60,
    task_runner=ConcurrentTaskRunner,
)
def sap_bw_to_adls(
    credentials: dict[str, Any] | None = None,
    config_key: str | None = None,
    azure_key_vault_secret: str | None = None,
    mdx_query: str | None = None,
    mapping_dict: dict[str, Any] | None = None,
    adls_credentials: str | None = None,
    adls_azure_key_vault_secret: str | None = None,
    adls_config_key: str | None = None,
    adls_path: str | None = None,
    adls_path_overwrite: bool = False,
) -> None:
    """Extract data from SAP BW and upload it to Azure Data Lake Storage.

    The flow runs two tasks in sequence: `sap_bw_to_df` executes the MDX query
    and returns a DataFrame, and `df_to_adls` uploads that DataFrame to ADLS.

    Args:
        credentials (dict[str, Any], optional): SAP BW credentials as a
            dictionary. Defaults to None.
        config_key (str, optional): The key in the viadot config holding the
            SAP BW credentials. Defaults to None.
        azure_key_vault_secret (str, optional): The name of the Azure Key Vault
            secret where the SAP BW credentials are stored. Defaults to None.
        mdx_query (str, optional): The MDX query to be passed to the SAP BW
            connection. Defaults to None.
        mapping_dict (dict[str, Any], optional): Dictionary with original and new
            column names. Defaults to None.
        adls_credentials (str, optional): Credentials for the Azure Data Lake,
            forwarded to `df_to_adls` as `credentials`. Defaults to None.
        adls_azure_key_vault_secret (str, optional): The name of the Azure Key
            Vault secret containing a dictionary with ACCOUNT_NAME and Service
            Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the
            Azure Data Lake. Forwarded to `df_to_adls` as `credentials_secret`.
            Defaults to None.
        adls_config_key (str, optional): The key in the viadot config holding the
            Azure Data Lake credentials. Defaults to None.
        adls_path (str, optional): Azure Data Lake destination folder/catalog
            path. Defaults to None.
        adls_path_overwrite (bool, optional): Whether to overwrite the file in
            ADLS. Defaults to False.
    """
    # NOTE(review): `adls_credentials` is annotated as `str` but is forwarded as
    # the `credentials` argument of `df_to_adls` - confirm the expected type there.
    data_frame = sap_bw_to_df(
        credentials=credentials,
        config_key=config_key,
        azure_key_vault_secret=azure_key_vault_secret,
        mdx_query=mdx_query,
        mapping_dict=mapping_dict,
    )

    return df_to_adls(
        df=data_frame,
        path=adls_path,
        credentials=adls_credentials,
        credentials_secret=adls_azure_key_vault_secret,
        config_key=adls_config_key,
        overwrite=adls_path_overwrite,
    )
8 changes: 3 additions & 5 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@
from .outlook import outlook_to_df
from .redshift_spectrum import df_to_redshift_spectrum
from .s3 import s3_upload_file
from .sap_bw import sap_bw_to_df
from .sap_rfc import sap_rfc_to_df
from .sharepoint import (
sharepoint_download_file,
sharepoint_to_df,
)
from .sharepoint import sharepoint_download_file, sharepoint_to_df
from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df


__all__ = [
"adls_upload",
"df_to_adls",
Expand All @@ -44,6 +41,7 @@
"outlook_to_df",
"df_to_redshift_spectrum",
"s3_upload_file",
"sap_bw_to_df",
"sap_rfc_to_df",
"sharepoint_download_file",
"sharepoint_to_df",
Expand Down
58 changes: 58 additions & 0 deletions src/viadot/orchestration/prefect/tasks/sap_bw.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Task to download data from SAP BW API into a Pandas DataFrame."""

from typing import Any

import pandas as pd
from prefect import task

from viadot.exceptions import APIError
from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources import Sapbw


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60)
def sap_bw_to_df(
    credentials: dict[str, Any] | None = None,
    config_key: str | None = None,
    azure_key_vault_secret: str | None = None,
    mdx_query: str | None = None,
    mapping_dict: dict[str, Any] | None = None,
) -> pd.DataFrame:
    """Download data from the SAP BW API into a pandas DataFrame.

    Args:
        credentials (dict[str, Any], optional): SAP BW credentials as a
            dictionary. Defaults to None.
        config_key (str, optional): The key in the viadot config holding
            relevant credentials. When provided, the Azure Key Vault secret is
            not fetched. Defaults to None.
        azure_key_vault_secret (str, optional): The name of the Azure Key Vault
            secret where credentials are stored. Only used when neither
            `config_key` nor `credentials` is provided. Defaults to None.
        mdx_query (str, optional): The MDX query to be passed to the connection.
            Required despite the None default.
        mapping_dict (dict[str, Any], optional): Dictionary with original and new
            column names. Defaults to None.

    Raises:
        MissingSourceCredentialsError: If none of `credentials`, `config_key`
            and `azure_key_vault_secret` has been provided.
        APIError: If `mdx_query` is missing or empty.

    Returns:
        pd.DataFrame: The response data as a pandas DataFrame.
    """
    if not (azure_key_vault_secret or config_key or credentials):
        raise MissingSourceCredentialsError

    # Validate the query before resolving credentials so that an invalid call
    # fails fast instead of first making a needless Key Vault round trip.
    # An empty string is rejected too, since it cannot be a valid MDX query.
    if not mdx_query:
        message = "SAP BW API `mdx_query` is a mandatory requirement."
        raise APIError(message)

    if not config_key:
        # Explicitly passed credentials take precedence over the secret.
        credentials = credentials or get_credentials(azure_key_vault_secret)

    sap_bw = Sapbw(
        credentials=credentials,
        config_key=config_key,
    )
    sap_bw.api_connection(mdx_query=mdx_query)

    return sap_bw.to_df(mapping_dict=mapping_dict)
5 changes: 3 additions & 2 deletions src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@
from .hubspot import Hubspot
from .mindful import Mindful
from .outlook import Outlook
from .sap_bw import Sapbw
from .sharepoint import Sharepoint
from .sql_server import SQLServer
from .trino import Trino
from .uk_carbon_intensity import UKCarbonIntensity


__all__ = [
"CloudForCustomers",
"Epicor",
"ExchangeRates",
"Genesys",
"Outlook",
"Hubspot",
"Mindful",
"Outlook",
"Sapbw",
"Sharepoint",
"Trino",
angelika233 marked this conversation as resolved.
Show resolved Hide resolved
"SQLServer",
Expand Down
Loading
Loading