diff --git a/CHANGELOG.md b/CHANGELOG.md
index 76eb3280b..2ef880c75 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,47 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
+### Added
+
+### Fixed
+
+### Changed
+
+
+## [0.4.22] - 2023-11-15
+### Added
+- Added `TM1` source class.
+- Added `TM1ToDF` task class.
+- Added `set_prefect_kv` parameter to `BigQueryToADLS` with `False` as the default. Set it to `True` if a new pair needs to be created in the KV Store.
+- Added `_rename_duplicated_fields` method to `SharepointListToDF` task class for finding and renaming duplicated columns.
+- Added new view type `agent_interaction_view_type` in `Genesys` source.
+- Added new logic for endpoint `users` in `Genesys` task.
+- Added libraries `nltk` and `scikit-learn` to `requirements`.
+
+### Fixed
+- Fixed a bug for endpoint `conversations` in the GET method in the `Genesys` task.
+
+### Changed
+- Split tests for `Eurostat` into source tests and task tests.
+- Modified `SharepointToADLS` flow class:
+  -> docstrings update.
+  -> changed `set_prefect_kv: bool = False` to prevent forced KV store append.
+- Modified `SharepointListToADLS` flow class:
+  -> changed `set_prefect_kv: bool = False` to prevent forced KV store append.
+- Modified `SharepointList` source class:
+  -> docstrings update.
+  -> Changed `_unpack_fields` method to handle the Sharepoint MultiChoiceField type + small improvements.
+  -> Changed `get_fields` method to handle special characters - different approach to call get() and execute_query().
+  -> Renamed method from `select_expandable_user_fields` to `select_fields` + update for the MultiChoiceField type.
+  -> Changed `check_filters` method error messages and added more checks.
+  -> Changed `operators_mapping` method error messages.
+  -> Changed `make_filter_for_df` method error messages.
+- Modified `SharepointListToDF` task class:
+  -> docstrings update.
+- Modified `CustomerGauge` source class with simplified logic to return the JSON structure.
+- Expanded `CustomerGaugeToDF` task class with separate cleaning functions and nested JSON structure flattening handled by two new methods: `_field_reference_unpacker` and `_nested_dict_transformer`.
+- Changed `CustomerGaugeToADLS` to accept new arguments.
+
 
 ## [0.4.21] - 2023-10-26
 ### Added
@@ -13,13 +54,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Added `SharepointListToDF` task class.
 - Added `SharepointListToADLS` flow class.
 - Added tests for `SharepointList`.
-- Added `get_nested_dict` to untils.py.
+- Added `get_nested_dict` to utils.py.
 
 ### Fixed
 
 ### Changed
 - Changed `GenesysToCSV` logic for end_point == "conversations". Added new fields to extraction.
+
 
 ## [0.4.20] - 2023-10-12
 ### Added
 - Added `Office365-REST-Python-Client` library to `requirements`.
@@ -626,4 +671,4 @@ specified in the `SUPERMETRICS_DEFAULT_USER` secret - Moved from poetry to pip ### Fixed -- Fix `AzureBlobStorage`'s `to_storage()` method is missing the final upload blob part \ No newline at end of file +- Fix `AzureBlobStorage`'s `to_storage()` method is missing the final upload blob part diff --git a/requirements.txt b/requirements.txt index 768887e4a..4d6c3a15f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -43,3 +43,6 @@ dbt-core==1.3.2 dbt-sqlserver==1.3.1 lumaCLI==0.0.19 Office365-REST-Python-Client==2.4.4 +TM1py==1.11.3 +nltk==3.8.1 +scikit-learn==1.3.2 \ No newline at end of file diff --git a/tests/integration/flows/test_bigquery_to_adls.py b/tests/integration/flows/test_bigquery_to_adls.py index de793344a..b4503c6e9 100644 --- a/tests/integration/flows/test_bigquery_to_adls.py +++ b/tests/integration/flows/test_bigquery_to_adls.py @@ -1,15 +1,14 @@ import os +from unittest import mock +import pandas as pd import pendulum import pytest -from unittest import mock -import pandas as pd - from prefect.tasks.secrets import PrefectSecret -from viadot.flows import BigQueryToADLS -from viadot.tasks import AzureDataLakeRemove from viadot.exceptions import ValidationError +from viadot.flows import BigQueryToADLS +from viadot.tasks import AzureDataLakeRemove ADLS_DIR_PATH = "raw/tests/" ADLS_FILE_NAME = str(pendulum.now("utc")) + ".parquet" diff --git a/tests/integration/flows/test_cloud_for_customers_report_to_adls.py b/tests/integration/flows/test_cloud_for_customers_report_to_adls.py index f0661e314..b0c3128c5 100644 --- a/tests/integration/flows/test_cloud_for_customers_report_to_adls.py +++ b/tests/integration/flows/test_cloud_for_customers_report_to_adls.py @@ -1,6 +1,6 @@ from viadot.config import local_config -from viadot.flows import CloudForCustomersReportToADLS from viadot.exceptions import ValidationError +from viadot.flows import CloudForCustomersReportToADLS def test_cloud_for_customers_report_to_adls(): diff --git a/tests/integration/flows/test_customer_gauge_to_adls.py b/tests/integration/flows/test_customer_gauge_to_adls.py index 0e7afd3e2..6da0bf8b7 100644 --- a/tests/integration/flows/test_customer_gauge_to_adls.py +++ b/tests/integration/flows/test_customer_gauge_to_adls.py @@ -4,8 +4,8 @@ import pandas as pd import pytest -from viadot.flows import CustomerGaugeToADLS from viadot.exceptions import ValidationError +from viadot.flows import CustomerGaugeToADLS DATA = { "user_name": ["Jane", "Bob"], @@ -90,6 +90,3 @@ def test_customer_gauge_to_adls_run_flow_validation_failure(mocked_class): flow.run() except ValidationError: pass - - os.remove("test_customer_gauge_to_adls_run_flow_validation_failure.parquet") - os.remove("test_customer_gauge_to_adls_run_flow_validation_failure.json") diff --git a/tests/integration/flows/test_hubspot_to_adls.py b/tests/integration/flows/test_hubspot_to_adls.py index d960fc079..e0c06c20f 100644 --- a/tests/integration/flows/test_hubspot_to_adls.py +++ b/tests/integration/flows/test_hubspot_to_adls.py @@ -5,8 +5,8 @@ import pandas as pd import pytest -from viadot.flows import HubspotToADLS from viadot.exceptions import ValidationError +from viadot.flows import HubspotToADLS DATA = { "id": {"0": "820306930"}, diff --git a/tests/integration/flows/test_mediatool_to_adls.py b/tests/integration/flows/test_mediatool_to_adls.py index d7b5b2658..65cfadf8f 100644 --- a/tests/integration/flows/test_mediatool_to_adls.py +++ b/tests/integration/flows/test_mediatool_to_adls.py @@ -4,8 +4,8 @@ import pandas as pd import pytest 
-from viadot.flows import MediatoolToADLS from viadot.exceptions import ValidationError +from viadot.flows import MediatoolToADLS DATA = {"country": ["DK", "DE"], "sales": [3, 4]} ADLS_FILE_NAME = "test_mediatool.parquet" diff --git a/tests/integration/flows/test_mysql_to_adls.py b/tests/integration/flows/test_mysql_to_adls.py index 942bab99d..c968d48a3 100644 --- a/tests/integration/flows/test_mysql_to_adls.py +++ b/tests/integration/flows/test_mysql_to_adls.py @@ -1,4 +1,5 @@ from unittest import mock + from viadot.flows.mysql_to_adls import MySqlToADLS query = """SELECT * FROM `example-views`.`sales`""" diff --git a/tests/integration/flows/test_salesforce_to_adls.py b/tests/integration/flows/test_salesforce_to_adls.py index ec68a1227..8c032f308 100644 --- a/tests/integration/flows/test_salesforce_to_adls.py +++ b/tests/integration/flows/test_salesforce_to_adls.py @@ -2,9 +2,9 @@ from prefect.tasks.secrets import PrefectSecret +from viadot.exceptions import ValidationError from viadot.flows import SalesforceToADLS from viadot.tasks import AzureDataLakeRemove -from viadot.exceptions import ValidationError ADLS_FILE_NAME = "test_salesforce.parquet" ADLS_DIR_PATH = "raw/tests/" diff --git a/tests/integration/flows/test_sap_bw_to_adls.py b/tests/integration/flows/test_sap_bw_to_adls.py index 2c01049e8..4259e5c16 100644 --- a/tests/integration/flows/test_sap_bw_to_adls.py +++ b/tests/integration/flows/test_sap_bw_to_adls.py @@ -4,8 +4,8 @@ import pandas as pd import pytest -from viadot.flows import SAPBWToADLS from viadot.exceptions import ValidationError +from viadot.flows import SAPBWToADLS DATA = { "[0CALMONTH].[LEVEL01].[DESCRIPTION]": ["January 2023"], diff --git a/tests/integration/flows/test_sap_rfc_to_adls.py b/tests/integration/flows/test_sap_rfc_to_adls.py index ed33fa320..5503b4684 100644 --- a/tests/integration/flows/test_sap_rfc_to_adls.py +++ b/tests/integration/flows/test_sap_rfc_to_adls.py @@ -1,8 +1,8 @@ from viadot.config import local_config +from viadot.exceptions import ValidationError from viadot.flows import SAPRFCToADLS from viadot.sources import AzureDataLake from viadot.tasks import AzureDataLakeRemove -from viadot.exceptions import ValidationError try: import pyrfc diff --git a/tests/integration/flows/test_supermetrics_to_adls.py b/tests/integration/flows/test_supermetrics_to_adls.py index 9738ddeb1..15deaa01a 100644 --- a/tests/integration/flows/test_supermetrics_to_adls.py +++ b/tests/integration/flows/test_supermetrics_to_adls.py @@ -4,8 +4,8 @@ import pytest from prefect.storage import Local -from viadot.flows import SupermetricsToADLS from viadot.exceptions import ValidationError +from viadot.flows import SupermetricsToADLS CWD = os.getcwd() adls_dir_path = "raw/tests/supermetrics" diff --git a/tests/integration/flows/test_vidclub_to_adls.py b/tests/integration/flows/test_vidclub_to_adls.py index c18eaad10..0f6705579 100644 --- a/tests/integration/flows/test_vidclub_to_adls.py +++ b/tests/integration/flows/test_vidclub_to_adls.py @@ -4,8 +4,8 @@ import pandas as pd import pytest -from viadot.flows import VidClubToADLS from viadot.exceptions import ValidationError +from viadot.flows import VidClubToADLS DATA = {"col1": ["aaa", "bbb", "ccc"], "col2": [11, 22, 33]} ADLS_FILE_NAME = "test_vid_club.parquet" diff --git a/tests/integration/tasks/test_customer_gauge.py b/tests/integration/tasks/test_customer_gauge.py index 732205814..0c524fd0a 100644 --- a/tests/integration/tasks/test_customer_gauge.py +++ b/tests/integration/tasks/test_customer_gauge.py @@ -8,10 +8,382 
@@ CUR = 185000 PAGESIZE = 1000 +DATA_JSON = { + "contact": {"first_name": "***", "last_name": "***"}, + "number_customer": 266, + "date_email_sent": "2018-02-05 10:42:28", + "properties": [ + {"field": "Postal Code", "reference": "999"}, + {"field": "City", "reference": "Eldorado"}, + {"field": "Currency", "reference": None}, + {"field": "Item Quantity", "reference": "7"}, + {"field": "PostingDate", "reference": "2018-01-10 00:00:00"}, + ], + "custom_fields": [{"field": "Assignment_ID", "reference": None}], + "drivers": [ + {"label": "Product Quality and Product Performance"}, + {"label": "Function and Design"}, + {"label": "Value for Money"}, + {"label": "Packaging"}, + ], +} + +RAW_JSON = { + "data": [ + { + "contact": {"first_name": "***", "last_name": "***"}, + "number_customer": 266, + "date_email_sent": "2018-02-05 10:42:28", + "properties": [ + {"field": "Postal Code", "reference": "999"}, + {"field": "City", "reference": "Eldorado"}, + {"field": "Currency", "reference": None}, + {"field": "Item Quantity", "reference": "7"}, + {"field": "PostingDate", "reference": "2018-01-10 00:00:00"}, + ], + "custom_fields": [{"field": "Assignment_ID", "reference": None}], + "drivers": [ + {"label": "Product Quality and Product Performance"}, + {"label": "Function and Design"}, + {"label": "Value for Money"}, + {"label": "Packaging"}, + ], + }, + { + "contact": {"first_name": "***", "last_name": "***"}, + "number_customer": 206, + "date_email_sent": "2018-02-05 10:41:01", + "properties": [ + {"field": "Postal Code", "reference": "0000"}, + {"field": "City", "reference": "Neverland"}, + {"field": "Currency", "reference": None}, + {"field": "Item Quantity", "reference": "1"}, + {"field": "PostingDate", "reference": "2018-01-26 00:00:00"}, + ], + "custom_fields": [{"field": "Assignment_ID", "reference": None}], + "drivers": [ + {"label": "The website of the online shop (overall impression)"}, + {"label": "Waiting period"}, + ], + }, + ], + "cursor": {"next": 37}, +} + +WRONG_DATA = { + "cols": [ + {"field": "City", "reference": "Eldorado"}, + {"field": "Currency", "reference": None}, + {"field": "Item Quantity", "reference": "7"}, + {"field": "PostingDate", "reference": "2018-01-10 00:00:00"}, + ] +} + @pytest.mark.looping_api_calls def test_customer_gauge_to_df_loop(): + """ + Test the 'run' method with looping API calls. + """ df = CG.run(total_load=True, cursor=CUR, pagesize=PAGESIZE) assert isinstance(df, pd.DataFrame) assert len(df) > PAGESIZE + + +@pytest.mark.get_data +def test_get_data(): + """ + Test the 'get_data' method with valid JSON data. + """ + json_data = CG.get_data(RAW_JSON) + assert isinstance(json_data, list) + + +@pytest.mark.get_data_error +def test_get_data_error_raising(): + """ + Test the 'get_data' method with invalid JSON data that raises a KeyError. + """ + with pytest.raises(KeyError): + CG.get_data(WRONG_DATA) + + +@pytest.mark.field_reference_unpacker_success +def test_field_reference_unpacker(): + """ + Test the '_field_reference_unpacker' method with valid data. It should unpack and modify dictionaries within the specified field and return the expected result. 
+ """ + data = DATA_JSON.copy() + field = "properties" + expected_result = { + "contact": {"first_name": "***", "last_name": "***"}, + "number_customer": 266, + "date_email_sent": "2018-02-05 10:42:28", + "properties": { + "Postal Code": "999", + "City": "Eldorado", + "Currency": None, + "Item Quantity": "7", + "PostingDate": "2018-01-10 00:00:00", + }, + "custom_fields": [{"field": "Assignment_ID", "reference": None}], + "drivers": [ + {"label": "Product Quality and Product Performance"}, + {"label": "Function and Design"}, + {"label": "Value for Money"}, + {"label": "Packaging"}, + ], + } + + result = CG._field_reference_unpacker(json_response=data, field=field) + + assert result == expected_result + + +@pytest.mark.field_reference_unpacker_value_error +def test_field_reference_unpacker_invalid_data_format(): + """ + Test the '_field_reference_unpacker' method with invalid data format that should raise a ValueError. It should raise a ValueError exception. + """ + data = DATA_JSON.copy() + field = "contact" + with pytest.raises( + ValueError, + match=r"Dictionary within the specified field doesn't contain exactly two items.", + ): + CG._field_reference_unpacker(json_response=data, field=field) + + +@pytest.mark.field_reference_unpacker_key_error +def test_field_reference_unpacker_missing_field(): + """ + Test the '_field_reference_unpacker' method with a missing field that should raise a KeyError. It should raise a KeyError exception. + """ + data = DATA_JSON.copy() + field = "non_existent_field" + with pytest.raises(KeyError): + CG._field_reference_unpacker(json_response=data, field=field) + + +@pytest.mark.nested_dict_transformer_success +def test_nested_dict_transformer(): + """ + Test the '_nested_dict_transformer' method with valid data. It should modify nested dictionaries within the specified field and return the expected result. + """ + data = DATA_JSON.copy() + field = "drivers" + expected_result = { + "contact": {"first_name": "***", "last_name": "***"}, + "number_customer": 266, + "date_email_sent": "2018-02-05 10:42:28", + "properties": [ + {"field": "Postal Code", "reference": "999"}, + {"field": "City", "reference": "Eldorado"}, + {"field": "Currency", "reference": None}, + {"field": "Item Quantity", "reference": "7"}, + {"field": "PostingDate", "reference": "2018-01-10 00:00:00"}, + ], + "custom_fields": [{"field": "Assignment_ID", "reference": None}], + "drivers": { + "1_label": "Product Quality and Product Performance", + "2_label": "Function and Design", + "3_label": "Value for Money", + "4_label": "Packaging", + }, + } + + result = CG._nested_dict_transformer(json_response=data, field=field) + + assert result == expected_result + + +@pytest.mark.nested_dict_transformer_type_error +def test_nested_dict_transformer_invalid_data_format(): + """ + Test the '_nested_dict_transformer' method with invalid data format. It should return the same data without modification. + """ + data = DATA_JSON.copy() + field = "number_customer" + result = CG._nested_dict_transformer(json_response=data, field=field) + + assert result == data + + +@pytest.mark.nested_dict_transformer_key_error +def test_nested_dict_transformer_missing_field(): + """ + Test the '_nested_dict_transformer' method with a missing field that should raise a KeyError. 
+ """ + data = DATA_JSON.copy() + field = "non_existent_field" + with pytest.raises(KeyError): + CG._nested_dict_transformer(json_response=data, field=field) + + +@pytest.mark.column_unpacker_success +def test_column_unpacker_success_method1_and_method2(): + """ + Test the 'column_unpacker' method with valid data and both Method 1 and Method 2 columns specified. It should return the expected result. + """ + data = RAW_JSON["data"].copy() + unpack_by_field_reference_cols = ["properties"] + unpack_by_nested_dict_transformer = ["drivers"] + + expected_result = [ + { + "contact": {"first_name": "***", "last_name": "***"}, + "number_customer": 266, + "date_email_sent": "2018-02-05 10:42:28", + "properties": { + "Postal Code": "999", + "City": "Eldorado", + "Currency": None, + "Item Quantity": "7", + "PostingDate": "2018-01-10 00:00:00", + }, + "custom_fields": [{"field": "Assignment_ID", "reference": None}], + "drivers": { + "1_label": "Product Quality and Product Performance", + "2_label": "Function and Design", + "3_label": "Value for Money", + "4_label": "Packaging", + }, + }, + { + "contact": {"first_name": "***", "last_name": "***"}, + "number_customer": 206, + "date_email_sent": "2018-02-05 10:41:01", + "properties": { + "Postal Code": "0000", + "City": "Neverland", + "Currency": None, + "Item Quantity": "1", + "PostingDate": "2018-01-26 00:00:00", + }, + "custom_fields": [{"field": "Assignment_ID", "reference": None}], + "drivers": { + "1_label": "The website of the online shop (overall impression)", + "2_label": "Waiting period", + }, + }, + ] + + result = CG.column_unpacker( + json_list=data, + unpack_by_field_reference_cols=unpack_by_field_reference_cols, + unpack_by_nested_dict_transformer=unpack_by_nested_dict_transformer, + ) + + assert result == expected_result + + +@pytest.mark.test_column_unpacker_missing_json_argument +def test_column_unpacker_missing_json_list(): + """ + Test the 'column_unpacker' method with missing 'json_list' argument. It should raise a ValueError. + """ + unpack_by_field_reference_cols = ["properties"] + unpack_by_nested_dict_transformer = ["drivers"] + with pytest.raises(ValueError, match="Input 'json_list' is required."): + CG.column_unpacker( + json_list=None, + unpack_by_field_reference_cols=unpack_by_field_reference_cols, + unpack_by_nested_dict_transformer=unpack_by_nested_dict_transformer, + ) + + +@pytest.mark.test_column_unpacker_duplicate_columns +def test_column_unpacker_duplicate_columns(): + """ + Test the 'column_unpacker' method with duplicate columns specified in both Method 1 and Method 2. It should raise a ValueError. + """ + data = RAW_JSON["data"].copy() + unpack_by_field_reference_cols = ["properties"] + unpack_by_nested_dict_transformer = ["properties"] + with pytest.raises( + ValueError, + match="{'properties'} were mentioned in both unpack_by_field_reference_cols and unpack_by_nested_dict_transformer. It's not possible to apply two methods to the same field.", + ): + CG.column_unpacker( + json_list=data, + unpack_by_field_reference_cols=unpack_by_field_reference_cols, + unpack_by_nested_dict_transformer=unpack_by_nested_dict_transformer, + ) + + +@pytest.mark.test_flatten_json +def test_flatten_json(): + """ + Test the 'flatten_json' method with nested JSON data. It should return a flattened dictionary with expected keys and values. 
+ """ + nested_json = { + "user": { + "name": "Jane", + "address": { + "street": "456 Elm St", + "city": "San Francisco", + "state": "CA", + "zip": "94109", + "country": {"name": "United States", "code": "US"}, + }, + "phone_numbers": {"type": "home", "number": "555-4321"}, + } + } + + expected_output = { + "user_name": "Jane", + "user_address_street": "456 Elm St", + "user_address_city": "San Francisco", + "user_address_state": "CA", + "user_address_zip": "94109", + "user_address_country_name": "United States", + "user_address_country_code": "US", + "user_phone_numbers_type": "home", + "user_phone_numbers_number": "555-4321", + } + + output = CG.flatten_json(nested_json) + assert output == expected_output + + +@pytest.mark.flatten_json_non_dict_input +def test_flatten_json_non_dict_input(): + """ + Test the 'flatten_json' method with non-dictionary input. It should raise a TypeError. + """ + input_json = [1, 2, 3] + with pytest.raises(TypeError): + CG.flatten_json(input_json) + + +@pytest.mark.square_brackets_remover +def test_square_brackets_remover_success(): + """ + Test the 'square_brackets_remover' method with a DataFrame containing square brackets. It should remove square brackets from the DataFrame. + """ + data = { + "Column1": ["Value1", "[Value2]", "Value3", "[Value4]"], + "Column2": ["1", "[2]", "3", "[4]"], + } + sample_df = pd.DataFrame(data) + + expected_data = { + "Column1": ["Value1", "Value2", "Value3", "Value4"], + "Column2": ["1", "2", "3", "4"], + } + expected_df = pd.DataFrame(expected_data) + + result = CG.square_brackets_remover(sample_df) + pd.testing.assert_frame_equal(result, expected_df) + + +@pytest.mark.drivers_cleaner +def test_drivers_cleaner_success(): + """ + Test the '_drivers_cleaner' method with valid 'drivers' data. It should clean and format the 'drivers' data and return the expected result. + """ + data = "{'label': 'Driver1'}, {'label': 'Driver2'}, {'label': 'Driver3'}" + expected_result = "Driver1, Driver2, Driver3" + result = CG._drivers_cleaner(data) + assert result == expected_result diff --git a/tests/integration/tasks/test_eurostat.py b/tests/integration/tasks/test_eurostat.py index 475d60190..ed3f20596 100644 --- a/tests/integration/tasks/test_eurostat.py +++ b/tests/integration/tasks/test_eurostat.py @@ -3,139 +3,7 @@ import pandas as pd import pytest -from viadot.tasks import eurostat - - -def test_and_validate_dataset_code_without_params(caplog): - """This function is designed to test the accuracy of the data retrieval feature in a program. - Specifically, it tests to ensure that the program returns a non-empty DataFrame when a correct - dataset code is provided without any parameters. The function is intended to be used in software - development to verify that the program is correctly retrieving data from the appropriate dataset. - """ - task = eurostat.EurostatToDF(dataset_code="ILC_DI04").run() - assert isinstance(task, pd.DataFrame) - assert not task.empty - assert caplog.text == "" - - -def test_wrong_dataset_code_logger(caplog): - """This function is designed to test the accuracy of the error logging feature in a program. - Specifically, it tests to ensure that the program is able to correctly identify and log errors - when provided with only incorrect dataset code. - The function is intended to be used in software development to identify correct type errors - and messages in the program's handling of codes. 
- """ - task = eurostat.EurostatToDF(dataset_code="ILC_DI04E") - - with pytest.raises(ValueError, match="DataFrame is empty!"): - with caplog.at_level(logging.ERROR): - task.run() - assert ( - f"Failed to fetch data for ILC_DI04E, please check correctness of dataset code!" - in caplog.text - ) - - -def test_wrong_parameters_codes_logger(caplog): - """This function is designed to test the accuracy of the error logging feature in a program. - Specifically, it tests to ensure that the program is able to correctly identify and log errors - when provided with a correct dataset_code and correct parameters are provided, but both parameters codes are incorrect. - The function is intended to be used in software development to identify correct type errors - and messages in the program's handling of codes. - """ - task = eurostat.EurostatToDF( - dataset_code="ILC_DI04", - params={"hhtyp": "total1", "indic_il": "non_existing_code"}, - ) - - with pytest.raises(ValueError, match="DataFrame is empty!"): - with caplog.at_level(logging.ERROR): - task.run() - assert ( - f"Parameters codes: 'total1 | non_existing_code' are not available. Please check your spelling!" - in caplog.text - ) - assert ( - f"You can find everything via link: https://ec.europa.eu/eurostat/databrowser/view/ILC_DI04/default/table?lang=en" - in caplog.text - ) - - -def test_parameter_codes_as_list_logger(caplog): - """This function is designed to test the accuracy of the error logging feature in a program. - Specifically, it tests to ensure that the program is able to correctly identify and log errors - when provided with a correct dataset code, correct parameters, but incorrect parameters codes structure - (as a list with strings, instead of single string). - The function is intended to be used in software development to identify correct type errors - and messages in the program's handling of codes. - """ - - task = eurostat.EurostatToDF( - dataset_code="ILC_DI04", - params={"hhtyp": ["totale", "nottotale"], "indic_il": "med_e"}, - ) - with pytest.raises(ValueError, match="Wrong structure of params!"): - with caplog.at_level(logging.ERROR): - task.run() - assert ( - "You can provide only one code per one parameter as 'str' in params!\n" - in caplog.text - ) - assert ( - "CORRECT: params = {'unit': 'EUR'} | INCORRECT: params = {'unit': ['EUR', 'USD', 'PLN']}" - in caplog.text - ) - - -def test_wrong_parameters(caplog): - """This function is designed to test the accuracy of the error logging feature in a program. - Specifically, it tests to ensure that the program is able to correctly identify and log errors - when provided with a correct dataset_code, but incorrect parameters keys. - The function is intended to be used in software development to identify correct type errors - and messages in the program's handling of codes. - """ - - task = eurostat.EurostatToDF( - dataset_code="ILC_DI04", params={"hhhtyp": "total", "indic_ilx": "med_e"} - ) - with pytest.raises(ValueError, match="DataFrame is empty!"): - with caplog.at_level(logging.ERROR): - task.run() - assert ( - f"Parameters: 'hhhtyp | indic_ilx' are not in dataset. Please check your spelling!\n" - in caplog.text - ) - assert ( - f"Possible parameters: freq | hhtyp | indic_il | unit | geo | time" - in caplog.text - ) - - -def test_params_as_list(): - """This function is designed to test the accuracy of the error logging feature in a program. 
- Specifically, it tests to ensure that the program is able to correctly identify and log error - when provided with a correct dataset_code, but incorrect params structure (as list instead of dict). - The function is intended to be used in software development to identify correct type errors - and messages in the program's handling of codes. - """ - with pytest.raises(TypeError, match="Params should be a dictionary."): - eurostat.EurostatToDF(dataset_code="ILC_DI04", params=["total", "med_e"]).run() - - -def test_correct_params_and_dataset_code(caplog): - """This function is designed to test the accuracy of the data retrieval feature in a program. - Specifically, it tests to ensure that the program returns a non-empty DataFrame when a correct - dataset code is provided with correct params. The function is intended to be used in software - development to verify that the program is correctly retrieving data from the appropriate dataset. - """ - - task = eurostat.EurostatToDF( - dataset_code="ILC_DI04", params={"hhtyp": "total", "indic_il": "med_e"} - ).run() - - assert isinstance(task, pd.DataFrame) - assert not task.empty - assert caplog.text == "" +from viadot.tasks import EurostatToDF def task_correct_requested_columns(caplog): @@ -145,7 +13,7 @@ def task_correct_requested_columns(caplog): The function is intended to be used in software development to verify that the program is correctly retrieving data from the appropriate dataset. """ - task = eurostat.EurostatToDF( + task = EurostatToDF( dataset_code="ILC_DI04", params={"hhtyp": "total", "indic_il": "med_e"}, requested_columns=["updated", "geo", "indicator"], @@ -155,7 +23,7 @@ def task_correct_requested_columns(caplog): assert isinstance(task, pd.DataFrame) assert not task.empty assert caplog.text == "" - assert list(task.columns) == task.needed_columns + assert list(task.columns) == task.requested_columns def test_wrong_needed_columns_names(caplog): @@ -165,7 +33,7 @@ def test_wrong_needed_columns_names(caplog): The function is intended to be used in software development to identify correct type errors and messages in the program's handling of codes. """ - task = eurostat.EurostatToDF( + task = EurostatToDF( dataset_code="ILC_DI04", params={"hhtyp": "total", "indic_il": "med_e"}, requested_columns=["updated1", "geo1", "indicator1"], @@ -188,7 +56,7 @@ def test_wrong_params_and_wrong_requested_columns_names(caplog): params validation. The function is intended to be used in software development to identify correct type errors and messages in the program's handling of codes. """ - task = eurostat.EurostatToDF( + task = EurostatToDF( dataset_code="ILC_DI04", params={"hhhtyp": "total", "indic_ilx": "med_e"}, requested_columns=["updated1", "geo1", "indicator1"], @@ -217,8 +85,32 @@ def test_requested_columns_not_in_list(): with pytest.raises( TypeError, match="Requested columns should be provided as list of strings." ): - eurostat.EurostatToDF( + EurostatToDF( dataset_code="ILC_DI04", params={"hhtyp": "total", "indic_il": "med_e"}, requested_columns="updated", ).run() + + +def test_requested_columns_not_provided(caplog): + """Test the behavior when 'requested_columns' are not provided to EurostatToDF. + + This test checks the behavior of the EurostatToDF class when 'requested_columns' are not provided. + It ensures that the resulting DataFrame is of the correct type, not empty, and that no error + messages are logged using the 'caplog' fixture. + + Parameters: + - caplog: pytest fixture for capturing log messages. 
+ + Usage: + - Invoke this test function to check the behavior of EurostatToDF when 'requested_columns' are not provided. + """ + task = EurostatToDF( + dataset_code="ILC_DI04", + params={"hhtyp": "total", "indic_il": "med_e"}, + ) + df = task.run() + + assert isinstance(df, pd.DataFrame) + assert not df.empty + assert caplog.text == "" diff --git a/tests/integration/tasks/test_tm1.py b/tests/integration/tasks/test_tm1.py new file mode 100644 index 000000000..68527b5f7 --- /dev/null +++ b/tests/integration/tasks/test_tm1.py @@ -0,0 +1,15 @@ +import pandas as pd + +from viadot.config import local_config +from viadot.tasks import TM1ToDF + +CUBE = local_config.get("test_cube") +VIEW = local_config.get("test_view") + + +def test_tm1_to_df(): + tm1 = TM1ToDF(CUBE, VIEW) + df = tm1.run() + + assert isinstance(df, pd.DataFrame) + assert df.empty is False diff --git a/tests/integration/test_customer_gauge.py b/tests/integration/test_customer_gauge.py index 666a73251..a29ff3585 100644 --- a/tests/integration/test_customer_gauge.py +++ b/tests/integration/test_customer_gauge.py @@ -3,12 +3,23 @@ import pandas as pd import pytest +from viadot.exceptions import CredentialError from viadot.sources import CustomerGauge ENDPOINT = random.choice(["responses", "non-responses"]) CG = CustomerGauge(endpoint=ENDPOINT) +def test_wrong_endpoint(): + with pytest.raises(ValueError, match="Incorrect endpoint name"): + CustomerGauge(endpoint=["wrong_endpoint"]) + + +def test_endpoint_and_url_not_provided(): + with pytest.raises(ValueError, match="Provide endpoint name"): + CustomerGauge() + + def test_get_json_content(): json_response = CG.get_json_response() assert isinstance(json_response, dict) @@ -17,52 +28,6 @@ def test_get_json_content(): assert isinstance(json_response["cursor"], dict) -def test_properties_cleaning(): - json_response = CG.get_json_response() - data = json_response["data"][2].copy() - cleaned_data = CG.properties_cleaning(data.copy()) - assert isinstance(data["properties"], list) - assert isinstance(cleaned_data["properties"], dict) - - -def test_flatten_json(): - nested_json = { - "user": { - "name": "Jane", - "address": { - "street": "456 Elm St", - "city": "San Francisco", - "state": "CA", - "zip": "94109", - "country": {"name": "United States", "code": "US"}, - }, - "phone_numbers": {"type": "home", "number": "555-4321"}, - } - } - - expected_output = { - "user_name": "Jane", - "user_address_street": "456 Elm St", - "user_address_city": "San Francisco", - "user_address_state": "CA", - "user_address_zip": "94109", - "user_address_country_name": "United States", - "user_address_country_code": "US", - "user_phone_numbers_type": "home", - "user_phone_numbers_number": "555-4321", - } - - output = CG.flatten_json(nested_json) - assert output == expected_output - - -def test_pagesize_and_to_df(): - json_response = CG.get_json_response(pagesize=1) - df = CG.to_df(json_response) - assert isinstance(df, pd.DataFrame) - assert len(df) == 1 - - def test_pass_specific_cursor(): # for default pagesize=1000 returned cursor value should be bigger than passed cur = random.randint(1, 9999) @@ -71,11 +36,16 @@ def test_pass_specific_cursor(): assert cur_retrieved > cur +def test_cursor_is_not_provided(): + with pytest.raises( + ValueError, match="Provided argument doesn't contain 'cursor' value" + ): + CG.get_cursor(json_response={}) + + def test_uncomplete_date_arguments(): with pytest.raises(ValueError, match="Missing date arguments"): - json_response = CG.get_json_response( - date_field="date_sent", 
start_date="2012-01-03" - ) + CG.get_json_response(date_field="date_sent", start_date="2012-01-03") def test_endpoint_url_argument(): @@ -84,3 +54,40 @@ def test_endpoint_url_argument(): CG = CustomerGauge(url=ENDPOINT_URL) json_response = CG.get_json_response() assert isinstance(json_response, dict) + + +@pytest.mark.endpoint_valueerror +def test_wrong_endpoint_valueerror_raising(): + with pytest.raises( + ValueError, + match=r"Incorrect endpoint name. Choose: 'responses' or 'non-responses'", + ): + wrong_endpoint_name = "wrong-endpoint" + CG = CustomerGauge(endpoint=wrong_endpoint_name) + + +@pytest.mark.endpoint_valueerror +def test_no_endpoint_valueerror_raising(): + with pytest.raises( + ValueError, + match=r"Provide endpoint name. Choose: 'responses' or 'non-responses'. Otherwise, provide URL", + ): + CG = CustomerGauge() + + +@pytest.mark.endpoint_credentialserror +def test_credentialserror_raising(): + wrong_secret = "wrong" + with pytest.raises(CredentialError, match=r"Credentials not provided."): + CG = CustomerGauge(endpoint=ENDPOINT, credentials_secret=wrong_secret) + + +@pytest.mark.get_cursor_valueerror +def test_get_cursor_valueerror_raising(): + wrong_json = {} + with pytest.raises( + ValueError, + match=r"Provided argument doesn't contain 'cursor' value. Pass json returned from the endpoint.", + ): + CG = CustomerGauge(endpoint=ENDPOINT) + CG.get_cursor(json_response=wrong_json) diff --git a/tests/integration/test_epicor.py b/tests/integration/test_epicor.py index 8e9155059..60c1f3410 100644 --- a/tests/integration/test_epicor.py +++ b/tests/integration/test_epicor.py @@ -1,7 +1,8 @@ +import pandas as pd import pytest from viadot.config import local_config -from viadot.exceptions import DataRangeError +from viadot.exceptions import CredentialError, DataRangeError from viadot.sources import Epicor @@ -48,3 +49,25 @@ def test_connection(epicor): def test_validate_filter(epicor_error): with pytest.raises(DataRangeError): epicor_error.validate_filter() + + +def test_credentials_not_provided(): + with pytest.raises(CredentialError): + Epicor( + base_url=local_config.get("EPICOR").get("test_url"), + credentials={"username": "user12", "port": 1111}, + filters_xml=""" + + + 001 + + 2022-05-16 + 3 + + """, + ) + + +def test_to_df_return_type(epicor): + df = epicor.to_df() + assert isinstance(df, pd.DataFrame) diff --git a/tests/integration/test_eurostat.py b/tests/integration/test_eurostat.py new file mode 100644 index 000000000..6fb64cbea --- /dev/null +++ b/tests/integration/test_eurostat.py @@ -0,0 +1,140 @@ +import logging + +import pandas as pd +import pytest + +from viadot.sources import Eurostat + + +def test_and_validate_dataset_code_without_params(caplog): + """This function is designed to test the accuracy of the data retrieval feature in a program. + Specifically, it tests to ensure that the program returns a non-empty DataFrame when a correct + dataset code is provided without any parameters. The function is intended to be used in software + development to verify that the program is correctly retrieving data from the appropriate dataset. + """ + source = Eurostat(dataset_code="ILC_DI04").get_data_frame_from_response() + assert isinstance(source, pd.DataFrame) + assert not source.empty + assert caplog.text == "" + + +def test_wrong_dataset_code_logger(caplog): + """This function is designed to test the accuracy of the error logging feature in a program. 
+ Specifically, it tests to ensure that the program is able to correctly identify and log errors + when provided with only incorrect dataset code. + The function is intended to be used in software development to identify correct type errors + and messages in the program's handling of codes. + """ + source = Eurostat(dataset_code="ILC_DI04E") + + with pytest.raises(ValueError, match="DataFrame is empty!"): + with caplog.at_level(logging.ERROR): + source.get_data_frame_from_response() + assert ( + f"Failed to fetch data for ILC_DI04E, please check correctness of dataset code!" + in caplog.text + ) + + +def test_wrong_parameters_codes_logger(caplog): + """This function is designed to test the accuracy of the error logging feature in a program. + Specifically, it tests to ensure that the program is able to correctly identify and log errors + when provided with a correct dataset_code and correct parameters are provided, but both parameters codes are incorrect. + The function is intended to be used in software development to identify correct type errors + and messages in the program's handling of codes. + """ + source = Eurostat( + dataset_code="ILC_DI04", + params={"hhtyp": "total1", "indic_il": "non_existing_code"}, + ) + + with pytest.raises(ValueError, match="DataFrame is empty!"): + with caplog.at_level(logging.ERROR): + source.get_data_frame_from_response() + assert ( + f"Parameters codes: 'total1 | non_existing_code' are not available. Please check your spelling!" + in caplog.text + ) + assert ( + f"You can find everything via link: https://ec.europa.eu/eurostat/databrowser/view/ILC_DI04/default/table?lang=en" + in caplog.text + ) + + +def test_parameter_codes_as_list_logger(caplog): + """This function is designed to test the accuracy of the error logging feature in a program. + Specifically, it tests to ensure that the program is able to correctly identify and log errors + when provided with a correct dataset code, correct parameters, but incorrect parameters codes structure + (as a list with strings, instead of single string). + The function is intended to be used in software development to identify correct type errors + and messages in the program's handling of codes. + """ + + source = Eurostat( + dataset_code="ILC_DI04", + params={"hhtyp": ["totale", "nottotale"], "indic_il": "med_e"}, + ) + with pytest.raises(ValueError, match="Wrong structure of params!"): + with caplog.at_level(logging.ERROR): + source.get_data_frame_from_response() + assert ( + "You can provide only one code per one parameter as 'str' in params!\n" + in caplog.text + ) + assert ( + "CORRECT: params = {'unit': 'EUR'} | INCORRECT: params = {'unit': ['EUR', 'USD', 'PLN']}" + in caplog.text + ) + + +def test_wrong_parameters(caplog): + """This function is designed to test the accuracy of the error logging feature in a program. + Specifically, it tests to ensure that the program is able to correctly identify and log errors + when provided with a correct dataset_code, but incorrect parameters keys. + The function is intended to be used in software development to identify correct type errors + and messages in the program's handling of codes. + """ + + source = Eurostat( + dataset_code="ILC_DI04", params={"hhhtyp": "total", "indic_ilx": "med_e"} + ) + with pytest.raises(ValueError, match="DataFrame is empty!"): + with caplog.at_level(logging.ERROR): + source.get_data_frame_from_response() + assert ( + f"Parameters: 'hhhtyp | indic_ilx' are not in dataset. 
Please check your spelling!\n" + in caplog.text + ) + assert ( + f"Possible parameters: freq | hhtyp | indic_il | unit | geo | time" + in caplog.text + ) + + +def test_params_as_list(): + """This function is designed to test the accuracy of the error logging feature in a program. + Specifically, it tests to ensure that the program is able to correctly identify and log error + when provided with a correct dataset_code, but incorrect params structure (as list instead of dict). + The function is intended to be used in software development to identify correct type errors + and messages in the program's handling of codes. + """ + with pytest.raises(TypeError, match="Params should be a dictionary."): + Eurostat( + dataset_code="ILC_DI04", params=["total", "med_e"] + ).get_data_frame_from_response() + + +def test_correct_params_and_dataset_code(caplog): + """This function is designed to test the accuracy of the data retrieval feature in a program. + Specifically, it tests to ensure that the program returns a non-empty DataFrame when a correct + dataset code is provided with correct params. The function is intended to be used in software + development to verify that the program is correctly retrieving data from the appropriate dataset. + """ + + source = Eurostat( + dataset_code="ILC_DI04", params={"hhtyp": "total", "indic_il": "med_e"} + ).get_data_frame_from_response() + + assert isinstance(source, pd.DataFrame) + assert not source.empty + assert caplog.text == "" diff --git a/tests/integration/test_genesys.py b/tests/integration/test_genesys.py index 817e590b5..f91318b96 100644 --- a/tests/integration/test_genesys.py +++ b/tests/integration/test_genesys.py @@ -1,3 +1,4 @@ +import logging from unittest import mock import pytest @@ -138,6 +139,13 @@ def test_default_credential_param(): assert g.credentials != None and type(g.credentials) == dict +@pytest.mark.init +def test_default_credentials_provided(caplog): + with caplog.at_level(logging.INFO): + Genesys(credentials_genesys={"CREDENTIALS_KEY": "value"}) + assert "Credentials provided by user" in caplog.text + + @pytest.mark.init def test_environment_param(): g = Genesys() @@ -169,15 +177,35 @@ def test_generate_api_connection(mock_api_response, var_dictionary): mock_api_response.assert_called() +def test_api_connection_return_type(): + conn_dict = Genesys().genesys_api_connection(post_data_list=["test_value_to_post"]) + assert isinstance(conn_dict, dict) + + +def test_load_reporting_exports_return_type(caplog): + with caplog.at_level(logging.INFO): + load_return = Genesys().load_reporting_exports() + assert isinstance(load_return, dict) + + assert "loaded" in caplog.text + + @mock.patch.object(Genesys, "download_report") @pytest.mark.dependency(depends=["test_generate_api_connection"]) @pytest.mark.download -def test_download_reports(mock_download_files, var_dictionary): +def test_download_reports(mock_download_files, var_dictionary, caplog): g = Genesys() g.ids_mapping = var_dictionary["ids_mapping"] g.report_data = var_dictionary["report_data"] g.start_date = var_dictionary["start_date"] - file_name_list = g.download_all_reporting_exports() + with caplog.at_level(logging.INFO): + file_name_list = g.download_all_reporting_exports() + assert "IDS_MAPPING loaded" in caplog.text + + g.ids_mapping = None + with caplog.at_level(logging.WARNING): + file_name_list = g.download_all_reporting_exports() + assert "IDS_MAPPING is not provided" in caplog.text assert type(file_name_list) == list and len(file_name_list) > 0 
mock_download_files.assert_called() diff --git a/tests/integration/test_hubspot.py b/tests/integration/test_hubspot.py index 0907e031a..c3f303b4c 100644 --- a/tests/integration/test_hubspot.py +++ b/tests/integration/test_hubspot.py @@ -3,6 +3,7 @@ import pandas as pd import pytest +from viadot.exceptions import CredentialError from viadot.sources import Hubspot from viadot.task_utils import credentials_loader @@ -40,6 +41,11 @@ def var_dictionary(): yield variables +def test_credentials_not_provided(): + with pytest.raises(CredentialError, match="Credentials not found."): + Hubspot(credentials={}) + + def test_clean_special_characters(): test_value = "762##28cd7-e$69d-4708-be31-726bb!859befd" clean_chars = HUBSPOT.clean_special_characters(value=test_value) @@ -79,3 +85,8 @@ def test_to_json(var_dictionary): trigger = HUBSPOT.to_json(url=api_url, body=api_body, method="POST") assert isinstance(trigger, dict) + + +def test_get_properties_url(var_dictionary): + url = HUBSPOT.get_properties_url(endpoint=var_dictionary["endpoint"]) + assert isinstance(url, str) diff --git a/tests/integration/test_mediatool.py b/tests/integration/test_mediatool.py index f5a2d81a2..45b9da48b 100644 --- a/tests/integration/test_mediatool.py +++ b/tests/integration/test_mediatool.py @@ -5,7 +5,7 @@ import pytest from prefect.tasks.secrets import PrefectSecret -from viadot.exceptions import APIError +from viadot.exceptions import APIError, CredentialError from viadot.sources import Mediatool from viadot.task_utils import credentials_loader @@ -13,6 +13,11 @@ MTOOL = Mediatool(credentials=CREDENTIALS) +def test_init_empty_credentials(): + with pytest.raises(CredentialError, match=r"Missing credentials."): + Mediatool(credentials={}) + + def test_get_campaigns_df(): camps = MTOOL.get_campaigns(CREDENTIALS["ORG"]) assert isinstance(camps, pd.DataFrame) @@ -29,6 +34,13 @@ def test_get_organizations(): assert isinstance(orgs, pd.DataFrame) +def test_get_organizations_return_list(): + orgs = MTOOL.get_organizations( + user_id=CREDENTIALS["USER_ID"], return_dataframe=False + ) + assert isinstance(orgs, list) + + def test_get_media_entries(): media_entries = MTOOL.get_media_entries( organization_id=CREDENTIALS["ORG"], columns=["_id"] @@ -36,6 +48,13 @@ def test_get_media_entries(): assert isinstance(media_entries, pd.DataFrame) +def test_get_media_entries_wrong_columns(caplog): + MTOOL.get_media_entries( + organization_id=CREDENTIALS["ORG"], columns=["wrong_column", "random_column"] + ) + assert "Columns ['wrong_column', 'random_column'] are incorrect." 
in caplog.text + + def test_get_media_types_correct_id(): media_types = MTOOL.get_media_types(media_type_ids=[CREDENTIALS["MEDIA_TYPE_ID"]]) assert isinstance(media_types, pd.DataFrame) @@ -48,11 +67,25 @@ def test_get_media_types_wrong_id(): _ = MTOOL.get_media_types(["040404"]) -def test_get_vehicles(caplog): +def test_get_media_types_return_list(): + media_types = MTOOL.get_media_types( + media_type_ids=[CREDENTIALS["MEDIA_TYPE_ID"]], return_dataframe=False + ) + assert isinstance(media_types, list) + + +def test_get_vehicles_wrong_ids(caplog): _ = MTOOL.get_vehicles(vehicle_ids=["100000", "200000"]) assert "Vehicle were not found for: ['100000', '200000']" in caplog.text +def test_get_vehicles_return_dict(): + vehicles = MTOOL.get_vehicles( + vehicle_ids=[CREDENTIALS["VEHICLE_ID"]], return_dataframe=False + ) + assert isinstance(vehicles, dict) + + def test_rename_columns_correct(): data = {"id": [1, 2], "amount": [3, 4]} df = pd.DataFrame(data=data) diff --git a/tests/integration/test_sharepoint.py b/tests/integration/test_sharepoint.py index 502ffded0..ba0985f7c 100644 --- a/tests/integration/test_sharepoint.py +++ b/tests/integration/test_sharepoint.py @@ -1,17 +1,16 @@ import os import re +from copy import deepcopy import pandas as pd -from copy import deepcopy import pytest from prefect.tasks.secrets import PrefectSecret from viadot.config import local_config from viadot.exceptions import CredentialError -from viadot.sources import Sharepoint +from viadot.sources import Sharepoint, SharepointList from viadot.task_utils import df_get_data_types_task from viadot.tasks.sharepoint import SharepointToDF -from viadot.sources import SharepointList def get_url() -> str: @@ -168,10 +167,11 @@ def test_get_data_types(file_name): assert "String" in dtypes +### SECTION FOR TESTING SHAREPOINT LIST CONNECTOR ### @pytest.fixture(scope="session") def sharepoint_list(): """ - Fixture for creating a Sharepoint class instance. + Fixture for creating a SharepointList class instance. The class instance can be used within a test functions to interact with Sharepoint. """ spl = SharepointList() @@ -187,15 +187,31 @@ def test_valid_filters(sharepoint_list): assert result is True -def test_invalid_dtype(sharepoint_list): +def test_filters_missing_dtype(sharepoint_list): + filters = { + "filter1": {"operator1": ">", "value1": 10}, + } + with pytest.raises( + ValueError, + match=re.escape("dtype for filter1 is missing!"), + ): + sharepoint_list.check_filters(filters) + + +def test_filters_invalid_dtype(sharepoint_list): filters = { "filter1": {"dtype": "list", "operator1": ">", "value1": 10}, } - with pytest.raises(ValueError, match="dtype not allowed!"): + with pytest.raises( + ValueError, + match=re.escape( + "dtype not allowed! Expected: ['datetime', 'date', 'bool', 'int', 'float', 'complex', 'str'] got: list ." + ), + ): sharepoint_list.check_filters(filters) -def test_missing_operator1(sharepoint_list): +def test_filters_missing_operator1(sharepoint_list): filters = { "filter1": {"dtype": "int", "value1": 10}, } @@ -203,23 +219,28 @@ def test_missing_operator1(sharepoint_list): sharepoint_list.check_filters(filters) -def test_invalid_operator1(sharepoint_list): +def test_filters_invalid_operator1(sharepoint_list): filters = { "filter1": {"dtype": "int", "operator1": "*", "value1": 10}, } - with pytest.raises(ValueError, match="Operator type not allowed!"): + with pytest.raises( + ValueError, + match=re.escape( + "Operator1 type not allowed! Expected: ['<', '>', '<=', '>=', '==', '!='] got: * ." 
+ ), + ): sharepoint_list.check_filters(filters) -def test_missing_value1(sharepoint_list): +def test_filters_missing_value1(sharepoint_list): filters = { "filter1": {"dtype": "int", "operator1": ">", "value1": None}, } - with pytest.raises(ValueError, match="Value for operator1 is missing!"): + with pytest.raises(ValueError, match="Value1 for operator1 is missing!"): sharepoint_list.check_filters(filters) -def test_missing_operators_conjuction(sharepoint_list): +def test_filters_missing_operators_conjunction(sharepoint_list): filters = { "filter1": { "dtype": "int", @@ -229,11 +250,16 @@ def test_missing_operators_conjuction(sharepoint_list): "value2": 20, }, } - with pytest.raises(ValueError, match="Operators for conjuction is missing!"): + with pytest.raises( + ValueError, + match=re.escape( + "Operator for conjunction is missing! Expected: ['&', '|'] got empty." + ), + ): sharepoint_list.check_filters(filters) -def test_invalid_operators_conjuction(sharepoint_list): +def test_filters_invalid_operators_conjunction(sharepoint_list): filters = { "filter1": { "dtype": "int", @@ -241,24 +267,54 @@ def test_invalid_operators_conjuction(sharepoint_list): "value1": 10, "operator2": "<", "value2": 20, - "operators_conjuction": "!", + "operators_conjunction": "!", }, } - with pytest.raises(ValueError, match="Operators for conjuction not allowed!"): + with pytest.raises( + ValueError, + match=re.escape( + "Operator for conjunction not allowed! Expected: ['&', '|'] got ! ." + ), + ): sharepoint_list.check_filters(filters) -def test_invalid_filters_conjuction(sharepoint_list): +def test_filters_conjunction_not_allowed(sharepoint_list): filters = { "filter1": { "dtype": "int", "operator1": ">", "value1": 10, - "filters_conjuction": "!", + "filters_conjunction": "!", }, } with pytest.raises( - ValueError, match="Filters operators for conjuction not allowed!" + ValueError, + match=re.escape( + "Filters conjunction allowed only when more then one filter provided!" + ), + ): + sharepoint_list.check_filters(filters) + + +def test_filters_invalid_conjunction(sharepoint_list): + filters = { + "filter1": { + "dtype": "int", + "value1": 10, + "operator1": ">", + "filters_conjunction": "!", + }, + "filter2": { + "dtype": "int", + "operator1": "==", + }, + } + with pytest.raises( + ValueError, + match=re.escape( + "Filter operator for conjunction not allowed! Expected: ['&', '|'] got ! ." 
+ ), ): sharepoint_list.check_filters(filters) @@ -266,52 +322,86 @@ def test_invalid_filters_conjuction(sharepoint_list): def test_valid_mapping(sharepoint_list): filters = { "filter1": { + "dtype": "int", + "value1": 10, + "value2": 20, "operator1": ">", "operator2": "<=", - "operators_conjuction": "&", - "filters_conjuction": "|", + "operators_conjunction": "&", + "filters_conjunction": "|", + }, + "filter2": { + "dtype": "int", + "value1": 30, + "value2": 0, + "operator1": "==", + "operator2": "!=", + "operators_conjunction": "|", }, - "filter2": {"operator1": "==", "operator2": "!=", "operators_conjuction": "|"}, } expected_result = { "filter1": { + "dtype": "int", + "value1": 10, + "value2": 20, "operator1": "gt", "operator2": "le", - "operators_conjuction": "and", - "filters_conjuction": "or", + "operators_conjunction": "and", + "filters_conjunction": "or", + }, + "filter2": { + "dtype": "int", + "value1": 30, + "value2": 0, + "operator1": "eq", + "operator2": "ne", + "operators_conjunction": "or", }, - "filter2": {"operator1": "eq", "operator2": "ne", "operators_conjuction": "or"}, } - result = sharepoint_list.operators_mapping(deepcopy(filters)) + result = sharepoint_list.operators_mapping(filters) assert result == expected_result -def test_invalid_comparison_operator(sharepoint_list): +def test_operators_mapping_invalid_comparison_operator(sharepoint_list): filters = { "filter1": { "operator1": "*", "operator2": "<=", - "operators_conjuction": "&", - "filters_conjuction": "|", + "operators_conjunction": "&", + "filters_conjunction": "|", }, } error_message = "This comparison operator: * is not allowed. Please read the function documentation for details!" with pytest.raises(ValueError, match=re.escape(error_message)): - sharepoint_list.operators_mapping(deepcopy(filters)) + sharepoint_list.operators_mapping(filters) + + +def test_operators_mapping_invalid_logical_operator(sharepoint_list): + filters = { + "filter1": { + "operator1": ">", + "operator2": "<=", + "operators_conjunction": "!", + "filters_conjunction": "|", + }, + } + error_message = "This conjunction (logical) operator: ! is not allowed. Please read the function documentation for details!" + with pytest.raises(ValueError, match=re.escape(error_message)): + sharepoint_list.operators_mapping(filters) -def test_invalid_logical_operator(sharepoint_list): +def test_operators_mapping_invalid_filters_logical_operator(sharepoint_list): filters = { "filter1": { "operator1": ">", "operator2": "<=", - "operators_conjuction": "!", - "filters_conjuction": "|", + "operators_conjunction": "&", + "filters_conjunction": "!", }, } - error_message = "This conjuction(logical) operator: ! is not allowed. Please read the function documentation for details!" + error_message = "This filters conjunction (logical) operator: ! is not allowed. Please read the function documentation for details!" 
with pytest.raises(ValueError, match=re.escape(error_message)): - sharepoint_list.operators_mapping(deepcopy(filters)) + sharepoint_list.operators_mapping(filters) def test_single_filter_datetime_api(sharepoint_list): @@ -348,7 +438,7 @@ def test_single_df_filter(sharepoint_list): def test_multiple_df_filters(sharepoint_list): filters = { - "column1": {"operator1": ">", "value1": 10, "filters_conjuction": "&"}, + "column1": {"operator1": ">", "value1": 10, "filters_conjunction": "&"}, "column2": {"operator1": "<", "value1": 20}, } result = sharepoint_list.make_filter_for_df(filters) diff --git a/tests/integration/test_tm1.py b/tests/integration/test_tm1.py new file mode 100644 index 000000000..c0d887a61 --- /dev/null +++ b/tests/integration/test_tm1.py @@ -0,0 +1,126 @@ +import pandas as pd +import pytest + +from viadot.config import local_config +from viadot.exceptions import CredentialError, ValidationError +from viadot.sources import TM1 + +CUBE = local_config.get("TM1").get("test_cube") +VIEW = local_config.get("TM1").get("test_view") +DIMENSION = local_config.get("TM1").get("test_dim") +HIERARCHY = local_config.get("TM1").get("test_hierarchy") + + +def test_get_connection(): + tm1_source = TM1() + connection = tm1_source.get_connection() + + assert connection is not None + + +def test_get_connection_fail(): + test_creds = { + "address": "Addres", + "port": 123, + "username": "user", + } + with pytest.raises(CredentialError): + tm1_source = TM1(credentials=test_creds) + + +def test_get_cubes_names(): + tm1_source = TM1() + cubes = tm1_source.get_cubes_names() + + assert len(cubes) > 0 + + +def test_get_dimensions_names(): + tm1_source = TM1() + dim = tm1_source.get_dimensions_names() + + assert len(dim) > 0 + + +def test_get_views_names(): + tm1_source = TM1(cube=CUBE) + views = tm1_source.get_views_names() + + assert len(views) > 0 + + +def test_get_hierarchies_names(): + tm1_source = TM1(dimension=DIMENSION) + hierarchies = tm1_source.get_hierarchies_names() + + assert len(hierarchies) > 0 + + +def test_get_available_elements(): + tm1_source = TM1(dimension=DIMENSION, hierarchy=HIERARCHY) + elements = tm1_source.get_available_elements() + + assert len(elements) > 0 + + +def test_to_df_view(): + tm1_source = TM1(cube=CUBE, view=VIEW) + df = tm1_source.to_df() + + assert isinstance(df, pd.DataFrame) + assert df.empty is False + + +def test_to_df_mdx(): + query = ( + """ + select + { + [version].[version].[Budget] + } + on columns, + { + [company].[company].MEMBERS + } + on rows + + FROM """ + + f"{CUBE}" + ) + + tm1_source = TM1(mdx_query=query) + df = tm1_source.to_df(if_empty="pass") + + assert isinstance(df, pd.DataFrame) + + +def test_to_df_fail_both(): + query = ( + """ + select + { + [version].[version].[Budget] + } + on columns, + { + [company].[company].MEMBERS + } + on rows + + FROM """ + + f"{CUBE}" + ) + + tm1_source = TM1(mdx_query=query, cube=CUBE) + with pytest.raises( + ValidationError, match="Specify only one: MDX query or cube and view." + ): + tm1_source.to_df(if_empty="pass") + + +def test_to_df_fail_no(): + tm1_source = TM1() + with pytest.raises( + ValidationError, match="MDX query or cube and view are required." 
+ ): + tm1_source.to_df(if_empty="pass") diff --git a/tests/test_viadot.py b/tests/test_viadot.py index 1f0874453..675dbfbdc 100644 --- a/tests/test_viadot.py +++ b/tests/test_viadot.py @@ -2,4 +2,4 @@ def test_version(): - assert __version__ == "0.4.21" + assert __version__ == "0.4.22" diff --git a/tests/unit/test_task_utils.py b/tests/unit/test_task_utils.py index e77c24fdd..969b699a4 100644 --- a/tests/unit/test_task_utils.py +++ b/tests/unit/test_task_utils.py @@ -19,8 +19,8 @@ df_to_parquet, dtypes_to_json_task, union_dfs_task, - write_to_json, validate_df, + write_to_json, ) diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index a94eaff9f..75ef30e97 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -6,9 +6,10 @@ from viadot.signals import SKIP from viadot.utils import ( + add_viadot_metadata_columns, check_if_empty_file, gen_bulk_insert_query_from_df, - add_viadot_metadata_columns, + check_value, ) EMPTY_CSV_PATH = "empty.csv" @@ -153,3 +154,51 @@ def test_add_viadot_metadata_columns_with_parameter(): assert df_base.columns.to_list() == ["a", "b"] assert df_decorated.columns.to_list() == ["a", "b", "_viadot_source"] assert df_decorated["_viadot_source"][0] == "Source_name" + + +# Sample test checking the correctness of the function when the key is found +def test_check_value_found(): + json_data = { + "first_known_lvl": { + "second_known_lvl": {"third_known_lvl": {"searched_phrase": "phrase"}} + } + } + result = check_value( + json_data["first_known_lvl"]["second_known_lvl"]["third_known_lvl"], + ["searched_phrase"], + ) + assert result == "phrase" + + +# Sample test checking the correctness of the function when the key is not found +def test_check_value_not_found(): + json_data = { + "first_known_lvl": { + "second_known_lvl": { + "third_known_lvl": {"other_phrase": "This won't be found"} + } + } + } + result = check_value( + json_data["first_known_lvl"]["second_known_lvl"]["third_known_lvl"], + ["searched_phrase"], + ) + assert result is None + + +# Sample test checking the correctness of the function with an empty dictionary +def test_check_value_empty_dict(): + json_data = {} + result = check_value(json_data, ["searched_phrase"]) + assert result is None + + +# Sample test checking the correctness of the function with a nonexistent key +def test_check_value_nonexistent_key(): + json_data = { + "first_known_lvl": { + "second_known_lvl": {"third_known_lvl": {"searched_phrase": "phrase"}} + } + } + result = check_value(json_data, ["nonexistent_key"]) + assert result is None diff --git a/viadot/__init__.py b/viadot/__init__.py index e427a5547..ece529aa1 100644 --- a/viadot/__init__.py +++ b/viadot/__init__.py @@ -1 +1 @@ -__version__ = "0.4.21" +__version__ = "0.4.22" diff --git a/viadot/flows/__init__.py b/viadot/flows/__init__.py index e138735d6..2f30c04d8 100644 --- a/viadot/flows/__init__.py +++ b/viadot/flows/__init__.py @@ -11,7 +11,7 @@ from .genesys_to_adls import GenesysToADLS from .outlook_to_adls import OutlookToADLS from .salesforce_to_adls import SalesforceToADLS -from .sharepoint_to_adls import SharepointToADLS, SharepointListToADLS +from .sharepoint_to_adls import SharepointListToADLS, SharepointToADLS from .supermetrics_to_adls import SupermetricsToADLS from .supermetrics_to_azure_sql import SupermetricsToAzureSQL diff --git a/viadot/flows/bigquery_to_adls.py b/viadot/flows/bigquery_to_adls.py index e09981ebe..cd092066f 100644 --- a/viadot/flows/bigquery_to_adls.py +++ b/viadot/flows/bigquery_to_adls.py @@ -43,6 +43,7 @@ def 
__init__( if_exists: str = "replace", validate_df_dict: dict = None, timeout: int = 3600, + set_prefect_kv: bool = False, *args: List[Any], **kwargs: Dict[str, Any], ): @@ -84,6 +85,7 @@ def __init__( When passed, `validate_df` task validation tests are triggered. Defaults to None. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. + set_prefect_kv(int, optional): Specifies whether to set a key-value pair in the Prefect KV Store. Defaults to False. """ # BigQueryToDF self.query = query @@ -125,6 +127,8 @@ def __init__( adls_dir_path, "schema", self.now + ".json" ) + self.set_prefect_kv = set_prefect_kv + super().__init__(*args, name=name, **kwargs) self.gen_flow() @@ -205,4 +209,5 @@ def gen_flow(self) -> Flow: df_to_be_loaded.set_upstream(dtypes_dict, flow=self) file_to_adls_task.set_upstream(df_to_file, flow=self) json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) - set_key_value(key=self.adls_dir_path, value=self.adls_file_path) + if self.set_prefect_kv is True: + set_key_value(key=self.adls_dir_path, value=self.adls_file_path) diff --git a/viadot/flows/customer_gauge_to_adls.py b/viadot/flows/customer_gauge_to_adls.py index 39225330c..82e14d5b4 100644 --- a/viadot/flows/customer_gauge_to_adls.py +++ b/viadot/flows/customer_gauge_to_adls.py @@ -38,6 +38,8 @@ def __init__( ] = None, start_date: datetime = None, end_date: datetime = None, + unpack_by_field_reference_cols: List[str] = None, + unpack_by_nested_dict_transformer: List[str] = None, customer_gauge_credentials_secret: str = "CUSTOMER-GAUGE", anonymize: bool = False, columns_to_anonymize: List[str] = None, @@ -59,39 +61,46 @@ def __init__( **kwargs: Dict[str, Any] ): """ - Flow for downloading data from the Customer Gauge's endpoints (Responses and Non-Responses) via API to a CSV or Parquet file. - The data anonimization is optional.Then upload it to Azure Data Lake. + Flow for downloading data from the Customer Gauge's endpoints (Responses and Non-Responses) via API + to a CSV or Parquet file.The data anonimization is optional.Then upload it to Azure Data Lake. Args: name (str): The name of the flow. - endpoint (Literal["responses", "non-responses"], optional): Indicate which endpoint to connect. Defaults to None. + endpoint (Literal["responses", "non-responses"], optional): Indicate which endpoint to connect. + Defaults to None. endpoint_url (str, optional): Full URL for pointing to specific endpoint. Defaults to None. - total_load (bool, optional): Indicate whether to download the data to the latest. If 'False', only one API call is executed (up to 1000 records). - Defaults to True. + total_load (bool, optional): Indicate whether to download the data to the latest. If 'False', + only one API call is executed (up to 1000 records). Defaults to True. cursor (int, optional): Cursor value to navigate to the page. Defaults to None. - pagesize (int, optional): Number of responses (records) returned per page, max value = 1000. Defaults to 1000. - date_field (Literal["date_creation", "date_order", "date_sent", "date_survey_response"], optional): Specifies the date type which filter date range. - Defaults to None. + pagesize (int, optional): Number of responses (records) returned per page, max value = 1000. + Defaults to 1000. + date_field (Literal["date_creation", "date_order", "date_sent", "date_survey_response"], optional): + Specifies the date type which filter date range. Defaults to None. 
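# --- Illustrative usage sketch (not part of the diff): the new `set_prefect_kv`
# flag on `BigQueryToADLS` defaults to False, so the flow only writes the
# adls_dir_path -> adls_file_path pair to the Prefect KV Store when asked to.
# All other argument values below are hypothetical placeholders.
from viadot.flows import BigQueryToADLS

flow = BigQueryToADLS(
    name="BigQuery extract",  # hypothetical flow name
    query="SELECT * FROM `project.dataset.table`",  # hypothetical query
    adls_dir_path="raw/bigquery/",  # hypothetical ADLS directory
    set_prefect_kv=True,  # opt in to set_key_value(key=adls_dir_path, value=adls_file_path)
)
# flow.run()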
start_date (datetime, optional): Defines the period start date in yyyy-mm-dd format. Defaults to None. end_date (datetime, optional): Defines the period end date in yyyy-mm-dd format. Defaults to None. - customer_gauge_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ['client_id', 'client_secret']. - Defaults to "CUSTOMER-GAUGE". + unpack_by_field_reference_cols (List[str]): Columns to unpack and modify using `_field_reference_unpacker`. Defaults to None. + unpack_by_nested_dict_transformer (List[str]): Columns to unpack and modify using `_nested_dict_transformer`. Defaults to None. + customer_gauge_credentials_secret (str, optional): The name of the Azure Key Vault secret containing + a dictionary with ['client_id', 'client_secret']. Defaults to "CUSTOMER-GAUGE". vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None. anonymize (bool, optional): Indicates if anonymize selected columns. Defaults to False. columns_to_anonymize (List[str], optional): List of columns to anonymize. Defaults to None. - anonymize_method (Literal["mask", "hash"], optional): Method of anonymizing data. "mask" -> replace the data with "value" arg. - "hash" -> replace the data with the hash value of an object (using `hash()` method). Defaults to "mask". + anonymize_method (Literal["mask", "hash"], optional): Method of anonymizing data. "mask" -> replace the + data with "value" arg. "hash" -> replace the data with the hash value of an object (using `hash()` + method). Defaults to "mask". anonymize_value (str, optional): Value to replace the data. Defaults to "***". - date_column (str, optional): Name of the date column used to identify rows that are older than a specified number of days. Defaults to None. - days (int, optional): The number of days beyond which we want to anonymize the data, e.g. older that 2 years can be: 2*365. Defaults to None. + date_column (str, optional): Name of the date column used to identify rows that are older than a specified + number of days. Defaults to None. + days (int, optional): The number of days beyond which we want to anonymize the data, e.g. older than + 2 years can be: 2*365. Defaults to None. output_file_extension (str, optional): Output file extension - to allow selection of .csv for data which is not easy to handle with parquet. Defaults to ".parquet". adls_dir_path (str, optional): Azure Data Lake destination folder/catalog path. Defaults to None. local_file_path (str, optional): Local destination path. Defaults to None. adls_file_name (str, optional): Name of file in ADLS. Defaults to None. - adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with - ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. - Defaults to None. + adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary + with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure + Data Lake. Defaults to None. overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. if_exists (str, optional): What to do if the file exists. Defaults to "replace". validate_df_dict (Dict[str], optional): A dictionary with optional list of tests to verify the output dataframe. 
@@ -107,6 +116,8 @@ def __init__( self.date_field = date_field self.start_date = start_date self.end_date = end_date + self.unpack_by_field_reference_cols = unpack_by_field_reference_cols + self.unpack_by_nested_dict_transformer = unpack_by_nested_dict_transformer self.customer_gauge_credentials_secret = customer_gauge_credentials_secret # validate_df @@ -171,6 +182,8 @@ def gen_flow(self) -> Flow: date_field=self.date_field, start_date=self.start_date, end_date=self.end_date, + unpack_by_field_reference_cols=self.unpack_by_field_reference_cols, + unpack_by_nested_dict_transformer=self.unpack_by_nested_dict_transformer, vault_name=self.vault_name, credentials_secret=self.customer_gauge_credentials_secret, flow=self, diff --git a/viadot/flows/sharepoint_to_adls.py b/viadot/flows/sharepoint_to_adls.py index eaf747bab..18e392a55 100644 --- a/viadot/flows/sharepoint_to_adls.py +++ b/viadot/flows/sharepoint_to_adls.py @@ -17,8 +17,7 @@ validate_df, ) from viadot.tasks import AzureDataLakeUpload -from viadot.tasks.sharepoint import SharepointToDF, SharepointListToDF - +from viadot.tasks.sharepoint import SharepointListToDF, SharepointToDF logger = logging.get_logger() @@ -42,6 +41,7 @@ def __init__( if_exists: str = "replace", validate_df_dict: dict = None, timeout: int = 3600, + set_prefect_kv: bool = False, *args: List[any], **kwargs: Dict[str, Any], ): @@ -69,6 +69,7 @@ def __init__( dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. + set_prefect_kv (bool, optional): Whether to do key-value parameters in KV Store or not. Defaults to False. """ # SharepointToDF self.if_empty = if_empty @@ -86,6 +87,7 @@ def __init__( self.adls_sp_credentials_secret = adls_sp_credentials_secret self.if_exists = if_exists self.output_file_extension = output_file_extension + self.set_prefect_kv = set_prefect_kv self.now = str(pendulum.now("utc")) if self.local_dir_path is not None: self.local_file_path = ( @@ -177,7 +179,8 @@ def gen_flow(self) -> Flow: file_to_adls_task.set_upstream(df_to_file, flow=self) json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) - set_key_value(key=self.adls_dir_path, value=self.adls_file_path) + if self.set_prefect_kv == True: + set_key_value(key=self.adls_dir_path, value=self.adls_file_path) @staticmethod def slugify(name): @@ -188,39 +191,40 @@ class SharepointListToADLS(Flow): def __init__( self, name: str, - list_title: str = None, - site_url: str = None, + list_title: str, + site_url: str, + path: str, + adls_dir_path: str, + adls_file_name: str, + filters: dict = None, required_fields: List[str] = None, field_property: str = "Title", - filters: dict = None, row_count: int = 5000, + adls_sp_credentials_secret: str = None, sp_cert_credentials_secret: str = None, vault_name: str = None, - path: str = None, - adls_dir_path: str = None, - adls_file_name: str = None, - adls_sp_credentials_secret: str = None, overwrite_adls: bool = True, output_file_extension: str = ".parquet", validate_df_dict: dict = None, + set_prefect_kv: bool = False, *args: List[any], **kwargs: Dict[str, Any], ): """ - Run Flow SharepointListToADLS. + Flow for ingesting sharepoint list items(rows) with a given (or all) columns. + It allows to filter the output by column values. + Data is ingested from MS Sharepoint list (with given name and url ) and stored in MS Azure ADLS. Args: - name (str): Prefect flow name. 
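# --- Illustrative usage sketch (not part of the diff): the two new arguments of
# `CustomerGaugeToADLS` are lists of column names whose nested JSON values should
# be flattened by `_field_reference_unpacker` / `_nested_dict_transformer` in the
# `CustomerGaugeToDF` task. The column names and paths below are hypothetical.
from viadot.flows import CustomerGaugeToADLS

flow = CustomerGaugeToADLS(
    name="Customer Gauge responses",  # hypothetical flow name
    endpoint="responses",
    unpack_by_field_reference_cols=["drivers"],  # hypothetical column names
    unpack_by_nested_dict_transformer=["properties"],  # hypothetical column names
    adls_dir_path="raw/customer_gauge/",  # hypothetical ADLS directory
)
# flow.run()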
- list_title (str): Title of Sharepoint List. Default to None. - site_url (str): URL to set of Sharepoint Lists. Default to None. - required_fields (List[str]): Required fields(columns) need to be extracted from - Sharepoint List. Default to None. - field_property (List[str]): Property to expand with expand query method. - All propertys can be found under list.item.properties. - Default to ["Title"] - filters (dict): Dictionary with operators which filters the SharepointList output. + name (str): Prefect flow name. + list_title (str): Title of Sharepoint List. + site_url (str): URL to set of Sharepoint Lists. + path (str): Local file path. Default to None. + adls_dir_path (str): Azure Data Lake destination folder/catalog path. Defaults to None. + adls_file_name (str): Name of file in ADLS. Defaults to None. + filters (dict, optional): Dictionary with operators which filters the SharepointList output. Defaults to None. allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') - allowed conjuction: ('&','|') + allowed conjunction: ('&','|') allowed operators: ('<','>','<=','>=','==','!=') Example how to build the dict: filters = { @@ -231,8 +235,8 @@ def __init__( 'value2':'YYYY-MM-DD', 'operator1':'>=', 'operator2':'<=', - 'operators_conjuction':'&', # conjuction operators allowed only when 2 values passed - 'filters_conjuction':'&', # conjuction filters allowed only when 2 columns passed + 'operators_conjunction':'&', # conjunction operators allowed only when 2 values passed + 'filters_conjunction':'&', # conjunction filters allowed only when 2 columns passed } , 'Column_name_2' : @@ -242,16 +246,26 @@ def __init__( 'operator1':'==', }, } - row_count (int): Number of downloaded rows in single request. Default to 5000. - sp_cert_credentials_secret (str): Credentials to verify Sharepoint connection. Default to None. - vault_name (str): KeyVaultSecret name. Default to None. - path (str): Local file path. Default to None. - adls_dir_path (str): Azure Data Lake destination folder/catalog path. Defaults to None. - adls_file_name (str, optional): Name of file in ADLS. Defaults to None. - adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with - ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, - CLIENT_SECRET) for the Azure Data Lake. Defaults to None. - overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to True. + required_fields (List[str], optional): Required fields(columns) need to be extracted from + Sharepoint List. Defaults to None. + field_property (str, optional): Property to expand fields with expand query method. + For example: User fields could be expanded and "Title" + or "ID" could be extracted + -> useful to get user name instead of ID + All properties can be found under list.item.properties. + WARNING! Field types and properties might change which could + lead to errors - extension of sp connector would be required. + Default to ["Title"]. Defaults to "Title". + row_count (int, optional): Number of downloaded rows in single request.Defaults to 5000. + adls_sp_credentials_secret (str, optional): Credentials to connect to Azure ADLS + If not passed it will take cred's from your .config/credentials.json Defaults to None. + sp_cert_credentials_secret (str, optional): Credentials to verify Sharepoint connection. + If not passed it will take cred's from your .config/credentials.json Default to None. + vault_name (str, optional): KeyVaultSecret name. 
Default to None. + overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to True. + output_file_extension (str, optional): Extension of the resulting file to be stored. Defaults to ".parquet". + validate_df_dict (dict, optional): Whether to do an extra df validation before ADLS upload or not to do. Defaults to None. + set_prefect_kv (bool, optional): Whether to do key-value parameters in KV Store or not. Defaults to False. Returns: .parquet file inside ADLS. @@ -275,6 +289,7 @@ def __init__( self.overwrite = overwrite_adls self.adls_sp_credentials_secret = adls_sp_credentials_secret self.output_file_extension = output_file_extension + self.set_prefect_kv = set_prefect_kv self.now = str(pendulum.now("utc")) if self.path is not None: self.local_file_path = ( @@ -365,7 +380,8 @@ def gen_flow(self) -> Flow: file_to_adls_task.set_upstream(df_to_file, flow=self) json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) - set_key_value(key=self.adls_dir_path, value=self.adls_file_path) + if self.set_prefect_kv == True: + set_key_value(key=self.adls_dir_path, value=self.adls_file_path) @staticmethod def slugify(name): diff --git a/viadot/flows/supermetrics_to_adls.py b/viadot/flows/supermetrics_to_adls.py index 80253eb88..cff39fc89 100644 --- a/viadot/flows/supermetrics_to_adls.py +++ b/viadot/flows/supermetrics_to_adls.py @@ -18,8 +18,8 @@ dtypes_to_json_task, union_dfs_task, update_dtypes_dict, - write_to_json, validate_df, + write_to_json, ) from viadot.tasks import ( AzureDataLakeUpload, diff --git a/viadot/flows/transform_and_catalog.py b/viadot/flows/transform_and_catalog.py index 1de5c4430..08ac6b895 100644 --- a/viadot/flows/transform_and_catalog.py +++ b/viadot/flows/transform_and_catalog.py @@ -1,13 +1,13 @@ import os -from pathlib import Path import shutil +from pathlib import Path from typing import Dict, List, Union from prefect import Flow, task from prefect.tasks.shell import ShellTask from prefect.triggers import any_successful -from viadot.tasks import CloneRepo, AzureKeyVaultSecret, LumaIngest +from viadot.tasks import AzureKeyVaultSecret, CloneRepo, LumaIngest @task(trigger=any_successful) diff --git a/viadot/sources/__init__.py b/viadot/sources/__init__.py index c0d96abe2..7f0bf6f51 100644 --- a/viadot/sources/__init__.py +++ b/viadot/sources/__init__.py @@ -30,6 +30,7 @@ from .mindful import Mindful from .sql_server import SQLServer from .sqlite import SQLite +from .tm1 import TM1 # APIS from .uk_carbon_intensity import UKCarbonIntensity diff --git a/viadot/sources/bigquery.py b/viadot/sources/bigquery.py index 1be69e866..32d1dac2c 100644 --- a/viadot/sources/bigquery.py +++ b/viadot/sources/bigquery.py @@ -6,8 +6,8 @@ from ..config import local_config from ..exceptions import CredentialError, DBDataAccessError -from .base import Source from ..utils import add_viadot_metadata_columns +from .base import Source class BigQuery(Source): diff --git a/viadot/sources/customer_gauge.py b/viadot/sources/customer_gauge.py index a696bba32..819f92a90 100644 --- a/viadot/sources/customer_gauge.py +++ b/viadot/sources/customer_gauge.py @@ -20,6 +20,7 @@ def __init__( endpoint: Literal["responses", "non-responses"] = None, url: str = None, credentials: Dict[str, Any] = None, + credentials_secret: str = "CUSTOMER-GAUGE", ): """ A class to connect and download data using Customer Gauge API. @@ -31,7 +32,8 @@ def __init__( endpoint (Literal["responses", "non-responses"]): Indicate which endpoint to connect. Defaults to None. url (str, optional): Endpoint URL. 
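# --- Illustrative usage sketch (not part of the diff): `SharepointListToADLS`
# now requires `list_title`, `site_url`, `path`, `adls_dir_path` and
# `adls_file_name`, and the filter keys were renamed to `operators_conjunction`
# and `filters_conjunction`. The concrete values below are hypothetical.
from viadot.flows import SharepointListToADLS

flow = SharepointListToADLS(
    name="Sharepoint list extract",
    list_title="My List",  # hypothetical list title
    site_url="https://example.sharepoint.com/sites/site/",  # hypothetical URL
    path="my_list.parquet",  # hypothetical local file path
    adls_dir_path="raw/sharepoint/",  # hypothetical ADLS directory
    adls_file_name="my_list.parquet",  # hypothetical ADLS file name
    filters={
        "Created": {  # hypothetical column
            "dtype": "datetime",
            "value1": "2023-01-01",
            "value2": "2023-12-31",
            "operator1": ">=",
            "operator2": "<=",
            "operators_conjunction": "&",
        },
    },
    set_prefect_kv=False,  # the KV Store entry is written only when this is True
)
# flow.run()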
Defaults to None. credentials (Dict[str, Any], optional): Credentials to connect with API containing client_id, client_secret. Defaults to None. - + credentials_secret (str, optional): The name of the secret stored in local_config containing a + dictionary with ['client_id', 'client_secret']. Defaults to "CUSTOMER-GAUGE". Raises: ValueError: If endpoint is not provided or incorect. CredentialError: If credentials are not provided in local_config or directly as a parameter @@ -50,11 +52,12 @@ def __init__( raise ValueError( "Provide endpoint name. Choose: 'responses' or 'non-responses'. Otherwise, provide URL" ) + self.credentials_secret = credentials_secret if credentials is not None: self.credentials = credentials else: - self.credentials = local_config.get("CustomerGauge") + self.credentials = local_config.get(credentials_secret) if self.credentials is None: raise CredentialError("Credentials not provided.") @@ -165,90 +168,3 @@ def get_cursor(self, json_response: Dict[str, Any] = None) -> int: ) return cur - - def properties_cleaning( - self, json_response: Dict[str, Any] = None - ) -> Dict[str, Any]: - """ - Returns initialy cleaned data. The cleaning of the additional params is depend on the endpoint. - - Args: - json_response (Dict[str, Any], optional): Dictionary with nested structure that contains data and cursor parameter value. Defaults to None. - - Returns: - Dict[str, Any]: Dictionary that contains cleaned data corresponding to one record. - """ - clean_properties = { - d["field"]: d["reference"] for d in json_response["properties"] - } - json_response["properties"] = clean_properties - - if self.endpoint == "responses": - json_response["drivers"] = ( - " ".join(map(str, json_response["drivers"])) - .replace("label", ",") - .replace(r"{',':", " ") - .replace(r"'", "") - .replace("}", "") - .strip() - .replace(" ", ",") - ) - json_response["tags"] = " ".join(map(str, json_response["tags"])).replace( - "[]", "" - ) - json_response["questions"] = " ".join( - map(str, json_response["questions"]) - ).replace("[]", "") - else: - pass - - return json_response - - def flatten_json(self, json_response: Dict[str, Any] = None) -> Dict[str, Any]: - """ - Function that flattens a nested structure of the JSON object into a single-level dictionary. - Uses a nested `flatten()` function to recursively combine nested keys in the JSON object with '_' to create the flattened keys. - - Args: - json_response (Dict[str, Any], optional): JSON object represented as a nested dictionary. Defaults to None. - - Returns: - Dict[str, Any]: The flattened dictionary. - """ - out = {} - - def flattify(x, key=""): - if type(x) is dict: - for a in x: - flattify(x[a], key + a + "_") - else: - out[key[:-1]] = x - - flattify(json_response) - - return out - - def to_df(self, json_response: Dict[str, Any] = None) -> pd.DataFrame: - """ - Flatten dictionary structure and convert it into pandas DataFrame. Cleans column names. - - Args: - json_response (Dict[str, Any], optional): JSON object represented as a nested dictionary that contains data and cursor parameter value. Defaults to None. - - Raises: - ValueError: If data value not found. - - Returns: - pd.DataFrame: pandas.DataFrame - """ - try: - response_json = json_response["data"] - except: - raise ValueError( - "Provided argument doesn't contain 'data' value. Pass json returned from the endpoint." 
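# --- Illustrative usage sketch (not part of the diff): with the new
# `credentials_secret` argument, the `CustomerGauge` source reads the
# 'client_id'/'client_secret' pair from `local_config` under the given key
# (defaulting to "CUSTOMER-GAUGE") whenever credentials are not passed directly.
from viadot.sources.customer_gauge import CustomerGauge

cg = CustomerGauge(endpoint="responses", credentials_secret="CUSTOMER-GAUGE")
# or pass the credentials explicitly (values are hypothetical placeholders):
cg_explicit = CustomerGauge(
    endpoint="non-responses",
    credentials={"client_id": "<client_id>", "client_secret": "<client_secret>"},
)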
- ) - clean_json = list(map(self.properties_cleaning, response_json)) - df = pd.DataFrame(list(map(self.flatten_json, clean_json))) - df.columns = df.columns.str.lower().str.replace(" ", "_") - - return df diff --git a/viadot/sources/epicor.py b/viadot/sources/epicor.py index a3e59c432..ad28019d0 100644 --- a/viadot/sources/epicor.py +++ b/viadot/sources/epicor.py @@ -104,6 +104,7 @@ def parse_orders_xml(xml_data: str) -> pd.DataFrame: Args: xml_data (str, required): Response from Epicor API in form of xml + Returns: pd.DataFrame: DataFrame containing parsed orders data. """ @@ -221,7 +222,11 @@ def __init__( super().__init__(*args, credentials=credentials, **kwargs) def generate_token(self) -> str: - "Function to generate API access token that is valid for 24 hours" + """Function to generate API access token that is valid for 24 hours. + + Returns: + str: Generated token. + """ url = ( "http://" @@ -243,7 +248,11 @@ def generate_token(self) -> str: return token def generate_url(self) -> str: - "Function to generate url to download data" + """Function to generate url to download data + + Returns: + str: Output url string. + """ return ( "http://" @@ -282,8 +291,12 @@ def get_xml_response(self): ) return response - def to_df(self): - "Function for creating pandas DataFrame from Epicor API response" + def to_df(self) -> pd.DataFrame: + """Function for creating pandas DataFrame from Epicor API response + + Returns: + pd.DataFrame: Output DataFrame. + """ data = self.get_xml_response() df = parse_orders_xml(data) return df diff --git a/viadot/sources/genesys.py b/viadot/sources/genesys.py index 6be907a66..d9109b313 100644 --- a/viadot/sources/genesys.py +++ b/viadot/sources/genesys.py @@ -322,6 +322,7 @@ def download_all_reporting_exports( "queue_performance_detail_view", "queue_interaction_detail_view", "agent_status_detail_view", + "agent_interaction_detail_view", ]: file_name = f"{self.view_type.upper()}_{next(self.count)}_{date}" elif single_report[4].lower() in [ @@ -343,7 +344,7 @@ def download_all_reporting_exports( if store_file_names is True: file_name_list.append(file_name + "." + self.file_extension) - self.logger.info("Al reports were successfully dowonload.") + self.logger.info("All reports were successfully downloaded.") if store_file_names is True: self.logger.info("Successfully genetared file names list.") diff --git a/viadot/sources/mediatool.py b/viadot/sources/mediatool.py index 88e358ee7..4b40c8739 100644 --- a/viadot/sources/mediatool.py +++ b/viadot/sources/mediatool.py @@ -1,7 +1,7 @@ import inspect import json from datetime import date, timedelta -from typing import List +from typing import List, Union import pandas as pd from prefect.utilities import logging @@ -35,14 +35,14 @@ def __init__( organization_id (str, optional): Organization ID. Defaults to None. user_id (str, optional): User ID. Defaults to None. """ - if credentials is not None: - try: - self.header = {"Authorization": f"Bearer {credentials.get('TOKEN')}"} - except: - raise CredentialError("Credentials not found.") + if any([rq not in credentials for rq in ["TOKEN", "USER_ID"]]): + raise CredentialError( + "Missing credentials. 'TOKEN' and 'USER_ID' are required." 
+ ) super().__init__(*args, credentials=credentials, **kwargs) + self.header = {"Authorization": f"Bearer {self.credentials.get('TOKEN')}"} self.organization_id = organization_id or self.credentials.get( "ORGANIZATION_ID" ) @@ -80,7 +80,7 @@ def get_media_entries( end_date: str = None, time_delta: int = 360, return_dataframe: bool = True, - ) -> pd.DataFrame: + ) -> Union[pd.DataFrame, dict]: """ Get data for media entries. This is a main function. Media entries contain IDs for most of the fields for other endpoints.Returns DataFrame or Dict. @@ -95,7 +95,7 @@ def get_media_entries( Defaults to True. Returns: - pd.DataFrame: Default return dataframe If 'return_daframe=False' then return list of dicts. + Union[pd.DataFrame, dict]: Default return dataframe If 'return_daframe=False' then return list of dicts. """ today = date.today() @@ -119,9 +119,11 @@ def get_media_entries( columns = df.columns try: df_filtered = df[columns] - except KeyError as e: - logger.info(e) - return df_filtered + return df_filtered + except KeyError: + logger.error( + f"Columns {columns} are incorrect. Whole dictionary for 'mediaEntries' will be returned." + ) return response_dict["mediaEntries"] @@ -137,7 +139,7 @@ def get_campaigns( Defaults to True. Returns: - pd.DataFrame: Default return dataframe If 'return_daframe=False' then return list of dicts. + pd.DataFrame: Default return dataframe If 'return_daframe=False' then return dictionary. """ url_campaigns = ( f"https://api.mediatool.com/organizations/{organization_id}/campaigns" @@ -168,7 +170,7 @@ def get_vehicles( self, vehicle_ids: List[str], return_dataframe: bool = True, - ) -> pd.DataFrame: + ) -> Union[pd.DataFrame, dict]: """ Get vehicles data based on the organization IDs. Returns DataFrame or Dict. @@ -178,7 +180,7 @@ def get_vehicles( Defaults to True. Returns: - pd.DataFrame: Default return dataframe. If 'return_daframe=False' then return list of dicts. + Union[pd.DataFrame, dict]: Default return dataframe. If 'return_daframe=False' then return dictionary. """ response_dict = {} dfs = [] @@ -211,11 +213,11 @@ def get_vehicles( return df_updated return None - return response_dict["vehicles"] + return response_dict["vehicle"] def get_organizations( self, user_id: str = None, return_dataframe: bool = True - ) -> pd.DataFrame: + ) -> Union[pd.DataFrame, List[dict]]: """ Get organizations data based on the user ID. Returns DataFrame or Dict. @@ -225,7 +227,7 @@ def get_organizations( Defaults to True. Returns: - pd.DataFrame: Default return dataframe. If 'return_daframe=False' then return list of dicts. + Union[pd.DataFrame, List[dict]]: Default return dataframe. If 'return_daframe=False' then return list of dicts. """ user_id = user_id or self.user_id url_organizations = f"https://api.mediatool.com/users/{user_id}/organizations" @@ -258,7 +260,7 @@ def get_organizations( def get_media_types( self, media_type_ids: List[str], return_dataframe: bool = True - ) -> pd.DataFrame: + ) -> Union[pd.DataFrame, List[dict]]: """ Get media types data based on the media types ID. User have to provide list of media type IDs. Returns DataFrame or Dict. @@ -269,7 +271,7 @@ def get_media_types( Defaults to True. Returns: - pd.DataFrame: Default return dataframe. If 'return_daframe=False' then return list of dicts. + Union[pd.DataFrame, List[dict]]: Default return dataframe. If 'return_daframe=False' then return list of dicts. 
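# --- Illustrative usage sketch (not part of the diff): `Mediatool` now validates
# that the credentials contain both "TOKEN" and "USER_ID", and several getters
# are typed as returning either a DataFrame or the raw structure, so callers may
# want to check the type. Credential values and vehicle IDs are hypothetical.
import pandas as pd

from viadot.sources.mediatool import Mediatool

mt = Mediatool(credentials={"TOKEN": "<token>", "USER_ID": "<user_id>"})
result = mt.get_vehicles(vehicle_ids=["abc123"], return_dataframe=True)  # hypothetical IDs
if isinstance(result, pd.DataFrame):
    print(result.head())
else:
    # the raw response is returned when the expected columns cannot be selected
    print(result)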
""" list_media_types = [] for id_media_type in media_type_ids: diff --git a/viadot/sources/mindful.py b/viadot/sources/mindful.py index 254eecb9d..2698adb15 100644 --- a/viadot/sources/mindful.py +++ b/viadot/sources/mindful.py @@ -1,12 +1,12 @@ import os -from io import StringIO from datetime import datetime, timedelta +from io import StringIO from typing import Any, Dict, Literal, Tuple import pandas as pd import prefect -from requests.models import Response from requests.auth import HTTPBasicAuth +from requests.models import Response from viadot.exceptions import APIError from viadot.sources.base import Source diff --git a/viadot/sources/sap_bw.py b/viadot/sources/sap_bw.py index 8f4fb0583..94e3347a9 100644 --- a/viadot/sources/sap_bw.py +++ b/viadot/sources/sap_bw.py @@ -101,7 +101,7 @@ def get_output_data(self, mdx_query: str) -> dict: { "COLUMN": 0, "ROW": 0, - "DATA": "VELUX Deutschland GmbH", + "DATA": "DATA", "VALUE_DATA_TYPE": "CHAR", "CELL_STATUS": "" },... diff --git a/viadot/sources/sharepoint.py b/viadot/sources/sharepoint.py index 096de825b..fbbd1b08b 100644 --- a/viadot/sources/sharepoint.py +++ b/viadot/sources/sharepoint.py @@ -1,23 +1,24 @@ -from ..config import local_config -from ..exceptions import CredentialError -from .base import Source -from viadot.utils import get_nested_dict - -from typing import Any, Dict, List -from fnmatch import fnmatch -from datetime import datetime from copy import deepcopy -import pandas as pd +from datetime import datetime +from fnmatch import fnmatch +from typing import Any, Dict, List +import pandas as pd import sharepy from office365.runtime.auth.authentication_context import AuthenticationContext -from office365.sharepoint.client_context import ClientContext from office365.runtime.client_request_exception import ClientRequestException +from office365.sharepoint.client_context import ClientContext from prefect.utilities import logging +from viadot.utils import get_nested_dict + +from ..config import local_config +from ..exceptions import CredentialError +from .base import Source logger = logging.get_logger() + # Print out how many rows was extracted in specific iteration def log_of_progress(items): logger.info("Items read: {0}".format(len(items))) @@ -83,24 +84,30 @@ def download_file( class SharepointList(Source): - """ - A Sharepoint_List class to connect and download data from sharpoint lists. + def __init__( + self, + credentials: Dict[str, Any] = None, + *args, + **kwargs, + ): + """ + A Sharepoint_List class to connect and download data from Sharepoint lists. + Warning! + Please be careful with selection of the column names because once sharepoint list is opened inside a browser it may display columns in different languages. + Because of that the resulting file or output might have different column names then the one which u see in the browser. 
- Args: - credentials (dict): Credentials should include: + Args: + credentials (Dict[str, Any], optional): Credentials should include: - "tenant" - "client_id" - "scopes" - "thumbprint" - "private_key" - """ - def __init__( - self, - credentials: Dict[str, Any] = None, - *args, - **kwargs, - ): + Raises: + CredentialError: If no credentials are passed + and local config doesn't contain them neither + """ DEFAULT_CREDENTIALS = local_config.get("SHAREPOINT_CERT") credentials = credentials or DEFAULT_CREDENTIALS if credentials is None: @@ -108,12 +115,16 @@ def __init__( super().__init__(*args, credentials=credentials, **kwargs) - def get_connection( - self, - site_url: str = None, - ): + def get_connection(self, site_url: str): + """Function for connecting into Sharepoint with AuthenticationContext. - # Connecting into Sharepoint with AuthenticationContext + Args: + site_url (str): URL of the sharepoint list. + + Returns: + ctx: Authentication context. + """ + logger.info("Connecting into Sharepoint with AuthenticationContexts.") try: auth_context = AuthenticationContext(site_url) auth_context.with_client_certificate( @@ -132,43 +143,80 @@ def get_connection( return self.ctx - # Function for extracting list items from search fields def _unpack_fields( self, list_item, - selected_fields: dict = None, - ): + selected_fields: dict, + ) -> dict: + """Function for extracting and unpacking list items from the search fields. + + Args: + list_items (office365 list item): A list with office365 list item objects (rows) + selected_fields (dict): A dict with fields selected for ingestion, generated by SharepointList.select_fields() + Raises: + ValueError: "Check if given field property is valid!" + ValueError: "Get nested dict for not recognized type of field! Check field types in the source." + ValueError: "Get empty properties for list items." + + Returns: + dict: A dictionary with Column: Value pairs for each row from the list. + """ # Creating the body of dictionary new_dict = dict() - - # For loop scanning the propertys of searching fields + # For loop scanning the properties of searching fields item_values_dict = list_item.properties - for field, val in item_values_dict.items(): - nested_dict = get_nested_dict(val) - # Check if the dictionary is nested - if nested_dict != None: - # It might be that there are different field properties than expected - nested_value = nested_dict.get(selected_fields["FieldProperty"]) - if nested_value != None: - new_dict[field] = nested_value + if item_values_dict: + for field, val in item_values_dict.items(): + nested_dict = get_nested_dict(val) + # Check if the values are nested + if nested_dict != None: + # Check if field has expandable type + if field in selected_fields["FieldToExpand"]: + # It might be that there are different field properties than expected + nested_value = nested_dict.get( + selected_fields["FieldExpandProperty"] + ) + if nested_value != None: + new_dict[field] = nested_value + else: + raise ValueError("Check if given field property is valid!") + elif field in selected_fields["MultiChoiceField"]: + # Field type of multi choice could have more than 1 selection. + new_dict[field] = ";".join(nested_dict.values()) + else: + raise ValueError( + "Get nested dict for not recognized type of field! Check field types in the source." + ) else: - logger.info("I'm not the right value") - raise ValueError - else: - new_dict[field] = val - + new_dict[field] = val + else: + raise ValueError( + "Get empty properties for list items. 
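# --- Illustrative usage sketch (not part of the diff): `SharepointList` falls
# back to the "SHAREPOINT_CERT" entry of the local config when no credentials
# are passed and raises `CredentialError` otherwise. The certificate values and
# the site URL below are hypothetical placeholders.
from viadot.sources.sharepoint import SharepointList

sp_list = SharepointList(
    credentials={
        "tenant": "<tenant>",
        "client_id": "<client_id>",
        "scopes": "<scopes>",
        "thumbprint": "<thumbprint>",
        "private_key": "<private_key>",
    }
)
ctx = sp_list.get_connection(site_url="https://example.sharepoint.com/sites/site/")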
Check if parameter list_item collection contains any data -> item objects." + ) return new_dict def get_fields( self, - list_title: str = None, - site_url: str = None, + list_title: str, + site_url: str, required_fields: List[str] = None, - ): + ) -> List: + """ + Function for getting list of fields objects from the sharepoint list. + It can get all fields available if required_fields not passed + or just the one which are in the list required_fields. - ctx = self.get_connection(site_url=site_url) + Args: + list_title (str): Name of the sharepoint list. + site_url (str): URL to the sharepoint list with "/" at the end. + required_fields (List[str], optional ): List of required fields to ingest. It will get all fields if not passed. + + Returns: + List: List with office365 sharepoint list field objects. + """ + ctx = self.get_connection(site_url=site_url) # Get list of lists object by List Title self.list_object = ctx.web.lists.get_by_title(list_title) list_fields_all = self.list_object.fields @@ -182,25 +230,42 @@ def get_fields( else: list_fields_required = [ - list_fields_all.get_by_internal_name_or_title(field).get() + list_fields_all.get_by_internal_name_or_title(field) + .get() + .execute_query() for field in required_fields ] - ctx.execute_batch() return list_fields_required - def select_expandable_user_fields( + def select_fields( self, - list_title: str = None, - site_url: str = None, + list_title: str, + site_url: str, required_fields: List[str] = None, field_property: str = "Title", - ): + ) -> dict: """ - Method to expand fields and get more informations. + Method to create a data structure for handling info about + selection of fields with details about possible expansion for more data or details. + Field types to extract more values can be: "User*", "MultiChoice" field_property to expand can be: ID, Title, FieldTypeKind, TypeAsString and many more. -> more properties can be discovered by getting list.item.properties. - Default to "Title" + + Args: + list_title (str): A title of the sharepoint list. Defaults to None. + site_url (str): A sharepoint list URL. Defaults to None. + required_fields (List[str], optional): List of fields(columns) to be ingested. Defaults to None. + field_property (str, optional): Property to extract from nested fields + like column with type User*. Defaults to "Title". + + Returns: + dict: selected_fields = { + "FieldInternalNames": List of fields to select with its InternalNames (from api), + "FieldToExpand": fields_to_expand,-> fields which could be expanded to get more data from API + "FieldExpandProperty": field_property, property of the expandable field which will be extracted + "MultiChoiceField": List of fields which can have multiple values in 1 row + } """ list_fields = self.get_fields( @@ -220,93 +285,156 @@ def select_expandable_user_fields( for field in list_fields if fnmatch(field.properties["TypeAsString"], f"User*") ] - + multi_choice_fields = [ + field.properties["InternalName"] + for field in list_fields + if fnmatch(field.properties["TypeAsString"], "MultiChoice") + ] # Creating the body of the function output selected_fields = { "FieldInternalNames": fields_to_select, "FieldToExpand": fields_to_expand, - "FieldProperty": field_property, + "FieldExpandProperty": field_property, + "MultiChoiceField": multi_choice_fields, } return selected_fields def check_filters( self, - filters: dict = None, + filters: dict, ) -> bool: """ Function to check if filters dict is valid. 
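# --- Illustrative sketch (not part of the diff) of the structure returned by
# `select_fields` and consumed by `_unpack_fields`; the keys come from the code
# above, while the field names used here are hypothetical examples.
selected_fields = {
    "FieldInternalNames": ["Title", "Author", "Categories"],  # fields to select (hypothetical)
    "FieldToExpand": ["Author"],  # expandable (User*) fields
    "FieldExpandProperty": "Title",  # property extracted from expanded fields
    "MultiChoiceField": ["Categories"],  # fields whose nested values are joined with ";"
}
# For a MultiChoice field, `_unpack_fields` calls `";".join(nested_dict.values())`,
# so multiple selections in one row end up as a single ";"-delimited string.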
- example1: if operator2 is present value2 must be in place as well - example2: if dtype is not on allowed list it will throw an error + Please check and apply only allowed filter settings: + allowed_dtypes = ["datetime", "date", "bool", "int", "float", "complex", "str"] + allowed_conjunction = ["&", "|"] + allowed_operators = ["<", ">", "<=", ">=", "==", "!="] + Operator conjunction is only possible if there are 2 values like: value <= 1 | value == 5 + Filter conjunction is only possible if there are more then 1 filters for ex. date and creator + + Args: + filters (dict): A dictionary containing filter settings + Example: + filters = { + "Created": { + "dtype": "datetime", + "value1": yesterday_date, + "value2": today_date, + "operator1": ">=", + "operator2": "<=", + "operators_conjunction": "&", + "filters_conjunction": "&", + }, + "Factory": { + "dtype": "str", + "value1": "NM-PL", + "operator1": "==", + }, + } + + Raises: + ValueError: If dtype not in allowed list. + ValueError: If comparison operator1 not in allowed list. + ValueError: If value for operator1 is missing. + ValueError: If comparison operator1 for the first value is missing. + ValueError: If comparison operator2 not in allowed list. + ValueError: If value for operator2 is missing. + ValueError: If comparison operator2 for the first value is missing. + ValueError: If operator conjunction is missing while there are 2 values and 2 operators passed. + ValueError: If operator conjunction is not in the allowed list. + ValueError: If operator conjunction provided why only one filter value is given. + ValueError: If filter conjunction provided without 2nd filter. + ValueError: If filter conjunction not in the allowed list. + + Returns: + bool: True if all checks passed. """ allowed_dtypes = ["datetime", "date", "bool", "int", "float", "complex", "str"] - allowed_conjuction = ["&", "|"] + allowed_conjunction = ["&", "|"] allowed_operators = ["<", ">", "<=", ">=", "==", "!="] - for parameters in filters.values(): + for filter_name, parameters in filters.items(): + if not parameters.get("dtype"): + raise ValueError(f"dtype for {filter_name} is missing!") if parameters.get("dtype") not in allowed_dtypes: raise ValueError( - f"dtype not allowed! Expected {allowed_dtypes} got: {parameters.get('dtype')}." + f"dtype not allowed! Expected: {allowed_dtypes} got: {parameters.get('dtype')} ." ) if parameters.get("operator1"): if parameters.get("operator1") not in allowed_operators: raise ValueError( - f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator1')}." + f"Operator1 type not allowed! Expected: {allowed_operators} got: {parameters.get('operator1')} ." ) if not parameters.get("value1"): - raise ValueError("Value for operator1 is missing!") + raise ValueError("Value1 for operator1 is missing!") elif not parameters.get("operator1"): raise ValueError("Operator1 is missing!") if ( not parameters.get("operator2") - and parameters.get("operators_conjuction") is not None + and parameters.get("operators_conjunction") is not None ): raise ValueError( - f"Operator conjuction allowed only with more than one filter operator!" + f"Operator conjunction allowed only with more then one filter operator!" ) if parameters.get("operator2"): if parameters.get("operator2") not in allowed_operators: raise ValueError( - f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator2')}." + f"Operator2 type not allowed! Expected: {allowed_operators} got: {parameters.get('operator2')} ." 
) if not parameters.get("value2"): - raise ValueError("Value for operator2 is missing!") - if not parameters.get("operators_conjuction"): + raise ValueError("Value2 for operator2 is missing!") + if not parameters.get("operators_conjunction"): raise ValueError( - f"Operators for conjuction is missing! Expected {allowed_conjuction} got empty." + f"Operator for conjunction is missing! Expected: {allowed_conjunction} got empty." ) - if parameters.get("operators_conjuction") not in allowed_conjuction: + if parameters.get("operators_conjunction") not in allowed_conjunction: raise ValueError( - f"Operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('operators_conjuction')}." + f"Operator for conjunction not allowed! Expected: {allowed_conjunction} got {parameters.get('operators_conjunction')} ." ) - if parameters.get("filters_conjuction"): + if parameters.get("filters_conjunction"): if ( len(filters.keys()) == 1 - and parameters.get("filters_conjuction") is not None + and parameters.get("filters_conjunction") is not None ): raise ValueError( - f"Filters conjuction allowed only with more than one filter column!" + f"Filters conjunction allowed only when more then one filter provided!" ) - if parameters.get("filters_conjuction") not in allowed_conjuction: + if parameters.get("filters_conjunction") not in allowed_conjunction: raise ValueError( - f"Filters operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('filters_conjuction')}." + f"Filter operator for conjunction not allowed! Expected: {allowed_conjunction} got {parameters.get('filters_conjunction')} ." ) return True def operators_mapping( self, - filters: dict = None, + filters: dict, ) -> dict: """ - Function for mapping comparison and conjuction(logical) operators of filters to the format which is recognized by Microsoft API. + Function for mapping comparison and conjunction(logical) operators of filters to the format which is recognized by Microsoft API. + Allowed operators: + < + > + <= + >= + == + != + "&" + "|" Args: - filters (dict): A dictionar which contains operators. + filters (dict): A dictionary which contains operators. + + Raises: + ValueError: If operator1 not allowed. + ValueError: If operator2 not allowed. + ValueError: If operators conjunction not allowed. + ValueError: If filters conjunction not allowed. Returns: - New modified dict. + dict: New modified dict with mapped operators. """ filters_dict = deepcopy(filters) @@ -337,36 +465,36 @@ def operators_mapping( raise ValueError( f"This comparison operator: {operator2_to_change} is not allowed. Please read the function documentation for details!" ) - if parameters.get("operators_conjuction"): - logical_op_to_change = parameters.get("operators_conjuction") + if parameters.get("operators_conjunction"): + logical_op_to_change = parameters.get("operators_conjunction") if logical_op_to_change in logical_op.keys(): - parameters["operators_conjuction"] = logical_op[ + parameters["operators_conjunction"] = logical_op[ logical_op_to_change ] else: raise ValueError( - f"This conjuction(logical) operator: {logical_op_to_change} is not allowed. Please read the function documentation for details!" + f"This conjunction (logical) operator: {logical_op_to_change} is not allowed. Please read the function documentation for details!" 
) - if parameters.get("filters_conjuction"): - logical_fl_to_change = parameters.get("filters_conjuction") + if parameters.get("filters_conjunction"): + logical_fl_to_change = parameters.get("filters_conjunction") if logical_fl_to_change in logical_op.keys(): - parameters["filters_conjuction"] = logical_op[logical_fl_to_change] + parameters["filters_conjunction"] = logical_op[logical_fl_to_change] else: raise ValueError( - f"This conjuction(logical) operator: {logical_fl_to_change} is not allowed. Please read the function documentation for details!" + f"This filters conjunction (logical) operator: {logical_fl_to_change} is not allowed. Please read the function documentation for details!" ) return filters_dict - def make_filter_for_api(self, filters: dict) -> "str": + def make_filter_for_api(self, filters: dict) -> str: """ Function changing type of operators to match MS API style as 'str' passing to URL call. Args: - filters (dict): A dictionar which contains operators. + filters (dict): A dictionary which contains operators. Returns: - Output as string to pass as filter parameter to API. + str: Output as filtering string to pass as filter parameter to API. """ filter_text = "" @@ -387,7 +515,7 @@ def make_filter_for_api(self, filters: dict) -> "str": ).isoformat() filter_text = ( filter_text - + f" {parameters.get('operators_conjuction')} {column} {parameters.get('operator2')} datetime'{from_date2}' " + + f" {parameters.get('operators_conjunction')} {column} {parameters.get('operator2')} datetime'{from_date2}' " ) elif parameters.get("dtype") not in ["datetime", "date"]: filter_text = ( @@ -399,23 +527,23 @@ def make_filter_for_api(self, filters: dict) -> "str": filter_text + f"{column} {parameters.get('operator2')} '{parameters.get('value2')}'" ) - if parameters.get("filters_conjuction"): - filter_text = filter_text + f"{parameters.get('filters_conjuction')} " + if parameters.get("filters_conjunction"): + filter_text = filter_text + f"{parameters.get('filters_conjunction')} " return filter_text def make_filter_for_df( self, - filters: dict = None, - ) -> "str": + filters: dict, + ) -> str: """ - Function changing dict operators into pandas DataFrame filters. + Function changing filters into pandas DataFrame filtering string used later for filtering the DF. Args: - filters (dict): A dictionar which contains operators. + filters (dict): A dictionary which contains operators. Returns: - Output as string to pass as filter to DataFrame. + str: Output as string to pass as filter to DataFrame. """ filter_in_df = "df.loc[" @@ -429,11 +557,13 @@ def make_filter_for_df( if parameters.get("operator2"): filter_in_df = ( filter_in_df - + f") {parameters.get('operators_conjuction')} (df.{column} {parameters.get('operator2', '')} '{parameters.get('value2', '')}'" + + f") {parameters.get('operators_conjunction')} (df.{column} {parameters.get('operator2', '')} '{parameters.get('value2', '')}'" ) - if parameters.get("filters_conjuction"): - filter_in_df = filter_in_df + ")" + parameters.get("filters_conjuction") + if parameters.get("filters_conjunction"): + filter_in_df = ( + filter_in_df + ")" + parameters.get("filters_conjunction") + ) else: filter_in_df = filter_in_df + ")" @@ -453,6 +583,9 @@ def list_item_to_df( ): """ Method to extract data from Sharepoint List into DataFrame. + If filters are passed, function will try to extract only filtered data to reduce the amount of data to transfer. 
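# --- Illustrative sketch (not part of the diff): what `operators_mapping` does
# to a filters dict, mirroring the updated unit test above. Comparison operators
# are mapped to API-style tokens and the renamed conjunction keys to "and"/"or".
filters_in = {
    "filter1": {
        "dtype": "int",
        "value1": 10,
        "value2": 20,
        "operator1": ">",
        "operator2": "<=",
        "operators_conjunction": "&",
        "filters_conjunction": "|",
    },
}
# SharepointList(...).operators_mapping(filters_in) is expected to return:
# {
#     "filter1": {
#         "dtype": "int",
#         "value1": 10,
#         "value2": 20,
#         "operator1": "gt",
#         "operator2": "le",
#         "operators_conjunction": "and",
#         "filters_conjunction": "or",
#     },
# }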
+ If there is no filter or there is an throttling (max rows returned limit reached) + exception ,then 2nd workflow will start and download all data which will be filtered later in the data frame. Args: list_title (str): Title of Sharepoint List. Default to None. @@ -460,11 +593,11 @@ def list_item_to_df( required_fields (List[str]): Required fields(columns) need to be extracted from Sharepoint List. Default to None. field_property (List[str]): Property to expand with expand query method. - All propertys can be found under list.item.properties. + All properties can be found under list.item.properties. Default to ["Title"] filters (dict): Dictionary with operators which filters the SharepointList output. allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') - allowed conjuction: ('&','|') + allowed conjunction: ('&','|') allowed operators: ('<','>','<=','>=','==','!=') Example how to build the dict: filters = { @@ -475,8 +608,8 @@ def list_item_to_df( 'value2':'YYYY-MM-DD', 'operator1':'>=', 'operator2':'<=', - 'operators_conjuction':'&', - 'filters_conjuction':'&', + 'operators_conjunction':'&', + 'filters_conjunction':'&', } , 'Column_name_2' : @@ -488,6 +621,10 @@ def list_item_to_df( } row_count (int): Number of downloaded rows in single request. Default to 5000. + Raises: + AttributeError: If filter column not included inside required fields list. + ValueError: If there is no filter passed - > will extract all fields and filter later. + Returns: pd.DataFrame """ @@ -499,7 +636,7 @@ def list_item_to_df( for key in filters: if key not in required_fields: raise AttributeError( - f"Filter '{key}' not included inside required fields. It is obligatory to extract data which is filtered!" + f"Filter '{key}' column not included inside required fields. It is obligatory to extract data which is filtered!" ) # changing the body of the filter for MS API call @@ -507,8 +644,8 @@ def list_item_to_df( download_all = False - # extracting requeird_fields SP_List objects - selected_fields = self.select_expandable_user_fields( + # extracting required_fields SP_List objects + selected_fields = self.select_fields( list_title=list_title, site_url=site_url, required_fields=required_fields, @@ -518,7 +655,7 @@ def list_item_to_df( try: # Extract data below 5k rows or max limitation of the specific SP List with basic filtering. if filters is None: - raise ValueError("There is no filter. Starting extraxction all data") + raise ValueError("There is no filter. Switching to extract all fields.") else: list_items = ( self.list_object.items.filter(filter_text) @@ -530,7 +667,7 @@ def list_item_to_df( self.ctx.execute_query() except (ClientRequestException, ValueError) as e: - # Extract all data from specific SP List without basic filtering. Additional logic for filtering applied on DataFreame level. + # Extract all data from specific SP List without basic filtering. Additional logic for filtering applied on DataFrame level. 
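# --- Illustrative usage sketch (not part of the diff): downloading filtered list
# items with `list_item_to_df`. Every filtered column must also appear in
# `required_fields`, otherwise an AttributeError is raised. The list title, URL
# and column names below are hypothetical.
from viadot.sources.sharepoint import SharepointList

sp_list = SharepointList()  # credentials resolved from local config ("SHAREPOINT_CERT")
df = sp_list.list_item_to_df(
    list_title="My List",  # hypothetical
    site_url="https://example.sharepoint.com/sites/site/",  # hypothetical
    required_fields=["Title", "Created", "Factory"],  # hypothetical columns
    field_property="Title",
    filters={
        "Created": {
            "dtype": "datetime",
            "value1": "2023-01-01",
            "operator1": ">=",
        },
    },
    row_count=5000,
)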
logger.info(f"Exception SPQueryThrottledException occurred: {e}") list_items = ( self.list_object.items.get_all(row_count, log_of_progress) @@ -546,7 +683,7 @@ def list_item_to_df( ) if download_all == True and filters is not None: - # Filter for desired range of created date and for factory Namyslow PL + # Apply filters to the data frame -> accordingly to the filter dict passed as na parameter self.logger.info("Filtering df with all data output") filter_for_df = self.make_filter_for_df(filters) df = eval(filter_for_df) diff --git a/viadot/sources/tm1.py b/viadot/sources/tm1.py new file mode 100644 index 000000000..fcb1dae7a --- /dev/null +++ b/viadot/sources/tm1.py @@ -0,0 +1,185 @@ +from typing import Any, Dict, Literal + +import pandas as pd +from prefect.utilities import logging +from TM1py.Services import TM1Service + +from ..config import local_config +from ..exceptions import CredentialError, ValidationError +from .base import Source + +logger = logging.get_logger(__name__) + + +class TM1(Source): + """ + Class for downloading data from TM1 Software using TM1py library + """ + + def __init__( + self, + credentials: Dict[str, Any] = None, + config_key: str = "TM1", + mdx_query: str = None, + cube: str = None, + view: str = None, + dimension: str = None, + hierarchy: str = None, + limit: int = None, + private: bool = False, + verify: bool = False, + *args, + **kwargs, + ): + """ + Creating an instance of TM1 source class. To download the data to the dataframe user needs to specify MDX query or + combination of cube and view. + + Args: + credentials (Dict[str, Any], optional): Credentials stored in a dictionary. Required credentials: username, + password, address, port. Defaults to None. + config_key (str, optional): Credential key to dictionary where credentials are stored. Defaults to "TM1". + mdx_query (str, optional): MDX select query needed to download the data. Defaults to None. + cube (str, optional): Cube name from which data will be downloaded. Defaults to None. + view (str, optional): View name from which data will be downloaded. Defaults to None. + dimension (str, optional): Dimension name. Defaults to None. + hierarchy (str, optional): Hierarchy name. Defaults to None. + limit (str, optional): How many rows should be extracted. If None all the avaiable rows will + be downloaded. Defaults to None. + private (bool, optional): Whether or not data download shoulb be private. Defaults to False. + verify (bool, optional): Whether or not verify SSL certificates while. Defaults to False. + + + Raises: + CredentialError: When credentials are not found. + """ + DEFAULT_CREDENTIALS = local_config.get(config_key) + credentials = credentials or DEFAULT_CREDENTIALS + + required_credentials = ["address", "port", "username", "password"] + if any([cred_key not in credentials for cred_key in required_credentials]): + not_found = [c for c in required_credentials if c not in credentials] + raise CredentialError(f"Missing credential(s): '{not_found}'.") + + self.config_key = config_key + self.mdx_query = mdx_query + self.cube = cube + self.view = view + self.dimension = dimension + self.hierarchy = hierarchy + self.limit = limit + self.private = private + self.verify = verify + + super().__init__(*args, credentials=credentials, **kwargs) + + def get_connection(self) -> TM1Service: + """ + Start a connection to TM1 instance. + + Returns: + TM1Service: Service instance if connection is succesfull. 
+ """ + return TM1Service( + address=self.credentials["address"], + port=self.credentials["port"], + user=self.credentials["username"], + password=self.credentials["password"], + ssl=self.verify, + ) + + def get_cubes_names(self) -> list: + """ + Get a list of available cubes in the TM1 instance. + + Returns: + list: List containing available cube names. + + """ + conn = self.get_connection() + return conn.cubes.get_all_names() + + def get_views_names(self) -> list: + """ + Get a list of available views in the TM1 cube. + + Returns: + list: List containing available view names. + + """ + conn = self.get_connection() + return conn.views.get_all_names(self.cube) + + def get_dimensions_names(self) -> list: + """ + Get a list of available dimensions in the TM1 instance. + + Returns: + list: List containing available dimension names. + + """ + conn = self.get_connection() + return conn.dimensions.get_all_names() + + def get_hierarchies_names(self) -> list: + """ + Get a list of available hierarchies in the TM1 dimension. + + Returns: + list: List containing available hierarchy names. + + """ + conn = self.get_connection() + return conn.hierarchies.get_all_names(self.dimension) + + def get_available_elements(self) -> list: + """ + Get a list of available elements in the TM1 instance based on hierarchy and dimension. + + Returns: + list: List containing available element names. + + """ + conn = self.get_connection() + return conn.elements.get_element_names( + dimension_name=self.dimension, hierarchy_name=self.hierarchy + ) + + def to_df(self, if_empty: Literal["warn", "fail", "skip"] = "skip") -> pd.DataFrame: + """ + Download data from TM1 into a pd.DataFrame. The user needs to specify an MDX query or + a combination of cube and view. + + Args: + if_empty (Literal["warn", "fail", "skip"], optional): What to do if the output DataFrame is empty. Defaults to "skip". + + Returns: + pd.DataFrame: DataFrame with data downloaded from the TM1 view. + + Raises: + ValidationError: When neither an MDX query nor cube + view are specified, or when a combination of both is specified. + """ + conn = self.get_connection() + + if self.mdx_query is None and (self.cube is None or self.view is None): + raise ValidationError("MDX query or cube and view are required.") + elif self.mdx_query is not None and ( + self.cube is not None or self.view is not None + ): + raise ValidationError("Specify only one: MDX query or cube and view.") + elif self.cube is not None and self.view is not None: + df = conn.cubes.cells.execute_view_dataframe( + cube_name=self.cube, + view_name=self.view, + private=self.private, + top=self.limit, + ) + elif self.mdx_query is not None: + df = conn.cubes.cells.execute_mdx_dataframe(self.mdx_query) + + logger.info( + f"Data was successfully transformed into DataFrame: {len(df.columns)} columns and {len(df)} rows."
+ ) + if df.empty is True: + self._handle_if_empty(if_empty) + return df diff --git a/viadot/task_utils.py b/viadot/task_utils.py index 6173e2994..6a532f932 100644 --- a/viadot/task_utils.py +++ b/viadot/task_utils.py @@ -1,8 +1,8 @@ import copy import json import os -import shutil import re +import shutil from datetime import datetime, timedelta, timezone from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, List, Literal, Union, cast diff --git a/viadot/tasks/__init__.py b/viadot/tasks/__init__.py index ecba1d5c5..7dc3d61cd 100644 --- a/viadot/tasks/__init__.py +++ b/viadot/tasks/__init__.py @@ -31,7 +31,7 @@ from .outlook import OutlookToDF from .prefect_date_range import GetFlowNewDateRange from .salesforce import SalesforceBulkUpsert, SalesforceToDF, SalesforceUpsert -from .sharepoint import SharepointToDF, SharepointListToDF +from .sharepoint import SharepointListToDF, SharepointToDF from .sqlite import SQLiteInsert, SQLiteQuery, SQLiteSQLtoDF from .supermetrics import SupermetricsToCSV, SupermetricsToDF @@ -50,11 +50,12 @@ from .duckdb import DuckDBCreateTableFromParquet, DuckDBQuery, DuckDBToDF from .epicor import EpicorOrdersToDF from .eurostat import EurostatToDF +from .git import CloneRepo from .hubspot import HubspotToDF +from .luma import LumaIngest from .mediatool import MediatoolToDF from .mindful import MindfulToCSV from .sftp import SftpList, SftpToDF from .sql_server import SQLServerCreateTable, SQLServerQuery, SQLServerToDF +from .tm1 import TM1ToDF from .vid_club import VidClubToDF -from .git import CloneRepo -from .luma import LumaIngest diff --git a/viadot/tasks/customer_gauge.py b/viadot/tasks/customer_gauge.py index 4f1f26bbd..ecb5e0de5 100644 --- a/viadot/tasks/customer_gauge.py +++ b/viadot/tasks/customer_gauge.py @@ -1,6 +1,6 @@ import json from datetime import datetime -from typing import Literal +from typing import Any, Dict, List, Literal import pandas as pd from prefect import Task @@ -26,23 +26,37 @@ def __init__( ] = None, start_date: datetime = None, end_date: datetime = None, + unpack_by_field_reference_cols: List[str] = None, + unpack_by_nested_dict_transformer: List[str] = None, timeout: int = 3600, *args, **kwargs, ): """ - Task CustomerGaugeToDF for downloading the selected range of data from Customer Gauge endpoint and return as one pandas DataFrame. + Task CustomerGaugeToDF for downloading the selected range of data from Customer Gauge + endpoint and return as one pandas DataFrame. Args: - endpoint (Literal["responses", "non-responses"], optional): Indicate which endpoint to connect. Defaults to None. - total_load (bool, optional): Indicate whether to download the data to the latest. If 'False', only one API call is executed (up to 1000 records). Defaults to True. + endpoint (Literal["responses", "non-responses"], optional): Indicate which endpoint + to connect. Defaults to None. + total_load (bool, optional): Indicate whether to download the data to the latest. + If 'False', only one API call is executed (up to 1000 records). Defaults to True. endpoint_url (str, optional): Endpoint URL. Defaults to None. cursor (int, optional): Cursor value to navigate to the page. Defaults to None. - pagesize (int, optional): Number of responses (records) returned per page, max value = 1000. Defaults to 1000. - date_field (Literal["date_creation", "date_order", "date_sent", "date_survey_response"], optional): Specifies the date type which filter date range. Defaults to None. 
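As a quick orientation to the new `TM1` source introduced above, here is a hedged usage sketch. The server address, port, cube, view, and credential values are placeholders and a reachable TM1 instance is required; credentials can alternatively be resolved from the local config under the "TM1" key.

```python
from viadot.sources import TM1

# Placeholder credentials - replace with values for a real TM1 instance.
credentials = {
    "address": "tm1.example.com",
    "port": 12354,
    "username": "user",
    "password": "secret",
}

tm1 = TM1(credentials=credentials, cube="Sales", view="Default", limit=1000)
print(tm1.get_cubes_names())  # list the cubes available on the instance
df = tm1.to_df()  # cube + view path; alternatively pass mdx_query="SELECT ..."
print(df.shape)
```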
- start_date (datetime, optional): Defines the period end date in yyyy-mm-dd format. Defaults to None. - end_date (datetime, optional): Defines the period start date in yyyy-mm-dd format. Defaults to None. - timeout (int, optional): The time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. + pagesize (int, optional): Number of responses (records) returned per page, max value = 1000. + Defaults to 1000. + date_field (Literal["date_creation", "date_order", "date_sent", "date_survey_response"], + optional): Specifies the date type which filters the date range. Defaults to None. + start_date (datetime, optional): Defines the period start date in yyyy-mm-dd format. + Defaults to None. + end_date (datetime, optional): Defines the period end date in yyyy-mm-dd format. + Defaults to None. + unpack_by_field_reference_cols (List[str]): Columns to unpack and modify using `_field_reference_unpacker`. + Defaults to None. + unpack_by_nested_dict_transformer (List[str]): Columns to unpack and modify using `_nested_dict_transformer`. + Defaults to None. + timeout (int, optional): The time (in seconds) to wait while running this task before + a timeout occurs. Defaults to 3600. """ self.endpoint = endpoint self.total_load = total_load @@ -52,6 +66,8 @@ def __init__( self.date_field = date_field self.start_date = start_date self.end_date = end_date + self.unpack_by_field_reference_cols = unpack_by_field_reference_cols + self.unpack_by_nested_dict_transformer = unpack_by_nested_dict_transformer super().__init__( name="customer_gauge_to_df", @@ -60,6 +76,260 @@ def __init__( **kwargs, ) + def get_data( + self, + json_response: Dict[str, Any] = None, + ) -> List[Dict[str, Any]]: + """ + Extract and return the 'data' part of a JSON response as a list of dictionaries. + + Args: + json_response (Dict[str, Any], optional): JSON object represented as a nested + dictionary that contains data and cursor parameter value. Defaults to None. + + Raises: + KeyError: If the 'data' key is not present in the provided JSON response. + + Returns: + List[Dict[str, Any]]: A list of dictionaries containing data from the 'data' + part of the JSON response. + """ + jsons_list = [] + try: + jsons_list = json_response["data"] + except KeyError: + logger.error( + "Provided argument doesn't contain 'data' value. Pass json returned from the endpoint." + ) + raise + + return jsons_list + + def _field_reference_unpacker( + self, + json_response: Dict[str, Any], + field: str, + ) -> Dict[str, Any]: + """ + Unpack and modify dictionaries within the specified field of a JSON response. + + This function takes a JSON response and a field name. It processes dictionaries + within the specified field, checking if each dictionary contains exactly two items. + If a dictionary meets this criterion, it is transformed into a new dictionary, + where the first value becomes a key and the second value becomes its associated value. + + Args: + json_response (Dict[str, Any], optional): JSON response with data. + field (str): The key (column) of the dictionary to be modified. + + Returns: + Dict[str, Any]: The JSON response with modified nested dictionaries + within the specified field. + + Raises: + ValueError: If a dictionary within the specified field doesn't contain exactly two items.
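To make the unpacking rule above concrete, here is a standalone restatement with made-up sample data; the field name "drivers" and its contents are hypothetical, and the real task applies the same idea through `_field_reference_unpacker`.

```python
def unpack_two_item_dicts(record: dict, field: str) -> dict:
    """Collapse a list of two-item dicts into one mapping: first value -> second value."""
    result = {}
    for item in record[field]:
        if isinstance(item, dict) and len(item) == 2:
            first, second = list(item.values())
            result[first] = second
        else:
            raise ValueError("Each entry must contain exactly two items.")
    record[field] = result
    return record


record = {"drivers": [{"field": "Support quality", "label": "Positive"},
                      {"field": "Price", "label": "Negative"}]}
print(unpack_two_item_dicts(record, "drivers"))
# {'drivers': {'Support quality': 'Positive', 'Price': 'Negative'}}
```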
+ """ + + result = {} + for i, dictionary in enumerate(json_response[field]): + if isinstance(dictionary, dict) and len(dictionary.items()) == 2: + list_properties = list(dictionary.values()) + result[list_properties[0]] = list_properties[1] + else: + raise ValueError( + "Dictionary within the specified field doesn't contain exactly two items." + ) + if result: + json_response[field] = result + + return json_response + + def _nested_dict_transformer( + self, + json_response: Dict[str, Any], + field: str, + ) -> Dict[str, Any]: + """ + Modify nested dictionaries within the specified field of a JSON response. + + This function takes a JSON response and a field name. It modifies nested + dictionaries within the specified field by adding an index and underscore + to the keys. The modified dictionary is then updated in the JSON response. + + Args: + json_response (Dict[str, Any], optional): JSON response with data. + field (str): The key (column) of the dictionary to be modified. + + Returns: + Dict[str, Any]: The JSON response with modified nested dictionaries + within the specified field. + """ + result = {} + try: + for i, dictionary in enumerate(json_response[field], start=1): + for key, value in dictionary.items(): + result[f"{i}_{key}"] = value + if result: + json_response[field] = result + except TypeError as te: + logger.error(te) + + return json_response + + def column_unpacker( + self, + json_list: List[Dict[str, Any]] = None, + unpack_by_field_reference_cols: List[str] = None, + unpack_by_nested_dict_transformer: List[str] = None, + ) -> List[Dict[str, Any]]: + + """ + Unpack and modify specific columns in a list of dictionaries using one of two methods, + chosen by the user. + To use `_field_reference_unpacker`, provide the list of fields as strings in the + `unpack_by_field_reference_cols` parameter; to use `_nested_dict_transformer`, provide the list of + fields as strings in the `unpack_by_nested_dict_transformer` parameter. + + Args: + json_list (List[Dict[str, Any]]): A list of dictionaries containing the data. + unpack_by_field_reference_cols (List[str]): Columns to unpack and modify using `_field_reference_unpacker`. + Defaults to None. + unpack_by_nested_dict_transformer (List[str]): Columns to unpack and modify using `_nested_dict_transformer`. + Defaults to None. + + Raises: + ValueError: If 'json_list' is not provided. + ValueError: If specified columns do not exist in the JSON data. + ValueError: If columns are mentioned in both 'unpack_by_field_reference_cols' and 'unpack_by_nested_dict_transformer'. + + Returns: + List[Dict[str, Any]]: The updated list of dictionaries after column unpacking and modification. + """ + duplicated_cols = [] + + if json_list is None: + raise ValueError("Input 'json_list' is required.") + + def unpack_columns(columns, unpack_function): + json_list_clean = json_list.copy() + for field in columns: + if field in json_list_clean[0]: + logger.info( + f"Unpacking column '{field}' with {unpack_function.__name__} method..." + ) + try: + json_list_clean = list( + map(lambda x: unpack_function(x, field), json_list_clean) + ) + logger.info( + f"All elements in '{field}' are unpacked successfully." + ) + except ValueError as ve: + logger.info( + f"No transformations were made in '{field}', " + "because it didn't contain a list of key-value data."
+ ) + except Exception as e: + logger.info(f"Error while unpacking {field}: {e}") + else: + logger.info(f"Column '{field}' not found.") + return json_list_clean + + if unpack_by_field_reference_cols and unpack_by_nested_dict_transformer: + duplicated_cols = set(unpack_by_field_reference_cols).intersection( + set(unpack_by_nested_dict_transformer) + ) + if duplicated_cols: + raise ValueError( + f"{duplicated_cols} were mentioned in both unpack_by_field_reference_cols and unpack_by_nested_dict_transformer." + " It's not possible to apply two methods to the same field." + ) + else: + if unpack_by_field_reference_cols is not None: + json_list = unpack_columns( + columns=unpack_by_field_reference_cols, + unpack_function=self._field_reference_unpacker, + ) + + if unpack_by_nested_dict_transformer is not None: + json_list = unpack_columns( + columns=unpack_by_nested_dict_transformer, + unpack_function=self._nested_dict_transformer, + ) + + return json_list + + def flatten_json(self, json_response: Dict[str, Any] = None) -> Dict[str, Any]: + """ + Function that flattens a nested structure of the JSON object into + a single-level dictionary. It uses a nested `flattify()` function to recursively + combine nested keys in the JSON object with '_' to create the flattened keys. + + Args: + json_response (Dict[str, Any], optional): JSON object represented as + a nested dictionary. Defaults to None. + + Raises: + TypeError: If 'json_response' is not a dictionary. + + Returns: + Dict[str, Any]: The flattened dictionary. + """ + result = {} + + if not isinstance(json_response, dict): + raise TypeError("Input must be a dictionary.") + + def flattify(field, key="", out=None): + if out is None: + out = result + + if isinstance(field, dict): + for item in field.keys(): + flattify(field[item], key + item + "_", out) + else: + out[key[:-1]] = field + + flattify(json_response) + + return result + + def square_brackets_remover(self, df: pd.DataFrame = None) -> pd.DataFrame: + """ + Replace square brackets "[]" with an empty string in a pandas DataFrame. + + Args: + df (pd.DataFrame, optional): DataFrame in which square brackets should be replaced + with an empty string. Defaults to None. + + Returns: + pd.DataFrame: The modified DataFrame with square brackets replaced by an empty string. + """ + + df = df.astype(str) + df = df.applymap(lambda x: x.strip("[]")) + return df + + def _drivers_cleaner(self, drivers: str = None) -> str: + """ + Clean and format the 'drivers' data. + + Args: + drivers (str, optional): Value from the 'drivers' column to be cleaned. Defaults to None. + + Returns: + str: A cleaned and formatted string of driver data. + """ + + cleaned_drivers = ( + drivers.replace("{", "") + .replace("}", "") + .replace("'", "") + .replace("label: ", "") + ) + + return cleaned_drivers + def __call__(self): """Download Customer Gauge data to a DF""" super().__call__(self) @@ -73,6 +343,8 @@ def __call__(self): "date_field", "start_date", "end_date", + "unpack_by_field_reference_cols", + "unpack_by_nested_dict_transformer", ) def run( self, @@ -86,23 +358,38 @@ def run( ] = None, start_date: datetime = None, end_date: datetime = None, + unpack_by_field_reference_cols: List[str] = None, + unpack_by_nested_dict_transformer: List[str] = None, credentials_secret: str = "CUSTOMER-GAUGE", vault_name: str = None, ) -> pd.DataFrame: """ - Run method. Downloading the selected range of data from Customer Gauge endpoint and return as one pandas DataFrame. + Run method.
Downloads the selected range of data from the Customer Gauge endpoint and returns it + as one pandas DataFrame. Args: - endpoint (Literal["responses", "non-responses"]): Indicate which endpoint to connect. Defaults to None. - total_load (bool, optional): Indicate whether to download the data to the latest. If 'False', only one API call is executed (up to 1000 records). Defaults to True. + endpoint (Literal["responses", "non-responses"]): Indicate which endpoint to connect to. + Defaults to None. + total_load (bool, optional): Indicate whether to download the data up to the latest record. If + 'False', only one API call is executed (up to 1000 records). Defaults to True. endpoint_url (str, optional): Endpoint URL. Defaults to None. cursor (int, optional): Cursor value to navigate to the page. Defaults to None. - pagesize (int, optional): Number of responses (records) returned per page, max value = 1000. Defaults to 1000. - date_field (Literal["date_creation", "date_order", "date_sent", "date_survey_response"], optional): Specifies the date type which filter date range. Defaults to None. - start_date (datetime, optional): Defines the period end date in yyyy-mm-dd format. Defaults to None. - end_date (datetime, optional): Defines the period start date in yyyy-mm-dd format. Defaults to None. - credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ['client_id', 'client_secret']. Defaults to "CUSTOMER-GAUGE". - vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None. + pagesize (int, optional): Number of responses (records) returned per page, max value = 1000. + Defaults to 1000. + date_field (Literal["date_creation", "date_order", "date_sent", "date_survey_response"], + optional): Specifies the date type which filters the date range. Defaults to None. + start_date (datetime, optional): Defines the period start date in yyyy-mm-dd format. + Defaults to None. + end_date (datetime, optional): Defines the period end date in yyyy-mm-dd format. + Defaults to None. + unpack_by_field_reference_cols (List[str]): Columns to unpack and modify using `_field_reference_unpacker`. + Defaults to None. + unpack_by_nested_dict_transformer (List[str]): Columns to unpack and modify using `_nested_dict_transformer`. + Defaults to None. + credentials_secret (str, optional): The name of the Azure Key Vault secret containing a + dictionary with ['client_id', 'client_secret']. Defaults to "CUSTOMER-GAUGE". + vault_name (str, optional): The name of the vault from which to obtain the secret. + Defaults to None. Returns: pd.DataFrame: Final pandas DataFrame. @@ -115,7 +402,7 @@ def run( except (ValueError, TypeError) as e: logger.error(e) - df_list = [] + total_json = [] customer_gauge = CustomerGauge( endpoint=endpoint, url=endpoint_url, credentials=credentials ) @@ -131,23 +418,37 @@ def run( end_date=end_date, ) cur = customer_gauge.get_cursor(json_data) - df = customer_gauge.to_df(json_data) - df_list.append(df) + + jsn = self.get_data(json_data) + total_json += jsn + if total_load == True: if cursor is None: logger.info( - f"Downloading all the data from the {self.endpoint or self.endpoint_url} endpoint. Process might take a few minutes..." + f"Downloading all the data from the {self.endpoint or self.endpoint_url} endpoint. " + "Process might take a few minutes..." ) else: logger.info( f"Downloading starting from the {cursor} cursor. Process might take a few minutes..."
) - while df.empty == False: + while jsn: json_data = customer_gauge.get_json_response(cursor=cur) cur = customer_gauge.get_cursor(json_data) - df = customer_gauge.to_df(json_data) - df_list.append(df) + jsn = self.get_data(json_data) + total_json += jsn - df_total = pd.concat(df_list, ignore_index=True) + clean_json = self.column_unpacker( + json_list=total_json, + unpack_by_field_reference_cols=unpack_by_field_reference_cols, + unpack_by_nested_dict_transformer=unpack_by_nested_dict_transformer, + ) + logger.info("Inserting data into the DataFrame...") + df = pd.DataFrame(list(map(self.flatten_json, clean_json))) + df = self.square_brackets_remover(df) + if "drivers" in list(df.columns): + df["drivers"] = df["drivers"].apply(self._drivers_cleaner) + df.columns = df.columns.str.lower().str.replace(" ", "_") + logger.info("DataFrame: Ready. Data: Inserted. Let the magic happen!") - return df_total + return df diff --git a/viadot/tasks/genesys.py b/viadot/tasks/genesys.py index de47ddebf..942249ac2 100644 --- a/viadot/tasks/genesys.py +++ b/viadot/tasks/genesys.py @@ -10,10 +10,11 @@ from prefect.engine import signals from prefect.utilities import logging from prefect.utilities.tasks import defaults_from_attrs -from viadot.task_utils import * from viadot.exceptions import APIError from viadot.sources import Genesys +from viadot.utils import check_value +from viadot.task_utils import * logger = logging.get_logger() @@ -385,6 +386,7 @@ def run( "agent_performance_summary_view", "agent_status_summary_view", "agent_status_detail_view", + "agent_interaction_detail_view", ]: genesys.genesys_api_connection( post_data_list=post_data_list, end_point=end_point @@ -486,9 +488,9 @@ def run( temp_dict = { key: value for (key, value) in attributes.items() if key in key_list } - temp_dict["conversationId"] = json_file["id"] - temp_dict["startTime"] = json_file["startTime"] - temp_dict["endTime"] = json_file["endTime"] + temp_dict["conversationId"] = json_file.get("id") + temp_dict["startTime"] = json_file.get("startTime") + temp_dict["endTime"] = json_file.get("endTime") data_list.append(temp_dict) df = pd.DataFrame(data_list) @@ -509,3 +511,72 @@ def run( logger.info("Downloaded the data from the Genesys into the CSV.") return [file_name] + + elif view_type is None and end_point == "users": + # First call to API to get information about amount of pages to extract + temp_json = genesys.genesys_api_connection( + post_data_list=post_data_list, + end_point=f"{end_point}/?pageSize=500&pageNumber=1&expand=presence,dateLastLogin,groups,employerInfo,lasttokenissued&state=any", + method="GET", + ) + last_page = temp_json["pageCount"] + 1 + + data_list = [] + + # For loop to download all pages from Genesys GET API + for n in range(1, last_page): + json_file = genesys.genesys_api_connection( + post_data_list=post_data_list, + end_point=f"{end_point}/?pageSize=500&pageNumber={n}&expand=presence,dateLastLogin,groups,employerInfo,lasttokenissued&state=any", + method="GET", + ) + logger.info(f"Downloaded: {n} page") + + num_ids = len(json_file["entities"]) + + # For loop to extract data from specific page + for id in range(0, num_ids): + record_dict = {} + record_dict["Id"] = check_value(json_file["entities"][id], ["id"]) + record_dict["Name"] = check_value( + json_file["entities"][id], ["name"] + ) + record_dict["DivisionName"] = check_value( + json_file["entities"][id], ["division", "name"] + ) + record_dict["Email"] = check_value( + json_file["entities"][id], ["email"] + ) + record_dict["State"] = check_value( + 
json_file["entities"][id], ["state"] + ) + record_dict["Title"] = check_value( + json_file["entities"][id], ["title"] + ) + record_dict["Username"] = check_value( + json_file["entities"][id], ["username"] + ) + record_dict["SystemPresence"] = check_value( + json_file["entities"][id], + ["presence", "presenceDefinition", "systemPresence"], + ) + record_dict["DateLastLogin"] = check_value( + json_file["entities"][id], ["dateLastLogin"] + ) + + data_list.append(record_dict) + + df = pd.DataFrame(data_list) + + # data validation function (optional) + if validate_df_dict: + validate_df.run(df=df, tests=validate_df_dict) + + file_name = "All_Genesys_Users.csv" + df.to_csv( + os.path.join(file_name), + index=False, + sep="\t", + ) + + return [file_name] diff --git a/viadot/tasks/luma.py b/viadot/tasks/luma.py index 5b78ebc27..11eb91e45 100644 --- a/viadot/tasks/luma.py +++ b/viadot/tasks/luma.py @@ -1,5 +1,7 @@ import json + from prefect.tasks.shell import ShellTask + from .azure_key_vault import AzureKeyVaultSecret diff --git a/viadot/tasks/sap_bw.py b/viadot/tasks/sap_bw.py index acc92c246..0d8d7b2e3 100644 --- a/viadot/tasks/sap_bw.py +++ b/viadot/tasks/sap_bw.py @@ -1,12 +1,12 @@ import pandas as pd from prefect import Task from prefect.tasks.secrets import PrefectSecret -from viadot.tasks import AzureKeyVaultSecret from prefect.utilities import logging from viadot.exceptions import ValidationError from viadot.sources import SAPBW from viadot.task_utils import * +from viadot.tasks import AzureKeyVaultSecret logger = logging.get_logger() diff --git a/viadot/tasks/sharepoint.py b/viadot/tasks/sharepoint.py index 2a1cb0bc4..f22c36f36 100644 --- a/viadot/tasks/sharepoint.py +++ b/viadot/tasks/sharepoint.py @@ -1,10 +1,10 @@ -from typing import List -import pandas as pd import copy import json import os import re +from typing import List +import pandas as pd from prefect import Task from prefect.tasks.secrets import PrefectSecret from prefect.utilities import logging @@ -12,8 +12,8 @@ from ..exceptions import ValidationError from ..sources import Sharepoint, SharepointList -from .azure_key_vault import AzureKeyVaultSecret from ..utils import add_viadot_metadata_columns +from .azure_key_vault import AzureKeyVaultSecret logger = logging.get_logger() @@ -243,11 +243,11 @@ class SharepointListToDF(Task): required_fields (List[str]): Required fields(columns) need to be extracted from Sharepoint List. Default to None. field_property (List[str]): Property to expand with expand query method. - All propertys can be found under list.item.properties. + All properties can be found under list.item.properties. Default to ["Title"] - filters (dict): Dictionary with operators which filters the SharepointList output. + filters (dict, optional): Dictionary with operators which filters the SharepointList output. Default to None. 
allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') - allowed conjuction: ('&','|') + allowed conjunction: ('&','|') allowed operators: ('<','>','<=','>=','==','!=') Example how to build the dict: filters = { @@ -258,8 +258,8 @@ class SharepointListToDF(Task): 'value2':'YYYY-MM-DD', 'operator1':'>=', 'operator2':'<=', - 'operators_conjuction':'&', - 'filters_conjuction':'&', + 'operators_conjunction':'&', + 'filters_conjunction':'&', } , 'Column_name_2' : @@ -277,9 +277,9 @@ class SharepointListToDF(Task): def __init__( self, - path: str = None, - list_title: str = None, - site_url: str = None, + path: str, + list_title: str, + site_url: str, required_fields: List[str] = None, field_property: str = "Title", filters: dict = None, @@ -289,7 +289,6 @@ def __init__( *args, **kwargs, ): - self.path = path self.list_title = list_title self.site_url = site_url @@ -300,6 +299,11 @@ def __init__( self.vault_name = vault_name self.credentials_secret = credentials_secret + super().__init__( + *args, + **kwargs, + ) + if not credentials_secret: # Attempt to read a default for the service principal secret name try: @@ -313,16 +317,65 @@ def __init__( ).run() self.credentials = json.loads(credentials_str) - super().__init__( - *args, - **kwargs, - ) - def __call__(self): """Download Sharepoint_List data to a .parquet file""" super().__call__(self) + def _rename_duplicated_fields(self, df): + """ + Renames duplicated columns in a DataFrame by appending a numerical suffix. + The function checks whether there are fields with the same name but a different + letter case (lower/upper). The fields returned by get_fields() may differ from + the actual list item fields (taken from their properties). This is specific to + Sharepoint lists: MS allows users to create fields with similar names (differing + only in letter case) holding the same values, for example Id and ID -> the Office + select function doesn't distinguish upper/lower case. + + Args: + df (pd.DataFrame): The input DataFrame with potentially duplicated columns. + Fields listed in self.required_fields are not considered for renaming. + + Returns: + pd.DataFrame: DataFrame with duplicated columns renamed to ensure uniqueness. + + Example: + Given DataFrame df: + ``` + A B C B D + 0 1 2 3 4 5 + ``` + + Required fields = ['A', 'B'] + After calling _rename_duplicated_fields(df): + ``` + A B C B2 D + 0 1 2 3 4 5 + ``` + """ + col_to_compare = df.columns.tolist() + i = 1 + for column in df.columns.tolist(): + if not column in self.required_fields: + col_to_compare.remove(column) + if column.lower() in [to_cmp.lower() for to_cmp in col_to_compare]: + i += 1 + logger.info(f"Found duplicated column: {column} !") + logger.info(f"Renaming from {column} to {column}{i}") + df = df.rename(columns={f"{column}": f"{column}{i}"}) + return df + def _convert_camel_case_to_words(self, input_str: str) -> str: + """ + Function for converting internal names joined as camelCase column names to regular words. + + Args: + input_str (str): Column name. + + Returns: + str: Converted column name. + """ self.input_str = input_str @@ -331,11 +384,23 @@ def _convert_camel_case_to_words(self, input_str: str) -> str: return converted - def change_column_name( - self, - df: pd.DataFrame = None, - ): - s = SharepointList() + def change_column_name(self, df: pd.DataFrame, credentials: str = None): + """ + Function for changing coded internal column names (Unicode style) to human-readable names. + !Warning!
+ Names are taken from the field property Title! + Because of that, the resulting column name might differ from the initial name. + + Args: + df (pd.DataFrame): A data frame with column names loaded from the Sharepoint list. + credentials (str): Credentials string for establishing the Sharepoint connection. Defaults to None. + + Returns: + pd.DataFrame: Data frame with changed column names. + """ + s = SharepointList( + credentials=self.credentials, + ) list_fields = s.get_fields( list_title=self.list_title, site_url=self.site_url, @@ -364,7 +429,6 @@ def change_column_name( # Rename columns names inside DataFrame df = df.rename(columns=dictionary) - return df def run( @@ -389,7 +453,8 @@ def run( row_count=self.row_count, ) - df = self.change_column_name(df=df_raw) + df_col_changed = self.change_column_name(df=df_raw) + df = self._rename_duplicated_fields(df=df_col_changed) self.logger.info("Successfully changed structure of the DataFrame") return df diff --git a/viadot/tasks/tm1.py b/viadot/tasks/tm1.py new file mode 100644 index 000000000..56d4401f0 --- /dev/null +++ b/viadot/tasks/tm1.py @@ -0,0 +1,117 @@ +from typing import Any, Dict + +import pandas as pd +from prefect import Task +from prefect.utilities.tasks import defaults_from_attrs + +from ..sources import TM1 + + +class TM1ToDF(Task): + def __init__( + self, + credentials: Dict[str, Any] = None, + config_key: str = "TM1", + mdx_query: str = None, + cube: str = None, + view: str = None, + limit: int = None, + private: bool = False, + verify: bool = False, + if_empty: str = "skip", + timeout=3600, + *args, + **kwargs, + ): + """ + Task for downloading data from a TM1 view to a pandas DataFrame. + + Args: + credentials (Dict[str, Any], optional): Credentials stored in a dictionary. Required credentials: username, + password, address, port. Defaults to None. + config_key (str, optional): Credential key to the dictionary where credentials are stored. Defaults to "TM1". + mdx_query (str, optional): MDX select query needed to download the data. Defaults to None. + cube (str, optional): Cube name from which data will be downloaded. Defaults to None. + view (str, optional): View name from which data will be downloaded. Defaults to None. + limit (int, optional): How many rows should be extracted. If None, all the available rows will + be downloaded. Defaults to None. + private (bool, optional): Whether or not the data download should be private. Defaults to False. + verify (bool, optional): Whether or not to verify SSL certificates. Defaults to False. + if_empty (Literal["warn", "fail", "skip"], optional): What to do if the output DataFrame is empty. Defaults to "skip". + + """ + self.credentials = credentials + self.config_key = config_key + self.mdx_query = mdx_query + self.cube = cube + self.view = view + self.limit = limit + self.private = private + self.verify = verify + self.if_empty = if_empty + + super().__init__( + name="tm1_to_df", + timeout=timeout, + *args, + **kwargs, + ) + + def __call__(self, *args, **kwargs): + """Load TM1 data to pandas DataFrame""" + return super().__call__(*args, **kwargs) + + @defaults_from_attrs( + "credentials", + "config_key", + "mdx_query", + "cube", + "view", + "limit", + "private", + "verify", + "if_empty", + ) + def run( + self, + credentials: Dict[str, Any] = None, + config_key: str = None, + mdx_query: str = None, + cube: str = None, + view: str = None, + limit: int = None, + private: bool = None, + verify: bool = None, + if_empty: str = None, + ) -> pd.DataFrame: + """ + Run method for the TM1ToDF class.
+ + Args: + credentials (Dict[str, Any], optional): Credentials stored in a dictionary. Required credentials: username, + password, address, port. Defaults to None. + config_key (str, optional): Credential key to the dictionary where credentials are stored. Defaults to None. + mdx_query (str, optional): MDX select query needed to download the data. Defaults to None. + cube (str, optional): Cube name from which data will be downloaded. Defaults to None. + view (str, optional): View name from which data will be downloaded. Defaults to None. + limit (int, optional): How many rows should be extracted. If None, all the available rows will + be downloaded. Defaults to None. + private (bool, optional): Whether or not the data download should be private. Defaults to None. + verify (bool, optional): Whether or not to verify SSL certificates. Defaults to None. + if_empty (Literal["warn", "fail", "skip"], optional): What to do if the output DataFrame is empty. Defaults to None. + + Returns: + pd.DataFrame: DataFrame with data downloaded from the TM1 view. + + """ + tm1 = TM1( + credentials=credentials, + config_key=config_key, + mdx_query=mdx_query, + cube=cube, + view=view, + limit=limit, + private=private, + verify=verify, + ) + return tm1.to_df(if_empty=if_empty) diff --git a/viadot/utils.py b/viadot/utils.py index d05cfdd95..5e3de784c 100644 --- a/viadot/utils.py +++ b/viadot/utils.py @@ -2,7 +2,7 @@ import os import re from itertools import chain -from typing import Any, Callable, Dict, List, Literal +from typing import Union, Any, Callable, Dict, List, Literal import pandas as pd import prefect @@ -460,3 +460,28 @@ def get_nested_dict(d): return d else: return None + + +def check_value(base: Union[Dict, Any], levels: List) -> Union[None, Any]: + """ + Extract data from a nested JSON structure if there is any under the passed levels. + Otherwise return None. + + Args: + base (Union[Dict, Any]): Variable with the base level of the JSON, for example: + json_file["first_known_lvl"]["second_known_lvl"]["third_known_lvl"] + levels (List): List of potential lower levels of the nested JSON for data retrieval. For example: + ["first_lvl_below_base", "second_lvl_below_base", "searched_phrase"] + + Returns: + Union[None, Any]: The searched value from the lowest level, in the example the data under the "searched_phrase" key. + """ + + for lvl in levels: + if isinstance(base, dict): + base = base.get(lvl) + if base is None: + return None + else: + return base + return base
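A short usage sketch for `check_value`; the sample dictionary is made up to mirror the Genesys users payload handled earlier, so the keys shown are illustrative only.

```python
from viadot.utils import check_value

user = {
    "id": "123",
    "division": {"name": "EMEA"},
    "presence": {"presenceDefinition": {"systemPresence": "Online"}},
}

print(check_value(user, ["division", "name"]))                                  # EMEA
print(check_value(user, ["presence", "presenceDefinition", "systemPresence"]))  # Online
print(check_value(user, ["address", "city"]))                                   # None (level missing)
```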