Skip to content

Commit

Permalink
Moved count_by_dtypes processing outside the function
Browse files Browse the repository at this point in the history
  • Loading branch information
argenisleon committed Dec 1, 2019
1 parent e079dd3 commit 8a6db64
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 35 deletions.
15 changes: 6 additions & 9 deletions optimus/parse.py → infer.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
# This file needs to be sent to the cluster via SparkContext.addPyFile to work around the pickling problem.
# It lives outside the optimus folder on purpose, because keeping it inside causes problems importing optimus when using the UDF.

import re
from ast import literal_eval

import fastnumbers
from dateutil.parser import parse as dparse


class p(object):
def exec(self, df, columns):
_count = (df.select(columns).rdd
.flatMap(lambda x: x.asDict().items())
.map(lambda x: self.parse(x, infer, dtypes, str_funcs, int_funcs))
.reduceByKey(lambda a, b: a + b))
return _count

def parse(self, value, infer, dtypes, str_funcs, int_funcs, mismatch):
class Infer(object):
@staticmethod
def parse(value, infer, dtypes, str_funcs, int_funcs, mismatch):
"""
:param value:
Expand Down
41 changes: 18 additions & 23 deletions optimus/dataframe/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from optimus.helpers.raiseit import RaiseIt
from optimus.ml.encoding import string_to_index as ml_string_to_index
from optimus.profiler.functions import fill_missing_var_types, parse_profiler_dtypes
from infer import Infer

ENGINE = "spark"
# Because of the monkey patching and the need to name a function `set`, we need to rename the standard Python set.
Expand Down Expand Up @@ -1739,45 +1740,39 @@ def count_by_dtypes(columns, infer=False, str_funcs=None, int_funcs=None, mismat
:param mismatch: a dict with column names and pattern to check. Pattern can be a predefined or a regex
:return:
"""
from optimus.parse import p

columns = parse_columns(self, columns)

df = self
dtypes = df.cols.dtypes()

_count = p().exec(df, columns)

# if mismatch:
# _count = (df.select(columns).rdd
# .flatMap(lambda x: x.asDict().items())
# .map(lambda x: p.parse(x, infer, dtypes, str_funcs, int_funcs, mismatch))
# .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
# )
# else:
# _count = (df.select(columns).rdd
# .flatMap(lambda x: x.asDict().items())
# .map(lambda x: p.parse(x, infer, dtypes, str_funcs, int_funcs))
# .reduceByKey(lambda a, b: a + b))

result = {}
columns = parse_columns(df, columns)
columns_dtypes = df.cols.dtypes()

if mismatch:
m = lambda a, b: (a[0] + b[0], a[1] + b[1])
else:
m = lambda a, b: (a + b)

_count = (df.select(columns).rdd
.flatMap(lambda x: x.asDict().items())
.map(lambda x: Infer.parse(x, infer, columns_dtypes, str_funcs, int_funcs, mismatch))
.reduceByKey(m))

result = {}
for c in _count.collect():
result.setdefault(c[0][0], {})[c[0][1]] = c[1]

# Process mismatch
if mismatch is not None:
for col_name, result_dtypes in result.items():
result[col_name]["mismatch"] = 0
for dtype, count in result_dtypes.items():
for result_dtype, count in result_dtypes.items():
if is_tuple(count):
result[col_name]["mismatch"] = result[col_name]["mismatch"] + count[1]
result[col_name][dtype] = count[0]
result[col_name][result_dtype] = count[0]

if infer is True:
result = fill_missing_var_types(result, dtypes)
result = fill_missing_var_types(result, columns_dtypes)
else:
result = parse_profiler_dtypes(result, dtypes)
result = parse_profiler_dtypes(result, columns_dtypes)
return result

@add_attr(cols)
Expand Down
6 changes: 3 additions & 3 deletions optimus/optimus.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ def __init__(self, session=None, master="local[*]", app_name="optimus", checkpoi
logger.print(STARTING_OPTIMUS)

# Pickling
# Added parse.py
print(absolute_path("/parse.py"))
Spark.instance.sc.addPyFile(absolute_path("/parse.py"))
# Added infer.py
print(absolute_path("/../infer.py"))
Spark.instance.sc.addPyFile(absolute_path("/../infer.py"))

if server:
logger.print("Starting Optimus Server...")
Expand Down

0 comments on commit 8a6db64

Please sign in to comment.