Skip to content

Commit

Permalink
🎨 Formatted code
Browse files Browse the repository at this point in the history
  • Loading branch information
judynah committed Sep 24, 2024
1 parent 1662cc4 commit 3db633d
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 93 deletions.
53 changes: 26 additions & 27 deletions src/viadot/orchestration/prefect/flows/business_core_to_parquet.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import Any, Dict, Literal
from viadot.config import get_source_credentials
from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
"""Flow for downloading data from Business Core API to a Parquet file."""

from typing import Any, Literal

from prefect import flow

from viadot.orchestration.prefect.tasks.business_core import business_core_to_df
from viadot.orchestration.prefect.tasks.task_utils import df_to_parquet
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources.business_core import BusinessCore
from prefect import flow


@flow(
Expand All @@ -15,36 +15,35 @@
retry_delay_seconds=60,
)
def business_core_to_parquet(
path: str = None,
url: str = None,
filters_dict: Dict[str, Any] = {
"BucketCount": None,
"BucketNo": None,
"FromDate": None,
"ToDate": None,
},
path: str | None = None,
url: str | None = None,
filters_dict: dict[str, Any] | None = None,
credentials_secret: str | None = None,
config_key: str | None = None,
if_exists: Literal["append", "replace", "skip"] = "replace",
verify: bool = True,
):
"""Flow for downloading data from Business Core API to a Parquet file.
) -> None:
"""Download data from Business Core API to a Parquet file.
Args:
path (str, required): Path where to save the Parquet file. Defaults to None.
url (str, required): Base url to the view in Business Core API. Defaults to None.
filters_dict (Dict[str, Any], optional): Filters in form of dictionary. Available filters: 'BucketCount',
'BucketNo', 'FromDate', 'ToDate'. Defaults to {"BucketCount": None,"BucketNo": None,"FromDate": None,
"ToDate": None,}. Defaults to None.
credentials_secret (str, optional): The name of the secret that stores Business Core credentials. Defaults to None.
url (str, required): Base url to the view in Business Core API.
Defaults to None.
filters_dict (Dict[str, Any], optional): Filters in form of dictionary.
Available filters: 'BucketCount', 'BucketNo', 'FromDate', 'ToDate'.
Defaults to None.
credentials_secret (str, optional): The name of the secret that stores Business
Core credentials. Defaults to None.
More info on: https://docs.prefect.io/concepts/blocks/
config_key (str, optional): Credential key to dictionary where details are stored. Defaults to None.
if_empty (str, optional): What to do if output DataFrame is empty. Defaults to "skip".
if_exists (Literal["append", "replace", "skip"], optional): What to do if the table exists.
Defaults to "replace".
verify (bool, optional): Whether or not verify certificates while connecting to an API. Defaults to True.
config_key (str, optional): Credential key to dictionary where details
are stored. Defaults to None.
if_empty (str, optional): What to do if output DataFrame is empty.
Defaults to "skip".
if_exists (Literal["append", "replace", "skip"], optional):
What to do if the table exists. Defaults to "replace".
verify (bool, optional): Whether or not verify certificates while
connecting to an API. Defaults to True.
"""

df = business_core_to_df(
url=url,
path=path,
Expand Down
48 changes: 26 additions & 22 deletions src/viadot/orchestration/prefect/tasks/business_core.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,46 @@
from typing import Any, Dict
"""Task for downloading data from Business Core API to a Parquet file."""

from typing import Any, dict

from pandas import DataFrame
from prefect import task
from prefect.logging import get_run_logger

from viadot.config import get_source_credentials
from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources.business_core import BusinessCore
from prefect import task


@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60 * 3)
def business_core_to_df(
path: str = None,
url: str = None,
filters_dict: Dict[str, Any] = {
"BucketCount": None,
"BucketNo": None,
"FromDate": None,
"ToDate": None,
},
credentials_secret: str = None,
path: str | None = None,
url: str | None = None,
filters_dict: dict[str, Any] | None = None,
credentials_secret: str | None = None,
config_key: str | None = None,
if_empty: str = "skip",
verify: bool = True,
):
"""Task for downloading data from Business Core API to a Parquet file.
) -> DataFrame:
"""Download data from Business Core API to a Parquet file.
Args:
path (str, required): Path where to save the Parquet file. Defaults to None.
url (str, required): Base url to the view in Business Core API. Defaults to None.
filters_dict (Dict[str, Any], optional): Filters in form of dictionary. Available filters: 'BucketCount',
'BucketNo', 'FromDate', 'ToDate'. Defaults to {"BucketCount": None,"BucketNo": None,"FromDate": None,
"ToDate": None,}. Defaults to None.
credentials_secret (str, optional): The name of the secret that stores Business Core credentials. Defaults to None.
url (str, required): Base url to the view in Business Core API.
Defaults to None.
filters_dict (dict[str, Any], optional): Filters in form of dictionary.
Available filters: 'BucketCount','BucketNo', 'FromDate', 'ToDate'.
Defaults to None.
credentials_secret (str, optional): The name of the secret that stores
Business Core credentials. Defaults to None.
More info on: https://docs.prefect.io/concepts/blocks/
config_key (str, optional): Credential key to dictionary where details are stored. Defaults to None.
if_empty (str, optional): What to do if output DataFrame is empty. Defaults to "skip".
verify (bool, optional): Whether or not verify certificates while connecting to an API. Defaults to True.
config_key (str, optional): Credential key to dictionary where details
are stored. Defaults to None.
if_empty (str, optional): What to do if output DataFrame is empty.
Defaults to "skip".
verify (bool, optional): Whether or not verify certificates while connecting
to an API. Defaults to True.
"""

