NOTE(review): line breaks, blank context lines and indentation below were reconstructed
from a whitespace-collapsed copy of this patch -- verify the context against the target
files before applying. Some '+' lines were edited during review, so the post-image hashes
on the "index" lines are no longer exact.

diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py
index 823df6b3..1b73e8d9 100644
--- a/optimus/dataframe/columns.py
+++ b/optimus/dataframe/columns.py
@@ -153,8 +153,10 @@ def copy(input_cols, output_cols=None, columns=None):
             columns = list(zip(input_cols, output_cols))
 
         for input_col, output_col in columns:
+            current_meta = self.get_meta()
             df = df.withColumn(output_col, F.col(input_col))
-            df = df.preserve_meta(self, Actions.COPY.value, output_col)
+            df = df.set_meta(value=current_meta)
+            df = df.copy_meta({input_col: output_col})
         return df
 
     @add_attr(cols)
diff --git a/optimus/dataframe/extension.py b/optimus/dataframe/extension.py
index 8b077d3d..cdf803ea 100644
--- a/optimus/dataframe/extension.py
+++ b/optimus/dataframe/extension.py
@@ -436,13 +436,14 @@ def debug(self):
     """
     print(self.rdd.toDebugString().decode("ascii"))
 
+
 @add_method(DataFrame)
 def reset(self):
-
     df = self.set_meta("transformations.actions", {})
     Profiler.instance.output_columns = {}
     return df
 
+
 @add_method(DataFrame)
 def send(self, name=None, infer=True, mismatch=None, stats=True):
     """
@@ -459,11 +460,11 @@ def send(self, name=None, infer=True, mismatch=None, stats=True):
     df.set_name(name)
 
     columns, output = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR,
-                                       approx_count=True,
-                                       sample=10000,
-                                       stats=stats,
-                                       format="json",
-                                       mismatch=mismatch)
+                                                approx_count=True,
+                                                sample=10000,
+                                                stats=stats,
+                                                format="json",
+                                                mismatch=mismatch)
 
     if Comm:
         Comm.instance.send(output)
@@ -471,6 +472,29 @@ def send(self, name=None, infer=True, mismatch=None, stats=True):
         raise Exception("Comm is not initialized. Please use comm=True param like Optimus(comm=True)")
 
 
+@add_method(DataFrame)
+def copy_meta(self, old_new_columns):
+    """
+    Record copied columns under the "transformations.actions.copy" metadata key
+    :param self: Dataframe whose metadata is read and updated
+    :param old_new_columns: dict mapping each source column to its copy, e.g. {input_col: output_col}
+    :return: Dataframe returned by set_meta, carrying the merged copy mapping
+    """
+
+    key = "transformations.actions.copy"
+
+    df = self
+
+    copy_cols = df.get_meta(key)
+    if copy_cols is None:
+        copy_cols = {}
+    copy_cols.update(old_new_columns)
+
+    df = df.set_meta(key, copy_cols)
+
+    return df
+
+
 @add_method(DataFrame)
 def rename_meta(self, old_new_columns):
     """
diff --git a/optimus/helpers/constants.py b/optimus/helpers/constants.py
index 8238fc0d..dd0ff66a 100644
--- a/optimus/helpers/constants.py
+++ b/optimus/helpers/constants.py
@@ -103,7 +103,6 @@ class Actions(Enum):
     NEST = "nest"
     UNNEST = "unnest"
     DROP_ROW = "drop_row"
-    COPY = "copy"
 
     @staticmethod
     def list():
diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py
index 4f0c4739..7a00baf8 100644
--- a/optimus/profiler/profiler.py
+++ b/optimus/profiler/profiler.py
@@ -9,7 +9,7 @@ from optimus.audf import *
 
 
 from optimus.dataframe.plots.functions import plot_frequency, plot_missing_values, plot_hist
-from optimus.helpers.check import is_column_a
+from optimus.helpers.check import is_column_a, is_dict, is_list_of_one_element
 from optimus.helpers.columns import parse_columns
 from optimus.helpers.columns_expression import zeros_agg, count_na_agg, hist_agg, percentile_agg, count_uniques_agg
 from optimus.helpers.constants import RELATIVE_ERROR, Actions
@@ -253,6 +253,7 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT
         if is_cached and are_actions:
 
             drop = ["drop"]
+            copy = ["copy"]
 
             def match_actions_names(_actions):
                 """
@@ -285,20 +286,30 @@ def match_names(_col_names):
                 _renamed_columns = []
                 _actions = df.get_meta("transformations.actions")
                 _rename = _actions.get("rename")
+
+                def get_name(_col_name):
+                    c = _actions["rename"].get(_col_name)
+                    # The column has not been renamed. Get the actual column name
+                    if c is None:
+                        c = _col_name
+                    return c
+
                 if _rename:
-                    for _col_name in _col_names:
-                        # The column name has been changed. Get the new name
-                        c = _actions["rename"].get(_col_name)
-                        # The column has not been rename. Get the actual
-                        if c is None:
-                            c = _col_name
-                        _renamed_columns.append(c)
+                    # if a list -- NOTE(review): confirm is_list_of_one_element accepts multi-element lists
+                    if is_list_of_one_element(_col_names):
+                        for _col_name in _col_names:
+                            # The column name has been changed. Get the new name
+                            _renamed_columns.append(get_name(_col_name))
+                    # if a dict
+                    if is_dict(_col_names):
+                        for _col1, _col2 in _col_names.items():
+                            _renamed_columns.append({get_name(_col1): get_name(_col2)})
+
                 else:
                     _renamed_columns = _col_names
                 return _renamed_columns
 
             # New columns
-
             new_columns = []
 
             current_col_names = df.cols.names()
@@ -315,10 +326,14 @@ def match_names(_col_names):
                 for k, v in actions["rename"].items():
                     profiler_columns[v] = profiler_columns.pop(k)
 
-            # # Drop Keys
+            # Drop Keys
             for col_names in match_actions_names(drop):
                 profiler_columns.pop(col_names)
 
+            # Copy Keys: a copied column reuses its source column's profile; no recorded copies means nothing to do
+            for source, target in (df.get_meta("transformations.actions.copy") or {}).items():
+                profiler_columns[target] = profiler_columns[source]
+
             # Actions applied to current columns
             modified_columns = match_actions_names(Actions.list())
 