Skip to content

Commit

Permalink
WIP parse function now works as udf
Browse files Browse the repository at this point in the history
  • Loading branch information
argenisleon committed Dec 5, 2019
1 parent f4a2fd8 commit 0676711
Show file tree
Hide file tree
Showing 31 changed files with 6,144 additions and 176,314 deletions.
1,295 changes: 987 additions & 308 deletions examples/sandbox.ipynb

Large diffs are not rendered by default.

565 changes: 521 additions & 44 deletions infer.py

Large diffs are not rendered by default.

7 changes: 2 additions & 5 deletions optimus/audf.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import dateutil.parser
import fastnumbers
from pyspark.sql import functions as F

from optimus.helpers.check import is_column
from infer import Infer, is_column, parse_spark_class_dtypes
from optimus.helpers.parser import parse_python_dtypes
from optimus.helpers.converter import one_list_to_val
from optimus.helpers.functions import is_pyarrow_installed
from optimus.helpers.logger import logger
from optimus.helpers.parser import parse_spark_class_dtypes, parse_python_dtypes
from optimus.helpers.raiseit import RaiseIt
from infer import Infer


def abstract_udf(col, func, func_return_type=None, attrs=None, func_type=None):
Expand Down
33 changes: 16 additions & 17 deletions optimus/dataframe/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,26 @@
from pyspark.sql.functions import when
from pyspark.sql.types import StringType, ArrayType, StructType

from infer import Infer
from infer import Infer, is_, is_type, is_function, is_list, is_tuple, is_list_of_str, \
is_list_of_dataframes, is_list_of_tuples, is_one_element, is_num_or_str, is_numeric, is_str, is_int, is_dataframe, \
parse_spark_class_dtypes, PYSPARK_NUMERIC_TYPES, PYSPARK_NOT_ARRAY_TYPES, PYSPARK_STRING_TYPES, PYSPARK_ARRAY_TYPES
# Functions
# from optimus.optimus import Optimus
# from optimus.optimus import Optimus
from optimus.audf import abstract_udf as audf, filter_row_by_data_type as fbdt
# Helpers
from optimus.helpers.check import is_num_or_str, is_list, is_, is_tuple, is_list_of_dataframes, is_list_of_tuples, \
is_function, is_one_element, is_type, is_int, is_str, has_, is_column_a, is_dataframe, is_list_of_str, is_numeric
from optimus.helpers.check import has_, is_column_a
from optimus.helpers.columns import get_output_cols, parse_columns, check_column_numbers, validate_columns_names, \
name_col
from optimus.helpers.columns_expression import match_nulls_strings, match_null, zeros_agg, hist_agg, count_na_agg, \
percentile_agg, count_uniques_agg, range_agg
from optimus.helpers.constants import PYSPARK_NUMERIC_TYPES, PYSPARK_NOT_ARRAY_TYPES, \
PYSPARK_STRING_TYPES, PYSPARK_ARRAY_TYPES, RELATIVE_ERROR, Actions
from optimus.helpers.constants import RELATIVE_ERROR, Actions
from optimus.helpers.converter import one_list_to_val, tuple_to_dict, format_dict, val_to_list
from optimus.helpers.decorators import add_attr
from optimus.helpers.functions import append as append_df
from optimus.helpers.functions import filter_list, collect_as_list, create_buckets
from optimus.helpers.logger import logger
from optimus.helpers.parser import parse_python_dtypes, parse_spark_class_dtypes, parse_col_names_funcs_to_keys, \
compress_list, compress_dict
from optimus.helpers.parser import compress_list, compress_dict, parse_python_dtypes, parse_col_names_funcs_to_keys
from optimus.helpers.raiseit import RaiseIt
from optimus.ml.encoding import string_to_index as ml_string_to_index
from optimus.profiler.functions import fill_missing_var_types, parse_profiler_dtypes
Expand Down Expand Up @@ -844,8 +843,8 @@ def mode(columns):
mode_df = count.join(
count.agg(F.max("count").alias("max_")), F.col("count") == F.col("max_")
)
if Optimus.cache:
mode_df = mode_df.cache()

