-
Notifications
You must be signed in to change notification settings - Fork 1
/
mc_analysis.py
128 lines (107 loc) · 4.83 KB
/
mc_analysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import numpy as np
def _get_reactive_path(assignment, sinks, source=None):
"""Given a trajectory, determines the string of states that goes
from the source to the sink"""
if source is None:
source = assignment[0]
# determine the first frame that the trajectory reaches a sink
first_sinks_ii = np.array([],dtype=int)
# search over all sinks
for sink in sinks:
first_sinks_ii = np.concatenate(
[first_sinks_ii, np.where(assignment==sink)[0]])
if len(first_sinks_ii) > 0:
first_sinks_ii = np.min(first_sinks_ii)
# clip trajectory and find first frame that is reactive
pathway = assignment[:first_sinks_ii]
first_source_ii = np.where(pathway == source)[0][-1] + 1
pathway = pathway[first_source_ii:]
else:
pathway = []
return pathway
def reactive_pathways(assignments, sinks, source=None, verbose=False):
"""Given a set of trajectories, determines all the reactive
pathways, i.e. the trajectories of going from source to sink.
"""
sinks = np.array(sinks).reshape((-1,))
pathways = np.array(
[_get_reactive_path(assignment, sinks) for assignment in assignments])
# trim empty pathways
pathway_lengths = np.array([len(pathway) for pathway in pathways])
pathways = pathways[np.where(pathway_lengths >0)[0]]
if verbose:
print(
"%d reactive trajectories out of %d" % \
(len(pathways), len(assignments)))
return pathways
def reactive_density(
assignments, sinks, source=None, all_reactive=False, verbose=False):
"""The probability of being in a particular state when in a
reactive pathway between sources and sinks. `all_reactive` is a flag
to indicate whether all the trajectories supplied are reactive or
not; this can save time processing the assignments for reactive
parts.
"""
if not all_reactive:
pathways = reactive_pathways(assignments, sinks, verbose=verbose)
else:
pathways = assignments
state_counts = np.array(
np.bincount(np.concatenate(pathways)))
densities = state_counts / np.sum(state_counts)
return densities
def state_pathway_prob(
assignments, sinks, source=None, all_reactive=False, verbose=False):
"""The probability that a state is observed along a reactive pathway
between source and sink. This is different from reactive density in
that the density accounts for how much time is spent in a state
along a reactive trajectory. Instead, this returns the probability
that a state is ever reactive.
"""
if not all_reactive:
pathways = reactive_pathways(assignments, sinks, verbose=verbose)
else:
pathways = assignments
unique_pathways = np.array(
[np.unique(pathway) for pathway in pathways])
state_counts = np.array(np.bincount(np.concatenate(unique_pathways)))
pathway_probs = state_counts / len(unique_pathways)
return pathway_probs
def _condition_assignments(assignments, end_state, flatten=True):
"""Chops each assignment off at round that state is discovered.
Also, it flattens the assignments."""
state_exists_iis = np.unique(np.where(assignments == end_state)[0])
conditioned_assignments = []
for ii in state_exists_iis:
if (len(assignments.shape) == 4) and (assignments.shape[1:3] == (1,1)):
cut = np.where(assignments[ii]==end_state)[-1][0] + 1
conditioned_assignments.append(assignments[ii, 0, 0, :cut])
else:
cut = np.where(assignments[ii]==end_state)[0][0] + 1
conditioned_assignments.append(assignments[ii, :cut])
state_doesnt_exist_iis = np.setdiff1d(
np.arange(len(assignments)), state_exists_iis)
for ii in state_doesnt_exist_iis:
conditioned_assignments.append(assignments[ii])
if flatten:
flattened_assignments = np.array(
[ass.flatten() for ass in conditioned_assignments])
assignments_out = flattened_assignments
else:
assignments_out = conditioned_assignments
return assignments_out
def discover_probabilities(assignments, n_states=None, end_state=None):
"""Returns the probability that a state is discovered from a set
of trajectories or sampling run (treats each row in assignments as
an independent sampling run for calculating probabilities)"""
if n_states is None:
n_states = np.max(np.flatten(assignments))
if end_state is not None and assignments.shape:
assignments = _condition_assignments(assignments, end_state)
if len(assignments.shape) > 2:
assignments = np.array([ass.flatten() for ass in assignments])
observed_states = np.array(
[
np.bincount(ass, minlength=n_states) for ass in assignments]) > 0
discover_probs = np.sum(observed_states, axis=0) / len(assignments)
return discover_probs