Skip to content

Commit

Permalink
Merge branch 'dev' into improve_transform_and_catalog_flow
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafalz13 authored Oct 12, 2023
2 parents 0331aab + c7d917d commit aab3323
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,4 @@ sap_netweaver_rfc
# Databricks-connect

.databricks-connect
.dotnet
.dotnet
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- Added `Office365-REST-Python-Client` library to `requirements`
- Added `GetSalesQuotationData` view in `BusinessCore` source.
- Added new ViewType `queue_interaction_detail_view` to Genesys.

### Fixed

### Changed
- Changed the flow name from `TransformAndCatalog` to `TransformAndCatalogToLuma`.
- Modified `add_viadot_metadata_columns` so that a `source_name` parameter can be passed to the decorator for the `to_df` function, or for any function where the DataFrame is generated.
- Changed `SharepointToDF` task in order to implement add_viadot_metadata_columns with value `source_name="Sharepoint"` after changes.
- Changed `Mindful` credentials to be passed via the `auth` parameter instead of the `header`.


## [0.4.19] - 2023-08-31
### Added
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ avro-python3==1.10.2
pygit2>=1.10.1, <1.11.0
dbt-core==1.3.2
dbt-sqlserver==1.3.1
lumaCLI==0.0.19
lumaCLI==0.0.19
Office365-REST-Python-Client==2.4.4
17 changes: 16 additions & 1 deletion tests/unit/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@


class ClassForDecorator:
    """Minimal fixture class for exercising `add_viadot_metadata_columns`.

    Exposes a plain `to_df` plus two decorated variants: one using the
    decorator's default source name (the class name) and one passing an
    explicit `source_name`.
    """

    # Explicit source name forwarded to the decorator in
    # `to_df_decorated_parameter`; tests assert it ends up in `_viadot_source`.
    source = "Source_name"

    def __init__(self):
        # Tiny fixed frame so tests can assert exact columns and values.
        self.df = pd.DataFrame({"a": [123], "b": ["abc"]})

    def to_df(self):
        """Return the fixture DataFrame unmodified."""
        return self.df

    # NOTE(review): the rendered diff showed a stale, undecorated
    # `@add_viadot_metadata_columns` line stacked above this one; applying
    # the decorator twice would pass `func` as `source_name` and break at
    # runtime, so only the called form is kept.
    @add_viadot_metadata_columns()
    def to_df_decorated(self):
        """Return the fixture DataFrame; decorator adds `_viadot_source` = class name."""
        return self.df

    @add_viadot_metadata_columns(source)
    def to_df_decorated_parameter(self):
        """Return the fixture DataFrame; decorator adds `_viadot_source` = `source`."""
        return self.df


def test_single_quotes_inside():
TEST_VALUE = "a'b"
Expand Down Expand Up @@ -138,3 +144,12 @@ def test_add_viadot_metadata_columns_base():
assert df_base.columns.to_list() == ["a", "b"]
assert df_decorated.columns.to_list() == ["a", "b", "_viadot_source"]
assert df_decorated["_viadot_source"][0] == "ClassForDecorator"


def test_add_viadot_metadata_columns_with_parameter():
    """Decorator called with an explicit source name stamps that name into `_viadot_source`."""
    # Two separate instances: the decorator mutates the instance's DataFrame
    # in place, so sharing one instance would contaminate the base frame.
    plain_df = ClassForDecorator().to_df()
    stamped_df = ClassForDecorator().to_df_decorated_parameter()

    base_columns = ["a", "b"]
    assert plain_df.columns.to_list() == base_columns
    assert stamped_df.columns.to_list() == base_columns + ["_viadot_source"]
    assert stamped_df["_viadot_source"][0] == "Source_name"
1 change: 1 addition & 0 deletions viadot/sources/business_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def to_df(self, if_empty: Literal["warn", "fail", "skip"] = "skip") -> pd.DataFr
"GetSalesInvoiceData",
"GetSalesReturnDetailData",
"GetSalesOrderData",
"GetSalesQuotationData",
]:
raise APIError(f"View {view} currently not available.")

