from collections.abc import Iterable, Iterator
from functools import lru_cache
from itertools import repeat
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Generic,
Literal,
Optional,
TypeVar,
Union,
)
import casadi as cs
import numpy as np
import numpy.typing as npt
from joblib import Parallel, delayed
if TYPE_CHECKING:
from joblib.memory import MemorizedFunc
from ..core.cache import invalidate_cache
from ..core.solutions import (
EagerSolution,
LazySolution,
Solution,
_is_infeas,
)
from ..nlps.nlp import Nlp
from ..nlps.objective import _solve_and_get_stats
SymType = TypeVar("SymType", cs.SX, cs.MX)
def _n(sym_name: str, scenario: int) -> str:
"""Internal utility for the naming convention of the ``i``-th scenario's symbols."""
return f"{sym_name}__{scenario}"
def _chained_substitute(
expr: SymType, *old_and_new: tuple[dict[str, SymType], dict[str, SymType]]
) -> Union[SymType, cs.DM]:
"""Internal utility to perform substitutions in chain."""
OLD = []
NEW = []
for old, new in old_and_new:
if old and new:
for n, old_ in old.items():
OLD.append(old_)
NEW.append(new[n])
return cs.substitute(expr, cs.vvcat(OLD), cs.vvcat(NEW))
def _cmp_key(
sol: dict[str, Any], plugin_solver: Optional[str]
) -> tuple[bool, bool, float]:
"""Internal utility, similar to :func:`Solution.cmp_key`, but for native CasADi's
solution dictionaries."""
stats = sol["stats"]
status = str(stats["return_status"])
is_infeas = _is_infeas(status, plugin_solver) if plugin_solver is not None else None
if is_infeas is None:
is_infeas = "infeas" in status.lower()
return is_infeas, not stats["success"], float(sol["f"])
def _get_kkt_stats(
sol: dict[str, Any],
ng: int,
nonmasked_lbx_idx: Union[slice, npt.NDArray[np.int64]],
nonmasked_ubx_idx: Union[slice, npt.NDArray[np.int64]],
lbx: npt.NDArray[np.float64],
ubx: npt.NDArray[np.float64],
dlagrangian: cs.Function,
tol_eq: float,
tol_ineq: float,
tol_comp: float,
tol_stat: float,
) -> dict[str, Any]:
"""Internal utility to check the KKT conditions of a solution."""
g_and_h = sol["g"]
g = np.asarray(g_and_h[:ng, :].elements())
h = np.asarray(g_and_h[ng:, :].elements())
if (h > tol_ineq).any() or (np.abs(g) > tol_eq).any():
return {"success": False, "return_status": "infeasible (primal)"}
lam_g_and_h = sol["lam_g"]
lam_h = np.asarray(lam_g_and_h[ng:, :].elements())
lam_x = np.asarray(sol["lam_x"].elements())
lam_lbx = -np.minimum(lam_x[nonmasked_lbx_idx], 0)
lam_ubx = np.maximum(lam_x[nonmasked_ubx_idx], 0)
x = np.asarray(sol["x"].elements())
if (
(np.abs(lam_lbx * (lbx - x[nonmasked_lbx_idx])) > tol_comp).any()
or (np.abs(lam_ubx * (x[nonmasked_ubx_idx] - ubx)) > tol_comp).any()
or (np.abs(lam_h * h) > tol_comp).any()
):
return {"success": False, "return_status": "infeasible (complementary)"}
lam_g = lam_g_and_h[:ng, :]
dL = dlagrangian(x, sol["p"], lam_g, lam_h, lam_lbx, lam_ubx)
dL = np.asarray(dL.elements()).flatten()
if (np.abs(dL) > tol_stat).any():
return {"success": False, "return_status": "non-stationarity point"}
return {"success": True, "return_status": "suspected success"}
[docs]
class MultistartNlp(Nlp[SymType], Generic[SymType]):
"""Base class for NLP with multistarting. This class lays the foundation for solving
an NLP problem (described as an instance of :class:`csnlp.Nlp`) multiple times with
different initial conditions, and requires subclasses to implement the actual
multistarting logic in :meth:`solve_multi`.
Parameters
----------
args, kwargs
See inherited :meth:`csnlp.Nlp.__init__`.
starts : int
A positive integer for the number of multiple starting guesses to optimize.
Raises
------
ValueError
Raises if the scenario number is invalid.
"""
is_multi: ClassVar[bool] = True
"""Flag to indicate that this is a multistart NLP."""
def __init__(self, *args: Any, starts: int, **kwargs: Any) -> None:
if starts <= 0:
raise ValueError("Number of scenarios must be positive and > 0.")
super().__init__(*args, **kwargs)
self._starts = starts
def __call__(
self,
pars: Union[
None, dict[str, npt.ArrayLike], Iterable[dict[str, npt.ArrayLike]]
] = None,
vals0: Union[
None, dict[str, npt.ArrayLike], Iterable[dict[str, npt.ArrayLike]]
] = None,
**kwargs: Any,
) -> Union[Solution[SymType], list[Solution[SymType]]]:
# call solve_multi only if either pars or vals0 is an iterable; otherwise, run
# the single, base NLP
if (pars is None or isinstance(pars, dict)) and (
vals0 is None or isinstance(vals0, dict)
):
return self.solve(pars, vals0)
return self.solve_multi(pars, vals0, **kwargs)
@property
def starts(self) -> int:
"""Gets the number of starts."""
return self._starts
[docs]
def solve_multi(
self,
pars: Union[
None, dict[str, npt.ArrayLike], Iterable[dict[str, npt.ArrayLike]]
] = None,
vals0: Union[
None, dict[str, npt.ArrayLike], Iterable[dict[str, npt.ArrayLike]]
] = None,
return_all_sols: bool = False,
**_: Any,
) -> Union[Solution[SymType], list[Solution[SymType]]]:
"""Solves the NLP with multiple initial conditions.
Parameters
----------
pars : dict of (str, array_like) or iterable of, optional
An iterable that, for each multistart, contains a dictionary with, for each
parameter in the NLP scheme, the corresponding numerical value. In case a
single dict is passed, the same is used across all scenarions. Can be
``None`` if no parameters are present.
vals0 : dict of (str, array_like) or iterable of, optional
An iterable that, for each multistart, contains a dictionary with, for each
variable in the NLP scheme, the corresponding initial guess. In case a
single dict is passed, the same is used across all scenarions. By default
``None``, in which case initial guesses are not passed to the solver.
return_all_sols : bool, optional
If ``True``, returns the solution of each multistart of the NLP; otherwise,
only the best solution is returned. By default, ``False``.
Returns
-------
Solution or list of Solutions
Depending on the flags ``return_all_sols``, returns
- the best solution out of all multiple starts
- all the solutions (one per start).
"""
raise NotImplementedError(
f"{self.__class__.__name__} does not implement `solve_multi`"
)
[docs]
class StackedMultistartNlp(MultistartNlp[SymType], Generic[SymType]):
"""A class that models and solves an NLP problem from multiple starting initial
guesses by automatically stacking the original problem multiple independent times
in the same, larger-scale NLP. This allows to solve the original problem multiple
times via a single call to the solver."""
def __init__(self, *args: Any, starts: int, **kwargs: Any) -> None:
# this class essentially is a facade that hides an internal nlp in which the
# problem (variables, parameters, etc.) are duplicated by the requested number
# of multiple starts. For this reason, all methods are overridden to create
# multiples of these in the hidden nlp.
super().__init__(*args, starts=starts, **kwargs)
self._stacked_nlp = Nlp(*args, **kwargs) # actual nlp
@lru_cache(maxsize=32)
def _vars_i(self, i: int) -> dict[str, SymType]:
"""Internal utility to retrieve the variables of the ``i``-th scenario."""
nlp = self._stacked_nlp.unwrapped
return {k: nlp._vars[_n(k, i)] for k in self._vars}
@lru_cache(maxsize=32)
def _pars_i(self, i: int) -> dict[str, SymType]:
"""Internal utility to retrieve the parameters of the ``i``-th scenario."""
nlp = self._stacked_nlp.unwrapped
return {k: nlp._pars[_n(k, i)] for k in self._pars}
@lru_cache(maxsize=32)
def _dual_vars_i(self, i: int) -> dict[str, SymType]:
"""Internal utility to retrieve the dual variables of the ``i``-th scenario."""
nlp = self._stacked_nlp.unwrapped
return {k: nlp._dual_vars[_n(k, i)] for k in self._dual_vars}
[docs]
@invalidate_cache(_pars_i)
def parameter(self, name: str, shape: tuple[int, int] = (1, 1)) -> SymType:
out = super().parameter(name, shape)
for i in range(self._starts):
self._stacked_nlp.parameter(_n(name, i), shape)
return out
[docs]
@invalidate_cache(_vars_i, _dual_vars_i)
def variable(
self,
name: str,
shape: tuple[int, int] = (1, 1),
discrete: bool = False,
lb: Union[npt.ArrayLike, cs.DM] = -np.inf,
ub: Union[npt.ArrayLike, cs.DM] = +np.inf,
) -> tuple[SymType, SymType, SymType]:
out = super().variable(name, shape, discrete, lb, ub)
for i in range(self._starts):
self._stacked_nlp.variable(_n(name, i), shape, discrete, lb, ub)
return out
[docs]
@invalidate_cache(_vars_i, _dual_vars_i)
def constraint(
self,
name: str,
lhs: Union[SymType, np.ndarray, cs.DM],
op: Literal["==", ">=", "<="],
rhs: Union[SymType, np.ndarray, cs.DM],
soft: bool = False,
simplify: bool = True,
) -> tuple[SymType, ...]:
expr = lhs - rhs
if simplify:
expr = cs.cse(cs.simplify(expr))
out = super().constraint(name, expr, op, 0, soft, False)
# NOTE: the line above already created the slack variables in each scenario, so
# below we have to pass both soft=False and the expression with slack included.
vars = self.variables
pars = self.parameters
expr_ = out[0] # slack-relaxed expression in the form h(x,p)<=0 or g(x,p)==0
op_: Literal["==", "<="] = "==" if op == "==" else "<="
for i in range(self._starts):
expr_i = _chained_substitute(
expr_, (vars, self._vars_i(i)), (pars, self._pars_i(i))
)
self._stacked_nlp.constraint(_n(name, i), expr_i, op_, 0, False, False)
return out
[docs]
def minimize(self, objective: SymType) -> None:
out = super().minimize(objective)
vars = self.variables
pars = self.parameters
self._fs: list[SymType] = [
_chained_substitute(
objective, (vars, self._vars_i(i)), (pars, self._pars_i(i))
)
for i in range(self._starts)
]
self._stacked_nlp.minimize(sum(self._fs))
return out
[docs]
def init_solver(
self,
opts: Optional[dict[str, Any]] = None,
solver: str = "ipopt",
type: Optional[Literal["nlp", "conic"]] = None,
) -> None:
out = super().init_solver(opts, solver, type)
self._stacked_nlp.init_solver(opts, solver, type)
return out
[docs]
def solve_multi(
self,
pars: Union[
None, dict[str, npt.ArrayLike], Iterable[dict[str, npt.ArrayLike]]
] = None,
vals0: Union[
None, dict[str, npt.ArrayLike], Iterable[dict[str, npt.ArrayLike]]
] = None,
return_all_sols: bool = False,
return_stacked_sol: bool = False,
**_: Any,
) -> Union[Solution[SymType], list[Solution[SymType]]]:
assert not (return_stacked_sol and return_all_sols), (
"`return_all_sols` and `return_stacked_sol` can't be both true."
)
if pars is not None:
pars_iter = repeat(pars, self.starts) if isinstance(pars, dict) else pars
pars = {_n(n, i): ps[n] for i, ps in enumerate(pars_iter) for n in ps}
if vals0 is not None:
v0_iter = repeat(vals0, self.starts) if isinstance(vals0, dict) else vals0
vals0 = {_n(n, i): v0[n] for i, v0 in enumerate(v0_iter) for n in v0}
multi_sol = self._stacked_nlp.solve(pars, vals0)
if return_stacked_sol:
return multi_sol
x_ = self._x
p_ = self._p
lam_g_and_h_ = cs.vertcat(self._lam_g, self._lam_h)
lam_lbx_and_ubx_ = cs.vertcat(self._lam_lbx, self._lam_ubx)
vars_ = self.variables
pars_ = self.parameters
duals_ = self.dual_variables
fs = [float(multi_sol.value(f)) for f in self._fs]
all_vars = cs.vertcat(x_, lam_g_and_h_, lam_lbx_and_ubx_, p_)
splits = np.cumsum(
(0, x_.size1(), lam_g_and_h_.size1(), lam_lbx_and_ubx_.size1(), p_.size1())
)
solver_plugin = self.unwrapped._solver_plugin
def get_ith_sol(idx: int) -> EagerSolution[SymType]:
vals = {n: multi_sol.vals[_n(n, idx)] for n in vars_}
dual_vals = {n: multi_sol.dual_vals[_n(n, idx)] for n in duals_}
# get the value of p, and, lam g, lam h and lam lbx and lam ubx in a single
# substitution to save some time
all_vals = multi_sol.value(
_chained_substitute(
all_vars,
(vars_, self._vars_i(idx)),
(pars_, self._pars_i(idx)),
(duals_, self._dual_vars_i(idx)),
)
)
x, lam_g_and_h, lam_lbx_and_ubx, p = cs.vertsplit(all_vals, splits)
return EagerSolution(
fs[idx],
p_,
p,
x_,
x,
lam_g_and_h_,
lam_g_and_h,
lam_lbx_and_ubx_,
lam_lbx_and_ubx,
vars_,
vals,
duals_,
dual_vals,
multi_sol.stats,
solver_plugin,
)
if return_all_sols:
return [get_ith_sol(i) for i in range(self._starts)]
return get_ith_sol(np.argmin(fs).item())
[docs]
def remove_variable_bounds(
self,
name: str,
direction: Literal["lb", "ub", "both"],
idx: Union[None, tuple[int, int], list[tuple[int, int]]] = None,
) -> None:
idx = [idx] if isinstance(idx, tuple) else list(idx)
super().remove_variable_bounds(name, direction, idx)
for i in range(self._starts):
self._stacked_nlp.remove_variable_bounds(_n(name, i), direction, idx)
[docs]
def remove_constraints(
self, name: str, idx: Union[None, tuple[int, int], list[tuple[int, int]]] = None
) -> None:
idx = [idx] if isinstance(idx, tuple) else list(idx)
super().remove_constraints(name, idx)
for i in range(self._starts):
self._stacked_nlp.remove_constraints(_n(name, i), idx)
[docs]
class ParallelMultistartNlp(MultistartNlp[SymType], Generic[SymType]):
"""A class that solves an NLP problem multiple times, with different initial
starting conditions, via parallelization of the computations via :mod:`joblib`.
Parameters
----------
args, kwargs
See inherited :meth:`csnlp.Nlp.__init__`.
starts : int
A positive integer for the number of multiple starting guesses to optimize.
parallel_kwargs: dict, optional
A dictionary with keyword arguments to be used to instantiate the parallel
:class:`joblib.Parallel` backend. By default, ``None``, in which case the
parallel backend is instantiated with default arguments.
Raises
------
ValueError
Raises if the scenario number is invalid.
"""
def __init__(
self,
*args: Any,
starts: int,
parallel_kwargs: Optional[dict[str, Any]] = None,
**kwargs: Any,
) -> None:
super().__init__(*args, starts=starts, **kwargs)
self._parallel_kwargs = parallel_kwargs if parallel_kwargs is not None else {}
self._parallel: Optional[Parallel] = None
[docs]
def initialize_parallel(self) -> None:
"""Initializes the parallel backend."""
self._parallel = Parallel(**self._parallel_kwargs)
self._parallel.__enter__()
[docs]
def terminate_parallel(self) -> None:
"""Terminates the parallel backend."""
self._parallel.__exit__(None, None, None)
[docs]
def solve_multi(
self,
pars: Union[
None, dict[str, npt.ArrayLike], Iterable[dict[str, npt.ArrayLike]]
] = None,
vals0: Union[
None, dict[str, npt.ArrayLike], Iterable[dict[str, npt.ArrayLike]]
] = None,
return_all_sols: bool = False,
**_: Any,
) -> Union[Solution[SymType], list[Solution[SymType]]]:
if self._solver is None:
raise RuntimeError("Solver uninitialized.")
if self._parallel is None:
self.initialize_parallel()
shared_kwargs = {
"lbx": self._lbx.data,
"ubx": self._ubx.data,
"lbg": np.concatenate((np.zeros(self.ng), np.full(self.nh, -np.inf))),
"ubg": 0,
}
pars_iter = (
repeat(pars, self.starts)
if pars is None or isinstance(pars, dict)
else pars
)
vals0_iter = (
repeat(vals0, self.starts)
if vals0 is None or isinstance(vals0, dict)
else vals0
)
kwargs = (
self._process_pars_and_vals0(shared_kwargs.copy(), p, v0)
for p, v0 in zip(pars_iter, vals0_iter)
)
sols: Iterator[dict[str, Any]] = self._parallel(
delayed(_solve_and_get_stats)(self._solver, kw) for kw in kwargs
)
if return_all_sols:
return [LazySolution.from_casadi_solution(sol, self) for sol in sols]
best_sol = min(sols, key=lambda s: _cmp_key(s, self._solver_plugin))
return LazySolution.from_casadi_solution(best_sol, self)
[docs]
class MappedMultistartNlp(MultistartNlp[SymType], Generic[SymType]):
"""A class that solves an NLP problem multiple times, with different initial
conditions, in parallel via :func:`casadi.Function.map` parallelization.
See
`this wiki <https://github.com/casadi/casadi/wiki/FAQ:-How-to-use-map%28%29-and-mapaccum%28%29-to-speed-up-calculations%3F>`_
for more details on how to enable parallelization via mapping in CasADi.
Parameters
----------
args, kwargs
See inherited :meth:`csnlp.Nlp.__init__`.
starts : int
A positive integer for the number of multiple starting guesses to optimize.
parallelization : "serial", "unroll", "inline", "thread", "openmp"
The type of parallelization to use (see :func:`casadi.Function.map`). By
default, ``"serial"`` is selected.
max_num_threads : int, optional
Maximum number of threads to use in parallelization; if ``None``, the number of
threads is equal to the number of starts.
Raises
------
ValueError
Raises if the scenario number is invalid.
"""
def __init__(
self,
*args: Any,
starts: int,
parallelization: Literal[
"serial", "unroll", "inline", "thread", "openmp"
] = "serial",
max_num_threads: Optional[int] = None,
tol_stat: float = 1e-8,
tol_eq: float = 1e-8,
tol_ineq: float = 1e-8,
tol_comp: float = 1e-8,
**kwargs: Any,
) -> None:
super().__init__(*args, starts=starts, **kwargs)
self._mapped_solver: Optional[MemorizedFunc] = None
self._parallelization = parallelization
self._max_num_threads = max_num_threads
self._tol_eq = tol_eq
self._tol_ineq = tol_ineq
self._tol_comp = tol_comp
self._tol_stat = tol_stat
[docs]
def init_solver(self, *args: Any, **kwargs: Any) -> None:
super().init_solver(*args, **kwargs)
solver: cs.Function = self._solver.func
mapped_solver = solver.map(
self._starts, self._parallelization, self._max_num_threads or self._starts
)
self._mapped_solver = self._cache.cache(mapped_solver)
lagrangian = (
self.f
+ cs.dot(self.lam_g, self.g)
+ cs.dot(self.lam_h, self.h)
+ cs.dot(self.lam_lbx, self.h_lbx)
+ cs.dot(self.lam_ubx, self.h_ubx)
)
dL = cs.jacobian(lagrangian, self.x).T
self._kkt_stationarity = cs.Function(
"kkt_stationarity",
[self.x, self.p, self.lam_g, self.lam_h, self.lam_lbx, self.lam_ubx],
[dL],
["x", "p", "lam_g", "lam_h", "lam_lbx", "lam_ubx"],
["dL"],
)
[docs]
def solve_multi(
self,
pars: Union[
None, dict[str, npt.ArrayLike], Iterable[dict[str, npt.ArrayLike]]
] = None,
vals0: Union[
None, dict[str, npt.ArrayLike], Iterable[dict[str, npt.ArrayLike]]
] = None,
return_all_sols: bool = False,
_return_mapped_sol: bool = False,
**_: Any,
) -> Union[Solution[SymType], list[Solution[SymType]]]:
assert not (_return_mapped_sol and return_all_sols), (
"`return_all_sols` and `_return_mapped_sol` can't be both true."
)
if self._mapped_solver is None:
raise RuntimeError("Solver uninitialized.")
pars_iter = (
repeat(pars, self.starts)
if pars is None or isinstance(pars, dict)
else pars
)
vals0_iter = (
repeat(vals0, self.starts)
if vals0 is None or isinstance(vals0, dict)
else vals0
)
x0s = []
ps = []
default = cs.DM.zeros(0, 1)
for p, v0 in zip(pars_iter, vals0_iter):
kwargs = self._process_pars_and_vals0({}, p, v0)
x0s.append(kwargs.get("x0", default))
ps.append(kwargs.get("p", default))
single_kwargs = {
"x0": cs.hcat(x0s),
"p": cs.hcat(ps),
"lbx": self._lbx.data,
"ubx": self._ubx.data,
"lbg": np.concatenate((np.zeros(self.ng), np.full(self.nh, -np.inf))),
"ubg": 0,
}
single_sol: dict[str, cs.DM] = self._mapped_solver(**single_kwargs)
# if the user wants the mapped solution, return it, though it's not a Solution
# and it's mostly for debugging
if _return_mapped_sol:
single_sol["p"] = single_kwargs["p"]
single_sol["stats"] = self._mapped_solver.func.stats()
return single_sol # NOTE: this is NOT a Solution object
sols: list[dict[str, Any]] = []
ng = self.ng
nonmasked_lbx_idx = self.nonmasked_lbx_idx
nonmasked_ubx_idx = self.nonmasked_ubx_idx
lbx = self._lbx.data[nonmasked_lbx_idx]
ubx = self._ubx.data[nonmasked_ubx_idx]
dlagrangian = self._kkt_stationarity
tol_eq = self._tol_eq
tol_ineq = self._tol_ineq
tol_comp = self._tol_comp
tol_stat = self._tol_stat
for i, p in enumerate(ps):
sol_i = {k: v[:, i] if v.size2() else v for k, v in single_sol.items()}
sol_i["p"] = p
sol_i["stats"] = _get_kkt_stats(
sol_i,
ng,
nonmasked_lbx_idx,
nonmasked_ubx_idx,
lbx,
ubx,
dlagrangian,
tol_eq,
tol_ineq,
tol_comp,
tol_stat,
)
sols.append(sol_i)
if return_all_sols:
return [LazySolution.from_casadi_solution(sol, self) for sol in sols]
# NOTE: since the return status is here hand-crafted, don't pass the plugin name
# for detecting infeasibility so as to default to searching for "infeas"
best_sol = min(sols, key=lambda s: _cmp_key(s, None))
return LazySolution.from_casadi_solution(best_sol, self)