diff --git a/infer.py b/infer.py index f0daab64..3c746c6d 100644 --- a/infer.py +++ b/infer.py @@ -1,4 +1,4 @@ -# This file need to be send to the cluster via .addpyfile to handle the pickle problem +# This file need to be send to the cluster via .addPyFile to handle the pickle problem # This is outside the optimus folder on purpose because it cause problem importing optimus when using de udf. import re @@ -9,117 +9,136 @@ class Infer(object): + """ + This functions return True or False if match and specific dataType + """ + @staticmethod - def parse(value, infer, dtypes, str_funcs, int_funcs, mismatch): - """ + def str_to_boolean(_value): + _value = _value.lower() + if _value == "true" or _value == "false": + return True - :param value: - :param infer: - :param dtypes: - :param str_funcs: - :param int_funcs: - :param mismatch: + @staticmethod + def str_to_date(_value): + try: + dparse(_value) + return True + except (ValueError, OverflowError): + pass + + @staticmethod + def str_to_null(_value): + _value = _value.lower() + if _value == "null": + return True + + @staticmethod + def is_null(_value): + if _value is None: + return True + + @staticmethod + def str_to_gender(_value): + _value = _value.lower() + if _value == "male" or _value == "female": + return True + + @staticmethod + def str_to_data_type(_value, _dtypes): + """ + Check if value can be parsed to a tuple or and list. + Because Spark can handle tuples we will try to transform tuples to arrays + :param _value: :return: """ - col_name, value = value + try: - def str_to_boolean(_value): - _value = _value.lower() - if _value == "true" or _value == "false": + if isinstance(literal_eval((_value.encode('ascii', 'ignore')).decode("utf-8")), _dtypes): return True + except (ValueError, SyntaxError): + pass - def str_to_date(_value): - try: - dparse(_value) - return True - except (ValueError, OverflowError): - pass + @staticmethod + def str_to_array(_value): + return Infer.str_to_data_type(_value, (list, tuple)) - def str_to_null(_value): - _value = _value.lower() - if _value == "null": - return True + @staticmethod + def str_to_object(_value): + return Infer.str_to_data_type(_value, (dict, set)) - def is_null(_value): - if _value is None: - return True + @staticmethod + def str_to_url(_value): + regex = re.compile( + r'^https?://' # http:// or https:// + r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain... + r'localhost|' # localhost... + r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip + r'(?::\d+)?' # optional port + r'(?:/?|[/?]\S+)$', re.IGNORECASE) + if regex.match(_value): + return True - def str_to_gender(_value): - _value = _value.lower() - if _value == "male" or _value == "female": - return True + @staticmethod + def str_to_ip(_value): + regex = re.compile('''\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}''') + if regex.match(_value): + return True - def str_to_array(_value): - return str_to_data_type(_value, (list, tuple)) - - def str_to_object(_value): - return str_to_data_type(_value, (dict, set)) - - def str_to_data_type(_value, _dtypes): - """ - Check if value can be parsed to a tuple or and list. - Because Spark can handle tuples we will try to transform tuples to arrays - :param _value: - :return: - """ - try: - - if isinstance(literal_eval((_value.encode('ascii', 'ignore')).decode("utf-8")), _dtypes): - return True - except (ValueError, SyntaxError): - pass - - def str_to_url(_value): - regex = re.compile( - r'^https?://' # http:// or https:// - r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain... - r'localhost|' # localhost... - r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip - r'(?::\d+)?' # optional port - r'(?:/?|[/?]\S+)$', re.IGNORECASE) - if regex.match(_value): - return True + @staticmethod + def str_to_email(_value): + regex = re.compile(r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)") + if regex.match(_value): + return True - def str_to_ip(_value): - regex = re.compile('''\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}''') - if regex.match(_value): - return True + @staticmethod + def str_to_credit_card(_value): + # Reference https://www.regular-expressions.info/creditcard.html + # https://codereview.stackexchange.com/questions/74797/credit-card-checking + regex = re.compile(r'(4(?:\d{12}|\d{15})' # Visa + r'|5[1-5]\d{14}' # Mastercard + r'|6011\d{12}' # Discover (incomplete?) + r'|7\d{15}' # What's this? + r'|3[47]\d{13}' # American Express + r')$') + return bool(regex.match(_value)) - def str_to_email(_value): - regex = re.compile(r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)") - if regex.match(_value): - return True + @staticmethod + def str_to_zip_code(_value): + regex = re.compile(r'^(\d{5})([- ])?(\d{4})?$') + if regex.match(_value): + return True + return False - def str_to_credit_card(_value): - # Reference https://www.regular-expressions.info/creditcard.html - # https://codereview.stackexchange.com/questions/74797/credit-card-checking - regex = re.compile(r'(4(?:\d{12}|\d{15})' # Visa - r'|5[1-5]\d{14}' # Mastercard - r'|6011\d{12}' # Discover (incomplete?) - r'|7\d{15}' # What's this? - r'|3[47]\d{13}' # American Express - r')$') - return bool(regex.match(_value)) - - def str_to_zip_code(_value): - regex = re.compile(r'^(\d{5})([- ])?(\d{4})?$') - if regex.match(_value): - return True - return False + @staticmethod + def str_to_missing(_value): + if _value == "": + return True - def str_to_missing(_value): - if value == "": - return True + @staticmethod + def parse(value, infer: bool, dtypes, str_funcs, int_funcs, mismatch): + """ + + :param value: + :param infer: + :param dtypes: + :param str_funcs: + :param int_funcs: + :param mismatch: + :return: + """ + col_name, value = value # Try to order the functions from less to more computational expensive if int_funcs is None: - int_funcs = [(str_to_credit_card, "credit_card_number"), (str_to_zip_code, "zip_code")] + int_funcs = [(Infer.str_to_credit_card, "credit_card_number"), (Infer.str_to_zip_code, "zip_code")] if str_funcs is None: str_funcs = [ - (str_to_missing, "missing"), (str_to_boolean, "boolean"), (str_to_date, "date"), - (str_to_array, "array"), (str_to_object, "object"), (str_to_ip, "ip"), (str_to_url, "url"), - (str_to_email, "email"), (str_to_gender, "gender"), (str_to_null, "null") + (Infer.str_to_missing, "missing"), (Infer.str_to_boolean, "boolean"), (Infer.str_to_date, "date"), + (Infer.str_to_array, "array"), (Infer.str_to_object, "object"), (Infer.str_to_ip, "ip"), + (Infer.str_to_url, "url"), + (Infer.str_to_email, "email"), (Infer.str_to_gender, "gender"), (Infer.str_to_null, "null") ] mismatch_count = 0 @@ -167,9 +186,9 @@ def str_to_missing(_value): else: _data_type = dtypes[col_name] - if is_null(value) is True: + if Infer.is_null(value) is True: _data_type = "null" - elif str_to_missing(value) is True: + elif Infer.str_to_missing(value) is True: _data_type = "missing" else: if dtypes[col_name].startswith("array"): diff --git a/optimus/audf.py b/optimus/audf.py index 001edc14..d853ea3d 100644 --- a/optimus/audf.py +++ b/optimus/audf.py @@ -8,6 +8,7 @@ from optimus.helpers.logger import logger from optimus.helpers.parser import parse_spark_class_dtypes, parse_python_dtypes from optimus.helpers.raiseit import RaiseIt +from infer import Infer def abstract_udf(col, func, func_return_type=None, attrs=None, func_type=None): @@ -116,76 +117,13 @@ def filter_row_by_data_type(col_name, data_type=None, get_type=False): :param get_type: Value to be returned as string or boolean :return: True or False """ - from ast import literal_eval if data_type is not None: data_type = parse_python_dtypes(data_type) def pandas_udf_func(v): - def str_to_boolean(value): - """ - Check if a str can be converted to boolean - :param value: - :return: - """ - value = value.lower() - if value == "true" or value == "false": - return True - - def str_to_date(value): - try: - dateutil.parser.parse(value) - return True - except (ValueError, OverflowError): - pass - - def str_to_array(value): - """ - Check if value can be parsed to a tuple or and array. - Because Spark can handle tuples we will try to transform tuples to arrays - :param value: - :return: - """ - try: - if isinstance(literal_eval((value.encode('ascii', 'ignore')).decode("utf-8")), (list, tuple)): - return True - except (ValueError, SyntaxError): - pass - - def func(value): - """ - Check if a value can be casted to a specific - :param value: value to be checked - :return: - """ - if isinstance(value, bool): - _data_type = "bool" - elif fastnumbers.isint(value): # Check if value is integer - _data_type = "int" - elif fastnumbers.isfloat(value): - _data_type = "float" - # if string we try to parse it to int, float or bool - elif isinstance(value, str): - if str_to_boolean(value): - _data_type = "bool" - elif str_to_date(value): - _data_type = "date" - elif str_to_array(value): - _data_type = "array" - else: - _data_type = "string" - else: - _data_type = "null" - - if get_type is False: - if _data_type == data_type: - return True - else: - return False - else: - return _data_type - return v.apply(func) + return v.apply(Infer.parse) if get_type is True: return_data_type = "string" @@ -193,4 +131,4 @@ def func(value): return_data_type = "boolean" col_name = one_list_to_val(col_name) - return F.pandas_udf(pandas_udf_func, return_data_type)(col_name) + return F.pandas_udf(pandas_udf_func, return_data_type)(col_name, None, data_type) diff --git a/optimus/dataframe/create.py b/optimus/dataframe/create.py index 22302626..2e389705 100644 --- a/optimus/dataframe/create.py +++ b/optimus/dataframe/create.py @@ -1,10 +1,10 @@ import pandas as pd from pyspark.sql.types import StringType, StructField, StructType -from optimus.spark import Spark +from infer import Infer from optimus.helpers.check import is_, is_list_of_tuples, is_one_element, is_tuple -from optimus.helpers.functions import infer from optimus.helpers.parser import parse_spark_class_dtypes +from optimus.spark import Spark class Create: @@ -35,7 +35,7 @@ def data_frame(cols=None, rows=None, infer_schema=True, pdf=None): col_name = c if infer_schema is True: - var_type = infer(r) + var_type = Infer.parse(r) else: var_type = StringType() nullable = True @@ -61,4 +61,4 @@ def data_frame(cols=None, rows=None, infer_schema=True, pdf=None): df = df.columns_meta(df.cols.names()) return df - df = data_frame \ No newline at end of file + df = data_frame diff --git a/optimus/helpers/functions.py b/optimus/helpers/functions.py index aba92359..936830c1 100644 --- a/optimus/helpers/functions.py +++ b/optimus/helpers/functions.py @@ -7,66 +7,18 @@ from functools import reduce from pathlib import Path -import fastnumbers import six from pyspark.ml.linalg import DenseVector from pyspark.sql import DataFrame from pyspark.sql import functions as F -from pyspark.sql.types import ArrayType from optimus import ROOT_DIR -from optimus.helpers.check import is_str, is_list, is_, is_bool, is_datetime, \ - is_date, is_binary -from optimus.helpers.converter import one_list_to_val, str_to_boolean, str_to_date, str_to_array, val_to_list +from optimus.helpers.check import is_ +from optimus.helpers.converter import one_list_to_val, val_to_list from optimus.helpers.logger import logger -from optimus.helpers.parser import parse_spark_class_dtypes from optimus.helpers.raiseit import RaiseIt -def infer(value): - """ - Infer a Spark data type from a value - :param value: value to be inferred - :return: Spark data type - """ - result = None - if value is None: - result = "null" - - elif is_bool(value): - result = "bool" - - elif fastnumbers.isint(value): - result = "int" - - elif fastnumbers.isfloat(value): - result = "float" - - elif is_list(value): - result = ArrayType(infer(value[0])) - - elif is_datetime(value): - result = "datetime" - - elif is_date(value): - result = "date" - - elif is_binary(value): - result = "binary" - - elif is_str(value): - if str_to_boolean(value): - result = "bool" - elif str_to_date(value): - result = "string" # date - elif str_to_array(value): - result = "string" # array - else: - result = "string" - - return parse_spark_class_dtypes(result) - - def random_int(n=5): """ Create a random string of ints