Skip to content

Commit

Permalink
Now print the cluster items count
Browse files Browse the repository at this point in the history
  • Loading branch information
argenisleon committed Nov 24, 2019
1 parent b15bde4 commit b01a8f8
Showing 1 changed file with 38 additions and 25 deletions.
63 changes: 38 additions & 25 deletions optimus/ml/distancecluster.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import simplejson as json

from pyspark.sql import functions as F

from optimus import Optimus
Expand All @@ -10,69 +9,83 @@
LEVENSHTEIN_DISTANCE


def levenshtein_json(df, input_col, threshold: int = None):
    """
    Cluster the values of a column by Levenshtein distance and output the
    clusters as a JSON string.

    For every distinct fingerprinted value the closest other value(s) are
    found via a cross join + Levenshtein distance, and each cluster is
    emitted together with the number of rows that produced each member
    (the "cluster items count").

    :param df: Spark Dataframe
    :param input_col: Column to be processed
    :param threshold: Optional maximum distance. When given, pairs with a
        distance of 0 or greater than ``threshold`` are discarded; when
        ``None`` only exact self-matches (distance 0 on identical
        fingerprints) are discarded.
    :return: JSON string mapping a cluster representative to a dict of
        ``{member_value: occurrence_count}``
    """
    # Create fingerprint
    df_fingerprint = keycollision.fingerprint(df, input_col)

    # Column names used throughout
    fingerprint_col = name_col(input_col, FINGERPRINT_COL)
    distance_col = name_col(input_col, LEVENSHTEIN_DISTANCE)

    temp_col_1 = input_col + "_LEVENSHTEIN_1"
    temp_col_2 = input_col + "_LEVENSHTEIN_2"
    count = "count"

    # For every distinct input value, keep its fingerprint and how many
    # times it appears. Collected driver-side to build the lookup dict below.
    fingerprint_count = df_fingerprint.select(input_col, fingerprint_col).groupby(input_col) \
        .agg(F.first(input_col).alias(temp_col_1), F.first(fingerprint_col).alias(temp_col_2),
             F.count(input_col).alias(count)) \
        .select(temp_col_1, temp_col_2, count).collect()

    df = df_fingerprint.select(input_col, F.col(fingerprint_col).alias(temp_col_1),
                               F.col(fingerprint_col).alias(temp_col_2)).distinct()

    # Create all the combinations between the strings to calculate the levenshtein distance
    df = df.select(temp_col_1).crossJoin(df.select(temp_col_2)) \
        .withColumn(distance_col, F.levenshtein(F.col(temp_col_1), F.col(temp_col_2)))

    # Select only the string with the shortest distance
    distance_r_col = input_col + "_LEVENSHTEIN_DISTANCE_R"
    temp_r = "TEMP_R"

    # Rows matching `where` are dropped before taking the per-key minimum.
    if threshold is None:
        # Drop only exact self-matches (same fingerprint paired with itself)
        where = ((F.col(distance_col) == 0) & (F.col(temp_col_1) != F.col(temp_col_2)))
    else:
        # Drop self-matches and any pair farther apart than the threshold
        where = (F.col(distance_col) == 0) | (F.col(distance_col) > threshold)

    df_r = (df.rows.drop(where)
            .cols.replace(distance_col, 0, None)
            .groupby(temp_col_1)
            .agg(F.min(distance_col).alias(distance_r_col))
            .cols.rename(temp_col_1, temp_r)).repartition(1)

    # Keep only the pairs whose distance equals the per-key minimum
    df = df.join(df_r, ((df_r[temp_r] == df[temp_col_1]) & (df_r[distance_r_col] == df[distance_col]))) \
        .select(temp_col_1, distance_col, temp_col_2).repartition(1)

    # Create the clusters/lists
    df = (df.groupby(temp_col_1)
          .agg(F.collect_list(temp_col_2), F.count(temp_col_2)))

    # Map fingerprint -> {original string: occurrence count}.
    # NOTE(review): if two distinct input values share a fingerprint, later
    # entries overwrite earlier ones — behavior kept from the original.
    kv_dict = {}
    for row in fingerprint_count:
        _row = list(row.asDict().values())
        kv_dict[_row[1]] = {_row[0]: _row[2]}

    # Replace each fingerprint in the clusters with its string and count
    result = {}
    for row in df.collect():
        _row = list(row.asDict().values())
        d = {}
        for i in _row[1]:
            key = list(kv_dict[i].keys())[0]
            value = list(kv_dict[i].values())[0]
            d[key] = value
        # Include the cluster key itself as a member, then key the cluster by it
        key = list(kv_dict[_row[0]].keys())[0]
        value = list(kv_dict[_row[0]].values())[0]
        d.update({key: value})
        result[key] = d

    # ignore_nan (simplejson) serializes NaN as null instead of raising
    result = json.dumps(result, ignore_nan=True, default=json_converter)
    return result

Expand Down

0 comments on commit b01a8f8

Please sign in to comment.