from math import ceil
from typing import Callable, Literal, Optional, TypeVar, Union
import casadi as cs
import numpy as npy
import numpy.typing as npt
from csnlp.multistart.multistart_nlp import _chained_substitute, _n
from ...util.math import repeat
from ..wrapper import Nlp
from .mpc import Mpc, _create_ati_mats
from .scenario_based_mpc import ScenarioBasedMpc
SymType = TypeVar("SymType", cs.SX, cs.MX)
MatType = TypeVar("MatType", SymType, cs.DM, npy.ndarray)
[docs]
class MultiScenarioMpc(ScenarioBasedMpc[SymType]):
"""Multi-scenario model predictive control (MSMPC) wrapper for a :class:`csnlp.Nlp`
scheme. It generlizes :class:`csnlp.wrappers.ScenarioBasedMpc` to allow for multiple
scenarios, each with its own state trajectory, action sequence (a number of these
actions can be shared across all scenarios; see ``input_sharing``), and parameters.
Parameters
----------
nlp : Nlp
NLP scheme to be wrapped
n_scenarios : int
Number of scenarios to be considered in the scenario-based MPC formulation. Must
be a positive integer.
prediction_horizon : int
A positive integer for the prediction horizon of the MPC controller.
control_horizon : int, optional
A positive integer for the control horizon of the MPC controller. If not given,
it is set equal to the control horizon.
input_spacing : int, optional
Spacing between independent input actions. This argument allows to reduce the
number of free actions along the control horizon by allowing only the first
action every ``n`` to be free, and the following ``n-1`` to be fixed equal to
that action (where ``n`` is given by ``input_spacing``). By default, no spacing
is allowed, i.e., ``1``.
input_sharing : int, optional
Number of actions that are shared across all scenarios. If ``0``, each scenario
has its own action trajectory; if ``n``, all scenarios share the same first
``n`` actions. Must not be larger than the number of free actions in the
control horizon. By default, ``1``, so only the first action is shared across
the scenarios.
shooting : 'single' or 'multi', optional
Type of approach in the direct shooting for parametrizing the control
trajectory. See Section 8.5 in :cite:`rawlings_model_2017`. By default, direct
shooting is used.
Raises
------
ValueError
Raises if the shooting method is invalid; or if any of the horizons are invalid;
or if the number of scenarios is not a positive integer; or if
``input_sharing`` is not a valid integer (nonnegative and not larger than the
number of free actions in the control horizon).
"""
def __init__(
self,
nlp: Nlp[SymType],
n_scenarios: int,
prediction_horizon: int,
control_horizon: Optional[int] = None,
input_spacing: int = 1,
input_sharing: int = 1,
shooting: Literal["single", "multi"] = "multi",
) -> None:
super().__init__(
nlp,
n_scenarios,
prediction_horizon,
control_horizon,
input_spacing,
shooting,
)
nu_free = ceil(self.control_horizon / self._input_spacing)
if not isinstance(input_sharing, (int, npy.integer, npy.int_)) or not (
0 <= input_sharing <= nu_free
):
raise ValueError(
f"`input_sharing` in range [0, {nu_free}]; got {input_sharing} instead."
)
self._input_sharing = input_sharing
self.single_parameters: dict[str, SymType] = {}
self.single_actions: dict[str, SymType] = {}
self.single_actions_exp: dict[str, SymType] = {}
@property
def na(self) -> int:
"""Number of actions in a single scenario in the NLP scheme."""
return super().na // self._n_scenarios
@property
def na_all(self) -> int:
"""Gets the number of actions of the MSMPC controller considering all
scenarios."""
return super().na
@property
def np(self) -> int:
"""Number of parameters in a single scenario in the NLP scheme."""
return self.nlp.np // self._n_scenarios
@property
def np_all(self) -> int:
"""Gets the number of parameters of the MSMPC controller considering all
scenarios."""
return self.nlp.np
[docs]
def parameters_i(self, i: int) -> dict[str, SymType]:
"""Gets the symbolic parameters belonging to the i-th scenario."""
return {n: self.parameters[_n(n, i)] for n in self.single_parameters}
[docs]
def actions_i(self, i: int) -> dict[str, SymType]:
"""Gets the symbolic actions belonging to the i-th scenario."""
return {n: self.actions[_n(n, i)] for n in self.single_actions_exp}
[docs]
def actions_expanded_i(self, i: int) -> dict[str, SymType]:
"""Gets the symbolic expanded actions belonging to the i-th scenario."""
return {n: self.actions_expanded[_n(n, i)] for n in self.single_actions_exp}
[docs]
def parameter(
self, name: str, shape: tuple[int, int] = (1, 1)
) -> tuple[SymType, list[SymType]]:
"""Adds one parameter per scenario to the MSMPC controller.
Parameters
----------
name : str
Name of the new parameter. Must not be already in use.
shape : tuple of 2 ints, optional
Shape of the new parameter. By default a scalar, i.e., ``(1, 1)``.
Returns
-------
single_parameter : casadi.SX or MX
Symbol corresponding to the parameter of a single scenario. See the note
for :meth:``state``
parameters : list of casadi.SX or MX
The symbols for the new parameters of each scenario in the MSMPC controller.
Raises
------
ValueError
Raises if there is already another parameter with the same name ``name``.
"""
ps = []
for i in range(self._n_scenarios):
name_i = _n(name, i)
p_i = self.nlp.parameter(name_i, shape)
ps.append(p_i)
p_single = self.nlp._sym_type.sym(name, shape)
self.single_parameters[name] = p_single
return p_single, ps
[docs]
def action(
self,
name: str,
size: int = 1,
discrete: bool = False,
lb: Union[npt.ArrayLike, cs.DM] = -npy.inf,
ub: Union[npt.ArrayLike, cs.DM] = +npy.inf,
) -> tuple[SymType, SymType, list[SymType], list[SymType]]:
"""Adds one control action variable per scenario to the MSMPC controller.
Automatically handles sharing of actions across scenarios, generating less free
actions if some of these are to be shared across scenarios.
Parameters
----------
name : str
Name of the control action.
size : int, optional
Size of the control action (assumed to be a vector). Defaults to ``1``.
discrete : bool, optional
Flag indicating if the action is discrete. Defaults to ``False``.
lb : array_like, casadi.DM, optional
Hard lower bound of the control action, by default ``-np.inf``.
ub : array_like, casadi.DM, optional
Hard upper bound of the control action, by default ``+np.inf``.
Returns
-------
single action : casadi.SX or MX
Symbol corresponding to the action of a single scenario. This is useful for
automatically defining, e.g., the objective and constraints over the various
scenarios of the MSMPC controller, but it is not used in the actual NLP
solver.
single action expanded : casadi.SX or MX
Same as above, but the action is expanded to the same length of the
prediction horizon.
actions : list of casadi.SX or MX
The list of the action symbolic variable.
actions expanded : list of casadi.SX or MX
The list of the same action symbolic variable, but expanded to the same
length of the prediction horizon.
"""
Np = self._prediction_horizon
Nc = self._control_horizon
spacing = self._input_spacing
nu_free = ceil(Nc / spacing)
sharing = self._input_sharing
shape = (size, nu_free)
def _expand(u: SymType) -> SymType:
u_exp = u if spacing == 1 else repeat(u, (1, spacing))[:, :Nc]
if gap := Np - u_exp.shape[1]:
last = u_exp[:, -1]
u_exp = cs.horzcat(u_exp, *(last for _ in range(gap)))
return u_exp
us = []
us_exp = []
if sharing == 0:
# no sharing, each scenario has its own action trajectory
for i in range(self._n_scenarios):
name_i = _n(name, i)
u_i = self.nlp.variable(name_i, shape, discrete, lb, ub)[0]
u_exp_i = _expand(u_i)
us.append(u_i)
us_exp.append(u_exp_i)
self._actions[name_i] = u_i
self._actions_exp[name_i] = u_exp_i
elif sharing == nu_free:
# full sharing, all scenarios share the same action trajectory
u_unique = self.nlp.variable(_n(name, "shared"), shape, discrete, lb, ub)[0]
u_exp_unique = _expand(u_unique)
for i in range(self._n_scenarios):
name_i = _n(name, i)
us.append(u_unique)
us_exp.append(u_exp_unique)
self._actions[name_i] = u_unique
self._actions_exp[name_i] = u_exp_unique
else:
# partial sharing, each scenario has its own action trajectory, but
# the first `sharing` actions are shared
u_shared = self.nlp.variable(
_n(name, "shared"), (size, sharing), discrete, lb, ub
)[0]
shape_not_shared = (size, nu_free - sharing)
for i in range(self._n_scenarios):
name_i = _n(name, i)
u_i = self.nlp.variable(name_i, shape_not_shared, discrete, lb, ub)[0]
u_i = cs.horzcat(u_shared, u_i)
u_exp_i = _expand(u_i)
us.append(u_i)
us_exp.append(u_exp_i)
self._actions[name_i] = u_i
self._actions_exp[name_i] = u_exp_i
# create also a single symbol for the action and its expanded version
u_single = self.nlp._sym_type.sym(name, shape)
u_exp_single = _expand(u_single)
self.single_actions[name] = u_single
self.single_actions_exp[name] = u_exp_single
return u_single, u_exp_single, us, us_exp
[docs]
def set_affine_dynamics(
self,
A: MatType,
B: MatType,
D: Optional[MatType] = None,
c: Optional[MatType] = None,
parallelization: Literal[
"serial", "unroll", "inline", "thread", "openmp"
] = "thread",
max_num_threads: Optional[int] = None,
) -> tuple[
Optional[MatType], Optional[MatType], Optional[MatType], Optional[MatType]
]:
# NOTE: contrary to `ScenarioBasedMpc`, `D` can be optional here. Also, no need
# to take the number of scenarios into account for threading, as we will run the
# dynamics once and then chain-substitute for each scenario.
return Mpc.set_affine_dynamics(
self, A, B, D, c, parallelization, max_num_threads
)
[docs]
def set_nonlinear_dynamics(
self,
F: Union[
cs.Function,
Callable[[tuple[npt.ArrayLike, ...]], tuple[npt.ArrayLike, ...]],
],
parallelization: Literal[
"serial", "unroll", "inline", "thread", "openmp"
] = "thread",
max_num_threads_or_unrolling_base: Optional[int] = None,
) -> None:
# NOTE: contrary to `ScenarioBasedMpc`, `F` is allowed to have no disturbances.
# Also, no need to take the number of scenarios into account for threading, as
# we will run the dynamics once and then chain-substitute for each scenario.
return Mpc.set_nonlinear_dynamics(
self, F, parallelization, max_num_threads_or_unrolling_base
)
def _set_singleshooting_affine_dynamics(
self, A: MatType, B: MatType, D: Optional[MatType], c: Optional[MatType]
) -> tuple[MatType, MatType, Optional[MatType], Optional[MatType]]:
# NOTE: conversely to `ScenarioBasedMpc`, here first we create the dynamics with
# the single states/actions/disturbances (we suppose single parameters have been
# also used in A,B,D,c), and then chain-substitute to get each scenario's
# dynamics
ns = A.shape[0]
N = self._prediction_horizon
F, G, H, L = _create_ati_mats(self._prediction_horizon, A, B, D, c)
x_0 = cs.vcat(self._initial_states.values())
U = cs.vec(
cs.vcat(self.single_actions_exp.values()) # NOTE: different from vvcat!
)
X_next = F @ x_0 + G @ U
if H is not None:
X_next += H @ cs.vec(cs.vcat(self.single_disturbances.values()))
if L is not None:
X_next += L
X = cs.vertcat(x_0, X_next).reshape((ns, N + 1))
state_names = self.single_states.keys()
cumsizes = npy.cumsum([0] + [s.shape[0] for s in self._initial_states.values()])
for i in range(self._n_scenarios):
X_i = self._chained_substitution_for_scenario_i(X, i, True)
X_i_split = cs.vertsplit(X_i, cumsizes)
for n, x in zip(state_names, X_i_split):
self._states[_n(n, i)] = x
return F, G, H, L
def _set_multishooting_nonlinear_dynamics(
self,
F: cs.Function,
n_in: int,
parallelization: Literal["serial", "unroll", "inline", "thread", "openmp"],
max_num_threads: int,
) -> None:
# NOTE: see note in `_set_singleshooting_affine_dynamics`
X = cs.vcat(self.single_states.values())
U = cs.vcat(self.single_actions_exp.values())
if n_in < 3:
args = (X[:, :-1], U)
else:
D = cs.vcat(self.single_disturbances.values())
args = (X[:, :-1], U, D)
Fmap = F.map(self._prediction_horizon, parallelization, max_num_threads)
X_next = Fmap(*args)
constraints = []
for i in range(self._n_scenarios):
X_i = _chained_substitute(X[:, 1:], (self.single_states, self.states_i(i)))
X_next_i = self._chained_substitution_for_scenario_i(X_next, i)
constraints.append(X_i - X_next_i)
self.constraint("dyn", cs.vvcat(constraints), "==", 0.0)
def _set_singleshooting_nonlinear_dynamics(
self, F: cs.Function, n_in: int, base: int
) -> None:
# NOTE: see note in `_set_singleshooting_affine_dynamics`
X0 = cs.vcat(self._initial_states.values())
U = cs.vcat(self.single_actions_exp.values())
if n_in < 3:
args = (X0, U)
else:
D = cs.vcat(self.single_disturbances.values())
args = (X0, U, D)
Fmapaccum = F.mapaccum(
self._prediction_horizon, {"base": base, "allow_free": True}
)
X_next = Fmapaccum(*args)
X = cs.horzcat(X0, X_next)
state_names = self.single_states.keys()
cumsizes = npy.cumsum([0] + [s.shape[0] for s in self._initial_states.values()])
for i in range(self._n_scenarios):
X_i = self._chained_substitution_for_scenario_i(X, i, True)
X_i_split = cs.vertsplit(X_i, cumsizes)
for n, x in zip(state_names, X_i_split):
self._states[_n(n, i)] = x
def _chained_substitution_for_scenario_i(
self, expr: SymType, i: int, skip_states: bool = False
) -> Union[SymType, cs.DM]:
"""Iternal utility to perform substitutions in chain for the i-th scenario."""
states = ({}, {}) if skip_states else (self.single_states, self.states_i(i))
return _chained_substitute(
expr,
(self.single_parameters, self.parameters_i(i)),
states,
(self.single_actions, self.actions_i(i)),
(self.single_disturbances, self.disturbances_i(i)),
(self.single_slacks, self.slacks_i(i)),
)