mode_df = mode_df.cache()
# if none of the values are repeated we not have mode
mode_list = (mode_df
.rows.select(mode_df["count"] > 1)
Expand Down Expand Up @@ -1755,10 +1754,10 @@ def frequency_by_group(columns, n=10, percentage=False, total_rows=None):
return result

@add_attr(cols)
def count_mismatch(columns_mismatch=None):
def count_mismatch(columns_mismatch: dict = None):
"""
:param columns_mismatch:
Return the num of mismatches
:param columns_mismatch: dict of {col_name:datatype}
:return:
"""
df = self
Expand Down Expand Up @@ -1798,13 +1797,13 @@ def count_by_dtypes(columns, infer=False, str_funcs=None, int_funcs=None):
columns = parse_columns(df, columns)
columns_dtypes = df.cols.dtypes()

_count = (df.select(columns).rdd
.flatMap(lambda x: x.asDict().items())
.map(lambda x: Infer.parse(x, infer, columns_dtypes, str_funcs, int_funcs))
.reduceByKey(lambda a, b: (a + b)))
df_count = (df.select(columns).rdd
.flatMap(lambda x: x.asDict().items())
.map(lambda x: Infer.parse(x, infer, columns_dtypes, str_funcs, int_funcs))
.reduceByKey(lambda a, b: (a + b)))

result = {}
for c in _count.collect():
for c in df_count.collect():
result.setdefault(c[0][0], {})[c[0][1]] = c[1]

# Process mismatch
Expand Down
7 changes: 3 additions & 4 deletions optimus/dataframe/create.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import pandas as pd
from pyspark.sql.types import StringType, StructField, StructType

from infer import Infer
from optimus.helpers.check import is_, is_list_of_tuples, is_one_element, is_tuple
from optimus.helpers.parser import parse_spark_class_dtypes
from infer import Infer, is_, is_tuple, is_list_of_tuples, is_one_element, parse_spark_class_dtypes
from optimus.spark import Spark


Expand Down Expand Up @@ -35,7 +33,8 @@ def data_frame(cols=None, rows=None, infer_schema=True, pdf=None):
col_name = c

if infer_schema is True:
var_type = Infer.parse(r)
var_type = Infer.to_spark(r)
# print(var_type)
else:
var_type = StringType()
nullable = True
Expand Down
3 changes: 1 addition & 2 deletions optimus/dataframe/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
from pyspark.sql.types import *

from optimus.bumblebee import Comm
from optimus.helpers.functions import val_to_list
from optimus.helpers.check import is_str
from infer import is_str
from optimus.helpers.columns import parse_columns
from optimus.helpers.constants import RELATIVE_ERROR
from optimus.helpers.decorators import *
Expand Down
2 changes: 1 addition & 1 deletion optimus/dataframe/plots/plots.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
plot_correlation, plot_qqplot
from optimus.helpers.columns import check_column_numbers
from optimus.helpers.columns import parse_columns
from optimus.helpers.constants import PYSPARK_NUMERIC_TYPES
from infer import PYSPARK_NUMERIC_TYPES
from optimus.helpers.decorators import add_attr


Expand Down
5 changes: 3 additions & 2 deletions optimus/dataframe/rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
# Helpers
import optimus as op
from optimus.audf import filter_row_by_data_type as fbdt
from optimus.helpers.check import is_list_of_str_or_int, is_list_of_tuples, is_list_of_dataframes, is_dataframe
from infer import is_list_of_str_or_int, is_list_of_dataframes, is_list_of_tuples, is_dataframe, \
PYSPARK_NUMERIC_TYPES
from optimus.helpers.columns import parse_columns, validate_columns_names
from optimus.helpers.constants import Actions, PYSPARK_NUMERIC_TYPES
from optimus.helpers.constants import Actions
from optimus.helpers.converter import one_list_to_val
from optimus.helpers.decorators import add_attr
from optimus.helpers.functions import append as append_df
Expand Down
2 changes: 1 addition & 1 deletion optimus/enricher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ratelimit import limits, RateLimitException
from tqdm import tqdm_notebook

from optimus.helpers.check import is_function, is_, is_dataframe
from infer import is_, is_function, is_dataframe
from optimus.helpers.logger import logger

# Temporal col used to create a temporal ID to join the enriched data in mongo with the dataframe.
Expand Down
Loading

0 comments on commit 0676711

Please sign in to comment.