Skip to content

Commit

Permalink
🐛 [BUGFIX] For SAPRFCV2 adding whitespaces to match sub - queries (#…
Browse files Browse the repository at this point in the history
…1100)

* ⚡️ adding whitespaces to match SAP data type characters length to bullet proof the merge

* Changed for loop to vector approach

* 🚧 Changed df to df_tmp in whitespace filler

* Added whitespace logging

* ♻️ Refactored logging of the exception

* Update src/viadot/sources/sap_rfc.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update src/viadot/sources/sap_rfc.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update src/viadot/sources/sap_rfc.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update src/viadot/sources/sap_rfc.py

Co-authored-by: Michał Zawadzki <[email protected]>

* 🎨 Changed unique variables name

* 🔥 Removed try except from whitespace len check

* ⚡️ Wrapped adding whitespaces in sub function

* 📝  Comments speeling changed

* ✅ Added tests for _adjust_whitespaces sap function

* ✨ updated .gitignore with rye

* ⬆️ Update pandas requirement as some funcs depend on pandas >=2

* ➕ Update sap extra requirements

* ➖ Remove the non-required(?) Cython

---------

Co-authored-by: Michał Zawadzki <[email protected]>
  • Loading branch information
marcinpurtak and trymzet authored Nov 26, 2024
1 parent cff4060 commit 587a7fc
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 28 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,6 @@ profiles.yaml

# AWS
.aws

