# analyzer.py
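"""Collaborative-filtering analyzer for bangumi review data.

Builds an authors-by-animes rating matrix, derives pairwise Pearson
similarities between animes and between authors, and persists each anime's
top matches and each author's recommendations through the `db` layer.
HDF5 serves as an on-disk cache for the matrices; Redis caches pairwise
similarities when the full similarity matrix does not fit in memory.
"""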
import gc
from datetime import datetime, timedelta

import h5py
import numpy as np
from redis import Redis
from scipy.stats import pearsonr

from utils import log_duration, logger


class BangumiAnalyzer:
    def __init__(self, db, conf) -> None:
        self.conf = conf
        self.db = db
        self.redis = Redis(self.conf.REDIS_HOST, self.conf.REDIS_PORT, db=self.conf.REDIS_DATABASE,
                           password=self.conf.REDIS_PASSWORD)
        # Bound Redis memory usage; evict least-recently-used keys first.
        self.redis.config_set('maxmemory', self.conf.REDIS_MAX_MEMORY)
        self.redis.config_set('maxmemory-policy', 'allkeys-lru')

    @log_duration
    def get_animes_authors_refs_matrix(self):
        mat, media_ids, mids = None, None, None
        # Try the HDF5 cache first; fall back to rebuilding from the database.
        try:
            with h5py.File(self.conf.HDF5_FILENAME, 'r') as f:
                last_update = datetime.strptime(f.attrs['last_update'], '%Y-%m-%d %H:%M:%S.%f')
                if last_update > datetime.now() - timedelta(hours=self.conf.HDF5_DATA_SET_TTL):
                    mat = np.array(f['animes_authors_refs_matrix'])
                    media_ids = np.array(f['media_ids'])
                    mids = np.array(f['mids'])
                else:
                    raise ValueError('Data Set Expired.')
        except (OSError, KeyError, ValueError) as e:
            logger.warning('Data Set in HDF5 File Will Not be Used for Ref Matrix Because %s.' % e)
        if mat is None or media_ids is None or mids is None:
            # Map each media_id to a matrix column.
            media_ids, media_id_indexes = [], {}
            for cur, entrance in enumerate(self.db.get_all_entrances()):
                media_ids.append(entrance['media_id'])
                media_id_indexes[str(entrance['media_id'])] = cur
            authors_count = self.db.get_authors_count()
            mat = np.zeros((authors_count, len(media_ids)), dtype='int8')
            mids = []
            # One row per author, holding the scores of the animes they reviewed.
            for cur, (mid, reviews, _) in enumerate(self.db.get_valid_author_ratings_follow_pairs()):
                mids.append(mid)
                for review in reviews:
                    mat[cur, media_id_indexes[str(review['media_id'])]] = review['score']
            with h5py.File(self.conf.HDF5_FILENAME, 'w') as f:
                f.create_dataset('animes_authors_refs_matrix', data=mat)
                f.create_dataset('media_ids', data=media_ids)
                f.create_dataset('mids', data=mids)
        return mat, media_ids, mids

    @staticmethod
    def asscalar(value):
        # np.asscalar is deprecated (removed in NumPy 1.23); .item() is the
        # supported way to turn a NumPy scalar into a plain Python one.
        return value.item() if isinstance(value, np.generic) else value

    @staticmethod
    def calc_similarity(lhs, rhs):
        # Correlate only the items both sides have rated (score > 0), so that
        # unshared zero entries do not skew the correlation.
        index = np.logical_and(lhs > 0, rhs > 0)
        lhs_shared, rhs_shared = lhs[index], rhs[index]
        # Fewer than two shared items makes Pearson undefined; treat as dissimilar.
        return pearsonr(lhs_shared, rhs_shared)[0] if len(lhs_shared) > 1 else -1
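
    # A quick illustrative check of the shared-item correlation (not part of
    # the original module): two rating vectors sharing three rated items.
    #
    #     a = np.array([5, 0, 3, 4], dtype='int8')
    #     b = np.array([4, 2, 3, 5], dtype='int8')
    #     BangumiAnalyzer.calc_similarity(a, b)  # -> 0.5; only indexes 0, 2, 3 are compared
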
    @log_duration
    def get_similarity_matrix(self, refs_matrix, dset):
        mat = None
        try:
            with h5py.File(self.conf.HDF5_FILENAME, 'r') as f:
                mat = np.array(f[dset])
        except (OSError, KeyError) as e:
            logger.warning('Data Set in HDF5 File Will Not be Used for Similarity Matrix Because %s.' % e)
        if mat is None:
            _, cols_count = refs_matrix.shape
            mat = np.zeros((cols_count, cols_count))
            # Fill the upper triangle only, then mirror it: the matrix is symmetric.
            for i in range(cols_count):
                logger.info('Calculating Similarities %s/%s...' % (i, cols_count))
                for j in range(i + 1, cols_count):
                    mat[i, j] = BangumiAnalyzer.calc_similarity(refs_matrix[:, i], refs_matrix[:, j])
            mat += mat.T
            # A column must never rank as its own top match.
            np.fill_diagonal(mat, -1)
            with h5py.File(self.conf.HDF5_FILENAME, 'r+') as f:
                f.create_dataset(dset, data=mat)
                f.attrs['last_update'] = str(datetime.now())
        return mat

    @log_duration
    def process_animes_top_matches(self, ref_mat, media_ids) -> None:
        logger.info('Calculating Animes Similarity Matrix...')
        animes_sim_mat = self.get_similarity_matrix(ref_mat, 'animes_similarity_matrix')
        logger.info('Animes Similarity Matrix %s Calculated.' % str(animes_sim_mat.shape))
        # argsort is ascending, so take the last N columns and flip them to get
        # each anime's N most similar animes, best first.
        animes_sim_indexes_mat = np.flip(animes_sim_mat.argsort()[:, -self.conf.ANALYZE_ANIME_TOP_MATCHES_SIZE:],
                                         axis=1)
        logger.info('Animes Sim-Indexes %s Computed.' % str(animes_sim_indexes_mat.shape))
        for cur, anime_sim_indexes in enumerate(animes_sim_indexes_mat):
            self.db.update_anime_top_matches(self.asscalar(media_ids[cur]), [{
                'media_id': self.asscalar(media_ids[index]),
                'similarity': self.asscalar(animes_sim_mat[cur, index])
            } for index in anime_sim_indexes])
        logger.info('Animes Top-Matches Persisted.')

    def process_author_recommendation(self, total_scores_with_weight, total_weight, mid, media_ids, top_matches):
        recommendation = []
        # Predicted score per anime is the similarity-weighted average of the
        # top matches' scores; rank candidates by it, best first.
        recommend_indexes_sorted = np.flip((total_scores_with_weight / total_weight).argsort(), axis=0)
        author_watched_media_ids = self.db.get_author_watched_media_ids(self.asscalar(mid))
        for index in recommend_indexes_sorted:
            if len(recommendation) == self.conf.ANALYZE_AUTHOR_RECOMMENDATION_SIZE:
                break
            # Recommend only animes the author has not already watched.
            if media_ids[index] not in author_watched_media_ids:
                recommendation.append(self.asscalar(media_ids[index]))
        self.db.update_author_recommendation(self.asscalar(mid), top_matches, recommendation)

    @log_duration
    def process_authors_recommendation(self, ref_mat, media_ids, mids) -> None:
        logger.info('Calculating Authors Similarities...')
        try:
            # ref_mat is authors x animes; transpose so authors become columns.
            authors_sim_mat = self.get_similarity_matrix(ref_mat.T, 'authors_similarity_matrix')
            logger.info('Authors Similarity Matrix %s Calculated Using Numpy.' % str(authors_sim_mat.shape))
            authors_sim_indexes_mat = np.flip(authors_sim_mat.argsort()[:, -self.conf.ANALYZE_AUTHOR_TOP_MATCHES_SIZE:],
                                              axis=1)
            logger.info('Authors Sim-Indexes %s Computed.' % str(authors_sim_indexes_mat.shape))
            for i in range(len(authors_sim_indexes_mat)):
                # Accumulators must reset per author, or scores leak between authors.
                top_matches = []
                total_scores_with_weight, total_weight = 0, 0
                for index in authors_sim_indexes_mat[i]:
                    if i != index:
                        similarity = self.asscalar(authors_sim_mat[i, index])
                        top_matches.append({
                            'mid': self.asscalar(mids[index]),
                            'similarity': similarity
                        })
                        total_scores_with_weight += similarity * ref_mat[index]
                        total_weight += similarity
                self.process_author_recommendation(total_scores_with_weight, total_weight, mids[i], media_ids,
                                                   top_matches)
        except MemoryError:
            logger.warning('Memory Error Caught, Using Redis as Cache to Calculate Similarities.')
            for i in range(len(mids)):
                if self.db.is_need_re_calculate(self.asscalar(mids[i])):
                    logger.info("[%s/%s] Calculating %s's Top-Matches and Recommendation..." % (i, len(mids), mids[i]))
                    similarities = np.empty((len(mids),))
                    similarities[i] = -2  # Keep the author out of their own matches.
                    for j in range(len(mids)):
                        if i != j:
                            # Cache each pair under a canonical 'smaller-mid:larger-mid' key.
                            index_pair = '%s:%s' % (mids[min(i, j)], mids[max(i, j)])
                            similarity = self.redis.get(index_pair)
                            if similarity is None:
                                similarity = self.calc_similarity(ref_mat[i], ref_mat[j])
                                self.redis.set(index_pair, similarity)
                                self.redis.expire(index_pair, self.conf.REDIS_SIMILARITY_TTL)
                            else:
                                similarity = float(similarity.decode())  # redis-py returns bytes
                            similarities[j] = similarity
                    # Highest similarities first.
                    sorted_indexes = np.flip(similarities.argsort(), axis=0)[:self.conf.ANALYZE_AUTHOR_TOP_MATCHES_SIZE]
                    top_matches = []
                    total_scores_with_weight, total_weight = 0, 0
                    for index in sorted_indexes:
                        if i != index:
                            similarity = self.asscalar(similarities[index])
                            top_matches.append({'mid': self.asscalar(mids[index]), 'similarity': similarity})
                            total_scores_with_weight += similarity * ref_mat[index]
                            total_weight += similarity
                    self.process_author_recommendation(total_scores_with_weight, total_weight, mids[i], media_ids,
                                                       top_matches)
                else:
                    logger.info('[%s/%s] Skip Calculating %s.' % (i, len(mids), mids[i]))
        logger.info('Authors Top-Matches Persisted.')

    def analyze(self) -> None:
        logger.info('New Analysis Beginning...')
        logger.info('Getting Ref Matrix...')
        ref_mat, media_ids, mids = self.get_animes_authors_refs_matrix()
        logger.info('Ref Matrix %s Loaded, with %s Medias and %s Authors.'
                    % (ref_mat.shape, len(media_ids), len(mids)))
        self.process_animes_top_matches(ref_mat, media_ids)
        self.process_authors_recommendation(ref_mat, media_ids, mids)
        logger.info('Analyzing Tasks Finished.')
        # Release the large matrices before the process idles.
        gc.collect()
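
# Minimal usage sketch (illustrative, not part of the module): the concrete
# `db` wrapper and `conf` object come from the surrounding project, so the
# imports below are assumptions.
#
#     from config import conf            # hypothetical config object
#     from database import Database      # hypothetical DB wrapper
#
#     analyzer = BangumiAnalyzer(Database(conf), conf)
#     analyzer.analyze()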