def str_to_boolean(_value):
    """Return True if the string parses as a boolean literal ("true"/"false")."""
    return _value.lower() in ("true", "false")


def str_to_date(_value):
    """Return True if the string can be parsed as a date by dateutil."""
    try:
        dparse(_value)
        return True
    except (ValueError, OverflowError):
        return False


def str_to_null(_value):
    """Return True if the string is the literal "null" (case-insensitive)."""
    return _value.lower() == "null"


def is_null(_value):
    """Return True if the value is None."""
    return _value is None


def str_to_gender(_value):
    """Return True if the string is "male" or "female" (case-insensitive)."""
    # FIX: the previous version had `else` without a colon (SyntaxError)
    return _value.lower() in ("male", "female")


def str_to_data_type(_value, _dtypes):
    """
    Return True if the string can be parsed (via literal_eval) into any of _dtypes.
    Because Spark can handle tuples we will try to transform tuples to arrays.
    :param _value: string to evaluate
    :param _dtypes: type or tuple of types to check the parsed value against
    :return: bool
    """
    try:
        # Strip non-ascii characters so literal_eval does not choke on them
        parsed = literal_eval(_value.encode('ascii', 'ignore').decode("utf-8"))
        return isinstance(parsed, _dtypes)
    except (ValueError, SyntaxError):
        return False


def str_to_array(_value):
    """Return True if the string can be parsed as a list or tuple."""
    # FIX: call the module-level function; Infer no longer owns str_to_data_type
    return str_to_data_type(_value, (list, tuple))


def str_to_object(_value):
    """Return True if the string can be parsed as a dict or set."""
    # FIX: call the module-level function; Infer no longer owns str_to_data_type
    return str_to_data_type(_value, (dict, set))


# Patterns are compiled once at import time instead of on every call.
_URL_REGEX = re.compile(
    r'^https?://'  # http:// or https://
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'  # domain...
    r'localhost|'  # localhost...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
    r'(?::\d+)?'  # optional port
    r'(?:/?|[/?]\S+)$', re.IGNORECASE)

_IP_REGEX = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')

_EMAIL_REGEX = re.compile(r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)")

# Reference https://www.regular-expressions.info/creditcard.html
# https://codereview.stackexchange.com/questions/74797/credit-card-checking
_CREDIT_CARD_REGEX = re.compile(r'(4(?:\d{12}|\d{15})'  # Visa
                                r'|5[1-5]\d{14}'  # Mastercard
                                r'|6011\d{12}'  # Discover (incomplete?)
                                r'|7\d{15}'  # unknown issuer
                                r'|3[47]\d{13}'  # American Express
                                r')$')

_ZIP_CODE_REGEX = re.compile(r'^(\d{5})([- ])?(\d{4})?$')


def str_to_url(_value):
    """Return True if the string looks like an http(s) URL."""
    return bool(_URL_REGEX.match(_value))


def str_to_ip(_value):
    """Return True if the string starts with an IPv4-looking dotted quad."""
    return bool(_IP_REGEX.match(_value))


def str_to_email(_value):
    """Return True if the value is a string shaped like an email address."""
    try:
        return bool(_EMAIL_REGEX.match(_value))
    except TypeError:
        # Non-string input (e.g. an int) cannot be an email address
        return False


def str_to_credit_card(_value):
    """Return True if the string matches a known credit card number pattern."""
    return bool(_CREDIT_CARD_REGEX.match(_value))


def str_to_zip_code(_value):
    """Return True if the string is a 5-digit (optionally ZIP+4) postal code."""
    return bool(_ZIP_CODE_REGEX.match(_value))


def str_to_missing(_value):
    """Return True if the value is an empty string (treated as missing)."""
    return _value == ""


def str_to_int(_value):
    """Return True if the string can be parsed as an int (via fastnumbers)."""
    return bool(fastnumbers.isint(_value))


def str_to_decimal(_value):
    """Return True if the string can be parsed as a float (via fastnumbers)."""
    return bool(fastnumbers.isfloat(_value))


def str_to_str(_value):
    """Return True if the value is already a Python str."""
    return isinstance(_value, str)


# Dispatch table mapping profiler dtype names to their checker functions
DTYPE_FUNC = {"string": str_to_str, "boolean": str_to_boolean, "date": str_to_date,
              "array": str_to_array, "object": str_to_object, "ip": str_to_ip,
              "url": str_to_url, "email": str_to_email, "gender": str_to_gender,
              "credit_card_number": str_to_credit_card, "zip_code": str_to_zip_code}