if not (credentials_secret or config_key):
raise MissingSourceCredentialsError

Expand Down
2 changes: 1 addition & 1 deletion src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from importlib.util import find_spec

from .business_core import BusinessCore
from ._duckdb import DuckDB
from ._trino import Trino
from .business_core import BusinessCore
from .cloud_for_customers import CloudForCustomers
from .epicor import Epicor
from .exchange_rates import ExchangeRates
Expand Down
77 changes: 39 additions & 38 deletions src/viadot/sources/business_core.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
"""Source for connecting to Business Core API."""

import json
from typing import Any, Dict, Literal
from pydantic import BaseModel
from typing import Any, Literal

import pandas as pd
from pydantic import BaseModel

from viadot.config import get_source_credentials
from viadot.exceptions import APIError, CredentialError
from viadot.sources.base import Source
from viadot.utils import handle_api_response


class BusinessCoreCredentials(BaseModel):
"""Validate Business Core credentials
"""Validate Business Core credentials.
Two key values are held in the Business Core connector:
- username: The unique username for the organization.
Expand All @@ -26,43 +29,40 @@ class BusinessCoreCredentials(BaseModel):


class BusinessCore(Source):
"""
Source for getting data from Business Core ERP API.
"""
"""Class to connect to Business Core ERP API."""

def __init__(
self,
url: str = None,
filters_dict: Dict[str, Any] = {
"BucketCount": None,
"BucketNo": None,
"FromDate": None,
"ToDate": None,
},
credentials: Dict[str, Any] = None,
url: str | None = None,
filters_dict: dict[str, Any] | None = None,
credentials: dict[str, Any] | None = None,
config_key: str = "BusinessCore",
verify: bool = True,
*args,
**kwargs,
):
"""
Creating an instance of BusinessCore source class.
"""Creating an instance of BusinessCore source class.
Args:
url (str, optional): Base url to a view in Business Core API. Defaults to None.
filters_dict (Dict[str, Any], optional): Filters in form of dictionary. Available filters: 'BucketCount',
'BucketNo', 'FromDate', 'ToDate'. Defaults to {"BucketCount": None,"BucketNo": None,"FromDate": None,
"ToDate": None,}.
credentials (Dict[str, Any], optional): Credentials stored in a dictionary. Required credentials: username,
password. Defaults to None.
config_key (str, optional): Credential key to the dictionary where details are stored. Defaults to "BusinessCore".
verify (bool, optional): Whether or not verify certificates while connecting to an API. Defaults to True.
url (str, optional): Base url to a view in Business Core API.
Defaults to None.
filters_dict (Dict[str, Any], optional): Filters in form of dictionary.
Available filters: 'BucketCount', 'BucketNo', 'FromDate', 'ToDate'.
Defaults to None.
credentials (Dict[str, Any], optional): Credentials stored in a dictionary.
Required credentials: username, password. Defaults to None.
config_key (str, optional): Credential key to the dictionary where details
are stored. Defaults to "BusinessCore".
verify (bool, optional): Whether or not verify certificates while
connecting to an API. Defaults to True.
Raises:
CredentialError: When credentials are not found.
"""

raw_creds = credentials or get_source_credentials(config_key) or None
error_message = "Missing Credentials."
if raw_creds is None:
raise CredentialError("Missing Credentials.")
raise CredentialError(error_message)

validated_creds = dict(BusinessCoreCredentials(**raw_creds))

Expand All @@ -74,8 +74,8 @@ def __init__(
super().__init__(*args, credentials=self.credentials, **kwargs)

def generate_token(self) -> str:
"""
Function for generating Business Core API token based on username and password.
"""Function for generating Business Core token based on username and password.
Returns:
string: Business Core API token.
"""
Expand All @@ -94,9 +94,10 @@ def generate_token(self) -> str:
self.token = token
return token

