-
Notifications
You must be signed in to change notification settings - Fork 78
/
most_popular.py
149 lines (134 loc) · 5.49 KB
/
most_popular.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: fuxuemingzhu
@site: www.fuxuemingzhu.cn
@file: most_popular.py
@time: 18-4-17 6:50 PM
Description : Recommend the most popular movies.
"""
import random
import math
from collections import defaultdict
from operator import itemgetter
import similarity
import utils
class MostPopular:
    """
    Recommend the most popular movies.
    Top-N recommendation.

    Every user is offered the same popularity ranking (most popular first),
    minus the movies the user already has in the train set.
    """

    def __init__(self, n_rec_movie=10, save_model=True):
        """
        Init MostPopular with n_rec_movie.

        :param n_rec_movie: number of movies to recommend to each user.
        :param save_model: when True, ``fit`` saves a newly trained model
            through ``utils.ModelManager``.
        :return: None
        """
        print("MostPopular start...\n")
        self.n_rec_movie = n_rec_movie
        self.trainset = None
        self.save_model = save_model
        # Filled in by ``fit``. Declared here so every attribute exists on a
        # not-yet-fitted instance.
        self.movie_popular = None
        self.movie_count = None
        self.total_movies = None
        self.movie_popular_sort = None

    def _is_fitted(self):
        """Return True when every field ``recommend`` and ``test`` rely on is set and non-empty."""
        # ``and`` short-circuits in the same order as the original guard.
        return bool(self.n_rec_movie and self.trainset and self.movie_popular
                    and self.movie_count and self.movie_popular_sort)

    def fit(self, trainset):
        """
        Fit the trainset via count movies.

        A previously saved model is loaded first. When it loads, the
        ``trainset`` argument is ignored in favour of the saved one. If
        loading raises ``OSError`` a new model is trained from ``trainset``
        and, when ``save_model`` is set, saved for later runs.

        :param trainset: train dataset
        :return: None
        """
        model_manager = utils.ModelManager()
        try:
            self.movie_popular = model_manager.load_model('movie_popular')
            self.movie_count = model_manager.load_model('movie_count')
            self.trainset = model_manager.load_model('trainset')
            self.total_movies = model_manager.load_model('total_movies')
            self.movie_popular_sort = model_manager.load_model('movie_popular_sort')
            print('MostPopular model has saved before.\nLoad model success...\n')
        except OSError:
            print('No model saved before.\nTrain a new model...')
            self.trainset = trainset
            self.movie_popular, self.movie_count = similarity.calculate_movie_popular(trainset)
            self.total_movies = list(self.movie_popular.keys())
            # (movie, popularity) pairs, most popular first.
            self.movie_popular_sort = sorted(self.movie_popular.items(), key=itemgetter(1), reverse=True)
            print('Train a new model success.')
            if self.save_model:
                # NOTE(review): 'trainset' is loaded above but is not saved
                # here - presumably it is saved elsewhere; confirm.
                model_manager.save_model(self.movie_popular, 'movie_popular')
                model_manager.save_model(self.movie_count, 'movie_count')
                model_manager.save_model(self.total_movies, 'total_movies')
                model_manager.save_model(self.movie_popular_sort, 'movie_popular_sort')
                print('The new model has saved success.\n')

    def recommend(self, user):
        """
        Recommend the N most popular movies the user has not watched yet.

        :param user: The user we recommend movies to. A user missing from the
            train set is treated as having watched nothing.
        :return: list of at most N movies, most popular first.
        :raises NotImplementedError: if the model has not been fitted.
        """
        if not self._is_fitted():
            raise NotImplementedError('MostPopular has not init or fit method has not called yet.')
        N = self.n_rec_movie
        try:
            watched_movies = self.trainset[user]
        except KeyError:
            # Unknown user: nothing to filter out, return the plain top-N.
            watched_movies = ()
        predict_movies = []
        for movie, _ in self.movie_popular_sort:
            # Stop once N movies are collected instead of walking the whole ranking.
            if len(predict_movies) >= N:
                break
            if movie not in watched_movies:
                predict_movies.append(movie)
        return predict_movies

    def test(self, testset):
        """
        Test the recommendation system by recommending movies to all users in the train set.

        Prints precision, recall, coverage and popularity. Recall is reported
        as 0.0 when no train-set user has any movie in ``testset``.

        :param testset: test dataset
        :return: None
        :raises ValueError: if the model has not been fitted.
        """
        if not self._is_fitted():
            raise ValueError('MostPopular has not init or fit method has not called yet.')
        self.testset = testset
        print('Test recommendation system start...')
        N = self.n_rec_movie
        # variables for precision and recall
        hit = 0
        rec_count = 0
        test_count = 0
        # variables for coverage
        all_rec_movies = set()
        # variables for popularity
        popular_sum = 0
        # record the calculate time has spent.
        test_time = utils.LogTime(print_step=1000)
        for user in self.trainset:
            test_movies = self.testset.get(user, {})
            rec_movies = self.recommend(user)  # type:list
            for movie in rec_movies:
                if movie in test_movies:
                    hit += 1
                all_rec_movies.add(movie)
                popular_sum += math.log(1 + self.movie_popular[movie])
            # N is counted per user even when fewer movies were recommended.
            rec_count += N
            test_count += len(test_movies)
            # log steps and times (print_step=1000).
            test_time.count_time()
        precision = hit / (1.0 * rec_count)
        # test_count is 0 when no train-set user appears in the test set.
        recall = hit / (1.0 * test_count) if test_count else 0.0
        coverage = len(all_rec_movies) / (1.0 * self.movie_count)
        popularity = popular_sum / (1.0 * rec_count)
        print('Test recommendation system success.')
        test_time.finish()
        print('precision=%.4f\trecall=%.4f\tcoverage=%.4f\tpopularity=%.4f\n' %
              (precision, recall, coverage, popularity))

    def predict(self, testset):
        """
        Recommend movies to all users in testset.

        :param testset: test dataset
        :return: `dict` : maps each user to a one-element list holding that
            user's recommend list (i.e. ``{user: [[movie, ...]]}``).
        """
        movies_recommend = defaultdict(list)
        print('Predict scores start...')
        # record the calculate time has spent.
        predict_time = utils.LogTime(print_step=500)
        for user in testset:
            rec_movies = self.recommend(user)  # type:list
            # The whole list is appended as one element; this nested shape is
            # kept because callers may rely on it.
            movies_recommend[user].append(rec_movies)
            # log steps and times.
            predict_time.count_time()
        print('Predict scores success.')
        predict_time.finish()
        return movies_recommend