Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add SAPBW source and Prefect tasks #1053

Merged
merged 22 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .hubspot_to_adls import hubspot_to_adls
from .mindful_to_adls import mindful_to_adls
from .outlook_to_adls import outlook_to_adls
from .sap_bw_to_adls import sap_bw_to_adls
from .sap_to_parquet import sap_to_parquet
from .sap_to_redshift_spectrum import sap_to_redshift_spectrum
from .sharepoint_to_adls import sharepoint_to_adls
Expand Down Expand Up @@ -40,6 +41,7 @@
"hubspot_to_adls",
"mindful_to_adls",
"outlook_to_adls",
"sap_bw_to_adls",
"sap_to_parquet",
"sap_to_redshift_spectrum",
"sharepoint_to_adls",
Expand Down
70 changes: 70 additions & 0 deletions src/viadot/orchestration/prefect/flows/sap_bw_to_adls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Task to download data from SAP BW API into a Pandas DataFrame."""

from typing import Any

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import df_to_adls, sap_bw_to_df


@flow(
    name="SAP BW extraction to ADLS",
    description="Extract data from SAP BW and load it into Azure Data Lake Storage.",
    retries=1,
    retry_delay_seconds=60,
    task_runner=ConcurrentTaskRunner,
)
def sap_bw_to_adls(
    credentials: dict[str, Any] | None = None,
    config_key: str | None = None,
    azure_key_vault_secret: str | None = None,
    mdx_query: str | None = None,
    mapping_dict: dict[str, Any] | None = None,
    adls_credentials: str | None = None,
    adls_azure_key_vault_secret: str | None = None,
    adls_config_key: str | None = None,
    adls_path: str | None = None,
    adls_path_overwrite: bool = False,
) -> None:
    """Flow for downloading data from SAP BW API to Azure Data Lake.

    Args:
        credentials (dict[str, Any], optional): SAP BW credentials as a
            dictionary. Defaults to None.
        config_key (str, optional): The key in the viadot config holding
            relevant credentials. Defaults to None.
        azure_key_vault_secret (str, optional): The name of the Azure Key
            Vault secret where credentials are stored. Defaults to None.
        mdx_query (str, optional): The MDX query to be passed to connection.
            Defaults to None.
        mapping_dict (dict[str, Any], optional): Dictionary with original and new
            column names. Defaults to None.
        adls_credentials (str, optional): The name of the Azure Key Vault
            secret containing a dictionary with ACCOUNT_NAME and Service Principal
            credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
            Defaults to None.
        adls_azure_key_vault_secret (str, optional): The name of the Azure Key
            Vault secret where ADLS credentials are stored. Defaults to None.
        adls_config_key (str, optional): The key in the viadot config holding relevant
            credentials. Defaults to None.
        adls_path (str, optional): Azure Data Lake destination folder/catalog path.
            Defaults to None.
        adls_path_overwrite (bool, optional): Whether to overwrite the file in ADLS.
            Defaults to False.
    """
    # Extract: download the SAP BW data into a DataFrame.
    data_frame = sap_bw_to_df(
        credentials=credentials,
        config_key=config_key,
        azure_key_vault_secret=azure_key_vault_secret,
        mdx_query=mdx_query,
        mapping_dict=mapping_dict,
    )

    # Load: upload the DataFrame to the ADLS destination path.
    return df_to_adls(
        df=data_frame,
        path=adls_path,
        credentials=adls_credentials,
        credentials_secret=adls_azure_key_vault_secret,
        config_key=adls_config_key,
        overwrite=adls_path_overwrite,
    )
7 changes: 3 additions & 4 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
from .outlook import outlook_to_df
from .redshift_spectrum import df_to_redshift_spectrum
from .s3 import s3_upload_file
from .sap_bw import sap_bw_to_df
from .sap_rfc import sap_rfc_to_df
from .sharepoint import (
sharepoint_download_file,
sharepoint_to_df,
)
from .sharepoint import sharepoint_download_file, sharepoint_to_df
from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df
from .supermetrics import supermetrics_to_df

Expand All @@ -46,6 +44,7 @@
"mindful_to_df",
"outlook_to_df",
"s3_upload_file",
"sap_bw_to_df",
"sap_rfc_to_df",
"sharepoint_download_file",
"sharepoint_to_df",
Expand Down
62 changes: 62 additions & 0 deletions src/viadot/orchestration/prefect/tasks/sap_bw.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Task to download data from SAP BW into a Pandas DataFrame."""

import contextlib
from typing import Any

import pandas as pd
from prefect import task

from viadot.exceptions import APIError
from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials


with contextlib.suppress(ImportError):
from viadot.sources import SAPBW
angelika233 marked this conversation as resolved.
Show resolved Hide resolved


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60)
def sap_bw_to_df(
    credentials: dict[str, Any] | None = None,
    config_key: str | None = None,
    azure_key_vault_secret: str | None = None,
    mdx_query: str | None = None,
    mapping_dict: dict[str, Any] | None = None,
) -> pd.DataFrame:
    """Task to download data from SAP BW to a Pandas DataFrame.

    Args:
        credentials (dict[str, Any], optional): SAPBW credentials as a
            dictionary. Defaults to None.
        config_key (str, optional): The key in the viadot config holding
            relevant credentials. Defaults to None.
        azure_key_vault_secret (str, optional): The name of the Azure Key
            Vault secret where credentials are stored. Defaults to None.
        mdx_query (str, optional): The MDX query to be passed to connection.
            Defaults to None.
        mapping_dict (dict[str, Any], optional): Dictionary with original and new
            column names. Defaults to None.

    Raises:
        MissingSourceCredentialsError: If no credentials have been provided.
        APIError: If `mdx_query` has not been provided.

    Returns:
        pd.DataFrame: The response data as a Pandas DataFrame.
    """
    if not (azure_key_vault_secret or config_key or credentials):
        raise MissingSourceCredentialsError

    # Validate the mandatory query before any (potentially remote) key vault
    # lookup, so an obviously bad call fails fast without a network round trip.
    if mdx_query is None:
        message = "SAP BW `mdx_query` is a mandatory requirement."
        raise APIError(message)

    # Explicit credentials take precedence; only fall back to the key vault
    # when no viadot config key was given.
    if not config_key:
        credentials = credentials or get_credentials(azure_key_vault_secret)

    sap_bw = SAPBW(
        credentials=credentials,
        config_key=config_key,
    )
    sap_bw.api_connection(mdx_query=mdx_query)

    return sap_bw.to_df(mapping_dict=mapping_dict)
6 changes: 5 additions & 1 deletion src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
"Outlook",
"SQLServer",
"Sharepoint",
"Sharepoint",
angelika233 marked this conversation as resolved.
Show resolved Hide resolved
"Supermetrics",
"SupermetricsCredentials", # pragma: allowlist-secret
"Trino",
"Trino",
angelika233 marked this conversation as resolved.
Show resolved Hide resolved
"UKCarbonIntensity",
]
if find_spec("adlfs"):
Expand All @@ -53,9 +55,11 @@

__all__.extend(["MinIO"])
if find_spec("pyrfc"):
from viadot.sources.sap_bw import SAPBW # noqa: F401
from viadot.sources.sap_rfc import SAPRFC, SAPRFCV2 # noqa: F401

__all__.extend(["SAPRFC", "SAPRFCV2"])
__all__.extend(["SAPRFC", "SAPRFCV2", "SAPBW"])

if find_spec("pyspark"):
from viadot.sources.databricks import Databricks # noqa: F401

Expand Down
Loading