diff --git a/optimus/dataframe/extension.py b/optimus/dataframe/extension.py index 5991c046..ba0ca663 100644 --- a/optimus/dataframe/extension.py +++ b/optimus/dataframe/extension.py @@ -462,7 +462,7 @@ def reset(self): @add_method(DataFrame) -def send(self, name=None, infer=True, mismatch=None, stats=True): +def send(self, name=None, infer=True, mismatch=None, stats=True, advanced_stats=True): """ Profile and send the data to the queue :param self: @@ -476,12 +476,14 @@ def send(self, name=None, infer=True, mismatch=None, stats=True): if name is not None: df.set_name(name) - columns, output = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR, - approx_count=True, - sample=10000, - stats=stats, - format="json", - mismatch=mismatch) + output = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR, + approx_count=True, + sample=10000, + stats=stats, + format="json", + mismatch=mismatch, + advanced_stats=advanced_stats + ) if Comm.instance: Comm.instance.send(output) diff --git a/optimus/helpers/constants.py b/optimus/helpers/constants.py index f7f41ecd..92d147e1 100644 --- a/optimus/helpers/constants.py +++ b/optimus/helpers/constants.py @@ -75,6 +75,8 @@ # Profiler PROFILER_COLUMN_TYPES = {"categorical", "numeric", "date", "null", "array", "binary"} +PYTHON_TO_PROFILER = {"string": "categorical", "boolean": "categorical", "int": "numeric", "decimal": "numeric", + "date": "date", "array": "array", "binaty": "binary", "null": "null"} SPARK_DTYPES_TO_PROFILER = {"int": ["smallint", "tinyint", "bigint", "int"], "decimal": ["float", "double"], "string": "string", "date": {"date", "timestamp"}, "boolean": "boolean", "binary": "binary", diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index 57ed3709..416e7fa6 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -13,7 +13,7 @@ from optimus.helpers.check import is_column_a, is_dict, is_list_of_str from optimus.helpers.columns import parse_columns from optimus.helpers.columns_expression import zeros_agg, count_na_agg, hist_agg, percentile_agg, count_uniques_agg -from optimus.helpers.constants import RELATIVE_ERROR, Actions, PYSPARK_NUMERIC_TYPES +from optimus.helpers.constants import RELATIVE_ERROR, Actions, PYSPARK_NUMERIC_TYPES, PYTHON_TO_PROFILER from optimus.helpers.decorators import time_it from optimus.helpers.functions import absolute_path from optimus.helpers.json import json_converter @@ -54,61 +54,9 @@ def __init__(self, output_path=None): self.output_columns = {} - def _count_data_types(self, df, columns, infer=False, mismatch=None): - """ - Count the number of int, float, string, date and booleans and output the count in json format - :param df: Dataframe to be processed - :param columns: Columns to be processed - :param infer: infer the column datatype - - :return: json - """ - - columns = parse_columns(df, columns) - - count_by_data_type = df.cols.count_by_dtypes(columns, infer=infer, mismatch=mismatch) - count_by_data_type_no_mismatch = copy.deepcopy(count_by_data_type) - # Info from all the columns - type_details = {} - - for col_name in columns: - - """ - Function for determine if register value is float or int or string. - :param col_name: - :return: - """ - # Not count mismatch - if "mismatch" in count_by_data_type_no_mismatch[col_name]: - count_by_data_type_no_mismatch[col_name].pop("mismatch") - - # Get the greatest count by column data type - greatest_data_type_count = max(count_by_data_type_no_mismatch[col_name], - key=count_by_data_type_no_mismatch[col_name].get) - if greatest_data_type_count == "string" or greatest_data_type_count == "boolean": - cat = "categorical" - elif greatest_data_type_count == "int" or greatest_data_type_count == "decimal": - cat = "numeric" - elif greatest_data_type_count == "date": - cat = "date" - elif greatest_data_type_count == "array": - cat = "array" - elif greatest_data_type_count == "binary": - cat = "binary" - elif greatest_data_type_count == "null": - cat = "null" - else: - cat = None - - assign(type_details, col_name + ".dtype", greatest_data_type_count, dict) - assign(type_details, col_name + ".type", cat, dict) - assign(type_details, col_name + ".stats", count_by_data_type[col_name], dict) - # print(type_details) - return type_details - @time_it def run(self, df, columns="*", buckets=MAX_BUCKETS, infer=False, relative_error=RELATIVE_ERROR, approx_count=True, - mismatch=None): + mismatch=None, advanced_stats=True): """ Return dataframe statistical information in HTML Format :param df: Dataframe to be analyzed @@ -118,12 +66,13 @@ def run(self, df, columns="*", buckets=MAX_BUCKETS, infer=False, relative_error= :param relative_error: Relative Error for quantile discretizer calculation :param approx_count: Use approx_count_distinct or countDistinct :param mismatch: + :param advanced_stats: :return: """ columns = parse_columns(df, columns) - columns, output = self.dataset(df, columns, buckets, infer, relative_error, approx_count, format="dict", - mismatch=mismatch) + output = self.dataset(df, columns, buckets, infer, relative_error, approx_count, format="dict", + mismatch=mismatch, advanced_stats=advanced_stats) # Load jinja template_loader = jinja2.FileSystemLoader(searchpath=absolute_path("/profiler/templates/out")) @@ -218,13 +167,138 @@ def to_file(self, path=None, output="html"): def to_json(self, df, columns="*", buckets=10, infer=False, relative_error=RELATIVE_ERROR, approx_count=True, sample=10000, stats=True, mismatch=None): - columns, output = self.dataset(df, columns=columns, buckets=buckets, infer=infer, relative_error=relative_error, - approx_count=approx_count, - sample=sample, stats=stats, format="json", mismatch=mismatch) - return output + return self.dataset(df, columns=columns, buckets=buckets, infer=infer, relative_error=relative_error, + approx_count=approx_count, + sample=sample, stats=stats, format="json", mismatch=mismatch) + + def cols_needs_profiling(self, df, columns): + """ + Calculate the columns that needs to be profiled. + :return: + """ + # Metadata + # If not empty the profiler already run. + # So process the dataframe's metadata to be sure which columns need to be profiled + + actions = df.get_meta("transformations.actions") + are_actions = actions is not None and len(actions) > 0 + + # Process actions to check if any column must be processed + if self.is_cached(): + if are_actions: + + drop = ["drop"] + + def match_actions_names(_actions): + """ + Get a list of columns which have been applied and specific action. + :param _actions: + :return: + """ + + _actions_json = df.get_meta("transformations.actions") + + modified = [] + for action in _actions: + if _actions_json.get(action): + # Check if was renamed + col = _actions_json.get(action) + if len(match_renames(col)) == 0: + _result = col + else: + _result = match_renames(col) + modified = modified + _result + + return modified + + def match_renames(_col_names): + """ + Get a list fo columns and return the renamed version. + :param _col_names: + :return: + """ + _renamed_columns = [] + _actions = df.get_meta("transformations.actions") + _rename = _actions.get("rename") + + def get_name(_col_name): + c = _rename.get(_col_name) + # The column has not been rename. Get the actual column name + if c is None: + c = _col_name + return c + + if _rename: + # if a list + if is_list_of_str(_col_names): + for _col_name in _col_names: + # The column name has been changed. Get the new name + _renamed_columns.append(get_name(_col_name)) + # if a dict + if is_dict(_col_names): + for _col1, _col2 in _col_names.items(): + _renamed_columns.append({get_name(_col1): get_name(_col2)}) + + else: + _renamed_columns = _col_names + return _renamed_columns + + # New columns + new_columns = [] + + current_col_names = df.cols.names() + renamed_cols = match_renames(df.get_meta("transformations.columns")) + for current_col_name in current_col_names: + if current_col_name not in renamed_cols: + new_columns.append(current_col_name) + + # Rename keys to match new names + profiler_columns = self.output_columns["columns"] + actions = df.get_meta("transformations.actions") + rename = actions.get("rename") + if rename: + for k, v in actions["rename"].items(): + profiler_columns[v] = profiler_columns.pop(k) + profiler_columns[v]["name"] = v + + # Drop Keys + for col_names in match_actions_names(drop): + profiler_columns.pop(col_names) + + # Copy Keys + copy_columns = df.get_meta("transformations.actions.copy") + if copy_columns is not None: + for source, target in copy_columns.items(): + profiler_columns[target] = profiler_columns[source].copy() + profiler_columns[target]["name"] = target + # Check is a new column is a copied column + new_columns = list(set(new_columns) - set(copy_columns.values())) + + # Actions applied to current columns + + modified_columns = match_actions_names(Actions.list()) + calculate_columns = modified_columns + new_columns + + # Remove duplicated. + calculate_columns = list(set(calculate_columns)) + + elif not are_actions: + calculate_columns = None + # elif not is_cached: + else: + calculate_columns = columns + + return calculate_columns + + def is_cached(self): + """ + + :return: + """ + return len(self.output_columns) > 0 def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELATIVE_ERROR, approx_count=True, - sample=10000, stats=True, format="dict", mismatch=None): + sample=10000, stats=True, format="dict", mismatch=None, advanced_stats=False): """ Return the profiling data in json format :param df: Dataframe to be processed @@ -237,136 +311,24 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT :param stats: calculate stats, if not only data table returned :param format: dict or json :param mismatch: - :return: json file + :return: dict or json """ - output_columns = self.output_columns + cols_to_profile = self.cols_needs_profiling(df, columns) - # Metadata - # If not empty the profiler already run. - # So process the dataframe's metadata to be sure which columns need to be profiled - is_cached = len(self.output_columns) > 0 - actions = df.get_meta("transformations.actions") - are_actions = actions is not None and len(actions) > 0 - - # Process actions to check if any column must be processed - if is_cached and are_actions: - - drop = ["drop"] - - def match_actions_names(_actions): - """ - Get a list of columns which have been applied and specific action. - :param _actions: - :return: - """ - - _actions_json = df.get_meta("transformations.actions") - - modified = [] - for action in _actions: - if _actions_json.get(action): - # Check if was renamed - col = _actions_json.get(action) - if len(match_renames(col)) == 0: - _result = col - else: - _result = match_renames(col) - modified = modified + _result - - return modified - - def match_renames(_col_names): - """ - Get a list fo columns and return the renamed version. - :param _col_names: - :return: - """ - _renamed_columns = [] - _actions = df.get_meta("transformations.actions") - _rename = _actions.get("rename") - - def get_name(_col_name): - c = _rename.get(_col_name) - # The column has not been rename. Get the actual column name - if c is None: - c = _col_name - return c - - if _rename: - # if a list - if is_list_of_str(_col_names): - for _col_name in _col_names: - # The column name has been changed. Get the new name - _renamed_columns.append(get_name(_col_name)) - # if a dict - if is_dict(_col_names): - for _col1, _col2 in _col_names.items(): - _renamed_columns.append({get_name(_col1): get_name(_col2)}) - - else: - _renamed_columns = _col_names - return _renamed_columns - - # New columns - new_columns = [] - - current_col_names = df.cols.names() - renamed_cols = match_renames(df.get_meta("transformations.columns")) - for current_col_name in current_col_names: - if current_col_name not in renamed_cols: - new_columns.append(current_col_name) - - # Rename keys to match new names - profiler_columns = self.output_columns["columns"] - actions = df.get_meta("transformations.actions") - rename = actions.get("rename") - if rename: - for k, v in actions["rename"].items(): - profiler_columns[v] = profiler_columns.pop(k) - profiler_columns[v]["name"] = v - - # Drop Keys - for col_names in match_actions_names(drop): - profiler_columns.pop(col_names) - - # Copy Keys - copy_columns = df.get_meta("transformations.actions.copy") - if copy_columns is not None: - for source, target in copy_columns.items(): - profiler_columns[target] = profiler_columns[source].copy() - profiler_columns[target]["name"] = target - # Check is a new column is a copied column - new_columns = list(set(new_columns) - set(copy_columns.values())) - - # Actions applied to current columns - - modified_columns = match_actions_names(Actions.list()) - calculate_columns = modified_columns + new_columns - - # Remove duplicated. - calculate_columns = list(set(calculate_columns)) - - elif is_cached and not are_actions: - calculate_columns = None - # elif not is_cached: - else: - calculate_columns = columns - - # print ("calculate_columns",calculate_columns) # Get the stats for all the columns if stats is True: # Are there column to process? - if calculate_columns or not is_cached: + if cols_to_profile or not self.is_cached(): rows_count = df.count() self.rows_count = rows_count self.cols_count = cols_count = len(df.columns) - output_columns = self.columns_stats(df, calculate_columns, buckets, infer, relative_error, approx_count, - mismatch) + output_columns = self.columns_stats(df, cols_to_profile, buckets, infer, relative_error, approx_count, + mismatch, advanced_stats) # Update last profiling info # Merge old and current profiling - if is_cached: + if self.is_cached(): output_columns["columns"].update(self.output_columns["columns"]) assign(output_columns, "name", df.get_name(), dict) @@ -402,26 +364,27 @@ def get_name(_col_name): df = df.set_meta(value={}) df = df.columns_meta(df.cols.names()) - col_names = output_columns["columns"].keys() + # col_names = output_columns["columns"].keys() if format == "json": result = json.dumps(output_columns, ignore_nan=True, default=json_converter) else: result = output_columns self.output_columns = output_columns - df = df.set_meta("transformations.actions", {}) + df.set_meta("transformations.actions", {}) - return col_names, result + return result def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=RELATIVE_ERROR, approx_count=True, - mismatch=None): + mismatch=None, advanced_stats=True): """ Return statistical information about a specific column in json format :param df: Dataframe to be processed :param columns: Columns that you want to profile :param buckets: Create buckets divided by range. Each bin is equal. :param infer: try to infer the column dataType - :param relative_error: relative error when the percentile is calculated. 0 is more precision/slow 1 less precision/faster + :param relative_error: relative error when the percentile is calculated. + 0 more precision/slow 1 less precision/faster :param approx_count: Use the function approx_count_distinct or countDistinct. approx_count_distinct is faster :param mismatch: :return: json object @@ -434,7 +397,26 @@ def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=REL logger.print("Processing Stats For columns...") # Get columns data types. This is necessary to make the pertinent histogram calculations. - type_details = self._count_data_types(df, columns, infer, mismatch) + count_by_data_type = df.cols.count_by_dtypes(columns, infer=infer, mismatch=mismatch) + + count_by_data_type_no_mismatch = copy.deepcopy(count_by_data_type) + + # Info from all the columns + type_details = {} + + for col_name in columns: + # Not count mismatch + if "mismatch" in count_by_data_type_no_mismatch[col_name]: + count_by_data_type_no_mismatch[col_name].pop("mismatch") + + # Get the greatest count by column data type + greatest_data_type_count = max(count_by_data_type_no_mismatch[col_name], + key=count_by_data_type_no_mismatch[col_name].get) + cat = PYTHON_TO_PROFILER.get(greatest_data_type_count) + + assign(type_details, col_name + ".dtype", greatest_data_type_count, dict) + assign(type_details, col_name + ".type", cat, dict) + assign(type_details, col_name + ".stats", count_by_data_type[col_name], dict) # Count the categorical, numerical, boolean and date columns count_types = {} @@ -446,23 +428,16 @@ def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=REL count_types[name] = 1 # List the data types this data set have - total = 0 - dtypes = [] - for key, value in count_types.items(): - if value > 0: - dtypes.append(key) - total = total + 1 - - count_types = fill_missing_col_types(count_types) + dtypes = [key for key, value in count_types.items() if value > 0] columns_info = {} - columns_info["count_types"] = count_types - columns_info["total_count_dtypes"] = total + columns_info["count_types"] = fill_missing_col_types(count_types) + columns_info["total_count_dtypes"] = len(dtypes) columns_info["dtypes_list"] = dtypes columns_info["columns"] = type_details # Aggregation - stats = Profiler.columns_agg(df, columns, buckets, relative_error, approx_count) + stats = self.columns_agg(df, columns, buckets, relative_error, approx_count, advanced_stats) # Calculate Frequency logger.print("Processing Frequency ...") @@ -471,7 +446,6 @@ def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=REL if df_freq is not None: freq = df_freq.cols.frequency("*", buckets, True, self.rows_count) - # Calculate percentage for col_name in columns: col_info = {} assign(col_info, "stats", stats[col_name], dict) @@ -480,7 +454,6 @@ def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=REL if col_name in freq: assign(col_info, "frequency", freq[col_name]) - col_info["stats"].update(self.extra_columns_stats(df, col_name, stats)) assign(col_info, "name", col_name) assign(col_info, "column_dtype", columns_info["columns"][col_name]['dtype']) assign(col_info, "dtypes_stats", columns_info["columns"][col_name]['stats']) @@ -491,46 +464,50 @@ def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=REL return columns_info - @staticmethod - def columns_agg(df, columns, buckets=10, relative_error=RELATIVE_ERROR, approx_count=True): + def columns_agg(self, df, columns, buckets=10, relative_error=RELATIVE_ERROR, approx_count=True, advanced_stats=True): columns = parse_columns(df, columns) n = BATCH_SIZE list_columns = [columns[i * n:(i + 1) * n] for i in range((len(columns) + n - 1) // n)] - # we have problems sending +100 columns at the same time. Process in batch + # we have problems sending +100 columns at the same time. Processing in batch result = {} + for i, cols in enumerate(list_columns): logger.print("Batch Stats {BATCH_NUMBER}. Processing columns{COLUMNS}".format(BATCH_NUMBER=i, COLUMNS=cols)) + # Count uniques is necessary for calculate the histogram buckets funcs = [count_uniques_agg] exprs = df.cols.create_exprs(cols, funcs, approx_count) - # TODO: in basic calculations funcs = [F.min, F.max] - funcs = [F.min, F.max, F.stddev, F.kurtosis, F.mean, F.skewness, F.sum, F.variance, zeros_agg] + funcs = [F.min, F.max] exprs.extend(df.cols.create_exprs(cols, funcs)) - # TODO: None in basic calculation - funcs = [percentile_agg] - exprs.extend(df.cols.create_exprs(cols, funcs, df, [0.05, 0.25, 0.5, 0.75, 0.95], - relative_error)) - funcs = [count_na_agg] exprs.extend(df.cols.create_exprs(cols, funcs, df)) + + if advanced_stats is True: + funcs = [F.stddev, F.kurtosis, F.mean, F.skewness, F.sum, F.variance, zeros_agg] + exprs.extend(df.cols.create_exprs(cols, funcs)) + + # TODO: None in basic calculation + funcs = [percentile_agg] + exprs.extend(df.cols.create_exprs(cols, funcs, df, [0.05, 0.25, 0.5, 0.75, 0.95], + relative_error)) + result.update(df.cols.exec_agg(exprs)) - exprs = [] n = BATCH_SIZE result_hist = {} list_columns = [columns[i * n:(i + 1) * n] for i in range((len(columns) + n - 1) // n)] + for i, cols in enumerate(list_columns): logger.print( "Batch Histogram {BATCH_NUMBER}. Processing columns{COLUMNS}".format(BATCH_NUMBER=i, COLUMNS=cols)) funcs = [hist_agg] - # min_max = None for col_name in cols: - # Only process histogram id numeric. For toher data types using frequency + # Only process histogram for numeric columns. For other data types using frequency if is_column_a(df, col_name, PYSPARK_NUMERIC_TYPES): min_max = {"min": result[col_name]["min"], "max": result[col_name]["max"]} buckets = result[col_name]["count_uniques"] - 1 @@ -539,7 +516,6 @@ def columns_agg(df, columns, buckets=10, relative_error=RELATIVE_ERROR, approx_c elif buckets == 0: buckets = 1 exprs.extend(df.cols.create_exprs(col_name, funcs, df, buckets, min_max)) - agg_result = df.cols.exec_agg(exprs) if agg_result is not None: result_hist.update(agg_result) @@ -548,56 +524,63 @@ def columns_agg(df, columns, buckets=10, relative_error=RELATIVE_ERROR, approx_c for col_name in result: if col_name in result_hist: result[col_name].update(result_hist[col_name]) - return result - def extra_columns_stats(self, df, col_name, stats): - """ - Specific Stats for numeric columns - :param df: - :param col_name: - :param stats: - :return: - """ + def extra_columns_stats(df, col_name, stats): + """ + Specific Stats for numeric columns + :param df: + :param col_name: + :param stats: + :return: + """ - col_info = {} + col_info = {} - max_value = stats[col_name]["max"] - min_value = stats[col_name]["min"] + max_value = stats[col_name]["max"] + min_value = stats[col_name]["min"] - if is_column_a(df, col_name, PYSPARK_NUMERIC_TYPES): - stddev = stats[col_name]['stddev'] - mean = stats[col_name]['mean'] + if is_column_a(df, col_name, PYSPARK_NUMERIC_TYPES): + stddev = stats[col_name]['stddev'] + mean = stats[col_name]['mean'] - quantile = stats[col_name]["percentile"] - if max_value is not None and min_value is not None: - col_info['range'] = max_value - min_value - else: - col_info['range'] = None + quantile = stats[col_name]["percentile"] + if max_value is not None and min_value is not None: + col_info['range'] = max_value - min_value + else: + col_info['range'] = None - col_info['median'] = quantile["0.5"] + col_info['median'] = quantile["0.5"] - q1 = quantile["0.25"] - q3 = quantile["0.75"] + q1 = quantile["0.25"] + q3 = quantile["0.75"] - if q1 is not None and q3 is not None: - col_info['interquartile_range'] = q3 - q1 - else: - col_info['interquartile_range'] = None + if q1 is not None and q3 is not None: + col_info['interquartile_range'] = q3 - q1 + else: + col_info['interquartile_range'] = None - if mean != 0 and mean is not None: - col_info['coef_variation'] = round((stddev / mean), 5) - else: - col_info['coef_variation'] = None + if mean != 0 and mean is not None: + col_info['coef_variation'] = round((stddev / mean), 5) + else: + col_info['coef_variation'] = None + + mad = df.cols.mad(col_name) + if mad is not None: + col_info['mad'] = round(df.cols.mad(col_name), 5) + else: + col_info['mad'] = None + + col_info['p_count_na'] = round((stats[col_name]['count_na'] * 100) / self.rows_count, 2) + col_info['p_count_uniques'] = round((stats[col_name]['count_uniques'] * 100) / self.rows_count, 2) + return col_info + + if advanced_stats is True: + for col_name in columns: + result.update(extra_columns_stats(df, col_name, result)) + + return result - mad = df.cols.mad(col_name) - if mad is not None: - col_info['mad'] = round(df.cols.mad(col_name), 5) - else: - col_info['mad'] = None - col_info['p_count_na'] = round((stats[col_name]['count_na'] * 100) / self.rows_count, 2) - col_info['p_count_uniques'] = round((stats[col_name]['count_uniques'] * 100) / self.rows_count, 2) - return col_info @staticmethod def missing_values(df, columns):