class Infer(object):
    """
    Helpers that report whether a value matches a specific data type.
    """

    @staticmethod
    def value(value, dtype: str):
        """
        Return True if a value can be parsed as the specified dtype.
        :param value: value to check
        :param dtype: one of the keys of DTYPE_FUNC
        :return: bool
        """
        return DTYPE_FUNC[dtype](value)

    @staticmethod
    def mismatch(value: tuple, dtypes: dict):
        """
        UDF function. Classify a (column, value) pair as the declared dtype,
        "null", "missing" or "mismatch", and emit a count of 1 for reduceByKey.
        :param value: tuple (col_name, value)
        :param dtypes: dict {col_name: dtype}
        :return: ((col_name, data_type), 1)
        """
        col_name, value = value
        dtype = dtypes[col_name]

        # FIX: check null before calling the dtype checker — most checkers
        # call .lower() on the value and would raise AttributeError on None.
        if is_null(value) is True:
            _data_type = "null"
        elif DTYPE_FUNC[dtype](value) is True:
            _data_type = dtype
        elif str_to_missing(value) is True:
            _data_type = "missing"
        else:
            _data_type = "mismatch"

        return (col_name, _data_type), 1
+ {col_name: regular_expression} :return: """ col_name, value = value # Try to order the functions from less to more computational expensive if int_funcs is None: - int_funcs = [(Infer.str_to_credit_card, "credit_card_number"), (Infer.str_to_zip_code, "zip_code")] + int_funcs = [(str_to_credit_card, "credit_card_number"), (str_to_zip_code, "zip_code")] if str_funcs is None: str_funcs = [ - (Infer.str_to_missing, "missing"), (Infer.str_to_boolean, "boolean"), (Infer.str_to_date, "date"), - (Infer.str_to_array, "array"), (Infer.str_to_object, "object"), (Infer.str_to_ip, "ip"), - (Infer.str_to_url, "url"), - (Infer.str_to_email, "email"), (Infer.str_to_gender, "gender"), (Infer.str_to_null, "null") + (str_to_missing, "missing"), (str_to_boolean, "boolean"), (str_to_date, "date"), + (str_to_array, "array"), (str_to_object, "object"), (str_to_ip, "ip"), + (str_to_url, "url"), + (str_to_email, "email"), (str_to_gender, "gender"), (str_to_null, "null") ] mismatch_count = 0 @@ -186,9 +254,9 @@ def parse(value, infer: bool, dtypes, str_funcs, int_funcs, mismatch): else: _data_type = dtypes[col_name] - if Infer.is_null(value) is True: + if is_null(value) is True: _data_type = "null" - elif Infer.str_to_missing(value) is True: + elif str_to_missing(value) is True: _data_type = "missing" else: if dtypes[col_name].startswith("array"): diff --git a/optimus/audf.py b/optimus/audf.py index d853ea3d..06095158 100644 --- a/optimus/audf.py +++ b/optimus/audf.py @@ -122,7 +122,6 @@ def filter_row_by_data_type(col_name, data_type=None, get_type=False): data_type = parse_python_dtypes(data_type) def pandas_udf_func(v): - return v.apply(Infer.parse) if get_type is True: diff --git a/optimus/dataframe/columns.py b/optimus/dataframe/columns.py index 3482bd48..3bdd2a1d 100644 --- a/optimus/dataframe/columns.py +++ b/optimus/dataframe/columns.py @@ -20,6 +20,7 @@ from pyspark.sql.functions import when from pyspark.sql.types import StringType, ArrayType, StructType +from infer import Infer # 
def count_mismatch(columns_mismatch=None):
    """
    Count how many values in each column match the user-declared data type.

    For every column in ``columns_mismatch`` each value is classified via
    Infer.mismatch as the declared dtype, "null", "missing" or "mismatch",
    and the per-column counts of each category are returned.

    :param columns_mismatch: dict {col_name: dtype} declaring the expected
        data type for each column to be checked. Required.
    :return: dict {col_name: {dtype_or_category: count}}
    """
    # FIX: the default of None previously crashed with an opaque TypeError
    # on columns_mismatch.keys(); fail early with a clear message instead.
    if columns_mismatch is None:
        raise ValueError("columns_mismatch must be a dict of {col_name: dtype}, received None")

    df = self
    columns = parse_columns(df, list(columns_mismatch.keys()))

    # Classify every (column, value) pair and count each category per column
    _count = (df.select(columns).rdd
              .flatMap(lambda x: x.asDict().items())
              .map(lambda x: Infer.mismatch(x, columns_mismatch))
              .reduceByKey(lambda a, b: (a + b)))

    result = {}
    for c in _count.collect():
        result.setdefault(c[0][0], {})[c[0][1]] = c[1]

    # Ensure every column reports the same default keys even when a
    # category never occurred in its data
    for col_results in result.values():
        col_results.setdefault("mismatch", 0)
        col_results.setdefault("null", 0)
        col_results.setdefault("missing", 0)

    return result
Pattern can be a predefined or a regex :return: """ @@ -1771,33 +1798,27 @@ def count_by_dtypes(columns, infer=False, str_funcs=None, int_funcs=None, mismat columns = parse_columns(df, columns) columns_dtypes = df.cols.dtypes() - if mismatch: - m = lambda a, b: (a[0] + b[0], a[1] + b[1]) - else: - m = lambda a, b: (a + b) - _count = (df.select(columns).rdd .flatMap(lambda x: x.asDict().items()) - .map(lambda x: Infer.parse(x, infer, columns_dtypes, str_funcs, int_funcs, mismatch)) - .reduceByKey(m)) + .map(lambda x: Infer.parse(x, infer, columns_dtypes, str_funcs, int_funcs)) + .reduceByKey(lambda a, b: (a + b))) result = {} for c in _count.collect(): result.setdefault(c[0][0], {})[c[0][1]] = c[1] # Process mismatch - if mismatch is not None: - for col_name, result_dtypes in result.items(): - result[col_name]["mismatch"] = 0 - for result_dtype, count in result_dtypes.items(): - if is_tuple(count): - result[col_name]["mismatch"] = result[col_name]["mismatch"] + count[1] - result[col_name][result_dtype] = count[0] + for col_name, result_dtypes in result.items(): + result[col_name]["mismatch"] = 0 + for result_dtype, count in result_dtypes.items(): + if is_tuple(count): + result[col_name]["mismatch"] = result[col_name]["mismatch"] + count[1] + result[col_name][result_dtype] = count[0] if infer is True: result = fill_missing_var_types(result, columns_dtypes) else: - result = parse_profiler_dtypes(result, columns_dtypes) + result = parse_profiler_dtypes(result) return result @add_attr(cols) diff --git a/optimus/dataframe/rows.py b/optimus/dataframe/rows.py index 9e73c8de..de50eacb 100644 --- a/optimus/dataframe/rows.py +++ b/optimus/dataframe/rows.py @@ -73,7 +73,10 @@ def select_by_dtypes(input_cols, data_type=None): :param data_type: Datatype use filter values :return: Spark DataFrame """ + + input_cols = parse_columns(self, input_cols) + self.cols.apply() return self.where(fbdt(input_cols, data_type)) @@ -87,7 +90,6 @@ def select(columns, *args, **kwargs): :return: 
Spark DataFrame """ df = self - return df.filter(*args, **kwargs) df = df.filter(columns, *args, **kwargs) df = df.preserve_meta(self, Actions.SORT_ROW.value, columns) diff --git a/optimus/profiler/functions.py b/optimus/profiler/functions.py index 09927fe5..55a6621f 100644 --- a/optimus/profiler/functions.py +++ b/optimus/profiler/functions.py @@ -2,29 +2,11 @@ import math -from optimus.helpers.constants import SPARK_DTYPES_TO_PROFILER, ProfilerDataTypes, PROFILER_COLUMN_TYPES, \ +from optimus.helpers.constants import ProfilerDataTypes, PROFILER_COLUMN_TYPES, \ CONFIDENCE_LEVEL_CONSTANT from optimus.helpers.json import json_converter -def parse_profiler_dtypes(col_data_type, dtypes): - """ - Parse a spark data type to a profiler data type - :return: - """ - - columns = {} - for col_name, data_type_count in col_data_type.items(): - columns[col_name] = {data_type: 0 for data_type in ["null", "missing"]} - for data_type, count in data_type_count.items(): - for profiler_data_type, spark_data_type in SPARK_DTYPES_TO_PROFILER.items(): - if data_type in SPARK_DTYPES_TO_PROFILER[profiler_data_type]: - columns[col_name][profiler_data_type] = count - if data_type == "mismatch": - columns[col_name][data_type] = count - return columns - - def fill_missing_var_types(var_types, dtypes): """ Fill missing data types with 0