Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add SAPBW source and Prefect tasks #1053

Merged
merged 22 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .mindful_to_adls import mindful_to_adls
from .outlook_to_adls import outlook_to_adls
from .salesforce_to_adls import salesforce_to_adls
from .sap_bw_to_adls import sap_bw_to_adls
from .sap_to_parquet import sap_to_parquet
from .sap_to_redshift_spectrum import sap_to_redshift_spectrum
from .sftp_to_adls import sftp_to_adls
Expand Down Expand Up @@ -47,6 +48,7 @@
"mindful_to_adls",
"outlook_to_adls",
"salesforce_to_adls",
"sap_bw_to_adls",
"sap_to_parquet",
"sap_to_redshift_spectrum",
"sftp_to_adls",
Expand Down
60 changes: 60 additions & 0 deletions src/viadot/orchestration/prefect/flows/sap_bw_to_adls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Task to download data from SAP BW API into a Pandas DataFrame."""

from typing import Any

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import df_to_adls, sap_bw_to_df


@flow(
    name="SAP BW extraction to ADLS",
    description="Extract data from SAP BW and load it into Azure Data Lake Storage.",
    retries=1,
    retry_delay_seconds=60,
    task_runner=ConcurrentTaskRunner,
)
def sap_bw_to_adls(
    config_key: str | None = None,
    azure_key_vault_secret: str | None = None,
    mdx_query: str | None = None,
    mapping_dict: dict[str, Any] | None = None,
    adls_azure_key_vault_secret: str | None = None,
    adls_config_key: str | None = None,
    adls_path: str | None = None,
    adls_path_overwrite: bool = False,
) -> None:
    """Download SAP BW data and upload it to Azure Data Lake Storage.

    Args:
        config_key (str, optional): The key in the viadot config holding the
            SAP BW credentials. Defaults to None.
        azure_key_vault_secret (str, optional): The name of the Azure Key Vault
            secret storing the SAP BW credentials. Defaults to None.
        mdx_query (str, optional): The MDX query to be passed to connection.
            Defaults to None.
        mapping_dict (dict[str, Any], optional): Dictionary with original and new
            column names. Defaults to None.
        adls_azure_key_vault_secret (str, optional): The name of the Azure Key
            Vault secret storing the ADLS credentials. Defaults to None.
        adls_config_key (str, optional): The key in the viadot config holding the
            ADLS credentials. Defaults to None.
        adls_path (str, optional): Azure Data Lake destination folder/catalog
            path. Defaults to None.
        adls_path_overwrite (bool, optional): Whether to overwrite the file in
            ADLS. Defaults to False.
    """
    # Extract: run the SAP BW task and collect the result as a DataFrame.
    df = sap_bw_to_df(
        mdx_query=mdx_query,
        mapping_dict=mapping_dict,
        config_key=config_key,
        azure_key_vault_secret=azure_key_vault_secret,
    )

    # Load: hand the DataFrame off to the ADLS upload task.
    return df_to_adls(
        df=df,
        path=adls_path,
        credentials_secret=adls_azure_key_vault_secret,
        config_key=adls_config_key,
        overwrite=adls_path_overwrite,
    )
6 changes: 4 additions & 2 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .redshift_spectrum import df_to_redshift_spectrum
from .s3 import s3_upload_file
from .salesforce import salesforce_to_df
from .sap_bw import sap_bw_to_df
from .sap_rfc import sap_rfc_to_df
from .sftp import sftp_list, sftp_to_df
from .sharepoint import sharepoint_download_file, sharepoint_to_df
Expand All @@ -32,12 +33,12 @@
"bcp",
"clone_repo",
"cloud_for_customers_to_df",
"customer_gauge_to_df",
"df_to_databricks",
"create_sql_server_table",
"customer_gauge_to_df",
"dbt_task",
"df_to_adls",
"df_to_databricks",
"df_to_databricks",
"df_to_minio",
"df_to_redshift_spectrum",
"duckdb_query",
Expand All @@ -51,6 +52,7 @@
"outlook_to_df",
"s3_upload_file",
"salesforce_to_df",
"sap_bw_to_df",
"sap_rfc_to_df",
"sftp_list",
"sftp_to_df",
Expand Down
54 changes: 54 additions & 0 deletions src/viadot/orchestration/prefect/tasks/sap_bw.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Task to download data from SAP BW into a Pandas DataFrame."""

import contextlib
from typing import Any

import pandas as pd
from prefect import task

from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials


with contextlib.suppress(ImportError):
from viadot.sources import SAPBW
angelika233 marked this conversation as resolved.
Show resolved Hide resolved


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60)
def sap_bw_to_df(
    mdx_query: str,
    config_key: str | None = None,
    azure_key_vault_secret: str | None = None,
    mapping_dict: dict[str, Any] | None = None,
) -> pd.DataFrame:
    """Task to download data from SAP BW to DataFrame.

    Args:
        mdx_query (str, required): The MDX query to be passed to connection.
        config_key (Optional[str], optional): The key in the viadot config holding
            relevant credentials. Defaults to None.
        azure_key_vault_secret (Optional[str], optional): The name of the Azure Key
            Vault secret where credentials are stored. Defaults to None.
        mapping_dict (dict[str, Any], optional): Dictionary with original and new
            column names. Defaults to None.

    Raises:
        MissingSourceCredentialsError: If no credentials have been provided.

    Returns:
        pd.DataFrame: The response data as a Pandas Data Frame.
    """
    if not (azure_key_vault_secret or config_key):
        raise MissingSourceCredentialsError

    # Bug fix: `credentials` was previously assigned only inside
    # `if not config_key:`, so passing `config_key` raised an UnboundLocalError
    # below. Resolve Key Vault credentials only when no config key is given;
    # otherwise let SAPBW load credentials from the viadot config.
    credentials = get_credentials(azure_key_vault_secret) if not config_key else None

    sap_bw = SAPBW(
        credentials=credentials,
        config_key=config_key,
    )
    sap_bw.api_connection(mdx_query=mdx_query)

    return sap_bw.to_df(mapping_dict=mapping_dict)
6 changes: 4 additions & 2 deletions src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
"Hubspot",
"Mediatool",
"Mindful",
"Sftp",
"Outlook",
"SQLServer",
"Salesforce",
"Sftp",
"Sharepoint",
"Supermetrics",
"SupermetricsCredentials", # pragma: allowlist-secret
Expand Down Expand Up @@ -61,9 +61,11 @@

__all__.extend(["MinIO"])
if find_spec("pyrfc"):
from viadot.sources.sap_bw import SAPBW # noqa: F401
from viadot.sources.sap_rfc import SAPRFC, SAPRFCV2 # noqa: F401

__all__.extend(["SAPRFC", "SAPRFCV2"])
__all__.extend(["SAPBW", "SAPRFC", "SAPRFCV2"])

if find_spec("pyspark"):
from viadot.sources.databricks import Databricks # noqa: F401

Expand Down
Loading