Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Component projection #124

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
804 changes: 804 additions & 0 deletions demos/demo_component_projection.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/rsatoolbox/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .computations import average_dataset_by
from .noise import cov_from_residuals
from .noise import prec_from_residuals
from .components import Components
from .noise import cov_from_measurements
from .noise import prec_from_measurements
from .noise import cov_from_unbalanced
Expand Down
150 changes: 150 additions & 0 deletions src/rsatoolbox/data/components.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Computes and projects data onto a subspace defined by a set of components.
RDMs can then be visualized in that subspace.

See demo_component_projection.ipynb

@author: snormanhaignere
"""

import numpy as np
import sklearn.decomposition
from numpy.typing import NDArray
from typing import Optional


class Components:
'''Component model of data matrix.

Data matrix is approximated as the product of a response
and weight matrix (D ~= R * W)

Contains functions for computing component decompositions
and for reconstructing data given those component decompositions

Attributes:
R: [stimuli x component]
W: [component x channel]
'''

def __init__(self, R: Optional[NDArray] = None, W: Optional[NDArray] = None):
'''Constructs the core component object.

When called with no arguments, R and W are simply set to None.
Once initialized, you can computed components from data
using one of the component functions:
pca
fastica

Args:
R: Optional; [stimuli x component] numpy array
W: Optional; [component x channel] numpy array
'''

self.R = R
self.W = W
if (self.R is not None) and (self.W is not None):
if self.R.shape[1] != self.W.shape[0]:
raise NameError('Columns of R must match rows of W')
self.n_components = self.R.shape[1]
else:
self.n_components = None

def reconstruct(self, subset=None):
'''Reconstructs data by multiplying response and weight matrices.

Args:
subset: Optional; list of component indices.
Default is to use all components

Returns:
A [stimuli x channel] reconstruction of your data.
'''
if self.R is None or self.W is None:
raise ValueError("Decomposition not yet computed, cannot reconstruct")
if subset is not None:
measurements = np.matmul(self.R[:, subset], self.W[subset, :])
else:
measurements = np.matmul(self.R, self.W)
return measurements

def pca(self, measurements, n_components=None):
'''PCA decomposition of data matrix.

Decomposition is computed using SVD:
D = USV

Components are then given by:
R = U
W = SV

Args:
measurements: [stimuli x channel] numpy array
n_components: Optional; if specified, only returns top N PCs:
R = R[:, :n_components]
W = W[:n_components, :]

Returns:
Objects with self.R and self.W given as above
'''
[U, s, Vh] = np.linalg.svd(measurements, full_matrices=False)
self.R = U
self.W = np.expand_dims(s, axis=1) * Vh
self._select_top_components(n_components)

def fastica(self, measurements, n_components=None, method_params=None):
'''Decomposition computed using FastICA.

Returns decomposition with maximally independent weights,
as estimated using FastICA. Unlike PCA, component responses
can be correlated. In the language of ICA, R is the 'mixing matrix'.

Note that unlike standard ICA, the weights returned are not
zero-mean. The weights returned are given by the product of the
response profile matrix (mixing matrix) with the data matrix,
which provides a way to infer a meaningful mean.

Data can be first projected onto a low-dimensional subspace
using PCA, which is often a good idea (see n_components argument)

Decomposition performed using sklearn:
sklearn.decomposition.FastICA
https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.FastICA.html
https://scikit-learn.org/stable/auto_examples/decomposition/plot_ica_blind_source_separation.html

Args:
measurements: [stimuli x channel] numpy array
n_components: Optional; if specified, data is first
reduced in dimensionality using PCA, and then rotated
to maximize independence within this subspace.
method_params: Optional; dictionary with additional
parameters to pass to sklearn function. see:
https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.FastICA.html
Returns:
Objects with self.R and self.W given as above
'''

if method_params is None:
method_params = {}
ica = sklearn.decomposition.FastICA(
n_components=n_components, **method_params)
ica.fit_transform(np.transpose(measurements))
self.R = ica.mixing_
self.W = np.matmul(np.linalg.pinv(self.R), measurements)

def _select_top_components(self, n_components=None):
if self.R is None or self.W is None:
return
if n_components is not None:
self.R = self.R[:, :n_components]
self.W = self.W[:n_components, :]
self.n_components = n_components

def order_components(self, order):
'''Re-order components'''
if self.R is not None:
self.R = self.R[:, order]
if self.W is not None:
self.W = self.W[order, :]
6 changes: 3 additions & 3 deletions src/rsatoolbox/rdm/rdms.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,12 +559,12 @@ def concat(*rdms: RDMs, target_pdesc: Optional[str] = None) -> RDMs:
rsatoolbox.rdm.RDMs: concatenated rdms object

"""
if len(rdms) == 1: ## single argument
if len(rdms) == 1: # single argument
if isinstance(rdms[0], RDMs):
rdms_list = [rdms[0]]
else:
rdms_list = list(rdms[0])
else: ## multiple arguments
else: # multiple arguments
rdms_list = list(rdms)
assert isinstance(rdms_list[0], RDMs), \
'Supply list of RDMs objects, or RDMs objects as separate arguments'
Expand All @@ -578,7 +578,7 @@ def concat(*rdms: RDMs, target_pdesc: Optional[str] = None) -> RDMs:
lambda n: n != 'index' and (
len(rdms_list[0].pattern_descriptors[n])
== len(set(rdms_list[0].pattern_descriptors[n]))),
pdescs))
pdescs))
target_pdesc = None
if len(pdesc_candidates) > 0:
target_pdesc = pdesc_candidates[0]
Expand Down
33 changes: 33 additions & 0 deletions tests/test_components.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Sep 18 2020

@author: snormanhaignere
"""

import unittest
import numpy as np
from rsatoolbox.data.components import Components


class TestComponents(unittest.TestCase):
def test_pca_reconstruction(self):
X = np.random.randn(10, 100)
components = Components()
components.pca(X)
assert np.allclose(components.reconstruct(), X)

def test_fastica_recon_equals_pca_recon(self):
X = np.random.randn(10, 100)
X = X - np.mean(X, axis=1, keepdims=True)
pca_components = Components()
pca_components.pca(X, n_components=3)
fastica_components = Components()
fastica_components.fastica(X, n_components=3)
assert np.allclose(pca_components.reconstruct(),
fastica_components.reconstruct())


if __name__ == '__main__':
unittest.main()
Loading