From ece858ba50cc07a7f2f4d7a136d2cabc40fba238 Mon Sep 17 00:00:00 2001 From: seolmin Date: Thu, 5 Dec 2024 10:40:08 +0900 Subject: [PATCH 01/11] feat: change so that the join key can be received and processed when using the Join Operator. Signed-off-by: seolmin --- .../data_transformation_manager.py | 77 +++++++++++++++++-- 1 file changed, 69 insertions(+), 8 deletions(-) diff --git a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py index 0ae45b8..1394405 100755 --- a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py +++ b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py @@ -117,8 +117,16 @@ def join_data_tables( key="options.JOIN.how", reason="Invalid join type" ) + left_keys = self.options.get("left_keys") + right_keys = self.options.get("right_keys") + if not left_keys or not right_keys: + missing_key = "left_keys" if not left_keys else "right_keys" + raise ERROR_REQUIRED_PARAMETER(key=f"options.JOIN.{missing_key}") + origin_vo = self.data_table_vos[0] other_vo = self.data_table_vos[1] + origin_vo_name = origin_vo.name + other_vo_name = other_vo.name origin_data_keys = set(origin_vo.data_info.keys()) other_data_keys = set(other_vo.data_info.keys()) @@ -129,11 +137,54 @@ def join_data_tables( self.data_keys = list(origin_data_keys | other_data_keys) origin_label_keys = set(origin_vo.labels_info.keys()) other_label_keys = set(other_vo.labels_info.keys()) - self.label_keys = list(origin_label_keys | other_label_keys) - on = list(origin_label_keys & other_label_keys) - if len(on) == 0: - raise ERROR_NO_FIELDS_TO_JOIN() + if len(left_keys) != len(right_keys): + raise ERROR_INVALID_PARAMETER( + key="options.JOIN", + reason=f"left_keys and right_keys should have same length. 
" + f"left_keys={list(origin_label_keys)}, right_keys={list(other_label_keys)}", + ) + + for key in left_keys: + if key not in origin_label_keys: + raise ERROR_INVALID_PARAMETER( + key="options.JOIN.left_keys", + reason=f"Invalid key: {key}, table keys={list(origin_label_keys)}", + ) + for key in right_keys: + if key not in other_label_keys: + raise ERROR_INVALID_PARAMETER( + key="options.JOIN.left_keys", + reason=f"Invalid key: {key}, table keys={list(other_label_keys)}", + ) + multi_keys = zip(left_keys, right_keys) + + rename_columns = {} + if how in ["left", "inner", "outer"]: + for left_key, right_key in multi_keys: + rename_columns[right_key] = left_key + elif how == "right": + for left_key, right_key in multi_keys: + rename_columns[left_key] = right_key + + origin_df = self._get_data_table(origin_vo, granularity, start, end, vars) + other_df = self._get_data_table(other_vo, granularity, start, end, vars) + + if how in ["left", "inner", "outer"]: + other_df = other_df.rename(columns=rename_columns) + elif how == "right": + origin_df = origin_df.rename(columns=rename_columns) + + origin_label_keys = set(origin_df.columns) + other_label_keys = set(other_df.columns) + + all_keys = list(origin_label_keys | other_label_keys) + self.label_keys = list(set(all_keys) - set(self.data_keys)) + + if how in ["left", "right", "inner"]: + join_keys = left_keys + else: + join_keys = right_keys fill_na = {} for key in self.data_keys: @@ -142,9 +193,6 @@ def join_data_tables( for key in self.label_keys: fill_na[key] = "" - origin_df = self._get_data_table(origin_vo, granularity, start, end, vars) - other_df = self._get_data_table(other_vo, granularity, start, end, vars) - if len(other_df) == 0: if how in ["left", "outer"]: self.df = origin_df @@ -158,8 +206,21 @@ def join_data_tables( self.df = origin_df return - merged_df = origin_df.merge(other_df, left_on=on, right_on=on, how=how) + merged_df = origin_df.merge( + other_df, left_on=join_keys, right_on=join_keys, how=how + ) 
merged_df = merged_df.fillna(value=fill_na) + merged_rename_columns = {} + for column in merged_df.columns: + if column.endswith("_x"): + column_name, _ = column.split("_") + merged_rename_columns[column] = f"{column_name}({origin_vo_name})" + elif column.endswith("_y"): + column_name, _ = column.split("_") + merged_rename_columns[column] = f"{column_name}({other_vo_name})" + merged_df = merged_df.rename(columns=merged_rename_columns) + self.label_keys = list(set(merged_df.columns) - set(self.data_keys)) + self.df = merged_df def concat_data_tables( From 7b1c72a3cd6e4461213aecc82e3dd29958d88c14 Mon Sep 17 00:00:00 2001 From: seolmin Date: Fri, 6 Dec 2024 10:09:57 +0900 Subject: [PATCH 02/11] fix: refactor join operator when using transform method Signed-off-by: seolmin --- .../data_transformation_manager.py | 218 ++++++++++-------- 1 file changed, 121 insertions(+), 97 deletions(-) diff --git a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py index 1394405..4644dde 100755 --- a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py +++ b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py @@ -112,115 +112,28 @@ def join_data_tables( vars: dict = None, ) -> None: how = self.options.get("how", "left") - if how not in ["left", "right", "inner", "outer"]: - raise ERROR_INVALID_PARAMETER( - key="options.JOIN.how", reason="Invalid join type" - ) - left_keys = self.options.get("left_keys") right_keys = self.options.get("right_keys") - if not left_keys or not right_keys: - missing_key = "left_keys" if not left_keys else "right_keys" - raise ERROR_REQUIRED_PARAMETER(key=f"options.JOIN.{missing_key}") + + self._validate_options(how, left_keys, right_keys) origin_vo = self.data_table_vos[0] other_vo = self.data_table_vos[1] - origin_vo_name = origin_vo.name - other_vo_name = other_vo.name - origin_data_keys = 
set(origin_vo.data_info.keys()) - other_data_keys = set(other_vo.data_info.keys()) - - duplicate_keys = list(origin_data_keys & other_data_keys) - if len(duplicate_keys) > 0: - raise ERROR_DUPLICATED_DATA_FIELDS(field=", ".join(duplicate_keys)) - - self.data_keys = list(origin_data_keys | other_data_keys) - origin_label_keys = set(origin_vo.labels_info.keys()) - other_label_keys = set(other_vo.labels_info.keys()) - - if len(left_keys) != len(right_keys): - raise ERROR_INVALID_PARAMETER( - key="options.JOIN", - reason=f"left_keys and right_keys should have same length. " - f"left_keys={list(origin_label_keys)}, right_keys={list(other_label_keys)}", - ) - - for key in left_keys: - if key not in origin_label_keys: - raise ERROR_INVALID_PARAMETER( - key="options.JOIN.left_keys", - reason=f"Invalid key: {key}, table keys={list(origin_label_keys)}", - ) - for key in right_keys: - if key not in other_label_keys: - raise ERROR_INVALID_PARAMETER( - key="options.JOIN.left_keys", - reason=f"Invalid key: {key}, table keys={list(other_label_keys)}", - ) - multi_keys = zip(left_keys, right_keys) - - rename_columns = {} - if how in ["left", "inner", "outer"]: - for left_key, right_key in multi_keys: - rename_columns[right_key] = left_key - elif how == "right": - for left_key, right_key in multi_keys: - rename_columns[left_key] = right_key - origin_df = self._get_data_table(origin_vo, granularity, start, end, vars) other_df = self._get_data_table(other_vo, granularity, start, end, vars) - if how in ["left", "inner", "outer"]: - other_df = other_df.rename(columns=rename_columns) - elif how == "right": - origin_df = origin_df.rename(columns=rename_columns) - - origin_label_keys = set(origin_df.columns) - other_label_keys = set(other_df.columns) - - all_keys = list(origin_label_keys | other_label_keys) - self.label_keys = list(set(all_keys) - set(self.data_keys)) - - if how in ["left", "right", "inner"]: - join_keys = left_keys - else: - join_keys = right_keys + 
self._validate_join_keys(left_keys, right_keys, origin_vo, other_vo) - fill_na = {} - for key in self.data_keys: - fill_na[key] = 0 + self._set_data_keys(origin_vo, other_vo) - for key in self.label_keys: - fill_na[key] = "" - - if len(other_df) == 0: - if how in ["left", "outer"]: - self.df = origin_df - else: - self.df = other_df - return - elif len(origin_df) == 0: - if how in ["right", "outer"]: - self.df = other_df - else: - self.df = origin_df - return - - merged_df = origin_df.merge( - other_df, left_on=join_keys, right_on=join_keys, how=how + merged_df = self._merge_data_frames( + origin_df, other_df, how, left_keys, right_keys + ) + merged_df = self._rename_duplicated_columns( + merged_df, origin_vo.name, other_vo.name ) - merged_df = merged_df.fillna(value=fill_na) - merged_rename_columns = {} - for column in merged_df.columns: - if column.endswith("_x"): - column_name, _ = column.split("_") - merged_rename_columns[column] = f"{column_name}({origin_vo_name})" - elif column.endswith("_y"): - column_name, _ = column.split("_") - merged_rename_columns[column] = f"{column_name}({other_vo_name})" - merged_df = merged_df.rename(columns=merged_rename_columns) - self.label_keys = list(set(merged_df.columns) - set(self.data_keys)) + self.label_keys = list(set(merged_df.columns) - set(self.data_keys)) self.df = merged_df def concat_data_tables( @@ -617,6 +530,117 @@ def _get_data_table_from_options( return parent_dt_vos + def _set_data_keys( + self, + origin_vo: Union[PublicDataTable, PrivateDataTable], + other_vo: Union[PublicDataTable, PrivateDataTable], + ) -> None: + origin_data_keys = set(origin_vo.data_info.keys()) + other_data_keys = set(other_vo.data_info.keys()) + + duplicate_keys = list(origin_data_keys & other_data_keys) + if duplicate_keys: + raise ERROR_DUPLICATED_DATA_FIELDS(field=", ".join(duplicate_keys)) + + self.data_keys = list(origin_data_keys | other_data_keys) + + @staticmethod + def _validate_options(how: str, left_keys: list, right_keys: 
list) -> None: + if how not in ["left", "right", "inner", "outer"]: + raise ERROR_INVALID_PARAMETER( + key="options.JOIN.how", reason="Invalid join type" + ) + if not left_keys or not right_keys: + missing_key = "left_keys" if not left_keys else "right_keys" + raise ERROR_REQUIRED_PARAMETER(key=f"options.JOIN.{missing_key}") + + @staticmethod + def _validate_join_keys( + left_keys: list, + right_keys: list, + origin_vo: Union[PublicDataTable, PrivateDataTable], + other_vo: Union[PublicDataTable, PrivateDataTable], + ) -> None: + origin_label_keys = set(origin_vo.labels_info.keys()) + other_label_keys = set(other_vo.labels_info.keys()) + + if len(left_keys) != len(right_keys): + raise ERROR_INVALID_PARAMETER( + key="options.JOIN", + reason=f"left_keys and right_keys should have the same length. " + f"left_keys={list(origin_label_keys)}, right_keys={list(other_label_keys)}", + ) + + for key in left_keys: + if key not in origin_label_keys: + raise ERROR_INVALID_PARAMETER( + key="options.JOIN.left_keys", + reason=f"Invalid key: {key}, table keys={list(origin_label_keys)}", + ) + for key in right_keys: + if key not in other_label_keys: + raise ERROR_INVALID_PARAMETER( + key="options.JOIN.right_keys", + reason=f"Invalid key: {key}, table keys={list(other_label_keys)}", + ) + + @staticmethod + def _create_rename_columns_from_join_keys(left_keys, right_keys, how): + # 키 매핑 + multi_keys = zip(left_keys, right_keys) + rename_columns = {} + if how in ["left", "inner", "outer"]: + for left_key, right_key in multi_keys: + rename_columns[right_key] = left_key + elif how == "right": + for left_key, right_key in multi_keys: + rename_columns[left_key] = right_key + + return rename_columns + + def _merge_data_frames( + self, + origin_df: pd.DataFrame, + other_df: pd.DataFrame, + how: str, + left_keys: list, + right_keys: list, + ) -> pd.DataFrame: + rename_columns = self._create_rename_columns_from_join_keys( + left_keys, right_keys, how + ) + if how in ["left", "inner", "outer"]: + 
other_df = other_df.rename(columns=rename_columns) + elif how == "right": + origin_df = origin_df.rename(columns=rename_columns) + + join_keys = left_keys if how in ["left", "inner", "outer"] else right_keys + merged_df = origin_df.merge( + other_df, left_on=join_keys, right_on=join_keys, how=how + ) + + label_keys = list(set(merged_df.columns) - set(self.data_keys)) + fill_na = {key: 0 for key in self.data_keys} + fill_na.update({key: "" for key in label_keys}) + merged_df = merged_df.fillna(value=fill_na) + + return merged_df + + @staticmethod + def _rename_duplicated_columns( + merged_df: pd.DataFrame, origin_vo_name: str, other_vo_name: str + ) -> pd.DataFrame: + merged_rename_columns = {} + for column in merged_df.columns: + if column.endswith("_x"): + column_name, _ = column.split("_") + merged_rename_columns[column] = f"{column_name}({origin_vo_name})" + elif column.endswith("_y"): + column_name, _ = column.split("_") + merged_rename_columns[column] = f"{column_name}({other_vo_name})" + + return merged_df.rename(columns=merged_rename_columns) + @staticmethod def _check_columns( df: pd.DataFrame, indexes: list, columns: list, values: list From 2eb1bacbdccd27835f696da941761ccc653797df Mon Sep 17 00:00:00 2001 From: seolmin Date: Fri, 6 Dec 2024 12:31:43 +0900 Subject: [PATCH 03/11] fix: change pivot options Signed-off-by: seolmin --- .../data_transformation_manager.py | 60 ++++++++++--------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py index 4644dde..1dd0ca3 100755 --- a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py +++ b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py @@ -408,23 +408,23 @@ def pivot_data_table( vars: dict = None, ) -> None: origin_vo = self.data_table_vos[0] - index, columns, values = ( - 
self.options["index"], - self.options["columns"], - self.options["values"], + field_options = self.options["fields"] + label_fields, column_field, data_field = ( + field_options["label_fields"], + field_options["column_field"], + field_options["data_field"], ) - aggregation = self.options.get("aggregation", "sum") - + aggregation = field_options.get("aggregation", "sum") raw_df = self._get_data_table(origin_vo, granularity, start, end, vars) - self._check_columns(raw_df, index, columns, values) - fill_value = self._set_fill_value_from_df(raw_df, values) + self._check_columns(raw_df, label_fields, column_field, data_field) + fill_value = self._set_fill_value_from_df(raw_df, data_field) try: pivot_table = pd.pivot_table( raw_df, - values=values, - index=index, - columns=columns, + values=[data_field], + index=label_fields, + columns=[column_field], aggfunc=aggregation, fill_value=fill_value, ) @@ -643,26 +643,32 @@ def _rename_duplicated_columns( @staticmethod def _check_columns( - df: pd.DataFrame, indexes: list, columns: list, values: list + df: pd.DataFrame, label_fields: list, column_field: str, data_field: str ) -> None: df_columns = set(df.columns) - for col_type, col_list in [ - ("index", indexes), - ("columns", columns), - ("values", values), - ]: - for col in col_list: - if col not in df_columns: - raise ERROR_INVALID_PARAMETER( - key=f"options.PIVOT.{col_type}", - reason=f"Invalid key: {col}, columns={list(df_columns)}", - ) + for label_field in label_fields: + if label_field not in df_columns: + raise ERROR_INVALID_PARAMETER( + key=f"options.PIVOT.label_fields", + reason=f"Invalid key: {label_field}, columns={list(df_columns)}", + ) + + if column_field not in df_columns: + raise ERROR_INVALID_PARAMETER( + key=f"options.PIVOT.column_field", + reason=f"Invalid key: {column_field}, columns={list(df_columns)}", + ) + + if data_field not in df_columns: + raise ERROR_INVALID_PARAMETER( + key=f"options.PIVOT.data_field", + reason=f"Invalid key: {data_field}, 
columns={list(df_columns)}", + ) @staticmethod - def _set_fill_value_from_df(df: pd.DataFrame, values: list) -> Union[int, str]: - for value in values: - if df[value].dtype == "object": - return "" + def _set_fill_value_from_df(df: pd.DataFrame, data_field: str) -> Union[int, str]: + if df[data_field].dtype == "object": + return "" return 0 @staticmethod From c9624c210d97c36eebbd171c798ae856c9e97212 Mon Sep 17 00:00:00 2001 From: seolmin Date: Fri, 6 Dec 2024 17:07:40 +0900 Subject: [PATCH 04/11] feat: add pivot options for data processing Signed-off-by: seolmin --- .../data_transformation_manager.py | 91 ++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) diff --git a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py index 1dd0ca3..ded2732 100755 --- a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py +++ b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py @@ -50,6 +50,7 @@ def __init__( self.data_table_vos = self._get_data_table_from_options(operator, options) self.data_keys = [] self.label_keys = [] + self.total_series = None def get_data_and_labels_info(self) -> Tuple[dict, dict]: data_info = {} @@ -415,6 +416,9 @@ def pivot_data_table( field_options["data_field"], ) aggregation = field_options.get("aggregation", "sum") + manual_column_fields = self.options.get("manual_column_fields") + sorted_columns = None + raw_df = self._get_data_table(origin_vo, granularity, start, end, vars) self._check_columns(raw_df, label_fields, column_field, data_field) fill_value = self._set_fill_value_from_df(raw_df, data_field) @@ -434,7 +438,80 @@ def pivot_data_table( pivot_table.reset_index(inplace=True) self._set_keys(list(pivot_table.columns)) - self.df = self._set_new_column_names(pivot_table) + pivot_table = self._set_new_column_names(pivot_table) + column_field_items = 
list(set(pivot_table.columns) - set(label_fields)) + + if not self.total_series: + self.total_series = pivot_table[column_field_items].sum(axis=1) + + if sort := self.options.get("sort"): + if sort_keys := sort.get("rows"): + pivot_table["_total"] = self.total_series + + if len(sort_keys) == 1 and "key" not in sort_keys[0]: + sort_order = sort_keys[0]["order"] + if sort_order == "desc": + order_state = False + else: + order_state = True + pivot_table = pivot_table.sort_values( + by="_total", ascending=order_state + ) + + elif sort_keys: + multi_keys = [] + multi_orders = [] + for sort_key_info in sort_keys: + sort_key = sort_key_info.get("key") + sort_order = sort_key_info.get("order") + + if sort_key != "_total" and sort_key not in column_field_items: + raise ERROR_INVALID_PARAMETER( + key="options.PIVOT.sort.keys", + reason=f"Invalid key: {sort_key}, columns={column_field_items}", + ) + + if sort_order == "desc": + order_state = False + else: + order_state = True + + multi_keys.append(sort_key) + multi_orders.append(order_state) + + pivot_table = pivot_table.sort_values( + by=multi_keys, ascending=multi_orders + ) + + pivot_table = pivot_table.drop(columns=["_total"]) + + if sort.get("column_fields"): + column_sums = pivot_table.drop(columns=label_fields).sum() + sorted_columns = column_sums.sort_values(ascending=False).index.tolist() + + pivot_table = pivot_table[label_fields + sorted_columns] + + if manual_column_fields: + self._validate_manual_column_fields( + manual_column_fields, column_field_items + ) + sorted_manual_columns = [] + if sorted_columns: + for column_field in sorted_columns: + if column_field in manual_column_fields: + sorted_manual_columns.append(column_field) + manual_column_fields = sorted_manual_columns + + pivot_table = pivot_table[label_fields + manual_column_fields] + + if limit := self.options.get("limit"): + if rows := limit.get("rows"): + pivot_table = pivot_table.head(rows) + + if column_fields := limit.get("column_fields"): + 
pivot_table = pivot_table.iloc[:, : len(label_fields) + column_fields] + + self.df = pivot_table def _get_data_table( self, @@ -685,3 +762,15 @@ def _set_keys(self, columns: list) -> None: upper_col for upper_col, lower_col in columns if not lower_col ] self.data_keys = [lower_col for upper_col, lower_col in columns if lower_col] + + @staticmethod + def _validate_manual_column_fields( + manual_column_fields: list, + column_fields: list, + ) -> None: + for manual_column_field in manual_column_fields: + if manual_column_field not in column_fields: + raise ERROR_INVALID_PARAMETER( + key="options.PIVOT.manual_column_fields", + reason=f"Invalid key: {manual_column_field}, columns={column_fields}", + ) From 8346648f930463a1ca3079aa029c320c8b297231 Mon Sep 17 00:00:00 2001 From: seolmin Date: Fri, 6 Dec 2024 17:54:56 +0900 Subject: [PATCH 05/11] fix: Refactor pivot table to suit purpose Signed-off-by: seolmin --- .../data_transformation_manager.py | 227 +++++++++++------- 1 file changed, 137 insertions(+), 90 deletions(-) diff --git a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py index ded2732..134d8fb 100755 --- a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py +++ b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py @@ -416,101 +416,15 @@ def pivot_data_table( field_options["data_field"], ) aggregation = field_options.get("aggregation", "sum") - manual_column_fields = self.options.get("manual_column_fields") - sorted_columns = None raw_df = self._get_data_table(origin_vo, granularity, start, end, vars) self._check_columns(raw_df, label_fields, column_field, data_field) fill_value = self._set_fill_value_from_df(raw_df, data_field) - try: - pivot_table = pd.pivot_table( - raw_df, - values=[data_field], - index=label_fields, - columns=[column_field], - aggfunc=aggregation, - 
fill_value=fill_value, - ) - except Exception as e: - _LOGGER.error(f"[pivot_data_table] pivot error: {e}") - raise ERROR_INVALID_PARAMETER(key="options.PIVOT", reason=str(e)) - - pivot_table.reset_index(inplace=True) - self._set_keys(list(pivot_table.columns)) - pivot_table = self._set_new_column_names(pivot_table) - column_field_items = list(set(pivot_table.columns) - set(label_fields)) - - if not self.total_series: - self.total_series = pivot_table[column_field_items].sum(axis=1) - - if sort := self.options.get("sort"): - if sort_keys := sort.get("rows"): - pivot_table["_total"] = self.total_series - - if len(sort_keys) == 1 and "key" not in sort_keys[0]: - sort_order = sort_keys[0]["order"] - if sort_order == "desc": - order_state = False - else: - order_state = True - pivot_table = pivot_table.sort_values( - by="_total", ascending=order_state - ) - - elif sort_keys: - multi_keys = [] - multi_orders = [] - for sort_key_info in sort_keys: - sort_key = sort_key_info.get("key") - sort_order = sort_key_info.get("order") - - if sort_key != "_total" and sort_key not in column_field_items: - raise ERROR_INVALID_PARAMETER( - key="options.PIVOT.sort.keys", - reason=f"Invalid key: {sort_key}, columns={column_field_items}", - ) - - if sort_order == "desc": - order_state = False - else: - order_state = True - - multi_keys.append(sort_key) - multi_orders.append(order_state) - - pivot_table = pivot_table.sort_values( - by=multi_keys, ascending=multi_orders - ) - - pivot_table = pivot_table.drop(columns=["_total"]) - - if sort.get("column_fields"): - column_sums = pivot_table.drop(columns=label_fields).sum() - sorted_columns = column_sums.sort_values(ascending=False).index.tolist() - - pivot_table = pivot_table[label_fields + sorted_columns] - - if manual_column_fields: - self._validate_manual_column_fields( - manual_column_fields, column_field_items - ) - sorted_manual_columns = [] - if sorted_columns: - for column_field in sorted_columns: - if column_field in 
manual_column_fields: - sorted_manual_columns.append(column_field) - manual_column_fields = sorted_manual_columns - - pivot_table = pivot_table[label_fields + manual_column_fields] - - if limit := self.options.get("limit"): - if rows := limit.get("rows"): - pivot_table = pivot_table.head(rows) - - if column_fields := limit.get("column_fields"): - pivot_table = pivot_table.iloc[:, : len(label_fields) + column_fields] - + pivot_table = self._create_pivot_table( + raw_df, label_fields, column_field, data_field, aggregation, fill_value + ) + pivot_table = self._sort_and_filter_pivot_table(pivot_table, label_fields) self.df = pivot_table def _get_data_table( @@ -774,3 +688,136 @@ def _validate_manual_column_fields( key="options.PIVOT.manual_column_fields", reason=f"Invalid key: {manual_column_field}, columns={column_fields}", ) + + def _create_pivot_table( + self, + raw_df: pd.DataFrame, + label_fields: list, + column_field: str, + data_field: str, + aggregation: str, + fill_value: Union[int, str], + ) -> pd.DataFrame: + try: + pivot_table = pd.pivot_table( + raw_df, + values=[data_field], + index=label_fields, + columns=[column_field], + aggfunc=aggregation, + fill_value=fill_value, + ) + pivot_table.reset_index(inplace=True) + self._set_keys(list(pivot_table.columns)) + return self._set_new_column_names(pivot_table) + except Exception as e: + _LOGGER.error(f"[pivot_data_table] pivot error: {e}") + raise ERROR_INVALID_PARAMETER(key="options.PIVOT", reason=str(e)) + + def _sort_and_filter_pivot_table( + self, + pivot_table: pd.DataFrame, + label_fields: list, + ) -> pd.DataFrame: + column_field_items = list(set(pivot_table.columns) - set(label_fields)) + if not self.total_series: + self.total_series = pivot_table[column_field_items].sum(axis=1) + + if sort := self.options.get("sort"): + pivot_table = self._apply_row_sorting(pivot_table, sort, column_field_items) + pivot_table = self._apply_column_sorting(pivot_table, sort, label_fields) + + if manual_column_fields := 
self.options.get("manual_column_fields"): + pivot_table = self._apply_manual_column_sorting( + pivot_table, manual_column_fields, label_fields, column_field_items + ) + + if limit := self.options.get("limit"): + pivot_table = self._apply_limits(pivot_table, limit, label_fields) + + return pivot_table + + def _apply_row_sorting( + self, + pivot_table: pd.DataFrame, + sort: dict, + column_field_items: list, + ) -> pd.DataFrame: + if sort_keys := sort.get("rows"): + pivot_table["_total"] = self.total_series + + if len(sort_keys) == 1 and "key" not in sort_keys[0]: + order_state = self._get_sort_order(sort_keys[0]["order"]) + pivot_table = pivot_table.sort_values( + by="_total", ascending=order_state + ) + else: + multi_keys, multi_orders = self._parse_sort_keys( + sort_keys, column_field_items + ) + pivot_table = pivot_table.sort_values( + by=multi_keys, ascending=multi_orders + ) + + pivot_table = pivot_table.drop(columns=["_total"]) + return pivot_table + + @staticmethod + def _apply_column_sorting( + pivot_table: pd.DataFrame, + sort: dict, + label_fields: list, + ) -> pd.DataFrame: + if sort.get("column_fields"): + column_sums = pivot_table.drop(columns=label_fields).sum() + sorted_columns = column_sums.sort_values(ascending=False).index.tolist() + pivot_table = pivot_table[label_fields + sorted_columns] + return pivot_table + + def _apply_manual_column_sorting( + self, + pivot_table: pd.DataFrame, + manual_column_fields: list, + label_fields: list, + column_field_items: list, + ) -> pd.DataFrame: + self._validate_manual_column_fields(manual_column_fields, column_field_items) + sorted_columns = [ + col for col in column_field_items if col in manual_column_fields + ] + pivot_table = pivot_table[label_fields + sorted_columns] + return pivot_table + + @staticmethod + def _apply_limits( + pivot_table: pd.DataFrame, + limit: dict, + label_fields: list, + ) -> pd.DataFrame: + if rows := limit.get("rows"): + pivot_table = pivot_table.head(rows) + if column_fields := 
limit.get("column_fields"): + pivot_table = pivot_table.iloc[:, : len(label_fields) + column_fields] + return pivot_table + + def _parse_sort_keys(self, sort_keys: list, column_field_items: list) -> tuple: + multi_keys = [] + multi_orders = [] + for sort_key_info in sort_keys: + sort_key = sort_key_info.get("key") + sort_order = sort_key_info.get("order") + + if sort_key != "_total" and sort_key not in column_field_items: + raise ERROR_INVALID_PARAMETER( + key="options.PIVOT.sort.keys", + reason=f"Invalid key: {sort_key}, columns={column_field_items}", + ) + + order_state = self._get_sort_order(sort_order) + multi_keys.append(sort_key) + multi_orders.append(order_state) + return multi_keys, multi_orders + + @staticmethod + def _get_sort_order(sort_order: str) -> bool: + return sort_order != "desc" From 1c9da49a0fa78d9c53d641e7a0658bb53e78b2a5 Mon Sep 17 00:00:00 2001 From: seolmin Date: Mon, 9 Dec 2024 13:13:29 +0900 Subject: [PATCH 06/11] fix: change pivot table to ensure order Signed-off-by: seolmin --- .../data_table_manager/data_transformation_manager.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py index 134d8fb..abdc0ce 100755 --- a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py +++ b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py @@ -9,7 +9,6 @@ ERROR_NOT_SUPPORTED_OPERATOR, ERROR_REQUIRED_PARAMETER, ERROR_DUPLICATED_DATA_FIELDS, - ERROR_NO_FIELDS_TO_JOIN, ERROR_INVALID_PARAMETER_TYPE, ) from spaceone.dashboard.manager.data_table_manager import DataTableManager, GRANULARITY @@ -726,6 +725,9 @@ def _sort_and_filter_pivot_table( if sort := self.options.get("sort"): pivot_table = self._apply_row_sorting(pivot_table, sort, column_field_items) pivot_table = self._apply_column_sorting(pivot_table, sort, 
label_fields) + column_field_items = [ + field for field in pivot_table.columns if field not in label_fields + ] if manual_column_fields := self.options.get("manual_column_fields"): pivot_table = self._apply_manual_column_sorting( @@ -782,9 +784,10 @@ def _apply_manual_column_sorting( column_field_items: list, ) -> pd.DataFrame: self._validate_manual_column_fields(manual_column_fields, column_field_items) - sorted_columns = [ - col for col in column_field_items if col in manual_column_fields - ] + sorted_columns = [] + for column_field in column_field_items: + if column_field in manual_column_fields: + sorted_columns.append(column_field) pivot_table = pivot_table[label_fields + sorted_columns] return pivot_table From d9a51f6247df3471709fb8da8f98cbacd755a4fe Mon Sep 17 00:00:00 2001 From: seolmin Date: Tue, 10 Dec 2024 15:25:58 +0900 Subject: [PATCH 07/11] fix: change PIVOT Operator to fixed spec Signed-off-by: seolmin --- .../data_transformation_manager.py | 198 +++++++----------- 1 file changed, 81 insertions(+), 117 deletions(-) diff --git a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py index abdc0ce..08e88a4 100755 --- a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py +++ b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py @@ -409,21 +409,22 @@ def pivot_data_table( ) -> None: origin_vo = self.data_table_vos[0] field_options = self.options["fields"] - label_fields, column_field, data_field = ( - field_options["label_fields"], - field_options["column_field"], - field_options["data_field"], + labels, column, data = ( + field_options["labels"], + field_options["column"], + field_options["data"], ) - aggregation = field_options.get("aggregation", "sum") + function = self.options.get("function", "sum") raw_df = self._get_data_table(origin_vo, granularity, start, end, vars) - 
self._check_columns(raw_df, label_fields, column_field, data_field) - fill_value = self._set_fill_value_from_df(raw_df, data_field) + self._check_columns(raw_df, labels, column, data) + fill_value = self._set_fill_value_from_df(raw_df, data) pivot_table = self._create_pivot_table( - raw_df, label_fields, column_field, data_field, aggregation, fill_value + raw_df, labels, column, data, function, fill_value ) - pivot_table = self._sort_and_filter_pivot_table(pivot_table, label_fields) + + pivot_table = self._sort_and_filter_pivot_table(pivot_table) self.df = pivot_table def _get_data_table( @@ -639,19 +640,19 @@ def _check_columns( for label_field in label_fields: if label_field not in df_columns: raise ERROR_INVALID_PARAMETER( - key=f"options.PIVOT.label_fields", + key=f"options.PIVOT.labels", reason=f"Invalid key: {label_field}, columns={list(df_columns)}", ) if column_field not in df_columns: raise ERROR_INVALID_PARAMETER( - key=f"options.PIVOT.column_field", + key=f"options.PIVOT.column", reason=f"Invalid key: {column_field}, columns={list(df_columns)}", ) if data_field not in df_columns: raise ERROR_INVALID_PARAMETER( - key=f"options.PIVOT.data_field", + key=f"options.PIVOT.data", reason=f"Invalid key: {data_field}, columns={list(df_columns)}", ) @@ -677,33 +678,42 @@ def _set_keys(self, columns: list) -> None: self.data_keys = [lower_col for upper_col, lower_col in columns if lower_col] @staticmethod - def _validate_manual_column_fields( - manual_column_fields: list, + def _validate_select_fields( + select_fields: list, column_fields: list, ) -> None: - for manual_column_field in manual_column_fields: - if manual_column_field not in column_fields: + for select_field in select_fields: + if select_field not in column_fields: raise ERROR_INVALID_PARAMETER( - key="options.PIVOT.manual_column_fields", - reason=f"Invalid key: {manual_column_field}, columns={column_fields}", + key="options.PIVOT.select", + reason=f"Invalid key: {select_field}, 
columns={column_fields}", ) + @staticmethod + def _validate_function(function: str) -> None: + if function not in ["sum", "mean", "max", "min"]: + raise ERROR_INVALID_PARAMETER( + key="options.PIVOT.function", + reason=f"Invalid function type: {function}", + ) + def _create_pivot_table( self, raw_df: pd.DataFrame, - label_fields: list, - column_field: str, - data_field: str, - aggregation: str, + labels: list, + column: str, + data: str, + function: str, fill_value: Union[int, str], ) -> pd.DataFrame: try: + self._validate_function(function) pivot_table = pd.pivot_table( raw_df, - values=[data_field], - index=label_fields, - columns=[column_field], - aggfunc=aggregation, + values=[data], + index=labels, + columns=[column], + aggfunc=function, fill_value=fill_value, ) pivot_table.reset_index(inplace=True) @@ -713,114 +723,68 @@ def _create_pivot_table( _LOGGER.error(f"[pivot_data_table] pivot error: {e}") raise ERROR_INVALID_PARAMETER(key="options.PIVOT", reason=str(e)) - def _sort_and_filter_pivot_table( - self, - pivot_table: pd.DataFrame, - label_fields: list, - ) -> pd.DataFrame: - column_field_items = list(set(pivot_table.columns) - set(label_fields)) + def _sort_and_filter_pivot_table(self, pivot_table: pd.DataFrame) -> pd.DataFrame: + column_fields = list(set(pivot_table.columns) - set(self.label_keys)) if not self.total_series: - self.total_series = pivot_table[column_field_items].sum(axis=1) + self.total_series = pivot_table[column_fields].sum(axis=1) - if sort := self.options.get("sort"): - pivot_table = self._apply_row_sorting(pivot_table, sort, column_field_items) - pivot_table = self._apply_column_sorting(pivot_table, sort, label_fields) - column_field_items = [ - field for field in pivot_table.columns if field not in label_fields - ] + pivot_table = self._apply_row_sorting(pivot_table) - if manual_column_fields := self.options.get("manual_column_fields"): - pivot_table = self._apply_manual_column_sorting( - pivot_table, manual_column_fields, 
label_fields, column_field_items - ) + if select_fields := self.options.get("select"): + self._validate_select_fields(select_fields, column_fields) + pivot_table = pivot_table[self.label_keys + select_fields] + column_fields = select_fields + + if order_by := self.options.get("order_by"): + self._validate_order_by_type(order_by) + pivot_table = self._apply_order_by(pivot_table, order_by, column_fields) if limit := self.options.get("limit"): - pivot_table = self._apply_limits(pivot_table, limit, label_fields) + pivot_table = pivot_table.iloc[:, : len(self.label_keys) + limit] + + pivot_table["Sub Total"] = self.total_series.loc[pivot_table.index] + self.data_keys = [ + col for col in pivot_table.columns if col not in set(self.label_keys) + ] return pivot_table def _apply_row_sorting( self, pivot_table: pd.DataFrame, - sort: dict, - column_field_items: list, ) -> pd.DataFrame: - if sort_keys := sort.get("rows"): - pivot_table["_total"] = self.total_series - - if len(sort_keys) == 1 and "key" not in sort_keys[0]: - order_state = self._get_sort_order(sort_keys[0]["order"]) - pivot_table = pivot_table.sort_values( - by="_total", ascending=order_state - ) - else: - multi_keys, multi_orders = self._parse_sort_keys( - sort_keys, column_field_items - ) - pivot_table = pivot_table.sort_values( - by=multi_keys, ascending=multi_orders - ) - - pivot_table = pivot_table.drop(columns=["_total"]) + pivot_table["_total"] = self.total_series + pivot_table = pivot_table.sort_values(by="_total", ascending=False) + pivot_table = pivot_table.drop(columns=["_total"]) return pivot_table @staticmethod - def _apply_column_sorting( - pivot_table: pd.DataFrame, - sort: dict, - label_fields: list, - ) -> pd.DataFrame: - if sort.get("column_fields"): - column_sums = pivot_table.drop(columns=label_fields).sum() - sorted_columns = column_sums.sort_values(ascending=False).index.tolist() - pivot_table = pivot_table[label_fields + sorted_columns] - return pivot_table + def 
_validate_order_by_type(order_by: dict) -> None: + order_by_type = order_by.get("type", "key") + if order_by_type not in ["key", "value"]: + raise ERROR_INVALID_PARAMETER( + key="options.PIVOT.order_by.type", + reason=f"Invalid order_by type: {order_by_type}", + ) - def _apply_manual_column_sorting( + def _apply_order_by( self, pivot_table: pd.DataFrame, - manual_column_fields: list, - label_fields: list, - column_field_items: list, - ) -> pd.DataFrame: - self._validate_manual_column_fields(manual_column_fields, column_field_items) - sorted_columns = [] - for column_field in column_field_items: - if column_field in manual_column_fields: - sorted_columns.append(column_field) - pivot_table = pivot_table[label_fields + sorted_columns] - return pivot_table - - @staticmethod - def _apply_limits( - pivot_table: pd.DataFrame, - limit: dict, - label_fields: list, + order_by: dict, + column_fields: list, ) -> pd.DataFrame: - if rows := limit.get("rows"): - pivot_table = pivot_table.head(rows) - if column_fields := limit.get("column_fields"): - pivot_table = pivot_table.iloc[:, : len(label_fields) + column_fields] + order_by_type = order_by.get("type", "key") + desc = order_by.get("desc", False) + if order_by_type == "key": + pivot_table = pivot_table[ + self.label_keys + sorted(column_fields, reverse=desc) + ] + else: + column_sums = pivot_table.drop(columns=self.label_keys).sum() + if desc: + ascending = False + else: + ascending = True + sorted_columns = column_sums.sort_values(ascending=ascending).index.tolist() + pivot_table = pivot_table[self.label_keys + sorted_columns] return pivot_table - - def _parse_sort_keys(self, sort_keys: list, column_field_items: list) -> tuple: - multi_keys = [] - multi_orders = [] - for sort_key_info in sort_keys: - sort_key = sort_key_info.get("key") - sort_order = sort_key_info.get("order") - - if sort_key != "_total" and sort_key not in column_field_items: - raise ERROR_INVALID_PARAMETER( - key="options.PIVOT.sort.keys", - 
reason=f"Invalid key: {sort_key}, columns={column_field_items}", - ) - - order_state = self._get_sort_order(sort_order) - multi_keys.append(sort_key) - multi_orders.append(order_state) - return multi_keys, multi_orders - - @staticmethod - def _get_sort_order(sort_order: str) -> bool: - return sort_order != "desc" From c480e73e9d6c7d686df2ca9efb908bcdf7083bc6 Mon Sep 17 00:00:00 2001 From: seolmin Date: Tue, 10 Dec 2024 15:30:59 +0900 Subject: [PATCH 08/11] fix: remove else condition for improved logic clarity in EVAL Operator Signed-off-by: seolmin --- .../data_transformation_manager.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py index 08e88a4..7bb3c2e 100755 --- a/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py +++ b/src/spaceone/dashboard/manager/data_table_manager/data_transformation_manager.py @@ -261,7 +261,6 @@ def evaluate_data_table( name = expression.get("name") field_type = expression.get("field_type", "DATA") condition = expression.get("condition") - else_condition = expression.get("else") value_expression = expression.get("expression") if name is None: @@ -288,15 +287,6 @@ def evaluate_data_table( value_expression, gv_type_map ) - if self.is_jinja_expression(else_condition): - else_condition, gv_type_map = self.change_global_variables( - else_condition, vars - ) - else_condition = self.remove_jinja_braces(else_condition) - else_condition = self.change_expression_data_type( - else_condition, gv_type_map - ) - template_vars = {} for key in self.data_keys: template_vars[key] = f"`{key}`" @@ -346,11 +336,6 @@ def evaluate_data_table( df.loc[df.query(condition).index, last_key] = df.eval( merged_expr ) - if else_condition: - else_index = list( - set(df.index) - set(df.query(condition).index) - ) - df.loc[else_index, last_key] = 
def add_labels_data_table(
    self,
    granularity: GRANULARITY = "MONTHLY",
    start: str = None,
    end: str = None,
    vars: dict = None,
) -> None:
    """Append constant-valued columns to the source data table.

    options["labels"] maps new column name -> constant value; string
    values are registered as label columns, numeric values as data
    columns, and any other type is rejected. The result is stored on
    ``self.df`` with labels first, then data columns.
    """
    origin_vo = self.data_table_vos[0]
    label_keys = list(origin_vo.labels_info.keys())
    data_keys = list(origin_vo.data_info.keys())

    labels = self.options.get("labels")
    if not labels:
        raise ERROR_REQUIRED_PARAMETER(key="options.ADD_LABELS.labels")

    df = self._get_data_table(origin_vo, granularity, start, end, vars)

    # A new label may not collide with an existing column.
    duplicated = [key for key in labels if key in df.columns]
    if duplicated:
        raise ERROR_INVALID_PARAMETER(
            key="options.ADD_LABELS.labels",
            reason=f"Duplicated key: {duplicated[0]}, columns={list(df.columns)}",
        )

    for key, value in labels.items():
        if isinstance(value, str):
            label_keys.append(key)
        elif isinstance(value, (int, float)):
            data_keys.append(key)
        else:
            raise ERROR_INVALID_PARAMETER_TYPE(
                key="options.ADD_LABELS.labels", type=type(value)
            )
        df[key] = value

    # Reorder so labels precede data columns.
    df = df.reindex(columns=label_keys + data_keys)

    self.label_keys = label_keys
    self.data_keys = data_keys
    self.df = df
def value_mapping_data_table(
    self,
    granularity: GRANULARITY = "MONTHLY",
    start: str = None,
    end: str = None,
    vars: dict = None,
) -> None:
    """Add a derived column (options["name"]) mapped from existing columns.

    Rows are optionally pre-filtered by options["condition"] (a jinja-aware
    pandas query). Within the filtered rows, options["cases"] are applied in
    order ("eq" / "regex" matching against case["key"]); unmatched rows get
    options["else"]. Rows excluded by the filter get "" (LABEL) or 0 (DATA),
    per options["field_type"].
    """
    data_table_vo = self.data_table_vos[0]
    df = self._get_data_table(data_table_vo, granularity, start, end, vars)

    self.label_keys = list(data_table_vo.labels_info.keys())
    self.data_keys = list(data_table_vo.data_info.keys())

    name = self.options["name"]
    field_type = self.options.get("field_type", "LABEL")
    else_value = self.options.get("else", None)

    if condition := self.options.get("condition"):
        if self.is_jinja_expression(condition):
            condition, gv_type_map = self.change_global_variables(condition, vars)
            condition = self.remove_jinja_braces(condition)
            condition = self.change_expression_data_type(condition, gv_type_map)

        filtered_df = df.query(condition).copy()
    else:
        filtered_df = df.copy()

    # Fallback value for rows that pass the filter but match no case.
    filtered_df.loc[:, name] = else_value

    if cases := self.options.get("cases", []):
        for case in cases:
            self._validate_case(case)
            key = case["key"]
            operator = case["operator"]
            value = case["value"]
            match = case["match"]

            if operator == "eq":
                filtered_df.loc[filtered_df[key] == match, name] = value
            elif operator == "regex":
                # BUG FIX: without na=False, NaN cells yield NA in the
                # boolean mask and .loc raises on the NA entries.
                filtered_df.loc[
                    filtered_df[key].str.contains(match, na=False), name
                ] = value

    df.loc[filtered_df.index, name] = filtered_df[name]

    unfiltered_index = df.index.difference(filtered_df.index)
    if field_type == "LABEL":
        df.loc[unfiltered_index, name] = ""
        # BUG FIX: a LABEL-typed result column must be registered as a
        # label key (the original appended it to self.data_keys).
        self.label_keys.append(name)
    elif field_type == "DATA":
        df.loc[unfiltered_index, name] = 0
        self.data_keys.append(name)

    self.df = df
def value_mapping_data_table(
    self,
    granularity: str = "MONTHLY",
    start: str = None,
    end: str = None,
    vars: dict = None,
) -> None:
    """Add a derived column (options["name"]) mapped from existing columns.

    Delegates row filtering to filter_data() (jinja-aware condition) and the
    case mapping to apply_cases(); rows excluded by the filter are defaulted
    by handle_unfiltered_data() according to options["field_type"].
    """
    data_table_vo = self.data_table_vos[0]
    df = self._get_data_table(data_table_vo, granularity, start, end, vars)

    self.label_keys = list(data_table_vo.labels_info.keys())
    self.data_keys = list(data_table_vo.data_info.keys())

    # BUG FIX: the refactor moved the options["name"] lookup into
    # apply_cases() but this method still references `name` below — without
    # this assignment every call raises NameError.
    name = self.options["name"]
    field_type = self.options.get("field_type", "LABEL")

    filtered_df = self.filter_data(df, vars)
    filtered_df = self.apply_cases(filtered_df)

    # Write the mapped values back into the full frame, then default the
    # rows the filter excluded.
    df.loc[filtered_df.index, name] = filtered_df[name]
    self.handle_unfiltered_data(df, filtered_df, name, field_type)

    self.df = df
def handle_unfiltered_data(
    self,
    df: pd.DataFrame,
    filtered_df: pd.DataFrame,
    name: str,
    field_type: str,
):
    """Default the mapped column for rows the filter excluded.

    Rows of ``df`` not present in ``filtered_df`` receive "" (LABEL) or 0
    (DATA) in column ``name``, and ``name`` is registered in the matching
    key list. Unknown field types are left untouched, as before.
    """
    leftover_index = df.index.difference(filtered_df.index)

    # Dispatch table: (fill value, key list) per field type.
    defaults = {
        "LABEL": ("", self.label_keys),
        "DATA": (0, self.data_keys),
    }
    if field_type in defaults:
        fill_value, key_list = defaults[field_type]
        df.loc[leftover_index, name] = fill_value
        key_list.append(name)