From 7a3c1134562e56626be127c07fb4ec10127052dd Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Wed, 6 Nov 2019 01:50:46 -0600 Subject: [PATCH 01/25] Added reset function --- optimus/dataframe/extension.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/optimus/dataframe/extension.py b/optimus/dataframe/extension.py index 0a8d8ddf..8b077d3d 100644 --- a/optimus/dataframe/extension.py +++ b/optimus/dataframe/extension.py @@ -436,6 +436,12 @@ def debug(self): """ print(self.rdd.toDebugString().decode("ascii")) +@add_method(DataFrame) +def reset(self): + + df = self.set_meta("transformations.actions", {}) + Profiler.instance.output_columns = {} + return df @add_method(DataFrame) def send(self, name=None, infer=True, mismatch=None, stats=True): From 7ba0b9c477523de859efcd525e47ac6e62bc1eac Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Wed, 6 Nov 2019 01:55:10 -0600 Subject: [PATCH 02/25] Fix cast when using colums param --- optimus/dataframe/columns.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py index 10b0e948..8853952f 100644 --- a/optimus/dataframe/columns.py +++ b/optimus/dataframe/columns.py @@ -337,9 +337,6 @@ def cast(input_cols=None, dtype=None, output_cols=None, columns=None): :return: Spark DataFrame """ - if dtype is None: - RaiseIt.value_error(dtype, "datatype") - _dtype = [] # Parse params if columns is None: From e7a356ecf02858ebd8a568c57546b43973b964ab Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Wed, 6 Nov 2019 06:48:29 -0600 Subject: [PATCH 03/25] Bump version --- docs/source/conf.py | 2 +- optimus/helpers/functions.py | 4 ++-- optimus/version.py | 2 +- setup.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index 921b28fd..7a0c1ecb 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -60,7 +60,7 @@ # The short X.Y version. version = '2.2' # The full version, including alpha/beta/rc tags. -release = "2.2.26" +release = "2.2.27" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/optimus/helpers/functions.py b/optimus/helpers/functions.py index 75e88c3b..15660260 100644 --- a/optimus/helpers/functions.py +++ b/optimus/helpers/functions.py @@ -297,8 +297,8 @@ def create_buckets(lower_bound, upper_bound, bins): buckets.append({"lower": low, "upper": high, "bucket": i}) low = high - # ensure that the upper bound is exactly the higher value. - # Because floating point calculation it can miss the upper bound in the final sum + # ensure that the upper bound is exactly the higher value. + # Because floating point calculation it can miss the upper bound in the final sum buckets[bins - 1]["upper"] = upper_bound return buckets diff --git a/optimus/version.py b/optimus/version.py index 360247b8..e4798d9d 100644 --- a/optimus/version.py +++ b/optimus/version.py @@ -5,5 +5,5 @@ def _safe_int(string): return string -__version__ = '2.2.26' +__version__ = '2.2.27' VERSION = tuple(_safe_int(x) for x in __version__.split('.')) diff --git a/setup.py b/setup.py index cf492401..38ab1d36 100644 --- a/setup.py +++ b/setup.py @@ -59,7 +59,7 @@ def readme(): author='Favio Vazquez and Argenis Leon', author_email='argenisleon@gmail.com', url='https://github.com/ironmussa/Optimus/', - download_url='https://github.com/ironmussa/Optimus/archive/2.2.26.tar.gz', + download_url='https://github.com/ironmussa/Optimus/archive/2.2.27.tar.gz', description=('Optimus is the missing framework for cleaning and pre-processing data in a distributed fashion with ' 'pyspark.'), long_description=readme(), From 3878a12529231c38d23c6c12c912b8ca82128349 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Wed, 6 Nov 2019 08:59:02 -0600 Subject: [PATCH 04/25] Added support for custom comm object --- optimus/bumblebee.py | 8 +++++--- optimus/optimus.py | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/optimus/bumblebee.py b/optimus/bumblebee.py index bdd785ac..dfc27330 100644 --- a/optimus/bumblebee.py +++ b/optimus/bumblebee.py @@ -28,18 +28,20 @@ class Comm: Send encrypted message to the Bumblebee """ - def __init__(self, queue_name=None): + def __init__(self, queue_name=None, key=None): # If queue_name was not given try lo load from file if not generate one if queue_name is None: self.queue_name = save_config_key("bumblebee.ini", "DEFAULT", "QueueName", str(uuid.uuid4())) + else: + self.queue_name = queue_name + if key is None: # key is generated as byte convert to base64 so we can saved it in the config file key = Fernet.generate_key() self.key = save_config_key("bumblebee.ini", "DEFAULT", "Key", key.decode()) - else: - self.queue_name = queue_name + self.key = key keys_link = " here".format(FULL_DOMAIN=FULL_DOMAIN, SESSION=self.queue_name, KEY=self.key) diff --git a/optimus/optimus.py b/optimus/optimus.py index f1ca24c0..132f7362 100644 --- a/optimus/optimus.py +++ b/optimus/optimus.py @@ -81,8 +81,8 @@ def __init__(self, session=None, master="local[*]", app_name="optimus", checkpoi if comm is True: Comm.instance = Comm() - # else: - # Comm.instance = comm + else: + Comm.instance = comm if session is None: # Creating Spark Session From a7255d01059607892ae9443bbff743c0b9716155 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Wed, 6 Nov 2019 09:49:01 -0600 Subject: [PATCH 05/25] string_to_index now accepts multiple columns --- optimus/dataframe/columns.py | 9 ++++++--- optimus/ml/feature.py | 18 ++++++++++-------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py index 8853952f..823df6b3 100644 --- a/optimus/dataframe/columns.py +++ b/optimus/dataframe/columns.py @@ -2117,17 +2117,20 @@ def join_all(_dfs): return join_all(combined) @add_attr(cols) - def string_to_index(input_cols, output_cols=None): + def string_to_index(input_cols=None, output_cols=None, columns=None): """ Encodes a string column of labels to a column of label indices :param input_cols: :param output_cols: + :param columns: :return: """ df = self - input_cols = parse_columns(df, input_cols) - # output_cols = get_output_cols(input_cols, output_cols) + if columns is None: + input_cols = parse_columns(df, input_cols) + else: + input_cols, output_cols = zip(*columns) df = ml_string_to_index(df, input_cols, output_cols) diff --git a/optimus/ml/feature.py b/optimus/ml/feature.py index df1e827d..481da052 100644 --- a/optimus/ml/feature.py +++ b/optimus/ml/feature.py @@ -28,26 +28,28 @@ def n_gram(df, input_col, n=2): return df_model, tfidf_model -def string_to_index(df, input_cols, output_col=None, **kargs): +def string_to_index(df, input_cols, output_cols=None, **kargs): """ Maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. :param df: Dataframe to be transformed :param input_cols: Columns to be indexed. - :param output_col:Column where the ouput is going to be saved + :param output_cols:Column where the ouput is going to be saved :return: Dataframe with indexed columns. """ - input_cols = parse_columns(df, input_cols) - if output_col is None: - output_col = name_col(input_cols, "index_to_string") + # input_cols = parse_columns(df, input_cols) + if output_cols is None: - indexers = [StringIndexer(inputCol=input_col, outputCol=output_col, **kargs).fit(df) for input_col - in list(set(input_cols))] + output_cols = [name_col(input_col, "index_to_string") for input_col in input_cols] + print(output_cols) + + indexers = [StringIndexer(inputCol=input_col, outputCol=output_col, **kargs).fit(df) for input_col, output_col + in zip(list(set(input_cols)), list(set(output_cols)))] pipeline = Pipeline(stages=indexers) df = pipeline.fit(df).transform(df) - + # df.show() return df From a2de19ecea1fbffb191324e0aa7cbd94ef68e9e8 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Wed, 6 Nov 2019 16:02:05 -0600 Subject: [PATCH 06/25] Fix: Rename actions was firing a profling --- optimus/profiler/profiler.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index 6b36c1a4..4f0c4739 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -247,9 +247,6 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT # So process the dataframe's metadata to be sure which columns need to be profiled is_cached = len(self.output_columns) > 0 actions = df.get_meta("transformations.actions") - # print(actions) - # are_actions = None - # actions = actions.get("actions") are_actions = actions is not None and len(actions) > 0 # Process actions to check if any column must be processed @@ -264,13 +261,13 @@ def match_actions_names(_actions): :return: """ - _actions = df.get_meta("transformations.actions") + _actions_json = df.get_meta("transformations.actions") modified = [] for action in _actions: - if _actions.get(action): + if _actions_json.get(action): # Check if was renamed - col = _actions.get(action) + col = _actions_json.get(action) if len(match_names(col)) == 0: _result = col else: @@ -325,6 +322,7 @@ def match_names(_col_names): # Actions applied to current columns modified_columns = match_actions_names(Actions.list()) + # print(modified_columns, new_columns) calculate_columns = modified_columns + new_columns # Remove duplicated. @@ -332,9 +330,10 @@ def match_names(_col_names): elif is_cached and not are_actions: calculate_columns = None - else: + elif not is_cached: calculate_columns = columns + # print ("calculate_columns",calculate_columns) # Get the stats for all the columns if stats is True: # Are there column to process? @@ -386,7 +385,6 @@ def match_names(_col_names): result = output_columns self.output_columns = output_columns - # print(result) df = df.set_meta("transformations.actions", {}) return col_names, result From aa89406dc408c441d375ab7516585bee834f370b Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Wed, 6 Nov 2019 21:39:47 -0600 Subject: [PATCH 07/25] Now copy() is cached --- optimus/dataframe/columns.py | 4 +++- optimus/dataframe/extension.py | 36 ++++++++++++++++++++++++++++------ optimus/helpers/constants.py | 1 - optimus/profiler/profiler.py | 35 +++++++++++++++++++++++---------- 4 files changed, 58 insertions(+), 18 deletions(-) diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py index 823df6b3..1b73e8d9 100644 --- a/optimus/dataframe/columns.py +++ b/optimus/dataframe/columns.py @@ -153,8 +153,10 @@ def copy(input_cols, output_cols=None, columns=None): columns = list(zip(input_cols, output_cols)) for input_col, output_col in columns: + current_meta = self.get_meta() df = df.withColumn(output_col, F.col(input_col)) - df = df.preserve_meta(self, Actions.COPY.value, output_col) + df = df.set_meta(value=current_meta) + df = df.copy_meta({input_col: output_col}) return df @add_attr(cols) diff --git a/optimus/dataframe/extension.py b/optimus/dataframe/extension.py index 8b077d3d..cdf803ea 100644 --- a/optimus/dataframe/extension.py +++ b/optimus/dataframe/extension.py @@ -436,13 +436,14 @@ def debug(self): """ print(self.rdd.toDebugString().decode("ascii")) + @add_method(DataFrame) def reset(self): - df = self.set_meta("transformations.actions", {}) Profiler.instance.output_columns = {} return df + @add_method(DataFrame) def send(self, name=None, infer=True, mismatch=None, stats=True): """ @@ -459,11 +460,11 @@ def send(self, name=None, infer=True, mismatch=None, stats=True): df.set_name(name) columns, output = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR, - approx_count=True, - sample=10000, - stats=stats, - format="json", - mismatch=mismatch) + approx_count=True, + sample=10000, + stats=stats, + format="json", + mismatch=mismatch) if Comm: Comm.instance.send(output) @@ -471,6 +472,29 @@ def send(self, name=None, infer=True, mismatch=None, stats=True): raise Exception("Comm is not initialized. Please use comm=True param like Optimus(comm=True)") +@add_method(DataFrame) +def copy_meta(self, old_new_columns): + """ + Shortcut to add transformations to a dataframe + :param self: + :param old_new_columns: + :return: + """ + + key = "transformations.actions.copy" + + df = self + + copy_cols = df.get_meta(key) + if copy_cols is None: + copy_cols = {} + copy_cols.update(old_new_columns) + + df = df.set_meta(key, copy_cols) + + return df + + @add_method(DataFrame) def rename_meta(self, old_new_columns): """ diff --git a/optimus/helpers/constants.py b/optimus/helpers/constants.py index 8238fc0d..dd0ff66a 100644 --- a/optimus/helpers/constants.py +++ b/optimus/helpers/constants.py @@ -103,7 +103,6 @@ class Actions(Enum): NEST = "nest" UNNEST = "unnest" DROP_ROW = "drop_row" - COPY = "copy" @staticmethod def list(): diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index 4f0c4739..7a00baf8 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -9,7 +9,7 @@ from optimus.audf import * from optimus.dataframe.plots.functions import plot_frequency, plot_missing_values, plot_hist -from optimus.helpers.check import is_column_a +from optimus.helpers.check import is_column_a, is_dict, is_list_of_one_element from optimus.helpers.columns import parse_columns from optimus.helpers.columns_expression import zeros_agg, count_na_agg, hist_agg, percentile_agg, count_uniques_agg from optimus.helpers.constants import RELATIVE_ERROR, Actions @@ -253,6 +253,7 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT if is_cached and are_actions: drop = ["drop"] + copy = ["copy"] def match_actions_names(_actions): """ @@ -285,20 +286,30 @@ def match_names(_col_names): _renamed_columns = [] _actions = df.get_meta("transformations.actions") _rename = _actions.get("rename") + + def get_name(_col_name): + c = _actions["rename"].get(_col_name) + # The column has not been rename. Get the actual column name + if c is None: + c = _col_name + return c + if _rename: - for _col_name in _col_names: - # The column name has been changed. Get the new name - c = _actions["rename"].get(_col_name) - # The column has not been rename. Get the actual - if c is None: - c = _col_name - _renamed_columns.append(c) + # if a list + if is_list_of_one_element(_col_names): + for _col_name in _col_names: + # The column name has been changed. Get the new name + _renamed_columns.append(get_name(_col_name)) + # if a dict + if is_dict(_col_names): + for _col1, _col2 in _col_names.items(): + _renamed_columns.append({get_name(_col1):get_name(_col2)}) + else: _renamed_columns = _col_names return _renamed_columns # New columns - new_columns = [] current_col_names = df.cols.names() @@ -315,10 +326,14 @@ def match_names(_col_names): for k, v in actions["rename"].items(): profiler_columns[v] = profiler_columns.pop(k) - # # Drop Keys + # Drop Keys for col_names in match_actions_names(drop): profiler_columns.pop(col_names) + # Copy Keys + for source, target in df.get_meta("transformations.actions.copy").items(): + profiler_columns[target] = profiler_columns[source] + # Actions applied to current columns modified_columns = match_actions_names(Actions.list()) From 31296e52af167ea0f4a23e409c40f14127b7789d Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Wed, 6 Nov 2019 21:40:37 -0600 Subject: [PATCH 08/25] Docs for Pip install from git repo --- README_for_devs.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README_for_devs.md b/README_for_devs.md index 3330ebcc..58b721f4 100644 --- a/README_for_devs.md +++ b/README_for_devs.md @@ -81,3 +81,7 @@ This will prompt your test.pypi.org credentials Now install from test.pypi.org `!pip install --index-url https://test.pypi.org/simple optimuspyspark` + +### Installing from github + +` pip3 install --upgrade --no-deps --force-reinstall git+https://github.com/ironmussa/Optimus.git@develop` \ No newline at end of file From 127b0b9db73193e3a7513d2a7b8d15033251ed43 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Thu, 7 Nov 2019 08:53:34 -0600 Subject: [PATCH 09/25] Fix copy operation in the profiling --- optimus/profiler/profiler.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index 7a00baf8..09bdf605 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -9,7 +9,7 @@ from optimus.audf import * from optimus.dataframe.plots.functions import plot_frequency, plot_missing_values, plot_hist -from optimus.helpers.check import is_column_a, is_dict, is_list_of_one_element +from optimus.helpers.check import is_column_a, is_dict, is_list_of_str from optimus.helpers.columns import parse_columns from optimus.helpers.columns_expression import zeros_agg, count_na_agg, hist_agg, percentile_agg, count_uniques_agg from optimus.helpers.constants import RELATIVE_ERROR, Actions @@ -253,7 +253,6 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT if is_cached and are_actions: drop = ["drop"] - copy = ["copy"] def match_actions_names(_actions): """ @@ -269,15 +268,15 @@ def match_actions_names(_actions): if _actions_json.get(action): # Check if was renamed col = _actions_json.get(action) - if len(match_names(col)) == 0: + if len(match_renames(col)) == 0: _result = col else: - _result = match_names(col) + _result = match_renames(col) modified = modified + _result return modified - def match_names(_col_names): + def match_renames(_col_names): """ Get a list fo columns and return the renamed version. :param _col_names: @@ -288,7 +287,7 @@ def match_names(_col_names): _rename = _actions.get("rename") def get_name(_col_name): - c = _actions["rename"].get(_col_name) + c = _rename.get(_col_name) # The column has not been rename. Get the actual column name if c is None: c = _col_name @@ -296,14 +295,14 @@ def get_name(_col_name): if _rename: # if a list - if is_list_of_one_element(_col_names): + if is_list_of_str(_col_names): for _col_name in _col_names: # The column name has been changed. Get the new name _renamed_columns.append(get_name(_col_name)) # if a dict if is_dict(_col_names): for _col1, _col2 in _col_names.items(): - _renamed_columns.append({get_name(_col1):get_name(_col2)}) + _renamed_columns.append({get_name(_col1): get_name(_col2)}) else: _renamed_columns = _col_names @@ -313,7 +312,7 @@ def get_name(_col_name): new_columns = [] current_col_names = df.cols.names() - renamed_cols = match_names(df.get_meta("transformations.columns")) + renamed_cols = match_renames(df.get_meta("transformations.columns")) for current_col_name in current_col_names: if current_col_name not in renamed_cols: new_columns.append(current_col_name) @@ -331,13 +330,16 @@ def get_name(_col_name): profiler_columns.pop(col_names) # Copy Keys - for source, target in df.get_meta("transformations.actions.copy").items(): - profiler_columns[target] = profiler_columns[source] + copy_columns = df.get_meta("transformations.actions.copy") + if copy_columns is not None: + for source, target in copy_columns.items(): + profiler_columns[target] = profiler_columns[source] + # Check is a new column is a copied column + new_columns = list(set(new_columns) - set(copy_columns.values())) # Actions applied to current columns modified_columns = match_actions_names(Actions.list()) - # print(modified_columns, new_columns) calculate_columns = modified_columns + new_columns # Remove duplicated. @@ -391,6 +393,7 @@ def get_name(_col_name): assign(output_columns, "sample", sample, dict) df = df.set_meta(value={}) + print(df.cols.names()) df = df.columns_meta(df.cols.names()) col_names = output_columns["columns"].keys() From 970cc268fd86f62eccd1d9138c561260defd9289 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Thu, 7 Nov 2019 15:25:05 -0600 Subject: [PATCH 10/25] Fix profiling name on copy and rename --- optimus/profiler/profiler.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index 09bdf605..f5b58f9f 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -324,6 +324,7 @@ def get_name(_col_name): if rename: for k, v in actions["rename"].items(): profiler_columns[v] = profiler_columns.pop(k) + profiler_columns[v]["name"] = v # Drop Keys for col_names in match_actions_names(drop): @@ -333,7 +334,8 @@ def get_name(_col_name): copy_columns = df.get_meta("transformations.actions.copy") if copy_columns is not None: for source, target in copy_columns.items(): - profiler_columns[target] = profiler_columns[source] + profiler_columns[target] = profiler_columns[source].copy() + profiler_columns[target]["name"]= target # Check is a new column is a copied column new_columns = list(set(new_columns) - set(copy_columns.values())) @@ -393,7 +395,6 @@ def get_name(_col_name): assign(output_columns, "sample", sample, dict) df = df.set_meta(value={}) - print(df.cols.names()) df = df.columns_meta(df.cols.names()) col_names = output_columns["columns"].keys() From 9358f4da8c6c225685365c13c17991c6316e7c68 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Thu, 7 Nov 2019 18:12:33 -0600 Subject: [PATCH 11/25] Preserve profiler column order --- optimus/profiler/profiler.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index f5b58f9f..f1748d6e 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -1,5 +1,6 @@ import configparser import copy +from collections import OrderedDict import humanize import imgkit @@ -335,7 +336,7 @@ def get_name(_col_name): if copy_columns is not None: for source, target in copy_columns.items(): profiler_columns[target] = profiler_columns[source].copy() - profiler_columns[target]["name"]= target + profiler_columns[target]["name"] = target # Check is a new column is a copied column new_columns = list(set(new_columns) - set(copy_columns.values())) @@ -349,7 +350,8 @@ def get_name(_col_name): elif is_cached and not are_actions: calculate_columns = None - elif not is_cached: + # elif not is_cached: + else: calculate_columns = columns # print ("calculate_columns",calculate_columns) @@ -362,13 +364,16 @@ def get_name(_col_name): self.cols_count = cols_count = len(df.columns) output_columns = self.columns_stats(df, calculate_columns, buckets, infer, relative_error, approx_count, mismatch) - # Reset metadata # Update last profiling info - # if update_profiler: # Merge old and current profiling if is_cached: output_columns["columns"].update(self.output_columns["columns"]) + actual_columns = output_columns["columns"] + # Order columns + output_columns["columns"] = OrderedDict( + {_cols_name: actual_columns[_cols_name] for _cols_name in df.cols.names() if + _cols_name in list(actual_columns.keys())}) assign(output_columns, "name", df.get_name(), dict) assign(output_columns, "file_name", df.get_meta("file_name"), dict) @@ -483,6 +488,7 @@ def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=REL assign(columns_info, "columns." + col_name, col_info, dict) assign(col_info, "id", df.cols.get_meta(col_name, "id")) + return columns_info @staticmethod From b2fe7d35cc657eed597ab1583323e9c679b109b1 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Fri, 8 Nov 2019 09:56:26 -0600 Subject: [PATCH 12/25] Fix order output columns --- optimus/profiler/profiler.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index f1748d6e..e67e8bcd 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -369,11 +369,6 @@ def get_name(_col_name): # Merge old and current profiling if is_cached: output_columns["columns"].update(self.output_columns["columns"]) - actual_columns = output_columns["columns"] - # Order columns - output_columns["columns"] = OrderedDict( - {_cols_name: actual_columns[_cols_name] for _cols_name in df.cols.names() if - _cols_name in list(actual_columns.keys())}) assign(output_columns, "name", df.get_name(), dict) assign(output_columns, "file_name", df.get_meta("file_name"), dict) @@ -399,6 +394,12 @@ def get_name(_col_name): assign(output_columns, "sample", sample, dict) + actual_columns = output_columns["columns"] + # Order columns + output_columns["columns"] = dict(OrderedDict( + {_cols_name: actual_columns[_cols_name] for _cols_name in df.cols.names() if + _cols_name in list(actual_columns.keys())})) + df = df.set_meta(value={}) df = df.columns_meta(df.cols.names()) From 00b6ba29eb7d7496a87522d322fa76fd6b769e49 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Fri, 8 Nov 2019 13:03:40 -0600 Subject: [PATCH 13/25] Fixed values_to_cols() metadata --- optimus/dataframe/columns.py | 9 +++++++-- optimus/helpers/constants.py | 1 + 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py index 1b73e8d9..5a1598e5 100644 --- a/optimus/dataframe/columns.py +++ b/optimus/dataframe/columns.py @@ -2113,10 +2113,15 @@ def join_all(_dfs): # names = before.cols.names(keys, invert=True) # print(names) + pivotDF = pivotDF.preserve_meta(self) df = pivotDF.toDF(*names).cols.fill_na(new_names, 0) - # df.table() + df = df.preserve_meta(self, Actions.VALUES_TO_COLS.value, new_names) + combined.append(df) - return join_all(combined) + + df = join_all(combined) + + return df @add_attr(cols) def string_to_index(input_cols=None, output_cols=None, columns=None): diff --git a/optimus/helpers/constants.py b/optimus/helpers/constants.py index dd0ff66a..abc624ce 100644 --- a/optimus/helpers/constants.py +++ b/optimus/helpers/constants.py @@ -103,6 +103,7 @@ class Actions(Enum): NEST = "nest" UNNEST = "unnest" DROP_ROW = "drop_row" + VALUES_TO_COLS = "values_to_cols" @staticmethod def list(): From 956727fb6e22103934671fedde0722949d720e01 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sat, 9 Nov 2019 13:50:16 -0600 Subject: [PATCH 14/25] Quality improvements --- optimus/dataframe/columns.py | 10 ++++++++-- optimus/dataframe/rows.py | 2 +- optimus/helpers/parser.py | 3 ++- optimus/ml/models.py | 10 +++++----- optimus/profiler/functions.py | 3 ++- 5 files changed, 18 insertions(+), 10 deletions(-) diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py index 5a1598e5..3cdd1d13 100644 --- a/optimus/dataframe/columns.py +++ b/optimus/dataframe/columns.py @@ -1620,11 +1620,17 @@ def hist(columns, buckets=20): result = agg_exprs(columns, hist_agg, self, buckets) # TODO: for some reason casting to int in the exprs do not work. Casting Here. A Spark bug? # Example - # Column < b'array(map(count, CAST(sum(CASE WHEN ((rank >= 7) AND (rank < 7.75)) THEN 1 ELSE 0 END) AS INT), lower, 7, upper, 7.75) AS `hist_agg_rank_0`, map(count, CAST(sum(CASE WHEN ((rank >= 7.75) AND (rank < 8.5)) THEN 1 ELSE 0 END) AS INT), lower, 7.75, upper, 8.5) AS `hist_agg_rank_1`, map(count, CAST(sum(CASE WHEN ((rank >= 8.5) AND (rank < 9.25)) THEN 1 ELSE 0 END) AS INT), lower, 8.5, upper, 9.25) AS `hist_agg_rank_2`, map(count, CAST(sum(CASE WHEN ((rank >= 9.25) AND (rank < 10)) THEN 1 ELSE 0 END) AS INT), lower, 9.25, upper, 10) AS `hist_agg_rank_3`) AS `histrank`' > + # Column < b'array(map(count, CAST(sum(CASE WHEN ((rank >= 7) AND (rank < 7.75)) THEN 1 ELSE 0 END) AS INT), + # lower, 7, upper, 7.75) AS `hist_agg_rank_0`, map(count, CAST(sum(CASE WHEN ((rank >= 7.75) AND (rank < 8.5)) + # THEN 1 ELSE 0 END) AS INT), lower, 7.75, upper, 8.5) AS `hist_agg_rank_1`, map(count, + # CAST(sum(CASE WHEN ((rank >= 8.5) AND (rank < 9.25)) THEN 1 ELSE 0 END) AS INT), lower, 8.5, upper, 9.25) + # AS `hist_agg_rank_2`, map(count, CAST(sum(CASE WHEN ((rank >= 9.25) AND (rank < 10)) + # THEN 1 ELSE 0 END) AS INT), lower, 9.25, upper, 10) AS `hist_agg_rank_3`) AS `histrank`' > return result - # TODO: In tests this code run faster than using agg_exprs when run over all the columns. Not when running over columns individually + # TODO: In tests this code run faster than using agg_exprs when run over all the columns. + # Not when running over columns individually # columns = parse_columns(self, columns) # df = self # for col_name in columns: diff --git a/optimus/dataframe/rows.py b/optimus/dataframe/rows.py index 2abe1de6..1ef340d6 100644 --- a/optimus/dataframe/rows.py +++ b/optimus/dataframe/rows.py @@ -12,7 +12,7 @@ from optimus.helpers.columns import parse_columns, validate_columns_names from optimus.helpers.constants import Actions from optimus.helpers.converter import one_list_to_val -from optimus.helpers.decorators import * +from optimus.helpers.decorators import add_attr from optimus.helpers.functions import append as append_df from optimus.helpers.raiseit import RaiseIt diff --git a/optimus/helpers/parser.py b/optimus/helpers/parser.py index 6c58d2f0..6e4c39df 100644 --- a/optimus/helpers/parser.py +++ b/optimus/helpers/parser.py @@ -48,7 +48,8 @@ def parse_col_names_funcs_to_keys(data): _col_name = k[len(temp_func_name):] if is_nan(v): logger.print( - "'{FUNCTION}' function in '{COL_NAME}' column is returning 'nan'. Is that what you expected?. Seems that '{COL_NAME}' has 'nan' values".format( + "'{FUNCTION}' function in '{COL_NAME}' column is returning 'nan'. Is that what you expected?. " + "Seems that '{COL_NAME}' has 'nan' values".format( FUNCTION=f, COL_NAME=_col_name)) # If the value is numeric only get 5 decimals diff --git a/optimus/ml/models.py b/optimus/ml/models.py index ed1fe613..dd901151 100644 --- a/optimus/ml/models.py +++ b/optimus/ml/models.py @@ -1,6 +1,6 @@ from pyspark.ml import feature, classification from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier, GBTClassifier -from pyspark.sql.functions import * +from pyspark.sql.functions import F from pysparkling import * from pysparkling.ml import H2OAutoML, H2ODeepLearning, H2OXGBoost, H2OGBM @@ -139,7 +139,7 @@ def h2o_automl(df, label, columns, **kwargs): model = automl.fit(df_va) df_raw = model.transform(df_va) - df_pred = df_raw.withColumn("prediction", when(df_raw.prediction_output["value"] > 0.5, 1.0).otherwise(0.0)) + df_pred = df_raw.withColumn("prediction", F.when(df_raw.prediction_output["value"] > 0.5, 1.0).otherwise(0.0)) return df_pred, model @@ -161,7 +161,7 @@ def h2o_deeplearning(df, label, columns, **kwargs): model = h2o_deeplearning.fit(df_va) df_raw = model.transform(df_va) - df_pred = df_raw.withColumn("prediction", when(df_raw.prediction_output["p1"] > 0.5, 1.0).otherwise(0.0)) + df_pred = df_raw.withColumn("prediction", F.when(df_raw.prediction_output["p1"] > 0.5, 1.0).otherwise(0.0)) return df_pred, model @@ -179,7 +179,7 @@ def h2o_xgboost(df, label, columns, **kwargs): model = h2o_xgboost.fit(df_va) df_raw = model.transform(df_va) - df_pred = df_raw.withColumn("prediction", when(df_raw.prediction_output["p1"] > 0.5, 1.0).otherwise(0.0)) + df_pred = df_raw.withColumn("prediction", F.when(df_raw.prediction_output["p1"] > 0.5, 1.0).otherwise(0.0)) return df_pred, model @@ -198,6 +198,6 @@ def h2o_gbm(df, label, columns, **kwargs): model = h2o_gbm.fit(df_va) df_raw = model.transform(df_va) - df_pred = df_raw.withColumn("prediction", when(df_raw.prediction_output["p1"] > 0.5, 1.0).otherwise(0.0)) + df_pred = df_raw.withColumn("prediction", F.when(df_raw.prediction_output["p1"] > 0.5, 1.0).otherwise(0.0)) return df_pred, model diff --git a/optimus/profiler/functions.py b/optimus/profiler/functions.py index 3550c415..09927fe5 100644 --- a/optimus/profiler/functions.py +++ b/optimus/profiler/functions.py @@ -2,7 +2,8 @@ import math -from optimus.helpers.constants import * +from optimus.helpers.constants import SPARK_DTYPES_TO_PROFILER, ProfilerDataTypes, PROFILER_COLUMN_TYPES, \ + CONFIDENCE_LEVEL_CONSTANT from optimus.helpers.json import json_converter From 3d7e984f7f7ed4045467d399f6e98809c46e0fda Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sat, 9 Nov 2019 17:13:50 -0600 Subject: [PATCH 15/25] Add nullvalue param --- optimus/io/load.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/optimus/io/load.py b/optimus/io/load.py index fddc7e92..add4aadf 100644 --- a/optimus/io/load.py +++ b/optimus/io/load.py @@ -58,7 +58,7 @@ def tsv(path, header='true', infer_schema='true', charset="UTF-8", *args, **kwar return df @staticmethod - def csv(path, sep=',', header='true', infer_schema='true', charset="UTF-8", *args, **kwargs): + def csv(path, sep=',', header='true', infer_schema='true', charset="UTF-8", null_value="None", *args, **kwargs): """ Return a dataframe from a csv file. It is the same read.csv Spark function with some predefined params @@ -68,6 +68,7 @@ def csv(path, sep=',', header='true', infer_schema='true', charset="UTF-8", *arg :param header: tell the function whether dataset has a header row. 'true' default. :param infer_schema: infers the input schema automatically from data. :param charset: Charset file encoding + :param null_value: value to convert the string to a None value It requires one extra pass over the data. 'true' default. :return dataFrame @@ -80,6 +81,7 @@ def csv(path, sep=',', header='true', infer_schema='true', charset="UTF-8", *arg .options(mode="DROPMALFORMED") .options(delimiter=sep) .options(inferSchema=infer_schema) + .options(nullValue=null_value) .option("charset", charset) .csv(file, *args, **kwargs)) From 1df746bc869768271483c3145fe528845ed93cc5 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sat, 9 Nov 2019 17:14:03 -0600 Subject: [PATCH 16/25] Fix imports --- optimus/ml/models.py | 2 +- optimus/profiler/profiler.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/optimus/ml/models.py b/optimus/ml/models.py index dd901151..ac2bbf6c 100644 --- a/optimus/ml/models.py +++ b/optimus/ml/models.py @@ -1,6 +1,6 @@ from pyspark.ml import feature, classification from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier, GBTClassifier -from pyspark.sql.functions import F +from pyspark.sql import functions as F from pysparkling import * from pysparkling.ml import H2OAutoML, H2ODeepLearning, H2OXGBoost, H2OGBM diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index e67e8bcd..7ecbd634 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -13,15 +13,14 @@ from optimus.helpers.check import is_column_a, is_dict, is_list_of_str from optimus.helpers.columns import parse_columns from optimus.helpers.columns_expression import zeros_agg, count_na_agg, hist_agg, percentile_agg, count_uniques_agg -from optimus.helpers.constants import RELATIVE_ERROR, Actions +from optimus.helpers.constants import RELATIVE_ERROR, Actions, PYSPARK_NUMERIC_TYPES from optimus.helpers.decorators import time_it from optimus.helpers.functions import absolute_path from optimus.helpers.json import json_converter from optimus.helpers.logger import logger from optimus.helpers.output import print_html from optimus.helpers.raiseit import RaiseIt -from optimus.profiler.functions import fill_missing_col_types, \ - write_json, write_html, PYSPARK_NUMERIC_TYPES +from optimus.profiler.functions import fill_missing_col_types, write_json, write_html from optimus.profiler.templates.html import FOOTER, HEADER MAX_BUCKETS = 33 From d2653da58b0df6afedc51e9b4448a3c725829489 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sat, 9 Nov 2019 17:13:50 -0600 Subject: [PATCH 17/25] Add nullvalue param --- optimus/io/load.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/optimus/io/load.py b/optimus/io/load.py index fddc7e92..add4aadf 100644 --- a/optimus/io/load.py +++ b/optimus/io/load.py @@ -58,7 +58,7 @@ def tsv(path, header='true', infer_schema='true', charset="UTF-8", *args, **kwar return df @staticmethod - def csv(path, sep=',', header='true', infer_schema='true', charset="UTF-8", *args, **kwargs): + def csv(path, sep=',', header='true', infer_schema='true', charset="UTF-8", null_value="None", *args, **kwargs): """ Return a dataframe from a csv file. It is the same read.csv Spark function with some predefined params @@ -68,6 +68,7 @@ def csv(path, sep=',', header='true', infer_schema='true', charset="UTF-8", *arg :param header: tell the function whether dataset has a header row. 'true' default. :param infer_schema: infers the input schema automatically from data. :param charset: Charset file encoding + :param null_value: value to convert the string to a None value It requires one extra pass over the data. 'true' default. :return dataFrame @@ -80,6 +81,7 @@ def csv(path, sep=',', header='true', infer_schema='true', charset="UTF-8", *arg .options(mode="DROPMALFORMED") .options(delimiter=sep) .options(inferSchema=infer_schema) + .options(nullValue=null_value) .option("charset", charset) .csv(file, *args, **kwargs)) From 38efa3131d33f51c0e996805e28c7cbe129ae3a9 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sat, 9 Nov 2019 17:34:51 -0600 Subject: [PATCH 18/25] Fix in returning html in ipython terminal --- optimus/dataframe/extension.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/optimus/dataframe/extension.py b/optimus/dataframe/extension.py index cdf803ea..ad4772fa 100644 --- a/optimus/dataframe/extension.py +++ b/optimus/dataframe/extension.py @@ -399,10 +399,27 @@ def table_html(self, limit=10, columns=None, title=None, full=False, truncate=Tr return output +def isnotebook(): + """ + Detect you are in a notebook or in a terminal + :return: + """ + try: + shell = get_ipython().__class__.__name__ + if shell == 'ZMQInteractiveShell': + return True # Jupyter notebook or qtconsole + elif shell == 'TerminalInteractiveShell': + return False # Terminal running IPython + else: + return False # Other type (?) + except NameError: + return False # Probably standard Python interpreter + + @add_method(DataFrame) def table(self, limit=None, columns=None, title=None, truncate=True): try: - if __IPYTHON__ and DataFrame.output == "html": + if isnotebook() and DataFrame.output == "html": result = self.table_html(title=title, limit=limit, columns=columns, truncate=truncate) print_html(result) else: From e49ed90e0e00ffa05fa0818c2735a6ea913006f5 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sat, 9 Nov 2019 21:19:02 -0600 Subject: [PATCH 19/25] renamed create by set --- optimus/dataframe/columns.py | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py index 5a1598e5..d60d3d3a 100644 --- a/optimus/dataframe/columns.py +++ b/optimus/dataframe/columns.py @@ -261,6 +261,30 @@ def apply_by_dtypes(columns, func, func_return_type, args=None, func_type=None, when=fbdt(col_name, data_type)) return df + @add_attr(cols) + def set(output_col, value=None): + """ + Execute a hive expression. Also handle ints and list in columns + :param output_col: + :param value: numeric, list or hive expression + :return: + """ + df = self + + columns = parse_columns(self, output_col, accepts_missing_cols=True) + check_column_numbers(columns, 1) + + if is_list(value): + expr = F.array([F.lit(x) for x in value]) + elif is_numeric(value): + expr = F.lit(value) + elif value: + expr = F.expr(value) + else: + RaiseIt.value_error(value, ["numeric", "list", "hive expression"]) + + return df.withColumn(output_col, expr) + # TODO: Check if we must use * to select all the columns @add_attr(cols) @dispatch(object, object) @@ -514,11 +538,6 @@ def drop(columns=None, regex=None, data_type=None): return df - @add_attr(cols) - def create(output_col, action): - df = self - return df.withColumn(output_col, action) - @add_attr(cols) def create_exprs(columns, funcs, *args): """ From d32000ad2d37bf79815940d9ca7d90fbec538e73 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sat, 9 Nov 2019 21:19:42 -0600 Subject: [PATCH 20/25] Minimal improvements --- optimus/dataframe/columns.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py index d60d3d3a..0ca20f1a 100644 --- a/optimus/dataframe/columns.py +++ b/optimus/dataframe/columns.py @@ -28,7 +28,7 @@ from optimus.audf import abstract_udf as audf, filter_row_by_data_type as fbdt # Helpers from optimus.helpers.check import is_num_or_str, is_list, is_, is_tuple, is_list_of_dataframes, is_list_of_tuples, \ - is_function, is_one_element, is_type, is_int, is_str, has_, is_column_a, is_dataframe, is_list_of_str + is_function, is_one_element, is_type, is_int, is_str, has_, is_column_a, is_dataframe, is_list_of_str, is_numeric from optimus.helpers.columns import get_output_cols, parse_columns, check_column_numbers, validate_columns_names, \ name_col from optimus.helpers.columns_expression import match_nulls_strings, match_null, zeros_agg, hist_agg, count_na_agg, \ @@ -205,7 +205,7 @@ def func(value, args): :param func_return_type: function return type. This is required by UDF and Pandas UDF. :param args: Arguments to be passed to the function :param func_type: pandas_udf or udf. If none try to use pandas udf (Pyarrow needed) - :param when: A expression to better control when the function is going to be apllied + :param when: A expression to better control when the function is going to be applied :param filter_col_by_dtypes: Only apply the filter to specific type of value ,integer, float, string or bool :param skip_output_cols_processing: In some special cases we do not want apply() to construct the output columns. True or False @@ -1275,7 +1275,7 @@ def value_counts(columns): :return: """ columns = parse_columns(self, columns) - # check_column_numbers(columns, 1) + # .value(columns, 1) result = {} for col_name in columns: @@ -1291,7 +1291,7 @@ def unique(columns): """ columns = parse_columns(self, columns) - check_column_numbers(columns, "1") + # .value(columns, "1") result = {} for col_name in columns: @@ -1444,10 +1444,10 @@ def nest(input_cols, shape="string", separator="", output_col=None): :param output_col: :return: Spark DataFrame """ - + df = self + output_col = parse_columns(df, output_col) check_column_numbers(output_col, 1) - df = self if has_(input_cols, F.Column): # Transform non Column data to lit input_cols = [F.lit(col) if not is_(col, F.col) else col for col in input_cols] From 9d1111c8e9635691a2c625d61dc73951926bf1f0 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sun, 10 Nov 2019 13:51:28 -0600 Subject: [PATCH 21/25] Fixed set function bug --- optimus/dataframe/columns.py | 12 +++++++++--- optimus/helpers/constants.py | 1 + 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py index 0ca20f1a..a4c7019e 100644 --- a/optimus/dataframe/columns.py +++ b/optimus/dataframe/columns.py @@ -48,6 +48,9 @@ from optimus.profiler.functions import fill_missing_var_types, parse_profiler_dtypes ENGINE = "spark" +# Because the monkey patching and the need to call set a function we need to rename the standard python set. +# This is awful but the best option for the user. +python_set = set def cols(self): @@ -261,6 +264,7 @@ def apply_by_dtypes(columns, func, func_return_type, args=None, func_type=None, when=fbdt(col_name, data_type)) return df + # TODO: Maybe we could merge this with apply() @add_attr(cols) def set(output_col, value=None): """ @@ -278,12 +282,14 @@ def set(output_col, value=None): expr = F.array([F.lit(x) for x in value]) elif is_numeric(value): expr = F.lit(value) - elif value: + elif is_str(value): expr = F.expr(value) else: RaiseIt.value_error(value, ["numeric", "list", "hive expression"]) - return df.withColumn(output_col, expr) + df = df.withColumn(output_col, expr) + df = df.preserve_meta(self, Actions.SET.value, columns) + return df # TODO: Check if we must use * to select all the columns @add_attr(cols) @@ -1725,7 +1731,7 @@ def str_to_array(_value): return str_to_data_type(_value, (list, tuple)) def str_to_object(_value): - return str_to_data_type(_value, (dict, set)) + return str_to_data_type(_value, (dict, python_set)) def str_to_data_type(_value, _dtypes): """ diff --git a/optimus/helpers/constants.py b/optimus/helpers/constants.py index abc624ce..f7f41ecd 100644 --- a/optimus/helpers/constants.py +++ b/optimus/helpers/constants.py @@ -104,6 +104,7 @@ class Actions(Enum): UNNEST = "unnest" DROP_ROW = "drop_row" VALUES_TO_COLS = "values_to_cols" + SET = "set" @staticmethod def list(): From 71a4c0e03b5fb2ed4c668b1995c590fa1658a969 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sun, 10 Nov 2019 13:51:48 -0600 Subject: [PATCH 22/25] Fix Comm check bug --- optimus/dataframe/extension.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/optimus/dataframe/extension.py b/optimus/dataframe/extension.py index ad4772fa..5991c046 100644 --- a/optimus/dataframe/extension.py +++ b/optimus/dataframe/extension.py @@ -483,7 +483,7 @@ def send(self, name=None, infer=True, mismatch=None, stats=True): format="json", mismatch=mismatch) - if Comm: + if Comm.instance: Comm.instance.send(output) else: raise Exception("Comm is not initialized. Please use comm=True param like Optimus(comm=True)") From bbcb7ebc9fba49e739a6a2e7583bd5c2bd85c5a1 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sun, 10 Nov 2019 13:52:00 -0600 Subject: [PATCH 23/25] Fix column detection bug --- optimus/helpers/columns.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/optimus/helpers/columns.py b/optimus/helpers/columns.py index a842ba9c..90f76173 100644 --- a/optimus/helpers/columns.py +++ b/optimus/helpers/columns.py @@ -205,7 +205,7 @@ def check_column_numbers(columns, number=0): elif number == ">1": if not len(columns) > 1: RaiseIt.value_error(len(columns), ["more than 1"]) - elif len(columns) == number: + elif len(columns) != number: RaiseIt.value_error(count, "{} columns, {} needed".format(number, columns)) From 4193b875285d2f941956d99954052037967f52f6 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sun, 10 Nov 2019 15:53:59 -0600 Subject: [PATCH 24/25] Fix nest --- optimus/dataframe/columns.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py index f2dde4e0..63b470a1 100644 --- a/optimus/dataframe/columns.py +++ b/optimus/dataframe/columns.py @@ -1451,7 +1451,7 @@ def nest(input_cols, shape="string", separator="", output_col=None): :return: Spark DataFrame """ df = self - output_col = parse_columns(df, output_col) + output_col = parse_columns(df, output_col, accepts_missing_cols=True) check_column_numbers(output_col, 1) if has_(input_cols, F.Column): From 65252518948b9b850c7c686afca196fd17864341 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sun, 10 Nov 2019 16:21:36 -0600 Subject: [PATCH 25/25] Minimal Fixes --- optimus/dataframe/columns.py | 2 +- optimus/ml/feature.py | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py index 63b470a1..649d938f 100644 --- a/optimus/dataframe/columns.py +++ b/optimus/dataframe/columns.py @@ -1462,7 +1462,7 @@ def nest(input_cols, shape="string", separator="", output_col=None): if shape is "vector": input_cols = parse_columns(self, input_cols, filter_by_column_dtypes=PYSPARK_NUMERIC_TYPES) - + output_col = one_list_to_val(output_col) vector_assembler = VectorAssembler( inputCols=input_cols, outputCol=output_col) diff --git a/optimus/ml/feature.py b/optimus/ml/feature.py index 481da052..c8586867 100644 --- a/optimus/ml/feature.py +++ b/optimus/ml/feature.py @@ -38,18 +38,16 @@ def string_to_index(df, input_cols, output_cols=None, **kargs): :return: Dataframe with indexed columns. """ - # input_cols = parse_columns(df, input_cols) + input_cols = parse_columns(df, input_cols) if output_cols is None: - output_cols = [name_col(input_col, "index_to_string") for input_col in input_cols] - print(output_cols) indexers = [StringIndexer(inputCol=input_col, outputCol=output_col, **kargs).fit(df) for input_col, output_col in zip(list(set(input_cols)), list(set(output_cols)))] pipeline = Pipeline(stages=indexers) df = pipeline.fit(df).transform(df) - # df.show() + return df