-
Notifications
You must be signed in to change notification settings - Fork 0
/
movie-similarities-1m.py
107 lines (81 loc) · 3.63 KB
/
movie-similarities-1m.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import sys
from pyspark import SparkConf, SparkContext
from math import sqrt
#To run on EMR successfully + output results for Star Wars:
#aws s3 cp s3://sundog-spark/MovieSimilarities1M.py ./
#aws s3 cp s3://sundog-spark/ml-1m/movies.dat ./
#spark-submit --executor-memory 1g MovieSimilarities1M.py 260
def loadMovieNames():
    """Build a movie ID -> movie title lookup from the local "movies.dat" file.

    Each line is split on "::"; the first field is parsed as the integer
    movie ID and the second field is used as the title. Bytes that are not
    valid ASCII are dropped from the text.

    Returns:
        dict mapping int movie ID to its title string.

    Raises:
        IOError/OSError: if "movies.dat" cannot be opened.
        ValueError: if the first field of a line is not an integer.
    """
    # Local import keeps this function self-contained; io.open behaves the
    # same on Python 2 and Python 3, unlike calling .decode() on each field,
    # which only exists on Python 2 byte strings.
    import io

    movieNames = {}
    # Decode once at the I/O boundary, silently discarding non-ASCII bytes.
    with io.open("movies.dat", encoding="ascii", errors="ignore") as f:
        for line in f:
            fields = line.split("::")
            if len(fields) < 2:
                # Blank or malformed line: there is no title to record.
                continue
            movieNames[int(fields[0])] = fields[1]
    return movieNames
def makePairs(userRatings):
    """Re-key one joined record by its movie pair.

    Args:
        userRatings: a 2-tuple ``(user, ratings)`` where ``ratings`` holds two
            ``(movie, rating)`` tuples, one from each side of the self-join.

    Returns:
        ``((movie1, movie2), (rating1, rating2))``; the user is dropped.
    """
    # Unpack inside the body: tuple parameters in the signature are
    # Python 2-only syntax and a SyntaxError on Python 3.
    _user, ratings = userRatings
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return ((movie1, movie2), (rating1, rating2))
def filterDuplicates(userRatings):
    """Decide whether a joined record should be kept.

    Args:
        userRatings: a 2-tuple ``(userID, ratings)`` where ``ratings`` holds
            two ``(movie, rating)`` tuples.

    Returns:
        True only when the first movie ID is strictly less than the second.
        This rejects a movie paired with itself and keeps just one of the two
        orderings of every distinct pair.
    """
    # Unpack inside the body: tuple parameters in the signature are
    # Python 2-only syntax and a SyntaxError on Python 3.
    _userID, ratings = userRatings
    movie1, _ = ratings[0]
    movie2, _ = ratings[1]
    return movie1 < movie2
def computeCosineSimilarity(ratingPairs):
    """Compute the cosine similarity of the two rating series in ratingPairs.

    Args:
        ratingPairs: iterable of ``(ratingX, ratingY)`` pairs; it is consumed
            in a single pass.

    Returns:
        ``(score, numPairs)`` where ``score`` is sum(x*y) divided by
        sqrt(sum(x*x)) * sqrt(sum(y*y)), or 0 when that denominator is zero,
        and ``numPairs`` is the number of pairs seen.
    """
    dotProduct = 0
    squaresX = 0
    squaresY = 0
    pairCount = 0

    # One explicit pass so that one-shot iterables work and the order of the
    # floating-point additions is fixed.
    for pair in ratingPairs:
        x, y = pair
        squaresX += x * x
        squaresY += y * y
        dotProduct += x * y
        pairCount += 1

    norm = sqrt(squaresX) * sqrt(squaresY)
    if not norm:
        # Nothing to divide by (no pairs, or one side is all zeros).
        return (0, pairCount)
    return (dotProduct / float(norm), pairCount)
# Driver script: build movie-pair similarities from the ratings data, save
# them, and optionally print the top matches for the movie ID given as the
# first command-line argument.
conf = SparkConf()
sc = SparkContext(conf = conf)

print("\nLoading movie names...")
nameDict = loadMovieNames()

data = sc.textFile("s3n://sundog-spark/ml-1m/ratings.dat")

# Map ratings to key / value pairs: user ID => (movie ID, rating)
ratings = data.map(lambda l: l.split("::")).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))

# Emit every movie rated together by the same user.
# Self-join to find every combination.
ratingsPartitioned = ratings.partitionBy(100)
joinedRatings = ratingsPartitioned.join(ratingsPartitioned)

# At this point our RDD consists of userID => ((movieID, rating), (movieID, rating))

# Filter out duplicate pairs
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

# Now key by (movie1, movie2) pairs.
moviePairs = uniqueJoinedRatings.map(makePairs).partitionBy(100)

# We now have (movie1, movie2) => (rating1, rating2)
# Now collect all ratings for each movie pair and compute similarity
moviePairRatings = moviePairs.groupByKey()

# We now have (movie1, movie2) => (rating1, rating2), (rating1, rating2) ...
# Can now compute similarities. Persisted because the result is used both for
# the save below and for the per-movie query.
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).persist()

# Save the results if desired.
# sortByKey() returns a new RDD instead of sorting in place, so the sorted
# RDD has to be the one written out; calling it and discarding the result
# leaves the saved output unsorted.
moviePairSimilarities.sortByKey().saveAsTextFile("movie-sims")

# Extract similarities for the movie we care about that are "good".
if len(sys.argv) > 1:
    scoreThreshold = 0.97
    coOccurenceThreshold = 1000

    movieID = int(sys.argv[1])

    # Filter for movies with this sim that are "good" as defined by
    # our quality thresholds above.
    # Each record is ((movie1, movie2), (score, numPairs)); it is indexed
    # rather than unpacked in the lambda signature, which is Python 2-only.
    filteredResults = moviePairSimilarities.filter(
        lambda pairSim: (pairSim[0][0] == movieID or pairSim[0][1] == movieID)
        and pairSim[1][0] > scoreThreshold
        and pairSim[1][1] > coOccurenceThreshold)

    # Sort by quality score: swap to (sim, pair) so the similarity is the key.
    results = filteredResults.map(lambda pairSim: (pairSim[1], pairSim[0])).sortByKey(ascending = False).take(10)

    print("Top 10 similar movies for " + nameDict[movieID])
    for result in results:
        (sim, pair) = result
        # Display the similarity result that isn't the movie we're looking at
        similarMovieID = pair[0]
        if similarMovieID == movieID:
            similarMovieID = pair[1]
        print(nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1]))