Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOMクラスにtransformメソッドを追加 #110

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions libs/models/som.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


class SOM:
def __init__(self, X, latent_dim, resolution, sigma_max, sigma_min, tau, init='random',metric="sqeuclidean"):
def __init__(self, X, latent_dim, resolution, sigma_max, sigma_min, tau, init='random', metric="sqeuclidean"):
self.X = X
self.N = self.X.shape[0]

Expand All @@ -28,22 +28,23 @@ def __init__(self, X, latent_dim, resolution, sigma_max, sigma_min, tau, init='r
elif latent_dim == 2:
if isinstance(init, str) and init == 'PCA':
comp1, comp2 = pca.singular_values_[0], pca.singular_values_[1]
zeta = np.meshgrid(np.linspace(-1, 1, resolution), np.linspace(-comp2/comp1, comp2/comp1, resolution))
zeta = np.meshgrid(np.linspace(-1, 1, resolution),
np.linspace(-comp2 / comp1, comp2 / comp1, resolution))
else:
zeta = np.meshgrid(np.linspace(-1, 1, resolution), np.linspace(-1, 1, resolution))
self.Zeta = np.dstack(zeta).reshape(resolution**2, latent_dim)
self.Zeta = np.dstack(zeta).reshape(resolution ** 2, latent_dim)
else:
raise ValueError("invalid latent dimension: {}".format(latent_dim))

self.K = resolution**self.L
self.K = resolution ** self.L

if isinstance(init, str) and init == 'random':
self.Z = np.random.rand(self.N, latent_dim) * 2.0 - 1.0
elif isinstance(init, str) and init == 'random_bmu':
init_bmus = np.random.randint(0, self.Zeta.shape[0] - 1, self.N)
self.Z = self.Zeta[init_bmus,:]
self.Z = self.Zeta[init_bmus, :]
elif isinstance(init, str) and init == 'PCA':
self.Z = pca.transform(X)/comp1
self.Z = pca.transform(X) / comp1
elif isinstance(init, np.ndarray) and init.dtype == int:
init_bmus = init.copy()
self.Z = self.Zeta[init_bmus, :]
Expand All @@ -52,9 +53,9 @@ def __init__(self, X, latent_dim, resolution, sigma_max, sigma_min, tau, init='r
else:
raise ValueError("invalid init: {}".format(init))

#metricに関する処理
# metricに関する処理
if metric == "sqeuclidean":
self.metric="sqeuclidean"
self.metric = "sqeuclidean"

elif metric == "KLdivergence":
self.metric = "KLdivergence"
Expand All @@ -78,28 +79,28 @@ def fit(self, nb_epoch=100, verbose=True):
# 協調過程
# 学習量を計算
# sigma = self.sigma_min + (self.sigma_max - self.sigma_min) * np.exp(-epoch / self.tau) # 近傍半径を設定
sigma = max(self.sigma_min, self.sigma_max * ( 1 - (epoch / self.tau) ) )# 近傍半径を設定
sigma = max(self.sigma_min, self.sigma_max * (1 - (epoch / self.tau))) # 近傍半径を設定
Dist = dist.cdist(self.Zeta, self.Z, 'sqeuclidean')
# KxNの距離行列を計算
# ノードと勝者ノードの全ての組み合わせにおける距離を網羅した行列
H = np.exp(-Dist / (2 * sigma * sigma)) # KxNの学習量行列を計算
H = np.exp(-Dist / (2 * sigma * sigma)) # KxNの学習量行列を計算

# 適合過程
# 参照ベクトルの更新
G = np.sum(H, axis=1)[:, np.newaxis] # 各ノードが受ける学習量の総和を保持するKx1の列ベクトルを計算
Ginv = np.reciprocal(G) # Gのそれぞれの要素の逆数を取る
R = H * Ginv # 学習量の総和が1になるように規格化
self.Y = R @ self.X # 学習量を重みとして観測データの平均を取り参照ベクトルとする
G = np.sum(H, axis=1)[:, np.newaxis] # 各ノードが受ける学習量の総和を保持するKx1の列ベクトルを計算
Ginv = np.reciprocal(G) # Gのそれぞれの要素の逆数を取る
R = H * Ginv # 学習量の総和が1になるように規格化
self.Y = R @ self.X # 学習量を重みとして観測データの平均を取り参照ベクトルとする

# 競合過程
if self.metric is "sqeuclidean": # ユークリッド距離を使った勝者決定
if self.metric == "sqeuclidean": # ユークリッド距離を使った勝者決定
# 勝者ノードの計算
Dist = dist.cdist(self.X, self.Y) # NxKの距離行列を計算
bmus = Dist.argmin(axis=1)
# Nx1の勝者ノード番号をまとめた列ベクトルを計算
# argmin(axis=1)を用いて各行で最小値を探しそのインデックスを返す
self.Z = self.Zeta[bmus, :] # 勝者ノード番号から勝者ノードを求める
elif self.metric is "KLdivergence": # KL情報量を使った勝者決定
elif self.metric == "KLdivergence": # KL情報量を使った勝者決定
Dist = np.sum(self.X[:, np.newaxis, :] * np.log(self.Y)[np.newaxis, :, :], axis=2) # N*K行列
# 勝者番号の決定
bmus = np.argmax(Dist, axis=1)
Expand All @@ -110,3 +111,11 @@ def fit(self, nb_epoch=100, verbose=True):
self.history['z'][epoch] = self.Z
self.history['y'][epoch] = self.Y
self.history['sigma'][epoch] = sigma

def transform(self, X):
    """Map observations to the latent space via their best-matching units.

    For each row of ``X`` the winner node is found against the fitted
    reference vectors ``self.Y``, and that node's latent coordinate
    (a row of ``self.Zeta``) is returned — mirroring the competitive
    step of ``fit``.

    Parameters
    ----------
    X : np.ndarray of shape (n_samples, n_features)
        Observations in the same feature space as the training data.
        For ``metric == "KLdivergence"`` rows are assumed to be
        categorical distributions (non-negative, summing to 1).

    Returns
    -------
    np.ndarray of shape (n_samples, latent_dim)
        Latent coordinates of each sample's winner node.

    Raises
    ------
    ValueError
        If ``self.metric`` is not a supported metric name.
    """
    if self.metric == "sqeuclidean":
        # Winner = reference vector with the smallest squared Euclidean
        # distance. (argmin is identical under euclidean/sqeuclidean.)
        distance = dist.cdist(X, self.Y, self.metric)  # NxK
        return self.Zeta[distance.argmin(axis=1)]
    elif self.metric == "KLdivergence":
        # Cross-entropy winner selection, consistent with fit():
        # argmin of -sum(x * log(y)) == argmax of sum(x * log(y)).
        # Bug fix: use the argument X, not self.X, so transform() works
        # on data other than the training set.
        divergence = -np.sum(X[:, np.newaxis, :] * np.log(self.Y)[np.newaxis, :, :], axis=2)  # NxK
        return self.Zeta[divergence.argmin(axis=1)]
    else:
        # __init__ should prevent this, but fail loudly instead of
        # silently returning None.
        raise ValueError("invalid metric: {}".format(self.metric))
31 changes: 31 additions & 0 deletions tests/som/test_som.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,37 @@ def test_init_pca(self):

np.testing.assert_allclose(SOMResult, EVDResult/np.sqrt(Lambda.real.max()), rtol=1e-06)

def test_transform(self):
    """transform() applied to the training data must reproduce the fitted Z."""
    n_samples, n_categories = 100, 20

    # Build rows that are valid categorical distributions
    # (non-negative entries that sum to one).
    X_cat = np.random.rand(n_samples, n_categories)
    X_cat = X_cat / X_cat.sum(axis=1)[:, None]
    np.testing.assert_allclose(X_cat.sum(axis=1), np.ones(X_cat.shape[0]))

    # KL-divergence metric: transforming the training set must match Z.
    som_kl = SOM(X_cat, latent_dim=2, resolution=50,
                 sigma_max=2.0, sigma_min=0.3, tau=50,
                 metric="KLdivergence")
    som_kl.fit(50)
    np.testing.assert_allclose(som_kl.transform(X_cat), som_kl.Z)

    # Squared-Euclidean metric on a real-valued multivariate dataset.
    n_obs, n_features = 100, 20
    X_real = np.random.normal(0.0, 1.0, (n_obs, n_features))

    som_eu = SOM(X_real, latent_dim=2, resolution=50,
                 sigma_max=2.0, sigma_min=0.2, tau=50,
                 metric="sqeuclidean")
    som_eu.fit(10)
    np.testing.assert_allclose(som_eu.Z, som_eu.transform(X_real))



Expand Down