-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutilities.py
349 lines (277 loc) · 10 KB
/
utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
# import itertools # for batchify (now in lyapnov.py)
import numpy as np
from matplotlib.colors import ListedColormap
import scipy.linalg
from scipy import signal
import torch
# from parfor import pmap
import multiprocessing as mp
import casadi as cs
from lyapunov import GridWorld
from lyapunov import config
NP_DTYPE = config.np_dtype
TF_DTYPE = config.dtype
def gridding(state_dim, state_constraints, num_states = 251, use_zero_threshold = True):
''' evenly discretize the state space
Args:
state_dim (int): The dimension of the state space.
state_constraints (np array): The constraints of the state space.
num_state (int): The number of states along each dimension.
use_zero_threshold (bool): Whether to use zero threshold.
False: the grid is infinitesimal
'''
# State grid
if state_constraints is None:
state_constraints = np.array([[-1., 1.], ] * state_dim)
grid_limits = state_constraints
state_discretization = GridWorld(grid_limits, num_states)
# Discretization constant
if use_zero_threshold:
tau = 0.0 # assume the grid is infinitesimal
else:
tau = np.sum(state_discretization.unit_maxes) / 2
print('Grid size: {}'.format(state_discretization.nindex))
print('Discretization constant (tau): {}'.format(tau))
return state_discretization
def binary_cmap(color='red', alpha=1.):
"""Construct a binary colormap."""
if color == 'red':
color_code = (1., 0., 0., alpha)
elif color == 'green':
color_code = (0., 1., 0., alpha)
elif color == 'blue':
color_code = (0., 0., 1., alpha)
else:
color_code = color
transparent_code = (1., 1., 1., 0.)
return ListedColormap([transparent_code, color_code])
def balanced_class_weights(y_true, scale_by_total=True):
"""Compute class weights from class label counts."""
y = y_true.astype(np.bool_)
nP = y.sum()
nN = y.size - y.sum()
class_counts = np.array([nN, nP])
weights = np.ones_like(y, dtype=float)
weights[ y] /= nP
weights[~y] /= nN
if scale_by_total:
weights *= y.size
return weights, class_counts
def dlqr(a, b, q, r):
"""Compute the discrete-time LQR controller.
The optimal control input is `u = -k.dot(x)`.
Parameters
----------
a : np.array
b : np.array
q : np.array
r : np.array
Returns
-------
k : np.array
Controller matrix
p : np.array
Cost to go matrix
"""
a, b, q, r = map(np.atleast_2d, (a, b, q, r))
p = scipy.linalg.solve_discrete_are(a, b, q, r)
# LQR gain
# k = (b.T * p * b + r)^-1 * (b.T * p * a)
bp = b.T.dot(p)
tmp1 = bp.dot(b)
tmp1 += r
tmp2 = bp.dot(a)
k = np.linalg.solve(tmp1, tmp2)
return k, p
def discretize_linear_system(A, B, dt, exact=False):
'''Discretization of a linear system
dx/dt = A x + B u
--> xd[k+1] = Ad xd[k] + Bd ud[k] where xd[k] = x(k*dt)
Args:
A (ndarray): System transition matrix.
B (ndarray): Input matrix.
dt (scalar): Step time interval.
exact (bool): If to use exact discretization.
Returns:
Ad (ndarray): The discrete linear state matrix A.
Bd (ndarray): The discrete linear input matrix B.
'''
state_dim, input_dim = A.shape[1], B.shape[1]
if exact:
M = np.zeros((state_dim + input_dim, state_dim + input_dim))
M[:state_dim, :state_dim] = A
M[:state_dim, state_dim:] = B
Md = scipy.linalg.expm(M * dt)
Ad = Md[:state_dim, :state_dim]
Bd = Md[:state_dim, state_dim:]
else:
Identity = np.eye(state_dim)
Ad = Identity + A * dt
Bd = B * dt
return Ad, Bd
class InvertedPendulum(object):
"""Inverted Pendulum.
Parameters
----------
mass : float
length : float
friction : float, optional
dt : float, optional
The sampling time.
normalization : tuple, optional
A tuple (Tx, Tu) of arrays used to normalize the state and actions. It
is so that diag(Tx) *x_norm = x and diag(Tu) * u_norm = u.
"""
def __init__(self, mass, length, friction=0, dt=1 / 80,
normalization=None):
"""Initialization; see `InvertedPendulum`."""
super(InvertedPendulum, self).__init__()
self.mass = mass
self.length = length
self.gravity = 9.81
self.friction = friction
self.dt = dt
self.nx = 2
self.nu = 1
self.symbolic = None
self.normalization = normalization
if normalization is not None:
self.normalization = [np.array(norm, dtype=config.np_dtype)
for norm in normalization]
self.inv_norm = [norm ** -1 for norm in self.normalization]
def __call__(self, *args, **kwargs):
"""Evaluate the function using the template to ensure variable sharing.
Parameters
----------
args : list
The input arguments to the function.
kwargs : dict, optional
The keyword arguments to the function.
Returns
-------
outputs : list
The output arguments of the function as given by evaluate.
"""
outputs = self.forward(*args, **kwargs)
return outputs
@property
def inertia(self):
"""Return inertia of the pendulum."""
return self.mass * self.length ** 2
def normalize(self, state, action):
"""Normalize states and actions."""
if self.normalization is None:
return state, action
Tx_inv, Tu_inv = map(np.diag, self.inv_norm)
state = np.matmul(state, Tx_inv)
if action is not None:
# action = tf.matmul(action, Tu_inv)
# action = torch.matmul(action, Tu_inv)
action = np.matmul(action, Tu_inv)
return state, action
def denormalize(self, state, action):
"""De-normalize states and actions."""
if self.normalization is None:
return state, action
Tx, Tu = map(np.diag, self.normalization)
state = np.matmul(state, Tx)
if action is not None:
action = np.matmul(action, Tu)
return state, action
def linearize(self):
"""Return the linearized system.
Returns
-------
a : ndarray
The state matrix.
b : ndarray
The action matrix.
"""
gravity = self.gravity
length = self.length
friction = self.friction
inertia = self.inertia
A = np.array([[0, 1],
[gravity / length, -friction / inertia]],
dtype=config.np_dtype)
B = np.array([[0],
[1 / inertia]],
dtype=config.np_dtype)
if self.normalization is not None:
Tx, Tu = map(np.diag, self.normalization)
Tx_inv, Tu_inv = map(np.diag, self.inv_norm)
A = np.linalg.multi_dot((Tx_inv, A, Tx))
B = np.linalg.multi_dot((Tx_inv, B, Tu))
sys = signal.StateSpace(A, B, np.eye(2), np.zeros((2, 1)))
sysd = sys.to_discrete(self.dt)
return sysd.A, sysd.B
def forward(self, state_action):
"""Evaluate the dynamics."""
# Denormalize
# split the state_action into state and action,
# the first two column are state, the last column is action
state, action = np.split(state_action, [2], axis=1)
# state, action = np.split(state_action, [2], axis=0)
state, action = self.denormalize(state, action)
n_inner = 10
dt = self.dt / n_inner
for i in range(n_inner):
state_derivative = self.ode(state, action)
state = state + dt * state_derivative
return self.normalize(state, None)[0]
def ode(self, state, action):
"""Compute the state time-derivative.
Parameters
----------
states: ndarray or Tensor
Unnormalized states.
actions: ndarray or Tensor
Unnormalized actions.
Returns
-------
x_dot: Tensor
The normalized derivative of the dynamics
"""
# Physical dynamics
gravity = self.gravity
length = self.length
friction = self.friction
inertia = self.inertia
angle, angular_velocity = np.split(state, [1], axis=-1)
x_ddot = gravity / length * np.sin(angle) + action / inertia
if friction > 0:
x_ddot -= friction / inertia * angular_velocity
state_derivative = np.concatenate((angular_velocity, x_ddot), axis=-1)
# Normalize
return state_derivative
def compute_roa_pendulum(grid, closed_loop_dynamics, horizon=100, tol=1e-3, equilibrium=None, no_traj=True):
"""Compute the largest ROA as a set of states in a discretization."""
if isinstance(grid, np.ndarray):
all_points = grid
nindex = grid.shape[0]
ndim = grid.shape[1]
else: # grid is a GridWorld instance
all_points = grid.all_points
nindex = grid.nindex
ndim = grid.ndim
# Forward-simulate all trajectories from initial points in the discretization
if no_traj:
end_states = all_points
for t in range(1, horizon):
end_states = closed_loop_dynamics(end_states)
else:
trajectories = np.empty((nindex, ndim, horizon))
trajectories[:, :, 0] = all_points
for t in range(1, horizon):
# simulate all states in the grid
trajectories[:, :, t] = closed_loop_dynamics(trajectories[:, :, t - 1])
end_states = trajectories[:, :, -1]
if equilibrium is None:
equilibrium = np.zeros((1, ndim))
# Compute an approximate ROA as all states that end up "close" to 0
dists = np.linalg.norm(end_states - equilibrium, ord=2, axis=1, keepdims=True).ravel()
roa = (dists <= tol)
if no_traj:
return roa
else:
return roa, trajectories