Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added TM1 connector #801

Merged
merged 13 commits into from
Nov 13, 2023
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- Added `TM1` source class.
- Added `TM1ToDF` task class.

## [0.4.21] - 2023-10-26
### Added
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ dbt-core==1.3.2
dbt-sqlserver==1.3.1
lumaCLI==0.0.19
Office365-REST-Python-Client==2.4.4
TM1py==1.11.3
15 changes: 15 additions & 0 deletions tests/integration/tasks/test_tm1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import pandas as pd

from viadot.tasks import TM1ToDF
from viadot.config import local_config

CUBE = local_config.get("test_cube")
VIEW = local_config.get("test_view")


def test_tm1_to_df():
tm1 = TM1ToDF(CUBE, VIEW)
df = tm1.run()

assert isinstance(df, pd.DataFrame)
assert df.empty is False
125 changes: 125 additions & 0 deletions tests/integration/test_tm1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import pandas as pd
import pytest
from viadot.sources import TM1
from viadot.config import local_config
from viadot.exceptions import CredentialError, ValidationError

CUBE = local_config.get("TM1").get("test_cube")
VIEW = local_config.get("TM1").get("test_view")
DIMENSION = local_config.get("TM1").get("test_dim")
HIERARCHY = local_config.get("TM1").get("test_hierarchy")


def test_get_connection():
tm1_source = TM1()
connection = tm1_source.get_connection()

assert connection is not None


def test_get_connection_fail():
test_creds = {
"address": "Addres",
"port": 123,
"username": "user",
}
with pytest.raises(CredentialError):
tm1_source = TM1(credentials=test_creds)


def test_get_cubes_names():
tm1_source = TM1()
cubes = tm1_source.get_cubes_names()

assert len(cubes) > 0


def test_get_dimensions_names():
tm1_source = TM1()
dim = tm1_source.get_dimensions_names()

assert len(dim) > 0


def test_get_views_names():
tm1_source = TM1(cube=CUBE)
views = tm1_source.get_views_names()

assert len(views) > 0


def test_get_hierarchies_names():
tm1_source = TM1(dimension=DIMENSION)
hierarchies = tm1_source.get_hierarchies_names()

assert len(hierarchies) > 0


def test_get_available_elements():
tm1_source = TM1(dimension=DIMENSION, hierarchy=HIERARCHY)
elements = tm1_source.get_available_elements()

assert len(elements) > 0


def test_to_df_view():
tm1_source = TM1(cube=CUBE, view=VIEW)
df = tm1_source.to_df()

assert isinstance(df, pd.DataFrame)
assert df.empty is False


def test_to_df_mdx():
query = (
"""
select
{
[version].[version].[Budget]
}
on columns,
{
[company].[company].MEMBERS
}
on rows

FROM """
+ f"{CUBE}"
)

tm1_source = TM1(mdx_query=query)
df = tm1_source.to_df(if_empty="pass")

assert isinstance(df, pd.DataFrame)


def test_to_df_fail_both():
query = (
"""
select
{
[version].[version].[Budget]
}
on columns,
{
[company].[company].MEMBERS
}
on rows

FROM """
+ f"{CUBE}"
)

tm1_source = TM1(mdx_query=query, cube=CUBE)
with pytest.raises(
ValidationError, match="Specify only one: MDX query or cube and view."
):
tm1_source.to_df(if_empty="pass")


def test_to_df_fail_no():
tm1_source = TM1()
with pytest.raises(
ValidationError, match="MDX query or cube and view are required."
):
tm1_source.to_df(if_empty="pass")
1 change: 1 addition & 0 deletions viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from .mindful import Mindful
from .sql_server import SQLServer
from .sqlite import SQLite
from .tm1 import TM1

# APIS
from .uk_carbon_intensity import UKCarbonIntensity
Expand Down
186 changes: 186 additions & 0 deletions viadot/sources/tm1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import pandas as pd

from typing import Any, Dict, Literal
from TM1py.Services import TM1Service
from prefect.utilities import logging


from ..config import local_config
from ..exceptions import CredentialError, ValidationError
from .base import Source

logger = logging.get_logger(__name__)


class TM1(Source):
"""
Class for downloading data from TM1 Software using TM1py library
"""

