diff --git a/README.md b/README.md index 9cd4261..8b6e91d 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,14 @@ See also the [examples](examples) folder. Tested on Ubuntu 20.04 LTS. -Git clone the repository and run the file `setup.py` to install the required dependencies. +Firstly, `git clone` this repository. To install the dependencies in a venv virtual environment run: + +``` +python -m venv rlgainenv +source ./rlgainenv/bin/activate +``` + +then, run the command `python3 setup.py install`or `pip install -r requirements.txt` to install the required dependencies. #### Maintainer diff --git a/atoms/kalmanFilter.py b/atoms/kalmanFilter.py index a208464..68909fd 100644 --- a/atoms/kalmanFilter.py +++ b/atoms/kalmanFilter.py @@ -1,5 +1,4 @@ -import numpy -from numpy import dot, sum, tile +import numpy as np from numpy.linalg import inv, det from atoms.atoms_helpers import Helpers @@ -18,6 +17,7 @@ class KalmanFilter: def __init__(self, debug=False): self.variables = {} + self.debug = debug if debug: self.logger = Helpers.init_logger() @@ -38,10 +38,10 @@ def setup(self, variables): if var in var_keys: self.variables.update({var: variables[var]}) else: - raise ValueError('Required variable', var, 'not found in the input dictionary.') + raise ValueError(f'Required variable {var} not found in the input dictionary.') - # add matrix P to the variables dictionary - self.variables.update({'P': numpy.zeros(variables['Q'].shape)}) + # Initialize matrix P to the identity matrix scaled by a large number + self.variables.update({'P': np.eye(variables['Q'].shape[0]) * 1000}) def predict(self): """ @@ -54,11 +54,14 @@ def predict(self): - Q = process noise covariance matrix """ # calculate X(k) from (k-1) quantities - self.variables['X'] = dot(self.variables['A'], self.variables['X']) + dot(self.variables['B'], - self.variables['U']) + self.variables['X'] = self.variables['A'] @ self.variables['X'] + self.variables['B'] @ self.variables['U'] + # calculate the state covariance P(k) from (k-1) quantities - self.variables['P'] = dot(self.variables['A'], dot(self.variables['P'], - self.variables['A'].T)) + self.variables['Q'] + self.variables['P'] = self.variables['A'] @ self.variables['P'] @ self.variables['A'].T + self.variables['Q'] + + if self.debug: + self.logger.info(f"Predicted state X: {self.variables['X']}") + self.logger.info(f"Predicted state covariance P: {self.variables['P']}") def update(self, y_measured): """ @@ -70,40 +73,30 @@ def update(self, y_measured): - R = measurements noise covariance matrix Returns the predictive probability (likelihood) of the measurements. """ - y_mean_predicted = dot(self.variables['C'], self.variables['X']) - y_covariance = self.variables['R'] + dot(self.variables['C'], dot(self.variables['P'], self.variables['C'].T)) - k_gain = dot(self.variables['P'], dot(self.variables['C'].T, inv(y_covariance))) + y_mean_predicted = self.variables['C'] @ self.variables['X'] + y_covariance = self.variables['R'] + self.variables['C'] @ self.variables['P'] @ self.variables['C'].T + k_gain = self.variables['P'] @ self.variables['C'].T @ inv(y_covariance) # correct the predicted state and covariance matrix - self.variables['X'] = self.variables['X'] + dot(k_gain, (y_measured - y_mean_predicted)) - self.variables['P'] = self.variables['P'] - dot(k_gain, dot(y_covariance, k_gain.T)) + self.variables['X'] = self.variables['X'] + k_gain @ (y_measured - y_mean_predicted) + self.variables['P'] = self.variables['P'] - k_gain @ self.variables['C'] @ self.variables['P'] x_estimated = self.variables['X'] # calculate the predictive probability of the measurements y_predictive_prob = self.__gauss_pdf(y_measured, y_mean_predicted, y_covariance) + if self.debug: + self.logger.info(f"Updated state X: {self.variables['X']}") + self.logger.info(f"Updated state covariance P: {self.variables['P']}") + self.logger.info(f"Kalman Gain K: {k_gain}") + self.logger.info(f"Predictive probability: {y_predictive_prob}") + return x_estimated, y_predictive_prob @staticmethod def __gauss_pdf(y_measured, y_mean_predicted, y_covariance): - - # see also https://arxiv.org/pdf/1204.0375.pdf - if y_mean_predicted.shape[1] == 1: - delta_y = y_measured - tile(y_mean_predicted, y_measured.shape[1]) - E = 0.5 * sum(delta_y * (dot(inv(y_covariance), delta_y)), axis=0) - E = E + 0.5 * y_mean_predicted.shape[0] * numpy.log(2 * numpy.pi) + 0.5 * numpy.log(det(y_covariance)) - y_predictive_prob = numpy.exp(-E) - - elif y_measured.shape[1] == 1: - delta_y = tile(y_measured, y_mean_predicted.shape[1]) - y_mean_predicted - E = 0.5 * sum(delta_y * (dot(inv(y_covariance), delta_y)), axis=0) - E = E + 0.5 * y_mean_predicted.shape[0] * numpy.log(2 * numpy.pi) + 0.5 * numpy.log(det(y_covariance)) - y_predictive_prob = numpy.exp(-E) - - else: - delta_y = y_measured - y_mean_predicted - E = 0.5 * dot(delta_y.T, dot(inv(y_covariance), delta_y)) - E = E + 0.5 * y_mean_predicted.shape[0] * numpy.log(2 * numpy.pi) + 0.5 * numpy.log(det(y_covariance)) - y_predictive_prob = numpy.exp(-E) - + delta_y = y_measured - y_mean_predicted + exponent = -0.5 * delta_y.T @ inv(y_covariance) @ delta_y + normalization = 0.5 * len(y_mean_predicted) * np.log(2 * np.pi) + 0.5 * np.log(det(y_covariance)) + y_predictive_prob = np.exp(exponent - normalization) return y_predictive_prob diff --git a/examples/example_kalmanFilter.py b/examples/example_kalmanFilter.py index f7a3de5..5153e15 100644 --- a/examples/example_kalmanFilter.py +++ b/examples/example_kalmanFilter.py @@ -4,81 +4,91 @@ from atoms.kalmanFilter import KalmanFilter """ -In the example code we use KF to track a mobile user connected to a wireless network. The discrete-time dynamical -system is given by: +Example of using Kalman Filter to track a mobile user connected to a wireless network. +The discrete-time dynamical system is given by: X(k+1) = A*X(k) + B*U(k) - -The matrix of measurement Y describes the estimated position of the mobile: - + +The measurement matrix Y describes the estimated position of the mobile: + Y(k) = C*X(k) - -We plot the estimated, the real trajectory of the mobile user, and the measurements. + +We plot the estimated trajectory, the real trajectory, and the noisy measurements. """ + +# Initialize logger logger = Helpers.init_logger() logger.info('Example of Kalman Filter estimation.') -# time step of mobile user movement -dt = 0.1 - -# initialization of state matrices -A = np.array([[1, 0, dt, 0], [0, 1, 0, dt], [0, 0, 1, 0], [0, 0, 0, 1]]) -X = np.array([[0.0], [0.0], [0.1], [0.1]]) +# Time step of mobile user movement +dt = 0.01 + +# Initialization of state matrices +A = np.array([ + [1, 0, dt, 0], + [0, 1, 0, dt], + [0, 0, 1, 0], + [0, 0, 0, 1] +]) +X = np.array([[0.0], [0.0], [0.1], [0.1]]) # Initial state B = np.eye(X.shape[0]) -U = np.zeros((X.shape[0], 1)) +U = np.zeros((X.shape[0], 1)) # Control input -# process noise covariance matrix and initial state covariance +# Process noise covariance matrix Q = np.eye(X.shape[0]) -P = np.diag((0.01, 0.01, 0.01, 0.01)) -# measurement matrices (state X plus a random gaussian noise) -Y = np.array([[X[0, 0] + abs(np.random.randn(1)[0])], [X[1, 0] + abs(np.random.randn(1)[0])]]) +# Measurement matrix (state X plus a random Gaussian noise) C = np.array([[1, 0, 0, 0], [0, 1, 0, 0]]) -# measurement noise covariance -R = np.eye(Y.shape[0]) +# Measurement noise covariance +R = np.eye(C.shape[0]) -# Set up the KF class and simulate the estimation process +# Set up the Kalman Filter class logger.info('Setting up the KF class...') +variables = { + 'X': X, 'A': A, 'B': B, 'U': U, 'Q': Q, 'C': C, 'R': R +} +kf = KalmanFilter(debug=True) +kf.setup(variables) -var = {} -var.update({'X': X}) -var.update({'A': A}) -var.update({'B': B}) -var.update({'U': U}) -var.update({'Q': Q}) -var.update({'C': C}) -var.update({'R': R}) - -kf = KalmanFilter() -kf.setup(var) - -# number of iterations in Kalman Filter +# Number of iterations in Kalman Filter n_iter = 50 -y_predictive_prob = np.zeros((n_iter, 1)) +y_measured = np.zeros((C.shape[0], n_iter)) x_estimated = np.zeros((X.shape[0], n_iter)) # Applying the Kalman Filter -for i in np.arange(0, n_iter): +for i in range(n_iter): kf.predict() - x_est, y_predict = kf.update(Y) - x_estimated[0:4, i] = x_est.reshape(1, -1) - y_predictive_prob[i, 0] = y_predict.reshape(1, -1) - Y = np.array([[X[0, 0] + abs(0.1 * np.random.randn(1)[0])], [X[1, 0] + abs(0.1 * np.random.randn(1)[0])]]) + # Simulate measurement with some noise + Y = C @ X + np.random.normal(0, 0.1, (C.shape[0], 1)) + y_measured[:, i] = Y.flatten() + + x_est, _ = kf.update(Y) + x_estimated[:, i] = x_est.flatten() + + # Simulate real system's next state (for demonstration purposes) + X = A @ X + B @ U logger.info('Estimation complete.') # Plot the results logger.info('Plotting results ...') - -time = np.arange(0, 50*dt, dt) -x_estimated = x_estimated.transpose() -plt.figure() -plt.plot(time, x_estimated) -plt.xlabel('Time') -plt.ylabel('State (X)') -plt.title('KF results') -plt.legend(['x1', 'x2', 'x3', 'x4'], loc='upper right') +time = np.arange(0, n_iter * dt, dt) + +fig, axs = plt.subplots(2, 2, figsize=(10, 8)) + +# Plot each state variable +for i in range(X.shape[0]): + row, col = divmod(i, 2) + axs[row, col].plot(time, x_estimated[i, :], label='Estimated') + axs[row, col].scatter(time, y_measured[i % 2, :], s=10, color='r', label='Measured', alpha=0.6) + axs[row, col].set_xlabel('Time [s]') + axs[row, col].set_ylabel(f'State X{i + 1}') + axs[row, col].legend(loc='upper right') + axs[row, col].grid(True) + +fig.suptitle('Kalman Filter Results') +plt.tight_layout(rect=[0, 0, 1, 0.96]) plt.show() logger.info('Done!') diff --git a/examples/example_linearMPC.py b/examples/example_linearMPC.py index d0db76e..9ff8214 100644 --- a/examples/example_linearMPC.py +++ b/examples/example_linearMPC.py @@ -14,80 +14,77 @@ Rewrite the problem in its state-space form: y(k) = [x_1(k); x_2(k)] - y(k+1) = A*y(k) + B*u(k) = [1 1; 0 1]*y(k) + [0; 1]*u(k) + y(k+1) = A*y(k) + B*u(k) = [1 dt; 0 1]*y(k) + [0; dt]*u(k) Setup a constrained linear-quadratic MPC problem to stabilize the system using multiple shooting. """ logger = Helpers.init_logger() logger.info('Example of linear MPC problem.') -# set the state and input dimensions, initial time, and define matrices A and B +# Define the time step +dt = 0.025 + +# Define state-space matrices for a double integrator n_x = 2 -n_u = 2 -time_init = 0 -A = np.block([[np.eye(n_x), np.eye(n_x)], - [np.zeros((n_x, n_x)), np.eye(n_x)]]) -B = np.block([[np.zeros((n_x, n_u))], [np.eye(n_u)]]) +n_u = 1 +A = np.array([[1, dt], [0, 1]]) +B = np.array([[0], [dt]]) -# define all variables needed for setting up the MPC problem +# Define all variables needed for setting up the MPC problem var = {} -var.update({'N': 6}) +var.update({'N': 20}) var.update({'A': A}) var.update({'B': B}) -var.update({'Q_N': np.diag([20, 20, 10, 10], k=0)}) -var.update({'Q': np.diag([2, 2, 1, 1], k=0)}) -var.update({'R': 15*np.eye(n_u)}) -var.update({'x_r': np.array([np.cos(2*np.pi*time_init), -np.cos(2*np.pi*time_init), 0, 0])}) -var.update({'x_0': np.array([0.8, -0.8, 0, 0])}) -var.update({'x_min': np.array([-2*np.pi, -2*np.pi, -100*np.pi/180, -100*np.pi/180])}) -var.update({'x_max': np.array([2*np.pi, 2*np.pi, 100*np.pi/180, 100*np.pi/180])}) -var.update({'u_min': np.array([-5, -5])}) -var.update({'u_max': np.array([5, 5])}) - -# create the linear MPC object and setup +var.update({'Q_N': np.diag([200, 200])}) +var.update({'Q': np.diag([2, 2])}) +var.update({'R': 0.1 * np.eye(n_u)}) +var.update({'x_r': np.array([1.0, 0.0])}) +var.update({'x_0': np.array([0.0, 0.5])}) +var.update({'x_min': np.array([-2, -10])}) +var.update({'x_max': np.array([2, 10])}) +var.update({'u_min': np.array([-5])}) +var.update({'u_max': np.array([5])}) + +# Create the linear MPC object and setup opti = LinearMPC(debug=True) opti.setup(var) -# simulate the problem in closed loop -time = time_init +# Simulate the problem in closed loop +time = 0 n_sim = 250 -y = np.zeros((n_sim, 2*n_x)) -y_r = np.zeros((n_sim, 2*n_x)) +y = np.zeros((n_sim, n_x)) +y_r = np.zeros((n_sim, n_x)) logger.info('Running simulation ...') for i in range(n_sim): - - # solve the problem + # Solve the problem u_star = opti.solve() - # apply first control input and update initial conditions + # Apply first control input and update initial conditions y[i, :] = var['x_0'] - u = u_star[(var['N']+1)*2*n_x:(var['N']+1)*2*n_x+n_u] + u = u_star[n_x * (var['N']+1):n_x * (var['N']+1) + n_u] x_0 = A.dot(var['x_0']) + B.dot(u) var.update({'x_0': x_0}) - # update the reference trajectory - time = time + 0.025 - x_r = np.array([np.cos(2*np.pi*time), -np.cos(2*np.pi*time), 0, 0]) + # Update the reference trajectory + time = time + dt + x_r = np.array([1.0, 0.0]) y_r[i, :] = x_r var.update({'x_r': x_r}) opti.update(x_r=x_r, x_0=x_0) -# plot the results +# Plot the results logger.info('Plotting results ...') -fig, axs = plt.subplots(2, 2) +fig, axs = plt.subplots(2, 1) cont = 0 for i in range(n_x): - for j in range(n_x): - axs[i, j].plot(range(n_sim), y[:, cont], range(n_sim), y_r[:, cont]) - axs[i, j].set_xlabel('iters') - axs[i, j].set_ylabel('data') - # set legend - axs[i, j].legend(['measured', 'reference'], loc='upper left') - axs[i, j].grid(True) - fig.tight_layout() - cont = cont + 1 + axs[i].plot(range(n_sim), y[:, i], range(n_sim), y_r[:, i]) + axs[i].set_xlabel('Iterations') + axs[i].set_ylabel(f'x_{i+1}') + axs[i].legend(['Measured', 'Reference'], loc='upper left') + axs[i].grid(True) +fig.tight_layout() plt.show() logger.info('Done!') diff --git a/setup.py b/setup.py index 83308db..3bbf2d4 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='atoms', - version='1.0', + version='1.1', packages=find_packages(), install_requires=[ 'osqp',