forked from mizimmer90/slandscapes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmc_sampling.py
182 lines (168 loc) · 7.12 KB
/
mc_sampling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import itertools
import numpy as np
from . import rankings
from enspara.msm import builders, MSM
from functools import partial
from multiprocessing import Pool
# A more efficient implementation of synthetic trajectory for speed
# note order flip in args
def synth_traj(t_probs, n_steps, start_state, rng=np.random.default_rng()):
traj = np.zeros(n_steps, dtype=int)
traj[0] = start_state
nstates = t_probs.shape[1]
for i in range(n_steps -1):
traj[i+1] = rng.choice(nstates, 1, p=t_probs[traj[i], :])
return traj
def _run_sampling(adaptive_sampling_obj):
"""Helper to adaptive sampling. Helps parallelize sampling runs."""
assignments = adaptive_sampling_obj[0].run()
return assignments
def adaptive_sampling(
T, initial_state=0, n_runs=1, n_clones=1, n_steps=1,
msm_obj=None, ranking_obj=None, n_reps=1, n_procs=1,
assignments=None):
"""Get synthetic adaptive sampling run from an MSM
Parameters
----------
T : array, shape=(n_states, n_states)
The transition probability matrix from which to sample.
initial_state : int, default=0
The initial state from which to start simulations.
n_runs : int, default=1
The number of rounds of adaptive sampling.
n_clones : int, default=1
The number of clones per run of adaptive sampling.
n_steps : int, default=1
The number of steps per clone (each trajectory).
msm_obj : enspara.msm.MSM object
An enspara MSM object. This is used to fit assignments at each
round of sampling.
ranking_obj : rankings object
This is an object with at least two functions: __init__(**args)
and select_states(msm, n_clones). The output of this object is
a list of states to simulate.
n_reps : int, default=1
The number of repetitions of adaptive sampling to perform.
n_procs : int, default=1
The number of processes to use when doing adaptive sampling.
This parallelizes over the number of reps.
assignments : array-like, shape = (n_trajs, n_steps), default=None
Optionally provide assignments to continue sampling from.
Returns
----------
assignments : array, shape=(n_reps, n_runs, n_clones, n_steps)
The assignments files for adaptive sampling runs.
"""
if msm_obj is None:
builder_obj = partial(builders.normalize, calculate_eq_probs=False)
msm_obj = MSM(
lag_time=1, method=builder_obj, max_n_states=len(T))
if msm_obj.max_n_states != len(T):
print(
"MSM.max_n_states should be equal to the total number of" + \
"states. Changing value.")
msm_obj.max_n_states = len(T)
if ranking_obj is None:
ranking_obj = rankings.counts()
sampling_info = list(
zip(
itertools.repeat(
Adaptive_Sampling(
T, initial_state, n_runs, n_clones, n_steps, msm_obj,
ranking_obj, assignments),
n_reps)))
pool = Pool(processes = n_procs)
new_assignments = pool.map(_run_sampling, sampling_info)
pool.terminate()
return np.array(new_assignments)
class Adaptive_Sampling:
"""Adaptive sampling object. Runs a single adaptive sampling run.
Parameters
----------
T : array, shape=(n_states, n_states)
The transition probability matrix from which to sample.
initial_state : int, default=0
The initial state from which to start simulations.
n_runs : int, default=1
The number of rounds of adaptive sampling.
n_clones : int, default=1
The number of clones per run of adaptive sampling.
n_steps : int, default=1
The number of steps per clone (each trajectory).
msm_obj : enspara.msm.MSM object
An enspara MSM object. This is used to fit assignments at each
round of sampling.
ranking_obj : rankings object
This is an object with at least two functions: __init__(**args)
and select_states(msm, n_clones). The output of this object is
a list of states to simulate.
assignments : array-like, shape = (n_trajs, n_steps), default=None
Optionally provide assignments to continue sampling from. If
using previous assignments, number of steps for each trajectory
must be the same.
Returns
----------
assignments : array, shape=(n_runs, n_clones, n_steps)
The assignments files for adaptive sampling runs.
"""
def __init__(
self, T, initial_state, n_runs, n_clones, n_steps, msm_obj,
ranking_obj, assignments=None):
# Initialize class variables
self.T = T
self.initial_state = initial_state
self.n_runs = n_runs
self.n_clones = n_clones
# adds 1 to the number of steps. This is because synthetic
# trajectories counts number of states, not steps.
self.n_steps = n_steps + 1
self.msm_obj = msm_obj
self.ranking_obj = ranking_obj
# format initial assignments if present
if assignments is not None:
if len(assignments.shape) == 2:
pass
elif len(assignments.shape) == 3:
pass
elif (len(assignments.shape) == 4) and (assignments.shape[0] == 1):
assignments = np.concatenate(assignments)
else:
raise
self.starting_assignments = assignments
def run(self):
# initialize random seed. This is necessary for getting
# independent samplings through parallelization.
rng = np.random.default_rng()
# initialize first run
assignments = []
if self.starting_assignments is None:
initial_assignments = np.array(
[synth_traj(self.T, self.n_steps, self.initial_state, rng=rng)
for i in range(self.n_clones)])
assignments.append(initial_assignments)
# If there are no starting assignments, gets initial
# assignments from initial state and this counts as a single
# run of adaptive sampling.
run_start = 1
else:
# checks if previous assignments have multiple runs (purely
# for the formatting of output)
if len(self.starting_assignments.shape) == 3:
for assignment in self.starting_assignments:
assignments.append(assignment)
else:
assignments.append(self.starting_assignments)
run_start = 0
# iterate through each run and append assignments
for run_num in range(run_start, self.n_runs):
# fit assignments with msm object
self.msm_obj.fit(np.concatenate(assignments))
# rank states based on ranking object
states_to_simulate = self.ranking_obj.select_states(
self.msm_obj, self.n_clones)
new_assignments = np.array(
[synth_traj(self.T, self.n_steps, states_to_simulate[i], rng=rng)
for i in range(self.n_clones)])
assignments.append(new_assignments)
assignments = np.array(assignments)
return assignments