diff --git a/reformulate.py b/reformulate.py new file mode 100644 index 0000000..6ce3222 --- /dev/null +++ b/reformulate.py @@ -0,0 +1,392 @@ +import numpy as np +from scipy import linalg +import picos as pc +from collections import namedtuple + +HybridSystem = namedtuple('HybridSystem', + 'Acs, bs, bprimes, Abars, Abarprimes, C') + +# Class definitions + +class LinearSystem(): + def __init__(self, A, B, C): + # sanity checks + if any(not isinstance(x, np.ndarray) for x in [A, B, C]): + raise TypeError + if len(A) != len(A[0]): + raise ValueError("Matrix A is not square") + if len(A[0]) != len(B): + raise ValueError("Matrix B is not compatible with matrix A") + if len(A) != len(C[0]): + raise ValueError("Matrix C is not compatible with matrix A") + + self.A = A + self.B = B + self.C = C + + print (">> Read matrices") + print (f"A size {len(A)}x{len(A[0])}") + print (f"B size {len(B)}x{len(B[0])}") + print (f"C size {len(C)}x{len(C[0])}") + + def dimension(self): + # dimension of state space + return len(self.A) + + def inputs(self): + # number of inputs + return len(self.B[0]) + + def outputs(self): + # number of outputs + return len(self.C) + +class PIController(): + def __init__(self, modes, conditions, KP, KI): + # sanity checks + if not isinstance(modes, int): + raise TypeError + if any( any(not isinstance(x, np.ndarray) for x in lst) for lst in [KP, KI]): + raise TypeError + if not (modes > 0): + raise ValueError("At least one mode is expected") + if len(conditions) != modes: + raise ValueError("Conditions list has an incorrect length") + if len(KP) != modes: + raise ValueError("KP list has an incorrect length") + if len(KI) != modes: + raise ValueError("KI list has an incorrect length") + r = len(KP[0]) + p = len(KP[0][0]) + if ((not all(len(x) == r for x in KP)) or + (not all(len(x[0]) == p for x in KP))): + raise ValueError("some matrix in KP is not of the right size") + if ((not all(len(x) == r for x in KI)) or + (not all(len(x[0]) == p for x in KI))): + raise ValueError("some matrix in KI is not of the right size") + #if not all(len(x) == p for x in conditions): + # raise ValueError("some condition vector is not of the right size") + + self.modes = modes + self.conditions = conditions + self.KP = KP + self.KI = KI + + print (">> Controller matrices") + print (f"KP1 size {len(KP[0])}x{len(KP[0][0])}") + print (f"KP2 size {len(KP[1])}x{len(KP[1][0])}") + print (f"KI1 size {len(KI[0])}x{len(KI[0][0])}") + print (f"KI2 size {len(KI[1])}x{len(KI[1][0])}") + + def inputs(self): + return len(self.KP[0]) + +##### +# FIXME quick attempt to normalize zeros. +def normalize(matrix): + eps = np.finfo(np.float32).eps + for i in range(len(matrix)): + for j in range(len(matrix[0])): + x = np.real(matrix[i][j]) + if (x < eps and x > -eps): + matrix[i][j] = np.array(0.0, dtype='double') +##### + +def reformulate(system, controller, refvalues): + r = controller.inputs() + A = system.A + # remove the additional columns used for state disturbance + # (these may have been introduced in the reduced matrices) + B = system.B[:,:r] + # update the B matrix in system + system.B = B + C = system.C + n = system.dimension() + p = system.outputs() + + # these parts are the same for each mode + Ac_top = np.hstack([A, B]) + Cc = np.hstack([C, np.zeros([p,r])]) + + AcList = [] + bList = [] + bprimeList = [] + + # build Ac, b and b' matrices for each mode + for mode in range(controller.modes): + KP = controller.KP[mode] + KI = controller.KI[mode] + if (len(KP) != len(B[0])) or (len(KI) != len(B[0])): + raise ValueError("The number of inputs in the controller matrices does not match the number of inputs in the given system") + + N = -1 * np.dot(KP, np.dot(C,A)) - np.dot(KI, C) + M = -1 * np.dot(KP, np.dot(C,B)) + Ac_bot = np.hstack([N, M]) + Ac = np.vstack([Ac_top, Ac_bot]) + AcList.append(Ac) + + if len(refvalues) != p: + raise ValueError("The number of reference values does not match the number of outputs in the given system") + b_bot = np.dot(KI, refvalues) + b = np.vstack([np.zeros([n,1]), b_bot]) + bList.append(b) + + # translated version of b (experimental) + + # Schur complement of block A in Ac + S = M - np.dot(N, np.dot(np.linalg.inv(A), B)) + + if mode == 0: + S0inv = np.linalg.inv(S) + b_prime = np.zeros([n+r, 1]) + else: + # note: here we rely on the fact that S0inv has already been computed + KIeff = KI - np.dot(np.dot(S, S0inv), controller.KI[0]) + + # attempt to normalize KIeff'zeros. + # currently disabled not to introduce spurious numerical errors. + # normalize(KIeff) + + b_prime_bot = np.dot(KIeff, refvalues) + b_prime = np.vstack([np.zeros([n,1]), b_prime_bot]) + + bprimeList.append(b_prime) + + return (AcList, bList, bprimeList, Cc) + + +def check_stability(A): + if not isinstance(A, np.ndarray): + raise TypeError + + if (np.linalg.det(A) == 0): + print ("[Warning] Matrix A is singular") + + eigenvalues = np.linalg.eig(A)[0] + + for x in eigenvalues: + if (np.real(x) >= 0): + print("[Warning] Found unstable eigenvalue:", x) + +# ------------------------- +# miniengine model + +def get_miniengine_model(): + A = np.array([ + [-3.82987, -0.0418250, -1019.08], + [0.284779, -1.76641, -349.432], + [0, 0, -62.5000] + ]) + + B = np.array([ + [0], + [0], + [-0.790569], + ]) + + C = np.array([ + [0.00182850, 0, 0], + [0.0000102703, 0.0000308108, 0] + ]) + + return LinearSystem(A,B,C) + +def get_miniengine_controller(): + KP1 = np.array([ + [1, 0] + ]) + KP2 = np.array([ + [0, 10] + ]) + + KI1 = np.array([ + [10, 0] + ]) + KI2 = np.array([ + [0, 200] + ]) + + return PIController(2, [[], []], [KP1, KP2], [KI1, KI2]) + +def get_miniengine_references(): + return np.array([[0.5], [5]]) + +# ------------------------- +# full model + +def get_full_model(): + import scipy.io + from pathlib import Path + fname = str(Path(__file__).parent / 'simulink' / 'data_aero_02.mat') + mat = scipy.io.loadmat(fname) + A = np.array(mat['GA'], dtype='double') + B = np.array(mat['GB'], dtype='double') + C = np.array(mat['GC'], dtype='double') + return LinearSystem(A, B, C) + +def get_full_controller(kp2_gain=None): + import scipy.io + from pathlib import Path + fname = str(Path(__file__).parent / 'simulink' / 'data_aero_02.mat') + mat = scipy.io.loadmat(fname) + + # Important: + # Couting from 0 index (while matlab starts from 1), + # and modalities' indexes are swapped! + # mode 1 : y0 - r0 < switch_th + # mode 2 : y0 - r0 >= switch_th + # hence: kp01 == Kp12 ... + + kp01 = mat['Kp12'][0][0] # 1 + kp02 = mat['Kp11'][0][0] # 0.1 + kp1 = mat['Kp2'][0][0] # 10 + kp2 = mat['Kp3'][0][0] # 0.5 + if kp2_gain is not None: + kp1 *= kp2_gain + + ki01 = mat['Ki12'][0][0] # 10 + ki02 = mat['Ki11'][0][0] # 20 + ki1 = mat['Ki2'][0][0] # 100 + ki2 = mat['Ki3'][0][0] # 2 + + KP1 = np.array([ + [kp01, 0, 0, 0], + [0, 0, kp1, 0], + [0, 0, 0, kp2] + ], dtype='double') + KP2 = np.array([ + [0, kp02, 0, 0], + [0, 0, kp1, 0], + [0, 0, 0, kp2] + ], dtype='double') + + KI1 = np.array([ + [ki01, 0, 0, 0], + [0, 0, ki1, 0], + [0, 0, 0, ki2] + ], dtype='double') + KI2 = np.array([ + [0, ki02, 0, 0], + [0, 0, ki1, 0], + [0, 0, 0, ki2] + ], dtype='double') + + return PIController(2, [[], []], [KP1, KP2], [KI1, KI2]) + +def get_full_references(use_zero_references=False): + # TRY WITH NULL REFERENCES: this would not require to translate the matrix. + if use_zero_references: + return np.array([[0], [0], [0], [0]]) + return np.array([[0.5], [5], [-1], [20]]) + +# ------------------------- +# reduced model + +def get_reduced_model(n, round): + from pathlib import Path + fname = str(Path(__file__).parent / 'simulink' / 'data_aero_02_reduced_models_rounded.mat') \ + if round else \ + str(Path(__file__).parent / 'simulink' / 'data_aero_02_reduced_models.mat') + import scipy.io + mat = scipy.io.loadmat(fname) + maybe_round = 'round_' if round else '' + A = np.array(mat[f'GA_{maybe_round}{n}'], dtype='double') + B = np.array(mat[f'GB_{maybe_round}{n}'], dtype='double') + C = np.array(mat[f'GC_{maybe_round}{n}'], dtype='double') + return LinearSystem(A, B, C) + +def get_reduced_controller(size=None, round=None): + # for size 3 with integer coefficients, we need to change the controller. + kp2_gain = 0.8 if (round and size == 3) else 1 + return get_full_controller(kp2_gain) + +def get_reduced_references(use_zero_references=False): + return get_full_references(use_zero_references) + +def select_model(args): + def parse_references(references): + assert not references is None + + references = references.strip() + ref_strs = references.split(",") + + float_references = [] + for s in ref_strs: + f_value = float(s.strip()) + float_references.append([f_value]) + return np.array(float_references) + + if args.size == 18: + mod = get_full_model() + ctl = get_full_controller() + if args.references is None: + refs = get_full_references() + else: + refs = parse_references(args.references) + assert (len(refs) == len(get_full_references())) + # elif args.miniengine: + # mod = get_miniengine_model() + # ctl = get_miniengine_controller() + # refs = get_miniengine_references() + else: + mod = get_reduced_model(args.size, args.round) + ctl = get_reduced_controller(args.size, args.round) + if args.references is None: + refs = get_reduced_references() + else: + refs = parse_references(args.references) + assert (len(refs) == len(get_reduced_references())) + + # Change of second column in matrix B in order to work with the old controller. + change_B_matrix = True #not args.miniengine + if change_B_matrix: + u2_index = 1 + for i in range(len(mod.B)): + mod.B[i, u2_index] = -mod.B[i, u2_index] + + return mod,ctl,refs + +def get_ode_model_aux(mod, ctl, refs, verbose=False): + (Acs, bs, bprimes, Cc) = reformulate(mod, ctl, refs) + + # reformulated matrices in w for each modality + Abars = [] + Abarprimes = [] + + for i in range(ctl.modes): + if verbose: + print("Checking stability for mode", i) + check_stability(Acs[i]) + + # in w coordinates + Abar_i = np.vstack([ + np.hstack([Acs[i], bs[i]]), + np.zeros([1, mod.dimension() + mod.inputs() + 1]) + ]) + Abars.append(Abar_i) + + # in w' coordinates: scaled to have equilibrium point in the origin + Abar_i_centered = np.vstack([ + np.hstack([Acs[i], bprimes[i]]), + np.zeros([1, mod.dimension() + mod.inputs() + 1]) + ]) + Abarprimes.append(Abar_i_centered) + + # Compute equilibrium points. + # This was useful only to double check the simulations with simulink. + try: + for i in range(len(Acs)): + eq = np.linalg.solve(Acs[i], bs[i]) + y = np.dot(Cc, eq) + print (f"Equilibrium point mode {i}:", y) + except np.linalg.LinAlgError: + print ("[WARNING]: A is a singular matrix") + + return HybridSystem(Acs=Acs, bs=bs, Abars=Abars, Abarprimes=Abarprimes, bprimes=bprimes, C=Cc) + +# ---------------------- +# main function +def get_ode_model(args, verbose=False): + mod,ctl,refs = select_model(args) + return get_ode_model_aux(mod, ctl, refs, verbose) + diff --git a/serialization.py b/serialization.py new file mode 100644 index 0000000..00fd3f3 --- /dev/null +++ b/serialization.py @@ -0,0 +1,105 @@ +""" +Read/write results in libsmt. +""" + +import json + +from pysmt.smtlib.parser import SmtLibParser + +from barrier.lzz.serialization import ( + _get_smt_vars, + _get_smt_formula, + _get_smt_formula_pred, + fromStringFormula, readVar, + parse_dyn_sys +) + + +def importSynthesis(problem_json, env): + """ + + """ + + parser = SmtLibParser(env) + + dyn_systems = [None,None] + dyn_systems[0],_,_ = parse_dyn_sys(env, problem_json["sys_m0"], True, True) + dyn_systems[1],_,_ = parse_dyn_sys(env, problem_json["sys_m1"], True, True) + + all_vars = [] + vars_decl_str = None + for l in [problem_json["sys_m0"]["varsDecl"],problem_json["sys_m1"]["varsDecl"]]: + for var_decl in l: + app = [] + readVar(parser, var_decl, app) + if not app[0] in all_vars: + all_vars += app + vars_decl_str = var_decl if vars_decl_str is None else "%s\n%s" % (vars_decl_str, var_decl) + + theta = fromStringFormula(parser, vars_decl_str, problem_json["theta"]).args()[0] + reference_values = [] + for json_r in problem_json["reference_values"]: + r = fromStringFormula(parser, vars_decl_str, json_r).args()[0] + reference_values.append(r) + + if "lyapunov_m0" in problem_json: + lyapunov_m0 = fromStringFormula(parser, vars_decl_str, problem_json["lyapunov_m0"]).args()[0] + else: + lyapunov_m0 = None + + if "lyapunov_m1" in problem_json: + lyapunov_m1 = fromStringFormula(parser, vars_decl_str, problem_json["lyapunov_m1"]).args()[0] + else: + lyapunov_m1 = None + + if "assumption" in problem_json: + assumption = fromStringFormula(parser, vars_decl_str, problem_json["assumption"]) + else: + assumption = None + + return (dyn_systems, + theta, + reference_values, + lyapunov_m0, + lyapunov_m1, + assumption) + + +def serializeSynthesis(outstream, + dyn_systems, + theta, + reference_values, + lyapunov_m0, + lyapunov_m1, + assumption, + env): + """ + Writes invar_problem problem on outstream + """ + cont_vars_0_smt = [_get_smt_vars(v, env) for v in dyn_systems[0].states()] + cont_vars_1_smt = [_get_smt_vars(v, env) for v in dyn_systems[1].states()] + json_data = { + "sys_m0" : { + "contVars" : cont_vars_0_smt, + "varsDecl" : cont_vars_0_smt + [_get_smt_vars(v, env) for v in dyn_systems[0].inputs()], + "vectorField": [_get_smt_formula_pred(dyn_systems[0].get_ode(v), env) for v in dyn_systems[0].states()], + }, + "sys_m1" : { + "contVars" : cont_vars_1_smt, + "varsDecl" : cont_vars_1_smt + [_get_smt_vars(v, env) for v in dyn_systems[1].inputs()], + "vectorField": [_get_smt_formula_pred(dyn_systems[1].get_ode(v), env) for v in dyn_systems[1].states()], + }, + "theta" : _get_smt_formula_pred(theta, env), + "reference_values" : [_get_smt_formula_pred(v, env) for v in reference_values] + } + + if not lyapunov_m0 is None: + json_data["lyapunov_m0"] = _get_smt_formula_pred(lyapunov_m0, env) + + if not lyapunov_m1 is None: + json_data["lyapunov_m1"] = _get_smt_formula_pred(lyapunov_m1, env) + + if not assumption is None: + json_data["assumption"] = _get_smt_formula(assumption, env) + + json.dump(json_data, outstream) diff --git a/svl_single_mode.py b/svl_single_mode.py new file mode 100644 index 0000000..33eb3cd --- /dev/null +++ b/svl_single_mode.py @@ -0,0 +1,246 @@ +import logging +import numpy as np +from scipy import linalg +import picos as pc +import control + +from barrier.lyapunov.piecewise_quadratic import ( + Affine, LeConst, Edge, NumericAffineHS, PiecewiseQuadraticLF, + get_ge, + validate, validate_single_mode +) + +def global_asymptotic_stability_numpy(Ac, gas_opts): + """ + Synthesise a lyapunov function proving global asymptitic stability + for the linear homogeneus system der(x) = Ax + b. + + The input is NumPy matrix. + + Return the matrix P such that the lyapunov function is V(x) = x^T P x + """ + + if gas_opts.read_from_file: + import os + from numpy import array + logging.critical(f'Reading from file {gas_opts.read_from_file}') + if os.path.splitext(gas_opts.read_from_file)[1] == '.npy': + P = np.load(gas_opts.read_from_file) + else: + with open(gas_opts.read_from_file, 'r') as fin: + str_P = fin.read() + P = eval(str_P) + return P + + if gas_opts.use_transpose: + logging.critical(f'Synthesizing lyapunov with transpose') + evAc, vAc = np.linalg.eig(Ac) + vAcinv = np.linalg.inv(vAc) + P = np.real(np.dot(np.conj(np.transpose(vAcinv)),vAcinv)) + return P + + if gas_opts.use_control: + logging.critical(f'Synthesizing lyapunov with control') + Q = np.identity(len(Ac)) + P = control.lyap(np.transpose(Ac), Q) + return P + + # here is use SDP + if gas_opts.sdp_exponential: + logging.critical(f'Synthesizing lyapunov with exponential') + n = len(Ac) + + if gas_opts.alpha0: + alphac = 0.1 + else: + # compute alpha + alphac = round(2 * min( abs(np.real(x)) for x in np.linalg.eig(Ac)[0] ) - 0.01, 2) + logging.critical(f'Found alpha = {alphac}') + + solver_name = gas_opts.sdp_solver + logging.critical(f'Solving with {solver_name}') + + candidate_best = None + delta_robustness = 0.0000000001 + while delta_robustness < 1: # CHECK: is there an upper bound? + try: + sdp = pc.Problem() + alpha = pc.Constant("alpha", alphac) + P = pc.SymmetricVariable("P", n) + if not gas_opts.no_robust: + nu = pc.RealVariable("nu") + sdp.add_constraint(nu >> delta_robustness) + I = pc.Constant("I", np.identity(n)) + sdp.add_constraint(P - nu * I >> 0) + else: + sdp.add_constraint(P >> 0) + A = pc.Constant("A", Ac) + # sdp.add_constraint(A.T * P + P * A + alpha * P + nu * I << 0) + sdp.add_constraint(A.T * P + P * A + alpha * P << 0) + sol = sdp.solve(solver=solver_name) + + candidate_best = P.np + + logging.info("Try to increase nu") + delta_robustness *= 10 + + except pc.modeling.problem.SolutionFailure: + break + + if candidate_best is None: + raise pc.modeling.problem.SolutionFailure() + + logging.info(f"Found with nu = {delta_robustness / 10}") + return candidate_best + + if gas_opts.sdp_simple: + logging.critical(f'Synthesizing lyapunov with simple') + solver_name = gas_opts.sdp_solver + logging.critical(f'Solving with {solver_name}') + + sdp = pc.Problem() + P = pc.SymmetricVariable("P", len(Ac)) + sdp.add_constraint(P >> 0) + A = pc.Constant("A", Ac) + sdp.add_constraint(A.T * P + P * A << 0) + + # TODO: what if there is no solution? (not handled now) + sol = sdp.solve(solver=solver_name) + Xnum = P.np + + return P.np + + raise Exception("No option was chosen for synthesizing lyapunov function.") + +# Note: This code is extracted from routine used in `svl_ohlerking1` and modified to use only one modality. +def create_HS(Ac, b, C, Theta, mode1=True): + n = len(Ac) + + # single modality + modes = set([1]) + + # CHECKME. + # Note: in the svl_ohlerking1 etc approaches, `b` is `np.zeros()`, + # since they work with the translated matrix. + # Here, it should use the original b matrix. + flow = { 1 : [Affine(Ac, b)] } + + edges = [] + + # CHECKME: is this the right implementation of the invariant y0 - r0 > theta ? + # recall: refs values should have been embedded as values in the A matrix. + v = np.zeros(n) + v[0] = - Theta + M = np.vstack([ + C[0], + np.zeros([n-1, n]) + ]) + if mode1: + # y0 - r0 > theta + m1_invar = [get_ge(M, v)] + else: + m1_invar = [LeConst(M, v)] + invariant = { 1 : m1_invar } + + hs = NumericAffineHS(modes, n, flow, edges, invariant, True) + hs.has_last_var_dummy = True + + return hs + +def svl_single_mode(Ac, b, C, Theta, mode1=False, verbose=False): + n = len(Ac) + + # Create a Numeric Affine HS + hs = create_HS(Ac, b, C, Theta, mode1) + + hs.make_homogeneous() + + + # Create a Lyapunov function + lf = PiecewiseQuadraticLF() + # TODO: CHECK THESE PARAMETERS. + # these are common to every strategy. + lf.alpha, lf.beta, lf.gamma = (0, 0.1, 0) + + Ibar = np.vstack([ + np.hstack([np.identity(n-1), np.zeros([n-1,1])]), + np.zeros([1,n]) + ]) + lf.I_tilda = { 1 : Ibar } + + # Helpers: 3 different approaches to synthesize Lyapunov func. + + # approach 1: simple stability + # expose simple stability + def simple_stability(): + return global_asymptotic_stability_numpy(Ac) + + # approach 2: exponential stability: Note: cannot be validated right now. + def exponential_stability(alphac): + sdp = pc.Problem() + P = pc.SymmetricVariable("P", n) + sdp.add_constraint(P >> 0) + A = pc.Constant("A", Ac) + alpha = pc.Constant("alpha", alphac) + sdp.add_constraint(A.T * P + P * A + alpha * P << 0) + sol = sdp.solve() + + return P.np + + # approach 3: can be validated + def exponential_stability_with_robustness_variable(alphac, delta_robustness=0.0001): + sdp = pc.Problem() + alpha = pc.Constant("alpha", alphac) + P = pc.SymmetricVariable("P", n) + nu = pc.RealVariable("nu") + + sdp.add_constraint(nu >> delta_robustness) + + I = pc.Constant("I", np.identity(n)) + sdp.add_constraint(P - nu * I >> 0) + A = pc.Constant("A", Ac) + +# sdp.add_constraint(A.T * P + P * A + alpha * P << 0) + sdp.add_constraint(A.T * P + P * A + alpha * P + nu * I << 0) + + sol = sdp.solve() + # TODO CHECK SOLUTION + + return P.np + #eof helpers + + # TODO: add dispatcher between the three approaches. + + print("Synthesising a Lyapunov function...") + + pick = 0 + if (pick == 0): + Pnp = simple_stability() + lf.lyapunov_map = { 1 : Pnp } + val_fun = lambda : validate_single_mode(hs, lf, 1) + elif (pick == 1): + alpha = 3.43 # Alberto: 3.44 has eigenvalues ~ 10^-14 + Pnp = exponential_stability(alpha) + lf.lyapunov_map = { 1 : Pnp } + val_fun = lambda : validate_single_mode(hs, lf, 1, True, alpha) + elif (pick == 2): + alpha = 3.43 + robustness = 0.00000001 + robustness = 0.0000001 + Pnp = exponential_stability_with_robustness_variable(alpha, robustness) + lf.lyapunov_map = { 1 : Pnp } + val_fun = lambda : validate_single_mode(hs, lf, 1, True, alpha) + else: + raise Exception() + + print("Validating the function...") + (is_valid, lyapunov_function_smt) = val_fun() + + print (f"synthetized Lyapunov") + if is_valid: +# if validate(hs, lf): + print ("OK: validation of Lyapunov function") + return (is_valid, lyapunov_function_smt[1]) + else: + print ("FAIL: validation of Lyapunov function") + return (is_valid, None)