diff --git a/VBOC/Safe MPC/comparison.py b/VBOC/Safe MPC/comparison.py new file mode 100644 index 0000000..c2924ea --- /dev/null +++ b/VBOC/Safe MPC/comparison.py @@ -0,0 +1,64 @@ +import numpy as np +import matplotlib.pyplot as plt + +# Load data +data_dir = "data/" +data_no = np.load(data_dir + "results_no_constraint.npz") + +use_data = 'receidinghard' # soft_term, soft_traj, hard_term, receidinghard, receidingsoft + +if use_data == 'soft_term': + data_vboc = np.load(data_dir + "results_softterm.npz") +if use_data == 'soft_traj': + data_vboc = np.load(data_dir + "results_softtraj.npz") +if use_data == 'hard_term': + data_vboc = np.load(data_dir + "results_hardterm.npz") +if use_data == 'receidinghard': + data_vboc = np.load(data_dir + "results_receidinghard.npz") +if use_data == 'receidingsoft': + data_vboc = np.load(data_dir + "results_receidingsoft.npz") + +dt = data_no["dt"] +tot_time = data_no["tot_time"] +times_no = data_no["times"] +res_steps_no = data_no["res_steps"] + +times_hard = data_vboc["times"] +res_steps_hard = data_vboc["res_steps_term"] +better = data_vboc["better"] +worse = data_vboc["worse"] +equal = data_vboc["equal"] + +# Plot timing +plt.figure() +plt.plot(np.linspace(0, len(times_no), len(times_no)), times_no, label="naive MPC", color='red') +plt.plot(np.linspace(0, len(times_hard), len(times_hard)), times_hard, label="MPC + VBOC", color='green') +plt.plot(np.linspace(0, len(times_no), len(times_no)), np.ones(len(times_no)) * np.quantile(times_no, 0.9), + label="90% quantile naive MPC", color='fuchsia', linestyle='--') +plt.plot(np.linspace(0, len(times_hard), len(times_hard)), np.ones(len(times_hard)) * np.quantile(times_hard, 0.9), + label="90% quantile MPC + VBOC", color='DarkBlue', linestyle='--') +plt.xlabel('MPC Iteration') +plt.ylabel('Solve time [s]') +plt.legend() +plt.title("Solve time comparison") +plt.savefig(data_dir + 'times_' + use_data + '.png') + +# Barchart that states when the hard terminal constraint is better, equal or worse than the naive MPC +plt.figure() +plt.bar(["Better", "Equal", "Worse"], [better, equal, worse], color=["green", "blue", "red"]) +plt.ylabel("Number") +plt.title("Comparison between MPC + VBOC and naive MPC") +plt.savefig(data_dir + 'numbers_' + use_data + '.png') + +# Directly compare the number of iteration taken by the two MPCs before the first infeasible solution +plt.figure() +total = np.linspace(1, 100, 100) +plt.plot(total, res_steps_no, label="naive MPC", color='red') +plt.plot(total, np.ones(len(total)) * np.mean(res_steps_no), label="mean naive MPC", color='fuchsia', linestyle='--') +plt.plot(total, res_steps_hard, label="MPC + VBOC", color='green') +plt.plot(total, np.ones(len(total)) * np.mean(res_steps_hard), label="mean MPC + VBOC", color='DarkBlue', linestyle='--') +plt.title("Comparison between MPC + VBOC and naive MPC") +plt.xlabel("Number of problems") +plt.ylabel("Number of iterations") +plt.legend() +plt.savefig(data_dir + 'iterations_' + use_data + '.png') diff --git a/VBOC/Safe MPC/hard_terminal_constraints/2dof_sym.py b/VBOC/Safe MPC/hard_terminal_constraints/2dof_sym.py deleted file mode 100644 index 710dba1..0000000 --- a/VBOC/Safe MPC/hard_terminal_constraints/2dof_sym.py +++ /dev/null @@ -1,147 +0,0 @@ -import numpy as np -import matplotlib.pyplot as plt -from numpy.linalg import norm as norm -import time -from doublependulum_class_fixedveldir import OCPdoublependulumINIT, SYMdoublependulumINIT -import warnings -warnings.filterwarnings("ignore") -import torch -import torch.nn as nn -from my_nn import NeuralNetRegression -import math -from multiprocessing import Pool -import random -from scipy.stats import qmc - -def simulate(p): - x0 = np.array([data[p,0], data[p,1], 1e-10, 1e-10]) - - simX = np.ndarray((tot_steps + 1, ocp.ocp.dims.nx)) - simU = np.ndarray((tot_steps, ocp.ocp.dims.nu)) - simX[0] = np.copy(x0) - - failed_iter = -1 - - tot_time = 0 - calls = 0 - - # Guess: - x_sol_guess = np.full((ocp.N+1, ocp.ocp.dims.nx), x0) - u_sol_guess = np.full((ocp.N, ocp.ocp.dims.nu), np.array([ocp.g*ocp.l1*(ocp.m1+ocp.m2)*math.sin(x0[0]),ocp.g*ocp.l2*ocp.m2*math.sin(x0[1])])) - - for f in range(tot_steps): - - temp = time.time() - status = ocp.OCP_solve(simX[f], q_ref, x_sol_guess, u_sol_guess) - tot_time += time.time() - temp - calls += 1 - - if status != 0: - - if failed_iter >= ocp.N-1 or failed_iter < 0: - break - - failed_iter += 1 - - simU[f] = u_sol_guess[0] - - for i in range(ocp.N-1): - x_sol_guess[i] = np.copy(x_sol_guess[i+1]) - u_sol_guess[i] = np.copy(u_sol_guess[i+1]) - - x_sol_guess[ocp.N-1] = np.copy(x_sol_guess[ocp.N]) - u_sol_guess[ocp.N-1] = [ocp.g*ocp.l1*(ocp.m1+ocp.m2)*math.sin(x_sol_guess[ocp.N-1,0]),ocp.g*ocp.l2*ocp.m2*math.sin(x_sol_guess[ocp.N-1,1])] - - else: - failed_iter = 0 - - simU[f] = ocp.ocp_solver.get(0, "u") - - for i in range(ocp.N-1): - x_sol_guess[i] = ocp.ocp_solver.get(i+1, "x") - u_sol_guess[i] = ocp.ocp_solver.get(i+1, "u") - - x_sol_guess[ocp.N-1] = ocp.ocp_solver.get(ocp.N, "x") - x_sol_guess[ocp.N] = np.copy(x_sol_guess[ocp.N-1]) - u_sol_guess[ocp.N-1] = [ocp.g*ocp.l1*(ocp.m1+ocp.m2)*math.sin(x_sol_guess[ocp.N-1,0]),ocp.g*ocp.l2*ocp.m2*math.sin(x_sol_guess[ocp.N-1,1])] - - noise = np.array([(ocp.thetamax-ocp.thetamin)*noise_intensity*random.uniform(-1, 1), (ocp.thetamax-ocp.thetamin)*2*noise_intensity*random.uniform(-1, 1), ocp.dthetamax*2*noise_intensity*random.uniform(-1, 1), ocp.dthetamax*2*noise_intensity*random.uniform(-1, 1)]) - - sim.acados_integrator.set("u", simU[f]) - sim.acados_integrator.set("x", simX[f]) - status = sim.acados_integrator.solve() - simX[f+1] = sim.acados_integrator.get("x") + noise - simU[f] = u_sol_guess[0] - - tm = tot_time/calls - - return f, tm - -start_time = time.time() - -# Pytorch params: -device = torch.device("cpu") - -model_dir = NeuralNetRegression(4, 300, 1).to(device) -model_dir.load_state_dict(torch.load('../model_2dof_vboc')) -mean_dir = torch.load('../mean_2dof_vboc') -std_dir = torch.load('../std_2dof_vboc') -safety_margin = 2.0 - -noise_intensity = 0 - -params = list(model_dir.parameters()) - -ocp = OCPdoublependulumINIT(True, params, mean_dir, std_dir, safety_margin) -sim = SYMdoublependulumINIT(True) - -# Generate low-discrepancy unlabeled samples: -sampler = qmc.Halton(d=2, scramble=False) -sample = sampler.random(n=pow(10, 2)) -q_max = ocp.thetamax -q_min = ocp.thetamin -l_bounds = [q_min, q_min] -u_bounds = [q_max, q_max] -data = qmc.scale(sample, l_bounds, u_bounds) - -tot_steps = 100 -q_ref = np.array([(ocp.thetamax+ocp.thetamin)/2, ocp.thetamax - 0.05]) - -# MPC controller without terminal constraints: -cpu_num = 30 -with Pool(cpu_num) as p: - res = np.array(p.map(simulate, range(data.shape[0]))) - -res_steps_term, stats = zip(*res) - -print('Mean solve time: ' + str(np.mean(stats))) - -res_steps = np.load('../no_constraints/res_steps_noconstr.npy') - -better = 0 -equal = 0 -worse = 0 - -plt.figure() -for i in range(res_steps.shape[0]): - if res_steps_term[i]-res_steps[i]>0: - plt.plot(data[i,0], data[i,1],color='darkblue',marker="*",markersize=9,zorder=1,linestyle="None") - better += 1 - elif res_steps_term[i]-res_steps[i]==0: - plt.plot(data[i,0], data[i,1],color='darkgrey',marker=".",markersize=7,zorder=1,linestyle="None") - equal += 1 - else: - plt.plot(data[i,0], data[i,1],color='darkred',marker="X",markersize=7,zorder=1,linestyle="None") - worse += 1 -plt.title('MPC standard vs MPC with hard term constraints') - -print('MPC standard vs MPC with hard term constraints') -print('Percentage of initial states in which the MPC+VBOC behaves better: ' + str(better)) -print('Percentage of initial states in which the MPC+VBOC behaves equal: ' + str(equal)) -print('Percentage of initial states in which the MPC+VBOC behaves worse: ' + str(worse)) - -print(np.array(res_steps_term).astype(int)) - -np.save('res_steps_hardtermconstr.npy', np.array(res_steps_term).astype(int)) - -plt.show() \ No newline at end of file diff --git a/VBOC/Safe MPC/hard_terminal_constraints/3dof_sym.py b/VBOC/Safe MPC/hard_terminal_constraints/3dof_sym.py index 2c75f78..4e0b6ff 100644 --- a/VBOC/Safe MPC/hard_terminal_constraints/3dof_sym.py +++ b/VBOC/Safe MPC/hard_terminal_constraints/3dof_sym.py @@ -32,9 +32,8 @@ def simulate(p): for f in range(tot_steps): - temp = time.time() - status = ocp.OCP_solve(simX[f], x_sol_guess, u_sol_guess) - times[f] = time.time() - temp + status = ocp.OCP_solve(simX[f], x_sol_guess, u_sol_guess, ocp.thetamax-0.05, joint_vec[f]) + times[f] = ocp.ocp_solver.get_stats('time_tot') if status != 0: @@ -64,11 +63,12 @@ def simulate(p): x_sol_guess[N] = np.copy(x_sol_guess[N-1]) u_sol_guess[N-1] = np.copy(u_sol_guess[N-2]) + simU[f] += noise_vec[f] + sim.acados_integrator.set("u", simU[f]) sim.acados_integrator.set("x", simX[f]) status = sim.acados_integrator.solve() simX[f+1] = sim.acados_integrator.get("x") - simU[f] = u_sol_guess[0] return f, times @@ -86,19 +86,21 @@ def simulate(p): cpu_num = 1 test_num = 100 -time_step = 4*1e-3 -tot_time = 0.16 #0.1 and 0.115 0.002s, 0.15 0.027s, 0.2 0.003s, 0.25 0.0035s, 0.3 0.0039s +time_step = 5*1e-3 +tot_time = 0.16 tot_steps = 100 regenerate = True x_sol_guess_vec = np.load('../x_sol_guess.npy') u_sol_guess_vec = np.load('../u_sol_guess.npy') +noise_vec = np.load('../noise.npy') +noise_vec = np.load('../selected_joint.npy') quant = 10. r = 1 -while quant > 3*1e-3: +while quant > 4*1e-3: ocp = OCPtriplependulumHardTerm("SQP_RTI", time_step, tot_time, list(model.parameters()), mean, std, regenerate) sim = SYMtriplependulum(time_step, tot_time, True) @@ -120,17 +122,19 @@ def simulate(p): times = np.array([i for f in stats for i in f if i is not None]) - quant = np.quantile(times, 0.9) + quant = np.quantile(times, 0.99) print('iter: ', str(r)) print('tot time: ' + str(tot_time)) - print('90 percent quantile solve time: ' + str(quant)) + print('99 percent quantile solve time: ' + str(quant)) print('Mean solve time: ' + str(np.mean(times))) - tot_time -= 2*1e-2 + tot_time -= time_step r += 1 -print(np.array(res_steps_term).astype(int)) + print(np.array(res_steps_term).astype(int)) + + del ocp np.save('res_steps_hardterm.npy', np.array(res_steps_term).astype(int)) @@ -151,4 +155,8 @@ def simulate(p): print('MPC standard vs MPC with hard term constraints') print('Percentage of initial states in which the MPC+VBOC behaves better: ' + str(better)) print('Percentage of initial states in which the MPC+VBOC behaves equal: ' + str(equal)) -print('Percentage of initial states in which the MPC+VBOC behaves worse: ' + str(worse)) \ No newline at end of file +print('Percentage of initial states in which the MPC+VBOC behaves worse: ' + str(worse)) + +np.savez('../data/results_hardterm.npz', res_steps_term=res_steps_term, + better=better, worse=worse, equal=equal, times=times, + dt=time_step, tot_time=tot_time) \ No newline at end of file diff --git a/VBOC/Safe MPC/hard_terminal_constraints/doublependulum_class_fixedveldir.py b/VBOC/Safe MPC/hard_terminal_constraints/doublependulum_class_fixedveldir.py deleted file mode 100644 index 7e4bc73..0000000 --- a/VBOC/Safe MPC/hard_terminal_constraints/doublependulum_class_fixedveldir.py +++ /dev/null @@ -1,258 +0,0 @@ -from acados_template import AcadosOcp, AcadosOcpSolver, AcadosSim, AcadosSimSolver -import numpy as np -from acados_template import AcadosModel -from casadi import SX, vertcat, cos, sin, fmax, norm_2 -import scipy.linalg as lin - -class OCPdoublependulum: - def __init__(self): - - # --------------------SET MODEL-------------------- - # ------------------------------------------------- - model_name = "double_pendulum_ode" - - # constants - self.m1 = 0.4 # mass of the first link [kself.g] - self.m2 = 0.4 # mass of the second link [kself.g] - self.g = 9.81 # self.gravity constant [m/s^2] - self.l1 = 0.8 # lenself.gth of the first link [m] - self.l2 = 0.8 # lenself.gth of the second link [m] - - # states - theta1 = SX.sym("theta1") - theta2 = SX.sym("theta2") - dtheta1 = SX.sym("dtheta1") - dtheta2 = SX.sym("dtheta2") - self.x = vertcat(theta1, theta2, dtheta1, dtheta2) - - # xdot - theta1_dot = SX.sym("theta1_dot") - theta2_dot = SX.sym("theta1_dot") - dtheta1_dot = SX.sym("dtheta2_dot") - dtheta2_dot = SX.sym("dtheta2_dot") - xdot = vertcat(theta1_dot, theta2_dot, dtheta1_dot, dtheta2_dot) - - # controls - C1 = SX.sym("C1") - C2 = SX.sym("C2") - u = vertcat(C1, C2) - - # parameters - p = [] - - # dynamics - f_expl = vertcat( - dtheta1, - dtheta2, - ( - self.l1**2 - * self.l2 - * self.m2 - * dtheta1**2 - * sin(-2 * theta2 + 2 * theta1) - + 2 * C2 * cos(-theta2 + theta1) * self.l1 - + 2 - * ( - self.g * sin(-2 * theta2 + theta1) * self.l1 * self.m2 / 2 - + sin(-theta2 + theta1) * dtheta2**2 * self.l1 * self.l2 * self.m2 - + self.g * self.l1 * (self.m1 + self.m2 / 2) * sin(theta1) - - C1 - ) - * self.l2 - ) - / self.l1**2 - / self.l2 - / (self.m2 * cos(-2 * theta2 + 2 * theta1) - 2 * self.m1 - self.m2), - ( - -self.g - * self.l1 - * self.l2 - * self.m2 - * (self.m1 + self.m2) - * sin(-theta2 + 2 * theta1) - - self.l1 - * self.l2**2 - * self.m2**2 - * dtheta2**2 - * sin(-2 * theta2 + 2 * theta1) - - 2 - * dtheta1**2 - * self.l1**2 - * self.l2 - * self.m2 - * (self.m1 + self.m2) - * sin(-theta2 + theta1) - + 2 * C1 * cos(-theta2 + theta1) * self.l2 * self.m2 - + self.l1 - * (self.m1 + self.m2) - * (sin(theta2) * self.g * self.l2 * self.m2 - 2 * C2) - ) - / self.l2**2 - / self.l1 - / self.m2 - / (self.m2 * cos(-2 * theta2 + 2 * theta1) - 2 * self.m1 - self.m2), - ) - - self.model = AcadosModel() - - f_impl = xdot - f_expl - - self.model.f_expl_expr = f_expl - self.model.f_impl_expr = f_impl - self.model.x = self.x - self.model.xdot = xdot - self.model.u = u - self.model.p = p - self.model.name = model_name - - -class OCPdoublependulumINIT(OCPdoublependulum): - def __init__(self, regenerate, nn_params, mean, std, safety_margin): - - # inherit initialization - super().__init__() - - # ---------------------SET OCP--------------------- - # ------------------------------------------------- - self.ocp = AcadosOcp() - - # times - self.Tf = 0.01 - self.N = int(1000 * self.Tf) - self.ocp.solver_options.tf = self.Tf - self.ocp.dims.N = self.N - - self.nx = self.model.x.size()[0] - self.nu = self.model.u.size()[0] - self.ny = self.nx + self.nu - self.ny_e = self.nx - - # cost - Q = np.diag([1e4, 1e4, 1e-4, 1e-4]) - R = np.diag([1e-4, 1e-4]) - - self.ocp.cost.W_e = Q - self.ocp.cost.W = lin.block_diag(Q, R) - - self.ocp.cost.cost_type = "LINEAR_LS" - self.ocp.cost.cost_type_e = "LINEAR_LS" - - self.ocp.cost.Vx = np.zeros((self.ny, self.nx)) - self.ocp.cost.Vx[: self.nx, :self.nx] = np.eye(self.nx) - self.ocp.cost.Vu = np.zeros((self.ny, self.nu)) - self.ocp.cost.Vu[self.nx:, :self.nu] = np.eye(self.nu) - self.ocp.cost.Vx_e = np.eye(self.nx) - - # reference - self.ocp.cost.yref = np.array([np.pi, np.pi, 0., 0., 0., 0.]) - self.ocp.cost.yref_e = np.array([np.pi, np.pi, 0., 0.]) - - # set constraints - self.Cmax = 10. - self.thetamax = np.pi / 4 + np.pi - self.thetamin = - np.pi / 4 + np.pi - self.dthetamax = 10. - - Cmax_limits = np.array([self.Cmax, self.Cmax]) - Xmax_limits = np.array([self.thetamax, self.thetamax, self.dthetamax, self.dthetamax]) - Xmin_limits = np.array([self.thetamin, self.thetamin, -self.dthetamax, -self.dthetamax]) - - self.ocp.constraints.lbu = -Cmax_limits - self.ocp.constraints.ubu = Cmax_limits - self.ocp.constraints.idxbu = np.array([0, 1]) - self.ocp.constraints.lbx = Xmin_limits - self.ocp.constraints.ubx = Xmax_limits - self.ocp.constraints.idxbx = np.array([0, 1, 2, 3]) - - self.ocp.constraints.lbx_e = Xmin_limits - self.ocp.constraints.ubx_e = Xmax_limits - self.ocp.constraints.idxbx_e = np.array([0, 1, 2, 3]) - - self.ocp.constraints.lbx_0 = Xmin_limits - self.ocp.constraints.ubx_0 = Xmax_limits - self.ocp.constraints.idxbx_0 = np.array([0, 1, 2, 3]) - - # nonlinear constraints - self.model.con_h_expr_e = self.nn_decisionfunction(nn_params, mean, std, safety_margin, self.x) - - self.ocp.constraints.lh_e = np.array([0.]) - self.ocp.constraints.uh_e = np.array([1e6]) - - # ------------------------------------------------- - - # self.ocp.solver_options.nlp_solver_type = "SQP" - self.ocp.solver_options.qp_solver = "FULL_CONDENSING_HPIPM" - self.ocp.solver_options.tol = 1e-2 - self.ocp.solver_options.qp_solver_iter_max = 100 - self.ocp.solver_options.nlp_solver_max_iter = 1000 - self.ocp.solver_options.globalization = "MERIT_BACKTRACKING" - self.ocp.solver_options.alpha_reduction = 0.3 - self.ocp.solver_options.alpha_min = 1e-2 - self.ocp.solver_options.levenberg_marquardt = 1. - - # ocp model - self.ocp.model = self.model - - # solver - self.ocp_solver = AcadosOcpSolver(self.ocp, build=regenerate) - - def OCP_solve(self, x0, q_ref, x_sol_guess, u_sol_guess): - - # Reset current iterate: - self.ocp_solver.reset() - - self.ocp_solver.constraints_set(0, "lbx", x0) - self.ocp_solver.constraints_set(0, "ubx", x0) - - # Set parameters, guesses and constraints: - for i in range(self.N): - self.ocp_solver.set(i, 'x', x_sol_guess[i]) - self.ocp_solver.set(i, 'u', u_sol_guess[i]) - self.ocp_solver.cost_set(i, 'y_ref', np.array([q_ref[0],q_ref[1],0.,0.,0.,0.])) - - self.ocp_solver.set(self.N, 'x', x_sol_guess[self.N]) - self.ocp_solver.cost_set(self.N, 'y_ref', np.array([q_ref[0],q_ref[1],0.,0.])) - - # Solve the OCP: - status = self.ocp_solver.solve() - - return status - - def nn_decisionfunction(self, params, mean, std, safety_margin, x): - - vel_norm = fmax(norm_2(x[2:]), 1e-3) - - mean = vertcat(mean,mean,0.,0.) - std = vertcat(std,std,vel_norm,vel_norm) - - out = (x - mean) / std - it = 0 - - for param in params: - - param = SX(param.tolist()) - - if it % 2 == 0: - out = param @ out - else: - out = param + out - - if it == 1 or it == 3: - out = fmax(0., out) - - it += 1 - - return out - vel_norm - - -class SYMdoublependulumINIT(OCPdoublependulum): - def __init__(self, regenerate): - - # inherit initialization - super().__init__() - - sim = AcadosSim() - sim.model = self.model - sim.solver_options.T = 1e-3 - sim.solver_options.num_stages = 4 - self.acados_integrator = AcadosSimSolver(sim, build=regenerate) diff --git a/VBOC/Safe MPC/no_constraints/2dof_sym.py b/VBOC/Safe MPC/no_constraints/2dof_sym.py deleted file mode 100644 index 8a1de94..0000000 --- a/VBOC/Safe MPC/no_constraints/2dof_sym.py +++ /dev/null @@ -1,116 +0,0 @@ -import numpy as np -import matplotlib.pyplot as plt -from numpy.linalg import norm as norm -import time -from doublependulum_class_fixedveldir import OCPdoublependulumINIT, SYMdoublependulumINIT -import warnings -warnings.filterwarnings("ignore") -import torch -import torch.nn as nn -from my_nn import NeuralNetRegression -import math -from multiprocessing import Pool -from scipy.stats import qmc -import random - -def simulate(p): - x0 = np.array([data[p,0], data[p,1], 1e-10, 1e-10]) - - simX = np.ndarray((tot_steps + 1, ocp.ocp.dims.nx)) - simU = np.ndarray((tot_steps, ocp.ocp.dims.nu)) - simX[0] = np.copy(x0) - - times = [None] * tot_steps - - failed_iter = -1 - - calls = 0 - - # Guess: - x_sol_guess = np.full((ocp.N+1, ocp.ocp.dims.nx), x0) - u_sol_guess = np.full((ocp.N, ocp.ocp.dims.nu), np.array([ocp.g*ocp.l1*(ocp.m1+ocp.m2)*math.sin(x0[0]),ocp.g*ocp.l2*ocp.m2*math.sin(x0[1])])) - - for f in range(tot_steps): - - temp = time.time() - status = ocp.OCP_solve(simX[f], q_ref, x_sol_guess, u_sol_guess) - times[f] = time.time() - temp - calls += 1 - - if status != 0: - - if failed_iter >= ocp.N-1 or failed_iter < 0: - break - - failed_iter += 1 - - simU[f] = u_sol_guess[0] - - for i in range(ocp.N-1): - x_sol_guess[i] = np.copy(x_sol_guess[i+1]) - u_sol_guess[i] = np.copy(u_sol_guess[i+1]) - - x_sol_guess[ocp.N-1] = np.copy(x_sol_guess[ocp.N]) - u_sol_guess[ocp.N-1] = [ocp.g*ocp.l1*(ocp.m1+ocp.m2)*math.sin(x_sol_guess[ocp.N-1,0]),ocp.g*ocp.l2*ocp.m2*math.sin(x_sol_guess[ocp.N-1,1])] - - else: - failed_iter = 0 - - simU[f] = ocp.ocp_solver.get(0, "u") - - for i in range(ocp.N-1): - x_sol_guess[i] = ocp.ocp_solver.get(i+1, "x") - u_sol_guess[i] = ocp.ocp_solver.get(i+1, "u") - - x_sol_guess[ocp.N-1] = ocp.ocp_solver.get(ocp.N, "x") - x_sol_guess[ocp.N] = np.copy(x_sol_guess[ocp.N-1]) - u_sol_guess[ocp.N-1] = [ocp.g*ocp.l1*(ocp.m1+ocp.m2)*math.sin(x_sol_guess[ocp.N-1,0]),ocp.g*ocp.l2*ocp.m2*math.sin(x_sol_guess[ocp.N-1,1])] - - noise = np.array([(ocp.thetamax-ocp.thetamin)*noise_intensity*random.uniform(-1, 1), (ocp.thetamax-ocp.thetamin)*2*noise_intensity*random.uniform(-1, 1), ocp.dthetamax*2*noise_intensity*random.uniform(-1, 1), ocp.dthetamax*2*noise_intensity*random.uniform(-1, 1)]) - - sim.acados_integrator.set("u", simU[f]) - sim.acados_integrator.set("x", simX[f]) - status = sim.acados_integrator.solve() - simX[f+1] = sim.acados_integrator.get("x") + noise - simU[f] = u_sol_guess[0] - - return f, times - -start_time = time.time() - -# Pytorch params: -device = torch.device("cpu") # pytorch device - -ocp = OCPdoublependulumINIT(True) -sim = SYMdoublependulumINIT(True) - -# Generate low-discrepancy unlabeled samples: -sampler = qmc.Halton(d=2, scramble=False) -sample = sampler.random(n=pow(10, 2)) -q_max = ocp.thetamax -q_min = ocp.thetamin -l_bounds = [q_min, q_min] -u_bounds = [q_max, q_max] -data = qmc.scale(sample, l_bounds, u_bounds) - -noise_intensity = 0 - -tot_steps = 100 -q_ref = np.array([(ocp.thetamax+ocp.thetamin)/2, ocp.thetamax - 0.05]) - -# MPC controller without terminal constraints: -cpu_num = 30 -with Pool(cpu_num) as p: - res = p.map(simulate, range(data.shape[0])) - -res_steps, stats = zip(*res) - -times = np.array([i for f in stats for i in f if i is not None]) - -print('90 percent quantile solve time: ' + str(np.quantile(times, 0.9))) -print('Mean solve time: ' + str(np.mean(times))) - -print(np.array(res_steps).astype(int)) -print(times) - -np.save('res_steps_noconstr.npy', np.array(res_steps).astype(int)) diff --git a/VBOC/Safe MPC/no_constraints/3dof_sym.py b/VBOC/Safe MPC/no_constraints/3dof_sym.py index 242beed..7d71d8f 100644 --- a/VBOC/Safe MPC/no_constraints/3dof_sym.py +++ b/VBOC/Safe MPC/no_constraints/3dof_sym.py @@ -7,6 +7,7 @@ from triplependulum_class_vboc import OCPtriplependulumSTD, SYMtriplependulum from multiprocessing import Pool from scipy.stats import qmc +import random import warnings warnings.filterwarnings("ignore") @@ -30,9 +31,8 @@ def simulate(p): for f in range(tot_steps): - temp = time.time() - status = ocp.OCP_solve(simX[f], x_sol_guess, u_sol_guess) - times[f] = time.time() - temp + status = ocp.OCP_solve(simX[f], x_sol_guess, u_sol_guess, ocp.thetamax-0.05, joint_vec[f]) + times[f] = ocp.ocp_solver.get_stats('time_tot') if status != 0: @@ -62,11 +62,12 @@ def simulate(p): x_sol_guess[N] = np.copy(x_sol_guess[N-1]) u_sol_guess[N-1] = np.copy(u_sol_guess[N-2]) + simU[f] += noise_vec[f] + sim.acados_integrator.set("u", simU[f]) sim.acados_integrator.set("x", simX[f]) status = sim.acados_integrator.solve() simX[f+1] = sim.acados_integrator.get("x") - simU[f] = u_sol_guess[0] return f, times @@ -79,7 +80,7 @@ def init_guess(p): x_sol_guess = np.full((N+1, ocp.ocp.dims.nx), x0) u_sol_guess = np.zeros((N, ocp.ocp.dims.nu)) - status = ocp.OCP_solve(x0, x_sol_guess, u_sol_guess) + status = ocp.OCP_solve(x0, x_sol_guess, u_sol_guess, ocp.thetamax-0.05, joint_vec[0]) if status == 0: @@ -96,7 +97,7 @@ def init_guess(p): cpu_num = 1 test_num = 100 -time_step = 4*1e-3 +time_step = 5*1e-3 tot_time = 0.16 tot_steps = 100 @@ -111,6 +112,9 @@ def init_guess(p): N = ocp.ocp.dims.N +joint_vec = np.array([random.choice(range(ocp.nu)) for _ in range(tot_steps)]) +np.save('../selected_joint', joint_vec) + with Pool(30) as p: res = p.map(init_guess, range(data.shape[0])) @@ -119,6 +123,11 @@ def init_guess(p): np.save('../x_sol_guess', np.asarray(x_sol_guess_vec)) np.save('../u_sol_guess', np.asarray(u_sol_guess_vec)) +noise_perc = 0 +noise_vec = np.full((ocp.ocp.dims.nu,tot_steps), np.random.normal(0,noise_perc*ocp.Cmax/200,tot_steps)) +noise_vec = noise_vec.reshape((tot_steps,ocp.ocp.dims.nu)) +np.save('../noise', noise_vec) + del ocp ocp = OCPtriplependulumSTD("SQP_RTI", time_step, tot_time, True) @@ -134,9 +143,14 @@ def init_guess(p): times = np.array([i for f in stats for i in f if i is not None]) -print('90 percent quantile solve time: ' + str(np.quantile(times, 0.9))) +print('99 percent quantile solve time: ' + str(np.quantile(times, 0.99))) print('Mean solve time: ' + str(np.mean(times))) print(np.array(res_steps).astype(int)) +print(np.mean(res_steps)) np.save('res_steps_noconstr.npy', np.array(res_steps).astype(int)) + +# Save the results in an npz file +np.savez('../data/results_no_constraint.npz', times=times, + dt=time_step, tot_time=tot_time, res_steps=res_steps) diff --git a/VBOC/Safe MPC/no_constraints/doublependulum_class_fixedveldir.py b/VBOC/Safe MPC/no_constraints/doublependulum_class_fixedveldir.py deleted file mode 100644 index 1afb119..0000000 --- a/VBOC/Safe MPC/no_constraints/doublependulum_class_fixedveldir.py +++ /dev/null @@ -1,226 +0,0 @@ -from acados_template import AcadosOcp, AcadosOcpSolver, AcadosSim, AcadosSimSolver -import numpy as np -from acados_template import AcadosModel -from casadi import SX, vertcat, cos, sin, fmax, norm_2 -import scipy.linalg as lin - -class OCPdoublependulum: - def __init__(self): - - # --------------------SET MODEL-------------------- - # ------------------------------------------------- - model_name = "double_pendulum_ode" - - # constants - self.m1 = 0.4 # mass of the first link [kself.g] - self.m2 = 0.4 # mass of the second link [kself.g] - self.g = 9.81 # self.gravity constant [m/s^2] - self.l1 = 0.8 # lenself.gth of the first link [m] - self.l2 = 0.8 # lenself.gth of the second link [m] - - # states - theta1 = SX.sym("theta1") - theta2 = SX.sym("theta2") - dtheta1 = SX.sym("dtheta1") - dtheta2 = SX.sym("dtheta2") - self.x = vertcat(theta1, theta2, dtheta1, dtheta2) - - # xdot - theta1_dot = SX.sym("theta1_dot") - theta2_dot = SX.sym("theta2_dot") - dtheta1_dot = SX.sym("dtheta1_dot") - dtheta2_dot = SX.sym("dtheta2_dot") - xdot = vertcat(theta1_dot, theta2_dot, dtheta1_dot, dtheta2_dot) - - # controls - C1 = SX.sym("C1") - C2 = SX.sym("C2") - u = vertcat(C1, C2) - - # parameters - p = [] - - # dynamics - f_expl = vertcat( - dtheta1, - dtheta2, - ( - self.l1**2 - * self.l2 - * self.m2 - * dtheta1**2 - * sin(-2 * theta2 + 2 * theta1) - + 2 * C2 * cos(-theta2 + theta1) * self.l1 - + 2 - * ( - self.g * sin(-2 * theta2 + theta1) * self.l1 * self.m2 / 2 - + sin(-theta2 + theta1) * dtheta2**2 * self.l1 * self.l2 * self.m2 - + self.g * self.l1 * (self.m1 + self.m2 / 2) * sin(theta1) - - C1 - ) - * self.l2 - ) - / self.l1**2 - / self.l2 - / (self.m2 * cos(-2 * theta2 + 2 * theta1) - 2 * self.m1 - self.m2), - ( - -self.g - * self.l1 - * self.l2 - * self.m2 - * (self.m1 + self.m2) - * sin(-theta2 + 2 * theta1) - - self.l1 - * self.l2**2 - * self.m2**2 - * dtheta2**2 - * sin(-2 * theta2 + 2 * theta1) - - 2 - * dtheta1**2 - * self.l1**2 - * self.l2 - * self.m2 - * (self.m1 + self.m2) - * sin(-theta2 + theta1) - + 2 * C1 * cos(-theta2 + theta1) * self.l2 * self.m2 - + self.l1 - * (self.m1 + self.m2) - * (sin(theta2) * self.g * self.l2 * self.m2 - 2 * C2) - ) - / self.l2**2 - / self.l1 - / self.m2 - / (self.m2 * cos(-2 * theta2 + 2 * theta1) - 2 * self.m1 - self.m2), - ) - - self.model = AcadosModel() - - f_impl = xdot - f_expl - - self.model.f_expl_expr = f_expl - self.model.f_impl_expr = f_impl - self.model.x = self.x - self.model.xdot = xdot - self.model.u = u - self.model.p = p - self.model.name = model_name - - -class OCPdoublependulumINIT(OCPdoublependulum): - def __init__(self, regenerate): - - # inherit initialization - super().__init__() - - # ---------------------SET OCP--------------------- - # ------------------------------------------------- - self.ocp = AcadosOcp() - - # times - self.Tf = 0.01 - self.N = int(1000 * self.Tf) - self.ocp.solver_options.tf = self.Tf - self.ocp.dims.N = self.N - - self.nx = self.model.x.size()[0] - self.nu = self.model.u.size()[0] - self.ny = self.nx + self.nu - self.ny_e = self.nx - - # cost - Q = np.diag([1e4, 1e4, 1e-4, 1e-4]) - R = np.diag([1e-4, 1e-4]) - - self.ocp.cost.W_e = Q - self.ocp.cost.W = lin.block_diag(Q, R) - - self.ocp.cost.cost_type = "LINEAR_LS" - self.ocp.cost.cost_type_e = "LINEAR_LS" - - self.ocp.cost.Vx = np.zeros((self.ny, self.nx)) - self.ocp.cost.Vx[: self.nx, :self.nx] = np.eye(self.nx) - self.ocp.cost.Vu = np.zeros((self.ny, self.nu)) - self.ocp.cost.Vu[self.nx:, :self.nu] = np.eye(self.nu) - self.ocp.cost.Vx_e = np.eye(self.nx) - - # reference - self.ocp.cost.yref = np.array([np.pi, np.pi, 0., 0., 0., 0.]) - self.ocp.cost.yref_e = np.array([np.pi, np.pi, 0., 0.]) - - # set constraints - self.Cmax = 10. - self.thetamax = np.pi / 4 + np.pi - self.thetamin = - np.pi / 4 + np.pi - self.dthetamax = 10. - - Cmax_limits = np.array([self.Cmax, self.Cmax]) - Xmax_limits = np.array([self.thetamax, self.thetamax, self.dthetamax, self.dthetamax]) - Xmin_limits = np.array([self.thetamin, self.thetamin, -self.dthetamax, -self.dthetamax]) - - self.ocp.constraints.lbu = -Cmax_limits - self.ocp.constraints.ubu = Cmax_limits - self.ocp.constraints.idxbu = np.array([0, 1]) - self.ocp.constraints.lbx = Xmin_limits - self.ocp.constraints.ubx = Xmax_limits - self.ocp.constraints.idxbx = np.array([0, 1, 2, 3]) - - self.ocp.constraints.lbx_e = Xmin_limits - self.ocp.constraints.ubx_e = Xmax_limits - self.ocp.constraints.idxbx_e = np.array([0, 1, 2, 3]) - - self.ocp.constraints.lbx_0 = Xmin_limits - self.ocp.constraints.ubx_0 = Xmax_limits - self.ocp.constraints.idxbx_0 = np.array([0, 1, 2, 3]) - - # ------------------------------------------------- - - # self.ocp.solver_options.nlp_solver_type = "SQP" - self.ocp.solver_options.qp_solver = "FULL_CONDENSING_HPIPM" - self.ocp.solver_options.tol = 1e-2 - self.ocp.solver_options.qp_solver_iter_max = 100 - self.ocp.solver_options.nlp_solver_max_iter = 1000 - self.ocp.solver_options.globalization = "MERIT_BACKTRACKING" - self.ocp.solver_options.alpha_reduction = 0.3 - self.ocp.solver_options.alpha_min = 1e-2 - self.ocp.solver_options.levenberg_marquardt = 1. - - # ocp model - self.ocp.model = self.model - - # solver - self.ocp_solver = AcadosOcpSolver(self.ocp, build=regenerate) - - def OCP_solve(self, x0, q_ref, x_sol_guess, u_sol_guess): - - # Reset current iterate: - self.ocp_solver.reset() - - self.ocp_solver.constraints_set(0, "lbx", x0) - self.ocp_solver.constraints_set(0, "ubx", x0) - - # Set parameters, guesses and constraints: - for i in range(self.N): - self.ocp_solver.set(i, 'x', x_sol_guess[i]) - self.ocp_solver.set(i, 'u', u_sol_guess[i]) - self.ocp_solver.cost_set(i, 'y_ref', np.array([q_ref[0],q_ref[1],0.,0.,0.,0.])) - - self.ocp_solver.set(self.N, 'x', x_sol_guess[self.N]) - self.ocp_solver.cost_set(self.N, 'y_ref', np.array([q_ref[0],q_ref[1],0.,0.])) - - # Solve the OCP: - status = self.ocp_solver.solve() - - return status - - -class SYMdoublependulumINIT(OCPdoublependulum): - def __init__(self, regenerate): - - # inherit initialization - super().__init__() - - sim = AcadosSim() - sim.model = self.model - sim.solver_options.T = 1e-3 - sim.solver_options.num_stages = 4 - self.acados_integrator = AcadosSimSolver(sim, build=regenerate) diff --git a/VBOC/Safe MPC/receiding_hard_constraints/2dof_sym.py b/VBOC/Safe MPC/receiding_hard_constraints/2dof_sym.py deleted file mode 100644 index e49caa1..0000000 --- a/VBOC/Safe MPC/receiding_hard_constraints/2dof_sym.py +++ /dev/null @@ -1,173 +0,0 @@ -import numpy as np -import matplotlib.pyplot as plt -from numpy.linalg import norm as norm -import time -from doublependulum_class_fixedveldir import OCPdoublependulumINIT, SYMdoublependulumINIT -import warnings -warnings.filterwarnings("ignore") -import torch -import torch.nn as nn -from my_nn import NeuralNetRegression -import math -from multiprocessing import Pool -from scipy.stats import qmc -import scipy.linalg as lin -import random - -def simulate(p): - x0 = np.array([data[p,0], data[p,1], 1e-10, 1e-10]) - - simX = np.ndarray((tot_steps + 1, ocp.ocp.dims.nx)) - simU = np.ndarray((tot_steps, ocp.ocp.dims.nu)) - simX[0] = np.copy(x0) - - failed_iter = 0 - receiding = 0 - - tot_time = 0 - calls = 0 - - # Guess: - x_sol_guess = np.full((ocp.N+1, ocp.ocp.dims.nx), x0) - u_sol_guess = np.full((ocp.N, ocp.ocp.dims.nu), np.array([ocp.g*ocp.l1*(ocp.m1+ocp.m2)*math.sin(x0[0]),ocp.g*ocp.l2*ocp.m2*math.sin(x0[1])])) - - for f in range(tot_steps): - receiding = 0 - - temp = time.time() - - if failed_iter == 0 and f > 0: - for i in range(1,ocp.N+1): - if ocp.nn_decisionfunction(params, mean_dir, std_dir, safety_margin, ocp.ocp_solver.get(i, 'x')) >= 0.: - receiding = ocp.N - i + 1 - - receiding_iter = ocp.N-failed_iter-receiding - Q = np.diag([1e-2+pow(10,receiding_iter/ocp.N*2), 1e-2+pow(10,receiding_iter/ocp.N*2), 1e-4, 1e-4]) - R = np.diag([1e-4, 1e-4]) - - for i in range(ocp.N): - ocp.ocp_solver.cost_set(i, "W", lin.block_diag(Q, R)) - - ocp.ocp_solver.cost_set(ocp.N, "W", Q) - - for i in range(ocp.N+1): - if i == receiding_iter: - ocp.ocp_solver.cost_set(i, "Zl", 1e12*np.ones((1,))) - else: - ocp.ocp_solver.cost_set(i, "Zl", pow(10, (1-receiding_iter/ocp.N)*6)*np.ones((1,))) - - status = ocp.OCP_solve(simX[f], q_ref, x_sol_guess, u_sol_guess) - tot_time += time.time() - temp - calls += 1 - - if status != 0: - # print("acados returned status " + str(status) + " at iteration " + str(f)) - # print(ocp.ocp_solver.get_residuals()) - - if failed_iter >= ocp.N-1 or f == 0: - break - - failed_iter += 1 - - simU[f] = u_sol_guess[0] - - for i in range(ocp.N-1): - x_sol_guess[i] = np.copy(x_sol_guess[i+1]) - u_sol_guess[i] = np.copy(u_sol_guess[i+1]) - - x_sol_guess[ocp.N-1] = np.copy(x_sol_guess[ocp.N]) - u_sol_guess[ocp.N-1] = [ocp.g*ocp.l1*(ocp.m1+ocp.m2)*math.sin(x_sol_guess[ocp.N-1,0]),ocp.g*ocp.l2*ocp.m2*math.sin(x_sol_guess[ocp.N-1,1])] - - else: - failed_iter = 0 - - simU[f] = ocp.ocp_solver.get(0, "u") - - for i in range(ocp.N-1): - x_sol_guess[i] = ocp.ocp_solver.get(i+1, "x") - u_sol_guess[i] = ocp.ocp_solver.get(i+1, "u") - - x_sol_guess[ocp.N-1] = ocp.ocp_solver.get(ocp.N, "x") - x_sol_guess[ocp.N] = np.copy(x_sol_guess[ocp.N-1]) - u_sol_guess[ocp.N-1] = [ocp.g*ocp.l1*(ocp.m1+ocp.m2)*math.sin(x_sol_guess[ocp.N-1,0]),ocp.g*ocp.l2*ocp.m2*math.sin(x_sol_guess[ocp.N-1,1])] - - noise = np.array([(ocp.thetamax-ocp.thetamin)*noise_intensity*random.uniform(-1, 1), (ocp.thetamax-ocp.thetamin)*2*noise_intensity*random.uniform(-1, 1), ocp.dthetamax*2*noise_intensity*random.uniform(-1, 1), ocp.dthetamax*2*noise_intensity*random.uniform(-1, 1)]) - - sim.acados_integrator.set("u", simU[f]) - sim.acados_integrator.set("x", simX[f]) - status = sim.acados_integrator.solve() - simX[f+1] = sim.acados_integrator.get("x") + noise - simU[f] = u_sol_guess[0] - - tm = tot_time/calls - - return f, tm - -start_time = time.time() - -# Pytorch params: -device = torch.device("cpu") - -model_dir = NeuralNetRegression(4, 300, 1).to(device) -model_dir.load_state_dict(torch.load('../model_2dof_vboc')) -mean_dir = torch.load('../mean_2dof_vboc') -std_dir = torch.load('../std_2dof_vboc') -safety_margin = 2.0 - -noise_intensity = 0 - -params = list(model_dir.parameters()) - -ocp = OCPdoublependulumINIT(True, params, mean_dir, std_dir, safety_margin) -sim = SYMdoublependulumINIT(True) - -# Generate low-discrepancy unlabeled samples: -sampler = qmc.Halton(d=2, scramble=False) -sample = sampler.random(n=pow(10, 2)) -q_max = ocp.thetamax -q_min = ocp.thetamin -l_bounds = [q_min, q_min] -u_bounds = [q_max, q_max] -data = qmc.scale(sample, l_bounds, u_bounds) - -tot_steps = 100 -q_ref = np.array([(ocp.thetamax+ocp.thetamin)/2, ocp.thetamax - 0.05]) - -# MPC controller wit soft constraints: -cpu_num = 30 -with Pool(cpu_num) as p: - res = np.array(p.map(simulate, range(data.shape[0]))) - -res_steps_term, stats = zip(*res) - -print('Mean solve time: ' + str(np.mean(stats))) - -res_steps = np.load('../no_constraints/res_steps_noconstr.npy') - -better = 0 -equal = 0 -worse = 0 - -plt.figure() -for i in range(res_steps.shape[0]): - if res_steps_term[i]-res_steps[i]>0: - plt.plot(data[i,0], data[i,1],color='darkblue',marker="*",markersize=9,zorder=1,linestyle="None") - better += 1 - elif res_steps_term[i]-res_steps[i]==0: - plt.plot(data[i,0], data[i,1],color='darkgrey',marker=".",markersize=7,zorder=1,linestyle="None") - equal += 1 - else: - plt.plot(data[i,0], data[i,1],color='darkred',marker="X",markersize=7,zorder=1,linestyle="None") - worse += 1 -plt.title('MPC standard vs MPC with receiding hard constraints') - -print('MPC standard vs MPC with receiding hard constraints') -print('Percentage of initial states in which the MPC+VBOC behaves better: ' + str(better)) -print('Percentage of initial states in which the MPC+VBOC behaves equal: ' + str(equal)) -print('Percentage of initial states in which the MPC+VBOC behaves worse: ' + str(worse)) - -print(np.array(res_steps_term).astype(int)) - -np.save('res_steps_hardreceidingconstr.npy', np.array(res_steps_term).astype(int)) - -plt.show() \ No newline at end of file diff --git a/VBOC/Safe MPC/receiding_hard_constraints/3dof_sym.py b/VBOC/Safe MPC/receiding_hard_constraints/3dof_sym.py index 406563c..cb259e6 100644 --- a/VBOC/Safe MPC/receiding_hard_constraints/3dof_sym.py +++ b/VBOC/Safe MPC/receiding_hard_constraints/3dof_sym.py @@ -5,7 +5,7 @@ import numpy as np import time import torch -from triplependulum_class_vboc import OCPtriplependulumSoftTraj, SYMtriplependulum +from triplependulum_class_vboc import OCPtriplependulumHardTraj, SYMtriplependulum from my_nn import NeuralNetDIR from multiprocessing import Pool import scipy.linalg as lin @@ -37,11 +37,11 @@ def simulate(p): if failed_iter == 0 and f > 0: for i in range(1,N+1): - if ocp.nn_decisionfunction_conservative(params, mean, std, safety_margin, ocp.ocp_solver.get(i, 'x')) >= 0.: + if ocp.nn_decisionfunction(params, mean, std, ocp.ocp_solver.get(i, 'x')) >= 0.: receiding = N - i + 1 receiding_iter = N-failed_iter-receiding - Q = np.diag([1e-2+pow(10,receiding_iter/N*2), 1e-4, 1e-4, 1e-4, 1e-4, 1e-4]) + Q = np.diag([1e-4+pow(10,receiding_iter/N*4), 1e-4, 1e-4, 1e-4, 1e-4, 1e-4]) R = np.diag([1e-4, 1e-4, 1e-4]) for i in range(N): @@ -49,15 +49,22 @@ def simulate(p): ocp.ocp_solver.cost_set(N, "W", Q) + # for i in range(N+1): + # if i == receiding_iter: + # ocp.ocp_solver.cost_set(i, "Zl", 1e8*np.ones((1,))) + # elif i == N: + # ocp.ocp_solver.cost_set(i, "Zl", pow(10, (1-receiding_iter/N)*4)*np.ones((1,))) + # else: + # ocp.ocp_solver.cost_set(i, "Zl", np.zeros((1,))) + for i in range(N+1): if i == receiding_iter: - ocp.ocp_solver.cost_set(i, "Zl", 1e12*np.ones((1,))) + ocp.ocp_solver.constraints_set(i, "lh", np.array([0.])) else: - ocp.ocp_solver.cost_set(i, "Zl", pow(10, (1-receiding_iter/N)*6)*np.ones((1,))) + ocp.ocp_solver.constraints_set(i, "lh", np.array([-1e6])) - temp = time.time() - status = ocp.OCP_solve(simX[f], x_sol_guess, u_sol_guess) - times[f] = time.time() - temp + status = ocp.OCP_solve(simX[f], x_sol_guess, u_sol_guess, ocp.thetamax-0.05, joint_vec[f]) + times[f] = ocp.ocp_solver.get_stats('time_tot') if status != 0: @@ -87,11 +94,12 @@ def simulate(p): x_sol_guess[N] = np.copy(x_sol_guess[N-1]) u_sol_guess[N-1] = np.copy(u_sol_guess[N-2]) + simU[f] += noise_vec[f] + sim.acados_integrator.set("u", simU[f]) sim.acados_integrator.set("x", simX[f]) status = sim.acados_integrator.solve() simX[f+1] = sim.acados_integrator.get("x") - simU[f] = u_sol_guess[0] return f, times @@ -109,51 +117,44 @@ def simulate(p): cpu_num = 1 test_num = 100 -time_step = 4*1e-3 -tot_time = 0.16 #0.1 0.011s, 0.06 0.008s, 0.04 0.006s, 0.03 0.0044s +time_step = 5*1e-3 +tot_time = 0.16 tot_steps = 100 regenerate = True x_sol_guess_vec = np.load('../x_sol_guess.npy') u_sol_guess_vec = np.load('../u_sol_guess.npy') +noise_vec = np.load('../noise.npy') +noise_vec = np.load('../selected_joint.npy') params = list(model.parameters()) -quant = 10. -r = 1 - -while quant > 3*1e-3: +ocp = OCPtriplependulumHardTraj("SQP_RTI", time_step, tot_time, params, mean, std, regenerate) +sim = SYMtriplependulum(time_step, tot_time, regenerate) - ocp = OCPtriplependulumSoftTraj("SQP_RTI", time_step, tot_time, params, mean, std, safety_margin, regenerate) - sim = SYMtriplependulum(time_step, tot_time, regenerate) +# Generate low-discrepancy unlabeled samples: +sampler = qmc.Halton(d=ocp.ocp.dims.nu, scramble=False) +sample = sampler.random(n=test_num) +l_bounds = ocp.Xmin_limits[:ocp.ocp.dims.nu] +u_bounds = ocp.Xmax_limits[:ocp.ocp.dims.nu] +data = qmc.scale(sample, l_bounds, u_bounds) - # Generate low-discrepancy unlabeled samples: - sampler = qmc.Halton(d=ocp.ocp.dims.nu, scramble=False) - sample = sampler.random(n=test_num) - l_bounds = ocp.Xmin_limits[:ocp.ocp.dims.nu] - u_bounds = ocp.Xmax_limits[:ocp.ocp.dims.nu] - data = qmc.scale(sample, l_bounds, u_bounds) +N = ocp.ocp.dims.N - N = ocp.ocp.dims.N +# MPC controller without terminal constraints: +with Pool(cpu_num) as p: + res = p.map(simulate, range(data.shape[0])) - # MPC controller without terminal constraints: - with Pool(cpu_num) as p: - res = p.map(simulate, range(data.shape[0])) +res_steps_traj, stats = zip(*res) - res_steps_traj, stats = zip(*res) +times = np.array([i for f in stats for i in f if i is not None]) - times = np.array([i for f in stats for i in f if i is not None]) +quant = np.quantile(times, 0.99) - quant = np.quantile(times, 0.9) - - print('iter: ', str(r)) - print('tot time: ' + str(tot_time)) - print('90 percent quantile solve time: ' + str(quant)) - print('Mean solve time: ' + str(np.mean(times))) - - tot_time -= 2*1e-2 - r += 1 +print('tot time: ' + str(tot_time)) +print('99 percent quantile solve time: ' + str(quant)) +print('Mean solve time: ' + str(np.mean(times))) print(np.array(res_steps_traj).astype(int)) @@ -176,4 +177,8 @@ def simulate(p): print('MPC standard vs MPC with receiding hard constraints') print('Percentage of initial states in which the MPC+VBOC behaves better: ' + str(better)) print('Percentage of initial states in which the MPC+VBOC behaves equal: ' + str(equal)) -print('Percentage of initial states in which the MPC+VBOC behaves worse: ' + str(worse)) \ No newline at end of file +print('Percentage of initial states in which the MPC+VBOC behaves worse: ' + str(worse)) + +np.savez('../data/results_receidinghard.npz', res_steps_term=res_steps_traj, + better=better, worse=worse, equal=equal, times=times, + dt=time_step, tot_time=tot_time) \ No newline at end of file diff --git a/VBOC/Safe MPC/receiding_hard_constraints/doublependulum_class_fixedveldir.py b/VBOC/Safe MPC/receiding_hard_constraints/doublependulum_class_fixedveldir.py deleted file mode 100644 index 0eb7c29..0000000 --- a/VBOC/Safe MPC/receiding_hard_constraints/doublependulum_class_fixedveldir.py +++ /dev/null @@ -1,277 +0,0 @@ -from acados_template import AcadosOcp, AcadosOcpSolver, AcadosSim, AcadosSimSolver -import numpy as np -from acados_template import AcadosModel -from casadi import SX, vertcat, cos, sin, fmax, norm_2 -import scipy.linalg as lin - -class OCPdoublependulum: - def __init__(self): - - # --------------------SET MODEL-------------------- - # ------------------------------------------------- - model_name = "double_pendulum_ode" - - # constants - self.m1 = 0.4 # mass of the first link [kself.g] - self.m2 = 0.4 # mass of the second link [kself.g] - self.g = 9.81 # self.gravity constant [m/s^2] - self.l1 = 0.8 # lenself.gth of the first link [m] - self.l2 = 0.8 # lenself.gth of the second link [m] - - # states - theta1 = SX.sym("theta1") - theta2 = SX.sym("theta2") - dtheta1 = SX.sym("dtheta1") - dtheta2 = SX.sym("dtheta2") - self.x = vertcat(theta1, theta2, dtheta1, dtheta2) - - # xdot - theta1_dot = SX.sym("theta1_dot") - theta2_dot = SX.sym("theta1_dot") - dtheta1_dot = SX.sym("dtheta2_dot") - dtheta2_dot = SX.sym("dtheta2_dot") - xdot = vertcat(theta1_dot, theta2_dot, dtheta1_dot, dtheta2_dot) - - # controls - C1 = SX.sym("C1") - C2 = SX.sym("C2") - u = vertcat(C1, C2) - - # parameters - p = [] - - # dynamics - f_expl = vertcat( - dtheta1, - dtheta2, - ( - self.l1**2 - * self.l2 - * self.m2 - * dtheta1**2 - * sin(-2 * theta2 + 2 * theta1) - + 2 * C2 * cos(-theta2 + theta1) * self.l1 - + 2 - * ( - self.g * sin(-2 * theta2 + theta1) * self.l1 * self.m2 / 2 - + sin(-theta2 + theta1) * dtheta2**2 * self.l1 * self.l2 * self.m2 - + self.g * self.l1 * (self.m1 + self.m2 / 2) * sin(theta1) - - C1 - ) - * self.l2 - ) - / self.l1**2 - / self.l2 - / (self.m2 * cos(-2 * theta2 + 2 * theta1) - 2 * self.m1 - self.m2), - ( - -self.g - * self.l1 - * self.l2 - * self.m2 - * (self.m1 + self.m2) - * sin(-theta2 + 2 * theta1) - - self.l1 - * self.l2**2 - * self.m2**2 - * dtheta2**2 - * sin(-2 * theta2 + 2 * theta1) - - 2 - * dtheta1**2 - * self.l1**2 - * self.l2 - * self.m2 - * (self.m1 + self.m2) - * sin(-theta2 + theta1) - + 2 * C1 * cos(-theta2 + theta1) * self.l2 * self.m2 - + self.l1 - * (self.m1 + self.m2) - * (sin(theta2) * self.g * self.l2 * self.m2 - 2 * C2) - ) - / self.l2**2 - / self.l1 - / self.m2 - / (self.m2 * cos(-2 * theta2 + 2 * theta1) - 2 * self.m1 - self.m2), - ) - - self.model = AcadosModel() - - f_impl = xdot - f_expl - - self.model.f_expl_expr = f_expl - self.model.f_impl_expr = f_impl - self.model.x = self.x - self.model.xdot = xdot - self.model.u = u - self.model.p = p - self.model.name = model_name - - -class OCPdoublependulumINIT(OCPdoublependulum): - def __init__(self, regenerate, nn_params, mean, std, safety_margin): - - # inherit initialization - super().__init__() - - # ---------------------SET OCP--------------------- - # ------------------------------------------------- - self.ocp = AcadosOcp() - - # times - self.Tf = 0.01 - self.N = int(1000 * self.Tf) - self.ocp.solver_options.tf = self.Tf - self.ocp.dims.N = self.N - - self.nx = self.model.x.size()[0] - self.nu = self.model.u.size()[0] - self.ny = self.nx + self.nu - self.ny_e = self.nx - - # cost - Q = np.diag([1e4, 1e4, 1e-4, 1e-4]) - R = np.diag([1e-4, 1e-4]) - - self.ocp.cost.W_e = Q - self.ocp.cost.W = lin.block_diag(Q, R) - - self.ocp.cost.cost_type = "LINEAR_LS" - self.ocp.cost.cost_type_e = "LINEAR_LS" - - self.ocp.cost.Vx = np.zeros((self.ny, self.nx)) - self.ocp.cost.Vx[: self.nx, :self.nx] = np.eye(self.nx) - self.ocp.cost.Vu = np.zeros((self.ny, self.nu)) - self.ocp.cost.Vu[self.nx:, :self.nu] = np.eye(self.nu) - self.ocp.cost.Vx_e = np.eye(self.nx) - - # reference - self.ocp.cost.yref = np.array([np.pi, np.pi, 0., 0., 0., 0.]) - self.ocp.cost.yref_e = np.array([np.pi, np.pi, 0., 0.]) - - # set constraints - self.Cmax = 10. - self.thetamax = np.pi / 4 + np.pi - self.thetamin = - np.pi / 4 + np.pi - self.dthetamax = 10. - - Cmax_limits = np.array([self.Cmax, self.Cmax]) - Xmax_limits = np.array([self.thetamax, self.thetamax, self.dthetamax, self.dthetamax]) - Xmin_limits = np.array([self.thetamin, self.thetamin, -self.dthetamax, -self.dthetamax]) - - self.ocp.constraints.lbu = -Cmax_limits - self.ocp.constraints.ubu = Cmax_limits - self.ocp.constraints.idxbu = np.array([0, 1]) - self.ocp.constraints.lbx = Xmin_limits - self.ocp.constraints.ubx = Xmax_limits - self.ocp.constraints.idxbx = np.array([0, 1, 2, 3]) - - self.ocp.constraints.lbx_e = Xmin_limits - self.ocp.constraints.ubx_e = Xmax_limits - self.ocp.constraints.idxbx_e = np.array([0, 1, 2, 3]) - - self.ocp.constraints.lbx_0 = Xmin_limits - self.ocp.constraints.ubx_0 = Xmax_limits - self.ocp.constraints.idxbx_0 = np.array([0, 1, 2, 3]) - - # nonlinear constraints - self.model.con_h_expr_e = self.nn_decisionfunction(nn_params, mean, std, safety_margin, self.x) - - self.ocp.constraints.lh_e = np.array([0.]) - self.ocp.constraints.uh_e = np.array([1e6]) - - self.ocp.constraints.idxsh_e = np.array([0]) - - self.ocp.cost.zl_e = np.zeros((1,)) - self.ocp.cost.zu_e = np.zeros((1,)) - self.ocp.cost.Zu_e = np.zeros((1,)) - self.ocp.cost.Zl_e = np.zeros((1,)) - - self.model.con_h_expr = self.model.con_h_expr_e - - self.ocp.constraints.lh = np.array([0.]) - self.ocp.constraints.uh = np.array([1e6]) - - self.ocp.constraints.idxsh = np.array([0]) - - self.ocp.cost.zl = np.zeros((1,)) - self.ocp.cost.zu = np.zeros((1,)) - self.ocp.cost.Zu = np.zeros((1,)) - self.ocp.cost.Zl = np.zeros((1,)) - - # ------------------------------------------------- - - # self.ocp.solver_options.nlp_solver_type = "SQP" - self.ocp.solver_options.qp_solver = "FULL_CONDENSING_HPIPM" - self.ocp.solver_options.tol = 1e-2 - self.ocp.solver_options.qp_solver_iter_max = 100 - self.ocp.solver_options.nlp_solver_max_iter = 1000 - self.ocp.solver_options.globalization = "MERIT_BACKTRACKING" - self.ocp.solver_options.alpha_reduction = 0.3 - self.ocp.solver_options.alpha_min = 1e-2 - self.ocp.solver_options.levenberg_marquardt = 1. - - # ocp model - self.ocp.model = self.model - - # solver - self.ocp_solver = AcadosOcpSolver(self.ocp, build=regenerate) - - def OCP_solve(self, x0, q_ref, x_sol_guess, u_sol_guess): - - # Reset current iterate: - self.ocp_solver.reset() - - self.ocp_solver.constraints_set(0, "lbx", x0) - self.ocp_solver.constraints_set(0, "ubx", x0) - - # Set parameters, guesses and constraints: - for i in range(self.N): - self.ocp_solver.set(i, 'x', x_sol_guess[i]) - self.ocp_solver.set(i, 'u', u_sol_guess[i]) - self.ocp_solver.cost_set(i, 'y_ref', np.array([q_ref[0],q_ref[1],0.,0.,0.,0.])) - - self.ocp_solver.set(self.N, 'x', x_sol_guess[self.N]) - self.ocp_solver.cost_set(self.N, 'y_ref', np.array([q_ref[0],q_ref[1],0.,0.])) - - # Solve the OCP: - status = self.ocp_solver.solve() - - return status - - def nn_decisionfunction(self, params, mean, std, safety_margin, x): - - vel_norm = fmax(norm_2(x[2:]), 1e-3) - - mean = vertcat(mean,mean,0.,0.) - std = vertcat(std,std,vel_norm,vel_norm) - - out = (x - mean) / std - it = 0 - - for param in params: - - param = SX(param.tolist()) - - if it % 2 == 0: - out = param @ out - else: - out = param + out - - if it == 1 or it == 3: - out = fmax(0., out) - - it += 1 - - return out*(100-safety_margin)/100 - vel_norm - - -class SYMdoublependulumINIT(OCPdoublependulum): - def __init__(self, regenerate): - - # inherit initialization - super().__init__() - - sim = AcadosSim() - sim.model = self.model - sim.solver_options.T = 1e-3 - sim.solver_options.num_stages = 4 - self.acados_integrator = AcadosSimSolver(sim, build=regenerate) diff --git a/VBOC/Safe MPC/receiding_soft_constraints/3dof_sym.py b/VBOC/Safe MPC/receiding_soft_constraints/3dof_sym.py new file mode 100644 index 0000000..34ac4c6 --- /dev/null +++ b/VBOC/Safe MPC/receiding_soft_constraints/3dof_sym.py @@ -0,0 +1,178 @@ +import os +import sys +sys.path.insert(1, os.getcwd() + '/..') + +import numpy as np +import time +import torch +from triplependulum_class_vboc import OCPtriplependulumSoftTraj, SYMtriplependulum +from my_nn import NeuralNetDIR +from multiprocessing import Pool +import scipy.linalg as lin +from scipy.stats import qmc + +import warnings +warnings.filterwarnings("ignore") + +def simulate(p): + + x0 = np.zeros((ocp.ocp.dims.nx,)) + x0[:ocp.ocp.dims.nu] = data[p] + + simX = np.ndarray((tot_steps + 1, ocp.ocp.dims.nx)) + simU = np.ndarray((tot_steps, ocp.ocp.dims.nu)) + simX[0] = np.copy(x0) + + times = [None] * tot_steps + + failed_iter = -1 + + # Guess: + x_sol_guess = x_sol_guess_vec[p] + u_sol_guess = u_sol_guess_vec[p] + + for f in range(tot_steps): + + receiding = 0 + + if failed_iter == 0 and f > 0: + for i in range(1,N+1): + if ocp.nn_decisionfunction_conservative(params, mean, std, safety_margin, ocp.ocp_solver.get(i, 'x')) >= 0.: + receiding = N - i + 1 + + receiding_iter = N-failed_iter-receiding + Q = np.diag([1e-4+pow(10,receiding_iter/N*4), 1e-4, 1e-4, 1e-4, 1e-4, 1e-4]) + R = np.diag([1e-4, 1e-4, 1e-4]) + + for i in range(N): + ocp.ocp_solver.cost_set(i, "W", lin.block_diag(Q, R)) + + ocp.ocp_solver.cost_set(N, "W", Q) + + for i in range(N+1): + if i == receiding_iter: + ocp.ocp_solver.cost_set(i, "Zl", 1e8*np.ones((1,))) + elif i == N and receiding_iter != N: + ocp.ocp_solver.cost_set(i, "Zl", pow(10, (1-receiding_iter/N)*4)*np.ones((1,))) + else: + ocp.ocp_solver.cost_set(i, "Zl", np.zeros((1,))) + + status = ocp.OCP_solve(simX[f], x_sol_guess, u_sol_guess, ocp.thetamax-0.05, joint_vec[f]) + times[f] = ocp.ocp_solver.get_stats('time_tot') + + if status != 0: + + if failed_iter >= N-1 or failed_iter < 0: + break + + failed_iter += 1 + + simU[f] = u_sol_guess[0] + + for i in range(N-1): + x_sol_guess[i] = np.copy(x_sol_guess[i+1]) + u_sol_guess[i] = np.copy(u_sol_guess[i+1]) + + x_sol_guess[N-1] = np.copy(x_sol_guess[N]) + + else: + failed_iter = 0 + + simU[f] = ocp.ocp_solver.get(0, "u") + + for i in range(N-1): + x_sol_guess[i] = ocp.ocp_solver.get(i+1, "x") + u_sol_guess[i] = ocp.ocp_solver.get(i+1, "u") + + x_sol_guess[N-1] = ocp.ocp_solver.get(N, "x") + x_sol_guess[N] = np.copy(x_sol_guess[N-1]) + u_sol_guess[N-1] = np.copy(u_sol_guess[N-2]) + + simU[f] += noise_vec[f] + + sim.acados_integrator.set("u", simU[f]) + sim.acados_integrator.set("x", simX[f]) + status = sim.acados_integrator.solve() + simX[f+1] = sim.acados_integrator.get("x") + + return f, times + +start_time = time.time() + +# Pytorch params: +device = torch.device("cpu") + +model = NeuralNetDIR(6, 500, 1).to(device) +model.load_state_dict(torch.load('../model_3dof_vboc')) +mean = torch.load('../mean_3dof_vboc') +std = torch.load('../std_3dof_vboc') +safety_margin = 2.0 + +cpu_num = 1 +test_num = 100 + +time_step = 5*1e-3 +tot_time = 0.16 +tot_steps = 100 + +regenerate = True + +x_sol_guess_vec = np.load('../x_sol_guess.npy') +u_sol_guess_vec = np.load('../u_sol_guess.npy') +noise_vec = np.load('../noise.npy') +noise_vec = np.load('../selected_joint.npy') + +params = list(model.parameters()) + +ocp = OCPtriplependulumSoftTraj("SQP_RTI", time_step, tot_time, params, mean, std, safety_margin, regenerate) +sim = SYMtriplependulum(time_step, tot_time, regenerate) + +# Generate low-discrepancy unlabeled samples: +sampler = qmc.Halton(d=ocp.ocp.dims.nu, scramble=False) +sample = sampler.random(n=test_num) +l_bounds = ocp.Xmin_limits[:ocp.ocp.dims.nu] +u_bounds = ocp.Xmax_limits[:ocp.ocp.dims.nu] +data = qmc.scale(sample, l_bounds, u_bounds) + +N = ocp.ocp.dims.N + +# MPC controller without terminal constraints: +with Pool(cpu_num) as p: + res = p.map(simulate, range(data.shape[0])) + +res_steps_traj, stats = zip(*res) + +times = np.array([i for f in stats for i in f if i is not None]) + +quant = np.quantile(times, 0.99) + +print('tot time: ' + str(tot_time)) +print('99 percent quantile solve time: ' + str(quant)) +print('Mean solve time: ' + str(np.mean(times))) + +print(np.array(res_steps_traj).astype(int)) + +np.save('res_steps_receiding.npy', np.array(res_steps_traj).astype(int)) + +res_steps = np.load('../no_constraints/res_steps_noconstr.npy') + +better = 0 +equal = 0 +worse = 0 + +for i in range(res_steps.shape[0]): + if res_steps_traj[i]-res_steps[i]>0: + better += 1 + elif res_steps_traj[i]-res_steps[i]==0: + equal += 1 + else: + worse += 1 + +print('MPC standard vs MPC with receiding hard constraints') +print('Percentage of initial states in which the MPC+VBOC behaves better: ' + str(better)) +print('Percentage of initial states in which the MPC+VBOC behaves equal: ' + str(equal)) +print('Percentage of initial states in which the MPC+VBOC behaves worse: ' + str(worse)) + +np.savez('../data/results_receidingsoft.npz', res_steps_term=res_steps_traj, + better=better, worse=worse, equal=equal, times=times, + dt=time_step, tot_time=tot_time) \ No newline at end of file diff --git a/VBOC/Safe MPC/soft_terminal_constraints/3dof_sym.py b/VBOC/Safe MPC/soft_terminal_constraints/3dof_sym.py new file mode 100644 index 0000000..29ac97b --- /dev/null +++ b/VBOC/Safe MPC/soft_terminal_constraints/3dof_sym.py @@ -0,0 +1,164 @@ +import os +import sys +sys.path.insert(1, os.getcwd() + '/..') + +import numpy as np +import time +import torch +from triplependulum_class_vboc import OCPtriplependulumSoftTerm, SYMtriplependulum +from my_nn import NeuralNetDIR +from multiprocessing import Pool +from scipy.stats import qmc + +import warnings +warnings.filterwarnings("ignore") + +def simulate(p): + + x0 = np.zeros((ocp.ocp.dims.nx,)) + x0[:ocp.ocp.dims.nu] = data[p] + + simX = np.ndarray((tot_steps + 1, ocp.ocp.dims.nx)) + simU = np.ndarray((tot_steps, ocp.ocp.dims.nu)) + simX[0] = np.copy(x0) + + times = [None] * tot_steps + + failed_iter = -1 + + # Guess: + x_sol_guess = x_sol_guess_vec[p] + u_sol_guess = u_sol_guess_vec[p] + + for f in range(tot_steps): + + status = ocp.OCP_solve(simX[f], x_sol_guess, u_sol_guess, ocp.thetamax-0.05, joint_vec[f]) + times[f] = ocp.ocp_solver.get_stats('time_tot') + + if status != 0: + + if failed_iter >= N-1 or failed_iter < 0: + break + + failed_iter += 1 + + simU[f] = u_sol_guess[0] + + for i in range(N-1): + x_sol_guess[i] = np.copy(x_sol_guess[i+1]) + u_sol_guess[i] = np.copy(u_sol_guess[i+1]) + + x_sol_guess[N-1] = np.copy(x_sol_guess[N]) + + else: + failed_iter = 0 + + simU[f] = ocp.ocp_solver.get(0, "u") + + for i in range(N-1): + x_sol_guess[i] = ocp.ocp_solver.get(i+1, "x") + u_sol_guess[i] = ocp.ocp_solver.get(i+1, "u") + + x_sol_guess[N-1] = ocp.ocp_solver.get(N, "x") + x_sol_guess[N] = np.copy(x_sol_guess[N-1]) + u_sol_guess[N-1] = np.copy(u_sol_guess[N-2]) + + simU[f] += noise_vec[f] + + sim.acados_integrator.set("u", simU[f]) + sim.acados_integrator.set("x", simX[f]) + status = sim.acados_integrator.solve() + simX[f+1] = sim.acados_integrator.get("x") + + return f, times + +start_time = time.time() + +# Pytorch params: +device = torch.device("cpu") + +model = NeuralNetDIR(6, 500, 1).to(device) +model.load_state_dict(torch.load('../model_3dof_vboc')) +mean = torch.load('../mean_3dof_vboc') +std = torch.load('../std_3dof_vboc') +safety_margin = 2.0 + +cpu_num = 1 +test_num = 100 + +time_step = 5*1e-3 +tot_time = 0.16 +tot_steps = 100 + +regenerate = True + +x_sol_guess_vec = np.load('../x_sol_guess.npy') +u_sol_guess_vec = np.load('../u_sol_guess.npy') +noise_vec = np.load('../noise.npy') +noise_vec = np.load('../selected_joint.npy') + +quant = 10. +r = 1 + +while quant > 4*1e-3: + + ocp = OCPtriplependulumSoftTerm("SQP_RTI", time_step, tot_time, list(model.parameters()), mean, std, safety_margin, regenerate) + sim = SYMtriplependulum(time_step, tot_time, True) + + N = ocp.ocp.dims.N + + # Generate low-discrepancy unlabeled samples: + sampler = qmc.Halton(d=ocp.ocp.dims.nu, scramble=False) + sample = sampler.random(n=test_num) + l_bounds = ocp.Xmin_limits[:ocp.ocp.dims.nu] + u_bounds = ocp.Xmax_limits[:ocp.ocp.dims.nu] + data = qmc.scale(sample, l_bounds, u_bounds) + + ocp.ocp_solver.cost_set(N, "Zl", 1e4*np.ones((1,))) + + # MPC controller without terminal constraints: + with Pool(cpu_num) as p: + res = p.map(simulate, range(data.shape[0])) + + res_steps_term, stats = zip(*res) + + times = np.array([i for f in stats for i in f if i is not None]) + + quant = np.quantile(times, 0.9) + + print('iter: ', str(r)) + print('tot time: ' + str(tot_time)) + print('99 percent quantile solve time: ' + str(quant)) + print('Mean solve time: ' + str(np.mean(times))) + + tot_time -= time_step + r += 1 + + print(np.array(res_steps_term).astype(int)) + + del ocp + +np.save('res_steps_hardterm.npy', np.array(res_steps_term).astype(int)) + +res_steps = np.load('../no_constraints/res_steps_noconstr.npy') + +better = 0 +equal = 0 +worse = 0 + +for i in range(res_steps.shape[0]): + if res_steps_term[i]-res_steps[i]>0: + better += 1 + elif res_steps_term[i]-res_steps[i]==0: + equal += 1 + else: + worse += 1 + +print('MPC standard vs MPC with hard term constraints') +print('Percentage of initial states in which the MPC+VBOC behaves better: ' + str(better)) +print('Percentage of initial states in which the MPC+VBOC behaves equal: ' + str(equal)) +print('Percentage of initial states in which the MPC+VBOC behaves worse: ' + str(worse)) + +np.savez('../data/results_softterm.npz', res_steps_term=res_steps_term, + better=better, worse=worse, equal=equal, times=times, + dt=time_step, tot_time=tot_time) \ No newline at end of file diff --git a/VBOC/Safe MPC/soft_traj_constraints/2dof_sym.py b/VBOC/Safe MPC/soft_traj_constraints/2dof_sym.py deleted file mode 100644 index b939c35..0000000 --- a/VBOC/Safe MPC/soft_traj_constraints/2dof_sym.py +++ /dev/null @@ -1,149 +0,0 @@ -import numpy as np -import matplotlib.pyplot as plt -from numpy.linalg import norm as norm -import time -from doublependulum_class_fixedveldir import OCPdoublependulumINIT, SYMdoublependulumINIT -import warnings -warnings.filterwarnings("ignore") -import torch -import torch.nn as nn -from my_nn import NeuralNetRegression -import math -from multiprocessing import Pool -import random -from scipy.stats import qmc - -def simulate(p): - x0 = np.array([data[p,0], data[p,1], 1e-10, 1e-10]) - - simX = np.ndarray((tot_steps + 1, ocp.ocp.dims.nx)) - simU = np.ndarray((tot_steps, ocp.ocp.dims.nu)) - simX[0] = np.copy(x0) - - failed_iter = -1 - - tot_time = 0 - calls = 0 - - # Guess: - x_sol_guess = np.full((ocp.N+1, ocp.ocp.dims.nx), x0) - u_sol_guess = np.full((ocp.N, ocp.ocp.dims.nu), np.array([ocp.g*ocp.l1*(ocp.m1+ocp.m2)*math.sin(x0[0]),ocp.g*ocp.l2*ocp.m2*math.sin(x0[1])])) - - for f in range(tot_steps): - - temp = time.time() - status = ocp.OCP_solve(simX[f], q_ref, x_sol_guess, u_sol_guess) - tot_time += time.time() - temp - calls += 1 - - if status != 0: - - if failed_iter >= ocp.N-1 or failed_iter < 0: - break - - failed_iter += 1 - - simU[f] = u_sol_guess[0] - - for i in range(ocp.N-1): - x_sol_guess[i] = np.copy(x_sol_guess[i+1]) - u_sol_guess[i] = np.copy(u_sol_guess[i+1]) - - x_sol_guess[ocp.N-1] = np.copy(x_sol_guess[ocp.N]) - u_sol_guess[ocp.N-1] = [ocp.g*ocp.l1*(ocp.m1+ocp.m2)*math.sin(x_sol_guess[ocp.N-1,0]),ocp.g*ocp.l2*ocp.m2*math.sin(x_sol_guess[ocp.N-1,1])] - - else: - failed_iter = 0 - - simU[f] = ocp.ocp_solver.get(0, "u") - - for i in range(ocp.N-1): - x_sol_guess[i] = ocp.ocp_solver.get(i+1, "x") - u_sol_guess[i] = ocp.ocp_solver.get(i+1, "u") - - x_sol_guess[ocp.N-1] = ocp.ocp_solver.get(ocp.N, "x") - x_sol_guess[ocp.N] = np.copy(x_sol_guess[ocp.N-1]) - u_sol_guess[ocp.N-1] = [ocp.g*ocp.l1*(ocp.m1+ocp.m2)*math.sin(x_sol_guess[ocp.N-1,0]),ocp.g*ocp.l2*ocp.m2*math.sin(x_sol_guess[ocp.N-1,1])] - - noise = np.array([(ocp.thetamax-ocp.thetamin)*noise_intensity*random.uniform(-1, 1), (ocp.thetamax-ocp.thetamin)*2*noise_intensity*random.uniform(-1, 1), ocp.dthetamax*2*noise_intensity*random.uniform(-1, 1), ocp.dthetamax*2*noise_intensity*random.uniform(-1, 1)]) - - sim.acados_integrator.set("u", simU[f]) - sim.acados_integrator.set("x", simX[f]) - status = sim.acados_integrator.solve() - simX[f+1] = sim.acados_integrator.get("x") + noise - simU[f] = u_sol_guess[0] - - tm = tot_time/calls - - return f, tm - -start_time = time.time() - -# Pytorch params: -device = torch.device("cpu") - -model_dir = NeuralNetRegression(4, 300, 1).to(device) -model_dir.load_state_dict(torch.load('../model_2dof_vboc')) -mean_dir = torch.load('../mean_2dof_vboc') -std_dir = torch.load('../std_2dof_vboc') -safety_margin = 2.0 - -noise_intensity = 0 - -ocp = OCPdoublependulumINIT(True, list(model_dir.parameters()), mean_dir, std_dir, safety_margin) -sim = SYMdoublependulumINIT(True) - -# Generate low-discrepancy unlabeled samples: -sampler = qmc.Halton(d=2, scramble=False) -sample = sampler.random(n=pow(10, 2)) -q_max = ocp.thetamax -q_min = ocp.thetamin -l_bounds = [q_min, q_min] -u_bounds = [q_max, q_max] -data = qmc.scale(sample, l_bounds, u_bounds) - -tot_steps = 100 -q_ref = np.array([(ocp.thetamax+ocp.thetamin)/2, ocp.thetamax - 0.05]) - -# MPC controller wit soft constraints: -for i in range(1,ocp.N): - ocp.ocp_solver.cost_set(i, "Zl", 1e6*np.ones((1,))) -ocp.ocp_solver.cost_set(ocp.N, "Zl", 1e6*np.ones((1,))) - -cpu_num = 30 -with Pool(cpu_num) as p: - res = np.array(p.map(simulate, range(data.shape[0]))) - -res_steps_term, stats = zip(*res) - -print('Mean solve time: ' + str(np.mean(stats))) - -res_steps = np.load('../no_constraints/res_steps_noconstr.npy') - -better = 0 -equal = 0 -worse = 0 - -plt.figure() -for i in range(res_steps.shape[0]): - if res_steps_term[i]-res_steps[i]>0: - plt.plot(data[i,0], data[i,1],color='darkblue',marker="*",markersize=9,zorder=1,linestyle="None") - better += 1 - elif res_steps_term[i]-res_steps[i]==0: - plt.plot(data[i,0], data[i,1],color='darkgrey',marker=".",markersize=7,zorder=1,linestyle="None") - equal += 1 - else: - plt.plot(data[i,0], data[i,1],color='darkred',marker="X",markersize=7,zorder=1,linestyle="None") - worse += 1 -plt.title('MPC standard vs MPC with soft traj constraints') - -print('MPC standard vs MPC with soft traj constraints') -print('Percentage of initial states in which the MPC+VBOC behaves better: ' + str(better)) -print('Percentage of initial states in which the MPC+VBOC behaves equal: ' + str(equal)) -print('Percentage of initial states in which the MPC+VBOC behaves worse: ' + str(worse)) - -print(np.array(res_steps_term).astype(int)) - -np.save('res_steps_softtrajconstr.npy', np.array(res_steps_term).astype(int)) - -plt.show() \ No newline at end of file diff --git a/VBOC/Safe MPC/soft_traj_constraints/3dof_sym.py b/VBOC/Safe MPC/soft_traj_constraints/3dof_sym.py index d726a0f..0de2db9 100644 --- a/VBOC/Safe MPC/soft_traj_constraints/3dof_sym.py +++ b/VBOC/Safe MPC/soft_traj_constraints/3dof_sym.py @@ -32,9 +32,8 @@ def simulate(p): for f in range(tot_steps): - temp = time.time() - status = ocp.OCP_solve(simX[f], x_sol_guess, u_sol_guess) - times[f] = time.time() - temp + status = ocp.OCP_solve(simX[f], x_sol_guess, u_sol_guess, ocp.thetamax-0.05, joint_vec[f]) + times[f] = ocp.ocp_solver.get_stats('time_tot') if status != 0: @@ -64,11 +63,12 @@ def simulate(p): x_sol_guess[N] = np.copy(x_sol_guess[N-1]) u_sol_guess[N-1] = np.copy(u_sol_guess[N-2]) + simU[f] += noise_vec[f] + sim.acados_integrator.set("u", simU[f]) sim.acados_integrator.set("x", simX[f]) status = sim.acados_integrator.solve() simX[f+1] = sim.acados_integrator.get("x") - simU[f] = u_sol_guess[0] return f, times @@ -86,19 +86,21 @@ def simulate(p): cpu_num = 1 test_num = 100 -time_step = 4*1e-3 -tot_time = 0.16 #0.1 0.018s, 0.05 0.005s, 0.045 0.0046s, 0.04 0.0043 +time_step = 5*1e-3 +tot_time = 0.16 tot_steps = 100 regenerate = True x_sol_guess_vec = np.load('../x_sol_guess.npy') u_sol_guess_vec = np.load('../u_sol_guess.npy') +noise_vec = np.load('../noise.npy') +noise_vec = np.load('../selected_joint.npy') quant = 10. r = 1 -while quant > 3*1e-3: +while quant > 4*1e-3: ocp = OCPtriplependulumSoftTraj("SQP_RTI", time_step, tot_time, list(model.parameters()), mean, std, safety_margin, regenerate) sim = SYMtriplependulum(time_step, tot_time, regenerate) @@ -124,19 +126,18 @@ def simulate(p): times = np.array([i for f in stats for i in f if i is not None]) - quant = np.quantile(times, 0.9) - - print('90 percent quantile solve time: ' + str(np.quantile(times, 0.9))) - print('Mean solve time: ' + str(np.mean(times))) + quant = np.quantile(times, 0.99) print('iter: ', str(r)) print('tot time: ' + str(tot_time)) - print('90 percent quantile solve time: ' + str(quant)) + print('99 percent quantile solve time: ' + str(quant)) print('Mean solve time: ' + str(np.mean(times))) - tot_time -= 2*1e-2 + tot_time -= time_step r += 1 + del ocp + print(np.array(res_steps_traj).astype(int)) np.save('res_steps_softtraj.npy', np.array(res_steps_traj).astype(int)) @@ -158,4 +159,8 @@ def simulate(p): print('MPC standard vs MPC with soft traj constraints') print('Percentage of initial states in which the MPC+VBOC behaves better: ' + str(better)) print('Percentage of initial states in which the MPC+VBOC behaves equal: ' + str(equal)) -print('Percentage of initial states in which the MPC+VBOC behaves worse: ' + str(worse)) \ No newline at end of file +print('Percentage of initial states in which the MPC+VBOC behaves worse: ' + str(worse)) + +np.savez('../data/results_softtraj.npz', res_steps_term=res_steps_traj, + better=better, worse=worse, equal=equal, times=times, + dt=time_step, tot_time=tot_time) \ No newline at end of file diff --git a/VBOC/Safe MPC/soft_traj_constraints/doublependulum_class_fixedveldir.py b/VBOC/Safe MPC/soft_traj_constraints/doublependulum_class_fixedveldir.py deleted file mode 100644 index 0eb7c29..0000000 --- a/VBOC/Safe MPC/soft_traj_constraints/doublependulum_class_fixedveldir.py +++ /dev/null @@ -1,277 +0,0 @@ -from acados_template import AcadosOcp, AcadosOcpSolver, AcadosSim, AcadosSimSolver -import numpy as np -from acados_template import AcadosModel -from casadi import SX, vertcat, cos, sin, fmax, norm_2 -import scipy.linalg as lin - -class OCPdoublependulum: - def __init__(self): - - # --------------------SET MODEL-------------------- - # ------------------------------------------------- - model_name = "double_pendulum_ode" - - # constants - self.m1 = 0.4 # mass of the first link [kself.g] - self.m2 = 0.4 # mass of the second link [kself.g] - self.g = 9.81 # self.gravity constant [m/s^2] - self.l1 = 0.8 # lenself.gth of the first link [m] - self.l2 = 0.8 # lenself.gth of the second link [m] - - # states - theta1 = SX.sym("theta1") - theta2 = SX.sym("theta2") - dtheta1 = SX.sym("dtheta1") - dtheta2 = SX.sym("dtheta2") - self.x = vertcat(theta1, theta2, dtheta1, dtheta2) - - # xdot - theta1_dot = SX.sym("theta1_dot") - theta2_dot = SX.sym("theta1_dot") - dtheta1_dot = SX.sym("dtheta2_dot") - dtheta2_dot = SX.sym("dtheta2_dot") - xdot = vertcat(theta1_dot, theta2_dot, dtheta1_dot, dtheta2_dot) - - # controls - C1 = SX.sym("C1") - C2 = SX.sym("C2") - u = vertcat(C1, C2) - - # parameters - p = [] - - # dynamics - f_expl = vertcat( - dtheta1, - dtheta2, - ( - self.l1**2 - * self.l2 - * self.m2 - * dtheta1**2 - * sin(-2 * theta2 + 2 * theta1) - + 2 * C2 * cos(-theta2 + theta1) * self.l1 - + 2 - * ( - self.g * sin(-2 * theta2 + theta1) * self.l1 * self.m2 / 2 - + sin(-theta2 + theta1) * dtheta2**2 * self.l1 * self.l2 * self.m2 - + self.g * self.l1 * (self.m1 + self.m2 / 2) * sin(theta1) - - C1 - ) - * self.l2 - ) - / self.l1**2 - / self.l2 - / (self.m2 * cos(-2 * theta2 + 2 * theta1) - 2 * self.m1 - self.m2), - ( - -self.g - * self.l1 - * self.l2 - * self.m2 - * (self.m1 + self.m2) - * sin(-theta2 + 2 * theta1) - - self.l1 - * self.l2**2 - * self.m2**2 - * dtheta2**2 - * sin(-2 * theta2 + 2 * theta1) - - 2 - * dtheta1**2 - * self.l1**2 - * self.l2 - * self.m2 - * (self.m1 + self.m2) - * sin(-theta2 + theta1) - + 2 * C1 * cos(-theta2 + theta1) * self.l2 * self.m2 - + self.l1 - * (self.m1 + self.m2) - * (sin(theta2) * self.g * self.l2 * self.m2 - 2 * C2) - ) - / self.l2**2 - / self.l1 - / self.m2 - / (self.m2 * cos(-2 * theta2 + 2 * theta1) - 2 * self.m1 - self.m2), - ) - - self.model = AcadosModel() - - f_impl = xdot - f_expl - - self.model.f_expl_expr = f_expl - self.model.f_impl_expr = f_impl - self.model.x = self.x - self.model.xdot = xdot - self.model.u = u - self.model.p = p - self.model.name = model_name - - -class OCPdoublependulumINIT(OCPdoublependulum): - def __init__(self, regenerate, nn_params, mean, std, safety_margin): - - # inherit initialization - super().__init__() - - # ---------------------SET OCP--------------------- - # ------------------------------------------------- - self.ocp = AcadosOcp() - - # times - self.Tf = 0.01 - self.N = int(1000 * self.Tf) - self.ocp.solver_options.tf = self.Tf - self.ocp.dims.N = self.N - - self.nx = self.model.x.size()[0] - self.nu = self.model.u.size()[0] - self.ny = self.nx + self.nu - self.ny_e = self.nx - - # cost - Q = np.diag([1e4, 1e4, 1e-4, 1e-4]) - R = np.diag([1e-4, 1e-4]) - - self.ocp.cost.W_e = Q - self.ocp.cost.W = lin.block_diag(Q, R) - - self.ocp.cost.cost_type = "LINEAR_LS" - self.ocp.cost.cost_type_e = "LINEAR_LS" - - self.ocp.cost.Vx = np.zeros((self.ny, self.nx)) - self.ocp.cost.Vx[: self.nx, :self.nx] = np.eye(self.nx) - self.ocp.cost.Vu = np.zeros((self.ny, self.nu)) - self.ocp.cost.Vu[self.nx:, :self.nu] = np.eye(self.nu) - self.ocp.cost.Vx_e = np.eye(self.nx) - - # reference - self.ocp.cost.yref = np.array([np.pi, np.pi, 0., 0., 0., 0.]) - self.ocp.cost.yref_e = np.array([np.pi, np.pi, 0., 0.]) - - # set constraints - self.Cmax = 10. - self.thetamax = np.pi / 4 + np.pi - self.thetamin = - np.pi / 4 + np.pi - self.dthetamax = 10. - - Cmax_limits = np.array([self.Cmax, self.Cmax]) - Xmax_limits = np.array([self.thetamax, self.thetamax, self.dthetamax, self.dthetamax]) - Xmin_limits = np.array([self.thetamin, self.thetamin, -self.dthetamax, -self.dthetamax]) - - self.ocp.constraints.lbu = -Cmax_limits - self.ocp.constraints.ubu = Cmax_limits - self.ocp.constraints.idxbu = np.array([0, 1]) - self.ocp.constraints.lbx = Xmin_limits - self.ocp.constraints.ubx = Xmax_limits - self.ocp.constraints.idxbx = np.array([0, 1, 2, 3]) - - self.ocp.constraints.lbx_e = Xmin_limits - self.ocp.constraints.ubx_e = Xmax_limits - self.ocp.constraints.idxbx_e = np.array([0, 1, 2, 3]) - - self.ocp.constraints.lbx_0 = Xmin_limits - self.ocp.constraints.ubx_0 = Xmax_limits - self.ocp.constraints.idxbx_0 = np.array([0, 1, 2, 3]) - - # nonlinear constraints - self.model.con_h_expr_e = self.nn_decisionfunction(nn_params, mean, std, safety_margin, self.x) - - self.ocp.constraints.lh_e = np.array([0.]) - self.ocp.constraints.uh_e = np.array([1e6]) - - self.ocp.constraints.idxsh_e = np.array([0]) - - self.ocp.cost.zl_e = np.zeros((1,)) - self.ocp.cost.zu_e = np.zeros((1,)) - self.ocp.cost.Zu_e = np.zeros((1,)) - self.ocp.cost.Zl_e = np.zeros((1,)) - - self.model.con_h_expr = self.model.con_h_expr_e - - self.ocp.constraints.lh = np.array([0.]) - self.ocp.constraints.uh = np.array([1e6]) - - self.ocp.constraints.idxsh = np.array([0]) - - self.ocp.cost.zl = np.zeros((1,)) - self.ocp.cost.zu = np.zeros((1,)) - self.ocp.cost.Zu = np.zeros((1,)) - self.ocp.cost.Zl = np.zeros((1,)) - - # ------------------------------------------------- - - # self.ocp.solver_options.nlp_solver_type = "SQP" - self.ocp.solver_options.qp_solver = "FULL_CONDENSING_HPIPM" - self.ocp.solver_options.tol = 1e-2 - self.ocp.solver_options.qp_solver_iter_max = 100 - self.ocp.solver_options.nlp_solver_max_iter = 1000 - self.ocp.solver_options.globalization = "MERIT_BACKTRACKING" - self.ocp.solver_options.alpha_reduction = 0.3 - self.ocp.solver_options.alpha_min = 1e-2 - self.ocp.solver_options.levenberg_marquardt = 1. - - # ocp model - self.ocp.model = self.model - - # solver - self.ocp_solver = AcadosOcpSolver(self.ocp, build=regenerate) - - def OCP_solve(self, x0, q_ref, x_sol_guess, u_sol_guess): - - # Reset current iterate: - self.ocp_solver.reset() - - self.ocp_solver.constraints_set(0, "lbx", x0) - self.ocp_solver.constraints_set(0, "ubx", x0) - - # Set parameters, guesses and constraints: - for i in range(self.N): - self.ocp_solver.set(i, 'x', x_sol_guess[i]) - self.ocp_solver.set(i, 'u', u_sol_guess[i]) - self.ocp_solver.cost_set(i, 'y_ref', np.array([q_ref[0],q_ref[1],0.,0.,0.,0.])) - - self.ocp_solver.set(self.N, 'x', x_sol_guess[self.N]) - self.ocp_solver.cost_set(self.N, 'y_ref', np.array([q_ref[0],q_ref[1],0.,0.])) - - # Solve the OCP: - status = self.ocp_solver.solve() - - return status - - def nn_decisionfunction(self, params, mean, std, safety_margin, x): - - vel_norm = fmax(norm_2(x[2:]), 1e-3) - - mean = vertcat(mean,mean,0.,0.) - std = vertcat(std,std,vel_norm,vel_norm) - - out = (x - mean) / std - it = 0 - - for param in params: - - param = SX(param.tolist()) - - if it % 2 == 0: - out = param @ out - else: - out = param + out - - if it == 1 or it == 3: - out = fmax(0., out) - - it += 1 - - return out*(100-safety_margin)/100 - vel_norm - - -class SYMdoublependulumINIT(OCPdoublependulum): - def __init__(self, regenerate): - - # inherit initialization - super().__init__() - - sim = AcadosSim() - sim.model = self.model - sim.solver_options.T = 1e-3 - sim.solver_options.num_stages = 4 - self.acados_integrator = AcadosSimSolver(sim, build=regenerate) diff --git a/VBOC/Safe MPC/triplependulum_class_vboc.py b/VBOC/Safe MPC/triplependulum_class_vboc.py index c6ec193..b9ab6d9 100644 --- a/VBOC/Safe MPC/triplependulum_class_vboc.py +++ b/VBOC/Safe MPC/triplependulum_class_vboc.py @@ -106,11 +106,11 @@ def __init__(self, nlp_solver_type, time_step, tot_time): self.ny_e = self.nx # cost - Q = np.diag([1e4, 1e-4, 1e-4, 1e-4, 1e-4, 1e-4]) - R = np.diag([1e-4, 1e-4, 1e-4]) + self.Q = np.diag([1e-4, 1e-4, 1e-4, 1e-4, 1e-4, 1e-4]) + self.R = np.diag([1e-4, 1e-4, 1e-4]) - self.ocp.cost.W_e = Q - self.ocp.cost.W = lin.block_diag(Q, R) + self.ocp.cost.W_e = self.Q + self.ocp.cost.W = lin.block_diag(self.Q, self.R) self.ocp.cost.cost_type = "LINEAR_LS" self.ocp.cost.cost_type_e = "LINEAR_LS" @@ -127,9 +127,11 @@ def __init__(self, nlp_solver_type, time_step, tot_time): self.thetamin = - np.pi / 4 + np.pi self.dthetamax = 10. + self.yref = np.array([np.pi,np.pi,np.pi,0.,0.,0.,0.,0.,0.]) + # reference - self.ocp.cost.yref = np.array([self.thetamax-0.05,np.pi,np.pi,0.,0.,0.,0.,0.,0.]) - self.ocp.cost.yref_e = np.array([self.thetamax-0.05,np.pi,np.pi,0.,0.,0.,]) + self.ocp.cost.yref = self.yref + self.ocp.cost.yref_e = self.yref[:self.nx] self.Cmax_limits = np.array([self.Cmax, self.Cmax, self.Cmax]) self.Cmin_limits = np.array([-self.Cmax, -self.Cmax, -self.Cmax]) @@ -160,7 +162,7 @@ def __init__(self, nlp_solver_type, time_step, tot_time): self.ocp.solver_options.alpha_min = 1e-2 self.ocp.solver_options.levenberg_marquardt = 1e-2 - def OCP_solve(self, x0, x_sol_guess, u_sol_guess): + def OCP_solve(self, x0, x_sol_guess, u_sol_guess, ref, joint): # Reset current iterate: self.ocp_solver.reset() @@ -168,12 +170,22 @@ def OCP_solve(self, x0, x_sol_guess, u_sol_guess): self.ocp_solver.constraints_set(0, "lbx", x0) self.ocp_solver.constraints_set(0, "ubx", x0) + yref = self.yref + yref[joint] = ref + Q = self.Q + Q[joint] = 1e4 + W = lin.block_diag(Q, self.R) + # Set parameters, guesses and constraints: for i in range(self.ocp.dims.N): self.ocp_solver.set(i, 'x', x_sol_guess[i]) self.ocp_solver.set(i, 'u', u_sol_guess[i]) + self.ocp_solver.cost_set(i, 'yref', yref, api='new') + self.ocp_solver.cost_set(i, 'W', W, api='new') self.ocp_solver.set(self.ocp.dims.N, 'x', x_sol_guess[self.ocp.dims.N]) + self.ocp_solver.cost_set(self.ocp.dims.N, 'yref', yref[:self.ocp.dims.nx], api='new') + self.ocp_solver.cost_set(self.ocp.dims.N, 'W', Q, api='new') # Solve the OCP: status = self.ocp_solver.solve() @@ -237,7 +249,109 @@ def nn_decisionfunction(self, params, mean, std, x): it += 1 return out - vel_norm - + + +class OCPtriplependulumHardTraj(OCPtriplependulum): + def __init__(self, nlp_solver_type, time_step, tot_time, nn_params, mean, std, regenerate): + + # inherit initialization + super().__init__(nlp_solver_type, time_step, tot_time) + + # nonlinear constraints + self.model.con_h_expr_e = self.nn_decisionfunction(nn_params, mean, std, self.x) + + self.ocp.constraints.lh_e = np.array([0.]) + self.ocp.constraints.uh_e = np.array([1e6]) + + self.model.con_h_expr = self.model.con_h_expr_e + + self.ocp.constraints.lh = np.array([0.]) + self.ocp.constraints.uh = np.array([1e6]) + + # ocp model + self.ocp.model = self.model + + # solver + self.ocp_solver = AcadosOcpSolver(self.ocp, build=regenerate) + + def nn_decisionfunction(self, params, mean, std, x): + + vel_norm = fmax(norm_2(x[2:]), 1e-3) + + mean = vertcat(mean,mean,mean,0.,0.,0.) + std = vertcat(std,std,std,vel_norm,vel_norm,vel_norm) + + out = (x - mean) / std + it = 0 + + for param in params: + + param = SX(param.tolist()) + + if it % 2 == 0: + out = param @ out + else: + out = param + out + + if it == 1 or it == 3: + out = fmax(0., out) + + it += 1 + + return out - vel_norm + + +class OCPtriplependulumSoftTerm(OCPtriplependulum): + def __init__(self, nlp_solver_type, time_step, tot_time, nn_params, mean, std, safety_margin, regenerate): + + # inherit initialization + super().__init__(nlp_solver_type, time_step, tot_time) + + # nonlinear constraints + self.model.con_h_expr_e = self.nn_decisionfunction_conservative(nn_params, mean, std, safety_margin, self.x) + + self.ocp.constraints.lh_e = np.array([0.]) + self.ocp.constraints.uh_e = np.array([1e6]) + + self.ocp.constraints.idxsh_e = np.array([0]) + + self.ocp.cost.zl_e = np.zeros((1,)) + self.ocp.cost.zu_e = np.zeros((1,)) + self.ocp.cost.Zu_e = np.zeros((1,)) + self.ocp.cost.Zl_e = np.zeros((1,)) + + # ocp model + self.ocp.model = self.model + + # solver + self.ocp_solver = AcadosOcpSolver(self.ocp, build=regenerate) + + def nn_decisionfunction_conservative(self, params, mean, std, safety_margin, x): + + vel_norm = fmax(norm_2(x[2:]), 1e-3) + + mean = vertcat(mean,mean,mean,0.,0.,0.) + std = vertcat(std,std,std,vel_norm,vel_norm,vel_norm) + + out = (x - mean) / std + it = 0 + + for param in params: + + param = SX(param.tolist()) + + if it % 2 == 0: + out = param @ out + else: + out = param + out + + if it == 1 or it == 3: + out = fmax(0., out) + + it += 1 + + return out*(100-safety_margin)/100 - vel_norm + class OCPtriplependulumSoftTraj(OCPtriplependulum): def __init__(self, nlp_solver_type, time_step, tot_time, nn_params, mean, std, safety_margin, regenerate):