-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
bmm.py
executable file
·130 lines (109 loc) · 3.77 KB
/
bmm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#! /usr/bin/env python
# ! -*- coding=utf-8 -*-
# Project: Lihang
# Filename: bmm
# Date: 10/24/18
# Author: 😏 <smirk dot cao at gmail dot com>
import numpy as np
import argparse
import logging
class BMM(object):
def __init__(self,
n_components=2,
max_iter=100,
tol=1e-3,
mu=None,
alpha=None,
verbose=False):
"""
:param n_components:
:param max_iter:
:param tol:
:param mu: 这里可能要看一下和书中的p,q对应关系, p=0.6, q=0.7, 对应了X=1 X=0的情况, 所以[0.7, 0.6]
:param alpha:
:param verbose:
"""
# k
self.n_components = n_components
self.max_iter = max_iter
self.tol = tol
# N
self.m = 0
self.n = 0
self.gamma = None
self.n_iter_ = 0
self.mu = mu
self.alpha = alpha
self.X = None
self.label = None
self.verbose = verbose
def fit(self,
X,
y=None):
k = self.n_components
# self.m, self.n = X.shape
self.m = X.shape[0]
N = self.m
# 如果n > 1 , 分解成n个模型训练, 结果再拼回来
if y is not None:
self.label = y
# X: (N, 2)
self.X = np.eye(k)[X]
# gamma: (N, k), 样本对子模型的响应度gamma_jk, 按j求和应该是1
if self.gamma is None:
self.gamma = np.full((N, k), 1/k)
# alpha: (k) , 子模型对混合模型的贡献, 求和为1
if self.alpha is None:
self.alpha = 0.5
self.alpha = np.stack((1 - self.alpha, self.alpha), axis=-1)
# mu: (k, 2) 2是为了做矩阵乘法, 相对for loop效率应该会高, 这里todo: benchmark
if self.mu is None:
self.mu = np.full(k, 1/k)
self.mu = np.stack((1 - self.mu, self.mu), axis=-1)
for i in range(self.max_iter):
# print(self.alpha, "\n", self.mu[:, 0], "\n", self.gamma, "\n")
self.do_e_step()
self.do_m_step()
if self.verbose:
logger.info("alpha %s" % self.alpha)
logger.info(self.mu)
if self.is_convergence():
break
def is_convergence(self):
return False
def density(self):
# Bernoulli Distribution Density
rst = np.dot(self.X, self.mu.T)
return rst
def do_e_step(self):
# 更新gamma
self.gamma = self.density() * self.alpha
z = np.sum(self.gamma, axis=1).reshape(-1, 1)
self.gamma /= z
return self
def do_m_step(self):
nk = np.sum(self.gamma, axis=0)
# update mu, 注意这里X[:, 1] 中的1 来自X的取值为1的意思.
# p,q 对应的是A=1, B=1, A=0, C=1, 注意这里用样本刷
self.mu = np.sum(self.gamma * self.X[:, 1].reshape(-1, 1), axis=0) / nk
self.mu = np.stack((1 - self.mu, self.mu), axis=-1)
# update alpha
self.alpha = nk / self.m
return self
def predict(self, X):
pass
def sample(self,
n_samples=1):
pass
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--path", required=False, help="path to input data file")
args = vars(ap.parse_args())
else:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--path", required=False, help="path to input data")
args = vars(ap.parse_args())