Skip to content

Commit

Permalink
✨ Add TM1 source and Prefect tasks (#1105)
Browse files Browse the repository at this point in the history
* 🚀 Added tm1 source

* ⚡️ Updated tm1 source

* ✨ Added tm1 task

* ✨ Added tm1 prefect flow

* ♻️ Refactor structure

* ✅ Add unit tests

* 📝 Update docs

* ♻️ Refactor code

* ♻️ Refactor code

* ⚡️ Improved logic

* ➕ Added tm1py
  • Loading branch information
judynah authored Dec 5, 2024
1 parent 59f2423 commit 4fe969d
Show file tree
Hide file tree
Showing 10 changed files with 482 additions and 0 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies = [
"paramiko>=3.5.0",
# awswrangler 2.x. depends on pandas 1.x.
"pandas<2.0",
"tm1py>=2.0.4",
]
requires-python = ">=3.10"
readme = "README.md"
Expand Down
9 changes: 9 additions & 0 deletions requirements-dev.lock
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ idna==3.7
# via httpx
# via requests
# via yarl
ijson==3.3.0
# via tm1py
imagehash==4.3.1
# via viadot2
importlib-metadata==6.11.0
Expand Down Expand Up @@ -315,6 +317,8 @@ mdit-py-plugins==0.4.1
# via jupytext
mdurl==0.1.2
# via markdown-it-py
mdxpy==1.3.2
# via tm1py
mergedeep==1.3.4
# via mkdocs
# via mkdocs-get-deps
Expand Down Expand Up @@ -372,6 +376,7 @@ neoteroi-mkdocs==1.1.0
nest-asyncio==1.6.0
# via ipykernel
networkx==3.3
# via tm1py
# via visions
numpy==1.23.4
# via db-dtypes
Expand Down Expand Up @@ -533,6 +538,7 @@ pytz==2024.1
# via dateparser
# via pandas
# via prefect
# via tm1py
# via trino
# via zeep
pytzdata==2020.1
Expand Down Expand Up @@ -584,6 +590,7 @@ requests==2.32.3
# via responses
# via sharepy
# via simple-salesforce
# via tm1py
# via trino
# via viadot2
# via zeep
Expand Down Expand Up @@ -675,6 +682,8 @@ tinycss2==1.3.0
# via cairosvg
# via cssselect2
# via nbconvert
tm1py==2.0.4
# via viadot2
toml==0.10.2
# via prefect
tomli==2.0.1
Expand Down
9 changes: 9 additions & 0 deletions requirements.lock
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ idna==3.7
# via httpx
# via requests
# via yarl
ijson==3.3.0
# via tm1py
imagehash==4.3.1
# via viadot2
importlib-resources==6.1.3
Expand Down Expand Up @@ -200,6 +202,8 @@ markupsafe==2.1.5
# via mako
mdurl==0.1.2
# via markdown-it-py
mdxpy==1.3.2
# via tm1py
more-itertools==10.5.0
# via simple-salesforce
multidict==6.0.5
Expand All @@ -208,6 +212,7 @@ multidict==6.0.5
multimethod==1.12
# via visions
networkx==3.3
# via tm1py
# via visions
numpy==1.23.4
# via db-dtypes
Expand Down Expand Up @@ -318,6 +323,7 @@ pytz==2024.1
# via dateparser
# via pandas
# via prefect
# via tm1py
# via trino
# via zeep
pytzdata==2020.1
Expand Down Expand Up @@ -349,6 +355,7 @@ requests==2.32.3
# via requests-toolbelt
# via sharepy
# via simple-salesforce
# via tm1py
# via trino
# via viadot2
# via zeep
Expand Down Expand Up @@ -420,6 +427,8 @@ tangled-up-in-unicode==0.2.0
# via visions
text-unidecode==1.3
# via python-slugify
tm1py==2.0.4
# via viadot2
toml==0.10.2
# via prefect
trino==0.328.0
Expand Down
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from .sql_server_to_parquet import sql_server_to_parquet
from .sql_server_transform import sql_server_transform
from .supermetrics_to_adls import supermetrics_to_adls
from .tm1_to_parquet import tm1_to_parquet
from .transform import transform
from .transform_and_catalog import transform_and_catalog
from .vid_club_to_adls import vid_club_to_adls
Expand Down Expand Up @@ -70,6 +71,7 @@
"sql_server_to_parquet",
"sql_server_transform",
"supermetrics_to_adls",
"tm1_to_parquet",
"transform",
"transform_and_catalog",
"vid_club_to_adls",
Expand Down
70 changes: 70 additions & 0 deletions src/viadot/orchestration/prefect/flows/tm1_to_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Flow for downloading data from TM1 to a Parquet file."""

from typing import Literal

from prefect import flow

from viadot.orchestration.prefect.tasks.task_utils import df_to_parquet
from viadot.orchestration.prefect.tasks.tm1 import tm1_to_df


@flow(
    name="extract--tm1--parquet",
    description="Extract data from TM1 and load it into Parquet file",
    retries=1,
    retry_delay_seconds=60,
)
def tm1_to_parquet(  # noqa: PLR0913
    path: str | None = None,
    mdx_query: str | None = None,
    cube: str | None = None,
    view: str | None = None,
    limit: int | None = None,
    private: bool = False,
    credentials_secret: str | None = None,
    config_key: str | None = None,
    if_empty: str = "skip",
    if_exists: Literal["append", "replace", "skip"] = "replace",
    verify: bool = True,
) -> None:
    """Extract data from TM1 and store it in a Parquet file.

    The flow runs two tasks in sequence: `tm1_to_df` downloads the data into a
    pandas DataFrame, and `df_to_parquet` writes that DataFrame to `path`.

    Args:
        path (str, optional): Destination path handed to `df_to_parquet`.
            Defaults to None.
        mdx_query (str, optional): MDX select query needed to download the data.
            Defaults to None.
        cube (str, optional): Cube name from which data will be downloaded.
            Defaults to None.
        view (str, optional): View name from which data will be downloaded.
            Defaults to None.
        limit (int, optional): How many rows should be extracted.
            If None all the available rows will be downloaded. Defaults to None.
        private (bool, optional): Whether or not data download should be private.
            Defaults to False.
        credentials_secret (str, optional): The name of the secret that stores
            TM1 credentials.
            More info on: https://docs.prefect.io/concepts/blocks/.
            Defaults to None.
        config_key (str, optional): The key in the viadot config holding relevant
            credentials. Defaults to None.
        if_empty (str, optional): What to do if output DataFrame is empty.
            Defaults to "skip".
        if_exists (Literal["append", "replace", "skip"], optional): Forwarded to
            `df_to_parquet`; what to do if the output already exists.
            Defaults to "replace".
        verify (bool, optional): Whether or not verify SSL certificate.
            Defaults to True. The value is always passed explicitly to
            `tm1_to_df`, so that task's own default is not used here.
    """
    # Step 1: pull the requested TM1 data into memory.
    extracted = tm1_to_df(
        mdx_query=mdx_query,
        cube=cube,
        view=view,
        limit=limit,
        private=private,
        credentials_secret=credentials_secret,
        config_key=config_key,
        verify=verify,
        if_empty=if_empty,
    )

    # Step 2: persist the DataFrame; the task's result is returned unchanged.
    return df_to_parquet(df=extracted, path=path, if_exists=if_exists)
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from .sharepoint import sharepoint_download_file, sharepoint_to_df
from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df
from .supermetrics import supermetrics_to_df
from .tm1 import tm1_to_df
from .vid_club import vid_club_to_df


Expand Down Expand Up @@ -68,6 +69,7 @@
"sharepoint_to_df",
"sql_server_query",
"sql_server_to_df",
"tm1_to_df",
"vid_club_to_df",
"supermetrics_to_df",
]
80 changes: 80 additions & 0 deletions src/viadot/orchestration/prefect/tasks/tm1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Task for downloading data from TM1 to a pandas DataFrame."""

from pandas import DataFrame
from prefect import task
from prefect.logging import get_run_logger

from viadot.config import get_source_credentials
from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources.tm1 import TM1


@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60 * 3)
def tm1_to_df(
    mdx_query: str | None = None,
    cube: str | None = None,
    view: str | None = None,
    limit: int | None = None,
    private: bool = False,
    credentials_secret: str | None = None,
    config_key: str | None = None,
    verify: bool = False,
    if_empty: str = "skip",
) -> DataFrame:
    """Download data from TM1 into a pandas DataFrame.

    Credentials are looked up in the viadot config under `config_key` first;
    if that lookup yields nothing, they are fetched from the secret named by
    `credentials_secret`.

    Args:
        mdx_query (str, optional): MDX select query needed to download the data.
            Defaults to None.
        cube (str, optional): Cube name from which data will be downloaded.
            Defaults to None.
        view (str, optional): View name from which data will be downloaded.
            Defaults to None.
        limit (int, optional): How many rows should be extracted.
            If None all the available rows will be downloaded. Defaults to None.
        private (bool, optional): Whether or not data download should be private.
            Defaults to False.
        credentials_secret (str, optional): The name of the secret that stores
            TM1 credentials.
            More info on: https://docs.prefect.io/concepts/blocks/.
            Defaults to None.
        config_key (str, optional): The key in the viadot config holding relevant
            credentials. Defaults to None.
        verify (bool, optional): Whether or not verify SSL certificate.
            Defaults to False.
        if_empty (str, optional): What to do if output DataFrame is empty.
            Defaults to "skip".

    Returns:
        DataFrame: The data returned by `TM1.to_df`.

    Raises:
        MissingSourceCredentialsError: If neither `credentials_secret` nor
            `config_key` is provided.
    """
    # At least one credential source has to be named by the caller.
    if not credentials_secret and not config_key:
        raise MissingSourceCredentialsError

    logger = get_run_logger()

    # Prefer credentials from the viadot config; fall back to the secret.
    credentials = get_source_credentials(config_key)
    if not credentials:
        credentials = get_credentials(credentials_secret)

    tm1 = TM1(
        credentials=credentials,
        config_key=config_key,
        limit=limit,
        private=private,
        verify=verify,
    )

    df = tm1.to_df(
        if_empty=if_empty,
        mdx_query=mdx_query,
        cube=cube,
        view=view,
    )

    shape = df.shape
    nrows, ncols = shape[0], shape[1]
    logger.info(
        f"Successfully downloaded {nrows} rows and {ncols} columns of data to a DataFrame."
    )

    return df
2 changes: 2 additions & 0 deletions src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from .sql_server import SQLServer
from .sqlite import SQLite
from .supermetrics import Supermetrics
from .tm1 import TM1
from .uk_carbon_intensity import UKCarbonIntensity
from .vid_club import VidClub

Expand All @@ -48,6 +49,7 @@
"Sftp",
"Sharepoint",
"Supermetrics",
"TM1",
"Trino",
"UKCarbonIntensity",
"VidClub",
Expand Down
Loading

0 comments on commit 4fe969d

Please sign in to comment.