Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add TM1 source and Prefect tasks #1105

Merged
merged 11 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies = [
"paramiko>=3.5.0",
# awswrangler 2.x. depends on pandas 1.x.
"pandas<2.0",
"tm1py>=2.0.4",
]
requires-python = ">=3.10"
readme = "README.md"
Expand Down
9 changes: 9 additions & 0 deletions requirements-dev.lock
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ idna==3.7
# via httpx
# via requests
# via yarl
ijson==3.3.0
# via tm1py
imagehash==4.3.1
# via viadot2
importlib-metadata==6.11.0
Expand Down Expand Up @@ -315,6 +317,8 @@ mdit-py-plugins==0.4.1
# via jupytext
mdurl==0.1.2
# via markdown-it-py
mdxpy==1.3.2
# via tm1py
mergedeep==1.3.4
# via mkdocs
# via mkdocs-get-deps
Expand Down Expand Up @@ -372,6 +376,7 @@ neoteroi-mkdocs==1.1.0
nest-asyncio==1.6.0
# via ipykernel
networkx==3.3
# via tm1py
# via visions
numpy==1.23.4
# via db-dtypes
Expand Down Expand Up @@ -533,6 +538,7 @@ pytz==2024.1
# via dateparser
# via pandas
# via prefect
# via tm1py
# via trino
# via zeep
pytzdata==2020.1
Expand Down Expand Up @@ -584,6 +590,7 @@ requests==2.32.3
# via responses
# via sharepy
# via simple-salesforce
# via tm1py
# via trino
# via viadot2
# via zeep
Expand Down Expand Up @@ -675,6 +682,8 @@ tinycss2==1.3.0
# via cairosvg
# via cssselect2
# via nbconvert
tm1py==2.0.4
# via viadot2
toml==0.10.2
# via prefect
tomli==2.0.1
Expand Down
9 changes: 9 additions & 0 deletions requirements.lock
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ idna==3.7
# via httpx
# via requests
# via yarl
ijson==3.3.0
# via tm1py
imagehash==4.3.1
# via viadot2
importlib-resources==6.1.3
Expand Down Expand Up @@ -200,6 +202,8 @@ markupsafe==2.1.5
# via mako
mdurl==0.1.2
# via markdown-it-py
mdxpy==1.3.2
# via tm1py
more-itertools==10.5.0
# via simple-salesforce
multidict==6.0.5
Expand All @@ -208,6 +212,7 @@ multidict==6.0.5
multimethod==1.12
# via visions
networkx==3.3
# via tm1py
# via visions
numpy==1.23.4
# via db-dtypes
Expand Down Expand Up @@ -318,6 +323,7 @@ pytz==2024.1
# via dateparser
# via pandas
# via prefect
# via tm1py
# via trino
# via zeep
pytzdata==2020.1
Expand Down Expand Up @@ -349,6 +355,7 @@ requests==2.32.3
# via requests-toolbelt
# via sharepy
# via simple-salesforce
# via tm1py
# via trino
# via viadot2
# via zeep
Expand Down Expand Up @@ -420,6 +427,8 @@ tangled-up-in-unicode==0.2.0
# via visions
text-unidecode==1.3
# via python-slugify
tm1py==2.0.4
# via viadot2
toml==0.10.2
# via prefect
trino==0.328.0
Expand Down
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from .sql_server_to_parquet import sql_server_to_parquet
from .sql_server_transform import sql_server_transform
from .supermetrics_to_adls import supermetrics_to_adls
from .tm1_to_parquet import tm1_to_parquet
from .transform import transform
from .transform_and_catalog import transform_and_catalog
from .vid_club_to_adls import vid_club_to_adls
Expand Down Expand Up @@ -70,6 +71,7 @@
"sql_server_to_parquet",
"sql_server_transform",
"supermetrics_to_adls",
"tm1_to_parquet",
"transform",
"transform_and_catalog",
"vid_club_to_adls",
Expand Down
70 changes: 70 additions & 0 deletions src/viadot/orchestration/prefect/flows/tm1_to_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Flow for downloading data from TM1 to a Parquet file."""

from typing import Literal

from prefect import flow

from viadot.orchestration.prefect.tasks.task_utils import df_to_parquet
from viadot.orchestration.prefect.tasks.tm1 import tm1_to_df


@flow(
    name="extract--tm1--parquet",
    description="Extract data from TM1 and load it into Parquet file",
    retries=1,
    retry_delay_seconds=60,
)
def tm1_to_parquet(  # noqa: PLR0913
    path: str | None = None,
    mdx_query: str | None = None,
    cube: str | None = None,
    view: str | None = None,
    limit: int | None = None,
    private: bool = False,
    credentials_secret: str | None = None,
    config_key: str | None = None,
    if_empty: str = "skip",
    if_exists: Literal["append", "replace", "skip"] = "replace",
    verify: bool = True,
) -> None:
    """Download data from TM1 to a Parquet file.

    The data is first fetched into a pandas DataFrame with the `tm1_to_df` task
    and then written out with the `df_to_parquet` task.

    Args:
        path (str, optional): Path of the output Parquet file; forwarded as-is
            to `df_to_parquet`. Defaults to None.
            NOTE(review): `df_to_parquet` presumably requires a real path, so
            the None default is likely unusable in practice - confirm.
        mdx_query (str, optional): MDX select query needed to download the data.
            Defaults to None.
        cube (str, optional): Cube name from which data will be downloaded.
            Defaults to None.
        view (str, optional): View name from which data will be downloaded.
            Defaults to None.
        limit (int, optional): How many rows should be extracted.
            If None all the available rows will be downloaded. Defaults to None.
        private (bool, optional): Whether or not data download should be private.
            Defaults to False.
        credentials_secret (str, optional): The name of the secret that
            stores TM1 credentials.
            More info on: https://docs.prefect.io/concepts/blocks/. Defaults to None.
        config_key (str, optional): The key in the viadot config holding relevant
            credentials. Defaults to None.
        if_empty (str, optional): What to do if output DataFrame is empty.
            Defaults to "skip".
        if_exists (Literal["append", "replace", "skip"], optional):
            What to do if the output file exists. Defaults to "replace".
        verify (bool, optional): Whether or not to verify the SSL certificate.
            Always forwarded explicitly to `tm1_to_df`, so this default (not the
            task's own) applies when running the flow. Defaults to True.
    """
    # Extract: every TM1-related option is passed straight through to the task.
    df = tm1_to_df(
        mdx_query=mdx_query,
        cube=cube,
        view=view,
        limit=limit,
        private=private,
        credentials_secret=credentials_secret,
        config_key=config_key,
        verify=verify,
        if_empty=if_empty,
    )

    # Load: persist the extracted DataFrame as Parquet.
    return df_to_parquet(
        df=df,
        path=path,
        if_exists=if_exists,
    )
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from .sharepoint import sharepoint_download_file, sharepoint_to_df
from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df
from .supermetrics import supermetrics_to_df
from .tm1 import tm1_to_df
from .vid_club import vid_club_to_df


Expand Down Expand Up @@ -68,6 +69,7 @@
"sharepoint_to_df",
"sql_server_query",
"sql_server_to_df",
"tm1_to_df",
"vid_club_to_df",
"supermetrics_to_df",
]
80 changes: 80 additions & 0 deletions src/viadot/orchestration/prefect/tasks/tm1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Task for downloading data from TM1 to a pandas DataFrame."""

from pandas import DataFrame
from prefect import task
from prefect.logging import get_run_logger

from viadot.config import get_source_credentials
from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources.tm1 import TM1


@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60 * 3)
def tm1_to_df(
    mdx_query: str | None = None,
    cube: str | None = None,
    view: str | None = None,
    limit: int | None = None,
    private: bool = False,
    credentials_secret: str | None = None,
    config_key: str | None = None,
    verify: bool = False,
    if_empty: str = "skip",
) -> DataFrame:
    """Download data from TM1 to pandas DataFrame.

    Args:
        mdx_query (str, optional): MDX select query needed to download the data.
            Defaults to None.
        cube (str, optional): Cube name from which data will be downloaded.
            Defaults to None.
        view (str, optional): View name from which data will be downloaded.
            Defaults to None.
        limit (int, optional): How many rows should be extracted.
            If None all the available rows will be downloaded. Defaults to None.
        private (bool, optional): Whether or not data download should be private.
            Defaults to False.
        credentials_secret (str, optional): The name of the secret that
            stores TM1 credentials.
            More info on: https://docs.prefect.io/concepts/blocks/. Defaults to None.
        config_key (str, optional): The key in the viadot config holding relevant
            credentials. Defaults to None.
        verify (bool, optional): Whether or not to verify the SSL certificate.
            Defaults to False.
        if_empty (str, optional): What to do if output DataFrame is empty.
            Defaults to "skip".

    Returns:
        DataFrame: The data returned by `TM1.to_df()`.

    Raises:
        MissingSourceCredentialsError: If neither `credentials_secret` nor
            `config_key` is provided.
    """
    if not (credentials_secret or config_key):
        raise MissingSourceCredentialsError

    logger = get_run_logger()

    # Credentials found in the viadot config take precedence; the secret is
    # only consulted when the config lookup yields nothing.
    credentials = get_source_credentials(config_key) or get_credentials(
        credentials_secret
    )

    tm1 = TM1(
        credentials=credentials,
        config_key=config_key,
        limit=limit,
        private=private,
        verify=verify,
    )

    df = tm1.to_df(
        if_empty=if_empty,
        mdx_query=mdx_query,
        cube=cube,
        view=view,
    )

    nrows, ncols = df.shape

    logger.info(
        f"Successfully downloaded {nrows} rows and {ncols} columns of data to a DataFrame."
    )

    return df
2 changes: 2 additions & 0 deletions src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from .sql_server import SQLServer
from .sqlite import SQLite
from .supermetrics import Supermetrics
from .tm1 import TM1
from .uk_carbon_intensity import UKCarbonIntensity
from .vid_club import VidClub

Expand All @@ -48,6 +49,7 @@
"Sftp",
"Sharepoint",
"Supermetrics",
"TM1",
"Trino",
"UKCarbonIntensity",
"VidClub",
Expand Down
Loading
Loading