|
| 1 | +from collections import OrderedDict |
| 2 | +from typing import Optional, Union |
| 3 | + |
| 4 | +import numpy as np |
| 5 | +import torch |
| 6 | +from escnn.group import Representation |
| 7 | +from torch import Tensor |
| 8 | + |
| 9 | +from nn.LinearDynamics import DmdSolver, LinearDynamics |
| 10 | +from nn.markov_dynamics import MarkovDynamics |
| 11 | +from utils.mysc import full_rank_lstsq, full_rank_lstsq_symmetric |
| 12 | +from utils.representation_theory import isotypic_basis |
| 13 | + |
| 14 | + |
| 15 | +class EquivLinearDynamics(LinearDynamics): |
| 16 | + |
| 17 | + def __init__(self, |
| 18 | + state_rep: Representation = None, |
| 19 | + dmd_algorithm: Optional[DmdSolver] = None, |
| 20 | + dt: Optional[Union[float, int]] = 1, |
| 21 | + trainable=False, |
| 22 | + group_avg_trick: bool = True): |
| 23 | + |
| 24 | + self.symm_group = state_rep.group |
| 25 | + self.group_avg_trick = group_avg_trick |
| 26 | + # Find the Isotypic basis of the state space |
| 27 | + self.state_iso_reps, self.state_iso_dims, Q_iso2state = isotypic_basis(representation=state_rep, |
| 28 | + multiplicity=1, |
| 29 | + prefix='ELDstate') |
| 30 | + # Change of coordinates required for state to be in Isotypic basis. |
| 31 | + Q_iso2state = Tensor(Q_iso2state) |
| 32 | + Q_state2iso = Tensor(np.linalg.inv(Q_iso2state)) |
| 33 | + |
| 34 | + self.iso_transfer_op = OrderedDict() |
| 35 | + for irrep_id in self.state_iso_reps: # Preserve the order of the Isotypic Subspaces |
| 36 | + self.iso_transfer_op[irrep_id] = None |
| 37 | + |
| 38 | + self.is_trainable = trainable |
| 39 | + dmd_algorithm = dmd_algorithm if dmd_algorithm is not None else full_rank_lstsq_symmetric |
| 40 | + super(EquivLinearDynamics, self).__init__(state_rep=state_rep, |
| 41 | + dt=dt, |
| 42 | + dmd_algorithm=dmd_algorithm, |
| 43 | + state_change_of_basis=Q_state2iso, |
| 44 | + state_inv_change_of_basis=Q_iso2state) |
| 45 | + |
| 46 | + def update_transfer_op(self, X: Tensor, X_prime: Tensor, group_avg_trick: bool = True): |
| 47 | + """ Use a DMD algorithm to update the empirical transfer operator |
| 48 | + Args: |
| 49 | + X: (state_dim, n_samples) Data matrix of states at time `t`. |
| 50 | + X_prime: (state_dim, n_samples) Data matrix of the states at time `t + dt`. |
| 51 | + group_avg_trick: (bool) Whether to use the group average trick to enforce equivariance. |
| 52 | + """ |
| 53 | + if self.is_trainable: |
| 54 | + raise RuntimeError("This model was initialized as trainable") |
| 55 | + assert X.shape == X_prime.shape, f"X: {X.shape}, X_prime: {X_prime.shape}" |
| 56 | + assert X.shape[0] == self.state_dim, f"Invalid state dimension {X.shape[0]} != {self.state_dim}" |
| 57 | + |
| 58 | + state, next_state = X.T, X_prime.T |
| 59 | + iso_rec_error = [] |
| 60 | + # For each Isotypic Subspace, compute the empirical transfer operator. |
| 61 | + for irrep_id, iso_rep in self.state_iso_reps.items(): |
| 62 | + rep = iso_rep if irrep_id != self.symm_group.identity else None # Check for Trivial Subspace |
| 63 | + |
| 64 | + # IsoSpace |
| 65 | + # Get the projection of the state onto the isotypic subspace |
| 66 | + state_iso = state[..., self.state_iso_dims[irrep_id]] |
| 67 | + next_state_iso = next_state[..., self.state_iso_dims[irrep_id]] |
| 68 | + |
| 69 | + # Generate the data matrices of x(w_t) and x(w_t+1) |
| 70 | + X_iso = state_iso.T # (iso_state_dim, num_samples) |
| 71 | + X_iso_prime = next_state_iso.T # (iso_state_dim, num_samples) |
| 72 | + |
| 73 | + # Compute the empirical transfer operator of this Observable Isotypic subspace |
| 74 | + A_iso = self.dmd_algorithm(X_iso, X_iso_prime, |
| 75 | + rep_X=rep if self.group_avg_trick else None, |
| 76 | + rep_Y=rep if self.group_avg_trick else None) |
| 77 | + rec_error = torch.nn.functional.mse_loss(A_iso @ X_iso, X_iso_prime) |
| 78 | + iso_rec_error.append(rec_error) |
| 79 | + self.iso_transfer_op[irrep_id] = A_iso |
| 80 | + transfer_op = torch.block_diag(*[self.iso_transfer_op[irrep_id] for irrep_id in self.state_iso_reps.keys()]) |
| 81 | + assert transfer_op.shape == (self.state_dim, self.state_dim) |
| 82 | + self.transfer_op = transfer_op |
| 83 | + |
| 84 | + iso_rec_error = Tensor(iso_rec_error) |
| 85 | + rec_error = torch.sum(iso_rec_error) |
| 86 | + self.transfer_op = transfer_op |
| 87 | + |
| 88 | + return dict(solution_op_rank=torch.linalg.matrix_rank(transfer_op.detach()).to(torch.float), |
| 89 | + solution_op_cond_num=torch.linalg.cond(transfer_op.detach()).to(torch.float), |
| 90 | + solution_op_error=rec_error.detach().to(torch.float), |
| 91 | + solution_op_error_dist=iso_rec_error.detach().to(torch.float)) |
0 commit comments