Skip to content

Commit

Permalink
feat: add VALUE_MAPPING Operator when using transform method
Browse files Browse the repository at this point in the history
Signed-off-by: seolmin <[email protected]>
  • Loading branch information
stat-kwon committed Dec 10, 2024
1 parent 4f6d2d2 commit ba4bd69
Showing 1 changed file with 68 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(
"EVAL",
"PIVOT",
"ADD_LABELS",
"VALUE_MAPPING",
]:
raise ERROR_NOT_SUPPORTED_OPERATOR(operator=operator)

Expand Down Expand Up @@ -103,6 +104,8 @@ def load(
self.pivot_data_table(granularity, start, end, vars)
elif self.operator == "ADD_LABELS":
self.add_labels_data_table(granularity, start, end, vars)
elif self.operator == "VALUE_MAPPING":
self.value_mapping_data_table(granularity, start, end, vars)

self.state = "AVAILABLE"
self.error_message = None
Expand Down Expand Up @@ -429,16 +432,16 @@ def add_labels_data_table(
end: str = None,
vars: dict = None,
) -> None:
origin_vo = self.data_table_vos[0]
label_keys = list(origin_vo.labels_info.keys())
data_keys = list(origin_vo.data_info.keys())
data_table_vo = self.data_table_vos[0]
label_keys = list(data_table_vo.labels_info.keys())
data_keys = list(data_table_vo.data_info.keys())

labels = self.options.get("labels")

if not labels:
raise ERROR_REQUIRED_PARAMETER(key="options.ADD_LABELS.labels")

df = self._get_data_table(origin_vo, granularity, start, end, vars)
df = self._get_data_table(data_table_vo, granularity, start, end, vars)

for label_key in labels.keys():
if label_key in df.columns:
Expand All @@ -464,6 +467,60 @@ def add_labels_data_table(

self.df = df

def value_mapping_data_table(
self,
granularity: GRANULARITY = "MONTHLY",
start: str = None,
end: str = None,
vars: dict = None,
) -> None:
data_table_vo = self.data_table_vos[0]
df = self._get_data_table(data_table_vo, granularity, start, end, vars)

self.label_keys = list(data_table_vo.labels_info.keys())
self.data_keys = list(data_table_vo.data_info.keys())

name = self.options["name"]
field_type = self.options.get("field_type", "LABEL")
else_value = self.options.get("else", None)

if condition := self.options.get("condition"):
if self.is_jinja_expression(condition):
condition, gv_type_map = self.change_global_variables(condition, vars)
condition = self.remove_jinja_braces(condition)
condition = self.change_expression_data_type(condition, gv_type_map)

filtered_df = df.query(condition).copy()
else:
filtered_df = df.copy()

filtered_df.loc[:, name] = else_value

if cases := self.options.get("cases", []):
for case in cases:
self._validate_case(case)
key = case["key"]
operator = case["operator"]
value = case["value"]
match = case["match"]

if operator == "eq":
filtered_df.loc[filtered_df[key] == match, name] = value
elif operator == "regex":
filtered_df.loc[filtered_df[key].str.contains(match), name] = value

df.loc[filtered_df.index, name] = filtered_df[name]

unfiltered_index = df.index.difference(filtered_df.index)
if field_type == "LABEL":
df.loc[unfiltered_index, name] = ""
self.data_keys.append(name)
elif field_type == "DATA":
df.loc[unfiltered_index, name] = 0
self.data_keys.append(name)

self.df = df

def _get_data_table(
self,
data_table_vo: Union[PublicDataTable, PrivateDataTable],
Expand Down Expand Up @@ -825,3 +882,10 @@ def _apply_order_by(
sorted_columns = column_sums.sort_values(ascending=ascending).index.tolist()
pivot_table = pivot_table[self.label_keys + sorted_columns]
return pivot_table

@staticmethod
def _validate_case(case):
required_keys = ["key", "match", "value", "operator"]
for key in required_keys:
if key not in case:
raise ERROR_REQUIRED_PARAMETER(key=f"options.VALUE_MAPPING.cases.{key}")

0 comments on commit ba4bd69

Please sign in to comment.