Commit

… into dev

iggy.lee committed Nov 25, 2019
2 parents 14af434 + 53ef513 commit de3966f
Showing 15 changed files with 2,619 additions and 89 deletions.
10 changes: 8 additions & 2 deletions buffalo/algo/als.py
@@ -93,7 +93,10 @@ def init_factors(self):

     def _get_topk_recommendation(self, rows, topk, pool=None):
         p = self.P[rows]
-        topks = super()._get_topk_recommendation(p, self.Q, pool, topk, self.opt.num_workers)
+        topks = super()._get_topk_recommendation(
+            p, self.Q,
+            pb=None, Qb=None,
+            pool=pool, topk=topk, num_workers=self.opt.num_workers)
         return zip(rows, topks)

     def _get_most_similar_item(self, col, topk, pool):
@@ -103,6 +106,10 @@ def get_scores(self, row_col_pairs):
         rets = {(r, c): self.P[r].dot(self.Q[c]) for r, c in row_col_pairs}
         return rets

+    def _get_scores(self, row, col):
+        scores = (self.P[row] * self.Q[col]).sum(axis=1)
+        return scores
+
     def _get_buffer(self):
         buf = BufferedDataMatrix()
         buf.initialize(self.data)
@@ -154,7 +161,6 @@ def train(self):
         self.obj.set_placeholder(lindptr, rindptr, batch_size)

         best_loss, rmse, self.validation_result = 987654321.0, None, {}
-        self.prepare_evaluation()
         self.initialize_tensorboard(self.opt.num_iters)
         full_st = time.time()
         for i in range(self.opt.num_iters):
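The new `_get_scores` helper (mirrored in `bpr.py` and `cfr.py` below) scores matched row/column pairs element-wise instead of building a full user-by-item score matrix. A standalone NumPy sketch of the idea, with made-up shapes:

import numpy as np

# P: user factors (num_users x d), Q: item factors (num_items x d)
P = np.random.rand(100, 8).astype("float32")
Q = np.random.rand(50, 8).astype("float32")

rows = np.array([0, 3, 7])    # user ids
cols = np.array([10, 2, 42])  # item ids, paired element-wise with rows

# one dot product per (row, col) pair; avoids the num_users x num_items matrix
scores = (P[rows] * Q[cols]).sum(axis=1)
assert scores.shape == (3,)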
23 changes: 17 additions & 6 deletions buffalo/algo/base.py
@@ -41,20 +41,31 @@ def _normalize(self, feat):
     def initialize(self):
         self.__early_stopping = {'round': 0,
                                  'min_loss': 987654321}
+        if self.opt.random_seed:
+            np.random.seed(self.opt.random_seed)

     @abc.abstractmethod
     def normalize(self, group='item'):
         raise NotImplementedError

-    def _get_topk_recommendation(self, p, Q, pool, topk, num_workers):
-        # Warning: This should be inherited.
-        if pool is None:
-            topks = self.get_topk(p.dot(Q.T), k=topk, num_threads=num_workers)
-        else:
-            topks = self.get_topk(p.dot(Q[pool].T), k=topk, num_threads=num_workers)
+    def _get_topk_recommendation(self, p, Q, pb, Qb, pool, topk, num_workers):
+        if pool is not None:
+            Q = Q[pool]
+            if Qb is not None:
+                Qb = Qb[pool]
+
+        scores = p.dot(Q.T)
+        if pb is not None:
+            scores += pb
+        if Qb is not None:
+            scores += Qb.T
+
+        topks = self.get_topk(scores, k=topk, num_threads=num_workers)
+        if pool is not None:
+            topks = np.array([pool[t] for t in topks])
         return topks


     def topk_recommendation(self, keys, topk=10, pool=None):
         """Return TopK recommendation for each users(keys)
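The base method now takes optional user and item biases (`pb`, `Qb`) and handles candidate pools in one place. Note that a per-user bias `pb` shifts all of that user's scores equally, so it cannot change the user's ranking; the item bias `Qb` can. Below is a minimal self-contained sketch of the same control flow, with `np.argsort` standing in for buffalo's parallel `get_topk`:

import numpy as np

def topk_with_pool(p, Q, Qb=None, pool=None, topk=3):
    # restrict candidates to the pool first, keeping biases aligned
    if pool is not None:
        Q = Q[pool]
        if Qb is not None:
            Qb = Qb[pool]
    scores = p.dot(Q.T)        # (num_queries, num_candidates)
    if Qb is not None:
        scores += Qb.T         # broadcast the item bias over queries
    topks = np.argsort(-scores, axis=1)[:, :topk]
    if pool is not None:
        topks = pool[topks]    # map pool-local indices back to global item ids
    return topks

p = np.random.rand(2, 4).astype("float32")
Q = np.random.rand(6, 4).astype("float32")
Qb = np.random.rand(6, 1).astype("float32")
pool = np.array([0, 2, 5])
print(topk_with_pool(p, Q, Qb=Qb, pool=pool, topk=2))  # rows of global item ids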
19 changes: 11 additions & 8 deletions buffalo/algo/bpr.py
@@ -81,8 +81,6 @@ def normalize(self, group='item'):
     def initialize(self):
         super().initialize()
         assert self.data, 'Data is not setted'
-        if self.opt.random_seed:
-            np.random.seed(self.opt.random_seed)
         self.buf = BufferedDataMatrix()
         self.buf.initialize(self.data)
         self.init_factors()
@@ -119,21 +117,27 @@ def prepare_sampling(self):

     def _get_topk_recommendation(self, rows, topk, pool=None):
         p = self.P[rows]
-        Q = self.Q
-        if self.opt.use_bias:
-            p = np.hstack([p, np.ones(shape=(p.shape[0], 1))]).astype("float32")
-            Q = np.hstack([Q, self.Qb]).astype("float32")
+        Qb = self.Qb if self.opt.use_bias else None

-        topks = super()._get_topk_recommendation(p, Q, pool, topk, self.opt.num_workers)
+        topks = super()._get_topk_recommendation(
+            p, self.Q,
+            pb=None, Qb=Qb,
+            pool=pool, topk=topk, num_workers=self.opt.num_workers)
         return zip(rows, topks)

-
     def _get_most_similar_item(self, col, topk, pool):
         return super()._get_most_similar_item(col, topk, self.Q, self.opt._nrz_Q, pool)

     def get_scores(self, row_col_pairs):
         rets = {(r, c): self.P[r].dot(self.Q[c]) + self.Qb[c][0] for r, c in row_col_pairs}
         return rets

+    def _get_scores(self, row, col):
+        scores = (self.P[row] * self.Q[col]).sum(axis=1) + self.Qb[col][0]
+        return scores
+
     def sampling_loss_samples(self):
         users, positives, negatives = [], [], []
         if self.opt.compute_loss_on_training:
@@ -220,7 +224,6 @@ def _finalize_train(self):

     def train(self):
         self.validation_result = {}
-        self.prepare_evaluation()
         self.initialize_tensorboard(self.opt.num_iters)
         self.sampling_loss_samples()
         best_loss = 987654321.0
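BPR-MF previously folded the item bias in by appending a ones column to `p` and `self.Qb` to `Q`; now it hands `Qb` to the base method, which adds it after the dot product. The two formulations produce identical scores, which a quick NumPy check makes concrete (arbitrary small shapes):

import numpy as np

p = np.random.rand(4, 8).astype("float32")    # user factors
Q = np.random.rand(10, 8).astype("float32")   # item factors
Qb = np.random.rand(10, 1).astype("float32")  # item biases

# old path: [p | 1] . [Q | Qb]^T
old = np.hstack([p, np.ones((4, 1), dtype="float32")]).dot(np.hstack([Q, Qb]).T)
# new path: score the factors, then broadcast the item bias
new = p.dot(Q.T) + Qb.T

assert np.allclose(old, new, atol=1e-5)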
10 changes: 8 additions & 2 deletions buffalo/algo/cfr.py
@@ -106,7 +106,10 @@ def initialize(self):

     def _get_topk_recommendation(self, rows, topk, pool=None):
         u = self.U[rows]
-        topks = super()._get_topk_recommendation(u, self.I, pool, topk, self.opt.num_workers)
+        topks = super()._get_topk_recommendation(
+            u, self.I,
+            pb=None, Qb=None,
+            pool=pool, topk=topk, num_workers=self.opt.num_workers)
         return zip(rows, topks)

     def _get_most_similar_item(self, col, topk, pool):
@@ -116,6 +119,10 @@ def get_scores(self, row_col_pairs):
         rets = {(r, c): self.U[r].dot(self.I[c]) for r, c in row_col_pairs}
         return rets

+    def _get_scores(self, row, col):
+        scores = (self.U[row] * self.I[col]).sum(axis=1)
+        return scores
+
     def _get_buffer(self):
         buf = BufferedDataMatrix()
         buf.initialize(self.data, with_sppmi=True)
@@ -186,7 +193,6 @@ def train(self):
         assert self.is_initialized, "embedding matrix is not initialized"
         buf = self._get_buffer()
         best_loss, self.validation_result = 987654321.0, {}
-        self.prepare_evaluation()
         self.initialize_tensorboard(self.opt.num_iters)
         scale = self.compute_scale()
         for i in range(self.opt.num_iters):
3 changes: 0 additions & 3 deletions buffalo/algo/w2v.py
@@ -87,8 +87,6 @@ def _get_feature(self, index, group='item'):
     def initialize(self):
         super().initialize()
         assert self.data, 'Data is not setted'
-        if self.opt.random_seed:
-            np.random.seed(self.opt.random_seed)
         self.buf = BufferedDataStream()
         self.buf.initialize(self.data)
         self.build_vocab()
@@ -200,7 +198,6 @@ def _iterate(self):

     def train(self):
         self.validation_result = {}
-        self.prepare_evaluation()
         self.obj.launch_workers()
         for i in range(self.opt.num_iters):
             start_t = time.time()
64 changes: 52 additions & 12 deletions buffalo/data/base.py
@@ -5,7 +5,7 @@

 import h5py
 import numpy as np
-
+from scipy.sparse import csr_matrix
 from buffalo.data import prepro
 from buffalo.misc import aux, log
 from buffalo.data.fileio import chunking_into_bins, sort_and_compressed_binarization
@@ -198,7 +198,7 @@ def _create_database(self, path, **kwargs):
     def _create_validation(self, f, **kwargs):
         if not self.opt.data.validation:
             return kwargs['num_nnz']
-        num_nnz = kwargs['num_nnz']
+        num_users, num_nnz = kwargs['num_users'], kwargs['num_nnz']
         method = self.opt.data.validation.name

         f.create_group('vali')
@@ -208,22 +208,20 @@ def _create_validation(self, f, **kwargs):
         if method == 'sample':
             sz = min(self.opt.data.validation.max_samples,
                      int(num_nnz * self.opt.data.validation.p))
-            g.create_dataset('row', (sz,), dtype='int32', maxshape=(sz,))
-            g.create_dataset('col', (sz,), dtype='int32', maxshape=(sz,))
-            g.create_dataset('val', (sz,), dtype='float32', maxshape=(sz,))
             g.create_dataset('indexes', (sz,), dtype='int64', maxshape=(sz,))
             # Watch out, create_working_data cannot deal with last line of data
             # for validation data thus we have to reduce index 1.
             g['indexes'][:] = np.random.choice(num_nnz - 1, sz, replace=False)
             num_nnz -= sz
         elif method in ['newest']:
             sz = kwargs['num_validation_samples']
-            g.create_dataset('row', (sz,), dtype='int32', maxshape=(sz,))
-            g.create_dataset('col', (sz,), dtype='int32', maxshape=(sz,))
-            g.create_dataset('val', (sz,), dtype='float32', maxshape=(sz,))
             g.attrs['n'] = self.opt.data.validation.n
             # We don't need to reduce sample size for validation samples. It
             # already applied on caller side.

+        g.create_dataset('row', (sz,), dtype='int32', maxshape=(sz,))
+        g.create_dataset('col', (sz,), dtype='int32', maxshape=(sz,))
+        g.create_dataset('val', (sz,), dtype='float32', maxshape=(sz,))
         g.attrs['num_samples'] = sz
         return num_nnz
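Both branches used to create identical `row`/`col`/`val` datasets; they are now created once, after whichever branch picks `sz`. A trimmed h5py sketch of the resulting layout (standalone, hypothetical file path):

import h5py

sz = 500  # set by the 'sample' or 'newest' branch
with h5py.File('/tmp/vali_example.h5', 'w') as f:
    g = f.create_group('vali')
    g.create_dataset('row', (sz,), dtype='int32', maxshape=(sz,))
    g.create_dataset('col', (sz,), dtype='int32', maxshape=(sz,))
    g.create_dataset('val', (sz,), dtype='float32', maxshape=(sz,))
    g.attrs['num_samples'] = sz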

@@ -232,10 +230,52 @@ def fill_validation_data(self, db, validation_data):
             return
         validation_data = [line.strip().split() for line in validation_data]
         assert len(validation_data) == db['vali'].attrs['num_samples'], 'Mimatched validation data'
-        db['vali']['row'][:] = [int(r) - 1 for r, _, _ in validation_data]  # 0-based
-        db['vali']['col'][:] = [int(c) - 1 for _, c, _ in validation_data]  # 0-based
-        V = np.array([float(v) for _, _, v in validation_data], dtype=np.float32)
-        db['vali']['val'][:] = self.value_prepro(V)
+
+        num_users, num_items = db.attrs['num_users'], db.attrs['num_items']
+        row = [int(r) - 1 for r, _, _ in validation_data]  # 0-based
+        col = [int(c) - 1 for _, c, _ in validation_data]  # 0-based
+        val = np.array([float(v) for _, _, v in validation_data], dtype=np.float32)
+        temp_mat = csr_matrix((val, (row, col)), (num_users, num_items))
+        db['vali']['row'][:] = row
+        db['vali']['col'][:] = col
+        db['vali']['val'][:] = self.value_prepro(temp_mat.data)
+
+    def _prepare_validation_data(self):
+        if hasattr(self, 'vali_data'):
+            return True
+
+        db = self.handle
+        num_users, num_items = db.attrs['num_users'], db.attrs['num_items']
+        row = db['vali']['row'][::]
+        col = db['vali']['col'][::]
+        val = db['vali']['val'][::]
+
+        _temp_mat = csr_matrix((val, (row, col)), (num_users, num_items))
+        indptr = _temp_mat.indptr[1:]
+        key = _temp_mat.indices
+        vali_rows = np.arange(len(indptr))[np.ediff1d(indptr, to_begin=indptr[0]) > 0]
+        vali_gt = {
+            u: set(key[indptr[u - 1]:indptr[u]]) if u != 0 else set(key[:indptr[0]])
+            for u in vali_rows}
+        validation_seen = {}
+        max_seen_size = 0
+        for rowid in vali_rows:
+            seen, *_ = self.get(rowid)
+            validation_seen[rowid] = set(seen)
+            max_seen_size = max(len(seen), max_seen_size)
+        validation_seen = validation_seen
+        validation_max_seen_size = max_seen_size
+
+        self.vali_data = {
+            "row": row,
+            "col": col,
+            "val": val,
+            "vali_rows": vali_rows,
+            "vali_gt": vali_gt,
+            "validation_seen": validation_seen,
+            "validation_max_seen_size": validation_max_seen_size
+        }
+        return True

     def _sort_and_compressed_binarization(self, mm_path, num_lines, max_key, sort_key):
         num_workers = psutil.cpu_count()
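`_prepare_validation_data` leans on CSR bookkeeping: differences in `indptr` locate users that actually hold validation items, and slices of `indices` give each user's ground-truth set. A toy illustration of that indexing, assuming 0-based (user, item, value) triples:

import numpy as np
from scipy.sparse import csr_matrix

row = np.array([0, 0, 2])  # users 0 and 2 have validation items
col = np.array([1, 3, 0])
val = np.ones(3, dtype=np.float32)

mat = csr_matrix((val, (row, col)), shape=(4, 5))
indptr = mat.indptr[1:]  # end offset of each user's entries
# users whose entry count grew, i.e. users with at least one validation item
vali_rows = np.arange(len(indptr))[np.ediff1d(indptr, to_begin=indptr[0]) > 0]
vali_gt = {u: set(mat.indices[indptr[u - 1]:indptr[u]]) if u != 0
           else set(mat.indices[:indptr[0]]) for u in vali_rows}

print(vali_rows)  # [0 2]
print(vali_gt)    # {0: {1, 3}, 2: {0}}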
36 changes: 18 additions & 18 deletions buffalo/data/fileio.hpp
@@ -131,18 +131,18 @@ int _parallel_build_sppmi(string from_path, string to_path,
     #pragma omp parallel num_threads(num_workers)
     {
         ifstream fin(from_path.c_str());

         // build appearances
         #pragma omp for schedule(dynamic, 1)
         for (int i=0; i < num_split; ++i) {
             long start_pos = i*split_size;
             long end_pos = (i+1) * split_size;

             fin.clear();
-            fin.seekg(start_pos, ios::beg);
+            fin.seekg(start_pos, ios::beg);

             string line;
-            if (i != 0) {
+            if (i != 0) {
                 // first line is handled by previous thread
                 getline(fin, line);
             }
@@ -160,18 +160,18 @@ int _parallel_build_sppmi(string from_path, string to_path,
             }
         }
         }

         // build sppmi
         #pragma omp for schedule(dynamic, 1)
         for (int i=0; i < num_split; ++i) {
             long start_pos = i*split_size;
             long end_pos = (i+1) * split_size;

             fin.clear();
-            fin.seekg(start_pos, ios::beg);
+            fin.seekg(start_pos, ios::beg);

             string line;
-            if (i != 0) {
+            if (i != 0) {
                 // first line is handled by previous thread
                 getline(fin, line);
             }
@@ -186,7 +186,7 @@ int _parallel_build_sppmi(string from_path, string to_path,
                 sscanf(line.c_str(), "%d %d", &cur_id, &c);
                 assert(cur_id > 0 and cur_id <= num_items);

-                // first id found is handled by previous thread
+                // first id found is handled by previous thread
                 if (i != 0 and (skip_id == -1 or skip_id == cur_id)) {
                     skip_id = cur_id;

@@ -207,19 +207,19 @@ int _parallel_build_sppmi(string from_path, string to_path,
                 } else if (probe_id != cur_id) {
                     unordered_set<int> chunk_set(chunk.begin(), chunk.end());
                     for (const auto& _c : chunk_set) {
-                        if (probe_id < _c)
+                        if (probe_id < _c)
                             continue;
                         int cnt = count(chunk.begin(), chunk.end(), _c);
-                        double pmi = log(cnt) + log_d
-                            - log(appearances[probe_id-1].load(memory_order_relaxed))
+                        double pmi = log(cnt) + log_d
+                            - log(appearances[probe_id-1].load(memory_order_relaxed))
                             - log(appearances[_c-1].load(memory_order_relaxed));
                         double sppmi = pmi - log_k;

                         if (sppmi > 0) {
                             #pragma omp critical(out)
                             {
-                                fout << probe_id << ' ' << _c << ' ' << sppmi << '\n';
-                                fout << _c << ' ' << probe_id << ' ' << sppmi << '\n';
+                                fout << probe_id << ' ' << _c << ' ' << sppmi << '\n';
+                                fout << _c << ' ' << probe_id << ' ' << sppmi << '\n';
                                 nnz += 2;
                             }
                         }
@@ -288,11 +288,11 @@ vector<string> _sort_and_compressed_binarization(
             long end_pos = (i+1) * split_size;

             fin.clear();
-            fin.seekg(start_pos, ios::beg);
+            fin.seekg(start_pos, ios::beg);

             vector<triple_t> records;
             string line;
-            if (i != 0) {
+            if (i != 0) {
                 // first line is handled by previous thread
                 getline(fin, line);
             }
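The `fileio.hpp` changes appear to be whitespace-only cleanups, which is why the paired lines above look identical and the additions and deletions match at 18 each; the SPPMI logic itself is untouched. For reference, the value each co-occurring pair contributes corresponds to the following computation, sketched in Python with descriptive names (`total_pairs` stands in for the `D` behind `log_d`):

import math

def sppmi_value(cooccur, count_i, count_j, total_pairs, k):
    # pmi(i, j) = log(#(i, j)) + log(D) - log(#(i)) - log(#(j))
    pmi = (math.log(cooccur) + math.log(total_pairs)
           - math.log(count_i) - math.log(count_j))
    # shifted PMI; _parallel_build_sppmi writes it for both (i, j) and (j, i),
    # and only when the result is positive
    return pmi - math.log(k)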
6 changes: 3 additions & 3 deletions buffalo/data/stream.py
@@ -45,8 +45,8 @@ def get_default_option(self) -> aux.Option:
             },
             'data': {
                 'validation': {
-                    'name': 'newest',  # sample or oldest or newest
-                    'p': 0.01,  # if set oldest or newest, ignored
+                    'name': 'newest',  # sample or newest
+                    'p': 0.01,  # if set newest, ignored
                     'n': 1,  # if set sample, ignored
                     'max_samples': 500
                 },
@@ -99,7 +99,7 @@ def get_max_column_length(fname):
         vali_n = self.opt.data.validation.get('n', 0)
         num_nnz, vali_limit, itemids = 0, 0, set()
         self.logger.info(f'gathering itemids from {main_path}...')
-        if self.opt.data.validation.name not in ["oldest", "newest"]:
+        if self.opt.data.validation.name not in ["newest"]:
             vali_n = 0
         with open(main_path) as fin:
             for line in log.ProgressBar(level=log.DEBUG, iterable=fin):
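With 'oldest' removed, the validation block supports two modes. A hypothetical option snippet summarizing which key each mode reads, based on the comments and branches above:

validation = {
    'name': 'newest',    # 'sample' or 'newest'
    'n': 1,              # 'newest' only: presumably the newest n events per user
    'p': 0.01,           # 'sample' only: fraction of nonzeros to sample
    'max_samples': 500,  # cap on validation size, used by the 'sample' branch
}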