def __init__(
self,
credentials: Dict[str, Any] = None,
config_key: str = "TM1",
mdx_query: str = None,
cube: str = None,
view: str = None,
dimension: str = None,
hierarchy: str = None,
limit: int = None,
private: bool = False,
verify: bool = False,
*args,
**kwargs,
):
"""
Creating an instance of TM1 source class. To download the data to the dataframe user needs to specify MDX query or
combination of cube and view.

Args:
credentials (Dict[str, Any], optional): Credentials stored in a dictionary. Required credentials: username,
password, address, port. Defaults to None.
config_key (str, optional): Credential key to dictionary where credentials are stored. Defaults to "TM1".
mdx_query (str, optional): MDX select query needed to download the data. Defaults to None.
cube (str, optional): Cube name from which data will be downloaded. Defaults to None.
view (str, optional): View name from which data will be downloaded. Defaults to None.
dimension (str, optional): Dimension name. Defaults to None.
hierarchy (str, optional): Hierarchy name. Defaults to None.
limit (str, optional): How many rows should be extracted. If None all the avaiable rows will
be downloaded. Defaults to None.
private (bool, optional): Whether or not data download shoulb be private. Defaults to False.
verify (bool, optional): Whether or not verify SSL certificates while. Defaults to False.


Raises:
CredentialError: When credentials are not found.
"""
DEFAULT_CREDENTIALS = local_config.get(config_key)
credentials = credentials or DEFAULT_CREDENTIALS

required_credentials = ["address", "port", "username", "password"]
if any([cred_key not in credentials for cred_key in required_credentials]):
not_found = [c for c in required_credentials if c not in credentials]
raise CredentialError(f"Missing credential(s): '{not_found}'.")

self.config_key = config_key
self.mdx_query = mdx_query
self.cube = cube
self.view = view
self.dimension = dimension
self.hierarchy = hierarchy
self.limit = limit
self.private = private
self.verify = verify

super().__init__(*args, credentials=credentials, **kwargs)

def get_connection(self) -> TM1Service:
"""
Start a connection to TM1 instance.

Returns:
TM1Service: Service instance if connection is succesfull.
"""
return TM1Service(
address=self.credentials["address"],
port=self.credentials["port"],
user=self.credentials["username"],
password=self.credentials["password"],
ssl=self.verify,
)

def get_cubes_names(self) -> list:
"""
Get list of avaiable cubes in TM1 instance.

Returns:
list: List containing avaiable cubes names.

"""
conn = self.get_connection()
return conn.cubes.get_all_names()

def get_views_names(self) -> list:
"""
Get list of avaiable views in TM1 cube instance.

Returns:
list: List containing avaiable views names.

"""
conn = self.get_connection()
return conn.views.get_all_names(self.cube)

def get_dimensions_names(self) -> list:
"""
Get list of avaiable dimensions in TM1 instance.

Returns:
list: List containing avaiable dimensions names.

"""
conn = self.get_connection()
return conn.dimensions.get_all_names()

def get_hierarchies_names(self) -> list:
"""
Get list of avaiable hierarchies in TM1 dimension instance.

Returns:
list: List containing avaiable hierarchies names.

"""
conn = self.get_connection()
return conn.hierarchies.get_all_names(self.dimension)

def get_available_elements(self) -> list:
"""
Get list of avaiable elements in TM1 instance based on hierarchy and diemension.
angelika233 marked this conversation as resolved.
Show resolved Hide resolved

Returns:
list: List containing avaiable elements names.

"""
conn = self.get_connection()
return conn.elements.get_element_names(
dimension_name=self.dimension, hierarchy_name=self.hierarchy
)

def to_df(self, if_empty: Literal["warn", "fail", "skip"] = "skip") -> pd.DataFrame:
"""
Function for downloading data from TM1 to pd.DataFrame. To download the data to the dataframe user needs to specify MDX query or
combination of cube and view.

Args:
if_empty (Literal["warn", "fail", "skip"], optional): What to do if output DataFrame is empty. Defaults to "skip".

Returns:
pd.DataFrame: DataFrame with data downloaded from TM1 view.

Raises:
ValidationError: When mdx and cube + view are not specified or when combination of both is specified.
"""
conn = self.get_connection()

if self.mdx_query is None and (self.cube is None or self.view is None):
raise ValidationError("MDX query or cube and view are required.")
elif self.mdx_query is not None and (
self.cube is not None or self.view is not None
):
raise ValidationError("Specify only one: MDX query or cube and view.")
elif self.cube is not None and self.view is not None:
df = conn.cubes.cells.execute_view_dataframe(
cube_name=self.cube,
view_name=self.view,
private=self.private,
top=self.limit,
)
elif self.mdx_query is not None:
df = conn.cubes.cells.execute_mdx_dataframe(self.mdx_query)

logger.info(
f"Data was successfully transformed into DataFrame: {len(df.columns)} columns and {len(df)} rows."
)
if df.empty is True:
self._handle_if_empty(if_empty)
return df
1 change: 1 addition & 0 deletions viadot/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@
from .vid_club import VidClubToDF
from .git import CloneRepo
from .luma import LumaIngest
from .tm1 import TM1ToDF
Loading
Loading