From b01a8f8d2cce1cfa8998b19941de8f06d1d54db5 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sat, 23 Nov 2019 20:04:36 -0600 Subject: [PATCH] Now print the cluster items count --- optimus/ml/distancecluster.py | 63 +++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 25 deletions(-) diff --git a/optimus/ml/distancecluster.py b/optimus/ml/distancecluster.py index 3929f908..b92fe061 100644 --- a/optimus/ml/distancecluster.py +++ b/optimus/ml/distancecluster.py @@ -1,5 +1,4 @@ import simplejson as json - from pyspark.sql import functions as F from optimus import Optimus @@ -10,69 +9,83 @@ LEVENSHTEIN_DISTANCE -def levenshtein_json(df, input_col): +def levenshtein_json(df, input_col, threshold: int = None): """ Output the levenshtein distance in json format :param df: Spark Dataframe - :param input_col: + :param input_col: Column to be processed + :param threshold: number :return: """ - df = keycollision.fingerprint(df, input_col) - # df.table() + # Create fingerprint + df_fingerprint = keycollision.fingerprint(df, input_col) + + # Names fingerprint_col = name_col(input_col, FINGERPRINT_COL) distance_col_name = name_col(input_col, LEVENSHTEIN_DISTANCE) - temp_col_1 = input_col + "_LEVENSHTEIN_1" temp_col_2 = input_col + "_LEVENSHTEIN_2" + count = "count" # Prepare the columns to calculate the cross join - result = df.select(input_col, F.col(fingerprint_col).alias(temp_col_1)).distinct() + fingerprint_count = df_fingerprint.select(input_col, fingerprint_col).groupby(input_col) \ + .agg(F.first(input_col).alias(temp_col_1), F.first(fingerprint_col).alias(temp_col_2), + F.count(input_col).alias(count)) \ + .select(temp_col_1, temp_col_2, count).collect() - df = df.select(input_col, F.col(fingerprint_col).alias(temp_col_1), - F.col(fingerprint_col).alias(temp_col_2)).distinct() + df = df_fingerprint.select(input_col, F.col(fingerprint_col).alias(temp_col_1), + F.col(fingerprint_col).alias(temp_col_2)).distinct() # Create all the combination between the string to calculate the levenshtein distance df = df.select(temp_col_1).crossJoin(df.select(temp_col_2)) \ .withColumn(distance_col_name, F.levenshtein(F.col(temp_col_1), F.col(temp_col_2))) - # if Optimus.cache: - # df = df.cache() - # Select only the string with shortest path distance_col = name_col(input_col, LEVENSHTEIN_DISTANCE) distance_r_col = input_col + "_LEVENSHTEIN_DISTANCE_R" temp_r = "TEMP_R" - df_r = (df.rows.drop(F.col(distance_col) == 0) + if threshold is None: + where = ((F.col(distance_col) == 0) & (F.col(temp_col_1) != F.col(temp_col_2))) + else: + where = (F.col(distance_col) == 0) | (F.col(distance_col) > threshold) + + df_r = (df.rows.drop(where) + .cols.replace(distance_col, 0, None) .groupby(temp_col_1) .agg(F.min(distance_col).alias(distance_r_col)) + # .cols.rename(distance_col, distance_r_col) .cols.rename(temp_col_1, temp_r)).repartition(1) df = df.join(df_r, ((df_r[temp_r] == df[temp_col_1]) & (df_r[distance_r_col] == df[distance_col]))) \ .select(temp_col_1, distance_col, temp_col_2).repartition(1) # Create the clusters/lists - df = (df.groupby(temp_col_1) - .agg(F.collect_list(temp_col_2))) + .agg(F.collect_list(temp_col_2), F.count(temp_col_2))) + # Replace ngram per string kv_dict = {} - for row in result.collect(): + for row in fingerprint_count: _row = list(row.asDict().values()) - kv_dict[_row[1]] = _row[0] + kv_dict[_row[1]] = {_row[0]: _row[2]} - kv_result_df = {} + result = {} for row in df.collect(): _row = list(row.asDict().values()) - kv_result_df[_row[0]] = _row[1] + d = {} + for i in _row[1]: + key = list(kv_dict[i].keys())[0] + value = list(kv_dict[i].values())[0] + d[key] = value + key = list(kv_dict[_row[0]].keys())[0] + value = list(kv_dict[_row[0]].values())[0] + d.update({key: value}) + result[key] = d + + # Order - result = {} - for k, v in kv_result_df.items(): - a = result[kv_dict[k]] = [] - for iv in v: - a.append(kv_dict[iv]) - # json.dump(data, outfile, indent=4, ensure_ascii=False, default=json_converter) result = json.dumps(result, ignore_nan=True, default=json_converter) return result