# Rye
.rye/
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ dependencies = [
"openpyxl>=3.0.0",
"prefect>=2.19.7, <3",
"prefect-sqlalchemy>=0.4.3",
"pandas>=1.2.0",
"duckdb==1.0.0",
"requests>=2.32.3",
"prefect-github>=0.2.7",
Expand All @@ -37,6 +36,7 @@ dependencies = [
"simple-salesforce==1.12.6",
"pandas-gbq==0.23.1",
"paramiko>=3.5.0",
"pandas>=2.2.3",
]
requires-python = ">=3.10"
readme = "README.md"
Expand All @@ -60,7 +60,7 @@ aws = [
"awswrangler>=2.20.1, <3.0",
"prefect-aws>=0.4.19",
]
sap = ["pyrfc"]
sap = ["pyrfc==3.3.1"]

[tool.rye]
managed = true
Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.lock
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ packaging==24.1
# via pytest
paginate==0.5.6
# via mkdocs-material
pandas==2.2.2
pandas==2.2.3
# via db-dtypes
# via mkdocs-table-reader-plugin
# via pandas-gbq
Expand Down
2 changes: 1 addition & 1 deletion requirements.lock
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ packaging==24.1
# via google-cloud-bigquery
# via pandas-gbq
# via prefect
pandas==2.2.2
pandas==2.2.3
# via db-dtypes
# via pandas-gbq
# via viadot2
Expand Down
65 changes: 41 additions & 24 deletions src/viadot/sources/sap_rfc.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ def __init__(

if rfc_unique_id is not None:
self.rfc_unique_id = list(set(rfc_unique_id))
self._rfc_unique_id_len = {}
else:
self.rfc_unique_id = rfc_unique_id

Expand Down Expand Up @@ -966,24 +967,24 @@ def query(self, sql: str, sep: str | None = None) -> None: # noqa: C901, PLR091
col_length_total = 0
if isinstance(self.rfc_unique_id[0], str):
character_limit = self.rfc_total_col_width_character_limit
for ref_column in self.rfc_unique_id:
col_length_reference_column = int(
for rfc_unique_col in self.rfc_unique_id:
rfc_unique_col_len = int(
self.call(
"DDIF_FIELDINFO_GET",
TABNAME=table_name,
FIELDNAME=ref_column,
FIELDNAME=rfc_unique_col,
)["DFIES_TAB"][0]["LENG"]
)
if col_length_reference_column > int(
if rfc_unique_col_len > int(
self.rfc_total_col_width_character_limit / 4
):
msg = f"{ref_column} can't be used as unique column, too large."
msg = f"{rfc_unique_col} can't be used as unique column, too large."
raise ValueError(msg)
local_limit = (
self.rfc_total_col_width_character_limit
- col_length_reference_column
self.rfc_total_col_width_character_limit - rfc_unique_col_len
)
character_limit = min(local_limit, character_limit)
self._rfc_unique_id_len[rfc_unique_col] = rfc_unique_col_len
else:
character_limit = self.rfc_total_col_width_character_limit

Expand All @@ -995,21 +996,21 @@ def query(self, sql: str, sep: str | None = None) -> None: # noqa: C901, PLR091
cols.append(col)
else:
if isinstance(self.rfc_unique_id[0], str) and all(
rfc_col not in cols for rfc_col in self.rfc_unique_id
rfc_unique_col not in cols for rfc_unique_col in self.rfc_unique_id
):
for rfc_col in self.rfc_unique_id:
if rfc_col not in cols:
cols.append(rfc_col)
for rfc_unique_col in self.rfc_unique_id:
if rfc_unique_col not in cols:
cols.append(rfc_unique_col)
lists_of_columns.append(cols)
cols = [col]
col_length_total = int(col_length)

if isinstance(self.rfc_unique_id[0], str) and all(
rfc_col not in cols for rfc_col in self.rfc_unique_id
rfc_unique_col not in cols for rfc_col in self.rfc_unique_id
):
for rfc_col in self.rfc_unique_id:
if rfc_col not in cols:
cols.append(rfc_col)
for rfc_unique_col in self.rfc_unique_id:
if rfc_unique_col not in cols:
cols.append(rfc_unique_col)
lists_of_columns.append(cols)

columns = lists_of_columns
Expand Down Expand Up @@ -1040,6 +1041,30 @@ def _get_alias(self, column: str) -> str:
def _get_client_side_filter_cols(self):
return [f[1].split()[0] for f in self.client_side_filters.items()]

def _adjust_whitespaces(self, df: pd.DataFrame) -> pd.DataFrame:
"""Adjust the number of whitespaces.
Add whitespace characters in each row of each unique column to achieve
equal length of values in these columns, ensuring proper merging of subqueries.
"""
for rfc_unique_col in self.rfc_unique_id:
# Check in SAP metadata what is the declared
# dtype characters amount
rfc_unique_column_len = self._rfc_unique_id_len[rfc_unique_col]
actual_length_of_field = df[rfc_unique_col].str.len()
# Check which rows have fewer characters
# than specified in the column data type.
rows_missing_whitespaces = actual_length_of_field < rfc_unique_column_len
if any(rows_missing_whitespaces):
# Check how many whitespaces are missing in each row.
logger.info(f"Adding whitespaces for {rfc_unique_col} column")
n_missing_whitespaces = rfc_unique_column_len - actual_length_of_field
df.loc[rows_missing_whitespaces, rfc_unique_col] += np.char.multiply(
" ", n_missing_whitespaces[rows_missing_whitespaces]
)
return df

# TODO: refactor to remove linter warnings and so this can be tested.
@add_viadot_metadata_columns
def to_df(self, tests: dict | None = None) -> pd.DataFrame: # noqa: C901, PLR0912, PLR0915
Expand Down Expand Up @@ -1117,7 +1142,6 @@ def to_df(self, tests: dict | None = None) -> pd.DataFrame: # noqa: C901, PLR09
record_key = "WA"
data_raw = np.array(response["DATA"])
del response

# If reference columns are provided, it's not necessary to remove
# any extra row.
if not isinstance(self.rfc_unique_id[0], str):
Expand All @@ -1126,22 +1150,15 @@ def to_df(self, tests: dict | None = None) -> pd.DataFrame: # noqa: C901, PLR09
)
else:
start = False

records = list(_gen_split(data_raw, sep, record_key))
del data_raw

if (
isinstance(self.rfc_unique_id[0], str)
and list(df.columns) != fields
):
df_tmp = pd.DataFrame(columns=fields)
df_tmp[fields] = records
# SAP adds whitespaces to the first extracted column value.
# If whitespace is in unique column, it must be removed to make
# a proper merge.
for col in self.rfc_unique_id:
df_tmp[col] = df_tmp[col].str.strip()
df[col] = df[col].str.strip()
df_tmp = self._adjust_whitespaces(df_tmp)
df = pd.merge(df, df_tmp, on=self.rfc_unique_id, how="outer")
elif not start:
df[fields] = records
Expand Down
13 changes: 13 additions & 0 deletions tests/unit/test_sap_rfc_2.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from collections import OrderedDict

from pandas import DataFrame

from viadot.utils import skip_test_on_missing_extra

from .test_sap_rfc import (
Expand Down Expand Up @@ -104,3 +106,14 @@ def test___build_pandas_filter_query():
sap._build_pandas_filter_query(sap.client_side_filters)
== "thirdlongcolname == 01234"
), sap._build_pandas_filter_query(sap.client_side_filters)


def test__adjust_whitespaces():
sap.rfc_unique_id = ["column1", "column2"]
sap._rfc_unique_id_len = {"column1": 5, "column2": 4}
data = {"column1": ["xyz ", "oiu"], "column2": ["yrt ", "lkj"]}
df = DataFrame(data)
df = sap._adjust_whitespaces(df)
col_values_len = df.applymap(lambda x: len(x))
check_if_length_match = col_values_len == sap._rfc_unique_id_len.values()
assert check_if_length_match.all().all()

0 comments on commit 587a7fc

Please sign in to comment.