Skip to content

Commit

Permalink
Unify infer function usage <3
Browse files Browse the repository at this point in the history
  • Loading branch information
argenisleon committed Dec 1, 2019
1 parent 8a6db64 commit b19fae5
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 212 deletions.
205 changes: 112 additions & 93 deletions infer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# This file need to be send to the cluster via .addpyfile to handle the pickle problem
# This file needs to be sent to the cluster via .addPyFile to handle the pickle problem
# This is outside the optimus folder on purpose because it causes problems importing optimus when using the udf.

import re
Expand All @@ -9,117 +9,136 @@


class Infer(object):
"""
    These functions return True or False depending on whether a value matches a specific data type
"""

@staticmethod
def parse(value, infer, dtypes, str_funcs, int_funcs, mismatch):
"""
def str_to_boolean(_value):
_value = _value.lower()
if _value == "true" or _value == "false":
return True

:param value:
:param infer:
:param dtypes:
:param str_funcs:
:param int_funcs:
:param mismatch:
@staticmethod
def str_to_date(_value):
try:
dparse(_value)
return True
except (ValueError, OverflowError):
pass

@staticmethod
def str_to_null(_value):
_value = _value.lower()
if _value == "null":
return True

@staticmethod
def is_null(_value):
if _value is None:
return True

@staticmethod
def str_to_gender(_value):
_value = _value.lower()
if _value == "male" or _value == "female":
return True

@staticmethod
def str_to_data_type(_value, _dtypes):
"""
Check if value can be parsed to a tuple or and list.
Because Spark can handle tuples we will try to transform tuples to arrays
:param _value:
:return:
"""
col_name, value = value
try:

def str_to_boolean(_value):
_value = _value.lower()
if _value == "true" or _value == "false":
if isinstance(literal_eval((_value.encode('ascii', 'ignore')).decode("utf-8")), _dtypes):
return True
except (ValueError, SyntaxError):
pass

def str_to_date(_value):
try:
dparse(_value)
return True
except (ValueError, OverflowError):
pass
@staticmethod
def str_to_array(_value):
return Infer.str_to_data_type(_value, (list, tuple))

def str_to_null(_value):
_value = _value.lower()
if _value == "null":
return True
@staticmethod
def str_to_object(_value):
return Infer.str_to_data_type(_value, (dict, set))

def is_null(_value):
if _value is None:
return True
@staticmethod
def str_to_url(_value):
regex = re.compile(
r'^https?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain...
r'localhost|' # localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
if regex.match(_value):
return True

def str_to_gender(_value):
_value = _value.lower()
if _value == "male" or _value == "female":
return True
@staticmethod
def str_to_ip(_value):
regex = re.compile('''\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}''')
if regex.match(_value):
return True

def str_to_array(_value):
return str_to_data_type(_value, (list, tuple))

def str_to_object(_value):
return str_to_data_type(_value, (dict, set))

def str_to_data_type(_value, _dtypes):
"""
Check if value can be parsed to a tuple or and list.
Because Spark can handle tuples we will try to transform tuples to arrays
:param _value:
:return:
"""
try:

if isinstance(literal_eval((_value.encode('ascii', 'ignore')).decode("utf-8")), _dtypes):
return True
except (ValueError, SyntaxError):
pass

def str_to_url(_value):
regex = re.compile(
r'^https?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain...
r'localhost|' # localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
if regex.match(_value):
return True
@staticmethod
def str_to_email(_value):
regex = re.compile(r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)")
if regex.match(_value):
return True

def str_to_ip(_value):
regex = re.compile('''\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}''')
if regex.match(_value):
return True
@staticmethod
def str_to_credit_card(_value):
# Reference https://www.regular-expressions.info/creditcard.html
# https://codereview.stackexchange.com/questions/74797/credit-card-checking
regex = re.compile(r'(4(?:\d{12}|\d{15})' # Visa
r'|5[1-5]\d{14}' # Mastercard
r'|6011\d{12}' # Discover (incomplete?)
r'|7\d{15}' # What's this?
r'|3[47]\d{13}' # American Express
r')$')
return bool(regex.match(_value))

def str_to_email(_value):
regex = re.compile(r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)")
if regex.match(_value):
return True
@staticmethod
def str_to_zip_code(_value):
regex = re.compile(r'^(\d{5})([- ])?(\d{4})?$')
if regex.match(_value):
return True
return False

def str_to_credit_card(_value):
# Reference https://www.regular-expressions.info/creditcard.html
# https://codereview.stackexchange.com/questions/74797/credit-card-checking
regex = re.compile(r'(4(?:\d{12}|\d{15})' # Visa
r'|5[1-5]\d{14}' # Mastercard
r'|6011\d{12}' # Discover (incomplete?)
r'|7\d{15}' # What's this?
r'|3[47]\d{13}' # American Express
r')$')
return bool(regex.match(_value))

def str_to_zip_code(_value):
regex = re.compile(r'^(\d{5})([- ])?(\d{4})?$')
if regex.match(_value):
return True
return False
@staticmethod
def str_to_missing(_value):
if _value == "":
return True

def str_to_missing(_value):
if value == "":
return True
@staticmethod
def parse(value, infer: bool, dtypes, str_funcs, int_funcs, mismatch):
"""
:param value:
:param infer:
:param dtypes:
:param str_funcs:
:param int_funcs:
:param mismatch:
:return:
"""
col_name, value = value

# Try to order the functions from less to more computational expensive
if int_funcs is None:
int_funcs = [(str_to_credit_card, "credit_card_number"), (str_to_zip_code, "zip_code")]
int_funcs = [(Infer.str_to_credit_card, "credit_card_number"), (Infer.str_to_zip_code, "zip_code")]

if str_funcs is None:
str_funcs = [
(str_to_missing, "missing"), (str_to_boolean, "boolean"), (str_to_date, "date"),
(str_to_array, "array"), (str_to_object, "object"), (str_to_ip, "ip"), (str_to_url, "url"),
(str_to_email, "email"), (str_to_gender, "gender"), (str_to_null, "null")
(Infer.str_to_missing, "missing"), (Infer.str_to_boolean, "boolean"), (Infer.str_to_date, "date"),
(Infer.str_to_array, "array"), (Infer.str_to_object, "object"), (Infer.str_to_ip, "ip"),
(Infer.str_to_url, "url"),
(Infer.str_to_email, "email"), (Infer.str_to_gender, "gender"), (Infer.str_to_null, "null")
]

mismatch_count = 0
Expand Down Expand Up @@ -167,9 +186,9 @@ def str_to_missing(_value):

else:
_data_type = dtypes[col_name]
if is_null(value) is True:
if Infer.is_null(value) is True:
_data_type = "null"
elif str_to_missing(value) is True:
elif Infer.str_to_missing(value) is True:
_data_type = "missing"
else:
if dtypes[col_name].startswith("array"):
Expand Down
68 changes: 3 additions & 65 deletions optimus/audf.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from optimus.helpers.logger import logger
from optimus.helpers.parser import parse_spark_class_dtypes, parse_python_dtypes
from optimus.helpers.raiseit import RaiseIt
from infer import Infer


def abstract_udf(col, func, func_return_type=None, attrs=None, func_type=None):
Expand Down Expand Up @@ -116,81 +117,18 @@ def filter_row_by_data_type(col_name, data_type=None, get_type=False):
:param get_type: Value to be returned as string or boolean
:return: True or False
"""
from ast import literal_eval

