Page MenuHomec4science

linear_affine_engine.py
No OneTemporary

File Metadata

Created
Mon, May 6, 06:21

linear_affine_engine.py

# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
from abc import abstractmethod
import numpy as np
import scipy.sparse as scsp
from collections.abc import Iterable
from copy import deepcopy as copy
from .hfengine_base import HFEngineBase
from rrompy.utilities.base.decorators import affine_construct
from rrompy.utilities.base.types import (Np1D, Np2D, List, ListAny, TupleAny,
paramVal)
from rrompy.utilities.expression import (expressionEvaluator, createMonomial,
createMonomialList)
from rrompy.utilities.numerical.hash_derivative import (
hashDerivativeToIdx as hashD)
from rrompy.utilities.exception_manager import RROMPyException
__all__ = ['LinearAffineEngine', 'checkIfAffine']
class LinearAffineEngine(HFEngineBase):
"""Generic solver for affine parametric problems."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._affinePoly = True
self.nAs, self.nbs = 1, 1
@property
def affinePoly(self):
return self._affinePoly
@property
def nAs(self):
"""Value of nAs."""
return self._nAs
@nAs.setter
def nAs(self, nAs):
nAsOld = self._nAs if hasattr(self, "_nAs") else -1
if nAs != nAsOld:
self._nAs = nAs
self.resetAs()
@property
def nbs(self):
"""Value of nbs."""
return self._nbs
@nbs.setter
def nbs(self, nbs):
nbsOld = self._nbs if hasattr(self, "_nbs") else -1
if nbs != nbsOld:
self._nbs = nbs
self.resetbs()
@property
def spacedim(self):
if (hasattr(self, "bs") and isinstance(self.bs, Iterable)
and self.bs[0] is not None):
return len(self.bs[0])
return super().spacedim
def getMonomialSingleWeight(self, deg:List[int]):
return createMonomial(deg, True)
def getMonomialWeights(self, n:int):
return createMonomialList(n, self.npar, True)
def setAs(self, As:List[Np2D]):
"""Assign terms of operator of linear system."""
if len(As) != self.nAs:
raise RROMPyException(("Expected number {} of terms of As not "
"matching given list length {}.").format(self.nAs,
len(As)))
self.As = [copy(A) for A in As]
def setthAs(self, thAs:List[List[TupleAny]]):
"""Assign terms of operator of linear system."""
if len(thAs) != self.nAs:
raise RROMPyException(("Expected number {} of terms of thAs not "
"matching given list length {}.").format(self.nAs,
len(thAs)))
self.thAs = copy(thAs)
def setbs(self, bs:List[Np1D]):
"""Assign terms of RHS of linear system."""
if len(bs) != self.nbs:
raise RROMPyException(("Expected number {} of terms of bs not "
"matching given list length {}.").format(self.nbs,
len(bs)))
self.bs = [copy(b) for b in bs]
def setthbs(self, thbs:List[List[TupleAny]]):
"""Assign terms of RHS of linear system."""
if len(thbs) != self.nbs:
raise RROMPyException(("Expected number {} of terms of thbs not "
"matching given list length {}.").format(self.nbs,
len(thbs)))
self.thbs = copy(thbs)
def resetAs(self):
"""Reset (derivatives of) operator of linear system."""
if hasattr(self, "_nAs"):
self.setAs([None] * self.nAs)
self.setthAs([None] * self.nAs)
def resetbs(self):
"""Reset (derivatives of) RHS of linear system."""
if hasattr(self, "_nbs"):
self.setbs([None] * self.nbs)
self.setthbs([None] * self.nbs)
def _assembleObject(self, mu:paramVal, objs:ListAny, th:ListAny,
derI:int) -> Np2D:
"""Assemble (derivative of) affine object from list of affine terms."""
muE = self.mapParameterList(mu)
obj = None
for j in range(len(objs)):
if len(th[j]) <= derI and th[j][-1] is not None:
raise RROMPyException(("Cannot assemble operator. Non enough "
"derivatives of theta provided."))
if len(th[j]) > derI and th[j][derI] is not None:
expr = expressionEvaluator(th[j][derI], muE)
if isinstance(expr, Iterable):
if len(expr) > 1:
raise RROMPyException(("Size mismatch in value of "
"theta function. Only scalars "
"allowed."))
expr = expr[0]
if obj is None:
obj = expr * objs[j]
else:
obj = obj + expr * objs[j]
return obj
@abstractmethod
def buildA(self):
"""Build terms of operator of linear system."""
if self.thAs[0] is None: self.thAs = self.getMonomialWeights(self.nAs)
if self.As[0] is None:
self.As[0] = scsp.eye(self.spacedim, dtype = np.complex,
format = "csr")
for j in range(1, self.nAs):
if self.As[j] is None: self.As[j] = self.baselineA()
@affine_construct
def A(self, mu : paramVal = [], der : List[int] = 0) -> Np2D:
"""
Assemble terms of operator of linear system and return it (or its
derivative) at a given parameter.
"""
derI = hashD(der) if isinstance(der, Iterable) else der
if derI < 0 or derI > self.nAs - 1: return self.baselineA()
self.buildA()
assembledA = self._assembleObject(mu, self.As, self.thAs, derI)
if assembledA is None: return self.baselineA()
return assembledA
@abstractmethod
def buildb(self):
"""Build terms of RHS of linear system."""
if self.thbs[0] is None: self.thbs = self.getMonomialWeights(self.nbs)
for j in range(self.nbs):
if self.bs[j] is None: self.bs[j] = self.baselineb()
@affine_construct
def b(self, mu : paramVal = [], der : List[int] = 0) -> Np1D:
"""
Assemble terms of RHS of linear system and return it (or its
derivative) at a given parameter.
"""
derI = hashD(der) if isinstance(der, Iterable) else der
if derI < 0 or derI > self.nbs - 1: return self.baselineb()
self.buildb()
assembledb = self._assembleObject(mu, self.bs, self.thbs, derI)
if assembledb is None: return self.baselineb()
return assembledb
def checkIfAffine(engine, msg : str = "apply method", noA : bool = False):
msg = ("Cannot {} because of non-affine parametric dependence{}. Consider "
"using EIM to define a new engine.").format(msg, " of RHS" * noA)
if (not (hasattr(engine.b, "is_affine") and engine.b.is_affine)
or not (noA or (hasattr(engine.A, "is_affine") and engine.A.is_affine))):
raise RROMPyException(msg)

Event Timeline