Commit

improved kalman filter code and example, updated readme
Gabriele committed Jul 16, 2024
1 parent 0746677 commit 6567a43
Showing 5 changed files with 134 additions and 127 deletions.
9 changes: 8 additions & 1 deletion README.md
@@ -30,7 +30,14 @@ See also the [examples](examples) folder.

Tested on Ubuntu 20.04 LTS.

Git clone the repository and run the file `setup.py` to install the required dependencies.
First, `git clone` this repository. To install the dependencies in a venv virtual environment, run:

```
python -m venv rlgainenv
source ./rlgainenv/bin/activate
```

Then, run `python3 setup.py install` or `pip install -r requirements.txt` to install the required dependencies.
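
To confirm the installation, a quick sanity check (a minimal sketch; the `atoms` package name comes from `setup.py` and the import path from the examples):

```
# Minimal import check after installing the 'atoms' package.
from atoms.kalmanFilter import KalmanFilter

kf = KalmanFilter()
print(kf)  # should print a KalmanFilter instance without errors
```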

#### Maintainer

61 changes: 27 additions & 34 deletions atoms/kalmanFilter.py
@@ -1,5 +1,4 @@
import numpy
from numpy import dot, sum, tile
import numpy as np
from numpy.linalg import inv, det
from atoms.atoms_helpers import Helpers

@@ -18,6 +17,7 @@ class KalmanFilter:

def __init__(self, debug=False):
self.variables = {}
self.debug = debug

if debug:
self.logger = Helpers.init_logger()
@@ -38,10 +38,10 @@ def setup(self, variables):
if var in var_keys:
self.variables.update({var: variables[var]})
else:
raise ValueError('Required variable', var, 'not found in the input dictionary.')
raise ValueError(f'Required variable {var} not found in the input dictionary.')

# add matrix P to the variables dictionary
self.variables.update({'P': numpy.zeros(variables['Q'].shape)})
# Initialize matrix P to the identity matrix scaled by a large number
self.variables.update({'P': np.eye(variables['Q'].shape[0]) * 1000})

def predict(self):
"""
@@ -54,11 +54,14 @@ def predict(self):
- Q = process noise covariance matrix
"""
# calculate X(k) from (k-1) quantities
self.variables['X'] = dot(self.variables['A'], self.variables['X']) + dot(self.variables['B'],
self.variables['U'])
self.variables['X'] = self.variables['A'] @ self.variables['X'] + self.variables['B'] @ self.variables['U']

# calculate the state covariance P(k) from (k-1) quantities
self.variables['P'] = dot(self.variables['A'], dot(self.variables['P'],
self.variables['A'].T)) + self.variables['Q']
self.variables['P'] = self.variables['A'] @ self.variables['P'] @ self.variables['A'].T + self.variables['Q']

if self.debug:
self.logger.info(f"Predicted state X: {self.variables['X']}")
self.logger.info(f"Predicted state covariance P: {self.variables['P']}")

def update(self, y_measured):
"""
@@ -70,40 +73,30 @@ def update(self, y_measured):
- R = measurements noise covariance matrix
Returns the predictive probability (likelihood) of the measurements.
"""
y_mean_predicted = dot(self.variables['C'], self.variables['X'])
y_covariance = self.variables['R'] + dot(self.variables['C'], dot(self.variables['P'], self.variables['C'].T))
k_gain = dot(self.variables['P'], dot(self.variables['C'].T, inv(y_covariance)))
y_mean_predicted = self.variables['C'] @ self.variables['X']
y_covariance = self.variables['R'] + self.variables['C'] @ self.variables['P'] @ self.variables['C'].T
k_gain = self.variables['P'] @ self.variables['C'].T @ inv(y_covariance)

# correct the predicted state and covariance matrix
self.variables['X'] = self.variables['X'] + dot(k_gain, (y_measured - y_mean_predicted))
self.variables['P'] = self.variables['P'] - dot(k_gain, dot(y_covariance, k_gain.T))
self.variables['X'] = self.variables['X'] + k_gain @ (y_measured - y_mean_predicted)
self.variables['P'] = self.variables['P'] - k_gain @ self.variables['C'] @ self.variables['P']
x_estimated = self.variables['X']

# calculate the predictive probability of the measurements
y_predictive_prob = self.__gauss_pdf(y_measured, y_mean_predicted, y_covariance)

if self.debug:
self.logger.info(f"Updated state X: {self.variables['X']}")
self.logger.info(f"Updated state covariance P: {self.variables['P']}")
self.logger.info(f"Kalman Gain K: {k_gain}")
self.logger.info(f"Predictive probability: {y_predictive_prob}")

return x_estimated, y_predictive_prob

@staticmethod
def __gauss_pdf(y_measured, y_mean_predicted, y_covariance):

# see also https://arxiv.org/pdf/1204.0375.pdf
if y_mean_predicted.shape[1] == 1:
delta_y = y_measured - tile(y_mean_predicted, y_measured.shape[1])
E = 0.5 * sum(delta_y * (dot(inv(y_covariance), delta_y)), axis=0)
E = E + 0.5 * y_mean_predicted.shape[0] * numpy.log(2 * numpy.pi) + 0.5 * numpy.log(det(y_covariance))
y_predictive_prob = numpy.exp(-E)

elif y_measured.shape[1] == 1:
delta_y = tile(y_measured, y_mean_predicted.shape[1]) - y_mean_predicted
E = 0.5 * sum(delta_y * (dot(inv(y_covariance), delta_y)), axis=0)
E = E + 0.5 * y_mean_predicted.shape[0] * numpy.log(2 * numpy.pi) + 0.5 * numpy.log(det(y_covariance))
y_predictive_prob = numpy.exp(-E)

else:
delta_y = y_measured - y_mean_predicted
E = 0.5 * dot(delta_y.T, dot(inv(y_covariance), delta_y))
E = E + 0.5 * y_mean_predicted.shape[0] * numpy.log(2 * numpy.pi) + 0.5 * numpy.log(det(y_covariance))
y_predictive_prob = numpy.exp(-E)

delta_y = y_measured - y_mean_predicted
exponent = -0.5 * delta_y.T @ inv(y_covariance) @ delta_y
normalization = 0.5 * len(y_mean_predicted) * np.log(2 * np.pi) + 0.5 * np.log(det(y_covariance))
y_predictive_prob = np.exp(exponent - normalization)
return y_predictive_prob
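
For reference, the simplified `__gauss_pdf` now evaluates the standard multivariate Gaussian density of the innovation; a sketch of the formula it implements, writing S for `y_covariance`, delta for `y_measured - y_mean_predicted`, and k for the measurement dimension:

```
% Predictive likelihood, computed above as exp(exponent - normalization):
p(y) = \frac{1}{(2\pi)^{k/2}\,\lvert S \rvert^{1/2}}
       \exp\!\left(-\tfrac{1}{2}\,\delta^{\top} S^{-1} \delta\right)
```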
108 changes: 59 additions & 49 deletions examples/example_kalmanFilter.py
@@ -4,81 +4,91 @@
from atoms.kalmanFilter import KalmanFilter

"""
In the example code we use KF to track a mobile user connected to a wireless network. The discrete-time dynamical
system is given by:
Example of using a Kalman Filter to track a mobile user connected to a wireless network.
The discrete-time dynamical system is given by:
X(k+1) = A*X(k) + B*U(k)
The matrix of measurement Y describes the estimated position of the mobile:
The measurement vector Y gives the estimated position of the mobile:
Y(k) = C*X(k)
We plot the estimated, the real trajectory of the mobile user, and the measurements.
We plot the estimated trajectory, the real trajectory, and the noisy measurements.
"""

# Initialize logger
logger = Helpers.init_logger()
logger.info('Example of Kalman Filter estimation.')

# time step of mobile user movement
dt = 0.1

# initialization of state matrices
A = np.array([[1, 0, dt, 0], [0, 1, 0, dt], [0, 0, 1, 0], [0, 0, 0, 1]])
X = np.array([[0.0], [0.0], [0.1], [0.1]])
# Time step of mobile user movement
dt = 0.01

# Initialization of state matrices
A = np.array([
[1, 0, dt, 0],
[0, 1, 0, dt],
[0, 0, 1, 0],
[0, 0, 0, 1]
])
X = np.array([[0.0], [0.0], [0.1], [0.1]]) # Initial state
B = np.eye(X.shape[0])
U = np.zeros((X.shape[0], 1))
U = np.zeros((X.shape[0], 1)) # Control input

# process noise covariance matrix and initial state covariance
# Process noise covariance matrix
Q = np.eye(X.shape[0])
P = np.diag((0.01, 0.01, 0.01, 0.01))

# measurement matrices (state X plus a random gaussian noise)
Y = np.array([[X[0, 0] + abs(np.random.randn(1)[0])], [X[1, 0] + abs(np.random.randn(1)[0])]])
# Measurement matrix C (maps the state to the measured position)
C = np.array([[1, 0, 0, 0], [0, 1, 0, 0]])

# measurement noise covariance
R = np.eye(Y.shape[0])
# Measurement noise covariance
R = np.eye(C.shape[0])

# Set up the KF class and simulate the estimation process
# Set up the Kalman Filter class
logger.info('Setting up the KF class...')
variables = {
'X': X, 'A': A, 'B': B, 'U': U, 'Q': Q, 'C': C, 'R': R
}
kf = KalmanFilter(debug=True)
kf.setup(variables)

var = {}
var.update({'X': X})
var.update({'A': A})
var.update({'B': B})
var.update({'U': U})
var.update({'Q': Q})
var.update({'C': C})
var.update({'R': R})

kf = KalmanFilter()
kf.setup(var)

# number of iterations in Kalman Filter
# Number of iterations in Kalman Filter
n_iter = 50
y_predictive_prob = np.zeros((n_iter, 1))
y_measured = np.zeros((C.shape[0], n_iter))
x_estimated = np.zeros((X.shape[0], n_iter))

# Applying the Kalman Filter
for i in np.arange(0, n_iter):
for i in range(n_iter):
kf.predict()
x_est, y_predict = kf.update(Y)
x_estimated[0:4, i] = x_est.reshape(1, -1)
y_predictive_prob[i, 0] = y_predict.reshape(1, -1)
Y = np.array([[X[0, 0] + abs(0.1 * np.random.randn(1)[0])], [X[1, 0] + abs(0.1 * np.random.randn(1)[0])]])
# Simulate measurement with some noise
Y = C @ X + np.random.normal(0, 0.1, (C.shape[0], 1))
y_measured[:, i] = Y.flatten()

x_est, _ = kf.update(Y)
x_estimated[:, i] = x_est.flatten()

# Simulate the real system's next state (for demonstration purposes)
X = A @ X + B @ U

logger.info('Estimation complete.')

# Plot the results
logger.info('Plotting results ...')

time = np.arange(0, 50*dt, dt)
x_estimated = x_estimated.transpose()
plt.figure()
plt.plot(time, x_estimated)
plt.xlabel('Time')
plt.ylabel('State (X)')
plt.title('KF results')
plt.legend(['x1', 'x2', 'x3', 'x4'], loc='upper right')
time = np.arange(0, n_iter * dt, dt)

fig, axs = plt.subplots(2, 2, figsize=(10, 8))

# Plot each state variable
for i in range(X.shape[0]):
row, col = divmod(i, 2)
axs[row, col].plot(time, x_estimated[i, :], label='Estimated')
axs[row, col].scatter(time, y_measured[i % 2, :], s=10, color='r', label='Measured', alpha=0.6)
axs[row, col].set_xlabel('Time [s]')
axs[row, col].set_ylabel(f'State X{i + 1}')
axs[row, col].legend(loc='upper right')
axs[row, col].grid(True)

fig.suptitle('Kalman Filter Results')
plt.tight_layout(rect=[0, 0, 1, 0.96])
plt.show()

logger.info('Done!')
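
For readers who want the refactored `KalmanFilter` API in isolation, a minimal sketch on a 1-D constant-velocity model (the dimensions and noise values here are illustrative, not taken from the repository):

```
# Minimal sketch: track a 1-D position from noisy measurements.
import numpy as np
from atoms.kalmanFilter import KalmanFilter

dt = 0.1
kf = KalmanFilter()
kf.setup({
    'X': np.array([[0.0], [1.0]]),      # initial state: position, velocity
    'A': np.array([[1, dt], [0, 1]]),   # constant-velocity transition
    'B': np.eye(2),                     # placeholder input matrix
    'U': np.zeros((2, 1)),              # no control input
    'Q': 0.01 * np.eye(2),              # process noise covariance
    'C': np.array([[1.0, 0.0]]),        # only position is measured
    'R': np.array([[0.1]]),             # measurement noise covariance
})

for k in range(20):
    kf.predict()
    y = np.array([[0.1 * k + np.random.normal(0, 0.1)]])  # noisy position
    x_est, likelihood = kf.update(y)  # update() also returns the
                                      # predictive likelihood of y
```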
81 changes: 39 additions & 42 deletions examples/example_linearMPC.py
@@ -14,80 +14,77 @@
Rewrite the problem in its state-space form:
y(k) = [x_1(k); x_2(k)]
y(k+1) = A*y(k) + B*u(k) = [1 1; 0 1]*y(k) + [0; 1]*u(k)
y(k+1) = A*y(k) + B*u(k) = [1 dt; 0 1]*y(k) + [0; dt]*u(k)
Setup a constrained linear-quadratic MPC problem to stabilize the system using multiple shooting.
"""
logger = Helpers.init_logger()
logger.info('Example of linear MPC problem.')

# set the state and input dimensions, initial time, and define matrices A and B
# Define the time step
dt = 0.025

# Define state-space matrices for a double integrator
n_x = 2
n_u = 2
time_init = 0
A = np.block([[np.eye(n_x), np.eye(n_x)],
[np.zeros((n_x, n_x)), np.eye(n_x)]])
B = np.block([[np.zeros((n_x, n_u))], [np.eye(n_u)]])
n_u = 1
A = np.array([[1, dt], [0, 1]])
B = np.array([[0], [dt]])

# define all variables needed for setting up the MPC problem
# Define all variables needed for setting up the MPC problem
var = {}
var.update({'N': 6})
var.update({'N': 20})
var.update({'A': A})
var.update({'B': B})
var.update({'Q_N': np.diag([20, 20, 10, 10], k=0)})
var.update({'Q': np.diag([2, 2, 1, 1], k=0)})
var.update({'R': 15*np.eye(n_u)})
var.update({'x_r': np.array([np.cos(2*np.pi*time_init), -np.cos(2*np.pi*time_init), 0, 0])})
var.update({'x_0': np.array([0.8, -0.8, 0, 0])})
var.update({'x_min': np.array([-2*np.pi, -2*np.pi, -100*np.pi/180, -100*np.pi/180])})
var.update({'x_max': np.array([2*np.pi, 2*np.pi, 100*np.pi/180, 100*np.pi/180])})
var.update({'u_min': np.array([-5, -5])})
var.update({'u_max': np.array([5, 5])})

# create the linear MPC object and setup
var.update({'Q_N': np.diag([200, 200])})
var.update({'Q': np.diag([2, 2])})
var.update({'R': 0.1 * np.eye(n_u)})
var.update({'x_r': np.array([1.0, 0.0])})
var.update({'x_0': np.array([0.0, 0.5])})
var.update({'x_min': np.array([-2, -10])})
var.update({'x_max': np.array([2, 10])})
var.update({'u_min': np.array([-5])})
var.update({'u_max': np.array([5])})

# Create the linear MPC object and setup
opti = LinearMPC(debug=True)
opti.setup(var)

# simulate the problem in closed loop
time = time_init
# Simulate the problem in closed loop
time = 0
n_sim = 250
y = np.zeros((n_sim, 2*n_x))
y_r = np.zeros((n_sim, 2*n_x))
y = np.zeros((n_sim, n_x))
y_r = np.zeros((n_sim, n_x))
logger.info('Running simulation ...')

for i in range(n_sim):

# solve the problem
# Solve the problem
u_star = opti.solve()

# apply first control input and update initial conditions
# Apply first control input and update initial conditions
y[i, :] = var['x_0']
u = u_star[(var['N']+1)*2*n_x:(var['N']+1)*2*n_x+n_u]
u = u_star[n_x * (var['N']+1):n_x * (var['N']+1) + n_u]
x_0 = A.dot(var['x_0']) + B.dot(u)
var.update({'x_0': x_0})

# update the reference trajectory
time = time + 0.025
x_r = np.array([np.cos(2*np.pi*time), -np.cos(2*np.pi*time), 0, 0])
# Update the reference trajectory
time = time + dt
x_r = np.array([1.0, 0.0])
y_r[i, :] = x_r
var.update({'x_r': x_r})
opti.update(x_r=x_r, x_0=x_0)

# plot the results
# Plot the results
logger.info('Plotting results ...')
fig, axs = plt.subplots(2, 2)
fig, axs = plt.subplots(2, 1)
cont = 0

for i in range(n_x):
for j in range(n_x):
axs[i, j].plot(range(n_sim), y[:, cont], range(n_sim), y_r[:, cont])
axs[i, j].set_xlabel('iters')
axs[i, j].set_ylabel('data')
# set legend
axs[i, j].legend(['measured', 'reference'], loc='upper left')
axs[i, j].grid(True)
fig.tight_layout()
cont = cont + 1
axs[i].plot(range(n_sim), y[:, i], range(n_sim), y_r[:, i])
axs[i].set_xlabel('Iterations')
axs[i].set_ylabel(f'x_{i+1}')
axs[i].legend(['Measured', 'Reference'], loc='upper left')
axs[i].grid(True)

fig.tight_layout()
plt.show()
logger.info('Done!')
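
For context, the new A and B are what a forward-Euler discretization of the continuous double integrator produces; a sketch of the derivation (the continuous-time model is implied by the diff, not stated in it):

```
% Forward-Euler discretization of \dot{x}_1 = x_2, \dot{x}_2 = u with step dt:
\begin{bmatrix} x_1(k+1) \\ x_2(k+1) \end{bmatrix}
=
\begin{bmatrix} 1 & dt \\ 0 & 1 \end{bmatrix}
\begin{bmatrix} x_1(k) \\ x_2(k) \end{bmatrix}
+
\begin{bmatrix} 0 \\ dt \end{bmatrix} u(k)
```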
2 changes: 1 addition & 1 deletion setup.py
@@ -2,7 +2,7 @@

setup(
name='atoms',
version='1.0',
version='1.1',
packages=find_packages(),
install_requires=[
'osqp',
