-
Notifications
You must be signed in to change notification settings - Fork 40
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch '2.0' of https://github.com/dyvenia/viadot into sapbw_2.0
- Loading branch information
Showing
12 changed files
with
696 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
92 changes: 92 additions & 0 deletions
92
src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
"""Flow for downloading the data from Superpetrics and uploading it to ADLS.""" | ||
|
||
from typing import Any | ||
|
||
from prefect import flow | ||
from prefect.task_runners import ConcurrentTaskRunner | ||
|
||
from viadot.orchestration.prefect.tasks import ( | ||
df_to_adls, | ||
supermetrics_to_df, | ||
) | ||
|
||
|
||
@flow( | ||
name="Supermetrics extraction to ADLS", | ||
description="Extract data from Supermetrics and load it into ADLS.", | ||
retries=1, | ||
retry_delay_seconds=60, | ||
task_runner=ConcurrentTaskRunner, | ||
) | ||
def supermetrics_to_adls( | ||
# supermetrics | ||
query_params: dict[str, Any] | None = None, | ||
# ADLS | ||
adls_path: str | None = None, | ||
overwrite: bool = False, | ||
# Auth | ||
supermetrics_credentials_secret: str | None = None, | ||
supermetrics_config_key: str | None = None, | ||
adls_credentials_secret: str | None = None, | ||
adls_config_key: str | None = None, | ||
**kwargs: dict[str, Any] | None, | ||
) -> None: | ||
"""Flow to extract data from the Supermetrics API and save it to ADLS. | ||
This function queries data from the Supermetrics API using the provided query | ||
parameters and saves the resulting DataFrame to Azure Data Lake Storage (ADLS) | ||
as a file. | ||
Args: | ||
---- | ||
query_params (dict[str, Any], optional): | ||
A dictionary of query parameters for the Supermetrics API. These parameters | ||
specify the data to retrieve from Supermetrics. If not provided, the default | ||
parameters from the Supermetrics configuration will be used. | ||
adls_path (str, optional): | ||
The destination path in ADLS where the DataFrame will be saved. This should | ||
include the file name and extension (e.g., 'myfolder/myfile.csv'). If not | ||
provided, the function will use a default path from the configuration | ||
or raise an error. | ||
overwrite (bool, optional): | ||
A flag indicating whether to overwrite the existing file in ADLS. If set | ||
to Falseand the file exists, an error will be raised. Default is False. | ||
supermetrics_credentials_secret (str, optional): | ||
The name of the secret in the secret management system containing | ||
the Supermetrics API credentials. If not provided, the function will use | ||
credentials specified in the configuration. | ||
supermetrics_config_key (str, optional): | ||
The key in the viadot configuration holding relevant credentials. | ||
Defaults to None. | ||
adls_credentials_secret (str, optional): | ||
The name of the secret in the secret management system containing | ||
the ADLS credentials. If not provided, the function will use credentials | ||
specified in the configuration. | ||
adls_config_key (str, optional): | ||
The key in the viadot configuration holding relevant credentials. | ||
Defaults to None. | ||
**kwargs (dict[str, Any], optional): | ||
Additional keyword arguments to pass to the `supermetrics_to_df` function | ||
for further customization of the Supermetrics query. | ||
Raises: | ||
------ | ||
ValueError: | ||
If `adls_path` is not provided and cannot be determined from | ||
the configuration. | ||
""" | ||
df = supermetrics_to_df( | ||
query_params=query_params, | ||
credentials_secret=supermetrics_credentials_secret, | ||
config_key=supermetrics_config_key, | ||
**kwargs, | ||
) | ||
|
||
return df_to_adls( | ||
df=df, | ||
path=adls_path, | ||
credentials_secret=adls_credentials_secret, | ||
config_key=adls_config_key, | ||
overwrite=overwrite, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
"""Task for connecting to Supermetrics API.""" | ||
|
||
import pandas as pd | ||
from prefect import task | ||
|
||
from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError | ||
from viadot.orchestration.prefect.utils import get_credentials | ||
from viadot.sources import Supermetrics | ||
|
||
|
||
@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60) | ||
def supermetrics_to_df( | ||
query_params: dict, | ||
config_key: str | None = None, | ||
credentials_secret: str | None = None, | ||
) -> pd.DataFrame: | ||
"""Task to retrive data from Supermetrics and returns it as a pandas DataFrame. | ||
This function queries the Supermetrics API using the provided query parameters and | ||
returns the data as a pandas DataFrame. The function supports both | ||
configuration-based and secret-based credentials. | ||
The function is decorated with a Prefect task, allowing it to handle retries, | ||
logging, and timeout behavior. | ||
Args: | ||
---- | ||
query_params (dict): | ||
A dictionary containing the parameters for querying the Supermetrics API. | ||
These parameters define what data to retrieve and how the query should | ||
be constructed. | ||
config_key (str, optional): The key in the viadot config holding relevant | ||
credentials. Defaults to None. | ||
credentials_secret (str, optional): | ||
The name of the secret in your secret management system that contains | ||
the Supermetrics API credentials. If `config_key` is not provided, | ||
this secret is used to authenticate with the Supermetrics API. | ||
Returns: | ||
------- | ||
pd.DataFrame: | ||
A pandas DataFrame containing the data retrieved from Supermetrics based | ||
on the provided query parameters. | ||
Raises: | ||
------ | ||
MissingSourceCredentialsError: | ||
Raised if neither `credentials_secret` nor `config_key` is provided, | ||
indicating that no valid credentials were supplied to access | ||
the Supermetrics API. | ||
""" | ||
if not (credentials_secret or config_key): | ||
raise MissingSourceCredentialsError | ||
|
||
credentials = get_credentials(credentials_secret) if not config_key else None | ||
|
||
supermetrics = Supermetrics( | ||
credentials=credentials, | ||
config_key=config_key, | ||
) | ||
return supermetrics.to_df(query_params=query_params) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.