forked from mizimmer90/slandscapes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
landscapes.py
473 lines (428 loc) · 17.6 KB
/
landscapes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
import collections
import glob
import itertools
import matplotlib.colors as colors
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.ticker as plticker
import numpy as np
import os
from mdtraj import io
#######################################################################
# formating and plotting stuff #
#######################################################################
def _convert_path(path, n_states):
dim = int(np.sqrt(n_states))
shape = (dim, dim)
states_shape = np.arange(n_states).reshape(shape)
xy = np.array(
[(np.concatenate(np.where(states_shape==state))) for state in path]).T
return xy[::-1]
def convert_paths(paths, n_states):
xys = []
for path in paths:
xy = _convert_path(path, n_states)
xys.append(xy)
return xys
def format_line(array):
return " ".join([str(i) for i in array])
def format_flux_paths(fluxes, paths, n_states):
xys = convert_paths(paths, n_states)
if len(fluxes) != len(paths):
raise
output_data = ""
for num in range(len(xys)):
output_line = str(fluxes[num])+"\n"+\
format_line(xys[num][0])+"\n"+\
format_line(xys[num][1])+"\n"
output_data += output_line
return output_data
def plot_pijs(filenames, grid_size=None, output_names=None, state_num=None):
if isinstance(filenames, collections.Iterable) and not \
isinstance(filenames, (str, bytes)):
filenames = filenames
else:
filenames = glob.glob(filenames)
data = [np.load(filename) for filename in filenames]
if grid_size is None:
grid_edge = np.sqrt(len(data[0]))
grid_size = (grid_edge, grid_edge)
X,Y = np.meshgrid(
np.arange(grid_size[0]), np.arange(grid_size[1]))
for num in range(len(filenames)):
plt.figure(filenames[num])
if state_num is None:
plot_data = data[num].reshape(grid_size)
else:
plot_data = data[num][state_num].reshape(grid_size)
plt.pcolormesh(X, Y, plot_data, vmin=0, vmax=1)
plt.colorbar()
if output_names is not None:
plt.savefig(output_names[num])
plt.show()
return
#######################################################################
# custom gaussians #
#######################################################################
def _gaussian(xs, a, b, c):
f = a*np.exp(-((xs-b)**2) / (2 * (c**2)))
return f
def _gaussian_multD(xs, x0s, height=1, widths=1, normed=False):
# check dim of a
if isinstance(height, collections.Iterable):
raise
# check dim of x0s
if isinstance(x0s, collections.Iterable):
if len(x0s) != len(xs):
raise
else:
x_shape = xs.shape
if len(x_shape) >= 3:
x0s = np.array([x0s for i in range(x_shape[0])])
# check dim of c
if isinstance(widths, collections.Iterable):
if len(widths) != len(xs):
raise
else:
x_shape = xs.shape
if len(x_shape) >= 3:
widths = np.array([widths for i in range(x_shape[0])])
# calc exponent
exponent = np.sum(
[
((xs[num] - x0s[num])**2) / (2 * (widths[num]**2))
for num in range(len(xs))],
axis=0)
if normed:
height = np.prod(1/(((2*np.pi)**0.5)*np.array(widths)))*height
f = height*np.exp(-exponent)
return f
def gaussian_noise(
x1s, x2s, gaussians_per_axis=None, height_range=[-0.1,0.1],
width_range=[0.9,1.1], rigidity=0):
"""Given x1 coords and x2 coords, generates a specified number of evenly
spaced gaussians along each axis of varying height and widths. The
rigidity value determines how evenly spaced gaussians are (0 is loose
and 1 is rigid)."""
if type(gaussians_per_axis) is int:
gaussians_per_axis = [gaussians_per_axis, gaussians_per_axis]
elif gaussians_per_axis is None:
gaussians_per_axis = [int(x1s.max()/2.), int(x2s.max()/2.)]
tot_gaussians = np.prod(gaussians_per_axis)
# dimension info
axis1 = x1s[0]
axis2 = x2s[:,0]
box_length_1 = (axis1[-1]-axis1[0])/gaussians_per_axis[0]
box_length_2 = (axis2[-1]-axis2[0])/gaussians_per_axis[1]
box_centers_1 = (np.arange(gaussians_per_axis[0]) * box_length_1) + \
(box_length_1/2.0)
box_centers_2 = (np.arange(gaussians_per_axis[1]) * box_length_2) + \
(box_length_2/2.0)
# generate rigid center coords
centers = np.array(
list(itertools.product(box_centers_1, box_centers_2)))
# heights
height_spread = height_range[1] - height_range[0]
heights = [
height_spread * np.random.random() + height_range[0]
for n in range(tot_gaussians)]
# widths
widths_spread = width_range[1] - width_range[0]
widths = [
widths_spread * np.random.random() + width_range[0]
for n in range(tot_gaussians)]
# rigid formula and initialize noise and xs
rigidity_div = 2 + (rigidity * 4)**2
noise = np.zeros((len(axis2), len(axis1)))
xs = np.array([x1s, x2s])
# add gaussians
for num in range(tot_gaussians):
rand1 = np.random.random() - 0.5
rand2 = np.random.random() - 0.5
center = [
centers[num, 0] + (box_length_1 / rigidity_div) * rand1,
centers[num, 1] + (box_length_2 / rigidity_div) * rand2]
noise += _gaussian_multD(
xs, center, height=heights[num], widths=widths[num])
return noise
#######################################################################
# converting landscape to a probability matrix #
#######################################################################
def energies_to_probs(Aij, energies):
"""Given an adjacency matrix and a list of state energies,
returns the corresponding transition probability matrix.
"""
# test shape of Aij
fd, sd = Aij.shape
if fd != sd:
print("Aij is not square!")
raise
# test values of Aij
if len(np.where((Aij != 0)*(Aij != 1))[0]) > 0:
print("Aij has elements that are not 0 or 1...")
raise
# initialize probs
probs = np.zeros(Aij.shape)
for state in range(len(probs)):
# find where there is a defined transition in Aij
transitions = np.where(Aij[state] == 1)[0]
# calculates rate = min[1, e^(U1-U2)]
rates = np.minimum(
np.ones(len(transitions)),
np.exp(energies[state] - energies[transitions]))
probs[state, transitions] = rates
# norms the rates into transition probs
probs = probs / probs.sum(axis=1)[:, None]
return probs
def surface_to_probs(x1s, x2s, surface, grid_size, adjust_centers=True):
"""given a potential energy landscape (in the form of values for x1,
x2, and f(x1,x2) and a connectivity grid size) returns the transition
probability matrix that corresponds to that surface. Looks for the
highest energy between adjacent states and uses the arrhenius equation
to generate a rate. Potential energy surface is in units of kT."""
# identify number of states and resolution (points between states)
n_states = grid_size[0]*grid_size[1]
states = np.arange(n_states).reshape((grid_size[1],grid_size[0]))
res = len(x1s[0])//grid_size[0]
if adjust_centers:
res_adjust = res//2
else:
res_adjust = 0
# initialize empy probability matrix
T = np.zeros((n_states, n_states))
# Get transitions between columns on the grid
for col in range(len(states[0])-1):
# determine the maximum energy between column-adjacent states
# by taking the maximum energy, we don't need to employ the
# metropolis min criterion, since the maximum could be the state
# in question (i.e. the rate will never be less than 1)
surface_slice = surface[
res_adjust::res,
(res_adjust + col*res):((col + 1)*res + 1 + res_adjust)]
max_energy = np.max(surface_slice, axis=1)
energy_diffs_1 = max_energy - surface_slice[:,0]
energy_diffs_2 = max_energy - surface_slice[:,-1]
# convert energy diffs to rates
rates1 = np.exp(-energy_diffs_1)
rates2 = np.exp(-energy_diffs_2)
# get state indices of transitions
state_trans = states[:,col:col+2].T
trans_iis_1 = (state_trans[0], state_trans[1])
trans_iis_2 = (state_trans[1], state_trans[0])
T[trans_iis_1] = rates1
T[trans_iis_2] = rates2
# Get transitions between rows on the grid
for row in range(len(states)-1):
# determine the maximum energy between row-adjacent states
surface_slice = surface[
(res_adjust + row*res):((row + 1)*res + 1 + res_adjust),
res_adjust::res]
max_energy = np.max(surface_slice, axis=0)
energy_diffs_1 = max_energy - surface_slice[0, :]
energy_diffs_2 = max_energy - surface_slice[-1, :]
# convert energy diffs to rates
rates1 = np.exp(-energy_diffs_1)
rates2 = np.exp(-energy_diffs_2)
# get state indices of transitions
state_trans = states[row:row+2,:]
trans_iis_1 = (state_trans[0], state_trans[1])
trans_iis_2 = (state_trans[1], state_trans[0])
T[trans_iis_1] = rates1
T[trans_iis_2] = rates2
# Get diagonal transitions
T[range(len(T)), range(len(T))] = 1
# normalize rows and return
T /= T.sum(axis=1)[:,None]
return T
def gen_aij(grid_size):
"""Generate a lattice with an arbitrary shape. Returns state
numberings and the adjacency matrix."""
n_states = np.prod(grid_size)
states = np.arange(n_states)
l_states = states.reshape(grid_size)
iis = np.array(
[np.array(np.where(l_states==state)).flatten() for state in states])
aij = np.zeros((n_states, n_states), dtype=int)
for state in states:
diffs = iis - iis[state]
dists = np.einsum('ij,ij->i', diffs, diffs)
aij[state, np.where(dists<=1)[0]] = 1
return aij
#######################################################################
# landscape class #
#######################################################################
class landscape:
"""Generation of a toy energy landscape.
Parameters
----------
grid_size : tuple, defaut=None
The dimensions of the 2d landscape. i.e. for a landscape that
is 50x10, grid_size=(50,10).
x1_coords : array, shape=(grid_size[1], grid_size[0]), default=None
The x1 coordinates of a pre-made grid. Not needed for a grid
being initialized.
x2_coords : array, shape=(grid_size[1], grid_size[0]), default=None
The x2 coordinates of a pre-made grid. Not needed for a grid
being initialized.
values : array, shape=(grid_size[1], grid_size[0]), default=None
The energies at each grid point. Not needed for a grid being
initialized. If None, all values are set to 0.
resolution : int, default=1
The number of energies to include between states on the grid.
This serves as a type of resolution of the landscape.
"""
def __init__(
self, grid_size=None, x1_coords=None, x2_coords=None, values=None,
resolution=1):
if (x1_coords is None) and (x2_coords is None) and (values is None):
if (grid_size is None):
print("Need to specify a grid_size")
raise
self.grid_size = grid_size
x1_points = np.arange(grid_size[0]*resolution)/resolution
x2_points = np.arange(grid_size[1]*resolution)/resolution
self.x1_coords, self.x2_coords = np.meshgrid(x1_points, x2_points)
self.values = np.zeros(self.x1_coords.shape)
else:
if (x1_coords is None) or (x2_coords is None) or (values is None):
print("missing inputs")
raise
self.grid_size = (
int(x1_coords[0][-1] + x1_coords[0][1]),
int(x2_coords[:,0][-1] + x2_coords[:,0][1]))
self.x1_coords = x1_coords
self.x2_coords = x2_coords
self.values = values
def add_gaussian(self, x0s, height=1, widths=1):
input_coords = np.array([self.x1_coords, self.x2_coords])
new_values = self.values + _gaussian_multD(
input_coords, x0s, height=height, widths=widths)
return landscape(
grid_size=self.grid_size, x1_coords=self.x1_coords,
x2_coords=self.x2_coords, values=new_values)
def add_noise(
self, gaussians_per_axis=None, height_range=[-0.1, 0.1],
width_range=[0.85, 1.15], rigidity=0):
noise = gaussian_noise(
self.x1_coords, self.x2_coords,
gaussians_per_axis = gaussians_per_axis, height_range = height_range,
width_range = width_range, rigidity = rigidity)
return landscape(
grid_size=self.grid_size, x1_coords=self.x1_coords,
x2_coords=self.x2_coords, values=self.values+noise)
def plot(
self, title='potential energy landscape', cmap='seismic',
**kwargs):
plt.figure(
title, figsize=(
self.x1_coords.max()/self.x2_coords.max()*10, 8))
plt.xlim((self.x1_coords[0,0], self.x1_coords[0,-1]))
plt.ylim((self.x2_coords[0,0], self.x2_coords[-1,0]))
plt.pcolormesh(
self.x1_coords, self.x2_coords, self.values, cmap=cmap, **kwargs)
plt.colorbar()
plt.show()
return
def cplot(
self, title='potential energy landscape', cmap='RdYlBu_r',
n_bins=10, show_plot=True, grid=True, **kwargs):
# get X, Y, and Z coords
X = self.x1_coords
Y = self.x2_coords
Z = self.values
# setup figure
fig = plt.figure(
title, figsize=(
self.x1_coords.max()/self.x2_coords.max()*10, 8))
ax = fig.add_subplot(1,1,1)
# get appropriate levels
try:
levels = kwargs['levels']
kwargs = removekey(kwargs,'levels')
try:
norm = kwargs['norm']
if type(norm) is type(colors.LogNorm()):
tick_values = np.logspace(
start=np.log10(np.min(levels)),
stop=np.log10(np.max(levels)),
num=len(levels), base=10.0)
else:
raise
except:
tick_values = np.linspace(
start=np.min(levels),
stop=np.max(levels),
num=len(levels))
except:
try:
# test if norm exists
norm = kwargs['norm']
if type(norm) is type(colors.LogNorm()):
levels = np.logspace(
#Z.min(), Z.max(), nbins, endpoint=True)
start=np.log10(Z.min()),
stop=np.log10(Z.max()),
num=n_bins, base=10.0)
else:
levels = mpl.ticker.MaxNLocator(n_bins=n_bins).tick_values(Z.min(), Z.max())
except:
levels = mpl.ticker.MaxNLocator(n_bins=n_bins).tick_values(Z.min(), Z.max())
tick_values = np.linspace(Z.min(), Z.max(), len(levels))
CS = ax.contourf(
X, Y, Z, cmap=cmap, levels=levels, origin='lower', zorder=0, **kwargs)
if grid:
# set grid
intervals = 1
loc = plticker.MultipleLocator(base=intervals)
ax.yaxis.set_major_locator(loc)
ax.xaxis.set_major_locator(loc)
plt.grid('on', linestyle='-', linewidth=1, color='black', zorder=1)
# make colorbar
fig.subplots_adjust(right=0.8)
cbar_ax = fig.add_axes([0.82, 0.15, 0.02, 0.7])
cb = fig.colorbar(CS, cax=cbar_ax, ticks=levels)
cb.ax.set_yticklabels([str(i) for i in tick_values])
if show_plot:
plt.show()
return fig, ax
def save_fig(self, output_name, title='potential energy landscape'):
plt.figure(title)
plt.xlim((self.x1_coords[0,0], self.x1_coords[0,-1]))
plt.ylim((self.x2_coords[0,0], self.x2_coords[-1,0]))
plt.pcolormesh(self.x1_coords, self.x2_coords, self.values)
plt.colorbar()
plt.savefig(output_name)
plt.show()
def to_probs(self):
T = surface_to_probs(
self.x1_coords, self.x2_coords, self.values, self.grid_size)
return T
def save(self, output_name, txt=False, txt_fmt='%d %d %d %f'):
if txt:
x1_coords_flat = self.x1_coords.flatten()
x2_coords_flat = self.x2_coords.flatten()
values_flat = self.values.flatten()
states = np.arange(len(x1_coords_flat))
output_data = np.array(
list(
zip(states, x1_coords_flat, x2_coords_flat, values_flat)))
np.savetxt(
output_name, output_data, fmt=txt_fmt,
header='state x1 x2 energy')
else:
output_dict = {
'x1_coords' : self.x1_coords,
'x2_coords' : self.x2_coords,
'landscape' : self.values}
io.saveh(output_name, **output_dict)
def load(input_name):
load_dict = io.loadh(input_name)
x1_coords = load_dict['x1_coords']
x2_coords = load_dict['x2_coords']
values = load_dict['landscape']
return landscape(
x1_coords=x1_coords, x2_coords=x2_coords, values=values)
def removekey(d, key):
r = dict(d)
del r[key]
return r