Source code for csnlp.wrappers.mpc.scenario_based_mpc

from inspect import signature
from typing import Callable, Literal, Optional, TypeVar, Union

import casadi as cs
import numpy as np
import numpy.typing as npt

from csnlp.multistart.multistart_nlp import _chained_substitute, _n

from ..wrapper import Nlp
from .mpc import Mpc, _callable2csfunc, _create_ati_mats
from .mpc import _n as _name_init_state

SymType = TypeVar("SymType", cs.SX, cs.MX)
MatType = TypeVar("MatType", SymType, cs.DM, np.ndarray)


[docs] class ScenarioBasedMpc(Mpc[SymType]): """Implementation of the Scenario-based Model Predictive Control :cite:`schildbach_scenario_2014`, here referred to as SCMPC, a well-known stochastic MPC formulation. Parameters ---------- nlp : Nlp NLP scheme to be wrapped n_scenarios : int Number of scenarios to be considered in the scenario-based MPC formulation. Must be a positive integer. prediction_horizon : int A positive integer for the prediction horizon of the MPC controller. control_horizon : int, optional A positive integer for the control horizon of the MPC controller. If not given, it is set equal to the control horizon. input_spacing : int, optional Spacing between independent input actions. This argument allows to reduce the number of free actions along the control horizon by allowing only the first action every ``n`` to be free, and the following ``n-1`` to be fixed equal to that action (where ``n`` is given by ``input_spacing``). By default, no spacing is allowed, i.e., ``1``. shooting : 'single' or 'multi', optional Type of approach in the direct shooting for parametrizing the control trajectory. See Section 8.5 in :cite:`rawlings_model_2017`. By default, direct shooting is used. Raises ------ ValueError Raises if the shooting method is invalid; or if any of the horizons are invalid; or if the number of scenarios is not a positive integer. """ def __init__( self, nlp: Nlp[SymType], n_scenarios: int, prediction_horizon: int, control_horizon: Optional[int] = None, input_spacing: int = 1, shooting: Literal["single", "multi"] = "multi", ) -> None: if n_scenarios < 1: raise ValueError("The number of scenarios must be a positive integer.") super().__init__( nlp, prediction_horizon, control_horizon, input_spacing, shooting ) self._n_scenarios = n_scenarios self.single_states: dict[str, SymType] = {} self.single_disturbances: dict[str, SymType] = {} self.single_slacks: dict[str, SymType] = {} @property def n_scenarios(self) -> int: """Gets the number of scenarios.""" return self._n_scenarios @property def ns_all(self) -> int: """Gets the number of states of the SCMPC controller considering all scenarios.""" return self.ns * self._n_scenarios @property def nd(self) -> int: return super().nd // self._n_scenarios @property def nd_all(self) -> int: """Gets the number of disturbances of the SCMPC controller considering all scenarios.""" return super().nd
[docs] def name_i(self, base_name: str, i: int) -> str: """Gets the name of the i-th scenario.""" return _n(base_name, i)
[docs] def states_i(self, i: int) -> dict[str, SymType]: """Gets the symbolic states belonging to the i-th scenario.""" return {n: self.states[_n(n, i)] for n in self.single_states}
[docs] def slacks_i(self, i: int) -> list[SymType]: """Gets the symbolic slack variables belonging to the i-th scenario.""" return {n: self.slacks[_n(n, i)] for n in self.single_slacks}
[docs] def disturbances_i(self, i: int) -> dict[str, SymType]: """Gets the symbolic disturbances belonging to the i-th scenario.""" return {n: self.disturbances[_n(n, i)] for n in self.single_disturbances}
[docs] def state( self, name: str, size: int = 1, discrete: bool = False, lb: Union[npt.ArrayLike, cs.DM] = -np.inf, ub: Union[npt.ArrayLike, cs.DM] = +np.inf, bound_initial: bool = True, bound_terminal: bool = True, ) -> tuple[SymType, list[Optional[SymType]], SymType]: """Adds one state variable per scenario to the SCMPC controller. Automatically creates the (shared) constraint on the initial conditions for these states. Parameters ---------- name : str Name of the state. size : int Size of the state (assumed to be a vector). discrete : bool, optional Flag indicating if the state is discrete. Defaults to ``False``. lb : array_like, casadi.DM, optional Hard lower bound of the state, by default ``-np.inf``. ub : array_like, casadi.DM, optional Hard upper bound of the state, by default ``+np.inf``. bound_initial : bool, optional If ``False``, then the upper and lower bounds on the initial state are not imposed, i.e., set to ``+/- np.inf`` (since the initial state is constrained to be equal to the current state of the system, it is sometimes advantageous to remove its bounds). By default ``True``. bound_terminal : bool, optional Same as above, but for the terminal state. By default ``True``. Returns ------- single state : casadi.SX or MX Symbol corresponding to the state of a single scenario. This is useful for automatically defining, e.g., the objective and constraints over the various scenarios of the SCMPC controller, but it is not used in the actual NLP solver. states : list of casadi.SX or MX or None The list of the state symbolic variable. If `shooting=single`, then `None` is returned since the states will only be available once the dynamics are set. initial state : casadi.SX or MX The initial state symbolic parameter. Raises ------ ValueError Raises if there exists already a state with the same name. RuntimeError Raises in single shooting if lower or upper bounds have been specified, since these can only be set after the dynamics have been set via the `constraint` method. """ N = self._prediction_horizon if self._is_multishooting: shape = (size, N + 1) lb = np.broadcast_to(lb, shape).astype(float) ub = np.broadcast_to(ub, shape).astype(float) if not bound_initial: lb[:, 0] = -np.inf ub[:, 0] = +np.inf if not bound_terminal: lb[:, -1] = -np.inf ub[:, -1] = +np.inf elif np.any(lb != -np.inf) or np.any(ub != +np.inf): raise RuntimeError( "in single shooting, lower and upper state bounds can only be " "created after the dynamics have been set" ) # create as many states as scenarions, but only one initial state x0_name = _name_init_state(name) x0 = self.nlp.parameter(x0_name, (size, 1)) self._initial_states[x0_name] = x0 xs = [] for i in range(self._n_scenarios): name_i = _n(name, i) if self._is_multishooting: x_i = self.nlp.variable(name_i, shape, discrete, lb, ub)[0] self.nlp.constraint(_n(x0_name, i), x_i[:, 0], "==", x0) else: x_i = None xs.append(x_i) self._states[name_i] = x_i # create also a single symbol for the state x_single = self.nlp._sym_type.sym(name, size, N + 1) self.single_states[name] = x_single return x_single, xs, x0
[docs] def disturbance(self, name: str, size: int = 1) -> tuple[SymType, list[SymType]]: """Adds one disturbance parameter per scenario to the SCMPC controller along the whole prediction horizon. Parameters ---------- name : str Name of the disturbance. size : int, optional Size of the disturbance (assumed to be a vector). Defaults to ``1``. Returns ------- single disturbance : casadi.SX or MX Symbol corresponding to the disturbance of a single scenario. See the note for :meth:``state``. disturbances : list of casadi.SX or MX The symbols for the new disturbances of each scenario in the SCMPC controller. """ shape = (size, self._prediction_horizon) ds = [] for i in range(self._n_scenarios): name_i = _n(name, i) d_i = self.nlp.parameter(name_i, shape) ds.append(d_i) self._disturbances[name_i] = d_i d_single = self.nlp._sym_type.sym(name, shape) self.single_disturbances[name] = d_single return d_single, ds
[docs] def constraint_from_single( self, name: str, lhs: Union[SymType, np.ndarray, cs.DM], op: Literal["==", ">=", "<="], rhs: Union[SymType, np.ndarray, cs.DM], soft: bool = False, simplify: bool = True, ) -> tuple[list[SymType], ...]: """Similarly to :meth:`csnlp.wrappers.Mpc.constraint`, adds a constraint to the MPC scheme. However, instead of manually creating the constraint for each scenario, this method allows to define only one constraint expression for a single scenario, which is then automatically declined for all scenarios. The symbolical expression must be made up of the single scenario states and disturbances, returned as first output by the methods :meth:`state` and :meth:`disturbance`, respectively. Note that the return types are lists of symbolical variables. Returns ------- exprs : list of casadi.SX or MX The constraint expression in canonical form, i.e., :math:`g(x,u) = 0` or :math:`h(x,u) <= 0`, for each scenario. lams : list of casadi.SX or MX The symbol corresponding to the constraint's multipliers, for each scenario. single slack : casadi.SX or MX Symbol corresponding to the slack from a single scenario. This is useful for automatically defining, e.g., the objective over the various scenarios of the SCMPC controller, but it is not used in the actual NLP solver. Only returned if `soft=True`; otherwise, only a 2-tuple is returned. slacks : list of casadi.SX or MX, optional Each scenario's slack variable in case of `soft=True`; otherwise, only a 2-tuple is returned. """ expr = lhs - rhs if simplify: expr = cs.cse(cs.simplify(expr)) cons = [] lams = [] if soft: slacks = [] for i in range(self._n_scenarios): expr_i = self._chained_substitution_for_scenario_i(expr, i) out = self.constraint(_n(name, i), expr_i, op, 0, soft) cons.append(out[0]) lams.append(out[1]) if soft: slacks.append(out[2]) if soft: slack_name = f"slack_{name}" single_slack = self.nlp.sym_type.sym(slack_name, expr.shape) self.single_slacks[slack_name] = single_slack return cons, lams, single_slack, slacks return cons, lams
[docs] def minimize_from_single(self, objective: SymType) -> None: """Similarly to :meth:`csnlp.Nlp.minimize`, adds the objective to be minimized to the NLP scheme. However, instead of manually creating the objective for each scenario, this method allows to define only one expression for a single scenario, which is then automatically declined and summed for all scenarios. The symbolical expression must be made up of the single scenario states, disturbances, and slacks, returned as first output by the methods :meth:`state`, :meth:`disturbance`, and :meth:`constraint_from_single`, respectively. """ return self.nlp.minimize( sum( self._chained_substitution_for_scenario_i(objective, i) for i in range(self._n_scenarios) ) / self._n_scenarios )
[docs] def set_affine_dynamics( self, A: MatType, B: MatType, D: MatType, c: Optional[MatType] = None, parallelization: Literal[ "serial", "unroll", "inline", "thread", "openmp" ] = "thread", max_num_threads: Optional[int] = None, ) -> tuple[ Optional[MatType], Optional[MatType], Optional[MatType], Optional[MatType] ]: if D is None: raise ValueError( "The dynamics matrix D must be given, as SCMPC is a tool to account for" " stochastic disturbances, and if there are none, a nominal MPC should " "suffice (see `Mpc` wrapper)." ) if max_num_threads is None: max_num_threads = max(self._prediction_horizon, self._n_scenarios) return super().set_affine_dynamics(A, B, D, c, parallelization, max_num_threads)
[docs] def set_nonlinear_dynamics( self, F: Union[ cs.Function, Callable[[tuple[npt.ArrayLike, ...]], tuple[npt.ArrayLike, ...]], ], parallelization: Literal[ "serial", "unroll", "inline", "thread", "openmp" ] = "thread", max_num_threads_or_unrolling_base: Optional[int] = None, ) -> None: n_in = F.n_in() if isinstance(F, cs.Function) else len(signature(F).parameters) nd = self.nd if n_in != 3 or nd == 0: raise ValueError( "The dynamics function must have 3 arguments: the state, the action, " "and the disturbance. This is because SCMPC is a tool to account for " "stochastic disturbances, and if there are none, a nominal MPC should " "suffice (see `Mpc` wrapper)." ) if not isinstance(F, cs.Function): F = _callable2csfunc(F, self.nlp.sym_type, (self.ns, self.na, nd)) if max_num_threads_or_unrolling_base is None: max_num_threads_or_unrolling_base = max( self._prediction_horizon, self._n_scenarios ) return super().set_nonlinear_dynamics( F, parallelization, max_num_threads_or_unrolling_base )
def _set_singleshooting_affine_dynamics( self, A: MatType, B: MatType, D: MatType, c: Optional[MatType] ) -> tuple[MatType, MatType, Optional[MatType], Optional[MatType]]: disturbance_names = self.single_disturbances.keys() X0 = cs.vcat(self._initial_states.values()) U = cs.vec(cs.vcat(self._actions_exp.values())) # NOTE: different from vvcat! D_all = cs.hcat( [ cs.vec( cs.vcat([self._disturbances[_n(n, i)] for n in disturbance_names]) ) for i in range(self._n_scenarios) ] ) F, G, H, L = _create_ati_mats(self._prediction_horizon, A, B, D, c) X_next_pred = F @ X0 + G @ U + H @ D_all if L is not None: X_next_pred += L state_names = self.single_states.keys() N = self._prediction_horizon ns = A.shape[0] cumsizes = np.cumsum([0] + [s.shape[0] for s in self._initial_states.values()]) for i in range(self._n_scenarios): X_i = cs.vertcat(X0, X_next_pred[:, i]).reshape((ns, N + 1)) X_i_split = cs.vertsplit(X_i, cumsizes) for n, x in zip(state_names, X_i_split): self._states[_n(n, i)] = x return F, G, H, L def _set_multishooting_nonlinear_dynamics( self, F: cs.Function, _: int, parallelization: Literal["serial", "unroll", "inline", "thread", "openmp"], max_num_threads: int, ) -> None: state_names = self.single_states.keys() disturbance_names = self.single_disturbances.keys() U = cs.vcat(self._actions_exp.values()) X_all_ = [] X_all_next_ = [] D_all_ = [] for i in range(self._n_scenarios): X_i = cs.vcat([self._states[_n(n, i)] for n in state_names]) X_all_.append(X_i[:, :-1]) X_all_next_.append(X_i[:, 1:]) D_all_.append( cs.vcat([self._disturbances[_n(n, i)] for n in disturbance_names]) ) X_all = cs.hcat(X_all_) X_all_next = cs.hcat(X_all_next_) D_all = cs.hcat(D_all_) Fmap = F.map( self._prediction_horizon * self._n_scenarios, parallelization, max_num_threads, ) X_all_next_pred = Fmap(X_all, U, D_all) self.constraint("dyn", X_all_next, "==", X_all_next_pred) def _set_singleshooting_nonlinear_dynamics( self, F: cs.Function, _: int, base: int ) -> None: disturbance_names = self.single_disturbances.keys() X0 = cs.vcat(self._initial_states.values()) U = cs.vcat(self._actions_exp.values()) D_all = cs.hcat( [ cs.vcat([self._disturbances[_n(n, i)] for n in disturbance_names]) for i in range(self._n_scenarios) ] ) N = self._prediction_horizon Fmapaccum = F.mapaccum(N, {"base": base, "allow_free": True}) X_next_hcat = Fmapaccum(X0, U, D_all) state_names = self.single_states.keys() cumsizes = np.cumsum([0] + [s.shape[0] for s in self._initial_states.values()]) X_next_split = cs.horzsplit(X_next_hcat, N) for i in range(self._n_scenarios): X_i = cs.horzcat(X0, X_next_split[i]) X_i_split = cs.vertsplit(X_i, cumsizes) for n, x in zip(state_names, X_i_split): self._states[_n(n, i)] = x def _chained_substitution_for_scenario_i( self, expr: SymType, i: int ) -> Union[SymType, cs.DM]: """Iternal utility to perform substitutions in chain for the i-th scenario.""" return _chained_substitute( expr, (self.single_states, self.states_i(i)), (self.single_disturbances, self.disturbances_i(i)), (self.single_slacks, self.slacks_i(i)), )