Merge pull request #749 from ironmussa/develop
Profiler Improvements
argenisleon authored Nov 10, 2019
2 parents 491dc78 + 6525251 commit 46edbf1
Showing 18 changed files with 204 additions and 85 deletions.
4 changes: 4 additions & 0 deletions README_for_devs.md
@@ -81,3 +81,7 @@ This will prompt your test.pypi.org credentials
Now install from test.pypi.org

`!pip install --index-url https://test.pypi.org/simple optimuspyspark`

### Installing from GitHub

`pip3 install --upgrade --no-deps --force-reinstall git+https://github.com/ironmussa/Optimus.git@develop`
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -60,7 +60,7 @@
# The short X.Y version.
version = '2.2'
# The full version, including alpha/beta/rc tags.
release = "2.2.26"
release = "2.2.27"

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
8 changes: 5 additions & 3 deletions optimus/bumblebee.py
@@ -28,18 +28,20 @@ class Comm:
Send encrypted message to the Bumblebee
"""

def __init__(self, queue_name=None):
def __init__(self, queue_name=None, key=None):

# If queue_name was not given, try to load it from the config file; if it is not there, generate one
if queue_name is None:
self.queue_name = save_config_key("bumblebee.ini", "DEFAULT", "QueueName", str(uuid.uuid4()))
else:
self.queue_name = queue_name

if key is None:
# the key is generated as bytes (urlsafe base64); decode it to str so it can be saved in the config file
key = Fernet.generate_key()
self.key = save_config_key("bumblebee.ini", "DEFAULT", "Key", key.decode())

else:
self.queue_name = queue_name
self.key = key

keys_link = "<a href ='{FULL_DOMAIN}'> here</a>".format(FULL_DOMAIN=FULL_DOMAIN,
SESSION=self.queue_name, KEY=self.key)
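For context, a minimal sketch of the key round-trip the constructor relies on, using the cryptography package's Fernet API (the bumblebee.ini handling is elided):

```python
from cryptography.fernet import Fernet

key = Fernet.generate_key()              # bytes, already urlsafe-base64 encoded
stored = key.decode()                    # str form, safe to write to the config file
restored = stored.encode()               # back to bytes when read from the file
token = Fernet(restored).encrypt(b"message to Bumblebee")
print(Fernet(restored).decrypt(token))   # b'message to Bumblebee'
```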
86 changes: 62 additions & 24 deletions optimus/dataframe/columns.py
@@ -28,7 +28,7 @@
from optimus.audf import abstract_udf as audf, filter_row_by_data_type as fbdt
# Helpers
from optimus.helpers.check import is_num_or_str, is_list, is_, is_tuple, is_list_of_dataframes, is_list_of_tuples, \
is_function, is_one_element, is_type, is_int, is_str, has_, is_column_a, is_dataframe, is_list_of_str
is_function, is_one_element, is_type, is_int, is_str, has_, is_column_a, is_dataframe, is_list_of_str, is_numeric
from optimus.helpers.columns import get_output_cols, parse_columns, check_column_numbers, validate_columns_names, \
name_col
from optimus.helpers.columns_expression import match_nulls_strings, match_null, zeros_agg, hist_agg, count_na_agg, \
@@ -48,6 +48,9 @@
from optimus.profiler.functions import fill_missing_var_types, parse_profiler_dtypes

ENGINE = "spark"
# Because of the monkey patching and the need to expose a set() function, we have to rename the standard Python set.
# This is awful, but it is the best option for the user.
python_set = set
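A simplified sketch of the shadowing problem this alias works around (the real set() is defined as a cols method further down; this stand-in is hypothetical):

```python
python_set = set  # capture the builtin before the name is shadowed

def set(output_col, value=None):  # now shadows builtins.set in this module
    """Stand-in for the cols.set() operation defined below."""
    ...

unique_items = python_set([1, 2, 2, 3])  # the builtin is still reachable: {1, 2, 3}
```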


def cols(self):
@@ -153,8 +156,10 @@ def copy(input_cols, output_cols=None, columns=None):
columns = list(zip(input_cols, output_cols))

for input_col, output_col in columns:
current_meta = self.get_meta()
df = df.withColumn(output_col, F.col(input_col))
df = df.preserve_meta(self, Actions.COPY.value, output_col)
df = df.set_meta(value=current_meta)
df = df.copy_meta({input_col: output_col})
return df

@add_attr(cols)
@@ -203,7 +208,7 @@ def func(value, args):
:param func_return_type: function return type. This is required by UDF and Pandas UDF.
:param args: Arguments to be passed to the function
:param func_type: pandas_udf or udf. If none try to use pandas udf (Pyarrow needed)
:param when: A expression to better control when the function is going to be apllied
:param when: A expression to better control when the function is going to be applied
:param filter_col_by_dtypes: Only apply the filter to specific types of values: integer, float, string or bool
:param skip_output_cols_processing: In some special cases we do not want apply() to construct the output columns.
True or False
@@ -259,6 +264,33 @@ def apply_by_dtypes(columns, func, func_return_type, args=None, func_type=None,
when=fbdt(col_name, data_type))
return df

# TODO: Maybe we could merge this with apply()
@add_attr(cols)
def set(output_col, value=None):
"""
Execute a Hive expression. Also handles numeric values and lists.
:param output_col: column to be set or created
:param value: numeric value, list, or Hive expression
:return:
"""
df = self

columns = parse_columns(self, output_col, accepts_missing_cols=True)
check_column_numbers(columns, 1)

if is_list(value):
expr = F.array([F.lit(x) for x in value])
elif is_numeric(value):
expr = F.lit(value)
elif is_str(value):
expr = F.expr(value)
else:
RaiseIt.value_error(value, ["numeric", "list", "hive expression"])

df = df.withColumn(output_col, expr)
df = df.preserve_meta(self, Actions.SET.value, columns)
return df
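A hedged usage sketch of the new set(), one call per accepted value type (the column names are hypothetical; df is an Optimus-wrapped dataframe):

```python
df = df.cols.set("score", 1)                   # numeric -> F.lit(1)
df = df.cols.set("tags", [1, 2, 3])            # list    -> F.array of literals
df = df.cols.set("total", "price * quantity")  # string  -> parsed with F.expr()
```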

# TODO: Check if we must use * to select all the columns
@add_attr(cols)
@dispatch(object, object)
@@ -337,9 +369,6 @@ def cast(input_cols=None, dtype=None, output_cols=None, columns=None):
:return: Spark DataFrame
"""

if dtype is None:
RaiseIt.value_error(dtype, "datatype")

_dtype = []
# Parse params
if columns is None:
@@ -515,11 +544,6 @@ def drop(columns=None, regex=None, data_type=None):

return df

@add_attr(cols)
def create(output_col, action):
df = self
return df.withColumn(output_col, action)

@add_attr(cols)
def create_exprs(columns, funcs, *args):
"""
@@ -1257,7 +1281,7 @@ def value_counts(columns):
:return:
"""
columns = parse_columns(self, columns)
# check_column_numbers(columns, 1)
# .value(columns, 1)

result = {}
for col_name in columns:
@@ -1273,7 +1297,7 @@ def unique(columns):
"""
columns = parse_columns(self, columns)

check_column_numbers(columns, "1")
# .value(columns, "1")

result = {}
for col_name in columns:
@@ -1426,10 +1450,10 @@ def nest(input_cols, shape="string", separator="", output_col=None):
:param output_col:
:return: Spark DataFrame
"""

df = self
output_col = parse_columns(df, output_col, accepts_missing_cols=True)
check_column_numbers(output_col, 1)

df = self
if has_(input_cols, F.Column):
# Transform non Column data to lit
input_cols = [F.lit(col) if not is_(col, F.col) else col for col in input_cols]
@@ -1438,7 +1462,7 @@

if shape == "vector":
input_cols = parse_columns(self, input_cols, filter_by_column_dtypes=PYSPARK_NUMERIC_TYPES)

output_col = one_list_to_val(output_col)
vector_assembler = VectorAssembler(
inputCols=input_cols,
outputCol=output_col)
@@ -1621,11 +1645,17 @@ def hist(columns, buckets=20):
result = agg_exprs(columns, hist_agg, self, buckets)
# TODO: for some reason casting to int in the exprs does not work, so we cast here. A Spark bug?
# Example
# Column < b'array(map(count, CAST(sum(CASE WHEN ((rank >= 7) AND (rank < 7.75)) THEN 1 ELSE 0 END) AS INT), lower, 7, upper, 7.75) AS `hist_agg_rank_0`, map(count, CAST(sum(CASE WHEN ((rank >= 7.75) AND (rank < 8.5)) THEN 1 ELSE 0 END) AS INT), lower, 7.75, upper, 8.5) AS `hist_agg_rank_1`, map(count, CAST(sum(CASE WHEN ((rank >= 8.5) AND (rank < 9.25)) THEN 1 ELSE 0 END) AS INT), lower, 8.5, upper, 9.25) AS `hist_agg_rank_2`, map(count, CAST(sum(CASE WHEN ((rank >= 9.25) AND (rank < 10)) THEN 1 ELSE 0 END) AS INT), lower, 9.25, upper, 10) AS `hist_agg_rank_3`) AS `histrank`' >
# Column < b'array(map(count, CAST(sum(CASE WHEN ((rank >= 7) AND (rank < 7.75)) THEN 1 ELSE 0 END) AS INT),
# lower, 7, upper, 7.75) AS `hist_agg_rank_0`, map(count, CAST(sum(CASE WHEN ((rank >= 7.75) AND (rank < 8.5))
# THEN 1 ELSE 0 END) AS INT), lower, 7.75, upper, 8.5) AS `hist_agg_rank_1`, map(count,
# CAST(sum(CASE WHEN ((rank >= 8.5) AND (rank < 9.25)) THEN 1 ELSE 0 END) AS INT), lower, 8.5, upper, 9.25)
# AS `hist_agg_rank_2`, map(count, CAST(sum(CASE WHEN ((rank >= 9.25) AND (rank < 10))
# THEN 1 ELSE 0 END) AS INT), lower, 9.25, upper, 10) AS `hist_agg_rank_3`) AS `histrank`' >

return result

# TODO: In tests this code run faster than using agg_exprs when run over all the columns. Not when running over columns individually
# TODO: In tests this code run faster than using agg_exprs when run over all the columns.
# Not when running over columns individually
# columns = parse_columns(self, columns)
# df = self
# for col_name in columns:
@@ -1707,7 +1737,7 @@ def str_to_array(_value):
return str_to_data_type(_value, (list, tuple))

def str_to_object(_value):
return str_to_data_type(_value, (dict, set))
return str_to_data_type(_value, (dict, python_set))

def str_to_data_type(_value, _dtypes):
"""
@@ -2114,23 +2144,31 @@ def join_all(_dfs):

# names = before.cols.names(keys, invert=True)
# print(names)
pivotDF = pivotDF.preserve_meta(self)
df = pivotDF.toDF(*names).cols.fill_na(new_names, 0)
# df.table()
df = df.preserve_meta(self, Actions.VALUES_TO_COLS.value, new_names)

combined.append(df)
return join_all(combined)

df = join_all(combined)

return df

@add_attr(cols)
def string_to_index(input_cols, output_cols=None):
def string_to_index(input_cols=None, output_cols=None, columns=None):
"""
Encodes a string column of labels to a column of label indices
:param input_cols:
:param output_cols:
:param columns:
:return:
"""
df = self

input_cols = parse_columns(df, input_cols)
# output_cols = get_output_cols(input_cols, output_cols)
if columns is None:
input_cols = parse_columns(df, input_cols)
else:
input_cols, output_cols = zip(*columns)

df = ml_string_to_index(df, input_cols, output_cols)

61 changes: 54 additions & 7 deletions optimus/dataframe/extension.py
@@ -399,10 +399,27 @@ def table_html(self, limit=10, columns=None, title=None, full=False, truncate=Tr
return output


def isnotebook():
"""
Detect whether we are running in a notebook or in a terminal
:return:
"""
try:
shell = get_ipython().__class__.__name__
if shell == 'ZMQInteractiveShell':
return True # Jupyter notebook or qtconsole
elif shell == 'TerminalInteractiveShell':
return False # Terminal running IPython
else:
return False # Other type (?)
except NameError:
return False # Probably standard Python interpreter


@add_method(DataFrame)
def table(self, limit=None, columns=None, title=None, truncate=True):
try:
if __IPYTHON__ and DataFrame.output == "html":
if isnotebook() and DataFrame.output == "html":
result = self.table_html(title=title, limit=limit, columns=columns, truncate=truncate)
print_html(result)
else:
@@ -437,6 +454,13 @@ def debug(self):
print(self.rdd.toDebugString().decode("ascii"))


@add_method(DataFrame)
def reset(self):
df = self.set_meta("transformations.actions", {})
Profiler.instance.output_columns = {}
return df


@add_method(DataFrame)
def send(self, name=None, infer=True, mismatch=None, stats=True):
"""
@@ -453,18 +477,41 @@ def send(self, name=None, infer=True, mismatch=None, stats=True):
df.set_name(name)

columns, output = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR,
approx_count=True,
sample=10000,
stats=stats,
format="json",
mismatch=mismatch)
approx_count=True,
sample=10000,
stats=stats,
format="json",
mismatch=mismatch)

if Comm:
if Comm.instance:
Comm.instance.send(output)
else:
raise Exception("Comm is not initialized. Please use comm=True param like Optimus(comm=True)")


@add_method(DataFrame)
def copy_meta(self, old_new_columns):
"""
Shortcut to record column-copy actions in a dataframe's metadata
:param self:
:param old_new_columns:
:return:
"""

key = "transformations.actions.copy"

df = self

copy_cols = df.get_meta(key)
if copy_cols is None:
copy_cols = {}
copy_cols.update(old_new_columns)

df = df.set_meta(key, copy_cols)

return df
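A hedged sketch of the accumulated metadata, assuming get_meta/set_meta back the key with a plain dict:

```python
df = df.copy_meta({"name": "name_copy"})
df = df.copy_meta({"age": "age_2"})
df.get_meta("transformations.actions.copy")
# -> {'name': 'name_copy', 'age': 'age_2'}
```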


@add_method(DataFrame)
def rename_meta(self, old_new_columns):
"""
2 changes: 1 addition & 1 deletion optimus/dataframe/rows.py
@@ -12,7 +12,7 @@
from optimus.helpers.columns import parse_columns, validate_columns_names
from optimus.helpers.constants import Actions
from optimus.helpers.converter import one_list_to_val
from optimus.helpers.decorators import *
from optimus.helpers.decorators import add_attr
from optimus.helpers.functions import append as append_df
from optimus.helpers.raiseit import RaiseIt

2 changes: 1 addition & 1 deletion optimus/helpers/columns.py
@@ -205,7 +205,7 @@ def check_column_numbers(columns, number=0):
elif number == ">1":
if not len(columns) > 1:
RaiseIt.value_error(len(columns), ["more than 1"])
elif len(columns) == number:
elif len(columns) != number:
RaiseIt.value_error(len(columns), "{} columns, {} needed".format(len(columns), number))
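With the comparison flipped from == to !=, an exact column count is now enforced. A hedged sketch of the behavior:

```python
check_column_numbers(["a", "b"], 2)  # exactly 2 columns: passes silently
check_column_numbers(["a"], 2)       # count mismatch: RaiseIt.value_error raises
```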


3 changes: 2 additions & 1 deletion optimus/helpers/constants.py
@@ -103,7 +103,8 @@ class Actions(Enum):
NEST = "nest"
UNNEST = "unnest"
DROP_ROW = "drop_row"
COPY = "copy"
VALUES_TO_COLS = "values_to_cols"
SET = "set"

@staticmethod
def list():
4 changes: 2 additions & 2 deletions optimus/helpers/functions.py
@@ -297,8 +297,8 @@ def create_buckets(lower_bound, upper_bound, bins):
buckets.append({"lower": low, "upper": high, "bucket": i})
low = high

# Ensure that the upper bound is exactly the highest value.
# Because of floating-point arithmetic, the accumulated sum can miss the upper bound.

buckets[bins - 1]["upper"] = upper_bound
return buckets
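A hedged illustration of the floating-point drift this pinning guards against (plain Python, independent of the helper):

```python
step = (1.0 - 0.0) / 10             # bucket width for bins=10 over [0, 1]
total = sum(step for _ in range(10))
print(total)                        # 0.9999999999999999 -- misses the intended upper bound
```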
3 changes: 2 additions & 1 deletion optimus/helpers/parser.py
@@ -48,7 +48,8 @@ def parse_col_names_funcs_to_keys(data):
_col_name = k[len(temp_func_name):]
if is_nan(v):
logger.print(
"'{FUNCTION}' function in '{COL_NAME}' column is returning 'nan'. Is that what you expected?. Seems that '{COL_NAME}' has 'nan' values".format(
"'{FUNCTION}' function in '{COL_NAME}' column is returning 'nan'. Is that what you expected?. "
"Seems that '{COL_NAME}' has 'nan' values".format(
FUNCTION=f,
COL_NAME=_col_name))
# If the value is numeric only get 5 decimals
Expand Down
(Diffs for the remaining changed files are not shown.)