if data_type is not None:
data_type = parse_python_dtypes(data_type)

def pandas_udf_func(v):
def str_to_boolean(value):
"""
Check if a str can be converted to boolean
:param value:
:return:
"""
value = value.lower()
if value == "true" or value == "false":
return True

def str_to_date(value):
try:
dateutil.parser.parse(value)
return True
except (ValueError, OverflowError):
pass

def str_to_array(value):
"""
Check if value can be parsed to a tuple or and array.
Because Spark can handle tuples we will try to transform tuples to arrays
:param value:
:return:
"""
try:
if isinstance(literal_eval((value.encode('ascii', 'ignore')).decode("utf-8")), (list, tuple)):
return True
except (ValueError, SyntaxError):
pass

def func(value):
"""
Check if a value can be casted to a specific
:param value: value to be checked
:return:
"""
if isinstance(value, bool):
_data_type = "bool"
elif fastnumbers.isint(value): # Check if value is integer
_data_type = "int"
elif fastnumbers.isfloat(value):
_data_type = "float"
# if string we try to parse it to int, float or bool
elif isinstance(value, str):
if str_to_boolean(value):
_data_type = "bool"
elif str_to_date(value):
_data_type = "date"
elif str_to_array(value):
_data_type = "array"
else:
_data_type = "string"
else:
_data_type = "null"

if get_type is False:
if _data_type == data_type:
return True
else:
return False
else:
return _data_type

return v.apply(func)
return v.apply(Infer.parse)

if get_type is True:
return_data_type = "string"
else:
return_data_type = "boolean"

col_name = one_list_to_val(col_name)
return F.pandas_udf(pandas_udf_func, return_data_type)(col_name)
return F.pandas_udf(pandas_udf_func, return_data_type)(col_name, None, data_type)
8 changes: 4 additions & 4 deletions optimus/dataframe/create.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import pandas as pd
from pyspark.sql.types import StringType, StructField, StructType

from optimus.spark import Spark
from infer import Infer
from optimus.helpers.check import is_, is_list_of_tuples, is_one_element, is_tuple
from optimus.helpers.functions import infer
from optimus.helpers.parser import parse_spark_class_dtypes
from optimus.spark import Spark


class Create:
Expand Down Expand Up @@ -35,7 +35,7 @@ def data_frame(cols=None, rows=None, infer_schema=True, pdf=None):
col_name = c

if infer_schema is True:
var_type = infer(r)
var_type = Infer.parse(r)
else:
var_type = StringType()
nullable = True
Expand All @@ -61,4 +61,4 @@ def data_frame(cols=None, rows=None, infer_schema=True, pdf=None):
df = df.columns_meta(df.cols.names())
return df

df = data_frame
df = data_frame
Loading

0 comments on commit b19fae5

Please sign in to comment.