def clean_filters_dict(self) -> Dict:
"""
Function for replacing 'None' with '&' in a dictionary. Needed for payload in 'x-www-form-urlencoded' from.
def clean_filters_dict(self) -> dict:
"""Function for replacing 'None' with '&' in a dictionary.
Needed for payload in 'x-www-form-urlencoded' from.
Returns:
Dict: Dictionary with filters prepared for further use.
Expand All @@ -105,9 +106,8 @@ def clean_filters_dict(self) -> Dict:
key: ("&" if val is None else val) for key, val in self.filters_dict.items()
}

def get_data(self) -> Dict:
"""
Function for obtaining data in dictionary format from Business Core API.
def get_data(self) -> dict:
"""Function for obtaining data in dictionary format from Business Core API.
Returns:
Dict: Dictionary with data downloaded from Business Core API.
Expand Down Expand Up @@ -140,11 +140,11 @@ def get_data(self) -> Dict:
return json.loads(response.text)

def to_df(self, if_empty: Literal["warn", "fail", "skip"] = "skip") -> pd.DataFrame:
"""
Function for transforming data from dictionary to pd.DataFrame.
"""Function for transforming data from dictionary to pd.DataFrame.
Args:
if_empty (Literal["warn", "fail", "skip"], optional): What to do if output DataFrame is empty. Defaults to "skip".
if_empty (Literal["warn", "fail", "skip"], optional):
What to do if output DataFrame is empty. Defaults to "skip".
Returns:
pd.DataFrame: DataFrame with data downloaded from Business Core API view.
Expand All @@ -163,7 +163,8 @@ def to_df(self, if_empty: Literal["warn", "fail", "skip"] = "skip") -> pd.DataFr
"GetSalesOrderData",
"GetSalesQuotationData",
]:
raise APIError(f"View {view} currently not available.")
error_message = f"View {view} currently not available."
raise APIError(error_message)

data = self.get_data().get("MasterDataList")
df = pd.DataFrame.from_dict(data)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from pathlib import Path
from pandas import DataFrame, read_parquet

from pandas import read_parquet
from viadot.orchestration.prefect.flows import business_core_to_parquet


URL = "https://api.businesscore.ae/api/LappDataIntegrationAPI/GetCustomerData"
PATH = "/home/viadot/data/middle_east/customer_master/customer_master_full_data.parquet"
CREDS = "business-core"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from pathlib import Path
from pandas import DataFrame

from prefect import flow
from viadot.orchestration.prefect.tasks.business_core import business_core_to_df


URL = "https://api.businesscore.ae/api/LappDataIntegrationAPI/GetCustomerData"
PATH = "/home/viadot/data/middle_east/customer_master/customer_master_full_data.parquet"
CREDS = "business-core"
Expand Down
5 changes: 3 additions & 2 deletions tests/integration/test_business_core.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from unittest.mock import Mock, patch

import pandas as pd
import pytest

from viadot.sources.business_core import BusinessCore


Expand All @@ -23,7 +23,8 @@ def business_core():
def test_generate_token(mock_api_response, business_core):
mock_api_response.return_value = Mock(text='{"access_token": "12345"}')
token = business_core.generate_token()
assert token == "12345"
t = "12345"
assert token == t


def test_clean_filters_dict(business_core):
Expand Down

0 comments on commit 3db633d

Please sign in to comment.