Skip to content

Commit

Permalink
Now copy() is cached
Browse files Browse the repository at this point in the history
  • Loading branch information
argenisleon committed Nov 7, 2019
1 parent a2de19e commit aa89406
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 18 deletions.
4 changes: 3 additions & 1 deletion optimus/dataframe/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,10 @@ def copy(input_cols, output_cols=None, columns=None):
columns = list(zip(input_cols, output_cols))

for input_col, output_col in columns:
current_meta = self.get_meta()
df = df.withColumn(output_col, F.col(input_col))
df = df.preserve_meta(self, Actions.COPY.value, output_col)
df = df.set_meta(value=current_meta)
df = df.copy_meta({input_col: output_col})
return df

@add_attr(cols)
Expand Down
36 changes: 30 additions & 6 deletions optimus/dataframe/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,14 @@ def debug(self):
"""
print(self.rdd.toDebugString().decode("ascii"))


@add_method(DataFrame)
def reset(self):

df = self.set_meta("transformations.actions", {})
Profiler.instance.output_columns = {}
return df


@add_method(DataFrame)
def send(self, name=None, infer=True, mismatch=None, stats=True):
"""
Expand All @@ -459,18 +460,41 @@ def send(self, name=None, infer=True, mismatch=None, stats=True):
df.set_name(name)

columns, output = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR,
approx_count=True,
sample=10000,
stats=stats,
format="json",
mismatch=mismatch)
approx_count=True,
sample=10000,
stats=stats,
format="json",
mismatch=mismatch)

if Comm:
Comm.instance.send(output)
else:
raise Exception("Comm is not initialized. Please use comm=True param like Optimus(comm=True)")


@add_method(DataFrame)
def copy_meta(self, old_new_columns):
"""
Shortcut to add transformations to a dataframe
:param self:
:param old_new_columns:
:return:
"""

key = "transformations.actions.copy"

df = self

copy_cols = df.get_meta(key)
if copy_cols is None:
copy_cols = {}
copy_cols.update(old_new_columns)

df = df.set_meta(key, copy_cols)

return df


@add_method(DataFrame)
def rename_meta(self, old_new_columns):
"""
Expand Down
1 change: 0 additions & 1 deletion optimus/helpers/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ class Actions(Enum):
NEST = "nest"
UNNEST = "unnest"
DROP_ROW = "drop_row"
COPY = "copy"

@staticmethod
def list():
Expand Down
35 changes: 25 additions & 10 deletions optimus/profiler/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from optimus.audf import *
from optimus.dataframe.plots.functions import plot_frequency, plot_missing_values, plot_hist
from optimus.helpers.check import is_column_a
from optimus.helpers.check import is_column_a, is_dict, is_list_of_one_element
from optimus.helpers.columns import parse_columns
from optimus.helpers.columns_expression import zeros_agg, count_na_agg, hist_agg, percentile_agg, count_uniques_agg
from optimus.helpers.constants import RELATIVE_ERROR, Actions
Expand Down Expand Up @@ -253,6 +253,7 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT
if is_cached and are_actions:

drop = ["drop"]
copy = ["copy"]

def match_actions_names(_actions):
"""
Expand Down Expand Up @@ -285,20 +286,30 @@ def match_names(_col_names):
_renamed_columns = []
_actions = df.get_meta("transformations.actions")
_rename = _actions.get("rename")

def get_name(_col_name):
c = _actions["rename"].get(_col_name)
# The column has not been rename. Get the actual column name
if c is None:
c = _col_name
return c

if _rename:
for _col_name in _col_names:
# The column name has been changed. Get the new name
c = _actions["rename"].get(_col_name)
# The column has not been rename. Get the actual
if c is None:
c = _col_name
_renamed_columns.append(c)
# if a list
if is_list_of_one_element(_col_names):
for _col_name in _col_names:
# The column name has been changed. Get the new name
_renamed_columns.append(get_name(_col_name))
# if a dict
if is_dict(_col_names):
for _col1, _col2 in _col_names.items():
_renamed_columns.append({get_name(_col1):get_name(_col2)})

else:
_renamed_columns = _col_names
return _renamed_columns

# New columns

new_columns = []

current_col_names = df.cols.names()
Expand All @@ -315,10 +326,14 @@ def match_names(_col_names):
for k, v in actions["rename"].items():
profiler_columns[v] = profiler_columns.pop(k)

# # Drop Keys
# Drop Keys
for col_names in match_actions_names(drop):
profiler_columns.pop(col_names)

# Copy Keys
for source, target in df.get_meta("transformations.actions.copy").items():
profiler_columns[target] = profiler_columns[source]

# Actions applied to current columns

modified_columns = match_actions_names(Actions.list())
Expand Down

0 comments on commit aa89406

Please sign in to comment.