Skip to content

Commit

Permalink
🎨 Formatted code
Browse files Browse the repository at this point in the history
  • Loading branch information
judynah committed Sep 24, 2024
1 parent 5c0bf75 commit c5d4aa5
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 70 deletions.
33 changes: 16 additions & 17 deletions src/viadot/orchestration/prefect/flows/business_core_to_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
retry_delay_seconds=60,
)
def business_core_to_parquet(
path: str,
url: str,
path: str = None,
url: str = None,
filters_dict: Dict[str, Any] = {
"BucketCount": None,
"BucketNo": None,
Expand All @@ -27,35 +27,34 @@ def business_core_to_parquet(
config_key: str | None = None,
if_exists: Literal["append", "replace", "skip"] = "replace",
verify: bool = True,
):
"""Task for downloading data from Business Core API to a Parquet file.
):
"""Flow for downloading data from Business Core API to a Parquet file.
Args:
path (str, required): Path where to save the Parquet file. Defaults to None.
url (str, required): Base url to the view in Business Core API. Defaults to None.
filters_dict (Dict[str, Any], optional): Filters in the form of a dictionary. Available filters: 'BucketCount',
'BucketNo', 'FromDate', 'ToDate'. Defaults to {"BucketCount": None,"BucketNo": None,"FromDate": None,
"ToDate": None,}.
credentials_secret (str, optional): The name of the secret that stores SAP credentials. Defaults to None.
credentials_secret (str, optional): The name of the secret that stores Business Core credentials. Defaults to None.
More info on: https://docs.prefect.io/concepts/blocks/
config_key (str, optional): Credential key to dictionary where details are stored. Defaults to None.
if_empty (str, optional): What to do if output DataFrame is empty. Defaults to "skip".
if_exists (Literal["append", "replace", "skip"], optional): What to do if the table exists.
if_exists (Literal["append", "replace", "skip"], optional): What to do if the table exists.
Defaults to "replace".
verify (bool, optional): Whether or not to verify certificates while connecting to an API. Defaults to True.
"""

df = business_core_to_df(
url=url,
path=path,
credentials_secret=credentials_secret,
config_key=config_key,
filters_dict=filters_dict,
verify=verify
)
url=url,
path=path,
credentials_secret=credentials_secret,
config_key=config_key,
filters_dict=filters_dict,
verify=verify,
)
return df_to_parquet(
df=df,
path=path,
path=path,
if_exists=if_exists,
)

)
39 changes: 20 additions & 19 deletions src/viadot/orchestration/prefect/tasks/business_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60 * 3)
def business_core_to_df(
path: str,
url: str,
path: str = None,
url: str = None,
filters_dict: Dict[str, Any] = {
"BucketCount": None,
"BucketNo": None,
Expand All @@ -21,16 +21,16 @@ def business_core_to_df(
config_key: str | None = None,
if_empty: str = "skip",
verify: bool = True,
):
"""Task for downloading data from Business Core API to a Parquet file.
):
"""Task for downloading data from Business Core API to a Parquet file.
Args:
path (str, required): Path where to save the Parquet file. Defaults to None.
url (str, required): Base url to the view in Business Core API. Defaults to None.
filters_dict (Dict[str, Any], optional): Filters in the form of a dictionary. Available filters: 'BucketCount',
'BucketNo', 'FromDate', 'ToDate'. Defaults to {"BucketCount": None,"BucketNo": None,"FromDate": None,
"ToDate": None,}.
credentials_secret (str, optional): The name of the secret that stores SAP credentials. Defaults to None.
credentials_secret (str, optional): The name of the secret that stores Business Core credentials. Defaults to None.
More info on: https://docs.prefect.io/concepts/blocks/
config_key (str, optional): Credential key to dictionary where details are stored. Defaults to None.
if_empty (str, optional): What to do if output DataFrame is empty. Defaults to "skip".
Expand All @@ -39,28 +39,29 @@ def business_core_to_df(

if not (credentials_secret or config_key):
raise MissingSourceCredentialsError

logger = get_run_logger()

credentials = get_source_credentials(config_key) or get_credentials(credentials_secret)

credentials = get_source_credentials(config_key) or get_credentials(
credentials_secret
)

bc = BusinessCore(
url=url,
path=path,
credentials=credentials,
config_key=config_key,
filters_dict=filters_dict,
verify=verify
)
url=url,
path=path,
credentials=credentials,
config_key=config_key,
filters_dict=filters_dict,
verify=verify,
)

df = bc.to_df(if_empty=if_empty)

df = bc.to_df(if_empty = if_empty)

nrows = df.shape[0]
ncols = df.shape[1]

logger.info(
f"Successfully downloaded {nrows} rows and {ncols} columns of data to a DataFrame."
)

return df

return df
28 changes: 15 additions & 13 deletions src/viadot/sources/business_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@
from viadot.sources.base import Source
from viadot.utils import handle_api_response


class BusinessCoreCredentials(BaseModel):
"""Validate Business Core credentials
Two key values are held in the Business Core connector:
- username: The unique username for the organization.
- password: Secret string of characters to have access to data.
- password: A secret string of characters for access to data.
Args:
BaseModel (pydantic.main.ModelMetaclass): A base class for creating
Pydantic models.
"""

username: str
password: str

Expand All @@ -27,6 +29,7 @@ class BusinessCore(Source):
"""
Source for getting data from Business Core ERP API.
"""

def __init__(
self,
url: str = None,
Expand All @@ -51,17 +54,16 @@ def __init__(
"ToDate": None,}.
credentials (Dict[str, Any], optional): Credentials stored in a dictionary. Required credentials: username,
password. Defaults to None.
config_key (str, optional): Credential key to dictionary where details are stored. Defaults to "BusinessCore".
config_key (str, optional): Credential key to the dictionary where details are stored. Defaults to "BusinessCore".
verify (bool, optional): Whether or not to verify certificates while connecting to an API. Defaults to True.
Raises:
CredentialError: When credentials are not found.
"""


raw_creds = credentials or get_source_credentials(config_key) or None
if raw_creds is None:
raise CredentialError("Missing Credentials.")

validated_creds = dict(BusinessCoreCredentials(**raw_creds))

self.credentials = validated_creds
Expand All @@ -71,30 +73,33 @@ def __init__(

super().__init__(*args, credentials=self.credentials, **kwargs)


def generate_token(self) -> str:
"""
Function for generating Business Core API token based on username and password.
Returns:
string: Business Core API token.
string: Business Core API token.
"""
url = "https://api.businesscore.ae/api/user/Login"

payload = f'grant_type=password&username={self.credentials.get("username")}&password={self.credentials.get("password")}&scope='
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = handle_api_response(
url=url, headers=headers, method="GET", data=payload, verify=self.verify,
url=url,
headers=headers,
method="GET",
data=payload,
verify=self.verify,
)
token = json.loads(response.text).get("access_token")
self.token = token
return token

def clean_filters_dict(self) -> Dict:
"""
Function for replacing None values with '&' in the filters dictionary. Needed for the payload in 'x-www-form-urlencoded' form.
Returns:
Dict: Dictionary with filters prepared for further use.
Dict: Dictionary with filters prepared for further use.
"""
return {
key: ("&" if val is None else val) for key, val in self.filters_dict.items()
Expand All @@ -105,7 +110,7 @@ def get_data(self) -> Dict:
Function for obtaining data in dictionary format from Business Core API.
Returns:
Dict: Dictionary with data downloaded from Business Core API.
Dict: Dictionary with data downloaded from Business Core API.
"""
filters = self.clean_filters_dict()

Expand Down Expand Up @@ -169,6 +174,3 @@ def to_df(self, if_empty: Literal["warn", "fail", "skip"] = "skip") -> pd.DataFr
self._handle_if_empty(if_empty)

return df



Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,16 @@
from pandas import DataFrame, read_parquet
from viadot.orchestration.prefect.flows import business_core_to_parquet

URL="https://api.businesscore.ae/api/LappDataIntegrationAPI/GetCustomerData"
PATH="/home/viadot/data/middle_east/customer_master/customer_master_full_data.parquet"
URL = "https://api.businesscore.ae/api/LappDataIntegrationAPI/GetCustomerData"
PATH = "/home/viadot/data/middle_east/customer_master/customer_master_full_data.parquet"
CREDS = "business-core"


def test_business_core_to_parquet():
assert not Path(PATH).exists()

business_core_to_parquet(
url=URL,
path=PATH,
credentials_secret=CREDS,
verify=False
)

business_core_to_parquet(url=URL, path=PATH, credentials_secret=CREDS, verify=False)

assert Path(PATH).exists()

n_cols = 11
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,20 @@
from prefect import flow
from viadot.orchestration.prefect.tasks.business_core import business_core_to_df

URL="https://api.businesscore.ae/api/LappDataIntegrationAPI/GetCustomerData"
PATH="/home/viadot/data/middle_east/customer_master/customer_master_full_data.parquet"
URL = "https://api.businesscore.ae/api/LappDataIntegrationAPI/GetCustomerData"
PATH = "/home/viadot/data/middle_east/customer_master/customer_master_full_data.parquet"
CREDS = "business-core"


def test_business_core_to_df():
@flow
def test_flow():
return business_core_to_df(
url=URL,
path=PATH,
credentials_secret=CREDS,
verify=False
)

url=URL, path=PATH, credentials_secret=CREDS, verify=False
)

df = test_flow()
assert isinstance(df, DataFrame)

n_cols = 11
assert df.shape[1] == n_cols



2 changes: 1 addition & 1 deletion tests/integration/test_business_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ def test_to_df(business_core):
assert len(df.columns) == 2
assert len(df) == 1
assert df["id"].tolist() == [1]
assert df["name"].tolist() == ["John Doe"]
assert df["name"].tolist() == ["John Doe"]

0 comments on commit c5d4aa5

Please sign in to comment.