# MCCA.py
import warnings

import numpy as np
from scipy.linalg import eigh, norm
from sklearn.decomposition import PCA
from sklearn.exceptions import NotFittedError


class MCCA:
""" Performs multiset canonical correlation analysis with an optional
regularization term based on spatial similarity of weight maps. The
stronger the regularization, the more similar weight maps are forced to
be across subjects. Note that the term 'weights' is used interchangeably
with PCA / MCCA eigenvectors here.
Parameters:
n_components_pca (int): Number of PCA components to retain for each subject (default 50)
n_components_mcca (int): Number of MCCA components to retain (default 10)
r (int or float): Regularization strength (default 0)
pca_only (bool): If true, skip MCCA calculation (default False)
Attributes:
mu (ndarray): Mean subtracted before PCA (subjects, sensors)
sigma (ndarray): PCA standard deviation (subjects, PCs)
pca_weights (ndarray): PCA weights that transform sensors to PCAs for each
subject (subjects, sensors, PCs)
mcca_weights (ndarray): MCCA weights that transform PCAs to MCCAs for each subject.
None if pca_only is True. (subjects, PCs, CCs)
"""
    def __init__(self, n_components_pca=50, n_components_mcca=10, r=0, pca_only=False):
        if n_components_mcca > n_components_pca:
            warnings.warn(f"Number of MCCA components ({n_components_mcca}) cannot be greater than the "
                          f"number of PCA components ({n_components_pca}); setting them equal.")
            n_components_mcca = n_components_pca
self.n_pcs = n_components_pca
self.n_ccs = n_components_mcca
self.r = r
self.pca_only = pca_only
self.mcca_weights, self.pca_weights, self.mu, self.sigma = None, None, None, None
def obtain_mcca(self, X):
""" Apply individual-subject PCA and across-subjects MCCA.
Parameters:
X (ndarray): Input data in sensor space (subjects, samples, sensors)
        Returns:
            scores (ndarray): Scores in PCA space (subjects, samples, PCs) if self.pca_only is True,
                              otherwise scores in MCCA space (subjects, samples, CCs).
"""
n_subjects, n_samples, n_sensors = X.shape
X_pca = np.zeros((n_subjects, n_samples, self.n_pcs))
self.pca_weights = np.zeros((n_subjects, n_sensors, self.n_pcs))
self.mu = np.zeros((n_subjects, n_sensors))
self.sigma = np.zeros((n_subjects, self.n_pcs))
# obtain subject-specific PCAs
for i in range(n_subjects):
pca = PCA(n_components=self.n_pcs, svd_solver='full')
x_i = np.squeeze(X[i]).copy() # time x sensors
score = pca.fit_transform(x_i)
self.pca_weights[i] = pca.components_.T
self.mu[i] = pca.mean_
self.sigma[i] = np.sqrt(pca.explained_variance_)
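            # Whiten the PCA scores so every retained component has unit variance; self.sigma is kept so
            # the same scaling can be applied to (or undone for) single-trial data later.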
score /= self.sigma[i]
X_pca[i] = score
if self.pca_only:
return X_pca
else:
return self._mcca(X_pca)
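    # Illustrative shape walk-through for obtain_mcca (the counts below are assumptions for the sake of
    # the example, e.g. 5 subjects, 1000 samples, 64 sensors with the default settings):
    #   X            (5, 1000, 64)   input in sensor space
    #   X_pca        (5, 1000, 50)   after per-subject PCA, whitened by self.sigma
    #   MCCA scores  (5, 1000, 10)   after projection with self.mcca_weights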
def _mcca(self, pca_scores):
""" Performs multiset canonical correlation analysis with an optional
regularization term based on spatial similarity of weight maps. The
stronger the regularization, the more similar weight maps are forced to
be across subjects.
Parameters:
pca_scores (ndarray): Input data in PCA space (subjects, samples, PCs)
Returns:
mcca_scores (ndarray): Input data in MCCA space (subjects, samples, CCs).
"""
# R_kl is a block matrix containing all cross-covariances R_kl = X_k^T X_l between subjects k, l, k != l
# where X is the data in the subject-specific PCA space (PCA scores)
        # R_kk is a block diagonal matrix containing auto-covariances R_kk = X_k^T X_k in its diagonal blocks
R_kl, R_kk = _compute_cross_covariance(pca_scores)
# Regularization
if self.r != 0:
# The regularization terms W_kl and W_kk are calculated the same way as R_kl and R_kk above, but using
# cross-covariance of PCA weights instead of PCA scores
W_kl, W_kk = _compute_cross_covariance(self.pca_weights)
# Add regularization term to R_kl and R_kk
R_kl += self.r * W_kl
R_kk += self.r * W_kk
# Obtain MCCA solution by solving the generalized eigenvalue problem
# R_kl h = p R_kk h
# where h are the concatenated eigenvectors of all subjects and
# p are the generalized eigenvalues (canonical correlations).
# If PCA scores are whitened and no regularisation is used, R_kk is an identity matrix and the generalized
# eigenvalue problem is reduced to a regular eigenvalue problem
p, h = eigh(R_kl, R_kk, subset_by_index=(R_kl.shape[0] - self.n_ccs, R_kl.shape[0] - 1))
# eigh returns eigenvalues in ascending order. To pick the k largest from a total of n eigenvalues,
# we use subset_by_index=(n - k, n - 1).
# Flip eigenvectors so that they are in descending order
h = np.flip(h, axis=1)
# Reshape h from (subjects * PCs, CCs) to (subjects, PCs, CCs)
h = h.reshape((pca_scores.shape[0], self.n_pcs, self.n_ccs))
# Normalize eigenvectors per subject
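        # (i.e. divide by the spectral norm, the largest singular value, of each subject's (PCs, CCs) weight matrix)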
self.mcca_weights = h / norm(h, ord=2, axis=(1, 2), keepdims=True)
return np.matmul(pca_scores, self.mcca_weights)
def transform_trials(self, X, subject=0):
""" Use of MCCA weights (obtained from averaged data) to transform single
trial data from sensor space to MCCA space.
Parameters:
X (ndarray): Single trial data of one subject in sensor space
(trials, samples, sensors)
subject (int): Index of the subject whose data is being transformed
Returns:
X_mcca (ndarray): Transformed single trial data in MCCA space
(trials, samples, CCs)
"""
if self.mcca_weights is None:
raise NotFittedError('MCCA needs to be fitted before calling transform_trials')
        X = X - self.mu[np.newaxis, np.newaxis, subject]  # center without modifying the input array in place
X_pca = np.matmul(X, self.pca_weights[subject])
X_pca /= self.sigma[np.newaxis, np.newaxis, subject] # standardized
X_mcca = np.matmul(X_pca, self.mcca_weights[subject])
return X_mcca
def inverse_transform_trials(self, X_mcca, subject=0):
""" Transform single trial data from MCCA space back to sensor space.
Parameters:
X_mcca (ndarray): Single trial data of one subject in MCCA space
(trials, samples, CCs)
subject (int): Index of the subject whose data is being transformed
Returns:
X (ndarray): Single trial data transformed back into sensor space
(trials, samples, sensors)
"""
if self.mcca_weights is None:
raise NotFittedError('MCCA needs to be fitted before calling inverse_transform_trials')
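        # Note: PCA and MCCA both truncate dimensions, so the pseudoinverse projections below give a
        # low-rank reconstruction of the sensor data, not an exact inverse.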
X_pca = np.matmul(X_mcca, np.linalg.pinv(self.mcca_weights[subject])) # trials x samples x PCAs
X_pca *= self.sigma[np.newaxis, np.newaxis, subject] # revert standardization
X = np.matmul(X_pca, np.linalg.pinv(self.pca_weights[subject]))
X += self.mu[np.newaxis, np.newaxis, subject] # revert centering
return X
def transform_trials_pca(self, X, subject=0):
""" Transform single trial data from sensor space to PCA space
Parameters:
X (ndarray): Single trial data of one subject in sensor space
(trials, samples, sensors)
subject (int): Index of the subject whose data is being transformed
Returns:
X_pca (ndarray): Transformed single trial data in PCA space
(trials, samples, PCs)
"""
if self.pca_weights is None:
raise NotFittedError('PCA needs to be fitted before calling transform_trials_pca')
        X = X - self.mu[np.newaxis, np.newaxis, subject]  # center without modifying the input array in place
X_pca = np.matmul(X, self.pca_weights[subject])
X_pca /= self.sigma[np.newaxis, np.newaxis, subject] # standardized
return X_pca
def test_mcca(self, data_train, data_test):
""" Test if the inter-subject correlations/consistency observed from training
data MCCAs generalize over testing data.
Parameters:
data_train (ndarray): Training data in sensor space
(subjects, samples, sensors)
data_test (ndarray): Test data in sensor space
(subjects, samples, sensors)
        Returns:
            correlations (ndarray): Inter-subject correlations (averaged over
                        every pair of subjects) for every MCCA component, with shape (CCs, 2)
                        (first column: training data; second column: test data)
"""
if self.mcca_weights is None:
raise NotFittedError('MCCA needs to be fitted before calling test_mcca')
t_mu = self.mu[:, np.newaxis]
t_sigma = self.sigma[:, np.newaxis]
        data_train = data_train - t_mu  # center without modifying the caller's array
        pca_train = np.matmul(data_train, self.pca_weights)
        pca_train /= t_sigma
        mcca_train = np.matmul(pca_train, self.mcca_weights)
        data_test = data_test - t_mu  # center without modifying the caller's array
        pca_test = np.matmul(data_test, self.pca_weights)
        pca_test /= t_sigma
        mcca_test = np.matmul(pca_test, self.mcca_weights)
K = self.mcca_weights.shape[2]
correlations = np.zeros((K, 2))
for i in range(K):
inter_subject_corr_train = np.corrcoef(np.squeeze(mcca_train[:, :, i]))
inter_subject_corr_test = np.corrcoef(np.squeeze(mcca_test[:, :, i]))
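            # np.corrcoef returns a (subjects, subjects) matrix; the means below are taken over the
            # full matrix, so the unit diagonal (self-correlations) is included in the average.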
# averaged inter-subject correlations over training data
correlations[i, 0] = np.mean(inter_subject_corr_train)
# averaged inter-subject correlations over testing data
correlations[i, 1] = np.mean(inter_subject_corr_test)
return correlations
def _compute_cross_covariance(X):
""" Computes cross-covariance of PCA scores or components between subjects.
Parameters:
X (ndarray): PCA scores (subjects, samples, PCs) or weights (subjects, sensors, PCs)
Returns:
R_kl (ndarray): Block matrix containing all cross-covariances R_kl = X_k^T X_l between subjects k, l, k != l
with shape (subjects * PCs, subjects * PCs)
        R_kk (ndarray): Block diagonal matrix containing auto-covariances R_kk = X_k^T X_k in its diagonal blocks
with shape (subjects * PCs, subjects * PCs)
"""
n_subjects, n_samples, n_pcs = X.shape
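    # Reshape to (subjects * PCs, samples): the rows are all PCs of subject 0, then all PCs of
    # subject 1, etc. np.cov treats rows as variables, so R has shape (subjects * PCs, subjects * PCs).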
R = np.cov(X.swapaxes(1, 2).reshape(n_subjects * n_pcs, n_samples))
R_kk = R * np.kron(np.eye(n_subjects), np.ones((n_pcs, n_pcs)))
R_kl = R - R_kk
return R_kl, R_kk
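

# Minimal usage sketch (illustrative only): the synthetic data, subject/sample/sensor counts and
# component numbers below are assumptions chosen for demonstration, not part of the class above.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    n_subjects, n_samples, n_sensors = 5, 1000, 64

    # Synthetic data: a shared 3-dimensional latent time course mixed into each subject's sensors, plus noise.
    latent = rng.standard_normal((n_samples, 3))
    X = np.stack([latent @ rng.standard_normal((3, n_sensors))
                  + 0.5 * rng.standard_normal((n_samples, n_sensors))
                  for _ in range(n_subjects)])

    mcca = MCCA(n_components_pca=20, n_components_mcca=5)
    scores = mcca.obtain_mcca(X)  # (subjects, samples, CCs)
    print("MCCA scores:", scores.shape)

    # Project single-trial data of subject 0 into MCCA space and reconstruct it in sensor space.
    trials = rng.standard_normal((10, n_samples, n_sensors))
    trials_mcca = mcca.transform_trials(trials, subject=0)
    trials_sensor = mcca.inverse_transform_trials(trials_mcca, subject=0)
    print("Trials in MCCA space:", trials_mcca.shape, "reconstructed:", trials_sensor.shape)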