From 494a3c1670e2efe208765a68e1fd989ee482979a Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Tue, 5 Nov 2019 09:05:22 -0600 Subject: [PATCH 1/4] Fix send() and to_json() functions --- optimus/dataframe/columns.py | 9 +++++++-- optimus/dataframe/extension.py | 4 ++-- optimus/helpers/constants.py | 1 + optimus/profiler/profiler.py | 12 ++++++------ 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py index 9dae1f13..9b78d3fe 100644 --- a/optimus/dataframe/columns.py +++ b/optimus/dataframe/columns.py @@ -1117,7 +1117,7 @@ def impute(input_cols, data_type="continuous", strategy="mean", output_cols=None Imputes missing data from specified columns using the mean or median. :param input_cols: list of columns to be analyze. :param output_cols: - :param data_type: continuous or categorical + :param data_type: "continuous" or "categorical" :param strategy: String that specifies the way of computing missing data. Can be "mean", "median" for continuous or "mode" for categorical columns :return: Dataframe object (DF with columns that has the imputed values). 
@@ -2125,7 +2125,12 @@ def join_all(_dfs): @add_attr(cols) def string_to_index(input_cols, output_cols=None): - + """ + Encodes a string column of labels to a column of label indices + :param input_cols: + :param output_cols: + :return: + """ df = self input_cols = parse_columns(df, input_cols) diff --git a/optimus/dataframe/extension.py b/optimus/dataframe/extension.py index 57c7249c..0a8d8ddf 100644 --- a/optimus/dataframe/extension.py +++ b/optimus/dataframe/extension.py @@ -452,7 +452,7 @@ def send(self, name=None, infer=True, mismatch=None, stats=True): if name is not None: df.set_name(name) - result = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR, + columns, output = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR, approx_count=True, sample=10000, stats=stats, @@ -460,7 +460,7 @@ def send(self, name=None, infer=True, mismatch=None, stats=True): mismatch=mismatch) if Comm: - Comm.instance.send(result) + Comm.instance.send(output) else: raise Exception("Comm is not initialized. 
Please use comm=True param like Optimus(comm=True)") diff --git a/optimus/helpers/constants.py b/optimus/helpers/constants.py index 119ebbf6..8238fc0d 100644 --- a/optimus/helpers/constants.py +++ b/optimus/helpers/constants.py @@ -29,6 +29,7 @@ "array": "array", "null": "null" } + PYTHON_TYPES = {"string": str, "int": int, "float": float, "boolean": bool} PYSPARK_NUMERIC_TYPES = ["byte", "short", "big", "int", "double", "float"] diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index c7af4f30..6b36c1a4 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -217,10 +217,11 @@ def to_file(self, path=None, output="html"): RaiseIt.type_error(output, ["html", "json"]) def to_json(self, df, columns="*", buckets=10, infer=False, relative_error=RELATIVE_ERROR, approx_count=True, - sample=10000, stats=True, format="json", mismatch=None): - return self.dataset(df, columns=columns, buckets=buckets, infer=infer, relative_error=relative_error, - approx_count=approx_count, - sample=sample, stats=stats, format=format, mismatch=mismatch) + sample=10000, stats=True, mismatch=None): + columns, output = self.dataset(df, columns=columns, buckets=buckets, infer=infer, relative_error=relative_error, + approx_count=approx_count, + sample=sample, stats=stats, format="json", mismatch=mismatch) + return output def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELATIVE_ERROR, approx_count=True, sample=10000, stats=True, format="dict", mismatch=None): @@ -383,9 +384,8 @@ def match_names(_col_names): result = json.dumps(output_columns, ignore_nan=True, default=json_converter) else: result = output_columns - # print(format) - self.output_columns = result + self.output_columns = output_columns # print(result) df = df.set_meta("transformations.actions", {}) From 0a7481f83e95cfd909b425d490767e564b9b382f Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Tue, 5 Nov 2019 09:06:47 -0600 Subject: [PATCH 2/4] Bump version --- 
docs/source/conf.py | 2 +- optimus/version.py | 2 +- setup.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index 6cfe9451..921b28fd 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -60,7 +60,7 @@ # The short X.Y version. version = '2.2' # The full version, including alpha/beta/rc tags. -release = "2.2.25" +release = "2.2.26" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/optimus/version.py b/optimus/version.py index 91c3e216..360247b8 100644 --- a/optimus/version.py +++ b/optimus/version.py @@ -5,5 +5,5 @@ def _safe_int(string): return string -__version__ = '2.2.25' +__version__ = '2.2.26' VERSION = tuple(_safe_int(x) for x in __version__.split('.')) diff --git a/setup.py b/setup.py index 0c407691..cf492401 100644 --- a/setup.py +++ b/setup.py @@ -59,7 +59,7 @@ def readme(): author='Favio Vazquez and Argenis Leon', author_email='argenisleon@gmail.com', url='https://github.com/ironmussa/Optimus/', - download_url='https://github.com/ironmussa/Optimus/archive/2.2.25.tar.gz', + download_url='https://github.com/ironmussa/Optimus/archive/2.2.26.tar.gz', description=('Optimus is the missing framework for cleaning and pre-processing data in a distributed fashion with ' 'pyspark.'), long_description=readme(), From ff4deb2eec59e500a56b2275c5d558f53176ada9 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Tue, 5 Nov 2019 09:24:29 -0600 Subject: [PATCH 3/4] Remove prints --- optimus/dataframe/columns.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py index 9b78d3fe..f187ff4e 100644 --- a/optimus/dataframe/columns.py +++ b/optimus/dataframe/columns.py @@ -1500,7 +1500,6 @@ def _final_columns(_index, _splits, _output_col): _index = [i - 1 for i in _index] actual_index = _index - print("actual_index",actual_index) # Create final output columns if 
is_tuple(_output_col): @@ -1527,9 +1526,7 @@ def _final_columns(_index, _splits, _output_col): splits = format_dict(df.agg(F.max(F.size(input_col))).to_dict()) expr = F.col(input_col) - print(index, splits, output_col) final_columns = _final_columns(index, splits, output_col) - print("final_columns", final_columns) for i, col_name in final_columns: df = df.withColumn(col_name, expr.getItem(i)) @@ -1545,7 +1542,6 @@ def _final_columns(_index, _splits, _output_col): expr = F.split(F.col(input_col), separator) final_columns = _final_columns(index, splits, output_col) for i, col_name in final_columns: - print(i, col_name) df = df.withColumn(col_name, expr.getItem(i)) # Vector From 05a105b092905ff371940e8d61a1351bbced190c Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Tue, 5 Nov 2019 09:25:26 -0600 Subject: [PATCH 4/4] Now bucketizer accepts ints in splits param --- optimus/dataframe/columns.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py index f187ff4e..10b0e948 100644 --- a/optimus/dataframe/columns.py +++ b/optimus/dataframe/columns.py @@ -2141,12 +2141,16 @@ def bucketizer(input_cols, splits, output_cols=None): """ Bucketize multiples columns at the same time. :param input_cols: - :param splits: Dict of splits. You can use create_buckets() to make it + :param splits: Dict of splits or ints. You can use create_buckets() to make it :param output_cols: :return: """ df = self + if is_int(splits): + min_max = df.cols.range(input_cols)[input_cols]["range"] + splits = create_buckets(min_max["min"], min_max["max"], splits) + def _bucketizer(col_name, args): """ Create a column expression that create buckets in a range of values