Expand Down
1 change: 1 addition & 0 deletions viadot/sources/genesys.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ def download_all_reporting_exports(
date = self.start_date.replace("-", "")
if single_report[4].lower() in [
"queue_performance_detail_view",
"queue_interaction_detail_view",
"agent_status_detail_view",
]:
file_name = f"{self.view_type.upper()}_{next(self.count)}_{date}"
Expand Down
13 changes: 7 additions & 6 deletions viadot/sources/mindful.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import os
from datetime import datetime, timedelta
from io import StringIO
from typing import Any, Dict, Literal
from datetime import datetime, timedelta
from typing import Any, Dict, Literal, Tuple

import pandas as pd
import prefect
from requests.models import Response
from requests.auth import HTTPBasicAuth

from viadot.exceptions import APIError
from viadot.sources.base import Source
Expand All @@ -15,7 +16,7 @@
class Mindful(Source):
def __init__(
self,
header: str,
auth: Tuple[str],
region: Literal["us1", "us2", "us3", "ca1", "eu1", "au1"] = "eu1",
start_date: datetime = None,
end_date: datetime = None,
Expand All @@ -27,7 +28,7 @@ def __init__(
"""Mindful connector which allows listing and downloading into Data Frame or specified format output.
Args:
header (str): Header with credentials for calling Mindful API.
auth (Tuple[str]): Authentication credentials for calling Mindful API. The structure is user and password.
region (Literal[us1, us2, us3, ca1, eu1, au1], optional): SD region from where to interact with the mindful API. Defaults to "eu1".
start_date (datetime, optional): Start date of the request. Defaults to None.
end_date (datetime, optional): End date of the resquest. Defaults to None.
Expand Down Expand Up @@ -73,7 +74,7 @@ def __init__(
)

self.file_extension = file_extension
self.header = header
self.auth = auth

def _mindful_api_response(
self,
Expand All @@ -94,8 +95,8 @@ def _mindful_api_response(
response = handle_api_response(
url=f"https://{self.region}surveydynamix.com/api/{endpoint}",
params=params,
headers=self.header,
method="GET",
auth=HTTPBasicAuth(*self.auth),
)

return response
Expand Down
18 changes: 10 additions & 8 deletions viadot/tasks/cloud_for_customers.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,22 +159,22 @@ def run(
vault_name: str = None,
):
"""
Task for downloading data from the Cloud for Customers to a pandas DataFrame using normal URL (with query parameters).
This task grab data from table from 'scratch' with passing table name in url or endpoint. It is rocommended to add
some filters parameters in this case.
Task for downloading data from the Cloud for Customers to a pandas DataFrame using URL (with query parameters).
Data is obtained from table by passing table name in the url or endpoint. It is recommended to add filters
parameters in this case.
Example:
url = "https://mysource.com/sap/c4c/odata/v1/c4codataapi"
endpoint = "ServiceRequestCollection"
params = {"$filter": "CreationDateTime ge 2021-12-21T00:00:00Z"}
Args:
url (str, optional): The url to the API in case of prepared report. Defaults to None.
env (str, optional): The environment to use. Defaults to 'QA'.
url (str, optional): The url to the API used in case of prepared report. Defaults to None.
env (str, optional): The environment to use to obtain credentials. Defaults to 'QA'.
endpoint (str, optional): The endpoint of the API. Defaults to None.
fields (List[str], optional): The C4C Table fields. Defaults to None.
params (Dict[str, str]): Query parameters. Defaults to $format=json.
chunksize (int, optional): How many rows to retrieve from C4C at a time. Uses a server-side cursor.
params (Dict[str, str]): Query parameters. Defaults to None.
chunksize (int, optional): How many rows to retrieve from C4C at a time. Uses a server-side cursor. Defaults to None.
if_empty (str, optional): What to do if query returns no data. Defaults to "warn".
credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary
with C4C credentials. Defaults to None.
Expand Down Expand Up @@ -211,7 +211,9 @@ def run(

def _generate_chunks() -> Generator[pd.DataFrame, None, None]:
"""
Util returning chunks as a generator to save memory.
Util function returning chunks.
Returns: Generator[pd.DataFrame, None, None]
"""
offset = 0
total_record_count = 0
Expand Down
9 changes: 5 additions & 4 deletions viadot/tasks/mindful.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,13 @@ def run(
credentials_mindful = None
raise CredentialError("Credentials not found.")

header = {
"Authorization": f"Bearer {credentials_mindful.get('VAULT')}",
}
auth = (
credentials_mindful["CUSTOMER_UUID"],
credentials_mindful["AUTH_TOKEN"],
)

mindful = Mindful(
header=header,
auth=auth,
region=region,
start_date=start_date,
end_date=end_date,
Expand Down
2 changes: 2 additions & 0 deletions viadot/tasks/sharepoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ..exceptions import ValidationError
from ..sources import Sharepoint
from .azure_key_vault import AzureKeyVaultSecret
from ..utils import add_viadot_metadata_columns

logger = logging.get_logger()

Expand Down Expand Up @@ -147,6 +148,7 @@ def split_sheet(
"sheet_number",
"validate_excel_file",
)
@add_viadot_metadata_columns(source_name="Sharepoint")
def run(
self,
path_to_file: str = None,
Expand Down
45 changes: 34 additions & 11 deletions viadot/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,21 +408,44 @@ def check_if_empty_file(
handle_if_empty_file(if_empty, message=f"Input file - '{path}' is empty.")


def add_viadot_metadata_columns(source_name: str = None) -> Callable:
    """
    Decorator factory that adds metadata columns to the DataFrame returned by a 'to_df' method.
    For now only _viadot_source is available because _viadot_downloaded_at_utc is added on the Flow level.

    Args:
        source_name (str, optional): The name of the source to be included in the DataFrame.
            This should be provided when creating a DataFrame in a Task, rather than in a Source.
            When None, the class name of the decorated method's instance is used instead.
            Defaults to None.

    Returns:
        Callable: A decorator that wraps a DataFrame-producing function.

    Warning: Please remember to include brackets when applying the decorator, even if you are
    not passing the 'source_name' parameter.

    Example:
        In task:
            @add_viadot_metadata_columns(source_name="Sharepoint")
            def to_df(self):
                ...
        In source:
            @add_viadot_metadata_columns()
            def to_df(self):
                ...
    """

    def decorator(func) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> pd.DataFrame:
            df = func(*args, **kwargs)

            # args[0] is `self` of the decorated method; fall back to its
            # class name when no explicit source_name was provided.
            df["_viadot_source"] = (
                source_name if source_name is not None else args[0].__class__.__name__
            )

            return df

        return wrapper

    return decorator

0 comments on commit aab3323

Please sign in to comment.